ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.58 by root, Fri Mar 2 19:21:16 2012 UTC vs.
Revision 1.64 by root, Fri Mar 23 21:16:25 2012 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
40sub send { 39sub send {
41 &{ shift->{send} } 40 &{ shift->{send} }
42} 41}
43 42
44# nodes reachable via the network 43# nodes reachable via the network
45package AnyEvent::MP::Node::Remote; 44package AnyEvent::MP::Node::Remote; # a remote node
46 45
47use base "AnyEvent::MP::Node"; 46use base "AnyEvent::MP::Node";
48 47
49# called at init time, mostly sets {send} 48# called at init time, mostly sets {send}
50sub transport_reset { 49sub transport_reset {
92 delete $self->{trial}; 91 delete $self->{trial};
93 92
94 $self->transport_error (transport_error => $self->{id}, "switched connections") 93 $self->transport_error (transport_error => $self->{id}, "switched connections")
95 if $self->{transport}; 94 if $self->{transport};
96 95
97 delete $self->{connect_addr};
98 delete $self->{connect_w}; 96 delete $self->{connect_w};
99 delete $self->{connect_to}; 97 delete $self->{connect_to};
100 98
101 $self->{transport} = $transport; 99 $self->{transport} = $transport;
102 100
115 113
116 return if $self->{transport}; 114 return if $self->{transport};
117 return if $self->{connect_w}; 115 return if $self->{connect_w};
118 116
119 Scalar::Util::weaken $self; 117 Scalar::Util::weaken $self;
118 use Carp;Carp::cluck;#d#
120 119
121 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 120 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
122
123 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
124 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 121 $self->transport_error (transport_error => $self->{id}, "unable to connect");
125 }; 122 };
126 123
127 # maybe @$addresses? 124 # maybe @$addresses?
128 my @addresses = @{ $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}} }; 125 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
129 126
130 unless (@addresses) { 127 if ($addresses) {
128 $self->connect_to ($addresses);
129 } else {
131 # on global nodes, all bets are off now - we either know the node, or we don't 130 # on global nodes, all bets are off now - we either know the node, or we don't
132 unless ($AnyEvent::MP::Kernel::GLOBAL) { 131 unless ($AnyEvent::MP::Kernel::GLOBAL) {
133 AnyEvent::MP::Kernel::g_find ($self->{id}); 132 AnyEvent::MP::Kernel::g_find ($self->{id});
134 } 133 }
135 return; 134 }
135}
136
137sub connect_to {
138 my ($self, $addresses) = @_;
139
140 return if $self->{transport};
141 return if $self->{connect_w};
142
143 return unless @$addresses;
136 } 144
137
138 $self->{connect_addr} = \@addresses; # a bit weird, but efficient
139
140 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); 145 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
141 146
147 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
142 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 148 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
143 149
144 $interval = ($monitor - $interval) / @addresses 150 $interval = ($monitor - $interval) / @$addresses
145 if ($monitor - $interval) / @addresses < $interval; 151 if ($monitor - $interval) / @$addresses < $interval;
146 152
147 $interval = 0.4 if $interval < 0.4; 153 $interval = 0.4 if $interval < 0.4;
148 154
149 my @endpoints; 155 my @endpoints = reverse @$addresses;
150 156
151 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub { 157 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
152 @endpoints = @addresses
153 unless @endpoints;
154
155 my $endpoint = shift @endpoints; 158 my $endpoint = pop @endpoints;
156 159
157 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); 160 AE::log 9 => "connecting to $self->{id} at $endpoint";
158 161
159 $self->{trial}{$endpoint} ||= do { 162 $self->{trial}{$endpoint} ||= do {
160 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 163 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
161 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); 164 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
162 165
163 AnyEvent::MP::Transport::mp_connect 166 AnyEvent::MP::Transport::mp_connect
164 $host, $port, 167 $host, $port,
165 sub { delete $self->{trial}{$endpoint} }, 168 sub { delete $self->{trial}{$endpoint} },
166 }; 169 };
196 $self->send (["", mon0 => $portid]); 199 $self->send (["", mon0 => $portid]);
197 delete $self->{monitor}{$portid}; 200 delete $self->{monitor}{$portid};
198 } 201 }
199} 202}
200 203
201package AnyEvent::MP::Node::Self; 204package AnyEvent::MP::Node::Self; # the local node
202 205
203use base "AnyEvent::MP::Node"; 206use base "AnyEvent::MP::Node";
204 207
205sub connect { 208sub connect {
206 # we are trivially connected 209 # we are trivially connected
209# delay every so often to avoid recursion, also used to delay after spawn 212# delay every so often to avoid recursion, also used to delay after spawn
210our $DELAY = -50; 213our $DELAY = -50;
211our @DELAY; 214our @DELAY;
212our $DELAY_W; 215our $DELAY_W;
213 216
214sub _send_delayed { 217our $send_delayed = sub {
215 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE{""}; 218 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
216 (shift @DELAY)->() 219 (shift @DELAY)->()
217 while @DELAY; 220 while @DELAY;
218 undef $DELAY_W; 221 undef $DELAY_W;
219 $DELAY = -50; 222 $DELAY = -50;
220} 223};
221 224
222sub transport_reset { 225sub transport_reset {
223 my ($self) = @_; 226 my ($self) = @_;
224 227
225 Scalar::Util::weaken $self; 228 Scalar::Util::weaken $self;
226 229
227 $self->{send} = sub { 230 $self->{send} = sub {
228 if ($DELAY++ >= 0) { 231 if (++$DELAY > 0) {
229 my $msg = $_[0]; 232 my $msg = $_[0];
230 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) }; 233 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
231 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 234 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
232 return; 235 return;
233 } 236 }
234 237
235 local $AnyEvent::MP::Kernel::SRCNODE = $self; 238 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
236 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 239 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
237 }; 240 };
238} 241}
239 242
240sub transport_connect { 243sub transport_connect {
241 my ($self, $tp) = @_; 244 my ($self, $tp) = @_;
242 245
243 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 246 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
244} 247}
245 248
246sub kill { 249sub kill {
247 my (undef, @args) = @_; 250 my (undef, @args) = @_;
248 251
249 # we _always_ delay kil's, to avoid calling mon callbacks 252 # we _always_ delay kil's, to avoid calling mon callbacks
250 # from anything but the event loop context. 253 # from anything but the event loop context.
251 $DELAY = 1; 254 $DELAY = 1;
252 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) }; 255 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
253 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 256 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
254} 257}
255 258
256sub monitor { 259sub monitor {
257 # maybe always delay, too? 260 # maybe always delay, too?
258 if ($DELAY_W) { 261 if ($DELAY_W) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines