ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.39
Committed: Sat Mar 13 20:29:04 2010 UTC (14 years, 2 months ago) by root
Branch: MAIN
CVS Tags: rel-1_28, rel-1_27
Changes since 1.38: +1 -1 lines
Log Message:
µopts

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network, if possible, and tries to
12 ensure that we are connected to at least one other node.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyways.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31
32 use AnyEvent ();
33 use AnyEvent::Util ();
34
35 use AnyEvent::MP;
36 use AnyEvent::MP::Kernel;
37 use AnyEvent::MP::Transport ();
38
39 use base "Exporter";
40
41 our @EXPORT = qw(
42 grp_reg
43 grp_get
44 grp_mon
45 );
46
47 our $GLOBAL_VERSION = 0;
48
49 our %ON_SETUP; # take note: not public
50
51 our %addr; # port ID => [address...] mapping
52
53 our %port; # our rendezvous port on the other side
54 our %lreg; # local registry, name => [pid...]
55 our %lmon; # local registry monitoring name,pid => mon
56 our %greg; # global registry, name => pid => undef
57 our %gmon; # group monitoring, group => [$cb...]
58
59 our $nodecnt;
60
61 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
62
63 #############################################################################
64 # seednodes
65
66 our $MASTER; # our current master (which we regularly query for net updates)
67
68 our %SEEDME; # $node => $port
69 our @SEEDS;
70 our %SEEDS; # just to check whether a seed is a seed
71 our %SEED_CONNECT;
72 our $SEED_WATCHER;
73
# Transport hooks that keep the seed bookkeeping in %SEEDS and %SEED_CONNECT
# up to date. Each hook receives the transport object as its first argument
# and only acts on peers whose address is a key of %SEEDS.

# connect hook: store the value 2 in %SEED_CONNECT for a seed address
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   my ($transport) = @_;

   my $peeraddr = $transport->{local_greeting}{peeraddr};
   exists $SEEDS{$peeraddr}
      or return;

   $SEED_CONNECT{$peeraddr} = 2;
};

# greeting hook: tentatively remember which node answers at a seed address
push @AnyEvent::MP::Transport::HOOK_GREETING, sub {
   my ($transport) = @_;

   # we rely on untrusted data here (the remote node name)
   # this is hopefully ok, as we override it on successful
   # connects, and this can at most be used for DOSing,
   # which is easy when you can do MITM.
   my $peeraddr = $transport->{local_greeting}{peeraddr};
   exists $SEEDS{$peeraddr}
      or return;

   # only fills in the name when none is recorded yet
   $SEEDS{$peeraddr} ||= $transport->{remote_node};
};

# connected hook: unconditionally record the node name for a seed address
push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
   my ($transport) = @_;

   my $peeraddr = $transport->{local_greeting}{peeraddr};
   exists $SEEDS{$peeraddr}
      or return;

   $SEEDS{$peeraddr} = $transport->{remote_node};
};

# destroy hook: drop the connect marker for the peer address
push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
   my ($transport) = @_;

   delete $SEED_CONNECT{ $transport->{local_greeting}{peeraddr} };

   # check if we contacted ourselves, so nuke this seed
   if (exists $transport->{seed} && $transport->{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
      # $AnyEvent::MP::Kernel::WARN->(0,"avoiding seed $transport->{seed}\n");#d#
      delete $SEEDS{ $transport->{seed} };
   }
};
105
# Try to open a transport connection to the seed address $seed ("host:port").
#
# Croaks when the address cannot be parsed. Does nothing when %SEED_CONNECT
# already holds a true value for the seed, or when the node recorded for the
# seed in %SEEDS is reported up by node_is_up.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport($seed)
      or Carp::croak("$seed: unparsable seed address");

   # a connect marker is already present for this seed
   return if $SEED_CONNECT{$seed};

   # we know which node lives at this seed, and that node is up
   my $seed_node = $SEEDS{$seed};
   return if defined($seed_node) && node_is_up($seed_node);

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   # ughhh
   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect(
      $host, $port,
      seed => $seed,
      sub {
         # callback handed to mp_connect; replaces the marker with 1
         $SEED_CONNECT{$seed} = 1;
      },
   );
}
125
# Timer callback: re-arm the seed timer and try to connect to one seed.
#
# The delay is connect_interval multiplied by the number of entries in
# %AnyEvent::MP::Kernel::NODE (or by 1 while $nodecnt is false), minus a
# random fraction below one, and never less than one second.
sub more_seeding {
   my $factor = $nodecnt ? scalar keys %AnyEvent::MP::Kernel::NODE : 1;

   my $delay = $AnyEvent::MP::Kernel::CONFIG->{connect_interval} * $factor - rand();
   $delay = 1 if $delay < 1;

   $SEED_WATCHER = AE::timer($delay, 0, \&more_seeding);

   # refill the work list from the seed table once it has been used up
   unless (@SEEDS) {
      @SEEDS = keys %SEEDS
         or return;
   }

   # remove one randomly chosen seed from the work list and try it
   seed_connect(splice(@SEEDS, rand(@SEEDS), 1));
}
139
# Add the given seed addresses to %SEEDS (with an undefined node name, also
# resetting the name of seeds that were already present), make sure a seed
# timer exists, and schedule one more_seeding call per known seed within the
# next 0.1 seconds.
sub set_seeds(@) {
   $SEEDS{$_} = undef
      for @_;

   $SEED_WATCHER ||= AE::timer(5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding);

   my $seed_count = keys %SEEDS;

   after(0.100 * rand(), \&more_seeding)
      for 1 .. $seed_count;
}
148
# Returns the node names stored in %SEEDS for which node_is_up is true
# (in scalar context: how many there are).
sub up_seeds() {
   grep { node_is_up($_) } values %SEEDS
}
152
# Returns the entries of %SEEDS whose (defined) node name equals the given
# node; in boolean/scalar context this is the number of matching seeds.
sub node_is_seed($) {
   grep { defined($_) && $_ eq $_[0] } values %SEEDS
}
156
# returns all (up) seed nodes, or all nodes if no seednodes are up/known
sub route_nodes {
   my @nodes = up_seeds();

   # fall back to every node that is up when no seed qualifies
   @nodes = up_nodes()
      if !@nodes;

   @nodes
}
163
164 #############################################################################
165
# Apply a membership change to the global registry entry of $group and
# notify the group's monitors.
#
#   $group - group name (key into %greg and %gmon)
#   $add   - arrayref of port IDs to add
#   $del   - arrayref of port IDs to remove (removal happens before adding)
#
# Every callback in $gmon{$group} is called with (current members, $add, $del).
sub _change {
   my ($group, $add, $del) = @_;

   my $members = $greg{$group} ||= {};

   delete $members->{$_}
      for @$del;
   $members->{$_} = undef
      for @$add;

   # all monitors receive the same snapshot of the member list
   my @current = keys %$members;

   for my $monitor (@{ $gmon{$group} }) {
      $monitor->(\@current, $add, $del);
   }
}
177
# Remove every port belonging to $node from all groups in the global
# registry. A port belongs to $node when its ID is exactly the node name or
# starts with the node name followed by "#".
#
# Removal goes through _change, so the monitors of each affected group are
# notified with the removed ports as the "deleted" list.
sub unreg_groups($) {
   my ($node) = @_;

   my $qr = qr/^\Q$node\E(?:#|$)/;

   # Iterate over a snapshot of the group names rather than using each():
   # the monitor callbacks invoked by _change are arbitrary code and may
   # register ports in new groups, which adds keys to %greg; modifying a
   # hash while each() walks it leaves the iterator in an unspecified state.
   for my $group (keys %greg) {
      my $ports = $greg{$group}
         or next;

      # fresh array per group, so a callback that keeps the "deleted"
      # arrayref does not see it overwritten by the next group's ports
      my @del = grep { $_ =~ $qr } keys %$ports;

      _change($group, [], \@del)
         if @del;
   }
}
190
# Merge a registry received from another node into the global registry:
# $lreg maps group names to arrayrefs of port IDs, and each group's ports
# are added via _change. The $node argument is accepted but not used here.
sub set_groups($$) {
   my ($node, $lreg) = @_;

   for my $group (keys %$lreg) {
      _change($group, $lreg->{$group}, []);
   }
}
198
199 =item $guard = grp_reg $group, $port
200
201 Register the given (local!) port in the named global group C<$group>.
202
203 The port will be unregistered automatically when the port is destroyed.
204
205 When not called in void context, then a guard object will be returned that
206 will also cause the name to be unregistered when destroyed.
207
208 =cut
209
# unregister local port
#
# Removes $port from the local and global registry of $group, notifies the
# group monitors and sends a reg0 message to every rendezvous port in %port.
#
# This sub is reachable twice for one registration: from the port monitor
# installed by grp_reg and from the guard grp_reg returns. The monitor entry
# in %lmon marks an active registration; when it is already gone the port
# has been unregistered before and nothing is done, so monitors and peers do
# not receive a second, spurious removal.
sub unregister {
   my ($port, $group) = @_;

   delete $lmon{"$group\x00$port"}
      or return;

   @{ $lreg{$group} } = grep { $_ ne $port } @{ $lreg{$group} };

   _change($group, [], [$port]);

   snd($_, reg0 => $group, $port)
      for values %port;
}
222
# register local port
#
# Croaks unless $port is local, and croaks when $port is already listed in
# the local registry for $group. Otherwise installs a port monitor that
# unregisters the port, records the port locally, sends reg1 to every
# rendezvous port in %port and updates the global registry. In non-void
# context a guard is returned that unregisters the port when destroyed.
sub grp_reg($$) {
   my ($group, $port) = @_;

   Carp::croak("AnyEvent::MP::Global::grp_reg can only be called for local ports, caught")
      unless port_is_local($port);

   for my $member (@{ $lreg{$group} }) {
      Carp::croak("'$group': group already registered, cannot register a second time")
         if $member eq $port;
   }

   $lmon{"$group\x00$port"} = mon($port, sub { unregister($port, $group) });
   push @{ $lreg{$group} }, $port;

   snd($_, reg1 => $group, $port)
      for values %port;

   _change($group, [$port], []);

   defined wantarray && AnyEvent::Util::guard(sub { unregister($port, $group) })
}
243
244 =item $ports = grp_get $group
245
246 Returns all the ports currently registered to the given group (as
247 read-only(!) array reference). When the group has no registered members,
248 return C<undef>.
249
250 =cut
251
# Returns an arrayref with the port IDs currently registered in the given
# group, or undef when the group has no members.
#
# The registry entry is looked up first and only dereferenced when it
# exists: calling keys on %{ $greg{...} } directly would autovivify an empty
# entry in %greg for every unknown group that is merely queried.
sub grp_get($) {
   my $members = $greg{$_[0]};

   my @ports = $members ? keys %$members : ();

   @ports ? \@ports : undef
}
256
257 =item $guard = grp_mon $group, $callback->($ports, $add, $del)
258
259 Installs a monitor on the given group. Each time there is a change it
260 will be called with the current group members as an arrayref as the
261 first argument. The second argument only contains ports added, the third
262 argument only ports removed.
263
264 Unlike C<grp_get>, all three arguments will always be array-refs, even if
265 the array is empty. None of the arrays may be modified in any way.
266
267 The first invocation will be with the first two arguments set to the
268 current members, as if all of them were just added, but only when the
269 group is actually non-empty.
270
271 Optionally returns a guard object that uninstalls the watcher when it is
272 destroyed.
273
274 =cut
275
# Install $cb as a monitor for group $grp.
#
# Registration is deferred via AnyEvent::MP::Kernel::delay. When the deferred
# code runs, the callback is appended to $gmon{$grp} and, if the group
# currently has members, called once with (members, members, []). In
# non-void context a guard is returned; destroying it removes the callback
# from the group's monitor list and clears $cb, so a still-pending deferred
# registration becomes a no-op.
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   AnyEvent::MP::Kernel::delay(sub {
      # the guard was destroyed before we got here
      $cb or return;

      push @{ $gmon{$grp} }, $cb;

      # initial notification, only for non-empty groups
      my $members = grp_get($grp)
         or return;
      $cb->($members, $members, []);
   });

   defined wantarray && AnyEvent::Util::guard(sub {
      my @remaining = grep { $_ != $cb } @{ delete $gmon{$grp} };

      # keep the group's entry only while monitors are left
      $gmon{$grp} = \@remaining
         if @remaining;

      undef $cb;
   })
}
292
# Set up our rendezvous port on $node, unless one already exists in %port or
# $node is our own node. The remote side is started by spawning
# AnyEvent::MP::Global::connect there; afterwards we send it our listener
# addresses, the node addresses we know, our local registry and "setup".
sub start_node {
   my ($node) = @_;

   return if exists($port{$node})
          or $node eq $NODE; # do not connect to ourselves

   # establish connection
   my $remote = spawn($node, "AnyEvent::MP::Global::connect", $GLOBAL_VERSION, $NODE);
   $port{$node} = $remote;

   # when the remote port goes away, forget it and the node's registrations
   mon($remote, sub {
      unreg_groups($node);
      delete $port{$node};
   });

   snd($remote, addr => $AnyEvent::MP::Kernel::LISTENER);

   if (%addr) {
      snd($remote, nodes => \%addr);
   }

   if (%lreg) {
      snd($remote, set => \%lreg);
   }

   snd($remote, "setup"); # tell the other side that we are in business now
}
312
# other nodes connect via this
#
# Called with the protocol version and node name of the connecting node
# (see the spawn call in start_node). Dies on a version mismatch (integer
# parts are compared), monitors the remote node and installs the message
# handlers of the rendezvous port on $SELF.
sub connect {
   my ($version, $node) = @_;

   die "node version mismatch (requested $version; we have $GLOBAL_VERSION)"
      unless int($version) == int($GLOBAL_VERSION);

   # monitor them, silently die
   mon($node, psub(sub {
      delete $SEEDME{$node};
      kil($SELF);
   }));

   my @handlers = (
      # the other side finished its initial messages
      setup => sub {
         for my $on_setup (values %ON_SETUP) {
            $on_setup->($node);
         }
      },

      # the other side tells us its own addresses
      addr => sub {
         my ($addresses) = @_;

         $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
         $addr{$node} = $addresses;

         # delay broadcast by a random amount, to avoid nodes connecting to each other
         # at the same time.
         after(1 + rand(2), sub {
            for my $slave (keys %SEEDME) {
               snd($port{$slave}, nodes => { $node => $addresses });
            }
         });
      },

      # the other side tells us about nodes (id => addresses) it knows
      nodes => sub {
         my ($kv) = @_;

         use JSON::XS;#d#
         my $kv_txt = JSON::XS->new->encode ($kv);#d#
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

         while (my ($id, $addresses) = each %$kv) {
            my $peer = AnyEvent::MP::Kernel::add_node($id);
            $peer->connect (@$addresses);
            start_node($id);
         }
      },

      # the other side sends its registry (group => ports)
      set => sub {
         my ($remote_reg) = @_;

         set_groups($node, $remote_reg);
      },

      # the other side asks for the addresses of another node
      find => sub {
         my ($othernode) = @_;

         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");

         if ($addr{$othernode}) {
            snd($port{$node}, nodes => { $othernode => $addr{$othernode} });
         }
      },

      # a port was removed from (reg0) or added to (reg1) a group
      reg0 => sub {
         my ($group, $port) = @_;

         _change($group, [], [$port]);
      },
      reg1 => sub {
         my ($group, $port) = @_;

         _change($group, [$port], []);
      },

      # some node asks us to provide network updates
      seedme0 => sub {
         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to NOT seed it.");#d#
         delete $SEEDME{$node};
      },
      seedme1 => sub {
         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to seed it.");#d#
         $SEEDME{$node} = undef;

         # for good measure
         if (%addr) {
            snd($port{$node}, nodes => \%addr);
         }
      },
   );

   rcv($SELF, @handlers);
}
387
# Make the given node our master. Does nothing when it already is the
# master. A previous master that is still up is sent "seedme0", the new one
# (if any) is sent "seedme1"; a false argument leaves us without a master
# and only logs that fact.
sub set_master($) {
   my ($new_master) = @_;

   return if $MASTER eq $new_master;

   # tell the old master that we no longer need its updates
   if ($MASTER && node_is_up($MASTER)) {
      snd($port{$MASTER}, "seedme0");
   }

   $MASTER = $new_master;

   if (!$MASTER) {
      $AnyEvent::MP::Kernel::WARN->(1, "no contact to any other node, cannot seed.");
   } else {
      snd($port{$MASTER}, "seedme1");
      $AnyEvent::MP::Kernel::WARN->(7, "selected new master: $MASTER.");
   }
}
403
# Node up/down handler: called with a node name and a flag telling whether
# the node came up (true) or went down (false). Maintains $nodecnt, starts
# the global service on new nodes, (re)selects the master and cleans up
# after nodes that went away.
sub mon_node {
   my ($node, $is_up) = @_;

   if ($is_up) {
      ++$nodecnt;
      start_node($node);

      if (node_is_seed($node)) {
         # a seed always beats a non-seed
         my $switch = 1;

         if (node_is_seed($MASTER)) {
            my @live_seeds = up_seeds();

            # switching here with lower chance roughly hopefully still gives us
            # an equal selection.
            $switch = 1 < rand @live_seeds;
         }

         set_master($node)
            if $switch;
      }
   }

   # select a new(?) master, if required
   unless ($MASTER and node_is_up($MASTER)) {
      my @live_seeds = up_seeds();

      if (@live_seeds) {
         set_master($live_seeds[rand @live_seeds]);
      } else {
         # select "last" non-seed node
         my @candidates = up_nodes();
         my @sorted     = sort @candidates;
         set_master($sorted[-1]);
      }
   }

   if (!$is_up) {
      --$nodecnt;

      # no nodes left at all: go looking for seeds right away
      more_seeding()
         unless $nodecnt;

      unreg_groups($node);

      # forget about the node
      delete $addr{$node};

      # ask our master for quick recovery
      if ($MASTER) {
         snd($port{$MASTER}, find => $node);
      }
   }
}
449
# treat every node that is already up as if it had just come up ...
for my $node (up_nodes()) {
   mon_node($node, 1);
}

# ... and pass mon_node to mon_nodes as the node monitoring callback
mon_nodes(\&mon_node);
454
455 =back
456
457 =head1 SEE ALSO
458
459 L<AnyEvent::MP>.
460
461 =head1 AUTHOR
462
463 Marc Lehmann <schmorp@schmorp.de>
464 http://home.schmorp.de/
465
466 =cut
467
468 1
469