--- AnyEvent-MP/MP/Node.pm 2009/08/01 07:11:45 1.2 +++ AnyEvent-MP/MP/Node.pm 2009/08/04 23:35:51 1.10 @@ -15,6 +15,7 @@ use common::sense; use AE (); +use AnyEvent::Util (); use AnyEvent::Socket (); use AnyEvent::MP::Transport (); @@ -36,8 +37,6 @@ sub send { my ($self, $msg) = @_; - warn "send $self $self->{noderef}\n";#d# - if ($self->{transport}) { $self->{transport}->send ($msg); } elsif ($self->{queue}) { @@ -48,23 +47,58 @@ } } +sub kill { + my ($self, $port, @reason) = @_; + + $self->send (["", kil => $port, @reason]); +} + +sub monitor { + my ($self, $portid, $cb) = @_; + + return $cb->("node failed conenction") + if $self->{failed}; + + my $list = $self->{lmon}{$portid} ||= []; + + $self->send (["", mon1 => $portid]) + unless @$list; + + push @$list, $cb; +} + +sub unmonitor { + my ($self, $portid, $cb) = @_; + + my $list = $self->{lmon}{$portid} + or return; + + @$list = grep $_ != $cb, @$list; + + unless (@$list) { + $self->send (["", mon0 => $portid]); + delete $self->{monitor}{$portid}; + } +} + sub set_transport { my ($self, $transport) = @_; - delete $self->{trial}; - delete $self->{next_connect}; - - use Data::Dumper; warn "set_transport $self->{noderef} $self $AnyEvent::MP::NODE{'10.0.0.1:4040'}\n";#d# + $self->clr_transport + if $self->{transport}; if ( exists $self->{remote_uniq} && $self->{remote_uniq} ne $transport->{remote_uniq} ) { - # uniq changed, drop queue - delete $self->{queue}; - #TODO: "DOWN" + # uniq changed, different node + $self->fail ("node restart detected"); } + delete $self->{trial}; + delete $self->{next_connect}; + delete $self->{failed}; + $self->{remote_uniq} = $transport->{remote_uniq}; $self->{transport} = $transport; @@ -72,11 +106,23 @@ for @{ delete $self->{queue} || [] }; } +sub fail { + my ($self, @reason) = @_; + + delete $self->{queue}; + + $self->{failed} = 1; + + if (my $mon = delete $self->{lmon}) { + $_->(@reason) for map @$_, values %$mon; + } +} + sub clr_transport { - my ($self) = @_; + my ($self, @reason) = @_; delete $self->{transport}; - warn "clr_transport\n"; + $self->connect; } sub connect { @@ -84,33 +130,18 @@ Scalar::Util::weaken $self; - unless (exists $self->{n_noderef}) { - (AnyEvent::MP::normalise_noderef ($self->{noderef}))->cb (sub { - $self or return; - my $noderef = shift->recv; - - $self->{n_noderef} = $noderef; - - $AnyEvent::MP::NODE{$_} = $self - for split /,/, $noderef; - - $self->connect; - }); - return; - } - - $self->{retry} ||= [split /,/, $self->{n_noderef}]; + $self->{retry} ||= [split /,/, $self->{noderef}]; my $endpoint = shift @{ $self->{retry} }; if (defined $endpoint) { $self->{trial}{$endpoint} ||= do { my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint - or return; + or return $AnyEvent::MP::Base::WARN->("$self->{noderef}: not a resolved node reference."); my ($w, $g); - $w = AE::timer $AnyEvent::MP::CONNECT_TIMEOUT, 0, sub { + $w = AE::timer $AnyEvent::MP::Base::CONNECT_TIMEOUT, 0, sub { delete $self->{trial}{$endpoint}; }; $g = AnyEvent::MP::Transport::mp_connect @@ -126,9 +157,10 @@ }; } else { delete $self->{retry}; + $self->fail (transport_error => $self->{noderef}, "unable to connect"); } - $self->{next_connect} = AE::timer $AnyEvent::MP::CONNECT_INTERVAL, 0, sub { + $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub { $self->connect; }; } @@ -142,7 +174,36 @@ } sub send { - AnyEvent::MP::_inject ($_[1]); + local $AnyEvent::MP::Base::SRCNODE = $_[0]; + AnyEvent::MP::Base::_inject (@{ $_[1] }); +} + +sub kill { + my ($self, $port, @reason) = @_; + + delete $AnyEvent::MP::Base::PORT{$port}; + delete $AnyEvent::MP::Base::PORT_DATA{$port}; + + my $mon = delete $AnyEvent::MP::Base::LMON{$port} + or !@reason + or $AnyEvent::MP::Base::WARN->("unmonitored local port $port died with reason: @reason"); + + $_->(@reason) for values %$mon; +} + +sub monitor { + my ($self, $portid, $cb) = @_; + + return $cb->() + unless exists $AnyEvent::MP::Base::PORT{$portid}; + + $AnyEvent::MP::Base::LMON{$portid}{$cb+0} = $cb; +} + +sub unmonitor { + my ($self, $portid, $cb) = @_; + + delete $AnyEvent::MP::Base::LMON{$portid}{$cb+0}; } =head1 SEE ALSO