--- AnyEvent-MP/MP/Node.pm 2009/08/03 21:35:03 1.6 +++ AnyEvent-MP/MP/Node.pm 2009/08/09 16:08:16 1.16 @@ -22,79 +22,6 @@ use base Exporter::; -our $VERSION = '0.0'; - -our $DEFAULT_PORT = "4040"; - -sub normalise_noderef($) { - my ($noderef) = @_; - - my $cv = AE::cv; - my @res; - - $cv->begin (sub { - my %seen; - my @refs; - for (sort { $a->[0] <=> $b->[0] } @res) { - push @refs, $_->[1] unless $seen{$_->[1]}++ - } - shift->send (join ",", @refs); - }); - - $noderef = $DEFAULT_PORT unless length $noderef; - - my $idx; - for my $t (split /,/, $noderef) { - my $pri = ++$idx; - - #TODO: this should be outside normalise_noderef and in become_public - if ($t =~ /^\d*$/) { - require POSIX; - my $nodename = (POSIX::uname ())[1]; - - $cv->begin; - AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub { - for (@_) { - my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; - push @res, [ - $pri += 1e-5, - AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service - ]; - } - $cv->end; - }; - -# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename; -# -# for (@ipv4) { -# push @res, [ -# $pri, -# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT, -# ]; -# } - } else { - my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT" - or Carp::croak "$t: unparsable transport descriptor"; - - $cv->begin; - AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub { - for (@_) { - my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; - push @res, [ - $pri += 1e-5, - AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service - ]; - } - $cv->end; - } - } - } - - $cv->end; - - $cv -} - sub new { my ($class, $noderef) = @_; @@ -118,12 +45,15 @@ } } +sub kill { + my ($self, $port, @reason) = @_; + + $self->send (["", kil => $port, @reason]); +} + sub monitor { my ($self, $portid, $cb) = @_; - return $cb->() - if $self->{failed}; - my $list = $self->{lmon}{$portid} ||= []; $self->send (["", mon1 => $portid]) @@ -153,35 +83,29 @@ if $self->{transport}; delete $self->{trial}; + delete $self->{retry}; delete $self->{next_connect}; - delete $self->{failed}; - - if ( - exists $self->{remote_uniq} - && $self->{remote_uniq} ne $transport->{remote_uniq} - ) { - # uniq changed, drop queue - delete $self->{queue}; - #TODO: "DOWN" - } - $self->{remote_uniq} = $transport->{remote_uniq}; - $self->{transport} = $transport; + $self->{transport} = $transport; $transport->send ($_) for @{ delete $self->{queue} || [] }; } -sub clr_transport { - my ($self) = @_; +sub fail { + my ($self, @reason) = @_; - delete $self->{transport}; - $self->{failed} = 1; + delete $self->{queue}; - if (my $mon = delete $self->{monitor}) { - $_->() for map @$_, values %$mon; + if (my $mon = delete $self->{lmon}) { + $_->(@reason) for map @$_, values %$mon; } +} +sub clr_transport { + my ($self, @reason) = @_; + + delete $self->{transport}; $self->connect; } @@ -190,31 +114,14 @@ Scalar::Util::weaken $self; - unless (exists $self->{n_noderef}) { - return if $self->{n_noderef_}++; - (AnyEvent::MP::Node::normalise_noderef ($self->{noderef}))->cb (sub { - $self or return; - delete $self->{n_noderef_}; - my $noderef = shift->recv; - - $self->{n_noderef} = $noderef; - - $AnyEvent::MP::Base::NODE{$_} = $self - for split /,/, $noderef; - - $self->connect; - }); - return; - } - - $self->{retry} ||= [split /,/, $self->{n_noderef}]; + $self->{retry} ||= [split /,/, $self->{noderef}]; my $endpoint = shift @{ $self->{retry} }; if (defined $endpoint) { $self->{trial}{$endpoint} ||= do { my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint - or return; + or return $AnyEvent::MP::Base::WARN->("$self->{noderef}: not a resolved node reference."); my ($w, $g); @@ -233,20 +140,31 @@ [$w, \$g] }; } else { - delete $self->{retry}; + $self->fail (transport_error => $self->{noderef}, "unable to connect"); } $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub { + delete $self->{retry}; $self->connect; }; } +package AnyEvent::MP::Node::Slave; + +use base "AnyEvent::MP::Node::Direct"; + +sub connect { + my ($self) = @_; + + $self->fail (transport_error => $self->{noderef}, "unable to connect to slave node"); +} + package AnyEvent::MP::Node::Self; use base "AnyEvent::MP::Node"; sub set_transport { - die "FATAL error, set_transport was called"; + Carp::confess "FATAL error, set_transport was called on local node"; } sub send { @@ -254,10 +172,23 @@ AnyEvent::MP::Base::_inject (@{ $_[1] }); } +sub kill { + my ($self, $port, @reason) = @_; + + delete $AnyEvent::MP::Base::PORT{$port}; + delete $AnyEvent::MP::Base::PORT_DATA{$port}; + + my $mon = delete $AnyEvent::MP::Base::LMON{$port} + or !@reason + or $AnyEvent::MP::Base::WARN->("unmonitored local port $port died with reason: @reason"); + + $_->(@reason) for values %$mon; +} + sub monitor { my ($self, $portid, $cb) = @_; - return $cb->() + return $cb->(no_such_port => "cannot monitor nonexistent port") unless exists $AnyEvent::MP::Base::PORT{$portid}; $AnyEvent::MP::Base::LMON{$portid}{$cb+0} = $cb;