ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.55 by root, Wed Feb 29 19:23:44 2012 UTC vs.
Revision 1.69 by root, Sun Aug 28 14:49:44 2016 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
26 my ($self, $id) = @_; 25 my ($self, $id) = @_;
27 26
28 $self = bless { id => $id }, $self; 27 $self = bless { id => $id }, $self;
29 28
29 # register
30 $AnyEvent::MP::Kernel::NODE{$id} = $self;
31
30 $self->init; 32 $self->init;
31 $self->transport_reset; 33 $self->transport_reset;
32 34
33 $self 35 $self
34} 36}
35 37
38sub DESTROY {
39 # unregister
40 delete $AnyEvent::MP::Kernel::NODE{$_[0]{id}};
41}
42
36sub init { 43sub init {
37 # 44 #
38} 45}
39 46
40sub send { 47sub send {
41 &{ shift->{send} } 48 &{ shift->{send} }
42} 49}
43 50
44# nodes reachable via the network 51# nodes reachable via the network
45package AnyEvent::MP::Node::Remote; 52package AnyEvent::MP::Node::Remote; # a remote node
46 53
47use base "AnyEvent::MP::Node"; 54use base "AnyEvent::MP::Node";
48 55
49# called at init time, mostly sets {send} 56# called at init time, mostly sets {send}
50sub transport_reset { 57sub transport_reset {
78 } 85 }
79 86
80 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason) 87 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
81 unless $no_transport; 88 unless $no_transport;
82 89
83 # if we are here and are idle, we nuke ourselves 90 # we weaken the node reference, so it can go away if unused
84 delete $AnyEvent::MP::Kernel::NODE{$self->{id}} 91 Scalar::Util::weaken $AnyEvent::MP::Kernel::NODE{$self->{id}}
85 unless $self->{transport} || $self->{connect_to}; 92 unless $self->{connect_to};
93
94 AE::log 9 => "@reason";
86} 95}
87 96
88# called after handshake was successful 97# called after handshake was successful
89sub transport_connect { 98sub transport_connect {
90 my ($self, $transport) = @_; 99 my ($self, $transport) = @_;
92 delete $self->{trial}; 101 delete $self->{trial};
93 102
94 $self->transport_error (transport_error => $self->{id}, "switched connections") 103 $self->transport_error (transport_error => $self->{id}, "switched connections")
95 if $self->{transport}; 104 if $self->{transport};
96 105
97 delete $self->{connect_addr};
98 delete $self->{connect_w}; 106 delete $self->{connect_w};
99 delete $self->{connect_to}; 107 delete $self->{connect_to};
100 108
101 $self->{transport} = $transport; 109 $self->{transport} = $transport;
102 110
114 my ($self) = @_; 122 my ($self) = @_;
115 123
116 return if $self->{transport}; 124 return if $self->{transport};
117 return if $self->{connect_w}; 125 return if $self->{connect_w};
118 126
127 # we unweaken the node reference, in case it was weakened before
128 $AnyEvent::MP::Kernel::NODE{$self->{id}}
129 = $AnyEvent::MP::Kernel::NODE{$self->{id}};
130
119 Scalar::Util::weaken $self; 131 Scalar::Util::weaken $self;
120 132
121 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 133 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
122
123 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
124 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 134 $self->transport_error (transport_error => $self->{id}, "connect timeout");
125 }; 135 };
126 136
127 # maybe @$addresses? 137 # maybe @$addresses?
128 my @addresses = @{
129 $AnyEvent::MP::Kernel::GLOBAL_ADDR->{$self->{id}} 138 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
130 || $AnyEvent::MP::Kernel::NODE_ADDR->{$self->{id}}
131 };
132 139
133 unless (@addresses) { 140 if ($addresses) {
141 $self->connect_to ($addresses);
142 } else {
134 # on global nodes, all bets are off now - we either know the node, or we don't 143 # on global nodes, all bets are off now - we either know the node, or we don't
135 unless ($AnyEvent::MP::Kernel::GLOBAL) { 144 if ($AnyEvent::MP::Kernel::GLOBAL) {
136 $self->{connect_w} = AnyEvent::MP::Kernel::global_req ( 145 $self->transport_error (transport_error => $self->{id}, "no known address");
137 g_find => $self->{id}, 146 } else {
138 sub { 147 AnyEvent::MP::Kernel::g_find ($self->{id});
139 return unless $self; # just to be sure
140 return unless @{ $_[0] };
141
142 local $AnyEvent::MP::Kernel::GLOBAL_ADDR->{$self->{id}} = $_[0]; #d# UGLY
143 delete $self->{connect_w};
144 $self->connect;
145 }
146 );
147 } 148 }
149 }
150}
148 151
152sub connect_to {
153 my ($self, $addresses) = @_;
154
155 return if $self->{transport};
156 return if $self->{connect_w};
157
158 unless (@$addresses) {
159 $self->transport_error (transport_error => $self->{id}, "no known address");
149 return; 160 return;
150 } 161 }
151 162
152 $self->{connect_addr} = \@addresses; # a bit weird, but efficient
153
154 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); 163 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
155 164
165 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
156 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 166 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
157 167
158 $interval = ($monitor - $interval) / @addresses 168 $interval = ($monitor - $interval) / @$addresses
159 if ($monitor - $interval) / @addresses < $interval; 169 if ($monitor - $interval) / @$addresses < $interval;
160 170
161 $interval = 0.4 if $interval < 0.4; 171 $interval = 0.4 if $interval < 0.4;
162 172
163 my @endpoints; 173 my @endpoints = reverse @$addresses;
164 174
165 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub { 175 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
166 @endpoints = @addresses
167 unless @endpoints;
168
169 my $endpoint = shift @endpoints; 176 my $endpoint = pop @endpoints
177 or return;
170 178
171 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); 179 AE::log 9 => "connecting to $self->{id} at $endpoint";
172 180
173 $self->{trial}{$endpoint} ||= do { 181 $self->{trial}{$endpoint} ||= do {
174 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 182 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
175 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); 183 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
176 184
177 AnyEvent::MP::Transport::mp_connect 185 AnyEvent::MP::Transport::mp_connect
178 $host, $port, 186 $host, $port,
179 sub { delete $self->{trial}{$endpoint} }, 187 sub { delete $self->{trial}{$endpoint} },
180 }; 188 };
182} 190}
183 191
184sub kill { 192sub kill {
185 my ($self, $port, @reason) = @_; 193 my ($self, $port, @reason) = @_;
186 194
187 $self->{send} (["", kil => $port, @reason]); 195 $self->{send} (["", kil1 => $port, @reason]);
188} 196}
189 197
190sub monitor { 198sub monitor {
191 my ($self, $portid, $cb) = @_; 199 my ($self, $portid, $cb) = @_;
192 200
210 $self->send (["", mon0 => $portid]); 218 $self->send (["", mon0 => $portid]);
211 delete $self->{monitor}{$portid}; 219 delete $self->{monitor}{$portid};
212 } 220 }
213} 221}
214 222
215package AnyEvent::MP::Node::Self; 223package AnyEvent::MP::Node::Self; # the local node
216 224
217use base "AnyEvent::MP::Node"; 225use base "AnyEvent::MP::Node";
218 226
219sub connect { 227sub connect {
220 # we are trivially connected 228 # we are trivially connected
223# delay every so often to avoid recursion, also used to delay after spawn 231# delay every so often to avoid recursion, also used to delay after spawn
224our $DELAY = -50; 232our $DELAY = -50;
225our @DELAY; 233our @DELAY;
226our $DELAY_W; 234our $DELAY_W;
227 235
228sub _send_delayed { 236our $send_delayed = sub {
229 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE{""}; 237 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
230 (shift @DELAY)->() 238 (shift @DELAY)->()
231 while @DELAY; 239 while @DELAY;
232 undef $DELAY_W; 240 undef $DELAY_W;
233 $DELAY = -50; 241 $DELAY = -50;
234} 242};
235 243
236sub transport_reset { 244sub transport_reset {
237 my ($self) = @_; 245 my ($self) = @_;
238 246
239 Scalar::Util::weaken $self; 247 Scalar::Util::weaken $self;
240 248
241 $self->{send} = sub { 249 $self->{send} = sub {
242 if ($DELAY++ >= 0) { 250 if (++$DELAY > 0) {
243 my $msg = $_[0]; 251 my $msg = $_[0];
244 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) }; 252 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
245 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 253 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
246 return; 254 return;
247 } 255 }
248 256
249 local $AnyEvent::MP::Kernel::SRCNODE = $self; 257 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
250 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 258 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
251 }; 259 };
252} 260}
253 261
254sub transport_connect { 262sub transport_connect {
255 my ($self, $tp) = @_; 263 my ($self, $tp) = @_;
256 264
257 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 265 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
258} 266}
259 267
260sub kill { 268sub kill {
261 my (undef, @args) = @_; 269 my (undef, @args) = @_;
262 270
263 # we _always_ delay kil's, to avoid calling mon callbacks 271 # we _always_ delay kil's, to avoid calling mon callbacks
264 # from anything but the event loop context. 272 # from anything but the event loop context.
265 $DELAY = 1; 273 $DELAY = 1;
266 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) }; 274 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
267 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 275 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
268} 276}
269 277
270sub monitor { 278sub monitor {
271 # maybe always delay, too? 279 # maybe always delay, too?
272 if ($DELAY_W) { 280 if ($DELAY_W) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines