1 |
=head1 NAME |
2 |
|
3 |
AnyEvent::MP::Global - network backbone services |
4 |
|
5 |
=head1 SYNOPSIS |
6 |
|
7 |
use AnyEvent::MP::Global; |
8 |
|
9 |
=head1 DESCRIPTION |
10 |
|
11 |
This module is usually run (or started on) seed nodes and provides a |
12 |
variety of services to connected nodes, such as the distributed database. |
13 |
|
14 |
The global nodes form a fully-meshed network, that is, all global nodes |
15 |
currently maintain connections to all other global nodes. |
16 |
|
17 |
Loading this module (e.g. as a service) transforms the local node into a |
18 |
global node. There are no user-serviceable parts inside. |
19 |
|
20 |
For a limited time, this module also exports some AEMP 1.x compatibility |
21 |
functions (C<grp_reg>, C<grp_get> and C<grp_mon>). |
22 |
|
23 |
=cut |
24 |
|
25 |
package AnyEvent::MP::Global; |
26 |
|
27 |
use common::sense; |
28 |
use Carp (); |
29 |
use List::Util (); |
30 |
|
31 |
use AnyEvent (); |
32 |
|
33 |
use AnyEvent::MP; |
34 |
use AnyEvent::MP::Kernel; |
35 |
|
36 |
AE::log 7 => "starting global service."; |
37 |
|
38 |
############################################################################# |
39 |
# node protocol parts for global nodes |
40 |
|
41 |
package AnyEvent::MP::Kernel; |
42 |
|
43 |
use JSON::XS (); |
44 |
|
45 |
# TODO: this is ugly (classical use vars vs. our), |
46 |
# maybe this should go into MP::Kernel |
47 |
|
48 |
# "import" from Kernel |
49 |
our %NODE; |
50 |
our $NODE; |
51 |
#our $GLOBAL; |
52 |
our $SRCNODE; # the origin node id |
53 |
our %NODE_REQ; |
54 |
our %GLOBAL_NODE; |
55 |
our $GLOBAL; |
56 |
|
57 |
# only in global code |
58 |
our %GLOBAL_SLAVE; |
59 |
our %GLOBAL_MON; # monitors {family} |
60 |
|
61 |
our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes |
62 |
our %LOCAL_DBS; # local databases of other nodes (global and slave) |
63 |
our %LOCAL_DB; # this node database |
64 |
|
65 |
# broadcasts a message to all other global nodes
#
# the arguments are passed on unchanged as the message for every node id
# that is currently a key in %GLOBAL_NODE.
sub g_broadcast {
   my @msg = @_;

   for my $peer (keys %GLOBAL_NODE) {
      snd $peer, @msg;
   }
}
70 |
|
71 |
# add/replace/del inside a family in the database
# @$del must not contain any key in %$set
#
#   $node   - id of the node whose database shard ($LOCAL_DBS{$node}) changes
#   $family - name of the family to update
#   $set    - hashref of key => value pairs to add or replace (may be undef)
#   $del    - arrayref of keys to delete (may be undef)
#
# besides the node's own shard, the merged view in %GLOBAL_DB is kept in
# sync: a deleted key that still exists in another node's shard is not
# removed from the merged view, but replaced by that other node's value.
#
# changes are then sent to the other global nodes (only when $node is in
# %GLOBAL_SLAVE) and to all nodes monitoring the family (%GLOBAL_MON).
#
# note that %$set is modified in place: replacement values caused by
# deletes are added to it before it is sent out.
sub g_upd {
   my ($node, $family, $set, $del) = @_;

   # g_set passes no $del and g_clr passes an undef $set - make the
   # defaults explicit instead of relying on autovivification below.
   $set ||= {};
   $del ||= [];

   my $ldb = $LOCAL_DBS{$node}{$family} ||= {};
   my $gdb = $GLOBAL_DB       {$family} ||= {};

   my %local_set; # extra local set's created by deletes

   # add/replace keys
   while (my ($k, $v) = each %$set) {
      #TODO# optimize duplicate gdb-set's, to some extent, maybe
      # but is probably difficult and slow, so don't for the time being.

      $ldb->{$k} =
      $gdb->{$k} = $v;
   }

   # @del_local  - keys that vanished from the merged view (sent to monitors)
   # @del_global - keys the other global nodes have to delete (broadcast)
   my (@del_local, @del_global);

   # take care of deletes
   for my $k (@$del) {
      delete $ldb->{$k};

      # find the other shards that still have this key. the family hash is
      # fetched first, so that the lookup does not create an empty family
      # in the shard of every node that never had one.
      my @other = grep {
         my $fdb = $LOCAL_DBS{$_}{$family};
         $fdb && exists $fdb->{$k}
      } keys %LOCAL_DBS;

      if (@other) {
         # key exists in some other db shard(s)

         # if there is a local one, we have to update
         # otherwise, we update and delete on other globals

         if (my $local = List::Util::first { exists $GLOBAL_SLAVE{$_} } @other) {
            $set->{$k} =
            $gdb->{$k} = $LOCAL_DBS{$local}{$family}{$k}
               unless sv_eq $gdb->{$k}, $LOCAL_DBS{$local}{$family}{$k};

         } else {
            # must be in a global one then
            my $global = List::Util::first { !exists $GLOBAL_SLAVE{$_} } @other;

            push @del_global, $k;

            $local_set{$k} =
            $gdb->{$k} = $LOCAL_DBS{$global}{$family}{$k}
               unless sv_eq $gdb->{$k}, $LOCAL_DBS{$global}{$family}{$k};
         }
      } else {
         delete $gdb->{$k};

         # this was the only one, so delete locally
         push @del_local, $k;
         # and globally, if it's a local key
         push @del_global, $k if exists $GLOBAL_SLAVE{$node};
      }
   }

   # family could be empty now
   delete $GLOBAL_DB       {$family} unless %$gdb;
   delete $LOCAL_DBS{$node}{$family} unless %$ldb;

   # tell other global nodes any changes in our database
   g_broadcast g_upd => $family, $set, \@del_global
      if exists $GLOBAL_SLAVE{$node} && (%$set || @del_global);

   # tell subscribers we have changed the family
   if (%$set || %local_set || @del_local) {
      # monitors also need the values that replaced deleted keys
      @$set{keys %local_set} = values %local_set;

      snd $_ => g_chg2 => $family, $set, \@del_local
         for keys %{ $GLOBAL_MON{$family} };
   }
}
143 |
|
144 |
# set the whole (node-local) database - previous value must be empty
#
#   $node - id of the node the database belongs to
#   $db   - hashref mapping family names to their key => value hashes
#
# every family is handed to g_upd as a pure "set" (no deletes).
sub g_set($$) {
   my ($node, $db) = @_;

   for my $family (keys %$db) {
      g_upd $node, $family, $db->{$family};
   }
}
152 |
|
153 |
# delete all keys from a database
#
# every family stored for $node in %LOCAL_DBS is emptied via g_upd (a
# delete of all of its keys), then the node's entry itself is dropped.
sub g_clr($) {
   my ($node) = @_;

   my $shard = $LOCAL_DBS{$node};

   # iterate over a snapshot of the family names - g_upd removes
   # families from the shard once they are empty.
   for my $family (keys %$shard) {
      my @all_keys = keys %{ $shard->{$family} };

      g_upd $node, $family, undef, \@all_keys;
   }

   delete $LOCAL_DBS{$node};
}
165 |
|
166 |
# gather node databases from slaves |
167 |
|
168 |
# other node wants to make us the master and sends us their db
$NODE_REQ{g_slave} = sub {
   # empty g_slave is used to start global service
   return unless @_;

   my ($db) = @_;
   my $slave = $SRCNODE;

   # create the %GLOBAL_SLAVE entry for the sender (with an undef value),
   # then store the database it sent
   $GLOBAL_SLAVE{$slave} = undef;
   g_set $slave, $db;
};
177 |
|
178 |
# other global node sends us their database
$NODE_REQ{g_set} = sub {
   my $db = $_[0];

   # need to get it here, because g_set destroys it
   my $binds = $db->{"'l"}{$SRCNODE};

   g_set($SRCNODE, $db);

   # a remote node always has to provide their listeners. for global
   # nodes, we mirror their 'l locally, just as we also set 'g.
   # that's not very efficient, but ensures that global nodes
   # find each other.
   db_set("'l", $SRCNODE, $binds);
};
193 |
|
194 |
# other node (global and slave) sends us a family update
#
# the message carries the family, the hash of keys to set and the list of
# keys to delete; the update is applied to the sender's database.
$NODE_REQ{g_upd} = sub {
   my ($family, $set, $del) = @_;

   g_upd($SRCNODE, $family, $set, $del);
};
198 |
|
199 |
# slave node wants to know the listeners of a node
#
# replies to the sender with a g_found message carrying the node id and
# whatever the 'l family of the global database holds for it.
$NODE_REQ{g_find} = sub {
   my $wanted = $_[0];

   my $listeners = $GLOBAL_DB{"'l"}{$wanted};

   snd $SRCNODE, g_found => $wanted, $listeners;
};
205 |
|
206 |
# reply (g_reply with the request id) with the whole family hash from
# the global database, or an empty hash if the family has no entry
$NODE_REQ{g_db_family} = sub {
   my ($family, $id) = @_;

   my $entries = $GLOBAL_DB{$family};
   $entries = {} unless $entries;

   snd $SRCNODE, g_reply => $id, $entries;
};
210 |
|
211 |
# reply (g_reply with the request id) with an array of all keys of a
# family in the global database - an empty array for unknown families
$NODE_REQ{g_db_keys} = sub {
   my ($family, $id) = @_;

   # the "|| {}" keeps keys() from creating an empty family entry in
   # %GLOBAL_DB when the family does not exist (same as g_db_family)
   snd $SRCNODE, g_reply => $id, [keys %{ $GLOBAL_DB{$family} || {} }];
};
215 |
|
216 |
# reply (g_reply with the request id) with an array of all values of a
# family in the global database - an empty array for unknown families
$NODE_REQ{g_db_values} = sub {
   my ($family, $id) = @_;

   # the "|| {}" keeps values() from creating an empty family entry in
   # %GLOBAL_DB when the family does not exist (same as g_db_family)
   snd $SRCNODE, g_reply => $id, [values %{ $GLOBAL_DB{$family} || {} }];
};
220 |
|
221 |
# monitoring |
222 |
|
223 |
# forget a node: remove it from the set of global nodes, delete its 'g
# and 'l entries, clear its database shard and drop all of its family
# monitors.
sub g_disconnect($) {
   my ($node) = @_;

   delete $GLOBAL_NODE{$node}; # also done in Kernel.pm, but doing it here avoids overhead

   db_del "'g" => $node;
   db_del "'l" => $node;
   g_clr $node;

   if (my $mon = delete $GLOBAL_SLAVE{$node}) {
      # %$mon maps the monitored families to undef (see g_mon1), while
      # %GLOBAL_MON is indexed {family}{node} - so the entry to remove is
      # the one for this node. iterating over the keys of the (undef)
      # per-family value would never remove anything.
      for my $family (keys %$mon) {
         delete $GLOBAL_MON{$family}{$node};

         delete $GLOBAL_MON{$family}
            unless %{ $GLOBAL_MON{$family} };
      }
   }
}
242 |
|
243 |
# g_mon0 family - stop monitoring
#
# removes the sender from the monitors of the family (dropping the
# family's monitor hash when it becomes empty) and removes the family
# from the sender's %GLOBAL_SLAVE entry.
$NODE_REQ{g_mon0} = sub {
   my $family = $_[0];

   delete $GLOBAL_MON{$family}{$SRCNODE};

   if (!%{ $GLOBAL_MON{$family} }) {
      delete $GLOBAL_MON{$family};
   }

   delete $GLOBAL_SLAVE{$SRCNODE}{$family};
};
250 |
|
251 |
# g_mon1 family key - start monitoring
#
# only the first argument (the family) is used here. the monitor is
# recorded both per node (%GLOBAL_SLAVE) and per family (%GLOBAL_MON),
# then the current contents of the family are sent back as g_chg1.
$NODE_REQ{g_mon1} = sub {
   my ($family) = @_;

   $GLOBAL_SLAVE{$SRCNODE}{$family} = undef;
   $GLOBAL_MON{$family}{$SRCNODE}   = undef;

   snd $SRCNODE, g_chg1 => $family, $GLOBAL_DB{$family};
};
258 |
|
259 |
############################################################################# |
260 |
# switch to global mode |
261 |
|
262 |
# connect from a global node
#
# registers $node in the 'g family and sends it our database via g_set.
sub g_global_connect {
   my ($node) = @_;

   # each node puts the set of connected global nodes into
   # 'g - this causes a big duplication and mergefest, but
   # is the easiest way to ensure global nodes have a list
   # of all other global nodes.
   # we also mirror 'l as soon as we receive it, causing
   # even more overhead.
   db_set "'g" => $node;

   # global nodes send all local databases of their slaves, merged,
   # as their database to other global nodes
   my %merged;

   for my $slave (keys %LOCAL_DBS) {
      # shards of nodes that are not in %GLOBAL_SLAVE are skipped
      exists $GLOBAL_SLAVE{$slave}
         or next;

      my $shard = $LOCAL_DBS{$slave};

      for my $family (keys %$shard) {
         my $entries = $shard->{$family};

         $merged{$family}{$_} = $entries->{$_}
            for keys %$entries;
      }
   }

   snd $node => g_set => \%merged;
}
290 |
|
291 |
# overrides request in Kernel
#
# the sender announces that it is a global node: drop whatever state we
# had for it, record it as a global node and send it our database.
$NODE_REQ{g_global} = sub {
   # usually a nop, but not when a normal node becomes global
   g_disconnect($SRCNODE);

   # same as in Kernel.pm
   $GLOBAL_NODE{$SRCNODE} = undef;

   g_global_connect($SRCNODE);
};
297 |
|
298 |
# delete data from other nodes on node-down
#
# the callback receives the node id and a flag that is true when the
# node came up and false when it went down.
mon_nodes sub {
   my ($node, $is_up) = @_;

   if (!$is_up) {
      g_disconnect $node;
   } else {
      # tell everybody that we are a global node
      snd $node => "g_global";
   }
};
306 |
|
307 |
# now, this is messy
AnyEvent::MP::Kernel::post_configure {
   # enable global mode
   $GLOBAL = 1;

   # global nodes are their own masters - this
   # resends global requests and sets the local database.
   master_set $NODE;

   # now add us to the set of global nodes
   db_set "'g" => $NODE;

   # tell other nodes that we are global now
   for my $peer (up_nodes) {
      snd $peer, "g_global";

      # if the node is global, connect
      if (exists $GLOBAL_NODE{$peer}) {
         g_global_connect $peer;
      }
   }

   # from here on we should be able to act "normally"

   # maintain connections to all global nodes that we know of:
   # the second callback argument lists added keys, the fourth deleted ones
   db_mon "'g" => sub {
      my (undef, $added, undef, $deleted) = @_;

      keepalive_add $_ for @$added;
      keepalive_del $_ for @$deleted;
   };
};
336 |
|
337 |
############################################################################# |
338 |
# compatibility functions for aemp 1.0 |
339 |
|
340 |
package AnyEvent::MP::Global; |
341 |
|
342 |
use base "Exporter"; |
343 |
our @EXPORT = qw(grp_reg grp_get grp_mon); |
344 |
|
345 |
# AEMP 1.x compatibility: register in a group.
#
# simply forwards to db_reg - the "&" call form without parentheses
# hands our own argument list on to db_reg unchanged.
sub grp_reg($$) {
   return &db_reg;
}
348 |
|
349 |
# AEMP 1.x compatibility: return the members of a group.
#
#   $grp - name of the group (a family in the global database)
#
# returns an array reference with all keys of the family, or undef when
# the family does not exist or is empty.
sub grp_get($) {
   my ($grp) = @_;

   # fetch the family first - dereferencing the hash element directly
   # inside keys() would create an empty entry for unknown groups in
   # the global database.
   my $family = $AnyEvent::MP::Kernel::GLOBAL_DB{$grp}
      or return undef;

   my @ports = keys %$family;

   @ports ? \@ports : undef
}
354 |
|
355 |
# AEMP 1.x compatibility: monitor a group.
#
#   $grp - name of the group to monitor
#   $cb  - callback, invoked with (array of all current keys, added, deleted)
#
# implemented on top of db_mon, whose callback additionally receives the
# changed keys as third argument - these are not passed on.
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   db_mon $grp => sub {
      my ($ports, $added, undef, $deleted) = @_;

      my @current = keys %$ports;

      $cb->(\@current, $added, $deleted);
   };
}
364 |
|
365 |
=head1 SEE ALSO |
366 |
|
367 |
L<AnyEvent::MP>. |
368 |
|
369 |
=head1 AUTHOR |
370 |
|
371 |
Marc Lehmann <schmorp@schmorp.de> |
372 |
http://home.schmorp.de/ |
373 |
|
374 |
=cut |
375 |
|
376 |
1 |
377 |
|