ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Kernel.pm (file contents):
Revision 1.78 by root, Fri Mar 2 19:19:21 2012 UTC vs.
Revision 1.79 by root, Sat Mar 3 11:38:43 2012 UTC

25 25
26use common::sense; 26use common::sense;
27use POSIX (); 27use POSIX ();
28use Carp (); 28use Carp ();
29 29
30use AE (); 30use AnyEvent ();
31use Guard ();
31 32
32use AnyEvent::MP::Node; 33use AnyEvent::MP::Node;
33use AnyEvent::MP::Transport; 34use AnyEvent::MP::Transport;
34 35
35use base "Exporter"; 36use base "Exporter";
42 add_node load_func snd_to_func snd_on eval_on 43 add_node load_func snd_to_func snd_on eval_on
43 44
44 NODE $NODE node_of snd kil port_is_local 45 NODE $NODE node_of snd kil port_is_local
45 configure 46 configure
46 up_nodes mon_nodes node_is_up 47 up_nodes mon_nodes node_is_up
48 db_set db_del db_reg
47); 49);
48 50
49=item $AnyEvent::MP::Kernel::WARN->($level, $msg) 51=item $AnyEvent::MP::Kernel::WARN->($level, $msg)
50 52
51This value is called with an error or warning message, when e.g. a 53This value is called with an error or warning message, when e.g. a
374sub mon_nodes($) { 376sub mon_nodes($) {
375 my ($cb) = @_; 377 my ($cb) = @_;
376 378
377 $MON_NODES{$cb+0} = $cb; 379 $MON_NODES{$cb+0} = $cb;
378 380
379 defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} } 381 defined wantarray && Guard::guard { delete $MON_NODES{$cb+0} }
380} 382}
381 383
382sub _inject_nodeevent($$;@) { 384sub _inject_nodeevent($$;@) {
383 my ($node, $up, @reason) = @_; 385 my ($node, $up, @reason) = @_;
384 386
417sub _unmonitor { 419sub _unmonitor {
418 delete $LMON{$_[1]}{$_[2]+0} 420 delete $LMON{$_[1]}{$_[2]+0}
419 if exists $LMON{$_[1]}; 421 if exists $LMON{$_[1]};
420} 422}
421 423
422our %node_req = ( 424our %NODE_REQ = (
423 # internal services 425 # internal services
424 426
425 # monitoring 427 # monitoring
426 mon0 => sub { # stop monitoring a port for another node 428 mon0 => sub { # stop monitoring a port for another node
427 my $portid = shift; 429 my $portid = shift;
474 476
475$NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE; 477$NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
476$PORT{""} = sub { 478$PORT{""} = sub {
477 my $tag = shift; 479 my $tag = shift;
478 #d#SECURE (load_func) 480 #d#SECURE (load_func)
479 eval { &{ $node_req{$tag} ||= load_func $tag } }; 481 eval { &{ $NODE_REQ{$tag} ||= load_func $tag } };
480 $WARN->(2, "error processing node message: $@") if $@; 482 $WARN->(2, "error processing node message: $@") if $@;
481}; 483};
482 484
483############################################################################# 485#############################################################################
484# seed management, try to keep connections to all seeds at all times 486# seed management, try to keep connections to all seeds at all times
524 } 526 }
525 ; 527 ;
526} 528}
527 529
528sub seed_all { 530sub seed_all {
529# my $next = List::Util::max 1,
530# $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
531# * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
532# - rand;
533
534 my @seeds = grep { 531 my @seeds = grep {
535 !exists $SEED_CONNECT{$_} 532 !exists $SEED_CONNECT{$_}
536 && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}) 533 && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
537 } keys %SEED_NODE; 534 } keys %SEED_NODE;
538 535
604our $MASTER_MON; 601our $MASTER_MON;
605our %LOCAL_DB; # this node database 602our %LOCAL_DB; # this node database
606 603
607our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes 604our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
608 605
609#our $sv_ne_json = JSON::XS->new->canonical;
610#
611#sub sv_ne($$) {
612# $sv_ne_json->encode ($_[0]) ne $sv_ne_json->encode ($_[1])
613#}
614
615# local database management
616sub ldb_set($$;$) {
617 warn "ldb_set<@_>\n";#d#
618 if (@_ > 2) {
619 $LOCAL_DB{$_[0]}{$_[1]} = $_[2];
620 snd $MASTER, g_add => $_[0] => $_[1] => $_[2]
621 if defined $MASTER;
622 } else {
623 delete $LOCAL_DB{$_[0]}{$_[1]};
624 snd $MASTER, g_del => $_[0] => $_[1]
625 if defined $MASTER;
626 }
627}
628
629############################################################################# 606#############################################################################
630# master selection 607# master selection
631 608
632# master requests 609# master requests
633our %GLOBAL_REQ; # $id => \@req 610our %GLOBAL_REQ; # $id => \@req
650sub g_find { 627sub g_find {
651 global_req_add "g_find $_[0]", g_find => $_[0]; 628 global_req_add "g_find $_[0]", g_find => $_[0];
652} 629}
653 630
654# reply for g_find started in Node.pm 631# reply for g_find started in Node.pm
655$node_req{g_found} = sub { 632$NODE_REQ{g_found} = sub {
656 global_req_del "g_find $_[0]"; 633 global_req_del "g_find $_[0]";
657 634
658 @{ $_[1] } or return; # d'oh
659
660 my $node = $NODE{$_[0]} or return; 635 my $node = $NODE{$_[0]} or return;
661 636
662 local $GLOBAL_DB{"'l"}{$_[0]} = $_[1]; #d# UGLY
663 $node->connect; 637 $node->connect_to ($_[1]);
664}; 638};
665 639
666sub master_set { 640sub master_set {
667 $MASTER = $_[0]; 641 $MASTER = $_[0];
668 642
696 }; 670 };
697 }; 671 };
698} 672}
699 673
700# other node wants to make us the master 674# other node wants to make us the master
701$node_req{g_slave} = sub { 675$NODE_REQ{g_slave} = sub {
702 my ($db) = @_; 676 my ($db) = @_;
703 677
704 warn "slave1\n";#d# 678 warn "slave1\n";#d#
705 679
706 require AnyEvent::MP::Global; 680 require AnyEvent::MP::Global;
707 &{ $node_req{g_slave} }; 681 &{ $NODE_REQ{g_slave} };
708}; 682};
709 683
710#$node_req{g_reply} = sub {
711# my $id = shift;
712#
713# my $cb = delete $GLOBAL_REQ{$id}
714# or return;
715#
716# $cb->[0]->(@_);
717#};
718
719############################################################################# 684#############################################################################
685# local database operations
720 686
721############################################################################# 687# local database management
688sub db_set($$$) {
689 $LOCAL_DB{$_[0]}{$_[1]} = $_[2];
690 snd $MASTER, g_add => $_[0] => $_[1] => $_[2]
691 if defined $MASTER;
692}
722 693
723# $WARN->(1, "$SRCNODE->{id} treats us as global node, but we aren't"); 694sub db_del($$) {
695 delete $LOCAL_DB{$_[0]}{$_[1]};
696 snd $MASTER, g_del => $_[0] => $_[1]
697 if defined $MASTER;
698}
699
700sub db_reg($$;$) {
701 my ($family, $key) = @_;
702 &db_set;
703 Guard::guard { db_del $family => $key }
704}
724 705
725############################################################################# 706#############################################################################
726# configure 707# configure
727 708
728sub _nodename { 709sub _nodename {
866 $LISTENER{$bind} = $listener; 847 $LISTENER{$bind} = $listener;
867 push @$LISTENER, $bind; 848 push @$LISTENER, $bind;
868 } 849 }
869 } 850 }
870 851
871 ldb_set "'l" => $NODE => $LISTENER; 852 db_set "'l" => $NODE => $LISTENER;
872 853
873 $WARN->(8, "node listens on [@$LISTENER]."); 854 $WARN->(8, "node listens on [@$LISTENER].");
874
875 # the global service is mandatory currently
876 #require AnyEvent::MP::Global;
877 855
878 # connect to all seednodes 856 # connect to all seednodes
879 set_seeds map $_->recv, map _resolve $_, @$seeds; 857 set_seeds map $_->recv, map _resolve $_, @$seeds;
880 858
881 master_search; 859 master_search;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines