--- AnyEvent-MP/MP/Transport.pm 2009/08/04 23:35:51 1.21 +++ AnyEvent-MP/MP/Transport.pm 2009/08/06 10:21:48 1.25 @@ -134,6 +134,11 @@ $arg{secret} = AnyEvent::MP::Base::default_secret () unless exists $arg{secret}; + $arg{timeout} = 30 + unless exists $arg{timeout}; + + my $keepalive = (int $arg{timeout} * 0.75) || 1; + my $secret = $arg{secret}; if ($secret =~ /-----BEGIN RSA PRIVATE KEY-----.*-----END RSA PRIVATE KEY-----.*-----BEGIN CERTIFICATE-----.*-----END CERTIFICATE-----/s) { @@ -156,19 +161,23 @@ on_error => sub { $self->error ($_[2]); }, + timeout => $AnyEvent::MP::Base::CONNECT_TIMEOUT, peername => delete $arg{peername}, ; my $greeting_kv = $self->{greeting} ||= {}; - $greeting_kv->{"tls"} = "1.0" - if $arg{tls_ctx}; + + $self->{local_node} = $AnyEvent::MP::Base::NODE; + + $greeting_kv->{"tls"} = "1.0" if $arg{tls_ctx}; $greeting_kv->{provider} = "AE-$VERSION"; $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport}; + $greeting_kv->{maxidle} = $keepalive; # send greeting my $lgreeting1 = "aemp;$PROTOCOL_VERSION" . ";$AnyEvent::MP::Base::UNIQ" - . ";$AnyEvent::MP::Base::NODE" + . ";$self->{local_node}" . ";" . (join ",", @AUTH_RCV) . ";" . (join ",", @FRAMINGS) . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv); @@ -265,14 +274,17 @@ $hdl->rbuf_max (undef); my $queue = delete $self->{queue}; # we are connected + $self->{hdl}->timeout ($self->{remote_greeting}{keepalive} + 5) + if $self->{remote_greeting}{keepalive}; + $self->connected; my $src_node = $self->{node}; - $hdl->push_write ($self->{s_framing} => $_) + $self->send ($_) for @$queue; - my $rmsg; $rmsg = sub { + my $rmsg; $rmsg = sub { $_[0]->push_read ($r_framing => $rmsg); local $AnyEvent::MP::Base::SRCNODE = $src_node; @@ -302,6 +314,20 @@ sub connected { my ($self) = @_; + if (ref $AnyEvent::MP::Base::SLAVE) { + # first connect with a master node + my $via = $self->{remote_node}; + $via =~ s/,/!/g; + $AnyEvent::MP::Base::NODE .= "\@$via"; + $AnyEvent::MP::Base::NODE{$AnyEvent::MP::Base::NODE} = $AnyEvent::MP::Base::NODE{""}; + $AnyEvent::MP::Base::SLAVE->(); + } + + if ($self->{local_node} ne $AnyEvent::MP::Base::NODE) { + # node changed its name since first greeting + $self->send (["", iam => $AnyEvent::MP::Base::NODE]); + } + my $node = AnyEvent::MP::Base::add_node ($self->{remote_node}); Scalar::Util::weaken ($self->{node} = $node); $node->set_transport ($self); @@ -416,6 +442,12 @@ Indicates that the other side supports TLS (version should be 1.0) and wishes to do a TLS handshake. +=item maxidle= + +The maximum amount of time the node will not sent data, i.e., idle. This +can be used to close the conenction when no data has been received for a +too-long time (say, maxidle + 5 seconds). + =back =head3 Second Greeting Line