ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.56 by root, Thu Mar 1 18:11:56 2012 UTC vs.
Revision 1.69 by root, Sun Aug 28 14:49:44 2016 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
26 my ($self, $id) = @_; 25 my ($self, $id) = @_;
27 26
28 $self = bless { id => $id }, $self; 27 $self = bless { id => $id }, $self;
29 28
29 # register
30 $AnyEvent::MP::Kernel::NODE{$id} = $self;
31
30 $self->init; 32 $self->init;
31 $self->transport_reset; 33 $self->transport_reset;
32 34
33 $self 35 $self
34} 36}
35 37
38sub DESTROY {
39 # unregister
40 delete $AnyEvent::MP::Kernel::NODE{$_[0]{id}};
41}
42
36sub init { 43sub init {
37 # 44 #
38} 45}
39 46
40#d# only needed for bad global code at the moment
41sub transport_error {
42}
43
44sub send { 47sub send {
45 &{ shift->{send} } 48 &{ shift->{send} }
46} 49}
47 50
48# nodes reachable via the network 51# nodes reachable via the network
49package AnyEvent::MP::Node::Remote; 52package AnyEvent::MP::Node::Remote; # a remote node
50 53
51use base "AnyEvent::MP::Node"; 54use base "AnyEvent::MP::Node";
52 55
53# called at init time, mostly sets {send} 56# called at init time, mostly sets {send}
54sub transport_reset { 57sub transport_reset {
82 } 85 }
83 86
84 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason) 87 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
85 unless $no_transport; 88 unless $no_transport;
86 89
87 # if we are here and are idle, we nuke ourselves 90 # we weaken the node reference, so it can go away if unused
88 delete $AnyEvent::MP::Kernel::NODE{$self->{id}} 91 Scalar::Util::weaken $AnyEvent::MP::Kernel::NODE{$self->{id}}
89 unless $self->{transport} || $self->{connect_to}; 92 unless $self->{connect_to};
93
94 AE::log 9 => "@reason";
90} 95}
91 96
92# called after handshake was successful 97# called after handshake was successful
93sub transport_connect { 98sub transport_connect {
94 my ($self, $transport) = @_; 99 my ($self, $transport) = @_;
96 delete $self->{trial}; 101 delete $self->{trial};
97 102
98 $self->transport_error (transport_error => $self->{id}, "switched connections") 103 $self->transport_error (transport_error => $self->{id}, "switched connections")
99 if $self->{transport}; 104 if $self->{transport};
100 105
101 delete $self->{connect_addr};
102 delete $self->{connect_w}; 106 delete $self->{connect_w};
103 delete $self->{connect_to}; 107 delete $self->{connect_to};
104 108
105 $self->{transport} = $transport; 109 $self->{transport} = $transport;
106 110
118 my ($self) = @_; 122 my ($self) = @_;
119 123
120 return if $self->{transport}; 124 return if $self->{transport};
121 return if $self->{connect_w}; 125 return if $self->{connect_w};
122 126
127 # we unweaken the node reference, in case it was weakened before
128 $AnyEvent::MP::Kernel::NODE{$self->{id}}
129 = $AnyEvent::MP::Kernel::NODE{$self->{id}};
130
123 Scalar::Util::weaken $self; 131 Scalar::Util::weaken $self;
124 132
125 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 133 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
126
127 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
128 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 134 $self->transport_error (transport_error => $self->{id}, "connect timeout");
129 }; 135 };
130 136
131 # maybe @$addresses? 137 # maybe @$addresses?
132 my @addresses = @{ $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}} }; 138 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
133 139
134 unless (@addresses) { 140 if ($addresses) {
141 $self->connect_to ($addresses);
142 } else {
135 # on global nodes, all bets are off now - we either know the node, or we don't 143 # on global nodes, all bets are off now - we either know the node, or we don't
136 unless ($AnyEvent::MP::Kernel::GLOBAL) { 144 if ($AnyEvent::MP::Kernel::GLOBAL) {
137 $self->{connect_w} = AnyEvent::MP::Kernel::global_req (g_find => $self->{id}); 145 $self->transport_error (transport_error => $self->{id}, "no known address");
146 } else {
147 AnyEvent::MP::Kernel::g_find ($self->{id});
138 } 148 }
149 }
150}
151
152sub connect_to {
153 my ($self, $addresses) = @_;
154
155 return if $self->{transport};
156 return if $self->{connect_w};
157
158 unless (@$addresses) {
159 $self->transport_error (transport_error => $self->{id}, "no known address");
139 return; 160 return;
140 } 161 }
141 162
142 $self->{connect_addr} = \@addresses; # a bit weird, but efficient
143
144 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); 163 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
145 164
165 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
146 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 166 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
147 167
148 $interval = ($monitor - $interval) / @addresses 168 $interval = ($monitor - $interval) / @$addresses
149 if ($monitor - $interval) / @addresses < $interval; 169 if ($monitor - $interval) / @$addresses < $interval;
150 170
151 $interval = 0.4 if $interval < 0.4; 171 $interval = 0.4 if $interval < 0.4;
152 172
153 my @endpoints; 173 my @endpoints = reverse @$addresses;
154 174
155 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub { 175 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
156 @endpoints = @addresses
157 unless @endpoints;
158
159 my $endpoint = shift @endpoints; 176 my $endpoint = pop @endpoints
177 or return;
160 178
161 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); 179 AE::log 9 => "connecting to $self->{id} at $endpoint";
162 180
163 $self->{trial}{$endpoint} ||= do { 181 $self->{trial}{$endpoint} ||= do {
164 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 182 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
165 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); 183 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
166 184
167 AnyEvent::MP::Transport::mp_connect 185 AnyEvent::MP::Transport::mp_connect
168 $host, $port, 186 $host, $port,
169 sub { delete $self->{trial}{$endpoint} }, 187 sub { delete $self->{trial}{$endpoint} },
170 }; 188 };
172} 190}
173 191
174sub kill { 192sub kill {
175 my ($self, $port, @reason) = @_; 193 my ($self, $port, @reason) = @_;
176 194
177 $self->{send} (["", kil => $port, @reason]); 195 $self->{send} (["", kil1 => $port, @reason]);
178} 196}
179 197
180sub monitor { 198sub monitor {
181 my ($self, $portid, $cb) = @_; 199 my ($self, $portid, $cb) = @_;
182 200
200 $self->send (["", mon0 => $portid]); 218 $self->send (["", mon0 => $portid]);
201 delete $self->{monitor}{$portid}; 219 delete $self->{monitor}{$portid};
202 } 220 }
203} 221}
204 222
205package AnyEvent::MP::Node::Self; 223package AnyEvent::MP::Node::Self; # the local node
206 224
207use base "AnyEvent::MP::Node"; 225use base "AnyEvent::MP::Node";
208 226
209sub connect { 227sub connect {
210 # we are trivially connected 228 # we are trivially connected
213# delay every so often to avoid recursion, also used to delay after spawn 231# delay every so often to avoid recursion, also used to delay after spawn
214our $DELAY = -50; 232our $DELAY = -50;
215our @DELAY; 233our @DELAY;
216our $DELAY_W; 234our $DELAY_W;
217 235
218sub _send_delayed { 236our $send_delayed = sub {
219 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE{""}; 237 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
220 (shift @DELAY)->() 238 (shift @DELAY)->()
221 while @DELAY; 239 while @DELAY;
222 undef $DELAY_W; 240 undef $DELAY_W;
223 $DELAY = -50; 241 $DELAY = -50;
224} 242};
225 243
226sub transport_reset { 244sub transport_reset {
227 my ($self) = @_; 245 my ($self) = @_;
228 246
229 Scalar::Util::weaken $self; 247 Scalar::Util::weaken $self;
230 248
231 $self->{send} = sub { 249 $self->{send} = sub {
232 if ($DELAY++ >= 0) { 250 if (++$DELAY > 0) {
233 my $msg = $_[0]; 251 my $msg = $_[0];
234 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) }; 252 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
235 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 253 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
236 return; 254 return;
237 } 255 }
238 256
239 local $AnyEvent::MP::Kernel::SRCNODE = $self; 257 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
240 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 258 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
241 }; 259 };
242} 260}
243 261
244sub transport_connect { 262sub transport_connect {
245 my ($self, $tp) = @_; 263 my ($self, $tp) = @_;
246 264
247 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 265 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
248} 266}
249 267
250sub kill { 268sub kill {
251 my (undef, @args) = @_; 269 my (undef, @args) = @_;
252 270
253 # we _always_ delay kil's, to avoid calling mon callbacks 271 # we _always_ delay kil's, to avoid calling mon callbacks
254 # from anything but the event loop context. 272 # from anything but the event loop context.
255 $DELAY = 1; 273 $DELAY = 1;
256 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) }; 274 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
257 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 275 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
258} 276}
259 277
260sub monitor { 278sub monitor {
261 # maybe always delay, too? 279 # maybe always delay, too?
262 if ($DELAY_W) { 280 if ($DELAY_W) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines