ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Base.pm (file contents):
Revision 1.9 by root, Tue Aug 4 21:06:47 2009 UTC vs.
Revision 1.10 by root, Tue Aug 4 23:16:57 2009 UTC

34our $VERSION = '0.01'; 34our $VERSION = '0.01';
35our @EXPORT = qw( 35our @EXPORT = qw(
36 %NODE %PORT %PORT_DATA %REG $UNIQ $ID add_node 36 %NODE %PORT %PORT_DATA %REG $UNIQ $ID add_node
37 37
38 NODE $NODE node_of snd kil _any_ 38 NODE $NODE node_of snd kil _any_
39 resolve_node
39 become_slave become_public 40 become_slave become_public
40); 41);
41 42
42our $DEFAULT_SECRET; 43our $DEFAULT_SECRET;
44our $DEFAULT_PORT = "4040";
43 45
44our $CONNECT_INTERVAL = 5; # new connect every 5s, at least 46our $CONNECT_INTERVAL = 5; # new connect every 5s, at least
45our $CONNECT_TIMEOUT = 30; # includes handshake 47our $CONNECT_TIMEOUT = 30; # includes handshake
46 48
47=item $AnyEvent::MP::Base::WARN 49=item $AnyEvent::MP::Base::WARN
139 for (split /,/, $noderef) { 141 for (split /,/, $noderef) {
140 return $NODE{$noderef} = $NODE{$_} 142 return $NODE{$noderef} = $NODE{$_}
141 if exists $NODE{$_}; 143 if exists $NODE{$_};
142 } 144 }
143 145
146 # new node, check validity
147
148 for (split /,/, $noderef) {
149 my ($host, $port) = AnyEvent::Socket::parse_hostport $_
150 or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
151
152 $port > 0
153 or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";
154
155 AnyEvent::Socket::parse_address $host
156 or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
157 }
158
144 # for indirect sends, use a different class 159 # TODO: for indirect sends, use a different class
145 my $node = new AnyEvent::MP::Node::Direct $noderef; 160 my $node = new AnyEvent::MP::Node::Direct $noderef;
146 161
147 $NODE{$_} = $node 162 $NODE{$_} = $node
148 for $noderef, split /,/, $noderef; 163 for $noderef, split /,/, $noderef;
149 164
158} 173}
159 174
160sub kil(@) { 175sub kil(@) {
161 my ($noderef, $port) = split /#/, shift, 2; 176 my ($noderef, $port) = split /#/, shift, 2;
162 177
178 length $port
179 or Carp::croak "killing the node port is not allowed, caught";
180
163 ($NODE{$noderef} || add_node $noderef) 181 ($NODE{$noderef} || add_node $noderef)
164 ->kill ($port, @_); 182 ->kill ($port, @_);
165} 183}
166 184
185sub resolve_node($) {
186 my ($noderef) = @_;
187
188 my $cv = AE::cv;
189 my @res;
190
191 $cv->begin (sub {
192 my %seen;
193 my @refs;
194 for (sort { $a->[0] <=> $b->[0] } @res) {
195 push @refs, $_->[1] unless $seen{$_->[1]}++
196 }
197 shift->send (join ",", @refs);
198 });
199
200 $noderef = $DEFAULT_PORT unless length $noderef;
201
202 my $idx;
203 for my $t (split /,/, $noderef) {
204 my $pri = ++$idx;
205
206 if ($t =~ /^\d*$/) {
207 require POSIX;
208 my $nodename = (POSIX::uname ())[1];
209
210 $cv->begin;
211 AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
212 for (@_) {
213 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
214 push @res, [
215 $pri += 1e-5,
216 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
217 ];
218 }
219 $cv->end;
220 };
221
222# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
223#
224# for (@ipv4) {
225# push @res, [
226# $pri,
227# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
228# ];
229# }
230 } else {
231 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
232 or Carp::croak "$t: unparsable transport descriptor";
233
234 $cv->begin;
235 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
236 for (@_) {
237 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
238 push @res, [
239 $pri += 1e-5,
240 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
241 ];
242 }
243 $cv->end;
244 }
245 }
246 }
247
248 $cv->end;
249
250 $cv
251}
252
167sub become_public { 253sub become_public {
168 return if $PUBLIC; 254 return if $PUBLIC;
169 255
170 my $noderef = join ",", @_; 256 my $noderef = join ",", @_;
171 my @args = @_; 257 my @args = @_;
172 258
173 $NODE = (AnyEvent::MP::Node::normalise_noderef $noderef)->recv; 259 $NODE = (resolve_node $noderef)->recv;
174 260
175 for my $t (split /,/, $NODE) { 261 for my $t (split /,/, $NODE) {
176 $NODE{$t} = $NODE{""}; 262 $NODE{$t} = $NODE{""};
177 263
178 my ($host, $port) = AnyEvent::Socket::parse_hostport $t; 264 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines