ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.52 by root, Sat Mar 13 20:29:04 2010 UTC vs.
Revision 1.66 by root, Sat Mar 24 00:48:54 2012 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
26 my ($self, $id) = @_; 25 my ($self, $id) = @_;
27 26
28 $self = bless { id => $id }, $self; 27 $self = bless { id => $id }, $self;
29 28
29 # register
30 $AnyEvent::MP::Kernel::NODE{$id} = $self;
31
30 $self->init; 32 $self->init;
31 $self->transport_reset; 33 $self->transport_reset;
32 34
33 $self 35 $self
34} 36}
35 37
38sub DESTROY {
39 # unregister
40 delete $AnyEvent::MP::Kernel::NODE{$_[0]{id}};
41}
42
36sub init { 43sub init {
37 # 44 #
38} 45}
39 46
40sub send { 47sub send {
41 &{ shift->{send} } 48 &{ shift->{send} }
42} 49}
43 50
44# nodes reachable via the network 51# nodes reachable via the network
45package AnyEvent::MP::Node::External; 52package AnyEvent::MP::Node::Remote; # a remote node
46 53
47use base "AnyEvent::MP::Node"; 54use base "AnyEvent::MP::Node";
48 55
49# called at init time, mostly sets {send} 56# called at init time, mostly sets {send}
50sub transport_reset { 57sub transport_reset {
78 } 85 }
79 86
80 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason) 87 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
81 unless $no_transport; 88 unless $no_transport;
82 89
83 # if we are here and are idle, we nuke ourselves 90 # we weaken the node reference, so it can go away if unused
84 delete $AnyEvent::MP::Kernel::NODE{$self->{id}} 91 Scalar::Util::weaken $AnyEvent::MP::Kernel::NODE{$self->{id}}
85 unless $self->{transport} || $self->{connect_to}; 92 unless $self->{connect_to};
93
94 AE::log 9 => "@reason";
86} 95}
87 96
88# called after handshake was successful 97# called after handshake was successful
89sub transport_connect { 98sub transport_connect {
90 my ($self, $transport) = @_; 99 my ($self, $transport) = @_;
92 delete $self->{trial}; 101 delete $self->{trial};
93 102
94 $self->transport_error (transport_error => $self->{id}, "switched connections") 103 $self->transport_error (transport_error => $self->{id}, "switched connections")
95 if $self->{transport}; 104 if $self->{transport};
96 105
97 delete $self->{connect_addr};
98 delete $self->{connect_w}; 106 delete $self->{connect_w};
99 delete $self->{connect_to}; 107 delete $self->{connect_to};
100 108
101 $self->{transport} = $transport; 109 $self->{transport} = $transport;
102 110
109 $transport_send->($_) 117 $transport_send->($_)
110 for @{ delete $self->{queue} || [] }; 118 for @{ delete $self->{queue} || [] };
111} 119}
112 120
113sub connect { 121sub connect {
114 my ($self, @addresses) = @_; 122 my ($self) = @_;
115 123
116 return if $self->{transport}; 124 return if $self->{transport};
125 return if $self->{connect_w};
126
127 # we unweaken the node reference, in case it was weakened before
128 $AnyEvent::MP::Kernel::NODE{$self->{id}}
129 = $AnyEvent::MP::Kernel::NODE{$self->{id}};
117 130
118 Scalar::Util::weaken $self; 131 Scalar::Util::weaken $self;
119 132
120 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 133 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
121
122 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
123 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 134 $self->transport_error (transport_error => $self->{id}, "connect timeout");
124 }; 135 };
125 136
126 return unless @addresses; 137 # maybe @$addresses?
138 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
127 139
140 if ($addresses) {
141 $self->connect_to ($addresses);
142 } else {
143 # on global nodes, all bets are off now - we either know the node, or we don't
144 if ($AnyEvent::MP::Kernel::GLOBAL) {
145 $self->transport_error (transport_error => $self->{id}, "no known address");
146 } else {
147 AnyEvent::MP::Kernel::g_find ($self->{id});
148 }
149 }
150}
151
152sub connect_to {
153 my ($self, $addresses) = @_;
154
155 return if $self->{transport};
128 if ($self->{connect_w}) { 156 return if $self->{connect_w};
129 # sometimes we get told about new addresses after we started to connect 157
130 unshift @{$self->{connect_addr}}, @addresses; 158 unless (@$addresses) {
159 $self->transport_error (transport_error => $self->{id}, "no known address");
131 return; 160 return;
132 } 161 }
133 162
134 $self->{connect_addr} = \@addresses; # a bit weird, but efficient
135
136 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); 163 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
137 164
165 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
138 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 166 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
139 167
140 $interval = ($monitor - $interval) / @addresses 168 $interval = ($monitor - $interval) / @$addresses
141 if ($monitor - $interval) / @addresses < $interval; 169 if ($monitor - $interval) / @$addresses < $interval;
142 170
143 $interval = 0.4 if $interval < 0.4; 171 $interval = 0.4 if $interval < 0.4;
144 172
145 my @endpoints; 173 my @endpoints = reverse @$addresses;
146 174
147 $self->{connect_w} = AE::timer 0.050 * rand, $interval * (0.9 + 0.1 * rand), sub { 175 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
148 @endpoints = @addresses
149 unless @endpoints;
150
151 my $endpoint = shift @endpoints; 176 my $endpoint = pop @endpoints;
152 177
153 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); 178 AE::log 9 => "connecting to $self->{id} at $endpoint";
154 179
155 $self->{trial}{$endpoint} ||= do { 180 $self->{trial}{$endpoint} ||= do {
181 use Carp;AnyEvent::Socket::parse_hostport $endpoint or Carp::confess "not a reslved blabla\n";#d#
156 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 182 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
157 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); 183 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
158 184
159 AnyEvent::MP::Transport::mp_connect 185 AnyEvent::MP::Transport::mp_connect
160 $host, $port, 186 $host, $port,
161 sub { delete $self->{trial}{$endpoint} }, 187 sub { delete $self->{trial}{$endpoint} },
162 }; 188 };
164} 190}
165 191
166sub kill { 192sub kill {
167 my ($self, $port, @reason) = @_; 193 my ($self, $port, @reason) = @_;
168 194
169 $self->send (["", kil => $port, @reason]); 195 $self->{send} (["", kil1 => $port, @reason]);
170} 196}
171 197
172sub monitor { 198sub monitor {
173 my ($self, $portid, $cb) = @_; 199 my ($self, $portid, $cb) = @_;
174 200
192 $self->send (["", mon0 => $portid]); 218 $self->send (["", mon0 => $portid]);
193 delete $self->{monitor}{$portid}; 219 delete $self->{monitor}{$portid};
194 } 220 }
195} 221}
196 222
197# used for direct slave connections as well
198package AnyEvent::MP::Node::Direct;
199
200use base "AnyEvent::MP::Node::External";
201
202package AnyEvent::MP::Node::Self; 223package AnyEvent::MP::Node::Self; # the local node
203 224
204use base "AnyEvent::MP::Node"; 225use base "AnyEvent::MP::Node";
205 226
206sub connect { 227sub connect {
207 # we are trivially connected 228 # we are trivially connected
210# delay every so often to avoid recursion, also used to delay after spawn 231# delay every so often to avoid recursion, also used to delay after spawn
211our $DELAY = -50; 232our $DELAY = -50;
212our @DELAY; 233our @DELAY;
213our $DELAY_W; 234our $DELAY_W;
214 235
215sub _send_delayed { 236our $send_delayed = sub {
216 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE{""}; 237 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
217 (shift @DELAY)->() 238 (shift @DELAY)->()
218 while @DELAY; 239 while @DELAY;
219 undef $DELAY_W; 240 undef $DELAY_W;
220 $DELAY = -50; 241 $DELAY = -50;
221} 242};
222 243
223sub transport_reset { 244sub transport_reset {
224 my ($self) = @_; 245 my ($self) = @_;
225 246
226 Scalar::Util::weaken $self; 247 Scalar::Util::weaken $self;
227 248
228 $self->{send} = sub { 249 $self->{send} = sub {
229 if ($DELAY++ >= 0) { 250 if (++$DELAY > 0) {
230 my $msg = $_[0]; 251 my $msg = $_[0];
231 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) }; 252 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
232 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 253 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
233 return; 254 return;
234 } 255 }
235 256
236 local $AnyEvent::MP::Kernel::SRCNODE = $self; 257 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
237 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 258 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
238 }; 259 };
239} 260}
240 261
241sub transport_connect { 262sub transport_connect {
242 my ($self, $tp) = @_; 263 my ($self, $tp) = @_;
243 264
244 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 265 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
245} 266}
246 267
247sub kill { 268sub kill {
248 my (undef, @args) = @_; 269 my (undef, @args) = @_;
249 270
250 # we _always_ delay kil's, to avoid calling mon callbacks 271 # we _always_ delay kil's, to avoid calling mon callbacks
251 # from anything but the event loop context. 272 # from anything but the event loop context.
252 $DELAY = 1; 273 $DELAY = 1;
253 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) }; 274 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
254 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 275 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
255} 276}
256 277
257sub monitor { 278sub monitor {
258 # maybe always delay, too? 279 # maybe always delay, too?
259 if ($DELAY_W) { 280 if ($DELAY_W) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines