--- AnyEvent-MP/MP/Node.pm 2009/08/01 07:11:45 1.2 +++ AnyEvent-MP/MP/Node.pm 2009/08/02 14:44:37 1.5 @@ -23,6 +23,77 @@ our $VERSION = '0.0'; +our $DEFAULT_PORT = "4040"; + +sub normalise_noderef($) { + my ($noderef) = @_; + + my $cv = AE::cv; + my @res; + + $cv->begin (sub { + my %seen; + my @refs; + for (sort { $a->[0] <=> $b->[0] } @res) { + push @refs, $_->[1] unless $seen{$_->[1]}++ + } + shift->send (join ",", @refs); + }); + + $noderef = $DEFAULT_PORT unless length $noderef; + + my $idx; + for my $t (split /,/, $noderef) { + my $pri = ++$idx; + + #TODO: this should be outside normalise_noderef and in become_public + if ($t =~ /^\d*$/) { + require POSIX; + my $nodename = (POSIX::uname ())[1]; + + $cv->begin; + AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub { + for (@_) { + my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; + push @res, [ + $pri += 1e-5, + AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service + ]; + } + $cv->end; + }; + +# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename; +# +# for (@ipv4) { +# push @res, [ +# $pri, +# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT, +# ]; +# } + } else { + my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT" + or Carp::croak "$t: unparsable transport descriptor"; + + $cv->begin; + AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub { + for (@_) { + my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; + push @res, [ + $pri += 1e-5, + AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service + ]; + } + $cv->end; + } + } + } + + $cv->end; + + $cv +} + sub new { my ($class, $noderef) = @_; @@ -36,8 +107,6 @@ sub send { my ($self, $msg) = @_; - warn "send $self $self->{noderef}\n";#d# - if ($self->{transport}) { $self->{transport}->send ($msg); } elsif ($self->{queue}) { @@ -54,8 +123,6 @@ delete $self->{trial}; delete $self->{next_connect}; - use Data::Dumper; warn "set_transport $self->{noderef} $self $AnyEvent::MP::NODE{'10.0.0.1:4040'}\n";#d# - if ( exists $self->{remote_uniq} && $self->{remote_uniq} ne $transport->{remote_uniq} @@ -76,7 +143,8 @@ my ($self) = @_; delete $self->{transport}; - warn "clr_transport\n"; + + $self->connect; } sub connect { @@ -85,13 +153,15 @@ Scalar::Util::weaken $self; unless (exists $self->{n_noderef}) { - (AnyEvent::MP::normalise_noderef ($self->{noderef}))->cb (sub { + return if $self->{n_noderef_}++; + (AnyEvent::MP::Node::normalise_noderef ($self->{noderef}))->cb (sub { $self or return; + delete $self->{n_noderef_}; my $noderef = shift->recv; $self->{n_noderef} = $noderef; - $AnyEvent::MP::NODE{$_} = $self + $AnyEvent::MP::Base::NODE{$_} = $self for split /,/, $noderef; $self->connect; @@ -110,7 +180,7 @@ my ($w, $g); - $w = AE::timer $AnyEvent::MP::CONNECT_TIMEOUT, 0, sub { + $w = AE::timer $AnyEvent::MP::Base::CONNECT_TIMEOUT, 0, sub { delete $self->{trial}{$endpoint}; }; $g = AnyEvent::MP::Transport::mp_connect @@ -128,7 +198,7 @@ delete $self->{retry}; } - $self->{next_connect} = AE::timer $AnyEvent::MP::CONNECT_INTERVAL, 0, sub { + $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub { $self->connect; }; } @@ -142,7 +212,7 @@ } sub send { - AnyEvent::MP::_inject ($_[1]); + AnyEvent::MP::Base::_inject ($_[1]); } =head1 SEE ALSO