ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.67
Committed: Sat Mar 24 01:00:28 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.66: +4 -5 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.58 AnyEvent::MP::Global - network backbone services
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.58 This module is usually run (or started on) seed nodes and provides a
12     variety of services to connected nodes, such as the distributed database.
13 root 1.1
14 root 1.58 The global nodes form a fully-meshed network, that is, all global nodes
15     currently maintain connections to all other global nodes.
16 root 1.1
17 root 1.58 Loading this module (e.g. as a service) transforms the local node into a
18     global node. There are no user-serviceable parts inside.
19 root 1.1
20 root 1.61 For a limited time, this module also exports some AEMP 1.x compatibility
21     functions (C<grp_reg>, C<grp_get> and C<grp_mon>).
22    
23 root 1.1 =cut
24    
25     package AnyEvent::MP::Global;
26    
27     use common::sense;
28     use Carp ();
29    
30 root 1.8 use AnyEvent ();
31 root 1.4
32 root 1.1 use AnyEvent::MP;
33     use AnyEvent::MP::Kernel;
34 root 1.20
35 root 1.60 AE::log 7 => "starting global service.";
36 root 1.5
37 root 1.8 #############################################################################
38 root 1.44 # node protocol parts for global nodes
39 root 1.8
40 root 1.49 package AnyEvent::MP::Kernel;
41 root 1.8
42 root 1.49 # TODO: this is ugly (classical use vars vs. our),
43     # maybe this should go into MP::Kernel
44 root 1.26
45 root 1.49 our %NODE;
46     our $NODE;
47 root 1.46
48 root 1.49 our $GLOBAL;
49     our $MASTER;
50     our $MASTER_TIMER;
51 root 1.46
52 root 1.49 our %GLOBAL_SLAVE;
53 root 1.26
54 root 1.49 our %GLOBAL_DB; # global db
55     our %LOCAL_DBS; # local databases of other global nodes
56     our %LOCAL_DB; # this node database
57 root 1.46
58 root 1.59 our $SRCNODE; # the origin node id
59 root 1.49 our %NODE_REQ;
60 root 1.46
61 root 1.49 # only in global code
62 root 1.51 our %GLOBAL_MON; # monitors {family}
63 root 1.47
64 root 1.49 # broadcasts a message (the argument list) to all other global nodes
65     sub g_broadcast {
66     snd $_, @_
67 root 1.63 for grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} } # every key of the 'g family except ourselves, and only if node_is_up says so
68 root 1.49 }
69 root 1.46
70 root 1.57 # add/replace/del keys inside one family, both in $node's local db (%LOCAL_DBS) and in the merged %GLOBAL_DB
71 root 1.66 # @$del must not contain any key in %$set
72 root 1.57 sub g_upd {
73     my ($node, $family, $set, $del) = @_; # $set: hashref key => value to store, $del: arrayref of keys to remove
74    
75     my $ldb = $LOCAL_DBS{$node}{$family} ||= {};
76     my $gdb = $GLOBAL_DB {$family} ||= {};
77    
78     # add/replace keys (written to both the node-local and the global view)
79     while (my ($k, $v) = each %$set) {
80     $ldb->{$k} =
81     $gdb->{$k} = $v;
82     }
83    
84     my @del; # actual deletes
85    
86     # take care of deletes
87     keydel:
88     for my $k (@$del) {
89     delete $ldb->{$k};
90     delete $gdb->{$k};
91    
92     # check if some other node still has the key, then we don't delete, but change
93     for (values %LOCAL_DBS) {
94     if (exists $_->{$family}{$k}) { # NOTE(review): this test autovivifies $_->{$family} for nodes that lack the family
95     $set->{$k} = $gdb->{$k} = $_->{$family}{$k}; # report it as a change (via $set) instead of a delete
96 root 1.46
97 root 1.57 next keydel;
98     }
99     }
100 root 1.46
101 root 1.57 push @del, $k;
102     }
103 root 1.51
104 root 1.57 # family could be empty now
105     delete $GLOBAL_DB{$family} unless %$gdb;
106     delete $LOCAL_DBS{$node}{$family} unless %$ldb;
107 root 1.51
108 root 1.57 g_broadcast g_upd => $family, $set, \@del
109     if exists $GLOBAL_SLAVE{$node}; # only forward updates whose source node has an entry in %GLOBAL_SLAVE
110 root 1.46
111 root 1.57 # tell subscribers we have changed the family
112     snd $_ => g_chg2 => $family, $set, \@del
113     for keys %{ $GLOBAL_MON{$family} };
114 root 1.51 }
115    
116     # set the whole (node-local) database - previous value must be empty
117     sub g_set($$) {
118     my ($node, $db) = @_; # $db: hashref family => { key => value }
119    
120     while (my ($f, $k) = each %$db) {
121 root 1.57 g_upd $node, $f, $k; # no delete list is passed: the family's keys are only added/replaced
122 root 1.51 }
123 root 1.49 }
124 root 1.46
125 root 1.49 # delete all keys of all families from a node's local database, then drop the database itself
126     sub g_clr($) {
127     my ($node) = @_;
128    
129     my $db = $LOCAL_DBS{$node};
130 root 1.57
131 root 1.49 while (my ($f, $k) = each %$db) {
132 root 1.57 g_upd $node, $f, undef, [keys %$k]; # no set, every key of the family goes into the delete list
133 root 1.46 }
134    
135 root 1.49 delete $LOCAL_DBS{$node};
136     }
137    
138     # gather node databases from slaves
139 root 1.46
140 root 1.49 # other node wants to make us the master, and hands us its database
141     $NODE_REQ{g_slave} = sub {
142 root 1.58 my ($db) = @_
143     or return; # empty g_slave is used to start global service
144 root 1.46
145 root 1.59 my $node = $SRCNODE;
146 root 1.49 undef $GLOBAL_SLAVE{$node}; # creates the key with an undef value - g_upd tests it with exists
147     g_set $node, $db;
148     };
149 root 1.46
150 root 1.66 # other node (global and slave) sends us their database
151 root 1.57 $NODE_REQ{g_set} = sub {
152 root 1.61 &g_set ($SRCNODE, @_); # the & form bypasses g_set's ($$) prototype, so @_ is passed as a list
153 root 1.49 };
154 root 1.46
155 root 1.66 # other node (global and slave) sends us a family update
156 root 1.57 $NODE_REQ{g_upd} = sub {
157 root 1.61 &g_upd ($SRCNODE, @_); # request arguments are family, set, del - the origin node id is prepended
158 root 1.49 };
159 root 1.46
160 root 1.66 # slave node wants to know the listeners of a node
161 root 1.49 $NODE_REQ{g_find} = sub {
162     my ($node) = @_;
163 root 1.46
164 root 1.59 snd $SRCNODE, g_found => $node, $GLOBAL_DB{"'l"}{$node}; # reply with the 'l family entry for $node (undef if there is none)
165 root 1.49 };
166 root 1.46
167 root 1.55 $NODE_REQ{g_db_family} = sub { # query: whole family as a hashref ({} if the family does not exist)
168     my ($family, $id) = @_; # $id is echoed back in the g_reply
169 root 1.59 snd $SRCNODE, g_reply => $id, $GLOBAL_DB{$family} || {};
170 root 1.55 };
171    
172     $NODE_REQ{g_db_keys} = sub { # query: arrayref of all keys in the family
173     my ($family, $id) = @_;
174 root 1.59 snd $SRCNODE, g_reply => $id, [keys %{ $GLOBAL_DB{$family} } ];
175 root 1.55 };
176    
177     $NODE_REQ{g_db_values} = sub { # query: arrayref of all values in the family
178     my ($family, $id) = @_;
179 root 1.59 snd $SRCNODE, g_reply => $id, [values %{ $GLOBAL_DB{$family} } ];
180 root 1.55 };
181    
182 root 1.49 # monitoring
183 root 1.47
184 root 1.67 sub g_disconnect($) { # forget what we track for $node: its database and its %GLOBAL_SLAVE entry
185 root 1.49 my ($node) = @_;
186 root 1.47
187 root 1.49 g_clr $node; # remove all keys stored on behalf of the node
188 root 1.47
189 root 1.49 if (my $mon = delete $GLOBAL_SLAVE{$node}) { # $mon: hash keyed by monitored family (filled by g_mon1)
190     while (my ($f, $fv) = each %$mon) {
191     delete $GLOBAL_MON{$f}{$_}
192     for keys %$fv; # NOTE(review): g_mon1 stores undef as $fv, so this list looks empty - was delete $GLOBAL_MON{$f}{$node} intended? confirm
193 root 1.51
194     delete $GLOBAL_MON{$f}
195     unless %{ $GLOBAL_MON{$f} }; # drop the family's monitor table once it is empty
196 root 1.47 }
197     }
198 root 1.49 }
199    
200 root 1.51 # g_mon0 family - stop monitoring
201 root 1.49 $NODE_REQ{g_mon0} = sub {
202 root 1.59 delete $GLOBAL_MON{$_[0]}{$SRCNODE};
203 root 1.51 delete $GLOBAL_MON{$_[0]} unless %{ $GLOBAL_MON{$_[0]} }; # no monitors left for this family
204    
205 root 1.59 delete $GLOBAL_SLAVE{$SRCNODE}{$_[0]}; # also forget it in the per-node list used by g_disconnect
206 root 1.49 };
207    
208     # g_mon1 family - start monitoring (only the family argument is used here)
209     $NODE_REQ{g_mon1} = sub {
210 root 1.59 undef $GLOBAL_SLAVE{$SRCNODE}{$_[0]}; # per node: which families it monitors (value is undef)
211     undef $GLOBAL_MON{$_[0]}{$SRCNODE}; # per family: which nodes monitor it (value is undef)
212 root 1.49
213 root 1.59 snd $SRCNODE, g_chg1 => $_[0], $GLOBAL_DB{$_[0]}; # reply with the current family contents (undef if the family does not exist)
214 root 1.49 };
215 root 1.47
216 root 1.49 #############################################################################
217     # switch to global mode
218    
219 root 1.66 # maintain connections to all global nodes that we know of
220 root 1.64 db_mon "'g" => sub {
221 root 1.66 keepalive_add $_ for @{ $_[1] }; # $_[1]: presumably the added keys (cf. the callback signature in grp_mon) - confirm
222     keepalive_del $_ for @{ $_[3] }; # $_[3]: presumably the deleted keys - confirm
223 root 1.64 };
224 root 1.47
225 root 1.67 # delete data from other nodes on node-down
226 root 1.66 mon_nodes sub {
227 root 1.67 g_disconnect $_[0] unless $_[1]; # $_[0]: node id; $_[1] presumably true while the node is up - confirm
228 root 1.49 };
229    
230     # tell everybody who connects that we are a global node
231     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
232     $_[0]{local_greeting}{global} = 1;
233     };
234    
235     # connect from a global node: register it in the 'g family and send it our merged database
236     sub g_global_connect {
237     my ($node) = @_;
238    
239     # we need to set this currently, as to avoid race conditions
240 root 1.66 # because it takes a while until the other global node tells us it is global,
241     # but we need to send updates even before that.
242 root 1.49
243 root 1.66 undef $GLOBAL_DB {"'g"}{$node};
244 root 1.49 undef $LOCAL_DBS{$node}{"'g"}{$node};
245    
246     # global nodes send all local databases, merged,
247 root 1.66 # as their local database to other global nodes
248 root 1.49 my %db;
249    
250     for (values %LOCAL_DBS) {
251     while (my ($f, $fv) = each %$_) { # $f: family, $fv: that family's key => value hash
252     while (my ($k, $kv) = each %$fv) {
253     $db{$f}{$k} = $kv; # for a key held by several nodes, the value visited last wins
254 root 1.46 }
255     }
256 root 1.47 }
257    
258 root 1.49 snd $node => g_set => \%db;
259 root 1.4 }
260    
261 root 1.49 # send our database to every global node that connects
262     push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
263     return unless $_[0]{remote_greeting}{global}; # peer did not announce itself as global in its greeting
264    
265     g_global_connect $_[0]{remote_node};
266     };
267    
268 root 1.66 # tell other global nodes that we are global now
269     # TODO: there is probably a race when two connected nodes become global at the same time
270     # very ugly.
271 root 1.49 for (values %NODE) {
272     if ($_->{transport} && $_->{transport}{remote_greeting}{global}) { # node has a transport and greeted as global
273     snd $_->{id} => "g_global";
274     g_global_connect $_->{id};
275     }
276 root 1.20 }
277    
278 root 1.49 $NODE_REQ{g_global} = sub { # the sending node announces that it is a global node now
279 root 1.67 g_disconnect $SRCNODE; # first drop whatever we tracked for it so far
280 root 1.59
281     my $node = $NODE{$SRCNODE};
282     $node->{transport}{remote_greeting}{global} = 1; # record it as if the greeting had said global
283    
284     g_global_connect $SRCNODE;
285 root 1.49 };
286 root 1.20
287 root 1.56 # enable global mode
288     $GLOBAL = 1;
289    
290     # global nodes are their own masters - this
291     # resends global requests and sets the local database.
292     master_set $NODE;
293    
294 root 1.49 # now add us to the set of global nodes
295     db_set "'g" => $NODE => undef; # key is our own node id, value is undef
296 root 1.3
297 root 1.58 #############################################################################
298     # compatibility functions for aemp 1.0
299    
300 root 1.61 package AnyEvent::MP::Global;
301    
302     use base "Exporter";
303     our @EXPORT = qw(grp_reg grp_get grp_mon);
304    
305     sub grp_reg($$) { # AEMP 1.x compatibility wrapper around db_reg
306     &db_reg # &-call without parens: db_reg sees our @_ unchanged
307     }
308    
309     sub grp_get($) { # AEMP 1.x compatibility: keys of family $_[0] in the global db
310     my @ports = keys %{ $AnyEvent::MP::Kernel::GLOBAL_DB{$_[0]} };
311    
312     @ports ? \@ports : undef # arrayref of keys, or undef when there are none
313     }
314    
315     sub grp_mon($$) { # AEMP 1.x compatibility: monitor family $grp via db_mon, returns what db_mon returns
316     my ($grp, $cb) = @_;
317    
318     db_mon $grp => sub {
319     my ($ports, $add, $chg, $del) = @_;
320    
321     $cb->([keys %$ports], $add, $del); # $chg is not forwarded to the callback
322     };
323     }
324 root 1.1
325     =head1 SEE ALSO
326    
327     L<AnyEvent::MP>.
328    
329     =head1 AUTHOR
330    
331     Marc Lehmann <schmorp@schmorp.de>
332     http://home.schmorp.de/
333    
334     =cut
335    
336     1
337