--- AnyEvent/lib/AnyEvent/Handle.pm 2008/06/02 09:12:14 1.53 +++ AnyEvent/lib/AnyEvent/Handle.pm 2008/06/06 10:49:20 1.62 @@ -16,7 +16,7 @@ =cut -our $VERSION = 4.1; +our $VERSION = 4.14; =head1 SYNOPSIS @@ -107,7 +107,9 @@ =item on_read => $cb->($handle) This sets the default read callback, which is called when data arrives -and no read request is in the queue. +and no read request is in the queue (unlike read queue callbacks, this +callback will only be called when at least one octet of data is in the +read buffer). To access (and remove data from) the read buffer, use the C<< ->rbuf >> method or access the C<$handle->{rbuf}> member directly. @@ -167,6 +169,17 @@ buffer: If the write reaches this size or gets even samller it is considered empty. +=item linger => + +If non-zero (default: C<3600>), then the destructor of the +AnyEvent::Handle object will check wether there is still outstanding write +data and will install a watcher that will write out this data. No errors +will be reported (this mostly matches how the operating system treats +outstanding data at socket close time). + +This will not work for partial TLS data that could not yet been +encoded. This data will be lost. + =item tls => "accept" | "connect" | Net::SSLeay::SSL object When this parameter is given, it enables TLS (SSL) mode, that means it @@ -226,15 +239,10 @@ $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); } -# $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; # nop -# $self->on_error (delete $self->{on_error}) if $self->{on_error}; # nop -# $self->on_read (delete $self->{on_read} ) if $self->{on_read}; # nop - $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; - $self->{_activity} = AnyEvent->now; $self->_timeout; - $self->start_read; + $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; $self } @@ -341,7 +349,7 @@ $self->_error (&Errno::ETIMEDOUT); } - # callbakx could have changed timeout value, optimise + # callback could have changed timeout value, optimise return unless $self->{timeout}; # calculate new after @@ -349,6 +357,7 @@ } Scalar::Util::weaken $self; + return unless $self; # ->error could have destroyed $self $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub { delete $self->{_tw}; @@ -481,6 +490,21 @@ sprintf "%d:%s,", (length $string), $string }; +=item packstring => $format, $data + +An octet string prefixed with an encoded length. The encoding C<$format> +uses the same format as a Perl C format, but must specify a single +integer only (only one of C is allowed, plus an +optional C, C<< < >> or C<< > >> modifier). + +=cut + +register_write_type packstring => sub { + my ($self, $format, $string) = @_; + + pack "$format/a", $string +}; + =item json => $array_or_hashref Encodes the given hash or array reference into a JSON object. Unless you @@ -558,8 +582,8 @@ In the more complex case, you want to queue multiple callbacks. In this case, AnyEvent::Handle will call the first queued callback each time new -data arrives and removes it when it has done its job (see C, -below). +data arrives (also the first time it is queued) and removes it when it has +done its job (see C, below). This way you can, for example, push three line-reads, followed by reading a chunk of data, and AnyEvent::Handle will execute them in order. @@ -621,6 +645,8 @@ sub _drain_rbuf { my ($self) = @_; + local $self->{_in_drain} = 1; + if ( defined $self->{rbuf_max} && $self->{rbuf_max} < length $self->{rbuf} @@ -628,42 +654,53 @@ return $self->_error (&Errno::ENOSPC, 1); } - return if $self->{in_drain}; - local $self->{in_drain} = 1; - - while (my $len = length $self->{rbuf}) { + while () { no strict 'refs'; + + my $len = length $self->{rbuf}; + if (my $cb = shift @{ $self->{_queue} }) { unless ($cb->($self)) { if ($self->{_eof}) { # no progress can be made (not enough data and no data forthcoming) - return $self->_error (&Errno::EPIPE, 1); + $self->_error (&Errno::EPIPE, 1), last; } unshift @{ $self->{_queue} }, $cb; - return; + last; } } elsif ($self->{on_read}) { + last unless $len; + $self->{on_read}($self); if ( - $self->{_eof} # if no further data will arrive - && $len == length $self->{rbuf} # and no data has been consumed - && !@{ $self->{_queue} } # and the queue is still empty - && $self->{on_read} # and we still want to read data + $len == length $self->{rbuf} # if no data has been consumed + && !@{ $self->{_queue} } # and the queue is still empty + && $self->{on_read} # but we still have on_read ) { - # then no progress can be made - return $self->_error (&Errno::EPIPE, 1); + # no further data will arrive + # so no progress can be made + $self->_error (&Errno::EPIPE, 1), last + if $self->{_eof}; + + last; # more data might arrive } } else { # read side becomes idle delete $self->{_rw}; - return; + last; } } $self->{on_eof}($self) if $self->{_eof} && $self->{on_eof}; + + # may need to restart read watcher + unless ($self->{_rw}) { + $self->start_read + if $self->{on_read} || @{ $self->{_queue} }; + } } =item $handle->on_read ($cb) @@ -678,6 +715,7 @@ my ($self, $cb) = @_; $self->{on_read} = $cb; + $self->_drain_rbuf if $cb && !$self->{_in_drain}; } =item $handle->rbuf @@ -736,7 +774,7 @@ } push @{ $self->{_queue} }, $cb; - $self->_drain_rbuf; + $self->_drain_rbuf unless $self->{_in_drain}; } sub unshift_read { @@ -752,7 +790,7 @@ unshift @{ $self->{_queue} }, $cb; - $self->_drain_rbuf; + $self->_drain_rbuf unless $self->{_in_drain}; } =item $handle->push_read (type => @args, $cb) @@ -848,42 +886,6 @@ $self->unshift_read (line => @_); } -=item netstring => $cb->($handle, $string) - -A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement). - -Throws an error with C<$!> set to EBADMSG on format violations. - -=cut - -register_read_type netstring => sub { - my ($self, $cb) = @_; - - sub { - unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { - if ($_[0]{rbuf} =~ /[^0-9]/) { - $self->_error (&Errno::EBADMSG); - } - return; - } - - my $len = $1; - - $self->unshift_read (chunk => $len, sub { - my $string = $_[1]; - $_[0]->unshift_read (chunk => 1, sub { - if ($_[1] eq ",") { - $cb->($_[0], $string); - } else { - $self->_error (&Errno::EBADMSG); - } - }); - }); - - 1 - } -}; - =item regex => $accept[, $reject[, $skip], $cb->($handle, $data) Makes a regex match against the regex object C<$accept> and returns @@ -954,6 +956,78 @@ } }; +=item netstring => $cb->($handle, $string) + +A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement). + +Throws an error with C<$!> set to EBADMSG on format violations. + +=cut + +register_read_type netstring => sub { + my ($self, $cb) = @_; + + sub { + unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { + if ($_[0]{rbuf} =~ /[^0-9]/) { + $self->_error (&Errno::EBADMSG); + } + return; + } + + my $len = $1; + + $self->unshift_read (chunk => $len, sub { + my $string = $_[1]; + $_[0]->unshift_read (chunk => 1, sub { + if ($_[1] eq ",") { + $cb->($_[0], $string); + } else { + $self->_error (&Errno::EBADMSG); + } + }); + }); + + 1 + } +}; + +=item packstring => $format, $cb->($handle, $string) + +An octet string prefixed with an encoded length. The encoding C<$format> +uses the same format as a Perl C format, but must specify a single +integer only (only one of C is allowed, plus an +optional C, C<< < >> or C<< > >> modifier). + +DNS over TCP uses a prefix of C, EPP uses a prefix of C. + +Example: read a block of data prefixed by its length in BER-encoded +format (very efficient). + + $handle->push_read (packstring => "w", sub { + my ($handle, $data) = @_; + }); + +=cut + +register_read_type packstring => sub { + my ($self, $cb, $format) = @_; + + sub { + # when we can use 5.10 we can use ".", but for 5.8 we use the re-pack method + defined (my $len = eval { unpack $format, $_[0]->{rbuf} }) + or return; + + # remove prefix + substr $_[0]->{rbuf}, 0, (length pack $format, $len), ""; + + # read rest + $_[0]->unshift_read (chunk => $len, $cb); + + 1 + } +}; + =item json => $cb->($handle, $hash_or_arrayref) Reads a JSON object or array, decodes it and passes it to the callback. @@ -1025,10 +1099,15 @@ =item $handle->start_read In rare cases you actually do not want to read anything from the -socket. In this case you can call C. Neither C no +socket. In this case you can call C. Neither C nor any queued callbacks will be executed then. To start reading again, call C. +Note that AnyEvent::Handle will automatically C for you when +you change the C callback or push/unshift a read callback, and it +will automatically C for you when neither C is set nor +there are any read requests in the queue. + =cut sub stop_read { @@ -1052,12 +1131,12 @@ $self->{filter_r} ? $self->{filter_r}($self, $rbuf) - : $self->_drain_rbuf; + : $self->{_in_drain} || $self->_drain_rbuf; } elsif (defined $len) { delete $self->{_rw}; $self->{_eof} = 1; - $self->_drain_rbuf; + $self->_drain_rbuf unless $self->{_in_drain}; } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { return $self->_error ($!, 1); @@ -1069,20 +1148,29 @@ sub _dotls { my ($self) = @_; + my $buf; + if (length $self->{_tls_wbuf}) { while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) { substr $self->{_tls_wbuf}, 0, $len, ""; } } - if (defined (my $buf = Net::SSLeay::BIO_read ($self->{_wbio}))) { + if (length ($buf = Net::SSLeay::BIO_read ($self->{_wbio}))) { $self->{wbuf} .= $buf; $self->_drain_wbuf; } - while (defined (my $buf = Net::SSLeay::read ($self->{tls}))) { - $self->{rbuf} .= $buf; - $self->_drain_rbuf; + while (defined ($buf = Net::SSLeay::read ($self->{tls}))) { + if (length $buf) { + $self->{rbuf} .= $buf; + $self->_drain_rbuf unless $self->{_in_drain}; + } else { + # let's treat SSL-eof as we treat normal EOF + $self->{_eof} = 1; + $self->_shutdown; + return; + } } my $err = Net::SSLeay::get_error ($self->{tls}, -1); @@ -1178,6 +1266,28 @@ my $self = shift; $self->stoptls; + + my $linger = exists $self->{linger} ? $self->{linger} : 3600; + + if ($linger && length $self->{wbuf}) { + my $fh = delete $self->{fh}; + my $wbuf = delete $self->{wbuf}; + + my @linger; + + push @linger, AnyEvent->io (fh => $fh, poll => "w", cb => sub { + my $len = syswrite $fh, $wbuf, length $wbuf; + + if ($len > 0) { + substr $wbuf, 0, $len, ""; + } else { + @linger = (); # end + } + }); + push @linger, AnyEvent->timer (after => $linger, cb => sub { + @linger = (); + }); + } } =item AnyEvent::Handle::TLS_CTX