--- AnyEvent/lib/AnyEvent/Handle.pm 2008/05/26 18:26:52 1.36 +++ AnyEvent/lib/AnyEvent/Handle.pm 2008/06/04 09:55:16 1.56 @@ -4,11 +4,11 @@ use strict; use AnyEvent (); -use AnyEvent::Util qw(WSAWOULDBLOCK); +use AnyEvent::Util qw(WSAEWOULDBLOCK); use Scalar::Util (); use Carp (); use Fcntl (); -use Errno qw/EAGAIN EINTR/; +use Errno qw(EAGAIN EINTR); =head1 NAME @@ -16,7 +16,7 @@ =cut -our $VERSION = '0.04'; +our $VERSION = 4.12; =head1 SYNOPSIS @@ -75,50 +75,75 @@ NOTE: The filehandle will be set to non-blocking (using AnyEvent::Util::fh_nonblocking). -=item on_eof => $cb->($self) +=item on_eof => $cb->($handle) -Set the callback to be called on EOF. +Set the callback to be called when an end-of-file condition is detcted, +i.e. in the case of a socket, when the other side has closed the +connection cleanly. While not mandatory, it is highly recommended to set an eof callback, otherwise you might end up with a closed socket while you are still waiting for data. -=item on_error => $cb->($self) +=item on_error => $cb->($handle, $fatal) -This is the fatal error callback, that is called when, well, a fatal error -occurs, such as not being able to resolve the hostname, failure to connect -or a read error. - -The object will not be in a usable state when this callback has been -called. +This is the error callback, which is called when, well, some error +occured, such as not being able to resolve the hostname, failure to +connect or a read error. + +Some errors are fatal (which is indicated by C<$fatal> being true). On +fatal errors the handle object will be shut down and will not be +usable. Non-fatal errors can be retried by simply returning, but it is +recommended to simply ignore this parameter and instead abondon the handle +object when this callback is invoked. On callback entrance, the value of C<$!> contains the operating system -error (or C, C or C). +error (or C, C, C or C). While not mandatory, it is I recommended to set this callback, as you will not be notified of errors otherwise. The default simply calls -die. +C. -=item on_read => $cb->($self) +=item on_read => $cb->($handle) This sets the default read callback, which is called when data arrives and no read request is in the queue. To access (and remove data from) the read buffer, use the C<< ->rbuf >> -method or access the C<$self->{rbuf}> member directly. +method or access the C<$handle->{rbuf}> member directly. When an EOF condition is detected then AnyEvent::Handle will first try to feed all the remaining data to the queued callbacks and C before calling the C callback. If no progress can be made, then a fatal error will be raised (with C<$!> set to C). -=item on_drain => $cb->() +=item on_drain => $cb->($handle) This sets the callback that is called when the write buffer becomes empty (or when the callback is set and the buffer is empty already). To append to the write buffer, use the C<< ->push_write >> method. +=item timeout => $fractional_seconds + +If non-zero, then this enables an "inactivity" timeout: whenever this many +seconds pass without a successful read or write on the underlying file +handle, the C callback will be invoked (and if that one is +missing, an C error will be raised). + +Note that timeout processing is also active when you currently do not have +any outstanding read or write requests: If you plan to keep the connection +idle then you should disable the timout temporarily or ignore the timeout +in the C callback. + +Zero (the default) disables this timeout. + +=item on_timeout => $cb->($handle) + +Called whenever the inactivity timeout passes. If you return from this +callback, then the timeout will be reset as if some activity had happened, +so this condition is not fatal in any way. + =item rbuf_max => If defined, then a fatal error will be raised (with C<$!> set to C) @@ -134,7 +159,7 @@ =item read_size => The default read block size (the amount of bytes this module will try to read -on each [loop iteration). Default: C<4096>. +during each (loop iteration). Default: C<8192>. =item low_water_mark => @@ -167,6 +192,22 @@ (unless a connection object was specified directly). If this parameter is missing, then AnyEvent::Handle will use C. +=item json => JSON or JSON::XS object + +This is the json coder object used by the C read and write types. + +If you don't supply it, then AnyEvent::Handle will create and use a +suitable one, which will write and expect UTF-8 encoded JSON texts. + +Note that you are responsible to depend on the JSON module if you want to +use this functionality, as AnyEvent does not have a dependency itself. + +=item filter_r => $cb + +=item filter_w => $cb + +These exist, but are undocumented at this time. + =back =cut @@ -185,10 +226,13 @@ $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); } - $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; - $self->on_error (delete $self->{on_error}) if $self->{on_error}; +# $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; # nop +# $self->on_error (delete $self->{on_error}) if $self->{on_error}; # nop +# $self->on_read (delete $self->{on_read} ) if $self->{on_read}; # nop $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; - $self->on_read (delete $self->{on_read} ) if $self->{on_read}; + + $self->{_activity} = AnyEvent->now; + $self->_timeout; $self->start_read; @@ -198,23 +242,26 @@ sub _shutdown { my ($self) = @_; - delete $self->{rw}; - delete $self->{ww}; + delete $self->{_tw}; + delete $self->{_rw}; + delete $self->{_ww}; delete $self->{fh}; + + $self->stoptls; } -sub error { - my ($self) = @_; +sub _error { + my ($self, $errno, $fatal) = @_; - { - local $!; - $self->_shutdown; - } + $self->_shutdown + if $fatal; + + $! = $errno; if ($self->{on_error}) { - $self->{on_error}($self); + $self->{on_error}($self, $fatal); } else { - Carp::croak "AnyEvent::Handle uncaught fatal error: $!"; + Carp::croak "AnyEvent::Handle uncaught error: $!"; } } @@ -224,7 +271,7 @@ =cut -sub fh { $_[0]->{fh} } +sub fh { $_[0]{fh} } =item $handle->on_error ($cb) @@ -246,6 +293,73 @@ $_[0]{on_eof} = $_[1]; } +=item $handle->on_timeout ($cb) + +Replace the current C callback, or disables the callback +(but not the timeout) if C<$cb> = C. See C constructor +argument. + +=cut + +sub on_timeout { + $_[0]{on_timeout} = $_[1]; +} + +############################################################################# + +=item $handle->timeout ($seconds) + +Configures (or disables) the inactivity timeout. + +=cut + +sub timeout { + my ($self, $timeout) = @_; + + $self->{timeout} = $timeout; + $self->_timeout; +} + +# reset the timeout watcher, as neccessary +# also check for time-outs +sub _timeout { + my ($self) = @_; + + if ($self->{timeout}) { + my $NOW = AnyEvent->now; + + # when would the timeout trigger? + my $after = $self->{_activity} + $self->{timeout} - $NOW; + + # now or in the past already? + if ($after <= 0) { + $self->{_activity} = $NOW; + + if ($self->{on_timeout}) { + $self->{on_timeout}($self); + } else { + $self->_error (&Errno::ETIMEDOUT); + } + + # callback could have changed timeout value, optimise + return unless $self->{timeout}; + + # calculate new after + $after = $self->{timeout}; + } + + Scalar::Util::weaken $self; + return unless $self; # ->error could have destroyed $self + + $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub { + delete $self->{_tw}; + $self->_timeout; + }); + } else { + delete $self->{_tw}; + } +} + ############################################################################# =back @@ -290,7 +404,7 @@ sub _drain_wbuf { my ($self) = @_; - if (!$self->{ww} && length $self->{wbuf}) { + if (!$self->{_ww} && length $self->{wbuf}) { Scalar::Util::weaken $self; @@ -300,13 +414,15 @@ if ($len >= 0) { substr $self->{wbuf}, 0, $len, ""; + $self->{_activity} = AnyEvent->now; + $self->{on_drain}($self) if $self->{low_water_mark} >= length $self->{wbuf} && $self->{on_drain}; - delete $self->{ww} unless length $self->{wbuf}; - } elsif ($! != EAGAIN && $! != EINTR && $! != WSAWOULDBLOCK) { - $self->error; + delete $self->{_ww} unless length $self->{wbuf}; + } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { + $self->_error ($!, 1); } }; @@ -314,7 +430,7 @@ $cb->(); # if still data left in wbuf, we need to poll - $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) + $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) if length $self->{wbuf}; }; } @@ -336,7 +452,7 @@ } if ($self->{filter_w}) { - $self->{filter_w}->($self, \$_[0]); + $self->{filter_w}($self, \$_[0]); } else { $self->{wbuf} .= $_[0]; $self->_drain_wbuf; @@ -345,8 +461,6 @@ =item $handle->push_write (type => @args) -=item $handle->unshift_write (type => @args) - Instead of formatting your data yourself, you can also let this module do the job by specifying a type and type-specific arguments. @@ -360,8 +474,6 @@ Formats the given value as netstring (http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them). -=back - =cut register_write_type netstring => sub { @@ -370,7 +482,50 @@ sprintf "%d:%s,", (length $string), $string }; -=item AnyEvent::Handle::register_write_type type => $coderef->($self, @args) +=item json => $array_or_hashref + +Encodes the given hash or array reference into a JSON object. Unless you +provide your own JSON object, this means it will be encoded to JSON text +in UTF-8. + +JSON objects (and arrays) are self-delimiting, so you can write JSON at +one end of a handle and read them at the other end without using any +additional framing. + +The generated JSON text is guaranteed not to contain any newlines: While +this module doesn't need delimiters after or between JSON texts to be +able to read them, many other languages depend on that. + +A simple RPC protocol that interoperates easily with others is to send +JSON arrays (or objects, although arrays are usually the better choice as +they mimic how function argument passing works) and a newline after each +JSON text: + + $handle->push_write (json => ["method", "arg1", "arg2"]); # whatever + $handle->push_write ("\012"); + +An AnyEvent::Handle receiver would simply use the C read type and +rely on the fact that the newline will be skipped as leading whitespace: + + $handle->push_read (json => sub { my $array = $_[1]; ... }); + +Other languages could read single lines terminated by a newline and pass +this line into their JSON decoder of choice. + +=cut + +register_write_type json => sub { + my ($self, $ref) = @_; + + require JSON; + + $self->{json} ? $self->{json}->encode ($ref) + : JSON::encode_json ($ref) +}; + +=back + +=item AnyEvent::Handle::register_write_type type => $coderef->($handle, @args) This function (not method) lets you add your own types to C. Whenever the given C is used, C will invoke the code @@ -416,12 +571,12 @@ # in the default state, expect some header bytes $handle->on_read (sub { # some data is here, now queue the length-header-read (4 octets) - shift->unshift_read_chunk (4, sub { + shift->unshift_read (chunk => 4, sub { # header arrived, decode my $len = unpack "N", $_[1]; # now read the payload - shift->unshift_read_chunk ($len, sub { + shift->unshift_read (chunk => $len, sub { my $xml = $_[1]; # handle xml }); @@ -438,13 +593,13 @@ $handle->push_write ("request 1\015\012"); # we expect "ERROR" or "OK" as response, so push a line read - $handle->push_read_line (sub { + $handle->push_read (line => sub { # if we got an "OK", we have to _prepend_ another line, # so it will be read before the second request reads its 64 bytes # which are already in the queue when this callback is called # we don't do this in case we got an error if ($_[1] eq "OK") { - $_[0]->unshift_read_line (sub { + $_[0]->unshift_read (line => sub { my $response = $_[1]; ... }); @@ -455,7 +610,7 @@ $handle->push_write ("request 2\015\012"); # simply read 64 bytes, always - $handle->push_read_chunk (64, sub { + $handle->push_read (chunk => 64, sub { my $response = $_[1]; ... }); @@ -471,7 +626,7 @@ defined $self->{rbuf_max} && $self->{rbuf_max} < length $self->{rbuf} ) { - $! = &Errno::ENOSPC; return $self->error; + return $self->_error (&Errno::ENOSPC, 1); } return if $self->{in_drain}; @@ -479,39 +634,45 @@ while (my $len = length $self->{rbuf}) { no strict 'refs'; - if (my $cb = shift @{ $self->{queue} }) { + if (my $cb = shift @{ $self->{_queue} }) { unless ($cb->($self)) { - if ($self->{eof}) { + if ($self->{_eof}) { # no progress can be made (not enough data and no data forthcoming) - $! = &Errno::EPIPE; return $self->error; + return $self->_error (&Errno::EPIPE, 1); } - unshift @{ $self->{queue} }, $cb; - return; + unshift @{ $self->{_queue} }, $cb; + last; } } elsif ($self->{on_read}) { $self->{on_read}($self); if ( - $self->{eof} # if no further data will arrive - && $len == length $self->{rbuf} # and no data has been consumed - && !@{ $self->{queue} } # and the queue is still empty - && $self->{on_read} # and we still want to read data + $len == length $self->{rbuf} # if no data has been consumed + && !@{ $self->{_queue} } # and the queue is still empty + && $self->{on_read} # but we still have on_read ) { - # then no progress can be made - $! = &Errno::EPIPE; return $self->error; + # no further data will arrive + # so no progress can be made + return $self->_error (&Errno::EPIPE, 1) + if $self->{_eof}; + + last; # more data might arrive } } else { # read side becomes idle - delete $self->{rw}; - return; + delete $self->{_rw}; + last; } } - if ($self->{eof}) { - $self->_shutdown; - $self->{on_eof}($self) - if $self->{on_eof}; + $self->{on_eof}($self) + if $self->{_eof} && $self->{on_eof}; + + # may need to restart read watcher + unless ($self->{_rw}) { + $self->start_read + if $self->{on_read} || @{ $self->{_queue} }; } } @@ -584,7 +745,7 @@ ->($self, $cb, @_); } - push @{ $self->{queue} }, $cb; + push @{ $self->{_queue} }, $cb; $self->_drain_rbuf; } @@ -600,7 +761,7 @@ } - unshift @{ $self->{queue} }, $cb; + unshift @{ $self->{_queue} }, $cb; $self->_drain_rbuf; } @@ -617,7 +778,7 @@ =over 4 -=item chunk => $octets, $cb->($self, $data) +=item chunk => $octets, $cb->($handle, $data) Invoke the callback only once C<$octets> bytes have been read. Pass the data read to the callback. The callback will never be called with less @@ -650,7 +811,7 @@ $_[0]->unshift_read (chunk => $_[1], $_[2]); } -=item line => [$eol, ]$cb->($self, $line, $eol) +=item line => [$eol, ]$cb->($handle, $line, $eol) The callback will be called only once a full line (including the end of line marker, C<$eol>) has been read. This line (excluding the end of line @@ -697,7 +858,7 @@ $self->unshift_read (line => @_); } -=item netstring => $cb->($string) +=item netstring => $cb->($handle, $string) A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement). @@ -711,8 +872,7 @@ sub { unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { if ($_[0]{rbuf} =~ /[^0-9]/) { - $! = &Errno::EBADMSG; - $self->error; + $self->_error (&Errno::EBADMSG); } return; } @@ -725,8 +885,7 @@ if ($_[1] eq ",") { $cb->($_[0], $string); } else { - $! = &Errno::EBADMSG; - $self->error; + $self->_error (&Errno::EBADMSG); } }); }); @@ -735,7 +894,7 @@ } }; -=item regex => $accept[, $reject[, $skip], $cb->($data) +=item regex => $accept[, $reject[, $skip], $cb->($handle, $data) Makes a regex match against the regex object C<$accept> and returns everything up to and including the match. @@ -793,8 +952,7 @@ # reject if ($reject && $$rbuf =~ $reject) { - $! = &Errno::EBADMSG; - $self->error; + $self->_error (&Errno::EBADMSG); } # skip @@ -806,9 +964,53 @@ } }; +=item json => $cb->($handle, $hash_or_arrayref) + +Reads a JSON object or array, decodes it and passes it to the callback. + +If a C object was passed to the constructor, then that will be used +for the final decode, otherwise it will create a JSON coder expecting UTF-8. + +This read type uses the incremental parser available with JSON version +2.09 (and JSON::XS version 2.2) and above. You have to provide a +dependency on your own: this module will load the JSON module, but +AnyEvent does not depend on it itself. + +Since JSON texts are fully self-delimiting, the C read and write +types are an ideal simple RPC protocol: just exchange JSON datagrams. See +the C write type description, above, for an actual example. + +=cut + +register_read_type json => sub { + my ($self, $cb, $accept, $reject, $skip) = @_; + + require JSON; + + my $data; + my $rbuf = \$self->{rbuf}; + + my $json = $self->{json} ||= JSON->new->utf8; + + sub { + my $ref = $json->incr_parse ($self->{rbuf}); + + if ($ref) { + $self->{rbuf} = $json->incr_text; + $json->incr_text = ""; + $cb->($self, $ref); + + 1 + } else { + $self->{rbuf} = ""; + () + } + } +}; + =back -=item AnyEvent::Handle::register_read_type type => $coderef->($self, $cb, @args) +=item AnyEvent::Handle::register_read_type type => $coderef->($handle, $cb, @args) This function (not method) lets you add your own types to C. @@ -820,7 +1022,7 @@ that works as a plain read callback (see C<< ->push_read ($cb) >>). It should invoke the passed callback when it is done reading (remember to -pass C<$self> as first argument as all other callbacks do that). +pass C<$handle> as first argument as all other callbacks do that). Note that this is a function, and all types registered this way will be global, so try to use unique names. @@ -837,36 +1039,43 @@ any queued callbacks will be executed then. To start reading again, call C. +Note that AnyEvent::Handle will automatically C for you when +you change the C callback or push/unshift a read callback, and it +will automatically C for you when neither C is set nor +there are any read requests in the queue. + =cut sub stop_read { my ($self) = @_; - delete $self->{rw}; + delete $self->{_rw}; } sub start_read { my ($self) = @_; - unless ($self->{rw} || $self->{eof}) { + unless ($self->{_rw} || $self->{_eof}) { Scalar::Util::weaken $self; - $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { + $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf}; my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; if ($len > 0) { + $self->{_activity} = AnyEvent->now; + $self->{filter_r} - ? $self->{filter_r}->($self, $rbuf) + ? $self->{filter_r}($self, $rbuf) : $self->_drain_rbuf; } elsif (defined $len) { - delete $self->{rw}; - $self->{eof} = 1; + delete $self->{_rw}; + $self->{_eof} = 1; $self->_drain_rbuf; - } elsif ($! != EAGAIN && $! != EINTR && $! != &AnyEvent::Util::WSAWOULDBLOCK) { - return $self->error; + } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { + return $self->_error ($!, 1); } }); } @@ -875,30 +1084,38 @@ sub _dotls { my ($self) = @_; - if (length $self->{tls_wbuf}) { - while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{tls_wbuf})) > 0) { - substr $self->{tls_wbuf}, 0, $len, ""; + my $buf; + + if (length $self->{_tls_wbuf}) { + while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) { + substr $self->{_tls_wbuf}, 0, $len, ""; } } - if (defined (my $buf = Net::SSLeay::BIO_read ($self->{tls_wbio}))) { + if (length ($buf = Net::SSLeay::BIO_read ($self->{_wbio}))) { $self->{wbuf} .= $buf; $self->_drain_wbuf; } - while (defined (my $buf = Net::SSLeay::read ($self->{tls}))) { - $self->{rbuf} .= $buf; - $self->_drain_rbuf; + while (defined ($buf = Net::SSLeay::read ($self->{tls}))) { + if (length $buf) { + $self->{rbuf} .= $buf; + $self->_drain_rbuf; + } else { + # let's treat SSL-eof as we treat normal EOF + $self->{_eof} = 1; + $self->_shutdown; + return; + } } my $err = Net::SSLeay::get_error ($self->{tls}, -1); if ($err!= Net::SSLeay::ERROR_WANT_READ ()) { if ($err == Net::SSLeay::ERROR_SYSCALL ()) { - $self->error; + return $self->_error ($!, 1); } elsif ($err == Net::SSLeay::ERROR_SSL ()) { - $! = &Errno::EIO; - $self->error; + return $self->_error (&Errno::EIO, 1); } # all others are fine for our purposes @@ -917,9 +1134,12 @@ The second argument is the optional C object that is used when AnyEvent::Handle has to create its own TLS connection object. +The TLS connection object will end up in C<< $handle->{tls} >> after this +call and can be used or changed to your liking. Note that the handshake +might have already started when this function returns. + =cut -# TODO: maybe document... sub starttls { my ($self, $ssl, $ctx) = @_; @@ -944,17 +1164,17 @@ (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1) | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2)); - $self->{tls_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); - $self->{tls_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); + $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); + $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); - Net::SSLeay::set_bio ($ssl, $self->{tls_rbio}, $self->{tls_wbio}); + Net::SSLeay::set_bio ($ssl, $self->{_rbio}, $self->{_wbio}); $self->{filter_w} = sub { - $_[0]{tls_wbuf} .= ${$_[1]}; + $_[0]{_tls_wbuf} .= ${$_[1]}; &_dotls; }; $self->{filter_r} = sub { - Net::SSLeay::BIO_write ($_[0]{tls_rbio}, ${$_[1]}); + Net::SSLeay::BIO_write ($_[0]{_rbio}, ${$_[1]}); &_dotls; }; } @@ -970,9 +1190,10 @@ my ($self) = @_; Net::SSLeay::free (delete $self->{tls}) if $self->{tls}; - delete $self->{tls_rbio}; - delete $self->{tls_wbio}; - delete $self->{tls_wbuf}; + + delete $self->{_rbio}; + delete $self->{_wbio}; + delete $self->{_tls_wbuf}; delete $self->{filter_r}; delete $self->{filter_w}; } @@ -1020,6 +1241,35 @@ =back +=head1 SUBCLASSING AnyEvent::Handle + +In many cases, you might want to subclass AnyEvent::Handle. + +To make this easier, a given version of AnyEvent::Handle uses these +conventions: + +=over 4 + +=item * all constructor arguments become object members. + +At least initially, when you pass a C-argument to the constructor it +will end up in C<< $handle->{tls} >>. Those members might be changes or +mutated later on (for example C will hold the TLS connection object). + +=item * other object member names are prefixed with an C<_>. + +All object members not explicitly documented (internal use) are prefixed +with an underscore character, so the remaining non-C<_>-namespace is free +for use for subclasses. + +=item * all members not documented here and not prefixed with an underscore +are free to use in subclasses. + +Of course, new versions of AnyEvent::Handle may introduce more "public" +member variables, but thats just life, at least it is documented. + +=back + =head1 AUTHOR Robin Redeker C<< >>, Marc Lehmann .