ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.60 by root, Wed Mar 21 15:22:16 2012 UTC vs.
Revision 1.67 by root, Sun Aug 28 08:23:34 2016 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
26 my ($self, $id) = @_; 25 my ($self, $id) = @_;
27 26
28 $self = bless { id => $id }, $self; 27 $self = bless { id => $id }, $self;
29 28
29 # register
30 $AnyEvent::MP::Kernel::NODE{$id} = $self;
31
30 $self->init; 32 $self->init;
31 $self->transport_reset; 33 $self->transport_reset;
32 34
33 $self 35 $self
34} 36}
35 37
38sub DESTROY {
39 # unregister
40 delete $AnyEvent::MP::Kernel::NODE{$_[0]{id}};
41}
42
36sub init { 43sub init {
37 # 44 #
38} 45}
39 46
40sub send { 47sub send {
41 &{ shift->{send} } 48 &{ shift->{send} }
42} 49}
43 50
44# nodes reachable via the network 51# nodes reachable via the network
45package AnyEvent::MP::Node::Remote; 52package AnyEvent::MP::Node::Remote; # a remote node
46 53
47use base "AnyEvent::MP::Node"; 54use base "AnyEvent::MP::Node";
48 55
49# called at init time, mostly sets {send} 56# called at init time, mostly sets {send}
50sub transport_reset { 57sub transport_reset {
78 } 85 }
79 86
80 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason) 87 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
81 unless $no_transport; 88 unless $no_transport;
82 89
83 # if we are here and are idle, we nuke ourselves 90 # we weaken the node reference, so it can go away if unused
84 delete $AnyEvent::MP::Kernel::NODE{$self->{id}} 91 Scalar::Util::weaken $AnyEvent::MP::Kernel::NODE{$self->{id}}
85 unless $self->{transport} || $self->{connect_to}; 92 unless $self->{connect_to};
93
94 AE::log 9 => "@reason";
86} 95}
87 96
88# called after handshake was successful 97# called after handshake was successful
89sub transport_connect { 98sub transport_connect {
90 my ($self, $transport) = @_; 99 my ($self, $transport) = @_;
113 my ($self) = @_; 122 my ($self) = @_;
114 123
115 return if $self->{transport}; 124 return if $self->{transport};
116 return if $self->{connect_w}; 125 return if $self->{connect_w};
117 126
127 # we unweaken the node reference, in case it was weakened before
128 $AnyEvent::MP::Kernel::NODE{$self->{id}}
129 = $AnyEvent::MP::Kernel::NODE{$self->{id}};
130
118 Scalar::Util::weaken $self; 131 Scalar::Util::weaken $self;
119 132
120 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub { 133 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
121 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 134 $self->transport_error (transport_error => $self->{id}, "connect timeout");
122 }; 135 };
123 136
124 # maybe @$addresses? 137 # maybe @$addresses?
125 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}}; 138 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
126 139
127 if ($addresses) { 140 if ($addresses) {
128 $self->connect_to ($addresses); 141 $self->connect_to ($addresses);
129 } else { 142 } else {
130 # on global nodes, all bets are off now - we either know the node, or we don't 143 # on global nodes, all bets are off now - we either know the node, or we don't
131 unless ($AnyEvent::MP::Kernel::GLOBAL) { 144 if ($AnyEvent::MP::Kernel::GLOBAL) {
145 $self->transport_error (transport_error => $self->{id}, "no known address");
146 } else {
132 AnyEvent::MP::Kernel::g_find ($self->{id}); 147 AnyEvent::MP::Kernel::g_find ($self->{id});
133 } 148 }
134 } 149 }
135} 150}
136 151
138 my ($self, $addresses) = @_; 153 my ($self, $addresses) = @_;
139 154
140 return if $self->{transport}; 155 return if $self->{transport};
141 return if $self->{connect_w}; 156 return if $self->{connect_w};
142 157
143 return unless @$addresses; 158 unless (@$addresses) {
159 $self->transport_error (transport_error => $self->{id}, "no known address");
160 return;
161 }
144 162
145 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@$addresses]"); 163 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
146 164
147 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 165 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
148 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 166 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
149 167
150 $interval = ($monitor - $interval) / @$addresses 168 $interval = ($monitor - $interval) / @$addresses
155 my @endpoints = reverse @$addresses; 173 my @endpoints = reverse @$addresses;
156 174
157 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub { 175 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
158 my $endpoint = pop @endpoints; 176 my $endpoint = pop @endpoints;
159 177
160 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); 178 AE::log 9 => "connecting to $self->{id} at $endpoint";
161 179
162 $self->{trial}{$endpoint} ||= do { 180 $self->{trial}{$endpoint} ||= do {
163 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 181 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
164 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); 182 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
165 183
166 AnyEvent::MP::Transport::mp_connect 184 AnyEvent::MP::Transport::mp_connect
167 $host, $port, 185 $host, $port,
168 sub { delete $self->{trial}{$endpoint} }, 186 sub { delete $self->{trial}{$endpoint} },
169 }; 187 };
170 }; 188 };
171} 189}
172 190
173sub connect {
174 my ($self) = @_;
175
176 return if $self->{transport};
177 return if $self->{connect_w};
178
179 Scalar::Util::weaken $self;
180
181 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
182
183 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
184 $self->transport_error (transport_error => $self->{id}, "unable to connect");
185 };
186
187 # maybe @$addresses?
188 my @addresses = @{ $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}} };
189
190 unless (@addresses) {
191 # on global nodes, all bets are off now - we either know the node, or we don't
192 unless ($AnyEvent::MP::Kernel::GLOBAL) {
193 AnyEvent::MP::Kernel::g_find ($self->{id});
194 }
195 return;
196 }
197
198 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]");
199
200 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
201
202 $interval = ($monitor - $interval) / @addresses
203 if ($monitor - $interval) / @addresses < $interval;
204
205 $interval = 0.4 if $interval < 0.4;
206
207 my @endpoints;
208
209 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
210 @endpoints = @addresses
211 unless @endpoints;
212
213 my $endpoint = shift @endpoints;
214
215 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint");
216
217 $self->{trial}{$endpoint} ||= do {
218 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
219 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference.");
220
221 AnyEvent::MP::Transport::mp_connect
222 $host, $port,
223 sub { delete $self->{trial}{$endpoint} },
224 };
225 };
226}
227
228sub kill { 191sub kill {
229 my ($self, $port, @reason) = @_; 192 my ($self, $port, @reason) = @_;
230 193
231 $self->{send} (["", kil => $port, @reason]); 194 $self->{send} (["", kil1 => $port, @reason]);
232} 195}
233 196
234sub monitor { 197sub monitor {
235 my ($self, $portid, $cb) = @_; 198 my ($self, $portid, $cb) = @_;
236 199
254 $self->send (["", mon0 => $portid]); 217 $self->send (["", mon0 => $portid]);
255 delete $self->{monitor}{$portid}; 218 delete $self->{monitor}{$portid};
256 } 219 }
257} 220}
258 221
259package AnyEvent::MP::Node::Self; 222package AnyEvent::MP::Node::Self; # the local node
260 223
261use base "AnyEvent::MP::Node"; 224use base "AnyEvent::MP::Node";
262 225
263sub connect { 226sub connect {
264 # we are trivially connected 227 # we are trivially connected
267# delay every so often to avoid recursion, also used to delay after spawn 230# delay every so often to avoid recursion, also used to delay after spawn
268our $DELAY = -50; 231our $DELAY = -50;
269our @DELAY; 232our @DELAY;
270our $DELAY_W; 233our $DELAY_W;
271 234
272sub _send_delayed { 235our $send_delayed = sub {
273 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE; 236 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
274 (shift @DELAY)->() 237 (shift @DELAY)->()
275 while @DELAY; 238 while @DELAY;
276 undef $DELAY_W; 239 undef $DELAY_W;
277 $DELAY = -50; 240 $DELAY = -50;
278} 241};
279 242
280sub transport_reset { 243sub transport_reset {
281 my ($self) = @_; 244 my ($self) = @_;
282 245
283 Scalar::Util::weaken $self; 246 Scalar::Util::weaken $self;
284 247
285 $self->{send} = sub { 248 $self->{send} = sub {
286 if ($DELAY++ >= 0) { 249 if (++$DELAY > 0) {
287 my $msg = $_[0]; 250 my $msg = $_[0];
288 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) }; 251 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
289 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 252 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
290 return; 253 return;
291 } 254 }
292 255
293 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE; 256 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
294 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 257 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
296} 259}
297 260
298sub transport_connect { 261sub transport_connect {
299 my ($self, $tp) = @_; 262 my ($self, $tp) = @_;
300 263
301 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 264 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
302} 265}
303 266
304sub kill { 267sub kill {
305 my (undef, @args) = @_; 268 my (undef, @args) = @_;
306 269
307 # we _always_ delay kil's, to avoid calling mon callbacks 270 # we _always_ delay kil's, to avoid calling mon callbacks
308 # from anything but the event loop context. 271 # from anything but the event loop context.
309 $DELAY = 1; 272 $DELAY = 1;
310 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) }; 273 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
311 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 274 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
312} 275}
313 276
314sub monitor { 277sub monitor {
315 # maybe always delay, too? 278 # maybe always delay, too?
316 if ($DELAY_W) { 279 if ($DELAY_W) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines