ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP/Kernel.pm
(Generate patch)

Comparing cvsroot/AnyEvent-MP/MP/Kernel.pm (file contents):
Revision 1.92 by root, Wed Mar 14 22:59:58 2012 UTC vs.
Revision 1.93 by root, Wed Mar 14 23:34:10 2012 UTC

208# this function adds a node-ref, so you can send stuff to it 208# this function adds a node-ref, so you can send stuff to it
209# it is basically the central routing component. 209# it is basically the central routing component.
210sub add_node { 210sub add_node {
211 my ($node) = @_; 211 my ($node) = @_;
212 212
213 defined $node #d#UGLY
214 or Carp::croak "'undef' is not a valid node ID/port ID";
215
213 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node 216 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
214} 217}
215 218
216sub snd(@) { 219sub snd(@) {
217 my ($nodeid, $portid) = split /#/, shift, 2; 220 my ($nodeid, $portid) = split /#/, shift, 2;
218 221
219 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d# 222 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#
220
221 defined $nodeid #d#UGLY
222 or Carp::croak "'undef' is not a valid node ID/port ID";
223 223
224 ($NODE{$nodeid} || add_node $nodeid) 224 ($NODE{$nodeid} || add_node $nodeid)
225 ->{send} (["$portid", @_]); 225 ->{send} (["$portid", @_]);
226} 226}
227 227
252 # on $NODE, we artificially delay... (for spawn) 252 # on $NODE, we artificially delay... (for spawn)
253 # this is very ugly - maybe we should simply delay ALL messages, 253 # this is very ugly - maybe we should simply delay ALL messages,
254 # to avoid deep recursion issues. but that's so... slow... 254 # to avoid deep recursion issues. but that's so... slow...
255 $AnyEvent::MP::Node::Self::DELAY = 1 255 $AnyEvent::MP::Node::Self::DELAY = 1
256 if $nodeid ne $NODE; 256 if $nodeid ne $NODE;
257
258 defined $nodeid #d#UGLY
259 or Carp::croak "'undef' is not a valid node ID/port ID";
260 257
261 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]); 258 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
262} 259}
263 260
264=item snd_on $node, @msg 261=item snd_on $node, @msg
519 516
520 # if we connect to ourselves, nuke this seed, but make sure we act like a seed 517 # if we connect to ourselves, nuke this seed, but make sure we act like a seed
521 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) { 518 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
522 require AnyEvent::MP::Global; # every seed becomes a global node currently 519 require AnyEvent::MP::Global; # every seed becomes a global node currently
523 delete $SEED_NODE{$seed}; 520 delete $SEED_NODE{$seed};
524 delete $NODE_SEED{$seed};
525 } else { 521 } else {
526 $SEED_NODE{$seed} = $_[0]{remote_node}; 522 $SEED_NODE{$seed} = $_[0]{remote_node};
527 $NODE_SEED{$_[0]{remote_node}} = $seed; 523 $NODE_SEED{$_[0]{remote_node}} = $seed;
524 # also start global service, if not running
525 # we need to check here in addition to the mon_nodes below
526 # because we might only learn late that a node is a seed
527 # and then we might already be connected
528 snd $_[0]{remote_node}, "g_slave"
529 unless $_[0]{remote_greeting}{global};
528 } 530 }
529 }, 531 },
530 on_destroy => sub { 532 sub {
531 delete $SEED_CONNECT{$seed}; 533 delete $SEED_CONNECT{$seed};
532 },
533 sub {
534 $SEED_CONNECT{$seed} = 1;
535 } 534 }
536 ; 535 ;
537} 536}
538 537
539sub seed_all { 538sub seed_all {
540 my @seeds; 539 my @seeds = grep
541 540 !exists $SEED_CONNECT{$_}
542 for (grep !exists $SEED_CONNECT{$_}, keys %SEED_NODE) {
543 if (defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})) { 541 && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}),
544 # node is up, make sure it's running the global service 542 keys %SEED_NODE;
545 snd $_, "g_slave"
546 unless $NODE{$_}{transport}{remote_greeting}{global};
547 } else {
548 # else node is down, we need to seed
549 push @seeds, $_;
550 }
551 }
552 543
553 if (@seeds) { 544 if (@seeds) {
554 # start connection attempt for every seed we are not connected to yet 545 # start connection attempt for every seed we are not connected to yet
555 seed_connect $_ 546 seed_connect $_
556 for @seeds; 547 for @seeds;
578 %NODE_SEED = (); 569 %NODE_SEED = ();
579 %SEED_CONNECT = (); 570 %SEED_CONNECT = ();
580 571
581 @SEED_NODE{@_} = (); 572 @SEED_NODE{@_} = ();
582 573
583 seed_again;#d#
584 seed_all; 574 seed_all;
585} 575}
586 576
587mon_nodes sub { 577mon_nodes sub {
578 return unless exists $NODE_SEED{$_[0]};
579
580 if ($_[1]) {
581 # each time a connection to a seed node goes up, make
582 # sure it runs the global service.
583 snd $_[0], "g_slave"
584 unless $NODE{$_[0]}{transport}{remote_greeting}{global};
585 } else {
588 # if we lost the connection to a seed node, make sure we are seeding 586 # if we lost the connection to a seed node, make sure we are seeding
589 seed_again 587 seed_again;
590 if !$_[1] && exists $NODE_SEED{$_[0]}; 588 }
591}; 589};
592 590
593############################################################################# 591#############################################################################
594# talk with/to global nodes 592# talk with/to global nodes
595 593
1028 # connect to all seednodes 1026 # connect to all seednodes
1029 set_seeds map $_->recv, map _resolve $_, @$seeds; 1027 set_seeds map $_->recv, map _resolve $_, @$seeds;
1030 1028
1031 master_search; 1029 master_search;
1032 1030
1033 if ($NODE eq "atha") {;#d#
1034 my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
1035 }
1036
1037 for (@{ $CONFIG->{services} }) { 1031 for (@{ $CONFIG->{services} }) {
1038 if (ref) { 1032 if (ref) {
1039 my ($func, @args) = @$_; 1033 my ($func, @args) = @$_;
1040 (load_func $func)->(@args); 1034 (load_func $func)->(@args);
1041 } elsif (s/::$//) { 1035 } elsif (s/::$//) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines