… | |
… | |
44 | |
44 | |
45 | NODE $NODE node_of snd kil port_is_local |
45 | NODE $NODE node_of snd kil port_is_local |
46 | configure |
46 | configure |
47 | up_nodes mon_nodes node_is_up |
47 | up_nodes mon_nodes node_is_up |
48 | db_set db_del db_reg |
48 | db_set db_del db_reg |
|
|
49 | db_mon db_family db_keys db_values |
49 | ); |
50 | ); |
50 | |
51 | |
51 | =item $AnyEvent::MP::Kernel::WARN->($level, $msg) |
52 | =item $AnyEvent::MP::Kernel::WARN->($level, $msg) |
52 | |
53 | |
53 | This value is called with an error or warning message, when e.g. a |
54 | This value is called with an error or warning message, when e.g. a |
… | |
… | |
471 | $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE; |
472 | $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE; |
472 | $PORT{""} = sub { |
473 | $PORT{""} = sub { |
473 | my $tag = shift; |
474 | my $tag = shift; |
474 | eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } }; |
475 | eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } }; |
475 | $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@; |
476 | $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@; |
|
|
477 | }; |
|
|
478 | |
|
|
479 | our $NPROTO = 1; |
|
|
480 | |
|
|
481 | # tell everybody who connects our nproto |
|
|
482 | push @AnyEvent::MP::Transport::HOOK_GREET, sub { |
|
|
483 | $_[0]{local_greeting}{nproto} = $NPROTO; |
476 | }; |
484 | }; |
477 | |
485 | |
478 | ############################################################################# |
486 | ############################################################################# |
479 | # seed management, try to keep connections to all seeds at all times |
487 | # seed management, try to keep connections to all seeds at all times |
480 | |
488 | |
… | |
… | |
594 | our $MASTER_MON; |
602 | our $MASTER_MON; |
595 | our %LOCAL_DB; # this node database |
603 | our %LOCAL_DB; # this node database |
596 | |
604 | |
597 | our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes |
605 | our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes |
598 | |
606 | |
|
|
607 | our $GPROTO = 1; |
|
|
608 | |
|
|
609 | # tell everybody who connects our nproto |
|
|
610 | push @AnyEvent::MP::Transport::HOOK_GREET, sub { |
|
|
611 | $_[0]{local_greeting}{gproto} = $GPROTO; |
|
|
612 | }; |
|
|
613 | |
599 | ############################################################################# |
614 | ############################################################################# |
600 | # master selection |
615 | # master selection |
601 | |
616 | |
602 | # master requests |
617 | # master requests |
603 | our %GLOBAL_REQ; # $id => \@req |
618 | our %GLOBAL_REQ; # $id => \@req |
… | |
… | |
704 | #d# db_key |
719 | #d# db_key |
705 | |
720 | |
706 | our %LOCAL_MON; # f, reply |
721 | our %LOCAL_MON; # f, reply |
707 | our %MON_DB; # f, k, value |
722 | our %MON_DB; # f, k, value |
708 | |
723 | |
709 | sub g_chg { |
|
|
710 | my $f = shift; |
|
|
711 | |
|
|
712 | $#_ ? $MON_DB{$f}{$_[0]} = $_[1] |
|
|
713 | : delete $MON_DB{$f}{$_[0]}; |
|
|
714 | |
|
|
715 | &{ $_->[0] } |
|
|
716 | for values %{ $LOCAL_MON{$f} }; |
|
|
717 | } |
|
|
718 | |
|
|
719 | sub db_mon($@) { |
724 | sub db_mon($@) { |
720 | my ($family, @reply) = @_; |
725 | my ($family, $cb) = @_; |
721 | |
726 | |
722 | my $reply = \@reply; |
727 | if (my $db = $MON_DB{$family}) { |
723 | my $id = $reply + 0; |
|
|
724 | |
|
|
725 | if (%{ $LOCAL_MON{$family} }) { |
|
|
726 | # if we already monitor this thingy, generate |
728 | # if we already monitor this thingy, generate |
727 | # create events for all of them |
729 | # create events for all of them |
728 | while (my ($key, $value) = each %{ $MON_DB{$family} }) { |
730 | $cb->($db, [keys %$db]); |
729 | $reply->[0]->($key, $value); |
|
|
730 | } |
|
|
731 | } else { |
731 | } else { |
732 | # new monitor, request chg1 from upstream |
732 | # new monitor, request chg1 from upstream |
733 | global_req_add "mon1 $family" => [g_mon1 => $family]; |
733 | global_req_add "mon1 $family" => [g_mon1 => $family]; |
734 | $MON_DB{$family} = {}; |
734 | $MON_DB{$family} = {}; |
735 | } |
735 | } |
736 | |
736 | |
737 | $LOCAL_MON{$family}{$id} = \@reply; |
737 | $LOCAL_MON{$family}{$cb+0} = $cb; |
738 | |
738 | |
739 | Guard::guard { |
739 | Guard::guard { |
740 | my $mon = $LOCAL_MON{$family}; |
740 | my $mon = $LOCAL_MON{$family}; |
741 | delete $mon->{$id}; |
741 | delete $mon->{$cb+0}; |
742 | |
742 | |
743 | unless (%$mon) { |
743 | unless (%$mon) { |
744 | global_req_del "mon1 $family"; |
744 | global_req_del "mon1 $family"; |
745 | |
745 | |
746 | # no global_req, because we don't care if we are not connected |
746 | # no global_req, because we don't care if we are not connected |
… | |
… | |
753 | } |
753 | } |
754 | } |
754 | } |
755 | |
755 | |
756 | # full update |
756 | # full update |
757 | $NODE_REQ{g_chg1} = sub { |
757 | $NODE_REQ{g_chg1} = sub { |
758 | my ($f, $db) = @_; |
758 | my ($f, $ndb) = @_; |
|
|
759 | |
|
|
760 | my $db = $MON_DB{$f}; |
|
|
761 | my @k; |
759 | |
762 | |
760 | # add or replace keys |
763 | # add or replace keys |
761 | while (my ($k, $v) = each %$db) { |
764 | while (my ($k, $v) = each %$ndb) { |
762 | g_chg $f, $k, $v; |
765 | $db->{$k} = $v; |
|
|
766 | push @k, $k; |
763 | } |
767 | } |
764 | |
768 | |
765 | # delete keys that are no longer present |
769 | # delete keys that are no longer present |
766 | for (keys %{ $MON_DB{$f} }) { |
770 | for (grep !exists $ndb->{$_}, keys %$db) { |
767 | g_chg $f, $_ |
771 | delete $db->{$_}; |
768 | unless exists $db->{$_}; |
772 | push @k, $_; |
769 | } |
773 | } |
|
|
774 | |
|
|
775 | $_->($db, \@k) |
|
|
776 | for values %{ $LOCAL_MON{$_[0]} }; |
770 | }; |
777 | }; |
771 | |
778 | |
772 | # incremental update |
779 | # incremental update |
773 | $NODE_REQ{g_chg2} = \&g_chg; |
780 | $NODE_REQ{g_chg2} = sub { |
|
|
781 | my $db = $MON_DB{$_[0]}; |
|
|
782 | |
|
|
783 | @_ >= 3 |
|
|
784 | ? $db->{$_[1]} = $_[2] |
|
|
785 | : delete $db->{$_[1]}; |
|
|
786 | |
|
|
787 | $_->($db, [$_[1]]) |
|
|
788 | for values %{ $LOCAL_MON{$_[0]} }; |
|
|
789 | }; |
774 | |
790 | |
775 | ############################################################################# |
791 | ############################################################################# |
776 | # configure |
792 | # configure |
777 | |
793 | |
778 | sub nodename { |
794 | sub nodename { |