ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.54 by root, Wed Feb 29 18:44:59 2012 UTC vs.
Revision 1.64 by root, Fri Mar 23 21:16:25 2012 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
40sub send { 39sub send {
41 &{ shift->{send} } 40 &{ shift->{send} }
42} 41}
43 42
44# nodes reachable via the network 43# nodes reachable via the network
45package AnyEvent::MP::Node::Remote; 44package AnyEvent::MP::Node::Remote; # a remote node
46 45
47use base "AnyEvent::MP::Node"; 46use base "AnyEvent::MP::Node";
48 47
49# called at init time, mostly sets {send} 48# called at init time, mostly sets {send}
50sub transport_reset { 49sub transport_reset {
92 delete $self->{trial}; 91 delete $self->{trial};
93 92
94 $self->transport_error (transport_error => $self->{id}, "switched connections") 93 $self->transport_error (transport_error => $self->{id}, "switched connections")
95 if $self->{transport}; 94 if $self->{transport};
96 95
97 delete $self->{connect_addr};
98 delete $self->{connect_w}; 96 delete $self->{connect_w};
99 delete $self->{connect_to}; 97 delete $self->{connect_to};
100 98
101 $self->{transport} = $transport; 99 $self->{transport} = $transport;
102 100
112 110
113sub connect { 111sub connect {
114 my ($self) = @_; 112 my ($self) = @_;
115 113
116 return if $self->{transport}; 114 return if $self->{transport};
115 return if $self->{connect_w};
117 116
118 Scalar::Util::weaken $self; 117 Scalar::Util::weaken $self;
118 use Carp;Carp::cluck;#d#
119 119
120 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 120 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
121
122 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
123 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 121 $self->transport_error (transport_error => $self->{id}, "unable to connect");
124 }; 122 };
125 123
126 # maybe @$addresses? 124 # maybe @$addresses?
127 my @addresses = @{
128 $AnyEvent::MP::Kernel::GLOBAL_ADDR->{$self->{id}} 125 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
129 || $AnyEvent::MP::Kernel::NODE_ADDR->{$self->{id}}
130 };
131 126
132 unless (@addresses) { 127 if ($addresses) {
128 $self->connect_to ($addresses);
129 } else {
133 # on global nodes, all betsa re off now - we either know the node, or we don't 130 # on global nodes, all bets are off now - we either know the node, or we don't
134 unless ($AnyEvent::MP::Kernel::GLOBAL) { 131 unless ($AnyEvent::MP::Kernel::GLOBAL) {
135 $self->{connect_w} = AnyEvent::MP::Kernel::global_req ( 132 AnyEvent::MP::Kernel::g_find ($self->{id});
136 g_find => $self->{id},
137 sub {
138 return unless $self; # just to be sure
139 return unless @{ $_[0] };
140
141 local $AnyEvent::MP::Kernel::GLOBAL_ADDR->{$self->{id}} = $_[0]; #d# UGLY
142 $self->connect;
143 }
144 );
145 } 133 }
134 }
135}
146 136
147 return; 137sub connect_to {
138 my ($self, $addresses) = @_;
139
140 return if $self->{transport};
141 return if $self->{connect_w};
142
143 return unless @$addresses;
148 } 144
149
150 $self->{connect_addr} = \@addresses; # a bit weird, but efficient
151
152 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); 145 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
153 146
147 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
154 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 148 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
155 149
156 $interval = ($monitor - $interval) / @addresses 150 $interval = ($monitor - $interval) / @$addresses
157 if ($monitor - $interval) / @addresses < $interval; 151 if ($monitor - $interval) / @$addresses < $interval;
158 152
159 $interval = 0.4 if $interval < 0.4; 153 $interval = 0.4 if $interval < 0.4;
160 154
161 my @endpoints; 155 my @endpoints = reverse @$addresses;
162 156
163 $self->{connect_w} = AE::timer 0.050 * rand, $interval * (0.9 + 0.1 * rand), sub { 157 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
164 @endpoints = @addresses
165 unless @endpoints;
166
167 my $endpoint = shift @endpoints; 158 my $endpoint = pop @endpoints;
168 159
169 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); 160 AE::log 9 => "connecting to $self->{id} at $endpoint";
170 161
171 $self->{trial}{$endpoint} ||= do { 162 $self->{trial}{$endpoint} ||= do {
172 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 163 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
173 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); 164 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
174 165
175 AnyEvent::MP::Transport::mp_connect 166 AnyEvent::MP::Transport::mp_connect
176 $host, $port, 167 $host, $port,
177 sub { delete $self->{trial}{$endpoint} }, 168 sub { delete $self->{trial}{$endpoint} },
178 }; 169 };
208 $self->send (["", mon0 => $portid]); 199 $self->send (["", mon0 => $portid]);
209 delete $self->{monitor}{$portid}; 200 delete $self->{monitor}{$portid};
210 } 201 }
211} 202}
212 203
213package AnyEvent::MP::Node::Self; 204package AnyEvent::MP::Node::Self; # the local node
214 205
215use base "AnyEvent::MP::Node"; 206use base "AnyEvent::MP::Node";
216 207
217sub connect { 208sub connect {
218 # we are trivially connected 209 # we are trivially connected
221# delay every so often to avoid recursion, also used to delay after spawn 212# delay every so often to avoid recursion, also used to delay after spawn
222our $DELAY = -50; 213our $DELAY = -50;
223our @DELAY; 214our @DELAY;
224our $DELAY_W; 215our $DELAY_W;
225 216
226sub _send_delayed { 217our $send_delayed = sub {
227 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE{""}; 218 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
228 (shift @DELAY)->() 219 (shift @DELAY)->()
229 while @DELAY; 220 while @DELAY;
230 undef $DELAY_W; 221 undef $DELAY_W;
231 $DELAY = -50; 222 $DELAY = -50;
232} 223};
233 224
234sub transport_reset { 225sub transport_reset {
235 my ($self) = @_; 226 my ($self) = @_;
236 227
237 Scalar::Util::weaken $self; 228 Scalar::Util::weaken $self;
238 229
239 $self->{send} = sub { 230 $self->{send} = sub {
240 if ($DELAY++ >= 0) { 231 if (++$DELAY > 0) {
241 my $msg = $_[0]; 232 my $msg = $_[0];
242 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) }; 233 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
243 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 234 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
244 return; 235 return;
245 } 236 }
246 237
247 local $AnyEvent::MP::Kernel::SRCNODE = $self; 238 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
248 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 239 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
249 }; 240 };
250} 241}
251 242
252sub transport_connect { 243sub transport_connect {
253 my ($self, $tp) = @_; 244 my ($self, $tp) = @_;
254 245
255 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 246 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
256} 247}
257 248
258sub kill { 249sub kill {
259 my (undef, @args) = @_; 250 my (undef, @args) = @_;
260 251
261 # we _always_ delay kil's, to avoid calling mon callbacks 252 # we _always_ delay kil's, to avoid calling mon callbacks
262 # from anything but the event loop context. 253 # from anything but the event loop context.
263 $DELAY = 1; 254 $DELAY = 1;
264 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) }; 255 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
265 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 256 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
266} 257}
267 258
268sub monitor { 259sub monitor {
269 # maybe always delay, too? 260 # maybe always delay, too?
270 if ($DELAY_W) { 261 if ($DELAY_W) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines