ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.62 by root, Thu Mar 22 00:48:29 2012 UTC vs.
Revision 1.69 by root, Sun Aug 28 14:49:44 2016 UTC

10 10
11This is an internal utility module, horrible to look at, so don't. 11This is an internal utility module, horrible to look at, so don't.
12 12
13=cut 13=cut
14 14
15package AnyEvent::MP::Node; 15package AnyEvent::MP::Node; # base class for nodes
16 16
17use common::sense; 17use common::sense;
18 18
19use AE ();
20use AnyEvent::Util (); 19use AnyEvent ();
21use AnyEvent::Socket (); 20use AnyEvent::Socket ();
22 21
23use AnyEvent::MP::Transport (); 22use AnyEvent::MP::Transport ();
24 23
25sub new { 24sub new {
26 my ($self, $id) = @_; 25 my ($self, $id) = @_;
27 26
28 $self = bless { id => $id }, $self; 27 $self = bless { id => $id }, $self;
29 28
29 # register
30 $AnyEvent::MP::Kernel::NODE{$id} = $self;
31
30 $self->init; 32 $self->init;
31 $self->transport_reset; 33 $self->transport_reset;
32 34
33 $self 35 $self
34} 36}
35 37
38sub DESTROY {
39 # unregister
40 delete $AnyEvent::MP::Kernel::NODE{$_[0]{id}};
41}
42
36sub init { 43sub init {
37 # 44 #
38} 45}
39 46
40sub send { 47sub send {
41 &{ shift->{send} } 48 &{ shift->{send} }
42} 49}
43 50
44# nodes reachable via the network 51# nodes reachable via the network
45package AnyEvent::MP::Node::Remote; 52package AnyEvent::MP::Node::Remote; # a remote node
46 53
47use base "AnyEvent::MP::Node"; 54use base "AnyEvent::MP::Node";
48 55
49# called at init time, mostly sets {send} 56# called at init time, mostly sets {send}
50sub transport_reset { 57sub transport_reset {
78 } 85 }
79 86
80 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason) 87 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
81 unless $no_transport; 88 unless $no_transport;
82 89
83 # if we are here and are idle, we nuke ourselves 90 # we weaken the node reference, so it can go away if unused
84 delete $AnyEvent::MP::Kernel::NODE{$self->{id}} 91 Scalar::Util::weaken $AnyEvent::MP::Kernel::NODE{$self->{id}}
85 unless $self->{transport} || $self->{connect_to}; 92 unless $self->{connect_to};
93
94 AE::log 9 => "@reason";
86} 95}
87 96
88# called after handshake was successful 97# called after handshake was successful
89sub transport_connect { 98sub transport_connect {
90 my ($self, $transport) = @_; 99 my ($self, $transport) = @_;
113 my ($self) = @_; 122 my ($self) = @_;
114 123
115 return if $self->{transport}; 124 return if $self->{transport};
116 return if $self->{connect_w}; 125 return if $self->{connect_w};
117 126
127 # we unweaken the node reference, in case it was weakened before
128 $AnyEvent::MP::Kernel::NODE{$self->{id}}
129 = $AnyEvent::MP::Kernel::NODE{$self->{id}};
130
118 Scalar::Util::weaken $self; 131 Scalar::Util::weaken $self;
119 132
120 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub { 133 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
121 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 134 $self->transport_error (transport_error => $self->{id}, "connect timeout");
122 }; 135 };
123 136
124 # maybe @$addresses? 137 # maybe @$addresses?
125 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}}; 138 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
126 139
127 if ($addresses) { 140 if ($addresses) {
128 $self->connect_to ($addresses); 141 $self->connect_to ($addresses);
129 } else { 142 } else {
130 # on global nodes, all bets are off now - we either know the node, or we don't 143 # on global nodes, all bets are off now - we either know the node, or we don't
131 unless ($AnyEvent::MP::Kernel::GLOBAL) { 144 if ($AnyEvent::MP::Kernel::GLOBAL) {
145 $self->transport_error (transport_error => $self->{id}, "no known address");
146 } else {
132 AnyEvent::MP::Kernel::g_find ($self->{id}); 147 AnyEvent::MP::Kernel::g_find ($self->{id});
133 } 148 }
134 } 149 }
135} 150}
136 151
138 my ($self, $addresses) = @_; 153 my ($self, $addresses) = @_;
139 154
140 return if $self->{transport}; 155 return if $self->{transport};
141 return if $self->{connect_w}; 156 return if $self->{connect_w};
142 157
143 return unless @$addresses; 158 unless (@$addresses) {
159 $self->transport_error (transport_error => $self->{id}, "no known address");
160 return;
161 }
144 162
145 AE::log 9 => "connecting to $self->{id} with [@$addresses]"; 163 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
146 164
147 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 165 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
148 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 166 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
153 $interval = 0.4 if $interval < 0.4; 171 $interval = 0.4 if $interval < 0.4;
154 172
155 my @endpoints = reverse @$addresses; 173 my @endpoints = reverse @$addresses;
156 174
157 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub { 175 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
158 my $endpoint = pop @endpoints; 176 my $endpoint = pop @endpoints
177 or return;
159 178
160 AE::log 9 => "connecting to $self->{id} at $endpoint"; 179 AE::log 9 => "connecting to $self->{id} at $endpoint";
161 180
162 $self->{trial}{$endpoint} ||= do { 181 $self->{trial}{$endpoint} ||= do {
163 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 182 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
171} 190}
172 191
173sub kill { 192sub kill {
174 my ($self, $port, @reason) = @_; 193 my ($self, $port, @reason) = @_;
175 194
176 $self->{send} (["", kil => $port, @reason]); 195 $self->{send} (["", kil1 => $port, @reason]);
177} 196}
178 197
179sub monitor { 198sub monitor {
180 my ($self, $portid, $cb) = @_; 199 my ($self, $portid, $cb) = @_;
181 200
199 $self->send (["", mon0 => $portid]); 218 $self->send (["", mon0 => $portid]);
200 delete $self->{monitor}{$portid}; 219 delete $self->{monitor}{$portid};
201 } 220 }
202} 221}
203 222
204package AnyEvent::MP::Node::Self; 223package AnyEvent::MP::Node::Self; # the local node
205 224
206use base "AnyEvent::MP::Node"; 225use base "AnyEvent::MP::Node";
207 226
208sub connect { 227sub connect {
209 # we are trivially connected 228 # we are trivially connected

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines