ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Kernel.pm (file contents):
Revision 1.106 by root, Fri Mar 23 21:16:25 2012 UTC vs.
Revision 1.107 by root, Sat Mar 24 00:48:53 2012 UTC

187} 187}
188 188
189# this function adds a node-ref, so you can send stuff to it 189# this function adds a node-ref, so you can send stuff to it
190# it is basically the central routing component. 190# it is basically the central routing component.
191sub add_node { 191sub add_node {
192 $NODE{$_[0]} || do {
192 my ($node) = @_; 193 my ($node) = @_;
193 194
194 length $node 195 length $node
195 or Carp::croak "'undef' or the empty string are not valid node/port IDs"; 196 or Carp::croak "'undef' or the empty string are not valid node/port IDs";
196 197
198 # registers itself in %NODE
197 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node 199 new AnyEvent::MP::Node::Remote $node
200 }
198} 201}
199 202
200sub snd(@) { 203sub snd(@) {
201 my ($nodeid, $portid) = split /#/, shift, 2; 204 my ($nodeid, $portid) = split /#/, shift, 2;
202 205
274=item $bool = node_is_up $nodeid 277=item $bool = node_is_up $nodeid
275 278
276Returns true if the given node is "up", that is, the kernel thinks it has 279Returns true if the given node is "up", that is, the kernel thinks it has
277a working connection to it. 280a working connection to it.
278 281
279If the node is up, returns C<1>. If the node is currently connecting or 282More precisely, if the node is up, returns C<1>. If the node is currently
280otherwise known but not connected, returns C<0>. If nothing is known about 283connecting or otherwise known but not connected, returns C<0>. If nothing
281the node, returns C<undef>. 284is known about the node, returns C<undef>.
282 285
283=cut 286=cut
284 287
285sub node_is_up($) { 288sub node_is_up($) {
286 ($NODE{$_[0]} or return)->{transport} 289 ($_[0] eq $NODE) || ($NODE{$_[0]} or return)->{transport}
287 ? 1 : 0 290 ? 1 : 0
288} 291}
289 292
290=item @nodes = up_nodes 293=item @nodes = up_nodes
291 294
335} 338}
336 339
337sub _inject_nodeevent($$;@) { 340sub _inject_nodeevent($$;@) {
338 my ($node, $up, @reason) = @_; 341 my ($node, $up, @reason) = @_;
339 342
343 AE::log 7 => "$node->{id} is " . ($up ? "up." : "down (@reason).");
344
340 for my $cb (values %MON_NODES) { 345 for my $cb (values %MON_NODES) {
341 eval { $cb->($node->{id}, $up, @reason); 1 } 346 eval { $cb->($node->{id}, $up, @reason); 1 }
342 or AE::log die => $@; 347 or AE::log die => $@;
343 } 348 }
344
345 AE::log 7 => "$node->{id} is " . ($up ? "up." : "down (@reason).");
346} 349}
347 350
348############################################################################# 351#############################################################################
349# self node code 352# self node code
350 353
377sub _secure_check { 380sub _secure_check {
378 $SECURE 381 $SECURE
379 and die "remote execution not allowed\n"; 382 and die "remote execution not allowed\n";
380} 383}
381 384
385our %NODE_REQ;
386
382our %NODE_REQ = ( 387%NODE_REQ = (
383 # internal services 388 # "mproto" - monitoring protocol
384 389
385 # monitoring 390 # monitoring
386 mon0 => sub { # stop monitoring a port for another node 391 mon0 => sub { # stop monitoring a port for another node
387 my $portid = shift; 392 my $portid = shift;
388 _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid}; 393 _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid};
401 my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift} 406 my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
402 or return; 407 or return;
403 408
404 $_->(@_) for @$cbs; 409 $_->(@_) for @$cbs;
405 }, 410 },
411 # another node wants to kill a local port
412 kil1 => \&_kill,
406 413
407 # "public" services - not actually public 414 # "public" services - not actually public
408
409 # another node wants to kill a local port
410 kil => \&_kill,
411 415
412 # relay message to another node / generic echo 416 # relay message to another node / generic echo
413 snd => sub { 417 snd => sub {
414 &_secure_check;
415 &snd 418 &snd
419 },
420 # ask if a node supports the given request, only works for fixed tags
421 can => sub {
422 my $method = shift;
423 snd @_, exists $NODE_REQ{$method};
416 }, 424 },
417 425
418 # random utilities 426 # random utilities
419 eval => sub { 427 eval => sub {
420 &_secure_check; 428 &_secure_check;
421 my @res = do { package main; eval shift }; 429 my @res = do { package main; eval shift };
422 snd @_, "$@", @res if @_; 430 snd @_, "$@", @res if @_;
423 }, 431 },
424 time => sub { 432 time => sub {
425 &_secure_check;
426 snd @_, AE::now; 433 snd @_, AE::now;
427 }, 434 },
428 devnull => sub { 435 devnull => sub {
429 # 436 #
430 }, 437 },
432 # empty messages are keepalives or similar devnull-applications 439 # empty messages are keepalives or similar devnull-applications
433 }, 440 },
434); 441);
435 442
436# the node port 443# the node port
437$NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE; 444new AnyEvent::MP::Node::Self $NODE; # registers itself in %NODE
445
438$PORT{""} = sub { 446$PORT{""} = sub {
439 my $tag = shift; 447 my $tag = shift;
440 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } }; 448 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
441 AE::log die => "error processing node message from $SRCNODE: $@" if $@; 449 AE::log die => "error processing node message from $SRCNODE: $@" if $@;
442}; 450};
443 451
444our $NPROTO = 1; 452our $MPROTO = 1;
445 453
446# tell everybody who connects our nproto 454# tell everybody who connects our nproto
447push @AnyEvent::MP::Transport::HOOK_GREET, sub { 455push @AnyEvent::MP::Transport::HOOK_GREET, sub {
448 $_[0]{local_greeting}{nproto} = $NPROTO; 456 $_[0]{local_greeting}{mproto} = $MPROTO;
449}; 457};
450 458
451############################################################################# 459#############################################################################
452# seed management, try to keep connections to all seeds at all times 460# seed management, try to keep connections to all seeds at all times
453 461
543 snd $_[0], "g_slave" 551 snd $_[0], "g_slave"
544 unless $NODE{$_[0]}{transport}{remote_greeting}{global}; 552 unless $NODE{$_[0]}{transport}{remote_greeting}{global};
545 } else { 553 } else {
546 # if we lost the connection to a seed node, make sure we are seeding 554 # if we lost the connection to a seed node, make sure we are seeding
547 seed_again; 555 seed_again;
556 }
557};
558
559#############################################################################
560# keepalive code - used to kepe conenctions to certain nodes alive
561# only used by global code atm., but ought to be exposed somehow.
562
563our $KEEPALIVE_RETRY;
564our $KEEPALIVE_WATCHER;
565our %KEEPALIVE; # we want to keep these nodes alive
566our %KEEPALIVE_DOWN; # nodes that are down currently
567
568sub keepalive_all {
569 AE::log 9 => "keepalive: trying to establish connections with: "
570 . (join " ", keys %KEEPALIVE_DOWN)
571 . ".";
572
573 (add_node $_)->connect
574 for keys %KEEPALIVE_DOWN;
575
576 $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2 + rand;
577 $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
578 if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
579
580 $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
581}
582
583sub keepalive_again {
584 $KEEPALIVE_RETRY = 1;
585 $KEEPALIVE_WATCHER ||= AE::timer 1, 0, \&keepalive_all;
586}
587
588sub keepalive_add {
589 return if $KEEPALIVE{$_[0]}++;
590
591 return if node_is_up $_[0];
592 undef $KEEPALIVE_DOWN{$_[0]};
593 keepalive_again;
594}
595
596sub keepalive_del {
597 return if --$KEEPALIVE{$_[0]};
598
599 delete $KEEPALIVE {$_[0]};
600 delete $KEEPALIVE_DOWN{$_[0]};
601
602 undef $KEEPALIVE_WATCHER
603 unless %KEEPALIVE_DOWN;
604}
605
606mon_nodes sub {
607 return unless exists $KEEPALIVE{$_[0]};
608
609 if ($_[1]) {
610 delete $KEEPALIVE_DOWN{$_[0]};
611
612 undef $KEEPALIVE_WATCHER
613 unless %KEEPALIVE_DOWN;
614 } else {
615 # lost the conenction, try to connect again
616 undef $KEEPALIVE_DOWN{$_[0]};
617 keepalive_again;
548 } 618 }
549}; 619};
550 620
551############################################################################# 621#############################################################################
552# talk with/to global nodes 622# talk with/to global nodes

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines