ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.53 by root, Tue Feb 28 18:37:24 2012 UTC vs.
Revision 1.68 by root, Sun Aug 28 09:39:23 2016 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
26 my ($self, $id) = @_; 25 my ($self, $id) = @_;
27 26
28 $self = bless { id => $id }, $self; 27 $self = bless { id => $id }, $self;
29 28
29 # register
30 $AnyEvent::MP::Kernel::NODE{$id} = $self;
31
30 $self->init; 32 $self->init;
31 $self->transport_reset; 33 $self->transport_reset;
32 34
33 $self 35 $self
34} 36}
35 37
38sub DESTROY {
39 # unregister
40 delete $AnyEvent::MP::Kernel::NODE{$_[0]{id}};
41}
42
36sub init { 43sub init {
37 # 44 #
38} 45}
39 46
40sub send { 47sub send {
41 &{ shift->{send} } 48 &{ shift->{send} }
42} 49}
43 50
44# nodes reachable via the network 51# nodes reachable via the network
45package AnyEvent::MP::Node::Remote; 52package AnyEvent::MP::Node::Remote; # a remote node
46 53
47use base "AnyEvent::MP::Node"; 54use base "AnyEvent::MP::Node";
48 55
49# called at init time, mostly sets {send} 56# called at init time, mostly sets {send}
50sub transport_reset { 57sub transport_reset {
78 } 85 }
79 86
80 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason) 87 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
81 unless $no_transport; 88 unless $no_transport;
82 89
83 # if we are here and are idle, we nuke ourselves 90 # we weaken the node reference, so it can go away if unused
84 delete $AnyEvent::MP::Kernel::NODE{$self->{id}} 91 Scalar::Util::weaken $AnyEvent::MP::Kernel::NODE{$self->{id}}
85 unless $self->{transport} || $self->{connect_to}; 92 unless $self->{connect_to};
93
94 AE::log 9 => "@reason";
86} 95}
87 96
88# called after handshake was successful 97# called after handshake was successful
89sub transport_connect { 98sub transport_connect {
90 my ($self, $transport) = @_; 99 my ($self, $transport) = @_;
92 delete $self->{trial}; 101 delete $self->{trial};
93 102
94 $self->transport_error (transport_error => $self->{id}, "switched connections") 103 $self->transport_error (transport_error => $self->{id}, "switched connections")
95 if $self->{transport}; 104 if $self->{transport};
96 105
97 delete $self->{connect_addr};
98 delete $self->{connect_w}; 106 delete $self->{connect_w};
99 delete $self->{connect_to}; 107 delete $self->{connect_to};
100 108
101 $self->{transport} = $transport; 109 $self->{transport} = $transport;
102 110
112 120
113sub connect { 121sub connect {
114 my ($self) = @_; 122 my ($self) = @_;
115 123
116 return if $self->{transport}; 124 return if $self->{transport};
125 return if $self->{connect_w};
126
127 # we unweaken the node reference, in case it was weakened before
128 $AnyEvent::MP::Kernel::NODE{$self->{id}}
129 = $AnyEvent::MP::Kernel::NODE{$self->{id}};
117 130
118 Scalar::Util::weaken $self; 131 Scalar::Util::weaken $self;
119 132
120 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 133 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
121
122 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
123 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 134 $self->transport_error (transport_error => $self->{id}, "connect timeout");
124 }; 135 };
125 136
126 # maybe @$addresses? 137 # maybe @$addresses?
127 my @addresses = @{
128 $AnyEvent::MP::Kernel::GLOBAL_ADDR->{$self->{id}} 138 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
129 || $AnyEvent::MP::Kernel::NODE_ADDR->{$self->{id}}
130 };
131 139
132 unless (@addresses) { 140 if ($addresses) {
141 $self->connect_to ($addresses);
142 } else {
133 # on global nodes, all betsa re off now - we either know the node, or we don't 143 # on global nodes, all bets are off now - we either know the node, or we don't
134 unless ($AnyEvent::MP::Kernel::GLOBAL) { 144 if ($AnyEvent::MP::Kernel::GLOBAL) {
135 $self->{connect_w} = AnyEvent::MP::Kernel::global_req ( 145 $self->transport_error (transport_error => $self->{id}, "no known address");
136 global_find => $self->{id}, 146 } else {
137 sub { 147 AnyEvent::MP::Kernel::g_find ($self->{id});
138 return unless $self; # just to be sure
139 return unless @{ $_[0] };
140
141 local $AnyEvent::MP::Kernel::GLOBAL_ADDR->{$self->{id}} = $_[0]; #d# UGLY
142 $self->connect;
143 }
144 );
145 } 148 }
149 }
150}
146 151
152sub connect_to {
153 my ($self, $addresses) = @_;
154
155 return if $self->{transport};
156 return if $self->{connect_w};
157
158 unless (@$addresses) {
159 $self->transport_error (transport_error => $self->{id}, "no known address");
147 return; 160 return;
148 } 161 }
149 162
150 $self->{connect_addr} = \@addresses; # a bit weird, but efficient
151
152 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); 163 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
153 164
165 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
154 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 166 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
155 167
156 $interval = ($monitor - $interval) / @addresses 168 $interval = ($monitor - $interval) / @$addresses
157 if ($monitor - $interval) / @addresses < $interval; 169 if ($monitor - $interval) / @$addresses < $interval;
158 170
159 $interval = 0.4 if $interval < 0.4; 171 $interval = 0.4 if $interval < 0.4;
160 172
161 my @endpoints; 173 my @endpoints = reverse @$addresses;
162 174
163 $self->{connect_w} = AE::timer 0.050 * rand, $interval * (0.9 + 0.1 * rand), sub { 175 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
164 @endpoints = @addresses
165 unless @endpoints;
166
167 my $endpoint = shift @endpoints; 176 my $endpoint = pop @endpoints
177 or return $self->transport_error (transport_error => $self->{id}, "unable to connect to any address");
168 178
169 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); 179 AE::log 9 => "connecting to $self->{id} at $endpoint";
170 180
171 $self->{trial}{$endpoint} ||= do { 181 $self->{trial}{$endpoint} ||= do {
172 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 182 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
173 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); 183 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
174 184
175 AnyEvent::MP::Transport::mp_connect 185 AnyEvent::MP::Transport::mp_connect
176 $host, $port, 186 $host, $port,
177 sub { delete $self->{trial}{$endpoint} }, 187 sub { delete $self->{trial}{$endpoint} },
178 }; 188 };
180} 190}
181 191
182sub kill { 192sub kill {
183 my ($self, $port, @reason) = @_; 193 my ($self, $port, @reason) = @_;
184 194
185 $self->{send} (["", kil => $port, @reason]); 195 $self->{send} (["", kil1 => $port, @reason]);
186} 196}
187 197
188sub monitor { 198sub monitor {
189 my ($self, $portid, $cb) = @_; 199 my ($self, $portid, $cb) = @_;
190 200
208 $self->send (["", mon0 => $portid]); 218 $self->send (["", mon0 => $portid]);
209 delete $self->{monitor}{$portid}; 219 delete $self->{monitor}{$portid};
210 } 220 }
211} 221}
212 222
213package AnyEvent::MP::Node::Self; 223package AnyEvent::MP::Node::Self; # the local node
214 224
215use base "AnyEvent::MP::Node"; 225use base "AnyEvent::MP::Node";
216 226
217sub connect { 227sub connect {
218 # we are trivially connected 228 # we are trivially connected
221# delay every so often to avoid recursion, also used to delay after spawn 231# delay every so often to avoid recursion, also used to delay after spawn
222our $DELAY = -50; 232our $DELAY = -50;
223our @DELAY; 233our @DELAY;
224our $DELAY_W; 234our $DELAY_W;
225 235
226sub _send_delayed { 236our $send_delayed = sub {
227 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE{""}; 237 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
228 (shift @DELAY)->() 238 (shift @DELAY)->()
229 while @DELAY; 239 while @DELAY;
230 undef $DELAY_W; 240 undef $DELAY_W;
231 $DELAY = -50; 241 $DELAY = -50;
232} 242};
233 243
234sub transport_reset { 244sub transport_reset {
235 my ($self) = @_; 245 my ($self) = @_;
236 246
237 Scalar::Util::weaken $self; 247 Scalar::Util::weaken $self;
238 248
239 $self->{send} = sub { 249 $self->{send} = sub {
240 if ($DELAY++ >= 0) { 250 if (++$DELAY > 0) {
241 my $msg = $_[0]; 251 my $msg = $_[0];
242 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) }; 252 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
243 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 253 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
244 return; 254 return;
245 } 255 }
246 256
247 local $AnyEvent::MP::Kernel::SRCNODE = $self; 257 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
248 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 258 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
249 }; 259 };
250} 260}
251 261
252sub transport_connect { 262sub transport_connect {
253 my ($self, $tp) = @_; 263 my ($self, $tp) = @_;
254 264
255 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 265 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
256} 266}
257 267
258sub kill { 268sub kill {
259 my (undef, @args) = @_; 269 my (undef, @args) = @_;
260 270
261 # we _always_ delay kil's, to avoid calling mon callbacks 271 # we _always_ delay kil's, to avoid calling mon callbacks
262 # from anything but the event loop context. 272 # from anything but the event loop context.
263 $DELAY = 1; 273 $DELAY = 1;
264 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) }; 274 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
265 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 275 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
266} 276}
267 277
268sub monitor { 278sub monitor {
269 # maybe always delay, too? 279 # maybe always delay, too?
270 if ($DELAY_W) { 280 if ($DELAY_W) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines