ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.26
Committed: Wed Sep 9 01:47:01 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.25: +67 -18 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network, if possible, and tries to
12 ensure that we are connected to at least one other node.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyways.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31
32 use AnyEvent ();
33 use AnyEvent::Util ();
34
35 use AnyEvent::MP;
36 use AnyEvent::MP::Kernel;
37 use AnyEvent::MP::Transport ();
38
39 use base "Exporter";
40
41 our @EXPORT = qw(
42 grp_reg
43 grp_get
44 grp_mon
45 );
46
47 our $VERSION = $AnyEvent::MP::VERSION;
48
49 our %addr; # port ID => [address...] mapping
50
51 our %port; # our rendezvous port on the other side
52 our %lreg; # local registry, name => [pid...]
53 our %lmon; # local registry monitoring name,pid => mon
54 our %greg; # global regstry, name => pid => undef
55 our %gmon; # group monitoring, group => [$cb...]
56
57 our $nodecnt;
58
59 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
60
61 #############################################################################
62 # seednodes
63
64 our $MASTER; # our current master (which we regularly query for net updates)
65
66 our %SEEDME; # $node => $port
67 our @SEEDS;
68 our %SEEDS; # just to check whether a seed is a seed
69 our %SEED_CONNECT;
70 our $SEED_WATCHER;
71
72 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
73 my $peer = $_[0]{local_greeting}{peeraddr};
74 return unless exists $SEEDS{$peer};
75 $SEED_CONNECT{$peer} = 2;
76 };
77
78 push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
79 my $peer = $_[0]{local_greeting}{peeraddr};
80 return unless exists $SEEDS{$peer};
81 $SEEDS{$peer} = $_[0]{remote_node};
82 };
83
84 push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
85 delete $SEED_CONNECT{$_[0]{local_greeting}{peeraddr}};
86
87 # check if we contacted ourselves, so nuke this seed
88 if (exists $_[0]{seed} && $_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
89 # $AnyEvent::MP::Kernel::WARN->(0,"avoiding seed $_[0]{seed}\n");#d#
90 delete $SEEDS{$_[0]{seed}};
91 }
92 };
93
94 sub seed_connect {
95 my ($seed) = @_;
96
97 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
98 or Carp::croak "$seed: unparsable seed address";
99
100 return if $SEED_CONNECT{$seed};
101
102 $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
103
104 # ughhh
105 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect $host, $port,
106 seed => $seed,
107 sub {
108 $SEED_CONNECT{$seed} = 1;
109 },
110 ;
111 }
112
113 sub more_seeding {
114 my $int = List::Util::max 1,
115 $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
116 * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
117 - rand;
118
119 $SEED_WATCHER = AE::timer $int, 0, \&more_seeding;
120
121 @SEEDS = keys %SEEDS unless @SEEDS;
122 return unless @SEEDS;
123
124 seed_connect splice @SEEDS, rand @SEEDS, 1;
125 }
126
127 sub set_seeds(@) {
128 @SEEDS{@_} = ();
129
130 $SEED_WATCHER ||= AE::timer 5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding;
131
132 for (1 .. keys %SEEDS) {
133 after 0.100 * rand, \&more_seeding;
134 }
135 }
136
137 sub up_seeds() {
138 grep node_is_up $_, values %SEEDS
139 }
140
141 sub node_is_seed($) {
142 grep $_ eq $_[0], grep defined, values %SEEDS
143 }
144
145 # returns all (up) seed nodes, or all nodes if no seednodes are up/known
146 sub route_nodes {
147 my @seeds = up_seeds;
148 @seeds = up_nodes unless @seeds;
149 @seeds
150 }
151
152 #############################################################################
153
154 sub _change {
155 my ($group, $add, $del) = @_;
156
157 my $kv = $greg{$group} ||= {};
158
159 delete @$kv{@$del};
160 @$kv{@$add} = ();
161
162 my $ports = [keys %$kv];
163 $_->($ports, $add, $del) for @{ $gmon{$group} };
164 }
165
166 sub unreg_groups($) {
167 my ($node) = @_;
168
169 my $qr = qr/^\Q$node\E(?:#|$)/;
170 my @del;
171
172 while (my ($group, $ports) = each %greg) {
173 @del = grep /$qr/, keys %$ports;
174 _change $group, [], \@del
175 if @del;
176 }
177 }
178
179 sub set_groups($$) {
180 my ($node, $lreg) = @_;
181
182 while (my ($k, $v) = each %$lreg) {
183 _change $k, $v, [];
184 }
185 }
186
187 =item $guard = grp_reg $group, $port
188
189 Register the given (local!) port in the named global group C<$group>.
190
191 The port will be unregistered automatically when the port is destroyed.
192
193 When not called in void context, then a guard object will be returned that
194 will also cause the name to be unregistered when destroyed.
195
196 =cut
197
198 # unregister local port
199 sub unregister {
200 my ($port, $group) = @_;
201
202 delete $lmon{"$group\x00$port"};
203 @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };
204
205 _change $group, [], [$port];
206
207 snd $_, reg0 => $group, $port
208 for values %port;
209 }
210
211 # register local port
212 sub grp_reg($$) {
213 my ($group, $port) = @_;
214
215 port_is_local $port
216 or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";
217
218 grep $_ eq $port, @{ $lreg{$group} }
219 and Carp::croak "'$group': group already registered, cannot register a second time";
220
221 $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
222 push @{ $lreg{$group} }, $port;
223
224 snd $_, reg1 => $group, $port
225 for values %port;
226
227 _change $group, [$port], [];
228
229 wantarray && AnyEvent::Util::guard { unregister $port, $group }
230 }
231
232 =item $ports = grp_get $group
233
234 Returns all the ports currently registered to the given group (as
235 read-only(!) array reference). When the group has no registered members,
236 return C<undef>.
237
238 =cut
239
240 sub grp_get($) {
241 my @ports = keys %{ $greg{$_[0]} };
242 @ports ? \@ports : undef
243 }
244
245 =item $guard = grp_mon $group, $callback->($ports, $add, $del)
246
247 Installs a monitor on the given group. Each time there is a change it
248 will be called with the current group members as an arrayref as the
249 first argument. The second argument only contains ports added, the third
250 argument only ports removed.
251
252 Unlike C<grp_get>, all three arguments will always be array-refs, even if
253 the array is empty. None of the arrays must be modified in any way.
254
255 The first invocation will be with the first two arguments set to the
256 current members, as if all of them were just added, but only when the
257 group is atcually non-empty.
258
259 Optionally returns a guard object that uninstalls the watcher when it is
260 destroyed.
261
262 =cut
263
264 sub grp_mon($$) {
265 my ($grp, $cb) = @_;
266
267 AnyEvent::MP::Kernel::delay sub {
268 return unless $cb;
269
270 push @{ $gmon{$grp} }, $cb;
271 $cb->(((grp_get $grp) || return) x 2, []);
272 };
273
274 defined wantarray && AnyEvent::Util::::guard {
275 my @mon = grep $_ != $cb, @{ delete $gmon{$grp} };
276 $gmon{$grp} = \@mon if @mon;
277 undef $cb;
278 }
279 }
280
281 sub start_node {
282 my ($node) = @_;
283
284 return if exists $port{$node};
285 return if $node eq $NODE; # do not connect to ourselves
286
287 # establish connection
288 my $port = $port{$node} = spawn $node, "AnyEvent::MP::Global::connect", 0, $NODE;
289
290 mon $port, sub {
291 unreg_groups $node;
292 delete $port{$node};
293 };
294
295 snd $port, addr => $AnyEvent::MP::Kernel::LISTENER;
296 snd $port, nodes => \%addr if %addr;
297 snd $port, set => \%lreg if %lreg;
298 }
299
300 # other nodes connect via this
301 sub connect {
302 my ($version, $node) = @_;
303
304 # monitor them, silently die
305 mon $node, psub {
306 delete $SEEDME{$node};
307 kil $SELF;
308 };
309
310 rcv $SELF,
311 addr => sub {
312 my $addresses = shift;
313 $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
314 $addr{$node} = $addresses;
315
316 for my $node (values %SEEDME) {
317 my $port = $port{$node};
318 snd $port, nodes => { $node => $addresses };
319 }
320 },
321 nodes => sub {
322 my ($kv) = @_;
323
324 use JSON::XS;#d#
325 my $kv_txt = JSON::XS->new->encode ($kv);#d#
326 $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#
327
328 while (my ($id, $addresses) = each %$kv) {
329 my $node = AnyEvent::MP::Kernel::add_node $id;
330 $node->connect (@$addresses);
331 start_node $id;
332 }
333 },
334 find => sub {
335 my ($othernode) = @_;
336
337 $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
338 snd $port{$node}, nodes => { $othernode => $addr{$othernode} }
339 if $addr{$othernode};
340 },
341 set => sub {
342 set_groups $node, shift;
343 },
344 reg0 => sub {
345 _change $_[0], [], [$_[1]];
346 },
347 reg1 => sub {
348 _change $_[0], [$_[1]], [];
349 },
350
351 # some node asks us to provide network updates
352 seedme0 => sub {
353 $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to NOT seed it.");#d#
354 delete $SEEDME{$node};
355 },
356 seedme1 => sub {
357 $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to seed it.");#d#
358 $SEEDME{$node} = ();
359
360 # for good measure
361 snd $port{$node}, nodes => \%addr if %addr;
362 },
363 ;
364 }
365
366 sub mon_node {
367 my ($node, $is_up) = @_;
368
369 if ($is_up) {
370 ++$nodecnt;
371 start_node $node;
372 }
373
374 # now select a new(?) master
375 my $master;
376
377 if (my @SEEDS = up_seeds) {
378 # switching here with lower chance roughly hopefully still gives us
379 # an equal selection.
380 $master = node_is_up $MASTER && node_is_seed $MASTER && 1 < rand @SEEDS
381 ? $MASTER
382 : $SEEDS[rand @SEEDS];
383 } else {
384 # select "last" non-seed node
385 $master = (sort +up_nodes)[-1];
386 #TODO maybe avoid listener-less nodes?
387 }
388
389 if ($MASTER ne $master) {
390 snd $port{$MASTER}, "seedme0"
391 if $MASTER && node_is_up $MASTER;
392
393 $MASTER = $master;
394 if ($MASTER) {
395 snd $port{$MASTER}, "seedme1";
396 $AnyEvent::MP::Kernel::WARN->(7, "selected new master: $MASTER.");
397 } else {
398 $AnyEvent::MP::Kernel::WARN->(1, "no contact to any other node.");
399 }
400 }
401
402 unless ($is_up) {
403 --$nodecnt;
404 more_seeding unless $nodecnt;
405 unreg_groups $node;
406
407 # forget about the node
408 delete $addr{$node};
409
410 # ask our master for quick recovery
411 snd $port{$MASTER}, find => $node
412 if $MASTER;
413 }
414 }
415
416 mon_node $_, 1
417 for up_nodes;
418
419 mon_nodes \&mon_node;
420
421 =back
422
423 =head1 SEE ALSO
424
425 L<AnyEvent::MP>.
426
427 =head1 AUTHOR
428
429 Marc Lehmann <schmorp@schmorp.de>
430 http://home.schmorp.de/
431
432 =cut
433
434 1
435