ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.63 by root, Fri Mar 23 03:24:41 2012 UTC vs.
Revision 1.68 by root, Sun Aug 28 09:39:23 2016 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AnyEvent (); 19use AnyEvent ();
20use AnyEvent::Socket (); 20use AnyEvent::Socket ();
24sub new { 24sub new {
25 my ($self, $id) = @_; 25 my ($self, $id) = @_;
26 26
27 $self = bless { id => $id }, $self; 27 $self = bless { id => $id }, $self;
28 28
29 # register
30 $AnyEvent::MP::Kernel::NODE{$id} = $self;
31
29 $self->init; 32 $self->init;
30 $self->transport_reset; 33 $self->transport_reset;
31 34
32 $self 35 $self
33} 36}
34 37
38sub DESTROY {
39 # unregister
40 delete $AnyEvent::MP::Kernel::NODE{$_[0]{id}};
41}
42
35sub init { 43sub init {
36 # 44 #
37} 45}
38 46
39sub send { 47sub send {
40 &{ shift->{send} } 48 &{ shift->{send} }
41} 49}
42 50
43# nodes reachable via the network 51# nodes reachable via the network
44package AnyEvent::MP::Node::Remote; 52package AnyEvent::MP::Node::Remote; # a remote node
45 53
46use base "AnyEvent::MP::Node"; 54use base "AnyEvent::MP::Node";
47 55
48# called at init time, mostly sets {send} 56# called at init time, mostly sets {send}
49sub transport_reset { 57sub transport_reset {
77 } 85 }
78 86
79 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason) 87 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
80 unless $no_transport; 88 unless $no_transport;
81 89
82 # if we are here and are idle, we nuke ourselves 90 # we weaken the node reference, so it can go away if unused
83 delete $AnyEvent::MP::Kernel::NODE{$self->{id}} 91 Scalar::Util::weaken $AnyEvent::MP::Kernel::NODE{$self->{id}}
84 unless $self->{transport} || $self->{connect_to}; 92 unless $self->{connect_to};
93
94 AE::log 9 => "@reason";
85} 95}
86 96
87# called after handshake was successful 97# called after handshake was successful
88sub transport_connect { 98sub transport_connect {
89 my ($self, $transport) = @_; 99 my ($self, $transport) = @_;
112 my ($self) = @_; 122 my ($self) = @_;
113 123
114 return if $self->{transport}; 124 return if $self->{transport};
115 return if $self->{connect_w}; 125 return if $self->{connect_w};
116 126
127 # we unweaken the node reference, in case it was weakened before
128 $AnyEvent::MP::Kernel::NODE{$self->{id}}
129 = $AnyEvent::MP::Kernel::NODE{$self->{id}};
130
117 Scalar::Util::weaken $self; 131 Scalar::Util::weaken $self;
118 132
119 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub { 133 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
120 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 134 $self->transport_error (transport_error => $self->{id}, "connect timeout");
121 }; 135 };
122 136
123 # maybe @$addresses? 137 # maybe @$addresses?
124 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}}; 138 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
125 139
126 if ($addresses) { 140 if ($addresses) {
127 $self->connect_to ($addresses); 141 $self->connect_to ($addresses);
128 } else { 142 } else {
129 # on global nodes, all bets are off now - we either know the node, or we don't 143 # on global nodes, all bets are off now - we either know the node, or we don't
130 unless ($AnyEvent::MP::Kernel::GLOBAL) { 144 if ($AnyEvent::MP::Kernel::GLOBAL) {
145 $self->transport_error (transport_error => $self->{id}, "no known address");
146 } else {
131 AnyEvent::MP::Kernel::g_find ($self->{id}); 147 AnyEvent::MP::Kernel::g_find ($self->{id});
132 } 148 }
133 } 149 }
134} 150}
135 151
137 my ($self, $addresses) = @_; 153 my ($self, $addresses) = @_;
138 154
139 return if $self->{transport}; 155 return if $self->{transport};
140 return if $self->{connect_w}; 156 return if $self->{connect_w};
141 157
142 return unless @$addresses; 158 unless (@$addresses) {
159 $self->transport_error (transport_error => $self->{id}, "no known address");
160 return;
161 }
143 162
144 AE::log 9 => "connecting to $self->{id} with [@$addresses]"; 163 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
145 164
146 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 165 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
147 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 166 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
152 $interval = 0.4 if $interval < 0.4; 171 $interval = 0.4 if $interval < 0.4;
153 172
154 my @endpoints = reverse @$addresses; 173 my @endpoints = reverse @$addresses;
155 174
156 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub { 175 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
157 my $endpoint = pop @endpoints; 176 my $endpoint = pop @endpoints
177 or return $self->transport_error (transport_error => $self->{id}, "unable to connect to any address");
158 178
159 AE::log 9 => "connecting to $self->{id} at $endpoint"; 179 AE::log 9 => "connecting to $self->{id} at $endpoint";
160 180
161 $self->{trial}{$endpoint} ||= do { 181 $self->{trial}{$endpoint} ||= do {
162 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 182 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
170} 190}
171 191
172sub kill { 192sub kill {
173 my ($self, $port, @reason) = @_; 193 my ($self, $port, @reason) = @_;
174 194
175 $self->{send} (["", kil => $port, @reason]); 195 $self->{send} (["", kil1 => $port, @reason]);
176} 196}
177 197
178sub monitor { 198sub monitor {
179 my ($self, $portid, $cb) = @_; 199 my ($self, $portid, $cb) = @_;
180 200
198 $self->send (["", mon0 => $portid]); 218 $self->send (["", mon0 => $portid]);
199 delete $self->{monitor}{$portid}; 219 delete $self->{monitor}{$portid};
200 } 220 }
201} 221}
202 222
203package AnyEvent::MP::Node::Self; 223package AnyEvent::MP::Node::Self; # the local node
204 224
205use base "AnyEvent::MP::Node"; 225use base "AnyEvent::MP::Node";
206 226
207sub connect { 227sub connect {
208 # we are trivially connected 228 # we are trivially connected

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines