--- AnyEvent-MP/MP/Node.pm 2009/08/01 15:04:30 1.4 +++ AnyEvent-MP/MP/Node.pm 2009/08/02 14:44:37 1.5 @@ -23,6 +23,77 @@ our $VERSION = '0.0'; +our $DEFAULT_PORT = "4040"; + +sub normalise_noderef($) { + my ($noderef) = @_; + + my $cv = AE::cv; + my @res; + + $cv->begin (sub { + my %seen; + my @refs; + for (sort { $a->[0] <=> $b->[0] } @res) { + push @refs, $_->[1] unless $seen{$_->[1]}++ + } + shift->send (join ",", @refs); + }); + + $noderef = $DEFAULT_PORT unless length $noderef; + + my $idx; + for my $t (split /,/, $noderef) { + my $pri = ++$idx; + + #TODO: this should be outside normalise_noderef and in become_public + if ($t =~ /^\d*$/) { + require POSIX; + my $nodename = (POSIX::uname ())[1]; + + $cv->begin; + AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub { + for (@_) { + my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; + push @res, [ + $pri += 1e-5, + AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service + ]; + } + $cv->end; + }; + +# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename; +# +# for (@ipv4) { +# push @res, [ +# $pri, +# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT, +# ]; +# } + } else { + my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT" + or Carp::croak "$t: unparsable transport descriptor"; + + $cv->begin; + AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub { + for (@_) { + my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; + push @res, [ + $pri += 1e-5, + AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service + ]; + } + $cv->end; + } + } + } + + $cv->end; + + $cv +} + sub new { my ($class, $noderef) = @_; @@ -82,13 +153,15 @@ Scalar::Util::weaken $self; unless (exists $self->{n_noderef}) { - (AnyEvent::MP::normalise_noderef ($self->{noderef}))->cb (sub { + return if $self->{n_noderef_}++; + (AnyEvent::MP::Node::normalise_noderef ($self->{noderef}))->cb (sub { $self or return; + delete $self->{n_noderef_}; my $noderef = shift->recv; $self->{n_noderef} = $noderef; - $AnyEvent::MP::NODE{$_} = $self + $AnyEvent::MP::Base::NODE{$_} = $self for split /,/, $noderef; $self->connect; @@ -107,7 +180,7 @@ my ($w, $g); - $w = AE::timer $AnyEvent::MP::CONNECT_TIMEOUT, 0, sub { + $w = AE::timer $AnyEvent::MP::Base::CONNECT_TIMEOUT, 0, sub { delete $self->{trial}{$endpoint}; }; $g = AnyEvent::MP::Transport::mp_connect @@ -125,7 +198,7 @@ delete $self->{retry}; } - $self->{next_connect} = AE::timer $AnyEvent::MP::CONNECT_INTERVAL, 0, sub { + $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub { $self->connect; }; } @@ -139,7 +212,7 @@ } sub send { - AnyEvent::MP::_inject ($_[1]); + AnyEvent::MP::Base::_inject ($_[1]); } =head1 SEE ALSO