… | |
… | |
34 | use AnyEvent::Util (); |
34 | use AnyEvent::Util (); |
35 | |
35 | |
36 | use AnyEvent::MP; |
36 | use AnyEvent::MP; |
37 | use AnyEvent::MP::Kernel; |
37 | use AnyEvent::MP::Kernel; |
38 | |
38 | |
|
|
39 | use base "Exporter"; |
|
|
40 | |
|
|
41 | our @EXPORT = qw( |
|
|
42 | grp_reg |
|
|
43 | grp_get |
|
|
44 | grp_mon |
|
|
45 | ); |
|
|
46 | |
39 | our $VERSION = $AnyEvent::MP::VERSION; |
47 | our $VERSION = $AnyEvent::MP::VERSION; |
40 | |
48 | |
41 | our %addr; # port ID => [address...] mapping |
49 | our %addr; # port ID => [address...] mapping |
42 | |
50 | |
43 | our %port; # our rendezvous port on the other side |
51 | our %port; # our rendezvous port on the other side |
44 | our %lreg; # local registry, name => [pid...] |
52 | our %lreg; # local registry, name => [pid...] |
45 | our %lmon; # local rgeistry monitoring name,pid => mon |
53 | our %lmon; # local registry monitoring name,pid => mon |
46 | our %greg; # global regstry, name => [pid...] |
54 | our %greg; # global regstry, name => pid => undef |
|
|
55 | our %gmon; # group monitorign, group => [$cb...] |
47 | |
56 | |
48 | our $nodecnt; |
57 | our $nodecnt; |
49 | |
58 | |
50 | $AnyEvent::MP::Kernel::WARN->(7, "starting global service."); |
59 | $AnyEvent::MP::Kernel::WARN->(7, "starting global service."); |
51 | |
60 | |
… | |
… | |
95 | } |
104 | } |
96 | } |
105 | } |
97 | |
106 | |
98 | ############################################################################# |
107 | ############################################################################# |
99 | |
108 | |
|
|
109 | sub _change { |
|
|
110 | my ($group, $add, $del) = @_; |
|
|
111 | |
|
|
112 | my $kv = $greg{$group} ||= {}; |
|
|
113 | |
|
|
114 | delete @$kv{@$del}; |
|
|
115 | @$kv{@$add} = (); |
|
|
116 | |
|
|
117 | my $ports = [keys %$kv]; |
|
|
118 | $_->($ports, $add, $del) for @{ $gmon{$group} }; |
|
|
119 | } |
|
|
120 | |
100 | sub unreg_groups($) { |
121 | sub unreg_groups($) { |
101 | my ($node) = @_; |
122 | my ($node) = @_; |
102 | |
123 | |
103 | my $qr = qr/^\Q$node\E(?:#|$)/; |
124 | my $qr = qr/^\Q$node\E(?:#|$)/; |
|
|
125 | my @del; |
104 | |
126 | |
105 | for my $group (values %greg) { |
127 | while (my ($group, $ports) = each %greg) { |
106 | @$group = grep $_ !~ $qr, @$group; |
128 | @del = grep /$qr/, keys %$ports; |
|
|
129 | _change $group, [], \@del |
|
|
130 | if @del; |
107 | } |
131 | } |
108 | } |
132 | } |
109 | |
133 | |
110 | sub set_groups($$) { |
134 | sub set_groups($$) { |
111 | my ($node, $lreg) = @_; |
135 | my ($node, $lreg) = @_; |
112 | |
136 | |
113 | while (my ($k, $v) = each %$lreg) { |
137 | while (my ($k, $v) = each %$lreg) { |
114 | push @{ $greg{$k} }, @$v; |
138 | _change $k, $v, []; |
115 | } |
139 | } |
116 | } |
140 | } |
117 | |
141 | |
118 | =item $guard = register $port, $group |
142 | =item $guard = grp_reg $group, $port |
119 | |
143 | |
120 | Register the given (local!) port in the named global group C<$group>. |
144 | Register the given (local!) port in the named global group C<$group>. |
121 | |
145 | |
122 | The port will be unregistered automatically when the port is destroyed. |
146 | The port will be unregistered automatically when the port is destroyed. |
123 | |
147 | |
124 | When not called in void context, then a guard object will be returned that |
148 | When not called in void context, then a guard object will be returned that |
125 | will also cause the name to be unregistered when destroyed. |
149 | will also cause the name to be unregistered when destroyed. |
126 | |
150 | |
127 | =cut |
151 | =cut |
128 | |
|
|
129 | # register port from any node |
|
|
130 | sub _register { |
|
|
131 | my ($port, $group) = @_; |
|
|
132 | |
|
|
133 | push @{ $greg{$group} }, $port; |
|
|
134 | } |
|
|
135 | |
|
|
136 | # unregister port from any node |
|
|
137 | sub _unregister { |
|
|
138 | my ($port, $group) = @_; |
|
|
139 | |
|
|
140 | @{ $greg{$group} } = grep $_ ne $port, @{ $greg{$group} }; |
|
|
141 | } |
|
|
142 | |
152 | |
143 | # unregister local port |
153 | # unregister local port |
144 | sub unregister { |
154 | sub unregister { |
145 | my ($port, $group) = @_; |
155 | my ($port, $group) = @_; |
146 | |
156 | |
147 | delete $lmon{"$group\x00$port"}; |
157 | delete $lmon{"$group\x00$port"}; |
148 | @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} }; |
158 | @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} }; |
149 | |
159 | |
150 | _unregister $port, $group; |
160 | _change $group, [], [$port]; |
151 | |
161 | |
152 | snd $_, reg0 => $port, $group |
162 | snd $_, reg0 => $group, $port |
153 | for values %port; |
163 | for values %port; |
154 | } |
164 | } |
155 | |
165 | |
156 | # register local port |
166 | # register local port |
157 | sub register($$) { |
167 | sub grp_reg($$) { |
158 | my ($port, $group) = @_; |
168 | my ($group, $port) = @_; |
159 | |
169 | |
160 | port_is_local $port |
170 | port_is_local $port |
161 | or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught"; |
171 | or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught"; |
|
|
172 | |
|
|
173 | grep $_ eq $port, @{ $lreg{$group} } |
|
|
174 | and Carp::croak "'$group': group already registered, cannot register a second time"; |
162 | |
175 | |
163 | $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group }; |
176 | $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group }; |
164 | push @{ $lreg{$group} }, $port; |
177 | push @{ $lreg{$group} }, $port; |
165 | |
178 | |
166 | snd $_, reg1 => $port, $group |
179 | snd $_, reg1 => $group, $port |
167 | for values %port; |
180 | for values %port; |
168 | |
181 | |
169 | _register $port, $group; |
182 | _change $group, [$port], []; |
170 | |
183 | |
171 | wantarray && AnyEvent::Util::guard { unregister $port, $group } |
184 | wantarray && AnyEvent::Util::guard { unregister $port, $group } |
172 | } |
185 | } |
173 | |
186 | |
174 | =item $ports = find $group |
187 | =item $ports = grp_get $group |
175 | |
188 | |
176 | Returns all the ports currently registered to the given group (as |
189 | Returns all the ports currently registered to the given group (as |
177 | read-only array reference). When the group has no registered members, |
190 | read-only(!) array reference). When the group has no registered members, |
178 | return C<undef>. |
191 | return C<undef>. |
179 | |
192 | |
180 | =cut |
193 | =cut |
181 | |
194 | |
182 | sub find($) { |
195 | sub grp_get($) { |
183 | @{ $greg{$_[0]} } |
196 | my @ports = keys %{ $greg{$_[0]} }; |
184 | ? $greg{$_[0]} |
197 | @ports ? \@ports : undef |
|
|
198 | } |
|
|
199 | |
|
|
200 | =item $guard = grp_mon $group, $callback->($ports, $add, $del) |
|
|
201 | |
|
|
202 | Installs a monitor on the given group. Each time there is a change it |
|
|
203 | will be called with the current group members as an arrayref as the |
|
|
204 | first argument. The second argument only contains ports added, the third |
|
|
205 | argument only ports removed. |
|
|
206 | |
|
|
207 | Unlike C<grp_get>, all three arguments will always be array-refs, even if |
|
|
208 | the array is empty. None of the arrays must be modified in any way. |
|
|
209 | |
|
|
210 | The first invocation will be with the first two arguments set to the |
|
|
211 | current members, as if all of them were just added, but only when the |
|
|
212 | group is atcually non-empty. |
|
|
213 | |
|
|
214 | Optionally returns a guard object that uninstalls the watcher when it is |
|
|
215 | destroyed. |
|
|
216 | |
|
|
217 | =cut |
|
|
218 | |
|
|
219 | sub grp_mon($$) { |
|
|
220 | my ($grp, $cb) = @_; |
|
|
221 | |
|
|
222 | AnyEvent::MP::Kernel::delay sub { |
|
|
223 | return unless $cb; |
|
|
224 | |
|
|
225 | push @{ $gmon{$grp} }, $cb; |
|
|
226 | $cb->(((grp_get $grp) || return) x 2, []); |
|
|
227 | }; |
|
|
228 | |
|
|
229 | defined wantarray && AnyEvent::Util::::guard { |
|
|
230 | my @mon = grep $_ != $cb, @{ delete $gmon{$grp} }; |
|
|
231 | $gmon{$grp} = \@mon if @mon; |
185 | : undef |
232 | undef $cb; |
|
|
233 | } |
186 | } |
234 | } |
187 | |
235 | |
188 | sub start_node { |
236 | sub start_node { |
189 | my ($node) = @_; |
237 | my ($node) = @_; |
190 | |
238 | |
… | |
… | |
251 | if $addr{$othernode}; |
299 | if $addr{$othernode}; |
252 | }, |
300 | }, |
253 | set => sub { |
301 | set => sub { |
254 | set_groups $node, shift; |
302 | set_groups $node, shift; |
255 | }, |
303 | }, |
256 | reg1 => \&_register, |
304 | reg0 => sub { |
257 | reg0 => \&_unregister, |
305 | _change $_[0], [], [$_[1]]; |
|
|
306 | }, |
|
|
307 | reg1 => sub { |
|
|
308 | _change $_[0], [$_[1]], []; |
|
|
309 | }, |
258 | ; |
310 | ; |
259 | } |
311 | } |
260 | |
312 | |
261 | sub mon_node { |
313 | sub mon_node { |
262 | my ($node, $is_up) = @_; |
314 | my ($node, $is_up) = @_; |