--- AnyEvent/lib/AnyEvent/Handle.pm 2008/05/26 20:02:22 1.37 +++ AnyEvent/lib/AnyEvent/Handle.pm 2008/07/27 08:37:56 1.79 @@ -1,14 +1,14 @@ package AnyEvent::Handle; no warnings; -use strict; +use strict qw(subs vars); use AnyEvent (); -use AnyEvent::Util qw(WSAWOULDBLOCK); +use AnyEvent::Util qw(WSAEWOULDBLOCK); use Scalar::Util (); use Carp (); use Fcntl (); -use Errno qw/EAGAIN EINTR/; +use Errno qw(EAGAIN EINTR); =head1 NAME @@ -16,7 +16,7 @@ =cut -our $VERSION = '0.04'; +our $VERSION = 4.22; =head1 SYNOPSIS @@ -75,53 +75,83 @@ NOTE: The filehandle will be set to non-blocking (using AnyEvent::Util::fh_nonblocking). -=item on_eof => $cb->($self) +=item on_eof => $cb->($handle) -Set the callback to be called on EOF. +Set the callback to be called when an end-of-file condition is detected, +i.e. in the case of a socket, when the other side has closed the +connection cleanly. While not mandatory, it is highly recommended to set an eof callback, otherwise you might end up with a closed socket while you are still waiting for data. -=item on_error => $cb->($self) +=item on_error => $cb->($handle, $fatal) -This is the fatal error callback, that is called when, well, a fatal error -occurs, such as not being able to resolve the hostname, failure to connect -or a read error. - -The object will not be in a usable state when this callback has been -called. +This is the error callback, which is called when, well, some error +occured, such as not being able to resolve the hostname, failure to +connect or a read error. + +Some errors are fatal (which is indicated by C<$fatal> being true). On +fatal errors the handle object will be shut down and will not be +usable. Non-fatal errors can be retried by simply returning, but it is +recommended to simply ignore this parameter and instead abondon the handle +object when this callback is invoked. On callback entrance, the value of C<$!> contains the operating system -error (or C, C or C). - -The callbakc should throw an exception. 
If it returns, then -AnyEvent::Handle will C for you. +error (or C, C, C or C). While not mandatory, it is I recommended to set this callback, as you will not be notified of errors otherwise. The default simply calls -die. +C. -=item on_read => $cb->($self) +=item on_read => $cb->($handle) This sets the default read callback, which is called when data arrives -and no read request is in the queue. +and no read request is in the queue (unlike read queue callbacks, this +callback will only be called when at least one octet of data is in the +read buffer). To access (and remove data from) the read buffer, use the C<< ->rbuf >> -method or access the C<$self->{rbuf}> member directly. +method or access the C<$handle->{rbuf}> member directly. When an EOF condition is detected then AnyEvent::Handle will first try to feed all the remaining data to the queued callbacks and C before calling the C callback. If no progress can be made, then a fatal error will be raised (with C<$!> set to C). -=item on_drain => $cb->() +=item on_drain => $cb->($handle) This sets the callback that is called when the write buffer becomes empty (or when the callback is set and the buffer is empty already). To append to the write buffer, use the C<< ->push_write >> method. +This callback is useful when you don't want to put all of your write data +into the queue at once, for example, when you want to write the contents +of some file to the socket you might not want to read the whole file into +memory and push it into the queue, but instead only read more data from +the file when the write queue becomes empty. + +=item timeout => $fractional_seconds + +If non-zero, then this enables an "inactivity" timeout: whenever this many +seconds pass without a successful read or write on the underlying file +handle, the C callback will be invoked (and if that one is +missing, an C error will be raised). 
+ +Note that timeout processing is also active when you currently do not have +any outstanding read or write requests: If you plan to keep the connection +idle then you should disable the timout temporarily or ignore the timeout +in the C callback. + +Zero (the default) disables this timeout. + +=item on_timeout => $cb->($handle) + +Called whenever the inactivity timeout passes. If you return from this +callback, then the timeout will be reset as if some activity had happened, +so this condition is not fatal in any way. + =item rbuf_max => If defined, then a fatal error will be raised (with C<$!> set to C) @@ -134,10 +164,34 @@ amount of data without a callback ever being called as long as the line isn't finished). +=item autocork => + +When disabled (the default), then C will try to immediately +write the data to the handle if possible. This avoids having to register +a write watcher and wait for the next event loop iteration, but can be +inefficient if you write multiple small chunks (this disadvantage is +usually avoided by your kernel's nagle algorithm, see C). + +When enabled, then writes will always be queued till the next event loop +iteration. This is efficient when you do many small writes per iteration, +but less efficient when you do a single write only. + +=item no_delay => + +When doing small writes on sockets, your operating system kernel might +wait a bit for more data before actually sending it out. This is called +the Nagle algorithm, and usually it is beneficial. + +In some situations you want as low a delay as possible, which cna be +accomplishd by setting this option to true. + +The default is your opertaing system's default behaviour, this option +explicitly enables or disables it, if possible. + =item read_size => The default read block size (the amount of bytes this module will try to read -on each [loop iteration). Default: C<4096>. +during each (loop iteration). Default: C<8192>. 
=item low_water_mark => @@ -145,6 +199,17 @@ buffer: If the write reaches this size or gets even samller it is considered empty. +=item linger => + +If non-zero (default: C<3600>), then the destructor of the +AnyEvent::Handle object will check whether there is still outstanding write +data and will install a watcher that will write out this data. No errors +will be reported (this mostly matches how the operating system treats +outstanding data at socket close time). + +This will not work for partial TLS data that could not yet be +encoded. This data will be lost. + =item tls => "accept" | "connect" | Net::SSLeay::SSL object When this parameter is given, it enables TLS (SSL) mode, that means it @@ -162,7 +227,7 @@ or C on it before you pass it to AnyEvent::Handle. -See the C method if you need to start TLs negotiation later. +See the C<starttls> method if you need to start TLS negotiation later. =item tls_ctx => $ssl_ctx @@ -170,6 +235,22 @@ (unless a connection object was specified directly). If this parameter is missing, then AnyEvent::Handle will use C. +=item json => JSON or JSON::XS object + +This is the json coder object used by the C<json> read and write types. + +If you don't supply it, then AnyEvent::Handle will create and use a +suitable one, which will write and expect UTF-8 encoded JSON texts. + +Note that you are responsible to depend on the JSON module if you want to +use this functionality, as AnyEvent does not have a dependency itself. + +=item filter_r => $cb + +=item filter_w => $cb + +These exist, but are undocumented at this time. 
+ =back =cut @@ -188,12 +269,14 @@ $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); } - $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; - $self->on_error (delete $self->{on_error}) if $self->{on_error}; - $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; - $self->on_read (delete $self->{on_read} ) if $self->{on_read}; + $self->{_activity} = AnyEvent->now; + $self->_timeout; + + $self->on_drain (delete $self->{on_drain}) if exists $self->{on_drain}; + $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay}; - $self->start_read; + $self->start_read + if $self->{on_read}; $self } @@ -201,23 +284,27 @@ sub _shutdown { my ($self) = @_; - delete $self->{rw}; - delete $self->{ww}; + delete $self->{_tw}; + delete $self->{_rw}; + delete $self->{_ww}; delete $self->{fh}; + + $self->stoptls; } -sub error { - my ($self) = @_; +sub _error { + my ($self, $errno, $fatal) = @_; - { - local $!; - $self->_shutdown; - } + $self->_shutdown + if $fatal; - $self->{on_error}($self) - if $self->{on_error}; + $! = $errno; - Carp::croak "AnyEvent::Handle uncaught fatal error: $!"; + if ($self->{on_error}) { + $self->{on_error}($self, $fatal); + } else { + Carp::croak "AnyEvent::Handle uncaught error: $!"; + } } =item $fh = $handle->fh @@ -226,7 +313,7 @@ =cut -sub fh { $_[0]->{fh} } +sub fh { $_[0]{fh} } =item $handle->on_error ($cb) @@ -248,6 +335,96 @@ $_[0]{on_eof} = $_[1]; } +=item $handle->on_timeout ($cb) + +Replace the current C callback, or disables the callback +(but not the timeout) if C<$cb> = C. See C constructor +argument. + +=cut + +sub on_timeout { + $_[0]{on_timeout} = $_[1]; +} + +=item $handle->autocork ($boolean) + +Enables or disables the current autocork behaviour (see C +constructor argument). + +=cut + +=item $handle->no_delay ($boolean) + +Enables or disables the C setting (see constructor argument of +the same name for details). 
+ +=cut + +sub no_delay { + $_[0]{no_delay} = $_[1]; + + eval { + local $SIG{__DIE__}; + setsockopt $_[0]{fh}, &Socket::IPPROTO_TCP, &Socket::TCP_NODELAY, int $_[1]; + }; +} + +############################################################################# + +=item $handle->timeout ($seconds) + +Configures (or disables) the inactivity timeout. + +=cut + +sub timeout { + my ($self, $timeout) = @_; + + $self->{timeout} = $timeout; + $self->_timeout; +} + +# reset the timeout watcher, as neccessary +# also check for time-outs +sub _timeout { + my ($self) = @_; + + if ($self->{timeout}) { + my $NOW = AnyEvent->now; + + # when would the timeout trigger? + my $after = $self->{_activity} + $self->{timeout} - $NOW; + + # now or in the past already? + if ($after <= 0) { + $self->{_activity} = $NOW; + + if ($self->{on_timeout}) { + $self->{on_timeout}($self); + } else { + $self->_error (&Errno::ETIMEDOUT); + } + + # callback could have changed timeout value, optimise + return unless $self->{timeout}; + + # calculate new after + $after = $self->{timeout}; + } + + Scalar::Util::weaken $self; + return unless $self; # ->error could have destroyed $self + + $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub { + delete $self->{_tw}; + $self->_timeout; + }); + } else { + delete $self->{_tw}; + } +} + ############################################################################# =back @@ -292,7 +469,7 @@ sub _drain_wbuf { my ($self) = @_; - if (!$self->{ww} && length $self->{wbuf}) { + if (!$self->{_ww} && length $self->{wbuf}) { Scalar::Util::weaken $self; @@ -302,21 +479,23 @@ if ($len >= 0) { substr $self->{wbuf}, 0, $len, ""; + $self->{_activity} = AnyEvent->now; + $self->{on_drain}($self) if $self->{low_water_mark} >= length $self->{wbuf} && $self->{on_drain}; - delete $self->{ww} unless length $self->{wbuf}; - } elsif ($! != EAGAIN && $! != EINTR && $! != WSAWOULDBLOCK) { - $self->error; + delete $self->{_ww} unless length $self->{wbuf}; + } elsif ($! != EAGAIN && $! 
!= EINTR && $! != WSAEWOULDBLOCK) { + $self->_error ($!, 1); } }; # try to write data immediately - $cb->(); + $cb->() unless $self->{autocork}; # if still data left in wbuf, we need to poll - $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) + $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) if length $self->{wbuf}; }; } @@ -338,7 +517,7 @@ } if ($self->{filter_w}) { - $self->{filter_w}->($self, \$_[0]); + $self->{filter_w}($self, \$_[0]); } else { $self->{wbuf} .= $_[0]; $self->_drain_wbuf; @@ -347,8 +526,6 @@ =item $handle->push_write (type => @args) -=item $handle->unshift_write (type => @args) - Instead of formatting your data yourself, you can also let this module do the job by specifying a type and type-specific arguments. @@ -362,8 +539,6 @@ Formats the given value as netstring (http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them). -=back - =cut register_write_type netstring => sub { @@ -372,7 +547,80 @@ sprintf "%d:%s,", (length $string), $string }; -=item AnyEvent::Handle::register_write_type type => $coderef->($self, @args) +=item packstring => $format, $data + +An octet string prefixed with an encoded length. The encoding C<$format> +uses the same format as a Perl C format, but must specify a single +integer only (only one of C is allowed, plus an +optional C, C<< < >> or C<< > >> modifier). + +=cut + +register_write_type packstring => sub { + my ($self, $format, $string) = @_; + + pack "$format/a*", $string +}; + +=item json => $array_or_hashref + +Encodes the given hash or array reference into a JSON object. Unless you +provide your own JSON object, this means it will be encoded to JSON text +in UTF-8. + +JSON objects (and arrays) are self-delimiting, so you can write JSON at +one end of a handle and read them at the other end without using any +additional framing. 
+ +The generated JSON text is guaranteed not to contain any newlines: While +this module doesn't need delimiters after or between JSON texts to be +able to read them, many other languages depend on that. + +A simple RPC protocol that interoperates easily with others is to send +JSON arrays (or objects, although arrays are usually the better choice as +they mimic how function argument passing works) and a newline after each +JSON text: + + $handle->push_write (json => ["method", "arg1", "arg2"]); # whatever + $handle->push_write ("\012"); + +An AnyEvent::Handle receiver would simply use the C read type and +rely on the fact that the newline will be skipped as leading whitespace: + + $handle->push_read (json => sub { my $array = $_[1]; ... }); + +Other languages could read single lines terminated by a newline and pass +this line into their JSON decoder of choice. + +=cut + +register_write_type json => sub { + my ($self, $ref) = @_; + + require JSON; + + $self->{json} ? $self->{json}->encode ($ref) + : JSON::encode_json ($ref) +}; + +=item storable => $reference + +Freezes the given reference using L and writes it to the +handle. Uses the C format. + +=cut + +register_write_type storable => sub { + my ($self, $ref) = @_; + + require Storable; + + pack "w/a*", Storable::nfreeze ($ref) +}; + +=back + +=item AnyEvent::Handle::register_write_type type => $coderef->($handle, @args) This function (not method) lets you add your own types to C. Whenever the given C is used, C will invoke the code @@ -401,13 +649,14 @@ In the simple case, you just install an C callback and whenever new data arrives, it will be called. You can then remove some data (if -enough is there) from the read buffer (C<< $handle->rbuf >>) if you want -or not. +enough is there) from the read buffer (C<< $handle->rbuf >>). Or you cna +leave the data there if you want to accumulate more (e.g. when only a +partial message has been received so far). 
In the more complex case, you want to queue multiple callbacks. In this case, AnyEvent::Handle will call the first queued callback each time new -data arrives and removes it when it has done its job (see C, -below). +data arrives (also the first time it is queued) and removes it when it has +done its job (see C, below). This way you can, for example, push three line-reads, followed by reading a chunk of data, and AnyEvent::Handle will execute them in order. @@ -418,46 +667,50 @@ # in the default state, expect some header bytes $handle->on_read (sub { # some data is here, now queue the length-header-read (4 octets) - shift->unshift_read_chunk (4, sub { + shift->unshift_read (chunk => 4, sub { # header arrived, decode my $len = unpack "N", $_[1]; # now read the payload - shift->unshift_read_chunk ($len, sub { + shift->unshift_read (chunk => $len, sub { my $xml = $_[1]; # handle xml }); }); }); -Example 2: Implement a client for a protocol that replies either with -"OK" and another line or "ERROR" for one request, and 64 bytes for the -second request. Due tot he availability of a full queue, we can just -pipeline sending both requests and manipulate the queue as necessary in -the callbacks: +Example 2: Implement a client for a protocol that replies either with "OK" +and another line or "ERROR" for the first request that is sent, and 64 +bytes for the second request. Due to the availability of a queue, we can +just pipeline sending both requests and manipulate the queue as necessary +in the callbacks. + +When the first callback is called and sees an "OK" response, it will +C another line-read. This line-read will be queued I the +64-byte chunk callback. 
- # request one + # request one, returns either "OK + extra line" or "ERROR" $handle->push_write ("request 1\015\012"); # we expect "ERROR" or "OK" as response, so push a line read - $handle->push_read_line (sub { + $handle->push_read (line => sub { # if we got an "OK", we have to _prepend_ another line, # so it will be read before the second request reads its 64 bytes # which are already in the queue when this callback is called # we don't do this in case we got an error if ($_[1] eq "OK") { - $_[0]->unshift_read_line (sub { + $_[0]->unshift_read (line => sub { my $response = $_[1]; ... }); } }); - # request two + # request two, simply returns 64 octets $handle->push_write ("request 2\015\012"); # simply read 64 bytes, always - $handle->push_read_chunk (64, sub { + $handle->push_read (chunk => 64, sub { my $response = $_[1]; ... }); @@ -469,54 +722,59 @@ sub _drain_rbuf { my ($self) = @_; + local $self->{_in_drain} = 1; + if ( defined $self->{rbuf_max} && $self->{rbuf_max} < length $self->{rbuf} ) { - $! = &Errno::ENOSPC; - $self->error; + return $self->_error (&Errno::ENOSPC, 1); } - return if $self->{in_drain}; - local $self->{in_drain} = 1; + while () { + my $len = length $self->{rbuf}; - while (my $len = length $self->{rbuf}) { - no strict 'refs'; - if (my $cb = shift @{ $self->{queue} }) { + if (my $cb = shift @{ $self->{_queue} }) { unless ($cb->($self)) { - if ($self->{eof}) { + if ($self->{_eof}) { # no progress can be made (not enough data and no data forthcoming) - $! 
= &Errno::EPIPE; - $self->error; + $self->_error (&Errno::EPIPE, 1), last; } - unshift @{ $self->{queue} }, $cb; - return; + unshift @{ $self->{_queue} }, $cb; + last; } } elsif ($self->{on_read}) { + last unless $len; + $self->{on_read}($self); if ( - $self->{eof} # if no further data will arrive - && $len == length $self->{rbuf} # and no data has been consumed - && !@{ $self->{queue} } # and the queue is still empty - && $self->{on_read} # and we still want to read data + $len == length $self->{rbuf} # if no data has been consumed + && !@{ $self->{_queue} } # and the queue is still empty + && $self->{on_read} # but we still have on_read ) { - # then no progress can be made - $! = &Errno::EPIPE; - $self->error; + # no further data will arrive + # so no progress can be made + $self->_error (&Errno::EPIPE, 1), last + if $self->{_eof}; + + last; # more data might arrive } } else { # read side becomes idle - delete $self->{rw}; - return; + delete $self->{_rw}; + last; } } - if ($self->{eof}) { - $self->_shutdown; - $self->{on_eof}($self) - if $self->{on_eof}; + $self->{on_eof}($self) + if $self->{_eof} && $self->{on_eof}; + + # may need to restart read watcher + unless ($self->{_rw}) { + $self->start_read + if $self->{on_read} || @{ $self->{_queue} }; } } @@ -532,6 +790,7 @@ my ($self, $cb) = @_; $self->{on_read} = $cb; + $self->_drain_rbuf if $cb && !$self->{_in_drain}; } =item $handle->rbuf @@ -589,8 +848,8 @@ ->($self, $cb, @_); } - push @{ $self->{queue} }, $cb; - $self->_drain_rbuf; + push @{ $self->{_queue} }, $cb; + $self->_drain_rbuf unless $self->{_in_drain}; } sub unshift_read { @@ -605,8 +864,8 @@ } - unshift @{ $self->{queue} }, $cb; - $self->_drain_rbuf; + unshift @{ $self->{_queue} }, $cb; + $self->_drain_rbuf unless $self->{_in_drain}; } =item $handle->push_read (type => @args, $cb) @@ -622,7 +881,7 @@ =over 4 -=item chunk => $octets, $cb->($self, $data) +=item chunk => $octets, $cb->($handle, $data) Invoke the callback only once C<$octets> bytes have 
been read. Pass the data read to the callback. The callback will never be called with less @@ -646,16 +905,7 @@ } }; -# compatibility with older API -sub push_read_chunk { - $_[0]->push_read (chunk => $_[1], $_[2]); -} - -sub unshift_read_chunk { - $_[0]->unshift_read (chunk => $_[1], $_[2]); -} - -=item line => [$eol, ]$cb->($self, $line, $eol) +=item line => [$eol, ]$cb->($handle, $line, $eol) The callback will be called only once a full line (including the end of line marker, C<$eol>) has been read. This line (excluding the end of line @@ -679,68 +929,28 @@ register_read_type line => sub { my ($self, $cb, $eol) = @_; - $eol = qr|(\015?\012)| if @_ < 3; - $eol = quotemeta $eol unless ref $eol; - $eol = qr|^(.*?)($eol)|s; - - sub { - $_[0]{rbuf} =~ s/$eol// or return; + if (@_ < 3) { + # this is more than twice as fast as the generic code below + sub { + $_[0]{rbuf} =~ s/^([^\015\012]*)(\015?\012)// or return; - $cb->($_[0], $1, $2); - 1 - } -}; - -# compatibility with older API -sub push_read_line { - my $self = shift; - $self->push_read (line => @_); -} - -sub unshift_read_line { - my $self = shift; - $self->unshift_read (line => @_); -} - -=item netstring => $cb->($string) - -A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement). - -Throws an error with C<$!> set to EBADMSG on format violations. - -=cut - -register_read_type netstring => sub { - my ($self, $cb) = @_; - - sub { - unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { - if ($_[0]{rbuf} =~ /[^0-9]/) { - $! = &Errno::EBADMSG; - $self->error; - } - return; + $cb->($_[0], $1, $2); + 1 } + } else { + $eol = quotemeta $eol unless ref $eol; + $eol = qr|^(.*?)($eol)|s; - my $len = $1; - - $self->unshift_read (chunk => $len, sub { - my $string = $_[1]; - $_[0]->unshift_read (chunk => 1, sub { - if ($_[1] eq ",") { - $cb->($_[0], $string); - } else { - $! 
= &Errno::EBADMSG; - $self->error; - } - }); - }); + sub { + $_[0]{rbuf} =~ s/$eol// or return; - 1 + $cb->($_[0], $1, $2); + 1 + } } }; -=item regex => $accept[, $reject[, $skip], $cb->($data) +=item regex => $accept[, $reject[, $skip], $cb->($handle, $data) Makes a regex match against the regex object C<$accept> and returns everything up to and including the match. @@ -798,8 +1008,7 @@ # reject if ($reject && $$rbuf =~ $reject) { - $! = &Errno::EBADMSG; - $self->error; + $self->_error (&Errno::EBADMSG); } # skip @@ -811,9 +1020,179 @@ } }; +=item netstring => $cb->($handle, $string) + +A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement). + +Throws an error with C<$!> set to EBADMSG on format violations. + +=cut + +register_read_type netstring => sub { + my ($self, $cb) = @_; + + sub { + unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { + if ($_[0]{rbuf} =~ /[^0-9]/) { + $self->_error (&Errno::EBADMSG); + } + return; + } + + my $len = $1; + + $self->unshift_read (chunk => $len, sub { + my $string = $_[1]; + $_[0]->unshift_read (chunk => 1, sub { + if ($_[1] eq ",") { + $cb->($_[0], $string); + } else { + $self->_error (&Errno::EBADMSG); + } + }); + }); + + 1 + } +}; + +=item packstring => $format, $cb->($handle, $string) + +An octet string prefixed with an encoded length. The encoding C<$format> +uses the same format as a Perl C format, but must specify a single +integer only (only one of C is allowed, plus an +optional C, C<< < >> or C<< > >> modifier). + +DNS over TCP uses a prefix of C, EPP uses a prefix of C. + +Example: read a block of data prefixed by its length in BER-encoded +format (very efficient). 
+ + $handle->push_read (packstring => "w", sub { + my ($handle, $data) = @_; + }); + +=cut + +register_read_type packstring => sub { + my ($self, $cb, $format) = @_; + + sub { + # when we can use 5.10 we can use ".", but for 5.8 we use the re-pack method + defined (my $len = eval { unpack $format, $_[0]{rbuf} }) + or return; + + $format = length pack $format, $len; + + # bypass unshift if we already have the remaining chunk + if ($format + $len <= length $_[0]{rbuf}) { + my $data = substr $_[0]{rbuf}, $format, $len; + substr $_[0]{rbuf}, 0, $format + $len, ""; + $cb->($_[0], $data); + } else { + # remove prefix + substr $_[0]{rbuf}, 0, $format, ""; + + # read remaining chunk + $_[0]->unshift_read (chunk => $len, $cb); + } + + 1 + } +}; + +=item json => $cb->($handle, $hash_or_arrayref) + +Reads a JSON object or array, decodes it and passes it to the callback. + +If a C object was passed to the constructor, then that will be used +for the final decode, otherwise it will create a JSON coder expecting UTF-8. + +This read type uses the incremental parser available with JSON version +2.09 (and JSON::XS version 2.2) and above. You have to provide a +dependency on your own: this module will load the JSON module, but +AnyEvent does not depend on it itself. + +Since JSON texts are fully self-delimiting, the C read and write +types are an ideal simple RPC protocol: just exchange JSON datagrams. See +the C write type description, above, for an actual example. 
+ +=cut + +register_read_type json => sub { + my ($self, $cb) = @_; + + require JSON; + + my $data; + my $rbuf = \$self->{rbuf}; + + my $json = $self->{json} ||= JSON->new->utf8; + + sub { + my $ref = $json->incr_parse ($self->{rbuf}); + + if ($ref) { + $self->{rbuf} = $json->incr_text; + $json->incr_text = ""; + $cb->($self, $ref); + + 1 + } else { + $self->{rbuf} = ""; + () + } + } +}; + +=item storable => $cb->($handle, $ref) + +Deserialises a L frozen representation as written by the +C write type (BER-encoded length prefix followed by nfreeze'd +data). + +Raises C error if the data could not be decoded. + +=cut + +register_read_type storable => sub { + my ($self, $cb) = @_; + + require Storable; + + sub { + # when we can use 5.10 we can use ".", but for 5.8 we use the re-pack method + defined (my $len = eval { unpack "w", $_[0]{rbuf} }) + or return; + + my $format = length pack "w", $len; + + # bypass unshift if we already have the remaining chunk + if ($format + $len <= length $_[0]{rbuf}) { + my $data = substr $_[0]{rbuf}, $format, $len; + substr $_[0]{rbuf}, 0, $format + $len, ""; + $cb->($_[0], Storable::thaw ($data)); + } else { + # remove prefix + substr $_[0]{rbuf}, 0, $format, ""; + + # read remaining chunk + $_[0]->unshift_read (chunk => $len, sub { + if (my $ref = eval { Storable::thaw ($_[1]) }) { + $cb->($_[0], $ref); + } else { + $self->_error (&Errno::EBADMSG); + } + }); + } + + 1 + } +}; + =back -=item AnyEvent::Handle::register_read_type type => $coderef->($self, $cb, @args) +=item AnyEvent::Handle::register_read_type type => $coderef->($handle, $cb, @args) This function (not method) lets you add your own types to C. @@ -825,7 +1204,7 @@ that works as a plain read callback (see C<< ->push_read ($cb) >>). It should invoke the passed callback when it is done reading (remember to -pass C<$self> as first argument as all other callbacks do that). +pass C<$handle> as first argument as all other callbacks do that). 
Note that this is a function, and all types registered this way will be global, so try to use unique names. @@ -838,40 +1217,47 @@ =item $handle->start_read In rare cases you actually do not want to read anything from the -socket. In this case you can call C. Neither C no +socket. In this case you can call C. Neither C nor any queued callbacks will be executed then. To start reading again, call C. +Note that AnyEvent::Handle will automatically C for you when +you change the C callback or push/unshift a read callback, and it +will automatically C for you when neither C is set nor +there are any read requests in the queue. + =cut sub stop_read { my ($self) = @_; - delete $self->{rw}; + delete $self->{_rw}; } sub start_read { my ($self) = @_; - unless ($self->{rw} || $self->{eof}) { + unless ($self->{_rw} || $self->{_eof}) { Scalar::Util::weaken $self; - $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { + $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf}; my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; if ($len > 0) { + $self->{_activity} = AnyEvent->now; + $self->{filter_r} - ? $self->{filter_r}->($self, $rbuf) - : $self->_drain_rbuf; + ? $self->{filter_r}($self, $rbuf) + : $self->{_in_drain} || $self->_drain_rbuf; } elsif (defined $len) { - delete $self->{rw}; - $self->{eof} = 1; - $self->_drain_rbuf; + delete $self->{_rw}; + $self->{_eof} = 1; + $self->_drain_rbuf unless $self->{_in_drain}; - } elsif ($! != EAGAIN && $! != EINTR && $! != &AnyEvent::Util::WSAWOULDBLOCK) { - return $self->error; + } elsif ($! != EAGAIN && $! != EINTR && $! 
!= WSAEWOULDBLOCK) { + return $self->_error ($!, 1); } }); } @@ -880,30 +1266,38 @@ sub _dotls { my ($self) = @_; - if (length $self->{tls_wbuf}) { - while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{tls_wbuf})) > 0) { - substr $self->{tls_wbuf}, 0, $len, ""; + my $buf; + + if (length $self->{_tls_wbuf}) { + while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) { + substr $self->{_tls_wbuf}, 0, $len, ""; } } - if (defined (my $buf = Net::SSLeay::BIO_read ($self->{tls_wbio}))) { + if (length ($buf = Net::SSLeay::BIO_read ($self->{_wbio}))) { $self->{wbuf} .= $buf; $self->_drain_wbuf; } - while (defined (my $buf = Net::SSLeay::read ($self->{tls}))) { - $self->{rbuf} .= $buf; - $self->_drain_rbuf; + while (defined ($buf = Net::SSLeay::read ($self->{tls}))) { + if (length $buf) { + $self->{rbuf} .= $buf; + $self->_drain_rbuf unless $self->{_in_drain}; + } else { + # let's treat SSL-eof as we treat normal EOF + $self->{_eof} = 1; + $self->_shutdown; + return; + } } my $err = Net::SSLeay::get_error ($self->{tls}, -1); if ($err!= Net::SSLeay::ERROR_WANT_READ ()) { if ($err == Net::SSLeay::ERROR_SYSCALL ()) { - $self->error; + return $self->_error ($!, 1); } elsif ($err == Net::SSLeay::ERROR_SSL ()) { - $! = &Errno::EIO; - $self->error; + return $self->_error (&Errno::EIO, 1); } # all others are fine for our purposes @@ -922,9 +1316,12 @@ The second argument is the optional C object that is used when AnyEvent::Handle has to create its own TLS connection object. +The TLS connection object will end up in C<< $handle->{tls} >> after this +call and can be used or changed to your liking. Note that the handshake +might have already started when this function returns. + =cut -# TODO: maybe document... 
sub starttls { my ($self, $ssl, $ctx) = @_; @@ -949,17 +1346,17 @@ (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1) | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2)); - $self->{tls_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); - $self->{tls_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); + $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); + $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); - Net::SSLeay::set_bio ($ssl, $self->{tls_rbio}, $self->{tls_wbio}); + Net::SSLeay::set_bio ($ssl, $self->{_rbio}, $self->{_wbio}); $self->{filter_w} = sub { - $_[0]{tls_wbuf} .= ${$_[1]}; + $_[0]{_tls_wbuf} .= ${$_[1]}; &_dotls; }; $self->{filter_r} = sub { - Net::SSLeay::BIO_write ($_[0]{tls_rbio}, ${$_[1]}); + Net::SSLeay::BIO_write ($_[0]{_rbio}, ${$_[1]}); &_dotls; }; } @@ -975,9 +1372,10 @@ my ($self) = @_; Net::SSLeay::free (delete $self->{tls}) if $self->{tls}; - delete $self->{tls_rbio}; - delete $self->{tls_wbio}; - delete $self->{tls_wbuf}; + + delete $self->{_rbio}; + delete $self->{_wbio}; + delete $self->{_tls_wbuf}; delete $self->{filter_r}; delete $self->{filter_w}; } @@ -986,6 +1384,28 @@ my $self = shift; $self->stoptls; + + my $linger = exists $self->{linger} ? $self->{linger} : 3600; + + if ($linger && length $self->{wbuf}) { + my $fh = delete $self->{fh}; + my $wbuf = delete $self->{wbuf}; + + my @linger; + + push @linger, AnyEvent->io (fh => $fh, poll => "w", cb => sub { + my $len = syswrite $fh, $wbuf, length $wbuf; + + if ($len > 0) { + substr $wbuf, 0, $len, ""; + } else { + @linger = (); # end + } + }); + push @linger, AnyEvent->timer (after => $linger, cb => sub { + @linger = (); + }); + } } =item AnyEvent::Handle::TLS_CTX @@ -1025,6 +1445,35 @@ =back +=head1 SUBCLASSING AnyEvent::Handle + +In many cases, you might want to subclass AnyEvent::Handle. 
+ +To make this easier, a given version of AnyEvent::Handle uses these +conventions: + +=over 4 + +=item * all constructor arguments become object members. + +At least initially, when you pass a C<tls>-argument to the constructor it +will end up in C<< $handle->{tls} >>. Those members might be changed or +mutated later on (for example C<tls> will hold the TLS connection object). + +=item * other object member names are prefixed with an C<_>. + +All object members not explicitly documented (internal use) are prefixed +with an underscore character, so the remaining non-C<_>-namespace is free +for use for subclasses. + +=item * all members not documented here and not prefixed with an underscore +are free to use in subclasses. + +Of course, new versions of AnyEvent::Handle may introduce more "public" +member variables, but that's just life, at least it is documented. + +=back + =head1 AUTHOR Robin Redeker C<< >>, Marc Lehmann .