ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.175 by root, Sat Aug 8 22:20:43 2009 UTC vs.
Revision 1.183 by root, Thu Sep 3 12:45:35 2009 UTC

1package AnyEvent::Handle;
2
3use Scalar::Util ();
4use Carp ();
5use Errno qw(EAGAIN EINTR);
6
7use AnyEvent (); BEGIN { AnyEvent::common_sense }
8use AnyEvent::Util qw(WSAEWOULDBLOCK);
9
10=head1 NAME 1=head1 NAME
11 2
12AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent 3AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
13
14=cut
15
16our $VERSION = 4.91;
17 4
18=head1 SYNOPSIS 5=head1 SYNOPSIS
19 6
20 use AnyEvent; 7 use AnyEvent;
21 use AnyEvent::Handle; 8 use AnyEvent::Handle;
59C<on_error> callback. 46C<on_error> callback.
60 47
61All callbacks will be invoked with the handle object as their first 48All callbacks will be invoked with the handle object as their first
62argument. 49argument.
63 50
51=cut
52
53package AnyEvent::Handle;
54
55use Scalar::Util ();
56use List::Util ();
57use Carp ();
58use Errno qw(EAGAIN EINTR);
59
60use AnyEvent (); BEGIN { AnyEvent::common_sense }
61use AnyEvent::Util qw(WSAEWOULDBLOCK);
62
63our $VERSION = $AnyEvent::VERSION;
64
64=head1 METHODS 65=head1 METHODS
65 66
66=over 4 67=over 4
67 68
68=item $handle = B<new> AnyEvent::TLS fh => $filehandle, key => value... 69=item $handle = B<new> AnyEvent::TLS fh => $filehandle, key => value...
216memory and push it into the queue, but instead only read more data from 217memory and push it into the queue, but instead only read more data from
217the file when the write queue becomes empty. 218the file when the write queue becomes empty.
218 219
219=item timeout => $fractional_seconds 220=item timeout => $fractional_seconds
220 221
222=item rtimeout => $fractional_seconds
223
224=item wtimeout => $fractional_seconds
225
221If non-zero, then this enables an "inactivity" timeout: whenever this many 226If non-zero, then these enables an "inactivity" timeout: whenever this
222seconds pass without a successful read or write on the underlying file 227many seconds pass without a successful read or write on the underlying
223handle, the C<on_timeout> callback will be invoked (and if that one is 228file handle (or a call to C<timeout_reset>), the C<on_timeout> callback
224missing, a non-fatal C<ETIMEDOUT> error will be raised). 229will be invoked (and if that one is missing, a non-fatal C<ETIMEDOUT>
230error will be raised).
231
232There are three variants of the timeouts that work fully independent
233of each other, for both read and write, just read, and just write:
234C<timeout>, C<rtimeout> and C<wtimeout>, with corresponding callbacks
235C<on_timeout>, C<on_rtimeout> and C<on_wtimeout>, and reset functions
236C<timeout_reset>, C<rtimeout_reset>, and C<wtimeout_reset>.
225 237
226Note that timeout processing is also active when you currently do not have 238Note that timeout processing is also active when you currently do not have
227any outstanding read or write requests: If you plan to keep the connection 239any outstanding read or write requests: If you plan to keep the connection
228idle then you should disable the timout temporarily or ignore the timeout 240idle then you should disable the timout temporarily or ignore the timeout
229in the C<on_timeout> callback, in which case AnyEvent::Handle will simply 241in the C<on_timeout> callback, in which case AnyEvent::Handle will simply
273accomplishd by setting this option to a true value. 285accomplishd by setting this option to a true value.
274 286
275The default is your opertaing system's default behaviour (most likely 287The default is your opertaing system's default behaviour (most likely
276enabled), this option explicitly enables or disables it, if possible. 288enabled), this option explicitly enables or disables it, if possible.
277 289
290=item keepalive => <boolean>
291
292Enables (default disable) the SO_KEEPALIVE option on the stream socket:
293normally, TCP connections have no time-out once established, so TCP
294conenctions, once established, can stay alive forever even when the other
295side has long gone. TCP keepalives are a cheap way to take down long-lived
296TCP connections whent he other side becomes unreachable. While the default
297is OS-dependent, TCP keepalives usually kick in after around two hours,
298and, if the other side doesn't reply, take down the TCP connection some 10
299to 15 minutes later.
300
301It is harmless to specify this option for file handles that do not support
302keepalives, and enabling it on connections that are potentially long-lived
303is usually a good idea.
304
305=item oobinline => <boolean>
306
307BSD majorly fucked up the implementation of TCP urgent data. The result
308is that almost no OS implements TCP according to the specs, and every OS
309implements it slightly differently.
310
311If you want to handle TCP urgent data, then setting this flag (the default
312is enabled) gives you the most portable way of getting urgent data, by
313putting it into the stream.
314
315Since BSD emulation of OOB data on top of TCP's urgent data can have
316security implications, AnyEvent::Handle sets this flag automatically
317unless explicitly specified.
318
278=item read_size => <bytes> 319=item read_size => <bytes>
279 320
280The default read block size (the amount of bytes this module will 321The default read block size (the amount of bytes this module will
281try to read during each loop iteration, which affects memory 322try to read during each loop iteration, which affects memory
282requirements). Default: C<8192>. 323requirements). Default: C<8192>.
438 delete $self->{_skip_drain_rbuf}; 479 delete $self->{_skip_drain_rbuf};
439 $self->_start; 480 $self->_start;
440 481
441 $self->{on_connect} 482 $self->{on_connect}
442 and $self->{on_connect}($self, $host, $port, sub { 483 and $self->{on_connect}($self, $host, $port, sub {
443 delete @$self{qw(fh _tw _ww _rw _eof _queue rbuf _wbuf tls _tls_rbuf _tls_wbuf)}; 484 delete @$self{qw(fh _tw _rtw _wtw _ww _rw _eof _queue rbuf _wbuf tls _tls_rbuf _tls_wbuf)};
444 $self->{_skip_drain_rbuf} = 1; 485 $self->{_skip_drain_rbuf} = 1;
445 &$retry; 486 &$retry;
446 }); 487 });
447 488
448 } else { 489 } else {
474sub _start { 515sub _start {
475 my ($self) = @_; 516 my ($self) = @_;
476 517
477 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 518 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
478 519
520 $self->{_activity} =
521 $self->{_ractivity} =
479 $self->{_activity} = AE::now; 522 $self->{_wactivity} = AE::now;
480 $self->_timeout;
481 523
524 $self->timeout (delete $self->{timeout} ) if $self->{timeout};
525 $self->rtimeout (delete $self->{rtimeout} ) if $self->{rtimeout};
526 $self->wtimeout (delete $self->{wtimeout} ) if $self->{wtimeout};
527
482 $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay}; 528 $self->no_delay (delete $self->{no_delay} ) if exists $self->{no_delay} && $self->{no_delay};
529 $self->keepalive (delete $self->{keepalive}) if exists $self->{keepalive} && $self->{keepalive};
483 530
531 $self->oobinline (exists $self->{oobinline} ? delete $self->{oobinline} : 1);
532
484 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}) 533 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx})
485 if $self->{tls}; 534 if $self->{tls};
486 535
487 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 536 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
488 537
489 $self->start_read 538 $self->start_read
490 if $self->{on_read} || @{ $self->{_queue} }; 539 if $self->{on_read} || @{ $self->{_queue} };
491 540
492 $self->_drain_wbuf; 541 $self->_drain_wbuf;
493} 542}
494
495#sub _shutdown {
496# my ($self) = @_;
497#
498# delete @$self{qw(_tw _rw _ww fh wbuf on_read _queue)};
499# $self->{_eof} = 1; # tell starttls et. al to stop trying
500#
501# &_freetls;
502#}
503 543
504sub _error { 544sub _error {
505 my ($self, $errno, $fatal, $message) = @_; 545 my ($self, $errno, $fatal, $message) = @_;
506 546
507 $! = $errno; 547 $! = $errno;
544 $_[0]{on_eof} = $_[1]; 584 $_[0]{on_eof} = $_[1];
545} 585}
546 586
547=item $handle->on_timeout ($cb) 587=item $handle->on_timeout ($cb)
548 588
549Replace the current C<on_timeout> callback, or disables the callback (but 589=item $handle->on_rtimeout ($cb)
550not the timeout) if C<$cb> = C<undef>. See the C<timeout> constructor
551argument and method.
552 590
553=cut 591=item $handle->on_wtimeout ($cb)
554 592
555sub on_timeout { 593Replace the current C<on_timeout>, C<on_rtimeout> or C<on_wtimeout>
556 $_[0]{on_timeout} = $_[1]; 594callback, or disables the callback (but not the timeout) if C<$cb> =
557} 595C<undef>. See the C<timeout> constructor argument and method.
596
597=cut
598
599# see below
558 600
559=item $handle->autocork ($boolean) 601=item $handle->autocork ($boolean)
560 602
561Enables or disables the current autocork behaviour (see C<autocork> 603Enables or disables the current autocork behaviour (see C<autocork>
562constructor argument). Changes will only take effect on the next write. 604constructor argument). Changes will only take effect on the next write.
577sub no_delay { 619sub no_delay {
578 $_[0]{no_delay} = $_[1]; 620 $_[0]{no_delay} = $_[1];
579 621
580 eval { 622 eval {
581 local $SIG{__DIE__}; 623 local $SIG{__DIE__};
582 setsockopt $_[0]{fh}, &Socket::IPPROTO_TCP, &Socket::TCP_NODELAY, int $_[1] 624 setsockopt $_[0]{fh}, Socket::IPPROTO_TCP (), Socket::TCP_NODELAY (), int $_[1]
583 if $_[0]{fh}; 625 if $_[0]{fh};
584 }; 626 };
585} 627}
586 628
629=item $handle->keepalive ($boolean)
630
631Enables or disables the C<keepalive> setting (see constructor argument of
632the same name for details).
633
634=cut
635
636sub keepalive {
637 $_[0]{keepalive} = $_[1];
638
639 eval {
640 local $SIG{__DIE__};
641 setsockopt $_[0]{fh}, Socket::SOL_SOCKET (), Socket::SO_KEEPALIVE (), int $_[1]
642 if $_[0]{fh};
643 };
644}
645
646=item $handle->oobinline ($boolean)
647
648Enables or disables the C<oobinline> setting (see constructor argument of
649the same name for details).
650
651=cut
652
653sub oobinline {
654 $_[0]{oobinline} = $_[1];
655
656 eval {
657 local $SIG{__DIE__};
658 setsockopt $_[0]{fh}, Socket::SOL_SOCKET (), Socket::SO_OOBINLINE (), int $_[1]
659 if $_[0]{fh};
660 };
661}
662
663=item $handle->keepalive ($boolean)
664
665Enables or disables the C<keepalive> setting (see constructor argument of
666the same name for details).
667
668=cut
669
670sub keepalive {
671 $_[0]{keepalive} = $_[1];
672
673 eval {
674 local $SIG{__DIE__};
675 setsockopt $_[0]{fh}, Socket::SOL_SOCKET (), Socket::SO_KEEPALIVE (), int $_[1]
676 if $_[0]{fh};
677 };
678}
679
587=item $handle->on_starttls ($cb) 680=item $handle->on_starttls ($cb)
588 681
589Replace the current C<on_starttls> callback (see the C<on_starttls> constructor argument). 682Replace the current C<on_starttls> callback (see the C<on_starttls> constructor argument).
590 683
591=cut 684=cut
616 709
617############################################################################# 710#############################################################################
618 711
619=item $handle->timeout ($seconds) 712=item $handle->timeout ($seconds)
620 713
714=item $handle->rtimeout ($seconds)
715
716=item $handle->wtimeout ($seconds)
717
621Configures (or disables) the inactivity timeout. 718Configures (or disables) the inactivity timeout.
622 719
623=cut 720=item $handle->timeout_reset
624 721
625sub timeout { 722=item $handle->rtimeout_reset
723
724=item $handle->wtimeout_reset
725
726Reset the activity timeout, as if data was received or sent.
727
728These methods are cheap to call.
729
730=cut
731
732for my $dir ("", "r", "w") {
733 my $timeout = "${dir}timeout";
734 my $tw = "_${dir}tw";
735 my $on_timeout = "on_${dir}timeout";
736 my $activity = "_${dir}activity";
737 my $cb;
738
739 *$on_timeout = sub {
740 $_[0]{$on_timeout} = $_[1];
741 };
742
743 *$timeout = sub {
626 my ($self, $timeout) = @_; 744 my ($self, $new_value) = @_;
627 745
628 $self->{timeout} = $timeout; 746 $self->{$timeout} = $new_value;
629 delete $self->{_tw}; 747 delete $self->{$tw}; &$cb;
630 $self->_timeout; 748 };
631}
632 749
750 *{"${dir}timeout_reset"} = sub {
751 $_[0]{$activity} = AE::now;
752 };
753
754 # main workhorse:
633# reset the timeout watcher, as neccessary 755 # reset the timeout watcher, as neccessary
634# also check for time-outs 756 # also check for time-outs
635sub _timeout { 757 $cb = sub {
636 my ($self) = @_; 758 my ($self) = @_;
637 759
638 if ($self->{timeout} && $self->{fh}) { 760 if ($self->{$timeout} && $self->{fh}) {
639 my $NOW = AE::now; 761 my $NOW = AE::now;
640 762
641 # when would the timeout trigger? 763 # when would the timeout trigger?
642 my $after = $self->{_activity} + $self->{timeout} - $NOW; 764 my $after = $self->{$activity} + $self->{$timeout} - $NOW;
643 765
644 # now or in the past already? 766 # now or in the past already?
645 if ($after <= 0) { 767 if ($after <= 0) {
646 $self->{_activity} = $NOW; 768 $self->{$activity} = $NOW;
647 769
648 if ($self->{on_timeout}) { 770 if ($self->{$on_timeout}) {
649 $self->{on_timeout}($self); 771 $self->{$on_timeout}($self);
650 } else { 772 } else {
651 $self->_error (Errno::ETIMEDOUT); 773 $self->_error (Errno::ETIMEDOUT);
774 }
775
776 # callback could have changed timeout value, optimise
777 return unless $self->{$timeout};
778
779 # calculate new after
780 $after = $self->{$timeout};
652 } 781 }
653 782
654 # callback could have changed timeout value, optimise 783 Scalar::Util::weaken $self;
655 return unless $self->{timeout}; 784 return unless $self; # ->error could have destroyed $self
656 785
657 # calculate new after 786 $self->{$tw} ||= AE::timer $after, 0, sub {
658 $after = $self->{timeout}; 787 delete $self->{$tw};
788 $cb->($self);
789 };
790 } else {
791 delete $self->{$tw};
659 } 792 }
660
661 Scalar::Util::weaken $self;
662 return unless $self; # ->error could have destroyed $self
663
664 $self->{_tw} ||= AE::timer $after, 0, sub {
665 delete $self->{_tw};
666 $self->_timeout;
667 };
668 } else {
669 delete $self->{_tw};
670 } 793 }
671} 794}
672 795
673############################################################################# 796#############################################################################
674 797
722 my $len = syswrite $self->{fh}, $self->{wbuf}; 845 my $len = syswrite $self->{fh}, $self->{wbuf};
723 846
724 if (defined $len) { 847 if (defined $len) {
725 substr $self->{wbuf}, 0, $len, ""; 848 substr $self->{wbuf}, 0, $len, "";
726 849
727 $self->{_activity} = AE::now; 850 $self->{_activity} = $self->{_wactivity} = AE::now;
728 851
729 $self->{on_drain}($self) 852 $self->{on_drain}($self)
730 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf}) 853 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf})
731 && $self->{on_drain}; 854 && $self->{on_drain};
732 855
838Other languages could read single lines terminated by a newline and pass 961Other languages could read single lines terminated by a newline and pass
839this line into their JSON decoder of choice. 962this line into their JSON decoder of choice.
840 963
841=cut 964=cut
842 965
966sub json_coder() {
967 eval { require JSON::XS; JSON::XS->new->utf8 }
968 || do { require JSON; JSON->new->utf8 }
969}
970
843register_write_type json => sub { 971register_write_type json => sub {
844 my ($self, $ref) = @_; 972 my ($self, $ref) = @_;
845 973
846 require JSON; 974 my $json = $self->{json} ||= json_coder;
847 975
848 $self->{json} ? $self->{json}->encode ($ref) 976 $json->encode ($ref)
849 : JSON::encode_json ($ref)
850}; 977};
851 978
852=item storable => $reference 979=item storable => $reference
853 980
854Freezes the given reference using L<Storable> and writes it to the 981Freezes the given reference using L<Storable> and writes it to the
1147 1274
1148 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read") 1275 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read")
1149 ->($self, $cb, @_); 1276 ->($self, $cb, @_);
1150 } 1277 }
1151 1278
1152
1153 unshift @{ $self->{_queue} }, $cb; 1279 unshift @{ $self->{_queue} }, $cb;
1154 $self->_drain_rbuf; 1280 $self->_drain_rbuf;
1155} 1281}
1156 1282
1157=item $handle->push_read (type => @args, $cb) 1283=item $handle->push_read (type => @args, $cb)
1408=cut 1534=cut
1409 1535
1410register_read_type json => sub { 1536register_read_type json => sub {
1411 my ($self, $cb) = @_; 1537 my ($self, $cb) = @_;
1412 1538
1413 my $json = $self->{json} ||= 1539 my $json = $self->{json} ||= json_coder;
1414 eval { require JSON::XS; JSON::XS->new->utf8 }
1415 || do { require JSON; JSON->new->utf8 };
1416 1540
1417 my $data; 1541 my $data;
1418 my $rbuf = \$self->{rbuf}; 1542 my $rbuf = \$self->{rbuf};
1419 1543
1420 sub { 1544 sub {
1545 $self->{_rw} = AE::io $self->{fh}, 0, sub { 1669 $self->{_rw} = AE::io $self->{fh}, 0, sub {
1546 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf}); 1670 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf});
1547 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; 1671 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
1548 1672
1549 if ($len > 0) { 1673 if ($len > 0) {
1550 $self->{_activity} = AE::now; 1674 $self->{_activity} = $self->{_ractivity} = AE::now;
1551 1675
1552 if ($self->{tls}) { 1676 if ($self->{tls}) {
1553 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf); 1677 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf);
1554 1678
1555 &_dotls ($self); 1679 &_dotls ($self);
1694 require Net::SSLeay; 1818 require Net::SSLeay;
1695 1819
1696 $ERROR_SYSCALL = Net::SSLeay::ERROR_SYSCALL (); 1820 $ERROR_SYSCALL = Net::SSLeay::ERROR_SYSCALL ();
1697 $ERROR_WANT_READ = Net::SSLeay::ERROR_WANT_READ (); 1821 $ERROR_WANT_READ = Net::SSLeay::ERROR_WANT_READ ();
1698 1822
1699 $tls = $self->{tls}; 1823 $tls = delete $self->{tls};
1700 $ctx = $self->{tls_ctx}; 1824 $ctx = $self->{tls_ctx};
1701 1825
1702 local $Carp::CarpLevel = 1; # skip ourselves when creating a new context or session 1826 local $Carp::CarpLevel = 1; # skip ourselves when creating a new context or session
1703 1827
1704 if ("HASH" eq ref $ctx) { 1828 if ("HASH" eq ref $ctx) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines