--- AnyEvent/lib/AnyEvent/Handle.pm 2008/05/01 16:35:40 1.7 +++ AnyEvent/lib/AnyEvent/Handle.pm 2008/06/04 22:51:15 1.58 @@ -3,21 +3,20 @@ no warnings; use strict; -use AnyEvent; -use IO::Handle; -use Errno qw/EAGAIN EINTR/; +use AnyEvent (); +use AnyEvent::Util qw(WSAEWOULDBLOCK); +use Scalar::Util (); +use Carp (); +use Fcntl (); +use Errno qw(EAGAIN EINTR); =head1 NAME -AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent - -=head1 VERSION - -Version 0.01 +AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent =cut -our $VERSION = '0.01'; +our $VERSION = 4.13; =head1 SYNOPSIS @@ -26,43 +25,38 @@ my $cv = AnyEvent->condvar; - my $ae_fh = AnyEvent::Handle->new (fh => \*STDIN); - - $ae_fh->on_eof (sub { $cv->broadcast }); - - $ae_fh->readlines (sub { - my ($ae_fh, @lines) = @_; - for (@lines) { - chomp; - print "Line: $_"; - } - }); - - # or use the constructor to pass the callback: - - my $ae_fh2 = + my $handle = AnyEvent::Handle->new ( fh => \*STDIN, on_eof => sub { $cv->broadcast; }, - on_readline => sub { - my ($ae_fh, @lines) = @_; - for (@lines) { - chomp; - print "Line: $_"; - } - } ); - $cv->wait; + # send some request line + $handle->push_write ("getinfo\015\012"); + + # read the response line + $handle->push_read (line => sub { + my ($handle, $line) = @_; + warn "read line <$line>\n"; + $cv->send; + }); + + $cv->recv; =head1 DESCRIPTION -This module is a helper module to make it easier to do non-blocking I/O -on filehandles (and sockets, see L). +This module is a helper module to make it easier to do event-based I/O on +filehandles. For utility functions for doing non-blocking connects and accepts +on sockets see L<AnyEvent::Util>. + +In the following, when the documentation refers to "bytes" then this +means characters. As sysread and syswrite are used for all I/O, their +treatment of characters applies to this module as well. -The event loop is provided by L. +All callbacks will be invoked with the handle object as their first +argument.
=head1 METHODS @@ -70,304 +64,1212 @@ =item B -The constructor has these arguments: +The constructor supports these arguments (all as key => value pairs). =over 4 -=item fh => $filehandle +=item fh => $filehandle [MANDATORY] The filehandle this L object will operate on. -NOTE: The filehandle will be set to non-blocking. +NOTE: The filehandle will be set to non-blocking (using +AnyEvent::Util::fh_nonblocking). + +=item on_eof => $cb->($handle) + +Set the callback to be called when an end-of-file condition is detected, +i.e. in the case of a socket, when the other side has closed the +connection cleanly. + +While not mandatory, it is highly recommended to set an eof callback, +otherwise you might end up with a closed socket while you are still +waiting for data. + +=item on_error => $cb->($handle, $fatal) + +This is the error callback, which is called when, well, some error +occurred, such as not being able to resolve the hostname, failure to +connect or a read error. + +Some errors are fatal (which is indicated by C<$fatal> being true). On +fatal errors the handle object will be shut down and will not be +usable. Non-fatal errors can be retried by simply returning, but it is +recommended to simply ignore this parameter and instead abandon the handle +object when this callback is invoked. + +On callback entrance, the value of C<$!> contains the operating system +error (or C<ENOSPC>, C<EPIPE>, C<ETIMEDOUT> or C<EBADMSG>). + +While not mandatory, it is I<highly> recommended to set this callback, as +you will not be notified of errors otherwise. The default simply calls +C<croak>. + +=item on_read => $cb->($handle) + +This sets the default read callback, which is called when data arrives +and no read request is in the queue. + +To access (and remove data from) the read buffer, use the C<< ->rbuf >> +method or access the C<$handle->{rbuf}> member directly. + +When an EOF condition is detected then AnyEvent::Handle will first try to +feed all the remaining data to the queued callbacks and C<on_read> before +calling the C<on_eof> callback.
If no progress can be made, then a fatal +error will be raised (with C<$!> set to C<EPIPE>). + +=item on_drain => $cb->($handle) + +This sets the callback that is called when the write buffer becomes empty +(or when the callback is set and the buffer is empty already). + +To append to the write buffer, use the C<< ->push_write >> method. + +=item timeout => $fractional_seconds + +If non-zero, then this enables an "inactivity" timeout: whenever this many +seconds pass without a successful read or write on the underlying file +handle, the C<on_timeout> callback will be invoked (and if that one is +missing, an C<ETIMEDOUT> error will be raised). + +Note that timeout processing is also active when you currently do not have +any outstanding read or write requests: If you plan to keep the connection +idle then you should disable the timeout temporarily or ignore the timeout +in the C<on_timeout> callback. + +Zero (the default) disables this timeout. + +=item on_timeout => $cb->($handle) + +Called whenever the inactivity timeout passes. If you return from this +callback, then the timeout will be reset as if some activity had happened, +so this condition is not fatal in any way. + +=item rbuf_max => <bytes> + +If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>) +when the read buffer ever (strictly) exceeds this size. This is useful to +avoid denial-of-service attacks. + +For example, a server accepting connections from untrusted sources should +be configured to accept only so-and-so much data that it cannot act on +(for example, when expecting a line, an attacker could send an unlimited +amount of data without a callback ever being called as long as the line +isn't finished). + +=item read_size => <bytes> + +The default read block size (the amount of bytes this module will try to read +during each loop iteration). Default: C<8192>.
+ +=item low_water_mark => <bytes> + +Sets the amount of bytes (default: C<0>) that make up an "empty" write +buffer: If the write buffer reaches this size or gets even smaller it is +considered empty. + +=item tls => "accept" | "connect" | Net::SSLeay::SSL object + +When this parameter is given, it enables TLS (SSL) mode, that means it +will start a TLS handshake and will transparently encrypt/decrypt +data. + +TLS mode requires Net::SSLeay to be installed (it will be loaded +automatically when you try to create a TLS handle). + +For the TLS server side, use C<accept>, and for the TLS client side of a +connection, use C<connect> mode. + +You can also provide your own TLS connection object, but you have +to make sure that you call either C<Net::SSLeay::set_connect_state> +or C<Net::SSLeay::set_accept_state> on it before you pass it to +AnyEvent::Handle. + +See the C<starttls> method if you need to start TLS negotiation later. -=item read_block_size => $size +=item tls_ctx => $ssl_ctx -The default read block size use for reads via the C -method. +Use the given Net::SSLeay::CTX object to create the new TLS connection +(unless a connection object was specified directly). If this parameter is +missing, then AnyEvent::Handle will use C<AnyEvent::Handle::TLS_CTX>. -=item on_read => $cb +=item json => JSON or JSON::XS object -=item on_eof => $cb +This is the json coder object used by the C<json> read and write types. -=item on_error => $cb +If you don't supply it, then AnyEvent::Handle will create and use a +suitable one, which will write and expect UTF-8 encoded JSON texts. -These are shortcuts, that will call the corresponding method and set the callback to C<$cb>. +Note that you are responsible to depend on the JSON module if you want to +use this functionality, as AnyEvent does not have a dependency itself. -=item on_readline => $cb +=item filter_r => $cb -The C method is called with the default separated and C<$cb> as callback -for you. +=item filter_w => $cb + +These exist, but are undocumented at this time.
=back =cut sub new { - my $this = shift; - my $class = ref($this) || $this; - my $self = { - read_block_size => 4096, - rbuf => '', - @_ - }; - bless $self, $class; + my $class = shift; + + my $self = bless { @_ }, $class; + + $self->{fh} or Carp::croak "mandatory argument fh is missing"; + + AnyEvent::Util::fh_nonblocking $self->{fh}, 1; + + if ($self->{tls}) { + require Net::SSLeay; + $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); + } + + $self->{_activity} = AnyEvent->now; + $self->_timeout; + + $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; + $self->on_read (delete $self->{on_read} ) if $self->{on_read}; + + $self +} + +sub _shutdown { + my ($self) = @_; + + delete $self->{_tw}; + delete $self->{_rw}; + delete $self->{_ww}; + delete $self->{fh}; - $self->{fh}->blocking (0) if $self->{fh}; + $self->stoptls; +} - if ($self->{on_read}) { - $self->on_read ($self->{on_read}); +sub _error { + my ($self, $errno, $fatal) = @_; - } elsif ($self->{on_readline}) { - $self->readlines ($self->{on_readline}); + $self->_shutdown + if $fatal; - } elsif ($self->{on_eof}) { - $self->on_eof ($self->{on_eof}); + $! = $errno; - } elsif ($self->{on_error}) { - $self->on_eof ($self->{on_error}); + if ($self->{on_error}) { + $self->{on_error}($self, $fatal); + } else { + Carp::croak "AnyEvent::Handle uncaught error: $!"; } +} + +=item $fh = $handle->fh + +This method returns the file handle of the L object. - return $self +=cut + +sub fh { $_[0]{fh} } + +=item $handle->on_error ($cb) + +Replace the current C callback (see the C constructor argument). + +=cut + +sub on_error { + $_[0]{on_error} = $_[1]; +} + +=item $handle->on_eof ($cb) + +Replace the current C callback (see the C constructor argument). + +=cut + +sub on_eof { + $_[0]{on_eof} = $_[1]; } -=item B +=item $handle->on_timeout ($cb) -This method returns the filehandle of the L object. +Replace the current C callback, or disables the callback +(but not the timeout) if C<$cb> = C. 
See C constructor +argument. =cut -sub fh { $_[0]->{fh} } +sub on_timeout { + $_[0]{on_timeout} = $_[1]; +} -=item B +############################################################################# -This method installs a C<$callback> that will be called -when new data arrived. You can access the read buffer via the C -method (see below). +=item $handle->timeout ($seconds) -The first argument of the C<$callback> will be the L object. +Configures (or disables) the inactivity timeout. =cut -sub on_read { - my ($self, $cb) = @_; - $self->{on_read} = $cb; +sub timeout { + my ($self, $timeout) = @_; - unless (defined $self->{on_read}) { - delete $self->{on_read_w}; - return; - } - - $self->{on_read_w} = - AnyEvent->io (poll => 'r', fh => $self->{fh}, cb => sub { - #d# warn "READ:[$self->{read_size}] $self->{read_block_size} : ".length ($self->{rbuf})."\n"; - my $rbuf_len = length $self->{rbuf}; - my $l; - if (defined $self->{read_size}) { - $l = sysread $self->{fh}, $self->{rbuf}, - ($self->{read_size} - $rbuf_len), $rbuf_len; - } else { - $l = sysread $self->{fh}, $self->{rbuf}, $self->{read_block_size}, $rbuf_len; - } - #d# warn "READL $l [$self->{rbuf}]\n"; + $self->{timeout} = $timeout; + $self->_timeout; +} + +# reset the timeout watcher, as neccessary +# also check for time-outs +sub _timeout { + my ($self) = @_; - if (not defined $l) { - return if $! == EAGAIN || $! == EINTR; - $self->{on_error}->($self) if $self->{on_error}; - delete $self->{on_read_w}; - - } elsif ($l == 0) { - $self->{on_eof}->($self) if $self->{on_eof}; - delete $self->{on_read_w}; + if ($self->{timeout}) { + my $NOW = AnyEvent->now; + # when would the timeout trigger? + my $after = $self->{_activity} + $self->{timeout} - $NOW; + + # now or in the past already? 
+ if ($after <= 0) { + $self->{_activity} = $NOW; + + if ($self->{on_timeout}) { + $self->{on_timeout}($self); } else { - $self->{on_read}->($self); + $self->_error (&Errno::ETIMEDOUT); } + + # callback could have changed timeout value, optimise + return unless $self->{timeout}; + + # calculate new after + $after = $self->{timeout}; + } + + Scalar::Util::weaken $self; + return unless $self; # ->error could have destroyed $self + + $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub { + delete $self->{_tw}; + $self->_timeout; }); + } else { + delete $self->{_tw}; + } } -=item B +############################################################################# + +=back + +=head2 WRITE QUEUE -Whenever a read or write operation resulted in an error the C<$callback> -will be called. +AnyEvent::Handle manages two queues per handle, one for writing and one +for reading. -The first argument of C<$callback> will be the L object itself. -The error is given as errno in C<$!>. +The write queue is very simple: you can add data to its end, and +AnyEvent::Handle will automatically try to get rid of it for you. + +When data could be written and the write buffer is shorter then the low +water mark, the C callback will be invoked. + +=over 4 + +=item $handle->on_drain ($cb) + +Sets the C callback or clears it (see the description of +C in the constructor). =cut -sub on_error { - $_[0]->{on_error} = $_[1]; +sub on_drain { + my ($self, $cb) = @_; + + $self->{on_drain} = $cb; + + $cb->($self) + if $cb && $self->{low_water_mark} >= length $self->{wbuf}; } -=item B +=item $handle->push_write ($data) -Installs the C<$callback> that will be called when the end of file is -encountered in a read operation this C<$callback> will be called. The first -argument will be the L object itself. +Queues the given scalar to be written. You can push as much data as you +want (only limited by the available memory), as C +buffers it independently of the kernel. 
=cut -sub on_eof { - $_[0]->{on_eof} = $_[1]; +sub _drain_wbuf { + my ($self) = @_; + + if (!$self->{_ww} && length $self->{wbuf}) { + + Scalar::Util::weaken $self; + + my $cb = sub { + my $len = syswrite $self->{fh}, $self->{wbuf}; + + if ($len >= 0) { + substr $self->{wbuf}, 0, $len, ""; + + $self->{_activity} = AnyEvent->now; + + $self->{on_drain}($self) + if $self->{low_water_mark} >= length $self->{wbuf} + && $self->{on_drain}; + + delete $self->{_ww} unless length $self->{wbuf}; + } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { + $self->_error ($!, 1); + } + }; + + # try to write data immediately + $cb->(); + + # if still data left in wbuf, we need to poll + $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) + if length $self->{wbuf}; + }; } -=item B +our %WH; -Returns a reference to the read buffer. +sub register_write_type($$) { + $WH{$_[0]} = $_[1]; +} -NOTE: The read buffer should only be used or modified if the C -method is used directly. The C and C methods will provide -the read data to their callbacks. +sub push_write { + my $self = shift; -=cut + if (@_ > 1) { + my $type = shift; -sub rbuf : lvalue { - $_[0]->{rbuf} + @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write") + ->($self, @_); + } + + if ($self->{filter_w}) { + $self->{filter_w}($self, \$_[0]); + } else { + $self->{wbuf} .= $_[0]; + $self->_drain_wbuf; + } } -=item B +=item $handle->push_write (type => @args) + +Instead of formatting your data yourself, you can also let this module do +the job by specifying a type and type-specific arguments. -Will read exactly C<$len> bytes from the filehandle and call the C<$callback> -if done so. The first argument to the C<$callback> will be the L -object itself and the second argument the read data. +Predefined types are (if you have ideas for additional types, feel free to +drop by and tell us): -NOTE: This method will override any callbacks installed via the C method. 
+=over 4 + +=item netstring => $string + +Formats the given value as netstring +(http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them). =cut -sub read { - my ($self, $len, $cb) = @_; +register_write_type netstring => sub { + my ($self, $string) = @_; + + sprintf "%d:%s,", (length $string), $string +}; + +=item json => $array_or_hashref + +Encodes the given hash or array reference into a JSON object. Unless you +provide your own JSON object, this means it will be encoded to JSON text +in UTF-8. + +JSON objects (and arrays) are self-delimiting, so you can write JSON at +one end of a handle and read them at the other end without using any +additional framing. - $self->{read_cb} = $cb; - my $old_blk_size = $self->{read_block_size}; - $self->{read_block_size} = $len; +The generated JSON text is guaranteed not to contain any newlines: While +this module doesn't need delimiters after or between JSON texts to be +able to read them, many other languages depend on that. - $self->on_read (sub { - #d# warn "OFOFO $len || ".length($_[0]->{rbuf})."||\n"; +A simple RPC protocol that interoperates easily with others is to send +JSON arrays (or objects, although arrays are usually the better choice as +they mimic how function argument passing works) and a newline after each +JSON text: + + $handle->push_write (json => ["method", "arg1", "arg2"]); # whatever + $handle->push_write ("\012"); + +An AnyEvent::Handle receiver would simply use the C read type and +rely on the fact that the newline will be skipped as leading whitespace: + + $handle->push_read (json => sub { my $array = $_[1]; ... }); + +Other languages could read single lines terminated by a newline and pass +this line into their JSON decoder of choice. 
+ +=cut - if ($len == length $_[0]->{rbuf}) { - $_[0]->{read_block_size} = $old_blk_size; - $_[0]->on_read (undef); - $_[0]->{read_cb}->($_[0], (substr $self->{rbuf}, 0, $len, '')); +register_write_type json => sub { + my ($self, $ref) = @_; + + require JSON; + + $self->{json} ? $self->{json}->encode ($ref) + : JSON::encode_json ($ref) +}; + +=back + +=item AnyEvent::Handle::register_write_type type => $coderef->($handle, @args) + +This function (not method) lets you add your own types to C. +Whenever the given C is used, C will invoke the code +reference with the handle object and the remaining arguments. + +The code reference is supposed to return a single octet string that will +be appended to the write buffer. + +Note that this is a function, and all types registered this way will be +global, so try to use unique names. + +=cut + +############################################################################# + +=back + +=head2 READ QUEUE + +AnyEvent::Handle manages two queues per handle, one for writing and one +for reading. + +The read queue is more complex than the write queue. It can be used in two +ways, the "simple" way, using only C and the "complex" way, using +a queue. + +In the simple case, you just install an C callback and whenever +new data arrives, it will be called. You can then remove some data (if +enough is there) from the read buffer (C<< $handle->rbuf >>) if you want +or not. + +In the more complex case, you want to queue multiple callbacks. In this +case, AnyEvent::Handle will call the first queued callback each time new +data arrives and removes it when it has done its job (see C, +below). + +This way you can, for example, push three line-reads, followed by reading +a chunk of data, and AnyEvent::Handle will execute them in order. + +Example 1: EPP protocol parser. EPP sends 4 byte length info, followed by +the specified number of bytes which give an XML datagram. 
+ + # in the default state, expect some header bytes + $handle->on_read (sub { + # some data is here, now queue the length-header-read (4 octets) + shift->unshift_read (chunk => 4, sub { + # header arrived, decode + my $len = unpack "N", $_[1]; + + # now read the payload + shift->unshift_read (chunk => $len, sub { + my $xml = $_[1]; + # handle xml + }); + }); + }); + +Example 2: Implement a client for a protocol that replies either with +"OK" and another line or "ERROR" for one request, and 64 bytes for the +second request. Due tot he availability of a full queue, we can just +pipeline sending both requests and manipulate the queue as necessary in +the callbacks: + + # request one + $handle->push_write ("request 1\015\012"); + + # we expect "ERROR" or "OK" as response, so push a line read + $handle->push_read (line => sub { + # if we got an "OK", we have to _prepend_ another line, + # so it will be read before the second request reads its 64 bytes + # which are already in the queue when this callback is called + # we don't do this in case we got an error + if ($_[1] eq "OK") { + $_[0]->unshift_read (line => sub { + my $response = $_[1]; + ... + }); } }); + + # request two + $handle->push_write ("request 2\015\012"); + + # simply read 64 bytes, always + $handle->push_read (chunk => 64, sub { + my $response = $_[1]; + ... 
+ }); + +=over 4 + +=cut + +sub _drain_rbuf { + my ($self) = @_; + + if ( + defined $self->{rbuf_max} + && $self->{rbuf_max} < length $self->{rbuf} + ) { + return $self->_error (&Errno::ENOSPC, 1); + } + + return if $self->{in_drain}; + local $self->{in_drain} = 1; + + while (my $len = length $self->{rbuf}) { + no strict 'refs'; + if (my $cb = shift @{ $self->{_queue} }) { + unless ($cb->($self)) { + if ($self->{_eof}) { + # no progress can be made (not enough data and no data forthcoming) + return $self->_error (&Errno::EPIPE, 1); + } + + unshift @{ $self->{_queue} }, $cb; + last; + } + } elsif ($self->{on_read}) { + $self->{on_read}($self); + + if ( + $len == length $self->{rbuf} # if no data has been consumed + && !@{ $self->{_queue} } # and the queue is still empty + && $self->{on_read} # but we still have on_read + ) { + # no further data will arrive + # so no progress can be made + return $self->_error (&Errno::EPIPE, 1) + if $self->{_eof}; + + last; # more data might arrive + } + } else { + # read side becomes idle + delete $self->{_rw}; + last; + } + } + + $self->{on_eof}($self) + if $self->{_eof} && $self->{on_eof}; + + # may need to restart read watcher + unless ($self->{_rw}) { + $self->start_read + if $self->{on_read} || @{ $self->{_queue} }; + } } -=item B +=item $handle->on_read ($cb) -=item B +This replaces the currently set C callback, or clears it (when +the new callback is C). See the description of C in the +constructor. -This method will read lines from the filehandle, separated by C<$sep> or C<"\n"> -if C<$sep> is not provided. C<$sep> will be used as "line" separated. +=cut -The C<$callback> will be called when at least one -line could be read. The first argument to the C<$callback> will be the L -object itself and the rest of the arguments will be the read lines. +sub on_read { + my ($self, $cb) = @_; -NOTE: This method will override any callbacks installed via the C method. 
+ $self->{on_read} = $cb; + $self->_drain_rbuf if $cb; +} + +=item $handle->rbuf + +Returns the read buffer (as a modifiable lvalue). + +You can access the read buffer directly as the C<< ->{rbuf} >> member, if +you want. + +NOTE: The read buffer should only be used or modified if the C, +C or C methods are used. The other read methods +automatically manage the read buffer. + +=cut + +sub rbuf : lvalue { + $_[0]{rbuf} +} + +=item $handle->push_read ($cb) + +=item $handle->unshift_read ($cb) + +Append the given callback to the end of the queue (C) or +prepend it (C). + +The callback is called each time some additional read data arrives. + +It must check whether enough data is in the read buffer already. + +If not enough data is available, it must return the empty list or a false +value, in which case it will be called repeatedly until enough data is +available (or an error condition is detected). + +If enough data was available, then the callback must remove all data it is +interested in (which can be none at all) and return a true value. After returning +true, it will be removed from the queue. 
=cut -sub readlines { - my ($self, $sep, $cb) = @_; +our %RH; - if (ref $sep) { - $cb = $sep; - $sep = "\n"; +sub register_read_type($$) { + $RH{$_[0]} = $_[1]; +} + +sub push_read { + my $self = shift; + my $cb = pop; + + if (@_) { + my $type = shift; - } elsif (not defined $sep) { - $sep = "\n"; + $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read") + ->($self, $cb, @_); } - my $sep_len = length $sep; + push @{ $self->{_queue} }, $cb; + $self->_drain_rbuf; +} - $self->{on_readline} = $cb; +sub unshift_read { + my $self = shift; + my $cb = pop; - $self->on_read (sub { - my @lines; - my $rb = \$_[0]->{rbuf}; - my $pos; - while (($pos = index ($$rb, $sep)) >= 0) { - push @lines, substr $$rb, 0, $pos + $sep_len, ''; - } - $self->{on_readline}->($_[0], @lines); + if (@_) { + my $type = shift; + + $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read") + ->($self, $cb, @_); + } + + + unshift @{ $self->{_queue} }, $cb; + $self->_drain_rbuf; +} + +=item $handle->push_read (type => @args, $cb) + +=item $handle->unshift_read (type => @args, $cb) + +Instead of providing a callback that parses the data itself you can chose +between a number of predefined parsing formats, for chunks of data, lines +etc. + +Predefined types are (if you have ideas for additional types, feel free to +drop by and tell us): + +=over 4 + +=item chunk => $octets, $cb->($handle, $data) + +Invoke the callback only once C<$octets> bytes have been read. Pass the +data read to the callback. The callback will never be called with less +data. + +Example: read 2 bytes. 
+ + $handle->push_read (chunk => 2, sub { + warn "yay ", unpack "H*", $_[1]; }); + +=cut + +register_read_type chunk => sub { + my ($self, $cb, $len) = @_; + + sub { + $len <= length $_[0]{rbuf} or return; + $cb->($_[0], substr $_[0]{rbuf}, 0, $len, ""); + 1 + } +}; + +# compatibility with older API +sub push_read_chunk { + $_[0]->push_read (chunk => $_[1], $_[2]); } -=item B +sub unshift_read_chunk { + $_[0]->unshift_read (chunk => $_[1], $_[2]); +} -=item B +=item line => [$eol, ]$cb->($handle, $line, $eol) -=item B +The callback will be called only once a full line (including the end of +line marker, C<$eol>) has been read. This line (excluding the end of line +marker) will be passed to the callback as second argument (C<$line>), and +the end of line marker as the third argument (C<$eol>). + +The end of line marker, C<$eol>, can be either a string, in which case it +will be interpreted as a fixed record end marker, or it can be a regex +object (e.g. created by C), in which case it is interpreted as a +regular expression. + +The end of line marker argument C<$eol> is optional, if it is missing (NOT +undef), then C is used (which is good for most internet +protocols). -This method will write C<$data> to the filehandle and call the C<$callback> -afterwards. If only C<$callback> is provided it will be called when the -write buffer becomes empty the next time (or immediately if it already is empty). +Partial lines at the end of the stream will never be returned, as they are +not marked by the end of line marker. 
=cut -sub write { - my ($self, $data, $cb) = @_; - if (ref $data) { $cb = $data; undef $data } - push @{$self->{write_bufs}}, [$data, $cb]; - $self->_check_writer; +register_read_type line => sub { + my ($self, $cb, $eol) = @_; + + $eol = qr|(\015?\012)| if @_ < 3; + $eol = quotemeta $eol unless ref $eol; + $eol = qr|^(.*?)($eol)|s; + + sub { + $_[0]{rbuf} =~ s/$eol// or return; + + $cb->($_[0], $1, $2); + 1 + } +}; + +# compatibility with older API +sub push_read_line { + my $self = shift; + $self->push_read (line => @_); } -sub _check_writer { - my ($self) = @_; +sub unshift_read_line { + my $self = shift; + $self->unshift_read (line => @_); +} + +=item netstring => $cb->($handle, $string) + +A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement). + +Throws an error with C<$!> set to EBADMSG on format violations. + +=cut + +register_read_type netstring => sub { + my ($self, $cb) = @_; - if ($self->{write_w}) { - unless ($self->{write_cb}) { - while (@{$self->{write_bufs}} && not defined $self->{write_bufs}->[0]->[1]) { - my $wba = shift @{$self->{write_bufs}}; - $self->{wbuf} .= $wba->[0]; + sub { + unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { + if ($_[0]{rbuf} =~ /[^0-9]/) { + $self->_error (&Errno::EBADMSG); } + return; } - return; + + my $len = $1; + + $self->unshift_read (chunk => $len, sub { + my $string = $_[1]; + $_[0]->unshift_read (chunk => 1, sub { + if ($_[1] eq ",") { + $cb->($_[0], $string); + } else { + $self->_error (&Errno::EBADMSG); + } + }); + }); + + 1 } +}; + +=item regex => $accept[, $reject[, $skip], $cb->($handle, $data) + +Makes a regex match against the regex object C<$accept> and returns +everything up to and including the match. + +Example: read a single line terminated by '\n'. + + $handle->push_read (regex => qr<\n>, sub { ... 
}); + +If C<$reject> is given and not undef, then it determines when the data is +to be rejected: it is matched against the data when the C<$accept> regex +does not match and generates an C error when it matches. This is +useful to quickly reject wrong data (to avoid waiting for a timeout or a +receive buffer overflow). + +Example: expect a single decimal number followed by whitespace, reject +anything else (not the use of an anchor). + + $handle->push_read (regex => qr<^[0-9]+\s>, qr<[^0-9]>, sub { ... }); + +If C<$skip> is given and not C, then it will be matched against +the receive buffer when neither C<$accept> nor C<$reject> match, +and everything preceding and including the match will be accepted +unconditionally. This is useful to skip large amounts of data that you +know cannot be matched, so that the C<$accept> or C<$reject> regex do not +have to start matching from the beginning. This is purely an optimisation +and is usually worth only when you expect more than a few kilobytes. + +Example: expect a http header, which ends at C<\015\012\015\012>. Since we +expect the header to be very large (it isn't in practise, but...), we use +a skip regex to skip initial portions. The skip regex is tricky in that +it only accepts something not ending in either \015 or \012, as these are +required for the accept regex. + + $handle->push_read (regex => + qr<\015\012\015\012>, + undef, # no reject + qr<^.*[^\015\012]>, + sub { ... 
}); + +=cut - my $wba = shift @{$self->{write_bufs}} - or return; +register_read_type regex => sub { + my ($self, $cb, $accept, $reject, $skip) = @_; - unless (defined $wba->[0]) { - $wba->[1]->($self) if $wba->[1]; - $self->_check_writer; - return; + my $data; + my $rbuf = \$self->{rbuf}; + + sub { + # accept + if ($$rbuf =~ $accept) { + $data .= substr $$rbuf, 0, $+[0], ""; + $cb->($self, $data); + return 1; + } + + # reject + if ($reject && $$rbuf =~ $reject) { + $self->_error (&Errno::EBADMSG); + } + + # skip + if ($skip && $$rbuf =~ $skip) { + $data .= substr $$rbuf, 0, $+[0], ""; + } + + () } +}; - $self->{wbuf} = $wba->[0]; - $self->{write_cb} = $wba->[1]; +=item json => $cb->($handle, $hash_or_arrayref) - $self->{write_w} = - AnyEvent->io (poll => 'w', fh => $self->{fh}, cb => sub { - my $l = syswrite $self->{fh}, $self->{wbuf}, length $self->{wbuf}; +Reads a JSON object or array, decodes it and passes it to the callback. - if (not defined $l) { - return if $! == EAGAIN || $! == EINTR; - delete $self->{write_w}; - $self->{on_error}->($self) if $self->{on_error}; +If a C object was passed to the constructor, then that will be used +for the final decode, otherwise it will create a JSON coder expecting UTF-8. - } else { - substr $self->{wbuf}, 0, $l, ''; +This read type uses the incremental parser available with JSON version +2.09 (and JSON::XS version 2.2) and above. You have to provide a +dependency on your own: this module will load the JSON module, but +AnyEvent does not depend on it itself. - if (length ($self->{wbuf}) == 0) { - $self->{write_cb}->($self) if $self->{write_cb}; +Since JSON texts are fully self-delimiting, the C read and write +types are an ideal simple RPC protocol: just exchange JSON datagrams. See +the C write type description, above, for an actual example. 
- delete $self->{write_w}; - delete $self->{wbuf}; - delete $self->{write_cb}; +=cut - $self->_check_writer; - } +register_read_type json => sub { + my ($self, $cb, $accept, $reject, $skip) = @_; + + require JSON; + + my $data; + my $rbuf = \$self->{rbuf}; + + my $json = $self->{json} ||= JSON->new->utf8; + + sub { + my $ref = $json->incr_parse ($self->{rbuf}); + + if ($ref) { + $self->{rbuf} = $json->incr_text; + $json->incr_text = ""; + $cb->($self, $ref); + + 1 + } else { + $self->{rbuf} = ""; + () + } + } +}; + +=back + +=item AnyEvent::Handle::register_read_type type => $coderef->($handle, $cb, @args) + +This function (not method) lets you add your own types to C. + +Whenever the given C is used, C will invoke the code +reference with the handle object, the callback and the remaining +arguments. + +The code reference is supposed to return a callback (usually a closure) +that works as a plain read callback (see C<< ->push_read ($cb) >>). + +It should invoke the passed callback when it is done reading (remember to +pass C<$handle> as first argument as all other callbacks do that). + +Note that this is a function, and all types registered this way will be +global, so try to use unique names. + +For examples, see the source of this module (F, +search for C)). + +=item $handle->stop_read + +=item $handle->start_read + +In rare cases you actually do not want to read anything from the +socket. In this case you can call C. Neither C nor +any queued callbacks will be executed then. To start reading again, call +C. + +Note that AnyEvent::Handle will automatically C for you when +you change the C callback or push/unshift a read callback, and it +will automatically C for you when neither C is set nor +there are any read requests in the queue. 
+ +=cut + +sub stop_read { + my ($self) = @_; + + delete $self->{_rw}; +} + +sub start_read { + my ($self) = @_; + + unless ($self->{_rw} || $self->{_eof}) { + Scalar::Util::weaken $self; + + $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { + my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf}; + my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; + + if ($len > 0) { + $self->{_activity} = AnyEvent->now; + + $self->{filter_r} + ? $self->{filter_r}($self, $rbuf) + : $self->_drain_rbuf; + + } elsif (defined $len) { + delete $self->{_rw}; + $self->{_eof} = 1; + $self->_drain_rbuf; + + } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { + return $self->_error ($!, 1); } }); + } } +sub _dotls { + my ($self) = @_; + + my $buf; + + if (length $self->{_tls_wbuf}) { + while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) { + substr $self->{_tls_wbuf}, 0, $len, ""; + } + } + + if (length ($buf = Net::SSLeay::BIO_read ($self->{_wbio}))) { + $self->{wbuf} .= $buf; + $self->_drain_wbuf; + } + + while (defined ($buf = Net::SSLeay::read ($self->{tls}))) { + if (length $buf) { + $self->{rbuf} .= $buf; + $self->_drain_rbuf; + } else { + # let's treat SSL-eof as we treat normal EOF + $self->{_eof} = 1; + $self->_shutdown; + return; + } + } + + my $err = Net::SSLeay::get_error ($self->{tls}, -1); + + if ($err!= Net::SSLeay::ERROR_WANT_READ ()) { + if ($err == Net::SSLeay::ERROR_SYSCALL ()) { + return $self->_error ($!, 1); + } elsif ($err == Net::SSLeay::ERROR_SSL ()) { + return $self->_error (&Errno::EIO, 1); + } + + # all others are fine for our purposes + } +} + +=item $handle->starttls ($tls[, $tls_ctx]) + +Instead of starting TLS negotiation immediately when the AnyEvent::Handle +object is created, you can also do that at a later time by calling +C. + +The first argument is the same as the C constructor argument (either +C<"connect">, C<"accept"> or an existing Net::SSLeay object). 

The second argument is the optional C<Net::SSLeay::CTX> object that is
used when AnyEvent::Handle has to create its own TLS connection object.

The TLS connection object will end up in C<< $handle->{tls} >> after this
call and can be used or changed to your liking. Note that the handshake
might have already started when this function returns.

=cut

sub starttls {
   my ($self, $ssl, $ctx) = @_;

   # tear down any previous TLS state first
   $self->stoptls;

   # "accept" and "connect" request a new connection object (created from
   # $ctx or the default context), anything else is used as passed in
   my $role = $ssl;

   if ($role eq "accept" || $role eq "connect") {
      $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());

      if ($role eq "accept") {
         Net::SSLeay::set_accept_state ($ssl);
      } else {
         Net::SSLeay::set_connect_state ($ssl);
      }
   }

   $self->{tls} = $ssl;

   # basically, this is deep magic (because SSL_read should have the same issues)
   # but the openssl maintainers basically said: "trust us, it just works".
   # (unfortunately, we have to hardcode constants because the abysmally misdesigned
   # and mismaintained ssleay-module doesn't even offer them).
   # http://www.mail-archive.com/openssl-dev@openssl.org/msg22420.html
   my $partial_write = eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1;
   my $moving_buffer = eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2;

   # NOTE(review): CTX_set_mode is handed the connection object stored in
   # $self->{tls}, not a context object - confirm against the Net::SSLeay API
   Net::SSLeay::CTX_set_mode ($self->{tls}, $partial_write | $moving_buffer);

   # the connection talks to two memory BIOs instead of the file handle
   my $rbio = $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
   my $wbio = $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());

   Net::SSLeay::set_bio ($ssl, $rbio, $wbio);

   # write filter: queue the outgoing data, then let _dotls process it.
   # "&_dotls;" calls _dotls with the filter's own argument list.
   $self->{filter_w} = sub {
      my ($handle, $dataref) = @_;

      $handle->{_tls_wbuf} .= $$dataref;
      &_dotls;
   };

   # read filter: feed the data read from the file handle into the read
   # BIO, then let _dotls process it
   $self->{filter_r} = sub {
      my ($handle, $dataref) = @_;

      Net::SSLeay::BIO_write ($handle->{_rbio}, $$dataref);
      &_dotls;
   };
}

=item $handle->stoptls

Destroys the SSL connection, if any. Partial read or write data will be
lost.
+ +=cut + +sub stoptls { + my ($self) = @_; + + Net::SSLeay::free (delete $self->{tls}) if $self->{tls}; + + delete $self->{_rbio}; + delete $self->{_wbio}; + delete $self->{_tls_wbuf}; + delete $self->{filter_r}; + delete $self->{filter_w}; +} + +sub DESTROY { + my $self = shift; + + $self->stoptls; +} + +=item AnyEvent::Handle::TLS_CTX + +This function creates and returns the Net::SSLeay::CTX object used by +default for TLS mode. + +The context is created like this: + + Net::SSLeay::load_error_strings; + Net::SSLeay::SSLeay_add_ssl_algorithms; + Net::SSLeay::randomize; + + my $CTX = Net::SSLeay::CTX_new; + + Net::SSLeay::CTX_set_options $CTX, Net::SSLeay::OP_ALL + +=cut + +our $TLS_CTX; + +sub TLS_CTX() { + $TLS_CTX || do { + require Net::SSLeay; + + Net::SSLeay::load_error_strings (); + Net::SSLeay::SSLeay_add_ssl_algorithms (); + Net::SSLeay::randomize (); + + $TLS_CTX = Net::SSLeay::CTX_new (); + + Net::SSLeay::CTX_set_options ($TLS_CTX, Net::SSLeay::OP_ALL ()); + + $TLS_CTX + } +} + +=back + +=head1 SUBCLASSING AnyEvent::Handle + +In many cases, you might want to subclass AnyEvent::Handle. + +To make this easier, a given version of AnyEvent::Handle uses these +conventions: + +=over 4 + +=item * all constructor arguments become object members. + +At least initially, when you pass a C-argument to the constructor it +will end up in C<< $handle->{tls} >>. Those members might be changes or +mutated later on (for example C will hold the TLS connection object). + +=item * other object member names are prefixed with an C<_>. + +All object members not explicitly documented (internal use) are prefixed +with an underscore character, so the remaining non-C<_>-namespace is free +for use for subclasses. + +=item * all members not documented here and not prefixed with an underscore +are free to use in subclasses. + +Of course, new versions of AnyEvent::Handle may introduce more "public" +member variables, but thats just life, at least it is documented. 
+ =back =head1 AUTHOR -Robin Redeker, C<< >> +Robin Redeker C<< >>, Marc Lehmann . =cut