ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP/Kernel.pm
(Generate patch)

Comparing cvsroot/AnyEvent-MP/MP/Kernel.pm (file contents):
Revision 1.88 by root, Thu Mar 8 21:37:51 2012 UTC vs.
Revision 1.89 by root, Fri Mar 9 17:05:26 2012 UTC

589# sent by all slave nodes (slave to master) 589# sent by all slave nodes (slave to master)
590# g_slave database - make other global node master of the sender 590# g_slave database - make other global node master of the sender
591# 591#
592# sent by any node to global nodes 592# sent by any node to global nodes
593# g_set database - set whole database 593# g_set database - set whole database
594# g_add family key val - add/replace key to database 594# g_upd family set del - update single family
595# g_del family key - delete key from database 595# g_del family key - delete key from database
596# g_get family key reply... - send reply with data 596# g_get family key reply... - send reply with data
597# 597#
598# send by global nodes 598# send by global nodes
599# g_global - node became global, similar to global=1 greeting 599# g_global - node became global, similar to global=1 greeting
721# local database operations 721# local database operations
722 722
723# local database management 723# local database management
724 724
725sub db_set($$;$) { 725sub db_set($$;$) {
726# if (ref $_[1]) {
727# # bulk
728# my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
729# $LOCAL_DB{$_[0]} = $_[1];
730# snd $MASTER, g_upd => $_[0] => $_[1], \@del
731# if defined $MASTER;
732# } else {
733 # single-key
726 $LOCAL_DB{$_[0]}{$_[1]} = $_[2]; 734 $LOCAL_DB{$_[0]}{$_[1]} = $_[2];
727 snd $MASTER, g_add => $_[0] => $_[1] => $_[2] 735 snd $MASTER, g_upd => $_[0] => { $_[1] => $_[2] }
728 if defined $MASTER; 736 if defined $MASTER;
737# }
729} 738}
730 739
731sub db_del($$) { 740sub db_del($@) {
732 delete $LOCAL_DB{$_[0]}{$_[1]}; 741 my $family = shift;
733 snd $MASTER, g_del => $_[0] => $_[1] 742
743 delete @{ $LOCAL_DB{$family} }{@_};
744 snd $MASTER, g_upd => $family => undef, \@_
734 if defined $MASTER; 745 if defined $MASTER;
735} 746}
736 747
737sub db_reg($$;$) { 748sub db_reg($$;$) {
738 my ($family, $key) = @_; 749 my ($family, $key) = @_;
764 775
765sub db_mon($@) { 776sub db_mon($@) {
766 my ($family, $cb) = @_; 777 my ($family, $cb) = @_;
767 778
768 if (my $db = $MON_DB{$family}) { 779 if (my $db = $MON_DB{$family}) {
769 # if we already monitor this thingy, generate 780 # we already monitor, so create a "dummy" change event
770 # create events for all of them 781 # this is postponed, which might be too late (we could process
782 # change events), so disable the callback at first
783 $LOCAL_MON{$family}{$cb+0} = sub { };
784 AE::postpone {
785 return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already
786
787 # set actual callback
788 $LOCAL_MON{$family}{$cb+0} = $cb;
771 $cb->($db, [keys %$db]); 789 $cb->($db, [keys %$db]);
790 };
772 } else { 791 } else {
773 # new monitor, request chg1 from upstream 792 # new monitor, request chg1 from upstream
793 $LOCAL_MON{$family}{$cb+0} = $cb;
774 global_req_add "mon1 $family" => [g_mon1 => $family]; 794 global_req_add "mon1 $family" => [g_mon1 => $family];
775 $MON_DB{$family} = {}; 795 $MON_DB{$family} = {};
776 } 796 }
777
778 $LOCAL_MON{$family}{$cb+0} = $cb;
779 797
780 Guard::guard { 798 Guard::guard {
781 my $mon = $LOCAL_MON{$family}; 799 my $mon = $LOCAL_MON{$family};
782 delete $mon->{$cb+0}; 800 delete $mon->{$cb+0};
783 801
797# full update 815# full update
798$NODE_REQ{g_chg1} = sub { 816$NODE_REQ{g_chg1} = sub {
799 my ($f, $ndb) = @_; 817 my ($f, $ndb) = @_;
800 818
801 my $db = $MON_DB{$f}; 819 my $db = $MON_DB{$f};
802 my @k; 820 my (@a, @c, @d);
803 821
804 # add or replace keys 822 # add or replace keys
805 while (my ($k, $v) = each %$ndb) { 823 while (my ($k, $v) = each %$ndb) {
824 exists $db->{$k}
825 ? push @c, $k
826 : push @a, $k;
806 $db->{$k} = $v; 827 $db->{$k} = $v;
807 push @k, $k;
808 } 828 }
809 829
810 # delete keys that are no longer present 830 # delete keys that are no longer present
811 for (grep !exists $ndb->{$_}, keys %$db) { 831 for (grep !exists $ndb->{$_}, keys %$db) {
812 delete $db->{$_}; 832 delete $db->{$_};
813 push @k, $_; 833 push @d, $_;
814 } 834 }
815 835
816 $_->($db, \@k) 836 $_->($db, \@a, \@c, \@d)
817 for values %{ $LOCAL_MON{$_[0]} }; 837 for values %{ $LOCAL_MON{$_[0]} };
818}; 838};
819 839
820# incremental update 840# incremental update
821$NODE_REQ{g_chg2} = sub { 841$NODE_REQ{g_chg2} = sub {
842 my ($family, $set, $del) = @_;
843
822 my $db = $MON_DB{$_[0]}; 844 my $db = $MON_DB{$family};
823 845
824 @_ >= 3 846 my (@a, @c);
825 ? $db->{$_[1]} = $_[2]
826 : delete $db->{$_[1]};
827 847
828 $_->($db, [$_[1]]) 848 while (my ($k, $v) = each %$set) {
849 exists $db->{$k}
850 ? push @c, $k
851 : push @a, $k;
852 $db->{$k} = $v;
853 }
854
855 delete @$db{@$del};
856
857 $_->($db, \@a, \@c, $del)
829 for values %{ $LOCAL_MON{$_[0]} }; 858 for values %{ $LOCAL_MON{$family} };
830}; 859};
831 860
832############################################################################# 861#############################################################################
833# configure 862# configure
834 863

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines