ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.47
Committed: Fri Mar 2 19:19:21 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.46: +101 -30 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network between global nodes and
12 tries to have connections with all nodes in the network.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyway.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31
32 use AnyEvent ();
33 use AnyEvent::Util ();
34
35 use AnyEvent::MP;
36 use AnyEvent::MP::Kernel;
37 use AnyEvent::MP::Transport ();
38
39 use base "Exporter";
40
# Functions exported into the namespace of every package that uses this module.
our @EXPORT = qw(grp_reg grp_get grp_mon);

# Announce, via the kernel's WARN hook (called with level 7), that the
# global service is starting.
$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
48
49 #############################################################################
50 # node protocol parts for global nodes
51
52 {
53 package AnyEvent::MP::Kernel;
54
# TODO: this is ugly, maybe this should go into MP::Kernel or a separate module #d#

# Package variables of AnyEvent::MP::Kernel that the code below relies on.
our (%NODE, $NODE, $LISTENER);

# Master/slave state; $MASTER_MON and $MASTER_TIMER are assigned further
# down, when this node switches to global mode.
our ($GLOBAL, $MASTER, $MASTER_MON, $MASTER_TIMER);

# Keyed by node id. The value is either undef or the monitors that node
# registered, as {family}{key} (filled in by the g_mon1 request).
our %GLOBAL_SLAVE;

our %GLOBAL_DB; # global db, {family}{key} => value
our %LOCAL_DBS; # local databases of other global nodes, {node}{family}{key} => value
our %LOCAL_DB;  # this node database

# Node whose request is currently being handled, and the request dispatch table.
our ($SRCNODE, %node_req);

# only in global code
our %GLOBAL_MON; # monitors {family}{"" or key}
77
# Returns the ids of all nodes listed in the "'g" family of the global
# database, except this node itself and except nodes that are not up.
sub other_globals() {
   grep { $_ ne $NODE && node_is_up ($_) } keys %{ $GLOBAL_DB{"'g"} }
}
81
# Sends the message given as arguments to every other global node that is
# currently up (see other_globals).
sub g_broadcast {
   for my $peer (other_globals) {
      snd $peer, @_;
   }
}
87
# Notifies all monitoring nodes about a change to one database key.
#
# Called as "&g_mon_check;" from g_add and g_del, so @_ is the caller's
# argument list: ($node, $family, $key) for a deletion and
# ($node, $family, $key, $value) for an addition/replacement.
#
# Each interested node receives "g_chg1 => $family, $key" followed by the
# value - but only when a value was actually passed, so that a deletion
# can be told apart from a key whose value is undef.
sub g_mon_check {
   warn "g_mon_check<@_>\n";#d#

   my ($family, $key) = @_[1, 2];

   # $_[0] is the node id, so a value is present only with four arguments
   my @value = @_ > 3 ? $_[3] : ();

   # union of the nodes monitoring this exact key, the whole family
   # (key "") and everything (family "" and key "").
   # "|| {}" keeps the dereference valid when no such monitor exists.
   my %node = (
      %{ $GLOBAL_MON{$family}{$key} || {} },
      %{ $GLOBAL_MON{$family}{""  } || {} },
      %{ $GLOBAL_MON{""     }{""  } || {} },
   );

   snd $_ => g_chg1 => $family, $key, @value
      for keys %node;
}
100
# add/replace a key in the database
#
# Arguments: $node (the node the entry belongs to), $family, $key, $value.
# The value is stored in the global database and in that node's local
# database, forwarded to the other global nodes if $node is one of our
# slaves, and finally reported to the monitors.
sub g_add($$$$) {
   my ($src, $family, $key, $value) = @_;

   $GLOBAL_DB{$family}{$key}       = $value;
   $LOCAL_DBS{$src}{$family}{$key} = $value;

   if (exists $GLOBAL_SLAVE{$src}) {
      g_broadcast g_add => $family => $key => $value;
   }

   warn "g_add<@_>\n";#d#

   # hand our unmodified argument list on to the monitor check
   g_mon_check (@_);
}
112
# delete a key from the database
#
# Arguments: $node (the node the entry belongs to), $family, $key.
# The key is removed from that node's local database and the deletion is
# forwarded to the other global nodes if $node is one of our slaves.
# The global database entry is then either dropped or, when another node's
# local database still has the key, replaced by that node's value.
sub g_del($$$) {
   my ($node, $family, $key) = @_;

   delete $LOCAL_DBS{$node}{$family}{$key};

   g_broadcast g_del => $family => $key
      if exists $GLOBAL_SLAVE{$node};

   delete $GLOBAL_DB{$family}{$key};

   # check if other node maybe still has the key, then we don't delete, but add
   for my $db (values %LOCAL_DBS) {
      # test the family first, so that the lookup does not create an
      # empty family hash in every node's database
      next unless exists $db->{$family} && exists $db->{$family}{$key};

      $GLOBAL_DB{$family}{$key} = $db->{$family}{$key};
      last;
   }

   warn "g_del<@_>\n";#d#

   # @_ is untouched, so the monitor check sees ($node, $family, $key)
   &g_mon_check;
}
133
# delete all keys from a database
#
# Removes every {family}{key} entry of the given node's local database
# one by one through g_del, then drops the node's database itself.
sub g_clr($) {
   my ($node) = @_;

   my $db = $LOCAL_DBS{$node};

   for my $family (keys %$db) {
      for my $key (keys %{ $db->{$family} }) {
         g_del $node, $family, $key;
      }
   }

   delete $LOCAL_DBS{$node};
}
146
# set the whole (node-local) database - previous value must be empty
#
# $db is a {family}{key} => value structure. Every entry is fed through
# g_add and removed from $db while doing so, so the family hashes passed
# in are empty afterwards.
sub g_set($$) {
   my ($node, $db) = @_;

   for my $family (keys %$db) {
      my $entries = $db->{$family};

      for my $key (keys %$entries) {
         g_add $node, $family, $key, delete $entries->{$key};
      }
   }
}
156
# gather node databases from slaves

# other node wants to make us the master:
# remember the requesting node as a slave and load the database it sent
$node_req{g_slave} = sub {
   my ($db) = @_;

   my $slave = $SRCNODE->{id};

   undef $GLOBAL_SLAVE{$slave};
   g_set $slave, $db;
};

# The three requests below act on the requesting node's database.
# g_add and g_del are called with "&" so that the request arguments are
# passed through as they are, without the prototypes being applied.

# g_add family key value - add/replace a key
$node_req{g_add} = sub {
   my $src = $SRCNODE->{id};

   &g_add ($src, @_);
};

# g_del family key - delete a key
$node_req{g_del} = sub {
   my $src = $SRCNODE->{id};

   &g_del ($src, @_);
};

# g_set db - set the whole database
$node_req{g_set} = sub {
   my ($db) = @_;

   g_set $SRCNODE->{id}, $db;
};

# g_find node - reply with g_found, the node id asked for and whatever the
# "'l" family of the global database has stored for it
$node_req{g_find} = sub {
   my ($wanted) = @_;

   snd $SRCNODE->{id}, g_found => $wanted, $GLOBAL_DB{"'l"}{$wanted};
};
185
186 # monitoring
187
# Cleans up after a node that is no longer our slave: clears its local
# database (via g_clr), forgets it as a slave and removes the monitors it
# had registered.
#
# %GLOBAL_MON is organised as {family}{key}{node}, so only the entry of the
# disconnecting node is removed - monitors that other nodes hold on the
# same family/key must stay in place.
sub g_slave_disconnect($) {
   my ($node) = @_;

   g_clr $node;

   # the slave entry, if true, is the {family}{key} set of monitors the
   # node registered
   my $mon = delete $GLOBAL_SLAVE{$node}
      or return;

   for my $family (keys %$mon) {
      delete $GLOBAL_MON{$family}{$_}{$node}
         for keys %{ $mon->{$family} };
   }
}
200
# g_mon0 family key - stop monitoring
# Removes the requesting node from the monitors of family/key and from the
# bookkeeping kept in its slave entry.
$node_req{g_mon0} = sub {
   my ($family, $key) = @_;
   my $src = $SRCNODE->{id};

   delete $GLOBAL_MON{$family}{$key}{$src};
   delete $GLOBAL_SLAVE{$src}{$family}{$key};
};

# g_mon1 family key - start monitoring
# Records the monitor in the requesting node's slave entry and in the
# monitor table, then answers with g_chg0 for that family/key.
$node_req{g_mon1} = sub {
   my ($family, $key) = @_;
   my $src = $SRCNODE->{id};

   undef $GLOBAL_SLAVE{$src}{$family}{$key};
   undef $GLOBAL_MON{$family}{$key}{$src};
   #d# generate lots of initial change requests, or one big?

   snd $src, g_chg0 => $family, $key
};
215
#############################################################################
# switch to global mode

# this node is global now and acts as its own master
$GLOBAL = 1;
$MASTER = $NODE;

# we are our own master (and slave)
undef $GLOBAL_SLAVE{$NODE};

# our entry in the per-node databases starts as a shallow copy of %LOCAL_DB
$LOCAL_DBS{$NODE} = { %LOCAL_DB };

# regularly try to connect to global nodes - maybe use seeding code?
# The timer fires immediately and then every monitor_timeout seconds.
$MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
   for my $id (keys %{ $GLOBAL_DB{"'g"} }) {
      my $node = add_node ($id);
      $node->connect;
   }
};

