ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.31 by root, Fri Aug 28 23:06:33 2009 UTC vs.
Revision 1.64 by root, Fri Mar 23 21:16:25 2012 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
40sub send { 39sub send {
41 &{ shift->{send} } 40 &{ shift->{send} }
42} 41}
43 42
44# nodes reachable via the network 43# nodes reachable via the network
45package AnyEvent::MP::Node::External; 44package AnyEvent::MP::Node::Remote; # a remote node
46 45
47use base "AnyEvent::MP::Node"; 46use base "AnyEvent::MP::Node";
48 47
49# called at init time, mostly sets {send} 48# called at init time, mostly sets {send}
50sub transport_reset { 49sub transport_reset {
58 push @{$self->{queue}}, shift; 57 push @{$self->{queue}}, shift;
59 $self->connect; 58 $self->connect;
60 }; 59 };
61} 60}
62 61
63# called only after successful handshake 62# called each time we fail to establish a connection,
63# or the existing connection failed
64sub transport_error { 64sub transport_error {
65 my ($self, @reason) = @_; 65 my ($self, @reason) = @_;
66 66
67 my $no_transport = !$self->{transport}; 67 my $no_transport = !$self->{transport};
68 68
76 $_->(@reason) for map @$_, values %$mon; 76 $_->(@reason) for map @$_, values %$mon;
77 } 77 }
78 78
79 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason) 79 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
80 unless $no_transport; 80 unless $no_transport;
81
82 # if we are here and are idle, we nuke ourselves
83 delete $AnyEvent::MP::Kernel::NODE{$self->{id}}
84 unless $self->{transport} || $self->{connect_to};
81} 85}
82 86
83# called after handshake was successful 87# called after handshake was successful
84sub transport_connect { 88sub transport_connect {
85 my ($self, $transport) = @_; 89 my ($self, $transport) = @_;
86 90
87 delete $self->{trial}; 91 delete $self->{trial};
88 92
89 $self->transport_error (transport_error => "switched connections") 93 $self->transport_error (transport_error => $self->{id}, "switched connections")
90 if $self->{transport}; 94 if $self->{transport};
91 95
92 delete $self->{connect_w}; 96 delete $self->{connect_w};
93 delete $self->{connect_to}; 97 delete $self->{connect_to};
94 98
95 $self->{transport} = $transport; 99 $self->{transport} = $transport;
96 100
97 my $transport_send = $transport->can ("send"); 101 my $transport_send = $transport->{send};
98
99 $self->{send} = sub {
100 $transport_send->($transport, $_[0]);
101 };
102 102
103 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1); 103 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
104 104
105 $self->{send} = $transport_send;
106
105 $transport->send ($_) 107 $transport_send->($_)
106 for @{ delete $self->{queue} || [] }; 108 for @{ delete $self->{queue} || [] };
107} 109}
108 110
109sub connect { 111sub connect {
110 my ($self, @addresses) = @_; 112 my ($self) = @_;
111 113
112 return if $self->{transport}; 114 return if $self->{transport};
115 return if $self->{connect_w};
113 116
114 Scalar::Util::weaken $self; 117 Scalar::Util::weaken $self;
118 use Carp;Carp::cluck;#d#
115 119
116 $self->{connect_to} ||= AE::timer 120 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
117 $AnyEvent::MP::Config::CFG{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT,
118 0,
119 sub {
120 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 121 $self->transport_error (transport_error => $self->{id}, "unable to connect");
122 };
123
124 # maybe @$addresses?
125 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
126
127 if ($addresses) {
128 $self->connect_to ($addresses);
129 } else {
130 # on global nodes, all bets are off now - we either know the node, or we don't
131 unless ($AnyEvent::MP::Kernel::GLOBAL) {
132 AnyEvent::MP::Kernel::g_find ($self->{id});
133 }
134 }
135}
136
137sub connect_to {
138 my ($self, $addresses) = @_;
139
140 return if $self->{transport};
141 return if $self->{connect_w};
142
143 return unless @$addresses;
144
145 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
146
147 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
148 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
149
150 $interval = ($monitor - $interval) / @$addresses
151 if ($monitor - $interval) / @$addresses < $interval;
152
153 $interval = 0.4 if $interval < 0.4;
154
155 my @endpoints = reverse @$addresses;
156
157 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
158 my $endpoint = pop @endpoints;
159
160 AE::log 9 => "connecting to $self->{id} at $endpoint";
161
162 $self->{trial}{$endpoint} ||= do {
163 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
164 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
165
166 AnyEvent::MP::Transport::mp_connect
167 $host, $port,
168 sub { delete $self->{trial}{$endpoint} },
121 }; 169 };
122
123 return unless @addresses;
124
125 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]");
126
127 unless ($self->{connect_w}) {
128 my @endpoints;
129
130 $self->{connect_w} = AE::timer
131 rand,
132 $AnyEvent::MP::Config::CFG{connect_interval} || $AnyEvent::MP::Kernel::CONNECT_INTERVAL,
133 sub {
134 @endpoints = @addresses
135 unless @endpoints;
136
137 my $endpoint = shift @endpoints;
138
139 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint");
140
141 $self->{trial}{$endpoint} ||= do {
142 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
143 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference.");
144
145 AnyEvent::MP::Transport::mp_connect
146 $host, $port,
147 sub { delete $self->{trial}{$endpoint} },
148 };
149 }
150 ;
151 } 170 };
152} 171}
153 172
154sub kill { 173sub kill {
155 my ($self, $port, @reason) = @_; 174 my ($self, $port, @reason) = @_;
156 175
157 $self->send (["", kil => $port, @reason]); 176 $self->{send} (["", kil => $port, @reason]);
158} 177}
159 178
160sub monitor { 179sub monitor {
161 my ($self, $portid, $cb) = @_; 180 my ($self, $portid, $cb) = @_;
162 181
180 $self->send (["", mon0 => $portid]); 199 $self->send (["", mon0 => $portid]);
181 delete $self->{monitor}{$portid}; 200 delete $self->{monitor}{$portid};
182 } 201 }
183} 202}
184 203
185# used for direct slave connections as well
186package AnyEvent::MP::Node::Direct;
187
188use base "AnyEvent::MP::Node::External";
189
190package AnyEvent::MP::Node::Self; 204package AnyEvent::MP::Node::Self; # the local node
191 205
192use base "AnyEvent::MP::Node"; 206use base "AnyEvent::MP::Node";
193 207
194sub connect { 208sub connect {
195 # we are trivially connected 209 # we are trivially connected
196} 210}
197 211
212# delay every so often to avoid recursion, also used to delay after spawn
213our $DELAY = -50;
214our @DELAY;
215our $DELAY_W;
216
217our $send_delayed = sub {
218 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
219 (shift @DELAY)->()
220 while @DELAY;
221 undef $DELAY_W;
222 $DELAY = -50;
223};
224
198sub transport_reset { 225sub transport_reset {
199 my ($self) = @_; 226 my ($self) = @_;
200 227
201 Scalar::Util::weaken $self; 228 Scalar::Util::weaken $self;
202 229
203 $self->{send} = sub { 230 $self->{send} = sub {
231 if (++$DELAY > 0) {
232 my $msg = $_[0];
233 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
234 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
235 return;
236 }
237
204 local $AnyEvent::MP::Kernel::SRCNODE = $self; 238 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
205 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 239 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
206 }; 240 };
207} 241}
208 242
209sub transport_connect { 243sub transport_connect {
210 my ($self, $tp) = @_; 244 my ($self, $tp) = @_;
211 245
212 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 246 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
213} 247}
214 248
215sub kill { 249sub kill {
216 my ($self, $port, @reason) = @_; 250 my (undef, @args) = @_;
217 251
218 delete $AnyEvent::MP::Kernel::PORT{$port}; 252 # we _always_ delay kil's, to avoid calling mon callbacks
219 delete $AnyEvent::MP::Kernel::PORT_DATA{$port}; 253 # from anything but the event loop context.
220 254 $DELAY = 1;
221 my $mon = delete $AnyEvent::MP::Kernel::LMON{$port} 255 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
222 or !@reason 256 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
223 or $AnyEvent::MP::Kernel::WARN->(2, "unmonitored local port $port died with reason: @reason");
224
225 $_->(@reason) for values %$mon;
226} 257}
227 258
228sub monitor { 259sub monitor {
229 my ($self, $portid, $cb) = @_; 260 # maybe always delay, too?
230 261 if ($DELAY_W) {
231 return $cb->(no_such_port => "cannot monitor nonexistent port") 262 my @args = @_;
232 unless exists $AnyEvent::MP::Kernel::PORT{$portid}; 263 push @DELAY, sub { AnyEvent::MP::Kernel::_monitor (@args) };
233 264 return;
234 $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0} = $cb; 265 }
266 &AnyEvent::MP::Kernel::_monitor;
235} 267}
236 268
237sub unmonitor { 269sub unmonitor {
238 my ($self, $portid, $cb) = @_; 270 # no need to always delay
271 if ($DELAY_W) {
272 my @args = @_;
273 push @DELAY, sub { AnyEvent::MP::Kernel::_unmonitor (@args) };
274 return;
275 }
239 276
240 delete $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0}; 277 &AnyEvent::MP::Kernel::_unmonitor;
241} 278}
242 279
243=head1 SEE ALSO 280=head1 SEE ALSO
244 281
245L<AnyEvent::MP>. 282L<AnyEvent::MP>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines