… | |
… | |
525 | # hopefully ok, as this can at most be used for DOSing, which is easy |
525 | # hopefully ok, as this can at most be used for DOSing, which is easy |
526 | # when you can do MITM anyway. |
526 | # when you can do MITM anyway. |
527 | |
527 | |
528 | # if we connect to ourselves, nuke this seed, but make sure we act like a seed |
528 | # if we connect to ourselves, nuke this seed, but make sure we act like a seed |
529 | if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) { |
529 | if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) { |
530 | #d# require AnyEvent::MP::Global; # every seed becomes a global node currently |
530 | require AnyEvent::MP::Global; # every seed becomes a global node currently |
531 | _become_global ();#d# |
|
|
532 | delete $SEED_NODE{$seed}; |
531 | delete $SEED_NODE{$seed}; |
533 | delete $NODE_SEED{$seed}; |
532 | delete $NODE_SEED{$seed}; |
534 | } else { |
533 | } else { |
535 | $SEED_NODE{$seed} = $_[0]{remote_node}; |
534 | $SEED_NODE{$seed} = $_[0]{remote_node}; |
536 | $NODE_SEED{$_[0]{remote_node}} = $seed; |
535 | $NODE_SEED{$_[0]{remote_node}} = $seed; |
… | |
… | |
597 | }; |
596 | }; |
598 | |
597 | |
599 | ############################################################################# |
598 | ############################################################################# |
600 | # talk with/to global nodes |
599 | # talk with/to global nodes |
601 | |
600 | |
|
|
601 | # protocol messages: |
|
|
602 | # |
|
|
603 | # sent by all nodes |
|
|
604 | # g_reg listeners - register node at all global nodes |
|
|
605 | # g_slave - make other global node master of the sender |
|
|
606 | # g_db_get - request global node database |
|
|
607 | # g_find node id - g_reply with listeners |
|
|
608 | # g_reply - used for rpc (global_req function) |
|
|
609 | # |
|
|
610 | # sent by global nodes (master to slave) |
|
|
611 | # g_db_set db - master sets global database in slave |
|
|
612 | # g_add node listen - add node to global database |
|
|
613 | # g_del - remove node from global database |
|
|
614 | # |
|
|
615 | |
602 | our $NODE_ADDR; # node => [listeners] - contains global nodes or, for global nodes, all nodes |
616 | our $NODE_ADDR; # node => [listeners] - contains global nodes or, for global nodes, all nodes |
603 | our $GLOBAL_ADDR; |
617 | our $GLOBAL_ADDR; |
604 | our $GLOBAL_MON; |
618 | our $GLOBAL_MON; |
605 | our $GLOBAL_TIMER; |
619 | our $GLOBAL_TIMER; |
606 | our $MASTER; # the global node we use for lookups ourselves |
620 | our $MASTER; # the global node we use for lookups ourselves |
… | |
… | |
617 | if $MASTER; |
631 | if $MASTER; |
618 | |
632 | |
619 | defined wantarray && AnyEvent::Util::guard { delete $GLOBAL_REQ{$cb+0} } |
633 | defined wantarray && AnyEvent::Util::guard { delete $GLOBAL_REQ{$cb+0} } |
620 | } |
634 | } |
621 | |
635 | |
622 | sub global_master { |
636 | sub master_set { |
623 | $MASTER = $_[0]; |
637 | $MASTER = $_[0]; |
624 | |
638 | |
625 | snd $MASTER, "global_slave"; |
639 | snd $MASTER, "g_slave"; |
626 | # ask master for list of global nodes (already done by global_reg) |
|
|
627 | # snd $MASTER, "global_db_get"; |
|
|
628 | |
640 | |
629 | # send queued requests |
641 | # send queued requests |
630 | while (my ($k, $v) = each %GLOBAL_REQ) { |
642 | while (my ($k, $v) = each %GLOBAL_REQ) { |
631 | snd $MASTER, @{$v->[1]}, $k; |
643 | snd $MASTER, @{$v->[1]}, $k; |
632 | } |
644 | } |
633 | } |
645 | } |
634 | |
646 | |
635 | $node_req{global_reply} = sub { |
647 | $node_req{g_reply} = sub { |
636 | my $id = shift; |
648 | my $id = shift; |
637 | |
649 | |
638 | my $cb = delete $GLOBAL_REQ{$id} |
650 | my $cb = delete $GLOBAL_REQ{$id} |
639 | or return; |
651 | or return; |
640 | |
652 | |
… | |
… | |
642 | }; |
654 | }; |
643 | |
655 | |
644 | {#d# |
656 | {#d# |
645 | # global code |
657 | # global code |
646 | |
658 | |
647 | our %SLAVE; |
659 | our %GLOBAL_SLAVE; |
648 | |
660 | |
649 | # other node wants to make us the master |
661 | # other node wants to make us the master |
650 | $node_req{global_slave} = sub { |
662 | $node_req{g_slave} = sub { |
|
|
663 | require AnyEvent::MP::Global; |
|
|
664 | |
651 | undef $SLAVE{$SRCNODE->{id}}; |
665 | undef $GLOBAL_SLAVE{$SRCNODE->{id}}; |
652 | snd $SRCNODE->{id}, global_db_set => $GLOBAL_ADDR; |
666 | snd $SRCNODE->{id}, g_set => $GLOBAL_ADDR; |
653 | }; |
667 | }; |
654 | |
668 | |
655 | $node_req{global_find} = sub { |
669 | $node_req{g_find} = sub { |
656 | my ($node, $id) = @_; |
670 | my ($node, $id) = @_; |
657 | |
671 | |
658 | snd $SRCNODE->{id}, global_reply => $id, $NODE_ADDR->{$node}; |
672 | snd $SRCNODE->{id}, g_reply => $id, $NODE_ADDR->{$node}; |
659 | }; |
673 | }; |
660 | |
674 | |
661 | # delete slaves on node-down |
675 | # delete slaves on node-down |
662 | mon_nodes sub { |
676 | mon_nodes sub { |
663 | return if $_[1]; |
677 | return if $_[1]; |
664 | |
678 | |
665 | delete $SLAVE{$_[0]}; |
679 | delete $GLOBAL_SLAVE{$_[0]}; |
666 | }; |
680 | }; |
667 | |
681 | |
668 | }#d# |
682 | }#d# |
669 | |
683 | |
670 | # reply with global db |
|
|
671 | $node_req{global_db_get} = sub { |
|
|
672 | snd $SRCNODE->{id}, global_db_set => $GLOBAL_ADDR; |
|
|
673 | }; |
|
|
674 | |
|
|
675 | $node_req{global_db_set} = sub { |
684 | $node_req{g_set} = sub { |
676 | $GLOBAL_ADDR = shift; |
685 | $GLOBAL_ADDR = shift; |
677 | $GLOBAL_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub { |
686 | $GLOBAL_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub { |
678 | (add_node $_)->connect |
687 | (add_node $_)->connect |
679 | for keys %$GLOBAL_ADDR; |
688 | for keys %$GLOBAL_ADDR; |
680 | }; |
689 | }; |
681 | }; |
690 | }; |
682 | |
691 | |
683 | sub global_search { |
692 | sub master_search { |
684 | if ($GLOBAL) { |
|
|
685 | $MASTER = $NODE; |
|
|
686 | undef $GLOBAL_MON; |
|
|
687 | } else { |
|
|
688 | for (keys %NODE_SEED, keys %$GLOBAL_ADDR) { |
693 | for (keys %NODE_SEED, keys %$GLOBAL_ADDR) { |
689 | if (node_is_up $_) { |
694 | if (node_is_up $_) { |
690 | global_master $_; |
695 | master_set $_; |
691 | return; |
696 | return; |
|
|
697 | } |
|
|
698 | } |
|
|
699 | |
|
|
700 | $GLOBAL_MON = mon_nodes sub { |
|
|
701 | return unless $_[1]; # we are only interested in node-ups |
|
|
702 | return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes |
|
|
703 | |
|
|
704 | master_set $_[0]; |
|
|
705 | |
|
|
706 | $GLOBAL_MON = mon_nodes sub { |
|
|
707 | if ($_[0] eq $MASTER && !$_[1]) { |
|
|
708 | undef $MASTER; |
|
|
709 | master_search (); |
692 | } |
710 | } |
693 | } |
|
|
694 | |
|
|
695 | $GLOBAL_MON = mon_nodes sub { |
|
|
696 | return unless $_[1]; # we are only interested in node-ups |
|
|
697 | return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes |
|
|
698 | |
|
|
699 | global_master $_[0]; |
|
|
700 | |
|
|
701 | $GLOBAL_MON = mon_nodes sub { |
|
|
702 | if ($_[0] eq $MASTER && !$_[1]) { |
|
|
703 | undef $MASTER; |
|
|
704 | global_search (); |
|
|
705 | } |
|
|
706 | }; |
|
|
707 | }; |
711 | }; |
708 | } |
712 | }; |
709 | } |
713 | } |
710 | |
714 | |
711 | # register remote node listeners |
715 | # register remote node listeners |
712 | $node_req{global_reg} = sub { |
716 | $node_req{g_reg} = sub { |
713 | if ($GLOBAL) { |
717 | if ($GLOBAL) { |
714 | $NODE_ADDR->{$SRCNODE->{id}} = $_[0]; |
718 | $NODE_ADDR->{$SRCNODE->{id}} = $_[0]; |
715 | } else { |
719 | } else { |
716 | # should not happen normally, except when there are bad races |
720 | # should not happen normally, except when there are bad races |
717 | $WARN->(1, "$SRCNODE->{id} treats us as global node, but we aren't"); |
721 | $WARN->(1, "$SRCNODE->{id} treats us as global node, but we aren't"); |
718 | } |
722 | } |
719 | }; |
723 | }; |
720 | |
724 | |
721 | # also used |
725 | # also used |
722 | $node_req{global_add} = sub { |
726 | $node_req{g_add} = sub { |
723 | my ($node, $listener) = @_; |
727 | my ($node, $listener) = @_; |
724 | |
728 | |
725 | # no-op if we already know the info |
729 | # no-op if we already know the info |
726 | return |
730 | return |
727 | if (join "\x00", @$listener) eq (join "\x00", @{ $GLOBAL_ADDR->{$node} }); |
731 | if (join "\x00", @$listener) eq (join "\x00", @{ $GLOBAL_ADDR->{$node} }); |
728 | |
732 | |
729 | $GLOBAL_ADDR->{$node} = $listener; |
733 | $GLOBAL_ADDR->{$node} = $listener; |
730 | |
734 | |
731 | # let us be a good citizen and register (also connects) |
735 | # let us be a good citizen and register (also connects) |
732 | snd $node, global_reg => $LISTENER; |
736 | snd $node, g_reg => $LISTENER; |
733 | |
737 | |
734 | # if we are global, we update all our slaves |
738 | # if we are global, we update all our slaves |
735 | our %SLAVE; # ugh, will be in AnyEvent::MP::Global |
739 | our %GLOBAL_SLAVE; # ugh, will be in AnyEvent::MP::Global |
736 | snd $_, global_add => $node, $listener |
740 | snd $_, g_add => $node, $listener |
737 | for keys %SLAVE; |
741 | for keys %GLOBAL_SLAVE; |
738 | }; |
742 | }; |
739 | |
743 | |
740 | $node_req{global_del} = sub { |
744 | $node_req{g_del} = sub { |
741 | my ($node) = @_; |
745 | my ($node) = @_; |
742 | |
746 | |
743 | delete $GLOBAL_ADDR->{$node}; |
747 | delete $GLOBAL_ADDR->{$node}; |
744 | }; |
748 | }; |
745 | |
749 | |
746 | # tell every global node our listeners |
750 | # tell every global node our listeners |
747 | push @AnyEvent::MP::Transport::HOOK_CONNECT, sub { |
751 | push @AnyEvent::MP::Transport::HOOK_CONNECT, sub { |
748 | snd $_[0]{remote_node}, global_reg => $LISTENER |
752 | snd $_[0]{remote_node}, g_reg => $LISTENER |
749 | if $_[0]{remote_greeting}{global}; |
753 | if $_[0]{remote_greeting}{global}; |
750 | }; |
754 | }; |
751 | |
755 | |
752 | sub _become_global { #d# hack |
756 | master_search; |
753 | *_become_global = sub { }; |
|
|
754 | |
|
|
755 | warn "becoming global\n";#d# |
|
|
756 | |
|
|
757 | $GLOBAL = 1; |
|
|
758 | global_search; |
|
|
759 | |
|
|
760 | $GLOBAL_ADDR->{$NODE} = $LISTENER; |
|
|
761 | |
|
|
762 | $GLOBAL_MON = mon_nodes sub { |
|
|
763 | return if $_[1]; |
|
|
764 | |
|
|
765 | delete $NODE_ADDR->{$_[0]}; |
|
|
766 | |
|
|
767 | if (delete $GLOBAL_ADDR->{$_[0]}) { |
|
|
768 | # if the node is global, tell our slaves |
|
|
769 | |
|
|
770 | warn "global_del $_[0]\n";#d# |
|
|
771 | our %SLAVE; # ugh, will be in AnyEvent::MP::Global |
|
|
772 | snd $_, global_del => $_[0] |
|
|
773 | for keys %SLAVE; |
|
|
774 | } |
|
|
775 | }; |
|
|
776 | |
|
|
777 | # tell everybody who connects that we are a master |
|
|
778 | push @AnyEvent::MP::Transport::HOOK_GREET, sub { |
|
|
779 | $_[0]{local_greeting}{global} = 1; |
|
|
780 | }; |
|
|
781 | |
|
|
782 | # tell every global node that connects that we are global too |
|
|
783 | push @AnyEvent::MP::Transport::HOOK_CONNECT, sub { |
|
|
784 | snd $_[0], global_add => $NODE, $LISTENER |
|
|
785 | if $_[0]{remote_greeting}{global}; |
|
|
786 | }; |
|
|
787 | |
|
|
788 | # tell everybody else that we are global now |
|
|
789 | snd $_ => global_add => $NODE, $LISTENER |
|
|
790 | for up_nodes; |
|
|
791 | } |
|
|
792 | |
|
|
793 | global_search; |
|
|
794 | |
757 | |
795 | ############################################################################# |
758 | ############################################################################# |
796 | # configure |
759 | # configure |
797 | |
760 | |
798 | sub _nodename { |
761 | sub _nodename { |
… | |
… | |
893 | $profile = _nodename |
856 | $profile = _nodename |
894 | unless defined $profile; |
857 | unless defined $profile; |
895 | |
858 | |
896 | $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv; |
859 | $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv; |
897 | |
860 | |
898 | my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile; |
861 | my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/"; |
899 | |
862 | |
900 | $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"; |
863 | $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"; |
901 | |
864 | |
902 | $NODE = $node |
865 | $NODE = $node; |
903 | unless $node eq "anon/"; |
866 | $NODE =~ s%/$%/$RUNIQ%; |
904 | |
867 | |
905 | $NODE{$NODE} = $NODE{""}; |
868 | $NODE{$NODE} = $NODE{""}; |
906 | $NODE{$NODE}{id} = $NODE; |
869 | $NODE{$NODE}{id} = $NODE; |
907 | |
870 | |
908 | my $seeds = $CONFIG->{seeds}; |
871 | my $seeds = $CONFIG->{seeds}; |
… | |
… | |
941 | |
904 | |
942 | # connect to all seednodes |
905 | # connect to all seednodes |
943 | set_seeds map $_->recv, map _resolve $_, @$seeds; |
906 | set_seeds map $_->recv, map _resolve $_, @$seeds; |
944 | |
907 | |
945 | if ($NODE eq "atha") {;#d# |
908 | if ($NODE eq "atha") {;#d# |
946 | my $w; $w = AE::timer 4, 0, sub { undef $w; _become_global }; |
909 | my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d# |
947 | } |
910 | } |
948 | |
911 | |
949 | for (@{ $CONFIG->{services} }) { |
912 | for (@{ $CONFIG->{services} }) { |
950 | if (ref) { |
913 | if (ref) { |
951 | my ($func, @args) = @$_; |
914 | my ($func, @args) = @$_; |