ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Global.pm (file contents):
Revision 1.19 by root, Wed Sep 2 13:05:29 2009 UTC vs.
Revision 1.20 by root, Sat Sep 5 21:16:59 2009 UTC

34use AnyEvent::Util (); 34use AnyEvent::Util ();
35 35
36use AnyEvent::MP; 36use AnyEvent::MP;
37use AnyEvent::MP::Kernel; 37use AnyEvent::MP::Kernel;
38 38
39use base "Exporter";
40
41our @EXPORT = qw(
42 grp_reg
43 grp_get
44 grp_mon
45);
46
39our $VERSION = $AnyEvent::MP::VERSION; 47our $VERSION = $AnyEvent::MP::VERSION;
40 48
41our %addr; # port ID => [address...] mapping 49our %addr; # port ID => [address...] mapping
42 50
43our %port; # our rendezvous port on the other side 51our %port; # our rendezvous port on the other side
44our %lreg; # local registry, name => [pid...] 52our %lreg; # local registry, name => [pid...]
45our %lmon; # local rgeistry monitoring name,pid => mon 53our %lmon; # local registry monitoring name,pid => mon
46our %greg; # global regstry, name => [pid...] 54our %greg; # global regstry, name => pid => undef
55our %gmon; # group monitorign, group => [$cb...]
47 56
48our $nodecnt; 57our $nodecnt;
49 58
50$AnyEvent::MP::Kernel::WARN->(7, "starting global service."); 59$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
51 60
95 } 104 }
96} 105}
97 106
98############################################################################# 107#############################################################################
99 108
109sub _change {
110 my ($group, $add, $del) = @_;
111
112 my $kv = $greg{$group} ||= {};
113
114 delete @$kv{@$del};
115 @$kv{@$add} = ();
116
117 my $ports = [keys %$kv];
118 $_->($ports, $add, $del) for @{ $gmon{$group} };
119}
120
100sub unreg_groups($) { 121sub unreg_groups($) {
101 my ($node) = @_; 122 my ($node) = @_;
102 123
103 my $qr = qr/^\Q$node\E(?:#|$)/; 124 my $qr = qr/^\Q$node\E(?:#|$)/;
125 my @del;
104 126
105 for my $group (values %greg) { 127 while (my ($group, $ports) = each %greg) {
106 @$group = grep $_ !~ $qr, @$group; 128 @del = grep /$qr/, keys %$ports;
129 _change $group, [], \@del
130 if @del;
107 } 131 }
108} 132}
109 133
110sub set_groups($$) { 134sub set_groups($$) {
111 my ($node, $lreg) = @_; 135 my ($node, $lreg) = @_;
112 136
113 while (my ($k, $v) = each %$lreg) { 137 while (my ($k, $v) = each %$lreg) {
114 push @{ $greg{$k} }, @$v; 138 _change $k, $v, [];
115 } 139 }
116} 140}
117 141
118=item $guard = register $port, $group 142=item $guard = grp_reg $group, $port
119 143
120Register the given (local!) port in the named global group C<$group>. 144Register the given (local!) port in the named global group C<$group>.
121 145
122The port will be unregistered automatically when the port is destroyed. 146The port will be unregistered automatically when the port is destroyed.
123 147
124When not called in void context, then a guard object will be returned that 148When not called in void context, then a guard object will be returned that
125will also cause the name to be unregistered when destroyed. 149will also cause the name to be unregistered when destroyed.
126 150
127=cut 151=cut
128
129# register port from any node
130sub _register {
131 my ($port, $group) = @_;
132
133 push @{ $greg{$group} }, $port;
134}
135
136# unregister port from any node
137sub _unregister {
138 my ($port, $group) = @_;
139
140 @{ $greg{$group} } = grep $_ ne $port, @{ $greg{$group} };
141}
142 152
143# unregister local port 153# unregister local port
144sub unregister { 154sub unregister {
145 my ($port, $group) = @_; 155 my ($port, $group) = @_;
146 156
147 delete $lmon{"$group\x00$port"}; 157 delete $lmon{"$group\x00$port"};
148 @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} }; 158 @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };
149 159
150 _unregister $port, $group; 160 _change $group, [], [$port];
151 161
152 snd $_, reg0 => $port, $group 162 snd $_, reg0 => $group, $port
153 for values %port; 163 for values %port;
154} 164}
155 165
156# register local port 166# register local port
157sub register($$) { 167sub grp_reg($$) {
158 my ($port, $group) = @_; 168 my ($group, $port) = @_;
159 169
160 port_is_local $port 170 port_is_local $port
161 or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught"; 171 or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";
172
173 grep $_ eq $port, @{ $lreg{$group} }
174 and Carp::croak "'$group': group already registered, cannot register a second time";
162 175
163 $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group }; 176 $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
164 push @{ $lreg{$group} }, $port; 177 push @{ $lreg{$group} }, $port;
165 178
166 snd $_, reg1 => $port, $group 179 snd $_, reg1 => $group, $port
167 for values %port; 180 for values %port;
168 181
169 _register $port, $group; 182 _change $group, [$port], [];
170 183
171 wantarray && AnyEvent::Util::guard { unregister $port, $group } 184 wantarray && AnyEvent::Util::guard { unregister $port, $group }
172} 185}
173 186
174=item $ports = find $group 187=item $ports = grp_get $group
175 188
176Returns all the ports currently registered to the given group (as 189Returns all the ports currently registered to the given group (as
177read-only array reference). When the group has no registered members, 190read-only(!) array reference). When the group has no registered members,
178return C<undef>. 191return C<undef>.
179 192
180=cut 193=cut
181 194
182sub find($) { 195sub grp_get($) {
183 @{ $greg{$_[0]} } 196 my @ports = keys %{ $greg{$_[0]} };
184 ? $greg{$_[0]} 197 @ports ? \@ports : undef
198}
199
200=item $guard = grp_mon $group, $callback->($ports, $add, $del)
201
202Installs a monitor on the given group. Each time there is a change it
203will be called with the current group members as an arrayref as the
204first argument. The second argument only contains ports added, the third
205argument only ports removed.
206
207Unlike C<grp_get>, all three arguments will always be array-refs, even if
208the array is empty. None of the arrays must be modified in any way.
209
210The first invocation will be with the first two arguments set to the
211current members, as if all of them were just added, but only when the
212group is atcually non-empty.
213
214Optionally returns a guard object that uninstalls the watcher when it is
215destroyed.
216
217=cut
218
219sub grp_mon($$) {
220 my ($grp, $cb) = @_;
221
222 AnyEvent::MP::Kernel::delay sub {
223 return unless $cb;
224
225 push @{ $gmon{$grp} }, $cb;
226 $cb->(((grp_get $grp) || return) x 2, []);
227 };
228
229 defined wantarray && AnyEvent::Util::::guard {
230 my @mon = grep $_ != $cb, @{ delete $gmon{$grp} };
231 $gmon{$grp} = \@mon if @mon;
185 : undef 232 undef $cb;
233 }
186} 234}
187 235
188sub start_node { 236sub start_node {
189 my ($node) = @_; 237 my ($node) = @_;
190 238
251 if $addr{$othernode}; 299 if $addr{$othernode};
252 }, 300 },
253 set => sub { 301 set => sub {
254 set_groups $node, shift; 302 set_groups $node, shift;
255 }, 303 },
256 reg1 => \&_register, 304 reg0 => sub {
257 reg0 => \&_unregister, 305 _change $_[0], [], [$_[1]];
306 },
307 reg1 => sub {
308 _change $_[0], [$_[1]], [];
309 },
258 ; 310 ;
259} 311}
260 312
261sub mon_node { 313sub mon_node {
262 my ($node, $is_up) = @_; 314 my ($node, $is_up) = @_;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines