… | |
… | |
589 | # sent by all slave nodes (slave to master) |
589 | # sent by all slave nodes (slave to master) |
590 | # g_slave database - make other global node master of the sender |
590 | # g_slave database - make other global node master of the sender |
591 | # |
591 | # |
592 | # sent by any node to global nodes |
592 | # sent by any node to global nodes |
593 | # g_set database - set whole database |
593 | # g_set database - set whole database |
594 | # g_add family key val - add/replace key to database |
594 | # g_upd family set del - update single family |
595 | # g_del family key - delete key from database |
595 | # g_del family key - delete key from database |
596 | # g_get family key reply... - send reply with data |
596 | # g_get family key reply... - send reply with data |
597 | # |
597 | # |
598 | # send by global nodes |
598 | # send by global nodes |
599 | # g_global - node became global, similar to global=1 greeting |
599 | # g_global - node became global, similar to global=1 greeting |
… | |
… | |
721 | # local database operations |
721 | # local database operations |
722 | |
722 | |
723 | # local database management |
723 | # local database management |
724 | |
724 | |
725 | sub db_set($$;$) { |
725 | sub db_set($$;$) { |
|
|
726 | # if (ref $_[1]) { |
|
|
727 | # # bulk |
|
|
728 | # my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] }; |
|
|
729 | # $LOCAL_DB{$_[0]} = $_[1]; |
|
|
730 | # snd $MASTER, g_upd => $_[0] => $_[1], \@del |
|
|
731 | # if defined $MASTER; |
|
|
732 | # } else { |
|
|
733 | # single-key |
726 | $LOCAL_DB{$_[0]}{$_[1]} = $_[2]; |
734 | $LOCAL_DB{$_[0]}{$_[1]} = $_[2]; |
727 | snd $MASTER, g_add => $_[0] => $_[1] => $_[2] |
735 | snd $MASTER, g_upd => $_[0] => { $_[1] => $_[2] } |
728 | if defined $MASTER; |
736 | if defined $MASTER; |
|
|
737 | # } |
729 | } |
738 | } |
730 | |
739 | |
731 | sub db_del($$) { |
740 | sub db_del($@) { |
732 | delete $LOCAL_DB{$_[0]}{$_[1]}; |
741 | my $family = shift; |
733 | snd $MASTER, g_del => $_[0] => $_[1] |
742 | |
|
|
743 | delete @{ $LOCAL_DB{$family} }{@_}; |
|
|
744 | snd $MASTER, g_upd => $family => undef, \@_ |
734 | if defined $MASTER; |
745 | if defined $MASTER; |
735 | } |
746 | } |
736 | |
747 | |
737 | sub db_reg($$;$) { |
748 | sub db_reg($$;$) { |
738 | my ($family, $key) = @_; |
749 | my ($family, $key) = @_; |
… | |
… | |
764 | |
775 | |
765 | sub db_mon($@) { |
776 | sub db_mon($@) { |
766 | my ($family, $cb) = @_; |
777 | my ($family, $cb) = @_; |
767 | |
778 | |
768 | if (my $db = $MON_DB{$family}) { |
779 | if (my $db = $MON_DB{$family}) { |
769 | # if we already monitor this thingy, generate |
780 | # we already monitor, so create a "dummy" change event |
770 | # create events for all of them |
781 | # this is postponed, which might be too late (we could process |
|
|
782 | # change events), so disable the callback at first |
|
|
783 | $LOCAL_MON{$family}{$cb+0} = sub { }; |
|
|
784 | AE::postpone { |
|
|
785 | return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already |
|
|
786 | |
|
|
787 | # set actual callback |
|
|
788 | $LOCAL_MON{$family}{$cb+0} = $cb; |
771 | $cb->($db, [keys %$db]); |
789 | $cb->($db, [keys %$db]); |
|
|
790 | }; |
772 | } else { |
791 | } else { |
773 | # new monitor, request chg1 from upstream |
792 | # new monitor, request chg1 from upstream |
|
|
793 | $LOCAL_MON{$family}{$cb+0} = $cb; |
774 | global_req_add "mon1 $family" => [g_mon1 => $family]; |
794 | global_req_add "mon1 $family" => [g_mon1 => $family]; |
775 | $MON_DB{$family} = {}; |
795 | $MON_DB{$family} = {}; |
776 | } |
796 | } |
777 | |
|
|
778 | $LOCAL_MON{$family}{$cb+0} = $cb; |
|
|
779 | |
797 | |
780 | Guard::guard { |
798 | Guard::guard { |
781 | my $mon = $LOCAL_MON{$family}; |
799 | my $mon = $LOCAL_MON{$family}; |
782 | delete $mon->{$cb+0}; |
800 | delete $mon->{$cb+0}; |
783 | |
801 | |
… | |
… | |
797 | # full update |
815 | # full update |
798 | $NODE_REQ{g_chg1} = sub { |
816 | $NODE_REQ{g_chg1} = sub { |
799 | my ($f, $ndb) = @_; |
817 | my ($f, $ndb) = @_; |
800 | |
818 | |
801 | my $db = $MON_DB{$f}; |
819 | my $db = $MON_DB{$f}; |
802 | my @k; |
820 | my (@a, @c, @d); |
803 | |
821 | |
804 | # add or replace keys |
822 | # add or replace keys |
805 | while (my ($k, $v) = each %$ndb) { |
823 | while (my ($k, $v) = each %$ndb) { |
|
|
824 | exists $db->{$k} |
|
|
825 | ? push @c, $k |
|
|
826 | : push @a, $k; |
806 | $db->{$k} = $v; |
827 | $db->{$k} = $v; |
807 | push @k, $k; |
|
|
808 | } |
828 | } |
809 | |
829 | |
810 | # delete keys that are no longer present |
830 | # delete keys that are no longer present |
811 | for (grep !exists $ndb->{$_}, keys %$db) { |
831 | for (grep !exists $ndb->{$_}, keys %$db) { |
812 | delete $db->{$_}; |
832 | delete $db->{$_}; |
813 | push @k, $_; |
833 | push @d, $_; |
814 | } |
834 | } |
815 | |
835 | |
816 | $_->($db, \@k) |
836 | $_->($db, \@a, \@c, \@d) |
817 | for values %{ $LOCAL_MON{$_[0]} }; |
837 | for values %{ $LOCAL_MON{$_[0]} }; |
818 | }; |
838 | }; |
819 | |
839 | |
820 | # incremental update |
840 | # incremental update |
821 | $NODE_REQ{g_chg2} = sub { |
841 | $NODE_REQ{g_chg2} = sub { |
|
|
842 | my ($family, $set, $del) = @_; |
|
|
843 | |
822 | my $db = $MON_DB{$_[0]}; |
844 | my $db = $MON_DB{$family}; |
823 | |
845 | |
824 | @_ >= 3 |
846 | my (@a, @c); |
825 | ? $db->{$_[1]} = $_[2] |
|
|
826 | : delete $db->{$_[1]}; |
|
|
827 | |
847 | |
828 | $_->($db, [$_[1]]) |
848 | while (my ($k, $v) = each %$set) { |
|
|
849 | exists $db->{$k} |
|
|
850 | ? push @c, $k |
|
|
851 | : push @a, $k; |
|
|
852 | $db->{$k} = $v; |
|
|
853 | } |
|
|
854 | |
|
|
855 | delete @$db{@$del}; |
|
|
856 | |
|
|
857 | $_->($db, \@a, \@c, $del) |
829 | for values %{ $LOCAL_MON{$_[0]} }; |
858 | for values %{ $LOCAL_MON{$family} }; |
830 | }; |
859 | }; |
831 | |
860 | |
832 | ############################################################################# |
861 | ############################################################################# |
833 | # configure |
862 | # configure |
834 | |
863 | |