… | |
… | |
164 | our %LISTENER; |
164 | our %LISTENER; |
165 | our $LISTENER; # our listeners, as arrayref |
165 | our $LISTENER; # our listeners, as arrayref |
166 | |
166 | |
167 | our $SRCNODE; # holds the sending node during _inject |
167 | our $SRCNODE; # holds the sending node during _inject |
168 | |
168 | |
169 | sub _seed { |
169 | sub _init_names { |
170 | $RUNIQ = alnumbits nonce 96/8; |
170 | $RUNIQ = alnumbits nonce 96/8; |
171 | $UNIQ = gen_uniq; |
171 | $UNIQ = gen_uniq; |
172 | $NODE = "anon/$RUNIQ"; |
172 | $NODE = "anon/$RUNIQ"; |
173 | } |
173 | } |
174 | |
174 | |
175 | _seed; |
175 | _init_names; |
176 | |
176 | |
177 | sub NODE() { |
177 | sub NODE() { |
178 | $NODE |
178 | $NODE |
179 | } |
179 | } |
180 | |
180 | |
… | |
… | |
295 | or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"; |
295 | or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"; |
296 | |
296 | |
297 | ($NODE{$nodeid} || add_node $nodeid) |
297 | ($NODE{$nodeid} || add_node $nodeid) |
298 | ->kill ("$portid", @_); |
298 | ->kill ("$portid", @_); |
299 | } |
299 | } |
|
|
300 | |
|
|
301 | ############################################################################# |
|
|
302 | # node monitoring and info |
|
|
303 | |
|
|
304 | =item node_is_known $nodeid |
|
|
305 | |
|
|
306 | Returns true iff the given node is currently known to the system. The only |
|
|
307 | time a node is known but not up currently is when a connection request is |
|
|
308 | pending. |
|
|
309 | |
|
|
310 | =cut |
|
|
311 | |
|
|
312 | sub node_is_known($) { |
|
|
313 | exists $NODE{$_[0]} |
|
|
314 | } |
|
|
315 | |
|
|
316 | =item node_is_up $nodeid |
|
|
317 | |
|
|
318 | Returns true if the given node is "up", that is, the kernel thinks it has |
|
|
319 | a working connection to it. |
|
|
320 | |
|
|
321 | If the node is known (to this local node) but not currently connected, |
|
|
322 | returns C<0>. If the node is not known, returns C<undef>. |
|
|
323 | |
|
|
324 | =cut |
|
|
325 | |
|
|
326 | sub node_is_up($) { |
|
|
327 | ($NODE{$_[0]} or return)->{transport} |
|
|
328 | ? 1 : 0 |
|
|
329 | } |
|
|
330 | |
|
|
331 | =item known_nodes |
|
|
332 | |
|
|
333 | Returns the node IDs of all nodes currently known to this node, including |
|
|
334 | itself and nodes not currently connected. |
|
|
335 | |
|
|
336 | =cut |
|
|
337 | |
|
|
338 | sub known_nodes() { |
|
|
339 | map $_->{id}, values %NODE |
|
|
340 | } |
|
|
341 | |
|
|
342 | =item up_nodes |
|
|
343 | |
|
|
344 | Return the node IDs of all nodes that are currently connected (excluding |
|
|
345 | the node itself). |
|
|
346 | |
|
|
347 | =cut |
|
|
348 | |
|
|
349 | sub up_nodes() { |
|
|
350 | map $_->{id}, grep $_->{transport}, values %NODE |
|
|
351 | } |
|
|
352 | |
|
|
353 | =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason) |
|
|
354 | |
|
|
355 | Registers a callback that is called each time a node goes up (a connection |
|
|
356 | is established) or down (the connection is lost). |
|
|
357 | |
|
|
358 | Node up messages can only be followed by node down messages for the same |
|
|
359 | node, and vice versa. |
|
|
360 | |
|
|
361 | Note that monitoring a node is usually better done by monitoring it's node |
|
|
362 | port. This function is mainly of interest to modules that are concerned |
|
|
363 | about the network topology and low-level connection handling. |
|
|
364 | |
|
|
365 | Callbacks I<must not> block and I<should not> send any messages. |
|
|
366 | |
|
|
367 | The function returns an optional guard which can be used to unregister |
|
|
368 | the monitoring callback again. |
|
|
369 | |
|
|
370 | Example: make sure you call function C<newnode> for all nodes that are up |
|
|
371 | or go up (and down). |
|
|
372 | |
|
|
373 | newnode $_, 1 for up_nodes; |
|
|
374 | mon_nodes \&newnode; |
|
|
375 | |
|
|
376 | =cut |
|
|
377 | |
|
|
378 | our %MON_NODES; |
|
|
379 | |
|
|
380 | sub mon_nodes($) { |
|
|
381 | my ($cb) = @_; |
|
|
382 | |
|
|
383 | $MON_NODES{$cb+0} = $cb; |
|
|
384 | |
|
|
385 | defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} } |
|
|
386 | } |
|
|
387 | |
|
|
388 | sub _inject_nodeevent($$;@) { |
|
|
389 | my ($node, $up, @reason) = @_; |
|
|
390 | |
|
|
391 | for my $cb (values %MON_NODES) { |
|
|
392 | eval { $cb->($node->{id}, $up, @reason); 1 } |
|
|
393 | or $WARN->(1, $@); |
|
|
394 | } |
|
|
395 | |
|
|
396 | $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)"); |
|
|
397 | } |
|
|
398 | |
|
|
399 | ############################################################################# |
|
|
400 | # self node code |
|
|
401 | |
|
|
402 | sub _kill { |
|
|
403 | my $port = shift; |
|
|
404 | |
|
|
405 | delete $PORT{$port} |
|
|
406 | or return; # killing nonexistent ports is O.K. |
|
|
407 | delete $PORT_DATA{$port}; |
|
|
408 | |
|
|
409 | my $mon = delete $LMON{$port} |
|
|
410 | or !@_ |
|
|
411 | or $WARN->(2, "unmonitored local port $port died with reason: @_"); |
|
|
412 | |
|
|
413 | $_->(@_) for values %$mon; |
|
|
414 | } |
|
|
415 | |
|
|
416 | sub _monitor { |
|
|
417 | return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]") |
|
|
418 | unless exists $PORT{$_[1]}; |
|
|
419 | |
|
|
420 | $LMON{$_[1]}{$_[2]+0} = $_[2]; |
|
|
421 | } |
|
|
422 | |
|
|
423 | sub _unmonitor { |
|
|
424 | delete $LMON{$_[1]}{$_[2]+0} |
|
|
425 | if exists $LMON{$_[1]}; |
|
|
426 | } |
|
|
427 | |
|
|
428 | our %node_req = ( |
|
|
429 | # internal services |
|
|
430 | |
|
|
431 | # monitoring |
|
|
432 | mon0 => sub { # stop monitoring a port for another node |
|
|
433 | my $portid = shift; |
|
|
434 | _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid}; |
|
|
435 | }, |
|
|
436 | mon1 => sub { # start monitoring a port for another node |
|
|
437 | my $portid = shift; |
|
|
438 | Scalar::Util::weaken (my $node = $SRCNODE); |
|
|
439 | _monitor undef, $portid, $node->{rmon}{$portid} = sub { |
|
|
440 | delete $node->{rmon}{$portid}; |
|
|
441 | $node->send (["", kil0 => $portid, @_]) |
|
|
442 | if $node && $node->{transport}; |
|
|
443 | }; |
|
|
444 | }, |
|
|
445 | # another node has killed a monitored port |
|
|
446 | kil0 => sub { |
|
|
447 | my $cbs = delete $SRCNODE->{lmon}{+shift} |
|
|
448 | or return; |
|
|
449 | |
|
|
450 | $_->(@_) for @$cbs; |
|
|
451 | }, |
|
|
452 | |
|
|
453 | # "public" services - not actually public |
|
|
454 | |
|
|
455 | # another node wants to kill a local port |
|
|
456 | kil => \&_kill, |
|
|
457 | |
|
|
458 | # relay message to another node / generic echo |
|
|
459 | snd => \&snd, |
|
|
460 | snd_multiple => sub { |
|
|
461 | snd @$_ for @_ |
|
|
462 | }, |
|
|
463 | |
|
|
464 | # informational |
|
|
465 | info => sub { |
|
|
466 | snd @_, $NODE; |
|
|
467 | }, |
|
|
468 | known_nodes => sub { |
|
|
469 | snd @_, known_nodes; |
|
|
470 | }, |
|
|
471 | up_nodes => sub { |
|
|
472 | snd @_, up_nodes; |
|
|
473 | }, |
|
|
474 | |
|
|
475 | # random utilities |
|
|
476 | eval => sub { |
|
|
477 | my @res = do { package main; eval shift }; |
|
|
478 | snd @_, "$@", @res if @_; |
|
|
479 | }, |
|
|
480 | time => sub { |
|
|
481 | snd @_, AE::time; |
|
|
482 | }, |
|
|
483 | devnull => sub { |
|
|
484 | # |
|
|
485 | }, |
|
|
486 | "" => sub { |
|
|
487 | # empty messages are keepalives or similar devnull-applications |
|
|
488 | }, |
|
|
489 | ); |
|
|
490 | |
|
|
491 | $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE; |
|
|
492 | $PORT{""} = sub { |
|
|
493 | my $tag = shift; |
|
|
494 | eval { &{ $node_req{$tag} ||= load_func $tag } }; |
|
|
495 | $WARN->(2, "error processing node message: $@") if $@; |
|
|
496 | }; |
|
|
497 | |
|
|
498 | ############################################################################# |
|
|
499 | # seed management, try to keep connections to all seeds |
|
|
500 | |
|
|
501 | our %SEED_NODE; # seed ID => node ID|undef |
|
|
502 | our %NODE_SEED; # map node ID to seed ID |
|
|
503 | our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting |
|
|
504 | our $SEED_WATCHER; |
|
|
505 | |
|
|
506 | # called before sending the greeting, grabs address we connect to |
|
|
507 | push @AnyEvent::MP::Transport::HOOK_CONNECT, sub { |
|
|
508 | defined (my $seed = $_[0]{seed}) |
|
|
509 | or return; |
|
|
510 | |
|
|
511 | $SEED_CONNECT{$seed} = 2; |
|
|
512 | }; |
|
|
513 | |
|
|
514 | # called after receiving remote greeting, grabs remote node name |
|
|
515 | push @AnyEvent::MP::Transport::HOOK_GREETING, sub { |
|
|
516 | defined (my $seed = $_[0]{seed}) |
|
|
517 | or return; |
|
|
518 | |
|
|
519 | # we rely on untrusted data here (the remote node name) this is |
|
|
520 | # hopefully ok, as this can at most be used for DOSing, which is easy |
|
|
521 | # when you can do MITM anyway. |
|
|
522 | |
|
|
523 | # if we connect to ourselves, nuke this seed |
|
|
524 | if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) { |
|
|
525 | delete $SEED_NODE{$_[0]{seed}}; |
|
|
526 | delete $NODE_SEED{$_[0]{seed}}; |
|
|
527 | } else { |
|
|
528 | $SEED_NODE{$seed} = $_[0]{remote_node}; |
|
|
529 | $NODE_SEED{$_[0]{remote_node}} = $seed; |
|
|
530 | } |
|
|
531 | }; |
|
|
532 | |
|
|
533 | ## called when connection is up, same as above, but now verified |
|
|
534 | #push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub { |
|
|
535 | # defined (my $seed = $_[0]{seed}) |
|
|
536 | # or return; |
|
|
537 | # AE::log 5, "connected($seed)\n";#d# |
|
|
538 | # |
|
|
539 | # $SEED_NODE{$seed} = $_[0]{remote_node}; |
|
|
540 | # $NODE_SEED{$_[0]{remote_node}} = $seed; |
|
|
541 | #}; |
|
|
542 | |
|
|
543 | # called when connections get destroyed, update our data structures |
|
|
544 | # and check for self-connects |
|
|
545 | push @AnyEvent::MP::Transport::HOOK_DESTROY, sub { |
|
|
546 | # if we lost the connection to a seed node, make sure we start seeding |
|
|
547 | seed_again ()#d# |
|
|
548 | if exists $NODE_SEED{ $_[0]{remote_node} }; |
|
|
549 | |
|
|
550 | defined (my $seed = $_[0]{seed}) |
|
|
551 | or return; |
|
|
552 | |
|
|
553 | delete $SEED_CONNECT{$seed}; |
|
|
554 | }; |
|
|
555 | |
|
|
556 | sub seed_connect { |
|
|
557 | my ($seed) = @_; |
|
|
558 | |
|
|
559 | my ($host, $port) = AnyEvent::Socket::parse_hostport $seed |
|
|
560 | or Carp::croak "$seed: unparsable seed address"; |
|
|
561 | |
|
|
562 | $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed."); |
|
|
563 | |
|
|
564 | # ughhh |
|
|
565 | $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect $host, $port, |
|
|
566 | seed => $seed, |
|
|
567 | sub { |
|
|
568 | $SEED_CONNECT{$seed} = 1; |
|
|
569 | }, |
|
|
570 | ; |
|
|
571 | } |
|
|
572 | |
|
|
573 | sub seed_all { |
|
|
574 | # my $next = List::Util::max 1, |
|
|
575 | # $AnyEvent::MP::Kernel::CONFIG->{connect_interval} |
|
|
576 | # * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1) |
|
|
577 | # - rand; |
|
|
578 | |
|
|
579 | my @seeds = grep { |
|
|
580 | !exists $SEED_CONNECT{$_} |
|
|
581 | && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}) |
|
|
582 | } keys %SEED_NODE; |
|
|
583 | |
|
|
584 | if (@seeds) { |
|
|
585 | # start conenction attempt for every seed we are not connected to yet |
|
|
586 | seed_connect $_ |
|
|
587 | for @seeds; |
|
|
588 | } else { |
|
|
589 | # all seeds connected or connecting |
|
|
590 | undef $SEED_WATCHER; |
|
|
591 | } |
|
|
592 | } |
|
|
593 | |
|
|
594 | sub seed_again { |
|
|
595 | $SEED_WATCHER ||= AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, \&seed_all; |
|
|
596 | } |
|
|
597 | |
|
|
598 | # sets new seed list, starts connecting |
|
|
599 | sub set_seeds(@) { |
|
|
600 | %SEED_NODE = (); |
|
|
601 | @SEED_NODE{@_} = (); |
|
|
602 | |
|
|
603 | seed_again; |
|
|
604 | } |
|
|
605 | |
|
|
606 | ############################################################################# |
|
|
607 | # configure |
300 | |
608 | |
301 | sub _nodename { |
609 | sub _nodename { |
302 | require POSIX; |
610 | require POSIX; |
303 | (POSIX::uname ())[1] |
611 | (POSIX::uname ())[1] |
304 | } |
612 | } |
… | |
… | |
387 | sub configure(@) { |
695 | sub configure(@) { |
388 | unshift @_, "profile" if @_ & 1; |
696 | unshift @_, "profile" if @_ & 1; |
389 | my (%kv) = @_; |
697 | my (%kv) = @_; |
390 | |
698 | |
391 | delete $NODE{$NODE}; # we do not support doing stuff before configure |
699 | delete $NODE{$NODE}; # we do not support doing stuff before configure |
392 | _seed; |
700 | _init_names; |
393 | |
701 | |
394 | my $profile = delete $kv{profile}; |
702 | my $profile = delete $kv{profile}; |
395 | |
703 | |
396 | $profile = _nodename |
704 | $profile = _nodename |
397 | unless defined $profile; |
705 | unless defined $profile; |
… | |
… | |
441 | |
749 | |
442 | # the global service is mandatory currently |
750 | # the global service is mandatory currently |
443 | require AnyEvent::MP::Global; |
751 | require AnyEvent::MP::Global; |
444 | |
752 | |
445 | # connect to all seednodes |
753 | # connect to all seednodes |
446 | AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds); |
754 | set_seeds map $_->recv, map _resolve $_, @$seeds; |
447 | |
755 | |
448 | for (@{ $CONFIG->{services} }) { |
756 | for (@{ $CONFIG->{services} }) { |
449 | if (ref) { |
757 | if (ref) { |
450 | my ($func, @args) = @$_; |
758 | my ($func, @args) = @$_; |
451 | (load_func $func)->(@args); |
759 | (load_func $func)->(@args); |
… | |
… | |
456 | (load_func $_)->(); |
764 | (load_func $_)->(); |
457 | } |
765 | } |
458 | } |
766 | } |
459 | } |
767 | } |
460 | |
768 | |
461 | ############################################################################# |
|
|
462 | # node monitoring and info |
|
|
463 | |
|
|
464 | =item node_is_known $nodeid |
|
|
465 | |
|
|
466 | Returns true iff the given node is currently known to the system. The only |
|
|
467 | time a node is known but not up currently is when a conenction request is |
|
|
468 | pending. |
|
|
469 | |
|
|
470 | =cut |
|
|
471 | |
|
|
472 | sub node_is_known($) { |
|
|
473 | exists $NODE{$_[0]} |
|
|
474 | } |
|
|
475 | |
|
|
476 | =item node_is_up $nodeid |
|
|
477 | |
|
|
478 | Returns true if the given node is "up", that is, the kernel thinks it has |
|
|
479 | a working connection to it. |
|
|
480 | |
|
|
481 | If the node is known but not currently connected, returns C<0>. If the |
|
|
482 | node is not known, returns C<undef>. |
|
|
483 | |
|
|
484 | =cut |
|
|
485 | |
|
|
486 | sub node_is_up($) { |
|
|
487 | ($NODE{$_[0]} or return)->{transport} |
|
|
488 | ? 1 : 0 |
|
|
489 | } |
|
|
490 | |
|
|
491 | =item known_nodes |
|
|
492 | |
|
|
493 | Returns the node IDs of all nodes currently known to this node, including |
|
|
494 | itself and nodes not currently connected. |
|
|
495 | |
|
|
496 | =cut |
|
|
497 | |
|
|
498 | sub known_nodes() { |
|
|
499 | map $_->{id}, values %NODE |
|
|
500 | } |
|
|
501 | |
|
|
502 | =item up_nodes |
|
|
503 | |
|
|
504 | Return the node IDs of all nodes that are currently connected (excluding |
|
|
505 | the node itself). |
|
|
506 | |
|
|
507 | =cut |
|
|
508 | |
|
|
509 | sub up_nodes() { |
|
|
510 | map $_->{id}, grep $_->{transport}, values %NODE |
|
|
511 | } |
|
|
512 | |
|
|
513 | =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason) |
|
|
514 | |
|
|
515 | Registers a callback that is called each time a node goes up (a connection |
|
|
516 | is established) or down (the connection is lost). |
|
|
517 | |
|
|
518 | Node up messages can only be followed by node down messages for the same |
|
|
519 | node, and vice versa. |
|
|
520 | |
|
|
521 | Note that monitoring a node is usually better done by monitoring it's node |
|
|
522 | port. This function is mainly of interest to modules that are concerned |
|
|
523 | about the network topology and low-level connection handling. |
|
|
524 | |
|
|
525 | Callbacks I<must not> block and I<should not> send any messages. |
|
|
526 | |
|
|
527 | The function returns an optional guard which can be used to unregister |
|
|
528 | the monitoring callback again. |
|
|
529 | |
|
|
530 | Example: make sure you call function C<newnode> for all nodes that are up |
|
|
531 | or go up (and down). |
|
|
532 | |
|
|
533 | newnode $_, 1 for up_nodes; |
|
|
534 | mon_nodes \&newnode; |
|
|
535 | |
|
|
536 | =cut |
|
|
537 | |
|
|
538 | our %MON_NODES; |
|
|
539 | |
|
|
540 | sub mon_nodes($) { |
|
|
541 | my ($cb) = @_; |
|
|
542 | |
|
|
543 | $MON_NODES{$cb+0} = $cb; |
|
|
544 | |
|
|
545 | defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} } |
|
|
546 | } |
|
|
547 | |
|
|
548 | sub _inject_nodeevent($$;@) { |
|
|
549 | my ($node, $up, @reason) = @_; |
|
|
550 | |
|
|
551 | for my $cb (values %MON_NODES) { |
|
|
552 | eval { $cb->($node->{id}, $up, @reason); 1 } |
|
|
553 | or $WARN->(1, $@); |
|
|
554 | } |
|
|
555 | |
|
|
556 | $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)"); |
|
|
557 | } |
|
|
558 | |
|
|
559 | ############################################################################# |
|
|
560 | # self node code |
|
|
561 | |
|
|
562 | sub _kill { |
|
|
563 | my $port = shift; |
|
|
564 | |
|
|
565 | delete $PORT{$port} |
|
|
566 | or return; # killing nonexistent ports is O.K. |
|
|
567 | delete $PORT_DATA{$port}; |
|
|
568 | |
|
|
569 | my $mon = delete $LMON{$port} |
|
|
570 | or !@_ |
|
|
571 | or $WARN->(2, "unmonitored local port $port died with reason: @_"); |
|
|
572 | |
|
|
573 | $_->(@_) for values %$mon; |
|
|
574 | } |
|
|
575 | |
|
|
576 | sub _monitor { |
|
|
577 | return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]") |
|
|
578 | unless exists $PORT{$_[1]}; |
|
|
579 | |
|
|
580 | $LMON{$_[1]}{$_[2]+0} = $_[2]; |
|
|
581 | } |
|
|
582 | |
|
|
583 | sub _unmonitor { |
|
|
584 | delete $LMON{$_[1]}{$_[2]+0} |
|
|
585 | if exists $LMON{$_[1]}; |
|
|
586 | } |
|
|
587 | |
|
|
588 | our %node_req = ( |
|
|
589 | # internal services |
|
|
590 | |
|
|
591 | # monitoring |
|
|
592 | mon0 => sub { # stop monitoring a port for another node |
|
|
593 | my $portid = shift; |
|
|
594 | _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid}; |
|
|
595 | }, |
|
|
596 | mon1 => sub { # start monitoring a port for another node |
|
|
597 | my $portid = shift; |
|
|
598 | Scalar::Util::weaken (my $node = $SRCNODE); |
|
|
599 | _monitor undef, $portid, $node->{rmon}{$portid} = sub { |
|
|
600 | delete $node->{rmon}{$portid}; |
|
|
601 | $node->send (["", kil0 => $portid, @_]) |
|
|
602 | if $node && $node->{transport}; |
|
|
603 | }; |
|
|
604 | }, |
|
|
605 | # another node has killed a monitored port |
|
|
606 | kil0 => sub { |
|
|
607 | my $cbs = delete $SRCNODE->{lmon}{+shift} |
|
|
608 | or return; |
|
|
609 | |
|
|
610 | $_->(@_) for @$cbs; |
|
|
611 | }, |
|
|
612 | |
|
|
613 | # "public" services - not actually public |
|
|
614 | |
|
|
615 | # another node wants to kill a local port |
|
|
616 | kil => \&_kill, |
|
|
617 | |
|
|
618 | # relay message to another node / generic echo |
|
|
619 | snd => \&snd, |
|
|
620 | snd_multiple => sub { |
|
|
621 | snd @$_ for @_ |
|
|
622 | }, |
|
|
623 | |
|
|
624 | # informational |
|
|
625 | info => sub { |
|
|
626 | snd @_, $NODE; |
|
|
627 | }, |
|
|
628 | known_nodes => sub { |
|
|
629 | snd @_, known_nodes; |
|
|
630 | }, |
|
|
631 | up_nodes => sub { |
|
|
632 | snd @_, up_nodes; |
|
|
633 | }, |
|
|
634 | |
|
|
635 | # random utilities |
|
|
636 | eval => sub { |
|
|
637 | my @res = do { package main; eval shift }; |
|
|
638 | snd @_, "$@", @res if @_; |
|
|
639 | }, |
|
|
640 | time => sub { |
|
|
641 | snd @_, AE::time; |
|
|
642 | }, |
|
|
643 | devnull => sub { |
|
|
644 | # |
|
|
645 | }, |
|
|
646 | "" => sub { |
|
|
647 | # empty messages are keepalives or similar devnull-applications |
|
|
648 | }, |
|
|
649 | ); |
|
|
650 | |
|
|
651 | $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE; |
|
|
652 | $PORT{""} = sub { |
|
|
653 | my $tag = shift; |
|
|
654 | eval { &{ $node_req{$tag} ||= load_func $tag } }; |
|
|
655 | $WARN->(2, "error processing node message: $@") if $@; |
|
|
656 | }; |
|
|
657 | |
|
|
658 | =back |
769 | =back |
659 | |
770 | |
660 | =head1 SEE ALSO |
771 | =head1 SEE ALSO |
661 | |
772 | |
662 | L<AnyEvent::MP>. |
773 | L<AnyEvent::MP>. |