ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.58 by root, Fri Mar 2 19:21:16 2012 UTC vs.
Revision 1.67 by root, Sun Aug 28 08:23:34 2016 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
26 my ($self, $id) = @_; 25 my ($self, $id) = @_;
27 26
28 $self = bless { id => $id }, $self; 27 $self = bless { id => $id }, $self;
29 28
29 # register
30 $AnyEvent::MP::Kernel::NODE{$id} = $self;
31
30 $self->init; 32 $self->init;
31 $self->transport_reset; 33 $self->transport_reset;
32 34
33 $self 35 $self
34} 36}
35 37
38sub DESTROY {
39 # unregister
40 delete $AnyEvent::MP::Kernel::NODE{$_[0]{id}};
41}
42
36sub init { 43sub init {
37 # 44 #
38} 45}
39 46
40sub send { 47sub send {
41 &{ shift->{send} } 48 &{ shift->{send} }
42} 49}
43 50
44# nodes reachable via the network 51# nodes reachable via the network
45package AnyEvent::MP::Node::Remote; 52package AnyEvent::MP::Node::Remote; # a remote node
46 53
47use base "AnyEvent::MP::Node"; 54use base "AnyEvent::MP::Node";
48 55
49# called at init time, mostly sets {send} 56# called at init time, mostly sets {send}
50sub transport_reset { 57sub transport_reset {
78 } 85 }
79 86
80 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason) 87 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
81 unless $no_transport; 88 unless $no_transport;
82 89
83 # if we are here and are idle, we nuke ourselves 90 # we weaken the node reference, so it can go away if unused
84 delete $AnyEvent::MP::Kernel::NODE{$self->{id}} 91 Scalar::Util::weaken $AnyEvent::MP::Kernel::NODE{$self->{id}}
85 unless $self->{transport} || $self->{connect_to}; 92 unless $self->{connect_to};
93
94 AE::log 9 => "@reason";
86} 95}
87 96
88# called after handshake was successful 97# called after handshake was successful
89sub transport_connect { 98sub transport_connect {
90 my ($self, $transport) = @_; 99 my ($self, $transport) = @_;
92 delete $self->{trial}; 101 delete $self->{trial};
93 102
94 $self->transport_error (transport_error => $self->{id}, "switched connections") 103 $self->transport_error (transport_error => $self->{id}, "switched connections")
95 if $self->{transport}; 104 if $self->{transport};
96 105
97 delete $self->{connect_addr};
98 delete $self->{connect_w}; 106 delete $self->{connect_w};
99 delete $self->{connect_to}; 107 delete $self->{connect_to};
100 108
101 $self->{transport} = $transport; 109 $self->{transport} = $transport;
102 110
114 my ($self) = @_; 122 my ($self) = @_;
115 123
116 return if $self->{transport}; 124 return if $self->{transport};
117 return if $self->{connect_w}; 125 return if $self->{connect_w};
118 126
127 # we unweaken the node reference, in case it was weakened before
128 $AnyEvent::MP::Kernel::NODE{$self->{id}}
129 = $AnyEvent::MP::Kernel::NODE{$self->{id}};
130
119 Scalar::Util::weaken $self; 131 Scalar::Util::weaken $self;
120 132
121 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 133 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
122
123 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
124 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 134 $self->transport_error (transport_error => $self->{id}, "connect timeout");
125 }; 135 };
126 136
127 # maybe @$addresses? 137 # maybe @$addresses?
128 my @addresses = @{ $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}} }; 138 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
129 139
130 unless (@addresses) { 140 if ($addresses) {
141 $self->connect_to ($addresses);
142 } else {
131 # on global nodes, all bets are off now - we either know the node, or we don't 143 # on global nodes, all bets are off now - we either know the node, or we don't
132 unless ($AnyEvent::MP::Kernel::GLOBAL) { 144 if ($AnyEvent::MP::Kernel::GLOBAL) {
145 $self->transport_error (transport_error => $self->{id}, "no known address");
146 } else {
133 AnyEvent::MP::Kernel::g_find ($self->{id}); 147 AnyEvent::MP::Kernel::g_find ($self->{id});
134 } 148 }
149 }
150}
151
152sub connect_to {
153 my ($self, $addresses) = @_;
154
155 return if $self->{transport};
156 return if $self->{connect_w};
157
158 unless (@$addresses) {
159 $self->transport_error (transport_error => $self->{id}, "no known address");
135 return; 160 return;
136 } 161 }
137 162
138 $self->{connect_addr} = \@addresses; # a bit weird, but efficient
139
140 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); 163 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
141 164
165 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
142 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 166 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
143 167
144 $interval = ($monitor - $interval) / @addresses 168 $interval = ($monitor - $interval) / @$addresses
145 if ($monitor - $interval) / @addresses < $interval; 169 if ($monitor - $interval) / @$addresses < $interval;
146 170
147 $interval = 0.4 if $interval < 0.4; 171 $interval = 0.4 if $interval < 0.4;
148 172
149 my @endpoints; 173 my @endpoints = reverse @$addresses;
150 174
151 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub { 175 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
152 @endpoints = @addresses
153 unless @endpoints;
154
155 my $endpoint = shift @endpoints; 176 my $endpoint = pop @endpoints;
156 177
157 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); 178 AE::log 9 => "connecting to $self->{id} at $endpoint";
158 179
159 $self->{trial}{$endpoint} ||= do { 180 $self->{trial}{$endpoint} ||= do {
160 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 181 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
161 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); 182 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
162 183
163 AnyEvent::MP::Transport::mp_connect 184 AnyEvent::MP::Transport::mp_connect
164 $host, $port, 185 $host, $port,
165 sub { delete $self->{trial}{$endpoint} }, 186 sub { delete $self->{trial}{$endpoint} },
166 }; 187 };
168} 189}
169 190
170sub kill { 191sub kill {
171 my ($self, $port, @reason) = @_; 192 my ($self, $port, @reason) = @_;
172 193
173 $self->{send} (["", kil => $port, @reason]); 194 $self->{send} (["", kil1 => $port, @reason]);
174} 195}
175 196
176sub monitor { 197sub monitor {
177 my ($self, $portid, $cb) = @_; 198 my ($self, $portid, $cb) = @_;
178 199
196 $self->send (["", mon0 => $portid]); 217 $self->send (["", mon0 => $portid]);
197 delete $self->{monitor}{$portid}; 218 delete $self->{monitor}{$portid};
198 } 219 }
199} 220}
200 221
201package AnyEvent::MP::Node::Self; 222package AnyEvent::MP::Node::Self; # the local node
202 223
203use base "AnyEvent::MP::Node"; 224use base "AnyEvent::MP::Node";
204 225
205sub connect { 226sub connect {
206 # we are trivially connected 227 # we are trivially connected
209# delay every so often to avoid recursion, also used to delay after spawn 230# delay every so often to avoid recursion, also used to delay after spawn
210our $DELAY = -50; 231our $DELAY = -50;
211our @DELAY; 232our @DELAY;
212our $DELAY_W; 233our $DELAY_W;
213 234
214sub _send_delayed { 235our $send_delayed = sub {
215 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE{""}; 236 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
216 (shift @DELAY)->() 237 (shift @DELAY)->()
217 while @DELAY; 238 while @DELAY;
218 undef $DELAY_W; 239 undef $DELAY_W;
219 $DELAY = -50; 240 $DELAY = -50;
220} 241};
221 242
222sub transport_reset { 243sub transport_reset {
223 my ($self) = @_; 244 my ($self) = @_;
224 245
225 Scalar::Util::weaken $self; 246 Scalar::Util::weaken $self;
226 247
227 $self->{send} = sub { 248 $self->{send} = sub {
228 if ($DELAY++ >= 0) { 249 if (++$DELAY > 0) {
229 my $msg = $_[0]; 250 my $msg = $_[0];
230 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) }; 251 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
231 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 252 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
232 return; 253 return;
233 } 254 }
234 255
235 local $AnyEvent::MP::Kernel::SRCNODE = $self; 256 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
236 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 257 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
237 }; 258 };
238} 259}
239 260
240sub transport_connect { 261sub transport_connect {
241 my ($self, $tp) = @_; 262 my ($self, $tp) = @_;
242 263
243 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 264 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
244} 265}
245 266
246sub kill { 267sub kill {
247 my (undef, @args) = @_; 268 my (undef, @args) = @_;
248 269
249 # we _always_ delay kil's, to avoid calling mon callbacks 270 # we _always_ delay kil's, to avoid calling mon callbacks
250 # from anything but the event loop context. 271 # from anything but the event loop context.
251 $DELAY = 1; 272 $DELAY = 1;
252 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) }; 273 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
253 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 274 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
254} 275}
255 276
256sub monitor { 277sub monitor {
257 # maybe always delay, too? 278 # maybe always delay, too?
258 if ($DELAY_W) { 279 if ($DELAY_W) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines