ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.167 by root, Tue Jul 28 11:02:19 2009 UTC vs.
Revision 1.176 by root, Sun Aug 9 00:20:35 2009 UTC

1package AnyEvent::Handle;
2
3use Scalar::Util ();
4use Carp ();
5use Errno qw(EAGAIN EINTR);
6
7use AnyEvent (); BEGIN { AnyEvent::common_sense }
8use AnyEvent::Util qw(WSAEWOULDBLOCK);
9
10=head1 NAME 1=head1 NAME
11 2
12AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent 3AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
13 4
14=cut 5=cut
15 6
16our $VERSION = 4.881; 7our $VERSION = 4.92;
17 8
18=head1 SYNOPSIS 9=head1 SYNOPSIS
19 10
20 use AnyEvent; 11 use AnyEvent;
21 use AnyEvent::Handle; 12 use AnyEvent::Handle;
59C<on_error> callback. 50C<on_error> callback.
60 51
61All callbacks will be invoked with the handle object as their first 52All callbacks will be invoked with the handle object as their first
62argument. 53argument.
63 54
55=cut
56
57package AnyEvent::Handle;
58
59use Scalar::Util ();
60use List::Util ();
61use Carp ();
62use Errno qw(EAGAIN EINTR);
63
64use AnyEvent (); BEGIN { AnyEvent::common_sense }
65use AnyEvent::Util qw(WSAEWOULDBLOCK);
66
64=head1 METHODS 67=head1 METHODS
65 68
66=over 4 69=over 4
67 70
68=item $handle = B<new> AnyEvent::TLS fh => $filehandle, key => value... 71=item $handle = B<new> AnyEvent::TLS fh => $filehandle, key => value...
216memory and push it into the queue, but instead only read more data from 219memory and push it into the queue, but instead only read more data from
217the file when the write queue becomes empty. 220the file when the write queue becomes empty.
218 221
219=item timeout => $fractional_seconds 222=item timeout => $fractional_seconds
220 223
224=item rtimeout => $fractional_seconds
225
226=item wtimeout => $fractional_seconds
227
221If non-zero, then this enables an "inactivity" timeout: whenever this many 228If non-zero, then these enables an "inactivity" timeout: whenever this
222seconds pass without a successful read or write on the underlying file 229many seconds pass without a successful read or write on the underlying
223handle, the C<on_timeout> callback will be invoked (and if that one is 230file handle (or a call to C<timeout_reset>), the C<on_timeout> callback
224missing, a non-fatal C<ETIMEDOUT> error will be raised). 231will be invoked (and if that one is missing, a non-fatal C<ETIMEDOUT>
232error will be raised).
233
234There are three variants of the timeouts that work fully independent
235of each other, for both read and write, just read, and just write:
236C<timeout>, C<rtimeout> and C<wtimeout>, with corresponding callbacks
237C<on_timeout>, C<on_rtimeout> and C<on_wtimeout>, and reset functions
238C<timeout_reset>, C<rtimeout_reset>, and C<wtimeout_reset>.
225 239
226Note that timeout processing is also active when you currently do not have 240Note that timeout processing is also active when you currently do not have
227any outstanding read or write requests: If you plan to keep the connection 241any outstanding read or write requests: If you plan to keep the connection
228idle then you should disable the timout temporarily or ignore the timeout 242idle then you should disable the timout temporarily or ignore the timeout
229in the C<on_timeout> callback, in which case AnyEvent::Handle will simply 243in the C<on_timeout> callback, in which case AnyEvent::Handle will simply
474sub _start { 488sub _start {
475 my ($self) = @_; 489 my ($self) = @_;
476 490
477 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 491 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
478 492
493 $self->{_activity} =
494 $self->{_ractivity} =
479 $self->{_activity} = AnyEvent->now; 495 $self->{_wactivity} = AE::now;
480 $self->_timeout; 496
497 $self->timeout (delete $self->{timeout} ) if $self->{timeout};
498 $self->rtimeout (delete $self->{rtimeout}) if $self->{rtimeout};
499 $self->wtimeout (delete $self->{wtimeout}) if $self->{wtimeout};
481 500
482 $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay}; 501 $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay};
483 502
484 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}) 503 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx})
485 if $self->{tls}; 504 if $self->{tls};
544 $_[0]{on_eof} = $_[1]; 563 $_[0]{on_eof} = $_[1];
545} 564}
546 565
547=item $handle->on_timeout ($cb) 566=item $handle->on_timeout ($cb)
548 567
549Replace the current C<on_timeout> callback, or disables the callback (but 568=item $handle->on_rtimeout ($cb)
550not the timeout) if C<$cb> = C<undef>. See the C<timeout> constructor
551argument and method.
552 569
553=cut 570=item $handle->on_wtimeout ($cb)
554 571
555sub on_timeout { 572Replace the current C<on_timeout>, C<on_rtimeout> or C<on_wtimeout>
556 $_[0]{on_timeout} = $_[1]; 573callback, or disables the callback (but not the timeout) if C<$cb> =
557} 574C<undef>. See the C<timeout> constructor argument and method.
575
576=cut
577
578# see below
558 579
559=item $handle->autocork ($boolean) 580=item $handle->autocork ($boolean)
560 581
561Enables or disables the current autocork behaviour (see C<autocork> 582Enables or disables the current autocork behaviour (see C<autocork>
562constructor argument). Changes will only take effect on the next write. 583constructor argument). Changes will only take effect on the next write.
602 623
603sub on_starttls { 624sub on_starttls {
604 $_[0]{on_stoptls} = $_[1]; 625 $_[0]{on_stoptls} = $_[1];
605} 626}
606 627
628=item $handle->rbuf_max ($max_octets)
629
630Configures the C<rbuf_max> setting (C<undef> disables it).
631
632=cut
633
634sub rbuf_max {
635 $_[0]{rbuf_max} = $_[1];
636}
637
607############################################################################# 638#############################################################################
608 639
609=item $handle->timeout ($seconds) 640=item $handle->timeout ($seconds)
610 641
642=item $handle->rtimeout ($seconds)
643
644=item $handle->wtimeout ($seconds)
645
611Configures (or disables) the inactivity timeout. 646Configures (or disables) the inactivity timeout.
612 647
613=cut 648=item $handle->timeout_reset
614 649
615sub timeout { 650=item $handle->rtimeout_reset
651
652=item $handle->wtimeout_reset
653
654Reset the activity timeout, as if data was received or sent.
655
656These methods are cheap to call.
657
658=cut
659
660for my $dir ("", "r", "w") {
661 my $timeout = "${dir}timeout";
662 my $tw = "_${dir}tw";
663 my $on_timeout = "on_${dir}timeout";
664 my $activity = "_${dir}activity";
665 my $cb;
666
667 *$on_timeout = sub {
668 $_[0]{$on_timeout} = $_[1];
669 };
670
671 *$timeout = sub {
616 my ($self, $timeout) = @_; 672 my ($self, $new_value) = @_;
617 673
618 $self->{timeout} = $timeout; 674 $self->{$timeout} = $new_value;
619 $self->_timeout; 675 delete $self->{$tw}; &$cb;
620} 676 };
621 677
678 *{"${dir}timeout_reset"} = sub {
679 $_[0]{$activity} = AE::now;
680 };
681
682 # main workhorse:
622# reset the timeout watcher, as neccessary 683 # reset the timeout watcher, as neccessary
623# also check for time-outs 684 # also check for time-outs
624sub _timeout { 685 $cb = sub {
625 my ($self) = @_; 686 my ($self) = @_;
626 687
627 if ($self->{timeout} && $self->{fh}) { 688 if ($self->{$timeout} && $self->{fh}) {
628 my $NOW = AnyEvent->now; 689 my $NOW = AE::now;
629 690
630 # when would the timeout trigger? 691 # when would the timeout trigger?
631 my $after = $self->{_activity} + $self->{timeout} - $NOW; 692 my $after = $self->{$activity} + $self->{$timeout} - $NOW;
632 693
633 # now or in the past already? 694 # now or in the past already?
634 if ($after <= 0) { 695 if ($after <= 0) {
635 $self->{_activity} = $NOW; 696 $self->{$activity} = $NOW;
636 697
637 if ($self->{on_timeout}) { 698 if ($self->{$on_timeout}) {
638 $self->{on_timeout}($self); 699 $self->{$on_timeout}($self);
639 } else { 700 } else {
640 $self->_error (Errno::ETIMEDOUT); 701 $self->_error (Errno::ETIMEDOUT);
702 }
703
704 # callback could have changed timeout value, optimise
705 return unless $self->{$timeout};
706
707 # calculate new after
708 $after = $self->{$timeout};
641 } 709 }
642 710
643 # callback could have changed timeout value, optimise 711 Scalar::Util::weaken $self;
644 return unless $self->{timeout}; 712 return unless $self; # ->error could have destroyed $self
645 713
646 # calculate new after 714 $self->{$tw} ||= AE::timer $after, 0, sub {
647 $after = $self->{timeout}; 715 delete $self->{$tw};
716 $cb->($self);
717 };
718 } else {
719 delete $self->{$tw};
648 } 720 }
649
650 Scalar::Util::weaken $self;
651 return unless $self; # ->error could have destroyed $self
652
653 $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub {
654 delete $self->{_tw};
655 $self->_timeout;
656 });
657 } else {
658 delete $self->{_tw};
659 } 721 }
660} 722}
661 723
662############################################################################# 724#############################################################################
663 725
711 my $len = syswrite $self->{fh}, $self->{wbuf}; 773 my $len = syswrite $self->{fh}, $self->{wbuf};
712 774
713 if (defined $len) { 775 if (defined $len) {
714 substr $self->{wbuf}, 0, $len, ""; 776 substr $self->{wbuf}, 0, $len, "";
715 777
716 $self->{_activity} = AnyEvent->now; 778 $self->{_activity} = $self->{_wactivity} = AE::now;
717 779
718 $self->{on_drain}($self) 780 $self->{on_drain}($self)
719 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf}) 781 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf})
720 && $self->{on_drain}; 782 && $self->{on_drain};
721 783
727 789
728 # try to write data immediately 790 # try to write data immediately
729 $cb->() unless $self->{autocork}; 791 $cb->() unless $self->{autocork};
730 792
731 # if still data left in wbuf, we need to poll 793 # if still data left in wbuf, we need to poll
732 $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) 794 $self->{_ww} = AE::io $self->{fh}, 1, $cb
733 if length $self->{wbuf}; 795 if length $self->{wbuf};
734 }; 796 };
735} 797}
736 798
737our %WH; 799our %WH;
984 1046
985 # avoid recursion 1047 # avoid recursion
986 return if $self->{_skip_drain_rbuf}; 1048 return if $self->{_skip_drain_rbuf};
987 local $self->{_skip_drain_rbuf} = 1; 1049 local $self->{_skip_drain_rbuf} = 1;
988 1050
989 if (
990 defined $self->{rbuf_max}
991 && $self->{rbuf_max} < length $self->{rbuf}
992 ) {
993 $self->_error (Errno::ENOSPC, 1), return;
994 }
995
996 while () { 1051 while () {
997 # we need to use a separate tls read buffer, as we must not receive data while 1052 # we need to use a separate tls read buffer, as we must not receive data while
998 # we are draining the buffer, and this can only happen with TLS. 1053 # we are draining the buffer, and this can only happen with TLS.
999 $self->{rbuf} .= delete $self->{_tls_rbuf} 1054 $self->{rbuf} .= delete $self->{_tls_rbuf}
1000 if exists $self->{_tls_rbuf}; 1055 if exists $self->{_tls_rbuf};
1039 $self->{on_eof} 1094 $self->{on_eof}
1040 ? $self->{on_eof}($self) 1095 ? $self->{on_eof}($self)
1041 : $self->_error (0, 1, "Unexpected end-of-file"); 1096 : $self->_error (0, 1, "Unexpected end-of-file");
1042 1097
1043 return; 1098 return;
1099 }
1100
1101 if (
1102 defined $self->{rbuf_max}
1103 && $self->{rbuf_max} < length $self->{rbuf}
1104 ) {
1105 $self->_error (Errno::ENOSPC, 1), return;
1044 } 1106 }
1045 1107
1046 # may need to restart read watcher 1108 # may need to restart read watcher
1047 unless ($self->{_rw}) { 1109 unless ($self->{_rw}) {
1048 $self->start_read 1110 $self->start_read
1529 my ($self) = @_; 1591 my ($self) = @_;
1530 1592
1531 unless ($self->{_rw} || $self->{_eof}) { 1593 unless ($self->{_rw} || $self->{_eof}) {
1532 Scalar::Util::weaken $self; 1594 Scalar::Util::weaken $self;
1533 1595
1534 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { 1596 $self->{_rw} = AE::io $self->{fh}, 0, sub {
1535 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf}); 1597 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf});
1536 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; 1598 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
1537 1599
1538 if ($len > 0) { 1600 if ($len > 0) {
1539 $self->{_activity} = AnyEvent->now; 1601 $self->{_activity} = $self->{_ractivity} = AE::now;
1540 1602
1541 if ($self->{tls}) { 1603 if ($self->{tls}) {
1542 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf); 1604 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf);
1543 1605
1544 &_dotls ($self); 1606 &_dotls ($self);
1552 $self->_drain_rbuf; 1614 $self->_drain_rbuf;
1553 1615
1554 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { 1616 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
1555 return $self->_error ($!, 1); 1617 return $self->_error ($!, 1);
1556 } 1618 }
1557 }); 1619 };
1558 } 1620 }
1559} 1621}
1560 1622
1561our $ERROR_SYSCALL; 1623our $ERROR_SYSCALL;
1562our $ERROR_WANT_READ; 1624our $ERROR_WANT_READ;
1722 Net::SSLeay::CTX_set_mode ($tls, 1|2); 1784 Net::SSLeay::CTX_set_mode ($tls, 1|2);
1723 1785
1724 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 1786 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1725 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 1787 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1726 1788
1789 Net::SSLeay::BIO_write ($self->{_rbio}, delete $self->{rbuf});
1790
1727 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio}); 1791 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio});
1728 1792
1729 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) } 1793 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) }
1730 if $self->{on_starttls}; 1794 if $self->{on_starttls};
1731 1795
1760 my ($self) = @_; 1824 my ($self) = @_;
1761 1825
1762 return unless $self->{tls}; 1826 return unless $self->{tls};
1763 1827
1764 $self->{tls_ctx}->_put_session (delete $self->{tls}) 1828 $self->{tls_ctx}->_put_session (delete $self->{tls})
1765 if ref $self->{tls}; 1829 if $self->{tls} > 0;
1766 1830
1767 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)}; 1831 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)};
1768} 1832}
1769 1833
1770sub DESTROY { 1834sub DESTROY {
1778 my $fh = delete $self->{fh}; 1842 my $fh = delete $self->{fh};
1779 my $wbuf = delete $self->{wbuf}; 1843 my $wbuf = delete $self->{wbuf};
1780 1844
1781 my @linger; 1845 my @linger;
1782 1846
1783 push @linger, AnyEvent->io (fh => $fh, poll => "w", cb => sub { 1847 push @linger, AE::io $fh, 1, sub {
1784 my $len = syswrite $fh, $wbuf, length $wbuf; 1848 my $len = syswrite $fh, $wbuf, length $wbuf;
1785 1849
1786 if ($len > 0) { 1850 if ($len > 0) {
1787 substr $wbuf, 0, $len, ""; 1851 substr $wbuf, 0, $len, "";
1788 } else { 1852 } else {
1789 @linger = (); # end 1853 @linger = (); # end
1790 } 1854 }
1791 }); 1855 };
1792 push @linger, AnyEvent->timer (after => $linger, cb => sub { 1856 push @linger, AE::timer $linger, 0, sub {
1793 @linger = (); 1857 @linger = ();
1794 }); 1858 };
1795 } 1859 }
1796} 1860}
1797 1861
1798=item $handle->destroy 1862=item $handle->destroy
1799 1863

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines