ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.35 by root, Thu Sep 3 20:16:36 2009 UTC vs.
Revision 1.65 by root, Fri Mar 23 21:16:36 2012 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
40sub send { 39sub send {
41 &{ shift->{send} } 40 &{ shift->{send} }
42} 41}
43 42
44# nodes reachable via the network 43# nodes reachable via the network
45package AnyEvent::MP::Node::External; 44package AnyEvent::MP::Node::Remote; # a remote node
46 45
47use base "AnyEvent::MP::Node"; 46use base "AnyEvent::MP::Node";
48 47
49# called at init time, mostly sets {send} 48# called at init time, mostly sets {send}
50sub transport_reset { 49sub transport_reset {
89sub transport_connect { 88sub transport_connect {
90 my ($self, $transport) = @_; 89 my ($self, $transport) = @_;
91 90
92 delete $self->{trial}; 91 delete $self->{trial};
93 92
94 $self->transport_error (transport_error => "switched connections") 93 $self->transport_error (transport_error => $self->{id}, "switched connections")
95 if $self->{transport}; 94 if $self->{transport};
96 95
97 delete $self->{connect_w}; 96 delete $self->{connect_w};
98 delete $self->{connect_to}; 97 delete $self->{connect_to};
99 98
100 $self->{transport} = $transport; 99 $self->{transport} = $transport;
101 100
102 my $transport_send = $transport->can ("send"); 101 my $transport_send = $transport->{send};
103
104 $self->{send} = sub {
105 $transport_send->($transport, $_[0]);
106 };
107 102
108 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1); 103 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
109 104
105 $self->{send} = $transport_send;
106
110 $transport->send ($_) 107 $transport_send->($_)
111 for @{ delete $self->{queue} || [] }; 108 for @{ delete $self->{queue} || [] };
112} 109}
113 110
114sub connect { 111sub connect {
115 my ($self, @addresses) = @_; 112 my ($self) = @_;
116 113
117 return if $self->{transport}; 114 return if $self->{transport};
115 return if $self->{connect_w};
118 116
119 Scalar::Util::weaken $self; 117 Scalar::Util::weaken $self;
120 118
121 my $monitor = $AnyEvent::MP::Config::CFG{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT; 119 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
122
123 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
124 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 120 $self->transport_error (transport_error => $self->{id}, "unable to connect");
125 }; 121 };
126 122
127 return unless @addresses; 123 # maybe @$addresses?
124 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
125
126 if ($addresses) {
127 $self->connect_to ($addresses);
128 } else {
129 # on global nodes, all bets are off now - we either know the node, or we don't
130 unless ($AnyEvent::MP::Kernel::GLOBAL) {
131 AnyEvent::MP::Kernel::g_find ($self->{id});
132 }
133 }
134}
135
136sub connect_to {
137 my ($self, $addresses) = @_;
138
139 return if $self->{transport};
128 return if $self->{connect_w}; 140 return if $self->{connect_w};
129 141
142 return unless @$addresses;
143
130 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); 144 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
131 145
132 my $interval = $AnyEvent::MP::Config::CFG{connect_interval} || $AnyEvent::MP::Kernel::CONNECT_INTERVAL; 146 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
147 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
133 148
134 $interval = ($monitor - $interval) / @addresses 149 $interval = ($monitor - $interval) / @$addresses
135 if ($monitor - $interval) / @addresses < $interval; 150 if ($monitor - $interval) / @$addresses < $interval;
136 151
137 $interval = 0.4 if $interval < 0.4; 152 $interval = 0.4 if $interval < 0.4;
138 153
139 my @endpoints; 154 my @endpoints = reverse @$addresses;
140 155
141 $self->{connect_w} = AE::timer 0.050 * rand, $interval * (0.9 + 0.1 * rand), sub { 156 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
142 @endpoints = @addresses
143 unless @endpoints;
144
145 my $endpoint = shift @endpoints; 157 my $endpoint = pop @endpoints;
146 158
147 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); 159 AE::log 9 => "connecting to $self->{id} at $endpoint";
148 160
149 $self->{trial}{$endpoint} ||= do { 161 $self->{trial}{$endpoint} ||= do {
150 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 162 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
151 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); 163 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
152 164
153 AnyEvent::MP::Transport::mp_connect 165 AnyEvent::MP::Transport::mp_connect
154 $host, $port, 166 $host, $port,
155 sub { delete $self->{trial}{$endpoint} }, 167 sub { delete $self->{trial}{$endpoint} },
156 }; 168 };
158} 170}
159 171
160sub kill { 172sub kill {
161 my ($self, $port, @reason) = @_; 173 my ($self, $port, @reason) = @_;
162 174
163 $self->send (["", kil => $port, @reason]); 175 $self->{send} (["", kil => $port, @reason]);
164} 176}
165 177
166sub monitor { 178sub monitor {
167 my ($self, $portid, $cb) = @_; 179 my ($self, $portid, $cb) = @_;
168 180
186 $self->send (["", mon0 => $portid]); 198 $self->send (["", mon0 => $portid]);
187 delete $self->{monitor}{$portid}; 199 delete $self->{monitor}{$portid};
188 } 200 }
189} 201}
190 202
191# used for direct slave connections as well
192package AnyEvent::MP::Node::Direct;
193
194use base "AnyEvent::MP::Node::External";
195
196package AnyEvent::MP::Node::Self; 203package AnyEvent::MP::Node::Self; # the local node
197 204
198use base "AnyEvent::MP::Node"; 205use base "AnyEvent::MP::Node";
199 206
200sub connect { 207sub connect {
201 # we are trivially connected 208 # we are trivially connected
202} 209}
203 210
211# delay every so often to avoid recursion, also used to delay after spawn
212our $DELAY = -50;
213our @DELAY;
214our $DELAY_W;
215
216our $send_delayed = sub {
217 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
218 (shift @DELAY)->()
219 while @DELAY;
220 undef $DELAY_W;
221 $DELAY = -50;
222};
223
204sub transport_reset { 224sub transport_reset {
205 my ($self) = @_; 225 my ($self) = @_;
206 226
207 Scalar::Util::weaken $self; 227 Scalar::Util::weaken $self;
208 228
209 $self->{send} = sub { 229 $self->{send} = sub {
230 if (++$DELAY > 0) {
231 my $msg = $_[0];
232 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
233 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
234 return;
235 }
236
210 local $AnyEvent::MP::Kernel::SRCNODE = $self; 237 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
211 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 238 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
212 }; 239 };
213} 240}
214 241
215sub transport_connect { 242sub transport_connect {
216 my ($self, $tp) = @_; 243 my ($self, $tp) = @_;
217 244
218 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 245 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
219} 246}
220 247
221sub kill { 248sub kill {
222 my ($self, $port, @reason) = @_; 249 my (undef, @args) = @_;
223 250
224 delete $AnyEvent::MP::Kernel::PORT{$port}; 251 # we _always_ delay kil's, to avoid calling mon callbacks
225 delete $AnyEvent::MP::Kernel::PORT_DATA{$port}; 252 # from anything but the event loop context.
226 253 $DELAY = 1;
227 my $mon = delete $AnyEvent::MP::Kernel::LMON{$port} 254 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
228 or !@reason 255 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
229 or $AnyEvent::MP::Kernel::WARN->(8, "unmonitored local port $port died with reason: @reason");
230
231 $_->(@reason) for values %$mon;
232} 256}
233 257
234sub monitor { 258sub monitor {
235 my ($self, $portid, $cb) = @_; 259 # maybe always delay, too?
236 260 if ($DELAY_W) {
237 return $cb->(no_such_port => "cannot monitor nonexistent port") 261 my @args = @_;
238 unless exists $AnyEvent::MP::Kernel::PORT{$portid}; 262 push @DELAY, sub { AnyEvent::MP::Kernel::_monitor (@args) };
239 263 return;
240 $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0} = $cb; 264 }
265 &AnyEvent::MP::Kernel::_monitor;
241} 266}
242 267
243sub unmonitor { 268sub unmonitor {
244 my ($self, $portid, $cb) = @_; 269 # no need to always delay
270 if ($DELAY_W) {
271 my @args = @_;
272 push @DELAY, sub { AnyEvent::MP::Kernel::_unmonitor (@args) };
273 return;
274 }
245 275
246 delete $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0}; 276 &AnyEvent::MP::Kernel::_unmonitor;
247} 277}
248 278
249=head1 SEE ALSO 279=head1 SEE ALSO
250 280
251L<AnyEvent::MP>. 281L<AnyEvent::MP>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines