ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.33
Committed: Wed Nov 4 16:38:44 2009 UTC (14 years, 6 months ago) by elmex
Branch: MAIN
Changes since 1.32: +1 -1 lines
Log Message:
also return guard in scalar context.

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network, if possible, and tries to
12 ensure that we are connected to at least one other node.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyways.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31
32 use AnyEvent ();
33 use AnyEvent::Util ();
34
35 use AnyEvent::MP;
36 use AnyEvent::MP::Kernel;
37 use AnyEvent::MP::Transport ();
38
39 use base "Exporter";
40
41 our @EXPORT = qw(
42 grp_reg
43 grp_get
44 grp_mon
45 );
46
47 our $GLOBAL_VERSION = 0; # protocol version; connect() dies when the integer part differs
48
49 our %addr; # node ID => [address...] mapping
50
51 our %port; # node ID => our rendezvous port on that node
52 our %lreg; # local registry, name => [pid...]
53 our %lmon; # local registry monitoring, "name\0pid" => mon
54 our %greg; # global registry, name => pid => undef
55 our %gmon; # group monitoring, group => [$cb...]
56
57 our $nodecnt; # number of nodes currently up, as counted by mon_node
58
59 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
60
61 #############################################################################
62 # seednodes
63
64 our $MASTER; # our current master (which we regularly query for net updates)
65
66 our %SEEDME; # $node => undef, the set of nodes that sent us "seedme1"
67 our @SEEDS; # seed addresses still to be tried; refilled from %SEEDS by more_seeding
68 our %SEEDS; # seed address => node ID of that seed (undef while unknown)
69 our %SEED_CONNECT; # seed address => value from mp_connect, 1 (set in its callback) or 2 (set by HOOK_CONNECT)
70 our $SEED_WATCHER; # timer that calls more_seeding
71
# Transport hooks that track the state of our seed connections. Each hook
# receives the transport object as its first argument and only acts when
# the peer address of that transport is one of our seed addresses.

# connect hook: flag the seed address with state 2 in %SEED_CONNECT
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   my ($transport) = @_;
   my $addr = $transport->{local_greeting}{peeraddr};

   exists $SEEDS{$addr}
      or return;

   $SEED_CONNECT{$addr} = 2;
};

# greeting hook: tentatively remember which node answers at a seed address
push @AnyEvent::MP::Transport::HOOK_GREETING, sub {
   my ($transport) = @_;
   my $addr = $transport->{local_greeting}{peeraddr};

   exists $SEEDS{$addr}
      or return;

   # the remote node name is untrusted data at this point. this is
   # hopefully ok, as it is overridden on successful connects, and it
   # can at most be used for DOSing, which is easy when you can do MITM.
   # an already known node name is never replaced here.
   $SEEDS{$addr} ||= $transport->{remote_node};
};

# connected hook: now the node name unconditionally replaces the stored one
push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
   my ($transport) = @_;
   my $addr = $transport->{local_greeting}{peeraddr};

   exists $SEEDS{$addr}
      or return;

   $SEEDS{$addr} = $transport->{remote_node};
};

# destroy hook: forget the connection state for this peer address
push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
   my ($transport) = @_;

   delete $SEED_CONNECT{ $transport->{local_greeting}{peeraddr} };

   # check if we contacted ourselves, so nuke this seed
   # $AnyEvent::MP::Kernel::WARN->(0,"avoiding seed $_[0]{seed}\n");#d#
   delete $SEEDS{ $transport->{seed} }
      if exists $transport->{seed}
         && $transport->{remote_node} eq $AnyEvent::MP::Kernel::NODE;
};
103
# Start a connection attempt to the seed address $seed ("host:port").
#
# Croaks when the address cannot be parsed. Does nothing when
# %SEED_CONNECT already holds a true value for this seed, or when the node
# recorded for the seed in %SEEDS is up.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport($seed)
      or Carp::croak("$seed: unparsable seed address");

   return if $SEED_CONNECT{$seed};
   return if defined($SEEDS{$seed}) && node_is_up($SEEDS{$seed});

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   # callback handed to mp_connect; it records state 1 for this seed
   my $mark_seed = sub {
      $SEED_CONNECT{$seed} = 1;
   };

   # ughhh
   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect(
      $host, $port,
      seed => $seed,
      $mark_seed,
   );
}
123
# Timer callback: re-arm $SEED_WATCHER and try to connect to one randomly
# chosen seed that has not been tried yet in the current round.
#
# The delay until the next run is the configured connect_interval,
# multiplied by the number of entries in %AnyEvent::MP::Kernel::NODE when
# $nodecnt is true, minus a random fraction, but never less than 1.
sub more_seeding {
   my $scale = $nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1;

   my $delay = List::Util::max(
      1,
      $AnyEvent::MP::Kernel::CONFIG->{connect_interval} * $scale - rand(),
   );

   $SEED_WATCHER = AE::timer $delay, 0, \&more_seeding;

   # start a new round once every seed has been tried
   @SEEDS = keys %SEEDS
      unless @SEEDS;

   return unless @SEEDS;

   # remove one random seed from the round and contact it
   my ($seed) = splice @SEEDS, rand(scalar @SEEDS), 1;
   seed_connect($seed);
}
137
# Register the given seed addresses. Each address is stored in %SEEDS with
# an undefined node ID (also resetting addresses that were already known),
# the seeding timer is created unless one exists, and more_seeding is
# scheduled once per known seed after a small random delay.
sub set_seeds(@) {
   $SEEDS{$_} = undef
      for @_;

   $SEED_WATCHER ||= AE::timer 5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding;

   for (1 .. scalar keys %SEEDS) {
      after(0.100 * rand(), \&more_seeding);
   }
}
146
# Returns the node IDs stored in %SEEDS for which node_is_up is true
# (their count in scalar context).
sub up_seeds() {
   grep { node_is_up($_) } values %SEEDS
}
150
# True when the given node ID is recorded as the node behind at least one
# seed address. Seeds whose node ID is still undefined are ignored.
sub node_is_seed($) {
   my ($candidate) = @_;

   grep { defined($_) && $_ eq $candidate } values %SEEDS
}
154
# returns all (up) seed nodes, or all nodes if no seednodes are up/known
sub route_nodes {
   my @route = up_seeds();

   # fall back to every node that is up
   @route = up_nodes()
      unless @route;

   @route
}
161
162 #############################################################################
163
# Apply a membership change to the global registry entry of $group and
# notify the group's monitors.
#
# $add and $del are array references of port IDs. Removals are applied
# before additions. Every monitor callback is invoked with the resulting
# member list (one array shared by all callbacks), $add and $del.
sub _change {
   my ($group, $add, $del) = @_;

   my $members = ($greg{$group} ||= {});

   delete $members->{$_}
      for @$del;
   $members->{$_} = undef
      for @$add;

   my @current = keys %$members;

   for my $monitor (@{ $gmon{$group} }) {
      $monitor->(\@current, $add, $del);
   }
}
175
# Remove every port that belongs to $node from all global groups.
#
# A port belongs to $node when its ID is exactly the node ID or starts
# with the node ID followed by "#". Groups that lose members get their
# monitors notified through _change.
sub unreg_groups($) {
   my ($node) = @_;

   my $on_node = qr/^\Q$node\E(?:#|$)/;

   # walk a snapshot of the group names instead of using each(): _change
   # runs monitor callbacks, and those may add keys to %greg, which is not
   # safe while an each() iteration over the same hash is in progress.
   for my $group (keys %greg) {
      my $ports = $greg{$group}
         or next;

      # a fresh array per group, so a callback that keeps its $del
      # argument does not see it overwritten by the next group.
      my @gone = grep { $_ =~ $on_node } keys %$ports;

      _change($group, [], \@gone)
         if @gone;
   }
}
188
# Merge a registry received from another node into the global registry.
#
# $lreg maps group names to array references of port IDs; every listed
# port is added to its group. The $node argument is currently unused.
sub set_groups($$) {
   my ($node, $lreg) = @_;

   for my $group (keys %$lreg) {
      _change($group, $lreg->{$group}, []);
   }
}
196
197 =item $guard = grp_reg $group, $port
198
199 Register the given (local!) port in the named global group C<$group>.
200
201 The port will be unregistered automatically when the port is destroyed.
202
203 When not called in void context, then a guard object will be returned that
204 will also cause the name to be unregistered when destroyed.
205
206 =cut
207
# unregister local port
#
# Drops the monitor stored for ($group, $port), removes the port from the
# local and the global registry and sends "reg0" to every port in %port.
sub unregister {
   my ($port, $group) = @_;

   delete $lmon{"$group\x00$port"};

   # filter the local member list in place
   my $members = ($lreg{$group} ||= []);
   @$members = grep { $_ ne $port } @$members;

   _change($group, [], [$port]);

   for my $peer (values %port) {
      snd($peer, reg0 => $group, $port);
   }
}
220
# register local port
#
# Croaks when $port is not local or is already a member of $group.
# Otherwise the port is monitored (so it gets unregistered when the monitor
# fires), added to the local and global registry, and "reg1" is sent to
# every port in %port. Unless called in void context, a guard is returned
# that unregisters the port when destroyed.
sub grp_reg($$) {
   my ($group, $port) = @_;

   Carp::croak("AnyEvent::MP::Global::grp_reg can only be called for local ports, caught")
      unless port_is_local($port);

   Carp::croak("'$group': group already registered, cannot register a second time")
      if grep { $_ eq $port } @{ $lreg{$group} };

   my $cleanup = sub { unregister($port, $group) };

   $lmon{"$group\x00$port"} = mon($port, $cleanup);
   push @{ $lreg{$group} }, $port;

   for my $peer (values %port) {
      snd($peer, reg1 => $group, $port);
   }

   _change($group, [$port], []);

   defined wantarray && AnyEvent::Util::guard { unregister($port, $group) }
}
241
242 =item $ports = grp_get $group
243
244 Returns all the ports currently registered to the given group (as
245 read-only(!) array reference). When the group has no registered members,
246 return C<undef>.
247
248 =cut
249
# Returns an array reference with the ports currently registered in the
# given group, or undef when the group has no members.
sub grp_get($) {
   # look the group up without dereferencing a missing entry:
   # keys %{ $greg{...} } would autovivify an empty hash in %greg for
   # every unknown group name that is ever queried.
   my $members = $greg{$_[0]}
      or return undef;

   my @ports = keys %$members;
   @ports ? \@ports : undef
}
254
255 =item $guard = grp_mon $group, $callback->($ports, $add, $del)
256
257 Installs a monitor on the given group. Each time there is a change it
258 will be called with the current group members as an arrayref as the
259 first argument. The second argument only contains ports added, the third
260 argument only ports removed.
261
262 Unlike C<grp_get>, all three arguments will always be array-refs, even if
263 the array is empty. None of the arrays must be modified in any way.
264
265 The first invocation will be with the first two arguments set to the
266 current members, as if all of them were just added, but only when the
267 group is actually non-empty.
268
269 Optionally returns a guard object that uninstalls the watcher when it is
270 destroyed.
271
272 =cut
273
# Install $cb as a monitor for group $grp.
#
# Installation is deferred through AnyEvent::MP::Kernel::delay. When the
# group is non-empty at that time, $cb is called once with the current
# members as both the "all" and the "added" list and an empty "deleted"
# list. Unless called in void context, a guard is returned that removes
# the monitor when destroyed.
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   AnyEvent::MP::Kernel::delay(sub {
      # the guard was already destroyed, so there is nothing to install
      return unless $cb;

      push @{ $gmon{$grp} }, $cb;

      my $ports = grp_get($grp)
         or return;

      $cb->($ports, $ports, []);
   });

   defined wantarray && AnyEvent::Util::guard {
      # $gmon{$grp} does not exist when the guard is destroyed before the
      # delayed installation ran. fall back to an empty list, as
      # dereferencing undef would die here and skip the undef $cb below,
      # which in turn would let the delayed code install the monitor.
      my @mon = grep { $_ != $cb } @{ (delete $gmon{$grp}) || [] };
      $gmon{$grp} = \@mon if @mon;
      undef $cb;
   }
}
290
# Set up our rendezvous port on $node, unless one exists already or $node
# is this node. The port is spawned with AnyEvent::MP::Global::connect as
# its init function, monitored, and then sent our listener addresses, the
# known node addresses and our local group registry.
sub start_node {
   my ($node) = @_;

   return if exists $port{$node};
   return if $node eq $NODE; # do not connect to ourselves

   # establish connection
   my $remote = spawn $node, "AnyEvent::MP::Global::connect", $GLOBAL_VERSION, $NODE;
   $port{$node} = $remote;

   # when the monitor fires, drop everything that node had registered
   mon $remote, sub {
      unreg_groups($node);
      delete $port{$node};
   };

   snd($remote, addr => $AnyEvent::MP::Kernel::LISTENER);
   snd($remote, nodes => \%addr) if %addr;
   snd($remote, set => \%lreg) if %lreg;
}
309
# other nodes connect via this
#
# Init function of the port that start_node spawns on this node. $version
# is the peer's $GLOBAL_VERSION and $node is the peer's node ID. Dies when
# the integer parts of the versions differ. Otherwise it monitors the peer
# and installs the handlers for the messages the peer sends to this port.
sub connect {
   my ($version, $node) = @_;

   (int $version) == (int $GLOBAL_VERSION)
      or die "node version mismatch (requested $version; we have $GLOBAL_VERSION)";

   # monitor them, silently die
   mon $node, psub {
      delete $SEEDME{$node};
      kil $SELF;
   };

   rcv $SELF,
      # the peer announces its own addresses
      addr => sub {
         my $addresses = shift;
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
         $addr{$node} = $addresses;

         # forward the peer's addresses to every node that asked us for
         # network updates. the loop variable must not be named $node:
         # that would shadow the peer and announce the addresses under
         # the subscriber's own ID instead of the peer's.
         for my $slave (keys %SEEDME) {
            snd $port{$slave}, nodes => { $node => $addresses };
         }
      },
      # the peer tells us about other nodes: node ID => [address...]
      nodes => sub {
         my ($kv) = @_;

         use JSON::XS;#d#
         my $kv_txt = JSON::XS->new->encode ($kv);#d#
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

         while (my ($id, $addresses) = each %$kv) {
            my $node = AnyEvent::MP::Kernel::add_node $id;
            $node->connect (@$addresses);
            start_node $id;
         }
      },
      # the peer asks for the addresses of another node
      find => sub {
         my ($othernode) = @_;

         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
         snd $port{$node}, nodes => { $othernode => $addr{$othernode} }
            if $addr{$othernode};
      },
      # the peer sends its complete local group registry
      set => sub {
         set_groups $node, shift;
      },
      # the peer unregistered (reg0) or registered (reg1) a single port
      reg0 => sub {
         _change $_[0], [], [$_[1]];
      },
      reg1 => sub {
         _change $_[0], [$_[1]], [];
      },

      # some node asks us to provide network updates
      seedme0 => sub {
         $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to NOT seed it.");#d#
         delete $SEEDME{$node};
      },
      seedme1 => sub {
         $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to seed it.");#d#
         $SEEDME{$node} = ();

         # for good measure
         snd $port{$node}, nodes => \%addr if %addr;
      },
   ;
}
378
# Make the given node our master. Does nothing when it already is. A
# previous master that is still up is sent "seedme0", the new one (if any)
# is sent "seedme1". A false argument means there is no master anymore.
sub set_master($) {
   my ($candidate) = @_;

   return if $MASTER eq $candidate;

   # tell the old master that we no longer want its updates
   if ($MASTER && node_is_up($MASTER)) {
      snd($port{$MASTER}, "seedme0");
   }

   $MASTER = $candidate;

   if (!$MASTER) {
      $AnyEvent::MP::Kernel::WARN->(1, "no contact to any other node.");
   } else {
      snd($port{$MASTER}, "seedme1");
      $AnyEvent::MP::Kernel::WARN->(7, "selected new master: $MASTER.");
   }
}
394
# Node monitor callback, called with a node ID and a flag that tells
# whether the node came up (true) or went down (false).
#
# For a node that came up, a rendezvous port is started and the node may
# become the new master. In both cases a master is (re)selected when the
# current one is missing or down. For a node that went down, its group
# registrations and addresses are dropped and the master is asked to find
# it again.
sub mon_node {
   my ($node, $is_up) = @_;

   if ($is_up) {
      ++$nodecnt;
      start_node($node);

      if (node_is_seed($node)) {
         # a seed always beats a non-seed
         my $take_over = 1;

         if (node_is_seed($MASTER)) {
            my @candidates = up_seeds();

            # switching here with lower chance roughly hopefully still
            # gives us an equal selection.
            $take_over = rand(scalar @candidates) > 1;
         }

         set_master($node)
            if $take_over;
      }
   }

   # select a new(?) master, if required
   if (!$MASTER || !node_is_up($MASTER)) {
      my @candidates = up_seeds();

      if (@candidates) {
         set_master($candidates[rand @candidates]);
      } else {
         # select "last" non-seed node
         my @sorted = sort { $a cmp $b } up_nodes();
         set_master($sorted[-1]);
      }
   }

   if (!$is_up) {
      --$nodecnt;
      more_seeding()
         unless $nodecnt;
      unreg_groups($node);

      # forget about the node
      delete $addr{$node};

      # ask our master for quick recovery
      snd($port{$MASTER}, find => $node)
         if $MASTER;
   }
}
440
# treat every node that is up right now as if it had just come up
for my $node (up_nodes) {
   mon_node($node, 1);
}

# register mon_node with mon_nodes
mon_nodes(\&mon_node);
445
446 =back
447
448 =head1 SEE ALSO
449
450 L<AnyEvent::MP>.
451
452 =head1 AUTHOR
453
454 Marc Lehmann <schmorp@schmorp.de>
455 http://home.schmorp.de/
456
457 =cut
458
459 1
460