ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.69
Committed: Sat Mar 24 16:50:15 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.68: +16 -6 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.58 AnyEvent::MP::Global - network backbone services
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.58 This module is usually run (or started on) seed nodes and provides a
12     variety of services to connected nodes, such as the distributed database.
13 root 1.1
14 root 1.58 The global nodes form a fully-meshed network, that is, all global nodes
15     currently maintain connections to all other global nodes.
16 root 1.1
17 root 1.58 Loading this module (e.g. as a service) transforms the local node into a
18     global node. There are no user-serviceable parts inside.
19 root 1.1
20 root 1.61 For a limited time, this module also exports some AEMP 1.x compatibility
21     functions (C<grp_reg>, C<grp_get> and C<grp_mon>).
22    
23 root 1.1 =cut
24    
25     package AnyEvent::MP::Global;
26    
27     use common::sense;
28     use Carp ();
29    
30 root 1.8 use AnyEvent ();
31 root 1.4
32 root 1.1 use AnyEvent::MP;
33     use AnyEvent::MP::Kernel;
34 root 1.20
35 root 1.60 AE::log 7 => "starting global service.";
36 root 1.5
37 root 1.8 #############################################################################
38 root 1.44 # node protocol parts for global nodes
39 root 1.8
40 root 1.49 package AnyEvent::MP::Kernel;
41 root 1.8
42 root 1.49 # TODO: this is ugly (classical use vars vs. our),
43     # maybe this should go into MP::Kernel
44 root 1.26
45 root 1.49 our %NODE;
46     our $NODE;
47 root 1.46
48 root 1.49 our $GLOBAL;
49     our $MASTER;
50     our $MASTER_TIMER;
51 root 1.46
52 root 1.49 our %GLOBAL_SLAVE;
53 root 1.26
54 root 1.49 our %GLOBAL_DB; # global db
55 root 1.68 our %LOCAL_DBS; # local databases of other nodes (global and slave)
56 root 1.49 our %LOCAL_DB; # this node database
57 root 1.46
58 root 1.59 our $SRCNODE; # the origin node id
59 root 1.49 our %NODE_REQ;
60 root 1.46
61 root 1.49 # only in global code
62 root 1.51 our %GLOBAL_MON; # monitors {family}
63 root 1.47
# send the message in @_ to every node listed in the 'g family, except
# ourselves and except nodes for which node_is_up is false
sub g_broadcast {
   my @peers = grep { $_ ne $NODE && node_is_up($_) } keys %{ $GLOBAL_DB{"'g"} };

   snd($_, @_)
      for @peers;
}
69 root 1.46
# add/replace/delete keys inside one family of the database
#
#    $node   - id of the node whose local database is changed
#    $family - name of the family to update
#    $set    - hashref of key => value pairs to add or replace (may be undef)
#    $del    - arrayref of keys to delete (may be undef)
#
# @$del must not contain any key in %$set.
#
# %$set is modified in place: when a deleted key is still provided by
# another node's local database, the key is not reported as deleted but is
# added to %$set with the other node's value instead.
#
# if anything changed, the update is broadcast to the other global nodes
# (only when $node is one of our slaves) and sent as g_chg2 to every node
# monitoring the family.
sub g_upd {
   my ($node, $family, $set, $del) = @_;

   # callers may leave out either part (g_set passes no $del, g_clr no $set)
   $set ||= {};
   $del ||= [];

   my $ldb = $LOCAL_DBS{$node}{$family} ||= {};
   my $gdb = $GLOBAL_DB {$family}       ||= {};

   # add/replace keys
   while (my ($k, $v) = each %$set) {
      $ldb->{$k} =
      $gdb->{$k} = $v;
   }

   my @del; # actual deletes

   # take care of deletes
   keydel:
   for my $k (@$del) {
      next unless exists $ldb->{$k};

      delete $ldb->{$k};
      delete $gdb->{$k};

      # check if some other node still has the key, then we don't delete, but change
      for my $other (values %LOCAL_DBS) {
         # look at the family first, so that we do not autovivify an
         # empty family hash in every other node's database
         my $fam = $other->{$family}
            or next;

         if (exists $fam->{$k}) {
            $set->{$k} = $gdb->{$k} = $fam->{$k};

            next keydel;
         }
      }

      push @del, $k;
   }

   # family could be empty now
   delete $GLOBAL_DB{$family}        unless %$gdb;
   delete $LOCAL_DBS{$node}{$family} unless %$ldb;

   return unless %$set || @del;

   g_broadcast(g_upd => $family, $set, \@del)
      if exists $GLOBAL_SLAVE{$node};

   # tell subscribers we have changed the family - the "|| {}" avoids
   # creating an empty monitor set for families nobody monitors
   snd($_ => g_chg2 => $family, $set, \@del)
      for keys %{ $GLOBAL_MON{$family} || {} };
}
119    
# install a whole (node-local) database for $node by feeding it to g_upd
# one family at a time - the node's previous database must be empty
sub g_set($$) {
   my ($node, $db) = @_;

   for my $family (keys %$db) {
      g_upd($node, $family, $db->{$family});
   }
}
128 root 1.46
# remove every key of every family from the local database of $node
# (via g_upd, so the deletes are propagated), then drop the database itself
sub g_clr($) {
   my ($node) = @_;

   my $families = $LOCAL_DBS{$node};

   for my $family (keys %$families) {
      g_upd($node, $family, undef, [keys %{ $families->{$family} }]);
   }

   delete $LOCAL_DBS{$node};
}
141    
142     # gather node databases from slaves
143 root 1.46
# another node wants to make us its master and sends us its database
$NODE_REQ{g_slave} = sub {
   # an empty g_slave is used to start global service
   return unless @_;

   my ($db) = @_;
   my $slave = $SRCNODE;

   # register the sender as slave (the key merely has to exist)
   $GLOBAL_SLAVE{$slave} = undef;
   g_set($slave, $db);
};
153 root 1.46
# another global node sends us its database
$NODE_REQ{g_set} = sub {
   my ($remote_db) = @_;

   g_set($SRCNODE, $remote_db);

   # a remote node always has to provide its listeners. for global
   # nodes, we mirror their 'l entry locally, just as we also set 'g.
   # that's not very efficient, but ensures that global nodes
   # find each other.
   db_set("'l", $SRCNODE, $remote_db->{"'l"}{$SRCNODE});
};
166 root 1.46
# another node (global or slave) sends us a family update - apply it
# to the sender's local database
$NODE_REQ{g_upd} = sub {
   g_upd($SRCNODE, @_);
};
171 root 1.46
# a slave node asks for the listeners of a node - reply with g_found
# and whatever the 'l family stores for that node
$NODE_REQ{g_find} = sub {
   my ($wanted) = @_;

   my $listeners = $GLOBAL_DB{"'l"}{$wanted};

   snd($SRCNODE, g_found => $wanted, $listeners);
};
178 root 1.46
# reply (g_reply with the request $id) with the whole family hash,
# or an empty hash when the family does not exist
$NODE_REQ{g_db_family} = sub {
   my ($family, $id) = @_;

   my $contents = $GLOBAL_DB{$family} || {};

   snd($SRCNODE, g_reply => $id, $contents);
};
183    
# reply (g_reply with the request $id) with an array of all keys in the family
$NODE_REQ{g_db_keys} = sub {
   my ($family, $id) = @_;

   # "|| {}" keeps a query for an unknown family from autovivifying an
   # empty family inside %GLOBAL_DB
   snd($SRCNODE, g_reply => $id, [keys %{ $GLOBAL_DB{$family} || {} }]);
};
188    
# reply (g_reply with the request $id) with an array of all values in the family
$NODE_REQ{g_db_values} = sub {
   my ($family, $id) = @_;

   # "|| {}" keeps a query for an unknown family from autovivifying an
   # empty family inside %GLOBAL_DB
   snd($SRCNODE, g_reply => $id, [values %{ $GLOBAL_DB{$family} || {} }]);
};
193    
194 root 1.49 # monitoring
195 root 1.47
# forget everything we know about $node: its 'g and 'l entries, its
# local database and all family monitors it registered with us
sub g_disconnect($) {
   my ($node) = @_;

   db_del("'g" => $node);
   db_del("'l" => $node);
   g_clr($node);

   # $GLOBAL_SLAVE{$node} maps the monitored family names to undef (see
   # g_mon1), while the subscription itself is stored as
   # $GLOBAL_MON{$family}{$node} - so it is the node id that has to be
   # removed from each monitored family.
   if (my $mon = delete $GLOBAL_SLAVE{$node}) {
      for my $family (keys %$mon) {
         my $subscribers = $GLOBAL_MON{$family}
            or next;

         delete $subscribers->{$node};

         delete $GLOBAL_MON{$family}
            unless %$subscribers;
      }
   }
}
213    
# g_mon0 family - the sender stops monitoring the family
$NODE_REQ{g_mon0} = sub {
   my $family = $_[0];

   # drop the subscription, and the whole monitor set once it is empty
   delete $GLOBAL_MON{$family}{$SRCNODE};
   delete $GLOBAL_MON{$family}
      unless %{ $GLOBAL_MON{$family} };

   delete $GLOBAL_SLAVE{$SRCNODE}{$family};
};
221    
# g_mon1 family - the sender starts monitoring the family
$NODE_REQ{g_mon1} = sub {
   my $family = $_[0];

   # only the existence of these keys matters, so the values stay undef
   $GLOBAL_SLAVE{$SRCNODE}{$family} = undef;
   $GLOBAL_MON{$family}{$SRCNODE}   = undef;

   # send the current contents of the family as initial state
   snd($SRCNODE, g_chg1 => $family, $GLOBAL_DB{$family});
};
229 root 1.47
230 root 1.49 #############################################################################
231     # switch to global mode
232    
# delete data from other nodes on node-down (second callback argument false)
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   g_disconnect $nodeid unless $is_up;
};
237    
# tell everybody who connects that we are a global node, by setting
# the "global" flag in our local greeting
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{global} = 1;
};
242    
# a global node ($node) has connected: record it in 'g and send it our
# database via g_set
sub g_global_connect {
   my ($node) = @_;

   # each node puts the set of connected global nodes into
   # 'g - this causes a big duplication and mergefest, but
   # is the easiest way to ensure global nodes have a list
   # of all other global nodes.
   # we also mirror 'l as soon as we receive it, causing
   # even more overhead.
   db_set "'g" => $node;

   # global nodes send all local databases of their slaves, merged,
   # as their database to other global nodes
   my %merged;

   for my $owner (keys %LOCAL_DBS) {
      # databases of non-slaves are not part of what we send
      next unless exists $GLOBAL_SLAVE{$owner};

      my $families = $LOCAL_DBS{$owner};

      for my $family (keys %$families) {
         my $entries = $families->{$family};

         $merged{$family}{$_} = $entries->{$_}
            for keys %$entries;
      }
   }

   snd($node => g_set => \%merged);
}
271    
# send our database to every global node that connects - nodes whose
# greeting lacks the "global" flag are ignored
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   my ($transport) = @_;

   return unless $transport->{remote_greeting}{global};

   g_global_connect $transport->{remote_node};
};
278    
# tell other global nodes that we are global now
# TODO: there is probably a race when two connected nodes become global at the same time
# very ugly.
for my $peer (values %NODE) {
   # only nodes with a transport whose greeting was flagged global
   my $transport = $peer->{transport}
      or next;

   next unless $transport->{remote_greeting}{global};

   snd($peer->{id} => "g_global");
   g_global_connect $peer->{id};
}
288    
# the sender announces that it has become a global node: drop the state
# we hold for it, flag its greeting as global and treat it like a
# freshly connected global node
$NODE_REQ{g_global} = sub {
   g_disconnect $SRCNODE;

   my $peer = $NODE{$SRCNODE};
   $peer->{transport}{remote_greeting}{global} = 1;

   g_global_connect $SRCNODE;
};
297 root 1.20
# enable global mode
$GLOBAL = 1;

# global nodes are their own masters - this
# resends global requests and sets the local database.
master_set $NODE;

# from here on we should be able to act normally

# now add us to the set of global nodes
db_set "'g" => $NODE;

# maintain connections to all global nodes that we know of:
# the monitor callback's second argument lists the nodes to keep alive,
# its fourth argument the nodes to stop keeping alive
db_mon "'g" => sub {
   my (undef, $added, undef, $deleted) = @_;

   keepalive_add $_ for @$added;
   keepalive_del $_ for @$deleted;
};
315 root 1.3
316 root 1.58 #############################################################################
317     # compatibility functions for aemp 1.0
318    
319 root 1.61 package AnyEvent::MP::Global;
320    
321     use base "Exporter";
322     our @EXPORT = qw(grp_reg grp_get grp_mon);
323    
# AEMP 1.x compatibility: grp_reg is simply db_reg
sub grp_reg($$) {
   # "&name" without parentheses passes our @_ through unchanged and
   # ignores any prototype of db_reg
   return &db_reg;
}
327    
# AEMP 1.x compatibility: return an arrayref with the keys of the global
# database family named by the argument, or undef when there are none
sub grp_get($) {
   # "|| {}" keeps a lookup of an unknown group from autovivifying an
   # empty family inside the global database
   my @ports = keys %{ $AnyEvent::MP::Kernel::GLOBAL_DB{$_[0]} || {} };

   @ports ? \@ports : undef
}
333    
# AEMP 1.x compatibility: monitor the family $grp with db_mon and call
# $cb with (arrayref of all current keys, added, deleted) - the third
# db_mon callback argument is not passed on
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   return db_mon($grp, sub {
      my ($members, $added, undef, $removed) = @_;

      $cb->([keys %$members], $added, $removed);
   });
}
343 root 1.1
344     =head1 SEE ALSO
345    
346     L<AnyEvent::MP>.
347    
348     =head1 AUTHOR
349    
350     Marc Lehmann <schmorp@schmorp.de>
351     http://home.schmorp.de/
352    
353     =cut
354    
355     1
356