# instantly connect to other global nodes when we learn of them
# so we don't have to wait for the timer.
#TODO
# $GLOBAL_MON{"'g"}{""}{""} = sub {
# (add_node $_[1])->connect;
# };

# delete slaves on node-down
# clear slave db on node-down
# (the callback is passed the node id and a flag that is false when the
# node went down)
$MASTER_MON = mon_nodes sub {
   my ($node, $is_up) = @_;

   g_slave_disconnect $node unless $is_up;
};

# tell everybody who connects that we are a global node
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{global} = 1;
};
247
# connect from a global node
#
# Registers $node in the "'g" family and sends it a g_set request with a
# merged copy of all local databases we know about.
sub g_global_connect {
   my ($node) = @_;

   # we need to set this currently, as to avoid race conditions
   # because it takes a while until the other global node tells us it is global.
   undef $GLOBAL_DB{"'g"}{$node};
   undef $LOCAL_DBS{$node}{"'g"}{$node};

   # global nodes send all local databases, merged,
   # as their local database to global nodes
   my %merged;

   for my $ldb (values %LOCAL_DBS) {
      for my $family (keys %$ldb) {
         my $entries = $ldb->{$family};

         # families without any keys do not show up in the merged copy
         $merged{$family}{$_} = $entries->{$_}
            for keys %$entries;
      }
   }

   snd $node => g_set => \%merged;
}
272
# send our database to every global node that connects
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   my ($transport) = @_;

   # only peers that announced themselves as global in their greeting
   return unless $transport->{remote_greeting}{global};

   g_global_connect ($transport->{remote_node});
};

