ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Global.pm (file contents):
Revision 1.21 by root, Sun Sep 6 00:13:21 2009 UTC vs.
Revision 1.22 by root, Tue Sep 8 01:38:16 2009 UTC

32use AnyEvent (); 32use AnyEvent ();
33use AnyEvent::Util (); 33use AnyEvent::Util ();
34 34
35use AnyEvent::MP; 35use AnyEvent::MP;
36use AnyEvent::MP::Kernel; 36use AnyEvent::MP::Kernel;
37use AnyEvent::MP::Transport ();
37 38
38use base "Exporter"; 39use base "Exporter";
39 40
40our @EXPORT = qw( 41our @EXPORT = qw(
41 grp_reg 42 grp_reg
49 50
50our %port; # our rendezvous port on the other side 51our %port; # our rendezvous port on the other side
51our %lreg; # local registry, name => [pid...] 52our %lreg; # local registry, name => [pid...]
52our %lmon; # local registry monitoring name,pid => mon 53our %lmon; # local registry monitoring name,pid => mon
53our %greg; # global regstry, name => pid => undef 54our %greg; # global regstry, name => pid => undef
54our %gmon; # group monitorign, group => [$cb...] 55our %gmon; # group monitoring, group => [$cb...]
55 56
56our $nodecnt; 57our $nodecnt;
57 58
58$AnyEvent::MP::Kernel::WARN->(7, "starting global service."); 59$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
59 60
60############################################################################# 61#############################################################################
61# seednodes 62# seednodes
62 63
63our @SEEDS; 64our @SEEDS;
65our %SEEDS; # just to check whether a seed is a seed
64our %SEED_CONNECT; 66our %SEED_CONNECT;
65our $SEED_WATCHER; 67our $SEED_WATCHER;
66 68
69push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
70 my $peer = $_[0]{local_greeting}{peeraddr};
71 return unless exists $SEEDS{$peer};
72 $SEED_CONNECT{$peer} = 2;
73};
74
75push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
76 my $peer = $_[0]{local_greeting}{peeraddr};
77 return unless exists $SEEDS{$peer};
78 $SEEDS{$peer} = $_[0]{remote_node};
79};
80
81push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
82 delete $SEED_CONNECT{$_[0]{local_greeting}{peeraddr}};
83};
84
67sub seed_connect { 85sub seed_connect {
68 my ($seed) = @_; 86 my ($seed) = @_;
69 87
70 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed 88 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
71 or Carp::croak "$seed: unparsable seed address"; 89 or Carp::croak "$seed: unparsable seed address";
90
91 return if $SEED_CONNECT{$seed};
92
93 $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
72 94
73 # ughhh 95 # ughhh
74 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect $host, $port, 96 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect $host, $port,
75 seed => $seed, 97 seed => $seed,
76 sub { 98 sub {
77 delete $SEED_CONNECT{$seed}; 99 $SEED_CONNECT{$seed} = 1;
78 after 1, \&more_seeding;
79 }, 100 },
80 ; 101 ;
81} 102}
82 103
83sub more_seeding { 104sub more_seeding {
84 return if $nodecnt; 105 my $int = List::Util::max 1,
106 $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
107 * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
108 - rand;
109
110 $SEED_WATCHER = AE::timer $int, 0, \&more_seeding;
111
112 @SEEDS = keys %SEEDS unless @SEEDS;
85 return unless @SEEDS; 113 return unless @SEEDS;
86 114
87 $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding."); 115 $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");
88 116
89 seed_connect $SEEDS[rand @SEEDS]; 117 seed_connect splice @SEEDS, rand @SEEDS, 1;
90} 118}
91 119
92sub avoid_seed($) { 120sub avoid_seed($) {
93 @SEEDS = grep $_ ne $_[0], @SEEDS; 121 @SEEDS = grep $_ ne $_[0], @SEEDS;
94} 122}
95 123
96sub set_seeds(@) { 124sub set_seeds(@) {
97 @SEEDS = @_; 125 @SEEDS{@_} = ();
98 126
99 $SEED_WATCHER ||= AE::timer 5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding; 127 $SEED_WATCHER ||= AE::timer 5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding;
100 128
101 for my $seed (@SEEDS) { 129 for (1 .. keys %SEEDS) {
102 after 0.100 * rand, sub { seed_connect $seed }; 130 after 0.100 * rand, \&more_seeding;
103 } 131 }
132}
133
134# returns all (up) seed nodes, or all nodes if no seednodes are up/known
135sub _route_nodes {
136 my @seeds = grep node_is_up $_, values %SEEDS;
137 @seeds = up_nodes unless @seeds;
138 @seeds
104} 139}
105 140
106############################################################################# 141#############################################################################
107 142
108sub _change { 143sub _change {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines