ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.12 by elmex, Thu May 15 09:03:43 2008 UTC vs.
Revision 1.26 by root, Sat May 24 15:20:46 2008 UTC

10use Fcntl (); 10use Fcntl ();
11use Errno qw/EAGAIN EINTR/; 11use Errno qw/EAGAIN EINTR/;
12 12
13=head1 NAME 13=head1 NAME
14 14
15AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent 15AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
16 16
17=cut 17This module is experimental.
18 18
19=cut
20
19our $VERSION = '0.02'; 21our $VERSION = '0.04';
20 22
21=head1 SYNOPSIS 23=head1 SYNOPSIS
22 24
23 use AnyEvent; 25 use AnyEvent;
24 use AnyEvent::Handle; 26 use AnyEvent::Handle;
43 $cv->wait; 45 $cv->wait;
44 46
45=head1 DESCRIPTION 47=head1 DESCRIPTION
46 48
47This module is a helper module to make it easier to do event-based I/O on 49This module is a helper module to make it easier to do event-based I/O on
48filehandles (and sockets, see L<AnyEvent::Socket> for an easy way to make 50filehandles. For utility functions for doing non-blocking connects and accepts
49non-blocking resolves and connects). 51on sockets see L<AnyEvent::Util>.
50 52
51In the following, when the documentation refers to of "bytes" then this 53In the following, when the documentation refers to of "bytes" then this
52means characters. As sysread and syswrite are used for all I/O, their 54means characters. As sysread and syswrite are used for all I/O, their
53treatment of characters applies to this module as well. 55treatment of characters applies to this module as well.
54 56
70The filehandle this L<AnyEvent::Handle> object will operate on. 72The filehandle this L<AnyEvent::Handle> object will operate on.
71 73
72NOTE: The filehandle will be set to non-blocking (using 74NOTE: The filehandle will be set to non-blocking (using
73AnyEvent::Util::fh_nonblocking). 75AnyEvent::Util::fh_nonblocking).
74 76
75=item on_eof => $cb->($self) [MANDATORY] 77=item on_eof => $cb->($self)
76 78
77Set the callback to be called on EOF. 79Set the callback to be called on EOF.
78 80
81While not mandatory, it is highly recommended to set an eof callback,
82otherwise you might end up with a closed socket while you are still
83waiting for data.
84
79=item on_error => $cb->($self) 85=item on_error => $cb->($self)
80 86
81This is the fatal error callback, that is called when, well, a fatal error 87This is the fatal error callback, that is called when, well, a fatal error
82ocurs, such as not being able to resolve the hostname, failure to connect 88occurs, such as not being able to resolve the hostname, failure to connect
83or a read error. 89or a read error.
84 90
85The object will not be in a usable state when this callback has been 91The object will not be in a usable state when this callback has been
86called. 92called.
87 93
96 102
97This sets the default read callback, which is called when data arrives 103This sets the default read callback, which is called when data arrives
98and no read request is in the queue. 104and no read request is in the queue.
99 105
100To access (and remove data from) the read buffer, use the C<< ->rbuf >> 106To access (and remove data from) the read buffer, use the C<< ->rbuf >>
101method or acces sthe C<$self->{rbuf}> member directly. 107method or access the C<$self->{rbuf}> member directly.
102 108
103When an EOF condition is detected then AnyEvent::Handle will first try to 109When an EOF condition is detected then AnyEvent::Handle will first try to
104feed all the remaining data to the queued callbacks and C<on_read> before 110feed all the remaining data to the queued callbacks and C<on_read> before
105calling the C<on_eof> callback. If no progress can be made, then a fatal 111calling the C<on_eof> callback. If no progress can be made, then a fatal
106error will be raised (with C<$!> set to C<EPIPE>). 112error will be raised (with C<$!> set to C<EPIPE>).
133 139
134Sets the amount of bytes (default: C<0>) that make up an "empty" write 140Sets the amount of bytes (default: C<0>) that make up an "empty" write
135buffer: If the write reaches this size or gets even samller it is 141buffer: If the write reaches this size or gets even samller it is
136considered empty. 142considered empty.
137 143
144=item tls => "accept" | "connect" | Net::SSLeay::SSL object
145
146When this parameter is given, it enables TLS (SSL) mode, that means it
147will start making tls handshake and will transparently encrypt/decrypt
148data.
149
150TLS mode requires Net::SSLeay to be installed (it will be loaded
151automatically when you try to create a TLS handle).
152
153For the TLS server side, use C<accept>, and for the TLS client side of a
154connection, use C<connect> mode.
155
156You can also provide your own TLS connection object, but you have
157to make sure that you call either C<Net::SSLeay::set_connect_state>
158or C<Net::SSLeay::set_accept_state> on it before you pass it to
159AnyEvent::Handle.
160
161See the C<starttls> method if you need to start TLs negotiation later.
162
163=item tls_ctx => $ssl_ctx
164
165Use the given Net::SSLeay::CTX object to create the new TLS connection
166(unless a connection object was specified directly). If this parameter is
167missing, then AnyEvent::Handle will use C<AnyEvent::Handle::TLS_CTX>.
168
138=back 169=back
139 170
140=cut 171=cut
141 172
142sub new { 173sub new {
146 177
147 $self->{fh} or Carp::croak "mandatory argument fh is missing"; 178 $self->{fh} or Carp::croak "mandatory argument fh is missing";
148 179
149 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 180 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
150 181
151 $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing"); 182 if ($self->{tls}) {
183 require Net::SSLeay;
184 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx});
185 }
152 186
187 $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof};
153 $self->on_error (delete $self->{on_error}) if $self->{on_error}; 188 $self->on_error (delete $self->{on_error}) if $self->{on_error};
154 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 189 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
155 $self->on_read (delete $self->{on_read} ) if $self->{on_read}; 190 $self->on_read (delete $self->{on_read} ) if $self->{on_read};
156 191
157 $self->start_read; 192 $self->start_read;
182 } 217 }
183} 218}
184 219
185=item $fh = $handle->fh 220=item $fh = $handle->fh
186 221
187This method returns the filehandle of the L<AnyEvent::Handle> object. 222This method returns the file handle of the L<AnyEvent::Handle> object.
188 223
189=cut 224=cut
190 225
191sub fh { $_[0]->{fh} } 226sub fh { $_[0]->{fh} }
192 227
220for reading. 255for reading.
221 256
222The write queue is very simple: you can add data to its end, and 257The write queue is very simple: you can add data to its end, and
223AnyEvent::Handle will automatically try to get rid of it for you. 258AnyEvent::Handle will automatically try to get rid of it for you.
224 259
225When data could be writtena nd the write buffer is shorter then the low 260When data could be written and the write buffer is shorter then the low
226water mark, the C<on_drain> callback will be invoked. 261water mark, the C<on_drain> callback will be invoked.
227 262
228=over 4 263=over 4
229 264
230=item $handle->on_drain ($cb) 265=item $handle->on_drain ($cb)
249want (only limited by the available memory), as C<AnyEvent::Handle> 284want (only limited by the available memory), as C<AnyEvent::Handle>
250buffers it independently of the kernel. 285buffers it independently of the kernel.
251 286
252=cut 287=cut
253 288
254sub push_write { 289sub _drain_wbuf {
255 my ($self, $data) = @_; 290 my ($self) = @_;
256
257 $self->{wbuf} .= $data;
258 291
259 unless ($self->{ww}) { 292 unless ($self->{ww}) {
260 Scalar::Util::weaken $self; 293 Scalar::Util::weaken $self;
261 my $cb = sub { 294 my $cb = sub {
262 my $len = syswrite $self->{fh}, $self->{wbuf}; 295 my $len = syswrite $self->{fh}, $self->{wbuf};
263 296
264 if ($len > 0) { 297 if ($len > 0) {
265 substr $self->{wbuf}, 0, $len, ""; 298 substr $self->{wbuf}, 0, $len, "";
266
267 299
268 $self->{on_drain}($self) 300 $self->{on_drain}($self)
269 if $self->{low_water_mark} >= length $self->{wbuf} 301 if $self->{low_water_mark} >= length $self->{wbuf}
270 && $self->{on_drain}; 302 && $self->{on_drain};
271 303
277 309
278 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb); 310 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb);
279 311
280 $cb->($self); 312 $cb->($self);
281 }; 313 };
314}
315
316sub push_write {
317 my $self = shift;
318
319 if ($self->{filter_w}) {
320 $self->{filter_w}->($self, \$_[0]);
321 } else {
322 $self->{wbuf} .= $_[0];
323 $self->_drain_wbuf;
324 }
282} 325}
283 326
284############################################################################# 327#############################################################################
285 328
286=back 329=back
361 404
362=cut 405=cut
363 406
364sub _drain_rbuf { 407sub _drain_rbuf {
365 my ($self) = @_; 408 my ($self) = @_;
409
410 if (
411 defined $self->{rbuf_max}
412 && $self->{rbuf_max} < length $self->{rbuf}
413 ) {
414 $! = &Errno::ENOSPC; return $self->error;
415 }
366 416
367 return if $self->{in_drain}; 417 return if $self->{in_drain};
368 local $self->{in_drain} = 1; 418 local $self->{in_drain} = 1;
369 419
370 while (my $len = length $self->{rbuf}) { 420 while (my $len = length $self->{rbuf}) {
398 } 448 }
399 } 449 }
400 450
401 if ($self->{eof}) { 451 if ($self->{eof}) {
402 $self->_shutdown; 452 $self->_shutdown;
403 $self->{on_eof}($self); 453 $self->{on_eof}($self)
454 if $self->{on_eof};
404 } 455 }
405} 456}
406 457
407=item $handle->on_read ($cb) 458=item $handle->on_read ($cb)
408 459
442Append the given callback to the end of the queue (C<push_read>) or 493Append the given callback to the end of the queue (C<push_read>) or
443prepend it (C<unshift_read>). 494prepend it (C<unshift_read>).
444 495
445The callback is called each time some additional read data arrives. 496The callback is called each time some additional read data arrives.
446 497
447It must check wether enough data is in the read buffer already. 498It must check whether enough data is in the read buffer already.
448 499
449If not enough data is available, it must return the empty list or a false 500If not enough data is available, it must return the empty list or a false
450value, in which case it will be called repeatedly until enough data is 501value, in which case it will be called repeatedly until enough data is
451available (or an error condition is detected). 502available (or an error condition is detected).
452 503
531 my $self = shift; 582 my $self = shift;
532 my $cb = pop; 583 my $cb = pop;
533 my $eol = @_ ? shift : qr|(\015?\012)|; 584 my $eol = @_ ? shift : qr|(\015?\012)|;
534 my $pos; 585 my $pos;
535 586
536 $eol = qr|(\Q$eol\E)| unless ref $eol; 587 $eol = quotemeta $eol unless ref $eol;
537 $eol = qr|^(.*?)($eol)|; 588 $eol = qr|^(.*?)($eol)|s;
538 589
539 sub { 590 sub {
540 $_[0]{rbuf} =~ s/$eol// or return; 591 $_[0]{rbuf} =~ s/$eol// or return;
541 592
542 $cb->($_[0], $1, $2); 593 $cb->($_[0], $1, $2);
554 605
555=item $handle->stop_read 606=item $handle->stop_read
556 607
557=item $handle->start_read 608=item $handle->start_read
558 609
559In rare cases you actually do not want to read anything form the 610In rare cases you actually do not want to read anything from the
560socket. In this case you can call C<stop_read>. Neither C<on_read> no 611socket. In this case you can call C<stop_read>. Neither C<on_read> no
561any queued callbacks will be executed then. To start readign again, call 612any queued callbacks will be executed then. To start reading again, call
562C<start_read>. 613C<start_read>.
563 614
564=cut 615=cut
565 616
566sub stop_read { 617sub stop_read {
574 625
575 unless ($self->{rw} || $self->{eof}) { 626 unless ($self->{rw} || $self->{eof}) {
576 Scalar::Util::weaken $self; 627 Scalar::Util::weaken $self;
577 628
578 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { 629 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
630 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf};
579 my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf}; 631 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
580 632
581 if ($len > 0) { 633 if ($len > 0) {
582 if (defined $self->{rbuf_max}) { 634 $self->{filter_r}
583 if ($self->{rbuf_max} < length $self->{rbuf}) { 635 ? $self->{filter_r}->($self, $rbuf)
584 $! = &Errno::ENOSPC; return $self->error; 636 : $self->_drain_rbuf;
585 }
586 }
587 637
588 } elsif (defined $len) { 638 } elsif (defined $len) {
639 delete $self->{rw};
589 $self->{eof} = 1; 640 $self->{eof} = 1;
590 delete $self->{rw}; 641 $self->_drain_rbuf;
591 642
592 } elsif ($! != EAGAIN && $! != EINTR) { 643 } elsif ($! != EAGAIN && $! != EINTR) {
593 return $self->error; 644 return $self->error;
594 } 645 }
595
596 $self->_drain_rbuf;
597 }); 646 });
598 } 647 }
599} 648}
600 649
650sub _dotls {
651 my ($self) = @_;
652
653 if (length $self->{tls_wbuf}) {
654 while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{tls_wbuf})) > 0) {
655 substr $self->{tls_wbuf}, 0, $len, "";
656 }
657 }
658
659 if (defined (my $buf = Net::SSLeay::BIO_read ($self->{tls_wbio}))) {
660 $self->{wbuf} .= $buf;
661 $self->_drain_wbuf;
662 }
663
664 while (defined (my $buf = Net::SSLeay::read ($self->{tls}))) {
665 $self->{rbuf} .= $buf;
666 $self->_drain_rbuf;
667 }
668
669 my $err = Net::SSLeay::get_error ($self->{tls}, -1);
670
671 if ($err!= Net::SSLeay::ERROR_WANT_READ ()) {
672 if ($err == Net::SSLeay::ERROR_SYSCALL ()) {
673 $self->error;
674 } elsif ($err == Net::SSLeay::ERROR_SSL ()) {
675 $! = &Errno::EIO;
676 $self->error;
677 }
678
679 # all others are fine for our purposes
680 }
681}
682
683=item $handle->starttls ($tls[, $tls_ctx])
684
685Instead of starting TLS negotiation immediately when the AnyEvent::Handle
686object is created, you can also do that at a later time by calling
687C<starttls>.
688
689The first argument is the same as the C<tls> constructor argument (either
690C<"connect">, C<"accept"> or an existing Net::SSLeay object).
691
692The second argument is the optional C<Net::SSLeay::CTX> object that is
693used when AnyEvent::Handle has to create its own TLS connection object.
694
695=cut
696
697# TODO: maybe document...
698sub starttls {
699 my ($self, $ssl, $ctx) = @_;
700
701 $self->stoptls;
702
703 if ($ssl eq "accept") {
704 $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
705 Net::SSLeay::set_accept_state ($ssl);
706 } elsif ($ssl eq "connect") {
707 $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
708 Net::SSLeay::set_connect_state ($ssl);
709 }
710
711 $self->{tls} = $ssl;
712
713 # basically, this is deep magic (because SSL_read should have the same issues)
714 # but the openssl maintainers basically said: "trust us, it just works".
715 # (unfortunately, we have to hardcode constants because the abysmally misdesigned
716 # and mismaintained ssleay-module doesn't even offer them).
717 Net::SSLeay::CTX_set_mode ($self->{tls},
718 (eval { Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1)
719 | (eval { Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2));
720
721 $self->{tls_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
722 $self->{tls_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
723
724 Net::SSLeay::set_bio ($ssl, $self->{tls_rbio}, $self->{tls_wbio});
725
726 $self->{filter_w} = sub {
727 $_[0]{tls_wbuf} .= ${$_[1]};
728 &_dotls;
729 };
730 $self->{filter_r} = sub {
731 Net::SSLeay::BIO_write ($_[0]{tls_rbio}, ${$_[1]});
732 &_dotls;
733 };
734}
735
736=item $handle->stoptls
737
738Destroys the SSL connection, if any. Partial read or write data will be
739lost.
740
741=cut
742
743sub stoptls {
744 my ($self) = @_;
745
746 Net::SSLeay::free (delete $self->{tls}) if $self->{tls};
747 delete $self->{tls_rbio};
748 delete $self->{tls_wbio};
749 delete $self->{tls_wbuf};
750 delete $self->{filter_r};
751 delete $self->{filter_w};
752}
753
754sub DESTROY {
755 my $self = shift;
756
757 $self->stoptls;
758}
759
760=item AnyEvent::Handle::TLS_CTX
761
762This function creates and returns the Net::SSLeay::CTX object used by
763default for TLS mode.
764
765The context is created like this:
766
767 Net::SSLeay::load_error_strings;
768 Net::SSLeay::SSLeay_add_ssl_algorithms;
769 Net::SSLeay::randomize;
770
771 my $CTX = Net::SSLeay::CTX_new;
772
773 Net::SSLeay::CTX_set_options $CTX, Net::SSLeay::OP_ALL
774
775=cut
776
777our $TLS_CTX;
778
779sub TLS_CTX() {
780 $TLS_CTX || do {
781 require Net::SSLeay;
782
783 Net::SSLeay::load_error_strings ();
784 Net::SSLeay::SSLeay_add_ssl_algorithms ();
785 Net::SSLeay::randomize ();
786
787 $TLS_CTX = Net::SSLeay::CTX_new ();
788
789 Net::SSLeay::CTX_set_options ($TLS_CTX, Net::SSLeay::OP_ALL ());
790
791 $TLS_CTX
792 }
793}
794
601=back 795=back
602 796
603=head1 AUTHOR 797=head1 AUTHOR
604 798
605Robin Redeker C<< <elmex at ta-sa.org> >>, Marc Lehmann <schmorp@schmorp.de>. 799Robin Redeker C<< <elmex at ta-sa.org> >>, Marc Lehmann <schmorp@schmorp.de>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines