ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.71
Committed: Tue Oct 2 13:58:07 2012 UTC (11 years, 7 months ago) by root
Branch: MAIN
CVS Tags: rel-2_02, rel-2_01, rel-2_0, HEAD
Changes since 1.70: +28 -25 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - network backbone services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module is usually run (or started on) seed nodes and provides a
12 variety of services to connected nodes, such as the distributed database.
13
14 The global nodes form a fully-meshed network, that is, all global nodes
15 currently maintain connections to all other global nodes.
16
17 Loading this module (e.g. as a service) transforms the local node into a
18 global node. There are no user-servicable parts inside.
19
20 For a limited time, this module also exports some AEMP 1.x compatibility
21 functions (C<grp_reg>, C<grp_get> and C<grp_mon>).
22
23 =cut
24
25 package AnyEvent::MP::Global;
26
27 use common::sense;
28 use Carp ();
29 use List::Util ();
30
31 use AnyEvent ();
32
33 use AnyEvent::MP;
34 use AnyEvent::MP::Kernel;
35
36 AE::log 7 => "starting global service.";
37
38 #############################################################################
39 # node protocol parts for global nodes
40
41 package AnyEvent::MP::Kernel;
42
43 use JSON::XS ();
44
45 # TODO: this is ugly (classical use vars vs. our),
46 # maybe this should go into MP::Kernel
47
48 # "import" from Kernel
49 our %NODE;
50 our $NODE;
51 #our $GLOBAL;
52 our $SRCNODE; # the origin node id
53 our %NODE_REQ;
54 our %GLOBAL_NODE;
55 our $GLOBAL;
56
57 # only in global code
58 our %GLOBAL_SLAVE;
59 our %GLOBAL_MON; # monitors {family}
60
61 our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
62 our %LOCAL_DBS; # local databases of other nodes (global and slave)
63 our %LOCAL_DB; # this node database
64
65 # broadcasts a message to all other global nodes
66 sub g_broadcast {
67 snd $_, @_
68 for keys %GLOBAL_NODE;
69 }
70
71 # add/replace/del inside a family in the database
72 # @$del must not contain any key in %$set
73 sub g_upd {
74 my ($node, $family, $set, $del) = @_;
75
76 my $ldb = $LOCAL_DBS{$node}{$family} ||= {};
77 my $gdb = $GLOBAL_DB {$family} ||= {};
78
79 my %local_set; # extra local set's created by deletes
80
81 # add/replace keys
82 while (my ($k, $v) = each %$set) {
83 #TODO# optimize duplicate gdb-set's, to some extent, maybe
84 # but is probably difficult and slow, so don't for the time being.
85
86 $ldb->{$k} =
87 $gdb->{$k} = $v;
88 }
89
90 my (@del_local, @del_global); # actual deletes for other global nodes / our slaves
91
92 # take care of deletes
93 for my $k (@$del) {
94 delete $ldb->{$k};
95
96 if (my @other = grep exists $LOCAL_DBS{$_}{$family}{$k}, keys %LOCAL_DBS) {
97 # key exists in some other db shard(s)
98
99 # if there is a local one, we have to update
100 # otherwise, we update and delete on other globals
101
102 if (my $local = List::Util::first { exists $GLOBAL_SLAVE{$_} } @other) {
103 $set->{$k} =
104 $gdb->{$k} = $LOCAL_DBS{$local}{$family}{$k}
105 unless sv_eq $gdb->{$k}, $LOCAL_DBS{$local}{$family}{$k};
106
107 } else {
108 # must be in a global one then
109 my $global = List::Util::first { !exists $GLOBAL_SLAVE{$_} } @other;
110
111 push @del_global, $k;
112
113 $local_set{$k} =
114 $gdb->{$k} = $LOCAL_DBS{$global}{$family}{$k}
115 unless sv_eq $gdb->{$k}, $LOCAL_DBS{$global}{$family}{$k};
116 }
117 } else {
118 delete $gdb->{$k};
119
120 # this was the only one, so delete locally
121 push @del_local, $k;
122 # and globally, if it's a local key
123 push @del_global, $k if exists $GLOBAL_SLAVE{$node};
124 }
125 }
126
127 # family could be empty now
128 delete $GLOBAL_DB {$family} unless %$gdb;
129 delete $LOCAL_DBS{$node}{$family} unless %$ldb;
130
131 # tell other global nodes any changes in our database
132 g_broadcast g_upd => $family, $set, \@del_global
133 if exists $GLOBAL_SLAVE{$node} && (%$set || @del_global);
134
135 # tell subscribers we have changed the family
136 if (%$set || %local_set || @del_local) {
137 @$set{keys %local_set} = values %local_set;
138
139 snd $_ => g_chg2 => $family, $set, \@del_local
140 for keys %{ $GLOBAL_MON{$family} };
141 }
142 }
143
144 # set the whole (node-local) database - previous value must be empty
145 sub g_set($$) {
146 my ($node, $db) = @_;
147
148 while (my ($f, $k) = each %$db) {
149 g_upd $node, $f, $k;
150 }
151 }
152
153 # delete all keys from a database
154 sub g_clr($) {
155 my ($node) = @_;
156
157 my $db = $LOCAL_DBS{$node};
158
159 while (my ($f, $k) = each %$db) {
160 g_upd $node, $f, undef, [keys %$k];
161 }
162
163 delete $LOCAL_DBS{$node};
164 }
165
166 # gather node databases from slaves
167
168 # other node wants to make us the master and sends us their db
169 $NODE_REQ{g_slave} = sub {
170 my ($db) = @_
171 or return; # empty g_slave is used to start global service
172
173 my $node = $SRCNODE;
174 undef $GLOBAL_SLAVE{$node};
175 g_set $node, $db;
176 };
177
178 # other global node sends us their database
179 $NODE_REQ{g_set} = sub {
180 my ($db) = @_;
181
182 # need to get it here, because g_set destroys it
183 my $binds = $db->{"'l"}{$SRCNODE};
184
185 g_set $SRCNODE, $db;
186
187 # a remote node always has to provide their listeners. for global
188 # nodes, we mirror their 'l locally, just as we also set 'g.
189 # that's not very efficient, but ensures that global nodes
190 # find each other.
191 db_set "'l" => $SRCNODE => $binds;
192 };
193
194 # other node (global and slave) sends us a family update
195 $NODE_REQ{g_upd} = sub {
196 &g_upd ($SRCNODE, @_);
197 };
198
199 # slave node wants to know the listeners of a node
200 $NODE_REQ{g_find} = sub {
201 my ($node) = @_;
202
203 snd $SRCNODE, g_found => $node, $GLOBAL_DB{"'l"}{$node};
204 };
205
206 $NODE_REQ{g_db_family} = sub {
207 my ($family, $id) = @_;
208 snd $SRCNODE, g_reply => $id, $GLOBAL_DB{$family} || {};
209 };
210
211 $NODE_REQ{g_db_keys} = sub {
212 my ($family, $id) = @_;
213 snd $SRCNODE, g_reply => $id, [keys %{ $GLOBAL_DB{$family} } ];
214 };
215
216 $NODE_REQ{g_db_values} = sub {
217 my ($family, $id) = @_;
218 snd $SRCNODE, g_reply => $id, [values %{ $GLOBAL_DB{$family} } ];
219 };
220
221 # monitoring
222
223 sub g_disconnect($) {
224 my ($node) = @_;
225
226 delete $GLOBAL_NODE{$node}; # also done in Kernel.pm, but doing it here avoids overhead
227
228 db_del "'g" => $node;
229 db_del "'l" => $node;
230 g_clr $node;
231
232 if (my $mon = delete $GLOBAL_SLAVE{$node}) {
233 while (my ($f, $fv) = each %$mon) {
234 delete $GLOBAL_MON{$f}{$_}
235 for keys %$fv;
236
237 delete $GLOBAL_MON{$f}
238 unless %{ $GLOBAL_MON{$f} };
239 }
240 }
241 }
242
243 # g_mon0 family - stop monitoring
244 $NODE_REQ{g_mon0} = sub {
245 delete $GLOBAL_MON{$_[0]}{$SRCNODE};
246 delete $GLOBAL_MON{$_[0]} unless %{ $GLOBAL_MON{$_[0]} };
247
248 delete $GLOBAL_SLAVE{$SRCNODE}{$_[0]};
249 };
250
251 # g_mon1 family key - start monitoring
252 $NODE_REQ{g_mon1} = sub {
253 undef $GLOBAL_SLAVE{$SRCNODE}{$_[0]};
254 undef $GLOBAL_MON{$_[0]}{$SRCNODE};
255
256 snd $SRCNODE, g_chg1 => $_[0], $GLOBAL_DB{$_[0]};
257 };
258
259 #############################################################################
260 # switch to global mode
261
262 # connect from a global node
263 sub g_global_connect {
264 my ($node) = @_;
265
266 # each node puts the set of connected global nodes into
267 # 'g - this causes a big duplication and mergefest, but
268 # is the easiest way to ensure global nodes have a list
269 # of all other global nodes.
270 # we also mirror 'l as soon as we receive it, causing
271 # even more overhead.
272 db_set "'g" => $node;
273
274 # global nodes send all local databases of their slaves, merged,
275 # as their database to other global nodes
276 my %db;
277
278 while (my ($k, $v) = each %LOCAL_DBS) {
279 next unless exists $GLOBAL_SLAVE{$k};
280
281 while (my ($f, $fv) = each %$v) {
282 while (my ($k, $kv) = each %$fv) {
283 $db{$f}{$k} = $kv;
284 }
285 }
286 }
287
288 snd $node => g_set => \%db;
289 }
290
291 # overrides request in Kernel
292 $NODE_REQ{g_global} = sub {
293 g_disconnect $SRCNODE; # usually a nop, but not when a normal node becomes global
294 undef $GLOBAL_NODE{$SRCNODE}; # same as in Kernel.pm
295 g_global_connect $SRCNODE;
296 };
297
298 # delete data from other nodes on node-down
299 mon_nodes sub {
300 if ($_[1]) {
301 snd $_[0] => "g_global"; # tell everybody that we are a global node
302 } else {
303 g_disconnect $_[0];
304 }
305 };
306
307 # now, this is messy
308 AnyEvent::MP::Kernel::post_configure {
309 # enable global mode
310 $GLOBAL = 1;
311
312 # global nodes are their own masters - this
313 # resends global requests and sets the local database.
314 master_set $NODE;
315
316 # now add us to the set of global nodes
317 db_set "'g" => $NODE;
318
319 # tell other nodes that we are global now
320 for (up_nodes) {
321 snd $_, "g_global";
322
323 # if the node is global, connect
324 g_global_connect $_
325 if exists $GLOBAL_NODE{$_};
326 }
327
328 # from here on we should be able to act "normally"
329
330 # maintain connections to all global nodes that we know of
331 db_mon "'g" => sub {
332 keepalive_add $_ for @{ $_[1] };
333 keepalive_del $_ for @{ $_[3] };
334 };
335 };
336
337 #############################################################################
338 # compatibility functions for aemp 1.0
339
340 package AnyEvent::MP::Global;
341
342 use base "Exporter";
343 our @EXPORT = qw(grp_reg grp_get grp_mon);
344
345 sub grp_reg($$) {
346 &db_reg
347 }
348
349 sub grp_get($) {
350 my @ports = keys %{ $AnyEvent::MP::Kernel::GLOBAL_DB{$_[0]} };
351
352 @ports ? \@ports : undef
353 }
354
355 sub grp_mon($$) {
356 my ($grp, $cb) = @_;
357
358 db_mon $grp => sub {
359 my ($ports, $add, $chg, $del) = @_;
360
361 $cb->([keys %$ports], $add, $del);
362 };
363 }
364
365 =head1 SEE ALSO
366
367 L<AnyEvent::MP>.
368
369 =head1 AUTHOR
370
371 Marc Lehmann <schmorp@schmorp.de>
372 http://home.schmorp.de/
373
374 =cut
375
376 1
377