--- AnyEvent-MP/MP/Node.pm 2009/08/01 07:11:45 1.2 +++ AnyEvent-MP/MP/Node.pm 2009/08/09 16:08:16 1.16 @@ -15,14 +15,13 @@ use common::sense; use AE (); +use AnyEvent::Util (); use AnyEvent::Socket (); use AnyEvent::MP::Transport (); use base Exporter::; -our $VERSION = '0.0'; - sub new { my ($class, $noderef) = @_; @@ -36,8 +35,6 @@ sub send { my ($self, $msg) = @_; - warn "send $self $self->{noderef}\n";#d# - if ($self->{transport}) { $self->{transport}->send ($msg); } elsif ($self->{queue}) { @@ -48,35 +45,68 @@ } } +sub kill { + my ($self, $port, @reason) = @_; + + $self->send (["", kil => $port, @reason]); +} + +sub monitor { + my ($self, $portid, $cb) = @_; + + my $list = $self->{lmon}{$portid} ||= []; + + $self->send (["", mon1 => $portid]) + unless @$list; + + push @$list, $cb; +} + +sub unmonitor { + my ($self, $portid, $cb) = @_; + + my $list = $self->{lmon}{$portid} + or return; + + @$list = grep $_ != $cb, @$list; + + unless (@$list) { + $self->send (["", mon0 => $portid]); + delete $self->{monitor}{$portid}; + } +} + sub set_transport { my ($self, $transport) = @_; + $self->clr_transport + if $self->{transport}; + delete $self->{trial}; + delete $self->{retry}; delete $self->{next_connect}; - use Data::Dumper; warn "set_transport $self->{noderef} $self $AnyEvent::MP::NODE{'10.0.0.1:4040'}\n";#d# - - if ( - exists $self->{remote_uniq} - && $self->{remote_uniq} ne $transport->{remote_uniq} - ) { - # uniq changed, drop queue - delete $self->{queue}; - #TODO: "DOWN" - } - - $self->{remote_uniq} = $transport->{remote_uniq}; - $self->{transport} = $transport; + $self->{transport} = $transport; $transport->send ($_) for @{ delete $self->{queue} || [] }; } +sub fail { + my ($self, @reason) = @_; + + delete $self->{queue}; + + if (my $mon = delete $self->{lmon}) { + $_->(@reason) for map @$_, values %$mon; + } +} + sub clr_transport { - my ($self) = @_; + my ($self, @reason) = @_; delete $self->{transport}; - warn "clr_transport\n"; + $self->connect; } sub connect { @@ -84,33 +114,18 @@ Scalar::Util::weaken $self; - unless (exists $self->{n_noderef}) { - (AnyEvent::MP::normalise_noderef ($self->{noderef}))->cb (sub { - $self or return; - my $noderef = shift->recv; - - $self->{n_noderef} = $noderef; - - $AnyEvent::MP::NODE{$_} = $self - for split /,/, $noderef; - - $self->connect; - }); - return; - } - - $self->{retry} ||= [split /,/, $self->{n_noderef}]; + $self->{retry} ||= [split /,/, $self->{noderef}]; my $endpoint = shift @{ $self->{retry} }; if (defined $endpoint) { $self->{trial}{$endpoint} ||= do { my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint - or return; + or return $AnyEvent::MP::Base::WARN->("$self->{noderef}: not a resolved node reference."); my ($w, $g); - $w = AE::timer $AnyEvent::MP::CONNECT_TIMEOUT, 0, sub { + $w = AE::timer $AnyEvent::MP::Base::CONNECT_TIMEOUT, 0, sub { delete $self->{trial}{$endpoint}; }; $g = AnyEvent::MP::Transport::mp_connect @@ -125,24 +140,64 @@ [$w, \$g] }; } else { - delete $self->{retry}; + $self->fail (transport_error => $self->{noderef}, "unable to connect"); } - $self->{next_connect} = AE::timer $AnyEvent::MP::CONNECT_INTERVAL, 0, sub { + $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub { + delete $self->{retry}; $self->connect; }; } +package AnyEvent::MP::Node::Slave; + +use base "AnyEvent::MP::Node::Direct"; + +sub connect { + my ($self) = @_; + + $self->fail (transport_error => $self->{noderef}, "unable to connect to slave node"); +} + package AnyEvent::MP::Node::Self; use base "AnyEvent::MP::Node"; sub set_transport { - die "FATAL error, set_transport was called"; + Carp::confess "FATAL error, set_transport was called on local node"; } sub send { - AnyEvent::MP::_inject ($_[1]); + local $AnyEvent::MP::Base::SRCNODE = $_[0]; + AnyEvent::MP::Base::_inject (@{ $_[1] }); +} + +sub kill { + my ($self, $port, @reason) = @_; + + delete $AnyEvent::MP::Base::PORT{$port}; + delete $AnyEvent::MP::Base::PORT_DATA{$port}; + + my $mon = delete $AnyEvent::MP::Base::LMON{$port} + or !@reason + or $AnyEvent::MP::Base::WARN->("unmonitored local port $port died with reason: @reason"); + + $_->(@reason) for values %$mon; +} + +sub monitor { + my ($self, $portid, $cb) = @_; + + return $cb->(no_such_port => "cannot monitor nonexistent port") + unless exists $AnyEvent::MP::Base::PORT{$portid}; + + $AnyEvent::MP::Base::LMON{$portid}{$cb+0} = $cb; +} + +sub unmonitor { + my ($self, $portid, $cb) = @_; + + delete $AnyEvent::MP::Base::LMON{$portid}{$cb+0}; } =head1 SEE ALSO