ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.43 by root, Thu Oct 1 16:11:05 2009 UTC vs.
Revision 1.68 by root, Sun Aug 28 09:39:23 2016 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
26 my ($self, $id) = @_; 25 my ($self, $id) = @_;
27 26
28 $self = bless { id => $id }, $self; 27 $self = bless { id => $id }, $self;
29 28
29 # register
30 $AnyEvent::MP::Kernel::NODE{$id} = $self;
31
30 $self->init; 32 $self->init;
31 $self->transport_reset; 33 $self->transport_reset;
32 34
33 $self 35 $self
34} 36}
35 37
38sub DESTROY {
39 # unregister
40 delete $AnyEvent::MP::Kernel::NODE{$_[0]{id}};
41}
42
36sub init { 43sub init {
37 # 44 #
38} 45}
39 46
40sub send { 47sub send {
41 &{ shift->{send} } 48 &{ shift->{send} }
42} 49}
43 50
44# nodes reachable via the network 51# nodes reachable via the network
45package AnyEvent::MP::Node::External; 52package AnyEvent::MP::Node::Remote; # a remote node
46 53
47use base "AnyEvent::MP::Node"; 54use base "AnyEvent::MP::Node";
48 55
49# called at init time, mostly sets {send} 56# called at init time, mostly sets {send}
50sub transport_reset { 57sub transport_reset {
78 } 85 }
79 86
80 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason) 87 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
81 unless $no_transport; 88 unless $no_transport;
82 89
83 # if we are here and are idle, we nuke ourselves 90 # we weaken the node reference, so it can go away if unused
84 delete $AnyEvent::MP::Kernel::NODE{$self->{id}} 91 Scalar::Util::weaken $AnyEvent::MP::Kernel::NODE{$self->{id}}
85 unless $self->{transport} || $self->{connect_to}; 92 unless $self->{connect_to};
93
94 AE::log 9 => "@reason";
86} 95}
87 96
88# called after handshake was successful 97# called after handshake was successful
89sub transport_connect { 98sub transport_connect {
90 my ($self, $transport) = @_; 99 my ($self, $transport) = @_;
91 100
92 delete $self->{trial}; 101 delete $self->{trial};
93 102
94 $self->transport_error (transport_error => "switched connections") 103 $self->transport_error (transport_error => $self->{id}, "switched connections")
95 if $self->{transport}; 104 if $self->{transport};
96 105
97 delete $self->{connect_addr};
98 delete $self->{connect_w}; 106 delete $self->{connect_w};
99 delete $self->{connect_to}; 107 delete $self->{connect_to};
100 108
101 $self->{transport} = $transport; 109 $self->{transport} = $transport;
102 110
103 my $transport_send = $transport->can ("send"); 111 my $transport_send = $transport->{send};
104 112
105 $self->{send} = sub { 113 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
106 $transport_send->($transport, $_[0]); 114
115 $self->{send} = $transport_send;
116
117 $transport_send->($_)
118 for @{ delete $self->{queue} || [] };
119}
120
121sub connect {
122 my ($self) = @_;
123
124 return if $self->{transport};
125 return if $self->{connect_w};
126
127 # we unweaken the node reference, in case it was weakened before
128 $AnyEvent::MP::Kernel::NODE{$self->{id}}
129 = $AnyEvent::MP::Kernel::NODE{$self->{id}};
130
131 Scalar::Util::weaken $self;
132
133 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
134 $self->transport_error (transport_error => $self->{id}, "connect timeout");
107 }; 135 };
108 136
109 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1); 137 # maybe @$addresses?
138 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
110 139
111 $transport->send ($_) 140 if ($addresses) {
112 for @{ delete $self->{queue} || [] }; 141 $self->connect_to ($addresses);
142 } else {
143 # on global nodes, all bets are off now - we either know the node, or we don't
144 if ($AnyEvent::MP::Kernel::GLOBAL) {
145 $self->transport_error (transport_error => $self->{id}, "no known address");
146 } else {
147 AnyEvent::MP::Kernel::g_find ($self->{id});
148 }
149 }
113} 150}
114 151
115sub connect { 152sub connect_to {
116 my ($self, @addresses) = @_; 153 my ($self, $addresses) = @_;
117 154
118 return if $self->{transport}; 155 return if $self->{transport};
156 return if $self->{connect_w};
119 157
120 Scalar::Util::weaken $self; 158 unless (@$addresses) {
121
122 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
123
124 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
125 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 159 $self->transport_error (transport_error => $self->{id}, "no known address");
126 };
127
128 return unless @addresses;
129
130 if ($self->{connect_w}) {
131 # sometimes we get told about new addresses after we started to connect
132 unshift @{$self->{connect_addr}}, @addresses;
133 return; 160 return;
134 } 161 }
135 162
136 $self->{connect_addr} = \@addresses; # a bit weird, but efficient
137
138 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); 163 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
139 164
165 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
140 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 166 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
141 167
142 $interval = ($monitor - $interval) / @addresses 168 $interval = ($monitor - $interval) / @$addresses
143 if ($monitor - $interval) / @addresses < $interval; 169 if ($monitor - $interval) / @$addresses < $interval;
144 170
145 $interval = 0.4 if $interval < 0.4; 171 $interval = 0.4 if $interval < 0.4;
146 172
147 my @endpoints; 173 my @endpoints = reverse @$addresses;
148 174
149 $self->{connect_w} = AE::timer 0.050 * rand, $interval * (0.9 + 0.1 * rand), sub { 175 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
150 @endpoints = @addresses
151 unless @endpoints;
152
153 my $endpoint = shift @endpoints; 176 my $endpoint = pop @endpoints
177 or return $self->transport_error (transport_error => $self->{id}, "unable to connect to any address");
154 178
155 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); 179 AE::log 9 => "connecting to $self->{id} at $endpoint";
156 180
157 $self->{trial}{$endpoint} ||= do { 181 $self->{trial}{$endpoint} ||= do {
158 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 182 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
159 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); 183 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
160 184
161 AnyEvent::MP::Transport::mp_connect 185 AnyEvent::MP::Transport::mp_connect
162 $host, $port, 186 $host, $port,
163 sub { delete $self->{trial}{$endpoint} }, 187 sub { delete $self->{trial}{$endpoint} },
164 }; 188 };
166} 190}
167 191
168sub kill { 192sub kill {
169 my ($self, $port, @reason) = @_; 193 my ($self, $port, @reason) = @_;
170 194
171 $self->send (["", kil => $port, @reason]); 195 $self->{send} (["", kil1 => $port, @reason]);
172} 196}
173 197
174sub monitor { 198sub monitor {
175 my ($self, $portid, $cb) = @_; 199 my ($self, $portid, $cb) = @_;
176 200
194 $self->send (["", mon0 => $portid]); 218 $self->send (["", mon0 => $portid]);
195 delete $self->{monitor}{$portid}; 219 delete $self->{monitor}{$portid};
196 } 220 }
197} 221}
198 222
199# used for direct slave connections as well
200package AnyEvent::MP::Node::Direct;
201
202use base "AnyEvent::MP::Node::External";
203
204package AnyEvent::MP::Node::Self; 223package AnyEvent::MP::Node::Self; # the local node
205 224
206use base "AnyEvent::MP::Node"; 225use base "AnyEvent::MP::Node";
207 226
208sub connect { 227sub connect {
209 # we are trivially connected 228 # we are trivially connected
212# delay every so often to avoid recursion, also used to delay after spawn 231# delay every so often to avoid recursion, also used to delay after spawn
213our $DELAY = -50; 232our $DELAY = -50;
214our @DELAY; 233our @DELAY;
215our $DELAY_W; 234our $DELAY_W;
216 235
217sub _send_delayed { 236our $send_delayed = sub {
218 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE{""}; 237 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
219 (shift @DELAY)->() 238 (shift @DELAY)->()
220 while @DELAY; 239 while @DELAY;
221 undef $DELAY_W; 240 undef $DELAY_W;
222 $DELAY = -50; 241 $DELAY = -50;
223} 242};
224 243
225sub transport_reset { 244sub transport_reset {
226 my ($self) = @_; 245 my ($self) = @_;
227 246
228 Scalar::Util::weaken $self; 247 Scalar::Util::weaken $self;
229 248
230 $self->{send} = sub { 249 $self->{send} = sub {
231 if ($DELAY++ >= 0) { 250 if (++$DELAY > 0) {
232 my $msg = $_[0]; 251 my $msg = $_[0];
233 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) }; 252 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
234 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 253 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
235 return; 254 return;
236 } 255 }
237 256
238 local $AnyEvent::MP::Kernel::SRCNODE = $self; 257 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
239 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 258 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
240 }; 259 };
241} 260}
242 261
243sub transport_connect { 262sub transport_connect {
244 my ($self, $tp) = @_; 263 my ($self, $tp) = @_;
245 264
246 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 265 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
247} 266}
248 267
249sub kill { 268sub kill {
250 my ($self, $port, @reason) = @_; 269 my (undef, @args) = @_;
251 270
252 my $delay_cb = sub { 271 # we _always_ delay kil's, to avoid calling mon callbacks
253 delete $AnyEvent::MP::Kernel::PORT{$port} 272 # from anything but the event loop context.
254 or return; # killing nonexistent ports is O.K. 273 $DELAY = 1;
255 delete $AnyEvent::MP::Kernel::PORT_DATA{$port}; 274 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
256 275 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
257 my $mon = delete $AnyEvent::MP::Kernel::LMON{$port}
258 or !@reason
259 or $AnyEvent::MP::Kernel::WARN->(8, "unmonitored local port $port died with reason: @reason");
260
261 $_->(@reason) for values %$mon;
262 };
263
264 $DELAY_W ? push @DELAY, $delay_cb : &$delay_cb;
265} 276}
266 277
267sub monitor { 278sub monitor {
268 my ($self, $portid, $cb) = @_; 279 # maybe always delay, too?
269 280 if ($DELAY_W) {
270 my $delay_cb = sub { 281 my @args = @_;
271 return $cb->(no_such_port => "cannot monitor nonexistent port", "$self->{id}#$portid") 282 push @DELAY, sub { AnyEvent::MP::Kernel::_monitor (@args) };
272 unless exists $AnyEvent::MP::Kernel::PORT{$portid}; 283 return;
273
274 $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0} = $cb;
275 }; 284 }
276 285 &AnyEvent::MP::Kernel::_monitor;
277 $DELAY_W ? push @DELAY, $delay_cb : &$delay_cb;
278} 286}
279 287
280sub unmonitor { 288sub unmonitor {
281 my ($self, $portid, $cb) = @_; 289 # no need to always delay
282 290 if ($DELAY_W) {
283 my $delay_cb = sub { 291 my @args = @_;
284 delete $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0}; 292 push @DELAY, sub { AnyEvent::MP::Kernel::_unmonitor (@args) };
293 return;
285 }; 294 }
286 295
287 $DELAY_W ? push @DELAY, $delay_cb : &$delay_cb; 296 &AnyEvent::MP::Kernel::_unmonitor;
288} 297}
289 298
290=head1 SEE ALSO 299=head1 SEE ALSO
291 300
292L<AnyEvent::MP>. 301L<AnyEvent::MP>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines