ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Kernel.pm (file contents):
Revision 1.71 by root, Tue Feb 28 18:37:24 2012 UTC vs.
Revision 1.72 by root, Wed Feb 29 18:44:59 2012 UTC

525 # hopefully ok, as this can at most be used for DOSing, which is easy 525 # hopefully ok, as this can at most be used for DOSing, which is easy
526 # when you can do MITM anyway. 526 # when you can do MITM anyway.
527 527
528 # if we connect to ourselves, nuke this seed, but make sure we act like a seed 528 # if we connect to ourselves, nuke this seed, but make sure we act like a seed
529 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) { 529 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
530 #d# require AnyEvent::MP::Global; # every seed becomes a global node currently 530 require AnyEvent::MP::Global; # every seed becomes a global node currently
531 _become_global ();#d#
532 delete $SEED_NODE{$seed}; 531 delete $SEED_NODE{$seed};
533 delete $NODE_SEED{$seed}; 532 delete $NODE_SEED{$seed};
534 } else { 533 } else {
535 $SEED_NODE{$seed} = $_[0]{remote_node}; 534 $SEED_NODE{$seed} = $_[0]{remote_node};
536 $NODE_SEED{$_[0]{remote_node}} = $seed; 535 $NODE_SEED{$_[0]{remote_node}} = $seed;
597}; 596};
598 597
599############################################################################# 598#############################################################################
600# talk with/to global nodes 599# talk with/to global nodes
601 600
601# protocol messages:
602#
603# sent by all nodes
604# g_reg listeners - register node at all global nodes
605# g_slave - make other global node master of the sender
606# g_db_get - request global node database
607# g_find node id - g_reply with listeners
608# g_reply - used for rpc (global_req function)
609#
610# sent by global nodes (master to slave)
611# g_db_set db - master sets global database in slave
612# g_add node listen - add node to global database
613# g_del - remove node from global database
614#
615
602our $NODE_ADDR; # node => [listeners] - contains global nodes or, for global nodes, all nodes 616our $NODE_ADDR; # node => [listeners] - contains global nodes or, for global nodes, all nodes
603our $GLOBAL_ADDR; 617our $GLOBAL_ADDR;
604our $GLOBAL_MON; 618our $GLOBAL_MON;
605our $GLOBAL_TIMER; 619our $GLOBAL_TIMER;
606our $MASTER; # the global node we use for lookups ourselves 620our $MASTER; # the global node we use for lookups ourselves
617 if $MASTER; 631 if $MASTER;
618 632
619 defined wantarray && AnyEvent::Util::guard { delete $GLOBAL_REQ{$cb+0} } 633 defined wantarray && AnyEvent::Util::guard { delete $GLOBAL_REQ{$cb+0} }
620} 634}
621 635
622sub global_master { 636sub master_set {
623 $MASTER = $_[0]; 637 $MASTER = $_[0];
624 638
625 snd $MASTER, "global_slave"; 639 snd $MASTER, "g_slave";
626 # ask master for list of global nodes (already done by global_reg)
627 # snd $MASTER, "global_db_get";
628 640
629 # send queued requests 641 # send queued requests
630 while (my ($k, $v) = each %GLOBAL_REQ) { 642 while (my ($k, $v) = each %GLOBAL_REQ) {
631 snd $MASTER, @{$v->[1]}, $k; 643 snd $MASTER, @{$v->[1]}, $k;
632 } 644 }
633} 645}
634 646
635$node_req{global_reply} = sub { 647$node_req{g_reply} = sub {
636 my $id = shift; 648 my $id = shift;
637 649
638 my $cb = delete $GLOBAL_REQ{$id} 650 my $cb = delete $GLOBAL_REQ{$id}
639 or return; 651 or return;
640 652
642}; 654};
643 655
644{#d# 656{#d#
645# global code 657# global code
646 658
647our %SLAVE; 659our %GLOBAL_SLAVE;
648 660
649# other node wants to make us the master 661# other node wants to make us the master
650$node_req{global_slave} = sub { 662$node_req{g_slave} = sub {
663 require AnyEvent::MP::Global;
664
651 undef $SLAVE{$SRCNODE->{id}}; 665 undef $GLOBAL_SLAVE{$SRCNODE->{id}};
652 snd $SRCNODE->{id}, global_db_set => $GLOBAL_ADDR; 666 snd $SRCNODE->{id}, g_set => $GLOBAL_ADDR;
653}; 667};
654 668
655$node_req{global_find} = sub { 669$node_req{g_find} = sub {
656 my ($node, $id) = @_; 670 my ($node, $id) = @_;
657 671
658 snd $SRCNODE->{id}, global_reply => $id, $NODE_ADDR->{$node}; 672 snd $SRCNODE->{id}, g_reply => $id, $NODE_ADDR->{$node};
659}; 673};
660 674
661# delete slaves on node-down 675# delete slaves on node-down
662mon_nodes sub { 676mon_nodes sub {
663 return if $_[1]; 677 return if $_[1];
664 678
665 delete $SLAVE{$_[0]}; 679 delete $GLOBAL_SLAVE{$_[0]};
666}; 680};
667 681
668}#d# 682}#d#
669 683
670# reply with global db
671$node_req{global_db_get} = sub {
672 snd $SRCNODE->{id}, global_db_set => $GLOBAL_ADDR;
673};
674
675$node_req{global_db_set} = sub { 684$node_req{g_set} = sub {
676 $GLOBAL_ADDR = shift; 685 $GLOBAL_ADDR = shift;
677 $GLOBAL_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub { 686 $GLOBAL_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
678 (add_node $_)->connect 687 (add_node $_)->connect
679 for keys %$GLOBAL_ADDR; 688 for keys %$GLOBAL_ADDR;
680 }; 689 };
681}; 690};
682 691
683sub global_search { 692sub master_search {
684 if ($GLOBAL) {
685 $MASTER = $NODE;
686 undef $GLOBAL_MON;
687 } else {
688 for (keys %NODE_SEED, keys %$GLOBAL_ADDR) { 693 for (keys %NODE_SEED, keys %$GLOBAL_ADDR) {
689 if (node_is_up $_) { 694 if (node_is_up $_) {
690 global_master $_; 695 master_set $_;
691 return; 696 return;
697 }
698 }
699
700 $GLOBAL_MON = mon_nodes sub {
701 return unless $_[1]; # we are only interested in node-ups
702 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
703
704 master_set $_[0];
705
706 $GLOBAL_MON = mon_nodes sub {
707 if ($_[0] eq $MASTER && !$_[1]) {
708 undef $MASTER;
709 master_search ();
692 } 710 }
693 }
694
695 $GLOBAL_MON = mon_nodes sub {
696 return unless $_[1]; # we are only interested in node-ups
697 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
698
699 global_master $_[0];
700
701 $GLOBAL_MON = mon_nodes sub {
702 if ($_[0] eq $MASTER && !$_[1]) {
703 undef $MASTER;
704 global_search ();
705 }
706 };
707 }; 711 };
708 } 712 };
709} 713}
710 714
711# register remote node listeners 715# register remote node listeners
712$node_req{global_reg} = sub { 716$node_req{g_reg} = sub {
713 if ($GLOBAL) { 717 if ($GLOBAL) {
714 $NODE_ADDR->{$SRCNODE->{id}} = $_[0]; 718 $NODE_ADDR->{$SRCNODE->{id}} = $_[0];
715 } else { 719 } else {
716 # should not happen normally, except when there are bad races 720 # should not happen normally, except when there are bad races
717 $WARN->(1, "$SRCNODE->{id} treats us as global node, but we aren't"); 721 $WARN->(1, "$SRCNODE->{id} treats us as global node, but we aren't");
718 } 722 }
719}; 723};
720 724
721# also used 725# also used
722$node_req{global_add} = sub { 726$node_req{g_add} = sub {
723 my ($node, $listener) = @_; 727 my ($node, $listener) = @_;
724 728
725 # no-op if we already know the info 729 # no-op if we already know the info
726 return 730 return
727 if (join "\x00", @$listener) eq (join "\x00", @{ $GLOBAL_ADDR->{$node} }); 731 if (join "\x00", @$listener) eq (join "\x00", @{ $GLOBAL_ADDR->{$node} });
728 732
729 $GLOBAL_ADDR->{$node} = $listener; 733 $GLOBAL_ADDR->{$node} = $listener;
730 734
731 # let us be a good citizen and register (also connects) 735 # let us be a good citizen and register (also connects)
732 snd $node, global_reg => $LISTENER; 736 snd $node, g_reg => $LISTENER;
733 737
734 # if we are global, we update all our slaves 738 # if we are global, we update all our slaves
735 our %SLAVE; # ugh, will be in AnyEvent::MP::Global 739 our %GLOBAL_SLAVE; # ugh, will be in AnyEvent::MP::Global
736 snd $_, global_add => $node, $listener 740 snd $_, g_add => $node, $listener
737 for keys %SLAVE; 741 for keys %GLOBAL_SLAVE;
738}; 742};
739 743
740$node_req{global_del} = sub { 744$node_req{g_del} = sub {
741 my ($node) = @_; 745 my ($node) = @_;
742 746
743 delete $GLOBAL_ADDR->{$node}; 747 delete $GLOBAL_ADDR->{$node};
744}; 748};
745 749
746# tell every global node our listeners 750# tell every global node our listeners
747push @AnyEvent::MP::Transport::HOOK_CONNECT, sub { 751push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
748 snd $_[0]{remote_node}, global_reg => $LISTENER 752 snd $_[0]{remote_node}, g_reg => $LISTENER
749 if $_[0]{remote_greeting}{global}; 753 if $_[0]{remote_greeting}{global};
750}; 754};
751 755
752sub _become_global { #d# hack 756master_search;
753 *_become_global = sub { };
754
755 warn "becoming global\n";#d#
756
757 $GLOBAL = 1;
758 global_search;
759
760 $GLOBAL_ADDR->{$NODE} = $LISTENER;
761
762 $GLOBAL_MON = mon_nodes sub {
763 return if $_[1];
764
765 delete $NODE_ADDR->{$_[0]};
766
767 if (delete $GLOBAL_ADDR->{$_[0]}) {
768 # if the node is global, tell our slaves
769
770 warn "global_del $_[0]\n";#d#
771 our %SLAVE; # ugh, will be in AnyEvent::MP::Global
772 snd $_, global_del => $_[0]
773 for keys %SLAVE;
774 }
775 };
776
777 # tell everybody who connects that we are a master
778 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
779 $_[0]{local_greeting}{global} = 1;
780 };
781
782 # tell every global node that connects that we are global too
783 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
784 snd $_[0], global_add => $NODE, $LISTENER
785 if $_[0]{remote_greeting}{global};
786 };
787
788 # tell everybody else that we are global now
789 snd $_ => global_add => $NODE, $LISTENER
790 for up_nodes;
791}
792
793global_search;
794 757
795############################################################################# 758#############################################################################
796# configure 759# configure
797 760
798sub _nodename { 761sub _nodename {
893 $profile = _nodename 856 $profile = _nodename
894 unless defined $profile; 857 unless defined $profile;
895 858
896 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv; 859 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
897 860
898 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile; 861 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
899 862
900 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"; 863 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
901 864
902 $NODE = $node 865 $NODE = $node;
903 unless $node eq "anon/"; 866 $NODE =~ s%/$%/$RUNIQ%;
904 867
905 $NODE{$NODE} = $NODE{""}; 868 $NODE{$NODE} = $NODE{""};
906 $NODE{$NODE}{id} = $NODE; 869 $NODE{$NODE}{id} = $NODE;
907 870
908 my $seeds = $CONFIG->{seeds}; 871 my $seeds = $CONFIG->{seeds};
941 904
942 # connect to all seednodes 905 # connect to all seednodes
943 set_seeds map $_->recv, map _resolve $_, @$seeds; 906 set_seeds map $_->recv, map _resolve $_, @$seeds;
944 907
945 if ($NODE eq "atha") {;#d# 908 if ($NODE eq "atha") {;#d#
946 my $w; $w = AE::timer 4, 0, sub { undef $w; _become_global }; 909 my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
947 } 910 }
948 911
949 for (@{ $CONFIG->{services} }) { 912 for (@{ $CONFIG->{services} }) {
950 if (ref) { 913 if (ref) {
951 my ($func, @args) = @$_; 914 my ($func, @args) = @$_;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines