--- AnyEvent-MP/MP/Node.pm 2009/08/04 18:33:30 1.8 +++ AnyEvent-MP/MP/Node.pm 2009/08/05 09:38:56 1.11 @@ -24,77 +24,6 @@ our $VERSION = '0.0'; -our $DEFAULT_PORT = "4040"; - -sub normalise_noderef($) { - my ($noderef) = @_; - - my $cv = AE::cv; - my @res; - - $cv->begin (sub { - my %seen; - my @refs; - for (sort { $a->[0] <=> $b->[0] } @res) { - push @refs, $_->[1] unless $seen{$_->[1]}++ - } - shift->send (join ",", @refs); - }); - - $noderef = $DEFAULT_PORT unless length $noderef; - - my $idx; - for my $t (split /,/, $noderef) { - my $pri = ++$idx; - - #TODO: this should be outside normalise_noderef and in become_public - if ($t =~ /^\d*$/) { - require POSIX; - my $nodename = (POSIX::uname ())[1]; - - $cv->begin; - AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub { - for (@_) { - my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; - push @res, [ - $pri += 1e-5, - AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service - ]; - } - $cv->end; - }; - -# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename; -# -# for (@ipv4) { -# push @res, [ -# $pri, -# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT, -# ]; -# } - } else { - my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT" - or Carp::croak "$t: unparsable transport descriptor"; - - $cv->begin; - AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub { - for (@_) { - my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; - push @res, [ - $pri += 1e-5, - AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service - ]; - } - $cv->end; - } - } - } - - $cv->end; - - $cv -} - sub new { my ($class, $noderef) = @_; @@ -127,7 +56,7 @@ sub monitor { my ($self, $portid, $cb) = @_; - return $cb->("node failed conenction") + return $cb->(transport_error => "node unreachable") if $self->{failed}; my $list = $self->{lmon}{$portid} ||= []; @@ -201,31 +130,14 @@ Scalar::Util::weaken $self; - unless (exists $self->{n_noderef}) { - return if $self->{n_noderef_}++; - (AnyEvent::MP::Node::normalise_noderef ($self->{noderef}))->cb (sub { - $self or return; - delete $self->{n_noderef_}; - my $noderef = shift->recv; - - $self->{n_noderef} = $noderef; - - $AnyEvent::MP::Base::NODE{$_} = $self - for split /,/, $noderef; - - $self->connect; - }); - return; - } - - $self->{retry} ||= [split /,/, $self->{n_noderef}]; + $self->{retry} ||= [split /,/, $self->{noderef}]; my $endpoint = shift @{ $self->{retry} }; if (defined $endpoint) { $self->{trial}{$endpoint} ||= do { my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint - or return; + or return $AnyEvent::MP::Base::WARN->("$self->{noderef}: not a resolved node reference."); my ($w, $g); @@ -245,6 +157,7 @@ }; } else { delete $self->{retry}; + $self->fail (transport_error => $self->{noderef}, "unable to connect"); } $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub {