--- AnyEvent/lib/AnyEvent/Handle.pm 2008/05/27 06:23:15 1.42 +++ AnyEvent/lib/AnyEvent/Handle.pm 2008/05/31 13:38:01 1.51 @@ -8,7 +8,7 @@ use Scalar::Util (); use Carp (); use Fcntl (); -use Errno qw/EAGAIN EINTR/; +use Errno qw(EAGAIN EINTR); =head1 NAME @@ -16,7 +16,7 @@ =cut -our $VERSION = '0.04'; +our $VERSION = 4.1; =head1 SYNOPSIS @@ -93,7 +93,7 @@ called. On callback entrance, the value of C<$!> contains the operating system -error (or C, C or C). +error (or C, C, C or C). The callback should throw an exception. If it returns, then AnyEvent::Handle will C for you. @@ -122,6 +122,26 @@ To append to the write buffer, use the C<< ->push_write >> method. +=item timeout => $fractional_seconds + +If non-zero, then this enables an "inactivity" timeout: whenever this many +seconds pass without a successful read or write on the underlying file +handle, the C callback will be invoked (and if that one is +missing, an C error will be raised). + +Note that timeout processing is also active when you currently do not have +any outstanding read or write requests: If you plan to keep the connection +idle then you should disable the timout temporarily or ignore the timeout +in the C callback. + +Zero (the default) disables this timeout. + +=item on_timeout => $cb->($handle) + +Called whenever the inactivity timeout passes. If you return from this +callback, then the timeout will be reset as if some activity had happened, +so this condition is not fatal in any way. + =item rbuf_max => If defined, then a fatal error will be raised (with C<$!> set to C) @@ -137,7 +157,7 @@ =item read_size => The default read block size (the amount of bytes this module will try to read -on each [loop iteration). Default: C<4096>. +during each (loop iteration). Default: C<8192>. =item low_water_mark => @@ -204,10 +224,13 @@ $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); } - $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; - $self->on_error (delete $self->{on_error}) if $self->{on_error}; +# $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; # nop +# $self->on_error (delete $self->{on_error}) if $self->{on_error}; # nop +# $self->on_read (delete $self->{on_read} ) if $self->{on_read}; # nop $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; - $self->on_read (delete $self->{on_read} ) if $self->{on_read}; + + $self->{_activity} = AnyEvent->now; + $self->_timeout; $self->start_read; @@ -217,6 +240,7 @@ sub _shutdown { my ($self) = @_; + delete $self->{_tw}; delete $self->{_rw}; delete $self->{_ww}; delete $self->{fh}; @@ -264,6 +288,73 @@ $_[0]{on_eof} = $_[1]; } +=item $handle->on_timeout ($cb) + +Replace the current C callback, or disables the callback +(but not the timeout) if C<$cb> = C. See C constructor +argument. + +=cut + +sub on_timeout { + $_[0]{on_timeout} = $_[1]; +} + +############################################################################# + +=item $handle->timeout ($seconds) + +Configures (or disables) the inactivity timeout. + +=cut + +sub timeout { + my ($self, $timeout) = @_; + + $self->{timeout} = $timeout; + $self->_timeout; +} + +# reset the timeout watcher, as neccessary +# also check for time-outs +sub _timeout { + my ($self) = @_; + + if ($self->{timeout}) { + my $NOW = AnyEvent->now; + + # when would the timeout trigger? + my $after = $self->{_activity} + $self->{timeout} - $NOW; + + # now or in the past already? + if ($after <= 0) { + $self->{_activity} = $NOW; + + if ($self->{on_timeout}) { + $self->{on_timeout}($self); + } else { + $! = Errno::ETIMEDOUT; + $self->error; + } + + # callbakx could have changed timeout value, optimise + return unless $self->{timeout}; + + # calculate new after + $after = $self->{timeout}; + } + + Scalar::Util::weaken $self; + + $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub { + delete $self->{_tw}; + $self->_timeout; + }); + } else { + delete $self->{_tw}; + } +} + ############################################################################# =back @@ -318,6 +409,8 @@ if ($len >= 0) { substr $self->{wbuf}, 0, $len, ""; + $self->{_activity} = AnyEvent->now; + $self->{on_drain}($self) if $self->{low_water_mark} >= length $self->{wbuf} && $self->{on_drain}; @@ -354,7 +447,7 @@ } if ($self->{filter_w}) { - $self->{filter_w}->($self, \$_[0]); + $self->{filter_w}($self, \$_[0]); } else { $self->{wbuf} .= $_[0]; $self->_drain_wbuf; @@ -570,11 +663,8 @@ } } - if ($self->{_eof}) { - $self->_shutdown; - $self->{on_eof}($self) - if $self->{on_eof}; - } + $self->{on_eof}($self) + if $self->{_eof} && $self->{on_eof}; } =item $handle->on_read ($cb) @@ -962,8 +1052,10 @@ my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; if ($len > 0) { + $self->{_activity} = AnyEvent->now; + $self->{filter_r} - ? $self->{filter_r}->($self, $rbuf) + ? $self->{filter_r}($self, $rbuf) : $self->_drain_rbuf; } elsif (defined $len) { @@ -1029,7 +1121,6 @@ =cut -# TODO: maybe document... sub starttls { my ($self, $ssl, $ctx) = @_;