ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Global.pm (file contents):
Revision 1.48 by root, Fri Mar 2 19:21:16 2012 UTC vs.
Revision 1.49 by root, Sat Mar 3 11:38:43 2012 UTC

28 28
29use common::sense; 29use common::sense;
30use Carp (); 30use Carp ();
31 31
32use AnyEvent (); 32use AnyEvent ();
33use AnyEvent::Util ();
34 33
35use AnyEvent::MP; 34use AnyEvent::MP;
36use AnyEvent::MP::Kernel; 35use AnyEvent::MP::Kernel;
37use AnyEvent::MP::Transport ();
38
39use base "Exporter";
40
41our @EXPORT = qw(
42 grp_reg
43 grp_get
44 grp_mon
45);
46 36
47$AnyEvent::MP::Kernel::WARN->(7, "starting global service."); 37$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
48 38
49############################################################################# 39#############################################################################
50# node protocol parts for global nodes 40# node protocol parts for global nodes
51 41
52{
53 package AnyEvent::MP::Kernel; 42package AnyEvent::MP::Kernel;
54 43
55 # TODO: this is ugly, maybe this should go into MP::Kernel or a separate module #d# 44# TODO: this is ugly (classical use vars vs. our),
45# maybe this should go into MP::Kernel
56 46
57 our %NODE; 47our %NODE;
58 our $NODE; 48our $NODE;
59 our $LISTENER; 49our $LISTENER;
60 50
61 our $GLOBAL; 51our $GLOBAL;
62 our $MASTER; 52our $MASTER;
63 our $MASTER_MON; 53our $MASTER_MON;
64 our $MASTER_TIMER; 54our $MASTER_TIMER;
65 55
66 our %GLOBAL_SLAVE; 56our %GLOBAL_SLAVE;
67 57
68 our %GLOBAL_DB; # global db 58our %GLOBAL_DB; # global db
69 our %LOCAL_DBS; # local databases of other global nodes 59our %LOCAL_DBS; # local databases of other global nodes
70 our %LOCAL_DB; # this node database 60our %LOCAL_DB; # this node database
71 61
72 our $SRCNODE; 62our $SRCNODE;
73 our %node_req; 63our %NODE_REQ;
74 64
75 # only in global code 65# only in global code
76 our %GLOBAL_MON; # monitors {family}{"" or key} 66our %GLOBAL_MON; # monitors {family}{"" or key}
77 67
78 sub other_globals() { 68sub other_globals() {
79 grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} } 69 grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} }
80 } 70}
81 71
82 # broadcasts a message to all other global nodes 72# broadcasts a message to all other global nodes
83 sub g_broadcast { 73sub g_broadcast {
84 snd $_, @_ 74 snd $_, @_
85 for other_globals; 75 for other_globals;
86 } 76}
87 77
88 sub g_mon_check { 78sub g_mon_check {
89 warn "g_mon_check<@_>\n";#d# 79 warn "g_mon_check<@_>\n";#d#
90 80
91 my %node = ( 81 my %node = (
92 %{ $GLOBAL_MON{$_[1]}{$_[2]} }, 82 %{ $GLOBAL_MON{$_[1]}{$_[2]} },
93 %{ $GLOBAL_MON{$_[1]}{"" } }, 83 %{ $GLOBAL_MON{$_[1]}{"" } },
94 %{ $GLOBAL_MON{"" }{"" } }, 84 %{ $GLOBAL_MON{"" }{"" } },
95 ); 85 );
96 86
97 snd $_ => g_chg1 => $_[1], $_[2], @_ > 2 ? $_[3] : () 87 snd $_ => g_chg1 => $_[1], $_[2], @_ > 2 ? $_[3] : ()
98 for keys %node; 88 for keys %node;
99 } 89}
100 90
101 # add/replace a key in the database 91# add/replace a key in the database
102 sub g_add($$$$) { 92sub g_add($$$$) {
103 $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]} = 93 $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]} =
104 $GLOBAL_DB {$_[1]}{$_[2]} = $_[3]; 94 $GLOBAL_DB {$_[1]}{$_[2]} = $_[3];
105 95
106 g_broadcast g_add => $_[1] => $_[2] => $_[3] 96 g_broadcast g_add => $_[1] => $_[2] => $_[3]
107 if exists $GLOBAL_SLAVE{$_[0]}; 97 if exists $GLOBAL_SLAVE{$_[0]};
108 98
109 warn "g_add<@_>\n";#d# 99 warn "g_add<@_>\n";#d#
110 &g_mon_check; 100 &g_mon_check;
111 } 101}
112 102
113 # delete a key from the database 103# delete a key from the database
114 sub g_del($$$) { 104sub g_del($$$) {
115 delete $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]}; 105 delete $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]};
116 106
117 g_broadcast g_del => $_[1] => $_[2] 107 g_broadcast g_del => $_[1] => $_[2]
118 if exists $GLOBAL_SLAVE{$_[0]}; 108 if exists $GLOBAL_SLAVE{$_[0]};
119 109
120 delete $GLOBAL_DB{$_[1]}{$_[2]}; 110 delete $GLOBAL_DB{$_[1]}{$_[2]};
121 111
122 # check if other node maybe still has the key, then we don't delete, but add 112 # check if other node maybe still has the key, then we don't delete, but add
123 for (values %LOCAL_DBS) { 113 for (values %LOCAL_DBS) {
124 if (exists $_->{$_[1]}{$_[2]}) { 114 if (exists $_->{$_[1]}{$_[2]}) {
125 $GLOBAL_DB{$_[1]}{$_[2]} = $_->{$_[1]}{$_[2]}; 115 $GLOBAL_DB{$_[1]}{$_[2]} = $_->{$_[1]}{$_[2]};
126 last; 116 last;
127 }
128 } 117 }
118 }
129 119
130 warn "g_del<@_>\n";#d# 120 warn "g_del<@_>\n";#d#
131 &g_mon_check; 121 &g_mon_check;
132 } 122}
133 123
134 # delete all keys from a database 124# delete all keys from a database
135 sub g_clr($) { 125sub g_clr($) {
136 my ($node) = @_; 126 my ($node) = @_;
137 127
138 my $db = $LOCAL_DBS{$node}; 128 my $db = $LOCAL_DBS{$node};
139 while (my ($f, $k) = each %$db) { 129 while (my ($f, $k) = each %$db) {
140 g_del $node, $f => $_ 130 g_del $node, $f => $_
131 for keys %$k;
132 }
133
134 delete $LOCAL_DBS{$node};
135}
136
137# set the whole (node-local) database - previous value must be empty
138sub g_set($$) {
139 my ($node, $db) = @_;
140
141 while (my ($f, $k) = each %$db) {
142 g_add $node, $f => $_ => delete $k->{$_}
143 for keys %$k;
144 }
145}
146
147# gather node databases from slaves
148
149# other node wants to make us the master
150$NODE_REQ{g_slave} = sub {
151 my ($db) = @_;
152
153 my $node = $SRCNODE->{id};
154 undef $GLOBAL_SLAVE{$node};
155 g_set $node, $db;
156};
157
158$NODE_REQ{g_add} = sub {
159 &g_add ($SRCNODE->{id}, @_);
160};
161
162$NODE_REQ{g_del} = sub {
163 &g_del ($SRCNODE->{id}, @_);
164};
165
166$NODE_REQ{g_set} = sub {
167 g_set $SRCNODE->{id}, $_[0];
168};
169
170$NODE_REQ{g_find} = sub {
171 my ($node) = @_;
172
173 snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node};
174};
175
176# monitoring
177
178sub g_slave_disconnect($) {
179 my ($node) = @_;
180
181 g_clr $node;
182
183 if (my $mon = delete $GLOBAL_SLAVE{$node}) {
184 while (my ($f, $fv) = each %$mon) {
185 delete $GLOBAL_MON{$f}{$_}
141 for keys %$k; 186 for keys %$fv;
142 } 187 }
143
144 delete $LOCAL_DBS{$node};
145 }
146
147 # set the whole (node-local) database - previous value must be empty
148 sub g_set($$) {
149 my ($node, $db) = @_;
150
151 while (my ($f, $k) = each %$db) {
152 g_add $node, $f => $_ => delete $k->{$_}
153 for keys %$k;
154 }
155 }
156
157 # gather node databases from slaves
158
159 # other node wants to make us the master
160 $node_req{g_slave} = sub {
161 my ($db) = @_;
162
163 my $node = $SRCNODE->{id};
164 undef $GLOBAL_SLAVE{$node};
165 g_set $node, $db;
166 }; 188 }
189}
167 190
168 $node_req{g_add} = sub {
169 &g_add ($SRCNODE->{id}, @_);
170 };
171
172 $node_req{g_del} = sub {
173 &g_del ($SRCNODE->{id}, @_);
174 };
175
176 $node_req{g_set} = sub {
177 g_set $SRCNODE->{id}, $_[0];
178 };
179
180 $node_req{g_find} = sub {
181 my ($node) = @_;
182
183 snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node};
184 };
185
186 # monitoring
187
188 sub g_slave_disconnect($) {
189 my ($node) = @_;
190
191 g_clr $node;
192
193 if (my $mon = delete $GLOBAL_SLAVE{$node}) {
194 while (my ($f, $fv) = each %$mon) {
195 delete $GLOBAL_MON{$f}{$_}
196 for keys %$fv;
197 }
198 }
199 }
200
201 # g_mon0 family key - stop monitoring 191# g_mon0 family key - stop monitoring
202 $node_req{g_mon0} = sub { 192$NODE_REQ{g_mon0} = sub {
203 delete $GLOBAL_MON{$_[0]}{$_[1]}{$SRCNODE->{id}}; 193 delete $GLOBAL_MON{$_[0]}{$_[1]}{$SRCNODE->{id}};
204 delete $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]}{$_[1]}; 194 delete $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]}{$_[1]};
205 }; 195};
206 196
207 # g_mon1 family key - start monitoring 197# g_mon1 family key - start monitoring
208 $node_req{g_mon1} = sub { 198$NODE_REQ{g_mon1} = sub {
209 undef $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]}{$_[1]}; 199 undef $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]}{$_[1]};
210 undef $GLOBAL_MON{$_[0]}{$_[1]}{$SRCNODE->{id}}; 200 undef $GLOBAL_MON{$_[0]}{$_[1]}{$SRCNODE->{id}};
211 #d# generate lots of initial change requests, or one big? 201 #d# generate lots of initial change requests, or one big?
212 202
213 snd $SRCNODE->{id}, g_chg0 => $_[0], $_[1] 203 snd $SRCNODE->{id}, g_chg0 => $_[0], $_[1]
214 }; 204};
215 205
216 ############################################################################# 206#############################################################################
217 # switch to global mode 207# switch to global mode
218 208
219 $GLOBAL = 1; 209$GLOBAL = 1;
220 $MASTER = $NODE; 210$MASTER = $NODE;
221 undef $GLOBAL_SLAVE{$NODE}; # we are our own master (and slave) 211undef $GLOBAL_SLAVE{$NODE}; # we are our own master (and slave)
222 $LOCAL_DBS{$NODE} = { %LOCAL_DB }; 212$LOCAL_DBS{$NODE} = { %LOCAL_DB };
223 213
224 # regularly try to connect to global nodes - maybe use seeding code? 214# regularly try to connect to global nodes - maybe use seeding code?
225 $MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub { 215$MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
226 (add_node $_)->connect 216 (add_node $_)->connect
227 for keys %{ $GLOBAL_DB{"'g"} }; 217 for keys %{ $GLOBAL_DB{"'g"} };
228 }; 218};
229 219
230 # instantly connect to other global nodes when we learn of them 220# instantly connect to other global nodes when we learn of them
231 # so we don't have to wait for the timer. 221# so we don't have to wait for the timer.
232 #TODO 222#TODO
233# $GLOBAL_MON{"'g"}{""}{""} = sub { 223# $GLOBAL_MON{"'g"}{""}{""} = sub {
234# (add_node $_[1])->connect; 224# (add_node $_[1])->connect;
235# }; 225# };
236 226
237 # delete slaves on node-down 227# delete slaves on node-down
238 # clear slave db on node-down 228# clear slave db on node-down
239 $MASTER_MON = mon_nodes sub { 229$MASTER_MON = mon_nodes sub {
240 g_slave_disconnect $_[0] unless $_[1]; 230 g_slave_disconnect $_[0] unless $_[1];
241 }; 231};
242 232
243 # tell everybody who connects that we are a global node 233# tell everybody who connects that we are a global node
244 push @AnyEvent::MP::Transport::HOOK_GREET, sub { 234push @AnyEvent::MP::Transport::HOOK_GREET, sub {
245 $_[0]{local_greeting}{global} = 1; 235 $_[0]{local_greeting}{global} = 1;
246 }; 236};
247 237
248 # connect from a global node 238# connect from a global node
249 sub g_global_connect { 239sub g_global_connect {
250 my ($node) = @_; 240 my ($node) = @_;
251 241
252 # we need to set this currently, as to avoid race conditions 242 # we need to set this currently, as to avoid race conditions
253 # because it takes a while until the other global node tells us it is global. 243 # because it takes a while until the other global node tells us it is global.
254 244
255 undef $GLOBAL_DB{"'g"}{$node}; 245 undef $GLOBAL_DB{"'g"}{$node};
256 undef $LOCAL_DBS{$node}{"'g"}{$node}; 246 undef $LOCAL_DBS{$node}{"'g"}{$node};
257 247
258 # global nodes send all local databases, merged, 248 # global nodes send all local databases, merged,
259 # as their local database to global nodes 249 # as their local database to global nodes
260 my %db; 250 my %db;
261 251
262 for (values %LOCAL_DBS) { 252 for (values %LOCAL_DBS) {
263 while (my ($f, $fv) = each %$_) { 253 while (my ($f, $fv) = each %$_) {
264 while (my ($k, $kv) = each %$fv) { 254 while (my ($k, $kv) = each %$fv) {
265 $db{$f}{$k} = $kv; 255 $db{$f}{$k} = $kv;
266 }
267 } 256 }
268 } 257 }
258 }
269 259
270 snd $node => g_set => \%db; 260 snd $node => g_set => \%db;
271 } 261}
272 262
273 # send our database to every global node that connects 263# send our database to every global node that connects
274 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub { 264push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
275 return unless $_[0]{remote_greeting}{global}; 265 return unless $_[0]{remote_greeting}{global};
276 266
277 g_global_connect $_[0]{remote_node}; 267 g_global_connect $_[0]{remote_node};
278 }; 268};
279 269
280 # tell our master that we are global now 270# tell our master that we are global now
281 for (values %NODE) { 271for (values %NODE) {
282 if ($_->{transport} && $_->{transport}{remote_greeting}{global}) { 272 if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
283 snd $_->{id} => "g_global"; 273 snd $_->{id} => "g_global";
284 g_global_connect $_->{id}; 274 g_global_connect $_->{id};
285 }
286 } 275 }
276}
287 277
288 $node_req{g_global} = sub { 278$NODE_REQ{g_global} = sub {
289 g_slave_disconnect $SRCNODE->{id}; 279 g_slave_disconnect $SRCNODE->{id};
290 $SRCNODE->{transport}{remote_greeting}{global} = 1; 280 $SRCNODE->{transport}{remote_greeting}{global} = 1;
291 g_global_connect $SRCNODE->{id}; 281 g_global_connect $SRCNODE->{id};
292 }; 282};
293 283
294 # now add us to the set of global nodes 284# now add us to the set of global nodes
295 ldb_set "'g" => $NODE => undef; 285db_set "'g" => $NODE => undef;
296}
297
298=item $guard = grp_reg $group, $port
299
300Register the given (local!) port in the named global group C<$group>.
301
302The port will be unregistered automatically when the port is destroyed.
303
304When not called in void context, then a guard object will be returned that
305will also cause the name to be unregistered when destroyed.
306
307=cut
308
309# register local port
310sub grp_reg($$) {
311 my ($group, $port) = @_;
312
313 port_is_local $port
314 or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";
315
316 defined wantarray && AnyEvent::Util::guard { unregister ($port, $group) }
317}
318
319=item $ports = grp_get $group
320
321Returns all the ports currently registered to the given group (as
322read-only(!) array reference). When the group has no registered members,
323return C<undef>.
324
325=cut
326
327sub grp_get($) {
328}
329
330=item $guard = grp_mon $group, $callback->($ports, $add, $del)
331
332Installs a monitor on the given group. Each time there is a change it
333will be called with the current group members as an arrayref as the
334first argument. The second argument only contains ports added, the third
335argument only ports removed.
336
337Unlike C<grp_get>, all three arguments will always be array-refs, even if
338the array is empty. None of the arrays must be modified in any way.
339
340The first invocation will be with the first two arguments set to the
341current members, as if all of them were just added, but only when the
342group is actually non-empty.
343
344Optionally returns a guard object that uninstalls the watcher when it is
345destroyed.
346
347=cut
348
349sub grp_mon($$) {
350 my ($grp, $cb) = @_;
351}
352 286
353=back 287=back
354 288
355=head1 SEE ALSO 289=head1 SEE ALSO
356 290

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines