ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.170 by root, Sat Aug 1 09:14:54 2009 UTC vs.
Revision 1.175 by root, Sat Aug 8 22:20:43 2009 UTC

11 11
12AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent 12AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
13 13
14=cut 14=cut
15 15
16our $VERSION = 4.9; 16our $VERSION = 4.91;
17 17
18=head1 SYNOPSIS 18=head1 SYNOPSIS
19 19
20 use AnyEvent; 20 use AnyEvent;
21 use AnyEvent::Handle; 21 use AnyEvent::Handle;
474sub _start { 474sub _start {
475 my ($self) = @_; 475 my ($self) = @_;
476 476
477 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 477 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
478 478
479 $self->{_activity} = AnyEvent->now; 479 $self->{_activity} = AE::now;
480 $self->_timeout; 480 $self->_timeout;
481 481
482 $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay}; 482 $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay};
483 483
484 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}) 484 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx})
624 624
625sub timeout { 625sub timeout {
626 my ($self, $timeout) = @_; 626 my ($self, $timeout) = @_;
627 627
628 $self->{timeout} = $timeout; 628 $self->{timeout} = $timeout;
629 delete $self->{_tw};
629 $self->_timeout; 630 $self->_timeout;
630} 631}
631 632
632# reset the timeout watcher, as neccessary 633# reset the timeout watcher, as neccessary
633# also check for time-outs 634# also check for time-outs
634sub _timeout { 635sub _timeout {
635 my ($self) = @_; 636 my ($self) = @_;
636 637
637 if ($self->{timeout} && $self->{fh}) { 638 if ($self->{timeout} && $self->{fh}) {
638 my $NOW = AnyEvent->now; 639 my $NOW = AE::now;
639 640
640 # when would the timeout trigger? 641 # when would the timeout trigger?
641 my $after = $self->{_activity} + $self->{timeout} - $NOW; 642 my $after = $self->{_activity} + $self->{timeout} - $NOW;
642 643
643 # now or in the past already? 644 # now or in the past already?
658 } 659 }
659 660
660 Scalar::Util::weaken $self; 661 Scalar::Util::weaken $self;
661 return unless $self; # ->error could have destroyed $self 662 return unless $self; # ->error could have destroyed $self
662 663
663 $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub { 664 $self->{_tw} ||= AE::timer $after, 0, sub {
664 delete $self->{_tw}; 665 delete $self->{_tw};
665 $self->_timeout; 666 $self->_timeout;
666 }); 667 };
667 } else { 668 } else {
668 delete $self->{_tw}; 669 delete $self->{_tw};
669 } 670 }
670} 671}
671 672
721 my $len = syswrite $self->{fh}, $self->{wbuf}; 722 my $len = syswrite $self->{fh}, $self->{wbuf};
722 723
723 if (defined $len) { 724 if (defined $len) {
724 substr $self->{wbuf}, 0, $len, ""; 725 substr $self->{wbuf}, 0, $len, "";
725 726
726 $self->{_activity} = AnyEvent->now; 727 $self->{_activity} = AE::now;
727 728
728 $self->{on_drain}($self) 729 $self->{on_drain}($self)
729 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf}) 730 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf})
730 && $self->{on_drain}; 731 && $self->{on_drain};
731 732
737 738
738 # try to write data immediately 739 # try to write data immediately
739 $cb->() unless $self->{autocork}; 740 $cb->() unless $self->{autocork};
740 741
741 # if still data left in wbuf, we need to poll 742 # if still data left in wbuf, we need to poll
742 $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) 743 $self->{_ww} = AE::io $self->{fh}, 1, $cb
743 if length $self->{wbuf}; 744 if length $self->{wbuf};
744 }; 745 };
745} 746}
746 747
747our %WH; 748our %WH;
1539 my ($self) = @_; 1540 my ($self) = @_;
1540 1541
1541 unless ($self->{_rw} || $self->{_eof}) { 1542 unless ($self->{_rw} || $self->{_eof}) {
1542 Scalar::Util::weaken $self; 1543 Scalar::Util::weaken $self;
1543 1544
1544 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { 1545 $self->{_rw} = AE::io $self->{fh}, 0, sub {
1545 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf}); 1546 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf});
1546 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; 1547 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
1547 1548
1548 if ($len > 0) { 1549 if ($len > 0) {
1549 $self->{_activity} = AnyEvent->now; 1550 $self->{_activity} = AE::now;
1550 1551
1551 if ($self->{tls}) { 1552 if ($self->{tls}) {
1552 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf); 1553 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf);
1553 1554
1554 &_dotls ($self); 1555 &_dotls ($self);
1562 $self->_drain_rbuf; 1563 $self->_drain_rbuf;
1563 1564
1564 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { 1565 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
1565 return $self->_error ($!, 1); 1566 return $self->_error ($!, 1);
1566 } 1567 }
1567 }); 1568 };
1568 } 1569 }
1569} 1570}
1570 1571
1571our $ERROR_SYSCALL; 1572our $ERROR_SYSCALL;
1572our $ERROR_WANT_READ; 1573our $ERROR_WANT_READ;
1732 Net::SSLeay::CTX_set_mode ($tls, 1|2); 1733 Net::SSLeay::CTX_set_mode ($tls, 1|2);
1733 1734
1734 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 1735 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1735 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 1736 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1736 1737
1738 Net::SSLeay::BIO_write ($self->{_rbio}, delete $self->{rbuf});
1739
1737 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio}); 1740 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio});
1738 1741
1739 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) } 1742 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) }
1740 if $self->{on_starttls}; 1743 if $self->{on_starttls};
1741 1744
1770 my ($self) = @_; 1773 my ($self) = @_;
1771 1774
1772 return unless $self->{tls}; 1775 return unless $self->{tls};
1773 1776
1774 $self->{tls_ctx}->_put_session (delete $self->{tls}) 1777 $self->{tls_ctx}->_put_session (delete $self->{tls})
1775 if ref $self->{tls}; 1778 if $self->{tls} > 0;
1776 1779
1777 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)}; 1780 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)};
1778} 1781}
1779 1782
1780sub DESTROY { 1783sub DESTROY {
1788 my $fh = delete $self->{fh}; 1791 my $fh = delete $self->{fh};
1789 my $wbuf = delete $self->{wbuf}; 1792 my $wbuf = delete $self->{wbuf};
1790 1793
1791 my @linger; 1794 my @linger;
1792 1795
1793 push @linger, AnyEvent->io (fh => $fh, poll => "w", cb => sub { 1796 push @linger, AE::io $fh, 1, sub {
1794 my $len = syswrite $fh, $wbuf, length $wbuf; 1797 my $len = syswrite $fh, $wbuf, length $wbuf;
1795 1798
1796 if ($len > 0) { 1799 if ($len > 0) {
1797 substr $wbuf, 0, $len, ""; 1800 substr $wbuf, 0, $len, "";
1798 } else { 1801 } else {
1799 @linger = (); # end 1802 @linger = (); # end
1800 } 1803 }
1801 }); 1804 };
1802 push @linger, AnyEvent->timer (after => $linger, cb => sub { 1805 push @linger, AE::timer $linger, 0, sub {
1803 @linger = (); 1806 @linger = ();
1804 }); 1807 };
1805 } 1808 }
1806} 1809}
1807 1810
1808=item $handle->destroy 1811=item $handle->destroy
1809 1812

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines