… | |
… | |
187 | } |
187 | } |
188 | |
188 | |
189 | # this function adds a node-ref, so you can send stuff to it |
189 | # this function adds a node-ref, so you can send stuff to it |
190 | # it is basically the central routing component. |
190 | # it is basically the central routing component. |
191 | sub add_node { |
191 | sub add_node { |
|
|
192 | $NODE{$_[0]} || do { |
192 | my ($node) = @_; |
193 | my ($node) = @_; |
193 | |
194 | |
194 | length $node |
195 | length $node |
195 | or Carp::croak "'undef' or the empty string are not valid node/port IDs"; |
196 | or Carp::croak "'undef' or the empty string are not valid node/port IDs"; |
196 | |
197 | |
|
|
198 | # registers itself in %NODE |
197 | $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node |
199 | new AnyEvent::MP::Node::Remote $node |
|
|
200 | } |
198 | } |
201 | } |
199 | |
202 | |
200 | sub snd(@) { |
203 | sub snd(@) { |
201 | my ($nodeid, $portid) = split /#/, shift, 2; |
204 | my ($nodeid, $portid) = split /#/, shift, 2; |
202 | |
205 | |
… | |
… | |
274 | =item $bool = node_is_up $nodeid |
277 | =item $bool = node_is_up $nodeid |
275 | |
278 | |
276 | Returns true if the given node is "up", that is, the kernel thinks it has |
279 | Returns true if the given node is "up", that is, the kernel thinks it has |
277 | a working connection to it. |
280 | a working connection to it. |
278 | |
281 | |
279 | If the node is up, returns C<1>. If the node is currently connecting or |
282 | More precisely, if the node is up, returns C<1>. If the node is currently |
280 | otherwise known but not connected, returns C<0>. If nothing is known about |
283 | connecting or otherwise known but not connected, returns C<0>. If nothing |
281 | the node, returns C<undef>. |
284 | is known about the node, returns C<undef>. |
282 | |
285 | |
283 | =cut |
286 | =cut |
284 | |
287 | |
285 | sub node_is_up($) { |
288 | sub node_is_up($) { |
286 | ($NODE{$_[0]} or return)->{transport} |
289 | ($_[0] eq $NODE) || ($NODE{$_[0]} or return)->{transport} |
287 | ? 1 : 0 |
290 | ? 1 : 0 |
288 | } |
291 | } |
289 | |
292 | |
290 | =item @nodes = up_nodes |
293 | =item @nodes = up_nodes |
291 | |
294 | |
… | |
… | |
335 | } |
338 | } |
336 | |
339 | |
337 | sub _inject_nodeevent($$;@) { |
340 | sub _inject_nodeevent($$;@) { |
338 | my ($node, $up, @reason) = @_; |
341 | my ($node, $up, @reason) = @_; |
339 | |
342 | |
|
|
343 | AE::log 7 => "$node->{id} is " . ($up ? "up." : "down (@reason)."); |
|
|
344 | |
340 | for my $cb (values %MON_NODES) { |
345 | for my $cb (values %MON_NODES) { |
341 | eval { $cb->($node->{id}, $up, @reason); 1 } |
346 | eval { $cb->($node->{id}, $up, @reason); 1 } |
342 | or AE::log die => $@; |
347 | or AE::log die => $@; |
343 | } |
348 | } |
344 | |
|
|
345 | AE::log 7 => "$node->{id} is " . ($up ? "up." : "down (@reason)."); |
|
|
346 | } |
349 | } |
347 | |
350 | |
348 | ############################################################################# |
351 | ############################################################################# |
349 | # self node code |
352 | # self node code |
350 | |
353 | |
… | |
… | |
377 | sub _secure_check { |
380 | sub _secure_check { |
378 | $SECURE |
381 | $SECURE |
379 | and die "remote execution not allowed\n"; |
382 | and die "remote execution not allowed\n"; |
380 | } |
383 | } |
381 | |
384 | |
|
|
385 | our %NODE_REQ; |
|
|
386 | |
382 | our %NODE_REQ = ( |
387 | %NODE_REQ = ( |
383 | # internal services |
388 | # "mproto" - monitoring protocol |
384 | |
389 | |
385 | # monitoring |
390 | # monitoring |
386 | mon0 => sub { # stop monitoring a port for another node |
391 | mon0 => sub { # stop monitoring a port for another node |
387 | my $portid = shift; |
392 | my $portid = shift; |
388 | _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid}; |
393 | _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid}; |
… | |
… | |
401 | my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift} |
406 | my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift} |
402 | or return; |
407 | or return; |
403 | |
408 | |
404 | $_->(@_) for @$cbs; |
409 | $_->(@_) for @$cbs; |
405 | }, |
410 | }, |
|
|
411 | # another node wants to kill a local port |
|
|
412 | kil1 => \&_kill, |
406 | |
413 | |
407 | # "public" services - not actually public |
414 | # "public" services - not actually public |
408 | |
|
|
409 | # another node wants to kill a local port |
|
|
410 | kil => \&_kill, |
|
|
411 | |
415 | |
412 | # relay message to another node / generic echo |
416 | # relay message to another node / generic echo |
413 | snd => sub { |
417 | snd => sub { |
414 | &_secure_check; |
|
|
415 | &snd |
418 | &snd |
|
|
419 | }, |
|
|
420 | # ask if a node supports the given request, only works for fixed tags |
|
|
421 | can => sub { |
|
|
422 | my $method = shift; |
|
|
423 | snd @_, exists $NODE_REQ{$method}; |
416 | }, |
424 | }, |
417 | |
425 | |
418 | # random utilities |
426 | # random utilities |
419 | eval => sub { |
427 | eval => sub { |
420 | &_secure_check; |
428 | &_secure_check; |
421 | my @res = do { package main; eval shift }; |
429 | my @res = do { package main; eval shift }; |
422 | snd @_, "$@", @res if @_; |
430 | snd @_, "$@", @res if @_; |
423 | }, |
431 | }, |
424 | time => sub { |
432 | time => sub { |
425 | &_secure_check; |
|
|
426 | snd @_, AE::now; |
433 | snd @_, AE::now; |
427 | }, |
434 | }, |
428 | devnull => sub { |
435 | devnull => sub { |
429 | # |
436 | # |
430 | }, |
437 | }, |
… | |
… | |
432 | # empty messages are keepalives or similar devnull-applications |
439 | # empty messages are keepalives or similar devnull-applications |
433 | }, |
440 | }, |
434 | ); |
441 | ); |
435 | |
442 | |
436 | # the node port |
443 | # the node port |
437 | $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE; |
444 | new AnyEvent::MP::Node::Self $NODE; # registers itself in %NODE |
|
|
445 | |
438 | $PORT{""} = sub { |
446 | $PORT{""} = sub { |
439 | my $tag = shift; |
447 | my $tag = shift; |
440 | eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } }; |
448 | eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } }; |
441 | AE::log die => "error processing node message from $SRCNODE: $@" if $@; |
449 | AE::log die => "error processing node message from $SRCNODE: $@" if $@; |
442 | }; |
450 | }; |
443 | |
451 | |
444 | our $NPROTO = 1; |
452 | our $MPROTO = 1; |
445 | |
453 | |
446 | # tell everybody who connects our nproto |
454 | # tell everybody who connects our nproto |
447 | push @AnyEvent::MP::Transport::HOOK_GREET, sub { |
455 | push @AnyEvent::MP::Transport::HOOK_GREET, sub { |
448 | $_[0]{local_greeting}{nproto} = $NPROTO; |
456 | $_[0]{local_greeting}{mproto} = $MPROTO; |
449 | }; |
457 | }; |
450 | |
458 | |
451 | ############################################################################# |
459 | ############################################################################# |
452 | # seed management, try to keep connections to all seeds at all times |
460 | # seed management, try to keep connections to all seeds at all times |
453 | |
461 | |
… | |
… | |
543 | snd $_[0], "g_slave" |
551 | snd $_[0], "g_slave" |
544 | unless $NODE{$_[0]}{transport}{remote_greeting}{global}; |
552 | unless $NODE{$_[0]}{transport}{remote_greeting}{global}; |
545 | } else { |
553 | } else { |
546 | # if we lost the connection to a seed node, make sure we are seeding |
554 | # if we lost the connection to a seed node, make sure we are seeding |
547 | seed_again; |
555 | seed_again; |
|
|
556 | } |
|
|
557 | }; |
|
|
558 | |
|
|
559 | ############################################################################# |
|
|
560 | # keepalive code - used to kepe conenctions to certain nodes alive |
|
|
561 | # only used by global code atm., but ought to be exposed somehow. |
|
|
562 | |
|
|
563 | our $KEEPALIVE_RETRY; |
|
|
564 | our $KEEPALIVE_WATCHER; |
|
|
565 | our %KEEPALIVE; # we want to keep these nodes alive |
|
|
566 | our %KEEPALIVE_DOWN; # nodes that are down currently |
|
|
567 | |
|
|
568 | sub keepalive_all { |
|
|
569 | AE::log 9 => "keepalive: trying to establish connections with: " |
|
|
570 | . (join " ", keys %KEEPALIVE_DOWN) |
|
|
571 | . "."; |
|
|
572 | |
|
|
573 | (add_node $_)->connect |
|
|
574 | for keys %KEEPALIVE_DOWN; |
|
|
575 | |
|
|
576 | $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2 + rand; |
|
|
577 | $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout} |
|
|
578 | if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; |
|
|
579 | |
|
|
580 | $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all; |
|
|
581 | } |
|
|
582 | |
|
|
583 | sub keepalive_again { |
|
|
584 | $KEEPALIVE_RETRY = 1; |
|
|
585 | $KEEPALIVE_WATCHER ||= AE::timer 1, 0, \&keepalive_all; |
|
|
586 | } |
|
|
587 | |
|
|
588 | sub keepalive_add { |
|
|
589 | return if $KEEPALIVE{$_[0]}++; |
|
|
590 | |
|
|
591 | return if node_is_up $_[0]; |
|
|
592 | undef $KEEPALIVE_DOWN{$_[0]}; |
|
|
593 | keepalive_again; |
|
|
594 | } |
|
|
595 | |
|
|
596 | sub keepalive_del { |
|
|
597 | return if --$KEEPALIVE{$_[0]}; |
|
|
598 | |
|
|
599 | delete $KEEPALIVE {$_[0]}; |
|
|
600 | delete $KEEPALIVE_DOWN{$_[0]}; |
|
|
601 | |
|
|
602 | undef $KEEPALIVE_WATCHER |
|
|
603 | unless %KEEPALIVE_DOWN; |
|
|
604 | } |
|
|
605 | |
|
|
606 | mon_nodes sub { |
|
|
607 | return unless exists $KEEPALIVE{$_[0]}; |
|
|
608 | |
|
|
609 | if ($_[1]) { |
|
|
610 | delete $KEEPALIVE_DOWN{$_[0]}; |
|
|
611 | |
|
|
612 | undef $KEEPALIVE_WATCHER |
|
|
613 | unless %KEEPALIVE_DOWN; |
|
|
614 | } else { |
|
|
615 | # lost the conenction, try to connect again |
|
|
616 | undef $KEEPALIVE_DOWN{$_[0]}; |
|
|
617 | keepalive_again; |
548 | } |
618 | } |
549 | }; |
619 | }; |
550 | |
620 | |
551 | ############################################################################# |
621 | ############################################################################# |
552 | # talk with/to global nodes |
622 | # talk with/to global nodes |