ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.20 by elmex, Sat May 24 08:16:50 2008 UTC vs.
Revision 1.29 by root, Sat May 24 23:10:18 2008 UTC

10use Fcntl (); 10use Fcntl ();
11use Errno qw/EAGAIN EINTR/; 11use Errno qw/EAGAIN EINTR/;
12 12
13=head1 NAME 13=head1 NAME
14 14
15AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent 15AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
16 16
17This module is experimental. 17This module is experimental.
18 18
19=cut 19=cut
20 20
90 90
91The object will not be in a usable state when this callback has been 91The object will not be in a usable state when this callback has been
92called. 92called.
93 93
94On callback entrance, the value of C<$!> contains the operating system 94On callback entrance, the value of C<$!> contains the operating system
95error (or C<ENOSPC> or C<EPIPE>). 95error (or C<ENOSPC>, C<EPIPE> or C<EBADMSG>).
96 96
97While not mandatory, it is I<highly> recommended to set this callback, as 97While not mandatory, it is I<highly> recommended to set this callback, as
98you will not be notified of errors otherwise. The default simply calls 98you will not be notified of errors otherwise. The default simply calls
99die. 99die.
100 100
145 145
146When this parameter is given, it enables TLS (SSL) mode, that means it 146When this parameter is given, it enables TLS (SSL) mode, that means it
147will start making tls handshake and will transparently encrypt/decrypt 147will start making tls handshake and will transparently encrypt/decrypt
148data. 148data.
149 149
150TLS mode requires Net::SSLeay to be installed (it will be loaded
151automatically when you try to create a TLS handle).
152
150For the TLS server side, use C<accept>, and for the TLS client side of a 153For the TLS server side, use C<accept>, and for the TLS client side of a
151connection, use C<connect> mode. 154connection, use C<connect> mode.
152 155
153You can also provide your own TLS connection object, but you have 156You can also provide your own TLS connection object, but you have
154to make sure that you call either C<Net::SSLeay::set_connect_state> 157to make sure that you call either C<Net::SSLeay::set_connect_state>
155or C<Net::SSLeay::set_accept_state> on it before you pass it to 158or C<Net::SSLeay::set_accept_state> on it before you pass it to
156AnyEvent::Handle. 159AnyEvent::Handle.
157 160
161See the C<starttls> method if you need to start TLs negotiation later.
162
158=item tls_ctx => $ssl_ctx 163=item tls_ctx => $ssl_ctx
159 164
160Use the given Net::SSLeay::CTX object to create the new TLS connection 165Use the given Net::SSLeay::CTX object to create the new TLS connection
161(unless a connection object was specified directly). If this parameter is 166(unless a connection object was specified directly). If this parameter is
162missing, then AnyEvent::Handle will use C<AnyEvent::Handle::TLS_CTX>. 167missing, then AnyEvent::Handle will use C<AnyEvent::Handle::TLS_CTX>.
163 168
164=back 169=back
165 170
166=cut 171=cut
172
173our (%RH, %WH);
174
175sub register_read_type($$) {
176 $RH{$_[0]} = $_[1];
177}
178
179sub register_write_type($$) {
180 $WH{$_[0]} = $_[1];
181}
167 182
168sub new { 183sub new {
169 my $class = shift; 184 my $class = shift;
170 185
171 my $self = bless { @_ }, $class; 186 my $self = bless { @_ }, $class;
206 } 221 }
207 222
208 if ($self->{on_error}) { 223 if ($self->{on_error}) {
209 $self->{on_error}($self); 224 $self->{on_error}($self);
210 } else { 225 } else {
211 die "AnyEvent::Handle uncaught fatal error: $!"; 226 Carp::croak "AnyEvent::Handle uncaught fatal error: $!";
212 } 227 }
213} 228}
214 229
215=item $fh = $handle->fh 230=item $fh = $handle->fh
216 231
217This method returns the filehandle of the L<AnyEvent::Handle> object. 232This method returns the file handle of the L<AnyEvent::Handle> object.
218 233
219=cut 234=cut
220 235
221sub fh { $_[0]->{fh} } 236sub fh { $_[0]->{fh} }
222 237
282=cut 297=cut
283 298
284sub _drain_wbuf { 299sub _drain_wbuf {
285 my ($self) = @_; 300 my ($self) = @_;
286 301
287 unless ($self->{ww}) { 302 if (!$self->{ww} && length $self->{wbuf}) {
288 Scalar::Util::weaken $self; 303 Scalar::Util::weaken $self;
289 my $cb = sub { 304 my $cb = sub {
290 my $len = syswrite $self->{fh}, $self->{wbuf}; 305 my $len = syswrite $self->{fh}, $self->{wbuf};
291 306
292 if ($len > 0) { 307 if ($len >= 0) {
293 substr $self->{wbuf}, 0, $len, ""; 308 substr $self->{wbuf}, 0, $len, "";
294 309
295 $self->{on_drain}($self) 310 $self->{on_drain}($self)
296 if $self->{low_water_mark} >= length $self->{wbuf} 311 if $self->{low_water_mark} >= length $self->{wbuf}
297 && $self->{on_drain}; 312 && $self->{on_drain};
309} 324}
310 325
311sub push_write { 326sub push_write {
312 my $self = shift; 327 my $self = shift;
313 328
329 if (@_ > 1) {
330 my $type = shift;
331
332 @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write")
333 ->($self, @_);
334 }
335
314 if ($self->{filter_w}) { 336 if ($self->{filter_w}) {
315 $self->{filter_w}->($self, \$_[0]); 337 $self->{filter_w}->($self, \$_[0]);
316 } else { 338 } else {
317 $self->{wbuf} .= $_[0]; 339 $self->{wbuf} .= $_[0];
318 $self->_drain_wbuf; 340 $self->_drain_wbuf;
319 } 341 }
320} 342}
343
344=item $handle->push_write (type => @args)
345
346=item $handle->unshift_write (type => @args)
347
348Instead of formatting your data yourself, you can also let this module do
349the job by specifying a type and type-specific arguments.
350
351Predefined types are:
352
353=over 4
354
355=item netstring => $string
356
357Formats the given value as netstring
358(http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them).
359
360=cut
361
362register_write_type netstring => sub {
363 my ($self, $string) = @_;
364
365 sprintf "%d:%s,", (length $string), $string
366};
367
368=back
369
370=cut
371
372
321 373
322############################################################################# 374#############################################################################
323 375
324=back 376=back
325 377
413 local $self->{in_drain} = 1; 465 local $self->{in_drain} = 1;
414 466
415 while (my $len = length $self->{rbuf}) { 467 while (my $len = length $self->{rbuf}) {
416 no strict 'refs'; 468 no strict 'refs';
417 if (my $cb = shift @{ $self->{queue} }) { 469 if (my $cb = shift @{ $self->{queue} }) {
418 if (!$cb->($self)) { 470 unless ($cb->($self)) {
419 if ($self->{eof}) { 471 if ($self->{eof}) {
420 # no progress can be made (not enough data and no data forthcoming) 472 # no progress can be made (not enough data and no data forthcoming)
421 $! = &Errno::EPIPE; return $self->error; 473 $! = &Errno::EPIPE; return $self->error;
422 } 474 }
423 475
501true, it will be removed from the queue. 553true, it will be removed from the queue.
502 554
503=cut 555=cut
504 556
505sub push_read { 557sub push_read {
506 my ($self, $cb) = @_; 558 my $self = shift;
559 my $cb = pop;
560
561 if (@_) {
562 my $type = shift;
563
564 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read")
565 ->($self, $cb, @_);
566 }
507 567
508 push @{ $self->{queue} }, $cb; 568 push @{ $self->{queue} }, $cb;
509 $self->_drain_rbuf; 569 $self->_drain_rbuf;
510} 570}
511 571
512sub unshift_read { 572sub unshift_read {
513 my ($self, $cb) = @_; 573 my $self = shift;
574 my $cb = pop;
514 575
576 if (@_) {
577 my $type = shift;
578
579 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read")
580 ->($self, $cb, @_);
581 }
582
583
515 push @{ $self->{queue} }, $cb; 584 unshift @{ $self->{queue} }, $cb;
516 $self->_drain_rbuf; 585 $self->_drain_rbuf;
517} 586}
518 587
519=item $handle->push_read_chunk ($len, $cb->($self, $data)) 588=item $handle->push_read (type => @args, $cb)
520 589
521=item $handle->unshift_read_chunk ($len, $cb->($self, $data)) 590=item $handle->unshift_read (type => @args, $cb)
522 591
523Append the given callback to the end of the queue (C<push_read_chunk>) or 592Instead of providing a callback that parses the data itself you can chose
524prepend it (C<unshift_read_chunk>). 593between a number of predefined parsing formats, for chunks of data, lines
594etc.
525 595
526The callback will be called only once C<$len> bytes have been read, and 596The types currently supported are:
527these C<$len> bytes will be passed to the callback.
528 597
529=cut 598=over 4
530 599
531sub _read_chunk($$) { 600=item chunk => $octets, $cb->($self, $data)
601
602Invoke the callback only once C<$octets> bytes have been read. Pass the
603data read to the callback. The callback will never be called with less
604data.
605
606Example: read 2 bytes.
607
608 $handle->push_read (chunk => 2, sub {
609 warn "yay ", unpack "H*", $_[1];
610 });
611
612=cut
613
614register_read_type chunk => sub {
532 my ($self, $len, $cb) = @_; 615 my ($self, $cb, $len) = @_;
533 616
534 sub { 617 sub {
535 $len <= length $_[0]{rbuf} or return; 618 $len <= length $_[0]{rbuf} or return;
536 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, ""); 619 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
537 1 620 1
538 } 621 }
539} 622};
540 623
624# compatibility with older API
541sub push_read_chunk { 625sub push_read_chunk {
542 $_[0]->push_read (&_read_chunk); 626 $_[0]->push_read (chunk => $_[1], $_[2]);
543} 627}
544
545 628
546sub unshift_read_chunk { 629sub unshift_read_chunk {
547 $_[0]->unshift_read (&_read_chunk); 630 $_[0]->unshift_read (chunk => $_[1], $_[2]);
548} 631}
549 632
550=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol)) 633=item line => [$eol, ]$cb->($self, $line, $eol)
551
552=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol))
553
554Append the given callback to the end of the queue (C<push_read_line>) or
555prepend it (C<unshift_read_line>).
556 634
557The callback will be called only once a full line (including the end of 635The callback will be called only once a full line (including the end of
558line marker, C<$eol>) has been read. This line (excluding the end of line 636line marker, C<$eol>) has been read. This line (excluding the end of line
559marker) will be passed to the callback as second argument (C<$line>), and 637marker) will be passed to the callback as second argument (C<$line>), and
560the end of line marker as the third argument (C<$eol>). 638the end of line marker as the third argument (C<$eol>).
571Partial lines at the end of the stream will never be returned, as they are 649Partial lines at the end of the stream will never be returned, as they are
572not marked by the end of line marker. 650not marked by the end of line marker.
573 651
574=cut 652=cut
575 653
576sub _read_line($$) { 654register_read_type line => sub {
577 my $self = shift; 655 my ($self, $cb, $eol) = @_;
578 my $cb = pop;
579 my $eol = @_ ? shift : qr|(\015?\012)|;
580 my $pos;
581 656
657 $eol = qr|(\015?\012)| if @_ < 3;
582 $eol = quotemeta $eol unless ref $eol; 658 $eol = quotemeta $eol unless ref $eol;
583 $eol = qr|^(.*?)($eol)|s; 659 $eol = qr|^(.*?)($eol)|s;
584 660
585 sub { 661 sub {
586 $_[0]{rbuf} =~ s/$eol// or return; 662 $_[0]{rbuf} =~ s/$eol// or return;
587 663
588 $cb->($_[0], $1, $2); 664 $cb->($_[0], $1, $2);
589 1 665 1
590 } 666 }
591} 667};
592 668
669# compatibility with older API
593sub push_read_line { 670sub push_read_line {
594 $_[0]->push_read (&_read_line); 671 my $self = shift;
672 $self->push_read (line => @_);
595} 673}
596 674
597sub unshift_read_line { 675sub unshift_read_line {
598 $_[0]->unshift_read (&_read_line); 676 my $self = shift;
677 $self->unshift_read (line => @_);
599} 678}
679
680=item netstring => $cb->($string)
681
682A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement).
683
684Throws an error with C<$!> set to EBADMSG on format violations.
685
686=cut
687
688register_read_type netstring => sub {
689 my ($self, $cb) = @_;
690
691 sub {
692 unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) {
693 if ($_[0]{rbuf} =~ /[^0-9]/) {
694 $! = &Errno::EBADMSG;
695 $self->error;
696 }
697 return;
698 }
699
700 my $len = $1;
701
702 $self->unshift_read (chunk => $len, sub {
703 my $string = $_[1];
704 $_[0]->unshift_read (chunk => 1, sub {
705 if ($_[1] eq ",") {
706 $cb->($_[0], $string);
707 } else {
708 $! = &Errno::EBADMSG;
709 $self->error;
710 }
711 });
712 });
713
714 1
715 }
716};
717
718=back
600 719
601=item $handle->stop_read 720=item $handle->stop_read
602 721
603=item $handle->start_read 722=item $handle->start_read
604 723
605In rare cases you actually do not want to read anything from the 724In rare cases you actually do not want to read anything from the
606socket. In this case you can call C<stop_read>. Neither C<on_read> no 725socket. In this case you can call C<stop_read>. Neither C<on_read> no
607any queued callbacks will be executed then. To start readign again, call 726any queued callbacks will be executed then. To start reading again, call
608C<start_read>. 727C<start_read>.
609 728
610=cut 729=cut
611 730
612sub stop_read { 731sub stop_read {
644 763
645sub _dotls { 764sub _dotls {
646 my ($self) = @_; 765 my ($self) = @_;
647 766
648 if (length $self->{tls_wbuf}) { 767 if (length $self->{tls_wbuf}) {
649 my $len = Net::SSLeay::write ($self->{tls}, $self->{tls_wbuf}); 768 while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{tls_wbuf})) > 0) {
650 substr $self->{tls_wbuf}, 0, $len, "" if $len > 0; 769 substr $self->{tls_wbuf}, 0, $len, "";
770 }
651 } 771 }
652 772
653 if (defined (my $buf = Net::SSLeay::BIO_read ($self->{tls_wbio}))) { 773 if (defined (my $buf = Net::SSLeay::BIO_read ($self->{tls_wbio}))) {
654 $self->{wbuf} .= $buf; 774 $self->{wbuf} .= $buf;
655 $self->_drain_wbuf; 775 $self->_drain_wbuf;
656 } 776 }
657 777
658 if (defined (my $buf = Net::SSLeay::read ($self->{tls}))) { 778 while (defined (my $buf = Net::SSLeay::read ($self->{tls}))) {
659 $self->{rbuf} .= $buf; 779 $self->{rbuf} .= $buf;
660 $self->_drain_rbuf; 780 $self->_drain_rbuf;
661 } elsif ( 781 }
782
662 (my $err = Net::SSLeay::get_error ($self->{tls}, -1)) 783 my $err = Net::SSLeay::get_error ($self->{tls}, -1);
784
663 != Net::SSLeay::ERROR_WANT_READ () 785 if ($err!= Net::SSLeay::ERROR_WANT_READ ()) {
664 ) {
665 if ($err == Net::SSLeay::ERROR_SYSCALL ()) { 786 if ($err == Net::SSLeay::ERROR_SYSCALL ()) {
666 $self->error; 787 $self->error;
667 } elsif ($err == Net::SSLeay::ERROR_SSL ()) { 788 } elsif ($err == Net::SSLeay::ERROR_SSL ()) {
668 $! = &Errno::EIO; 789 $! = &Errno::EIO;
669 $self->error; 790 $self->error;
671 792
672 # all others are fine for our purposes 793 # all others are fine for our purposes
673 } 794 }
674} 795}
675 796
797=item $handle->starttls ($tls[, $tls_ctx])
798
799Instead of starting TLS negotiation immediately when the AnyEvent::Handle
800object is created, you can also do that at a later time by calling
801C<starttls>.
802
803The first argument is the same as the C<tls> constructor argument (either
804C<"connect">, C<"accept"> or an existing Net::SSLeay object).
805
806The second argument is the optional C<Net::SSLeay::CTX> object that is
807used when AnyEvent::Handle has to create its own TLS connection object.
808
809=cut
810
676# TODO: maybe document... 811# TODO: maybe document...
677sub starttls { 812sub starttls {
678 my ($self, $ssl, $ctx) = @_; 813 my ($self, $ssl, $ctx) = @_;
814
815 $self->stoptls;
679 816
680 if ($ssl eq "accept") { 817 if ($ssl eq "accept") {
681 $ssl = Net::SSLeay::new ($ctx || TLS_CTX ()); 818 $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
682 Net::SSLeay::set_accept_state ($ssl); 819 Net::SSLeay::set_accept_state ($ssl);
683 } elsif ($ssl eq "connect") { 820 } elsif ($ssl eq "connect") {
684 $ssl = Net::SSLeay::new ($ctx || TLS_CTX ()); 821 $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
685 Net::SSLeay::set_connect_state ($ssl); 822 Net::SSLeay::set_connect_state ($ssl);
686 } 823 }
687 824
688 $self->{tls} = $ssl; 825 $self->{tls} = $ssl;
826
827 # basically, this is deep magic (because SSL_read should have the same issues)
828 # but the openssl maintainers basically said: "trust us, it just works".
829 # (unfortunately, we have to hardcode constants because the abysmally misdesigned
830 # and mismaintained ssleay-module doesn't even offer them).
831 # http://www.mail-archive.com/openssl-dev@openssl.org/msg22420.html
832 Net::SSLeay::CTX_set_mode ($self->{tls},
833 (eval { Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1)
834 | (eval { Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2));
689 835
690 $self->{tls_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 836 $self->{tls_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
691 $self->{tls_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 837 $self->{tls_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
692 838
693 Net::SSLeay::set_bio ($ssl, $self->{tls_rbio}, $self->{tls_wbio}); 839 Net::SSLeay::set_bio ($ssl, $self->{tls_rbio}, $self->{tls_wbio});
700 Net::SSLeay::BIO_write ($_[0]{tls_rbio}, ${$_[1]}); 846 Net::SSLeay::BIO_write ($_[0]{tls_rbio}, ${$_[1]});
701 &_dotls; 847 &_dotls;
702 }; 848 };
703} 849}
704 850
851=item $handle->stoptls
852
853Destroys the SSL connection, if any. Partial read or write data will be
854lost.
855
856=cut
857
858sub stoptls {
859 my ($self) = @_;
860
861 Net::SSLeay::free (delete $self->{tls}) if $self->{tls};
862 delete $self->{tls_rbio};
863 delete $self->{tls_wbio};
864 delete $self->{tls_wbuf};
865 delete $self->{filter_r};
866 delete $self->{filter_w};
867}
868
705sub DESTROY { 869sub DESTROY {
706 my $self = shift; 870 my $self = shift;
707 871
708 Net::SSLeay::free (delete $self->{tls}) if $self->{tls}; 872 $self->stoptls;
709} 873}
710 874
711=item AnyEvent::Handle::TLS_CTX 875=item AnyEvent::Handle::TLS_CTX
712 876
713This function creates and returns the Net::SSLeay::CTX object used by 877This function creates and returns the Net::SSLeay::CTX object used by

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines