--- AnyEvent/lib/AnyEvent/Handle.pm 2008/05/24 22:27:11 1.28 +++ AnyEvent/lib/AnyEvent/Handle.pm 2008/05/26 20:02:22 1.37 @@ -4,7 +4,7 @@ use strict; use AnyEvent (); -use AnyEvent::Util (); +use AnyEvent::Util qw(WSAWOULDBLOCK); use Scalar::Util (); use Carp (); use Fcntl (); @@ -14,8 +14,6 @@ AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent -This module is experimental. - =cut our $VERSION = '0.04'; @@ -27,22 +25,25 @@ my $cv = AnyEvent->condvar; - my $ae_fh = AnyEvent::Handle->new (fh => \*STDIN); - - #TODO - - # or use the constructor to pass the callback: - - my $ae_fh2 = + my $handle = AnyEvent::Handle->new ( fh => \*STDIN, on_eof => sub { $cv->broadcast; }, - #TODO ); - $cv->wait; + # send some request line + $handle->push_write ("getinfo\015\012"); + + # read the response line + $handle->push_read (line => sub { + my ($handle, $line) = @_; + warn "read line <$line>\n"; + $cv->send; + }); + + $cv->recv; =head1 DESCRIPTION @@ -92,7 +93,10 @@ called. On callback entrance, the value of C<$!> contains the operating system -error (or C<ENOSPC> or C<EPIPE>). +error (or C<ENOSPC>, C<EPIPE> or C<EBADMSG>). + +The callback should throw an exception. If it returns, then +AnyEvent::Handle will C<croak> for you. While not mandatory, it is I<highly> recommended to set this callback, as you will not be notified of errors otherwise. 
The default simply calls @@ -170,16 +174,6 @@ =cut -our (%RH, %WH); - -sub register_read_type($$) { - $RH{$_[0]} = $_[1]; -} - -sub register_write_type($$) { - $WH{$_[0]} = $_[1]; -} - sub new { my $class = shift; @@ -220,11 +214,10 @@ $self->_shutdown; } - if ($self->{on_error}) { - $self->{on_error}($self); - } else { - die "AnyEvent::Handle uncaught fatal error: $!"; - } + $self->{on_error}($self) + if $self->{on_error}; + + Carp::croak "AnyEvent::Handle uncaught fatal error: $!"; } =item $fh = $handle->fh @@ -299,12 +292,14 @@ sub _drain_wbuf { my ($self) = @_; - unless ($self->{ww}) { + if (!$self->{ww} && length $self->{wbuf}) { + Scalar::Util::weaken $self; + my $cb = sub { my $len = syswrite $self->{fh}, $self->{wbuf}; - if ($len > 0) { + if ($len >= 0) { substr $self->{wbuf}, 0, $len, ""; $self->{on_drain}($self) @@ -312,20 +307,36 @@ && $self->{on_drain}; delete $self->{ww} unless length $self->{wbuf}; - } elsif ($! != EAGAIN && $! != EINTR) { + } elsif ($! != EAGAIN && $! != EINTR && $! != WSAWOULDBLOCK) { $self->error; } }; - $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb); + # try to write data immediately + $cb->(); - $cb->($self); + # if still data left in wbuf, we need to poll + $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) + if length $self->{wbuf}; }; } +our %WH; + +sub register_write_type($$) { + $WH{$_[0]} = $_[1]; +} + sub push_write { my $self = shift; + if (@_ > 1) { + my $type = shift; + + @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write") + ->($self, @_); + } + if ($self->{filter_w}) { $self->{filter_w}->($self, \$_[0]); } else { @@ -334,6 +345,47 @@ } } +=item $handle->push_write (type => @args) + +=item $handle->unshift_write (type => @args) + +Instead of formatting your data yourself, you can also let this module do +the job by specifying a type and type-specific arguments. 
+ +Predefined types are (if you have ideas for additional types, feel free to +drop by and tell us): + +=over 4 + +=item netstring => $string + +Formats the given value as a netstring +(http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them). + +=back + +=cut + +register_write_type netstring => sub { + my ($self, $string) = @_; + + sprintf "%d:%s,", (length $string), $string +}; + +=item AnyEvent::Handle::register_write_type type => $coderef->($self, @args) + +This function (not method) lets you add your own types to C<push_write>. +Whenever the given C<type> is used, C<push_write> will invoke the code +reference with the handle object and the remaining arguments. + +The code reference is supposed to return a single octet string that will +be appended to the write buffer. + +Note that this is a function, and all types registered this way will be +global, so try to use unique names. + +=cut + ############################################################################# =back @@ -421,7 +473,8 @@ defined $self->{rbuf_max} && $self->{rbuf_max} < length $self->{rbuf} ) { - $! = &Errno::ENOSPC; return $self->error; + $! = &Errno::ENOSPC; + $self->error; } return if $self->{in_drain}; @@ -430,10 +483,11 @@ while (my $len = length $self->{rbuf}) { no strict 'refs'; if (my $cb = shift @{ $self->{queue} }) { - if (!$cb->($self)) { + unless ($cb->($self)) { if ($self->{eof}) { # no progress can be made (not enough data and no data forthcoming) - $! = &Errno::EPIPE; return $self->error; + $! = &Errno::EPIPE; + $self->error; } unshift @{ $self->{queue} }, $cb; @@ -449,7 +503,8 @@ && $self->{on_read} # and we still want to read data ) { # then no progress can be made - $! = &Errno::EPIPE; return $self->error; + $! 
= &Errno::EPIPE; + $self->error; } } else { # read side becomes idle @@ -517,6 +572,12 @@ =cut +our %RH; + +sub register_read_type($$) { + $RH{$_[0]} = $_[1]; +} + sub push_read { my $self = shift; my $cb = pop; @@ -556,7 +617,8 @@ between a number of predefined parsing formats, for chunks of data, lines etc. -The types currently supported are: +Predefined types are (if you have ideas for additional types, feel free to +drop by and tell us): =over 4 @@ -640,8 +702,137 @@ $self->unshift_read (line => @_); } +=item netstring => $cb->($string) + +A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement). + +Throws an error with C<$!> set to EBADMSG on format violations. + +=cut + +register_read_type netstring => sub { + my ($self, $cb) = @_; + + sub { + unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { + if ($_[0]{rbuf} =~ /[^0-9]/) { + $! = &Errno::EBADMSG; + $self->error; + } + return; + } + + my $len = $1; + + $self->unshift_read (chunk => $len, sub { + my $string = $_[1]; + $_[0]->unshift_read (chunk => 1, sub { + if ($_[1] eq ",") { + $cb->($_[0], $string); + } else { + $! = &Errno::EBADMSG; + $self->error; + } + }); + }); + + 1 + } +}; + +=item regex => $accept[, $reject[, $skip]], $cb->($data) + +Makes a regex match against the regex object C<$accept> and returns +everything up to and including the match. + +Example: read a single line terminated by '\n'. + + $handle->push_read (regex => qr<\n>, sub { ... }); + +If C<$reject> is given and not undef, then it determines when the data is +to be rejected: it is matched against the data when the C<$accept> regex +does not match and generates an C<EBADMSG> error when it matches. This is +useful to quickly reject wrong data (to avoid waiting for a timeout or a +receive buffer overflow). + +Example: expect a single decimal number followed by whitespace, reject +anything else (note the use of an anchor). + + $handle->push_read (regex => qr<^[0-9]+\s>, qr<[^0-9]>, sub { ... 
}); + +If C<$skip> is given and not C<undef>, then it will be matched against +the receive buffer when neither C<$accept> nor C<$reject> match, +and everything preceding and including the match will be accepted +unconditionally. This is useful to skip large amounts of data that you +know cannot be matched, so that the C<$accept> or C<$reject> regex do not +have to start matching from the beginning. This is purely an optimisation +and is usually only worth it when you expect more than a few kilobytes. + +Example: expect a http header, which ends at C<\015\012\015\012>. Since we +expect the header to be very large (it isn't in practice, but...), we use +a skip regex to skip initial portions. The skip regex is tricky in that +it only accepts something not ending in either \015 or \012, as these are +required for the accept regex. + + $handle->push_read (regex => + qr<\015\012\015\012>, + undef, # no reject + qr<^.*[^\015\012]>, + sub { ... }); + +=cut + +register_read_type regex => sub { + my ($self, $cb, $accept, $reject, $skip) = @_; + + my $data; + my $rbuf = \$self->{rbuf}; + + sub { + # accept + if ($$rbuf =~ $accept) { + $data .= substr $$rbuf, 0, $+[0], ""; + $cb->($self, $data); + return 1; + } + + # reject + if ($reject && $$rbuf =~ $reject) { + $! = &Errno::EBADMSG; + $self->error; + } + + # skip + if ($skip && $$rbuf =~ $skip) { + $data .= substr $$rbuf, 0, $+[0], ""; + } + + () + } +}; + =back +=item AnyEvent::Handle::register_read_type type => $coderef->($self, $cb, @args) + +This function (not method) lets you add your own types to C<push_read>. + +Whenever the given C<type> is used, C<push_read> will invoke the code +reference with the handle object, the callback and the remaining +arguments. + +The code reference is supposed to return a callback (usually a closure) +that works as a plain read callback (see C<< ->push_read ($cb) >>). + +It should invoke the passed callback when it is done reading (remember to +pass C<$self> as first argument as all other callbacks do that). 
+ +Note that this is a function, and all types registered this way will be +global, so try to use unique names. + +For examples, see the source of this module (F<perldoc -m AnyEvent::Handle>, +search for C<register_read_type>). + =item $handle->stop_read =item $handle->start_read @@ -679,7 +870,7 @@ $self->{eof} = 1; $self->_drain_rbuf; - } elsif ($! != EAGAIN && $! != EINTR) { + } elsif ($! != EAGAIN && $! != EINTR && $! != &AnyEvent::Util::WSAWOULDBLOCK) { return $self->error; } }); @@ -755,8 +946,8 @@ # and mismaintained ssleay-module doesn't even offer them). # http://www.mail-archive.com/openssl-dev@openssl.org/msg22420.html Net::SSLeay::CTX_set_mode ($self->{tls}, - (eval { Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1) - | (eval { Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2)); + (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1) + | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2)); $self->{tls_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); $self->{tls_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());