--- AnyEvent/lib/AnyEvent/Handle.pm 2008/05/24 15:26:04 1.27 +++ AnyEvent/lib/AnyEvent/Handle.pm 2008/05/27 06:23:15 1.42 @@ -4,7 +4,7 @@ use strict; use AnyEvent (); -use AnyEvent::Util (); +use AnyEvent::Util qw(WSAEWOULDBLOCK); use Scalar::Util (); use Carp (); use Fcntl (); @@ -14,8 +14,6 @@ AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent -This module is experimental. - =cut our $VERSION = '0.04'; @@ -27,22 +25,25 @@ my $cv = AnyEvent->condvar; - my $ae_fh = AnyEvent::Handle->new (fh => \*STDIN); - - #TODO - - # or use the constructor to pass the callback: - - my $ae_fh2 = + my $handle = AnyEvent::Handle->new ( fh => \*STDIN, on_eof => sub { $cv->broadcast; }, - #TODO ); - $cv->wait; + # send some request line + $handle->push_write ("getinfo\015\012"); + + # read the response line + $handle->push_read (line => sub { + my ($handle, $line) = @_; + warn "read line <$line>\n"; + $cv->send; + }); + + $cv->recv; =head1 DESCRIPTION @@ -74,7 +75,7 @@ NOTE: The filehandle will be set to non-blocking (using AnyEvent::Util::fh_nonblocking). -=item on_eof => $cb->($self) +=item on_eof => $cb->($handle) Set the callback to be called on EOF. @@ -82,7 +83,7 @@ otherwise you might end up with a closed socket while you are still waiting for data. -=item on_error => $cb->($self) +=item on_error => $cb->($handle) This is the fatal error callback, that is called when, well, a fatal error occurs, such as not being able to resolve the hostname, failure to connect @@ -92,26 +93,29 @@ called. On callback entrance, the value of C<$!> contains the operating system -error (or C or C). +error (or C, C or C). + +The callback should throw an exception. If it returns, then +AnyEvent::Handle will C for you. While not mandatory, it is I recommended to set this callback, as you will not be notified of errors otherwise. The default simply calls die. 
-=item on_read => $cb->($self) +=item on_read => $cb->($handle) This sets the default read callback, which is called when data arrives and no read request is in the queue. To access (and remove data from) the read buffer, use the C<< ->rbuf >> -method or access the C<$self->{rbuf}> member directly. +method or access the C<$handle->{rbuf}> member directly. When an EOF condition is detected then AnyEvent::Handle will first try to feed all the remaining data to the queued callbacks and C before calling the C callback. If no progress can be made, then a fatal error will be raised (with C<$!> set to C). -=item on_drain => $cb->() +=item on_drain => $cb->($handle) This sets the callback that is called when the write buffer becomes empty (or when the callback is set and the buffer is empty already). @@ -166,6 +170,22 @@ (unless a connection object was specified directly). If this parameter is missing, then AnyEvent::Handle will use C. +=item json => JSON or JSON::XS object + +This is the json coder object used by the C read and write types. + +If you don't supply it, then AnyEvent::Handle will create and use a +suitable one, which will write and expect UTF-8 encoded JSON texts. + +Note that you are responsible to depend on the JSON module if you want to +use this functionality, as AnyEvent does not have a dependency itself. + +=item filter_r => $cb + +=item filter_w => $cb + +These exist, but are undocumented at this time. 
+ =back =cut @@ -197,8 +217,8 @@ sub _shutdown { my ($self) = @_; - delete $self->{rw}; - delete $self->{ww}; + delete $self->{_rw}; + delete $self->{_ww}; delete $self->{fh}; } @@ -210,11 +230,10 @@ $self->_shutdown; } - if ($self->{on_error}) { - $self->{on_error}($self); - } else { - die "AnyEvent::Handle uncaught fatal error: $!"; - } + $self->{on_error}($self) + if $self->{on_error}; + + Carp::croak "AnyEvent::Handle uncaught fatal error: $!"; } =item $fh = $handle->fh @@ -223,7 +242,7 @@ =cut -sub fh { $_[0]->{fh} } +sub fh { $_[0]{fh} } =item $handle->on_error ($cb) @@ -289,33 +308,51 @@ sub _drain_wbuf { my ($self) = @_; - unless ($self->{ww}) { + if (!$self->{_ww} && length $self->{wbuf}) { + Scalar::Util::weaken $self; + my $cb = sub { my $len = syswrite $self->{fh}, $self->{wbuf}; - if ($len > 0) { + if ($len >= 0) { substr $self->{wbuf}, 0, $len, ""; $self->{on_drain}($self) if $self->{low_water_mark} >= length $self->{wbuf} && $self->{on_drain}; - delete $self->{ww} unless length $self->{wbuf}; - } elsif ($! != EAGAIN && $! != EINTR) { + delete $self->{_ww} unless length $self->{wbuf}; + } elsif ($! != EAGAIN && $! != EINTR && $! 
!= WSAEWOULDBLOCK) { $self->error; } }; - $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb); + # try to write data immediately + $cb->(); - $cb->($self); + # if still data left in wbuf, we need to poll + $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) + if length $self->{wbuf}; }; } +our %WH; + +sub register_write_type($$) { + $WH{$_[0]} = $_[1]; +} + sub push_write { my $self = shift; + if (@_ > 1) { + my $type = shift; + + @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write") + ->($self, @_); + } + if ($self->{filter_w}) { $self->{filter_w}->($self, \$_[0]); } else { @@ -324,6 +361,88 @@ } } +=item $handle->push_write (type => @args) + +=item $handle->unshift_write (type => @args) + +Instead of formatting your data yourself, you can also let this module do +the job by specifying a type and type-specific arguments. + +Predefined types are (if you have ideas for additional types, feel free to +drop by and tell us): + +=over 4 + +=item netstring => $string + +Formats the given value as netstring +(http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them). + +=back + +=cut + +register_write_type netstring => sub { + my ($self, $string) = @_; + + sprintf "%d:%s,", (length $string), $string +}; + +=item json => $array_or_hashref + +Encodes the given hash or array reference into a JSON object. Unless you +provide your own JSON object, this means it will be encoded to JSON text +in UTF-8. + +JSON objects (and arrays) are self-delimiting, so you can write JSON at +one end of a handle and read them at the other end without using any +additional framing. + +The generated JSON text is guaranteed not to contain any newlines: While +this module doesn't need delimiters after or between JSON texts to be +able to read them, many other languages depend on that. 
+ +A simple RPC protocol that interoperates easily with others is to send +JSON arrays (or objects, although arrays are usually the better choice as +they mimic how function argument passing works) and a newline after each +JSON text: + + $handle->push_write (json => ["method", "arg1", "arg2"]); # whatever + $handle->push_write ("\012"); + +An AnyEvent::Handle receiver would simply use the C read type and +rely on the fact that the newline will be skipped as leading whitespace: + + $handle->push_read (json => sub { my $array = $_[1]; ... }); + +Other languages could read single lines terminated by a newline and pass +this line into their JSON decoder of choice. + +=cut + +register_write_type json => sub { + my ($self, $ref) = @_; + + require JSON; + + $self->{json} ? $self->{json}->encode ($ref) + : JSON::encode_json ($ref) +}; + +=item AnyEvent::Handle::register_write_type type => $coderef->($handle, @args) + +This function (not method) lets you add your own types to C. +Whenever the given C is used, C will invoke the code +reference with the handle object and the remaining arguments. + +The code reference is supposed to return a single octet string that will +be appended to the write buffer. + +Note that this is a function, and all types registered this way will be +global, so try to use unique names. + +=cut + ############################################################################# =back @@ -411,7 +530,8 @@ defined $self->{rbuf_max} && $self->{rbuf_max} < length $self->{rbuf} ) { - $! = &Errno::ENOSPC; return $self->error; + $! = &Errno::ENOSPC; + $self->error; } return if $self->{in_drain}; @@ -419,36 +539,38 @@ while (my $len = length $self->{rbuf}) { no strict 'refs'; - if (my $cb = shift @{ $self->{queue} }) { - if (!$cb->($self)) { - if ($self->{eof}) { + if (my $cb = shift @{ $self->{_queue} }) { + unless ($cb->($self)) { + if ($self->{_eof}) { # no progress can be made (not enough data and no data forthcoming) - $! 
= &Errno::EPIPE; return $self->error; + $! = &Errno::EPIPE; + $self->error; } - unshift @{ $self->{queue} }, $cb; + unshift @{ $self->{_queue} }, $cb; return; } } elsif ($self->{on_read}) { $self->{on_read}($self); if ( - $self->{eof} # if no further data will arrive + $self->{_eof} # if no further data will arrive && $len == length $self->{rbuf} # and no data has been consumed - && !@{ $self->{queue} } # and the queue is still empty + && !@{ $self->{_queue} } # and the queue is still empty && $self->{on_read} # and we still want to read data ) { # then no progress can be made - $! = &Errno::EPIPE; return $self->error; + $! = &Errno::EPIPE; + $self->error; } } else { # read side becomes idle - delete $self->{rw}; + delete $self->{_rw}; return; } } - if ($self->{eof}) { + if ($self->{_eof}) { $self->_shutdown; $self->{on_eof}($self) if $self->{on_eof}; @@ -507,57 +629,90 @@ =cut +our %RH; + +sub register_read_type($$) { + $RH{$_[0]} = $_[1]; +} + sub push_read { - my ($self, $cb) = @_; + my $self = shift; + my $cb = pop; + + if (@_) { + my $type = shift; + + $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read") + ->($self, $cb, @_); + } - push @{ $self->{queue} }, $cb; + push @{ $self->{_queue} }, $cb; $self->_drain_rbuf; } sub unshift_read { - my ($self, $cb) = @_; + my $self = shift; + my $cb = pop; + + if (@_) { + my $type = shift; + + $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read") + ->($self, $cb, @_); + } + - push @{ $self->{queue} }, $cb; + unshift @{ $self->{_queue} }, $cb; $self->_drain_rbuf; } -=item $handle->push_read_chunk ($len, $cb->($self, $data)) +=item $handle->push_read (type => @args, $cb) -=item $handle->unshift_read_chunk ($len, $cb->($self, $data)) +=item $handle->unshift_read (type => @args, $cb) -Append the given callback to the end of the queue (C) or -prepend it (C). 
+Instead of providing a callback that parses the data itself you can chose +between a number of predefined parsing formats, for chunks of data, lines +etc. -The callback will be called only once C<$len> bytes have been read, and -these C<$len> bytes will be passed to the callback. +Predefined types are (if you have ideas for additional types, feel free to +drop by and tell us): + +=over 4 + +=item chunk => $octets, $cb->($handle, $data) + +Invoke the callback only once C<$octets> bytes have been read. Pass the +data read to the callback. The callback will never be called with less +data. + +Example: read 2 bytes. + + $handle->push_read (chunk => 2, sub { + warn "yay ", unpack "H*", $_[1]; + }); =cut -sub _read_chunk($$) { - my ($self, $len, $cb) = @_; +register_read_type chunk => sub { + my ($self, $cb, $len) = @_; sub { $len <= length $_[0]{rbuf} or return; $cb->($_[0], substr $_[0]{rbuf}, 0, $len, ""); 1 } -} +}; +# compatibility with older API sub push_read_chunk { - $_[0]->push_read (&_read_chunk); + $_[0]->push_read (chunk => $_[1], $_[2]); } - sub unshift_read_chunk { - $_[0]->unshift_read (&_read_chunk); + $_[0]->unshift_read (chunk => $_[1], $_[2]); } -=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol)) - -=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol)) - -Append the given callback to the end of the queue (C) or -prepend it (C). +=item line => [$eol, ]$cb->($handle, $line, $eol) The callback will be called only once a full line (including the end of line marker, C<$eol>) has been read. This line (excluding the end of line @@ -578,12 +733,10 @@ =cut -sub _read_line($$) { - my $self = shift; - my $cb = pop; - my $eol = @_ ? 
shift : qr|(\015?\012)|; - my $pos; +register_read_type line => sub { + my ($self, $cb, $eol) = @_; + $eol = qr|(\015?\012)| if @_ < 3; $eol = quotemeta $eol unless ref $eol; $eol = qr|^(.*?)($eol)|s; @@ -593,16 +746,194 @@ $cb->($_[0], $1, $2); 1 } -} +}; +# compatibility with older API sub push_read_line { - $_[0]->push_read (&_read_line); + my $self = shift; + $self->push_read (line => @_); } sub unshift_read_line { - $_[0]->unshift_read (&_read_line); + my $self = shift; + $self->unshift_read (line => @_); } +=item netstring => $cb->($handle, $string) + +A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement). + +Throws an error with C<$!> set to EBADMSG on format violations. + +=cut + +register_read_type netstring => sub { + my ($self, $cb) = @_; + + sub { + unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { + if ($_[0]{rbuf} =~ /[^0-9]/) { + $! = &Errno::EBADMSG; + $self->error; + } + return; + } + + my $len = $1; + + $self->unshift_read (chunk => $len, sub { + my $string = $_[1]; + $_[0]->unshift_read (chunk => 1, sub { + if ($_[1] eq ",") { + $cb->($_[0], $string); + } else { + $! = &Errno::EBADMSG; + $self->error; + } + }); + }); + + 1 + } +}; + +=item regex => $accept[, $reject[, $skip], $cb->($handle, $data) + +Makes a regex match against the regex object C<$accept> and returns +everything up to and including the match. + +Example: read a single line terminated by '\n'. + + $handle->push_read (regex => qr<\n>, sub { ... }); + +If C<$reject> is given and not undef, then it determines when the data is +to be rejected: it is matched against the data when the C<$accept> regex +does not match and generates an C error when it matches. This is +useful to quickly reject wrong data (to avoid waiting for a timeout or a +receive buffer overflow). + +Example: expect a single decimal number followed by whitespace, reject +anything else (not the use of an anchor). + + $handle->push_read (regex => qr<^[0-9]+\s>, qr<[^0-9]>, sub { ... 
}); + +If C<$skip> is given and not C, then it will be matched against +the receive buffer when neither C<$accept> nor C<$reject> match, +and everything preceding and including the match will be accepted +unconditionally. This is useful to skip large amounts of data that you +know cannot be matched, so that the C<$accept> or C<$reject> regex do not +have to start matching from the beginning. This is purely an optimisation +and is usually worth only when you expect more than a few kilobytes. + +Example: expect a http header, which ends at C<\015\012\015\012>. Since we +expect the header to be very large (it isn't in practise, but...), we use +a skip regex to skip initial portions. The skip regex is tricky in that +it only accepts something not ending in either \015 or \012, as these are +required for the accept regex. + + $handle->push_read (regex => + qr<\015\012\015\012>, + undef, # no reject + qr<^.*[^\015\012]>, + sub { ... }); + +=cut + +register_read_type regex => sub { + my ($self, $cb, $accept, $reject, $skip) = @_; + + my $data; + my $rbuf = \$self->{rbuf}; + + sub { + # accept + if ($$rbuf =~ $accept) { + $data .= substr $$rbuf, 0, $+[0], ""; + $cb->($self, $data); + return 1; + } + + # reject + if ($reject && $$rbuf =~ $reject) { + $! = &Errno::EBADMSG; + $self->error; + } + + # skip + if ($skip && $$rbuf =~ $skip) { + $data .= substr $$rbuf, 0, $+[0], ""; + } + + () + } +}; + +=item json => $cb->($handle, $hash_or_arrayref) + +Reads a JSON object or array, decodes it and passes it to the callback. + +If a C object was passed to the constructor, then that will be used +for the final decode, otherwise it will create a JSON coder expecting UTF-8. + +This read type uses the incremental parser available with JSON version +2.09 (and JSON::XS version 2.2) and above. You have to provide a +dependency on your own: this module will load the JSON module, but +AnyEvent does not depend on it itself. 
+ +Since JSON texts are fully self-delimiting, the C read and write +types are an ideal simple RPC protocol: just exchange JSON datagrams. See +the C write type description, above, for an actual example. + +=cut + +register_read_type json => sub { + my ($self, $cb, $accept, $reject, $skip) = @_; + + require JSON; + + my $data; + my $rbuf = \$self->{rbuf}; + + my $json = $self->{json} ||= JSON->new->utf8; + + sub { + my $ref = $json->incr_parse ($self->{rbuf}); + + if ($ref) { + $self->{rbuf} = $json->incr_text; + $json->incr_text = ""; + $cb->($self, $ref); + + 1 + } else { + $self->{rbuf} = ""; + () + } + } +}; + +=back + +=item AnyEvent::Handle::register_read_type type => $coderef->($handle, $cb, @args) + +This function (not method) lets you add your own types to C. + +Whenever the given C is used, C will invoke the code +reference with the handle object, the callback and the remaining +arguments. + +The code reference is supposed to return a callback (usually a closure) +that works as a plain read callback (see C<< ->push_read ($cb) >>). + +It should invoke the passed callback when it is done reading (remember to +pass C<$handle> as first argument as all other callbacks do that). + +Note that this is a function, and all types registered this way will be +global, so try to use unique names. + +For examples, see the source of this module (F, +search for C)). + =item $handle->stop_read =item $handle->start_read @@ -617,16 +948,16 @@ sub stop_read { my ($self) = @_; - delete $self->{rw}; + delete $self->{_rw}; } sub start_read { my ($self) = @_; - unless ($self->{rw} || $self->{eof}) { + unless ($self->{_rw} || $self->{_eof}) { Scalar::Util::weaken $self; - $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { + $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { my $rbuf = $self->{filter_r} ? 
\my $buf : \$self->{rbuf}; my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; @@ -636,11 +967,11 @@ : $self->_drain_rbuf; } elsif (defined $len) { - delete $self->{rw}; - $self->{eof} = 1; + delete $self->{_rw}; + $self->{_eof} = 1; $self->_drain_rbuf; - } elsif ($! != EAGAIN && $! != EINTR) { + } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { return $self->error; } }); @@ -650,13 +981,13 @@ sub _dotls { my ($self) = @_; - if (length $self->{tls_wbuf}) { - while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{tls_wbuf})) > 0) { - substr $self->{tls_wbuf}, 0, $len, ""; + if (length $self->{_tls_wbuf}) { + while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) { + substr $self->{_tls_wbuf}, 0, $len, ""; } } - if (defined (my $buf = Net::SSLeay::BIO_read ($self->{tls_wbio}))) { + if (defined (my $buf = Net::SSLeay::BIO_read ($self->{_wbio}))) { $self->{wbuf} .= $buf; $self->_drain_wbuf; } @@ -692,6 +1023,10 @@ The second argument is the optional C object that is used when AnyEvent::Handle has to create its own TLS connection object. +The TLS connection object will end up in C<< $handle->{tls} >> after this +call and can be used or changed to your liking. Note that the handshake +might have already started when this function returns. + =cut # TODO: maybe document... @@ -716,20 +1051,20 @@ # and mismaintained ssleay-module doesn't even offer them). 
# http://www.mail-archive.com/openssl-dev@openssl.org/msg22420.html Net::SSLeay::CTX_set_mode ($self->{tls}, - (eval { Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1) - | (eval { Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2)); + (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1) + | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2)); - $self->{tls_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); - $self->{tls_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); + $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); + $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); - Net::SSLeay::set_bio ($ssl, $self->{tls_rbio}, $self->{tls_wbio}); + Net::SSLeay::set_bio ($ssl, $self->{_rbio}, $self->{_wbio}); $self->{filter_w} = sub { - $_[0]{tls_wbuf} .= ${$_[1]}; + $_[0]{_tls_wbuf} .= ${$_[1]}; &_dotls; }; $self->{filter_r} = sub { - Net::SSLeay::BIO_write ($_[0]{tls_rbio}, ${$_[1]}); + Net::SSLeay::BIO_write ($_[0]{_rbio}, ${$_[1]}); &_dotls; }; } @@ -745,9 +1080,10 @@ my ($self) = @_; Net::SSLeay::free (delete $self->{tls}) if $self->{tls}; - delete $self->{tls_rbio}; - delete $self->{tls_wbio}; - delete $self->{tls_wbuf}; + + delete $self->{_rbio}; + delete $self->{_wbio}; + delete $self->{_tls_wbuf}; delete $self->{filter_r}; delete $self->{filter_w}; } @@ -795,6 +1131,35 @@ =back +=head1 SUBCLASSING AnyEvent::Handle + +In many cases, you might want to subclass AnyEvent::Handle. + +To make this easier, a given version of AnyEvent::Handle uses these +conventions: + +=over 4 + +=item * all constructor arguments become object members. + +At least initially, when you pass a C-argument to the constructor it +will end up in C<< $handle->{tls} >>. Those members might be changes or +mutated later on (for example C will hold the TLS connection object). + +=item * other object member names are prefixed with an C<_>. 
+ +All object members not explicitly documented (internal use) are prefixed +with an underscore character, so the remaining non-C<_>-namespace is free +for use for subclasses. + +=item * all members not documented here and not prefixed with an underscore +are free to use in subclasses. + +Of course, new versions of AnyEvent::Handle may introduce more "public" +member variables, but that's just life, at least it is documented. + +=back + =head1 AUTHOR Robin Redeker C<< >>, Marc Lehmann .