ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.59 by root, Sat Mar 3 11:38:43 2012 UTC vs.
Revision 1.65 by root, Fri Mar 23 21:16:36 2012 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
40sub send { 39sub send {
41 &{ shift->{send} } 40 &{ shift->{send} }
42} 41}
43 42
44# nodes reachable via the network 43# nodes reachable via the network
45package AnyEvent::MP::Node::Remote; 44package AnyEvent::MP::Node::Remote; # a remote node
46 45
47use base "AnyEvent::MP::Node"; 46use base "AnyEvent::MP::Node";
48 47
49# called at init time, mostly sets {send} 48# called at init time, mostly sets {send}
50sub transport_reset { 49sub transport_reset {
140 return if $self->{transport}; 139 return if $self->{transport};
141 return if $self->{connect_w}; 140 return if $self->{connect_w};
142 141
143 return unless @$addresses; 142 return unless @$addresses;
144 143
145 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@$addresses]"); 144 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
146 145
147 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 146 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
148 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 147 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
149 148
150 $interval = ($monitor - $interval) / @$addresses 149 $interval = ($monitor - $interval) / @$addresses
155 my @endpoints = reverse @$addresses; 154 my @endpoints = reverse @$addresses;
156 155
157 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub { 156 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
158 my $endpoint = pop @endpoints; 157 my $endpoint = pop @endpoints;
159 158
160 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); 159 AE::log 9 => "connecting to $self->{id} at $endpoint";
161 160
162 $self->{trial}{$endpoint} ||= do { 161 $self->{trial}{$endpoint} ||= do {
163 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 162 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
164 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); 163 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
165 164
166 AnyEvent::MP::Transport::mp_connect 165 AnyEvent::MP::Transport::mp_connect
167 $host, $port, 166 $host, $port,
168 sub { delete $self->{trial}{$endpoint} }, 167 sub { delete $self->{trial}{$endpoint} },
169 }; 168 };
170 }; 169 };
171} 170}
172 171
173sub connect {
174 my ($self) = @_;
175
176 return if $self->{transport};
177 return if $self->{connect_w};
178
179 Scalar::Util::weaken $self;
180
181 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
182
183 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
184 $self->transport_error (transport_error => $self->{id}, "unable to connect");
185 };
186
187 # maybe @$addresses?
188 my @addresses = @{ $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}} };
189
190 unless (@addresses) {
191 # on global nodes, all bets are off now - we either know the node, or we don't
192 unless ($AnyEvent::MP::Kernel::GLOBAL) {
193 AnyEvent::MP::Kernel::g_find ($self->{id});
194 }
195 return;
196 }
197
198 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]");
199
200 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
201
202 $interval = ($monitor - $interval) / @addresses
203 if ($monitor - $interval) / @addresses < $interval;
204
205 $interval = 0.4 if $interval < 0.4;
206
207 my @endpoints;
208
209 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
210 @endpoints = @addresses
211 unless @endpoints;
212
213 my $endpoint = shift @endpoints;
214
215 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint");
216
217 $self->{trial}{$endpoint} ||= do {
218 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
219 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference.");
220
221 AnyEvent::MP::Transport::mp_connect
222 $host, $port,
223 sub { delete $self->{trial}{$endpoint} },
224 };
225 };
226}
227
228sub kill { 172sub kill {
229 my ($self, $port, @reason) = @_; 173 my ($self, $port, @reason) = @_;
230 174
231 $self->{send} (["", kil => $port, @reason]); 175 $self->{send} (["", kil => $port, @reason]);
232} 176}
254 $self->send (["", mon0 => $portid]); 198 $self->send (["", mon0 => $portid]);
255 delete $self->{monitor}{$portid}; 199 delete $self->{monitor}{$portid};
256 } 200 }
257} 201}
258 202
259package AnyEvent::MP::Node::Self; 203package AnyEvent::MP::Node::Self; # the local node
260 204
261use base "AnyEvent::MP::Node"; 205use base "AnyEvent::MP::Node";
262 206
263sub connect { 207sub connect {
264 # we are trivially connected 208 # we are trivially connected
267# delay every so often to avoid recursion, also used to delay after spawn 211# delay every so often to avoid recursion, also used to delay after spawn
268our $DELAY = -50; 212our $DELAY = -50;
269our @DELAY; 213our @DELAY;
270our $DELAY_W; 214our $DELAY_W;
271 215
272sub _send_delayed { 216our $send_delayed = sub {
273 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE{""}; 217 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
274 (shift @DELAY)->() 218 (shift @DELAY)->()
275 while @DELAY; 219 while @DELAY;
276 undef $DELAY_W; 220 undef $DELAY_W;
277 $DELAY = -50; 221 $DELAY = -50;
278} 222};
279 223
280sub transport_reset { 224sub transport_reset {
281 my ($self) = @_; 225 my ($self) = @_;
282 226
283 Scalar::Util::weaken $self; 227 Scalar::Util::weaken $self;
284 228
285 $self->{send} = sub { 229 $self->{send} = sub {
286 if ($DELAY++ >= 0) { 230 if (++$DELAY > 0) {
287 my $msg = $_[0]; 231 my $msg = $_[0];
288 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) }; 232 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
289 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 233 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
290 return; 234 return;
291 } 235 }
292 236
293 local $AnyEvent::MP::Kernel::SRCNODE = $self; 237 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
294 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 238 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
295 }; 239 };
296} 240}
297 241
298sub transport_connect { 242sub transport_connect {
299 my ($self, $tp) = @_; 243 my ($self, $tp) = @_;
300 244
301 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 245 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
302} 246}
303 247
304sub kill { 248sub kill {
305 my (undef, @args) = @_; 249 my (undef, @args) = @_;
306 250
307 # we _always_ delay kil's, to avoid calling mon callbacks 251 # we _always_ delay kil's, to avoid calling mon callbacks
308 # from anything but the event loop context. 252 # from anything but the event loop context.
309 $DELAY = 1; 253 $DELAY = 1;
310 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) }; 254 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
311 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 255 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
312} 256}
313 257
314sub monitor { 258sub monitor {
315 # maybe always delay, too? 259 # maybe always delay, too?
316 if ($DELAY_W) { 260 if ($DELAY_W) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines