ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.55 by root, Wed Feb 29 19:23:44 2012 UTC vs.
Revision 1.65 by root, Fri Mar 23 21:16:36 2012 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
40sub send { 39sub send {
41 &{ shift->{send} } 40 &{ shift->{send} }
42} 41}
43 42
44# nodes reachable via the network 43# nodes reachable via the network
45package AnyEvent::MP::Node::Remote; 44package AnyEvent::MP::Node::Remote; # a remote node
46 45
47use base "AnyEvent::MP::Node"; 46use base "AnyEvent::MP::Node";
48 47
49# called at init time, mostly sets {send} 48# called at init time, mostly sets {send}
50sub transport_reset { 49sub transport_reset {
92 delete $self->{trial}; 91 delete $self->{trial};
93 92
94 $self->transport_error (transport_error => $self->{id}, "switched connections") 93 $self->transport_error (transport_error => $self->{id}, "switched connections")
95 if $self->{transport}; 94 if $self->{transport};
96 95
97 delete $self->{connect_addr};
98 delete $self->{connect_w}; 96 delete $self->{connect_w};
99 delete $self->{connect_to}; 97 delete $self->{connect_to};
100 98
101 $self->{transport} = $transport; 99 $self->{transport} = $transport;
102 100
116 return if $self->{transport}; 114 return if $self->{transport};
117 return if $self->{connect_w}; 115 return if $self->{connect_w};
118 116
119 Scalar::Util::weaken $self; 117 Scalar::Util::weaken $self;
120 118
121 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 119 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
122
123 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
124 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 120 $self->transport_error (transport_error => $self->{id}, "unable to connect");
125 }; 121 };
126 122
127 # maybe @$addresses? 123 # maybe @$addresses?
128 my @addresses = @{
129 $AnyEvent::MP::Kernel::GLOBAL_ADDR->{$self->{id}} 124 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
130 || $AnyEvent::MP::Kernel::NODE_ADDR->{$self->{id}}
131 };
132 125
133 unless (@addresses) { 126 if ($addresses) {
127 $self->connect_to ($addresses);
128 } else {
134 # on global nodes, all bets are off now - we either know the node, or we don't 129 # on global nodes, all bets are off now - we either know the node, or we don't
135 unless ($AnyEvent::MP::Kernel::GLOBAL) { 130 unless ($AnyEvent::MP::Kernel::GLOBAL) {
136 $self->{connect_w} = AnyEvent::MP::Kernel::global_req ( 131 AnyEvent::MP::Kernel::g_find ($self->{id});
137 g_find => $self->{id},
138 sub {
139 return unless $self; # just to be sure
140 return unless @{ $_[0] };
141
142 local $AnyEvent::MP::Kernel::GLOBAL_ADDR->{$self->{id}} = $_[0]; #d# UGLY
143 delete $self->{connect_w};
144 $self->connect;
145 }
146 );
147 } 132 }
133 }
134}
148 135
149 return; 136sub connect_to {
137 my ($self, $addresses) = @_;
138
139 return if $self->{transport};
140 return if $self->{connect_w};
141
142 return unless @$addresses;
150 } 143
151
152 $self->{connect_addr} = \@addresses; # a bit weird, but efficient
153
154 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); 144 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
155 145
146 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
156 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 147 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
157 148
158 $interval = ($monitor - $interval) / @addresses 149 $interval = ($monitor - $interval) / @$addresses
159 if ($monitor - $interval) / @addresses < $interval; 150 if ($monitor - $interval) / @$addresses < $interval;
160 151
161 $interval = 0.4 if $interval < 0.4; 152 $interval = 0.4 if $interval < 0.4;
162 153
163 my @endpoints; 154 my @endpoints = reverse @$addresses;
164 155
165 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub { 156 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
166 @endpoints = @addresses
167 unless @endpoints;
168
169 my $endpoint = shift @endpoints; 157 my $endpoint = pop @endpoints;
170 158
171 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); 159 AE::log 9 => "connecting to $self->{id} at $endpoint";
172 160
173 $self->{trial}{$endpoint} ||= do { 161 $self->{trial}{$endpoint} ||= do {
174 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 162 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
175 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); 163 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
176 164
177 AnyEvent::MP::Transport::mp_connect 165 AnyEvent::MP::Transport::mp_connect
178 $host, $port, 166 $host, $port,
179 sub { delete $self->{trial}{$endpoint} }, 167 sub { delete $self->{trial}{$endpoint} },
180 }; 168 };
210 $self->send (["", mon0 => $portid]); 198 $self->send (["", mon0 => $portid]);
211 delete $self->{monitor}{$portid}; 199 delete $self->{monitor}{$portid};
212 } 200 }
213} 201}
214 202
215package AnyEvent::MP::Node::Self; 203package AnyEvent::MP::Node::Self; # the local node
216 204
217use base "AnyEvent::MP::Node"; 205use base "AnyEvent::MP::Node";
218 206
219sub connect { 207sub connect {
220 # we are trivially connected 208 # we are trivially connected
223# delay every so often to avoid recursion, also used to delay after spawn 211# delay every so often to avoid recursion, also used to delay after spawn
224our $DELAY = -50; 212our $DELAY = -50;
225our @DELAY; 213our @DELAY;
226our $DELAY_W; 214our $DELAY_W;
227 215
228sub _send_delayed { 216our $send_delayed = sub {
229 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE{""}; 217 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
230 (shift @DELAY)->() 218 (shift @DELAY)->()
231 while @DELAY; 219 while @DELAY;
232 undef $DELAY_W; 220 undef $DELAY_W;
233 $DELAY = -50; 221 $DELAY = -50;
234} 222};
235 223
236sub transport_reset { 224sub transport_reset {
237 my ($self) = @_; 225 my ($self) = @_;
238 226
239 Scalar::Util::weaken $self; 227 Scalar::Util::weaken $self;
240 228
241 $self->{send} = sub { 229 $self->{send} = sub {
242 if ($DELAY++ >= 0) { 230 if (++$DELAY > 0) {
243 my $msg = $_[0]; 231 my $msg = $_[0];
244 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) }; 232 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
245 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 233 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
246 return; 234 return;
247 } 235 }
248 236
249 local $AnyEvent::MP::Kernel::SRCNODE = $self; 237 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
250 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 238 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
251 }; 239 };
252} 240}
253 241
254sub transport_connect { 242sub transport_connect {
255 my ($self, $tp) = @_; 243 my ($self, $tp) = @_;
256 244
257 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 245 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
258} 246}
259 247
260sub kill { 248sub kill {
261 my (undef, @args) = @_; 249 my (undef, @args) = @_;
262 250
263 # we _always_ delay kil's, to avoid calling mon callbacks 251 # we _always_ delay kil's, to avoid calling mon callbacks
264 # from anything but the event loop context. 252 # from anything but the event loop context.
265 $DELAY = 1; 253 $DELAY = 1;
266 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) }; 254 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
267 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 255 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
268} 256}
269 257
270sub monitor { 258sub monitor {
271 # maybe always delay, too? 259 # maybe always delay, too?
272 if ($DELAY_W) { 260 if ($DELAY_W) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines