ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.28
Committed: Wed Sep 9 21:13:03 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.27: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network, if possible, and tries to
12 ensure that we are connected to at least one other node.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyways.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31
32 use AnyEvent ();
33 use AnyEvent::Util ();
34
35 use AnyEvent::MP;
36 use AnyEvent::MP::Kernel;
37 use AnyEvent::MP::Transport ();
38
39 use base "Exporter";
40
41 our @EXPORT = qw(
42 grp_reg
43 grp_get
44 grp_mon
45 );
46
47 our $VERSION = $AnyEvent::MP::VERSION;
48
49 our %addr; # port ID => [address...] mapping
50
51 our %port; # our rendezvous port on the other side
52 our %lreg; # local registry, name => [pid...]
53 our %lmon; # local registry monitoring name,pid => mon
54 our %greg; # global regstry, name => pid => undef
55 our %gmon; # group monitoring, group => [$cb...]
56
57 our $nodecnt;
58
59 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
60
61 #############################################################################
62 # seednodes
63
64 our $MASTER; # our current master (which we regularly query for net updates)
65
66 our %SEEDME; # $node => $port
67 our @SEEDS;
68 our %SEEDS; # just to check whether a seed is a seed
69 our %SEED_CONNECT;
70 our $SEED_WATCHER;
71
72 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
73 my $peer = $_[0]{local_greeting}{peeraddr};
74 return unless exists $SEEDS{$peer};
75 $SEED_CONNECT{$peer} = 2;
76 };
77
78 push @AnyEvent::MP::Transport::HOOK_GREETING, sub {
79 # we rely on untrusted data here (the remote node name)
80 # this is hopefully ok, as we override it on successful
81 # connects, and this can at most be used for DOSing,
82 # which is easy when you can do MITM.
83 my $peer = $_[0]{local_greeting}{peeraddr};
84 return unless exists $SEEDS{$peer};
85 $SEEDS{$peer} ||= $_[0]{remote_node};
86 };
87
88 push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
89 my $peer = $_[0]{local_greeting}{peeraddr};
90 return unless exists $SEEDS{$peer};
91 $SEEDS{$peer} = $_[0]{remote_node};
92 };
93
94 push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
95 delete $SEED_CONNECT{$_[0]{local_greeting}{peeraddr}};
96
97 # check if we contacted ourselves, so nuke this seed
98 if (exists $_[0]{seed} && $_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
99 # $AnyEvent::MP::Kernel::WARN->(0,"avoiding seed $_[0]{seed}\n");#d#
100 delete $SEEDS{$_[0]{seed}};
101 }
102 };
103
104 sub seed_connect {
105 my ($seed) = @_;
106
107 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
108 or Carp::croak "$seed: unparsable seed address";
109
110 return if $SEED_CONNECT{$seed};
111 return if defined $SEEDS{$seed} && node_is_up $SEEDS{$seed};
112
113 $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
114
115 # ughhh
116 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect $host, $port,
117 seed => $seed,
118 sub {
119 $SEED_CONNECT{$seed} = 1;
120 },
121 ;
122 }
123
124 sub more_seeding {
125 my $int = List::Util::max 1,
126 $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
127 * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
128 - rand;
129
130 $SEED_WATCHER = AE::timer $int, 0, \&more_seeding;
131
132 @SEEDS = keys %SEEDS unless @SEEDS;
133 return unless @SEEDS;
134
135 seed_connect splice @SEEDS, rand @SEEDS, 1;
136 }
137
138 sub set_seeds(@) {
139 @SEEDS{@_} = ();
140
141 $SEED_WATCHER ||= AE::timer 5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding;
142
143 for (1 .. keys %SEEDS) {
144 after 0.100 * rand, \&more_seeding;
145 }
146 }
147
148 sub up_seeds() {
149 grep node_is_up $_, values %SEEDS
150 }
151
152 sub node_is_seed($) {
153 grep $_ eq $_[0], grep defined, values %SEEDS
154 }
155
156 # returns all (up) seed nodes, or all nodes if no seednodes are up/known
157 sub route_nodes {
158 my @seeds = up_seeds;
159 @seeds = up_nodes unless @seeds;
160 @seeds
161 }
162
163 #############################################################################
164
165 sub _change {
166 my ($group, $add, $del) = @_;
167
168 my $kv = $greg{$group} ||= {};
169
170 delete @$kv{@$del};
171 @$kv{@$add} = ();
172
173 my $ports = [keys %$kv];
174 $_->($ports, $add, $del) for @{ $gmon{$group} };
175 }
176
177 sub unreg_groups($) {
178 my ($node) = @_;
179
180 my $qr = qr/^\Q$node\E(?:#|$)/;
181 my @del;
182
183 while (my ($group, $ports) = each %greg) {
184 @del = grep /$qr/, keys %$ports;
185 _change $group, [], \@del
186 if @del;
187 }
188 }
189
190 sub set_groups($$) {
191 my ($node, $lreg) = @_;
192
193 while (my ($k, $v) = each %$lreg) {
194 _change $k, $v, [];
195 }
196 }
197
198 =item $guard = grp_reg $group, $port
199
200 Register the given (local!) port in the named global group C<$group>.
201
202 The port will be unregistered automatically when the port is destroyed.
203
204 When not called in void context, then a guard object will be returned that
205 will also cause the name to be unregistered when destroyed.
206
207 =cut
208
209 # unregister local port
210 sub unregister {
211 my ($port, $group) = @_;
212
213 delete $lmon{"$group\x00$port"};
214 @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };
215
216 _change $group, [], [$port];
217
218 snd $_, reg0 => $group, $port
219 for values %port;
220 }
221
222 # register local port
223 sub grp_reg($$) {
224 my ($group, $port) = @_;
225
226 port_is_local $port
227 or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";
228
229 grep $_ eq $port, @{ $lreg{$group} }
230 and Carp::croak "'$group': group already registered, cannot register a second time";
231
232 $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
233 push @{ $lreg{$group} }, $port;
234
235 snd $_, reg1 => $group, $port
236 for values %port;
237
238 _change $group, [$port], [];
239
240 wantarray && AnyEvent::Util::guard { unregister $port, $group }
241 }
242
243 =item $ports = grp_get $group
244
245 Returns all the ports currently registered to the given group (as
246 read-only(!) array reference). When the group has no registered members,
247 return C<undef>.
248
249 =cut
250
251 sub grp_get($) {
252 my @ports = keys %{ $greg{$_[0]} };
253 @ports ? \@ports : undef
254 }
255
256 =item $guard = grp_mon $group, $callback->($ports, $add, $del)
257
258 Installs a monitor on the given group. Each time there is a change it
259 will be called with the current group members as an arrayref as the
260 first argument. The second argument only contains ports added, the third
261 argument only ports removed.
262
263 Unlike C<grp_get>, all three arguments will always be array-refs, even if
264 the array is empty. None of the arrays must be modified in any way.
265
266 The first invocation will be with the first two arguments set to the
267 current members, as if all of them were just added, but only when the
268 group is atcually non-empty.
269
270 Optionally returns a guard object that uninstalls the watcher when it is
271 destroyed.
272
273 =cut
274
275 sub grp_mon($$) {
276 my ($grp, $cb) = @_;
277
278 AnyEvent::MP::Kernel::delay sub {
279 return unless $cb;
280
281 push @{ $gmon{$grp} }, $cb;
282 $cb->(((grp_get $grp) || return) x 2, []);
283 };
284
285 defined wantarray && AnyEvent::Util::::guard {
286 my @mon = grep $_ != $cb, @{ delete $gmon{$grp} };
287 $gmon{$grp} = \@mon if @mon;
288 undef $cb;
289 }
290 }
291
292 sub start_node {
293 my ($node) = @_;
294
295 return if exists $port{$node};
296 return if $node eq $NODE; # do not connect to ourselves
297
298 # establish connection
299 my $port = $port{$node} = spawn $node, "AnyEvent::MP::Global::connect", 0, $NODE;
300
301 mon $port, sub {
302 unreg_groups $node;
303 delete $port{$node};
304 };
305
306 snd $port, addr => $AnyEvent::MP::Kernel::LISTENER;
307 snd $port, nodes => \%addr if %addr;
308 snd $port, set => \%lreg if %lreg;
309 }
310
311 # other nodes connect via this
312 sub connect {
313 my ($version, $node) = @_;
314
315 # monitor them, silently die
316 mon $node, psub {
317 delete $SEEDME{$node};
318 kil $SELF;
319 };
320
321 rcv $SELF,
322 addr => sub {
323 my $addresses = shift;
324 $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
325 $addr{$node} = $addresses;
326
327 for my $node (keys %SEEDME) {
328 my $port = $port{$node};
329 snd $port, nodes => { $node => $addresses };
330 }
331 },
332 nodes => sub {
333 my ($kv) = @_;
334
335 use JSON::XS;#d#
336 my $kv_txt = JSON::XS->new->encode ($kv);#d#
337 $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#
338
339 while (my ($id, $addresses) = each %$kv) {
340 my $node = AnyEvent::MP::Kernel::add_node $id;
341 $node->connect (@$addresses);
342 start_node $id;
343 }
344 },
345 find => sub {
346 my ($othernode) = @_;
347
348 $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
349 snd $port{$node}, nodes => { $othernode => $addr{$othernode} }
350 if $addr{$othernode};
351 },
352 set => sub {
353 set_groups $node, shift;
354 },
355 reg0 => sub {
356 _change $_[0], [], [$_[1]];
357 },
358 reg1 => sub {
359 _change $_[0], [$_[1]], [];
360 },
361
362 # some node asks us to provide network updates
363 seedme0 => sub {
364 $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to NOT seed it.");#d#
365 delete $SEEDME{$node};
366 },
367 seedme1 => sub {
368 $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to seed it.");#d#
369 $SEEDME{$node} = ();
370
371 # for good measure
372 snd $port{$node}, nodes => \%addr if %addr;
373 },
374 ;
375 }
376
377 sub mon_node {
378 my ($node, $is_up) = @_;
379
380 if ($is_up) {
381 ++$nodecnt;
382 start_node $node;
383 }
384
385 # now select a new(?) master
386 my $master;
387
388 if (my @SEEDS = up_seeds) {
389 # switching here with lower chance roughly hopefully still gives us
390 # an equal selection.
391 $master = node_is_up $MASTER && node_is_seed $MASTER && 1 < rand @SEEDS
392 ? $MASTER
393 : $SEEDS[rand @SEEDS];
394 } else {
395 # select "last" non-seed node
396 $master = (sort +up_nodes)[-1];
397 #TODO maybe avoid listener-less nodes?
398 }
399
400 if ($MASTER ne $master) {
401 snd $port{$MASTER}, "seedme0"
402 if $MASTER && node_is_up $MASTER;
403
404 $MASTER = $master;
405 if ($MASTER) {
406 snd $port{$MASTER}, "seedme1";
407 $AnyEvent::MP::Kernel::WARN->(7, "selected new master: $MASTER.");
408 } else {
409 $AnyEvent::MP::Kernel::WARN->(1, "no contact to any other node.");
410 }
411 }
412
413 unless ($is_up) {
414 --$nodecnt;
415 more_seeding unless $nodecnt;
416 unreg_groups $node;
417
418 # forget about the node
419 delete $addr{$node};
420
421 # ask our master for quick recovery
422 snd $port{$MASTER}, find => $node
423 if $MASTER;
424 }
425 }
426
427 mon_node $_, 1
428 for up_nodes;
429
430 mon_nodes \&mon_node;
431
432 =back
433
434 =head1 SEE ALSO
435
436 L<AnyEvent::MP>.
437
438 =head1 AUTHOR
439
440 Marc Lehmann <schmorp@schmorp.de>
441 http://home.schmorp.de/
442
443 =cut
444
445 1
446