ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.167 by root, Tue Jul 28 11:02:19 2009 UTC vs.
Revision 1.177 by root, Sun Aug 9 00:24:35 2009 UTC

1package AnyEvent::Handle;
2
3use Scalar::Util ();
4use Carp ();
5use Errno qw(EAGAIN EINTR);
6
7use AnyEvent (); BEGIN { AnyEvent::common_sense }
8use AnyEvent::Util qw(WSAEWOULDBLOCK);
9
10=head1 NAME 1=head1 NAME
11 2
12AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent 3AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
13
14=cut
15
16our $VERSION = 4.881;
17 4
18=head1 SYNOPSIS 5=head1 SYNOPSIS
19 6
20 use AnyEvent; 7 use AnyEvent;
21 use AnyEvent::Handle; 8 use AnyEvent::Handle;
59C<on_error> callback. 46C<on_error> callback.
60 47
61All callbacks will be invoked with the handle object as their first 48All callbacks will be invoked with the handle object as their first
62argument. 49argument.
63 50
51=cut
52
53package AnyEvent::Handle;
54
55use Scalar::Util ();
56use List::Util ();
57use Carp ();
58use Errno qw(EAGAIN EINTR);
59
60use AnyEvent (); BEGIN { AnyEvent::common_sense }
61use AnyEvent::Util qw(WSAEWOULDBLOCK);
62
63our $VERSION = $AnyEvent::VERSION;
64
64=head1 METHODS 65=head1 METHODS
65 66
66=over 4 67=over 4
67 68
68=item $handle = B<new> AnyEvent::TLS fh => $filehandle, key => value... 69=item $handle = B<new> AnyEvent::TLS fh => $filehandle, key => value...
216memory and push it into the queue, but instead only read more data from 217memory and push it into the queue, but instead only read more data from
217the file when the write queue becomes empty. 218the file when the write queue becomes empty.
218 219
219=item timeout => $fractional_seconds 220=item timeout => $fractional_seconds
220 221
222=item rtimeout => $fractional_seconds
223
224=item wtimeout => $fractional_seconds
225
221If non-zero, then this enables an "inactivity" timeout: whenever this many 226If non-zero, then these enables an "inactivity" timeout: whenever this
222seconds pass without a successful read or write on the underlying file 227many seconds pass without a successful read or write on the underlying
223handle, the C<on_timeout> callback will be invoked (and if that one is 228file handle (or a call to C<timeout_reset>), the C<on_timeout> callback
224missing, a non-fatal C<ETIMEDOUT> error will be raised). 229will be invoked (and if that one is missing, a non-fatal C<ETIMEDOUT>
230error will be raised).
231
232There are three variants of the timeouts that work fully independent
233of each other, for both read and write, just read, and just write:
234C<timeout>, C<rtimeout> and C<wtimeout>, with corresponding callbacks
235C<on_timeout>, C<on_rtimeout> and C<on_wtimeout>, and reset functions
236C<timeout_reset>, C<rtimeout_reset>, and C<wtimeout_reset>.
225 237
226Note that timeout processing is also active when you currently do not have 238Note that timeout processing is also active when you currently do not have
227any outstanding read or write requests: If you plan to keep the connection 239any outstanding read or write requests: If you plan to keep the connection
228idle then you should disable the timout temporarily or ignore the timeout 240idle then you should disable the timout temporarily or ignore the timeout
229in the C<on_timeout> callback, in which case AnyEvent::Handle will simply 241in the C<on_timeout> callback, in which case AnyEvent::Handle will simply
474sub _start { 486sub _start {
475 my ($self) = @_; 487 my ($self) = @_;
476 488
477 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 489 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
478 490
491 $self->{_activity} =
492 $self->{_ractivity} =
479 $self->{_activity} = AnyEvent->now; 493 $self->{_wactivity} = AE::now;
480 $self->_timeout; 494
495 $self->timeout (delete $self->{timeout} ) if $self->{timeout};
496 $self->rtimeout (delete $self->{rtimeout}) if $self->{rtimeout};
497 $self->wtimeout (delete $self->{wtimeout}) if $self->{wtimeout};
481 498
482 $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay}; 499 $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay};
483 500
484 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}) 501 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx})
485 if $self->{tls}; 502 if $self->{tls};
544 $_[0]{on_eof} = $_[1]; 561 $_[0]{on_eof} = $_[1];
545} 562}
546 563
547=item $handle->on_timeout ($cb) 564=item $handle->on_timeout ($cb)
548 565
549Replace the current C<on_timeout> callback, or disables the callback (but 566=item $handle->on_rtimeout ($cb)
550not the timeout) if C<$cb> = C<undef>. See the C<timeout> constructor
551argument and method.
552 567
553=cut 568=item $handle->on_wtimeout ($cb)
554 569
555sub on_timeout { 570Replace the current C<on_timeout>, C<on_rtimeout> or C<on_wtimeout>
556 $_[0]{on_timeout} = $_[1]; 571callback, or disables the callback (but not the timeout) if C<$cb> =
557} 572C<undef>. See the C<timeout> constructor argument and method.
573
574=cut
575
576# see below
558 577
559=item $handle->autocork ($boolean) 578=item $handle->autocork ($boolean)
560 579
561Enables or disables the current autocork behaviour (see C<autocork> 580Enables or disables the current autocork behaviour (see C<autocork>
562constructor argument). Changes will only take effect on the next write. 581constructor argument). Changes will only take effect on the next write.
602 621
603sub on_starttls { 622sub on_starttls {
604 $_[0]{on_stoptls} = $_[1]; 623 $_[0]{on_stoptls} = $_[1];
605} 624}
606 625
626=item $handle->rbuf_max ($max_octets)
627
628Configures the C<rbuf_max> setting (C<undef> disables it).
629
630=cut
631
632sub rbuf_max {
633 $_[0]{rbuf_max} = $_[1];
634}
635
607############################################################################# 636#############################################################################
608 637
609=item $handle->timeout ($seconds) 638=item $handle->timeout ($seconds)
610 639
640=item $handle->rtimeout ($seconds)
641
642=item $handle->wtimeout ($seconds)
643
611Configures (or disables) the inactivity timeout. 644Configures (or disables) the inactivity timeout.
612 645
613=cut 646=item $handle->timeout_reset
614 647
615sub timeout { 648=item $handle->rtimeout_reset
649
650=item $handle->wtimeout_reset
651
652Reset the activity timeout, as if data was received or sent.
653
654These methods are cheap to call.
655
656=cut
657
658for my $dir ("", "r", "w") {
659 my $timeout = "${dir}timeout";
660 my $tw = "_${dir}tw";
661 my $on_timeout = "on_${dir}timeout";
662 my $activity = "_${dir}activity";
663 my $cb;
664
665 *$on_timeout = sub {
666 $_[0]{$on_timeout} = $_[1];
667 };
668
669 *$timeout = sub {
616 my ($self, $timeout) = @_; 670 my ($self, $new_value) = @_;
617 671
618 $self->{timeout} = $timeout; 672 $self->{$timeout} = $new_value;
619 $self->_timeout; 673 delete $self->{$tw}; &$cb;
620} 674 };
621 675
676 *{"${dir}timeout_reset"} = sub {
677 $_[0]{$activity} = AE::now;
678 };
679
680 # main workhorse:
622# reset the timeout watcher, as neccessary 681 # reset the timeout watcher, as neccessary
623# also check for time-outs 682 # also check for time-outs
624sub _timeout { 683 $cb = sub {
625 my ($self) = @_; 684 my ($self) = @_;
626 685
627 if ($self->{timeout} && $self->{fh}) { 686 if ($self->{$timeout} && $self->{fh}) {
628 my $NOW = AnyEvent->now; 687 my $NOW = AE::now;
629 688
630 # when would the timeout trigger? 689 # when would the timeout trigger?
631 my $after = $self->{_activity} + $self->{timeout} - $NOW; 690 my $after = $self->{$activity} + $self->{$timeout} - $NOW;
632 691
633 # now or in the past already? 692 # now or in the past already?
634 if ($after <= 0) { 693 if ($after <= 0) {
635 $self->{_activity} = $NOW; 694 $self->{$activity} = $NOW;
636 695
637 if ($self->{on_timeout}) { 696 if ($self->{$on_timeout}) {
638 $self->{on_timeout}($self); 697 $self->{$on_timeout}($self);
639 } else { 698 } else {
640 $self->_error (Errno::ETIMEDOUT); 699 $self->_error (Errno::ETIMEDOUT);
700 }
701
702 # callback could have changed timeout value, optimise
703 return unless $self->{$timeout};
704
705 # calculate new after
706 $after = $self->{$timeout};
641 } 707 }
642 708
643 # callback could have changed timeout value, optimise 709 Scalar::Util::weaken $self;
644 return unless $self->{timeout}; 710 return unless $self; # ->error could have destroyed $self
645 711
646 # calculate new after 712 $self->{$tw} ||= AE::timer $after, 0, sub {
647 $after = $self->{timeout}; 713 delete $self->{$tw};
714 $cb->($self);
715 };
716 } else {
717 delete $self->{$tw};
648 } 718 }
649
650 Scalar::Util::weaken $self;
651 return unless $self; # ->error could have destroyed $self
652
653 $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub {
654 delete $self->{_tw};
655 $self->_timeout;
656 });
657 } else {
658 delete $self->{_tw};
659 } 719 }
660} 720}
661 721
662############################################################################# 722#############################################################################
663 723
711 my $len = syswrite $self->{fh}, $self->{wbuf}; 771 my $len = syswrite $self->{fh}, $self->{wbuf};
712 772
713 if (defined $len) { 773 if (defined $len) {
714 substr $self->{wbuf}, 0, $len, ""; 774 substr $self->{wbuf}, 0, $len, "";
715 775
716 $self->{_activity} = AnyEvent->now; 776 $self->{_activity} = $self->{_wactivity} = AE::now;
717 777
718 $self->{on_drain}($self) 778 $self->{on_drain}($self)
719 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf}) 779 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf})
720 && $self->{on_drain}; 780 && $self->{on_drain};
721 781
727 787
728 # try to write data immediately 788 # try to write data immediately
729 $cb->() unless $self->{autocork}; 789 $cb->() unless $self->{autocork};
730 790
731 # if still data left in wbuf, we need to poll 791 # if still data left in wbuf, we need to poll
732 $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) 792 $self->{_ww} = AE::io $self->{fh}, 1, $cb
733 if length $self->{wbuf}; 793 if length $self->{wbuf};
734 }; 794 };
735} 795}
736 796
737our %WH; 797our %WH;
984 1044
985 # avoid recursion 1045 # avoid recursion
986 return if $self->{_skip_drain_rbuf}; 1046 return if $self->{_skip_drain_rbuf};
987 local $self->{_skip_drain_rbuf} = 1; 1047 local $self->{_skip_drain_rbuf} = 1;
988 1048
989 if (
990 defined $self->{rbuf_max}
991 && $self->{rbuf_max} < length $self->{rbuf}
992 ) {
993 $self->_error (Errno::ENOSPC, 1), return;
994 }
995
996 while () { 1049 while () {
997 # we need to use a separate tls read buffer, as we must not receive data while 1050 # we need to use a separate tls read buffer, as we must not receive data while
998 # we are draining the buffer, and this can only happen with TLS. 1051 # we are draining the buffer, and this can only happen with TLS.
999 $self->{rbuf} .= delete $self->{_tls_rbuf} 1052 $self->{rbuf} .= delete $self->{_tls_rbuf}
1000 if exists $self->{_tls_rbuf}; 1053 if exists $self->{_tls_rbuf};
1039 $self->{on_eof} 1092 $self->{on_eof}
1040 ? $self->{on_eof}($self) 1093 ? $self->{on_eof}($self)
1041 : $self->_error (0, 1, "Unexpected end-of-file"); 1094 : $self->_error (0, 1, "Unexpected end-of-file");
1042 1095
1043 return; 1096 return;
1097 }
1098
1099 if (
1100 defined $self->{rbuf_max}
1101 && $self->{rbuf_max} < length $self->{rbuf}
1102 ) {
1103 $self->_error (Errno::ENOSPC, 1), return;
1044 } 1104 }
1045 1105
1046 # may need to restart read watcher 1106 # may need to restart read watcher
1047 unless ($self->{_rw}) { 1107 unless ($self->{_rw}) {
1048 $self->start_read 1108 $self->start_read
1529 my ($self) = @_; 1589 my ($self) = @_;
1530 1590
1531 unless ($self->{_rw} || $self->{_eof}) { 1591 unless ($self->{_rw} || $self->{_eof}) {
1532 Scalar::Util::weaken $self; 1592 Scalar::Util::weaken $self;
1533 1593
1534 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { 1594 $self->{_rw} = AE::io $self->{fh}, 0, sub {
1535 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf}); 1595 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf});
1536 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; 1596 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
1537 1597
1538 if ($len > 0) { 1598 if ($len > 0) {
1539 $self->{_activity} = AnyEvent->now; 1599 $self->{_activity} = $self->{_ractivity} = AE::now;
1540 1600
1541 if ($self->{tls}) { 1601 if ($self->{tls}) {
1542 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf); 1602 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf);
1543 1603
1544 &_dotls ($self); 1604 &_dotls ($self);
1552 $self->_drain_rbuf; 1612 $self->_drain_rbuf;
1553 1613
1554 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { 1614 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
1555 return $self->_error ($!, 1); 1615 return $self->_error ($!, 1);
1556 } 1616 }
1557 }); 1617 };
1558 } 1618 }
1559} 1619}
1560 1620
1561our $ERROR_SYSCALL; 1621our $ERROR_SYSCALL;
1562our $ERROR_WANT_READ; 1622our $ERROR_WANT_READ;
1722 Net::SSLeay::CTX_set_mode ($tls, 1|2); 1782 Net::SSLeay::CTX_set_mode ($tls, 1|2);
1723 1783
1724 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 1784 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1725 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 1785 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1726 1786
1787 Net::SSLeay::BIO_write ($self->{_rbio}, delete $self->{rbuf});
1788
1727 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio}); 1789 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio});
1728 1790
1729 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) } 1791 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) }
1730 if $self->{on_starttls}; 1792 if $self->{on_starttls};
1731 1793
1760 my ($self) = @_; 1822 my ($self) = @_;
1761 1823
1762 return unless $self->{tls}; 1824 return unless $self->{tls};
1763 1825
1764 $self->{tls_ctx}->_put_session (delete $self->{tls}) 1826 $self->{tls_ctx}->_put_session (delete $self->{tls})
1765 if ref $self->{tls}; 1827 if $self->{tls} > 0;
1766 1828
1767 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)}; 1829 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)};
1768} 1830}
1769 1831
1770sub DESTROY { 1832sub DESTROY {
1778 my $fh = delete $self->{fh}; 1840 my $fh = delete $self->{fh};
1779 my $wbuf = delete $self->{wbuf}; 1841 my $wbuf = delete $self->{wbuf};
1780 1842
1781 my @linger; 1843 my @linger;
1782 1844
1783 push @linger, AnyEvent->io (fh => $fh, poll => "w", cb => sub { 1845 push @linger, AE::io $fh, 1, sub {
1784 my $len = syswrite $fh, $wbuf, length $wbuf; 1846 my $len = syswrite $fh, $wbuf, length $wbuf;
1785 1847
1786 if ($len > 0) { 1848 if ($len > 0) {
1787 substr $wbuf, 0, $len, ""; 1849 substr $wbuf, 0, $len, "";
1788 } else { 1850 } else {
1789 @linger = (); # end 1851 @linger = (); # end
1790 } 1852 }
1791 }); 1853 };
1792 push @linger, AnyEvent->timer (after => $linger, cb => sub { 1854 push @linger, AE::timer $linger, 0, sub {
1793 @linger = (); 1855 @linger = ();
1794 }); 1856 };
1795 } 1857 }
1796} 1858}
1797 1859
1798=item $handle->destroy 1860=item $handle->destroy
1799 1861

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines