ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Kernel.pm (file contents):
Revision 1.110 by root, Sat Mar 24 16:50:15 2012 UTC vs.
Revision 1.111 by root, Sun Mar 25 20:50:49 2012 UTC

113our (%PORT, %PORT_DATA); # local ports 113our (%PORT, %PORT_DATA); # local ports
114 114
115our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb) 115our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
116our %LMON; # monitored _local_ ports 116our %LMON; # monitored _local_ ports
117 117
118our $GLOBAL; # true if node is a global ("directory") node 118#our $GLOBAL; # true if node is a global ("directory") node
119our %BINDS; 119our %BINDS;
120our $BINDS; # our listeners, as arrayref 120our $BINDS; # our listeners, as arrayref
121 121
122our $SRCNODE; # holds the sending node _object_ during _inject 122our $SRCNODE; # holds the sending node _object_ during _inject
123our $GLOBAL; # true when this is a global node (only set by AnyEvent::MP::Global)
123 124
124# initialise names for non-networked operation 125# initialise names for non-networked operation
125{ 126{
126 # ~54 bits, for local port names, lowercase $ID appended 127 # ~54 bits, for local port names, lowercase $ID appended
127 my $now = AE::now; 128 my $now = AE::now;
462our %SEED_NODE; # seed ID => node ID|undef 463our %SEED_NODE; # seed ID => node ID|undef
463our %NODE_SEED; # map node ID to seed ID 464our %NODE_SEED; # map node ID to seed ID
464our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting 465our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
465our $SEED_WATCHER; 466our $SEED_WATCHER;
466our $SEED_RETRY; 467our $SEED_RETRY;
468our %GLOBAL_NODE; # global => undef
467 469
468sub seed_connect { 470sub seed_connect {
469 my ($seed) = @_; 471 my ($seed) = @_;
470 472
471 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed 473 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
487 require AnyEvent::MP::Global; # every seed becomes a global node currently 489 require AnyEvent::MP::Global; # every seed becomes a global node currently
488 delete $SEED_NODE{$seed}; 490 delete $SEED_NODE{$seed};
489 } else { 491 } else {
490 $SEED_NODE{$seed} = $_[0]{remote_node}; 492 $SEED_NODE{$seed} = $_[0]{remote_node};
491 $NODE_SEED{$_[0]{remote_node}} = $seed; 493 $NODE_SEED{$_[0]{remote_node}} = $seed;
494
492 # also start global service, if not running 495 # also start global service, in case it isn't running
493 # we need to check here in addition to the mon_nodes below 496 # since we probably switch conenctions, maybe we don't need to do this here?
494 # because we might only learn late that a node is a seed
495 # and then we might already be connected
496 snd $_[0]{remote_node}, "g_slave" 497 snd $_[0]{remote_node}, "g_slave";
497 unless $_[0]{remote_greeting}{global};
498 } 498 }
499 }, 499 },
500 sub { 500 sub {
501 delete $SEED_CONNECT{$seed}; 501 delete $SEED_CONNECT{$seed};
502 } 502 }
512 if (@seeds) { 512 if (@seeds) {
513 # start connection attempt for every seed we are not connected to yet 513 # start connection attempt for every seed we are not connected to yet
514 seed_connect $_ 514 seed_connect $_
515 for @seeds; 515 for @seeds;
516 516
517 $SEED_RETRY = $SEED_RETRY * 2 + rand; 517 $SEED_RETRY = $SEED_RETRY * 2;
518 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout} 518 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
519 if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 519 if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
520 520
521 $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all; 521 $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
522 522
525 undef $SEED_WATCHER; 525 undef $SEED_WATCHER;
526 } 526 }
527} 527}
528 528
529sub seed_again { 529sub seed_again {
530 $SEED_RETRY = 1; 530 $SEED_RETRY = (1 + rand) * 0.6;
531 $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all; 531 $SEED_WATCHER ||= AE::timer $SEED_RETRY, 0, \&seed_all;
532} 532}
533 533
534# sets new seed list, starts connecting 534# sets new seed list, starts connecting
535sub set_seeds(@) { 535sub set_seeds(@) {
536 %SEED_NODE = (); 536 %SEED_NODE = ();
540 @SEED_NODE{@_} = (); 540 @SEED_NODE{@_} = ();
541 541
542 seed_all; 542 seed_all;
543} 543}
544 544
545# normal nodes only record global node connections
546$NODE_REQ{g_global} = sub {
547 undef $GLOBAL_NODE{$SRCNODE};
548};
549
545mon_nodes sub { 550mon_nodes sub {
551 delete $GLOBAL_NODE{$_[0]}
552 unless $_[1];
553
546 return unless exists $NODE_SEED{$_[0]}; 554 return unless exists $NODE_SEED{$_[0]};
547 555
548 if ($_[1]) { 556 if ($_[1]) {
549 # each time a connection to a seed node goes up, make 557 # each time a connection to a seed node goes up, make
550 # sure it runs the global service. 558 # sure it runs the global service.
551 snd $_[0], "g_slave" 559 snd $_[0], "g_slave";
552 unless $NODE{$_[0]}{transport}{remote_greeting}{global};
553 } else { 560 } else {
554 # if we lost the connection to a seed node, make sure we are seeding 561 # if we lost the connection to a seed node, make sure we are seeding
555 seed_again; 562 seed_again;
556 } 563 }
557}; 564};
572 . "."; 579 . ".";
573 580
574 (add_node $_)->connect 581 (add_node $_)->connect
575 for keys %KEEPALIVE_DOWN; 582 for keys %KEEPALIVE_DOWN;
576 583
577 $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2 + rand; 584 $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2;
578 $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout} 585 $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
579 if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 586 if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
580 587
581 $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all; 588 $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
582} 589}
583 590
584sub keepalive_again { 591sub keepalive_again {
585 $KEEPALIVE_RETRY = 1; 592 $KEEPALIVE_RETRY = (1 + rand) * 0.3;
586 keepalive_all; 593 keepalive_all;
587} 594}
588 595
589sub keepalive_add { 596sub keepalive_add {
590 return if $KEEPALIVE{$_[0]}++; 597 return if $KEEPALIVE{$_[0]}++;
623# talk with/to global nodes 630# talk with/to global nodes
624 631
625# protocol messages: 632# protocol messages:
626# 633#
627# sent by global nodes 634# sent by global nodes
628# g_global - node became global, similar to global=1 greeting 635# g_global - global nodes send this to all others
629# 636#
630# database protocol 637# database protocol
631# g_slave database - make other global node master of the sender 638# g_slave database - make other global node master of the sender
632# g_set database - global node's database to other global nodes 639# g_set database - global node's database to other global nodes
633# g_upd family set del - update single family (all to global) 640# g_upd family set del - update single family (any to global)
634# 641#
635# slave <-> global protocol 642# slave <-> global protocol
636# g_find node - query addresses for node (slave to global) 643# g_find node - query addresses for node (slave to global)
637# g_found node binds - node addresses (global to slave) 644# g_found node binds - node addresses (global to slave)
638# g_db_family family id - send g_reply with data (global to slave) 645# g_db_family family id - send g_reply with data (global to slave)
642# g_mon1 family - start to monitor family, replies with g_chg1 649# g_mon1 family - start to monitor family, replies with g_chg1
643# g_mon0 family - stop monitoring family 650# g_mon0 family - stop monitoring family
644# g_chg1 family hash - initial value of family when starting to monitor 651# g_chg1 family hash - initial value of family when starting to monitor
645# g_chg2 family set del - like g_upd, but for monitoring only 652# g_chg2 family set del - like g_upd, but for monitoring only
646# 653#
647# internal database families 654# internal database families:
648# "'l" -> node -> listeners 655# "'l" -> node -> listeners
649# "'g" -> node -> undef 656# "'g" -> node -> undef
650# ... 657# ...
651# 658#
652 659
653# used on all nodes: 660# used on all nodes:
654our $MASTER; # the global node we bind ourselves to 661our $MASTER; # the global node we bind ourselves to
655our $MASTER_MON; 662our $MASTER_MON;
656our %LOCAL_DB; # this node database 663our %LOCAL_DB; # this node database
657
658our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
659 664
660our $GPROTO = 1; 665our $GPROTO = 1;
661 666
662# tell everybody who connects our nproto 667# tell everybody who connects our nproto
663push @AnyEvent::MP::Transport::HOOK_GREET, sub { 668push @AnyEvent::MP::Transport::HOOK_GREET, sub {
720 $node->connect_to ($_[1]); 725 $node->connect_to ($_[1]);
721}; 726};
722 727
723sub master_set { 728sub master_set {
724 $MASTER = $_[0]; 729 $MASTER = $_[0];
730 AE::log 8 => "new master node: $MASTER.";
731
732 $MASTER_MON = mon_nodes sub {
733 if ($_[0] eq $MASTER && !$_[1]) {
734 undef $MASTER;
735 master_search ();
736 }
737 };
725 738
726 snd $MASTER, g_slave => \%LOCAL_DB; 739 snd $MASTER, g_slave => \%LOCAL_DB;
727 740
728 # (re-)send queued requests 741 # (re-)send queued requests
729 snd $MASTER, @$_ 742 snd $MASTER, @$_
730 for values %GLOBAL_REQ; 743 for values %GLOBAL_REQ;
731} 744}
732 745
733sub master_search { 746sub master_search {
747 AE::log 9 => "starting search for master node.";
748
734 #TODO: should also look for other global nodes, but we don't know them #d# 749 #TODO: should also look for other global nodes, but we don't know them #d#
735 for (keys %NODE_SEED) { 750 for (keys %NODE_SEED) {
736 if (node_is_up $_) { 751 if (node_is_up $_) {
737 master_set $_; 752 master_set $_;
738 return; 753 return;
742 $MASTER_MON = mon_nodes sub { 757 $MASTER_MON = mon_nodes sub {
743 return unless $_[1]; # we are only interested in node-ups 758 return unless $_[1]; # we are only interested in node-ups
744 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes 759 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
745 760
746 master_set $_[0]; 761 master_set $_[0];
747
748 $MASTER_MON = mon_nodes sub {
749 if ($_[0] eq $MASTER && !$_[1]) {
750 undef $MASTER;
751 master_search ();
752 }
753 };
754 }; 762 };
755} 763}
756 764
757# other node wants to make us the master, so start the global service 765# other node wants to make us the master, so start the global service
758$NODE_REQ{g_slave} = sub { 766$NODE_REQ{g_slave} = sub {
759 my ($db) = @_;
760
761 # load global module and redo the request 767 # load global module and redo the request
762 require AnyEvent::MP::Global; 768 require AnyEvent::MP::Global;
763 &{ $NODE_REQ{g_slave} } 769 &{ $NODE_REQ{g_slave} }
764}; 770};
765 771
766############################################################################# 772#############################################################################
767# local database operations 773# local database operations
768 774
775# canonical probably not needed
776our $sv_eq_coder = JSON::XS->new->utf8->allow_nonref;
777
778# are the two scalars equal? very very ugly and slow, need better way
779sub sv_eq($$) {
780 ref $_[0] || ref $_[1]
781 ? (JSON::XS::encode $sv_eq_coder, $_[0]) eq (JSON::XS::encode $sv_eq_coder, $_[1])
782 : $_[0] eq $_[1]
783 && defined $_[0] == defined $_[1]
784}
785
769# local database management 786# local database management
787
788sub db_del($@) {
789 my $family = shift;
790
791 my @del = grep exists $LOCAL_DB{$family}{$_}, @_;
792
793 return unless @del;
794
795 delete @{ $LOCAL_DB{$family} }{@del};
796 snd $MASTER, g_upd => $family => undef, \@del
797 if defined $MASTER;
798}
770 799
771sub db_set($$;$) { 800sub db_set($$;$) {
772 my ($family, $subkey) = @_; 801 my ($family, $subkey) = @_;
773 802
774# if (ref $_[1]) { 803# if (ref $_[1]) {
777# $LOCAL_DB{$_[0]} = $_[1]; 806# $LOCAL_DB{$_[0]} = $_[1];
778# snd $MASTER, g_upd => $_[0] => $_[1], \@del 807# snd $MASTER, g_upd => $_[0] => $_[1], \@del
779# if defined $MASTER; 808# if defined $MASTER;
780# } else { 809# } else {
781 # single-key 810 # single-key
811 unless (exists $LOCAL_DB{$family}{$subkey} && sv_eq $LOCAL_DB{$family}{$subkey}, $_[2]) {
782 $LOCAL_DB{$family}{$subkey} = $_[2]; 812 $LOCAL_DB{$family}{$subkey} = $_[2];
783 snd $MASTER, g_upd => $family => { $subkey => $_[2] } 813 snd $MASTER, g_upd => $family => { $subkey => $_[2] }
784 if defined $MASTER; 814 if defined $MASTER;
815 }
785# } 816# }
786 817
787 defined wantarray 818 defined wantarray
788 and Guard::guard { db_del $family => $subkey } 819 and Guard::guard { db_del $family => $subkey }
789}
790
791sub db_del($@) {
792 my $family = shift;
793
794 my @del = grep exists $LOCAL_DB{$family}{$_}, @_;
795
796 return unless @del;
797
798 delete @{ $LOCAL_DB{$family} }{@del};
799 snd $MASTER, g_upd => $family => undef, \@del
800 if defined $MASTER;
801} 820}
802 821
803# database query 822# database query
804 823
805sub db_family { 824sub db_family {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines