--- AnyEvent/lib/AnyEvent/Handle.pm 2008/05/27 05:47:36 1.41 +++ AnyEvent/lib/AnyEvent/Handle.pm 2008/06/02 09:10:38 1.52 @@ -4,11 +4,11 @@ use strict; use AnyEvent (); -use AnyEvent::Util qw(WSAWOULDBLOCK); +use AnyEvent::Util qw(WSAEWOULDBLOCK); use Scalar::Util (); use Carp (); use Fcntl (); -use Errno qw/EAGAIN EINTR/; +use Errno qw(EAGAIN EINTR); =head1 NAME @@ -16,7 +16,7 @@ =cut -our $VERSION = '0.04'; +our $VERSION = 4.1; =head1 SYNOPSIS @@ -77,30 +77,32 @@ =item on_eof => $cb->($handle) -Set the callback to be called on EOF. +Set the callback to be called when an end-of-file condition is detcted, +i.e. in the case of a socket, when the other side has closed the +connection cleanly. While not mandatory, it is highly recommended to set an eof callback, otherwise you might end up with a closed socket while you are still waiting for data. -=item on_error => $cb->($handle) +=item on_error => $cb->($handle, $fatal) -This is the fatal error callback, that is called when, well, a fatal error -occurs, such as not being able to resolve the hostname, failure to connect -or a read error. - -The object will not be in a usable state when this callback has been -called. +This is the error callback, which is called when, well, some error +occured, such as not being able to resolve the hostname, failure to +connect or a read error. + +Some errors are fatal (which is indicated by C<$fatal> being true). On +fatal errors the handle object will be shut down and will not be +usable. Non-fatal errors can be retried by simply returning, but it is +recommended to simply ignore this parameter and instead abondon the handle +object when this callback is invoked. On callback entrance, the value of C<$!> contains the operating system -error (or C, C or C). - -The callback should throw an exception. If it returns, then -AnyEvent::Handle will C for you. +error (or C, C, C or C). While not mandatory, it is I recommended to set this callback, as you will not be notified of errors otherwise. The default simply calls -die. +C. =item on_read => $cb->($handle) @@ -122,6 +124,26 @@ To append to the write buffer, use the C<< ->push_write >> method. +=item timeout => $fractional_seconds + +If non-zero, then this enables an "inactivity" timeout: whenever this many +seconds pass without a successful read or write on the underlying file +handle, the C callback will be invoked (and if that one is +missing, an C error will be raised). + +Note that timeout processing is also active when you currently do not have +any outstanding read or write requests: If you plan to keep the connection +idle then you should disable the timout temporarily or ignore the timeout +in the C callback. + +Zero (the default) disables this timeout. + +=item on_timeout => $cb->($handle) + +Called whenever the inactivity timeout passes. If you return from this +callback, then the timeout will be reset as if some activity had happened, +so this condition is not fatal in any way. + =item rbuf_max => If defined, then a fatal error will be raised (with C<$!> set to C) @@ -137,7 +159,7 @@ =item read_size => The default read block size (the amount of bytes this module will try to read -on each [loop iteration). Default: C<4096>. +during each (loop iteration). Default: C<8192>. =item low_water_mark => @@ -204,10 +226,13 @@ $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); } - $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; - $self->on_error (delete $self->{on_error}) if $self->{on_error}; +# $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; # nop +# $self->on_error (delete $self->{on_error}) if $self->{on_error}; # nop +# $self->on_read (delete $self->{on_read} ) if $self->{on_read}; # nop $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; - $self->on_read (delete $self->{on_read} ) if $self->{on_read}; + + $self->{_activity} = AnyEvent->now; + $self->_timeout; $self->start_read; @@ -217,23 +242,27 @@ sub _shutdown { my ($self) = @_; + delete $self->{_tw}; delete $self->{_rw}; delete $self->{_ww}; delete $self->{fh}; + + $self->stoptls; } -sub error { - my ($self) = @_; +sub _error { + my ($self, $errno, $fatal) = @_; - { - local $!; - $self->_shutdown; - } + $self->_shutdown + if $fatal; - $self->{on_error}($self) - if $self->{on_error}; + $! = $errno; - Carp::croak "AnyEvent::Handle uncaught fatal error: $!"; + if ($self->{on_error}) { + $self->{on_error}($self, $fatal); + } else { + Carp::croak "AnyEvent::Handle uncaught error: $!"; + } } =item $fh = $handle->fh @@ -264,6 +293,72 @@ $_[0]{on_eof} = $_[1]; } +=item $handle->on_timeout ($cb) + +Replace the current C callback, or disables the callback +(but not the timeout) if C<$cb> = C. See C constructor +argument. + +=cut + +sub on_timeout { + $_[0]{on_timeout} = $_[1]; +} + +############################################################################# + +=item $handle->timeout ($seconds) + +Configures (or disables) the inactivity timeout. + +=cut + +sub timeout { + my ($self, $timeout) = @_; + + $self->{timeout} = $timeout; + $self->_timeout; +} + +# reset the timeout watcher, as neccessary +# also check for time-outs +sub _timeout { + my ($self) = @_; + + if ($self->{timeout}) { + my $NOW = AnyEvent->now; + + # when would the timeout trigger? + my $after = $self->{_activity} + $self->{timeout} - $NOW; + + # now or in the past already? + if ($after <= 0) { + $self->{_activity} = $NOW; + + if ($self->{on_timeout}) { + $self->{on_timeout}($self); + } else { + $self->_error (&Errno::ETIMEDOUT); + } + + # callbakx could have changed timeout value, optimise + return unless $self->{timeout}; + + # calculate new after + $after = $self->{timeout}; + } + + Scalar::Util::weaken $self; + + $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub { + delete $self->{_tw}; + $self->_timeout; + }); + } else { + delete $self->{_tw}; + } +} + ############################################################################# =back @@ -318,13 +413,15 @@ if ($len >= 0) { substr $self->{wbuf}, 0, $len, ""; + $self->{_activity} = AnyEvent->now; + $self->{on_drain}($self) if $self->{low_water_mark} >= length $self->{wbuf} && $self->{on_drain}; delete $self->{_ww} unless length $self->{wbuf}; - } elsif ($! != EAGAIN && $! != EINTR && $! != WSAWOULDBLOCK) { - $self->error; + } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { + $self->_error ($!, 1); } }; @@ -354,7 +451,7 @@ } if ($self->{filter_w}) { - $self->{filter_w}->($self, \$_[0]); + $self->{filter_w}($self, \$_[0]); } else { $self->{wbuf} .= $_[0]; $self->_drain_wbuf; @@ -363,8 +460,6 @@ =item $handle->push_write (type => @args) -=item $handle->unshift_write (type => @args) - Instead of formatting your data yourself, you can also let this module do the job by specifying a type and type-specific arguments. @@ -378,8 +473,6 @@ Formats the given value as netstring (http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them). -=back - =cut register_write_type netstring => sub { @@ -441,6 +534,8 @@ Note that this is a function, and all types registered this way will be global, so try to use unique names. +=back + =cut ############################################################################# @@ -475,12 +570,12 @@ # in the default state, expect some header bytes $handle->on_read (sub { # some data is here, now queue the length-header-read (4 octets) - shift->unshift_read_chunk (4, sub { + shift->unshift_read (chunk => 4, sub { # header arrived, decode my $len = unpack "N", $_[1]; # now read the payload - shift->unshift_read_chunk ($len, sub { + shift->unshift_read (chunk => $len, sub { my $xml = $_[1]; # handle xml }); @@ -497,13 +592,13 @@ $handle->push_write ("request 1\015\012"); # we expect "ERROR" or "OK" as response, so push a line read - $handle->push_read_line (sub { + $handle->push_read (line => sub { # if we got an "OK", we have to _prepend_ another line, # so it will be read before the second request reads its 64 bytes # which are already in the queue when this callback is called # we don't do this in case we got an error if ($_[1] eq "OK") { - $_[0]->unshift_read_line (sub { + $_[0]->unshift_read (line => sub { my $response = $_[1]; ... }); @@ -514,7 +609,7 @@ $handle->push_write ("request 2\015\012"); # simply read 64 bytes, always - $handle->push_read_chunk (64, sub { + $handle->push_read (chunk => 64, sub { my $response = $_[1]; ... }); @@ -530,8 +625,7 @@ defined $self->{rbuf_max} && $self->{rbuf_max} < length $self->{rbuf} ) { - $! = &Errno::ENOSPC; - $self->error; + return $self->_error (&Errno::ENOSPC, 1); } return if $self->{in_drain}; @@ -543,8 +637,7 @@ unless ($cb->($self)) { if ($self->{_eof}) { # no progress can be made (not enough data and no data forthcoming) - $! = &Errno::EPIPE; - $self->error; + return $self->_error (&Errno::EPIPE, 1); } unshift @{ $self->{_queue} }, $cb; @@ -560,8 +653,7 @@ && $self->{on_read} # and we still want to read data ) { # then no progress can be made - $! = &Errno::EPIPE; - $self->error; + return $self->_error (&Errno::EPIPE, 1); } } else { # read side becomes idle @@ -570,11 +662,8 @@ } } - if ($self->{_eof}) { - $self->_shutdown; - $self->{on_eof}($self) - if $self->{on_eof}; - } + $self->{on_eof}($self) + if $self->{_eof} && $self->{on_eof}; } =item $handle->on_read ($cb) @@ -773,8 +862,7 @@ sub { unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { if ($_[0]{rbuf} =~ /[^0-9]/) { - $! = &Errno::EBADMSG; - $self->error; + $self->_error (&Errno::EBADMSG); } return; } @@ -787,8 +875,7 @@ if ($_[1] eq ",") { $cb->($_[0], $string); } else { - $! = &Errno::EBADMSG; - $self->error; + $self->_error (&Errno::EBADMSG); } }); }); @@ -855,8 +942,7 @@ # reject if ($reject && $$rbuf =~ $reject) { - $! = &Errno::EBADMSG; - $self->error; + $self->_error (&Errno::EBADMSG); } # skip @@ -962,8 +1048,10 @@ my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; if ($len > 0) { + $self->{_activity} = AnyEvent->now; + $self->{filter_r} - ? $self->{filter_r}->($self, $rbuf) + ? $self->{filter_r}($self, $rbuf) : $self->_drain_rbuf; } elsif (defined $len) { @@ -971,8 +1059,8 @@ $self->{_eof} = 1; $self->_drain_rbuf; - } elsif ($! != EAGAIN && $! != EINTR && $! != &AnyEvent::Util::WSAWOULDBLOCK) { - return $self->error; + } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { + return $self->_error ($!, 1); } }); } @@ -1001,10 +1089,9 @@ if ($err!= Net::SSLeay::ERROR_WANT_READ ()) { if ($err == Net::SSLeay::ERROR_SYSCALL ()) { - $self->error; + return $self->_error ($!, 1); } elsif ($err == Net::SSLeay::ERROR_SSL ()) { - $! = &Errno::EIO; - $self->error; + return $self->_error (&Errno::EIO, 1); } # all others are fine for our purposes @@ -1029,7 +1116,6 @@ =cut -# TODO: maybe document... sub starttls { my ($self, $ssl, $ctx) = @_;