ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.48
Committed: Fri Mar 2 19:21:16 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.47: +0 -5 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network between global nodes and
12 tries to have connections with all nodes in the network.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyways.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31
32 use AnyEvent ();
33 use AnyEvent::Util ();
34
35 use AnyEvent::MP;
36 use AnyEvent::MP::Kernel;
37 use AnyEvent::MP::Transport ();
38
39 use base "Exporter";
40
41 our @EXPORT = qw(
42 grp_reg
43 grp_get
44 grp_mon
45 );
46
47 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
48
49 #############################################################################
50 # node protocol parts for global nodes
51
52 {
53 package AnyEvent::MP::Kernel;
54
55 # TODO: this is ugly, maybe this should go into MP::Kernel or a separate module #d#
56
# Package variables of AnyEvent::MP::Kernel that the global service works on.
our (%NODE, $NODE, $LISTENER);

# global-mode state: the flag, our master and the watchers kept alive for it
our ($GLOBAL, $MASTER, $MASTER_MON, $MASTER_TIMER);

# nodes that use us as their master - the value is either undef or the
# monitor registrations of that node as {family}{key}
our %GLOBAL_SLAVE;

our %GLOBAL_DB; # global db, {family}{key} = value
our %LOCAL_DBS; # per-node databases, {node}{family}{key} = value
our %LOCAL_DB;  # this node database

# node currently being served and the request dispatch table
our ($SRCNODE, %node_req);

# only in global code
our %GLOBAL_MON; # monitors {family}{"" or key}{node}
77
# returns all nodes listed in the 'g family of the global database that
# are currently up, excluding this node itself
sub other_globals() {
   grep { $_ ne $NODE && node_is_up($_) } keys %{ $GLOBAL_DB{"'g"} }
}
81
# sends the given message (the whole argument list) to every other
# global node that is currently up
sub g_broadcast {
   for my $peer (other_globals()) {
      snd($peer, @_);
   }
}
87
# notify all monitoring nodes about a change of family/key
#
# called with the argument list of g_add (node, family, key, value) or
# of g_del (node, family, key). every node that monitors the exact key,
# the whole family ("" key) or everything ("" family) is sent
#
#    g_chg1 => family, key [, value]
#
# the value is only included when the caller actually passed one, so that
# a delete (three arguments) does not look like an add of undef.
sub g_mon_check {
   warn "g_mon_check<@_>\n";#d#

   my (undef, $family, $key) = @_;

   # look up the monitor sets without autovivifying %GLOBAL_MON entries and
   # without dereferencing undef (which only works without strict refs)
   my $family_mon = $GLOBAL_MON{$family} || {};
   my $any_mon    = $GLOBAL_MON{""}      || {};

   my ($by_key, $by_family, $by_any)
      = ($family_mon->{$key}, $family_mon->{""}, $any_mon->{""});

   # only the keys (node ids) matter, merging removes duplicates
   my %node = map { $_ ? %$_ : () } $by_key, $by_family, $by_any;

   # @_ always has at least node, family and key - the value is the 4th element
   my @value = @_ > 3 ? $_[3] : ();

   snd($_, g_chg1 => $family, $key, @value)
      for keys %node;
}
100
# add/replace a key in the database
#
# arguments are node, family, key, value. the value is stored both in the
# global database and in the local database of the given node. when the node
# is one of our slaves the change is also broadcast to the other global
# nodes. finally, monitors are notified with our full argument list.
sub g_add($$$$) {
   my ($node, $family, $key, $value) = @_;

   $GLOBAL_DB{$family}{$key}        = $value;
   $LOCAL_DBS{$node}{$family}{$key} = $value;

   if (exists $GLOBAL_SLAVE{$node}) {
      g_broadcast(g_add => $family, $key, $value);
   }

   warn "g_add<@_>\n";#d#
   g_mon_check(@_);
}
112
# delete a key from the database
#
# arguments are node, family, key. the key is removed from the local database
# of the node and from the global database, and the deletion is broadcast
# when the node is one of our slaves. monitors are notified afterwards with
# our full argument list.
sub g_del($$$) {
   my ($node, $family, $key) = @_;

   delete $LOCAL_DBS{$node}{$family}{$key};

   if (exists $GLOBAL_SLAVE{$node}) {
      g_broadcast(g_del => $family, $key);
   }

   delete $GLOBAL_DB{$family}{$key};

   # some other node might still have this key - in that case the global
   # entry is not dropped but taken over from the first such database
   for my $db (values %LOCAL_DBS) {
      next unless exists $db->{$family}{$key};

      $GLOBAL_DB{$family}{$key} = $db->{$family}{$key};
      last;
   }

   warn "g_del<@_>\n";#d#
   g_mon_check(@_);
}
133
# delete all keys from a database
#
# every key of the local database of $node goes through g_del (so it is
# broadcast/monitored like any other deletion), then the database itself
# is dropped.
sub g_clr($) {
   my ($node) = @_;

   my $db = $LOCAL_DBS{$node};

   while (my ($family, $entries) = each %$db) {
      for my $key (keys %$entries) {
         g_del($node, $family, $key);
      }
   }

   delete $LOCAL_DBS{$node};
}
146
# set the whole (node-local) database - previous value must be empty
#
# $db is a {family}{key} = value hash. every entry is moved (it is deleted
# from $db while doing so) into the databases via g_add.
sub g_set($$) {
   my ($node, $db) = @_;

   while (my ($family, $entries) = each %$db) {
      for my $key (keys %$entries) {
         g_add($node, $family, $key, delete $entries->{$key});
      }
   }
}
156
# gather node databases from slaves

# g_slave db - other node wants to make us the master
$node_req{g_slave} = sub {
   my ($db) = @_;
   my $node = $SRCNODE->{id};

   # register the slave first, so its entries are broadcast by g_add
   $GLOBAL_SLAVE{$node} = undef;
   g_set($node, $db);
};

# g_add family key value - the sending node adds/replaces a key
$node_req{g_add} = sub {
   my $node = $SRCNODE->{id};

   # &-call on purpose: bypasses the prototype so @_ is passed through as a list
   &g_add($node, @_);
};

# g_del family key - the sending node deletes a key
$node_req{g_del} = sub {
   my $node = $SRCNODE->{id};

   # &-call on purpose: bypasses the prototype so @_ is passed through as a list
   &g_del($node, @_);
};

# g_set db - the sending node sets its whole database
$node_req{g_set} = sub {
   my ($db) = @_;

   g_set($SRCNODE->{id}, $db);
};

# g_find node - reply with g_found and whatever the 'l family has for that node
$node_req{g_find} = sub {
   my ($node) = @_;

   snd($SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node});
};
185
186 # monitoring
187
# forget everything about a slave node that went away
#
# clears the local database of $node (while it is still registered as a
# slave, so the deletions are broadcast), then unregisters the slave and
# removes the monitors it had installed via g_mon1.
sub g_slave_disconnect($) {
   my ($node) = @_;

   g_clr($node);

   # the slave entry is either undef (no monitors) or {family}{key}
   if (my $mon = delete $GLOBAL_SLAVE{$node}) {
      while (my ($family, $keys) = each %$mon) {
         # only remove the entry of this node - other nodes monitoring
         # the same family/key must keep their registration
         delete $GLOBAL_MON{$family}{$_}{$node}
            for keys %$keys;
      }
   }
}
200
# g_mon0 family key - stop monitoring
$node_req{g_mon0} = sub {
   my ($family, $key) = @_;
   my $node = $SRCNODE->{id};

   delete $GLOBAL_MON{$family}{$key}{$node};
   delete $GLOBAL_SLAVE{$node}{$family}{$key};
};

