ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.31 by root, Fri Aug 28 23:06:33 2009 UTC vs.
Revision 1.68 by root, Sun Aug 28 09:39:23 2016 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
26 my ($self, $id) = @_; 25 my ($self, $id) = @_;
27 26
28 $self = bless { id => $id }, $self; 27 $self = bless { id => $id }, $self;
29 28
29 # register
30 $AnyEvent::MP::Kernel::NODE{$id} = $self;
31
30 $self->init; 32 $self->init;
31 $self->transport_reset; 33 $self->transport_reset;
32 34
33 $self 35 $self
34} 36}
35 37
38sub DESTROY {
39 # unregister
40 delete $AnyEvent::MP::Kernel::NODE{$_[0]{id}};
41}
42
36sub init { 43sub init {
37 # 44 #
38} 45}
39 46
40sub send { 47sub send {
41 &{ shift->{send} } 48 &{ shift->{send} }
42} 49}
43 50
44# nodes reachable via the network 51# nodes reachable via the network
45package AnyEvent::MP::Node::External; 52package AnyEvent::MP::Node::Remote; # a remote node
46 53
47use base "AnyEvent::MP::Node"; 54use base "AnyEvent::MP::Node";
48 55
49# called at init time, mostly sets {send} 56# called at init time, mostly sets {send}
50sub transport_reset { 57sub transport_reset {
58 push @{$self->{queue}}, shift; 65 push @{$self->{queue}}, shift;
59 $self->connect; 66 $self->connect;
60 }; 67 };
61} 68}
62 69
63# called only after successful handshake 70# called each time we fail to establish a connection,
71# or the existing connection failed
64sub transport_error { 72sub transport_error {
65 my ($self, @reason) = @_; 73 my ($self, @reason) = @_;
66 74
67 my $no_transport = !$self->{transport}; 75 my $no_transport = !$self->{transport};
68 76
76 $_->(@reason) for map @$_, values %$mon; 84 $_->(@reason) for map @$_, values %$mon;
77 } 85 }
78 86
79 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason) 87 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
80 unless $no_transport; 88 unless $no_transport;
89
90 # we weaken the node reference, so it can go away if unused
91 Scalar::Util::weaken $AnyEvent::MP::Kernel::NODE{$self->{id}}
92 unless $self->{connect_to};
93
94 AE::log 9 => "@reason";
81} 95}
82 96
83# called after handshake was successful 97# called after handshake was successful
84sub transport_connect { 98sub transport_connect {
85 my ($self, $transport) = @_; 99 my ($self, $transport) = @_;
86 100
87 delete $self->{trial}; 101 delete $self->{trial};
88 102
89 $self->transport_error (transport_error => "switched connections") 103 $self->transport_error (transport_error => $self->{id}, "switched connections")
90 if $self->{transport}; 104 if $self->{transport};
91 105
92 delete $self->{connect_w}; 106 delete $self->{connect_w};
93 delete $self->{connect_to}; 107 delete $self->{connect_to};
94 108
95 $self->{transport} = $transport; 109 $self->{transport} = $transport;
96 110
97 my $transport_send = $transport->can ("send"); 111 my $transport_send = $transport->{send};
98 112
99 $self->{send} = sub { 113 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
100 $transport_send->($transport, $_[0]); 114
115 $self->{send} = $transport_send;
116
117 $transport_send->($_)
118 for @{ delete $self->{queue} || [] };
119}
120
121sub connect {
122 my ($self) = @_;
123
124 return if $self->{transport};
125 return if $self->{connect_w};
126
127 # we unweaken the node reference, in case it was weakened before
128 $AnyEvent::MP::Kernel::NODE{$self->{id}}
129 = $AnyEvent::MP::Kernel::NODE{$self->{id}};
130
131 Scalar::Util::weaken $self;
132
133 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
134 $self->transport_error (transport_error => $self->{id}, "connect timeout");
101 }; 135 };
102 136
103 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1); 137 # maybe @$addresses?
138 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
104 139
105 $transport->send ($_) 140 if ($addresses) {
106 for @{ delete $self->{queue} || [] }; 141 $self->connect_to ($addresses);
142 } else {
143 # on global nodes, all bets are off now - we either know the node, or we don't
144 if ($AnyEvent::MP::Kernel::GLOBAL) {
145 $self->transport_error (transport_error => $self->{id}, "no known address");
146 } else {
147 AnyEvent::MP::Kernel::g_find ($self->{id});
148 }
149 }
107} 150}
108 151
109sub connect { 152sub connect_to {
110 my ($self, @addresses) = @_; 153 my ($self, $addresses) = @_;
111 154
112 return if $self->{transport}; 155 return if $self->{transport};
156 return if $self->{connect_w};
113 157
114 Scalar::Util::weaken $self; 158 unless (@$addresses) {
159 $self->transport_error (transport_error => $self->{id}, "no known address");
160 return;
161 }
162
163 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
115 164
116 $self->{connect_to} ||= AE::timer 165 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
117 $AnyEvent::MP::Config::CFG{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, 166 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
118 0, 167
119 sub { 168 $interval = ($monitor - $interval) / @$addresses
169 if ($monitor - $interval) / @$addresses < $interval;
170
171 $interval = 0.4 if $interval < 0.4;
172
173 my @endpoints = reverse @$addresses;
174
175 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
176 my $endpoint = pop @endpoints
120 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 177 or return $self->transport_error (transport_error => $self->{id}, "unable to connect to any address");
178
179 AE::log 9 => "connecting to $self->{id} at $endpoint";
180
181 $self->{trial}{$endpoint} ||= do {
182 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
183 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
184
185 AnyEvent::MP::Transport::mp_connect
186 $host, $port,
187 sub { delete $self->{trial}{$endpoint} },
121 }; 188 };
122
123 return unless @addresses;
124
125 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]");
126
127 unless ($self->{connect_w}) {
128 my @endpoints;
129
130 $self->{connect_w} = AE::timer
131 rand,
132 $AnyEvent::MP::Config::CFG{connect_interval} || $AnyEvent::MP::Kernel::CONNECT_INTERVAL,
133 sub {
134 @endpoints = @addresses
135 unless @endpoints;
136
137 my $endpoint = shift @endpoints;
138
139 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint");
140
141 $self->{trial}{$endpoint} ||= do {
142 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
143 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference.");
144
145 AnyEvent::MP::Transport::mp_connect
146 $host, $port,
147 sub { delete $self->{trial}{$endpoint} },
148 };
149 }
150 ;
151 } 189 };
152} 190}
153 191
154sub kill { 192sub kill {
155 my ($self, $port, @reason) = @_; 193 my ($self, $port, @reason) = @_;
156 194
157 $self->send (["", kil => $port, @reason]); 195 $self->{send} (["", kil1 => $port, @reason]);
158} 196}
159 197
160sub monitor { 198sub monitor {
161 my ($self, $portid, $cb) = @_; 199 my ($self, $portid, $cb) = @_;
162 200
180 $self->send (["", mon0 => $portid]); 218 $self->send (["", mon0 => $portid]);
181 delete $self->{monitor}{$portid}; 219 delete $self->{monitor}{$portid};
182 } 220 }
183} 221}
184 222
185# used for direct slave connections as well
186package AnyEvent::MP::Node::Direct;
187
188use base "AnyEvent::MP::Node::External";
189
190package AnyEvent::MP::Node::Self; 223package AnyEvent::MP::Node::Self; # the local node
191 224
192use base "AnyEvent::MP::Node"; 225use base "AnyEvent::MP::Node";
193 226
194sub connect { 227sub connect {
195 # we are trivially connected 228 # we are trivially connected
196} 229}
197 230
231# delay every so often to avoid recursion, also used to delay after spawn
232our $DELAY = -50;
233our @DELAY;
234our $DELAY_W;
235
236our $send_delayed = sub {
237 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
238 (shift @DELAY)->()
239 while @DELAY;
240 undef $DELAY_W;
241 $DELAY = -50;
242};
243
198sub transport_reset { 244sub transport_reset {
199 my ($self) = @_; 245 my ($self) = @_;
200 246
201 Scalar::Util::weaken $self; 247 Scalar::Util::weaken $self;
202 248
203 $self->{send} = sub { 249 $self->{send} = sub {
250 if (++$DELAY > 0) {
251 my $msg = $_[0];
252 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
253 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
254 return;
255 }
256
204 local $AnyEvent::MP::Kernel::SRCNODE = $self; 257 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
205 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 258 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
206 }; 259 };
207} 260}
208 261
209sub transport_connect { 262sub transport_connect {
210 my ($self, $tp) = @_; 263 my ($self, $tp) = @_;
211 264
212 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 265 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
213} 266}
214 267
215sub kill { 268sub kill {
216 my ($self, $port, @reason) = @_; 269 my (undef, @args) = @_;
217 270
218 delete $AnyEvent::MP::Kernel::PORT{$port}; 271 # we _always_ delay kil's, to avoid calling mon callbacks
219 delete $AnyEvent::MP::Kernel::PORT_DATA{$port}; 272 # from anything but the event loop context.
220 273 $DELAY = 1;
221 my $mon = delete $AnyEvent::MP::Kernel::LMON{$port} 274 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
222 or !@reason 275 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
223 or $AnyEvent::MP::Kernel::WARN->(2, "unmonitored local port $port died with reason: @reason");
224
225 $_->(@reason) for values %$mon;
226} 276}
227 277
228sub monitor { 278sub monitor {
229 my ($self, $portid, $cb) = @_; 279 # maybe always delay, too?
230 280 if ($DELAY_W) {
231 return $cb->(no_such_port => "cannot monitor nonexistent port") 281 my @args = @_;
232 unless exists $AnyEvent::MP::Kernel::PORT{$portid}; 282 push @DELAY, sub { AnyEvent::MP::Kernel::_monitor (@args) };
233 283 return;
234 $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0} = $cb; 284 }
285 &AnyEvent::MP::Kernel::_monitor;
235} 286}
236 287
237sub unmonitor { 288sub unmonitor {
238 my ($self, $portid, $cb) = @_; 289 # no need to always delay
290 if ($DELAY_W) {
291 my @args = @_;
292 push @DELAY, sub { AnyEvent::MP::Kernel::_unmonitor (@args) };
293 return;
294 }
239 295
240 delete $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0}; 296 &AnyEvent::MP::Kernel::_unmonitor;
241} 297}
242 298
243=head1 SEE ALSO 299=head1 SEE ALSO
244 300
245L<AnyEvent::MP>. 301L<AnyEvent::MP>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines