… | |
… | |
696 | |
696 | |
697 | #d# db_values |
697 | #d# db_values |
698 | #d# db_family |
698 | #d# db_family |
699 | #d# db_key |
699 | #d# db_key |
700 | |
700 | |
701 | our %LOCAL_MON; |
701 | our %LOCAL_MON; # f, reply |
|
|
702 | our %MON_DB; # f, k, value |
702 | |
703 | |
|
|
704 | sub g_chg { |
|
|
705 | my $f = shift; |
|
|
706 | |
|
|
707 | $#_ ? $MON_DB{$f}{$_[0]} = $_[1] |
|
|
708 | : delete $MON_DB{$f}{$_[0]}; |
|
|
709 | |
|
|
710 | &{ $_->[0] } |
|
|
711 | for values %{ $LOCAL_MON{$f} }; |
|
|
712 | } |
|
|
713 | |
703 | sub db_mon($$@) { |
714 | sub db_mon($@) { |
704 | my ($family, $key, @reply) = @_; |
715 | my ($family, @reply) = @_; |
705 | |
716 | |
|
|
717 | my $reply = \@reply; |
706 | my $id = \@reply + 0; |
718 | my $id = $reply + 0; |
707 | |
719 | |
|
|
720 | if (%{ $LOCAL_MON{$family} }) { |
|
|
721 | # if we already monitor this thingy, generate |
|
|
722 | # create events for all of them |
|
|
723 | while (my ($key, $value) = each %{ $MON_DB{$family} }) { |
|
|
724 | $reply->[0]->($key, $value); |
|
|
725 | } |
|
|
726 | } else { |
|
|
727 | # new monitor, request chg1 from upstream |
|
|
728 | global_req_add "mon1 $family" => [g_mon1 => $family]; |
|
|
729 | $MON_DB{$family} = {}; |
|
|
730 | } |
|
|
731 | |
708 | $LOCAL_MON{$family}{$key}{$id} = \@reply; |
732 | $LOCAL_MON{$family}{$id} = \@reply; |
709 | |
|
|
710 | # always request the data, to generate initial change requests |
|
|
711 | global_req_add "mon1 $family $key" => [g_mon1 => $family => $key]; |
|
|
712 | |
733 | |
713 | Guard::guard { |
734 | Guard::guard { |
714 | my $key = $LOCAL_MON{$family}{$key}; |
735 | my $mon = $LOCAL_MON{$family}; |
715 | delete $key->{$id}; |
736 | delete $mon->{$id}; |
716 | |
737 | |
717 | unless (%$key) { |
738 | unless (%$mon) { |
|
|
739 | global_req_del "mon1 $family"; |
|
|
740 | |
718 | # no global_req, because we don't care if we are not connected |
741 | # no global_req, because we don't care if we are not connected |
719 | snd $MASTER, g_mon0 => $family => $key |
742 | snd $MASTER, g_mon0 => $family |
720 | if $MASTER; |
743 | if $MASTER; |
721 | |
744 | |
722 | delete $LOCAL_MON{$family}{$key}; |
|
|
723 | delete $LOCAL_MON{$family} |
745 | delete $LOCAL_MON{$family}; |
724 | unless %{ $LOCAL_MON{$family} }; |
746 | delete $MON_DB{$family}; |
725 | } |
747 | } |
726 | } |
748 | } |
727 | } |
749 | } |
728 | |
750 | |
729 | $NODE_REQ{g_chg1} = sub { |
751 | $NODE_REQ{g_chg1} = sub { |
730 | warn "one big <@_>\n";#d# |
752 | my ($f, $db) = @_; |
|
|
753 | |
|
|
754 | my $odb = delete $MON_DB{$f}; |
|
|
755 | |
|
|
756 | for (keys %$odb) { |
|
|
757 | g_chg $f, $_ |
|
|
758 | unless exists $db->{$_}; |
|
|
759 | } |
|
|
760 | |
|
|
761 | while (my ($k, $v) = each %$db) { |
|
|
762 | g_chg $f, $k, $v; |
|
|
763 | } |
731 | }; |
764 | }; |
732 | |
765 | |
733 | $NODE_REQ{g_chg2} = sub { |
766 | $NODE_REQ{g_chg2} = \&g_chg; |
734 | }; |
|
|
735 | |
767 | |
736 | ############################################################################# |
768 | ############################################################################# |
737 | # configure |
769 | # configure |
738 | |
770 | |
739 | sub _nodename { |
771 | sub nodename { |
740 | require POSIX; |
772 | require POSIX; |
741 | (POSIX::uname ())[1] |
773 | (POSIX::uname ())[1] |
742 | } |
774 | } |
743 | |
775 | |
744 | sub _resolve($) { |
776 | sub _resolve($) { |
… | |
… | |
758 | |
790 | |
759 | my $idx; |
791 | my $idx; |
760 | for my $t (split /,/, $nodeid) { |
792 | for my $t (split /,/, $nodeid) { |
761 | my $pri = ++$idx; |
793 | my $pri = ++$idx; |
762 | |
794 | |
763 | $t = length $t ? _nodename . ":$t" : _nodename |
795 | $t = length $t ? nodename . ":$t" : nodename |
764 | if $t =~ /^\d*$/; |
796 | if $t =~ /^\d*$/; |
765 | |
797 | |
766 | my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0 |
798 | my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0 |
767 | or Carp::croak "$t: unparsable transport descriptor"; |
799 | or Carp::croak "$t: unparsable transport descriptor"; |
768 | |
800 | |
… | |
… | |
829 | delete $NODE{$NODE}; # we do not support doing stuff before configure |
861 | delete $NODE{$NODE}; # we do not support doing stuff before configure |
830 | _init_names; |
862 | _init_names; |
831 | |
863 | |
832 | my $profile = delete $kv{profile}; |
864 | my $profile = delete $kv{profile}; |
833 | |
865 | |
834 | $profile = _nodename |
866 | $profile = nodename |
835 | unless defined $profile; |
867 | unless defined $profile; |
836 | |
868 | |
837 | $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv; |
869 | $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv; |
838 | |
870 | |
839 | my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/"; |
871 | my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/"; |
840 | |
872 | |
841 | $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"; |
873 | $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"; |
842 | |
874 | |
843 | $NODE = $node; |
875 | $NODE = $node; |
844 | |
876 | |
845 | if ($NODE =~ s%/$%/$RUNIQ%) { |
877 | $NODE =~ s/%n/nodename/ge; |
|
|
878 | |
|
|
879 | if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) { |
846 | # nodes with randomised node names do not need randomised port names |
880 | # nodes with randomised node names do not need randomised port names |
847 | $UNIQ = ""; |
881 | $UNIQ = ""; |
848 | } |
882 | } |
849 | |
883 | |
850 | $NODE{$NODE} = $NODE{""}; |
884 | $NODE{$NODE} = $NODE{""}; |