… | |
… | |
59 | $AnyEvent::MP::Kernel::WARN->(7, "starting global service."); |
59 | $AnyEvent::MP::Kernel::WARN->(7, "starting global service."); |
60 | |
60 | |
61 | ############################################################################# |
61 | ############################################################################# |
62 | # seednodes |
62 | # seednodes |
63 | |
63 | |
|
|
64 | our $MASTER; # our current master (which we regularly query for net updates) |
|
|
65 | |
|
|
66 | our %SEEDME; # $node => $port |
64 | our @SEEDS; |
67 | our @SEEDS; |
65 | our %SEEDS; # just to check whether a seed is a seed |
68 | our %SEEDS; # just to check whether a seed is a seed |
66 | our %SEED_CONNECT; |
69 | our %SEED_CONNECT; |
67 | our $SEED_WATCHER; |
70 | our $SEED_WATCHER; |
68 | |
71 | |
… | |
… | |
129 | for (1 .. keys %SEEDS) { |
132 | for (1 .. keys %SEEDS) { |
130 | after 0.100 * rand, \&more_seeding; |
133 | after 0.100 * rand, \&more_seeding; |
131 | } |
134 | } |
132 | } |
135 | } |
133 | |
136 | |
|
|
137 | sub up_seeds() { |
|
|
138 | grep node_is_up $_, values %SEEDS |
|
|
139 | } |
|
|
140 | |
|
|
141 | sub node_is_seed($) { |
|
|
142 | grep $_ eq $_[0], grep defined, values %SEEDS |
|
|
143 | } |
|
|
144 | |
134 | # returns all (up) seed nodes, or all nodes if no seednodes are up/known |
145 | # returns all (up) seed nodes, or all nodes if no seednodes are up/known |
135 | sub _route_nodes { |
146 | sub route_nodes { |
136 | my @seeds = grep node_is_up $_, values %SEEDS; |
147 | my @seeds = up_seeds; |
137 | @seeds = up_nodes unless @seeds; |
148 | @seeds = up_nodes unless @seeds; |
138 | @seeds |
149 | @seeds |
139 | } |
150 | } |
140 | |
151 | |
141 | ############################################################################# |
152 | ############################################################################# |
… | |
… | |
289 | # other nodes connect via this |
300 | # other nodes connect via this |
290 | sub connect { |
301 | sub connect { |
291 | my ($version, $node) = @_; |
302 | my ($version, $node) = @_; |
292 | |
303 | |
293 | # monitor them, silently die |
304 | # monitor them, silently die |
294 | mon $node, psub { kil $SELF }; |
305 | mon $node, psub { |
|
|
306 | delete $SEEDME{$node}; |
|
|
307 | kil $SELF; |
|
|
308 | }; |
295 | |
309 | |
296 | rcv $SELF, |
310 | rcv $SELF, |
297 | addr => sub { |
311 | addr => sub { |
298 | my $addresses = shift; |
312 | my $addresses = shift; |
299 | $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses)."); |
313 | $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses)."); |
300 | $addr{$node} = $addresses; |
314 | $addr{$node} = $addresses; |
301 | |
315 | |
302 | # to help listener-less nodes, we broadcast new addresses to them unconditionally |
316 | for my $node (values %SEEDME) { |
303 | #TODO: should be done by a node finding out about a listener-less one |
317 | my $port = $port{$node}; |
304 | if (@$addresses) { |
|
|
305 | for my $other (values %AnyEvent::MP::Kernel::NODE) { |
|
|
306 | if ($other->{transport}) { |
|
|
307 | if ($addr{$other->{id}} && !@{ $addr{$other->{id}} }) { |
|
|
308 | $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node."); |
|
|
309 | snd $port{$other->{id}}, nodes => { $node => $addresses }; |
318 | snd $port, nodes => { $node => $addresses }; |
310 | } |
|
|
311 | } |
|
|
312 | } |
|
|
313 | } |
319 | } |
314 | }, |
320 | }, |
315 | nodes => sub { |
321 | nodes => sub { |
316 | my ($kv) = @_; |
322 | my ($kv) = @_; |
317 | |
323 | |
… | |
… | |
339 | _change $_[0], [], [$_[1]]; |
345 | _change $_[0], [], [$_[1]]; |
340 | }, |
346 | }, |
341 | reg1 => sub { |
347 | reg1 => sub { |
342 | _change $_[0], [$_[1]], []; |
348 | _change $_[0], [$_[1]], []; |
343 | }, |
349 | }, |
|
|
350 | |
|
|
351 | # some node asks us to provide network updates |
|
|
352 | seedme0 => sub { |
|
|
353 | $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to NOT seed it.");#d# |
|
|
354 | delete $SEEDME{$node}; |
|
|
355 | }, |
|
|
356 | seedme1 => sub { |
|
|
357 | $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to seed it.");#d# |
|
|
358 | $SEEDME{$node} = (); |
|
|
359 | |
|
|
360 | # for good measure |
|
|
361 | snd $port{$node}, nodes => \%addr if %addr; |
|
|
362 | }, |
344 | ; |
363 | ; |
345 | } |
364 | } |
346 | |
365 | |
347 | sub mon_node { |
366 | sub mon_node { |
348 | my ($node, $is_up) = @_; |
367 | my ($node, $is_up) = @_; |
349 | |
368 | |
350 | if ($is_up) { |
369 | if ($is_up) { |
351 | ++$nodecnt; |
370 | ++$nodecnt; |
352 | start_node $node; |
371 | start_node $node; |
|
|
372 | } |
|
|
373 | |
|
|
374 | # now select a new(?) master |
|
|
375 | my $master; |
|
|
376 | |
|
|
377 | if (my @SEEDS = up_seeds) { |
|
|
378 | # switching here with lower chance roughly hopefully still gives us |
|
|
379 | # an equal selection. |
|
|
380 | $master = node_is_up $MASTER && node_is_seed $MASTER && 1 < rand @SEEDS |
|
|
381 | ? $MASTER |
|
|
382 | : $SEEDS[rand @SEEDS]; |
353 | } else { |
383 | } else { |
|
|
384 | # select "last" non-seed node |
|
|
385 | $master = (sort +up_nodes)[-1]; |
|
|
386 | #TODO maybe avoid listener-less nodes? |
|
|
387 | } |
|
|
388 | |
|
|
389 | if ($MASTER ne $master) { |
|
|
390 | snd $port{$MASTER}, "seedme0" |
|
|
391 | if $MASTER && node_is_up $MASTER; |
|
|
392 | |
|
|
393 | $MASTER = $master; |
|
|
394 | if ($MASTER) { |
|
|
395 | snd $port{$MASTER}, "seedme1"; |
|
|
396 | $AnyEvent::MP::Kernel::WARN->(7, "selected new master: $MASTER."); |
|
|
397 | } else { |
|
|
398 | $AnyEvent::MP::Kernel::WARN->(1, "no contact to any other node."); |
|
|
399 | } |
|
|
400 | } |
|
|
401 | |
|
|
402 | unless ($is_up) { |
354 | --$nodecnt; |
403 | --$nodecnt; |
355 | more_seeding unless $nodecnt; |
404 | more_seeding unless $nodecnt; |
356 | unreg_groups $node; |
405 | unreg_groups $node; |
357 | |
406 | |
358 | # forget about the node |
407 | # forget about the node |
359 | delete $addr{$node}; |
408 | delete $addr{$node}; |
360 | # ask other nodes if they know the node |
409 | |
|
|
410 | # ask our master for quick recovery |
361 | snd $_, find => $node |
411 | snd $port{$MASTER}, find => $node |
362 | for grep $_, map $port{$_}, _route_nodes; |
412 | if $MASTER; |
363 | } |
413 | } |
364 | #warn "node<$node,$is_up>\n";#d# |
|
|
365 | } |
414 | } |
366 | |
415 | |
367 | mon_node $_, 1 |
416 | mon_node $_, 1 |
368 | for up_nodes; |
417 | for up_nodes; |
369 | |
418 | |