--- AnyEvent/lib/AnyEvent/Handle.pm 2008/05/27 05:47:36 1.41 +++ AnyEvent/lib/AnyEvent/Handle.pm 2008/10/01 07:40:39 1.91 @@ -1,14 +1,14 @@ package AnyEvent::Handle; no warnings; -use strict; +use strict qw(subs vars); use AnyEvent (); -use AnyEvent::Util qw(WSAWOULDBLOCK); +use AnyEvent::Util qw(WSAEWOULDBLOCK); use Scalar::Util (); use Carp (); use Fcntl (); -use Errno qw/EAGAIN EINTR/; +use Errno qw(EAGAIN EINTR); =head1 NAME @@ -16,7 +16,7 @@ =cut -our $VERSION = '0.04'; +our $VERSION = 4.3; =head1 SYNOPSIS @@ -51,6 +51,9 @@ filehandles. For utility functions for doing non-blocking connects and accepts on sockets see L. +The L tutorial contains some well-documented +AnyEvent::Handle examples. + In the following, when the documentation refers to of "bytes" then this means characters. As sysread and syswrite are used for all I/O, their treatment of characters applies to this module as well. @@ -58,6 +61,14 @@ All callbacks will be invoked with the handle object as their first argument. +=head2 SIGPIPE is not handled by this module + +SIGPIPE is not handled by this module, so one of the practical +requirements of using it is to ignore SIGPIPE (C<$SIG{PIPE} = +'IGNORE'>). At least, this is highly recommend in a networked program: If +you use AnyEvent::Handle in a filter program (like sort), exiting on +SIGPIPE is probably the right thing to do. + =head1 METHODS =over 4 @@ -72,40 +83,58 @@ The filehandle this L object will operate on. -NOTE: The filehandle will be set to non-blocking (using -AnyEvent::Util::fh_nonblocking). +NOTE: The filehandle will be set to non-blocking mode (using +C) by the constructor and needs to stay in +that mode. =item on_eof => $cb->($handle) -Set the callback to be called on EOF. +Set the callback to be called when an end-of-file condition is detected, +i.e. in the case of a socket, when the other side has closed the +connection cleanly. + +For sockets, this just means that the other side has stopped sending data, +you can still try to write data, and, in fact, one can return from the eof +callback and continue writing data, as only the read part has been shut +down. -While not mandatory, it is highly recommended to set an eof callback, +While not mandatory, it is I recommended to set an eof callback, otherwise you might end up with a closed socket while you are still waiting for data. -=item on_error => $cb->($handle) +If an EOF condition has been detected but no C callback has been +set, then a fatal error will be raised with C<$!> set to <0>. -This is the fatal error callback, that is called when, well, a fatal error -occurs, such as not being able to resolve the hostname, failure to connect -or a read error. +=item on_error => $cb->($handle, $fatal) -The object will not be in a usable state when this callback has been -called. +This is the error callback, which is called when, well, some error +occured, such as not being able to resolve the hostname, failure to +connect or a read error. + +Some errors are fatal (which is indicated by C<$fatal> being true). On +fatal errors the handle object will be shut down and will not be usable +(but you are free to look at the current C<< ->rbuf >>). Examples of fatal +errors are an EOF condition with active (but unsatisifable) read watchers +(C) or I/O errors. + +Non-fatal errors can be retried by simply returning, but it is recommended +to simply ignore this parameter and instead abondon the handle object +when this callback is invoked. Examples of non-fatal errors are timeouts +C) or badly-formatted data (C). On callback entrance, the value of C<$!> contains the operating system -error (or C, C or C). - -The callback should throw an exception. If it returns, then -AnyEvent::Handle will C for you. +error (or C, C, C or C). While not mandatory, it is I recommended to set this callback, as you will not be notified of errors otherwise. The default simply calls -die. +C. =item on_read => $cb->($handle) This sets the default read callback, which is called when data arrives -and no read request is in the queue. +and no read request is in the queue (unlike read queue callbacks, this +callback will only be called when at least one octet of data is in the +read buffer). To access (and remove data from) the read buffer, use the C<< ->rbuf >> method or access the C<$handle->{rbuf}> member directly. @@ -122,11 +151,38 @@ To append to the write buffer, use the C<< ->push_write >> method. +This callback is useful when you don't want to put all of your write data +into the queue at once, for example, when you want to write the contents +of some file to the socket you might not want to read the whole file into +memory and push it into the queue, but instead only read more data from +the file when the write queue becomes empty. + +=item timeout => $fractional_seconds + +If non-zero, then this enables an "inactivity" timeout: whenever this many +seconds pass without a successful read or write on the underlying file +handle, the C callback will be invoked (and if that one is +missing, a non-fatal C error will be raised). + +Note that timeout processing is also active when you currently do not have +any outstanding read or write requests: If you plan to keep the connection +idle then you should disable the timout temporarily or ignore the timeout +in the C callback, in which case AnyEvent::Handle will simply +restart the timeout. + +Zero (the default) disables this timeout. + +=item on_timeout => $cb->($handle) + +Called whenever the inactivity timeout passes. If you return from this +callback, then the timeout will be reset as if some activity had happened, +so this condition is not fatal in any way. + =item rbuf_max => If defined, then a fatal error will be raised (with C<$!> set to C) when the read buffer ever (strictly) exceeds this size. This is useful to -avoid denial-of-service attacks. +avoid some forms of denial-of-service attacks. For example, a server accepting connections from untrusted sources should be configured to accept only so-and-so much data that it cannot act on @@ -134,10 +190,37 @@ amount of data without a callback ever being called as long as the line isn't finished). +=item autocork => + +When disabled (the default), then C will try to immediately +write the data to the handle, if possible. This avoids having to register +a write watcher and wait for the next event loop iteration, but can +be inefficient if you write multiple small chunks (on the wire, this +disadvantage is usually avoided by your kernel's nagle algorithm, see +C, but this option can save costly syscalls). + +When enabled, then writes will always be queued till the next event loop +iteration. This is efficient when you do many small writes per iteration, +but less efficient when you do a single write only per iteration (or when +the write buffer often is full). It also increases write latency. + +=item no_delay => + +When doing small writes on sockets, your operating system kernel might +wait a bit for more data before actually sending it out. This is called +the Nagle algorithm, and usually it is beneficial. + +In some situations you want as low a delay as possible, which can be +accomplishd by setting this option to a true value. + +The default is your opertaing system's default behaviour (most likely +enabled), this option explicitly enables or disables it, if possible. + =item read_size => -The default read block size (the amount of bytes this module will try to read -on each [loop iteration). Default: C<4096>. +The default read block size (the amount of bytes this module will +try to read during each loop iteration, which affects memory +requirements). Default: C<8192>. =item low_water_mark => @@ -145,28 +228,47 @@ buffer: If the write reaches this size or gets even samller it is considered empty. +Sometimes it can be beneficial (for performance reasons) to add data to +the write buffer before it is fully drained, but this is a rare case, as +the operating system kernel usually buffers data as well, so the default +is good in almost all cases. + +=item linger => + +If non-zero (default: C<3600>), then the destructor of the +AnyEvent::Handle object will check whether there is still outstanding +write data and will install a watcher that will write this data to the +socket. No errors will be reported (this mostly matches how the operating +system treats outstanding data at socket close time). + +This will not work for partial TLS data that could not be encoded +yet. This data will be lost. + =item tls => "accept" | "connect" | Net::SSLeay::SSL object -When this parameter is given, it enables TLS (SSL) mode, that means it -will start making tls handshake and will transparently encrypt/decrypt -data. +When this parameter is given, it enables TLS (SSL) mode, that means +AnyEvent will start a TLS handshake as soon as the conenction has been +established and will transparently encrypt/decrypt data afterwards. TLS mode requires Net::SSLeay to be installed (it will be loaded -automatically when you try to create a TLS handle). - -For the TLS server side, use C, and for the TLS client side of a -connection, use C mode. +automatically when you try to create a TLS handle): this module doesn't +have a dependency on that module, so if your module requires it, you have +to add the dependency yourself. + +Unlike TCP, TLS has a server and client side: for the TLS server side, use +C, and for the TLS client side of a connection, use C +mode. You can also provide your own TLS connection object, but you have to make sure that you call either C or C on it before you pass it to AnyEvent::Handle. -See the C method if you need to start TLs negotiation later. +See the C<< ->starttls >> method for when need to start TLS negotiation later. =item tls_ctx => $ssl_ctx -Use the given Net::SSLeay::CTX object to create the new TLS connection +Use the given C object to create the new TLS connection (unless a connection object was specified directly). If this parameter is missing, then AnyEvent::Handle will use C. @@ -175,7 +277,8 @@ This is the json coder object used by the C read and write types. If you don't supply it, then AnyEvent::Handle will create and use a -suitable one, which will write and expect UTF-8 encoded JSON texts. +suitable one (on demand), which will write and expect UTF-8 encoded JSON +texts. Note that you are responsible to depend on the JSON module if you want to use this functionality, as AnyEvent does not have a dependency itself. @@ -184,7 +287,8 @@ =item filter_w => $cb -These exist, but are undocumented at this time. +These exist, but are undocumented at this time. (They are used internally +by the TLS code). =back @@ -204,12 +308,14 @@ $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); } - $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; - $self->on_error (delete $self->{on_error}) if $self->{on_error}; - $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; - $self->on_read (delete $self->{on_read} ) if $self->{on_read}; + $self->{_activity} = AnyEvent->now; + $self->_timeout; + + $self->on_drain (delete $self->{on_drain}) if exists $self->{on_drain}; + $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay}; - $self->start_read; + $self->start_read + if $self->{on_read}; $self } @@ -217,28 +323,35 @@ sub _shutdown { my ($self) = @_; + delete $self->{_tw}; delete $self->{_rw}; delete $self->{_ww}; delete $self->{fh}; + + $self->stoptls; + + delete $self->{on_read}; + delete $self->{_queue}; } -sub error { - my ($self) = @_; +sub _error { + my ($self, $errno, $fatal) = @_; - { - local $!; - $self->_shutdown; - } + $self->_shutdown + if $fatal; - $self->{on_error}($self) - if $self->{on_error}; + $! = $errno; - Carp::croak "AnyEvent::Handle uncaught fatal error: $!"; + if ($self->{on_error}) { + $self->{on_error}($self, $fatal); + } else { + Carp::croak "AnyEvent::Handle uncaught error: $!"; + } } =item $fh = $handle->fh -This method returns the file handle of the L object. +This method returns the file handle used to create the L object. =cut @@ -264,6 +377,96 @@ $_[0]{on_eof} = $_[1]; } +=item $handle->on_timeout ($cb) + +Replace the current C callback, or disables the callback (but +not the timeout) if C<$cb> = C. See the C constructor +argument and method. + +=cut + +sub on_timeout { + $_[0]{on_timeout} = $_[1]; +} + +=item $handle->autocork ($boolean) + +Enables or disables the current autocork behaviour (see C +constructor argument). + +=cut + +=item $handle->no_delay ($boolean) + +Enables or disables the C setting (see constructor argument of +the same name for details). + +=cut + +sub no_delay { + $_[0]{no_delay} = $_[1]; + + eval { + local $SIG{__DIE__}; + setsockopt $_[0]{fh}, &Socket::IPPROTO_TCP, &Socket::TCP_NODELAY, int $_[1]; + }; +} + +############################################################################# + +=item $handle->timeout ($seconds) + +Configures (or disables) the inactivity timeout. + +=cut + +sub timeout { + my ($self, $timeout) = @_; + + $self->{timeout} = $timeout; + $self->_timeout; +} + +# reset the timeout watcher, as neccessary +# also check for time-outs +sub _timeout { + my ($self) = @_; + + if ($self->{timeout}) { + my $NOW = AnyEvent->now; + + # when would the timeout trigger? + my $after = $self->{_activity} + $self->{timeout} - $NOW; + + # now or in the past already? + if ($after <= 0) { + $self->{_activity} = $NOW; + + if ($self->{on_timeout}) { + $self->{on_timeout}($self); + } else { + $self->_error (&Errno::ETIMEDOUT); + } + + # callback could have changed timeout value, optimise + return unless $self->{timeout}; + + # calculate new after + $after = $self->{timeout}; + } + + Scalar::Util::weaken $self; + return unless $self; # ->error could have destroyed $self + + $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub { + delete $self->{_tw}; + $self->_timeout; + }); + } else { + delete $self->{_tw}; + } +} + ############################################################################# =back @@ -318,18 +521,20 @@ if ($len >= 0) { substr $self->{wbuf}, 0, $len, ""; + $self->{_activity} = AnyEvent->now; + $self->{on_drain}($self) if $self->{low_water_mark} >= length $self->{wbuf} && $self->{on_drain}; delete $self->{_ww} unless length $self->{wbuf}; - } elsif ($! != EAGAIN && $! != EINTR && $! != WSAWOULDBLOCK) { - $self->error; + } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { + $self->_error ($!, 1); } }; # try to write data immediately - $cb->(); + $cb->() unless $self->{autocork}; # if still data left in wbuf, we need to poll $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) @@ -354,7 +559,7 @@ } if ($self->{filter_w}) { - $self->{filter_w}->($self, \$_[0]); + $self->{filter_w}($self, \$_[0]); } else { $self->{wbuf} .= $_[0]; $self->_drain_wbuf; @@ -363,8 +568,6 @@ =item $handle->push_write (type => @args) -=item $handle->unshift_write (type => @args) - Instead of formatting your data yourself, you can also let this module do the job by specifying a type and type-specific arguments. @@ -378,8 +581,6 @@ Formats the given value as netstring (http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them). -=back - =cut register_write_type netstring => sub { @@ -388,6 +589,21 @@ sprintf "%d:%s,", (length $string), $string }; +=item packstring => $format, $data + +An octet string prefixed with an encoded length. The encoding C<$format> +uses the same format as a Perl C format, but must specify a single +integer only (only one of C is allowed, plus an +optional C, C<< < >> or C<< > >> modifier). + +=cut + +register_write_type packstring => sub { + my ($self, $format, $string) = @_; + + pack "$format/a*", $string +}; + =item json => $array_or_hashref Encodes the given hash or array reference into a JSON object. Unless you @@ -429,6 +645,23 @@ : JSON::encode_json ($ref) }; +=item storable => $reference + +Freezes the given reference using L and writes it to the +handle. Uses the C format. + +=cut + +register_write_type storable => sub { + my ($self, $ref) = @_; + + require Storable; + + pack "w/a*", Storable::nfreeze ($ref) +}; + +=back + =item AnyEvent::Handle::register_write_type type => $coderef->($handle, @args) This function (not method) lets you add your own types to C. @@ -458,13 +691,14 @@ In the simple case, you just install an C callback and whenever new data arrives, it will be called. You can then remove some data (if -enough is there) from the read buffer (C<< $handle->rbuf >>) if you want -or not. +enough is there) from the read buffer (C<< $handle->rbuf >>). Or you cna +leave the data there if you want to accumulate more (e.g. when only a +partial message has been received so far). In the more complex case, you want to queue multiple callbacks. In this case, AnyEvent::Handle will call the first queued callback each time new -data arrives and removes it when it has done its job (see C, -below). +data arrives (also the first time it is queued) and removes it when it has +done its job (see C, below). This way you can, for example, push three line-reads, followed by reading a chunk of data, and AnyEvent::Handle will execute them in order. @@ -475,46 +709,50 @@ # in the default state, expect some header bytes $handle->on_read (sub { # some data is here, now queue the length-header-read (4 octets) - shift->unshift_read_chunk (4, sub { + shift->unshift_read (chunk => 4, sub { # header arrived, decode my $len = unpack "N", $_[1]; # now read the payload - shift->unshift_read_chunk ($len, sub { + shift->unshift_read (chunk => $len, sub { my $xml = $_[1]; # handle xml }); }); }); -Example 2: Implement a client for a protocol that replies either with -"OK" and another line or "ERROR" for one request, and 64 bytes for the -second request. Due tot he availability of a full queue, we can just -pipeline sending both requests and manipulate the queue as necessary in -the callbacks: +Example 2: Implement a client for a protocol that replies either with "OK" +and another line or "ERROR" for the first request that is sent, and 64 +bytes for the second request. Due to the availability of a queue, we can +just pipeline sending both requests and manipulate the queue as necessary +in the callbacks. + +When the first callback is called and sees an "OK" response, it will +C another line-read. This line-read will be queued I the +64-byte chunk callback. - # request one + # request one, returns either "OK + extra line" or "ERROR" $handle->push_write ("request 1\015\012"); # we expect "ERROR" or "OK" as response, so push a line read - $handle->push_read_line (sub { + $handle->push_read (line => sub { # if we got an "OK", we have to _prepend_ another line, # so it will be read before the second request reads its 64 bytes # which are already in the queue when this callback is called # we don't do this in case we got an error if ($_[1] eq "OK") { - $_[0]->unshift_read_line (sub { + $_[0]->unshift_read (line => sub { my $response = $_[1]; ... }); } }); - # request two + # request two, simply returns 64 octets $handle->push_write ("request 2\015\012"); # simply read 64 bytes, always - $handle->push_read_chunk (64, sub { + $handle->push_read (chunk => 64, sub { my $response = $_[1]; ... }); @@ -526,54 +764,64 @@ sub _drain_rbuf { my ($self) = @_; + local $self->{_in_drain} = 1; + if ( defined $self->{rbuf_max} && $self->{rbuf_max} < length $self->{rbuf} ) { - $! = &Errno::ENOSPC; - $self->error; + $self->_error (&Errno::ENOSPC, 1), return; } - return if $self->{in_drain}; - local $self->{in_drain} = 1; + while () { + my $len = length $self->{rbuf}; - while (my $len = length $self->{rbuf}) { - no strict 'refs'; if (my $cb = shift @{ $self->{_queue} }) { unless ($cb->($self)) { if ($self->{_eof}) { # no progress can be made (not enough data and no data forthcoming) - $! = &Errno::EPIPE; - $self->error; + $self->_error (&Errno::EPIPE, 1), return; } unshift @{ $self->{_queue} }, $cb; - return; + last; } } elsif ($self->{on_read}) { + last unless $len; + $self->{on_read}($self); if ( - $self->{_eof} # if no further data will arrive - && $len == length $self->{rbuf} # and no data has been consumed - && !@{ $self->{_queue} } # and the queue is still empty - && $self->{on_read} # and we still want to read data + $len == length $self->{rbuf} # if no data has been consumed + && !@{ $self->{_queue} } # and the queue is still empty + && $self->{on_read} # but we still have on_read ) { - # then no progress can be made - $! = &Errno::EPIPE; - $self->error; + # no further data will arrive + # so no progress can be made + $self->_error (&Errno::EPIPE, 1), return + if $self->{_eof}; + + last; # more data might arrive } } else { # read side becomes idle delete $self->{_rw}; - return; + last; } } if ($self->{_eof}) { - $self->_shutdown; - $self->{on_eof}($self) - if $self->{on_eof}; + if ($self->{on_eof}) { + $self->{on_eof}($self) + } else { + $self->_error (0, 1); + } + } + + # may need to restart read watcher + unless ($self->{_rw}) { + $self->start_read + if $self->{on_read} || @{ $self->{_queue} }; } } @@ -589,6 +837,7 @@ my ($self, $cb) = @_; $self->{on_read} = $cb; + $self->_drain_rbuf if $cb && !$self->{_in_drain}; } =item $handle->rbuf @@ -647,7 +896,7 @@ } push @{ $self->{_queue} }, $cb; - $self->_drain_rbuf; + $self->_drain_rbuf unless $self->{_in_drain}; } sub unshift_read { @@ -663,7 +912,7 @@ unshift @{ $self->{_queue} }, $cb; - $self->_drain_rbuf; + $self->_drain_rbuf unless $self->{_in_drain}; } =item $handle->push_read (type => @args, $cb) @@ -703,15 +952,6 @@ } }; -# compatibility with older API -sub push_read_chunk { - $_[0]->push_read (chunk => $_[1], $_[2]); -} - -sub unshift_read_chunk { - $_[0]->unshift_read (chunk => $_[1], $_[2]); -} - =item line => [$eol, ]$cb->($handle, $line, $eol) The callback will be called only once a full line (including the end of @@ -736,64 +976,24 @@ register_read_type line => sub { my ($self, $cb, $eol) = @_; - $eol = qr|(\015?\012)| if @_ < 3; - $eol = quotemeta $eol unless ref $eol; - $eol = qr|^(.*?)($eol)|s; - - sub { - $_[0]{rbuf} =~ s/$eol// or return; - - $cb->($_[0], $1, $2); - 1 - } -}; - -# compatibility with older API -sub push_read_line { - my $self = shift; - $self->push_read (line => @_); -} - -sub unshift_read_line { - my $self = shift; - $self->unshift_read (line => @_); -} - -=item netstring => $cb->($handle, $string) + if (@_ < 3) { + # this is more than twice as fast as the generic code below + sub { + $_[0]{rbuf} =~ s/^([^\015\012]*)(\015?\012)// or return; -A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement). - -Throws an error with C<$!> set to EBADMSG on format violations. - -=cut - -register_read_type netstring => sub { - my ($self, $cb) = @_; - - sub { - unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { - if ($_[0]{rbuf} =~ /[^0-9]/) { - $! = &Errno::EBADMSG; - $self->error; - } - return; + $cb->($_[0], $1, $2); + 1 } + } else { + $eol = quotemeta $eol unless ref $eol; + $eol = qr|^(.*?)($eol)|s; - my $len = $1; - - $self->unshift_read (chunk => $len, sub { - my $string = $_[1]; - $_[0]->unshift_read (chunk => 1, sub { - if ($_[1] eq ",") { - $cb->($_[0], $string); - } else { - $! = &Errno::EBADMSG; - $self->error; - } - }); - }); + sub { + $_[0]{rbuf} =~ s/$eol// or return; - 1 + $cb->($_[0], $1, $2); + 1 + } } }; @@ -855,8 +1055,7 @@ # reject if ($reject && $$rbuf =~ $reject) { - $! = &Errno::EBADMSG; - $self->error; + $self->_error (&Errno::EBADMSG); } # skip @@ -868,6 +1067,87 @@ } }; +=item netstring => $cb->($handle, $string) + +A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement). + +Throws an error with C<$!> set to EBADMSG on format violations. + +=cut + +register_read_type netstring => sub { + my ($self, $cb) = @_; + + sub { + unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { + if ($_[0]{rbuf} =~ /[^0-9]/) { + $self->_error (&Errno::EBADMSG); + } + return; + } + + my $len = $1; + + $self->unshift_read (chunk => $len, sub { + my $string = $_[1]; + $_[0]->unshift_read (chunk => 1, sub { + if ($_[1] eq ",") { + $cb->($_[0], $string); + } else { + $self->_error (&Errno::EBADMSG); + } + }); + }); + + 1 + } +}; + +=item packstring => $format, $cb->($handle, $string) + +An octet string prefixed with an encoded length. The encoding C<$format> +uses the same format as a Perl C format, but must specify a single +integer only (only one of C is allowed, plus an +optional C, C<< < >> or C<< > >> modifier). + +DNS over TCP uses a prefix of C, EPP uses a prefix of C. + +Example: read a block of data prefixed by its length in BER-encoded +format (very efficient). + + $handle->push_read (packstring => "w", sub { + my ($handle, $data) = @_; + }); + +=cut + +register_read_type packstring => sub { + my ($self, $cb, $format) = @_; + + sub { + # when we can use 5.10 we can use ".", but for 5.8 we use the re-pack method + defined (my $len = eval { unpack $format, $_[0]{rbuf} }) + or return; + + $format = length pack $format, $len; + + # bypass unshift if we already have the remaining chunk + if ($format + $len <= length $_[0]{rbuf}) { + my $data = substr $_[0]{rbuf}, $format, $len; + substr $_[0]{rbuf}, 0, $format + $len, ""; + $cb->($_[0], $data); + } else { + # remove prefix + substr $_[0]{rbuf}, 0, $format, ""; + + # read remaining chunk + $_[0]->unshift_read (chunk => $len, $cb); + } + + 1 + } +}; + =item json => $cb->($handle, $hash_or_arrayref) Reads a JSON object or array, decodes it and passes it to the callback. @@ -887,7 +1167,7 @@ =cut register_read_type json => sub { - my ($self, $cb, $accept, $reject, $skip) = @_; + my ($self, $cb) = @_; require JSON; @@ -912,6 +1192,51 @@ } }; +=item storable => $cb->($handle, $ref) + +Deserialises a L frozen representation as written by the +C write type (BER-encoded length prefix followed by nfreeze'd +data). + +Raises C error if the data could not be decoded. + +=cut + +register_read_type storable => sub { + my ($self, $cb) = @_; + + require Storable; + + sub { + # when we can use 5.10 we can use ".", but for 5.8 we use the re-pack method + defined (my $len = eval { unpack "w", $_[0]{rbuf} }) + or return; + + my $format = length pack "w", $len; + + # bypass unshift if we already have the remaining chunk + if ($format + $len <= length $_[0]{rbuf}) { + my $data = substr $_[0]{rbuf}, $format, $len; + substr $_[0]{rbuf}, 0, $format + $len, ""; + $cb->($_[0], Storable::thaw ($data)); + } else { + # remove prefix + substr $_[0]{rbuf}, 0, $format, ""; + + # read remaining chunk + $_[0]->unshift_read (chunk => $len, sub { + if (my $ref = eval { Storable::thaw ($_[1]) }) { + $cb->($_[0], $ref); + } else { + $self->_error (&Errno::EBADMSG); + } + }); + } + + 1 + } +}; + =back =item AnyEvent::Handle::register_read_type type => $coderef->($handle, $cb, @args) @@ -939,10 +1264,15 @@ =item $handle->start_read In rare cases you actually do not want to read anything from the -socket. In this case you can call C. Neither C no +socket. In this case you can call C. Neither C nor any queued callbacks will be executed then. To start reading again, call C. +Note that AnyEvent::Handle will automatically C for you when +you change the C callback or push/unshift a read callback, and it +will automatically C for you when neither C is set nor +there are any read requests in the queue. + =cut sub stop_read { @@ -962,17 +1292,19 @@ my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; if ($len > 0) { + $self->{_activity} = AnyEvent->now; + $self->{filter_r} - ? $self->{filter_r}->($self, $rbuf) - : $self->_drain_rbuf; + ? $self->{filter_r}($self, $rbuf) + : $self->{_in_drain} || $self->_drain_rbuf; } elsif (defined $len) { delete $self->{_rw}; $self->{_eof} = 1; - $self->_drain_rbuf; + $self->_drain_rbuf unless $self->{_in_drain}; - } elsif ($! != EAGAIN && $! != EINTR && $! != &AnyEvent::Util::WSAWOULDBLOCK) { - return $self->error; + } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { + return $self->_error ($!, 1); } }); } @@ -981,34 +1313,43 @@ sub _dotls { my ($self) = @_; + my $buf; + if (length $self->{_tls_wbuf}) { while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) { substr $self->{_tls_wbuf}, 0, $len, ""; } } - if (defined (my $buf = Net::SSLeay::BIO_read ($self->{_wbio}))) { - $self->{wbuf} .= $buf; - $self->_drain_wbuf; - } + while (defined ($buf = Net::SSLeay::read ($self->{tls}))) { + unless (length $buf) { + # let's treat SSL-eof as we treat normal EOF + delete $self->{_rw}; + $self->{_eof} = 1; + } - while (defined (my $buf = Net::SSLeay::read ($self->{tls}))) { $self->{rbuf} .= $buf; - $self->_drain_rbuf; + $self->_drain_rbuf unless $self->{_in_drain}; + + $self->{tls} or return; # tls could have gone away } my $err = Net::SSLeay::get_error ($self->{tls}, -1); if ($err!= Net::SSLeay::ERROR_WANT_READ ()) { if ($err == Net::SSLeay::ERROR_SYSCALL ()) { - $self->error; + return $self->_error ($!, 1); } elsif ($err == Net::SSLeay::ERROR_SSL ()) { - $! = &Errno::EIO; - $self->error; + return $self->_error (&Errno::EIO, 1); } # all others are fine for our purposes } + + if (length ($buf = Net::SSLeay::BIO_read ($self->{_wbio}))) { + $self->{wbuf} .= $buf; + $self->_drain_wbuf; + } } =item $handle->starttls ($tls[, $tls_ctx]) @@ -1029,7 +1370,6 @@ =cut -# TODO: maybe document... sub starttls { my ($self, $ssl, $ctx) = @_; @@ -1050,6 +1390,12 @@ # (unfortunately, we have to hardcode constants because the abysmally misdesigned # and mismaintained ssleay-module doesn't even offer them). # http://www.mail-archive.com/openssl-dev@openssl.org/msg22420.html + # + # in short: this is a mess. + # + # note that we do not try to kepe the length constant between writes as we are required to do. + # we assume that most (but not all) of this insanity only applies to non-blocking cases, + # and we drive openssl fully in blocking mode here. Net::SSLeay::CTX_set_mode ($self->{tls}, (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1) | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2)); @@ -1067,6 +1413,8 @@ Net::SSLeay::BIO_write ($_[0]{_rbio}, ${$_[1]}); &_dotls; }; + + &_dotls; # need to trigger the initial negotiation exchange } =item $handle->stoptls @@ -1092,6 +1440,28 @@ my $self = shift; $self->stoptls; + + my $linger = exists $self->{linger} ? $self->{linger} : 3600; + + if ($linger && length $self->{wbuf}) { + my $fh = delete $self->{fh}; + my $wbuf = delete $self->{wbuf}; + + my @linger; + + push @linger, AnyEvent->io (fh => $fh, poll => "w", cb => sub { + my $len = syswrite $fh, $wbuf, length $wbuf; + + if ($len > 0) { + substr $wbuf, 0, $len, ""; + } else { + @linger = (); # end + } + }); + push @linger, AnyEvent->timer (after => $linger, cb => sub { + @linger = (); + }); + } } =item AnyEvent::Handle::TLS_CTX @@ -1143,7 +1513,7 @@ =item * all constructor arguments become object members. At least initially, when you pass a C-argument to the constructor it -will end up in C<< $handle->{tls} >>. Those members might be changes or +will end up in C<< $handle->{tls} >>. Those members might be changed or mutated later on (for example C will hold the TLS connection object). =item * other object member names are prefixed with an C<_>.