… | |
… | |
20 | use AnyEvent::MP::Transport (); |
20 | use AnyEvent::MP::Transport (); |
21 | |
21 | |
22 | use base Exporter::; |
22 | use base Exporter::; |
23 | |
23 | |
24 | our $VERSION = '0.0'; |
24 | our $VERSION = '0.0'; |
|
|
25 | |
|
|
26 | our $DEFAULT_PORT = "4040"; |
|
|
27 | |
|
|
28 | sub normalise_noderef($) { |
|
|
29 | my ($noderef) = @_; |
|
|
30 | |
|
|
31 | my $cv = AE::cv; |
|
|
32 | my @res; |
|
|
33 | |
|
|
34 | $cv->begin (sub { |
|
|
35 | my %seen; |
|
|
36 | my @refs; |
|
|
37 | for (sort { $a->[0] <=> $b->[0] } @res) { |
|
|
38 | push @refs, $_->[1] unless $seen{$_->[1]}++ |
|
|
39 | } |
|
|
40 | shift->send (join ",", @refs); |
|
|
41 | }); |
|
|
42 | |
|
|
43 | $noderef = $DEFAULT_PORT unless length $noderef; |
|
|
44 | |
|
|
45 | my $idx; |
|
|
46 | for my $t (split /,/, $noderef) { |
|
|
47 | my $pri = ++$idx; |
|
|
48 | |
|
|
49 | #TODO: this should be outside normalise_noderef and in become_public |
|
|
50 | if ($t =~ /^\d*$/) { |
|
|
51 | require POSIX; |
|
|
52 | my $nodename = (POSIX::uname ())[1]; |
|
|
53 | |
|
|
54 | $cv->begin; |
|
|
55 | AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub { |
|
|
56 | for (@_) { |
|
|
57 | my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; |
|
|
58 | push @res, [ |
|
|
59 | $pri += 1e-5, |
|
|
60 | AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service |
|
|
61 | ]; |
|
|
62 | } |
|
|
63 | $cv->end; |
|
|
64 | }; |
|
|
65 | |
|
|
66 | # my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename; |
|
|
67 | # |
|
|
68 | # for (@ipv4) { |
|
|
69 | # push @res, [ |
|
|
70 | # $pri, |
|
|
71 | # AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT, |
|
|
72 | # ]; |
|
|
73 | # } |
|
|
74 | } else { |
|
|
75 | my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT" |
|
|
76 | or Carp::croak "$t: unparsable transport descriptor"; |
|
|
77 | |
|
|
78 | $cv->begin; |
|
|
79 | AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub { |
|
|
80 | for (@_) { |
|
|
81 | my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; |
|
|
82 | push @res, [ |
|
|
83 | $pri += 1e-5, |
|
|
84 | AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service |
|
|
85 | ]; |
|
|
86 | } |
|
|
87 | $cv->end; |
|
|
88 | } |
|
|
89 | } |
|
|
90 | } |
|
|
91 | |
|
|
92 | $cv->end; |
|
|
93 | |
|
|
94 | $cv |
|
|
95 | } |
25 | |
96 | |
26 | sub new { |
97 | sub new { |
27 | my ($class, $noderef) = @_; |
98 | my ($class, $noderef) = @_; |
28 | |
99 | |
29 | bless { noderef => $noderef }, $class |
100 | bless { noderef => $noderef }, $class |
… | |
… | |
70 | |
141 | |
71 | sub clr_transport { |
142 | sub clr_transport { |
72 | my ($self) = @_; |
143 | my ($self) = @_; |
73 | |
144 | |
74 | delete $self->{transport}; |
145 | delete $self->{transport}; |
75 | warn "clr_transport\n"; |
146 | |
|
|
147 | $self->connect; |
76 | } |
148 | } |
77 | |
149 | |
78 | sub connect { |
150 | sub connect { |
79 | my ($self) = @_; |
151 | my ($self) = @_; |
80 | |
152 | |
81 | Scalar::Util::weaken $self; |
153 | Scalar::Util::weaken $self; |
82 | |
154 | |
83 | unless (exists $self->{n_noderef}) { |
155 | unless (exists $self->{n_noderef}) { |
|
|
156 | return if $self->{n_noderef_}++; |
84 | (AnyEvent::MP::normalise_noderef ($self->{noderef}))->cb (sub { |
157 | (AnyEvent::MP::Node::normalise_noderef ($self->{noderef}))->cb (sub { |
85 | $self or return; |
158 | $self or return; |
|
|
159 | delete $self->{n_noderef_}; |
86 | my $noderef = shift->recv; |
160 | my $noderef = shift->recv; |
87 | |
161 | |
88 | $self->{n_noderef} = $noderef; |
162 | $self->{n_noderef} = $noderef; |
89 | |
163 | |
90 | $AnyEvent::MP::NODE{$_} = $self |
164 | $AnyEvent::MP::Base::NODE{$_} = $self |
91 | for split /,/, $noderef; |
165 | for split /,/, $noderef; |
92 | |
166 | |
93 | $self->connect; |
167 | $self->connect; |
94 | }); |
168 | }); |
95 | return; |
169 | return; |
… | |
… | |
104 | my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint |
178 | my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint |
105 | or return; |
179 | or return; |
106 | |
180 | |
107 | my ($w, $g); |
181 | my ($w, $g); |
108 | |
182 | |
109 | $w = AE::timer $AnyEvent::MP::CONNECT_TIMEOUT, 0, sub { |
183 | $w = AE::timer $AnyEvent::MP::Base::CONNECT_TIMEOUT, 0, sub { |
110 | delete $self->{trial}{$endpoint}; |
184 | delete $self->{trial}{$endpoint}; |
111 | }; |
185 | }; |
112 | $g = AnyEvent::MP::Transport::mp_connect |
186 | $g = AnyEvent::MP::Transport::mp_connect |
113 | $host, $port, |
187 | $host, $port, |
114 | sub { |
188 | sub { |
… | |
… | |
122 | }; |
196 | }; |
123 | } else { |
197 | } else { |
124 | delete $self->{retry}; |
198 | delete $self->{retry}; |
125 | } |
199 | } |
126 | |
200 | |
127 | $self->{next_connect} = AE::timer $AnyEvent::MP::CONNECT_INTERVAL, 0, sub { |
201 | $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub { |
128 | $self->connect; |
202 | $self->connect; |
129 | }; |
203 | }; |
130 | } |
204 | } |
131 | |
205 | |
132 | package AnyEvent::MP::Node::Self; |
206 | package AnyEvent::MP::Node::Self; |
… | |
… | |
136 | sub set_transport { |
210 | sub set_transport { |
137 | die "FATAL error, set_transport was called"; |
211 | die "FATAL error, set_transport was called"; |
138 | } |
212 | } |
139 | |
213 | |
140 | sub send { |
214 | sub send { |
141 | die "self-send not implemented yet\n";#d# |
215 | AnyEvent::MP::Base::_inject ($_[1]); |
142 | } |
216 | } |
143 | |
217 | |
144 | =head1 SEE ALSO |
218 | =head1 SEE ALSO |
145 | |
219 | |
146 | L<AnyEvent>. |
220 | L<AnyEvent>. |