ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Kernel.pm (file contents):
Revision 1.95 by root, Wed Mar 21 00:14:25 2012 UTC vs.
Revision 1.96 by root, Wed Mar 21 15:22:16 2012 UTC

198sub delay($) { 198sub delay($) {
199 push @DELAY_QUEUE, shift; 199 push @DELAY_QUEUE, shift;
200 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run; 200 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
201} 201}
202 202
203=item $AnyEvent::MP::Kernel::SRCNODE
204
205During execution of a message callback, this variable contains the node ID
206of the origin node.
207
208The main use of this variable is for debugging output - there are probably
209very few other cases where you need to know the source node ID.
210
211=cut
212
203sub _inject { 213sub _inject {
204 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d# 214 warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
205 &{ $PORT{+shift} or return }; 215 &{ $PORT{+shift} or return };
206} 216}
207 217
208# this function adds a node-ref, so you can send stuff to it 218# this function adds a node-ref, so you can send stuff to it
209# it is basically the central routing component. 219# it is basically the central routing component.
398 delete $LMON{$_[1]}{$_[2]+0} 408 delete $LMON{$_[1]}{$_[2]+0}
399 if exists $LMON{$_[1]}; 409 if exists $LMON{$_[1]};
400} 410}
401 411
402sub _secure_check { 412sub _secure_check {
403 $SECURE->($SRCNODE->{id}) 413 &$SECURE
404 or $SRCNODE->{id} eq $NODE
405 or die "remote execution attempt by insecure node\n"; 414 or die "remote execution attempt by insecure node\n";
406} 415}
407 416
408our %NODE_REQ = ( 417our %NODE_REQ = (
409 # internal services 418 # internal services
410 419
411 # monitoring 420 # monitoring
412 mon0 => sub { # stop monitoring a port for another node 421 mon0 => sub { # stop monitoring a port for another node
413 my $portid = shift; 422 my $portid = shift;
414 _unmonitor undef, $portid, delete $NODE{$SRCNODE->{id}}{rmon}{$portid}; 423 _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid};
415 }, 424 },
416 mon1 => sub { # start monitoring a port for another node 425 mon1 => sub { # start monitoring a port for another node
417 my $portid = shift; 426 my $portid = shift;
418 Scalar::Util::weaken (my $node = $NODE{$SRCNODE->{id}}); 427 Scalar::Util::weaken (my $node = $NODE{$SRCNODE});
419 _monitor undef, $portid, $node->{rmon}{$portid} = sub { 428 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
420 delete $node->{rmon}{$portid}; 429 delete $node->{rmon}{$portid};
421 $node->send (["", kil0 => $portid, @_]) 430 $node->send (["", kil0 => $portid, @_])
422 if $node && $node->{transport}; 431 if $node && $node->{transport};
423 }; 432 };
424 }, 433 },
425 # another node has killed a monitored port 434 # another node has killed a monitored port
426 kil0 => sub { 435 kil0 => sub {
427 my $cbs = delete $NODE{$SRCNODE->{id}}{lmon}{+shift} 436 my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
428 or return; 437 or return;
429 438
430 $_->(@_) for @$cbs; 439 $_->(@_) for @$cbs;
431 }, 440 },
432 441
433 # "public" services - not actually public 442 # "public" services - not actually public
434 443
435 # another node wants to kill a local port 444 # another node wants to kill a local port
436 kil => \&_kill, 445 kil => \&_kill,
437
438 # is the remote node considered secure?
439# secure => sub {
440# #TODO#
441# },
442 446
443 # relay message to another node / generic echo 447 # relay message to another node / generic echo
444 snd => sub { 448 snd => sub {
445 &_secure_check; 449 &_secure_check;
446 &snd 450 &snd
466 470
467$NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE; 471$NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
468$PORT{""} = sub { 472$PORT{""} = sub {
469 my $tag = shift; 473 my $tag = shift;
470 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } }; 474 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
471 $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@; 475 $WARN->(2, "error processing node message from $SRCNODE: $@") if $@;
472}; 476};
473 477
474our $NPROTO = 1; 478our $NPROTO = 1;
475 479
476# tell everybody who connects our nproto 480# tell everybody who connects our nproto
810 } 814 }
811} 815}
812 816
813# full update 817# full update
814$NODE_REQ{g_chg1} = sub { 818$NODE_REQ{g_chg1} = sub {
819 return unless $SRCNODE eq $MASTER;
815 my ($f, $ndb) = @_; 820 my ($f, $ndb) = @_;
816 821
817 my $db = $MON_DB{$f}; 822 my $db = $MON_DB{$f};
818 my (@a, @c, @d); 823 my (@a, @c, @d);
819 824
835 for values %{ $LOCAL_MON{$_[0]} }; 840 for values %{ $LOCAL_MON{$_[0]} };
836}; 841};
837 842
838# incremental update 843# incremental update
839$NODE_REQ{g_chg2} = sub { 844$NODE_REQ{g_chg2} = sub {
845 return unless $SRCNODE eq $MASTER;
840 my ($family, $set, $del) = @_; 846 my ($family, $set, $del) = @_;
841 847
842 my $db = $MON_DB{$family}; 848 my $db = $MON_DB{$family};
843 849
844 my (@a, @c); 850 my (@a, @c);
958 unless defined $profile; 964 unless defined $profile;
959 965
960 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv; 966 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
961 967
962 if (exists $CONFIG->{secure}) { 968 if (exists $CONFIG->{secure}) {
963 my $pass = !$CONFIG->{secure}; 969 $SECURE = eval $CONFIG->{secure} ? "sub { 0 }" : "sub { 1 }";
964 $SECURE = sub { $pass };
965 } 970 }
966 971
967 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/"; 972 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
968 973
969 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"; 974 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines