--- AnyEvent/lib/AnyEvent/Handle.pm 2009/07/03 00:09:04 1.134 +++ AnyEvent/lib/AnyEvent/Handle.pm 2010/03/15 18:51:30 1.193 @@ -1,23 +1,7 @@ -package AnyEvent::Handle; - -no warnings; -use strict qw(subs vars); - -use AnyEvent (); -use AnyEvent::Util qw(WSAEWOULDBLOCK); -use Scalar::Util (); -use Carp (); -use Fcntl (); -use Errno qw(EAGAIN EINTR); - =head1 NAME AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent -=cut - -our $VERSION = 4.45; - =head1 SYNOPSIS use AnyEvent; @@ -25,21 +9,22 @@ my $cv = AnyEvent->condvar; - my $handle = - AnyEvent::Handle->new ( - fh => \*STDIN, - on_eof => sub { - $cv->send; - }, - ); + my $hdl; $hdl = new AnyEvent::Handle + fh => \*STDIN, + on_error => sub { + my ($hdl, $fatal, $msg) = @_; + warn "got error $msg\n"; + $hdl->destroy; + $cv->send; + }; # send some request line - $handle->push_write ("getinfo\015\012"); + $hdl->push_write ("getinfo\015\012"); # read the response line - $handle->push_read (line => sub { - my ($handle, $line) = @_; - warn "read line <$line>\n"; + $hdl->push_read (line => sub { + my ($hdl, $line) = @_; + warn "got line <$line>\n"; $cv->send; }); @@ -48,8 +33,7 @@ =head1 DESCRIPTION This module is a helper module to make it easier to do event-based I/O on -filehandles. For utility functions for doing non-blocking connects and accepts -on sockets see L. +filehandles. The L tutorial contains some well-documented AnyEvent::Handle examples. @@ -58,44 +42,113 @@ means characters. As sysread and syswrite are used for all I/O, their treatment of characters applies to this module as well. +At the very minimum, you should specify C or C, and the +C callback. + All callbacks will be invoked with the handle object as their first argument. +=cut + +package AnyEvent::Handle; + +use Scalar::Util (); +use List::Util (); +use Carp (); +use Errno qw(EAGAIN EINTR); + +use AnyEvent (); BEGIN { AnyEvent::common_sense } +use AnyEvent::Util qw(WSAEWOULDBLOCK); + +our $VERSION = $AnyEvent::VERSION; + +sub _load_func($) { + my $func = $_[0]; + + unless (defined &$func) { + my $pkg = $func; + do { + $pkg =~ s/::[^:]+$// + or return; + eval "require $pkg"; + } until defined &$func; + } + + \&$func +} + =head1 METHODS =over 4 -=item $handle = B AnyEvent::TLS fh => $filehandle, key => value... +=item $handle = B AnyEvent::Handle fh => $filehandle, key => value... The constructor supports these arguments (all as C<< key => value >> pairs). =over 4 -=item fh => $filehandle [MANDATORY] +=item fh => $filehandle [C or C MANDATORY] The filehandle this L object will operate on. - NOTE: The filehandle will be set to non-blocking mode (using C) by the constructor and needs to stay in that mode. -=item on_eof => $cb->($handle) +=item connect => [$host, $service] [C or C MANDATORY] -Set the callback to be called when an end-of-file condition is detected, -i.e. in the case of a socket, when the other side has closed the -connection cleanly. +Try to connect to the specified host and service (port), using +C. The C<$host> additionally becomes the +default C. -For sockets, this just means that the other side has stopped sending data, -you can still try to write data, and, in fact, one can return from the EOF -callback and continue writing data, as only the read part has been shut -down. +You have to specify either this parameter, or C, above. -While not mandatory, it is I recommended to set an EOF callback, -otherwise you might end up with a closed socket while you are still -waiting for data. +It is possible to push requests on the read and write queues, and modify +properties of the stream, even while AnyEvent::Handle is connecting. -If an EOF condition has been detected but no C callback has been -set, then a fatal error will be raised with C<$!> set to <0>. +When this parameter is specified, then the C, +C and C callbacks will be called under the +appropriate circumstances: + +=over 4 + +=item on_prepare => $cb->($handle) + +This (rarely used) callback is called before a new connection is +attempted, but after the file handle has been created. It could be used to +prepare the file handle with parameters required for the actual connect +(as opposed to settings that can be changed when the connection is already +established). + +The return value of this callback should be the connect timeout value in +seconds (or C<0>, or C, or the empty list, to indicate the default +timeout is to be used). + +=item on_connect => $cb->($handle, $host, $port, $retry->()) + +This callback is called when a connection has been successfully established. + +The actual numeric host and port (the socket peername) are passed as +parameters, together with a retry callback. + +When, for some reason, the handle is not acceptable, then calling +C<$retry> will continue with the next connection target (in case of +multi-homed hosts or SRV records there can be multiple connection +endpoints). At the time it is called the read and write queues, eof +status, tls status and similar properties of the handle will have been +reset. + +In most cases, ignoring the C<$retry> parameter is the way to go. + +=item on_connect_error => $cb->($handle, $message) + +This callback is called when the connection could not be +established. C<$!> will contain the relevant error code, and C<$message> a +message describing it (usually the same as C<"$!">). + +If this callback isn't specified, then C will be called with a +fatal error instead. + +=back =item on_error => $cb->($handle, $fatal, $message) @@ -104,10 +157,12 @@ connect or a read error. Some errors are fatal (which is indicated by C<$fatal> being true). On -fatal errors the handle object will be shut down and will not be usable -(but you are free to look at the current C<< ->rbuf >>). Examples of fatal -errors are an EOF condition with active (but unsatisifable) read watchers -(C) or I/O errors. +fatal errors the handle object will be destroyed (by a call to C<< -> +destroy >>) after invoking the error callback (which means you are free to +examine the handle object). Examples of fatal errors are an EOF condition +with active (but unsatisifable) read watchers (C) or I/O errors. In +cases where the other side can close the connection at their will it is +often easiest to not report C errors in this callback. AnyEvent::Handle tries to find an appropriate error code for you to check against, but in some cases (TLS errors), this does not work well. It is @@ -135,7 +190,7 @@ read buffer). To access (and remove data from) the read buffer, use the C<< ->rbuf >> -method or access the C<$handle->{rbuf}> member directly. Note that you +method or access the C<< $handle->{rbuf} >> member directly. Note that you must not enlarge or modify the read buffer, you can only remove data at the beginning from it. @@ -144,6 +199,27 @@ calling the C callback. If no progress can be made, then a fatal error will be raised (with C<$!> set to C). +Note that, unlike requests in the read queue, an C callback +doesn't mean you I some data: if there is an EOF and there +are outstanding read requests then an error will be flagged. With an +C callback, the C callback will be invoked. + +=item on_eof => $cb->($handle) + +Set the callback to be called when an end-of-file condition is detected, +i.e. in the case of a socket, when the other side has closed the +connection cleanly, and there are no outstanding read requests in the +queue (if there are read requests, then an EOF counts as an unexpected +connection close and will be flagged as an error). + +For sockets, this just means that the other side has stopped sending data, +you can still try to write data, and, in fact, one can return from the EOF +callback and continue writing data, as only the read part has been shut +down. + +If an EOF condition has been detected but no C callback has been +set, then a fatal error will be raised with C<$!> set to <0>. + =item on_drain => $cb->($handle) This sets the callback that is called when the write buffer becomes empty @@ -159,10 +235,21 @@ =item timeout => $fractional_seconds -If non-zero, then this enables an "inactivity" timeout: whenever this many -seconds pass without a successful read or write on the underlying file -handle, the C callback will be invoked (and if that one is -missing, a non-fatal C error will be raised). +=item rtimeout => $fractional_seconds + +=item wtimeout => $fractional_seconds + +If non-zero, then these enables an "inactivity" timeout: whenever this +many seconds pass without a successful read or write on the underlying +file handle (or a call to C), the C callback +will be invoked (and if that one is missing, a non-fatal C +error will be raised). + +There are three variants of the timeouts that work fully independent +of each other, for both read and write, just read, and just write: +C, C and C, with corresponding callbacks +C, C and C, and reset functions +C, C, and C. Note that timeout processing is also active when you currently do not have any outstanding read or write requests: If you plan to keep the connection @@ -216,6 +303,38 @@ The default is your opertaing system's default behaviour (most likely enabled), this option explicitly enables or disables it, if possible. +=item keepalive => + +Enables (default disable) the SO_KEEPALIVE option on the stream socket: +normally, TCP connections have no time-out once established, so TCP +connections, once established, can stay alive forever even when the other +side has long gone. TCP keepalives are a cheap way to take down long-lived +TCP connections whent he other side becomes unreachable. While the default +is OS-dependent, TCP keepalives usually kick in after around two hours, +and, if the other side doesn't reply, take down the TCP connection some 10 +to 15 minutes later. + +It is harmless to specify this option for file handles that do not support +keepalives, and enabling it on connections that are potentially long-lived +is usually a good idea. + +=item oobinline => + +BSD majorly fucked up the implementation of TCP urgent data. The result +is that almost no OS implements TCP according to the specs, and every OS +implements it slightly differently. + +If you want to handle TCP urgent data, then setting this flag (the default +is enabled) gives you the most portable way of getting urgent data, by +putting it into the stream. + +Since BSD emulation of OOB data on top of TCP's urgent data can have +security implications, AnyEvent::Handle sets this flag automatically +unless explicitly specified. Note that setting this flag after +establishing a connection I be a bit too late (data loss could +already have occured on BSD systems), but at least it will protect you +from most attacks. + =item read_size => The default read block size (the amount of bytes this module will @@ -251,12 +370,14 @@ (I IDN!) used to create the connection, rarely the IP address. Apart from being useful in error messages, this string is also used in TLS -common name verification (see C in L). +peername verification (see C in L). This +verification will be skipped when C is not specified or +C. =item tls => "accept" | "connect" | Net::SSLeay::SSL object When this parameter is given, it enables TLS (SSL) mode, that means -AnyEvent will start a TLS handshake as soon as the conenction has been +AnyEvent will start a TLS handshake as soon as the connection has been established and will transparently encrypt/decrypt data afterwards. All TLS protocol errors will be signalled as C, with an @@ -298,6 +419,38 @@ => value >> pairs. Those will be passed to L to create a new TLS context object. +=item on_starttls => $cb->($handle, $success[, $error_message]) + +This callback will be invoked when the TLS/SSL handshake has finished. If +C<$success> is true, then the TLS handshake succeeded, otherwise it failed +(C will not be called in this case). + +The session in C<< $handle->{tls} >> can still be examined in this +callback, even when the handshake was not successful. + +TLS handshake failures will not cause C to be invoked when this +callback is in effect, instead, the error message will be passed to C. + +Without this callback, handshake failures lead to C being +called, as normal. + +Note that you cannot call C right again in this callback. If you +need to do that, start an zero-second timer instead whose callback can +then call C<< ->starttls >> again. + +=item on_stoptls => $cb->($handle) + +When a SSLv3/TLS shutdown/close notify/EOF is detected and this callback is +set, then it will be invoked after freeing the TLS session. If it is not, +then a TLS shutdown condition will be treated like a normal EOF condition +on the handle. + +The session in C<< $handle->{tls} >> can still be examined in this +callback. + +This callback will only be called on TLS shutdowns, not when the +underlying handle signals EOF. + =item json => JSON or JSON::XS object This is the json coder object used by the C read and write types. @@ -317,47 +470,107 @@ my $class = shift; my $self = bless { @_ }, $class; - $self->{fh} or Carp::croak "mandatory argument fh is missing"; + if ($self->{fh}) { + $self->_start; + return unless $self->{fh}; # could be gone by now + + } elsif ($self->{connect}) { + require AnyEvent::Socket; + + $self->{peername} = $self->{connect}[0] + unless exists $self->{peername}; + + $self->{_skip_drain_rbuf} = 1; + + { + Scalar::Util::weaken (my $self = $self); + + $self->{_connect} = + AnyEvent::Socket::tcp_connect ( + $self->{connect}[0], + $self->{connect}[1], + sub { + my ($fh, $host, $port, $retry) = @_; + + if ($fh) { + $self->{fh} = $fh; + + delete $self->{_skip_drain_rbuf}; + $self->_start; + + $self->{on_connect} + and $self->{on_connect}($self, $host, $port, sub { + delete @$self{qw(fh _tw _rtw _wtw _ww _rw _eof _queue rbuf _wbuf tls _tls_rbuf _tls_wbuf)}; + $self->{_skip_drain_rbuf} = 1; + &$retry; + }); + + } else { + if ($self->{on_connect_error}) { + $self->{on_connect_error}($self, "$!"); + $self->destroy; + } else { + $self->_error ($!, 1); + } + } + }, + sub { + local $self->{fh} = $_[0]; + + $self->{on_prepare} + ? $self->{on_prepare}->($self) + : () + } + ); + } - AnyEvent::Util::fh_nonblocking $self->{fh}, 1; + } else { + Carp::croak "AnyEvent::Handle: either an existing fh or the connect parameter must be specified"; + } - $self->{_activity} = AnyEvent->now; - $self->_timeout; + $self +} - $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay}; +sub _start { + my ($self) = @_; - $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}) - if $self->{tls}; + AnyEvent::Util::fh_nonblocking $self->{fh}, 1; - $self->on_drain (delete $self->{on_drain}) if exists $self->{on_drain}; + $self->{_activity} = + $self->{_ractivity} = + $self->{_wactivity} = AE::now; - $self->start_read - if $self->{on_read}; + $self->timeout (delete $self->{timeout} ) if $self->{timeout}; + $self->rtimeout (delete $self->{rtimeout} ) if $self->{rtimeout}; + $self->wtimeout (delete $self->{wtimeout} ) if $self->{wtimeout}; - $self->{fh} && $self -} + $self->no_delay (delete $self->{no_delay} ) if exists $self->{no_delay} && $self->{no_delay}; + $self->keepalive (delete $self->{keepalive}) if exists $self->{keepalive} && $self->{keepalive}; -sub _shutdown { - my ($self) = @_; + $self->oobinline (exists $self->{oobinline} ? delete $self->{oobinline} : 1); - delete @$self{qw(_tw _rw _ww fh wbuf on_read _queue)}; - $self->{_eof} = 1; # tell starttls et. al to stop trying + $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}) + if $self->{tls}; - &_freetls; + $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; + + $self->start_read + if $self->{on_read} || @{ $self->{_queue} }; + + $self->_drain_wbuf; } sub _error { my ($self, $errno, $fatal, $message) = @_; - $self->_shutdown - if $fatal; - $! = $errno; $message ||= "$!"; if ($self->{on_error}) { $self->{on_error}($self, $fatal, $message); - } elsif ($self->{fh}) { + $self->destroy if $fatal; + } elsif ($self->{fh} || $self->{connect}) { + $self->destroy; Carp::croak "AnyEvent::Handle uncaught error: $message"; } } @@ -392,15 +605,17 @@ =item $handle->on_timeout ($cb) -Replace the current C callback, or disables the callback (but -not the timeout) if C<$cb> = C. See the C constructor -argument and method. +=item $handle->on_rtimeout ($cb) + +=item $handle->on_wtimeout ($cb) + +Replace the current C, C or C +callback, or disables the callback (but not the timeout) if C<$cb> = +C. See the C constructor argument and method. =cut -sub on_timeout { - $_[0]{on_timeout} = $_[1]; -} +# see below =item $handle->autocork ($boolean) @@ -425,62 +640,175 @@ eval { local $SIG{__DIE__}; - setsockopt $_[0]{fh}, &Socket::IPPROTO_TCP, &Socket::TCP_NODELAY, int $_[1]; + setsockopt $_[0]{fh}, Socket::IPPROTO_TCP (), Socket::TCP_NODELAY (), int $_[1] + if $_[0]{fh}; }; } +=item $handle->keepalive ($boolean) + +Enables or disables the C setting (see constructor argument of +the same name for details). + +=cut + +sub keepalive { + $_[0]{keepalive} = $_[1]; + + eval { + local $SIG{__DIE__}; + setsockopt $_[0]{fh}, Socket::SOL_SOCKET (), Socket::SO_KEEPALIVE (), int $_[1] + if $_[0]{fh}; + }; +} + +=item $handle->oobinline ($boolean) + +Enables or disables the C setting (see constructor argument of +the same name for details). + +=cut + +sub oobinline { + $_[0]{oobinline} = $_[1]; + + eval { + local $SIG{__DIE__}; + setsockopt $_[0]{fh}, Socket::SOL_SOCKET (), Socket::SO_OOBINLINE (), int $_[1] + if $_[0]{fh}; + }; +} + +=item $handle->keepalive ($boolean) + +Enables or disables the C setting (see constructor argument of +the same name for details). + +=cut + +sub keepalive { + $_[0]{keepalive} = $_[1]; + + eval { + local $SIG{__DIE__}; + setsockopt $_[0]{fh}, Socket::SOL_SOCKET (), Socket::SO_KEEPALIVE (), int $_[1] + if $_[0]{fh}; + }; +} + +=item $handle->on_starttls ($cb) + +Replace the current C callback (see the C constructor argument). + +=cut + +sub on_starttls { + $_[0]{on_starttls} = $_[1]; +} + +=item $handle->on_stoptls ($cb) + +Replace the current C callback (see the C constructor argument). + +=cut + +sub on_stoptls { + $_[0]{on_stoptls} = $_[1]; +} + +=item $handle->rbuf_max ($max_octets) + +Configures the C setting (C disables it). + +=cut + +sub rbuf_max { + $_[0]{rbuf_max} = $_[1]; +} + ############################################################################# =item $handle->timeout ($seconds) +=item $handle->rtimeout ($seconds) + +=item $handle->wtimeout ($seconds) + Configures (or disables) the inactivity timeout. +=item $handle->timeout_reset + +=item $handle->rtimeout_reset + +=item $handle->wtimeout_reset + +Reset the activity timeout, as if data was received or sent. + +These methods are cheap to call. + =cut -sub timeout { - my ($self, $timeout) = @_; +for my $dir ("", "r", "w") { + my $timeout = "${dir}timeout"; + my $tw = "_${dir}tw"; + my $on_timeout = "on_${dir}timeout"; + my $activity = "_${dir}activity"; + my $cb; - $self->{timeout} = $timeout; - $self->_timeout; -} + *$on_timeout = sub { + $_[0]{$on_timeout} = $_[1]; + }; -# reset the timeout watcher, as neccessary -# also check for time-outs -sub _timeout { - my ($self) = @_; + *$timeout = sub { + my ($self, $new_value) = @_; - if ($self->{timeout}) { - my $NOW = AnyEvent->now; + $self->{$timeout} = $new_value; + delete $self->{$tw}; &$cb; + }; - # when would the timeout trigger? - my $after = $self->{_activity} + $self->{timeout} - $NOW; + *{"${dir}timeout_reset"} = sub { + $_[0]{$activity} = AE::now; + }; - # now or in the past already? - if ($after <= 0) { - $self->{_activity} = $NOW; + # main workhorse: + # reset the timeout watcher, as neccessary + # also check for time-outs + $cb = sub { + my ($self) = @_; + + if ($self->{$timeout} && $self->{fh}) { + my $NOW = AE::now; + + # when would the timeout trigger? + my $after = $self->{$activity} + $self->{$timeout} - $NOW; + + # now or in the past already? + if ($after <= 0) { + $self->{$activity} = $NOW; - if ($self->{on_timeout}) { - $self->{on_timeout}($self); - } else { - $self->_error (&Errno::ETIMEDOUT); - } + if ($self->{$on_timeout}) { + $self->{$on_timeout}($self); + } else { + $self->_error (Errno::ETIMEDOUT); + } - # callback could have changed timeout value, optimise - return unless $self->{timeout}; + # callback could have changed timeout value, optimise + return unless $self->{$timeout}; - # calculate new after - $after = $self->{timeout}; - } + # calculate new after + $after = $self->{$timeout}; + } - Scalar::Util::weaken $self; - return unless $self; # ->error could have destroyed $self + Scalar::Util::weaken $self; + return unless $self; # ->error could have destroyed $self - $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub { - delete $self->{_tw}; - $self->_timeout; - }); - } else { - delete $self->{_tw}; + $self->{$tw} ||= AE::timer $after, 0, sub { + delete $self->{$tw}; + $cb->($self); + }; + } else { + delete $self->{$tw}; + } } } @@ -506,6 +834,9 @@ Sets the C callback or clears it (see the description of C in the constructor). +This method may invoke callbacks (and therefore the handle might be +destroyed after it returns). + =cut sub on_drain { @@ -523,6 +854,9 @@ want (only limited by the available memory), as C buffers it independently of the kernel. +This method may invoke callbacks (and therefore the handle might be +destroyed after it returns). + =cut sub _drain_wbuf { @@ -535,10 +869,10 @@ my $cb = sub { my $len = syswrite $self->{fh}, $self->{wbuf}; - if ($len >= 0) { + if (defined $len) { substr $self->{wbuf}, 0, $len, ""; - $self->{_activity} = AnyEvent->now; + $self->{_activity} = $self->{_wactivity} = AE::now; $self->{on_drain}($self) if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf}) @@ -554,13 +888,14 @@ $cb->() unless $self->{autocork}; # if still data left in wbuf, we need to poll - $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) + $self->{_ww} = AE::io $self->{fh}, 1, $cb if length $self->{wbuf}; }; } our %WH; +# deprecated sub register_write_type($$) { $WH{$_[0]} = $_[1]; } @@ -571,24 +906,30 @@ if (@_ > 1) { my $type = shift; - @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write") + @_ = ($WH{$type} ||= _load_func "$type\::anyevent_write_type" + or Carp::croak "unsupported/unloadable type '$type' passed to AnyEvent::Handle::push_write") ->($self, @_); } - if ($self->{tls}) { - $self->{_tls_wbuf} .= $_[0]; + # we downgrade here to avoid hard-to-track-down bugs, + # and diagnose the problem earlier and better. - &_dotls ($self); + if ($self->{tls}) { + utf8::downgrade $self->{_tls_wbuf} .= $_[0]; + &_dotls ($self) if $self->{fh}; } else { - $self->{wbuf} .= $_[0]; - $self->_drain_wbuf; + utf8::downgrade $self->{wbuf} .= $_[0]; + $self->_drain_wbuf if $self->{fh}; } } =item $handle->push_write (type => @args) -Instead of formatting your data yourself, you can also let this module do -the job by specifying a type and type-specific arguments. +Instead of formatting your data yourself, you can also let this module +do the job by specifying a type and type-specific arguments. You +can also specify the (fully qualified) name of a package, in which +case AnyEvent tries to load the package and then expects to find the +C function inside (see "custom write types", below). Predefined types are (if you have ideas for additional types, feel free to drop by and tell us): @@ -655,13 +996,17 @@ =cut +sub json_coder() { + eval { require JSON::XS; JSON::XS->new->utf8 } + || do { require JSON; JSON->new->utf8 } +} + register_write_type json => sub { my ($self, $ref) = @_; - require JSON; + my $json = $self->{json} ||= json_coder; - $self->{json} ? $self->{json}->encode ($ref) - : JSON::encode_json ($ref) + $json->encode ($ref) }; =item storable => $reference @@ -685,8 +1030,9 @@ Sometimes you know you want to close the socket after writing your data before it was actually written. One way to do that is to replace your -C handler by a callback that shuts down the socket. This method -is a shorthand for just that, and replaces the C callback with: +C handler by a callback that shuts down the socket (and set +C to C<0>). This method is a shorthand for just that, and +replaces the C callback with: sub { shutdown $_[0]{fh}, 1 } # for push_shutdown @@ -696,23 +1042,49 @@ You can rely on the normal read queue and C handling afterwards. This is the cleanest way to close a connection. +This method may invoke callbacks (and therefore the handle might be +destroyed after it returns). + =cut sub push_shutdown { - $_[0]->{on_drain} = sub { shutdown $_[0]{fh}, 1 }; + my ($self) = @_; + + delete $self->{low_water_mark}; + $self->on_drain (sub { shutdown $_[0]{fh}, 1 }); } -=item AnyEvent::Handle::register_write_type type => $coderef->($handle, @args) +=item custom write types - Package::anyevent_write_type $handle, @args + +Instead of one of the predefined types, you can also specify the name of +a package. AnyEvent will try to load the package and then expects to find +a function named C inside. If it isn't found, it +progressively tries to load the parent package until it either finds the +function (good) or runs out of packages (bad). + +Whenever the given C is used, C will the function with +the handle object and the remaining arguments. + +The function is supposed to return a single octet string that will be +appended to the write buffer, so you cna mentally treat this function as a +"arguments to on-the-wire-format" converter. -This function (not method) lets you add your own types to C. -Whenever the given C is used, C will invoke the code -reference with the handle object and the remaining arguments. +Example: implement a custom write type C that joins the remaining +arguments using the first one. -The code reference is supposed to return a single octet string that will -be appended to the write buffer. + $handle->push_write (My::Type => " ", 1,2,3); -Note that this is a function, and all types registered this way will be -global, so try to use unique names. + # uses the following package, which can be defined in the "My::Type" or in + # the "My" modules to be auto-loaded, or just about anywhere when the + # My::Type::anyevent_write_type is defined before invoking it. + + package My::Type; + + sub anyevent_write_type { + my ($handle, $delim, @args) = @_; + + join $delim, @args + } =cut @@ -804,28 +1176,24 @@ sub _drain_rbuf { my ($self) = @_; - local $self->{_in_drain} = 1; - - if ( - defined $self->{rbuf_max} - && $self->{rbuf_max} < length $self->{rbuf} - ) { - $self->_error (&Errno::ENOSPC, 1), return; - } + # avoid recursion + return if $self->{_skip_drain_rbuf}; + local $self->{_skip_drain_rbuf} = 1; while () { # we need to use a separate tls read buffer, as we must not receive data while # we are draining the buffer, and this can only happen with TLS. - $self->{rbuf} .= delete $self->{_tls_rbuf} if exists $self->{_tls_rbuf}; + $self->{rbuf} .= delete $self->{_tls_rbuf} + if exists $self->{_tls_rbuf}; my $len = length $self->{rbuf}; if (my $cb = shift @{ $self->{_queue} }) { unless ($cb->($self)) { - if ($self->{_eof}) { - # no progress can be made (not enough data and no data forthcoming) - $self->_error (&Errno::EPIPE, 1), return; - } + # no progress can be made + # (not enough data and no data forthcoming) + $self->_error (Errno::EPIPE, 1), return + if $self->{_eof}; unshift @{ $self->{_queue} }, $cb; last; @@ -842,7 +1210,7 @@ ) { # no further data will arrive # so no progress can be made - $self->_error (&Errno::EPIPE, 1), return + $self->_error (Errno::EPIPE, 1), return if $self->{_eof}; last; # more data might arrive @@ -855,11 +1223,18 @@ } if ($self->{_eof}) { - if ($self->{on_eof}) { - $self->{on_eof}($self) - } else { - $self->_error (0, 1); - } + $self->{on_eof} + ? $self->{on_eof}($self) + : $self->_error (0, 1, "Unexpected end-of-file"); + + return; + } + + if ( + defined $self->{rbuf_max} + && $self->{rbuf_max} < length $self->{rbuf} + ) { + $self->_error (Errno::ENOSPC, 1), return; } # may need to restart read watcher @@ -875,13 +1250,16 @@ the new callback is C). See the description of C in the constructor. +This method may invoke callbacks (and therefore the handle might be +destroyed after it returns). + =cut sub on_read { my ($self, $cb) = @_; $self->{on_read} = $cb; - $self->_drain_rbuf if $cb && !$self->{_in_drain}; + $self->_drain_rbuf if $cb; } =item $handle->rbuf @@ -923,6 +1301,9 @@ interested in (which can be none at all) and return a true value. After returning true, it will be removed from the queue. +These methods may invoke callbacks (and therefore the handle might be +destroyed after it returns). + =cut our %RH; @@ -938,12 +1319,13 @@ if (@_) { my $type = shift; - $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read") + $cb = ($RH{$type} ||= _load_func "$type\::anyevent_read_type" + or Carp::croak "unsupported/unloadable type '$type' passed to AnyEvent::Handle::push_read") ->($self, $cb, @_); } push @{ $self->{_queue} }, $cb; - $self->_drain_rbuf unless $self->{_in_drain}; + $self->_drain_rbuf; } sub unshift_read { @@ -957,9 +1339,8 @@ ->($self, $cb, @_); } - unshift @{ $self->{_queue} }, $cb; - $self->_drain_rbuf unless $self->{_in_drain}; + $self->_drain_rbuf; } =item $handle->push_read (type => @args, $cb) @@ -968,7 +1349,9 @@ Instead of providing a callback that parses the data itself you can chose between a number of predefined parsing formats, for chunks of data, lines -etc. +etc. You can also specify the (fully qualified) name of a package, in +which case AnyEvent tries to load the package and then expects to find the +C function inside (see "custom read types", below). Predefined types are (if you have ideas for additional types, feel free to drop by and tell us): @@ -1102,7 +1485,7 @@ # reject if ($reject && $$rbuf =~ $reject) { - $self->_error (&Errno::EBADMSG); + $self->_error (Errno::EBADMSG); } # skip @@ -1128,7 +1511,7 @@ sub { unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { if ($_[0]{rbuf} =~ /[^0-9]/) { - $self->_error (&Errno::EBADMSG); + $self->_error (Errno::EBADMSG); } return; } @@ -1141,7 +1524,7 @@ if ($_[1] eq ",") { $cb->($_[0], $string); } else { - $self->_error (&Errno::EBADMSG); + $self->_error (Errno::EBADMSG); } }); }); @@ -1218,13 +1601,11 @@ register_read_type json => sub { my ($self, $cb) = @_; - require JSON; + my $json = $self->{json} ||= json_coder; my $data; my $rbuf = \$self->{rbuf}; - my $json = $self->{json} ||= JSON->new->utf8; - sub { my $ref = eval { $json->incr_parse ($self->{rbuf}) }; @@ -1241,7 +1622,7 @@ $self->{rbuf} = $json->incr_text; $json->incr_text = ""; - $self->_error (&Errno::EBADMSG); + $self->_error (Errno::EBADMSG); () } else { @@ -1288,7 +1669,7 @@ if (my $ref = eval { Storable::thaw ($_[1]) }) { $cb->($_[0], $ref); } else { - $self->_error (&Errno::EBADMSG); + $self->_error (Errno::EBADMSG); } }); } @@ -1299,25 +1680,28 @@ =back -=item AnyEvent::Handle::register_read_type type => $coderef->($handle, $cb, @args) - -This function (not method) lets you add your own types to C. +=item custom read types - Package::anyevent_read_type $handle, $cb, @args -Whenever the given C is used, C will invoke the code -reference with the handle object, the callback and the remaining -arguments. +Instead of one of the predefined types, you can also specify the name +of a package. AnyEvent will try to load the package and then expects to +find a function named C inside. If it isn't found, it +progressively tries to load the parent package until it either finds the +function (good) or runs out of packages (bad). + +Whenever this type is used, C will invoke the function with the +handle object, the original callback and the remaining arguments. + +The function is supposed to return a callback (usually a closure) that +works as a plain read callback (see C<< ->push_read ($cb) >>), so you can +mentally treat the function as a "configurable read type to read callback" +converter. + +It should invoke the original callback when it is done reading (remember +to pass C<$handle> as first argument as all other callbacks do that, +although there is no strict requirement on this). -The code reference is supposed to return a callback (usually a closure) -that works as a plain read callback (see C<< ->push_read ($cb) >>). - -It should invoke the passed callback when it is done reading (remember to -pass C<$handle> as first argument as all other callbacks do that). - -Note that this is a function, and all types registered this way will be -global, so try to use unique names. - -For examples, see the source of this module (F, -search for C)). +For examples, see the source of this module (F, search for C)). =item $handle->stop_read @@ -1347,49 +1731,57 @@ sub start_read { my ($self) = @_; - unless ($self->{_rw} || $self->{_eof}) { + unless ($self->{_rw} || $self->{_eof} || !$self->{fh}) { Scalar::Util::weaken $self; - $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { + $self->{_rw} = AE::io $self->{fh}, 0, sub { my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf}); my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; if ($len > 0) { - $self->{_activity} = AnyEvent->now; + $self->{_activity} = $self->{_ractivity} = AE::now; if ($self->{tls}) { Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf); &_dotls ($self); } else { - $self->_drain_rbuf unless $self->{_in_drain}; + $self->_drain_rbuf; } } elsif (defined $len) { delete $self->{_rw}; $self->{_eof} = 1; - $self->_drain_rbuf unless $self->{_in_drain}; + $self->_drain_rbuf; } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { return $self->_error ($!, 1); } - }); + }; } } our $ERROR_SYSCALL; our $ERROR_WANT_READ; -our $ERROR_ZERO_RETURN; sub _tls_error { my ($self, $err) = @_; - warn "$err,$!\n";#d# return $self->_error ($!, 1) if $err == Net::SSLeay::ERROR_SYSCALL (); - $self->_error (&Errno::EPROTO, 1, - Net::SSLeay::ERR_error_string (Net::SSLeay::ERR_get_error ())); + my $err =Net::SSLeay::ERR_error_string (Net::SSLeay::ERR_get_error ()); + + # reduce error string to look less scary + $err =~ s/^error:[0-9a-fA-F]{8}:[^:]+:([^:]+):/\L$1: /; + + if ($self->{_on_starttls}) { + (delete $self->{_on_starttls})->($self, undef, $err); + &_freetls; + } else { + &_freetls; + $self->_error (Errno::EPROTO, 1, $err); + } } # poll the write BIO and send the data if applicable @@ -1410,33 +1802,44 @@ $tmp = Net::SSLeay::get_error ($self->{tls}, $tmp); return $self->_tls_error ($tmp) if $tmp != $ERROR_WANT_READ - && ($tmp != $ERROR_SYSCALL || $!) - && $tmp != $ERROR_ZERO_RETURN; + && ($tmp != $ERROR_SYSCALL || $!); } while (defined ($tmp = Net::SSLeay::read ($self->{tls}))) { unless (length $tmp) { - # let's treat SSL-eof as we treat normal EOF - delete $self->{_rw}; - $self->{_eof} = 1; + $self->{_on_starttls} + and (delete $self->{_on_starttls})->($self, undef, "EOF during handshake"); # ??? &_freetls; + + if ($self->{on_stoptls}) { + $self->{on_stoptls}($self); + return; + } else { + # let's treat SSL-eof as we treat normal EOF + delete $self->{_rw}; + $self->{_eof} = 1; + } } $self->{_tls_rbuf} .= $tmp; - $self->_drain_rbuf unless $self->{_in_drain}; + $self->_drain_rbuf; $self->{tls} or return; # tls session might have gone away in callback } $tmp = Net::SSLeay::get_error ($self->{tls}, -1); return $self->_tls_error ($tmp) if $tmp != $ERROR_WANT_READ - && ($tmp != $ERROR_SYSCALL || $!) - && $tmp != $ERROR_ZERO_RETURN; + && ($tmp != $ERROR_SYSCALL || $!); while (length ($tmp = Net::SSLeay::BIO_read ($self->{_wbio}))) { $self->{wbuf} .= $tmp; $self->_drain_wbuf; + $self->{tls} or return; # tls session might have gone away in callback } + + $self->{_on_starttls} + and Net::SSLeay::state ($self->{tls}) == Net::SSLeay::ST_OK () + and (delete $self->{_on_starttls})->($self, 1, "TLS/SSL connection established"); } =item $handle->starttls ($tls[, $tls_ctx]) @@ -1445,6 +1848,10 @@ object is created, you can also do that at a later time by calling C. +Starting TLS is currently an asynchronous operation - when you push some +write data and then call C<< ->starttls >> then TLS negotiation will start +immediately, after which the queued write data is then sent. + The first argument is the same as the C constructor argument (either C<"connect">, C<"accept"> or an existing Net::SSLeay object). @@ -1458,34 +1865,51 @@ changed to your liking. Note that the handshake might have already started when this function returns. -If it an error to start a TLS handshake more than once per -AnyEvent::Handle object (this is due to bugs in OpenSSL). +Due to bugs in OpenSSL, it might or might not be possible to do multiple +handshakes on the same stream. Best do not attempt to use the stream after +stopping TLS. + +This method may invoke callbacks (and therefore the handle might be +destroyed after it returns). =cut +our %TLS_CACHE; #TODO not yet documented, should we? + sub starttls { - my ($self, $ssl, $ctx) = @_; + my ($self, $tls, $ctx) = @_; + + Carp::croak "It is an error to call starttls on an AnyEvent::Handle object while TLS is already active, caught" + if $self->{tls}; + + $self->{tls} = $tls; + $self->{tls_ctx} = $ctx if @_ > 2; + + return unless $self->{fh}; require Net::SSLeay; - Carp::croak "it is an error to call starttls more than once on an AnyEvent::Handle object" - if $self->{tls}; + $ERROR_SYSCALL = Net::SSLeay::ERROR_SYSCALL (); + $ERROR_WANT_READ = Net::SSLeay::ERROR_WANT_READ (); - $ERROR_SYSCALL = Net::SSLeay::ERROR_SYSCALL (); - $ERROR_WANT_READ = Net::SSLeay::ERROR_WANT_READ (); - $ERROR_ZERO_RETURN = Net::SSLeay::ERROR_ZERO_RETURN (); + $tls = delete $self->{tls}; + $ctx = $self->{tls_ctx}; - $ctx ||= $self->{tls_ctx}; + local $Carp::CarpLevel = 1; # skip ourselves when creating a new context or session if ("HASH" eq ref $ctx) { require AnyEvent::TLS; - local $Carp::CarpLevel = 1; # skip ourselves when creating a new context - $ctx = new AnyEvent::TLS %$ctx; + if ($ctx->{cache}) { + my $key = $ctx+0; + $ctx = $TLS_CACHE{$key} ||= new AnyEvent::TLS %$ctx; + } else { + $ctx = new AnyEvent::TLS %$ctx; + } } $self->{tls_ctx} = $ctx || TLS_CTX (); - $self->{tls} = $ssl = $self->{tls_ctx}->_get_session ($ssl, $self, $self->{peername}); + $self->{tls} = $tls = $self->{tls_ctx}->_get_session ($tls, $self, $self->{peername}); # basically, this is deep magic (because SSL_read should have the same issues) # but the openssl maintainers basically said: "trust us, it just works". @@ -1502,12 +1926,17 @@ # Net::SSLeay::CTX_set_mode ($ssl, # (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1) # | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2)); - Net::SSLeay::CTX_set_mode ($ssl, 1|2); + Net::SSLeay::CTX_set_mode ($tls, 1|2); $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); - Net::SSLeay::set_bio ($ssl, $self->{_rbio}, $self->{_wbio}); + Net::SSLeay::BIO_write ($self->{_rbio}, delete $self->{rbuf}); + + Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio}); + + $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) } + if $self->{on_starttls}; &_dotls; # need to trigger the initial handshake $self->start_read; # make sure we actually do read @@ -1517,22 +1946,25 @@ Shuts down the SSL connection - this makes a proper EOF handshake by sending a close notify to the other side, but since OpenSSL doesn't -support non-blocking shut downs, it is not possible to re-use the stream -afterwards. +support non-blocking shut downs, it is not guaranteed that you can re-use +the stream afterwards. + +This method may invoke callbacks (and therefore the handle might be +destroyed after it returns). =cut sub stoptls { my ($self) = @_; - if ($self->{tls}) { + if ($self->{tls} && $self->{fh}) { Net::SSLeay::shutdown ($self->{tls}); &_dotls; - # we don't give a shit. no, we do, but we can't. no... - # we, we... have to use openssl :/ - &_freetls; +# # we don't give a shit. no, we do, but we can't. no...#d# +# # we, we... have to use openssl :/#d# +# &_freetls;#d# } } @@ -1541,9 +1973,10 @@ return unless $self->{tls}; - $self->{tls_ctx}->_put_session (delete $self->{tls}); + $self->{tls_ctx}->_put_session (delete $self->{tls}) + if $self->{tls} > 0; - delete @$self{qw(_rbio _wbio _tls_wbuf)}; + delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)}; } sub DESTROY { @@ -1553,13 +1986,13 @@ my $linger = exists $self->{linger} ? $self->{linger} : 3600; - if ($linger && length $self->{wbuf}) { + if ($linger && length $self->{wbuf} && $self->{fh}) { my $fh = delete $self->{fh}; my $wbuf = delete $self->{wbuf}; my @linger; - push @linger, AnyEvent->io (fh => $fh, poll => "w", cb => sub { + push @linger, AE::io $fh, 1, sub { my $len = syswrite $fh, $wbuf, length $wbuf; if ($len > 0) { @@ -1567,18 +2000,20 @@ } else { @linger = (); # end } - }); - push @linger, AnyEvent->timer (after => $linger, cb => sub { + }; + push @linger, AE::timer $linger, 0, sub { @linger = (); - }); + }; } } =item $handle->destroy Shuts down the handle object as much as possible - this call ensures that -no further callbacks will be invoked and resources will be freed as much -as possible. You must not call any methods on the object afterwards. +no further callbacks will be invoked and as many resources as possible +will be freed. Any method you will call on the handle object after +destroying it in this way will be silently ignored (and it will return the +empty list). Normally, you can just "forget" any references to an AnyEvent::Handle object and it will simply shut down. This works in fatal error and EOF @@ -1587,6 +2022,11 @@ within such an callback. You I call C<< ->destroy >> explicitly in that case. +Destroying the handle object in this way has the advantage that callbacks +will be removed as well, so if those are the only reference holders (as +is common), then one doesn't need to do anything special to break any +reference cycles. + The handle might still linger in the background and write out remaining data, as specified by the C option, however. @@ -1597,8 +2037,36 @@ $self->DESTROY; %$self = (); + bless $self, "AnyEvent::Handle::destroyed"; } +sub AnyEvent::Handle::destroyed::AUTOLOAD { + #nop +} + +=item $handle->destroyed + +Returns false as long as the handle hasn't been destroyed by a call to C<< +->destroy >>, true otherwise. + +Can be useful to decide whether the handle is still valid after some +callback possibly destroyed the handle. For example, C<< ->push_write >>, +C<< ->starttls >> and other methods can call user callbacks, which in turn +can destroy the handle, so work can be avoided by checking sometimes: + + $hdl->starttls ("accept"); + return if $hdl->destroyed; + $hdl->push_write (... + +Note that the call to C will silently be ignored if the handle +has been destroyed, so often you can just ignore the possibility of the +handle being destroyed. + +=cut + +sub destroyed { 0 } +sub AnyEvent::Handle::destroyed::destroyed { 1 } + =item AnyEvent::Handle::TLS_CTX This function creates and returns the AnyEvent::TLS object used by default @@ -1663,7 +2131,6 @@ $handle->on_eof (undef); $handle->on_error (sub { my $data = delete $_[0]{rbuf}; - undef $handle; }); The reason to use C is that TCP connections, due to latencies @@ -1689,6 +2156,94 @@ undef $handle; }); +If you just want to queue some data and then signal EOF to the other side, +consider using C<< ->push_shutdown >> instead. + +=item I want to contact a TLS/SSL server, I don't care about security. + +If your TLS server is a pure TLS server (e.g. HTTPS) that only speaks TLS, +simply connect to it and then create the AnyEvent::Handle with the C +parameter: + + tcp_connect $host, $port, sub { + my ($fh) = @_; + + my $handle = new AnyEvent::Handle + fh => $fh, + tls => "connect", + on_error => sub { ... }; + + $handle->push_write (...); + }; + +=item I want to contact a TLS/SSL server, I do care about security. + +Then you should additionally enable certificate verification, including +peername verification, if the protocol you use supports it (see +L, C). + +E.g. for HTTPS: + + tcp_connect $host, $port, sub { + my ($fh) = @_; + + my $handle = new AnyEvent::Handle + fh => $fh, + peername => $host, + tls => "connect", + tls_ctx => { verify => 1, verify_peername => "https" }, + ... + +Note that you must specify the hostname you connected to (or whatever +"peername" the protocol needs) as the C argument, otherwise no +peername verification will be done. + +The above will use the system-dependent default set of trusted CA +certificates. If you want to check against a specific CA, add the +C (or C) arguments to C: + + tls_ctx => { + verify => 1, + verify_peername => "https", + ca_file => "my-ca-cert.pem", + }, + +=item I want to create a TLS/SSL server, how do I do that? + +Well, you first need to get a server certificate and key. You have +three options: a) ask a CA (buy one, use cacert.org etc.) b) create a +self-signed certificate (cheap. check the search engine of your choice, +there are many tutorials on the net) or c) make your own CA (tinyca2 is a +nice program for that purpose). + +Then create a file with your private key (in PEM format, see +L), followed by the certificate (also in PEM format). The +file should then look like this: + + -----BEGIN RSA PRIVATE KEY----- + ...header data + ... lots of base64'y-stuff + -----END RSA PRIVATE KEY----- + + -----BEGIN CERTIFICATE----- + ... lots of base64'y-stuff + -----END CERTIFICATE----- + +The important bits are the "PRIVATE KEY" and "CERTIFICATE" parts. Then +specify this file as C: + + tcp_server undef, $port, sub { + my ($fh) = @_; + + my $handle = new AnyEvent::Handle + fh => $fh, + tls => "accept", + tls_ctx => { cert_file => "my-server-keycert.pem" }, + ... + +When you have intermediate CA certificates that your clients might not +know about, just append them to the C. + =back