--- AnyEvent/lib/AnyEvent/Handle.pm 2008/05/27 05:36:27 1.40 +++ AnyEvent/lib/AnyEvent/Handle.pm 2008/05/29 00:20:39 1.45 @@ -4,11 +4,11 @@ use strict; use AnyEvent (); -use AnyEvent::Util qw(WSAWOULDBLOCK); +use AnyEvent::Util qw(WSAEWOULDBLOCK); use Scalar::Util (); use Carp (); use Fcntl (); -use Errno qw/EAGAIN EINTR/; +use Errno qw(EAGAIN EINTR); =head1 NAME @@ -93,7 +93,7 @@ called. On callback entrance, the value of C<$!> contains the operating system -error (or C, C or C). +error (or C, C, C or C). The callback should throw an exception. If it returns, then AnyEvent::Handle will C for you. @@ -122,6 +122,26 @@ To append to the write buffer, use the C<< ->push_write >> method. +=item timeout => $fractional_seconds + +If non-zero, then this enables an "inactivity" timeout: whenever this many +seconds pass without a successful read or write on the underlying file +handle, the C callback will be invoked (and if that one is +missing, an C error will be raised). + +Note that timeout processing is also active when you currently do not have +any outstanding read or write requests: If you plan to keep the connection +idle then you should disable the timout temporarily or ignore the timeout +in the C callback. + +Zero (the default) disables this timeout. + +=item on_timeout => $cb->($handle) + +Called whenever the inactivity timeout passes. If you return from this +callback, then the timeout will be reset as if some activity had happened, +so this condition is not fatal in any way. + =item rbuf_max => If defined, then a fatal error will be raised (with C<$!> set to C) @@ -174,8 +194,8 @@ This is the json coder object used by the C read and write types. -If you don't supply it, then AnyEvent::Handle will use C and -C. +If you don't supply it, then AnyEvent::Handle will create and use a +suitable one, which will write and expect UTF-8 encoded JSON texts. Note that you are responsible to depend on the JSON module if you want to use this functionality, as AnyEvent does not have a dependency itself. @@ -204,10 +224,13 @@ $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); } - $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; - $self->on_error (delete $self->{on_error}) if $self->{on_error}; +# $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; # nop +# $self->on_error (delete $self->{on_error}) if $self->{on_error}; # nop +# $self->on_read (delete $self->{on_read} ) if $self->{on_read}; # nop $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; - $self->on_read (delete $self->{on_read} ) if $self->{on_read}; + + $self->{_activity} = AnyEvent->now; + $self->_timeout; $self->start_read; @@ -264,6 +287,73 @@ $_[0]{on_eof} = $_[1]; } +=item $handle->on_timeout ($cb) + +Replace the current C callback, or disables the callback +(but not the timeout) if C<$cb> = C. See C constructor +argument. + +=cut + +sub on_timeout { + $_[0]{on_timeout} = $_[1]; +} + +############################################################################# + +=item $handle->timeout ($seconds) + +Configures (or disables) the inactivity timeout. + +=cut + +sub timeout { + my ($self, $timeout) = @_; + + $self->{timeout} = $timeout; + $self->_timeout; +} + +# reset the timeout watcher, as neccessary +# also check for time-outs +sub _timeout { + my ($self) = @_; + + if ($self->{timeout}) { + my $NOW = AnyEvent->now; + + # when would the timeout trigger? + my $after = $self->{_activity} + $self->{timeout} - $NOW; + + # now or in the past already? + if ($after <= 0) { + $self->{_activity} = $NOW; + + if ($self->{on_timeout}) { + $self->{on_timeout}->($self); + } else { + $! = Errno::ETIMEDOUT; + $self->error; + } + + # callbakx could have changed timeout value, optimise + return unless $self->{timeout}; + + # calculate new after + $after = $self->{timeout}; + } + + Scalar::Util::weaken $self; + + $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub { + delete $self->{_tw}; + $self->_timeout; + }); + } else { + delete $self->{_tw}; + } +} + ############################################################################# =back @@ -318,12 +408,14 @@ if ($len >= 0) { substr $self->{wbuf}, 0, $len, ""; + $self->{_activity} = AnyEvent->now; + $self->{on_drain}($self) if $self->{low_water_mark} >= length $self->{wbuf} && $self->{on_drain}; delete $self->{_ww} unless length $self->{wbuf}; - } elsif ($! != EAGAIN && $! != EINTR && $! != WSAWOULDBLOCK) { + } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { $self->error; } }; @@ -398,6 +490,26 @@ one end of a handle and read them at the other end without using any additional framing. +The generated JSON text is guaranteed not to contain any newlines: While +this module doesn't need delimiters after or between JSON texts to be +able to read them, many other languages depend on that. + +A simple RPC protocol that interoperates easily with others is to send +JSON arrays (or objects, although arrays are usually the better choice as +they mimic how function argument passing works) and a newline after each +JSON text: + + $handle->push_write (json => ["method", "arg1", "arg2"]); # whatever + $handle->push_write ("\012"); + +An AnyEvent::Handle receiver would simply use the C read type and +rely on the fact that the newline will be skipped as leading whitespace: + + $handle->push_read (json => sub { my $array = $_[1]; ... }); + +Other languages could read single lines terminated by a newline and pass +this line into their JSON decoder of choice. + =cut register_write_type json => sub { @@ -861,7 +973,8 @@ AnyEvent does not depend on it itself. Since JSON texts are fully self-delimiting, the C read and write -types are an ideal simple RPC protocol: just exchange JSON datagrams. +types are an ideal simple RPC protocol: just exchange JSON datagrams. See +the C write type description, above, for an actual example. =cut @@ -873,7 +986,7 @@ my $data; my $rbuf = \$self->{rbuf}; - my $json = $self->{json} ||= JSON::XS->new->utf8; + my $json = $self->{json} ||= JSON->new->utf8; sub { my $ref = $json->incr_parse ($self->{rbuf}); @@ -941,16 +1054,20 @@ my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; if ($len > 0) { + $self->{_activity} = AnyEvent->now; + $self->{filter_r} ? $self->{filter_r}->($self, $rbuf) : $self->_drain_rbuf; } elsif (defined $len) { delete $self->{_rw}; + delete $self->{_ww}; + delete $self->{_tw}; $self->{_eof} = 1; $self->_drain_rbuf; - } elsif ($! != EAGAIN && $! != EINTR && $! != &AnyEvent::Util::WSAWOULDBLOCK) { + } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { return $self->error; } });