# g_mon1 family key - start monitoring
$node_req{g_mon1} = sub {
   my ($family, $key) = @_;
   my $node = $SRCNODE->{id};

   # remember the registration on both sides: per slave (for cleanup on
   # disconnect) and per family/key (for change notifications)
   $GLOBAL_SLAVE{$node}{$family}{$key} = undef;
   $GLOBAL_MON{$family}{$key}{$node}   = undef;
   #d# generate lots of initial change requests, or one big?

   snd($node, g_chg0 => $family, $key);
};
215
216 #############################################################################
217 # switch to global mode
218
$GLOBAL = 1;
$MASTER = $NODE;
$GLOBAL_SLAVE{$NODE} = undef; # we are our own master (and slave)
$LOCAL_DBS{$NODE} = { %LOCAL_DB }; # start with a copy of our own database

# regularly try to connect to global nodes - maybe use seeding code?
$MASTER_TIMER = AE::timer(0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
   for my $id (keys %{ $GLOBAL_DB{"'g"} }) {
      add_node($id)->connect;
   }
});

# instantly connect to other global nodes when we learn of them
# so we don't have to wait for the timer.
#TODO
# $GLOBAL_MON{"'g"}{""}{""} = sub {
# (add_node $_[1])->connect;
# };

# delete slaves on node-down
# clear slave db on node-down
$MASTER_MON = mon_nodes(sub {
   my ($node, $is_up) = @_;

   g_slave_disconnect($node) unless $is_up;
});

