--- AnyEvent-MP/MP/Node.pm 2009/08/02 14:44:37 1.5 +++ AnyEvent-MP/MP/Node.pm 2009/08/04 14:10:51 1.7 @@ -15,6 +15,7 @@ use common::sense; use AE (); +use AnyEvent::Util (); use AnyEvent::Socket (); use AnyEvent::MP::Transport (); @@ -117,11 +118,56 @@ } } +sub kill { + my ($self, $port, @reason) = @_; + + $self->send (["", kil => $port, @reason]); + +# delete $AnyEvent::MP::Base::PORT{$port}; + +# my $mon = delete $AnyEvent::MP::Base::LMON{$port} +# or return; + +# $_->(@reason) for values %$mon; +} + +sub monitor { + my ($self, $portid, $cb) = @_; + + return $cb->() + if $self->{failed}; + + my $list = $self->{lmon}{$portid} ||= []; + + $self->send (["", mon1 => $portid]) + unless @$list; + + push @$list, $cb; +} + +sub unmonitor { + my ($self, $portid, $cb) = @_; + + my $list = $self->{lmon}{$portid} + or return; + + @$list = grep $_ != $cb, @$list; + + unless (@$list) { + $self->send (["", mon0 => $portid]); + delete $self->{monitor}{$portid}; + } +} + sub set_transport { my ($self, $transport) = @_; + $self->clr_transport + if $self->{transport}; + delete $self->{trial}; delete $self->{next_connect}; + delete $self->{failed}; if ( exists $self->{remote_uniq} @@ -143,6 +189,11 @@ my ($self) = @_; delete $self->{transport}; + $self->{failed} = 1; + + if (my $mon = delete $self->{monitor}) { + $_->() for map @$_, values %$mon; + } $self->connect; } @@ -212,7 +263,34 @@ } sub send { - AnyEvent::MP::Base::_inject ($_[1]); + local $AnyEvent::MP::Base::SRCNODE = $_[0]; + AnyEvent::MP::Base::_inject (@{ $_[1] }); +} + +sub kill { + my ($self, $port, @reason) = @_; + + delete $AnyEvent::MP::Base::PORT{$port}; + + my $mon = delete $AnyEvent::MP::Base::LMON{$port} + or return; + + $_->(@reason) for values %$mon; +} + +sub monitor { + my ($self, $portid, $cb) = @_; + + return $cb->() + unless exists $AnyEvent::MP::Base::PORT{$portid}; + + $AnyEvent::MP::Base::LMON{$portid}{$cb+0} = $cb; +} + +sub unmonitor { + my ($self, $portid, $cb) = @_; + + delete $AnyEvent::MP::Base::LMON{$portid}{$cb+0}; } =head1 SEE ALSO