… | |
… | |
34 | our $VERSION = '0.01'; |
34 | our $VERSION = '0.01'; |
35 | our @EXPORT = qw( |
35 | our @EXPORT = qw( |
36 | %NODE %PORT %PORT_DATA %REG $UNIQ $ID add_node |
36 | %NODE %PORT %PORT_DATA %REG $UNIQ $ID add_node |
37 | |
37 | |
38 | NODE $NODE node_of snd kil _any_ |
38 | NODE $NODE node_of snd kil _any_ |
|
|
39 | resolve_node |
39 | become_slave become_public |
40 | become_slave become_public |
40 | ); |
41 | ); |
41 | |
42 | |
42 | our $DEFAULT_SECRET; |
43 | our $DEFAULT_SECRET; |
|
|
44 | our $DEFAULT_PORT = "4040"; |
43 | |
45 | |
44 | our $CONNECT_INTERVAL = 5; # new connect every 5s, at least |
46 | our $CONNECT_INTERVAL = 5; # new connect every 5s, at least |
45 | our $CONNECT_TIMEOUT = 30; # includes handshake |
47 | our $CONNECT_TIMEOUT = 30; # includes handshake |
46 | |
48 | |
47 | =item $AnyEvent::MP::Base::WARN |
49 | =item $AnyEvent::MP::Base::WARN |
… | |
… | |
139 | for (split /,/, $noderef) { |
141 | for (split /,/, $noderef) { |
140 | return $NODE{$noderef} = $NODE{$_} |
142 | return $NODE{$noderef} = $NODE{$_} |
141 | if exists $NODE{$_}; |
143 | if exists $NODE{$_}; |
142 | } |
144 | } |
143 | |
145 | |
|
|
146 | # new node, check validity |
|
|
147 | |
|
|
148 | for (split /,/, $noderef) { |
|
|
149 | my ($host, $port) = AnyEvent::Socket::parse_hostport $_ |
|
|
150 | or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)"; |
|
|
151 | |
|
|
152 | $port > 0 |
|
|
153 | or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)"; |
|
|
154 | |
|
|
155 | AnyEvent::Socket::parse_address $host |
|
|
156 | or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)"; |
|
|
157 | } |
|
|
158 | |
144 | # for indirect sends, use a different class |
159 | # TODO: for indirect sends, use a different class |
145 | my $node = new AnyEvent::MP::Node::Direct $noderef; |
160 | my $node = new AnyEvent::MP::Node::Direct $noderef; |
146 | |
161 | |
147 | $NODE{$_} = $node |
162 | $NODE{$_} = $node |
148 | for $noderef, split /,/, $noderef; |
163 | for $noderef, split /,/, $noderef; |
149 | |
164 | |
… | |
… | |
158 | } |
173 | } |
159 | |
174 | |
160 | sub kil(@) { |
175 | sub kil(@) { |
161 | my ($noderef, $port) = split /#/, shift, 2; |
176 | my ($noderef, $port) = split /#/, shift, 2; |
162 | |
177 | |
|
|
178 | length $port |
|
|
179 | or Carp::croak "killing the node port is not allowed, caught"; |
|
|
180 | |
163 | ($NODE{$noderef} || add_node $noderef) |
181 | ($NODE{$noderef} || add_node $noderef) |
164 | ->kill ($port, @_); |
182 | ->kill ($port, @_); |
165 | } |
183 | } |
166 | |
184 | |
|
|
185 | sub resolve_node($) { |
|
|
186 | my ($noderef) = @_; |
|
|
187 | |
|
|
188 | my $cv = AE::cv; |
|
|
189 | my @res; |
|
|
190 | |
|
|
191 | $cv->begin (sub { |
|
|
192 | my %seen; |
|
|
193 | my @refs; |
|
|
194 | for (sort { $a->[0] <=> $b->[0] } @res) { |
|
|
195 | push @refs, $_->[1] unless $seen{$_->[1]}++ |
|
|
196 | } |
|
|
197 | shift->send (join ",", @refs); |
|
|
198 | }); |
|
|
199 | |
|
|
200 | $noderef = $DEFAULT_PORT unless length $noderef; |
|
|
201 | |
|
|
202 | my $idx; |
|
|
203 | for my $t (split /,/, $noderef) { |
|
|
204 | my $pri = ++$idx; |
|
|
205 | |
|
|
206 | if ($t =~ /^\d*$/) { |
|
|
207 | require POSIX; |
|
|
208 | my $nodename = (POSIX::uname ())[1]; |
|
|
209 | |
|
|
210 | $cv->begin; |
|
|
211 | AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub { |
|
|
212 | for (@_) { |
|
|
213 | my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; |
|
|
214 | push @res, [ |
|
|
215 | $pri += 1e-5, |
|
|
216 | AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service |
|
|
217 | ]; |
|
|
218 | } |
|
|
219 | $cv->end; |
|
|
220 | }; |
|
|
221 | |
|
|
222 | # my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename; |
|
|
223 | # |
|
|
224 | # for (@ipv4) { |
|
|
225 | # push @res, [ |
|
|
226 | # $pri, |
|
|
227 | # AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT, |
|
|
228 | # ]; |
|
|
229 | # } |
|
|
230 | } else { |
|
|
231 | my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT" |
|
|
232 | or Carp::croak "$t: unparsable transport descriptor"; |
|
|
233 | |
|
|
234 | $cv->begin; |
|
|
235 | AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub { |
|
|
236 | for (@_) { |
|
|
237 | my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; |
|
|
238 | push @res, [ |
|
|
239 | $pri += 1e-5, |
|
|
240 | AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service |
|
|
241 | ]; |
|
|
242 | } |
|
|
243 | $cv->end; |
|
|
244 | } |
|
|
245 | } |
|
|
246 | } |
|
|
247 | |
|
|
248 | $cv->end; |
|
|
249 | |
|
|
250 | $cv |
|
|
251 | } |
|
|
252 | |
167 | sub become_public { |
253 | sub become_public { |
168 | return if $PUBLIC; |
254 | return if $PUBLIC; |
169 | |
255 | |
170 | my $noderef = join ",", @_; |
256 | my $noderef = join ",", @_; |
171 | my @args = @_; |
257 | my @args = @_; |
172 | |
258 | |
173 | $NODE = (AnyEvent::MP::Node::normalise_noderef $noderef)->recv; |
259 | $NODE = (resolve_node $noderef)->recv; |
174 | |
260 | |
175 | for my $t (split /,/, $NODE) { |
261 | for my $t (split /,/, $NODE) { |
176 | $NODE{$t} = $NODE{""}; |
262 | $NODE{$t} = $NODE{""}; |
177 | |
263 | |
178 | my ($host, $port) = AnyEvent::Socket::parse_hostport $t; |
264 | my ($host, $port) = AnyEvent::Socket::parse_hostport $t; |