ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.170 by root, Sat Aug 1 09:14:54 2009 UTC vs.
Revision 1.178 by root, Tue Aug 11 01:15:17 2009 UTC

1package AnyEvent::Handle;
2
3use Scalar::Util ();
4use Carp ();
5use Errno qw(EAGAIN EINTR);
6
7use AnyEvent (); BEGIN { AnyEvent::common_sense }
8use AnyEvent::Util qw(WSAEWOULDBLOCK);
9
10=head1 NAME 1=head1 NAME
11 2
12AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent 3AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
13
14=cut
15
16our $VERSION = 4.9;
17 4
18=head1 SYNOPSIS 5=head1 SYNOPSIS
19 6
20 use AnyEvent; 7 use AnyEvent;
21 use AnyEvent::Handle; 8 use AnyEvent::Handle;
59C<on_error> callback. 46C<on_error> callback.
60 47
61All callbacks will be invoked with the handle object as their first 48All callbacks will be invoked with the handle object as their first
62argument. 49argument.
63 50
51=cut
52
53package AnyEvent::Handle;
54
55use Scalar::Util ();
56use List::Util ();
57use Carp ();
58use Errno qw(EAGAIN EINTR);
59
60use AnyEvent (); BEGIN { AnyEvent::common_sense }
61use AnyEvent::Util qw(WSAEWOULDBLOCK);
62
63our $VERSION = $AnyEvent::VERSION;
64
64=head1 METHODS 65=head1 METHODS
65 66
66=over 4 67=over 4
67 68
68=item $handle = B<new> AnyEvent::TLS fh => $filehandle, key => value... 69=item $handle = B<new> AnyEvent::TLS fh => $filehandle, key => value...
216memory and push it into the queue, but instead only read more data from 217memory and push it into the queue, but instead only read more data from
217the file when the write queue becomes empty. 218the file when the write queue becomes empty.
218 219
219=item timeout => $fractional_seconds 220=item timeout => $fractional_seconds
220 221
222=item rtimeout => $fractional_seconds
223
224=item wtimeout => $fractional_seconds
225
221If non-zero, then this enables an "inactivity" timeout: whenever this many 226If non-zero, then these enables an "inactivity" timeout: whenever this
222seconds pass without a successful read or write on the underlying file 227many seconds pass without a successful read or write on the underlying
223handle, the C<on_timeout> callback will be invoked (and if that one is 228file handle (or a call to C<timeout_reset>), the C<on_timeout> callback
224missing, a non-fatal C<ETIMEDOUT> error will be raised). 229will be invoked (and if that one is missing, a non-fatal C<ETIMEDOUT>
230error will be raised).
231
232There are three variants of the timeouts that work fully independent
233of each other, for both read and write, just read, and just write:
234C<timeout>, C<rtimeout> and C<wtimeout>, with corresponding callbacks
235C<on_timeout>, C<on_rtimeout> and C<on_wtimeout>, and reset functions
236C<timeout_reset>, C<rtimeout_reset>, and C<wtimeout_reset>.
225 237
226Note that timeout processing is also active when you currently do not have 238Note that timeout processing is also active when you currently do not have
227any outstanding read or write requests: If you plan to keep the connection 239any outstanding read or write requests: If you plan to keep the connection
228idle then you should disable the timout temporarily or ignore the timeout 240idle then you should disable the timout temporarily or ignore the timeout
229in the C<on_timeout> callback, in which case AnyEvent::Handle will simply 241in the C<on_timeout> callback, in which case AnyEvent::Handle will simply
438 delete $self->{_skip_drain_rbuf}; 450 delete $self->{_skip_drain_rbuf};
439 $self->_start; 451 $self->_start;
440 452
441 $self->{on_connect} 453 $self->{on_connect}
442 and $self->{on_connect}($self, $host, $port, sub { 454 and $self->{on_connect}($self, $host, $port, sub {
443 delete @$self{qw(fh _tw _ww _rw _eof _queue rbuf _wbuf tls _tls_rbuf _tls_wbuf)}; 455 delete @$self{qw(fh _tw _rtw _wtw _ww _rw _eof _queue rbuf _wbuf tls _tls_rbuf _tls_wbuf)};
444 $self->{_skip_drain_rbuf} = 1; 456 $self->{_skip_drain_rbuf} = 1;
445 &$retry; 457 &$retry;
446 }); 458 });
447 459
448 } else { 460 } else {
474sub _start { 486sub _start {
475 my ($self) = @_; 487 my ($self) = @_;
476 488
477 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 489 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
478 490
491 $self->{_activity} =
492 $self->{_ractivity} =
479 $self->{_activity} = AnyEvent->now; 493 $self->{_wactivity} = AE::now;
480 $self->_timeout; 494
495 $self->timeout (delete $self->{timeout} ) if $self->{timeout};
496 $self->rtimeout (delete $self->{rtimeout}) if $self->{rtimeout};
497 $self->wtimeout (delete $self->{wtimeout}) if $self->{wtimeout};
481 498
482 $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay}; 499 $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay};
483 500
484 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}) 501 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx})
485 if $self->{tls}; 502 if $self->{tls};
489 $self->start_read 506 $self->start_read
490 if $self->{on_read} || @{ $self->{_queue} }; 507 if $self->{on_read} || @{ $self->{_queue} };
491 508
492 $self->_drain_wbuf; 509 $self->_drain_wbuf;
493} 510}
494
495#sub _shutdown {
496# my ($self) = @_;
497#
498# delete @$self{qw(_tw _rw _ww fh wbuf on_read _queue)};
499# $self->{_eof} = 1; # tell starttls et. al to stop trying
500#
501# &_freetls;
502#}
503 511
504sub _error { 512sub _error {
505 my ($self, $errno, $fatal, $message) = @_; 513 my ($self, $errno, $fatal, $message) = @_;
506 514
507 $! = $errno; 515 $! = $errno;
544 $_[0]{on_eof} = $_[1]; 552 $_[0]{on_eof} = $_[1];
545} 553}
546 554
547=item $handle->on_timeout ($cb) 555=item $handle->on_timeout ($cb)
548 556
549Replace the current C<on_timeout> callback, or disables the callback (but 557=item $handle->on_rtimeout ($cb)
550not the timeout) if C<$cb> = C<undef>. See the C<timeout> constructor
551argument and method.
552 558
553=cut 559=item $handle->on_wtimeout ($cb)
554 560
555sub on_timeout { 561Replace the current C<on_timeout>, C<on_rtimeout> or C<on_wtimeout>
556 $_[0]{on_timeout} = $_[1]; 562callback, or disables the callback (but not the timeout) if C<$cb> =
557} 563C<undef>. See the C<timeout> constructor argument and method.
564
565=cut
566
567# see below
558 568
559=item $handle->autocork ($boolean) 569=item $handle->autocork ($boolean)
560 570
561Enables or disables the current autocork behaviour (see C<autocork> 571Enables or disables the current autocork behaviour (see C<autocork>
562constructor argument). Changes will only take effect on the next write. 572constructor argument). Changes will only take effect on the next write.
616 626
617############################################################################# 627#############################################################################
618 628
619=item $handle->timeout ($seconds) 629=item $handle->timeout ($seconds)
620 630
631=item $handle->rtimeout ($seconds)
632
633=item $handle->wtimeout ($seconds)
634
621Configures (or disables) the inactivity timeout. 635Configures (or disables) the inactivity timeout.
622 636
623=cut 637=item $handle->timeout_reset
624 638
625sub timeout { 639=item $handle->rtimeout_reset
640
641=item $handle->wtimeout_reset
642
643Reset the activity timeout, as if data was received or sent.
644
645These methods are cheap to call.
646
647=cut
648
649for my $dir ("", "r", "w") {
650 my $timeout = "${dir}timeout";
651 my $tw = "_${dir}tw";
652 my $on_timeout = "on_${dir}timeout";
653 my $activity = "_${dir}activity";
654 my $cb;
655
656 *$on_timeout = sub {
657 $_[0]{$on_timeout} = $_[1];
658 };
659
660 *$timeout = sub {
626 my ($self, $timeout) = @_; 661 my ($self, $new_value) = @_;
627 662
628 $self->{timeout} = $timeout; 663 $self->{$timeout} = $new_value;
629 $self->_timeout; 664 delete $self->{$tw}; &$cb;
630} 665 };
631 666
667 *{"${dir}timeout_reset"} = sub {
668 $_[0]{$activity} = AE::now;
669 };
670
671 # main workhorse:
632# reset the timeout watcher, as neccessary 672 # reset the timeout watcher, as neccessary
633# also check for time-outs 673 # also check for time-outs
634sub _timeout { 674 $cb = sub {
635 my ($self) = @_; 675 my ($self) = @_;
636 676
637 if ($self->{timeout} && $self->{fh}) { 677 if ($self->{$timeout} && $self->{fh}) {
638 my $NOW = AnyEvent->now; 678 my $NOW = AE::now;
639 679
640 # when would the timeout trigger? 680 # when would the timeout trigger?
641 my $after = $self->{_activity} + $self->{timeout} - $NOW; 681 my $after = $self->{$activity} + $self->{$timeout} - $NOW;
642 682
643 # now or in the past already? 683 # now or in the past already?
644 if ($after <= 0) { 684 if ($after <= 0) {
645 $self->{_activity} = $NOW; 685 $self->{$activity} = $NOW;
646 686
647 if ($self->{on_timeout}) { 687 if ($self->{$on_timeout}) {
648 $self->{on_timeout}($self); 688 $self->{$on_timeout}($self);
649 } else { 689 } else {
650 $self->_error (Errno::ETIMEDOUT); 690 $self->_error (Errno::ETIMEDOUT);
691 }
692
693 # callback could have changed timeout value, optimise
694 return unless $self->{$timeout};
695
696 # calculate new after
697 $after = $self->{$timeout};
651 } 698 }
652 699
653 # callback could have changed timeout value, optimise 700 Scalar::Util::weaken $self;
654 return unless $self->{timeout}; 701 return unless $self; # ->error could have destroyed $self
655 702
656 # calculate new after 703 $self->{$tw} ||= AE::timer $after, 0, sub {
657 $after = $self->{timeout}; 704 delete $self->{$tw};
705 $cb->($self);
706 };
707 } else {
708 delete $self->{$tw};
658 } 709 }
659
660 Scalar::Util::weaken $self;
661 return unless $self; # ->error could have destroyed $self
662
663 $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub {
664 delete $self->{_tw};
665 $self->_timeout;
666 });
667 } else {
668 delete $self->{_tw};
669 } 710 }
670} 711}
671 712
672############################################################################# 713#############################################################################
673 714
721 my $len = syswrite $self->{fh}, $self->{wbuf}; 762 my $len = syswrite $self->{fh}, $self->{wbuf};
722 763
723 if (defined $len) { 764 if (defined $len) {
724 substr $self->{wbuf}, 0, $len, ""; 765 substr $self->{wbuf}, 0, $len, "";
725 766
726 $self->{_activity} = AnyEvent->now; 767 $self->{_activity} = $self->{_wactivity} = AE::now;
727 768
728 $self->{on_drain}($self) 769 $self->{on_drain}($self)
729 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf}) 770 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf})
730 && $self->{on_drain}; 771 && $self->{on_drain};
731 772
737 778
738 # try to write data immediately 779 # try to write data immediately
739 $cb->() unless $self->{autocork}; 780 $cb->() unless $self->{autocork};
740 781
741 # if still data left in wbuf, we need to poll 782 # if still data left in wbuf, we need to poll
742 $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) 783 $self->{_ww} = AE::io $self->{fh}, 1, $cb
743 if length $self->{wbuf}; 784 if length $self->{wbuf};
744 }; 785 };
745} 786}
746 787
747our %WH; 788our %WH;
1539 my ($self) = @_; 1580 my ($self) = @_;
1540 1581
1541 unless ($self->{_rw} || $self->{_eof}) { 1582 unless ($self->{_rw} || $self->{_eof}) {
1542 Scalar::Util::weaken $self; 1583 Scalar::Util::weaken $self;
1543 1584
1544 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { 1585 $self->{_rw} = AE::io $self->{fh}, 0, sub {
1545 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf}); 1586 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf});
1546 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; 1587 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
1547 1588
1548 if ($len > 0) { 1589 if ($len > 0) {
1549 $self->{_activity} = AnyEvent->now; 1590 $self->{_activity} = $self->{_ractivity} = AE::now;
1550 1591
1551 if ($self->{tls}) { 1592 if ($self->{tls}) {
1552 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf); 1593 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf);
1553 1594
1554 &_dotls ($self); 1595 &_dotls ($self);
1562 $self->_drain_rbuf; 1603 $self->_drain_rbuf;
1563 1604
1564 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { 1605 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
1565 return $self->_error ($!, 1); 1606 return $self->_error ($!, 1);
1566 } 1607 }
1567 }); 1608 };
1568 } 1609 }
1569} 1610}
1570 1611
1571our $ERROR_SYSCALL; 1612our $ERROR_SYSCALL;
1572our $ERROR_WANT_READ; 1613our $ERROR_WANT_READ;
1732 Net::SSLeay::CTX_set_mode ($tls, 1|2); 1773 Net::SSLeay::CTX_set_mode ($tls, 1|2);
1733 1774
1734 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 1775 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1735 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 1776 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1736 1777
1778 Net::SSLeay::BIO_write ($self->{_rbio}, delete $self->{rbuf});
1779
1737 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio}); 1780 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio});
1738 1781
1739 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) } 1782 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) }
1740 if $self->{on_starttls}; 1783 if $self->{on_starttls};
1741 1784
1770 my ($self) = @_; 1813 my ($self) = @_;
1771 1814
1772 return unless $self->{tls}; 1815 return unless $self->{tls};
1773 1816
1774 $self->{tls_ctx}->_put_session (delete $self->{tls}) 1817 $self->{tls_ctx}->_put_session (delete $self->{tls})
1775 if ref $self->{tls}; 1818 if $self->{tls} > 0;
1776 1819
1777 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)}; 1820 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)};
1778} 1821}
1779 1822
1780sub DESTROY { 1823sub DESTROY {
1788 my $fh = delete $self->{fh}; 1831 my $fh = delete $self->{fh};
1789 my $wbuf = delete $self->{wbuf}; 1832 my $wbuf = delete $self->{wbuf};
1790 1833
1791 my @linger; 1834 my @linger;
1792 1835
1793 push @linger, AnyEvent->io (fh => $fh, poll => "w", cb => sub { 1836 push @linger, AE::io $fh, 1, sub {
1794 my $len = syswrite $fh, $wbuf, length $wbuf; 1837 my $len = syswrite $fh, $wbuf, length $wbuf;
1795 1838
1796 if ($len > 0) { 1839 if ($len > 0) {
1797 substr $wbuf, 0, $len, ""; 1840 substr $wbuf, 0, $len, "";
1798 } else { 1841 } else {
1799 @linger = (); # end 1842 @linger = (); # end
1800 } 1843 }
1801 }); 1844 };
1802 push @linger, AnyEvent->timer (after => $linger, cb => sub { 1845 push @linger, AE::timer $linger, 0, sub {
1803 @linger = (); 1846 @linger = ();
1804 }); 1847 };
1805 } 1848 }
1806} 1849}
1807 1850
1808=item $handle->destroy 1851=item $handle->destroy
1809 1852

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines