… | |
… | |
113 | our (%PORT, %PORT_DATA); # local ports |
113 | our (%PORT, %PORT_DATA); # local ports |
114 | |
114 | |
115 | our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb) |
115 | our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb) |
116 | our %LMON; # monitored _local_ ports |
116 | our %LMON; # monitored _local_ ports |
117 | |
117 | |
118 | our $GLOBAL; # true if node is a global ("directory") node |
118 | #our $GLOBAL; # true if node is a global ("directory") node |
119 | our %BINDS; |
119 | our %BINDS; |
120 | our $BINDS; # our listeners, as arrayref |
120 | our $BINDS; # our listeners, as arrayref |
121 | |
121 | |
122 | our $SRCNODE; # holds the sending node _object_ during _inject |
122 | our $SRCNODE; # holds the sending node _object_ during _inject |
|
|
123 | our $GLOBAL; # true when this is a global node (only set by AnyEvent::MP::Global) |
123 | |
124 | |
124 | # initialise names for non-networked operation |
125 | # initialise names for non-networked operation |
125 | { |
126 | { |
126 | # ~54 bits, for local port names, lowercase $ID appended |
127 | # ~54 bits, for local port names, lowercase $ID appended |
127 | my $now = AE::now; |
128 | my $now = AE::now; |
… | |
… | |
462 | our %SEED_NODE; # seed ID => node ID|undef |
463 | our %SEED_NODE; # seed ID => node ID|undef |
463 | our %NODE_SEED; # map node ID to seed ID |
464 | our %NODE_SEED; # map node ID to seed ID |
464 | our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting |
465 | our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting |
465 | our $SEED_WATCHER; |
466 | our $SEED_WATCHER; |
466 | our $SEED_RETRY; |
467 | our $SEED_RETRY; |
|
|
468 | our %GLOBAL_NODE; # global => undef |
467 | |
469 | |
468 | sub seed_connect { |
470 | sub seed_connect { |
469 | my ($seed) = @_; |
471 | my ($seed) = @_; |
470 | |
472 | |
471 | my ($host, $port) = AnyEvent::Socket::parse_hostport $seed |
473 | my ($host, $port) = AnyEvent::Socket::parse_hostport $seed |
… | |
… | |
487 | require AnyEvent::MP::Global; # every seed becomes a global node currently |
489 | require AnyEvent::MP::Global; # every seed becomes a global node currently |
488 | delete $SEED_NODE{$seed}; |
490 | delete $SEED_NODE{$seed}; |
489 | } else { |
491 | } else { |
490 | $SEED_NODE{$seed} = $_[0]{remote_node}; |
492 | $SEED_NODE{$seed} = $_[0]{remote_node}; |
491 | $NODE_SEED{$_[0]{remote_node}} = $seed; |
493 | $NODE_SEED{$_[0]{remote_node}} = $seed; |
|
|
494 | |
492 | # also start global service, if not running |
495 | # also start global service, in case it isn't running |
493 | # we need to check here in addition to the mon_nodes below |
496 | # since we probably switch conenctions, maybe we don't need to do this here? |
494 | # because we might only learn late that a node is a seed |
|
|
495 | # and then we might already be connected |
|
|
496 | snd $_[0]{remote_node}, "g_slave" |
497 | snd $_[0]{remote_node}, "g_slave"; |
497 | unless $_[0]{remote_greeting}{global}; |
|
|
498 | } |
498 | } |
499 | }, |
499 | }, |
500 | sub { |
500 | sub { |
501 | delete $SEED_CONNECT{$seed}; |
501 | delete $SEED_CONNECT{$seed}; |
502 | } |
502 | } |
… | |
… | |
512 | if (@seeds) { |
512 | if (@seeds) { |
513 | # start connection attempt for every seed we are not connected to yet |
513 | # start connection attempt for every seed we are not connected to yet |
514 | seed_connect $_ |
514 | seed_connect $_ |
515 | for @seeds; |
515 | for @seeds; |
516 | |
516 | |
517 | $SEED_RETRY = $SEED_RETRY * 2 + rand; |
517 | $SEED_RETRY = $SEED_RETRY * 2; |
518 | $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout} |
518 | $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout} |
519 | if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; |
519 | if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; |
520 | |
520 | |
521 | $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all; |
521 | $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all; |
522 | |
522 | |
… | |
… | |
525 | undef $SEED_WATCHER; |
525 | undef $SEED_WATCHER; |
526 | } |
526 | } |
527 | } |
527 | } |
528 | |
528 | |
529 | sub seed_again { |
529 | sub seed_again { |
530 | $SEED_RETRY = 1; |
530 | $SEED_RETRY = (1 + rand) * 0.6; |
531 | $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all; |
531 | $SEED_WATCHER ||= AE::timer $SEED_RETRY, 0, \&seed_all; |
532 | } |
532 | } |
533 | |
533 | |
534 | # sets new seed list, starts connecting |
534 | # sets new seed list, starts connecting |
535 | sub set_seeds(@) { |
535 | sub set_seeds(@) { |
536 | %SEED_NODE = (); |
536 | %SEED_NODE = (); |
… | |
… | |
540 | @SEED_NODE{@_} = (); |
540 | @SEED_NODE{@_} = (); |
541 | |
541 | |
542 | seed_all; |
542 | seed_all; |
543 | } |
543 | } |
544 | |
544 | |
|
|
545 | # normal nodes only record global node connections |
|
|
546 | $NODE_REQ{g_global} = sub { |
|
|
547 | undef $GLOBAL_NODE{$SRCNODE}; |
|
|
548 | }; |
|
|
549 | |
545 | mon_nodes sub { |
550 | mon_nodes sub { |
|
|
551 | delete $GLOBAL_NODE{$_[0]} |
|
|
552 | unless $_[1]; |
|
|
553 | |
546 | return unless exists $NODE_SEED{$_[0]}; |
554 | return unless exists $NODE_SEED{$_[0]}; |
547 | |
555 | |
548 | if ($_[1]) { |
556 | if ($_[1]) { |
549 | # each time a connection to a seed node goes up, make |
557 | # each time a connection to a seed node goes up, make |
550 | # sure it runs the global service. |
558 | # sure it runs the global service. |
551 | snd $_[0], "g_slave" |
559 | snd $_[0], "g_slave"; |
552 | unless $NODE{$_[0]}{transport}{remote_greeting}{global}; |
|
|
553 | } else { |
560 | } else { |
554 | # if we lost the connection to a seed node, make sure we are seeding |
561 | # if we lost the connection to a seed node, make sure we are seeding |
555 | seed_again; |
562 | seed_again; |
556 | } |
563 | } |
557 | }; |
564 | }; |
… | |
… | |
572 | . "."; |
579 | . "."; |
573 | |
580 | |
574 | (add_node $_)->connect |
581 | (add_node $_)->connect |
575 | for keys %KEEPALIVE_DOWN; |
582 | for keys %KEEPALIVE_DOWN; |
576 | |
583 | |
577 | $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2 + rand; |
584 | $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2; |
578 | $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout} |
585 | $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout} |
579 | if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; |
586 | if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; |
580 | |
587 | |
581 | $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all; |
588 | $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all; |
582 | } |
589 | } |
583 | |
590 | |
584 | sub keepalive_again { |
591 | sub keepalive_again { |
585 | $KEEPALIVE_RETRY = 1; |
592 | $KEEPALIVE_RETRY = (1 + rand) * 0.3; |
586 | keepalive_all; |
593 | keepalive_all; |
587 | } |
594 | } |
588 | |
595 | |
589 | sub keepalive_add { |
596 | sub keepalive_add { |
590 | return if $KEEPALIVE{$_[0]}++; |
597 | return if $KEEPALIVE{$_[0]}++; |
… | |
… | |
623 | # talk with/to global nodes |
630 | # talk with/to global nodes |
624 | |
631 | |
625 | # protocol messages: |
632 | # protocol messages: |
626 | # |
633 | # |
627 | # sent by global nodes |
634 | # sent by global nodes |
628 | # g_global - node became global, similar to global=1 greeting |
635 | # g_global - global nodes send this to all others |
629 | # |
636 | # |
630 | # database protocol |
637 | # database protocol |
631 | # g_slave database - make other global node master of the sender |
638 | # g_slave database - make other global node master of the sender |
632 | # g_set database - global node's database to other global nodes |
639 | # g_set database - global node's database to other global nodes |
633 | # g_upd family set del - update single family (all to global) |
640 | # g_upd family set del - update single family (any to global) |
634 | # |
641 | # |
635 | # slave <-> global protocol |
642 | # slave <-> global protocol |
636 | # g_find node - query addresses for node (slave to global) |
643 | # g_find node - query addresses for node (slave to global) |
637 | # g_found node binds - node addresses (global to slave) |
644 | # g_found node binds - node addresses (global to slave) |
638 | # g_db_family family id - send g_reply with data (global to slave) |
645 | # g_db_family family id - send g_reply with data (global to slave) |
… | |
… | |
642 | # g_mon1 family - start to monitor family, replies with g_chg1 |
649 | # g_mon1 family - start to monitor family, replies with g_chg1 |
643 | # g_mon0 family - stop monitoring family |
650 | # g_mon0 family - stop monitoring family |
644 | # g_chg1 family hash - initial value of family when starting to monitor |
651 | # g_chg1 family hash - initial value of family when starting to monitor |
645 | # g_chg2 family set del - like g_upd, but for monitoring only |
652 | # g_chg2 family set del - like g_upd, but for monitoring only |
646 | # |
653 | # |
647 | # internal database families |
654 | # internal database families: |
648 | # "'l" -> node -> listeners |
655 | # "'l" -> node -> listeners |
649 | # "'g" -> node -> undef |
656 | # "'g" -> node -> undef |
650 | # ... |
657 | # ... |
651 | # |
658 | # |
652 | |
659 | |
653 | # used on all nodes: |
660 | # used on all nodes: |
654 | our $MASTER; # the global node we bind ourselves to |
661 | our $MASTER; # the global node we bind ourselves to |
655 | our $MASTER_MON; |
662 | our $MASTER_MON; |
656 | our %LOCAL_DB; # this node database |
663 | our %LOCAL_DB; # this node database |
657 | |
|
|
658 | our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes |
|
|
659 | |
664 | |
660 | our $GPROTO = 1; |
665 | our $GPROTO = 1; |
661 | |
666 | |
662 | # tell everybody who connects our nproto |
667 | # tell everybody who connects our nproto |
663 | push @AnyEvent::MP::Transport::HOOK_GREET, sub { |
668 | push @AnyEvent::MP::Transport::HOOK_GREET, sub { |
… | |
… | |
720 | $node->connect_to ($_[1]); |
725 | $node->connect_to ($_[1]); |
721 | }; |
726 | }; |
722 | |
727 | |
723 | sub master_set { |
728 | sub master_set { |
724 | $MASTER = $_[0]; |
729 | $MASTER = $_[0]; |
|
|
730 | AE::log 8 => "new master node: $MASTER."; |
|
|
731 | |
|
|
732 | $MASTER_MON = mon_nodes sub { |
|
|
733 | if ($_[0] eq $MASTER && !$_[1]) { |
|
|
734 | undef $MASTER; |
|
|
735 | master_search (); |
|
|
736 | } |
|
|
737 | }; |
725 | |
738 | |
726 | snd $MASTER, g_slave => \%LOCAL_DB; |
739 | snd $MASTER, g_slave => \%LOCAL_DB; |
727 | |
740 | |
728 | # (re-)send queued requests |
741 | # (re-)send queued requests |
729 | snd $MASTER, @$_ |
742 | snd $MASTER, @$_ |
730 | for values %GLOBAL_REQ; |
743 | for values %GLOBAL_REQ; |
731 | } |
744 | } |
732 | |
745 | |
733 | sub master_search { |
746 | sub master_search { |
|
|
747 | AE::log 9 => "starting search for master node."; |
|
|
748 | |
734 | #TODO: should also look for other global nodes, but we don't know them #d# |
749 | #TODO: should also look for other global nodes, but we don't know them #d# |
735 | for (keys %NODE_SEED) { |
750 | for (keys %NODE_SEED) { |
736 | if (node_is_up $_) { |
751 | if (node_is_up $_) { |
737 | master_set $_; |
752 | master_set $_; |
738 | return; |
753 | return; |
… | |
… | |
742 | $MASTER_MON = mon_nodes sub { |
757 | $MASTER_MON = mon_nodes sub { |
743 | return unless $_[1]; # we are only interested in node-ups |
758 | return unless $_[1]; # we are only interested in node-ups |
744 | return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes |
759 | return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes |
745 | |
760 | |
746 | master_set $_[0]; |
761 | master_set $_[0]; |
747 | |
|
|
748 | $MASTER_MON = mon_nodes sub { |
|
|
749 | if ($_[0] eq $MASTER && !$_[1]) { |
|
|
750 | undef $MASTER; |
|
|
751 | master_search (); |
|
|
752 | } |
|
|
753 | }; |
|
|
754 | }; |
762 | }; |
755 | } |
763 | } |
756 | |
764 | |
757 | # other node wants to make us the master, so start the global service |
765 | # other node wants to make us the master, so start the global service |
758 | $NODE_REQ{g_slave} = sub { |
766 | $NODE_REQ{g_slave} = sub { |
759 | my ($db) = @_; |
|
|
760 | |
|
|
761 | # load global module and redo the request |
767 | # load global module and redo the request |
762 | require AnyEvent::MP::Global; |
768 | require AnyEvent::MP::Global; |
763 | &{ $NODE_REQ{g_slave} } |
769 | &{ $NODE_REQ{g_slave} } |
764 | }; |
770 | }; |
765 | |
771 | |
766 | ############################################################################# |
772 | ############################################################################# |
767 | # local database operations |
773 | # local database operations |
768 | |
774 | |
|
|
775 | # canonical probably not needed |
|
|
776 | our $sv_eq_coder = JSON::XS->new->utf8->allow_nonref; |
|
|
777 | |
|
|
778 | # are the two scalars equal? very very ugly and slow, need better way |
|
|
779 | sub sv_eq($$) { |
|
|
780 | ref $_[0] || ref $_[1] |
|
|
781 | ? (JSON::XS::encode $sv_eq_coder, $_[0]) eq (JSON::XS::encode $sv_eq_coder, $_[1]) |
|
|
782 | : $_[0] eq $_[1] |
|
|
783 | && defined $_[0] == defined $_[1] |
|
|
784 | } |
|
|
785 | |
769 | # local database management |
786 | # local database management |
|
|
787 | |
|
|
788 | sub db_del($@) { |
|
|
789 | my $family = shift; |
|
|
790 | |
|
|
791 | my @del = grep exists $LOCAL_DB{$family}{$_}, @_; |
|
|
792 | |
|
|
793 | return unless @del; |
|
|
794 | |
|
|
795 | delete @{ $LOCAL_DB{$family} }{@del}; |
|
|
796 | snd $MASTER, g_upd => $family => undef, \@del |
|
|
797 | if defined $MASTER; |
|
|
798 | } |
770 | |
799 | |
771 | sub db_set($$;$) { |
800 | sub db_set($$;$) { |
772 | my ($family, $subkey) = @_; |
801 | my ($family, $subkey) = @_; |
773 | |
802 | |
774 | # if (ref $_[1]) { |
803 | # if (ref $_[1]) { |
… | |
… | |
777 | # $LOCAL_DB{$_[0]} = $_[1]; |
806 | # $LOCAL_DB{$_[0]} = $_[1]; |
778 | # snd $MASTER, g_upd => $_[0] => $_[1], \@del |
807 | # snd $MASTER, g_upd => $_[0] => $_[1], \@del |
779 | # if defined $MASTER; |
808 | # if defined $MASTER; |
780 | # } else { |
809 | # } else { |
781 | # single-key |
810 | # single-key |
|
|
811 | unless (exists $LOCAL_DB{$family}{$subkey} && sv_eq $LOCAL_DB{$family}{$subkey}, $_[2]) { |
782 | $LOCAL_DB{$family}{$subkey} = $_[2]; |
812 | $LOCAL_DB{$family}{$subkey} = $_[2]; |
783 | snd $MASTER, g_upd => $family => { $subkey => $_[2] } |
813 | snd $MASTER, g_upd => $family => { $subkey => $_[2] } |
784 | if defined $MASTER; |
814 | if defined $MASTER; |
|
|
815 | } |
785 | # } |
816 | # } |
786 | |
817 | |
787 | defined wantarray |
818 | defined wantarray |
788 | and Guard::guard { db_del $family => $subkey } |
819 | and Guard::guard { db_del $family => $subkey } |
789 | } |
|
|
790 | |
|
|
791 | sub db_del($@) { |
|
|
792 | my $family = shift; |
|
|
793 | |
|
|
794 | my @del = grep exists $LOCAL_DB{$family}{$_}, @_; |
|
|
795 | |
|
|
796 | return unless @del; |
|
|
797 | |
|
|
798 | delete @{ $LOCAL_DB{$family} }{@del}; |
|
|
799 | snd $MASTER, g_upd => $family => undef, \@del |
|
|
800 | if defined $MASTER; |
|
|
801 | } |
820 | } |
802 | |
821 | |
803 | # database query |
822 | # database query |
804 | |
823 | |
805 | sub db_family { |
824 | sub db_family { |