ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.40 by root, Tue Sep 8 20:56:05 2009 UTC vs.
Revision 1.64 by root, Fri Mar 23 21:16:25 2012 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
40sub send { 39sub send {
41 &{ shift->{send} } 40 &{ shift->{send} }
42} 41}
43 42
44# nodes reachable via the network 43# nodes reachable via the network
45package AnyEvent::MP::Node::External; 44package AnyEvent::MP::Node::Remote; # a remote node
46 45
47use base "AnyEvent::MP::Node"; 46use base "AnyEvent::MP::Node";
48 47
49# called at init time, mostly sets {send} 48# called at init time, mostly sets {send}
50sub transport_reset { 49sub transport_reset {
89sub transport_connect { 88sub transport_connect {
90 my ($self, $transport) = @_; 89 my ($self, $transport) = @_;
91 90
92 delete $self->{trial}; 91 delete $self->{trial};
93 92
94 $self->transport_error (transport_error => "switched connections") 93 $self->transport_error (transport_error => $self->{id}, "switched connections")
95 if $self->{transport}; 94 if $self->{transport};
96 95
97 delete $self->{connect_w}; 96 delete $self->{connect_w};
98 delete $self->{connect_to}; 97 delete $self->{connect_to};
99 98
100 $self->{transport} = $transport; 99 $self->{transport} = $transport;
101 100
102 my $transport_send = $transport->can ("send"); 101 my $transport_send = $transport->{send};
103
104 $self->{send} = sub {
105 $transport_send->($transport, $_[0]);
106 };
107 102
108 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1); 103 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
109 104
105 $self->{send} = $transport_send;
106
110 $transport->send ($_) 107 $transport_send->($_)
111 for @{ delete $self->{queue} || [] }; 108 for @{ delete $self->{queue} || [] };
112} 109}
113 110
114sub connect { 111sub connect {
115 my ($self, @addresses) = @_; 112 my ($self) = @_;
116 113
117 return if $self->{transport}; 114 return if $self->{transport};
115 return if $self->{connect_w};
118 116
119 Scalar::Util::weaken $self; 117 Scalar::Util::weaken $self;
118 use Carp;Carp::cluck;#d#
120 119
121 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 120 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
122
123 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
124 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 121 $self->transport_error (transport_error => $self->{id}, "unable to connect");
125 }; 122 };
126 123
127 return unless @addresses; 124 # maybe @$addresses?
125 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
126
127 if ($addresses) {
128 $self->connect_to ($addresses);
129 } else {
130 # on global nodes, all bets are off now - we either know the node, or we don't
131 unless ($AnyEvent::MP::Kernel::GLOBAL) {
132 AnyEvent::MP::Kernel::g_find ($self->{id});
133 }
134 }
135}
136
137sub connect_to {
138 my ($self, $addresses) = @_;
139
140 return if $self->{transport};
128 return if $self->{connect_w}; 141 return if $self->{connect_w};
129 142
143 return unless @$addresses;
144
130 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); 145 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
131 146
147 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
132 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 148 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
133 149
134 $interval = ($monitor - $interval) / @addresses 150 $interval = ($monitor - $interval) / @$addresses
135 if ($monitor - $interval) / @addresses < $interval; 151 if ($monitor - $interval) / @$addresses < $interval;
136 152
137 $interval = 0.4 if $interval < 0.4; 153 $interval = 0.4 if $interval < 0.4;
138 154
139 my @endpoints; 155 my @endpoints = reverse @$addresses;
140 156
141 $self->{connect_w} = AE::timer 0.050 * rand, $interval * (0.9 + 0.1 * rand), sub { 157 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
142 @endpoints = @addresses
143 unless @endpoints;
144
145 my $endpoint = shift @endpoints; 158 my $endpoint = pop @endpoints;
146 159
147 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); 160 AE::log 9 => "connecting to $self->{id} at $endpoint";
148 161
149 $self->{trial}{$endpoint} ||= do { 162 $self->{trial}{$endpoint} ||= do {
150 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 163 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
151 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); 164 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
152 165
153 AnyEvent::MP::Transport::mp_connect 166 AnyEvent::MP::Transport::mp_connect
154 $host, $port, 167 $host, $port,
155 sub { delete $self->{trial}{$endpoint} }, 168 sub { delete $self->{trial}{$endpoint} },
156 }; 169 };
158} 171}
159 172
160sub kill { 173sub kill {
161 my ($self, $port, @reason) = @_; 174 my ($self, $port, @reason) = @_;
162 175
163 $self->send (["", kil => $port, @reason]); 176 $self->{send} (["", kil => $port, @reason]);
164} 177}
165 178
166sub monitor { 179sub monitor {
167 my ($self, $portid, $cb) = @_; 180 my ($self, $portid, $cb) = @_;
168 181
186 $self->send (["", mon0 => $portid]); 199 $self->send (["", mon0 => $portid]);
187 delete $self->{monitor}{$portid}; 200 delete $self->{monitor}{$portid};
188 } 201 }
189} 202}
190 203
191# used for direct slave connections as well
192package AnyEvent::MP::Node::Direct;
193
194use base "AnyEvent::MP::Node::External";
195
196package AnyEvent::MP::Node::Self; 204package AnyEvent::MP::Node::Self; # the local node
197 205
198use base "AnyEvent::MP::Node"; 206use base "AnyEvent::MP::Node";
199 207
200sub connect { 208sub connect {
201 # we are trivially connected 209 # we are trivially connected
202} 210}
203 211
204# delay every so often to avoid recursion, also used to delay after spawn 212# delay every so often to avoid recursion, also used to delay after spawn
205our $DELAY; 213our $DELAY = -50;
206our @DELAY; 214our @DELAY;
207our $DELAY_W; 215our $DELAY_W;
208 216
209sub _send_delayed { 217our $send_delayed = sub {
210 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE{""}; 218 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
211 (shift @DELAY)->() 219 (shift @DELAY)->()
212 while @DELAY; 220 while @DELAY;
213 undef $DELAY_W; 221 undef $DELAY_W;
214 $DELAY = -50; 222 $DELAY = -50;
215} 223};
216 224
217sub transport_reset { 225sub transport_reset {
218 my ($self) = @_; 226 my ($self) = @_;
219 227
220 Scalar::Util::weaken $self; 228 Scalar::Util::weaken $self;
221 229
222 $self->{send} = sub { 230 $self->{send} = sub {
223 if ($DELAY++ >= 0) { 231 if (++$DELAY > 0) {
224 my $msg = $_[0]; 232 my $msg = $_[0];
225 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) }; 233 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
226 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 234 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
227 return; 235 return;
228 } 236 }
229 237
230 local $AnyEvent::MP::Kernel::SRCNODE = $self; 238 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
231 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 239 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
232 }; 240 };
233} 241}
234 242
235sub transport_connect { 243sub transport_connect {
236 my ($self, $tp) = @_; 244 my ($self, $tp) = @_;
237 245
238 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 246 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
239} 247}
240 248
241sub kill { 249sub kill {
242 my ($self, $port, @reason) = @_; 250 my (undef, @args) = @_;
243 251
244 my $delay_cb = sub { 252 # we _always_ delay kil's, to avoid calling mon callbacks
245 delete $AnyEvent::MP::Kernel::PORT{$port}; 253 # from anything but the event loop context.
246 delete $AnyEvent::MP::Kernel::PORT_DATA{$port}; 254 $DELAY = 1;
247 255 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
248 my $mon = delete $AnyEvent::MP::Kernel::LMON{$port} 256 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
249 or !@reason
250 or $AnyEvent::MP::Kernel::WARN->(8, "unmonitored local port $port died with reason: @reason");
251
252 $_->(@reason) for values %$mon;
253 };
254
255 $DELAY_W ? push @DELAY, $delay_cb : &$delay_cb;
256} 257}
257 258
258sub monitor { 259sub monitor {
259 my ($self, $portid, $cb) = @_; 260 # maybe always delay, too?
260 261 if ($DELAY_W) {
261 my $delay_cb = sub { 262 my @args = @_;
262 return $cb->(no_such_port => "cannot monitor nonexistent port", "$self->{id}#$portid") 263 push @DELAY, sub { AnyEvent::MP::Kernel::_monitor (@args) };
263 unless exists $AnyEvent::MP::Kernel::PORT{$portid}; 264 return;
264
265 $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0} = $cb;
266 }; 265 }
267 266 &AnyEvent::MP::Kernel::_monitor;
268 $DELAY_W ? push @DELAY, $delay_cb : &$delay_cb;
269} 267}
270 268
271sub unmonitor { 269sub unmonitor {
272 my ($self, $portid, $cb) = @_; 270 # no need to always delay
273 271 if ($DELAY_W) {
274 my $delay_cb = sub { 272 my @args = @_;
275 delete $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0}; 273 push @DELAY, sub { AnyEvent::MP::Kernel::_unmonitor (@args) };
274 return;
276 }; 275 }
277 276
278 $DELAY_W ? push @DELAY, $delay_cb : &$delay_cb; 277 &AnyEvent::MP::Kernel::_unmonitor;
279} 278}
280 279
281=head1 SEE ALSO 280=head1 SEE ALSO
282 281
283L<AnyEvent::MP>. 282L<AnyEvent::MP>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines