--- AnyEvent/lib/AnyEvent/Handle.pm 2008/05/24 05:01:16 1.18 +++ AnyEvent/lib/AnyEvent/Handle.pm 2008/05/27 04:59:51 1.39 @@ -4,7 +4,7 @@ use strict; use AnyEvent (); -use AnyEvent::Util (); +use AnyEvent::Util qw(WSAWOULDBLOCK); use Scalar::Util (); use Carp (); use Fcntl (); @@ -12,9 +12,7 @@ =head1 NAME -AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent - -This module is experimental. +AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent =cut @@ -27,22 +25,25 @@ my $cv = AnyEvent->condvar; - my $ae_fh = AnyEvent::Handle->new (fh => \*STDIN); - - #TODO - - # or use the constructor to pass the callback: - - my $ae_fh2 = + my $handle = AnyEvent::Handle->new ( fh => \*STDIN, on_eof => sub { $cv->broadcast; }, - #TODO ); - $cv->wait; + # send some request line + $handle->push_write ("getinfo\015\012"); + + # read the response line + $handle->push_read (line => sub { + my ($handle, $line) = @_; + warn "read line <$line>\n"; + $cv->send; + }); + + $cv->recv; =head1 DESCRIPTION @@ -85,14 +86,17 @@ =item on_error => $cb->($self) This is the fatal error callback, that is called when, well, a fatal error -ocurs, such as not being able to resolve the hostname, failure to connect +occurs, such as not being able to resolve the hostname, failure to connect or a read error. The object will not be in a usable state when this callback has been called. On callback entrance, the value of C<$!> contains the operating system -error (or C or C). +error (or C, C or C). + +The callback should throw an exception. If it returns, then +AnyEvent::Handle will C for you. While not mandatory, it is I recommended to set this callback, as you will not be notified of errors otherwise. The default simply calls @@ -104,7 +108,7 @@ and no read request is in the queue. To access (and remove data from) the read buffer, use the C<< ->rbuf >> -method or acces sthe C<$self->{rbuf}> member directly. +method or access the C<$self->{rbuf}> member directly. When an EOF condition is detected then AnyEvent::Handle will first try to feed all the remaining data to the queued callbacks and C before @@ -141,6 +145,37 @@ buffer: If the write reaches this size or gets even samller it is considered empty. +=item tls => "accept" | "connect" | Net::SSLeay::SSL object + +When this parameter is given, it enables TLS (SSL) mode, that means it +will start making tls handshake and will transparently encrypt/decrypt +data. + +TLS mode requires Net::SSLeay to be installed (it will be loaded +automatically when you try to create a TLS handle). + +For the TLS server side, use C, and for the TLS client side of a +connection, use C mode. + +You can also provide your own TLS connection object, but you have +to make sure that you call either C +or C on it before you pass it to +AnyEvent::Handle. + +See the C method if you need to start TLs negotiation later. + +=item tls_ctx => $ssl_ctx + +Use the given Net::SSLeay::CTX object to create the new TLS connection +(unless a connection object was specified directly). If this parameter is +missing, then AnyEvent::Handle will use C. + +=item filter_r => $cb + +=item filter_w => $cb + +These exist, but are undocumented at this time. + =back =cut @@ -154,6 +189,11 @@ AnyEvent::Util::fh_nonblocking $self->{fh}, 1; + if ($self->{tls}) { + require Net::SSLeay; + $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); + } + $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; $self->on_error (delete $self->{on_error}) if $self->{on_error}; $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; @@ -167,8 +207,8 @@ sub _shutdown { my ($self) = @_; - delete $self->{rw}; - delete $self->{ww}; + delete $self->{_rw}; + delete $self->{_ww}; delete $self->{fh}; } @@ -180,20 +220,19 @@ $self->_shutdown; } - if ($self->{on_error}) { - $self->{on_error}($self); - } else { - die "AnyEvent::Handle uncaught fatal error: $!"; - } + $self->{on_error}($self) + if $self->{on_error}; + + Carp::croak "AnyEvent::Handle uncaught fatal error: $!"; } =item $fh = $handle->fh -This method returns the filehandle of the L object. +This method returns the file handle of the L object. =cut -sub fh { $_[0]->{fh} } +sub fh { $_[0]{fh} } =item $handle->on_error ($cb) @@ -227,7 +266,7 @@ The write queue is very simple: you can add data to its end, and AnyEvent::Handle will automatically try to get rid of it for you. -When data could be writtena nd the write buffer is shorter then the low +When data could be written and the write buffer is shorter then the low water mark, the C callback will be invoked. =over 4 @@ -259,33 +298,51 @@ sub _drain_wbuf { my ($self) = @_; - unless ($self->{ww}) { + if (!$self->{_ww} && length $self->{wbuf}) { + Scalar::Util::weaken $self; + my $cb = sub { my $len = syswrite $self->{fh}, $self->{wbuf}; - if ($len > 0) { + if ($len >= 0) { substr $self->{wbuf}, 0, $len, ""; $self->{on_drain}($self) if $self->{low_water_mark} >= length $self->{wbuf} && $self->{on_drain}; - delete $self->{ww} unless length $self->{wbuf}; - } elsif ($! != EAGAIN && $! != EINTR) { + delete $self->{_ww} unless length $self->{wbuf}; + } elsif ($! != EAGAIN && $! != EINTR && $! != WSAWOULDBLOCK) { $self->error; } }; - $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb); + # try to write data immediately + $cb->(); - $cb->($self); + # if still data left in wbuf, we need to poll + $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) + if length $self->{wbuf}; }; } +our %WH; + +sub register_write_type($$) { + $WH{$_[0]} = $_[1]; +} + sub push_write { my $self = shift; + if (@_ > 1) { + my $type = shift; + + @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write") + ->($self, @_); + } + if ($self->{filter_w}) { $self->{filter_w}->($self, \$_[0]); } else { @@ -294,6 +351,49 @@ } } +=item $handle->push_write (type => @args) + +=item $handle->unshift_write (type => @args) + +Instead of formatting your data yourself, you can also let this module do +the job by specifying a type and type-specific arguments. + +Predefined types are (if you have ideas for additional types, feel free to +drop by and tell us): + +=over 4 + +=item netstring => $string + +Formats the given value as netstring +(http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them). + +=back + +=cut + +register_write_type netstring => sub { + my ($self, $string) = @_; + + sprintf "%d:%s,", (length $string), $string +}; + +=item json => $array_or_hashref + +=item AnyEvent::Handle::register_write_type type => $coderef->($self, @args) + +This function (not method) lets you add your own types to C. +Whenever the given C is used, C will invoke the code +reference with the handle object and the remaining arguments. + +The code reference is supposed to return a single octet string that will +be appended to the write buffer. + +Note that this is a function, and all types registered this way will be +global, so try to use unique names. + +=cut + ############################################################################# =back @@ -381,7 +481,8 @@ defined $self->{rbuf_max} && $self->{rbuf_max} < length $self->{rbuf} ) { - $! = &Errno::ENOSPC; return $self->error; + $! = &Errno::ENOSPC; + $self->error; } return if $self->{in_drain}; @@ -389,36 +490,38 @@ while (my $len = length $self->{rbuf}) { no strict 'refs'; - if (my $cb = shift @{ $self->{queue} }) { - if (!$cb->($self)) { - if ($self->{eof}) { + if (my $cb = shift @{ $self->{_queue} }) { + unless ($cb->($self)) { + if ($self->{_eof}) { # no progress can be made (not enough data and no data forthcoming) - $! = &Errno::EPIPE; return $self->error; + $! = &Errno::EPIPE; + $self->error; } - unshift @{ $self->{queue} }, $cb; + unshift @{ $self->{_queue} }, $cb; return; } } elsif ($self->{on_read}) { $self->{on_read}($self); if ( - $self->{eof} # if no further data will arrive + $self->{_eof} # if no further data will arrive && $len == length $self->{rbuf} # and no data has been consumed - && !@{ $self->{queue} } # and the queue is still empty + && !@{ $self->{_queue} } # and the queue is still empty && $self->{on_read} # and we still want to read data ) { # then no progress can be made - $! = &Errno::EPIPE; return $self->error; + $! = &Errno::EPIPE; + $self->error; } } else { # read side becomes idle - delete $self->{rw}; + delete $self->{_rw}; return; } } - if ($self->{eof}) { + if ($self->{_eof}) { $self->_shutdown; $self->{on_eof}($self) if $self->{on_eof}; @@ -465,7 +568,7 @@ The callback is called each time some additional read data arrives. -It must check wether enough data is in the read buffer already. +It must check whether enough data is in the read buffer already. If not enough data is available, it must return the empty list or a false value, in which case it will be called repeatedly until enough data is @@ -477,57 +580,90 @@ =cut +our %RH; + +sub register_read_type($$) { + $RH{$_[0]} = $_[1]; +} + sub push_read { - my ($self, $cb) = @_; + my $self = shift; + my $cb = pop; + + if (@_) { + my $type = shift; - push @{ $self->{queue} }, $cb; + $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read") + ->($self, $cb, @_); + } + + push @{ $self->{_queue} }, $cb; $self->_drain_rbuf; } sub unshift_read { - my ($self, $cb) = @_; + my $self = shift; + my $cb = pop; - push @{ $self->{queue} }, $cb; + if (@_) { + my $type = shift; + + $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read") + ->($self, $cb, @_); + } + + + unshift @{ $self->{_queue} }, $cb; $self->_drain_rbuf; } -=item $handle->push_read_chunk ($len, $cb->($self, $data)) +=item $handle->push_read (type => @args, $cb) + +=item $handle->unshift_read (type => @args, $cb) -=item $handle->unshift_read_chunk ($len, $cb->($self, $data)) +Instead of providing a callback that parses the data itself you can chose +between a number of predefined parsing formats, for chunks of data, lines +etc. -Append the given callback to the end of the queue (C) or -prepend it (C). +Predefined types are (if you have ideas for additional types, feel free to +drop by and tell us): -The callback will be called only once C<$len> bytes have been read, and -these C<$len> bytes will be passed to the callback. +=over 4 + +=item chunk => $octets, $cb->($self, $data) + +Invoke the callback only once C<$octets> bytes have been read. Pass the +data read to the callback. The callback will never be called with less +data. + +Example: read 2 bytes. + + $handle->push_read (chunk => 2, sub { + warn "yay ", unpack "H*", $_[1]; + }); =cut -sub _read_chunk($$) { - my ($self, $len, $cb) = @_; +register_read_type chunk => sub { + my ($self, $cb, $len) = @_; sub { $len <= length $_[0]{rbuf} or return; $cb->($_[0], substr $_[0]{rbuf}, 0, $len, ""); 1 } -} +}; +# compatibility with older API sub push_read_chunk { - $_[0]->push_read (&_read_chunk); + $_[0]->push_read (chunk => $_[1], $_[2]); } - sub unshift_read_chunk { - $_[0]->unshift_read (&_read_chunk); + $_[0]->unshift_read (chunk => $_[1], $_[2]); } -=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol)) - -=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol)) - -Append the given callback to the end of the queue (C) or -prepend it (C). +=item line => [$eol, ]$cb->($self, $line, $eol) The callback will be called only once a full line (including the end of line marker, C<$eol>) has been read. This line (excluding the end of line @@ -548,12 +684,10 @@ =cut -sub _read_line($$) { - my $self = shift; - my $cb = pop; - my $eol = @_ ? shift : qr|(\015?\012)|; - my $pos; +register_read_type line => sub { + my ($self, $cb, $eol) = @_; + $eol = qr|(\015?\012)| if @_ < 3; $eol = quotemeta $eol unless ref $eol; $eol = qr|^(.*?)($eol)|s; @@ -563,23 +697,157 @@ $cb->($_[0], $1, $2); 1 } -} +}; +# compatibility with older API sub push_read_line { - $_[0]->push_read (&_read_line); + my $self = shift; + $self->push_read (line => @_); } sub unshift_read_line { - $_[0]->unshift_read (&_read_line); + my $self = shift; + $self->unshift_read (line => @_); } +=item netstring => $cb->($string) + +A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement). + +Throws an error with C<$!> set to EBADMSG on format violations. + +=cut + +register_read_type netstring => sub { + my ($self, $cb) = @_; + + sub { + unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { + if ($_[0]{rbuf} =~ /[^0-9]/) { + $! = &Errno::EBADMSG; + $self->error; + } + return; + } + + my $len = $1; + + $self->unshift_read (chunk => $len, sub { + my $string = $_[1]; + $_[0]->unshift_read (chunk => 1, sub { + if ($_[1] eq ",") { + $cb->($_[0], $string); + } else { + $! = &Errno::EBADMSG; + $self->error; + } + }); + }); + + 1 + } +}; + +=item regex => $accept[, $reject[, $skip], $cb->($data) + +Makes a regex match against the regex object C<$accept> and returns +everything up to and including the match. + +Example: read a single line terminated by '\n'. + + $handle->push_read (regex => qr<\n>, sub { ... }); + +If C<$reject> is given and not undef, then it determines when the data is +to be rejected: it is matched against the data when the C<$accept> regex +does not match and generates an C error when it matches. This is +useful to quickly reject wrong data (to avoid waiting for a timeout or a +receive buffer overflow). + +Example: expect a single decimal number followed by whitespace, reject +anything else (not the use of an anchor). + + $handle->push_read (regex => qr<^[0-9]+\s>, qr<[^0-9]>, sub { ... }); + +If C<$skip> is given and not C, then it will be matched against +the receive buffer when neither C<$accept> nor C<$reject> match, +and everything preceding and including the match will be accepted +unconditionally. This is useful to skip large amounts of data that you +know cannot be matched, so that the C<$accept> or C<$reject> regex do not +have to start matching from the beginning. This is purely an optimisation +and is usually worth only when you expect more than a few kilobytes. + +Example: expect a http header, which ends at C<\015\012\015\012>. Since we +expect the header to be very large (it isn't in practise, but...), we use +a skip regex to skip initial portions. The skip regex is tricky in that +it only accepts something not ending in either \015 or \012, as these are +required for the accept regex. + + $handle->push_read (regex => + qr<\015\012\015\012>, + undef, # no reject + qr<^.*[^\015\012]>, + sub { ... }); + +=cut + +register_read_type regex => sub { + my ($self, $cb, $accept, $reject, $skip) = @_; + + my $data; + my $rbuf = \$self->{rbuf}; + + sub { + # accept + if ($$rbuf =~ $accept) { + $data .= substr $$rbuf, 0, $+[0], ""; + $cb->($self, $data); + return 1; + } + + # reject + if ($reject && $$rbuf =~ $reject) { + $! = &Errno::EBADMSG; + $self->error; + } + + # skip + if ($skip && $$rbuf =~ $skip) { + $data .= substr $$rbuf, 0, $+[0], ""; + } + + () + } +}; + +=back + +=item AnyEvent::Handle::register_read_type type => $coderef->($self, $cb, @args) + +This function (not method) lets you add your own types to C. + +Whenever the given C is used, C will invoke the code +reference with the handle object, the callback and the remaining +arguments. + +The code reference is supposed to return a callback (usually a closure) +that works as a plain read callback (see C<< ->push_read ($cb) >>). + +It should invoke the passed callback when it is done reading (remember to +pass C<$self> as first argument as all other callbacks do that). + +Note that this is a function, and all types registered this way will be +global, so try to use unique names. + +For examples, see the source of this module (F, +search for C)). + =item $handle->stop_read =item $handle->start_read In rare cases you actually do not want to read anything from the socket. In this case you can call C. Neither C no -any queued callbacks will be executed then. To start readign again, call +any queued callbacks will be executed then. To start reading again, call C. =cut @@ -587,16 +855,16 @@ sub stop_read { my ($self) = @_; - delete $self->{rw}; + delete $self->{_rw}; } sub start_read { my ($self) = @_; - unless ($self->{rw} || $self->{eof}) { + unless ($self->{_rw} || $self->{_eof}) { Scalar::Util::weaken $self; - $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { + $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf}; my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; @@ -606,17 +874,197 @@ : $self->_drain_rbuf; } elsif (defined $len) { - delete $self->{rw}; - $self->{eof} = 1; + delete $self->{_rw}; + $self->{_eof} = 1; $self->_drain_rbuf; - } elsif ($! != EAGAIN && $! != EINTR) { + } elsif ($! != EAGAIN && $! != EINTR && $! != &AnyEvent::Util::WSAWOULDBLOCK) { return $self->error; } }); } } +sub _dotls { + my ($self) = @_; + + if (length $self->{_tls_wbuf}) { + while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) { + substr $self->{_tls_wbuf}, 0, $len, ""; + } + } + + if (defined (my $buf = Net::SSLeay::BIO_read ($self->{_wbio}))) { + $self->{wbuf} .= $buf; + $self->_drain_wbuf; + } + + while (defined (my $buf = Net::SSLeay::read ($self->{tls}))) { + $self->{rbuf} .= $buf; + $self->_drain_rbuf; + } + + my $err = Net::SSLeay::get_error ($self->{tls}, -1); + + if ($err!= Net::SSLeay::ERROR_WANT_READ ()) { + if ($err == Net::SSLeay::ERROR_SYSCALL ()) { + $self->error; + } elsif ($err == Net::SSLeay::ERROR_SSL ()) { + $! = &Errno::EIO; + $self->error; + } + + # all others are fine for our purposes + } +} + +=item $handle->starttls ($tls[, $tls_ctx]) + +Instead of starting TLS negotiation immediately when the AnyEvent::Handle +object is created, you can also do that at a later time by calling +C. + +The first argument is the same as the C constructor argument (either +C<"connect">, C<"accept"> or an existing Net::SSLeay object). + +The second argument is the optional C object that is +used when AnyEvent::Handle has to create its own TLS connection object. + +The TLS connection object will end up in C<< $handle->{tls} >> after this +call and can be used or changed to your liking. Note that the handshake +might have already started when this function returns. + +=cut + +# TODO: maybe document... +sub starttls { + my ($self, $ssl, $ctx) = @_; + + $self->stoptls; + + if ($ssl eq "accept") { + $ssl = Net::SSLeay::new ($ctx || TLS_CTX ()); + Net::SSLeay::set_accept_state ($ssl); + } elsif ($ssl eq "connect") { + $ssl = Net::SSLeay::new ($ctx || TLS_CTX ()); + Net::SSLeay::set_connect_state ($ssl); + } + + $self->{tls} = $ssl; + + # basically, this is deep magic (because SSL_read should have the same issues) + # but the openssl maintainers basically said: "trust us, it just works". + # (unfortunately, we have to hardcode constants because the abysmally misdesigned + # and mismaintained ssleay-module doesn't even offer them). + # http://www.mail-archive.com/openssl-dev@openssl.org/msg22420.html + Net::SSLeay::CTX_set_mode ($self->{tls}, + (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1) + | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2)); + + $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); + $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); + + Net::SSLeay::set_bio ($ssl, $self->{_rbio}, $self->{_wbio}); + + $self->{filter_w} = sub { + $_[0]{_tls_wbuf} .= ${$_[1]}; + &_dotls; + }; + $self->{filter_r} = sub { + Net::SSLeay::BIO_write ($_[0]{_rbio}, ${$_[1]}); + &_dotls; + }; +} + +=item $handle->stoptls + +Destroys the SSL connection, if any. Partial read or write data will be +lost. + +=cut + +sub stoptls { + my ($self) = @_; + + Net::SSLeay::free (delete $self->{tls}) if $self->{tls}; + + delete $self->{_rbio}; + delete $self->{_wbio}; + delete $self->{_tls_wbuf}; + delete $self->{filter_r}; + delete $self->{filter_w}; +} + +sub DESTROY { + my $self = shift; + + $self->stoptls; +} + +=item AnyEvent::Handle::TLS_CTX + +This function creates and returns the Net::SSLeay::CTX object used by +default for TLS mode. + +The context is created like this: + + Net::SSLeay::load_error_strings; + Net::SSLeay::SSLeay_add_ssl_algorithms; + Net::SSLeay::randomize; + + my $CTX = Net::SSLeay::CTX_new; + + Net::SSLeay::CTX_set_options $CTX, Net::SSLeay::OP_ALL + +=cut + +our $TLS_CTX; + +sub TLS_CTX() { + $TLS_CTX || do { + require Net::SSLeay; + + Net::SSLeay::load_error_strings (); + Net::SSLeay::SSLeay_add_ssl_algorithms (); + Net::SSLeay::randomize (); + + $TLS_CTX = Net::SSLeay::CTX_new (); + + Net::SSLeay::CTX_set_options ($TLS_CTX, Net::SSLeay::OP_ALL ()); + + $TLS_CTX + } +} + +=back + +=head1 SUBCLASSING AnyEvent::Handle + +In many cases, you might want to subclass AnyEvent::Handle. + +To make this easier, a given version of AnyEvent::Handle uses these +conventions: + +=over 4 + +=item * all constructor arguments become object members. + +At least initially, when you pass a C-argument to the constructor it +will end up in C<< $handle->{tls} >>. Those members might be changes or +mutated later on (for example C will hold the TLS connection object). + +=item * other object member names are prefixed with an C<_>. + +All object members not explicitly documented (internal use) are prefixed +with an underscore character, so the remaining non-C<_>-namespace is free +for use for subclasses. + +=item * all members not documented here and not prefixed with an underscore +are free to use in subclasses. + +Of course, new versions of AnyEvent::Handle may introduce more "public" +member variables, but thats just life, at least it is documented. + =back =head1 AUTHOR