ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.32 by root, Sun Aug 30 09:24:09 2009 UTC vs.
Revision 1.64 by root, Fri Mar 23 21:16:25 2012 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
40sub send { 39sub send {
41 &{ shift->{send} } 40 &{ shift->{send} }
42} 41}
43 42
44# nodes reachable via the network 43# nodes reachable via the network
45package AnyEvent::MP::Node::External; 44package AnyEvent::MP::Node::Remote; # a remote node
46 45
47use base "AnyEvent::MP::Node"; 46use base "AnyEvent::MP::Node";
48 47
49# called at init time, mostly sets {send} 48# called at init time, mostly sets {send}
50sub transport_reset { 49sub transport_reset {
89sub transport_connect { 88sub transport_connect {
90 my ($self, $transport) = @_; 89 my ($self, $transport) = @_;
91 90
92 delete $self->{trial}; 91 delete $self->{trial};
93 92
94 $self->transport_error (transport_error => "switched connections") 93 $self->transport_error (transport_error => $self->{id}, "switched connections")
95 if $self->{transport}; 94 if $self->{transport};
96 95
97 delete $self->{connect_w}; 96 delete $self->{connect_w};
98 delete $self->{connect_to}; 97 delete $self->{connect_to};
99 98
100 $self->{transport} = $transport; 99 $self->{transport} = $transport;
101 100
102 my $transport_send = $transport->can ("send"); 101 my $transport_send = $transport->{send};
103
104 $self->{send} = sub {
105 $transport_send->($transport, $_[0]);
106 };
107 102
108 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1); 103 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
109 104
105 $self->{send} = $transport_send;
106
110 $transport->send ($_) 107 $transport_send->($_)
111 for @{ delete $self->{queue} || [] }; 108 for @{ delete $self->{queue} || [] };
112} 109}
113 110
114sub connect { 111sub connect {
115 my ($self, @addresses) = @_; 112 my ($self) = @_;
116 113
117 return if $self->{transport}; 114 return if $self->{transport};
115 return if $self->{connect_w};
118 116
119 Scalar::Util::weaken $self; 117 Scalar::Util::weaken $self;
118 use Carp;Carp::cluck;#d#
120 119
121 $self->{connect_to} ||= AE::timer 120 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
122 $AnyEvent::MP::Config::CFG{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT,
123 0,
124 sub {
125 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 121 $self->transport_error (transport_error => $self->{id}, "unable to connect");
122 };
123
124 # maybe @$addresses?
125 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
126
127 if ($addresses) {
128 $self->connect_to ($addresses);
129 } else {
130 # on global nodes, all bets are off now - we either know the node, or we don't
131 unless ($AnyEvent::MP::Kernel::GLOBAL) {
132 AnyEvent::MP::Kernel::g_find ($self->{id});
133 }
134 }
135}
136
137sub connect_to {
138 my ($self, $addresses) = @_;
139
140 return if $self->{transport};
141 return if $self->{connect_w};
142
143 return unless @$addresses;
144
145 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
146
147 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
148 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
149
150 $interval = ($monitor - $interval) / @$addresses
151 if ($monitor - $interval) / @$addresses < $interval;
152
153 $interval = 0.4 if $interval < 0.4;
154
155 my @endpoints = reverse @$addresses;
156
157 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
158 my $endpoint = pop @endpoints;
159
160 AE::log 9 => "connecting to $self->{id} at $endpoint";
161
162 $self->{trial}{$endpoint} ||= do {
163 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
164 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
165
166 AnyEvent::MP::Transport::mp_connect
167 $host, $port,
168 sub { delete $self->{trial}{$endpoint} },
126 }; 169 };
127
128 return unless @addresses;
129
130 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]");
131
132 unless ($self->{connect_w}) {
133 my @endpoints;
134
135 $self->{connect_w} = AE::timer
136 rand,
137 $AnyEvent::MP::Config::CFG{connect_interval} || $AnyEvent::MP::Kernel::CONNECT_INTERVAL,
138 sub {
139 @endpoints = @addresses
140 unless @endpoints;
141
142 my $endpoint = shift @endpoints;
143
144 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint");
145
146 $self->{trial}{$endpoint} ||= do {
147 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
148 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference.");
149
150 AnyEvent::MP::Transport::mp_connect
151 $host, $port,
152 sub { delete $self->{trial}{$endpoint} },
153 };
154 }
155 ;
156 } 170 };
157} 171}
158 172
159sub kill { 173sub kill {
160 my ($self, $port, @reason) = @_; 174 my ($self, $port, @reason) = @_;
161 175
162 $self->send (["", kil => $port, @reason]); 176 $self->{send} (["", kil => $port, @reason]);
163} 177}
164 178
165sub monitor { 179sub monitor {
166 my ($self, $portid, $cb) = @_; 180 my ($self, $portid, $cb) = @_;
167 181
185 $self->send (["", mon0 => $portid]); 199 $self->send (["", mon0 => $portid]);
186 delete $self->{monitor}{$portid}; 200 delete $self->{monitor}{$portid};
187 } 201 }
188} 202}
189 203
190# used for direct slave connections as well
191package AnyEvent::MP::Node::Direct;
192
193use base "AnyEvent::MP::Node::External";
194
195package AnyEvent::MP::Node::Self; 204package AnyEvent::MP::Node::Self; # the local node
196 205
197use base "AnyEvent::MP::Node"; 206use base "AnyEvent::MP::Node";
198 207
199sub connect { 208sub connect {
200 # we are trivially connected 209 # we are trivially connected
201} 210}
202 211
212# delay every so often to avoid recursion, also used to delay after spawn
213our $DELAY = -50;
214our @DELAY;
215our $DELAY_W;
216
217our $send_delayed = sub {
218 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
219 (shift @DELAY)->()
220 while @DELAY;
221 undef $DELAY_W;
222 $DELAY = -50;
223};
224
203sub transport_reset { 225sub transport_reset {
204 my ($self) = @_; 226 my ($self) = @_;
205 227
206 Scalar::Util::weaken $self; 228 Scalar::Util::weaken $self;
207 229
208 $self->{send} = sub { 230 $self->{send} = sub {
231 if (++$DELAY > 0) {
232 my $msg = $_[0];
233 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
234 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
235 return;
236 }
237
209 local $AnyEvent::MP::Kernel::SRCNODE = $self; 238 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
210 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 239 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
211 }; 240 };
212} 241}
213 242
214sub transport_connect { 243sub transport_connect {
215 my ($self, $tp) = @_; 244 my ($self, $tp) = @_;
216 245
217 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 246 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
218} 247}
219 248
220sub kill { 249sub kill {
221 my ($self, $port, @reason) = @_; 250 my (undef, @args) = @_;
222 251
223 delete $AnyEvent::MP::Kernel::PORT{$port}; 252 # we _always_ delay kil's, to avoid calling mon callbacks
224 delete $AnyEvent::MP::Kernel::PORT_DATA{$port}; 253 # from anything but the event loop context.
225 254 $DELAY = 1;
226 my $mon = delete $AnyEvent::MP::Kernel::LMON{$port} 255 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
227 or !@reason 256 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
228 or $AnyEvent::MP::Kernel::WARN->(2, "unmonitored local port $port died with reason: @reason");
229
230 $_->(@reason) for values %$mon;
231} 257}
232 258
233sub monitor { 259sub monitor {
234 my ($self, $portid, $cb) = @_; 260 # maybe always delay, too?
235 261 if ($DELAY_W) {
236 return $cb->(no_such_port => "cannot monitor nonexistent port") 262 my @args = @_;
237 unless exists $AnyEvent::MP::Kernel::PORT{$portid}; 263 push @DELAY, sub { AnyEvent::MP::Kernel::_monitor (@args) };
238 264 return;
239 $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0} = $cb; 265 }
266 &AnyEvent::MP::Kernel::_monitor;
240} 267}
241 268
242sub unmonitor { 269sub unmonitor {
243 my ($self, $portid, $cb) = @_; 270 # no need to always delay
271 if ($DELAY_W) {
272 my @args = @_;
273 push @DELAY, sub { AnyEvent::MP::Kernel::_unmonitor (@args) };
274 return;
275 }
244 276
245 delete $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0}; 277 &AnyEvent::MP::Kernel::_unmonitor;
246} 278}
247 279
248=head1 SEE ALSO 280=head1 SEE ALSO
249 281
250L<AnyEvent::MP>. 282L<AnyEvent::MP>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines