ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Global.pm (file contents):
Revision 1.25 by root, Tue Sep 8 13:46:25 2009 UTC vs.
Revision 1.26 by root, Wed Sep 9 01:47:01 2009 UTC

59$AnyEvent::MP::Kernel::WARN->(7, "starting global service."); 59$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
60 60
61############################################################################# 61#############################################################################
62# seednodes 62# seednodes
63 63
64our $MASTER; # our current master (which we regularly query for net updates)
65
66our %SEEDME; # $node => $port
64our @SEEDS; 67our @SEEDS;
65our %SEEDS; # just to check whether a seed is a seed 68our %SEEDS; # just to check whether a seed is a seed
66our %SEED_CONNECT; 69our %SEED_CONNECT;
67our $SEED_WATCHER; 70our $SEED_WATCHER;
68 71
129 for (1 .. keys %SEEDS) { 132 for (1 .. keys %SEEDS) {
130 after 0.100 * rand, \&more_seeding; 133 after 0.100 * rand, \&more_seeding;
131 } 134 }
132} 135}
133 136
137sub up_seeds() {
138 grep node_is_up $_, values %SEEDS
139}
140
141sub node_is_seed($) {
142 grep $_ eq $_[0], grep defined, values %SEEDS
143}
144
134# returns all (up) seed nodes, or all nodes if no seednodes are up/known 145# returns all (up) seed nodes, or all nodes if no seednodes are up/known
135sub _route_nodes { 146sub route_nodes {
136 my @seeds = grep node_is_up $_, values %SEEDS; 147 my @seeds = up_seeds;
137 @seeds = up_nodes unless @seeds; 148 @seeds = up_nodes unless @seeds;
138 @seeds 149 @seeds
139} 150}
140 151
141############################################################################# 152#############################################################################
289# other nodes connect via this 300# other nodes connect via this
290sub connect { 301sub connect {
291 my ($version, $node) = @_; 302 my ($version, $node) = @_;
292 303
293 # monitor them, silently die 304 # monitor them, silently die
294 mon $node, psub { kil $SELF }; 305 mon $node, psub {
306 delete $SEEDME{$node};
307 kil $SELF;
308 };
295 309
296 rcv $SELF, 310 rcv $SELF,
297 addr => sub { 311 addr => sub {
298 my $addresses = shift; 312 my $addresses = shift;
299 $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses)."); 313 $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
300 $addr{$node} = $addresses; 314 $addr{$node} = $addresses;
301 315
302 # to help listener-less nodes, we broadcast new addresses to them unconditionally 316 for my $node (values %SEEDME) {
303 #TODO: should be done by a node finding out about a listener-less one 317 my $port = $port{$node};
304 if (@$addresses) {
305 for my $other (values %AnyEvent::MP::Kernel::NODE) {
306 if ($other->{transport}) {
307 if ($addr{$other->{id}} && !@{ $addr{$other->{id}} }) {
308 $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
309 snd $port{$other->{id}}, nodes => { $node => $addresses }; 318 snd $port, nodes => { $node => $addresses };
310 }
311 }
312 }
313 } 319 }
314 }, 320 },
315 nodes => sub { 321 nodes => sub {
316 my ($kv) = @_; 322 my ($kv) = @_;
317 323
339 _change $_[0], [], [$_[1]]; 345 _change $_[0], [], [$_[1]];
340 }, 346 },
341 reg1 => sub { 347 reg1 => sub {
342 _change $_[0], [$_[1]], []; 348 _change $_[0], [$_[1]], [];
343 }, 349 },
350
351 # some node asks us to provide network updates
352 seedme0 => sub {
353 $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to NOT seed it.");#d#
354 delete $SEEDME{$node};
355 },
356 seedme1 => sub {
357 $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to seed it.");#d#
358 $SEEDME{$node} = ();
359
360 # for good measure
361 snd $port{$node}, nodes => \%addr if %addr;
362 },
344 ; 363 ;
345} 364}
346 365
347sub mon_node { 366sub mon_node {
348 my ($node, $is_up) = @_; 367 my ($node, $is_up) = @_;
349 368
350 if ($is_up) { 369 if ($is_up) {
351 ++$nodecnt; 370 ++$nodecnt;
352 start_node $node; 371 start_node $node;
372 }
373
374 # now select a new(?) master
375 my $master;
376
377 if (my @SEEDS = up_seeds) {
378 # switching here with lower chance roughly hopefully still gives us
379 # an equal selection.
380 $master = node_is_up $MASTER && node_is_seed $MASTER && 1 < rand @SEEDS
381 ? $MASTER
382 : $SEEDS[rand @SEEDS];
353 } else { 383 } else {
384 # select "last" non-seed node
385 $master = (sort +up_nodes)[-1];
386 #TODO maybe avoid listener-less nodes?
387 }
388
389 if ($MASTER ne $master) {
390 snd $port{$MASTER}, "seedme0"
391 if $MASTER && node_is_up $MASTER;
392
393 $MASTER = $master;
394 if ($MASTER) {
395 snd $port{$MASTER}, "seedme1";
396 $AnyEvent::MP::Kernel::WARN->(7, "selected new master: $MASTER.");
397 } else {
398 $AnyEvent::MP::Kernel::WARN->(1, "no contact to any other node.");
399 }
400 }
401
402 unless ($is_up) {
354 --$nodecnt; 403 --$nodecnt;
355 more_seeding unless $nodecnt; 404 more_seeding unless $nodecnt;
356 unreg_groups $node; 405 unreg_groups $node;
357 406
358 # forget about the node 407 # forget about the node
359 delete $addr{$node}; 408 delete $addr{$node};
360 # ask other nodes if they know the node 409
410 # ask our master for quick recovery
361 snd $_, find => $node 411 snd $port{$MASTER}, find => $node
362 for grep $_, map $port{$_}, _route_nodes; 412 if $MASTER;
363 } 413 }
364 #warn "node<$node,$is_up>\n";#d#
365} 414}
366 415
367mon_node $_, 1 416mon_node $_, 1
368 for up_nodes; 417 for up_nodes;
369 418

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines