ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.52 by root, Sat Mar 13 20:29:04 2010 UTC vs.
Revision 1.64 by root, Fri Mar 23 21:16:25 2012 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
40sub send { 39sub send {
41 &{ shift->{send} } 40 &{ shift->{send} }
42} 41}
43 42
44# nodes reachable via the network 43# nodes reachable via the network
45package AnyEvent::MP::Node::External; 44package AnyEvent::MP::Node::Remote; # a remote node
46 45
47use base "AnyEvent::MP::Node"; 46use base "AnyEvent::MP::Node";
48 47
49# called at init time, mostly sets {send} 48# called at init time, mostly sets {send}
50sub transport_reset { 49sub transport_reset {
92 delete $self->{trial}; 91 delete $self->{trial};
93 92
94 $self->transport_error (transport_error => $self->{id}, "switched connections") 93 $self->transport_error (transport_error => $self->{id}, "switched connections")
95 if $self->{transport}; 94 if $self->{transport};
96 95
97 delete $self->{connect_addr};
98 delete $self->{connect_w}; 96 delete $self->{connect_w};
99 delete $self->{connect_to}; 97 delete $self->{connect_to};
100 98
101 $self->{transport} = $transport; 99 $self->{transport} = $transport;
102 100
109 $transport_send->($_) 107 $transport_send->($_)
110 for @{ delete $self->{queue} || [] }; 108 for @{ delete $self->{queue} || [] };
111} 109}
112 110
113sub connect { 111sub connect {
114 my ($self, @addresses) = @_; 112 my ($self) = @_;
115 113
116 return if $self->{transport}; 114 return if $self->{transport};
115 return if $self->{connect_w};
117 116
118 Scalar::Util::weaken $self; 117 Scalar::Util::weaken $self;
118 use Carp;Carp::cluck;#d#
119 119
120 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 120 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
121
122 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
123 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 121 $self->transport_error (transport_error => $self->{id}, "unable to connect");
124 }; 122 };
125 123
124 # maybe @$addresses?
125 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
126
127 if ($addresses) {
128 $self->connect_to ($addresses);
129 } else {
130 # on global nodes, all bets are off now - we either know the node, or we don't
131 unless ($AnyEvent::MP::Kernel::GLOBAL) {
132 AnyEvent::MP::Kernel::g_find ($self->{id});
133 }
134 }
135}
136
137sub connect_to {
138 my ($self, $addresses) = @_;
139
140 return if $self->{transport};
141 return if $self->{connect_w};
142
126 return unless @addresses; 143 return unless @$addresses;
127
128 if ($self->{connect_w}) {
129 # sometimes we get told about new addresses after we started to connect
130 unshift @{$self->{connect_addr}}, @addresses;
131 return;
132 } 144
133
134 $self->{connect_addr} = \@addresses; # a bit weird, but efficient
135
136 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); 145 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
137 146
147 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
138 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 148 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
139 149
140 $interval = ($monitor - $interval) / @addresses 150 $interval = ($monitor - $interval) / @$addresses
141 if ($monitor - $interval) / @addresses < $interval; 151 if ($monitor - $interval) / @$addresses < $interval;
142 152
143 $interval = 0.4 if $interval < 0.4; 153 $interval = 0.4 if $interval < 0.4;
144 154
145 my @endpoints; 155 my @endpoints = reverse @$addresses;
146 156
147 $self->{connect_w} = AE::timer 0.050 * rand, $interval * (0.9 + 0.1 * rand), sub { 157 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
148 @endpoints = @addresses
149 unless @endpoints;
150
151 my $endpoint = shift @endpoints; 158 my $endpoint = pop @endpoints;
152 159
153 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); 160 AE::log 9 => "connecting to $self->{id} at $endpoint";
154 161
155 $self->{trial}{$endpoint} ||= do { 162 $self->{trial}{$endpoint} ||= do {
156 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 163 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
157 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); 164 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
158 165
159 AnyEvent::MP::Transport::mp_connect 166 AnyEvent::MP::Transport::mp_connect
160 $host, $port, 167 $host, $port,
161 sub { delete $self->{trial}{$endpoint} }, 168 sub { delete $self->{trial}{$endpoint} },
162 }; 169 };
164} 171}
165 172
166sub kill { 173sub kill {
167 my ($self, $port, @reason) = @_; 174 my ($self, $port, @reason) = @_;
168 175
169 $self->send (["", kil => $port, @reason]); 176 $self->{send} (["", kil => $port, @reason]);
170} 177}
171 178
172sub monitor { 179sub monitor {
173 my ($self, $portid, $cb) = @_; 180 my ($self, $portid, $cb) = @_;
174 181
192 $self->send (["", mon0 => $portid]); 199 $self->send (["", mon0 => $portid]);
193 delete $self->{monitor}{$portid}; 200 delete $self->{monitor}{$portid};
194 } 201 }
195} 202}
196 203
197# used for direct slave connections as well
198package AnyEvent::MP::Node::Direct;
199
200use base "AnyEvent::MP::Node::External";
201
202package AnyEvent::MP::Node::Self; 204package AnyEvent::MP::Node::Self; # the local node
203 205
204use base "AnyEvent::MP::Node"; 206use base "AnyEvent::MP::Node";
205 207
206sub connect { 208sub connect {
207 # we are trivially connected 209 # we are trivially connected
210# delay every so often to avoid recursion, also used to delay after spawn 212# delay every so often to avoid recursion, also used to delay after spawn
211our $DELAY = -50; 213our $DELAY = -50;
212our @DELAY; 214our @DELAY;
213our $DELAY_W; 215our $DELAY_W;
214 216
215sub _send_delayed { 217our $send_delayed = sub {
216 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE{""}; 218 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
217 (shift @DELAY)->() 219 (shift @DELAY)->()
218 while @DELAY; 220 while @DELAY;
219 undef $DELAY_W; 221 undef $DELAY_W;
220 $DELAY = -50; 222 $DELAY = -50;
221} 223};
222 224
223sub transport_reset { 225sub transport_reset {
224 my ($self) = @_; 226 my ($self) = @_;
225 227
226 Scalar::Util::weaken $self; 228 Scalar::Util::weaken $self;
227 229
228 $self->{send} = sub { 230 $self->{send} = sub {
229 if ($DELAY++ >= 0) { 231 if (++$DELAY > 0) {
230 my $msg = $_[0]; 232 my $msg = $_[0];
231 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) }; 233 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
232 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 234 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
233 return; 235 return;
234 } 236 }
235 237
236 local $AnyEvent::MP::Kernel::SRCNODE = $self; 238 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
237 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 239 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
238 }; 240 };
239} 241}
240 242
241sub transport_connect { 243sub transport_connect {
242 my ($self, $tp) = @_; 244 my ($self, $tp) = @_;
243 245
244 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 246 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
245} 247}
246 248
247sub kill { 249sub kill {
248 my (undef, @args) = @_; 250 my (undef, @args) = @_;
249 251
250 # we _always_ delay kil's, to avoid calling mon callbacks 252 # we _always_ delay kil's, to avoid calling mon callbacks
251 # from anything but the event loop context. 253 # from anything but the event loop context.
252 $DELAY = 1; 254 $DELAY = 1;
253 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) }; 255 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
254 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 256 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
255} 257}
256 258
257sub monitor { 259sub monitor {
258 # maybe always delay, too? 260 # maybe always delay, too?
259 if ($DELAY_W) { 261 if ($DELAY_W) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines