ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.65
Committed: Fri Mar 23 21:30:32 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.64: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - network backbone services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module is usually run (or started on) seed nodes and provides a
12 variety of services to connected nodes, such as the distributed database.
13
14 The global nodes form a fully-meshed network, that is, all global nodes
15 currently maintain connections to all other global nodes.
16
17 Loading this module (e.g. as a service) transforms the local node into a
18 global node. There are no user-servicable parts inside.
19
20 For a limited time, this module also exports some AEMP 1.x compatibility
21 functions (C<grp_reg>, C<grp_get> and C<grp_mon>).
22
23 =cut
24
25 package AnyEvent::MP::Global;
26
27 use common::sense;
28 use Carp ();
29
30 use AnyEvent ();
31
32 use AnyEvent::MP;
33 use AnyEvent::MP::Kernel;
34
35 AE::log 7 => "starting global service.";
36
37 #############################################################################
38 # node protocol parts for global nodes
39
40 package AnyEvent::MP::Kernel;
41
42 # TODO: this is ugly (classical use vars vs. our),
43 # maybe this should go into MP::Kernel
44
45 our %NODE;
46 our $NODE;
47
48 our $GLOBAL;
49 our $MASTER;
50 our $MASTER_MON;
51 our $MASTER_TIMER;
52
53 our %GLOBAL_SLAVE;
54
55 our %GLOBAL_DB; # global db
56 our %LOCAL_DBS; # local databases of other global nodes
57 our %LOCAL_DB; # this node database
58
59 our $SRCNODE; # the origin node id
60 our %NODE_REQ;
61
62 # only in global code
63 our %GLOBAL_MON; # monitors {family}
64
65 # broadcasts a message to all other global nodes
66 sub g_broadcast {
67 snd $_, @_
68 for grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} }
69 }
70
71 # add/replace/del inside a family in the database
72 # @$dle must not contain any key in %$set
73 sub g_upd {
74 my ($node, $family, $set, $del) = @_;
75
76 my $ldb = $LOCAL_DBS{$node}{$family} ||= {};
77 my $gdb = $GLOBAL_DB {$family} ||= {};
78
79 # add/replace keys
80 while (my ($k, $v) = each %$set) {
81 $ldb->{$k} =
82 $gdb->{$k} = $v;
83 }
84
85 my @del; # actual deletes
86
87 # take care of deletes
88 keydel:
89 for my $k (@$del) {
90 delete $ldb->{$k};
91 delete $gdb->{$k};
92
93 # check if some other node still has the key, then we don't delete, but change
94 for (values %LOCAL_DBS) {
95 if (exists $_->{$family}{$k}) {
96 $set->{$k} = $gdb->{$k} = $_->{$family}{$k};
97
98 next keydel;
99 }
100 }
101
102 push @del, $k;
103 }
104
105 # family could be empty now
106 delete $GLOBAL_DB{$family} unless %$gdb;
107 delete $LOCAL_DBS{$node}{$family} unless %$ldb;
108
109 g_broadcast g_upd => $family, $set, \@del
110 if exists $GLOBAL_SLAVE{$node};
111
112 # tell subscribers we have changed the family
113 snd $_ => g_chg2 => $family, $set, \@del
114 for keys %{ $GLOBAL_MON{$family} };
115 }
116
117 # set the whole (node-local) database - previous value must be empty
118 sub g_set($$) {
119 my ($node, $db) = @_;
120
121 while (my ($f, $k) = each %$db) {
122 g_upd $node, $f, $k;
123 }
124 }
125
126 # delete all keys from a database
127 sub g_clr($) {
128 my ($node) = @_;
129
130 my $db = $LOCAL_DBS{$node};
131
132 while (my ($f, $k) = each %$db) {
133 g_upd $node, $f, undef, [keys %$k];
134 }
135
136 delete $LOCAL_DBS{$node};
137 }
138
139 # gather node databases from slaves
140
141 # other node wants to make us the master
142 $NODE_REQ{g_slave} = sub {
143 my ($db) = @_
144 or return; # empty g_slave is used to start global service
145
146 my $node = $SRCNODE;
147 undef $GLOBAL_SLAVE{$node};
148 g_set $node, $db;
149 };
150
151 $NODE_REQ{g_set} = sub {
152 &g_set ($SRCNODE, @_);
153 };
154
155 $NODE_REQ{g_upd} = sub {
156 &g_upd ($SRCNODE, @_);
157 };
158
159 $NODE_REQ{g_find} = sub {
160 my ($node) = @_;
161
162 snd $SRCNODE, g_found => $node, $GLOBAL_DB{"'l"}{$node};
163 };
164
165 $NODE_REQ{g_db_family} = sub {
166 my ($family, $id) = @_;
167 snd $SRCNODE, g_reply => $id, $GLOBAL_DB{$family} || {};
168 };
169
170 $NODE_REQ{g_db_keys} = sub {
171 my ($family, $id) = @_;
172 snd $SRCNODE, g_reply => $id, [keys %{ $GLOBAL_DB{$family} } ];
173 };
174
175 $NODE_REQ{g_db_values} = sub {
176 my ($family, $id) = @_;
177 snd $SRCNODE, g_reply => $id, [values %{ $GLOBAL_DB{$family} } ];
178 };
179
180 # monitoring
181
182 sub g_slave_disconnect($) {
183 my ($node) = @_;
184
185 g_clr $node;
186
187 if (my $mon = delete $GLOBAL_SLAVE{$node}) {
188 while (my ($f, $fv) = each %$mon) {
189 delete $GLOBAL_MON{$f}{$_}
190 for keys %$fv;
191
192 delete $GLOBAL_MON{$f}
193 unless %{ $GLOBAL_MON{$f} };
194 }
195 }
196 }
197
198 # g_mon0 family - stop monitoring
199 $NODE_REQ{g_mon0} = sub {
200 delete $GLOBAL_MON{$_[0]}{$SRCNODE};
201 delete $GLOBAL_MON{$_[0]} unless %{ $GLOBAL_MON{$_[0]} };
202
203 delete $GLOBAL_SLAVE{$SRCNODE}{$_[0]};
204 };
205
206 # g_mon1 family key - start monitoring
207 $NODE_REQ{g_mon1} = sub {
208 undef $GLOBAL_SLAVE{$SRCNODE}{$_[0]};
209 undef $GLOBAL_MON{$_[0]}{$SRCNODE};
210
211 snd $SRCNODE, g_chg1 => $_[0], $GLOBAL_DB{$_[0]};
212 };
213
214 #############################################################################
215 # switch to global mode
216
217 # regularly try to connect to global nodes - maybe use seeding code?
218 $MASTER_TIMER = AE::timer $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
219 (add_node $_)->connect
220 for keys %{ $GLOBAL_DB{"'g"} }
221 };
222
223 # instantly connect to other global nodes when we learn of them
224 # so we don't have to wait for the timer.
225 db_mon "'g" => sub {
226 (add_node $_)->connect
227 for @{ $_[1] };
228 };
229
230 # delete slaves on node-down
231 # clear slave db on node-down
232 $MASTER_MON = mon_nodes sub {
233 g_slave_disconnect $_[0] unless $_[1];
234 };
235
236 # tell everybody who connects that we are a global node
237 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
238 $_[0]{local_greeting}{global} = 1;
239 };
240
241 # connect from a global node
242 sub g_global_connect {
243 my ($node) = @_;
244
245 # we need to set this currently, as to avoid race conditions
246 # because it takes a while until the other global node tells us it is global.
247
248 undef $GLOBAL_DB{"'g"}{$node};
249 undef $LOCAL_DBS{$node}{"'g"}{$node};
250
251 # global nodes send all local databases, merged,
252 # as their local database to global nodes
253 my %db;
254
255 for (values %LOCAL_DBS) {
256 while (my ($f, $fv) = each %$_) {
257 while (my ($k, $kv) = each %$fv) {
258 $db{$f}{$k} = $kv;
259 }
260 }
261 }
262
263 snd $node => g_set => \%db;
264 }
265
266 # send our database to every global node that connects
267 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
268 return unless $_[0]{remote_greeting}{global};
269
270 g_global_connect $_[0]{remote_node};
271 };
272
273 # tell our master that we are global now
274 for (values %NODE) {
275 if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
276 snd $_->{id} => "g_global";
277 g_global_connect $_->{id};
278 }
279 }
280
281 $NODE_REQ{g_global} = sub {
282 g_slave_disconnect $SRCNODE;
283
284 my $node = $NODE{$SRCNODE};
285 $node->{transport}{remote_greeting}{global} = 1;
286
287 g_global_connect $SRCNODE;
288 };
289
290 # enable global mode
291 $GLOBAL = 1;
292
293 # global nodes are their own masters - this
294 # resends global requests and sets the local database.
295 master_set $NODE;
296
297 # now add us to the set of global nodes
298 db_set "'g" => $NODE => undef;
299
300 #############################################################################
301 # compatibility functions for aemp 1.0
302
303 package AnyEvent::MP::Global;
304
305 use base "Exporter";
306 our @EXPORT = qw(grp_reg grp_get grp_mon);
307
308 sub grp_reg($$) {
309 &db_reg
310 }
311
312 sub grp_get($) {
313 my @ports = keys %{ $AnyEvent::MP::Kernel::GLOBAL_DB{$_[0]} };
314
315 @ports ? \@ports : undef
316 }
317
318 sub grp_mon($$) {
319 my ($grp, $cb) = @_;
320
321 db_mon $grp => sub {
322 my ($ports, $add, $chg, $del) = @_;
323
324 $cb->([keys %$ports], $add, $del);
325 };
326 }
327
328 =head1 SEE ALSO
329
330 L<AnyEvent::MP>.
331
332 =head1 AUTHOR
333
334 Marc Lehmann <schmorp@schmorp.de>
335 http://home.schmorp.de/
336
337 =cut
338
339 1
340