ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.42 by root, Wed Sep 23 11:57:16 2009 UTC vs.
Revision 1.65 by root, Fri Mar 23 21:16:36 2012 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
40sub send { 39sub send {
41 &{ shift->{send} } 40 &{ shift->{send} }
42} 41}
43 42
44# nodes reachable via the network 43# nodes reachable via the network
45package AnyEvent::MP::Node::External; 44package AnyEvent::MP::Node::Remote; # a remote node
46 45
47use base "AnyEvent::MP::Node"; 46use base "AnyEvent::MP::Node";
48 47
49# called at init time, mostly sets {send} 48# called at init time, mostly sets {send}
50sub transport_reset { 49sub transport_reset {
89sub transport_connect { 88sub transport_connect {
90 my ($self, $transport) = @_; 89 my ($self, $transport) = @_;
91 90
92 delete $self->{trial}; 91 delete $self->{trial};
93 92
94 $self->transport_error (transport_error => "switched connections") 93 $self->transport_error (transport_error => $self->{id}, "switched connections")
95 if $self->{transport}; 94 if $self->{transport};
96 95
97 delete $self->{connect_addr};
98 delete $self->{connect_w}; 96 delete $self->{connect_w};
99 delete $self->{connect_to}; 97 delete $self->{connect_to};
100 98
101 $self->{transport} = $transport; 99 $self->{transport} = $transport;
102 100
103 my $transport_send = $transport->can ("send"); 101 my $transport_send = $transport->{send};
104
105 $self->{send} = sub {
106 $transport_send->($transport, $_[0]);
107 };
108 102
109 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1); 103 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
110 104
105 $self->{send} = $transport_send;
106
111 $transport->send ($_) 107 $transport_send->($_)
112 for @{ delete $self->{queue} || [] }; 108 for @{ delete $self->{queue} || [] };
113} 109}
114 110
115sub connect { 111sub connect {
116 my ($self, @addresses) = @_; 112 my ($self) = @_;
117 113
118 return if $self->{transport}; 114 return if $self->{transport};
115 return if $self->{connect_w};
119 116
120 Scalar::Util::weaken $self; 117 Scalar::Util::weaken $self;
121 118
122 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 119 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
123
124 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
125 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 120 $self->transport_error (transport_error => $self->{id}, "unable to connect");
126 }; 121 };
127 122
123 # maybe @$addresses?
124 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
125
126 if ($addresses) {
127 $self->connect_to ($addresses);
128 } else {
129 # on global nodes, all bets are off now - we either know the node, or we don't
130 unless ($AnyEvent::MP::Kernel::GLOBAL) {
131 AnyEvent::MP::Kernel::g_find ($self->{id});
132 }
133 }
134}
135
136sub connect_to {
137 my ($self, $addresses) = @_;
138
139 return if $self->{transport};
140 return if $self->{connect_w};
141
128 return unless @addresses; 142 return unless @$addresses;
129
130 if ($self->{connect_w}) {
131 # sometimes we get told about new addresses after we started to connect
132 unshift @{$self->{connect_addr}}, @addresses;
133 return;
134 } 143
135
136 $self->{connect_addr} = \@addresses; # a bit weird, but efficient
137
138 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); 144 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
139 145
146 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
140 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 147 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
141 148
142 $interval = ($monitor - $interval) / @addresses 149 $interval = ($monitor - $interval) / @$addresses
143 if ($monitor - $interval) / @addresses < $interval; 150 if ($monitor - $interval) / @$addresses < $interval;
144 151
145 $interval = 0.4 if $interval < 0.4; 152 $interval = 0.4 if $interval < 0.4;
146 153
147 my @endpoints; 154 my @endpoints = reverse @$addresses;
148 155
149 $self->{connect_w} = AE::timer 0.050 * rand, $interval * (0.9 + 0.1 * rand), sub { 156 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
150 @endpoints = @addresses
151 unless @endpoints;
152
153 my $endpoint = shift @endpoints; 157 my $endpoint = pop @endpoints;
154 158
155 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); 159 AE::log 9 => "connecting to $self->{id} at $endpoint";
156 160
157 $self->{trial}{$endpoint} ||= do { 161 $self->{trial}{$endpoint} ||= do {
158 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 162 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
159 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); 163 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
160 164
161 AnyEvent::MP::Transport::mp_connect 165 AnyEvent::MP::Transport::mp_connect
162 $host, $port, 166 $host, $port,
163 sub { delete $self->{trial}{$endpoint} }, 167 sub { delete $self->{trial}{$endpoint} },
164 }; 168 };
166} 170}
167 171
168sub kill { 172sub kill {
169 my ($self, $port, @reason) = @_; 173 my ($self, $port, @reason) = @_;
170 174
171 $self->send (["", kil => $port, @reason]); 175 $self->{send} (["", kil => $port, @reason]);
172} 176}
173 177
174sub monitor { 178sub monitor {
175 my ($self, $portid, $cb) = @_; 179 my ($self, $portid, $cb) = @_;
176 180
194 $self->send (["", mon0 => $portid]); 198 $self->send (["", mon0 => $portid]);
195 delete $self->{monitor}{$portid}; 199 delete $self->{monitor}{$portid};
196 } 200 }
197} 201}
198 202
199# used for direct slave connections as well
200package AnyEvent::MP::Node::Direct;
201
202use base "AnyEvent::MP::Node::External";
203
204package AnyEvent::MP::Node::Self; 203package AnyEvent::MP::Node::Self; # the local node
205 204
206use base "AnyEvent::MP::Node"; 205use base "AnyEvent::MP::Node";
207 206
208sub connect { 207sub connect {
209 # we are trivially connected 208 # we are trivially connected
210} 209}
211 210
212# delay every so often to avoid recursion, also used to delay after spawn 211# delay every so often to avoid recursion, also used to delay after spawn
213our $DELAY; 212our $DELAY = -50;
214our @DELAY; 213our @DELAY;
215our $DELAY_W; 214our $DELAY_W;
216 215
217sub _send_delayed { 216our $send_delayed = sub {
218 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE{""}; 217 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
219 (shift @DELAY)->() 218 (shift @DELAY)->()
220 while @DELAY; 219 while @DELAY;
221 undef $DELAY_W; 220 undef $DELAY_W;
222 $DELAY = -50; 221 $DELAY = -50;
223} 222};
224 223
225sub transport_reset { 224sub transport_reset {
226 my ($self) = @_; 225 my ($self) = @_;
227 226
228 Scalar::Util::weaken $self; 227 Scalar::Util::weaken $self;
229 228
230 $self->{send} = sub { 229 $self->{send} = sub {
231 if ($DELAY++ >= 0) { 230 if (++$DELAY > 0) {
232 my $msg = $_[0]; 231 my $msg = $_[0];
233 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) }; 232 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
234 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 233 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
235 return; 234 return;
236 } 235 }
237 236
238 local $AnyEvent::MP::Kernel::SRCNODE = $self; 237 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
239 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 238 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
240 }; 239 };
241} 240}
242 241
243sub transport_connect { 242sub transport_connect {
244 my ($self, $tp) = @_; 243 my ($self, $tp) = @_;
245 244
246 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 245 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
247} 246}
248 247
249sub kill { 248sub kill {
250 my ($self, $port, @reason) = @_; 249 my (undef, @args) = @_;
251 250
252 my $delay_cb = sub { 251 # we _always_ delay kil's, to avoid calling mon callbacks
253 delete $AnyEvent::MP::Kernel::PORT{$port} 252 # from anything but the event loop context.
254 or return; # killing nonexistent ports is O.K. 253 $DELAY = 1;
255 delete $AnyEvent::MP::Kernel::PORT_DATA{$port}; 254 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
256 255 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
257 my $mon = delete $AnyEvent::MP::Kernel::LMON{$port}
258 or !@reason
259 or $AnyEvent::MP::Kernel::WARN->(8, "unmonitored local port $port died with reason: @reason");
260
261 $_->(@reason) for values %$mon;
262 };
263
264 $DELAY_W ? push @DELAY, $delay_cb : &$delay_cb;
265} 256}
266 257
267sub monitor { 258sub monitor {
268 my ($self, $portid, $cb) = @_; 259 # maybe always delay, too?
269 260 if ($DELAY_W) {
270 my $delay_cb = sub { 261 my @args = @_;
271 return $cb->(no_such_port => "cannot monitor nonexistent port", "$self->{id}#$portid") 262 push @DELAY, sub { AnyEvent::MP::Kernel::_monitor (@args) };
272 unless exists $AnyEvent::MP::Kernel::PORT{$portid}; 263 return;
273
274 $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0} = $cb;
275 }; 264 }
276 265 &AnyEvent::MP::Kernel::_monitor;
277 $DELAY_W ? push @DELAY, $delay_cb : &$delay_cb;
278} 266}
279 267
280sub unmonitor { 268sub unmonitor {
281 my ($self, $portid, $cb) = @_; 269 # no need to always delay
282 270 if ($DELAY_W) {
283 my $delay_cb = sub { 271 my @args = @_;
284 delete $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0}; 272 push @DELAY, sub { AnyEvent::MP::Kernel::_unmonitor (@args) };
273 return;
285 }; 274 }
286 275
287 $DELAY_W ? push @DELAY, $delay_cb : &$delay_cb; 276 &AnyEvent::MP::Kernel::_unmonitor;
288} 277}
289 278
290=head1 SEE ALSO 279=head1 SEE ALSO
291 280
292L<AnyEvent::MP>. 281L<AnyEvent::MP>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines