--- AnyEvent/lib/AnyEvent/Handle.pm 2008/08/21 23:48:35 1.88 +++ AnyEvent/lib/AnyEvent/Handle.pm 2009/04/26 18:12:53 1.124 @@ -16,7 +16,7 @@ =cut -our $VERSION = 4.233; +our $VERSION = 4.4; =head1 SYNOPSIS @@ -29,7 +29,7 @@ AnyEvent::Handle->new ( fh => \*STDIN, on_eof => sub { - $cv->broadcast; + $cv->send; }, ); @@ -86,11 +86,11 @@ connection cleanly. For sockets, this just means that the other side has stopped sending data, -you can still try to write data, and, in fact, one can return from the eof +you can still try to write data, and, in fact, one can return from the EOF callback and continue writing data, as only the read part has been shut down. -While not mandatory, it is I recommended to set an eof callback, +While not mandatory, it is I recommended to set an EOF callback, otherwise you might end up with a closed socket while you are still waiting for data. @@ -129,7 +129,9 @@ read buffer). To access (and remove data from) the read buffer, use the C<< ->rbuf >> -method or access the C<$handle->{rbuf}> member directly. +method or access the C<$handle->{rbuf}> member directly. Note that you +must not enlarge or modify the read buffer, you can only remove data at +the beginning from it. When an EOF condition is detected then AnyEvent::Handle will first try to feed all the remaining data to the queued callbacks and C before @@ -234,7 +236,8 @@ system treats outstanding data at socket close time). This will not work for partial TLS data that could not be encoded -yet. This data will be lost. +yet. This data will be lost. Calling the C method in time might +help. =item tls => "accept" | "connect" | Net::SSLeay::SSL object @@ -256,6 +259,11 @@ or C on it before you pass it to AnyEvent::Handle. +B since Net::SSLeay "objects" are really only integers, +passing in the wrong integer will lead to certain crash. This most often +happens when one uses a stylish C<< tls => 1 >> and is surprised about the +segmentation fault. + See the C<< ->starttls >> method for when need to start TLS negotiation later. =item tls_ctx => $ssl_ctx @@ -275,13 +283,6 @@ Note that you are responsible to depend on the JSON module if you want to use this functionality, as AnyEvent does not have a dependency itself. -=item filter_r => $cb - -=item filter_w => $cb - -These exist, but are undocumented at this time. (They are used internally -by the TLS code). - =back =cut @@ -295,10 +296,8 @@ AnyEvent::Util::fh_nonblocking $self->{fh}, 1; - if ($self->{tls}) { - require Net::SSLeay; - $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); - } + $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}) + if $self->{tls}; $self->{_activity} = AnyEvent->now; $self->_timeout; @@ -320,7 +319,7 @@ delete $self->{_ww}; delete $self->{fh}; - $self->stoptls; + &_freetls; delete $self->{on_read}; delete $self->{_queue}; @@ -336,7 +335,7 @@ if ($self->{on_error}) { $self->{on_error}($self, $fatal); - } else { + } elsif ($self->{fh}) { Carp::croak "AnyEvent::Handle uncaught error: $!"; } } @@ -384,10 +383,14 @@ =item $handle->autocork ($boolean) Enables or disables the current autocork behaviour (see C -constructor argument). +constructor argument). Changes will only take effect on the next write. =cut +sub autocork { + $_[0]{autocork} = $_[1]; +} + =item $handle->no_delay ($boolean) Enables or disables the C setting (see constructor argument of @@ -489,7 +492,7 @@ $self->{on_drain} = $cb; $cb->($self) - if $cb && $self->{low_water_mark} >= length $self->{wbuf}; + if $cb && $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf}); } =item $handle->push_write ($data) @@ -516,7 +519,7 @@ $self->{_activity} = AnyEvent->now; $self->{on_drain}($self) - if $self->{low_water_mark} >= length $self->{wbuf} + if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf}) && $self->{on_drain}; delete $self->{_ww} unless length $self->{wbuf}; @@ -550,8 +553,10 @@ ->($self, @_); } - if ($self->{filter_w}) { - $self->{filter_w}($self, \$_[0]); + if ($self->{tls}) { + $self->{_tls_wbuf} .= $_[0]; + + &_dotls ($self); } else { $self->{wbuf} .= $_[0]; $self->_drain_wbuf; @@ -578,7 +583,7 @@ register_write_type netstring => sub { my ($self, $string) = @_; - sprintf "%d:%s,", (length $string), $string + (length $string) . ":$string," }; =item packstring => $format, $data @@ -766,6 +771,10 @@ } while () { + # we need to use a separate tls read buffer, as we must not receive data while + # we are draining the buffer, and this can only happen with TLS. + $self->{rbuf} .= delete $self->{_tls_rbuf} if exists $self->{_tls_rbuf}; + my $len = length $self->{rbuf}; if (my $cb = shift @{ $self->{_queue} }) { @@ -797,7 +806,7 @@ } } else { # read side becomes idle - delete $self->{_rw}; + delete $self->{_rw} unless $self->{tls}; last; } } @@ -836,8 +845,11 @@ Returns the read buffer (as a modifiable lvalue). -You can access the read buffer directly as the C<< ->{rbuf} >> member, if -you want. +You can access the read buffer directly as the C<< ->{rbuf} >> +member, if you want. However, the only operation allowed on the +read buffer (apart from looking at it) is removing data from its +beginning. Otherwise modifying or appending to it is not allowed and will +lead to hard-to-track-down bugs. NOTE: The read buffer should only be used or modified if the C, C or C methods are used. The other read methods @@ -1102,7 +1114,8 @@ integer only (only one of C is allowed, plus an optional C, C<< < >> or C<< > >> modifier). -DNS over TCP uses a prefix of C, EPP uses a prefix of C. +For example, DNS over TCP uses a prefix of C (2 octet network order), +EPP uses a prefix of C (4 octtes). Example: read a block of data prefixed by its length in BER-encoded format (very efficient). @@ -1142,7 +1155,8 @@ =item json => $cb->($handle, $hash_or_arrayref) -Reads a JSON object or array, decodes it and passes it to the callback. +Reads a JSON object or array, decodes it and passes it to the +callback. When a parse error occurs, an C error will be raised. If a C object was passed to the constructor, then that will be used for the final decode, otherwise it will create a JSON coder expecting UTF-8. @@ -1169,7 +1183,7 @@ my $json = $self->{json} ||= JSON->new->utf8; sub { - my $ref = $json->incr_parse ($self->{rbuf}); + my $ref = eval { $json->incr_parse ($self->{rbuf}) }; if ($ref) { $self->{rbuf} = $json->incr_text; @@ -1177,8 +1191,19 @@ $cb->($self, $ref); 1 + } elsif ($@) { + # error case + $json->incr_skip; + + $self->{rbuf} = $json->incr_text; + $json->incr_text = ""; + + $self->_error (&Errno::EBADMSG); + + () } else { $self->{rbuf} = ""; + () } } @@ -1265,12 +1290,15 @@ will automatically C for you when neither C is set nor there are any read requests in the queue. +These methods will have no effect when in TLS mode (as TLS doesn't support +half-duplex connections). + =cut sub stop_read { my ($self) = @_; - delete $self->{_rw}; + delete $self->{_rw} unless $self->{tls}; } sub start_read { @@ -1280,15 +1308,19 @@ Scalar::Util::weaken $self; $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { - my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf}; + my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf}); my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; if ($len > 0) { $self->{_activity} = AnyEvent->now; - $self->{filter_r} - ? $self->{filter_r}($self, $rbuf) - : $self->{_in_drain} || $self->_drain_rbuf; + if ($self->{tls}) { + Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf); + + &_dotls ($self); + } else { + $self->_drain_rbuf unless $self->{_in_drain}; + } } elsif (defined $len) { delete $self->{_rw}; @@ -1302,44 +1334,46 @@ } } +# poll the write BIO and send the data if applicable sub _dotls { my ($self) = @_; - my $buf; + my $tmp; if (length $self->{_tls_wbuf}) { - while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) { - substr $self->{_tls_wbuf}, 0, $len, ""; + while (($tmp = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) { + substr $self->{_tls_wbuf}, 0, $tmp, ""; } } - if (length ($buf = Net::SSLeay::BIO_read ($self->{_wbio}))) { - $self->{wbuf} .= $buf; - $self->_drain_wbuf; - } - - while (defined ($buf = Net::SSLeay::read ($self->{tls}))) { - if (length $buf) { - $self->{rbuf} .= $buf; - $self->_drain_rbuf unless $self->{_in_drain}; - } else { + while (defined ($tmp = Net::SSLeay::read ($self->{tls}))) { + unless (length $tmp) { # let's treat SSL-eof as we treat normal EOF + delete $self->{_rw}; $self->{_eof} = 1; - $self->_shutdown; - return; + &_freetls; } + + $self->{_tls_rbuf} .= $tmp; + $self->_drain_rbuf unless $self->{_in_drain}; + $self->{tls} or return; # tls session might have gone away in callback } - my $err = Net::SSLeay::get_error ($self->{tls}, -1); + $tmp = Net::SSLeay::get_error ($self->{tls}, -1); - if ($err!= Net::SSLeay::ERROR_WANT_READ ()) { - if ($err == Net::SSLeay::ERROR_SYSCALL ()) { + if ($tmp != Net::SSLeay::ERROR_WANT_READ ()) { + if ($tmp == Net::SSLeay::ERROR_SYSCALL ()) { return $self->_error ($!, 1); - } elsif ($err == Net::SSLeay::ERROR_SSL ()) { + } elsif ($tmp == Net::SSLeay::ERROR_SSL ()) { return $self->_error (&Errno::EIO, 1); } - # all others are fine for our purposes + # all other errors are fine for our purposes + } + + while (length ($tmp = Net::SSLeay::BIO_read ($self->{_wbio}))) { + $self->{wbuf} .= $tmp; + $self->_drain_wbuf; } } @@ -1359,13 +1393,19 @@ call and can be used or changed to your liking. Note that the handshake might have already started when this function returns. +If it an error to start a TLS handshake more than once per +AnyEvent::Handle object (this is due to bugs in OpenSSL). + =cut sub starttls { my ($self, $ssl, $ctx) = @_; - $self->stoptls; + require Net::SSLeay; + Carp::croak "it is an error to call starttls more than once on an AnyEvent::Handle object" + if $self->{tls}; + if ($ssl eq "accept") { $ssl = Net::SSLeay::new ($ctx || TLS_CTX ()); Net::SSLeay::set_accept_state ($ssl); @@ -1384,9 +1424,10 @@ # # in short: this is a mess. # - # note that we do not try to kepe the length constant between writes as we are required to do. + # note that we do not try to keep the length constant between writes as we are required to do. # we assume that most (but not all) of this insanity only applies to non-blocking cases, - # and we drive openssl fully in blocking mode here. + # and we drive openssl fully in blocking mode here. Or maybe we don't - openssl seems to + # have identity issues in that area. Net::SSLeay::CTX_set_mode ($self->{tls}, (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1) | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2)); @@ -1396,39 +1437,47 @@ Net::SSLeay::set_bio ($ssl, $self->{_rbio}, $self->{_wbio}); - $self->{filter_w} = sub { - $_[0]{_tls_wbuf} .= ${$_[1]}; - &_dotls; - }; - $self->{filter_r} = sub { - Net::SSLeay::BIO_write ($_[0]{_rbio}, ${$_[1]}); - &_dotls; - }; + &_dotls; # need to trigger the initial handshake + $self->start_read; # make sure we actually do read } =item $handle->stoptls -Destroys the SSL connection, if any. Partial read or write data will be -lost. +Shuts down the SSL connection - this makes a proper EOF handshake by +sending a close notify to the other side, but since OpenSSL doesn't +support non-blocking shut downs, it is not possible to re-use the stream +afterwards. =cut sub stoptls { my ($self) = @_; - Net::SSLeay::free (delete $self->{tls}) if $self->{tls}; + if ($self->{tls}) { + Net::SSLeay::shutdown ($self->{tls}); + + &_dotls; - delete $self->{_rbio}; - delete $self->{_wbio}; - delete $self->{_tls_wbuf}; - delete $self->{filter_r}; - delete $self->{filter_w}; + # we don't give a shit. no, we do, but we can't. no... + # we, we... have to use openssl :/ + &_freetls; + } +} + +sub _freetls { + my ($self) = @_; + + return unless $self->{tls}; + + Net::SSLeay::free (delete $self->{tls}); + + delete @$self{qw(_rbio _wbio _tls_wbuf)}; } sub DESTROY { - my $self = shift; + my ($self) = @_; - $self->stoptls; + &_freetls; my $linger = exists $self->{linger} ? $self->{linger} : 3600; @@ -1453,6 +1502,31 @@ } } +=item $handle->destroy + +Shuts down the handle object as much as possible - this call ensures that +no further callbacks will be invoked and resources will be freed as much +as possible. You must not call any methods on the object afterwards. + +Normally, you can just "forget" any references to an AnyEvent::Handle +object and it will simply shut down. This works in fatal error and EOF +callbacks, as well as code outside. It does I work in a read or write +callback, so when you want to destroy the AnyEvent::Handle object from +within such an callback. You I call C<< ->destroy >> explicitly in +that case. + +The handle might still linger in the background and write out remaining +data, as specified by the C option, however. + +=cut + +sub destroy { + my ($self) = @_; + + $self->DESTROY; + %$self = (); +} + =item AnyEvent::Handle::TLS_CTX This function creates and returns the Net::SSLeay::CTX object used by @@ -1490,6 +1564,78 @@ =back + +=head1 NONFREQUENTLY ASKED QUESTIONS + +=over 4 + +=item I C the AnyEvent::Handle reference inside my callback and +still get further invocations! + +That's because AnyEvent::Handle keeps a reference to itself when handling +read or write callbacks. + +It is only safe to "forget" the reference inside EOF or error callbacks, +from within all other callbacks, you need to explicitly call the C<< +->destroy >> method. + +=item I get different callback invocations in TLS mode/Why can't I pause +reading? + +Unlike, say, TCP, TLS connections do not consist of two independent +communication channels, one for each direction. Or put differently. The +read and write directions are not independent of each other: you cannot +write data unless you are also prepared to read, and vice versa. + +This can mean than, in TLS mode, you might get C or C +callback invocations when you are not expecting any read data - the reason +is that AnyEvent::Handle always reads in TLS mode. + +During the connection, you have to make sure that you always have a +non-empty read-queue, or an C watcher. At the end of the +connection (or when you no longer want to use it) you can call the +C method. + +=item How do I read data until the other side closes the connection? + +If you just want to read your data into a perl scalar, the easiest way +to achieve this is by setting an C callback that does nothing, +clearing the C callback and in the C callback, the data +will be in C<$_[0]{rbuf}>: + + $handle->on_read (sub { }); + $handle->on_eof (undef); + $handle->on_error (sub { + my $data = delete $_[0]{rbuf}; + undef $handle; + }); + +The reason to use C is that TCP connections, due to latencies +and packets loss, might get closed quite violently with an error, when in +fact, all data has been received. + +It is usually better to use acknowledgements when transferring data, +to make sure the other side hasn't just died and you got the data +intact. This is also one reason why so many internet protocols have an +explicit QUIT command. + +=item I don't want to destroy the handle too early - how do I wait until +all data has been written? + +After writing your last bits of data, set the C callback +and destroy the handle in there - with the default setting of +C this will be called precisely when all data has been +written to the socket: + + $handle->push_write (...); + $handle->on_drain (sub { + warn "all data submitted to the kernel\n"; + undef $handle; + }); + +=back + + =head1 SUBCLASSING AnyEvent::Handle In many cases, you might want to subclass AnyEvent::Handle.