# tell our master that we are global now:
# every node we already have a transport to, and which greeted us as
# global, gets a g_global request followed by our database
for my $node (values %NODE) {
   my $transport = $node->{transport}
      or next;

   next unless $transport->{remote_greeting}{global};

   snd $node->{id} => "g_global";
   g_global_connect ($node->{id});
}

# g_global - the requesting node has become global: drop its slave state,
# mark its greeting as global and send it our database
$node_req{g_global} = sub {
   g_slave_disconnect $SRCNODE->{id};

   $SRCNODE->{transport}{remote_greeting}{global} = 1;

   g_global_connect ($SRCNODE->{id});
};

#d#d disconnect everybody to bootstrap development grr

# $_->transport_error # remove Self::transport_error
# for values %NODE;

# now add us to the set of global nodes
ldb_set ("'g", $NODE, undef);
301 }
302
303 =item $guard = grp_reg $group, $port
304
305 Register the given (local!) port in the named global group C<$group>.
306
307 The port will be unregistered automatically when the port is destroyed.
308
309 When not called in void context, then a guard object will be returned that
310 will also cause the name to be unregistered when destroyed.
311
312 =cut
313
# register local port
#
# Croaks unless $port is a local port. In non-void context a guard object
# is returned that calls unregister ($port, $group) when it is destroyed.
# NOTE(review): no registration is performed by the code visible here, and
# "unregister" is not defined in the visible part of the file - confirm
# where both are meant to come from.
sub grp_reg($$) {
   my ($group, $port) = @_;

   Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught"
      unless port_is_local ($port);

   # nobody wants the guard when called in void context
   return unless defined wantarray;

   AnyEvent::Util::guard { unregister ($port, $group) }
}
323
324 =item $ports = grp_get $group
325
326 Returns all the ports currently registered to the given group (as
327 read-only(!) array reference). When the group has no registered members,
328 returns C<undef>.
329
330 =cut
331
# Stub: the body is empty, so every call yields undef in scalar context
# and the empty list in list context, whatever group is asked for.
sub grp_get($) {
   return;
}
334
335 =item $guard = grp_mon $group, $callback->($ports, $add, $del)
336
337 Installs a monitor on the given group. Each time there is a change it
338 will be called with the current group members as an arrayref as the
339 first argument. The second argument only contains ports added, the third
340 argument only ports removed.
341
342 Unlike C<grp_get>, all three arguments will always be array-refs, even if
343 the array is empty. None of the arrays may be modified in any way.
344
345 The first invocation will be with the first two arguments set to the
346 current members, as if all of them were just added, but only when the
347 group is actually non-empty.
348
349 Optionally returns a guard object that uninstalls the watcher when it is
350 destroyed.
351
352 =cut
353
# Stub: only unpacks its arguments, no monitor is installed yet.
sub grp_mon($$) {
   my ($group, $callback) = @_;
}
357
358 =back
359
360 =head1 SEE ALSO
361
362 L<AnyEvent::MP>.
363
364 =head1 AUTHOR
365
366 Marc Lehmann <schmorp@schmorp.de>
367 http://home.schmorp.de/
368
369 =cut
370
371 1
372