--- AnyEvent-MP/MP/Node.pm 2009/08/10 01:37:19 1.17 +++ AnyEvent-MP/MP/Node.pm 2009/08/12 21:39:59 1.18 @@ -21,164 +21,173 @@ use AnyEvent::MP::Transport (); sub new { - my ($class, $noderef) = @_; + my ($self, $noderef) = @_; - bless { noderef => $noderef }, $class -} + $self = bless { noderef => $noderef }, $self; -package AnyEvent::MP::Node::Direct; + $self->transport_reset; -use base "AnyEvent::MP::Node"; + $self +} sub send { - my ($self, $msg) = @_; - - if ($self->{transport}) { - $self->{transport}->send ($msg); - } elsif ($self->{queue}) { - push @{ $self->{queue} }, $msg; - } else { - $self->{queue} = [$msg]; - $self->connect; - } + &{$_[0]->{send}} } -sub kill { - my ($self, $port, @reason) = @_; - - $self->send (["", kil => $port, @reason]); -} +package AnyEvent::MP::Node::External; -sub monitor { - my ($self, $portid, $cb) = @_; +use base "AnyEvent::MP::Node"; - my $list = $self->{lmon}{$portid} ||= []; +# called at init time, mostly sets {send} +sub transport_reset { + my ($self) = @_; - $self->send (["", mon1 => $portid]) - unless @$list; + delete $self->{transport}; - push @$list, $cb; + $self->{send} = sub { + push @{$_[0]{queue}}, $_[1]; + $_[0]->connect; + }; } -sub unmonitor { - my ($self, $portid, $cb) = @_; - - my $list = $self->{lmon}{$portid} - or return; +# called only after successful handshake +sub transport_error { + my ($self, @reason) = @_; - @$list = grep $_ != $cb, @$list; + delete $self->{queue}; + $self->transport_reset; - unless (@$list) { - $self->send (["", mon0 => $portid]); - delete $self->{monitor}{$portid}; + if (my $mon = delete $self->{lmon}) { + $_->(@reason) for map @$_, values %$mon; } } -sub set_transport { +# called after handshake was successful +sub transport_connect { my ($self, $transport) = @_; - $self->clr_transport + $self->transport_reset if $self->{transport}; - delete $self->{trial}; - delete $self->{retry}; - delete $self->{next_connect}; + delete $self->{connect_w}; + delete $self->{connect_to}; $self->{transport} = $transport; + $self->{send} = sub { + $transport->send ($_[1]); + }; + $transport->send ($_) for @{ delete $self->{queue} || [] }; } -sub fail { - my ($self, @reason) = @_; +sub connect { + my ($self) = @_; - delete $self->{queue}; + Scalar::Util::weaken $self; - if (my $mon = delete $self->{lmon}) { - $_->(@reason) for map @$_, values %$mon; + $self->{connect_to} ||= AE::timer + $AnyEvent::MP::Config::CFG{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, + 0, + sub { + $self->transport_error (transport_error => $self->{noderef}, "unable to connect"); + }; + + unless ($self->{connect_w}) { + my @endpoints; + my %trial; + + $self->{connect_w} = AE::timer + 0, + $AnyEvent::MP::Config::CFG{connect_interval} || $AnyEvent::MP::Kernel::CONNECT_INTERVAL, + sub { + @endpoints = split /,/, $self->{noderef} + unless @endpoints; + + my $endpoint = shift @endpoints; + + $trial{$endpoint} ||= do { + my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint + or return $AnyEvent::MP::Kernel::WARN->("$self->{noderef}: not a resolved node reference."); + + AnyEvent::MP::Transport::mp_connect + $host, $port, + sub { delete $trial{$endpoint} } + ; + }; + } + ; } } -sub clr_transport { - my ($self, @reason) = @_; +sub kill { + my ($self, $port, @reason) = @_; - delete $self->{transport}; - $self->connect; + $self->send (["", kil => $port, @reason]); } -sub connect { - my ($self) = @_; +sub monitor { + my ($self, $portid, $cb) = @_; - Scalar::Util::weaken $self; + my $list = $self->{lmon}{$portid} ||= []; - $self->{retry} ||= [split /,/, $self->{noderef}]; + $self->send (["", mon1 => $portid]) + unless @$list; - my $endpoint = shift @{ $self->{retry} }; + push @$list, $cb; +} - if (defined $endpoint) { - $self->{trial}{$endpoint} ||= do { - my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint - or return $AnyEvent::MP::Base::WARN->("$self->{noderef}: not a resolved node reference."); - - my ($w, $g); - - $w = AE::timer $AnyEvent::MP::Base::CONNECT_TIMEOUT, 0, sub { - delete $self->{trial}{$endpoint}; - }; - $g = AnyEvent::MP::Transport::mp_connect - $host, $port, - sub { - delete $self->{trial}{$endpoint} - unless @_; - $g = shift; - }; - ; +sub unmonitor { + my ($self, $portid, $cb) = @_; - [$w, \$g] - }; - } else { - $self->fail (transport_error => $self->{noderef}, "unable to connect"); - } + my $list = $self->{lmon}{$portid} + or return; - $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub { - delete $self->{retry}; - $self->connect; - }; + @$list = grep $_ != $cb, @$list; + + unless (@$list) { + $self->send (["", mon0 => $portid]); + delete $self->{monitor}{$portid}; + } } -package AnyEvent::MP::Node::Slave; +package AnyEvent::MP::Node::Direct; + +use base "AnyEvent::MP::Node::External"; + +package AnyEvent::MP::Node::Indirect; use base "AnyEvent::MP::Node::Direct"; sub connect { my ($self) = @_; - $self->fail (transport_error => $self->{noderef}, "unable to connect to slave node"); + $self->transport_error (transport_error => $self->{noderef}, "unable to connect to indirect node"); } package AnyEvent::MP::Node::Self; use base "AnyEvent::MP::Node"; -sub set_transport { - Carp::confess "FATAL error, set_transport was called on local node"; -} +sub transport_reset { + my ($self) = @_; -sub send { - local $AnyEvent::MP::Base::SRCNODE = $_[0]; - AnyEvent::MP::Base::_inject (@{ $_[1] }); + $self->{send} = sub { + local $AnyEvent::MP::Kernel::SRCNODE = $_[0]; + AnyEvent::MP::Kernel::_inject (@{ $_[1] }); + }; } sub kill { my ($self, $port, @reason) = @_; - delete $AnyEvent::MP::Base::PORT{$port}; - delete $AnyEvent::MP::Base::PORT_DATA{$port}; + delete $AnyEvent::MP::Kernel::PORT{$port}; + delete $AnyEvent::MP::Kernel::PORT_DATA{$port}; - my $mon = delete $AnyEvent::MP::Base::LMON{$port} + my $mon = delete $AnyEvent::MP::Kernel::LMON{$port} or !@reason - or $AnyEvent::MP::Base::WARN->("unmonitored local port $port died with reason: @reason"); + or $AnyEvent::MP::Kernel::WARN->("unmonitored local port $port died with reason: @reason"); $_->(@reason) for values %$mon; } @@ -187,15 +196,15 @@ my ($self, $portid, $cb) = @_; return $cb->(no_such_port => "cannot monitor nonexistent port") - unless exists $AnyEvent::MP::Base::PORT{$portid}; + unless exists $AnyEvent::MP::Kernel::PORT{$portid}; - $AnyEvent::MP::Base::LMON{$portid}{$cb+0} = $cb; + $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0} = $cb; } sub unmonitor { my ($self, $portid, $cb) = @_; - delete $AnyEvent::MP::Base::LMON{$portid}{$cb+0}; + delete $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0}; } =head1 SEE ALSO