--- AnyEvent/lib/AnyEvent/Handle.pm 2008/05/15 09:03:43 1.12 +++ AnyEvent/lib/AnyEvent/Handle.pm 2008/05/24 15:19:43 1.25 @@ -12,11 +12,13 @@ =head1 NAME -AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent +AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent + +This module is experimental. =cut -our $VERSION = '0.02'; +our $VERSION = '0.04'; =head1 SYNOPSIS @@ -45,8 +47,8 @@ =head1 DESCRIPTION This module is a helper module to make it easier to do event-based I/O on -filehandles (and sockets, see L for an easy way to make -non-blocking resolves and connects). +filehandles. For utility functions for doing non-blocking connects and accepts +on sockets see L. In the following, when the documentation refers to of "bytes" then this means characters. As sysread and syswrite are used for all I/O, their @@ -72,14 +74,18 @@ NOTE: The filehandle will be set to non-blocking (using AnyEvent::Util::fh_nonblocking). -=item on_eof => $cb->($self) [MANDATORY] +=item on_eof => $cb->($self) Set the callback to be called on EOF. +While not mandatory, it is highly recommended to set an eof callback, +otherwise you might end up with a closed socket while you are still +waiting for data. + =item on_error => $cb->($self) This is the fatal error callback, that is called when, well, a fatal error -ocurs, such as not being able to resolve the hostname, failure to connect +occurs, such as not being able to resolve the hostname, failure to connect or a read error. The object will not be in a usable state when this callback has been @@ -98,7 +104,7 @@ and no read request is in the queue. To access (and remove data from) the read buffer, use the C<< ->rbuf >> -method or acces sthe C<$self->{rbuf}> member directly. +method or access the C<$self->{rbuf}> member directly. When an EOF condition is detected then AnyEvent::Handle will first try to feed all the remaining data to the queued callbacks and C before @@ -135,6 +141,26 @@ buffer: If the write reaches this size or gets even samller it is considered empty. +=item tls => "accept" | "connect" | Net::SSLeay::SSL object + +When this parameter is given, it enables TLS (SSL) mode, that means it +will start making tls handshake and will transparently encrypt/decrypt +data. + +For the TLS server side, use C, and for the TLS client side of a +connection, use C mode. + +You can also provide your own TLS connection object, but you have +to make sure that you call either C +or C on it before you pass it to +AnyEvent::Handle. + +=item tls_ctx => $ssl_ctx + +Use the given Net::SSLeay::CTX object to create the new TLS connection +(unless a connection object was specified directly). If this parameter is +missing, then AnyEvent::Handle will use C. + =back =cut @@ -148,8 +174,12 @@ AnyEvent::Util::fh_nonblocking $self->{fh}, 1; - $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing"); + if ($self->{tls}) { + require Net::SSLeay; + $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); + } + $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; $self->on_error (delete $self->{on_error}) if $self->{on_error}; $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; $self->on_read (delete $self->{on_read} ) if $self->{on_read}; @@ -184,7 +214,7 @@ =item $fh = $handle->fh -This method returns the filehandle of the L object. +This method returns the file handle of the L object. =cut @@ -222,7 +252,7 @@ The write queue is very simple: you can add data to its end, and AnyEvent::Handle will automatically try to get rid of it for you. -When data could be writtena nd the write buffer is shorter then the low +When data could be written and the write buffer is shorter then the low water mark, the C callback will be invoked. =over 4 @@ -251,10 +281,8 @@ =cut -sub push_write { - my ($self, $data) = @_; - - $self->{wbuf} .= $data; +sub _drain_wbuf { + my ($self) = @_; unless ($self->{ww}) { Scalar::Util::weaken $self; @@ -264,7 +292,6 @@ if ($len > 0) { substr $self->{wbuf}, 0, $len, ""; - $self->{on_drain}($self) if $self->{low_water_mark} >= length $self->{wbuf} && $self->{on_drain}; @@ -281,6 +308,17 @@ }; } +sub push_write { + my $self = shift; + + if ($self->{filter_w}) { + $self->{filter_w}->($self, \$_[0]); + } else { + $self->{wbuf} .= $_[0]; + $self->_drain_wbuf; + } +} + ############################################################################# =back @@ -364,6 +402,13 @@ sub _drain_rbuf { my ($self) = @_; + if ( + defined $self->{rbuf_max} + && $self->{rbuf_max} < length $self->{rbuf} + ) { + $! = &Errno::ENOSPC; return $self->error; + } + return if $self->{in_drain}; local $self->{in_drain} = 1; @@ -400,7 +445,8 @@ if ($self->{eof}) { $self->_shutdown; - $self->{on_eof}($self); + $self->{on_eof}($self) + if $self->{on_eof}; } } @@ -444,7 +490,7 @@ The callback is called each time some additional read data arrives. -It must check wether enough data is in the read buffer already. +It must check whether enough data is in the read buffer already. If not enough data is available, it must return the empty list or a false value, in which case it will be called repeatedly until enough data is @@ -533,8 +579,8 @@ my $eol = @_ ? shift : qr|(\015?\012)|; my $pos; - $eol = qr|(\Q$eol\E)| unless ref $eol; - $eol = qr|^(.*?)($eol)|; + $eol = quotemeta $eol unless ref $eol; + $eol = qr|^(.*?)($eol)|s; sub { $_[0]{rbuf} =~ s/$eol// or return; @@ -556,9 +602,9 @@ =item $handle->start_read -In rare cases you actually do not want to read anything form the +In rare cases you actually do not want to read anything from the socket. In this case you can call C. Neither C no -any queued callbacks will be executed then. To start readign again, call +any queued callbacks will be executed then. To start reading again, call C. =cut @@ -576,28 +622,171 @@ Scalar::Util::weaken $self; $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { - my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf}; + my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf}; + my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; if ($len > 0) { - if (defined $self->{rbuf_max}) { - if ($self->{rbuf_max} < length $self->{rbuf}) { - $! = &Errno::ENOSPC; return $self->error; - } - } + $self->{filter_r} + ? $self->{filter_r}->($self, $rbuf) + : $self->_drain_rbuf; } elsif (defined $len) { - $self->{eof} = 1; delete $self->{rw}; + $self->{eof} = 1; + $self->_drain_rbuf; } elsif ($! != EAGAIN && $! != EINTR) { return $self->error; } - - $self->_drain_rbuf; }); } } +sub _dotls { + my ($self) = @_; + + if (length $self->{tls_wbuf}) { + while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{tls_wbuf})) > 0) { + substr $self->{tls_wbuf}, 0, $len, ""; + } + } + + if (defined (my $buf = Net::SSLeay::BIO_read ($self->{tls_wbio}))) { + $self->{wbuf} .= $buf; + $self->_drain_wbuf; + } + + while (defined (my $buf = Net::SSLeay::read ($self->{tls}))) { + $self->{rbuf} .= $buf; + $self->_drain_rbuf; + } + + my $err = Net::SSLeay::get_error ($self->{tls}, -1); + + if ($err!= Net::SSLeay::ERROR_WANT_READ ()) { + if ($err == Net::SSLeay::ERROR_SYSCALL ()) { + $self->error; + } elsif ($err == Net::SSLeay::ERROR_SSL ()) { + $! = &Errno::EIO; + $self->error; + } + + # all others are fine for our purposes + } +} + +=item $handle->starttls ($tls[, $tls_ctx]) + +Instead of starting TLS negotiation immediately when the AnyEvent::Handle +object is created, you can also do that at a later time by calling +C. + +The first argument is the same as the C constructor argument (either +C<"connect">, C<"accept"> or an existing Net::SSLeay object). + +The second argument is the optional C object that is +used when AnyEvent::Handle has to create its own TLS connection object. + +=cut + +# TODO: maybe document... +sub starttls { + my ($self, $ssl, $ctx) = @_; + + $self->stoptls; + + if ($ssl eq "accept") { + $ssl = Net::SSLeay::new ($ctx || TLS_CTX ()); + Net::SSLeay::set_accept_state ($ssl); + } elsif ($ssl eq "connect") { + $ssl = Net::SSLeay::new ($ctx || TLS_CTX ()); + Net::SSLeay::set_connect_state ($ssl); + } + + $self->{tls} = $ssl; + + # basically, this is deep magic (because SSL_read should have the same issues) + # but the openssl maintainers basically said: "trust us, it just works". + # (unfortunately, we have to hardcode constants because the abysmally misdesigned + # and mismaintained ssleay-module doesn't even offer them). + Net::SSLeay::CTX_set_mode ($self->{tls}, + (eval { Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1) + | (eval { Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2)); + + $self->{tls_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); + $self->{tls_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); + + Net::SSLeay::set_bio ($ssl, $self->{tls_rbio}, $self->{tls_wbio}); + + $self->{filter_w} = sub { + $_[0]{tls_wbuf} .= ${$_[1]}; + &_dotls; + }; + $self->{filter_r} = sub { + Net::SSLeay::BIO_write ($_[0]{tls_rbio}, ${$_[1]}); + &_dotls; + }; +} + +=item $handle->stoptls + +Destroys the SSL connection, if any. Partial read or write data will be +lost. + +=cut + +sub stoptls { + my ($self) = @_; + + Net::SSLeay::free (delete $self->{tls}) if $self->{tls}; + delete $self->{tls_rbio}; + delete $self->{tls_wbio}; + delete $self->{tls_wbuf}; + delete $self->{filter_r}; + delete $self->{filter_w}; +} + +sub DESTROY { + my $self = shift; + + $self->stoptls; +} + +=item AnyEvent::Handle::TLS_CTX + +This function creates and returns the Net::SSLeay::CTX object used by +default for TLS mode. + +The context is created like this: + + Net::SSLeay::load_error_strings; + Net::SSLeay::SSLeay_add_ssl_algorithms; + Net::SSLeay::randomize; + + my $CTX = Net::SSLeay::CTX_new; + + Net::SSLeay::CTX_set_options $CTX, Net::SSLeay::OP_ALL + +=cut + +our $TLS_CTX; + +sub TLS_CTX() { + $TLS_CTX || do { + require Net::SSLeay; + + Net::SSLeay::load_error_strings (); + Net::SSLeay::SSLeay_add_ssl_algorithms (); + Net::SSLeay::randomize (); + + $TLS_CTX = Net::SSLeay::CTX_new (); + + Net::SSLeay::CTX_set_options ($TLS_CTX, Net::SSLeay::OP_ALL ()); + + $TLS_CTX + } +} + =back =head1 AUTHOR