# tell everybody who connects that we are a global node
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{global} = 1;
};
247
# connect from a global node
#
# registers $node in the 'g family and sends it the merge of all local
# databases we know about as a single g_set request.
sub g_global_connect {
   my ($node) = @_;

   # this has to be set right now to avoid race conditions, because
   # it takes a while until the other global node tells us it is global.
   $GLOBAL_DB{"'g"}{$node}        = undef;
   $LOCAL_DBS{$node}{"'g"}{$node} = undef;

   # global nodes send all local databases, merged,
   # as their local database to global nodes
   my %merged;

   for my $local_db (values %LOCAL_DBS) {
      while (my ($family, $entries) = each %$local_db) {
         while (my ($key, $value) = each %$entries) {
            $merged{$family}{$key} = $value;
         }
      }
   }

   snd($node, g_set => \%merged);
}
272
# send our database to every global node that connects
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   my ($transport) = @_;

   # only peers that greeted us as global get the database
   $transport->{remote_greeting}{global}
      or return;

   g_global_connect($transport->{remote_node});
};

# tell our master that we are global now - every already connected node
# that announced itself as global is informed and sent our database
for my $peer (values %NODE) {
   my $transport = $peer->{transport}
      or next;

   next unless $transport->{remote_greeting}{global};

   snd($peer->{id}, "g_global");
   g_global_connect($peer->{id});
}

# g_global - the sending node switched to global mode: it stops being a
# slave, is flagged as global and gets our database
$node_req{g_global} = sub {
   g_slave_disconnect($SRCNODE->{id});
   $SRCNODE->{transport}{remote_greeting}{global} = 1;
   g_global_connect($SRCNODE->{id});
};

# now add us to the set of global nodes
ldb_set("'g", $NODE, undef);
296 }
297
298 =item $guard = grp_reg $group, $port
299
300 Register the given (local!) port in the named global group C<$group>.
301
302 The port will be unregistered automatically when the port is destroyed.
303
304 When not called in void context, then a guard object will be returned that
305 will also cause the name to be unregistered when destroyed.
306
307 =cut
308
# register local port
#
# croaks unless $port is a local port. in non-void context a guard is
# returned that calls unregister ($port, $group) when destroyed.
#
# NOTE(review): nothing in this function actually registers the port, and
# "unregister" is not defined anywhere in the visible source - this looks
# like work in progress, confirm before relying on it.
sub grp_reg($$) {
   my ($group, $port) = @_;

   Carp::croak("AnyEvent::MP::Global::grp_reg can only be called for local ports, caught")
      unless port_is_local($port);

   # void context: no guard is created
   defined wantarray
      or return;

   AnyEvent::Util::guard { unregister ($port, $group) }
}
318
319 =item $ports = grp_get $group
320
321 Returns all the ports currently registered to the given group (as a
322 read-only(!) array reference). When the group has no registered members,
323 returns C<undef>.
324
325 =cut
326
# NOTE(review): not implemented - always returns nothing (undef in scalar
# context), regardless of whether the group has members.
sub grp_get($) {
   return;
}
329
330 =item $guard = grp_mon $group, $callback->($ports, $add, $del)
331
332 Installs a monitor on the given group. Each time there is a change it
333 will be called with the current group members as an arrayref as the
334 first argument. The second argument only contains ports added, the third
335 argument only ports removed.
336
337 Unlike C<grp_get>, all three arguments will always be array-refs, even if
338 the array is empty. None of the arrays may be modified in any way.
339
340 The first invocation will be with the first two arguments set to the
341 current members, as if all of them were just added, but only when the
342 group is actually non-empty.
343
344 Optionally returns a guard object that uninstalls the watcher when it is
345 destroyed.
346
347 =cut
348
# NOTE(review): not implemented - the callback is never installed or invoked
# and no guard is created.
#
# returns nothing (undef in scalar context) instead of leaking the result of
# the argument assignment, so callers storing the "guard" do not end up
# with a meaningless true value.
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   return;
}
352
353 =back
354
355 =head1 SEE ALSO
356
357 L<AnyEvent::MP>.
358
359 =head1 AUTHOR
360
361 Marc Lehmann <schmorp@schmorp.de>
362 http://home.schmorp.de/
363
364 =cut
365
366 1
367