ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.32 by root, Sun Aug 30 09:24:09 2009 UTC vs.
Revision 1.67 by root, Sun Aug 28 08:23:34 2016 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
26 my ($self, $id) = @_; 25 my ($self, $id) = @_;
27 26
28 $self = bless { id => $id }, $self; 27 $self = bless { id => $id }, $self;
29 28
29 # register
30 $AnyEvent::MP::Kernel::NODE{$id} = $self;
31
30 $self->init; 32 $self->init;
31 $self->transport_reset; 33 $self->transport_reset;
32 34
33 $self 35 $self
34} 36}
35 37
38sub DESTROY {
39 # unregister
40 delete $AnyEvent::MP::Kernel::NODE{$_[0]{id}};
41}
42
36sub init { 43sub init {
37 # 44 #
38} 45}
39 46
40sub send { 47sub send {
41 &{ shift->{send} } 48 &{ shift->{send} }
42} 49}
43 50
44# nodes reachable via the network 51# nodes reachable via the network
45package AnyEvent::MP::Node::External; 52package AnyEvent::MP::Node::Remote; # a remote node
46 53
47use base "AnyEvent::MP::Node"; 54use base "AnyEvent::MP::Node";
48 55
49# called at init time, mostly sets {send} 56# called at init time, mostly sets {send}
50sub transport_reset { 57sub transport_reset {
78 } 85 }
79 86
80 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason) 87 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
81 unless $no_transport; 88 unless $no_transport;
82 89
83 # if we are here and are idle, we nuke ourselves 90 # we weaken the node reference, so it can go away if unused
84 delete $AnyEvent::MP::Kernel::NODE{$self->{id}} 91 Scalar::Util::weaken $AnyEvent::MP::Kernel::NODE{$self->{id}}
85 unless $self->{transport} || $self->{connect_to}; 92 unless $self->{connect_to};
93
94 AE::log 9 => "@reason";
86} 95}
87 96
88# called after handshake was successful 97# called after handshake was successful
89sub transport_connect { 98sub transport_connect {
90 my ($self, $transport) = @_; 99 my ($self, $transport) = @_;
91 100
92 delete $self->{trial}; 101 delete $self->{trial};
93 102
94 $self->transport_error (transport_error => "switched connections") 103 $self->transport_error (transport_error => $self->{id}, "switched connections")
95 if $self->{transport}; 104 if $self->{transport};
96 105
97 delete $self->{connect_w}; 106 delete $self->{connect_w};
98 delete $self->{connect_to}; 107 delete $self->{connect_to};
99 108
100 $self->{transport} = $transport; 109 $self->{transport} = $transport;
101 110
102 my $transport_send = $transport->can ("send"); 111 my $transport_send = $transport->{send};
103 112
104 $self->{send} = sub { 113 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
105 $transport_send->($transport, $_[0]); 114
115 $self->{send} = $transport_send;
116
117 $transport_send->($_)
118 for @{ delete $self->{queue} || [] };
119}
120
121sub connect {
122 my ($self) = @_;
123
124 return if $self->{transport};
125 return if $self->{connect_w};
126
127 # we unweaken the node reference, in case it was weakened before
128 $AnyEvent::MP::Kernel::NODE{$self->{id}}
129 = $AnyEvent::MP::Kernel::NODE{$self->{id}};
130
131 Scalar::Util::weaken $self;
132
133 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
134 $self->transport_error (transport_error => $self->{id}, "connect timeout");
106 }; 135 };
107 136
108 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1); 137 # maybe @$addresses?
138 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
109 139
110 $transport->send ($_) 140 if ($addresses) {
111 for @{ delete $self->{queue} || [] }; 141 $self->connect_to ($addresses);
142 } else {
143 # on global nodes, all bets are off now - we either know the node, or we don't
144 if ($AnyEvent::MP::Kernel::GLOBAL) {
145 $self->transport_error (transport_error => $self->{id}, "no known address");
146 } else {
147 AnyEvent::MP::Kernel::g_find ($self->{id});
148 }
149 }
112} 150}
113 151
114sub connect { 152sub connect_to {
115 my ($self, @addresses) = @_; 153 my ($self, $addresses) = @_;
116 154
117 return if $self->{transport}; 155 return if $self->{transport};
156 return if $self->{connect_w};
118 157
119 Scalar::Util::weaken $self; 158 unless (@$addresses) {
120
121 $self->{connect_to} ||= AE::timer
122 $AnyEvent::MP::Config::CFG{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT,
123 0,
124 sub {
125 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 159 $self->transport_error (transport_error => $self->{id}, "no known address");
160 return;
161 }
162
163 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
164
165 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
166 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
167
168 $interval = ($monitor - $interval) / @$addresses
169 if ($monitor - $interval) / @$addresses < $interval;
170
171 $interval = 0.4 if $interval < 0.4;
172
173 my @endpoints = reverse @$addresses;
174
175 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
176 my $endpoint = pop @endpoints;
177
178 AE::log 9 => "connecting to $self->{id} at $endpoint";
179
180 $self->{trial}{$endpoint} ||= do {
181 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
182 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
183
184 AnyEvent::MP::Transport::mp_connect
185 $host, $port,
186 sub { delete $self->{trial}{$endpoint} },
126 }; 187 };
127
128 return unless @addresses;
129
130 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]");
131
132 unless ($self->{connect_w}) {
133 my @endpoints;
134
135 $self->{connect_w} = AE::timer
136 rand,
137 $AnyEvent::MP::Config::CFG{connect_interval} || $AnyEvent::MP::Kernel::CONNECT_INTERVAL,
138 sub {
139 @endpoints = @addresses
140 unless @endpoints;
141
142 my $endpoint = shift @endpoints;
143
144 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint");
145
146 $self->{trial}{$endpoint} ||= do {
147 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
148 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference.");
149
150 AnyEvent::MP::Transport::mp_connect
151 $host, $port,
152 sub { delete $self->{trial}{$endpoint} },
153 };
154 }
155 ;
156 } 188 };
157} 189}
158 190
159sub kill { 191sub kill {
160 my ($self, $port, @reason) = @_; 192 my ($self, $port, @reason) = @_;
161 193
162 $self->send (["", kil => $port, @reason]); 194 $self->{send} (["", kil1 => $port, @reason]);
163} 195}
164 196
165sub monitor { 197sub monitor {
166 my ($self, $portid, $cb) = @_; 198 my ($self, $portid, $cb) = @_;
167 199
185 $self->send (["", mon0 => $portid]); 217 $self->send (["", mon0 => $portid]);
186 delete $self->{monitor}{$portid}; 218 delete $self->{monitor}{$portid};
187 } 219 }
188} 220}
189 221
190# used for direct slave connections as well
191package AnyEvent::MP::Node::Direct;
192
193use base "AnyEvent::MP::Node::External";
194
195package AnyEvent::MP::Node::Self; 222package AnyEvent::MP::Node::Self; # the local node
196 223
197use base "AnyEvent::MP::Node"; 224use base "AnyEvent::MP::Node";
198 225
199sub connect { 226sub connect {
200 # we are trivially connected 227 # we are trivially connected
201} 228}
202 229
230# delay every so often to avoid recursion, also used to delay after spawn
231our $DELAY = -50;
232our @DELAY;
233our $DELAY_W;
234
235our $send_delayed = sub {
236 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
237 (shift @DELAY)->()
238 while @DELAY;
239 undef $DELAY_W;
240 $DELAY = -50;
241};
242
203sub transport_reset { 243sub transport_reset {
204 my ($self) = @_; 244 my ($self) = @_;
205 245
206 Scalar::Util::weaken $self; 246 Scalar::Util::weaken $self;
207 247
208 $self->{send} = sub { 248 $self->{send} = sub {
249 if (++$DELAY > 0) {
250 my $msg = $_[0];
251 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
252 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
253 return;
254 }
255
209 local $AnyEvent::MP::Kernel::SRCNODE = $self; 256 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
210 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 257 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
211 }; 258 };
212} 259}
213 260
214sub transport_connect { 261sub transport_connect {
215 my ($self, $tp) = @_; 262 my ($self, $tp) = @_;
216 263
217 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 264 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
218} 265}
219 266
220sub kill { 267sub kill {
221 my ($self, $port, @reason) = @_; 268 my (undef, @args) = @_;
222 269
223 delete $AnyEvent::MP::Kernel::PORT{$port}; 270 # we _always_ delay kil's, to avoid calling mon callbacks
224 delete $AnyEvent::MP::Kernel::PORT_DATA{$port}; 271 # from anything but the event loop context.
225 272 $DELAY = 1;
226 my $mon = delete $AnyEvent::MP::Kernel::LMON{$port} 273 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
227 or !@reason 274 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
228 or $AnyEvent::MP::Kernel::WARN->(2, "unmonitored local port $port died with reason: @reason");
229
230 $_->(@reason) for values %$mon;
231} 275}
232 276
233sub monitor { 277sub monitor {
234 my ($self, $portid, $cb) = @_; 278 # maybe always delay, too?
235 279 if ($DELAY_W) {
236 return $cb->(no_such_port => "cannot monitor nonexistent port") 280 my @args = @_;
237 unless exists $AnyEvent::MP::Kernel::PORT{$portid}; 281 push @DELAY, sub { AnyEvent::MP::Kernel::_monitor (@args) };
238 282 return;
239 $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0} = $cb; 283 }
284 &AnyEvent::MP::Kernel::_monitor;
240} 285}
241 286
242sub unmonitor { 287sub unmonitor {
243 my ($self, $portid, $cb) = @_; 288 # no need to always delay
289 if ($DELAY_W) {
290 my @args = @_;
291 push @DELAY, sub { AnyEvent::MP::Kernel::_unmonitor (@args) };
292 return;
293 }
244 294
245 delete $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0}; 295 &AnyEvent::MP::Kernel::_unmonitor;
246} 296}
247 297
248=head1 SEE ALSO 298=head1 SEE ALSO
249 299
250L<AnyEvent::MP>. 300L<AnyEvent::MP>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines