ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.57 by root, Thu Mar 1 18:31:42 2012 UTC vs.
Revision 1.64 by root, Fri Mar 23 21:16:25 2012 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
35 34
36sub init { 35sub init {
37 # 36 #
38} 37}
39 38
40#d# only needed for bad global code at the moment
41sub transport_error {
42}
43
44sub send { 39sub send {
45 &{ shift->{send} } 40 &{ shift->{send} }
46} 41}
47 42
48# nodes reachable via the network 43# nodes reachable via the network
49package AnyEvent::MP::Node::Remote; 44package AnyEvent::MP::Node::Remote; # a remote node
50 45
51use base "AnyEvent::MP::Node"; 46use base "AnyEvent::MP::Node";
52 47
53# called at init time, mostly sets {send} 48# called at init time, mostly sets {send}
54sub transport_reset { 49sub transport_reset {
96 delete $self->{trial}; 91 delete $self->{trial};
97 92
98 $self->transport_error (transport_error => $self->{id}, "switched connections") 93 $self->transport_error (transport_error => $self->{id}, "switched connections")
99 if $self->{transport}; 94 if $self->{transport};
100 95
101 delete $self->{connect_addr};
102 delete $self->{connect_w}; 96 delete $self->{connect_w};
103 delete $self->{connect_to}; 97 delete $self->{connect_to};
104 98
105 $self->{transport} = $transport; 99 $self->{transport} = $transport;
106 100
119 113
120 return if $self->{transport}; 114 return if $self->{transport};
121 return if $self->{connect_w}; 115 return if $self->{connect_w};
122 116
123 Scalar::Util::weaken $self; 117 Scalar::Util::weaken $self;
118 use Carp;Carp::cluck;#d#
124 119
125 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 120 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
126
127 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
128 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 121 $self->transport_error (transport_error => $self->{id}, "unable to connect");
129 }; 122 };
130 123
131 # maybe @$addresses? 124 # maybe @$addresses?
132 my @addresses = @{ $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}} }; 125 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
133 126
134 unless (@addresses) { 127 if ($addresses) {
128 $self->connect_to ($addresses);
129 } else {
135 # on global nodes, all bets are off now - we either know the node, or we don't 130 # on global nodes, all bets are off now - we either know the node, or we don't
136 unless ($AnyEvent::MP::Kernel::GLOBAL) { 131 unless ($AnyEvent::MP::Kernel::GLOBAL) {
137 AnyEvent::MP::Kernel::g_find ($self->{id}); 132 AnyEvent::MP::Kernel::g_find ($self->{id});
138 } 133 }
139 return; 134 }
135}
136
137sub connect_to {
138 my ($self, $addresses) = @_;
139
140 return if $self->{transport};
141 return if $self->{connect_w};
142
143 return unless @$addresses;
140 } 144
141
142 $self->{connect_addr} = \@addresses; # a bit weird, but efficient
143
144 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); 145 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
145 146
147 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
146 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 148 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
147 149
148 $interval = ($monitor - $interval) / @addresses 150 $interval = ($monitor - $interval) / @$addresses
149 if ($monitor - $interval) / @addresses < $interval; 151 if ($monitor - $interval) / @$addresses < $interval;
150 152
151 $interval = 0.4 if $interval < 0.4; 153 $interval = 0.4 if $interval < 0.4;
152 154
153 my @endpoints; 155 my @endpoints = reverse @$addresses;
154 156
155 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub { 157 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
156 @endpoints = @addresses
157 unless @endpoints;
158
159 my $endpoint = shift @endpoints; 158 my $endpoint = pop @endpoints;
160 159
161 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); 160 AE::log 9 => "connecting to $self->{id} at $endpoint";
162 161
163 $self->{trial}{$endpoint} ||= do { 162 $self->{trial}{$endpoint} ||= do {
164 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 163 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
165 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); 164 or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
166 165
167 AnyEvent::MP::Transport::mp_connect 166 AnyEvent::MP::Transport::mp_connect
168 $host, $port, 167 $host, $port,
169 sub { delete $self->{trial}{$endpoint} }, 168 sub { delete $self->{trial}{$endpoint} },
170 }; 169 };
200 $self->send (["", mon0 => $portid]); 199 $self->send (["", mon0 => $portid]);
201 delete $self->{monitor}{$portid}; 200 delete $self->{monitor}{$portid};
202 } 201 }
203} 202}
204 203
205package AnyEvent::MP::Node::Self; 204package AnyEvent::MP::Node::Self; # the local node
206 205
207use base "AnyEvent::MP::Node"; 206use base "AnyEvent::MP::Node";
208 207
209sub connect { 208sub connect {
210 # we are trivially connected 209 # we are trivially connected
213# delay every so often to avoid recursion, also used to delay after spawn 212# delay every so often to avoid recursion, also used to delay after spawn
214our $DELAY = -50; 213our $DELAY = -50;
215our @DELAY; 214our @DELAY;
216our $DELAY_W; 215our $DELAY_W;
217 216
218sub _send_delayed { 217our $send_delayed = sub {
219 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE{""}; 218 $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
220 (shift @DELAY)->() 219 (shift @DELAY)->()
221 while @DELAY; 220 while @DELAY;
222 undef $DELAY_W; 221 undef $DELAY_W;
223 $DELAY = -50; 222 $DELAY = -50;
224} 223};
225 224
226sub transport_reset { 225sub transport_reset {
227 my ($self) = @_; 226 my ($self) = @_;
228 227
229 Scalar::Util::weaken $self; 228 Scalar::Util::weaken $self;
230 229
231 $self->{send} = sub { 230 $self->{send} = sub {
232 if ($DELAY++ >= 0) { 231 if (++$DELAY > 0) {
233 my $msg = $_[0]; 232 my $msg = $_[0];
234 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) }; 233 push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
235 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 234 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
236 return; 235 return;
237 } 236 }
238 237
239 local $AnyEvent::MP::Kernel::SRCNODE = $self; 238 local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
240 AnyEvent::MP::Kernel::_inject (@{ $_[0] }); 239 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
241 }; 240 };
242} 241}
243 242
244sub transport_connect { 243sub transport_connect {
245 my ($self, $tp) = @_; 244 my ($self, $tp) = @_;
246 245
247 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); 246 AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
248} 247}
249 248
250sub kill { 249sub kill {
251 my (undef, @args) = @_; 250 my (undef, @args) = @_;
252 251
253 # we _always_ delay kil's, to avoid calling mon callbacks 252 # we _always_ delay kil's, to avoid calling mon callbacks
254 # from anything but the event loop context. 253 # from anything but the event loop context.
255 $DELAY = 1; 254 $DELAY = 1;
256 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) }; 255 push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
257 $DELAY_W ||= AE::timer 0, 0, \&_send_delayed; 256 $DELAY_W ||= AE::timer 0, 0, $send_delayed;
258} 257}
259 258
260sub monitor { 259sub monitor {
261 # maybe always delay, too? 260 # maybe always delay, too?
262 if ($DELAY_W) { 261 if ($DELAY_W) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines