ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.65
Committed: Fri Mar 23 21:30:32 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.64: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.58 AnyEvent::MP::Global - network backbone services
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.58 This module is usually run (or started on) seed nodes and provides a
12     variety of services to connected nodes, such as the distributed database.
13 root 1.1
14 root 1.58 The global nodes form a fully-meshed network, that is, all global nodes
15     currently maintain connections to all other global nodes.
16 root 1.1
17 root 1.58 Loading this module (e.g. as a service) transforms the local node into a
18     global node. There are no user-servicable parts inside.
19 root 1.1
20 root 1.61 For a limited time, this module also exports some AEMP 1.x compatibility
21     functions (C<grp_reg>, C<grp_get> and C<grp_mon>).
22    
23 root 1.1 =cut
24    
25     package AnyEvent::MP::Global;
26    
27     use common::sense;
28     use Carp ();
29    
30 root 1.8 use AnyEvent ();
31 root 1.4
32 root 1.1 use AnyEvent::MP;
33     use AnyEvent::MP::Kernel;
34 root 1.20
35 root 1.60 AE::log 7 => "starting global service.";
36 root 1.5
37 root 1.8 #############################################################################
38 root 1.44 # node protocol parts for global nodes
39 root 1.8
40 root 1.49 package AnyEvent::MP::Kernel;
41 root 1.8
42 root 1.49 # TODO: this is ugly (classical use vars vs. our),
43     # maybe this should go into MP::Kernel
44 root 1.26
45 root 1.49 our %NODE;
46     our $NODE;
47 root 1.46
48 root 1.49 our $GLOBAL;
49     our $MASTER;
50     our $MASTER_MON;
51     our $MASTER_TIMER;
52 root 1.46
53 root 1.49 our %GLOBAL_SLAVE;
54 root 1.26
55 root 1.49 our %GLOBAL_DB; # global db
56     our %LOCAL_DBS; # local databases of other global nodes
57     our %LOCAL_DB; # this node database
58 root 1.46
59 root 1.59 our $SRCNODE; # the origin node id
60 root 1.49 our %NODE_REQ;
61 root 1.46
62 root 1.49 # only in global code
63 root 1.51 our %GLOBAL_MON; # monitors {family}
64 root 1.47
65 root 1.49 # broadcasts a message to all other global nodes
66     sub g_broadcast {
67     snd $_, @_
68 root 1.63 for grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} }
69 root 1.49 }
70 root 1.46
71 root 1.57 # add/replace/del inside a family in the database
72     # @$dle must not contain any key in %$set
73     sub g_upd {
74     my ($node, $family, $set, $del) = @_;
75    
76     my $ldb = $LOCAL_DBS{$node}{$family} ||= {};
77     my $gdb = $GLOBAL_DB {$family} ||= {};
78    
79     # add/replace keys
80     while (my ($k, $v) = each %$set) {
81     $ldb->{$k} =
82     $gdb->{$k} = $v;
83     }
84    
85     my @del; # actual deletes
86    
87     # take care of deletes
88     keydel:
89     for my $k (@$del) {
90     delete $ldb->{$k};
91     delete $gdb->{$k};
92    
93     # check if some other node still has the key, then we don't delete, but change
94     for (values %LOCAL_DBS) {
95     if (exists $_->{$family}{$k}) {
96     $set->{$k} = $gdb->{$k} = $_->{$family}{$k};
97 root 1.46
98 root 1.57 next keydel;
99     }
100     }
101 root 1.46
102 root 1.57 push @del, $k;
103     }
104 root 1.51
105 root 1.57 # family could be empty now
106     delete $GLOBAL_DB{$family} unless %$gdb;
107     delete $LOCAL_DBS{$node}{$family} unless %$ldb;
108 root 1.51
109 root 1.57 g_broadcast g_upd => $family, $set, \@del
110     if exists $GLOBAL_SLAVE{$node};
111 root 1.46
112 root 1.57 # tell subscribers we have changed the family
113     snd $_ => g_chg2 => $family, $set, \@del
114     for keys %{ $GLOBAL_MON{$family} };
115 root 1.51 }
116    
117     # set the whole (node-local) database - previous value must be empty
118     sub g_set($$) {
119     my ($node, $db) = @_;
120    
121     while (my ($f, $k) = each %$db) {
122 root 1.57 g_upd $node, $f, $k;
123 root 1.51 }
124 root 1.49 }
125 root 1.46
126 root 1.49 # delete all keys from a database
127     sub g_clr($) {
128     my ($node) = @_;
129    
130     my $db = $LOCAL_DBS{$node};
131 root 1.57
132 root 1.49 while (my ($f, $k) = each %$db) {
133 root 1.57 g_upd $node, $f, undef, [keys %$k];
134 root 1.46 }
135    
136 root 1.49 delete $LOCAL_DBS{$node};
137     }
138    
139     # gather node databases from slaves
140 root 1.46
141 root 1.49 # other node wants to make us the master
142     $NODE_REQ{g_slave} = sub {
143 root 1.58 my ($db) = @_
144     or return; # empty g_slave is used to start global service
145 root 1.46
146 root 1.59 my $node = $SRCNODE;
147 root 1.49 undef $GLOBAL_SLAVE{$node};
148     g_set $node, $db;
149     };
150 root 1.46
151 root 1.57 $NODE_REQ{g_set} = sub {
152 root 1.61 &g_set ($SRCNODE, @_);
153 root 1.49 };
154 root 1.46
155 root 1.57 $NODE_REQ{g_upd} = sub {
156 root 1.61 &g_upd ($SRCNODE, @_);
157 root 1.49 };
158 root 1.46
159 root 1.49 $NODE_REQ{g_find} = sub {
160     my ($node) = @_;
161 root 1.46
162 root 1.59 snd $SRCNODE, g_found => $node, $GLOBAL_DB{"'l"}{$node};
163 root 1.49 };
164 root 1.46
165 root 1.55 $NODE_REQ{g_db_family} = sub {
166     my ($family, $id) = @_;
167 root 1.59 snd $SRCNODE, g_reply => $id, $GLOBAL_DB{$family} || {};
168 root 1.55 };
169    
170     $NODE_REQ{g_db_keys} = sub {
171     my ($family, $id) = @_;
172 root 1.59 snd $SRCNODE, g_reply => $id, [keys %{ $GLOBAL_DB{$family} } ];
173 root 1.55 };
174    
175     $NODE_REQ{g_db_values} = sub {
176     my ($family, $id) = @_;
177 root 1.59 snd $SRCNODE, g_reply => $id, [values %{ $GLOBAL_DB{$family} } ];
178 root 1.55 };
179    
180 root 1.49 # monitoring
181 root 1.47
182 root 1.49 sub g_slave_disconnect($) {
183     my ($node) = @_;
184 root 1.47
185 root 1.49 g_clr $node;
186 root 1.47
187 root 1.49 if (my $mon = delete $GLOBAL_SLAVE{$node}) {
188     while (my ($f, $fv) = each %$mon) {
189     delete $GLOBAL_MON{$f}{$_}
190     for keys %$fv;
191 root 1.51
192     delete $GLOBAL_MON{$f}
193     unless %{ $GLOBAL_MON{$f} };
194 root 1.47 }
195     }
196 root 1.49 }
197    
198 root 1.51 # g_mon0 family - stop monitoring
199 root 1.49 $NODE_REQ{g_mon0} = sub {
200 root 1.59 delete $GLOBAL_MON{$_[0]}{$SRCNODE};
201 root 1.51 delete $GLOBAL_MON{$_[0]} unless %{ $GLOBAL_MON{$_[0]} };
202    
203 root 1.59 delete $GLOBAL_SLAVE{$SRCNODE}{$_[0]};
204 root 1.49 };
205    
206     # g_mon1 family key - start monitoring
207     $NODE_REQ{g_mon1} = sub {
208 root 1.59 undef $GLOBAL_SLAVE{$SRCNODE}{$_[0]};
209     undef $GLOBAL_MON{$_[0]}{$SRCNODE};
210 root 1.49
211 root 1.59 snd $SRCNODE, g_chg1 => $_[0], $GLOBAL_DB{$_[0]};
212 root 1.49 };
213 root 1.47
214 root 1.49 #############################################################################
215     # switch to global mode
216    
217     # regularly try to connect to global nodes - maybe use seeding code?
218 root 1.65 $MASTER_TIMER = AE::timer $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
219 root 1.49 (add_node $_)->connect
220 root 1.63 for keys %{ $GLOBAL_DB{"'g"} }
221 root 1.49 };
222    
223     # instantly connect to other global nodes when we learn of them
224     # so we don't have to wait for the timer.
225 root 1.64 db_mon "'g" => sub {
226     (add_node $_)->connect
227     for @{ $_[1] };
228     };
229 root 1.47
230 root 1.49 # delete slaves on node-down
231     # clear slave db on node-down
232     $MASTER_MON = mon_nodes sub {
233     g_slave_disconnect $_[0] unless $_[1];
234     };
235    
236     # tell everybody who connects that we are a global node
237     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
238     $_[0]{local_greeting}{global} = 1;
239     };
240    
241     # connect from a global node
242     sub g_global_connect {
243     my ($node) = @_;
244    
245     # we need to set this currently, as to avoid race conditions
246     # because it takes a while until the other global node tells us it is global.
247    
248     undef $GLOBAL_DB{"'g"}{$node};
249     undef $LOCAL_DBS{$node}{"'g"}{$node};
250    
251     # global nodes send all local databases, merged,
252     # as their local database to global nodes
253     my %db;
254    
255     for (values %LOCAL_DBS) {
256     while (my ($f, $fv) = each %$_) {
257     while (my ($k, $kv) = each %$fv) {
258     $db{$f}{$k} = $kv;
259 root 1.46 }
260     }
261 root 1.47 }
262    
263 root 1.49 snd $node => g_set => \%db;
264 root 1.4 }
265    
266 root 1.49 # send our database to every global node that connects
267     push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
268     return unless $_[0]{remote_greeting}{global};
269    
270     g_global_connect $_[0]{remote_node};
271     };
272    
273     # tell our master that we are global now
274     for (values %NODE) {
275     if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
276     snd $_->{id} => "g_global";
277     g_global_connect $_->{id};
278     }
279 root 1.20 }
280    
281 root 1.49 $NODE_REQ{g_global} = sub {
282 root 1.59 g_slave_disconnect $SRCNODE;
283    
284     my $node = $NODE{$SRCNODE};
285     $node->{transport}{remote_greeting}{global} = 1;
286    
287     g_global_connect $SRCNODE;
288 root 1.49 };
289 root 1.20
290 root 1.56 # enable global mode
291     $GLOBAL = 1;
292    
293     # global nodes are their own masters - this
294     # resends global requests and sets the local database.
295     master_set $NODE;
296    
297 root 1.49 # now add us to the set of global nodes
298     db_set "'g" => $NODE => undef;
299 root 1.3
300 root 1.58 #############################################################################
301     # compatibility functions for aemp 1.0
302    
303 root 1.61 package AnyEvent::MP::Global;
304    
305     use base "Exporter";
306     our @EXPORT = qw(grp_reg grp_get grp_mon);
307    
308     sub grp_reg($$) {
309     &db_reg
310     }
311    
312     sub grp_get($) {
313     my @ports = keys %{ $AnyEvent::MP::Kernel::GLOBAL_DB{$_[0]} };
314    
315     @ports ? \@ports : undef
316     }
317    
318     sub grp_mon($$) {
319     my ($grp, $cb) = @_;
320    
321     db_mon $grp => sub {
322     my ($ports, $add, $chg, $del) = @_;
323    
324     $cb->([keys %$ports], $add, $del);
325     };
326     }
327 root 1.1
328     =head1 SEE ALSO
329    
330     L<AnyEvent::MP>.
331    
332     =head1 AUTHOR
333    
334     Marc Lehmann <schmorp@schmorp.de>
335     http://home.schmorp.de/
336    
337     =cut
338    
339     1
340