ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Kernel.pm (file contents):
Revision 1.83 by root, Sat Mar 3 20:35:10 2012 UTC vs.
Revision 1.84 by root, Sun Mar 4 14:28:44 2012 UTC

44 44
45 NODE $NODE node_of snd kil port_is_local 45 NODE $NODE node_of snd kil port_is_local
46 configure 46 configure
47 up_nodes mon_nodes node_is_up 47 up_nodes mon_nodes node_is_up
48 db_set db_del db_reg 48 db_set db_del db_reg
49 db_mon db_family db_keys db_values
49); 50);
50 51
51=item $AnyEvent::MP::Kernel::WARN->($level, $msg) 52=item $AnyEvent::MP::Kernel::WARN->($level, $msg)
52 53
53This value is called with an error or warning message, when e.g. a 54This value is called with an error or warning message, when e.g. a
471$NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE; 472$NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
472$PORT{""} = sub { 473$PORT{""} = sub {
473 my $tag = shift; 474 my $tag = shift;
474 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } }; 475 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
475 $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@; 476 $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@;
477};
478
479our $NPROTO = 1;
480
481# tell everybody who connects our nproto
482push @AnyEvent::MP::Transport::HOOK_GREET, sub {
483 $_[0]{local_greeting}{nproto} = $NPROTO;
476}; 484};
477 485
478############################################################################# 486#############################################################################
479# seed management, try to keep connections to all seeds at all times 487# seed management, try to keep connections to all seeds at all times
480 488
594our $MASTER_MON; 602our $MASTER_MON;
595our %LOCAL_DB; # this node database 603our %LOCAL_DB; # this node database
596 604
597our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes 605our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
598 606
607our $GPROTO = 1;
608
609# tell everybody who connects our nproto
610push @AnyEvent::MP::Transport::HOOK_GREET, sub {
611 $_[0]{local_greeting}{gproto} = $GPROTO;
612};
613
599############################################################################# 614#############################################################################
600# master selection 615# master selection
601 616
602# master requests 617# master requests
603our %GLOBAL_REQ; # $id => \@req 618our %GLOBAL_REQ; # $id => \@req
704#d# db_key 719#d# db_key
705 720
706our %LOCAL_MON; # f, reply 721our %LOCAL_MON; # f, reply
707our %MON_DB; # f, k, value 722our %MON_DB; # f, k, value
708 723
709sub g_chg {
710 my $f = shift;
711
712 $#_ ? $MON_DB{$f}{$_[0]} = $_[1]
713 : delete $MON_DB{$f}{$_[0]};
714
715 &{ $_->[0] }
716 for values %{ $LOCAL_MON{$f} };
717}
718
719sub db_mon($@) { 724sub db_mon($@) {
720 my ($family, @reply) = @_; 725 my ($family, $cb) = @_;
721 726
722 my $reply = \@reply; 727 if (my $db = $MON_DB{$family}) {
723 my $id = $reply + 0;
724
725 if (%{ $LOCAL_MON{$family} }) {
726 # if we already monitor this thingy, generate 728 # if we already monitor this thingy, generate
727 # create events for all of them 729 # create events for all of them
728 while (my ($key, $value) = each %{ $MON_DB{$family} }) { 730 $cb->($db, [keys %$db]);
729 $reply->[0]->($key, $value);
730 }
731 } else { 731 } else {
732 # new monitor, request chg1 from upstream 732 # new monitor, request chg1 from upstream
733 global_req_add "mon1 $family" => [g_mon1 => $family]; 733 global_req_add "mon1 $family" => [g_mon1 => $family];
734 $MON_DB{$family} = {}; 734 $MON_DB{$family} = {};
735 } 735 }
736 736
737 $LOCAL_MON{$family}{$id} = \@reply; 737 $LOCAL_MON{$family}{$cb+0} = $cb;
738 738
739 Guard::guard { 739 Guard::guard {
740 my $mon = $LOCAL_MON{$family}; 740 my $mon = $LOCAL_MON{$family};
741 delete $mon->{$id}; 741 delete $mon->{$cb+0};
742 742
743 unless (%$mon) { 743 unless (%$mon) {
744 global_req_del "mon1 $family"; 744 global_req_del "mon1 $family";
745 745
746 # no global_req, because we don't care if we are not connected 746 # no global_req, because we don't care if we are not connected
753 } 753 }
754} 754}
755 755
756# full update 756# full update
757$NODE_REQ{g_chg1} = sub { 757$NODE_REQ{g_chg1} = sub {
758 my ($f, $db) = @_; 758 my ($f, $ndb) = @_;
759
760 my $db = $MON_DB{$f};
761 my @k;
759 762
760 # add or replace keys 763 # add or replace keys
761 while (my ($k, $v) = each %$db) { 764 while (my ($k, $v) = each %$ndb) {
762 g_chg $f, $k, $v; 765 $db->{$k} = $v;
766 push @k, $k;
763 } 767 }
764 768
765 # delete keys that are no longer present 769 # delete keys that are no longer present
766 for (keys %{ $MON_DB{$f} }) { 770 for (grep !exists $ndb->{$_}, keys %$db) {
767 g_chg $f, $_ 771 delete $db->{$_};
768 unless exists $db->{$_}; 772 push @k, $_;
769 } 773 }
774
775 $_->($db, \@k)
776 for values %{ $LOCAL_MON{$_[0]} };
770}; 777};
771 778
772# incremental update 779# incremental update
773$NODE_REQ{g_chg2} = \&g_chg; 780$NODE_REQ{g_chg2} = sub {
781 my $db = $MON_DB{$_[0]};
782
783 @_ >= 3
784 ? $db->{$_[1]} = $_[2]
785 : delete $db->{$_[1]};
786
787 $_->($db, [$_[1]])
788 for values %{ $LOCAL_MON{$_[0]} };
789};
774 790
775############################################################################# 791#############################################################################
776# configure 792# configure
777 793
778sub nodename { 794sub nodename {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines