--- AnyEvent-MP/MP/Transport.pm 2009/08/06 10:21:48 1.25 +++ AnyEvent-MP/MP/Transport.pm 2009/08/09 00:31:40 1.27 @@ -24,7 +24,8 @@ use common::sense; -use Scalar::Util; +use Scalar::Util (); +use List::Util (); use MIME::Base64 (); use Storable (); use JSON::XS (); @@ -34,7 +35,7 @@ use AE (); use AnyEvent::Socket (); -use AnyEvent::Handle (); +use AnyEvent::Handle 4.92 (); use base Exporter::; @@ -114,6 +115,8 @@ =cut +sub LATENCY() { 3 } # assumed max. network latency + our @FRAMINGS = qw(json storable); # the framing types we accept and send, in order of preference our @AUTH_SND = qw(hmac_md6_64_256); # auth types we send our @AUTH_RCV = (@AUTH_SND, qw(cleartext)); # auth types we accept @@ -137,7 +140,8 @@ $arg{timeout} = 30 unless exists $arg{timeout}; - my $keepalive = (int $arg{timeout} * 0.75) || 1; + $arg{timeout} = 1 + LATENCY + if $arg{timeout} < 1 + LATENCY; my $secret = $arg{secret}; @@ -161,7 +165,7 @@ on_error => sub { $self->error ($_[2]); }, - timeout => $AnyEvent::MP::Base::CONNECT_TIMEOUT, + rtimeout => $AnyEvent::MP::Base::CONNECT_TIMEOUT, peername => delete $arg{peername}, ; @@ -172,11 +176,10 @@ $greeting_kv->{"tls"} = "1.0" if $arg{tls_ctx}; $greeting_kv->{provider} = "AE-$VERSION"; $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport}; - $greeting_kv->{maxidle} = $keepalive; + $greeting_kv->{timeout} = $arg{timeout}; # send greeting my $lgreeting1 = "aemp;$PROTOCOL_VERSION" - . ";$AnyEvent::MP::Base::UNIQ" . ";$self->{local_node}" . ";" . (join ",", @AUTH_RCV) . ";" . (join ",", @FRAMINGS) @@ -191,7 +194,7 @@ $self->{hdl}->push_read (line => sub { my $rgreeting1 = $_[1]; - my ($aemp, $version, $uniq, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1; + my ($aemp, $version, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1; if ($aemp ne "aemp") { return $self->error ("unparsable greeting"); @@ -223,7 +226,6 @@ defined $s_framing or return $self->error ("$framings: no common framing method supported"); - $self->{remote_uniq} = $uniq; $self->{remote_node} = $rnode; $self->{remote_greeting} = { @@ -274,16 +276,19 @@ $hdl->rbuf_max (undef); my $queue = delete $self->{queue}; # we are connected - $self->{hdl}->timeout ($self->{remote_greeting}{keepalive} + 5) - if $self->{remote_greeting}{keepalive}; + $self->{hdl}->rtimeout ($self->{remote_greeting}{timeout}); + $self->{hdl}->wtimeout ($arg{timeout} - LATENCY); + $self->{hdl}->on_wtimeout (sub { $self->send (["", "devnull"]) }); $self->connected; - my $src_node = $self->{node}; - + # send queued messages $self->send ($_) for @$queue; + # receive handling + my $src_node = $self->{node}; + my $rmsg; $rmsg = sub { $_[0]->push_read ($r_framing => $rmsg); @@ -394,16 +399,11 @@ versions don't match then no communication is possible. Minor extensions are supposed to be handled through additional key-value pairs. -=item a token uniquely identifying the current node instance - -This is a string that must change between restarts. It usually contains -things like the current time, the (OS) process id or similar values, but -no meaning of the contents are assumed. - =item the node endpoint descriptors for public nodes, this is a comma-separated list of protocol endpoints, -i.e., the noderef. For slave nodes, this is a unique identifier. +i.e., the noderef. For slave nodes, this is a unique identifier of the +form C. =item the acceptable authentication methods @@ -442,11 +442,12 @@ Indicates that the other side supports TLS (version should be 1.0) and wishes to do a TLS handshake. -=item maxidle= +=item timeout= -The maximum amount of time the node will not sent data, i.e., idle. This -can be used to close the conenction when no data has been received for a -too-long time (say, maxidle + 5 seconds). +The amount of time after which this node should be detected as dead unless +some data has been received. The node is responsible to send traffic +reasonably more often than this interval (such as every timeout minus five +seconds). =back