--- AnyEvent/lib/AnyEvent/Handle.pm 2008/06/05 18:30:08 1.60 +++ AnyEvent/lib/AnyEvent/Handle.pm 2008/06/25 20:29:32 1.70 @@ -16,7 +16,7 @@ =cut -our $VERSION = 4.14; +our $VERSION = 4.151; =head1 SYNOPSIS @@ -107,7 +107,9 @@ =item on_read => $cb->($handle) This sets the default read callback, which is called when data arrives -and no read request is in the queue. +and no read request is in the queue (unlike read queue callbacks, this +callback will only be called when at least one octet of data is in the +read buffer). To access (and remove data from) the read buffer, use the C<< ->rbuf >> method or access the C<$handle->{rbuf}> member directly. @@ -124,6 +126,12 @@ To append to the write buffer, use the C<< ->push_write >> method. +This callback is useful when you don't want to put all of your write data +into the queue at once, for example, when you want to write the contents +of some file to the socket you might not want to read the whole file into +memory and push it into the queue, but instead only read more data from +the file when the write queue becomes empty. + =item timeout => $fractional_seconds If non-zero, then this enables an "inactivity" timeout: whenever this many @@ -156,6 +164,30 @@ amount of data without a callback ever being called as long as the line isn't finished). +=item autocork => + +When disabled (the default), then C will try to immediately +write the data to the handle if possible. This avoids having to register +a write watcher and wait for the next event loop iteration, but can be +inefficient if you write multiple small chunks (this disadvantage is +usually avoided by your kernel's nagle algorithm, see C). + +When enabled, then writes will always be queued till the next event loop +iteration. This is efficient when you do many small writes per iteration, +but less efficient when you do a single write only. + +=item no_delay => + +When doing small writes on sockets, your operating system kernel might +wait a bit for more data before actually sending it out. This is called +the Nagle algorithm, and usually it is beneficial. + +In some situations you want as low a delay as possible, which cna be +accomplishd by setting this option to true. + +The default is your opertaing system's default behaviour, this option +explicitly enables or disables it, if possible. + =item read_size => The default read block size (the amount of bytes this module will try to read @@ -167,6 +199,17 @@ buffer: If the write reaches this size or gets even samller it is considered empty. +=item linger => + +If non-zero (default: C<3600>), then the destructor of the +AnyEvent::Handle object will check wether there is still outstanding write +data and will install a watcher that will write out this data. No errors +will be reported (this mostly matches how the operating system treats +outstanding data at socket close time). + +This will not work for partial TLS data that could not yet been +encoded. This data will be lost. + =item tls => "accept" | "connect" | Net::SSLeay::SSL object When this parameter is given, it enables TLS (SSL) mode, that means it @@ -229,8 +272,11 @@ $self->{_activity} = AnyEvent->now; $self->_timeout; - $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; - $self->on_read (delete $self->{on_read} ) if $self->{on_read}; + $self->on_drain (delete $self->{on_drain}) if exists $self->{on_drain}; + $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay}; + + $self->start_read + if $self->{on_read}; $self } @@ -301,6 +347,29 @@ $_[0]{on_timeout} = $_[1]; } +=item $handle->autocork ($boolean) + +Enables or disables the current autocork behaviour (see C +constructor argument). + +=cut + +=item $handle->no_delay ($boolean) + +Enables or disables the C setting (see constructor argument of +the same name for details). + +=cut + +sub no_delay { + $_[0]{no_delay} = $_[1]; + + eval { + local $SIG{__DIE__}; + setsockopt $_[0]{fh}, &Socket::IPPROTO_TCP, &Socket::TCP_NODELAY, int $_[1]; + }; +} + ############################################################################# =item $handle->timeout ($seconds) @@ -423,7 +492,7 @@ }; # try to write data immediately - $cb->(); + $cb->() unless $self->{autocork}; # if still data left in wbuf, we need to poll $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) @@ -478,6 +547,21 @@ sprintf "%d:%s,", (length $string), $string }; +=item packstring => $format, $data + +An octet string prefixed with an encoded length. The encoding C<$format> +uses the same format as a Perl C format, but must specify a single +integer only (only one of C is allowed, plus an +optional C, C<< < >> or C<< > >> modifier). + +=cut + +register_write_type packstring => sub { + my ($self, $format, $string) = @_; + + pack "$format/a*", $string +}; + =item json => $array_or_hashref Encodes the given hash or array reference into a JSON object. Unless you @@ -519,6 +603,21 @@ : JSON::encode_json ($ref) }; +=item storable => $reference + +Freezes the given reference using L and writes it to the +handle. Uses the C format. + +=cut + +register_write_type storable => sub { + my ($self, $ref) = @_; + + require Storable; + + pack "w/a*", Storable::nfreeze ($ref) +}; + =back =item AnyEvent::Handle::register_write_type type => $coderef->($handle, @args) @@ -550,13 +649,14 @@ In the simple case, you just install an C callback and whenever new data arrives, it will be called. You can then remove some data (if -enough is there) from the read buffer (C<< $handle->rbuf >>) if you want -or not. +enough is there) from the read buffer (C<< $handle->rbuf >>). Or you cna +leave the data there if you want to accumulate more (e.g. when only a +partial message has been received so far). In the more complex case, you want to queue multiple callbacks. In this case, AnyEvent::Handle will call the first queued callback each time new -data arrives and removes it when it has done its job (see C, -below). +data arrives (also the first time it is queued) and removes it when it has +done its job (see C, below). This way you can, for example, push three line-reads, followed by reading a chunk of data, and AnyEvent::Handle will execute them in order. @@ -579,13 +679,17 @@ }); }); -Example 2: Implement a client for a protocol that replies either with -"OK" and another line or "ERROR" for one request, and 64 bytes for the -second request. Due tot he availability of a full queue, we can just -pipeline sending both requests and manipulate the queue as necessary in -the callbacks: +Example 2: Implement a client for a protocol that replies either with "OK" +and another line or "ERROR" for the first request that is sent, and 64 +bytes for the second request. Due to the availability of a queue, we can +just pipeline sending both requests and manipulate the queue as necessary +in the callbacks. + +When the first callback is called and sees an "OK" response, it will +C another line-read. This line-read will be queued I the +64-byte chunk callback. - # request one + # request one, returns either "OK + extra line" or "ERROR" $handle->push_write ("request 1\015\012"); # we expect "ERROR" or "OK" as response, so push a line read @@ -602,7 +706,7 @@ } }); - # request two + # request two, simply returns 64 octets $handle->push_write ("request 2\015\012"); # simply read 64 bytes, always @@ -636,13 +740,15 @@ unless ($cb->($self)) { if ($self->{_eof}) { # no progress can be made (not enough data and no data forthcoming) - return $self->_error (&Errno::EPIPE, 1); + $self->_error (&Errno::EPIPE, 1), last; } unshift @{ $self->{_queue} }, $cb; last; } } elsif ($self->{on_read}) { + last unless $len; + $self->{on_read}($self); if ( @@ -652,7 +758,7 @@ ) { # no further data will arrive # so no progress can be made - return $self->_error (&Errno::EPIPE, 1) + $self->_error (&Errno::EPIPE, 1), last if $self->{_eof}; last; # more data might arrive @@ -857,42 +963,6 @@ $self->unshift_read (line => @_); } -=item netstring => $cb->($handle, $string) - -A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement). - -Throws an error with C<$!> set to EBADMSG on format violations. - -=cut - -register_read_type netstring => sub { - my ($self, $cb) = @_; - - sub { - unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { - if ($_[0]{rbuf} =~ /[^0-9]/) { - $self->_error (&Errno::EBADMSG); - } - return; - } - - my $len = $1; - - $self->unshift_read (chunk => $len, sub { - my $string = $_[1]; - $_[0]->unshift_read (chunk => 1, sub { - if ($_[1] eq ",") { - $cb->($_[0], $string); - } else { - $self->_error (&Errno::EBADMSG); - } - }); - }); - - 1 - } -}; - =item regex => $accept[, $reject[, $skip], $cb->($handle, $data) Makes a regex match against the regex object C<$accept> and returns @@ -963,6 +1033,78 @@ } }; +=item netstring => $cb->($handle, $string) + +A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement). + +Throws an error with C<$!> set to EBADMSG on format violations. + +=cut + +register_read_type netstring => sub { + my ($self, $cb) = @_; + + sub { + unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { + if ($_[0]{rbuf} =~ /[^0-9]/) { + $self->_error (&Errno::EBADMSG); + } + return; + } + + my $len = $1; + + $self->unshift_read (chunk => $len, sub { + my $string = $_[1]; + $_[0]->unshift_read (chunk => 1, sub { + if ($_[1] eq ",") { + $cb->($_[0], $string); + } else { + $self->_error (&Errno::EBADMSG); + } + }); + }); + + 1 + } +}; + +=item packstring => $format, $cb->($handle, $string) + +An octet string prefixed with an encoded length. The encoding C<$format> +uses the same format as a Perl C format, but must specify a single +integer only (only one of C is allowed, plus an +optional C, C<< < >> or C<< > >> modifier). + +DNS over TCP uses a prefix of C, EPP uses a prefix of C. + +Example: read a block of data prefixed by its length in BER-encoded +format (very efficient). + + $handle->push_read (packstring => "w", sub { + my ($handle, $data) = @_; + }); + +=cut + +register_read_type packstring => sub { + my ($self, $cb, $format) = @_; + + sub { + # when we can use 5.10 we can use ".", but for 5.8 we use the re-pack method + defined (my $len = eval { unpack $format, $_[0]->{rbuf} }) + or return; + + # remove prefix + substr $_[0]->{rbuf}, 0, (length pack $format, $len), ""; + + # read rest + $_[0]->unshift_read (chunk => $len, $cb); + + 1 + } +}; + =item json => $cb->($handle, $hash_or_arrayref) Reads a JSON object or array, decodes it and passes it to the callback. @@ -982,7 +1124,7 @@ =cut register_read_type json => sub { - my ($self, $cb, $accept, $reject, $skip) = @_; + my ($self, $cb) = @_; require JSON; @@ -1007,6 +1149,40 @@ } }; +=item storable => $cb->($handle, $ref) + +Deserialises a L frozen representation as written by the +C write type (BER-encoded length prefix followed by nfreeze'd +data). + +Raises C error if the data could not be decoded. + +=cut + +register_read_type storable => sub { + my ($self, $cb) = @_; + + require Storable; + + sub { + # when we can use 5.10 we can use ".", but for 5.8 we use the re-pack method + defined (my $len = eval { unpack "w", $_[0]->{rbuf} }) + or return; + + # remove prefix + substr $_[0]->{rbuf}, 0, (length pack "w", $len), ""; + + # read rest + $_[0]->unshift_read (chunk => $len, sub { + if (my $ref = eval { Storable::thaw ($_[1]) }) { + $cb->($_[0], $ref); + } else { + $self->_error (&Errno::EBADMSG); + } + }); + } +}; + =back =item AnyEvent::Handle::register_read_type type => $coderef->($handle, $cb, @args) @@ -1201,6 +1377,28 @@ my $self = shift; $self->stoptls; + + my $linger = exists $self->{linger} ? $self->{linger} : 3600; + + if ($linger && length $self->{wbuf}) { + my $fh = delete $self->{fh}; + my $wbuf = delete $self->{wbuf}; + + my @linger; + + push @linger, AnyEvent->io (fh => $fh, poll => "w", cb => sub { + my $len = syswrite $fh, $wbuf, length $wbuf; + + if ($len > 0) { + substr $wbuf, 0, $len, ""; + } else { + @linger = (); # end + } + }); + push @linger, AnyEvent->timer (after => $linger, cb => sub { + @linger = (); + }); + } } =item AnyEvent::Handle::TLS_CTX