… | |
… | |
198 | sub delay($) { |
198 | sub delay($) { |
199 | push @DELAY_QUEUE, shift; |
199 | push @DELAY_QUEUE, shift; |
200 | $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run; |
200 | $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run; |
201 | } |
201 | } |
202 | |
202 | |
|
|
203 | =item $AnyEvent::MP::Kernel::SRCNODE |
|
|
204 | |
|
|
205 | During execution of a message callback, this variable contains the node ID |
|
|
206 | of the origin node. |
|
|
207 | |
|
|
208 | The main use of this variable is for debugging output - there are probably |
|
|
209 | very few other cases where you need to know the source node ID. |
|
|
210 | |
|
|
211 | =cut |
|
|
212 | |
203 | sub _inject { |
213 | sub _inject { |
204 | warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d# |
214 | warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d# |
205 | &{ $PORT{+shift} or return }; |
215 | &{ $PORT{+shift} or return }; |
206 | } |
216 | } |
207 | |
217 | |
208 | # this function adds a node-ref, so you can send stuff to it |
218 | # this function adds a node-ref, so you can send stuff to it |
209 | # it is basically the central routing component. |
219 | # it is basically the central routing component. |
… | |
… | |
398 | delete $LMON{$_[1]}{$_[2]+0} |
408 | delete $LMON{$_[1]}{$_[2]+0} |
399 | if exists $LMON{$_[1]}; |
409 | if exists $LMON{$_[1]}; |
400 | } |
410 | } |
401 | |
411 | |
402 | sub _secure_check { |
412 | sub _secure_check { |
403 | $SECURE->($SRCNODE->{id}) |
413 | &$SECURE |
404 | or $SRCNODE->{id} eq $NODE |
|
|
405 | or die "remote execution attempt by insecure node\n"; |
414 | or die "remote execution attempt by insecure node\n"; |
406 | } |
415 | } |
407 | |
416 | |
408 | our %NODE_REQ = ( |
417 | our %NODE_REQ = ( |
409 | # internal services |
418 | # internal services |
410 | |
419 | |
411 | # monitoring |
420 | # monitoring |
412 | mon0 => sub { # stop monitoring a port for another node |
421 | mon0 => sub { # stop monitoring a port for another node |
413 | my $portid = shift; |
422 | my $portid = shift; |
414 | _unmonitor undef, $portid, delete $NODE{$SRCNODE->{id}}{rmon}{$portid}; |
423 | _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid}; |
415 | }, |
424 | }, |
416 | mon1 => sub { # start monitoring a port for another node |
425 | mon1 => sub { # start monitoring a port for another node |
417 | my $portid = shift; |
426 | my $portid = shift; |
418 | Scalar::Util::weaken (my $node = $NODE{$SRCNODE->{id}}); |
427 | Scalar::Util::weaken (my $node = $NODE{$SRCNODE}); |
419 | _monitor undef, $portid, $node->{rmon}{$portid} = sub { |
428 | _monitor undef, $portid, $node->{rmon}{$portid} = sub { |
420 | delete $node->{rmon}{$portid}; |
429 | delete $node->{rmon}{$portid}; |
421 | $node->send (["", kil0 => $portid, @_]) |
430 | $node->send (["", kil0 => $portid, @_]) |
422 | if $node && $node->{transport}; |
431 | if $node && $node->{transport}; |
423 | }; |
432 | }; |
424 | }, |
433 | }, |
425 | # another node has killed a monitored port |
434 | # another node has killed a monitored port |
426 | kil0 => sub { |
435 | kil0 => sub { |
427 | my $cbs = delete $NODE{$SRCNODE->{id}}{lmon}{+shift} |
436 | my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift} |
428 | or return; |
437 | or return; |
429 | |
438 | |
430 | $_->(@_) for @$cbs; |
439 | $_->(@_) for @$cbs; |
431 | }, |
440 | }, |
432 | |
441 | |
433 | # "public" services - not actually public |
442 | # "public" services - not actually public |
434 | |
443 | |
435 | # another node wants to kill a local port |
444 | # another node wants to kill a local port |
436 | kil => \&_kill, |
445 | kil => \&_kill, |
437 | |
|
|
438 | # is the remote node considered secure? |
|
|
439 | # secure => sub { |
|
|
440 | # #TODO# |
|
|
441 | # }, |
|
|
442 | |
446 | |
443 | # relay message to another node / generic echo |
447 | # relay message to another node / generic echo |
444 | snd => sub { |
448 | snd => sub { |
445 | &_secure_check; |
449 | &_secure_check; |
446 | &snd |
450 | &snd |
… | |
… | |
466 | |
470 | |
467 | $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE; |
471 | $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE; |
468 | $PORT{""} = sub { |
472 | $PORT{""} = sub { |
469 | my $tag = shift; |
473 | my $tag = shift; |
470 | eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } }; |
474 | eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } }; |
471 | $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@; |
475 | $WARN->(2, "error processing node message from $SRCNODE: $@") if $@; |
472 | }; |
476 | }; |
473 | |
477 | |
474 | our $NPROTO = 1; |
478 | our $NPROTO = 1; |
475 | |
479 | |
476 | # tell everybody who connects our nproto |
480 | # tell everybody who connects our nproto |
… | |
… | |
810 | } |
814 | } |
811 | } |
815 | } |
812 | |
816 | |
813 | # full update |
817 | # full update |
814 | $NODE_REQ{g_chg1} = sub { |
818 | $NODE_REQ{g_chg1} = sub { |
|
|
819 | return unless $SRCNODE eq $MASTER; |
815 | my ($f, $ndb) = @_; |
820 | my ($f, $ndb) = @_; |
816 | |
821 | |
817 | my $db = $MON_DB{$f}; |
822 | my $db = $MON_DB{$f}; |
818 | my (@a, @c, @d); |
823 | my (@a, @c, @d); |
819 | |
824 | |
… | |
… | |
835 | for values %{ $LOCAL_MON{$_[0]} }; |
840 | for values %{ $LOCAL_MON{$_[0]} }; |
836 | }; |
841 | }; |
837 | |
842 | |
838 | # incremental update |
843 | # incremental update |
839 | $NODE_REQ{g_chg2} = sub { |
844 | $NODE_REQ{g_chg2} = sub { |
|
|
845 | return unless $SRCNODE eq $MASTER; |
840 | my ($family, $set, $del) = @_; |
846 | my ($family, $set, $del) = @_; |
841 | |
847 | |
842 | my $db = $MON_DB{$family}; |
848 | my $db = $MON_DB{$family}; |
843 | |
849 | |
844 | my (@a, @c); |
850 | my (@a, @c); |
… | |
… | |
958 | unless defined $profile; |
964 | unless defined $profile; |
959 | |
965 | |
960 | $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv; |
966 | $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv; |
961 | |
967 | |
962 | if (exists $CONFIG->{secure}) { |
968 | if (exists $CONFIG->{secure}) { |
963 | my $pass = !$CONFIG->{secure}; |
969 | $SECURE = eval $CONFIG->{secure} ? "sub { 0 }" : "sub { 1 }"; |
964 | $SECURE = sub { $pass }; |
|
|
965 | } |
970 | } |
966 | |
971 | |
967 | my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/"; |
972 | my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/"; |
968 | |
973 | |
969 | $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"; |
974 | $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"; |