ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.9 by root, Fri May 2 16:07:46 2008 UTC vs.
Revision 1.27 by root, Sat May 24 15:26:04 2008 UTC

10use Fcntl (); 10use Fcntl ();
11use Errno qw/EAGAIN EINTR/; 11use Errno qw/EAGAIN EINTR/;
12 12
13=head1 NAME 13=head1 NAME
14 14
15AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent 15AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
16 16
17=cut 17This module is experimental.
18 18
19=cut
20
19our $VERSION = '0.02'; 21our $VERSION = '0.04';
20 22
21=head1 SYNOPSIS 23=head1 SYNOPSIS
22 24
23 use AnyEvent; 25 use AnyEvent;
24 use AnyEvent::Handle; 26 use AnyEvent::Handle;
43 $cv->wait; 45 $cv->wait;
44 46
45=head1 DESCRIPTION 47=head1 DESCRIPTION
46 48
47This module is a helper module to make it easier to do event-based I/O on 49This module is a helper module to make it easier to do event-based I/O on
48filehandles (and sockets, see L<AnyEvent::Socket> for an easy way to make 50filehandles. For utility functions for doing non-blocking connects and accepts
49non-blocking resolves and connects). 51on sockets see L<AnyEvent::Util>.
50 52
51In the following, when the documentation refers to of "bytes" then this 53In the following, when the documentation refers to of "bytes" then this
52means characters. As sysread and syswrite are used for all I/O, their 54means characters. As sysread and syswrite are used for all I/O, their
53treatment of characters applies to this module as well. 55treatment of characters applies to this module as well.
54 56
70The filehandle this L<AnyEvent::Handle> object will operate on. 72The filehandle this L<AnyEvent::Handle> object will operate on.
71 73
72NOTE: The filehandle will be set to non-blocking (using 74NOTE: The filehandle will be set to non-blocking (using
73AnyEvent::Util::fh_nonblocking). 75AnyEvent::Util::fh_nonblocking).
74 76
77=item on_eof => $cb->($self)
78
79Set the callback to be called on EOF.
80
81While not mandatory, it is highly recommended to set an eof callback,
82otherwise you might end up with a closed socket while you are still
83waiting for data.
84
75=item on_error => $cb->($self) [MANDATORY] 85=item on_error => $cb->($self)
76 86
77This is the fatal error callback, that is called when a fatal error ocurs, 87This is the fatal error callback, that is called when, well, a fatal error
78such as not being able to resolve the hostname, failure to connect or a 88occurs, such as not being able to resolve the hostname, failure to connect
79read error. 89or a read error.
80 90
81The object will not be in a usable state when this callback has been 91The object will not be in a usable state when this callback has been
82called. 92called.
83 93
84On callback entrance, the value of C<$!> contains the opertaing system 94On callback entrance, the value of C<$!> contains the operating system
85error (or C<ENOSPC> or C<EPIPE>). 95error (or C<ENOSPC> or C<EPIPE>).
86 96
87=item on_eof => $cb->($self) [MANDATORY] 97While not mandatory, it is I<highly> recommended to set this callback, as
88 98you will not be notified of errors otherwise. The default simply calls
89Set the callback to be called on EOF. 99die.
90 100
91=item on_read => $cb->($self) 101=item on_read => $cb->($self)
92 102
93This sets the default read callback, which is called when data arrives 103This sets the default read callback, which is called when data arrives
94and no read request is in the queue. If the read callback is C<undef> 104and no read request is in the queue.
95or has never been set, than AnyEvent::Handle will cease reading from the
96filehandle.
97 105
98To access (and remove data from) the read buffer, use the C<< ->rbuf >> 106To access (and remove data from) the read buffer, use the C<< ->rbuf >>
99method or acces sthe C<$self->{rbuf}> member directly. 107method or access the C<$self->{rbuf}> member directly.
100 108
101When an EOF condition is detected then AnyEvent::Handle will first try to 109When an EOF condition is detected then AnyEvent::Handle will first try to
102feed all the remaining data to the queued callbacks and C<on_read> before 110feed all the remaining data to the queued callbacks and C<on_read> before
103calling the C<on_eof> callback. If no progress can be made, then a fatal 111calling the C<on_eof> callback. If no progress can be made, then a fatal
104error will be raised (with C<$!> set to C<EPIPE>). 112error will be raised (with C<$!> set to C<EPIPE>).
131 139
132Sets the amount of bytes (default: C<0>) that make up an "empty" write 140Sets the amount of bytes (default: C<0>) that make up an "empty" write
133buffer: If the write reaches this size or gets even samller it is 141buffer: If the write reaches this size or gets even samller it is
134considered empty. 142considered empty.
135 143
144=item tls => "accept" | "connect" | Net::SSLeay::SSL object
145
146When this parameter is given, it enables TLS (SSL) mode, that means it
147will start making tls handshake and will transparently encrypt/decrypt
148data.
149
150TLS mode requires Net::SSLeay to be installed (it will be loaded
151automatically when you try to create a TLS handle).
152
153For the TLS server side, use C<accept>, and for the TLS client side of a
154connection, use C<connect> mode.
155
156You can also provide your own TLS connection object, but you have
157to make sure that you call either C<Net::SSLeay::set_connect_state>
158or C<Net::SSLeay::set_accept_state> on it before you pass it to
159AnyEvent::Handle.
160
161See the C<starttls> method if you need to start TLs negotiation later.
162
163=item tls_ctx => $ssl_ctx
164
165Use the given Net::SSLeay::CTX object to create the new TLS connection
166(unless a connection object was specified directly). If this parameter is
167missing, then AnyEvent::Handle will use C<AnyEvent::Handle::TLS_CTX>.
168
136=back 169=back
137 170
138=cut 171=cut
139 172
140sub new { 173sub new {
144 177
145 $self->{fh} or Carp::croak "mandatory argument fh is missing"; 178 $self->{fh} or Carp::croak "mandatory argument fh is missing";
146 179
147 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 180 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
148 181
149 $self->on_error ((delete $self->{on_error}) or Carp::croak "mandatory argument on_error is missing"); 182 if ($self->{tls}) {
150 $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing"); 183 require Net::SSLeay;
184 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx});
185 }
151 186
187 $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof};
188 $self->on_error (delete $self->{on_error}) if $self->{on_error};
152 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 189 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
153 $self->on_read (delete $self->{on_read} ) if $self->{on_read}; 190 $self->on_read (delete $self->{on_read} ) if $self->{on_read};
191
192 $self->start_read;
154 193
155 $self 194 $self
156} 195}
157 196
158sub _shutdown { 197sub _shutdown {
169 { 208 {
170 local $!; 209 local $!;
171 $self->_shutdown; 210 $self->_shutdown;
172 } 211 }
173 212
213 if ($self->{on_error}) {
174 $self->{on_error}($self); 214 $self->{on_error}($self);
215 } else {
216 die "AnyEvent::Handle uncaught fatal error: $!";
217 }
175} 218}
176 219
177=item $fh = $handle->fh 220=item $fh = $handle->fh
178 221
179This method returns the filehandle of the L<AnyEvent::Handle> object. 222This method returns the file handle of the L<AnyEvent::Handle> object.
180 223
181=cut 224=cut
182 225
183sub fh { $_[0]->{fh} } 226sub fh { $_[0]->{fh} }
184 227
212for reading. 255for reading.
213 256
214The write queue is very simple: you can add data to its end, and 257The write queue is very simple: you can add data to its end, and
215AnyEvent::Handle will automatically try to get rid of it for you. 258AnyEvent::Handle will automatically try to get rid of it for you.
216 259
217When data could be writtena nd the write buffer is shorter then the low 260When data could be written and the write buffer is shorter then the low
218water mark, the C<on_drain> callback will be invoked. 261water mark, the C<on_drain> callback will be invoked.
219 262
220=over 4 263=over 4
221 264
222=item $handle->on_drain ($cb) 265=item $handle->on_drain ($cb)
241want (only limited by the available memory), as C<AnyEvent::Handle> 284want (only limited by the available memory), as C<AnyEvent::Handle>
242buffers it independently of the kernel. 285buffers it independently of the kernel.
243 286
244=cut 287=cut
245 288
246sub push_write { 289sub _drain_wbuf {
247 my ($self, $data) = @_; 290 my ($self) = @_;
248
249 $self->{wbuf} .= $data;
250 291
251 unless ($self->{ww}) { 292 unless ($self->{ww}) {
252 Scalar::Util::weaken $self; 293 Scalar::Util::weaken $self;
253 my $cb = sub { 294 my $cb = sub {
254 my $len = syswrite $self->{fh}, $self->{wbuf}; 295 my $len = syswrite $self->{fh}, $self->{wbuf};
255 296
256 if ($len > 0) { 297 if ($len > 0) {
257 substr $self->{wbuf}, 0, $len, ""; 298 substr $self->{wbuf}, 0, $len, "";
258
259 299
260 $self->{on_drain}($self) 300 $self->{on_drain}($self)
261 if $self->{low_water_mark} >= length $self->{wbuf} 301 if $self->{low_water_mark} >= length $self->{wbuf}
262 && $self->{on_drain}; 302 && $self->{on_drain};
263 303
269 309
270 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb); 310 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb);
271 311
272 $cb->($self); 312 $cb->($self);
273 }; 313 };
314}
315
316sub push_write {
317 my $self = shift;
318
319 if ($self->{filter_w}) {
320 $self->{filter_w}->($self, \$_[0]);
321 } else {
322 $self->{wbuf} .= $_[0];
323 $self->_drain_wbuf;
324 }
274} 325}
275 326
276############################################################################# 327#############################################################################
277 328
278=back 329=back
349 ... 400 ...
350 }); 401 });
351 402
352=over 4 403=over 4
353 404
405=cut
406
354sub _drain_rbuf { 407sub _drain_rbuf {
355 my ($self) = @_; 408 my ($self) = @_;
356 409
410 if (
411 defined $self->{rbuf_max}
412 && $self->{rbuf_max} < length $self->{rbuf}
413 ) {
414 $! = &Errno::ENOSPC; return $self->error;
415 }
416
357 return if exists $self->{in_drain}; 417 return if $self->{in_drain};
358 local $self->{in_drain} = 1; 418 local $self->{in_drain} = 1;
359 419
360 while (my $len = length $self->{rbuf}) { 420 while (my $len = length $self->{rbuf}) {
361 no strict 'refs'; 421 no strict 'refs';
362 if (@{ $self->{queue} }) { 422 if (my $cb = shift @{ $self->{queue} }) {
363 if ($self->{queue}[0]($self)) { 423 if (!$cb->($self)) {
364 shift @{ $self->{queue} };
365 } elsif ($self->{eof}) { 424 if ($self->{eof}) {
366 # no progress can be made (not enough data and no data forthcoming) 425 # no progress can be made (not enough data and no data forthcoming)
367 $! = &Errno::EPIPE; return $self->error; 426 $! = &Errno::EPIPE; return $self->error;
368 } else { 427 }
428
429 unshift @{ $self->{queue} }, $cb;
369 return; 430 return;
370 } 431 }
371 } elsif ($self->{on_read}) { 432 } elsif ($self->{on_read}) {
372 $self->{on_read}($self); 433 $self->{on_read}($self);
373 434
387 } 448 }
388 } 449 }
389 450
390 if ($self->{eof}) { 451 if ($self->{eof}) {
391 $self->_shutdown; 452 $self->_shutdown;
392 $self->{on_eof}($self); 453 $self->{on_eof}($self)
454 if $self->{on_eof};
393 } 455 }
394} 456}
395 457
396=item $handle->on_read ($cb) 458=item $handle->on_read ($cb)
397 459
403 465
404sub on_read { 466sub on_read {
405 my ($self, $cb) = @_; 467 my ($self, $cb) = @_;
406 468
407 $self->{on_read} = $cb; 469 $self->{on_read} = $cb;
408
409 unless ($self->{rw} || $self->{eof}) {
410 Scalar::Util::weaken $self;
411
412 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
413 my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf};
414
415 if ($len > 0) {
416 if (exists $self->{rbuf_max}) {
417 if ($self->{rbuf_max} < length $self->{rbuf}) {
418 $! = &Errno::ENOSPC; return $self->error;
419 }
420 }
421
422 } elsif (defined $len) {
423 $self->{eof} = 1;
424 delete $self->{rw};
425
426 } elsif ($! != EAGAIN && $! != EINTR) {
427 return $self->error;
428 }
429
430 $self->_drain_rbuf;
431 });
432 }
433} 470}
434 471
435=item $handle->rbuf 472=item $handle->rbuf
436 473
437Returns the read buffer (as a modifiable lvalue). 474Returns the read buffer (as a modifiable lvalue).
456Append the given callback to the end of the queue (C<push_read>) or 493Append the given callback to the end of the queue (C<push_read>) or
457prepend it (C<unshift_read>). 494prepend it (C<unshift_read>).
458 495
459The callback is called each time some additional read data arrives. 496The callback is called each time some additional read data arrives.
460 497
461It must check wether enough data is in the read buffer already. 498It must check whether enough data is in the read buffer already.
462 499
463If not enough data is available, it must return the empty list or a false 500If not enough data is available, it must return the empty list or a false
464value, in which case it will be called repeatedly until enough data is 501value, in which case it will be called repeatedly until enough data is
465available (or an error condition is detected). 502available (or an error condition is detected).
466 503
495these C<$len> bytes will be passed to the callback. 532these C<$len> bytes will be passed to the callback.
496 533
497=cut 534=cut
498 535
499sub _read_chunk($$) { 536sub _read_chunk($$) {
500 my ($len, $cb) = @_; 537 my ($self, $len, $cb) = @_;
501 538
502 sub { 539 sub {
503 $len <= length $_[0]{rbuf} or return; 540 $len <= length $_[0]{rbuf} or return;
504 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, ""); 541 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
505 1 542 1
506 } 543 }
507} 544}
508 545
509sub push_read_chunk { 546sub push_read_chunk {
510 my ($self, $len, $cb) = @_;
511
512 $self->push_read (_read_chunk $len, $cb); 547 $_[0]->push_read (&_read_chunk);
513} 548}
514 549
515 550
516sub unshift_read_chunk { 551sub unshift_read_chunk {
517 my ($self, $len, $cb) = @_;
518
519 $self->unshift_read (_read_chunk $len, $cb); 552 $_[0]->unshift_read (&_read_chunk);
520} 553}
521 554
522=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol)) 555=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol))
523 556
524=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol)) 557=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol))
544not marked by the end of line marker. 577not marked by the end of line marker.
545 578
546=cut 579=cut
547 580
548sub _read_line($$) { 581sub _read_line($$) {
582 my $self = shift;
549 my $cb = pop; 583 my $cb = pop;
550 my $eol = @_ ? shift : qr|(\015?\012)|; 584 my $eol = @_ ? shift : qr|(\015?\012)|;
551 my $pos; 585 my $pos;
552 586
553 $eol = qr|(\Q$eol\E)| unless ref $eol; 587 $eol = quotemeta $eol unless ref $eol;
554 $eol = qr|^(.*?)($eol)|; 588 $eol = qr|^(.*?)($eol)|s;
555 589
556 sub { 590 sub {
557 $_[0]{rbuf} =~ s/$eol// or return; 591 $_[0]{rbuf} =~ s/$eol// or return;
558 592
559 $cb->($1, $2); 593 $cb->($_[0], $1, $2);
560 1 594 1
561 } 595 }
562} 596}
563 597
564sub push_read_line { 598sub push_read_line {
599 $_[0]->push_read (&_read_line);
600}
601
602sub unshift_read_line {
603 $_[0]->unshift_read (&_read_line);
604}
605
606=item $handle->stop_read
607
608=item $handle->start_read
609
610In rare cases you actually do not want to read anything from the
611socket. In this case you can call C<stop_read>. Neither C<on_read> no
612any queued callbacks will be executed then. To start reading again, call
613C<start_read>.
614
615=cut
616
617sub stop_read {
618 my ($self) = @_;
619
620 delete $self->{rw};
621}
622
623sub start_read {
624 my ($self) = @_;
625
626 unless ($self->{rw} || $self->{eof}) {
627 Scalar::Util::weaken $self;
628
629 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
630 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf};
631 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
632
633 if ($len > 0) {
634 $self->{filter_r}
635 ? $self->{filter_r}->($self, $rbuf)
636 : $self->_drain_rbuf;
637
638 } elsif (defined $len) {
639 delete $self->{rw};
640 $self->{eof} = 1;
641 $self->_drain_rbuf;
642
643 } elsif ($! != EAGAIN && $! != EINTR) {
644 return $self->error;
645 }
646 });
647 }
648}
649
650sub _dotls {
651 my ($self) = @_;
652
653 if (length $self->{tls_wbuf}) {
654 while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{tls_wbuf})) > 0) {
655 substr $self->{tls_wbuf}, 0, $len, "";
656 }
657 }
658
659 if (defined (my $buf = Net::SSLeay::BIO_read ($self->{tls_wbio}))) {
660 $self->{wbuf} .= $buf;
661 $self->_drain_wbuf;
662 }
663
664 while (defined (my $buf = Net::SSLeay::read ($self->{tls}))) {
665 $self->{rbuf} .= $buf;
666 $self->_drain_rbuf;
667 }
668
669 my $err = Net::SSLeay::get_error ($self->{tls}, -1);
670
671 if ($err!= Net::SSLeay::ERROR_WANT_READ ()) {
672 if ($err == Net::SSLeay::ERROR_SYSCALL ()) {
673 $self->error;
674 } elsif ($err == Net::SSLeay::ERROR_SSL ()) {
675 $! = &Errno::EIO;
676 $self->error;
677 }
678
679 # all others are fine for our purposes
680 }
681}
682
683=item $handle->starttls ($tls[, $tls_ctx])
684
685Instead of starting TLS negotiation immediately when the AnyEvent::Handle
686object is created, you can also do that at a later time by calling
687C<starttls>.
688
689The first argument is the same as the C<tls> constructor argument (either
690C<"connect">, C<"accept"> or an existing Net::SSLeay object).
691
692The second argument is the optional C<Net::SSLeay::CTX> object that is
693used when AnyEvent::Handle has to create its own TLS connection object.
694
695=cut
696
697# TODO: maybe document...
698sub starttls {
699 my ($self, $ssl, $ctx) = @_;
700
701 $self->stoptls;
702
703 if ($ssl eq "accept") {
704 $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
705 Net::SSLeay::set_accept_state ($ssl);
706 } elsif ($ssl eq "connect") {
707 $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
708 Net::SSLeay::set_connect_state ($ssl);
709 }
710
711 $self->{tls} = $ssl;
712
713 # basically, this is deep magic (because SSL_read should have the same issues)
714 # but the openssl maintainers basically said: "trust us, it just works".
715 # (unfortunately, we have to hardcode constants because the abysmally misdesigned
716 # and mismaintained ssleay-module doesn't even offer them).
717 # http://www.mail-archive.com/openssl-dev@openssl.org/msg22420.html
718 Net::SSLeay::CTX_set_mode ($self->{tls},
719 (eval { Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1)
720 | (eval { Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2));
721
722 $self->{tls_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
723 $self->{tls_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
724
725 Net::SSLeay::set_bio ($ssl, $self->{tls_rbio}, $self->{tls_wbio});
726
727 $self->{filter_w} = sub {
728 $_[0]{tls_wbuf} .= ${$_[1]};
729 &_dotls;
730 };
731 $self->{filter_r} = sub {
732 Net::SSLeay::BIO_write ($_[0]{tls_rbio}, ${$_[1]});
733 &_dotls;
734 };
735}
736
737=item $handle->stoptls
738
739Destroys the SSL connection, if any. Partial read or write data will be
740lost.
741
742=cut
743
744sub stoptls {
745 my ($self) = @_;
746
747 Net::SSLeay::free (delete $self->{tls}) if $self->{tls};
748 delete $self->{tls_rbio};
749 delete $self->{tls_wbio};
750 delete $self->{tls_wbuf};
751 delete $self->{filter_r};
752 delete $self->{filter_w};
753}
754
755sub DESTROY {
565 my $self = shift; 756 my $self = shift;
566 757
567 $self->push_read (&_read_line); 758 $self->stoptls;
568} 759}
569 760
570sub unshift_read_line { 761=item AnyEvent::Handle::TLS_CTX
571 my $self = shift;
572 762
573 $self->unshift_read (&_read_line); 763This function creates and returns the Net::SSLeay::CTX object used by
764default for TLS mode.
765
766The context is created like this:
767
768 Net::SSLeay::load_error_strings;
769 Net::SSLeay::SSLeay_add_ssl_algorithms;
770 Net::SSLeay::randomize;
771
772 my $CTX = Net::SSLeay::CTX_new;
773
774 Net::SSLeay::CTX_set_options $CTX, Net::SSLeay::OP_ALL
775
776=cut
777
778our $TLS_CTX;
779
780sub TLS_CTX() {
781 $TLS_CTX || do {
782 require Net::SSLeay;
783
784 Net::SSLeay::load_error_strings ();
785 Net::SSLeay::SSLeay_add_ssl_algorithms ();
786 Net::SSLeay::randomize ();
787
788 $TLS_CTX = Net::SSLeay::CTX_new ();
789
790 Net::SSLeay::CTX_set_options ($TLS_CTX, Net::SSLeay::OP_ALL ());
791
792 $TLS_CTX
793 }
574} 794}
575 795
576=back 796=back
577 797
578=head1 AUTHOR 798=head1 AUTHOR

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines