ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Kernel.pm (file contents):
Revision 1.68 by root, Mon Apr 19 04:50:46 2010 UTC vs.
Revision 1.69 by root, Sun Feb 26 10:29:59 2012 UTC

164our %LISTENER; 164our %LISTENER;
165our $LISTENER; # our listeners, as arrayref 165our $LISTENER; # our listeners, as arrayref
166 166
167our $SRCNODE; # holds the sending node during _inject 167our $SRCNODE; # holds the sending node during _inject
168 168
169sub _seed { 169sub _init_names {
170 $RUNIQ = alnumbits nonce 96/8; 170 $RUNIQ = alnumbits nonce 96/8;
171 $UNIQ = gen_uniq; 171 $UNIQ = gen_uniq;
172 $NODE = "anon/$RUNIQ"; 172 $NODE = "anon/$RUNIQ";
173} 173}
174 174
175_seed; 175_init_names;
176 176
177sub NODE() { 177sub NODE() {
178 $NODE 178 $NODE
179} 179}
180 180
295 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"; 295 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
296 296
297 ($NODE{$nodeid} || add_node $nodeid) 297 ($NODE{$nodeid} || add_node $nodeid)
298 ->kill ("$portid", @_); 298 ->kill ("$portid", @_);
299} 299}
300
301#############################################################################
302# node monitoring and info
303
304=item node_is_known $nodeid
305
306Returns true iff the given node is currently known to the system. The only
307time a node is known but not up currently is when a connection request is
308pending.
309
310=cut
311
312sub node_is_known($) {
313 exists $NODE{$_[0]}
314}
315
316=item node_is_up $nodeid
317
318Returns true if the given node is "up", that is, the kernel thinks it has
319a working connection to it.
320
321If the node is known (to this local node) but not currently connected,
322returns C<0>. If the node is not known, returns C<undef>.
323
324=cut
325
326sub node_is_up($) {
327 ($NODE{$_[0]} or return)->{transport}
328 ? 1 : 0
329}
330
331=item known_nodes
332
333Returns the node IDs of all nodes currently known to this node, including
334itself and nodes not currently connected.
335
336=cut
337
338sub known_nodes() {
339 map $_->{id}, values %NODE
340}
341
342=item up_nodes
343
344Return the node IDs of all nodes that are currently connected (excluding
345the node itself).
346
347=cut
348
349sub up_nodes() {
350 map $_->{id}, grep $_->{transport}, values %NODE
351}
352
353=item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
354
355Registers a callback that is called each time a node goes up (a connection
356is established) or down (the connection is lost).
357
358Node up messages can only be followed by node down messages for the same
359node, and vice versa.
360
361Note that monitoring a node is usually better done by monitoring it's node
362port. This function is mainly of interest to modules that are concerned
363about the network topology and low-level connection handling.
364
365Callbacks I<must not> block and I<should not> send any messages.
366
367The function returns an optional guard which can be used to unregister
368the monitoring callback again.
369
370Example: make sure you call function C<newnode> for all nodes that are up
371or go up (and down).
372
373 newnode $_, 1 for up_nodes;
374 mon_nodes \&newnode;
375
376=cut
377
378our %MON_NODES;
379
380sub mon_nodes($) {
381 my ($cb) = @_;
382
383 $MON_NODES{$cb+0} = $cb;
384
385 defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
386}
387
388sub _inject_nodeevent($$;@) {
389 my ($node, $up, @reason) = @_;
390
391 for my $cb (values %MON_NODES) {
392 eval { $cb->($node->{id}, $up, @reason); 1 }
393 or $WARN->(1, $@);
394 }
395
396 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
397}
398
399#############################################################################
400# self node code
401
402sub _kill {
403 my $port = shift;
404
405 delete $PORT{$port}
406 or return; # killing nonexistent ports is O.K.
407 delete $PORT_DATA{$port};
408
409 my $mon = delete $LMON{$port}
410 or !@_
411 or $WARN->(2, "unmonitored local port $port died with reason: @_");
412
413 $_->(@_) for values %$mon;
414}
415
416sub _monitor {
417 return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
418 unless exists $PORT{$_[1]};
419
420 $LMON{$_[1]}{$_[2]+0} = $_[2];
421}
422
423sub _unmonitor {
424 delete $LMON{$_[1]}{$_[2]+0}
425 if exists $LMON{$_[1]};
426}
427
428our %node_req = (
429 # internal services
430
431 # monitoring
432 mon0 => sub { # stop monitoring a port for another node
433 my $portid = shift;
434 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
435 },
436 mon1 => sub { # start monitoring a port for another node
437 my $portid = shift;
438 Scalar::Util::weaken (my $node = $SRCNODE);
439 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
440 delete $node->{rmon}{$portid};
441 $node->send (["", kil0 => $portid, @_])
442 if $node && $node->{transport};
443 };
444 },
445 # another node has killed a monitored port
446 kil0 => sub {
447 my $cbs = delete $SRCNODE->{lmon}{+shift}
448 or return;
449
450 $_->(@_) for @$cbs;
451 },
452
453 # "public" services - not actually public
454
455 # another node wants to kill a local port
456 kil => \&_kill,
457
458 # relay message to another node / generic echo
459 snd => \&snd,
460 snd_multiple => sub {
461 snd @$_ for @_
462 },
463
464 # informational
465 info => sub {
466 snd @_, $NODE;
467 },
468 known_nodes => sub {
469 snd @_, known_nodes;
470 },
471 up_nodes => sub {
472 snd @_, up_nodes;
473 },
474
475 # random utilities
476 eval => sub {
477 my @res = do { package main; eval shift };
478 snd @_, "$@", @res if @_;
479 },
480 time => sub {
481 snd @_, AE::time;
482 },
483 devnull => sub {
484 #
485 },
486 "" => sub {
487 # empty messages are keepalives or similar devnull-applications
488 },
489);
490
491$NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
492$PORT{""} = sub {
493 my $tag = shift;
494 eval { &{ $node_req{$tag} ||= load_func $tag } };
495 $WARN->(2, "error processing node message: $@") if $@;
496};
497
498#############################################################################
499# seed management, try to keep connections to all seeds
500
501our %SEED_NODE; # seed ID => node ID|undef
502our %NODE_SEED; # map node ID to seed ID
503our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
504our $SEED_WATCHER;
505
506# called before sending the greeting, grabs address we connect to
507push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
508 defined (my $seed = $_[0]{seed})
509 or return;
510
511 $SEED_CONNECT{$seed} = 2;
512};
513
514# called after receiving remote greeting, grabs remote node name
515push @AnyEvent::MP::Transport::HOOK_GREETING, sub {
516 defined (my $seed = $_[0]{seed})
517 or return;
518
519 # we rely on untrusted data here (the remote node name) this is
520 # hopefully ok, as this can at most be used for DOSing, which is easy
521 # when you can do MITM anyway.
522
523 # if we connect to ourselves, nuke this seed
524 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
525 delete $SEED_NODE{$_[0]{seed}};
526 delete $NODE_SEED{$_[0]{seed}};
527 } else {
528 $SEED_NODE{$seed} = $_[0]{remote_node};
529 $NODE_SEED{$_[0]{remote_node}} = $seed;
530 }
531};
532
533## called when connection is up, same as above, but now verified
534#push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
535# defined (my $seed = $_[0]{seed})
536# or return;
537# AE::log 5, "connected($seed)\n";#d#
538#
539# $SEED_NODE{$seed} = $_[0]{remote_node};
540# $NODE_SEED{$_[0]{remote_node}} = $seed;
541#};
542
543# called when connections get destroyed, update our data structures
544# and check for self-connects
545push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
546 # if we lost the connection to a seed node, make sure we start seeding
547 seed_again ()#d#
548 if exists $NODE_SEED{ $_[0]{remote_node} };
549
550 defined (my $seed = $_[0]{seed})
551 or return;
552
553 delete $SEED_CONNECT{$seed};
554};
555
556sub seed_connect {
557 my ($seed) = @_;
558
559 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
560 or Carp::croak "$seed: unparsable seed address";
561
562 $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
563
564 # ughhh
565 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect $host, $port,
566 seed => $seed,
567 sub {
568 $SEED_CONNECT{$seed} = 1;
569 },
570 ;
571}
572
573sub seed_all {
574# my $next = List::Util::max 1,
575# $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
576# * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
577# - rand;
578
579 my @seeds = grep {
580 !exists $SEED_CONNECT{$_}
581 && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
582 } keys %SEED_NODE;
583
584 if (@seeds) {
585 # start conenction attempt for every seed we are not connected to yet
586 seed_connect $_
587 for @seeds;
588 } else {
589 # all seeds connected or connecting
590 undef $SEED_WATCHER;
591 }
592}
593
594sub seed_again {
595 $SEED_WATCHER ||= AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, \&seed_all;
596}
597
598# sets new seed list, starts connecting
599sub set_seeds(@) {
600 %SEED_NODE = ();
601 @SEED_NODE{@_} = ();
602
603 seed_again;
604}
605
606#############################################################################
607# configure
300 608
301sub _nodename { 609sub _nodename {
302 require POSIX; 610 require POSIX;
303 (POSIX::uname ())[1] 611 (POSIX::uname ())[1]
304} 612}
387sub configure(@) { 695sub configure(@) {
388 unshift @_, "profile" if @_ & 1; 696 unshift @_, "profile" if @_ & 1;
389 my (%kv) = @_; 697 my (%kv) = @_;
390 698
391 delete $NODE{$NODE}; # we do not support doing stuff before configure 699 delete $NODE{$NODE}; # we do not support doing stuff before configure
392 _seed; 700 _init_names;
393 701
394 my $profile = delete $kv{profile}; 702 my $profile = delete $kv{profile};
395 703
396 $profile = _nodename 704 $profile = _nodename
397 unless defined $profile; 705 unless defined $profile;
441 749
442 # the global service is mandatory currently 750 # the global service is mandatory currently
443 require AnyEvent::MP::Global; 751 require AnyEvent::MP::Global;
444 752
445 # connect to all seednodes 753 # connect to all seednodes
446 AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds); 754 set_seeds map $_->recv, map _resolve $_, @$seeds;
447 755
448 for (@{ $CONFIG->{services} }) { 756 for (@{ $CONFIG->{services} }) {
449 if (ref) { 757 if (ref) {
450 my ($func, @args) = @$_; 758 my ($func, @args) = @$_;
451 (load_func $func)->(@args); 759 (load_func $func)->(@args);
456 (load_func $_)->(); 764 (load_func $_)->();
457 } 765 }
458 } 766 }
459} 767}
460 768
461#############################################################################
462# node monitoring and info
463
464=item node_is_known $nodeid
465
466Returns true iff the given node is currently known to the system. The only
467time a node is known but not up currently is when a conenction request is
468pending.
469
470=cut
471
472sub node_is_known($) {
473 exists $NODE{$_[0]}
474}
475
476=item node_is_up $nodeid
477
478Returns true if the given node is "up", that is, the kernel thinks it has
479a working connection to it.
480
481If the node is known but not currently connected, returns C<0>. If the
482node is not known, returns C<undef>.
483
484=cut
485
486sub node_is_up($) {
487 ($NODE{$_[0]} or return)->{transport}
488 ? 1 : 0
489}
490
491=item known_nodes
492
493Returns the node IDs of all nodes currently known to this node, including
494itself and nodes not currently connected.
495
496=cut
497
498sub known_nodes() {
499 map $_->{id}, values %NODE
500}
501
502=item up_nodes
503
504Return the node IDs of all nodes that are currently connected (excluding
505the node itself).
506
507=cut
508
509sub up_nodes() {
510 map $_->{id}, grep $_->{transport}, values %NODE
511}
512
513=item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
514
515Registers a callback that is called each time a node goes up (a connection
516is established) or down (the connection is lost).
517
518Node up messages can only be followed by node down messages for the same
519node, and vice versa.
520
521Note that monitoring a node is usually better done by monitoring it's node
522port. This function is mainly of interest to modules that are concerned
523about the network topology and low-level connection handling.
524
525Callbacks I<must not> block and I<should not> send any messages.
526
527The function returns an optional guard which can be used to unregister
528the monitoring callback again.
529
530Example: make sure you call function C<newnode> for all nodes that are up
531or go up (and down).
532
533 newnode $_, 1 for up_nodes;
534 mon_nodes \&newnode;
535
536=cut
537
538our %MON_NODES;
539
540sub mon_nodes($) {
541 my ($cb) = @_;
542
543 $MON_NODES{$cb+0} = $cb;
544
545 defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
546}
547
548sub _inject_nodeevent($$;@) {
549 my ($node, $up, @reason) = @_;
550
551 for my $cb (values %MON_NODES) {
552 eval { $cb->($node->{id}, $up, @reason); 1 }
553 or $WARN->(1, $@);
554 }
555
556 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
557}
558
559#############################################################################
560# self node code
561
562sub _kill {
563 my $port = shift;
564
565 delete $PORT{$port}
566 or return; # killing nonexistent ports is O.K.
567 delete $PORT_DATA{$port};
568
569 my $mon = delete $LMON{$port}
570 or !@_
571 or $WARN->(2, "unmonitored local port $port died with reason: @_");
572
573 $_->(@_) for values %$mon;
574}
575
576sub _monitor {
577 return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
578 unless exists $PORT{$_[1]};
579
580 $LMON{$_[1]}{$_[2]+0} = $_[2];
581}
582
583sub _unmonitor {
584 delete $LMON{$_[1]}{$_[2]+0}
585 if exists $LMON{$_[1]};
586}
587
588our %node_req = (
589 # internal services
590
591 # monitoring
592 mon0 => sub { # stop monitoring a port for another node
593 my $portid = shift;
594 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
595 },
596 mon1 => sub { # start monitoring a port for another node
597 my $portid = shift;
598 Scalar::Util::weaken (my $node = $SRCNODE);
599 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
600 delete $node->{rmon}{$portid};
601 $node->send (["", kil0 => $portid, @_])
602 if $node && $node->{transport};
603 };
604 },
605 # another node has killed a monitored port
606 kil0 => sub {
607 my $cbs = delete $SRCNODE->{lmon}{+shift}
608 or return;
609
610 $_->(@_) for @$cbs;
611 },
612
613 # "public" services - not actually public
614
615 # another node wants to kill a local port
616 kil => \&_kill,
617
618 # relay message to another node / generic echo
619 snd => \&snd,
620 snd_multiple => sub {
621 snd @$_ for @_
622 },
623
624 # informational
625 info => sub {
626 snd @_, $NODE;
627 },
628 known_nodes => sub {
629 snd @_, known_nodes;
630 },
631 up_nodes => sub {
632 snd @_, up_nodes;
633 },
634
635 # random utilities
636 eval => sub {
637 my @res = do { package main; eval shift };
638 snd @_, "$@", @res if @_;
639 },
640 time => sub {
641 snd @_, AE::time;
642 },
643 devnull => sub {
644 #
645 },
646 "" => sub {
647 # empty messages are keepalives or similar devnull-applications
648 },
649);
650
651$NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
652$PORT{""} = sub {
653 my $tag = shift;
654 eval { &{ $node_req{$tag} ||= load_func $tag } };
655 $WARN->(2, "error processing node message: $@") if $@;
656};
657
658=back 769=back
659 770
660=head1 SEE ALSO 771=head1 SEE ALSO
661 772
662L<AnyEvent::MP>. 773L<AnyEvent::MP>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines