ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.69
Committed: Sat Mar 24 16:50:15 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.68: +16 -6 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - network backbone services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module is usually run (or started on) seed nodes and provides a
12 variety of services to connected nodes, such as the distributed database.
13
14 The global nodes form a fully-meshed network, that is, all global nodes
15 currently maintain connections to all other global nodes.
16
17 Loading this module (e.g. as a service) transforms the local node into a
18 global node. There are no user-serviceable parts inside.
19
20 For a limited time, this module also exports some AEMP 1.x compatibility
21 functions (C<grp_reg>, C<grp_get> and C<grp_mon>).
22
23 =cut
24
25 package AnyEvent::MP::Global;
26
27 use common::sense;
28 use Carp ();
29
30 use AnyEvent ();
31
32 use AnyEvent::MP;
33 use AnyEvent::MP::Kernel;
34
# Log (at level 7) that the global service is being brought up.
AE::log(7, "starting global service.");
36
37 #############################################################################
38 # node protocol parts for global nodes
39
package AnyEvent::MP::Kernel;

# TODO: redeclaring the kernel's package variables here is ugly (classical
# "use vars" vs. "our"); maybe all of this should go into MP::Kernel.

# %NODE holds node objects keyed by node id, $NODE is the id of this node.
our (%NODE, $NODE);

# $GLOBAL is set to 1 further down, once this node runs in global mode.
# $MASTER and $MASTER_TIMER are declared here but not used in this file.
our ($GLOBAL, $MASTER, $MASTER_TIMER);

# keyed by node id: nodes that use us as their master; each value is either
# undef or a hash whose keys are the families that node monitors.
our %GLOBAL_SLAVE;

our %GLOBAL_DB; # global db
our %LOCAL_DBS; # local databases of other nodes (global and slave)
our %LOCAL_DB;  # this node database

our $SRCNODE;   # the origin node id
our %NODE_REQ;  # request name => handler sub

# only in global code
our %GLOBAL_MON; # monitors: {family}{node id}
63
# Send the message in @_ to every other global node that is currently up.
# The set of global nodes is taken from the keys of the 'g family.
sub g_broadcast {
    my @peers = grep { $_ ne $NODE && node_is_up($_) }
                keys %{ $GLOBAL_DB{"'g"} };

    snd($_, @_)
        for @peers;
}
69
# Add, replace and delete keys inside one family of a node's database and
# keep the merged global database in sync.
#
#   $node   - id of the node whose local database is changed
#   $family - name of the family to change
#   $set    - hash ref of key => value pairs to add/replace (may be undef)
#   $del    - array ref of keys to delete (may be undef)
#
# @$del must not contain any key in %$set.
#
# When a deleted key is still provided by another node, the key is not
# removed from the global database but changed to that node's value; the
# change is then recorded in %$set (the caller's hash is modified).
#
# Effective changes are broadcast to the other global nodes (only when
# $node is one of our slaves) and sent as g_chg2 to all monitors of the
# family.
sub g_upd {
    my ($node, $family, $set, $del) = @_;

    # g_set calls us without $del, g_clr with an undefined $set
    $set ||= {};
    $del ||= [];

    my $ldb = $LOCAL_DBS{$node}{$family} ||= {};
    my $gdb = $GLOBAL_DB{$family}        ||= {};

    # add/replace keys
    while (my ($k, $v) = each %$set) {
        $ldb->{$k} =
        $gdb->{$k} = $v;
    }

    my @del; # keys that really vanished from the global database

    # take care of deletes
    KEY:
    for my $k (@$del) {
        next unless exists $ldb->{$k};

        delete $ldb->{$k};
        delete $gdb->{$k};

        # check if some other node still has the key, then we don't
        # delete, but change
        for my $other (values %LOCAL_DBS) {
            # look at the family first: a two-level exists test would
            # autovivify an empty family hash in every node database
            my $fdb = $other->{$family}
                or next;

            next unless exists $fdb->{$k};

            $set->{$k} = $gdb->{$k} = $fdb->{$k};

            next KEY;
        }

        push @del, $k;
    }

    # family could be empty now
    delete $GLOBAL_DB{$family}        unless %$gdb;
    delete $LOCAL_DBS{$node}{$family} unless %$ldb;

    return unless %$set || @del;

    g_broadcast(g_upd => $family, $set, \@del)
        if exists $GLOBAL_SLAVE{$node};

    # tell subscribers we have changed the family - the "|| {}" avoids
    # creating an empty monitor entry for families nobody monitors
    snd($_ => g_chg2 => $family, $set, \@del)
        for keys %{ $GLOBAL_MON{$family} || {} };
}
119
# Install a complete (node-local) database for $node - the node's previous
# database must be empty. %$db maps each family name to a hash of
# key => value pairs, which is handed to g_upd family by family.
sub g_set($$) {
    my ($node, $db) = @_;

    for my $family (keys %$db) {
        g_upd($node, $family, $db->{$family});
    }
}
128
# Remove every key of every family from the database of $node (via g_upd,
# so the global database is updated as well), then drop the node's
# database itself.
sub g_clr($) {
    my ($node) = @_;

    my $families = $LOCAL_DBS{$node};

    for my $family (keys %$families) {
        my @all_keys = keys %{ $families->{$family} };

        g_upd($node, $family, undef, \@all_keys);
    }

    delete $LOCAL_DBS{$node};
}
141
# gather node databases from slaves

# g_slave [db] - the sending node makes us its master and sends its
# database along.
$NODE_REQ{g_slave} = sub {
    # an empty g_slave is only used to start the global service
    return unless @_;

    my ($db) = @_;
    my $slave = $SRCNODE;

    # register the sender as one of our slaves, then load its database
    $GLOBAL_SLAVE{$slave} = undef;
    g_set($slave, $db);
};
153
# g_set db - another global node sends us its database.
$NODE_REQ{g_set} = sub {
    my ($db) = @_;

    g_set($SRCNODE, $db);

    # A remote node always has to provide its listeners. For global nodes
    # we mirror their 'l entry locally, just as we also set 'g. This is not
    # very efficient, but makes sure that global nodes find each other.
    my $listeners = $db->{"'l"}{$SRCNODE};

    db_set("'l" => $SRCNODE => $listeners);
};
166
# g_upd family set del - another node (global or slave) sends us an update
# for one family; apply it to the database of the sending node.
$NODE_REQ{g_upd} = sub {
    g_upd($SRCNODE, @_);
};
171
# g_find node - a slave node asks for the listeners of a node; reply with
# g_found, the node id and whatever the 'l family has stored for it.
$NODE_REQ{g_find} = sub {
    my ($node) = @_;

    my $listeners = $GLOBAL_DB{"'l"}{$node};

    snd($SRCNODE, g_found => $node, $listeners);
};
178
# g_db_family family id - reply (g_reply id) with the whole contents of a
# family, or with an empty hash when the family does not exist.
$NODE_REQ{g_db_family} = sub {
    my ($family, $id) = @_;

    my $contents = $GLOBAL_DB{$family} || {};

    snd($SRCNODE, g_reply => $id, $contents);
};
183
# g_db_keys family id - reply (g_reply id) with an array of all keys of a
# family; an unknown family yields an empty array.
$NODE_REQ{g_db_keys} = sub {
    my ($family, $id) = @_;

    # fall back to an empty hash: dereferencing a missing family inside
    # keys would create an empty family in the global database
    my $contents = $GLOBAL_DB{$family} || {};

    snd($SRCNODE, g_reply => $id, [keys %$contents]);
};
188
# g_db_values family id - reply (g_reply id) with an array of all values of
# a family; an unknown family yields an empty array.
$NODE_REQ{g_db_values} = sub {
    my ($family, $id) = @_;

    # fall back to an empty hash: dereferencing a missing family inside
    # values would create an empty family in the global database
    my $contents = $GLOBAL_DB{$family} || {};

    snd($SRCNODE, g_reply => $id, [values %$contents]);
};
193
# monitoring

# Forget everything we know about $node: its 'g and 'l entries, its
# database, its slave registration and all of its family subscriptions.
sub g_disconnect($) {
    my ($node) = @_;

    db_del("'g" => $node);
    db_del("'l" => $node);
    g_clr($node);

    # $GLOBAL_SLAVE{$node} has the monitored families as keys (the values
    # are undef, see g_mon1), while the subscription itself is stored as
    # $GLOBAL_MON{family}{$node} - so it is the node id that has to be
    # removed from each family's monitor set.
    if (my $mon = delete $GLOBAL_SLAVE{$node}) {
        for my $family (keys %$mon) {
            my $subscribers = $GLOBAL_MON{$family}
                or next;

            delete $subscribers->{$node};

            # do not keep empty monitor sets around
            delete $GLOBAL_MON{$family}
                unless %$subscribers;
        }
    }
}
213
# g_mon0 family - the sending node stops monitoring a family
$NODE_REQ{g_mon0} = sub {
    my ($family) = @_;

    # drop the subscription, and the monitor set as well once it is empty
    delete $GLOBAL_MON{$family}{$SRCNODE};

    if (!%{ $GLOBAL_MON{$family} }) {
        delete $GLOBAL_MON{$family};
    }

    # also forget the family in the per-node bookkeeping
    delete $GLOBAL_SLAVE{$SRCNODE}{$family};
};
221
# g_mon1 family - the sending node starts monitoring a family (only the
# first argument is used). The subscription is recorded both per node and
# per family, then the current family contents are sent back as g_chg1.
$NODE_REQ{g_mon1} = sub {
    my ($family) = @_;

    $GLOBAL_SLAVE{$SRCNODE}{$family} = undef;
    $GLOBAL_MON{$family}{$SRCNODE}   = undef;

    snd($SRCNODE, g_chg1 => $family, $GLOBAL_DB{$family});
};
229
230 #############################################################################
231 # switch to global mode
232
# Node monitor: when the second callback argument is false (node down),
# throw away all data we hold for the node named by the first argument.
mon_nodes(sub {
    my ($node, $is_up) = @_;

    g_disconnect($node)
        unless $is_up;
});
237
# Greeting hook: flag ourselves as a global node in the local greeting of
# the object passed to the hook (presumably the transport), so everybody
# who connects learns about it.
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
    my ($transport) = @_;

    $transport->{local_greeting}{global} = 1;
};
242
# Called for every global node we are connected to: register $node as a
# global node and send it our database.
sub g_global_connect {
    my ($node) = @_;

    # Every node stores the set of connected global nodes in 'g. This
    # causes a lot of duplication and merging, but it is the easiest way to
    # make sure all global nodes have a list of all other global nodes.
    # 'l is mirrored as soon as we receive it, which adds even more
    # overhead.
    db_set("'g" => $node);

    # A global node presents the merged local databases of all its slaves
    # as its own database to the other global nodes.
    my %merged;

    for my $slave (grep { exists $GLOBAL_SLAVE{$_} } keys %LOCAL_DBS) {
        my $ldb = $LOCAL_DBS{$slave};

        for my $family (keys %$ldb) {
            my $entries = $ldb->{$family};

            $merged{$family}{$_} = $entries->{$_}
                for keys %$entries;
        }
    }

    snd($node => g_set => \%merged);
}
271
# Connect hook: whenever the remote side greeted us as a global node, send
# it our database.
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
    my ($transport) = @_;

    return unless $transport->{remote_greeting}{global};

    g_global_connect($transport->{remote_node});
};
278
# Tell the global nodes we are already connected to that we are global now.
# TODO: there is probably a race when two connected nodes become global at
# the same time - very ugly.
for my $peer (values %NODE) {
    my $transport = $peer->{transport}
        or next;

    next unless $transport->{remote_greeting}{global};

    snd($peer->{id} => "g_global");
    g_global_connect($peer->{id});
}
288
# g_global - an already connected node tells us that it became a global
# node: drop what we stored for it so far, mark its greeting as global and
# run the normal global-node connect procedure.
$NODE_REQ{g_global} = sub {
    g_disconnect($SRCNODE);

    my $peer = $NODE{$SRCNODE};
    $peer->{transport}{remote_greeting}{global} = 1;

    g_global_connect($SRCNODE);
};
297
# enable global mode
$GLOBAL = 1;

# A global node is its own master - this resends global requests and sets
# the local database.
master_set($NODE);

# from here on we should be able to act normally

# now add us to the set of global nodes
db_set("'g" => $NODE);

# Maintain connections to all global nodes that we know of: the second
# callback argument lists added keys, the fourth lists deleted keys.
db_mon("'g" => sub {
    my (undef, $added, undef, $deleted) = @_;

    keepalive_add($_) for @$added;
    keepalive_del($_) for @$deleted;
});
315
316 #############################################################################
317 # compatibility functions for aemp 1.0
318
319 package AnyEvent::MP::Global;
320
321 use base "Exporter";
322 our @EXPORT = qw(grp_reg grp_get grp_mon);
323
# AEMP 1.x compatibility: grp_reg is just another name for db_reg.
# The "&" call without parentheses hands our @_ to db_reg unchanged.
sub grp_reg($$) {
    return &db_reg;
}
327
# AEMP 1.x compatibility: return an array ref with all keys (ports) that
# are registered in the group (database family) $_[0], or undef when the
# group is empty or does not exist.
sub grp_get($) {
    my ($grp) = @_;

    # fetch the family first: dereferencing a missing family directly
    # inside keys would create an empty family in the global database
    my $family = $AnyEvent::MP::Kernel::GLOBAL_DB{$grp};
    my @ports  = $family ? keys %$family : ();

    @ports ? \@ports : undef
}
333
# AEMP 1.x compatibility: monitor the group (database family) $grp via
# db_mon. On every notification $cb is called with an array ref of all
# current keys, the added keys and the deleted keys - changed keys are not
# passed on. Returns whatever db_mon returns.
sub grp_mon($$) {
    my ($grp, $cb) = @_;

    my $adapter = sub {
        my ($ports, $add, undef, $del) = @_;

        $cb->([keys %$ports], $add, $del);
    };

    return db_mon($grp, $adapter);
}
343
344 =head1 SEE ALSO
345
346 L<AnyEvent::MP>.
347
348 =head1 AUTHOR
349
350 Marc Lehmann <schmorp@schmorp.de>
351 http://home.schmorp.de/
352
353 =cut
354
355 1
356