--- AnyEvent-MP/MP/Node.pm 2009/08/04 14:10:51 1.7 +++ AnyEvent-MP/MP/Node.pm 2009/08/05 09:38:56 1.11 @@ -24,77 +24,6 @@ our $VERSION = '0.0'; -our $DEFAULT_PORT = "4040"; - -sub normalise_noderef($) { - my ($noderef) = @_; - - my $cv = AE::cv; - my @res; - - $cv->begin (sub { - my %seen; - my @refs; - for (sort { $a->[0] <=> $b->[0] } @res) { - push @refs, $_->[1] unless $seen{$_->[1]}++ - } - shift->send (join ",", @refs); - }); - - $noderef = $DEFAULT_PORT unless length $noderef; - - my $idx; - for my $t (split /,/, $noderef) { - my $pri = ++$idx; - - #TODO: this should be outside normalise_noderef and in become_public - if ($t =~ /^\d*$/) { - require POSIX; - my $nodename = (POSIX::uname ())[1]; - - $cv->begin; - AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub { - for (@_) { - my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; - push @res, [ - $pri += 1e-5, - AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service - ]; - } - $cv->end; - }; - -# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename; -# -# for (@ipv4) { -# push @res, [ -# $pri, -# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT, -# ]; -# } - } else { - my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT" - or Carp::croak "$t: unparsable transport descriptor"; - - $cv->begin; - AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub { - for (@_) { - my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; - push @res, [ - $pri += 1e-5, - AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service - ]; - } - $cv->end; - } - } - } - - $cv->end; - - $cv -} - sub new { my ($class, $noderef) = @_; @@ -122,19 +51,12 @@ my ($self, $port, @reason) = @_; $self->send (["", kil => $port, @reason]); - -# delete $AnyEvent::MP::Base::PORT{$port}; - -# my $mon = delete $AnyEvent::MP::Base::LMON{$port} -# or return; - -# $_->(@reason) for values %$mon; } sub monitor { my ($self, $portid, $cb) = @_; - return $cb->() + return $cb->(transport_error => "node unreachable") if $self->{failed}; my $list = $self->{lmon}{$portid} ||= []; @@ -165,19 +87,18 @@ $self->clr_transport if $self->{transport}; - delete $self->{trial}; - delete $self->{next_connect}; - delete $self->{failed}; - if ( exists $self->{remote_uniq} && $self->{remote_uniq} ne $transport->{remote_uniq} ) { - # uniq changed, drop queue - delete $self->{queue}; - #TODO: "DOWN" + # uniq changed, different node + $self->fail ("node restart detected"); } + delete $self->{trial}; + delete $self->{next_connect}; + delete $self->{failed}; + $self->{remote_uniq} = $transport->{remote_uniq}; $self->{transport} = $transport; @@ -185,16 +106,22 @@ for @{ delete $self->{queue} || [] }; } -sub clr_transport { - my ($self) = @_; +sub fail { + my ($self, @reason) = @_; + + delete $self->{queue}; - delete $self->{transport}; $self->{failed} = 1; - if (my $mon = delete $self->{monitor}) { - $_->() for map @$_, values %$mon; + if (my $mon = delete $self->{lmon}) { + $_->(@reason) for map @$_, values %$mon; } +} + +sub clr_transport { + my ($self, @reason) = @_; + delete $self->{transport}; $self->connect; } @@ -203,31 +130,14 @@ Scalar::Util::weaken $self; - unless (exists $self->{n_noderef}) { - return if $self->{n_noderef_}++; - (AnyEvent::MP::Node::normalise_noderef ($self->{noderef}))->cb (sub { - $self or return; - delete $self->{n_noderef_}; - my $noderef = shift->recv; - - $self->{n_noderef} = $noderef; - - $AnyEvent::MP::Base::NODE{$_} = $self - for split /,/, $noderef; - - $self->connect; - }); - return; - } - - $self->{retry} ||= [split /,/, $self->{n_noderef}]; + $self->{retry} ||= [split /,/, $self->{noderef}]; my $endpoint = shift @{ $self->{retry} }; if (defined $endpoint) { $self->{trial}{$endpoint} ||= do { my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint - or return; + or return $AnyEvent::MP::Base::WARN->("$self->{noderef}: not a resolved node reference."); my ($w, $g); @@ -247,6 +157,7 @@ }; } else { delete $self->{retry}; + $self->fail (transport_error => $self->{noderef}, "unable to connect"); } $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub { @@ -271,9 +182,11 @@ my ($self, $port, @reason) = @_; delete $AnyEvent::MP::Base::PORT{$port}; + delete $AnyEvent::MP::Base::PORT_DATA{$port}; my $mon = delete $AnyEvent::MP::Base::LMON{$port} - or return; + or !@reason + or $AnyEvent::MP::Base::WARN->("unmonitored local port $port died with reason: @reason"); $_->(@reason) for values %$mon; }