ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.39 by root, Tue Sep 8 13:46:25 2009 UTC vs.
Revision 1.68 by root, Sun Aug 28 09:39:23 2016 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
26 my ($self, $id) = @_; 25 my ($self, $id) = @_;
27 26
28 $self = bless { id => $id }, $self; 27 $self = bless { id => $id }, $self;
29 28
29 # register
30 $AnyEvent::MP::Kernel::NODE{$id} = $self;
31
30 $self->init; 32 $self->init;
31 $self->transport_reset; 33 $self->transport_reset;
32 34
33 $self 35 $self
34} 36}
35 37
38sub DESTROY {
39 # unregister
40 delete $AnyEvent::MP::Kernel::NODE{$_[0]{id}};
41}
42
36sub init { 43sub init {
37 # 44 #
38} 45}
39 46
40sub send { 47sub send {
41 &{ shift->{send} } 48 &{ shift->{send} }
42} 49}
43 50
44# nodes reachable via the network 51# nodes reachable via the network
45package AnyEvent::MP::Node::External; 52package AnyEvent::MP::Node::Remote; # a remote node
46 53
47use base "AnyEvent::MP::Node"; 54use base "AnyEvent::MP::Node";
48 55
49# called at init time, mostly sets {send} 56# called at init time, mostly sets {send}
50sub transport_reset { 57sub transport_reset {
78 } 85 }
79 86
80 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason) 87 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
81 unless $no_transport; 88 unless $no_transport;
82 89
83 # if we are here and are idle, we nuke ourselves 90 # we weaken the node reference, so it can go away if unused
84 delete $AnyEvent::MP::Kernel::NODE{$self->{id}} 91 Scalar::Util::weaken $AnyEvent::MP::Kernel::NODE{$self->{id}}
85 unless $self->{transport} || $self->{connect_to}; 92 unless $self->{connect_to};
93
94 AE::log 9 => "@reason";
86} 95}
87 96
88# called after handshake was successful 97# called after handshake was successful
89sub transport_connect { 98sub transport_connect {
90 my ($self, $transport) = @_; 99 my ($self, $transport) = @_;
91 100
92 delete $self->{trial}; 101 delete $self->{trial};
93 102
94 $self->transport_error (transport_error => "switched connections") 103 $self->transport_error (transport_error => $self->{id}, "switched connections")
95 if $self->{transport}; 104 if $self->{transport};
96 105
97 delete $self->{connect_w}; 106 delete $self->{connect_w};
98 delete $self->{connect_to}; 107 delete $self->{connect_to};
99 108
100 $self->{transport} = $transport; 109 $self->{transport} = $transport;
101 110
102 my $transport_send = $transport->can ("send"); 111 my $transport_send = $transport->{send};
103 112
104 $self->{send} = sub { 113 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
105 $transport_send->($transport, $_[0]); 114
115 $self->{send} = $transport_send;
116
117 $transport_send->($_)
118 for @{ delete $self->{queue} || [] };
119}
120
121sub connect {
122 my ($self) = @_;
123
124 return if $self->{transport};
125 return if $self->{connect_w};
126
127 # we unweaken the node reference, in case it was weakened before
128 $AnyEvent::MP::Kernel::NODE{$self->{id}}
129 = $AnyEvent::MP::Kernel::NODE{$self->{id}};
130
131 Scalar::Util::weaken $self;
132
133 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
134 $self->transport_error (transport_error => $self->{id}, "connect timeout");
106 }; 135 };
107 136
108 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1); 137 # maybe @$addresses?
138 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
109 139
110 $transport->send ($_) 140 if ($addresses) {
111 for @{ delete $self->{queue} || [] }; 141 $self->connect_to ($addresses);
142 } else {
143 # on global nodes, all bets are off now - we either know the node, or we don't
144 if ($AnyEvent::MP::Kernel::GLOBAL) {
145 $self->transport_error (transport_error => $self->{id}, "no known address");
146 } else {
147 AnyEvent::MP::Kernel::g_find ($self->{id});
148 }
149 }
112} 150}
113 151
114sub connect { 152sub connect_to {
115 my ($self, @addresses) = @_; 153 my ($self, $addresses) = @_;
116 154
117 return if $self->{transport}; 155 return if $self->{transport};
118
119 Scalar::Util::weaken $self;
120
121 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
122
123 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
124 $self->transport_error (transport_error => $self->{id}, "unable to connect");
125 };
126
127 return unless @addresses;
128 return if $self->{connect_w}; 156 return if $self->{connect_w};
129 157
158 unless (@$addresses) {
159 $self->transport_error (transport_error => $self->{id}, "no known address");
160 return;
161 }
162
130 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); 163 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
131 164
165 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
132 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 166 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
133 167
134 $interval = ($monitor - $interval) / @addresses 168 $interval = ($monitor - $interval) / @$addresses
135 if ($monitor - $interval) / @addresses < $interval; 169 if ($monitor - $interval) / @$addresses < $interval;
136 170
137 $interval = 0.4 if $interval < 0.4; 171 $interval = 0.4 if $interval < 0.4;
138 172
139 my @endpoints; 173 my @endpoints = reverse @$addresses;
140 174
141 $self->{connect_w} = AE::timer 0.050 * rand, $interval * (0.9 + 0.1 * rand), sub { 175 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
142 @endpoints = @addresses
143 unless @endpoints;
144
145 my $endpoint = shift @endpoints; 176 my $endpoint = pop @endpoints
177 or return $self->transport_error (transport_error => $self->{id}, "unable to connect to any address");
146 178
147 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); 179 AE::log 9 => "connecting to $self->{id} at $endpoint";
148 180
149 $self->{trial}{$endpoint} ||= do { 181 $self->{trial}{$endpoint} ||= do {
150 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 182 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
151 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); 183 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
152 184
153 AnyEvent::MP::Transport::mp_connect 185 AnyEvent::MP::Transport::mp_connect
154 $host, $port, 186 $host, $port,
155 sub { delete $self->{trial}{$endpoint} }, 187 sub { delete $self->{trial}{$endpoint} },
156 }; 188 };
158} 190}
159 191
160sub kill { 192sub kill {
161 my ($self, $port, @reason) = @_; 193 my ($self, $port, @reason) = @_;
162 194
163 $self->send (["", kil => $port, @reason]); 195 $self->{send} (["", kil1 => $port, @reason]);
164} 196}
165 197
166sub monitor { 198sub monitor {
167 my ($self, $portid, $cb) = @_; 199 my ($self, $portid, $cb) = @_;
168 200
186 $self->send (["", mon0 => $portid]); 218 $self->send (["", mon0 => $portid]);
187 delete $self->{monitor}{$portid}; 219 delete $self->{monitor}{$portid};
188 } 220 }
189} 221}
190 222
191# used for direct slave connections as well
192package AnyEvent::MP::Node::Direct;
193
194use base "AnyEvent::MP::Node::External";
195
196package AnyEvent::MP::Node::Self; 223package AnyEvent::MP::Node::Self; # the local node
197 224
198use base "AnyEvent::MP::Node"; 225use base "AnyEvent::MP::Node";
199 226
200sub connect { 227sub connect {
201 # we are trivially connected 228 # we are trivially connected
202} 229}
203 230
204# delay every so often to avoid recursion, also used to delay after spawn 231# delay every so often to avoid recursion, also used to delay after spawn
205our $DELAY; 232our $DELAY = -50;
206our @DELAY; 233our @DELAY;
207our $DELAY_W; 234our $DELAY_W;
208 235
209sub _send_delayed { 236our $send_delayed = sub {
210 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE{""}; 237 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
211 (shift @DELAY)->() 238 (shift @DELAY)->()
212 while @DELAY; 239 while @DELAY;
213 undef $DELAY_W; 240 undef $DELAY_W;
214 $DELAY = -50; 241 $DELAY = -50;
215} 242};
216 243
217sub transport_reset { 244sub transport_reset {
218 my ($self) = @_; 245 my ($self) = @_;
219 246
220 Scalar::Util::weaken $self; 247 Scalar::Util::weaken $self;
221 248
222 $self->{send} = sub { 249 $self->{send} = sub {
223 if ($DELAY++ >= 0) { 250 if (++$DELAY > 0) {
224 my $msg = $_[0]; 251 my $msg = $_[0];
225 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) }; 252 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
226 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 253 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
227 } else { 254 return;
228 local $AnyEvent::MP::Kernel::SRCNODE = $self;
229 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
230 } 255 }
256
257 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
258 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
231 }; 259 };
232} 260}
233 261
234sub transport_connect { 262sub transport_connect {
235 my ($self, $tp) = @_; 263 my ($self, $tp) = @_;
236 264
237 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 265 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
238} 266}
239 267
240sub kill { 268sub kill {
241 my ($self, $port, @reason) = @_; 269 my (undef, @args) = @_;
242 270
243 my $delay_cb = sub { 271 # we _always_ delay kil's, to avoid calling mon callbacks
244 delete $AnyEvent::MP::Kernel::PORT{$port}; 272 # from anything but the event loop context.
245 delete $AnyEvent::MP::Kernel::PORT_DATA{$port}; 273 $DELAY = 1;
246 274 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
247 my $mon = delete $AnyEvent::MP::Kernel::LMON{$port} 275 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
248 or !@reason
249 or $AnyEvent::MP::Kernel::WARN->(8, "unmonitored local port $port died with reason: @reason");
250
251 $_->(@reason) for values %$mon;
252 };
253
254 $DELAY_W ? push @DELAY, $delay_cb : &$delay_cb;
255} 276}
256 277
257sub monitor { 278sub monitor {
258 my ($self, $portid, $cb) = @_; 279 # maybe always delay, too?
259 280 if ($DELAY_W) {
260 my $delay_cb = sub { 281 my @args = @_;
261 return $cb->(no_such_port => "cannot monitor nonexistent port", "$self->{id}#$portid") 282 push @DELAY, sub { AnyEvent::MP::Kernel::_monitor (@args) };
262 unless exists $AnyEvent::MP::Kernel::PORT{$portid}; 283 return;
263
264 $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0} = $cb;
265 }; 284 }
266 285 &AnyEvent::MP::Kernel::_monitor;
267 $DELAY_W ? push @DELAY, $delay_cb : &$delay_cb;
268} 286}
269 287
270sub unmonitor { 288sub unmonitor {
271 my ($self, $portid, $cb) = @_; 289 # no need to always delay
272 290 if ($DELAY_W) {
273 my $delay_cb = sub { 291 my @args = @_;
274 delete $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0}; 292 push @DELAY, sub { AnyEvent::MP::Kernel::_unmonitor (@args) };
293 return;
275 }; 294 }
276 295
277 $DELAY_W ? push @DELAY, $delay_cb : &$delay_cb; 296 &AnyEvent::MP::Kernel::_unmonitor;
278} 297}
279 298
280=head1 SEE ALSO 299=head1 SEE ALSO
281 300
282L<AnyEvent::MP>. 301L<AnyEvent::MP>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines