ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.31 by root, Sun May 25 00:08:49 2008 UTC vs.
Revision 1.37 by root, Mon May 26 20:02:22 2008 UTC

2 2
3no warnings; 3no warnings;
4use strict; 4use strict;
5 5
6use AnyEvent (); 6use AnyEvent ();
7use AnyEvent::Util (); 7use AnyEvent::Util qw(WSAWOULDBLOCK);
8use Scalar::Util (); 8use Scalar::Util ();
9use Carp (); 9use Carp ();
10use Fcntl (); 10use Fcntl ();
11use Errno qw/EAGAIN EINTR/; 11use Errno qw/EAGAIN EINTR/;
12 12
93called. 93called.
94 94
95On callback entrance, the value of C<$!> contains the operating system 95On callback entrance, the value of C<$!> contains the operating system
96error (or C<ENOSPC>, C<EPIPE> or C<EBADMSG>). 96error (or C<ENOSPC>, C<EPIPE> or C<EBADMSG>).
97 97
98The callbakc should throw an exception. If it returns, then
99AnyEvent::Handle will C<croak> for you.
100
98While not mandatory, it is I<highly> recommended to set this callback, as 101While not mandatory, it is I<highly> recommended to set this callback, as
99you will not be notified of errors otherwise. The default simply calls 102you will not be notified of errors otherwise. The default simply calls
100die. 103die.
101 104
102=item on_read => $cb->($self) 105=item on_read => $cb->($self)
209 { 212 {
210 local $!; 213 local $!;
211 $self->_shutdown; 214 $self->_shutdown;
212 } 215 }
213 216
214 if ($self->{on_error}) {
215 $self->{on_error}($self); 217 $self->{on_error}($self)
216 } else { 218 if $self->{on_error};
219
217 Carp::croak "AnyEvent::Handle uncaught fatal error: $!"; 220 Carp::croak "AnyEvent::Handle uncaught fatal error: $!";
218 }
219} 221}
220 222
221=item $fh = $handle->fh 223=item $fh = $handle->fh
222 224
223This method returns the file handle of the L<AnyEvent::Handle> object. 225This method returns the file handle of the L<AnyEvent::Handle> object.
289 291
290sub _drain_wbuf { 292sub _drain_wbuf {
291 my ($self) = @_; 293 my ($self) = @_;
292 294
293 if (!$self->{ww} && length $self->{wbuf}) { 295 if (!$self->{ww} && length $self->{wbuf}) {
296
294 Scalar::Util::weaken $self; 297 Scalar::Util::weaken $self;
298
295 my $cb = sub { 299 my $cb = sub {
296 my $len = syswrite $self->{fh}, $self->{wbuf}; 300 my $len = syswrite $self->{fh}, $self->{wbuf};
297 301
298 if ($len >= 0) { 302 if ($len >= 0) {
299 substr $self->{wbuf}, 0, $len, ""; 303 substr $self->{wbuf}, 0, $len, "";
301 $self->{on_drain}($self) 305 $self->{on_drain}($self)
302 if $self->{low_water_mark} >= length $self->{wbuf} 306 if $self->{low_water_mark} >= length $self->{wbuf}
303 && $self->{on_drain}; 307 && $self->{on_drain};
304 308
305 delete $self->{ww} unless length $self->{wbuf}; 309 delete $self->{ww} unless length $self->{wbuf};
306 } elsif ($! != EAGAIN && $! != EINTR) { 310 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAWOULDBLOCK) {
307 $self->error; 311 $self->error;
308 } 312 }
309 }; 313 };
310 314
315 # try to write data immediately
316 $cb->();
317
318 # if still data left in wbuf, we need to poll
311 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb); 319 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb)
312 320 if length $self->{wbuf};
313 $cb->($self);
314 }; 321 };
315} 322}
316 323
317our %WH; 324our %WH;
318 325
464 471
465 if ( 472 if (
466 defined $self->{rbuf_max} 473 defined $self->{rbuf_max}
467 && $self->{rbuf_max} < length $self->{rbuf} 474 && $self->{rbuf_max} < length $self->{rbuf}
468 ) { 475 ) {
469 $! = &Errno::ENOSPC; return $self->error; 476 $! = &Errno::ENOSPC;
477 $self->error;
470 } 478 }
471 479
472 return if $self->{in_drain}; 480 return if $self->{in_drain};
473 local $self->{in_drain} = 1; 481 local $self->{in_drain} = 1;
474 482
476 no strict 'refs'; 484 no strict 'refs';
477 if (my $cb = shift @{ $self->{queue} }) { 485 if (my $cb = shift @{ $self->{queue} }) {
478 unless ($cb->($self)) { 486 unless ($cb->($self)) {
479 if ($self->{eof}) { 487 if ($self->{eof}) {
480 # no progress can be made (not enough data and no data forthcoming) 488 # no progress can be made (not enough data and no data forthcoming)
481 $! = &Errno::EPIPE; return $self->error; 489 $! = &Errno::EPIPE;
490 $self->error;
482 } 491 }
483 492
484 unshift @{ $self->{queue} }, $cb; 493 unshift @{ $self->{queue} }, $cb;
485 return; 494 return;
486 } 495 }
492 && $len == length $self->{rbuf} # and no data has been consumed 501 && $len == length $self->{rbuf} # and no data has been consumed
493 && !@{ $self->{queue} } # and the queue is still empty 502 && !@{ $self->{queue} } # and the queue is still empty
494 && $self->{on_read} # and we still want to read data 503 && $self->{on_read} # and we still want to read data
495 ) { 504 ) {
496 # then no progress can be made 505 # then no progress can be made
497 $! = &Errno::EPIPE; return $self->error; 506 $! = &Errno::EPIPE;
507 $self->error;
498 } 508 }
499 } else { 509 } else {
500 # read side becomes idle 510 # read side becomes idle
501 delete $self->{rw}; 511 delete $self->{rw};
502 return; 512 return;
728 738
729 1 739 1
730 } 740 }
731}; 741};
732 742
743=item regex => $accept[, $reject[, $skip], $cb->($data)
744
745Makes a regex match against the regex object C<$accept> and returns
746everything up to and including the match.
747
748Example: read a single line terminated by '\n'.
749
750 $handle->push_read (regex => qr<\n>, sub { ... });
751
752If C<$reject> is given and not undef, then it determines when the data is
753to be rejected: it is matched against the data when the C<$accept> regex
754does not match and generates an C<EBADMSG> error when it matches. This is
755useful to quickly reject wrong data (to avoid waiting for a timeout or a
756receive buffer overflow).
757
758Example: expect a single decimal number followed by whitespace, reject
759anything else (not the use of an anchor).
760
761 $handle->push_read (regex => qr<^[0-9]+\s>, qr<[^0-9]>, sub { ... });
762
763If C<$skip> is given and not C<undef>, then it will be matched against
764the receive buffer when neither C<$accept> nor C<$reject> match,
765and everything preceding and including the match will be accepted
766unconditionally. This is useful to skip large amounts of data that you
767know cannot be matched, so that the C<$accept> or C<$reject> regex do not
768have to start matching from the beginning. This is purely an optimisation
769and is usually worth only when you expect more than a few kilobytes.
770
771Example: expect a http header, which ends at C<\015\012\015\012>. Since we
772expect the header to be very large (it isn't in practise, but...), we use
773a skip regex to skip initial portions. The skip regex is tricky in that
774it only accepts something not ending in either \015 or \012, as these are
775required for the accept regex.
776
777 $handle->push_read (regex =>
778 qr<\015\012\015\012>,
779 undef, # no reject
780 qr<^.*[^\015\012]>,
781 sub { ... });
782
783=cut
784
785register_read_type regex => sub {
786 my ($self, $cb, $accept, $reject, $skip) = @_;
787
788 my $data;
789 my $rbuf = \$self->{rbuf};
790
791 sub {
792 # accept
793 if ($$rbuf =~ $accept) {
794 $data .= substr $$rbuf, 0, $+[0], "";
795 $cb->($self, $data);
796 return 1;
797 }
798
799 # reject
800 if ($reject && $$rbuf =~ $reject) {
801 $! = &Errno::EBADMSG;
802 $self->error;
803 }
804
805 # skip
806 if ($skip && $$rbuf =~ $skip) {
807 $data .= substr $$rbuf, 0, $+[0], "";
808 }
809
810 ()
811 }
812};
813
733=back 814=back
734 815
735=item AnyEvent::Handle::register_read_type type => $coderef->($self, $cb, @args) 816=item AnyEvent::Handle::register_read_type type => $coderef->($self, $cb, @args)
736 817
737This function (not method) lets you add your own types to C<push_read>. 818This function (not method) lets you add your own types to C<push_read>.
787 } elsif (defined $len) { 868 } elsif (defined $len) {
788 delete $self->{rw}; 869 delete $self->{rw};
789 $self->{eof} = 1; 870 $self->{eof} = 1;
790 $self->_drain_rbuf; 871 $self->_drain_rbuf;
791 872
792 } elsif ($! != EAGAIN && $! != EINTR) { 873 } elsif ($! != EAGAIN && $! != EINTR && $! != &AnyEvent::Util::WSAWOULDBLOCK) {
793 return $self->error; 874 return $self->error;
794 } 875 }
795 }); 876 });
796 } 877 }
797} 878}
863 # but the openssl maintainers basically said: "trust us, it just works". 944 # but the openssl maintainers basically said: "trust us, it just works".
864 # (unfortunately, we have to hardcode constants because the abysmally misdesigned 945 # (unfortunately, we have to hardcode constants because the abysmally misdesigned
865 # and mismaintained ssleay-module doesn't even offer them). 946 # and mismaintained ssleay-module doesn't even offer them).
866 # http://www.mail-archive.com/openssl-dev@openssl.org/msg22420.html 947 # http://www.mail-archive.com/openssl-dev@openssl.org/msg22420.html
867 Net::SSLeay::CTX_set_mode ($self->{tls}, 948 Net::SSLeay::CTX_set_mode ($self->{tls},
868 (eval { Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1) 949 (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1)
869 | (eval { Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2)); 950 | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2));
870 951
871 $self->{tls_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 952 $self->{tls_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
872 $self->{tls_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 953 $self->{tls_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
873 954
874 Net::SSLeay::set_bio ($ssl, $self->{tls_rbio}, $self->{tls_wbio}); 955 Net::SSLeay::set_bio ($ssl, $self->{tls_rbio}, $self->{tls_wbio});

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines