ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.47 by root, Sat Nov 28 12:40:22 2009 UTC vs.
Revision 1.64 by root, Fri Mar 23 21:16:25 2012 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
40sub send { 39sub send {
41 &{ shift->{send} } 40 &{ shift->{send} }
42} 41}
43 42
44# nodes reachable via the network 43# nodes reachable via the network
45package AnyEvent::MP::Node::External; 44package AnyEvent::MP::Node::Remote; # a remote node
46 45
47use base "AnyEvent::MP::Node"; 46use base "AnyEvent::MP::Node";
48 47
49# called at init time, mostly sets {send} 48# called at init time, mostly sets {send}
50sub transport_reset { 49sub transport_reset {
89sub transport_connect { 88sub transport_connect {
90 my ($self, $transport) = @_; 89 my ($self, $transport) = @_;
91 90
92 delete $self->{trial}; 91 delete $self->{trial};
93 92
94 $self->transport_error (transport_error => "switched connections") 93 $self->transport_error (transport_error => $self->{id}, "switched connections")
95 if $self->{transport}; 94 if $self->{transport};
96 95
97 delete $self->{connect_addr};
98 delete $self->{connect_w}; 96 delete $self->{connect_w};
99 delete $self->{connect_to}; 97 delete $self->{connect_to};
100 98
101 $self->{transport} = $transport; 99 $self->{transport} = $transport;
102 100
103 my $transport_send = $transport->can ("send"); 101 my $transport_send = $transport->{send};
104 102
105 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1); 103 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
106 104
107 $self->{send} = sub { 105 $self->{send} = $transport_send;
108 $transport_send->($transport, $_[0]);
109 };
110 106
111 $transport->send ($_) 107 $transport_send->($_)
112 for @{ delete $self->{queue} || [] }; 108 for @{ delete $self->{queue} || [] };
113} 109}
114 110
115sub connect { 111sub connect {
116 my ($self, @addresses) = @_; 112 my ($self) = @_;
117 113
118 return if $self->{transport}; 114 return if $self->{transport};
115 return if $self->{connect_w};
119 116
120 Scalar::Util::weaken $self; 117 Scalar::Util::weaken $self;
118 use Carp;Carp::cluck;#d#
121 119
122 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 120 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
123
124 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
125 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 121 $self->transport_error (transport_error => $self->{id}, "unable to connect");
126 }; 122 };
127 123
124 # maybe @$addresses?
125 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
126
127 if ($addresses) {
128 $self->connect_to ($addresses);
129 } else {
130 # on global nodes, all bets are off now - we either know the node, or we don't
131 unless ($AnyEvent::MP::Kernel::GLOBAL) {
132 AnyEvent::MP::Kernel::g_find ($self->{id});
133 }
134 }
135}
136
137sub connect_to {
138 my ($self, $addresses) = @_;
139
140 return if $self->{transport};
141 return if $self->{connect_w};
142
128 return unless @addresses; 143 return unless @$addresses;
129
130 if ($self->{connect_w}) {
131 # sometimes we get told about new addresses after we started to connect
132 unshift @{$self->{connect_addr}}, @addresses;
133 return;
134 } 144
135
136 $self->{connect_addr} = \@addresses; # a bit weird, but efficient
137
138 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); 145 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
139 146
147 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
140 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 148 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
141 149
142 $interval = ($monitor - $interval) / @addresses 150 $interval = ($monitor - $interval) / @$addresses
143 if ($monitor - $interval) / @addresses < $interval; 151 if ($monitor - $interval) / @$addresses < $interval;
144 152
145 $interval = 0.4 if $interval < 0.4; 153 $interval = 0.4 if $interval < 0.4;
146 154
147 my @endpoints; 155 my @endpoints = reverse @$addresses;
148 156
149 $self->{connect_w} = AE::timer 0.050 * rand, $interval * (0.9 + 0.1 * rand), sub { 157 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
150 @endpoints = @addresses
151 unless @endpoints;
152
153 my $endpoint = shift @endpoints; 158 my $endpoint = pop @endpoints;
154 159
155 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); 160 AE::log 9 => "connecting to $self->{id} at $endpoint";
156 161
157 $self->{trial}{$endpoint} ||= do { 162 $self->{trial}{$endpoint} ||= do {
158 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 163 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
159 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); 164 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
160 165
161 AnyEvent::MP::Transport::mp_connect 166 AnyEvent::MP::Transport::mp_connect
162 $host, $port, 167 $host, $port,
163 sub { delete $self->{trial}{$endpoint} }, 168 sub { delete $self->{trial}{$endpoint} },
164 }; 169 };
166} 171}
167 172
168sub kill { 173sub kill {
169 my ($self, $port, @reason) = @_; 174 my ($self, $port, @reason) = @_;
170 175
171 $self->send (["", kil => $port, @reason]); 176 $self->{send} (["", kil => $port, @reason]);
172} 177}
173 178
174sub monitor { 179sub monitor {
175 my ($self, $portid, $cb) = @_; 180 my ($self, $portid, $cb) = @_;
176 181
194 $self->send (["", mon0 => $portid]); 199 $self->send (["", mon0 => $portid]);
195 delete $self->{monitor}{$portid}; 200 delete $self->{monitor}{$portid};
196 } 201 }
197} 202}
198 203
199# used for direct slave connections as well
200package AnyEvent::MP::Node::Direct;
201
202use base "AnyEvent::MP::Node::External";
203
204package AnyEvent::MP::Node::Self; 204package AnyEvent::MP::Node::Self; # the local node
205 205
206use base "AnyEvent::MP::Node"; 206use base "AnyEvent::MP::Node";
207 207
208sub connect { 208sub connect {
209 # we are trivially connected 209 # we are trivially connected
212# delay every so often to avoid recursion, also used to delay after spawn 212# delay every so often to avoid recursion, also used to delay after spawn
213our $DELAY = -50; 213our $DELAY = -50;
214our @DELAY; 214our @DELAY;
215our $DELAY_W; 215our $DELAY_W;
216 216
217sub _send_delayed { 217our $send_delayed = sub {
218 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE{""}; 218 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
219 (shift @DELAY)->() 219 (shift @DELAY)->()
220 while @DELAY; 220 while @DELAY;
221 undef $DELAY_W; 221 undef $DELAY_W;
222 $DELAY = -50; 222 $DELAY = -50;
223} 223};
224 224
225sub transport_reset { 225sub transport_reset {
226 my ($self) = @_; 226 my ($self) = @_;
227 227
228 Scalar::Util::weaken $self; 228 Scalar::Util::weaken $self;
229 229
230 $self->{send} = sub { 230 $self->{send} = sub {
231 if ($DELAY++ >= 0) { 231 if (++$DELAY > 0) {
232 my $msg = $_[0]; 232 my $msg = $_[0];
233 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) }; 233 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
234 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 234 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
235 return; 235 return;
236 } 236 }
237 237
238 local $AnyEvent::MP::Kernel::SRCNODE = $self; 238 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
239 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 239 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
240 }; 240 };
241} 241}
242 242
243sub transport_connect { 243sub transport_connect {
244 my ($self, $tp) = @_; 244 my ($self, $tp) = @_;
245 245
246 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 246 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
247} 247}
248 248
249sub kill { 249sub kill {
250 my ($self, $port, @reason) = @_; 250 my (undef, @args) = @_;
251
252 my $delay_cb = sub {
253 delete $AnyEvent::MP::Kernel::PORT{$port}
254 or return; # killing nonexistent ports is O.K.
255 delete $AnyEvent::MP::Kernel::PORT_DATA{$port};
256
257 my $mon = delete $AnyEvent::MP::Kernel::LMON{$port}
258 or !@reason
259 or $AnyEvent::MP::Kernel::WARN->(2, "unmonitored local port $port died with reason: @reason");
260
261 $_->(@reason) for values %$mon;
262 };
263 251
264 # we _always_ delay kil's, to avoid calling mon callbacks 252 # we _always_ delay kil's, to avoid calling mon callbacks
265 # from anything but the event loop context. 253 # from anything but the event loop context.
266 $DELAY = 1; 254 $DELAY = 1;
267 push @DELAY, $delay_cb; 255 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
268 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 256 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
269} 257}
270 258
271sub monitor { 259sub monitor {
272 my ($self, $portid, $cb) = @_; 260 # maybe always delay, too?
273 261 if ($DELAY_W) {
274 my $delay_cb = sub { 262 my @args = @_;
275 return $cb->(no_such_port => "cannot monitor nonexistent port", "$self->{id}#$portid") 263 push @DELAY, sub { AnyEvent::MP::Kernel::_monitor (@args) };
276 unless exists $AnyEvent::MP::Kernel::PORT{$portid}; 264 return;
277
278 $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0} = $cb;
279 }; 265 }
280 266 &AnyEvent::MP::Kernel::_monitor;
281 $DELAY_W ? push @DELAY, $delay_cb : &$delay_cb;
282} 267}
283 268
284sub unmonitor { 269sub unmonitor {
285 my ($self, $portid, $cb) = @_; 270 # no need to always delay
286 271 if ($DELAY_W) {
287 my $delay_cb = sub { 272 my @args = @_;
288 delete $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0}; 273 push @DELAY, sub { AnyEvent::MP::Kernel::_unmonitor (@args) };
274 return;
289 }; 275 }
290 276
291 $DELAY_W ? push @DELAY, $delay_cb : &$delay_cb; 277 &AnyEvent::MP::Kernel::_unmonitor;
292} 278}
293 279
294=head1 SEE ALSO 280=head1 SEE ALSO
295 281
296L<AnyEvent::MP>. 282L<AnyEvent::MP>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines