ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.71
Committed: Tue Oct 2 13:58:07 2012 UTC (11 years, 7 months ago) by root
Branch: MAIN
CVS Tags: rel-2_02, rel-2_01, rel-2_0, HEAD
Changes since 1.70: +28 -25 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.58 AnyEvent::MP::Global - network backbone services
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.58 This module is usually run (or started on) seed nodes and provides a
12     variety of services to connected nodes, such as the distributed database.
13 root 1.1
14 root 1.58 The global nodes form a fully-meshed network, that is, all global nodes
15     currently maintain connections to all other global nodes.
16 root 1.1
17 root 1.58 Loading this module (e.g. as a service) transforms the local node into a
18     global node. There are no user-serviceable parts inside.
19 root 1.1
20 root 1.61 For a limited time, this module also exports some AEMP 1.x compatibility
21     functions (C<grp_reg>, C<grp_get> and C<grp_mon>).
22    
23 root 1.1 =cut
24    
25     package AnyEvent::MP::Global;
26    
27     use common::sense;
28     use Carp ();
29 root 1.70 use List::Util ();
30 root 1.1
31 root 1.8 use AnyEvent ();
32 root 1.4
33 root 1.1 use AnyEvent::MP;
34     use AnyEvent::MP::Kernel;
35 root 1.20
36 root 1.60 AE::log 7 => "starting global service.";
37 root 1.5
38 root 1.8 #############################################################################
39 root 1.44 # node protocol parts for global nodes
40 root 1.8
41 root 1.49 package AnyEvent::MP::Kernel;
42 root 1.8
43 root 1.70 use JSON::XS ();
44    
45 root 1.49 # TODO: this is ugly (classical use vars vs. our),
46     # maybe this should go into MP::Kernel
47 root 1.26
48 root 1.70 # "import" from Kernel
49 root 1.49 our %NODE;
50     our $NODE;
51 root 1.70 #our $GLOBAL;
52     our $SRCNODE; # the origin node id
53     our %NODE_REQ;
54     our %GLOBAL_NODE;
55 root 1.49 our $GLOBAL;
56 root 1.46
57 root 1.70 # only in global code
58 root 1.49 our %GLOBAL_SLAVE;
59 root 1.70 our %GLOBAL_MON; # monitors {family}
60 root 1.26
61 root 1.70 our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
62 root 1.68 our %LOCAL_DBS; # local databases of other nodes (global and slave)
63 root 1.49 our %LOCAL_DB; # this node database
64 root 1.46
# Broadcast a message: the argument list is sent (via snd) to every node
# id that is currently a key of %GLOBAL_NODE.
sub g_broadcast {
   for my $gnode (keys %GLOBAL_NODE) {
      snd $gnode, @_;
   }
}
70 root 1.46
# Add, replace and/or delete keys inside one family of a node's database.
#
#   $node   - id of the node whose database shard is being changed
#   $family - name of the family
#   $set    - hashref of key => value pairs to add/replace (may be undef)
#   $del    - arrayref of keys to delete (may be undef);
#             @$del must not contain any key in %$set
#
# Updates the per-node shard in %LOCAL_DBS and the merged view in
# %GLOBAL_DB. Changes originating from a node listed in %GLOBAL_SLAVE are
# broadcast to the other global nodes, and every node monitoring the family
# (keys of $GLOBAL_MON{$family}) is sent a g_chg2 message.
#
# NOTE: %$set is modified in place (replacement values resulting from
# deletes are merged into it before subscribers are notified).
sub g_upd {
   my ($node, $family, $set, $del) = @_;

   # callers such as g_set/g_clr omit one of the two - make that explicit
   # instead of relying on autovivification further down
   $set ||= {};
   $del ||= [];

   my $ldb = $LOCAL_DBS{$node}{$family} ||= {};
   my $gdb = $GLOBAL_DB {$family}       ||= {};

   my %local_set; # extra local set's created by deletes

   # add/replace keys
   while (my ($k, $v) = each %$set) {
      #TODO# optimize duplicate gdb-set's, to some extent, maybe
      # but is probably difficult and slow, so don't for the time being.

      $ldb->{$k} =
      $gdb->{$k} = $v;
   }

   my (@del_local, @del_global); # actual deletes for other global nodes / our slaves

   # take care of deletes
   for my $k (@$del) {
      delete $ldb->{$k};

      # find the other shards that still carry this key. the family level
      # is tested first, because a plain three-level exists would
      # autovivify an empty $family hash in every node's shard.
      my @other = grep {
         exists $LOCAL_DBS{$_}{$family}
            && exists $LOCAL_DBS{$_}{$family}{$k}
      } keys %LOCAL_DBS;

      if (@other) {
         # key exists in some other db shard(s)

         # if there is a local one, we have to update
         # otherwise, we update and delete on other globals

         if (my $local = List::Util::first { exists $GLOBAL_SLAVE{$_} } @other) {
            $set->{$k} =
            $gdb->{$k} = $LOCAL_DBS{$local}{$family}{$k}
               unless sv_eq $gdb->{$k}, $LOCAL_DBS{$local}{$family}{$k};

         } else {
            # must be in a global one then
            my $global = List::Util::first { !exists $GLOBAL_SLAVE{$_} } @other;

            push @del_global, $k;

            $local_set{$k} =
            $gdb->{$k} = $LOCAL_DBS{$global}{$family}{$k}
               unless sv_eq $gdb->{$k}, $LOCAL_DBS{$global}{$family}{$k};
         }
      } else {
         delete $gdb->{$k};

         # this was the only one, so delete locally
         push @del_local, $k;
         # and globally, if it's a local key
         push @del_global, $k if exists $GLOBAL_SLAVE{$node};
      }
   }

   # family could be empty now
   delete $GLOBAL_DB {$family}       unless %$gdb;
   delete $LOCAL_DBS{$node}{$family} unless %$ldb;

   # tell other global nodes any changes in our database
   g_broadcast g_upd => $family, $set, \@del_global
      if exists $GLOBAL_SLAVE{$node} && (%$set || @del_global);

   # tell subscribers we have changed the family
   if (%$set || %local_set || @del_local) {
      # replacement values taken from global shards are only relevant
      # for our subscribers, so they are merged after the broadcast
      @$set{keys %local_set} = values %local_set;

      snd $_ => g_chg2 => $family, $set, \@del_local
         for keys %{ $GLOBAL_MON{$family} };
   }
}
143    
# Set the whole (node-local) database of $node - the previous value must be
# empty. $db maps family names to hashes of key/value pairs; every family
# is handed to g_upd as a pure "set" (no deletes).
sub g_set($$) {
   my ($node, $db) = @_;

   for my $family (keys %$db) {
      g_upd $node, $family, $db->{$family};
   }
}
152 root 1.46
# Delete all keys from the database of $node: every family found in
# $LOCAL_DBS{$node} is passed to g_upd with all of its keys as deletes,
# after which the node's entry is removed from %LOCAL_DBS.
sub g_clr($) {
   my ($node) = @_;

   my $db = $LOCAL_DBS{$node};

   for my $family (keys %$db) {
      my $entries = $db->{$family};

      g_upd $node, $family, undef, [keys %$entries];
   }

   delete $LOCAL_DBS{$node};
}
165    
166     # gather node databases from slaves
167 root 1.46
# other node wants to make us the master and sends us their db
$NODE_REQ{g_slave} = sub {
   # an empty g_slave is used to start global service - nothing to do then
   return unless @_;

   my ($db) = @_;
   my $node = $SRCNODE;

   # register the sender in %GLOBAL_SLAVE, then import its database
   $GLOBAL_SLAVE{$node} = undef;
   g_set $node, $db;
};
177 root 1.46
# other global node sends us their database
$NODE_REQ{g_set} = sub {
   my ($db) = @_;

   # the sender's own listeners have to be fetched before calling g_set,
   # because g_set destroys the database passed to it
   my $listeners = $db->{"'l"}{$SRCNODE};

   g_set $SRCNODE, $db;

   # a remote node always has to provide their listeners. for global
   # nodes, we mirror their 'l locally, just as we also set 'g.
   # that's not very efficient, but ensures that global nodes
   # find each other.
   db_set "'l" => $SRCNODE => $listeners;
};
193 root 1.46
# other node (global and slave) sends us a family update - the request
# arguments (family, set, del) are passed through to g_upd, with the
# sending node prepended as the owner of the change
$NODE_REQ{g_upd} = sub {
   g_upd $SRCNODE, @_;
};
198 root 1.46
# slave node wants to know the listeners of a node - reply with a g_found
# message carrying the node id and whatever the 'l family stores for it
$NODE_REQ{g_find} = sub {
   my $node = $_[0];

   my $listeners = $GLOBAL_DB{"'l"}{$node};

   snd $SRCNODE, g_found => $node, $listeners;
};
205 root 1.46
# reply (g_reply, tagged with the request $id) with the complete family
# hash from the global database, or an empty hash if there is none
$NODE_REQ{g_db_family} = sub {
   my ($family, $id) = @_;

   my $members = $GLOBAL_DB{$family} || {};

   snd $SRCNODE, g_reply => $id, $members;
};
210    
# reply (g_reply, tagged with the request $id) with an array of all keys
# of the given family in the global database
$NODE_REQ{g_db_keys} = sub {
   my ($family, $id) = @_;

   # the "|| {}" keeps keys() from autovivifying an empty family in
   # %GLOBAL_DB when an unknown family is queried
   snd $SRCNODE, g_reply => $id, [keys %{ $GLOBAL_DB{$family} || {} }];
};
215    
# reply (g_reply, tagged with the request $id) with an array of all values
# of the given family in the global database
$NODE_REQ{g_db_values} = sub {
   my ($family, $id) = @_;

   # the "|| {}" keeps values() from autovivifying an empty family in
   # %GLOBAL_DB when an unknown family is queried
   snd $SRCNODE, g_reply => $id, [values %{ $GLOBAL_DB{$family} || {} }];
};
220    
221 root 1.49 # monitoring
222 root 1.47
# Forget everything about $node: drop it from %GLOBAL_NODE, remove its 'g
# and 'l entries, clear its database shard and cancel all family monitors
# it had registered.
sub g_disconnect($) {
   my ($node) = @_;

   delete $GLOBAL_NODE{$node}; # also done in Kernel.pm, but doing it here avoids overhead

   db_del "'g" => $node;
   db_del "'l" => $node;
   g_clr $node;

   if (my $mon = delete $GLOBAL_SLAVE{$node}) {
      # g_mon1 records monitors as $GLOBAL_SLAVE{node}{family} and
      # $GLOBAL_MON{family}{node}, both with undef values - so the entry
      # to remove is keyed by the node itself (iterating over the undef
      # value would never delete anything and leave a stale monitor).
      for my $family (keys %$mon) {
         delete $GLOBAL_MON{$family}{$node};

         delete $GLOBAL_MON{$family}
            unless %{ $GLOBAL_MON{$family} };
      }
   }
}
242    
# g_mon0 family - stop monitoring
$NODE_REQ{g_mon0} = sub {
   my ($family) = @_;

   # remove the sender from the family's monitor set, dropping the
   # set itself once it has become empty
   delete $GLOBAL_MON{$family}{$SRCNODE};
   delete $GLOBAL_MON{$family}
      unless %{ $GLOBAL_MON{$family} };

   # and forget the family in the sender's own bookkeeping
   delete $GLOBAL_SLAVE{$SRCNODE}{$family};
};
250    
# g_mon1 family - start monitoring
$NODE_REQ{g_mon1} = sub {
   my ($family) = @_;

   # record the monitor both per node and per family
   $GLOBAL_SLAVE{$SRCNODE}{$family} = undef;
   $GLOBAL_MON{$family}{$SRCNODE}   = undef;

   # send the current contents of the family as initial state
   snd $SRCNODE, g_chg1 => $family, $GLOBAL_DB{$family};
};
258 root 1.47
259 root 1.49 #############################################################################
260     # switch to global mode
261    
# connect from a global node
sub g_global_connect {
   my ($node) = @_;

   # each node puts the set of connected global nodes into
   # 'g - this causes a big duplication and mergefest, but
   # is the easiest way to ensure global nodes have a list
   # of all other global nodes.
   # we also mirror 'l as soon as we receive it, causing
   # even more overhead.
   db_set "'g" => $node;

   # global nodes send all local databases of their slaves, merged,
   # as their database to other global nodes
   my %db;

   for my $owner (keys %LOCAL_DBS) {
      # only shards of nodes listed in %GLOBAL_SLAVE are included
      next unless exists $GLOBAL_SLAVE{$owner};

      my $shard = $LOCAL_DBS{$owner};

      for my $family (keys %$shard) {
         my $entries = $shard->{$family};

         $db{$family}{$_} = $entries->{$_}
            for keys %$entries;
      }
   }

   snd $node => g_set => \%db;
}
290    
# overrides request in Kernel: the sending node announces itself as global
$NODE_REQ{g_global} = sub {
   # usually a nop, but not when a normal node becomes global
   g_disconnect $SRCNODE;

   # same as in Kernel.pm
   $GLOBAL_NODE{$SRCNODE} = undef;

   g_global_connect $SRCNODE;
};
297    
# node monitor: delete data from other nodes on node-down, and announce
# ourselves to nodes that come up
mon_nodes sub {
   my ($node, $is_up) = @_;

   if (!$is_up) {
      g_disconnect $node;
   } else {
      snd $node => "g_global"; # tell everybody that we are a global node
   }
};
306 root 1.20
# now, this is messy - the switch to global mode is registered as a
# post_configure block
AnyEvent::MP::Kernel::post_configure {
   # enable global mode
   $GLOBAL = 1;

   # global nodes are their own masters - this
   # resends global requests and sets the local database.
   master_set $NODE;

   # now add us to the set of global nodes
   db_set "'g" => $NODE;

   # tell other nodes that we are global now
   for my $peer (up_nodes) {
      snd $peer, "g_global";

      # if the node is global, connect
      if (exists $GLOBAL_NODE{$peer}) {
         g_global_connect $peer;
      }
   }

   # from here on we should be able to act "normally"

   # maintain connections to all global nodes that we know of:
   # the second callback argument lists nodes to keep alive, the
   # fourth lists nodes that no longer need to be kept alive
   db_mon "'g" => sub {
      my (undef, $added, undef, $deleted) = @_;

      keepalive_add $_ for @$added;
      keepalive_del $_ for @$deleted;
   };
};
336 root 1.3
337 root 1.58 #############################################################################
338     # compatibility functions for aemp 1.0
339    
340 root 1.61 package AnyEvent::MP::Global;
341    
342     use base "Exporter";
343     our @EXPORT = qw(grp_reg grp_get grp_mon);
344    
# AEMP 1.x compatibility: registering in a group is the same as db_reg.
# The "&db_reg" call form (no parentheses) hands our own @_ through
# unchanged and bypasses any prototype of db_reg.
sub grp_reg($$) {
   return &db_reg;
}
348    
# AEMP 1.x compatibility: return an array reference with all keys (ports)
# stored in the family/group $_[0] of the global database, or undef when
# the group has no members.
sub grp_get($) {
   # the "|| {}" keeps keys() from autovivifying an empty family in
   # %GLOBAL_DB when the group does not exist
   my @ports = keys %{ $AnyEvent::MP::Kernel::GLOBAL_DB{$_[0]} || {} };

   @ports ? \@ports : undef
}
354    
# AEMP 1.x compatibility: monitor the group $grp via db_mon. On every
# change, $cb is called with an array of all current keys of the family,
# plus the second (added) and fourth (deleted) db_mon callback arguments;
# the third (changed) argument is not passed on.
# Returns whatever db_mon returns.
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   return db_mon $grp => sub {
      my ($family, $added, undef, $deleted) = @_;

      my @ports = keys %$family;

      $cb->(\@ports, $added, $deleted);
   };
}
364 root 1.1
365     =head1 SEE ALSO
366    
367     L<AnyEvent::MP>.
368    
369     =head1 AUTHOR
370    
371     Marc Lehmann <schmorp@schmorp.de>
372     http://home.schmorp.de/
373    
374     =cut
375    
376     1
377