ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Kernel.pm (file contents):
Revision 1.70 by root, Sun Feb 26 11:12:54 2012 UTC vs.
Revision 1.71 by root, Tue Feb 28 18:37:24 2012 UTC

33use AnyEvent::MP::Node; 33use AnyEvent::MP::Node;
34use AnyEvent::MP::Transport; 34use AnyEvent::MP::Transport;
35 35
36use base "Exporter"; 36use base "Exporter";
37 37
38our @EXPORT_OK = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40);
41
38our @EXPORT = qw( 42our @EXPORT = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 add_node load_func snd_to_func snd_on eval_on 43 add_node load_func snd_to_func snd_on eval_on
41 44
42 NODE $NODE node_of snd kil port_is_local 45 NODE $NODE node_of snd kil port_is_local
43 configure 46 configure
44 up_nodes mon_nodes node_is_up 47 up_nodes mon_nodes node_is_up
159our (%PORT, %PORT_DATA); # local ports 162our (%PORT, %PORT_DATA); # local ports
160 163
161our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb) 164our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
162our %LMON; # monitored _local_ ports 165our %LMON; # monitored _local_ ports
163 166
167our $GLOBAL; # true if node is a global ("directory") node
164our %LISTENER; 168our %LISTENER;
165our $LISTENER; # our listeners, as arrayref 169our $LISTENER; # our listeners, as arrayref
166 170
167our $SRCNODE; # holds the sending node during _inject 171our $SRCNODE; # holds the sending node during _inject
168 172
210# this function adds a node-ref, so you can send stuff to it 214# this function adds a node-ref, so you can send stuff to it
211# it is basically the central routing component. 215# it is basically the central routing component.
212sub add_node { 216sub add_node {
213 my ($node) = @_; 217 my ($node) = @_;
214 218
215 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node 219 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
216} 220}
217 221
218sub snd(@) { 222sub snd(@) {
219 my ($nodeid, $portid) = split /#/, shift, 2; 223 my ($nodeid, $portid) = split /#/, shift, 2;
220 224
258 if $nodeid ne $NODE; 262 if $nodeid ne $NODE;
259 263
260 defined $nodeid #d#UGLY 264 defined $nodeid #d#UGLY
261 or Carp::croak "'undef' is not a valid node ID/port ID"; 265 or Carp::croak "'undef' is not a valid node ID/port ID";
262 266
263 ($NODE{$nodeid} || add_node $nodeid)->send (["", @_]); 267 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
264} 268}
265 269
266=item snd_on $node, @msg 270=item snd_on $node, @msg
267 271
268Executes C<snd> with the given C<@msg> (which must include the destination 272Executes C<snd> with the given C<@msg> (which must include the destination
301############################################################################# 305#############################################################################
302# node monitoring and info 306# node monitoring and info
303 307
304=item node_is_known $nodeid 308=item node_is_known $nodeid
305 309
310#TODO#
306Returns true iff the given node is currently known to the system. The only 311Returns true iff the given node is currently known to this node.
307time a node is known but not up currently is when a connection request is
308pending.
309 312
310=cut 313=cut
311 314
312sub node_is_known($) { 315sub node_is_known($) {
313 exists $NODE{$_[0]} 316 exists $NODE{$_[0]}
328 ? 1 : 0 331 ? 1 : 0
329} 332}
330 333
331=item known_nodes 334=item known_nodes
332 335
336#TODO#
333Returns the node IDs of all nodes currently known to this node, including 337Returns the node IDs of all nodes currently known to this node, including
334itself and nodes not currently connected. 338itself and nodes not currently connected.
335 339
336=cut 340=cut
337 341
356is established) or down (the connection is lost). 360is established) or down (the connection is lost).
357 361
358Node up messages can only be followed by node down messages for the same 362Node up messages can only be followed by node down messages for the same
359node, and vice versa. 363node, and vice versa.
360 364
361Note that monitoring a node is usually better done by monitoring it's node 365Note that monitoring a node is usually better done by monitoring its node
362port. This function is mainly of interest to modules that are concerned 366port. This function is mainly of interest to modules that are concerned
363about the network topology and low-level connection handling. 367about the network topology and low-level connection handling.
364 368
365Callbacks I<must not> block and I<should not> send any messages. 369Callbacks I<must not> block and I<should not> send any messages.
366 370
494 eval { &{ $node_req{$tag} ||= load_func $tag } }; 498 eval { &{ $node_req{$tag} ||= load_func $tag } };
495 $WARN->(2, "error processing node message: $@") if $@; 499 $WARN->(2, "error processing node message: $@") if $@;
496}; 500};
497 501
498############################################################################# 502#############################################################################
499# seed management, try to keep connections to all seeds 503# seed management, try to keep connections to all seeds at all times
500 504
501our %SEED_NODE; # seed ID => node ID|undef 505our %SEED_NODE; # seed ID => node ID|undef
502our %NODE_SEED; # map node ID to seed ID 506our %NODE_SEED; # map node ID to seed ID
503our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting 507our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
504our $SEED_WATCHER; 508our $SEED_WATCHER;
505 509our $SEED_RETRY;
506# called before sending the greeting, grabs address we connect to
507push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
508 defined (my $seed = $_[0]{seed})
509 or return;
510
511 $SEED_CONNECT{$seed} = 2;
512};
513
514# called after receiving remote greeting, grabs remote node name
515push @AnyEvent::MP::Transport::HOOK_GREETING, sub {
516 defined (my $seed = $_[0]{seed})
517 or return;
518
519 # we rely on untrusted data here (the remote node name) this is
520 # hopefully ok, as this can at most be used for DOSing, which is easy
521 # when you can do MITM anyway.
522
523 # if we connect to ourselves, nuke this seed
524 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
525 require AnyEvent::MP::Global; # every seed becomes a global node currently
526 delete $SEED_NODE{$_[0]{seed}};
527 delete $NODE_SEED{$_[0]{seed}};
528 } else {
529 $SEED_NODE{$seed} = $_[0]{remote_node};
530 $NODE_SEED{$_[0]{remote_node}} = $seed;
531 }
532};
533
534## called when connection is up, same as above, but now verified
535#push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
536# defined (my $seed = $_[0]{seed})
537# or return;
538# AE::log 5, "connected($seed)\n";#d#
539#
540# $SEED_NODE{$seed} = $_[0]{remote_node};
541# $NODE_SEED{$_[0]{remote_node}} = $seed;
542#};
543
544# called when connections get destroyed, update our data structures
545# and check for self-connects
546push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
547 # if we lost the connection to a seed node, make sure we start seeding
548 seed_again ()#d#
549 if exists $NODE_SEED{ $_[0]{remote_node} };
550
551 defined (my $seed = $_[0]{seed})
552 or return;
553
554 delete $SEED_CONNECT{$seed};
555};
556 510
557sub seed_connect { 511sub seed_connect {
558 my ($seed) = @_; 512 my ($seed) = @_;
559 513
560 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed 514 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
561 or Carp::croak "$seed: unparsable seed address"; 515 or Carp::croak "$seed: unparsable seed address";
562 516
563 $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed."); 517 $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
564 518
565 # ughhh
566 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect $host, $port, 519 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
567 seed => $seed, 520 $host, $port,
521 on_greeted => sub {
522 # called after receiving remote greeting, learn remote node name
523
524 # we rely on untrusted data here (the remote node name) this is
525 # hopefully ok, as this can at most be used for DOSing, which is easy
526 # when you can do MITM anyway.
527
528 # if we connect to ourselves, nuke this seed, but make sure we act like a seed
529 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
530 #d# require AnyEvent::MP::Global; # every seed becomes a global node currently
531 _become_global ();#d#
532 delete $SEED_NODE{$seed};
533 delete $NODE_SEED{$seed};
534 } else {
535 $SEED_NODE{$seed} = $_[0]{remote_node};
536 $NODE_SEED{$_[0]{remote_node}} = $seed;
537 }
538 },
539 on_destroy => sub {
540 delete $SEED_CONNECT{$seed};
541 },
568 sub { 542 sub {
569 $SEED_CONNECT{$seed} = 1; 543 $SEED_CONNECT{$seed} = 1;
570 }, 544 }
571 ; 545 ;
572} 546}
573 547
574sub seed_all { 548sub seed_all {
575# my $next = List::Util::max 1, 549# my $next = List::Util::max 1,
584 558
585 if (@seeds) { 559 if (@seeds) {
586 # start connection attempt for every seed we are not connected to yet 560 # start connection attempt for every seed we are not connected to yet
587 seed_connect $_ 561 seed_connect $_
588 for @seeds; 562 for @seeds;
563
564 $SEED_RETRY = $SEED_RETRY * 2 + rand;
565 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
566 if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
567
568 $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
569
589 } else { 570 } else {
590 # all seeds connected or connecting 571 # all seeds connected or connecting, no need to restart timer
591 undef $SEED_WATCHER; 572 undef $SEED_WATCHER;
592 } 573 }
593} 574}
594 575
595sub seed_again { 576sub seed_again {
596 $SEED_WATCHER ||= AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, \&seed_all; 577 $SEED_RETRY = 1;
578 $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
597} 579}
598 580
599# sets new seed list, starts connecting 581# sets new seed list, starts connecting
600sub set_seeds(@) { 582sub set_seeds(@) {
601 %SEED_NODE = (); 583 %SEED_NODE = ();
584 %NODE_SEED = ();
585 %SEED_CONNECT = ();
586
602 @SEED_NODE{@_} = (); 587 @SEED_NODE{@_} = ();
603 588
589 seed_again;#d#
590 seed_all;
591}
592
593mon_nodes sub {
594 # if we lost the connection to a seed node, make sure we are seeding
604 seed_again; 595 seed_again
596 if !$_[1] && exists $NODE_SEED{$_[0]};
597};
598
599#############################################################################
600# talk with/to global nodes
601
602our $NODE_ADDR; # node => [listeners] - contains global nodes or, for global nodes, all nodes
603our $GLOBAL_ADDR;
604our $GLOBAL_MON;
605our $GLOBAL_TIMER;
606our $MASTER; # the global node we use for lookups ourselves
607
608# master requests
609our %GLOBAL_REQ; # [$cb, \@req]
610
611sub global_req {
612 my $cb = pop;
613
614 $GLOBAL_REQ{$cb+0} = [$cb, [@_]];
615
616 snd $MASTER, @_, $cb+0
617 if $MASTER;
618
619 defined wantarray && AnyEvent::Util::guard { delete $GLOBAL_REQ{$cb+0} }
605} 620}
621
622sub global_master {
623 $MASTER = $_[0];
624
625 snd $MASTER, "global_slave";
626 # ask master for list of global nodes (already done by global_reg)
627 # snd $MASTER, "global_db_get";
628
629 # send queued requests
630 while (my ($k, $v) = each %GLOBAL_REQ) {
631 snd $MASTER, @{$v->[1]}, $k;
632 }
633}
634
635$node_req{global_reply} = sub {
636 my $id = shift;
637
638 my $cb = delete $GLOBAL_REQ{$id}
639 or return;
640
641 $cb->[0]->(@_);
642};
643
644{#d#
645# global code
646
647our %SLAVE;
648
649# other node wants to make us the master
650$node_req{global_slave} = sub {
651 undef $SLAVE{$SRCNODE->{id}};
652 snd $SRCNODE->{id}, global_db_set => $GLOBAL_ADDR;
653};
654
655$node_req{global_find} = sub {
656 my ($node, $id) = @_;
657
658 snd $SRCNODE->{id}, global_reply => $id, $NODE_ADDR->{$node};
659};
660
661# delete slaves on node-down
662mon_nodes sub {
663 return if $_[1];
664
665 delete $SLAVE{$_[0]};
666};
667
668}#d#
669
670# reply with global db
671$node_req{global_db_get} = sub {
672 snd $SRCNODE->{id}, global_db_set => $GLOBAL_ADDR;
673};
674
675$node_req{global_db_set} = sub {
676 $GLOBAL_ADDR = shift;
677 $GLOBAL_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
678 (add_node $_)->connect
679 for keys %$GLOBAL_ADDR;
680 };
681};
682
683sub global_search {
684 if ($GLOBAL) {
685 $MASTER = $NODE;
686 undef $GLOBAL_MON;
687 } else {
688 for (keys %NODE_SEED, keys %$GLOBAL_ADDR) {
689 if (node_is_up $_) {
690 global_master $_;
691 return;
692 }
693 }
694
695 $GLOBAL_MON = mon_nodes sub {
696 return unless $_[1]; # we are only interested in node-ups
697 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
698
699 global_master $_[0];
700
701 $GLOBAL_MON = mon_nodes sub {
702 if ($_[0] eq $MASTER && !$_[1]) {
703 undef $MASTER;
704 global_search ();
705 }
706 };
707 };
708 }
709}
710
711# register remote node listeners
712$node_req{global_reg} = sub {
713 if ($GLOBAL) {
714 $NODE_ADDR->{$SRCNODE->{id}} = $_[0];
715 } else {
716 # should not happen normally, except when there are bad races
717 $WARN->(1, "$SRCNODE->{id} treats us as global node, but we aren't");
718 }
719};
720
721# also used
722$node_req{global_add} = sub {
723 my ($node, $listener) = @_;
724
725 # no-op if we already know the info
726 return
727 if (join "\x00", @$listener) eq (join "\x00", @{ $GLOBAL_ADDR->{$node} });
728
729 $GLOBAL_ADDR->{$node} = $listener;
730
731 # let us be a good citizen and register (also connects)
732 snd $node, global_reg => $LISTENER;
733
734 # if we are global, we update all our slaves
735 our %SLAVE; # ugh, will be in AnyEvent::MP::Global
736 snd $_, global_add => $node, $listener
737 for keys %SLAVE;
738};
739
740$node_req{global_del} = sub {
741 my ($node) = @_;
742
743 delete $GLOBAL_ADDR->{$node};
744};
745
746# tell every global node our listeners
747push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
748 snd $_[0]{remote_node}, global_reg => $LISTENER
749 if $_[0]{remote_greeting}{global};
750};
751
752sub _become_global { #d# hack
753 *_become_global = sub { };
754
755 warn "becoming global\n";#d#
756
757 $GLOBAL = 1;
758 global_search;
759
760 $GLOBAL_ADDR->{$NODE} = $LISTENER;
761
762 $GLOBAL_MON = mon_nodes sub {
763 return if $_[1];
764
765 delete $NODE_ADDR->{$_[0]};
766
767 if (delete $GLOBAL_ADDR->{$_[0]}) {
768 # if the node is global, tell our slaves
769
770 warn "global_del $_[0]\n";#d#
771 our %SLAVE; # ugh, will be in AnyEvent::MP::Global
772 snd $_, global_del => $_[0]
773 for keys %SLAVE;
774 }
775 };
776
777 # tell everybody who connects that we are a master
778 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
779 $_[0]{local_greeting}{global} = 1;
780 };
781
782 # tell every global node that connects that we are global too
783 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
784 snd $_[0], global_add => $NODE, $LISTENER
785 if $_[0]{remote_greeting}{global};
786 };
787
788 # tell everybody else that we are global now
789 snd $_ => global_add => $NODE, $LISTENER
790 for up_nodes;
791}
792
793global_search;
606 794
607############################################################################# 795#############################################################################
608# configure 796# configure
609 797
610sub _nodename { 798sub _nodename {
747 } 935 }
748 936
749 $WARN->(8, "node listens on [@$LISTENER]."); 937 $WARN->(8, "node listens on [@$LISTENER].");
750 938
751 # the global service is mandatory currently 939 # the global service is mandatory currently
752 require AnyEvent::MP::Global; 940 #require AnyEvent::MP::Global;
753 941
754 # connect to all seednodes 942 # connect to all seednodes
755 set_seeds map $_->recv, map _resolve $_, @$seeds; 943 set_seeds map $_->recv, map _resolve $_, @$seeds;
944
945 if ($NODE eq "atha") {;#d#
946 my $w; $w = AE::timer 4, 0, sub { undef $w; _become_global };
947 }
756 948
757 for (@{ $CONFIG->{services} }) { 949 for (@{ $CONFIG->{services} }) {
758 if (ref) { 950 if (ref) {
759 my ($func, @args) = @$_; 951 my ($func, @args) = @$_;
760 (load_func $func)->(@args); 952 (load_func $func)->(@args);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines