… | |
… | |
208 | # this function adds a node-ref, so you can send stuff to it |
208 | # this function adds a node-ref, so you can send stuff to it |
209 | # it is basically the central routing component. |
209 | # it is basically the central routing component. |
210 | sub add_node { |
210 | sub add_node { |
211 | my ($node) = @_; |
211 | my ($node) = @_; |
212 | |
212 | |
|
|
213 | defined $node #d#UGLY |
|
|
214 | or Carp::croak "'undef' is not a valid node ID/port ID"; |
|
|
215 | |
213 | $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node |
216 | $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node |
214 | } |
217 | } |
215 | |
218 | |
216 | sub snd(@) { |
219 | sub snd(@) { |
217 | my ($nodeid, $portid) = split /#/, shift, 2; |
220 | my ($nodeid, $portid) = split /#/, shift, 2; |
218 | |
221 | |
219 | warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d# |
222 | warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d# |
220 | |
|
|
221 | defined $nodeid #d#UGLY |
|
|
222 | or Carp::croak "'undef' is not a valid node ID/port ID"; |
|
|
223 | |
223 | |
224 | ($NODE{$nodeid} || add_node $nodeid) |
224 | ($NODE{$nodeid} || add_node $nodeid) |
225 | ->{send} (["$portid", @_]); |
225 | ->{send} (["$portid", @_]); |
226 | } |
226 | } |
227 | |
227 | |
… | |
… | |
252 | # on $NODE, we artificially delay... (for spawn) |
252 | # on $NODE, we artificially delay... (for spawn) |
253 | # this is very ugly - maybe we should simply delay ALL messages, |
253 | # this is very ugly - maybe we should simply delay ALL messages, |
254 | # to avoid deep recursion issues. but that's so... slow... |
254 | # to avoid deep recursion issues. but that's so... slow... |
255 | $AnyEvent::MP::Node::Self::DELAY = 1 |
255 | $AnyEvent::MP::Node::Self::DELAY = 1 |
256 | if $nodeid ne $NODE; |
256 | if $nodeid ne $NODE; |
257 | |
|
|
258 | defined $nodeid #d#UGLY |
|
|
259 | or Carp::croak "'undef' is not a valid node ID/port ID"; |
|
|
260 | |
257 | |
261 | ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]); |
258 | ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]); |
262 | } |
259 | } |
263 | |
260 | |
264 | =item snd_on $node, @msg |
261 | =item snd_on $node, @msg |
… | |
… | |
519 | |
516 | |
520 | # if we connect to ourselves, nuke this seed, but make sure we act like a seed |
517 | # if we connect to ourselves, nuke this seed, but make sure we act like a seed |
521 | if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) { |
518 | if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) { |
522 | require AnyEvent::MP::Global; # every seed becomes a global node currently |
519 | require AnyEvent::MP::Global; # every seed becomes a global node currently |
523 | delete $SEED_NODE{$seed}; |
520 | delete $SEED_NODE{$seed}; |
524 | delete $NODE_SEED{$seed}; |
|
|
525 | } else { |
521 | } else { |
526 | $SEED_NODE{$seed} = $_[0]{remote_node}; |
522 | $SEED_NODE{$seed} = $_[0]{remote_node}; |
527 | $NODE_SEED{$_[0]{remote_node}} = $seed; |
523 | $NODE_SEED{$_[0]{remote_node}} = $seed; |
|
|
524 | # also start global service, if not running |
|
|
525 | # we need to check here in addition to the mon_nodes below |
|
|
526 | # because we might only learn late that a node is a seed |
|
|
527 | # and then we might already be connected |
|
|
528 | snd $_[0]{remote_node}, "g_slave" |
|
|
529 | unless $_[0]{remote_greeting}{global}; |
528 | } |
530 | } |
529 | }, |
531 | }, |
530 | on_destroy => sub { |
532 | sub { |
531 | delete $SEED_CONNECT{$seed}; |
533 | delete $SEED_CONNECT{$seed}; |
532 | }, |
|
|
533 | sub { |
|
|
534 | $SEED_CONNECT{$seed} = 1; |
|
|
535 | } |
534 | } |
536 | ; |
535 | ; |
537 | } |
536 | } |
538 | |
537 | |
539 | sub seed_all { |
538 | sub seed_all { |
540 | my @seeds; |
539 | my @seeds = grep |
541 | |
540 | !exists $SEED_CONNECT{$_} |
542 | for (grep !exists $SEED_CONNECT{$_}, keys %SEED_NODE) { |
|
|
543 | if (defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})) { |
541 | && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}), |
544 | # node is up, make sure it's running the global service |
542 | keys %SEED_NODE; |
545 | snd $_, "g_slave" |
|
|
546 | unless $NODE{$_}{transport}{remote_greeting}{global}; |
|
|
547 | } else { |
|
|
548 | # else node is down, we need to seed |
|
|
549 | push @seeds, $_; |
|
|
550 | } |
|
|
551 | } |
|
|
552 | |
543 | |
553 | if (@seeds) { |
544 | if (@seeds) { |
554 | # start connection attempt for every seed we are not connected to yet |
545 | # start connection attempt for every seed we are not connected to yet |
555 | seed_connect $_ |
546 | seed_connect $_ |
556 | for @seeds; |
547 | for @seeds; |
… | |
… | |
578 | %NODE_SEED = (); |
569 | %NODE_SEED = (); |
579 | %SEED_CONNECT = (); |
570 | %SEED_CONNECT = (); |
580 | |
571 | |
581 | @SEED_NODE{@_} = (); |
572 | @SEED_NODE{@_} = (); |
582 | |
573 | |
583 | seed_again;#d# |
|
|
584 | seed_all; |
574 | seed_all; |
585 | } |
575 | } |
586 | |
576 | |
587 | mon_nodes sub { |
577 | mon_nodes sub { |
|
|
578 | return unless exists $NODE_SEED{$_[0]}; |
|
|
579 | |
|
|
580 | if ($_[1]) { |
|
|
581 | # each time a connection to a seed node goes up, make |
|
|
582 | # sure it runs the global service. |
|
|
583 | snd $_[0], "g_slave" |
|
|
584 | unless $NODE{$_[0]}{transport}{remote_greeting}{global}; |
|
|
585 | } else { |
588 | # if we lost the connection to a seed node, make sure we are seeding |
586 | # if we lost the connection to a seed node, make sure we are seeding |
589 | seed_again |
587 | seed_again; |
590 | if !$_[1] && exists $NODE_SEED{$_[0]}; |
588 | } |
591 | }; |
589 | }; |
592 | |
590 | |
593 | ############################################################################# |
591 | ############################################################################# |
594 | # talk with/to global nodes |
592 | # talk with/to global nodes |
595 | |
593 | |
… | |
… | |
1028 | # connect to all seednodes |
1026 | # connect to all seednodes |
1029 | set_seeds map $_->recv, map _resolve $_, @$seeds; |
1027 | set_seeds map $_->recv, map _resolve $_, @$seeds; |
1030 | |
1028 | |
1031 | master_search; |
1029 | master_search; |
1032 | |
1030 | |
1033 | if ($NODE eq "atha") {;#d# |
|
|
1034 | my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d# |
|
|
1035 | } |
|
|
1036 | |
|
|
1037 | for (@{ $CONFIG->{services} }) { |
1031 | for (@{ $CONFIG->{services} }) { |
1038 | if (ref) { |
1032 | if (ref) { |
1039 | my ($func, @args) = @$_; |
1033 | my ($func, @args) = @$_; |
1040 | (load_func $func)->(@args); |
1034 | (load_func $func)->(@args); |
1041 | } elsif (s/::$//) { |
1035 | } elsif (s/::$//) { |