ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.159 by root, Fri Jul 24 12:35:58 2009 UTC vs.
Revision 1.175 by root, Sat Aug 8 22:20:43 2009 UTC

11 11
12AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent 12AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
13 13
14=cut 14=cut
15 15
16our $VERSION = 4.86; 16our $VERSION = 4.91;
17 17
18=head1 SYNOPSIS 18=head1 SYNOPSIS
19 19
20 use AnyEvent; 20 use AnyEvent;
21 use AnyEvent::Handle; 21 use AnyEvent::Handle;
84C<AnyEvent::Socket::tcp_connect>. The C<$host> additionally becomes the 84C<AnyEvent::Socket::tcp_connect>. The C<$host> additionally becomes the
85default C<peername>. 85default C<peername>.
86 86
87You have to specify either this parameter, or C<fh>, above. 87You have to specify either this parameter, or C<fh>, above.
88 88
89It is possible to push requests on the read and write queues, and modify
90properties of the stream, even while AnyEvent::Handle is connecting.
91
89When this parameter is specified, then the C<on_prepare>, 92When this parameter is specified, then the C<on_prepare>,
90C<on_connect_error> and C<on_connect> callbacks will be called under the 93C<on_connect_error> and C<on_connect> callbacks will be called under the
91appropriate circumstances: 94appropriate circumstances:
92 95
93=over 4 96=over 4
97This (rarely used) callback is called before a new connection is 100This (rarely used) callback is called before a new connection is
98attempted, but after the file handle has been created. It could be used to 101attempted, but after the file handle has been created. It could be used to
99prepare the file handle with parameters required for the actual connect 102prepare the file handle with parameters required for the actual connect
100(as opposed to settings that can be changed when the connection is already 103(as opposed to settings that can be changed when the connection is already
101established). 104established).
105
106The return value of this callback should be the connect timeout value in
107seconds (or C<0>, or C<undef>, or the empty list, to indicate the default
108timeout is to be used).
102 109
103=item on_connect => $cb->($handle, $host, $port, $retry->()) 110=item on_connect => $cb->($handle, $host, $port, $retry->())
104 111
105This callback is called when a connection has been successfully established. 112This callback is called when a connection has been successfully established.
106 113
441 } else { 448 } else {
442 if ($self->{on_connect_error}) { 449 if ($self->{on_connect_error}) {
443 $self->{on_connect_error}($self, "$!"); 450 $self->{on_connect_error}($self, "$!");
444 $self->destroy; 451 $self->destroy;
445 } else { 452 } else {
446 $self->fatal ($!, 1); 453 $self->_error ($!, 1);
447 } 454 }
448 } 455 }
449 }, 456 },
450 sub { 457 sub {
451 local $self->{fh} = $_[0]; 458 local $self->{fh} = $_[0];
452 459
460 $self->{on_prepare}
453 $self->{on_prepare}->($self) 461 ? $self->{on_prepare}->($self)
454 if $self->{on_prepare}; 462 : ()
455 } 463 }
456 ); 464 );
457 } 465 }
458 466
459 } else { 467 } else {
466sub _start { 474sub _start {
467 my ($self) = @_; 475 my ($self) = @_;
468 476
469 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 477 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
470 478
471 $self->{_activity} = AnyEvent->now; 479 $self->{_activity} = AE::now;
472 $self->_timeout; 480 $self->_timeout;
473 481
474 $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay}; 482 $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay};
475 483
476 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}) 484 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx})
478 486
479 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 487 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
480 488
481 $self->start_read 489 $self->start_read
482 if $self->{on_read} || @{ $self->{_queue} }; 490 if $self->{on_read} || @{ $self->{_queue} };
491
492 $self->_drain_wbuf;
483} 493}
484 494
485#sub _shutdown { 495#sub _shutdown {
486# my ($self) = @_; 496# my ($self) = @_;
487# 497#
592 602
593sub on_starttls { 603sub on_starttls {
594 $_[0]{on_stoptls} = $_[1]; 604 $_[0]{on_stoptls} = $_[1];
595} 605}
596 606
607=item $handle->rbuf_max ($max_octets)
608
609Configures the C<rbuf_max> setting (C<undef> disables it).
610
611=cut
612
613sub rbuf_max {
614 $_[0]{rbuf_max} = $_[1];
615}
616
597############################################################################# 617#############################################################################
598 618
599=item $handle->timeout ($seconds) 619=item $handle->timeout ($seconds)
600 620
601Configures (or disables) the inactivity timeout. 621Configures (or disables) the inactivity timeout.
604 624
605sub timeout { 625sub timeout {
606 my ($self, $timeout) = @_; 626 my ($self, $timeout) = @_;
607 627
608 $self->{timeout} = $timeout; 628 $self->{timeout} = $timeout;
629 delete $self->{_tw};
609 $self->_timeout; 630 $self->_timeout;
610} 631}
611 632
612# reset the timeout watcher, as neccessary 633# reset the timeout watcher, as neccessary
613# also check for time-outs 634# also check for time-outs
614sub _timeout { 635sub _timeout {
615 my ($self) = @_; 636 my ($self) = @_;
616 637
617 if ($self->{timeout} && $self->{fh}) { 638 if ($self->{timeout} && $self->{fh}) {
618 my $NOW = AnyEvent->now; 639 my $NOW = AE::now;
619 640
620 # when would the timeout trigger? 641 # when would the timeout trigger?
621 my $after = $self->{_activity} + $self->{timeout} - $NOW; 642 my $after = $self->{_activity} + $self->{timeout} - $NOW;
622 643
623 # now or in the past already? 644 # now or in the past already?
638 } 659 }
639 660
640 Scalar::Util::weaken $self; 661 Scalar::Util::weaken $self;
641 return unless $self; # ->error could have destroyed $self 662 return unless $self; # ->error could have destroyed $self
642 663
643 $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub { 664 $self->{_tw} ||= AE::timer $after, 0, sub {
644 delete $self->{_tw}; 665 delete $self->{_tw};
645 $self->_timeout; 666 $self->_timeout;
646 }); 667 };
647 } else { 668 } else {
648 delete $self->{_tw}; 669 delete $self->{_tw};
649 } 670 }
650} 671}
651 672
701 my $len = syswrite $self->{fh}, $self->{wbuf}; 722 my $len = syswrite $self->{fh}, $self->{wbuf};
702 723
703 if (defined $len) { 724 if (defined $len) {
704 substr $self->{wbuf}, 0, $len, ""; 725 substr $self->{wbuf}, 0, $len, "";
705 726
706 $self->{_activity} = AnyEvent->now; 727 $self->{_activity} = AE::now;
707 728
708 $self->{on_drain}($self) 729 $self->{on_drain}($self)
709 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf}) 730 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf})
710 && $self->{on_drain}; 731 && $self->{on_drain};
711 732
717 738
718 # try to write data immediately 739 # try to write data immediately
719 $cb->() unless $self->{autocork}; 740 $cb->() unless $self->{autocork};
720 741
721 # if still data left in wbuf, we need to poll 742 # if still data left in wbuf, we need to poll
722 $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) 743 $self->{_ww} = AE::io $self->{fh}, 1, $cb
723 if length $self->{wbuf}; 744 if length $self->{wbuf};
724 }; 745 };
725} 746}
726 747
727our %WH; 748our %WH;
740 ->($self, @_); 761 ->($self, @_);
741 } 762 }
742 763
743 if ($self->{tls}) { 764 if ($self->{tls}) {
744 $self->{_tls_wbuf} .= $_[0]; 765 $self->{_tls_wbuf} .= $_[0];
745 766 &_dotls ($self) if $self->{fh};
746 &_dotls ($self);
747 } else { 767 } else {
748 $self->{wbuf} .= $_[0]; 768 $self->{wbuf} .= $_[0];
749 $self->_drain_wbuf if $self->{fh}; 769 $self->_drain_wbuf if $self->{fh};
750 } 770 }
751} 771}
752 772
753=item $handle->push_write (type => @args) 773=item $handle->push_write (type => @args)
972 992
973sub _drain_rbuf { 993sub _drain_rbuf {
974 my ($self) = @_; 994 my ($self) = @_;
975 995
976 # avoid recursion 996 # avoid recursion
977 return if exists $self->{_skip_drain_rbuf}; 997 return if $self->{_skip_drain_rbuf};
978 local $self->{_skip_drain_rbuf} = 1; 998 local $self->{_skip_drain_rbuf} = 1;
979
980 if (
981 defined $self->{rbuf_max}
982 && $self->{rbuf_max} < length $self->{rbuf}
983 ) {
984 $self->_error (Errno::ENOSPC, 1), return;
985 }
986 999
987 while () { 1000 while () {
988 # we need to use a separate tls read buffer, as we must not receive data while 1001 # we need to use a separate tls read buffer, as we must not receive data while
989 # we are draining the buffer, and this can only happen with TLS. 1002 # we are draining the buffer, and this can only happen with TLS.
990 $self->{rbuf} .= delete $self->{_tls_rbuf} if exists $self->{_tls_rbuf}; 1003 $self->{rbuf} .= delete $self->{_tls_rbuf}
1004 if exists $self->{_tls_rbuf};
991 1005
992 my $len = length $self->{rbuf}; 1006 my $len = length $self->{rbuf};
993 1007
994 if (my $cb = shift @{ $self->{_queue} }) { 1008 if (my $cb = shift @{ $self->{_queue} }) {
995 unless ($cb->($self)) { 1009 unless ($cb->($self)) {
996 if ($self->{_eof}) { 1010 # no progress can be made
997 # no progress can be made (not enough data and no data forthcoming) 1011 # (not enough data and no data forthcoming)
998 $self->_error (Errno::EPIPE, 1), return; 1012 $self->_error (Errno::EPIPE, 1), return
999 } 1013 if $self->{_eof};
1000 1014
1001 unshift @{ $self->{_queue} }, $cb; 1015 unshift @{ $self->{_queue} }, $cb;
1002 last; 1016 last;
1003 } 1017 }
1004 } elsif ($self->{on_read}) { 1018 } elsif ($self->{on_read}) {
1024 last; 1038 last;
1025 } 1039 }
1026 } 1040 }
1027 1041
1028 if ($self->{_eof}) { 1042 if ($self->{_eof}) {
1029 if ($self->{on_eof}) { 1043 $self->{on_eof}
1030 $self->{on_eof}($self) 1044 ? $self->{on_eof}($self)
1031 } else {
1032 $self->_error (0, 1, "Unexpected end-of-file"); 1045 : $self->_error (0, 1, "Unexpected end-of-file");
1033 } 1046
1047 return;
1048 }
1049
1050 if (
1051 defined $self->{rbuf_max}
1052 && $self->{rbuf_max} < length $self->{rbuf}
1053 ) {
1054 $self->_error (Errno::ENOSPC, 1), return;
1034 } 1055 }
1035 1056
1036 # may need to restart read watcher 1057 # may need to restart read watcher
1037 unless ($self->{_rw}) { 1058 unless ($self->{_rw}) {
1038 $self->start_read 1059 $self->start_read
1519 my ($self) = @_; 1540 my ($self) = @_;
1520 1541
1521 unless ($self->{_rw} || $self->{_eof}) { 1542 unless ($self->{_rw} || $self->{_eof}) {
1522 Scalar::Util::weaken $self; 1543 Scalar::Util::weaken $self;
1523 1544
1524 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { 1545 $self->{_rw} = AE::io $self->{fh}, 0, sub {
1525 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf}); 1546 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf});
1526 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; 1547 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
1527 1548
1528 if ($len > 0) { 1549 if ($len > 0) {
1529 $self->{_activity} = AnyEvent->now; 1550 $self->{_activity} = AE::now;
1530 1551
1531 if ($self->{tls}) { 1552 if ($self->{tls}) {
1532 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf); 1553 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf);
1533 1554
1534 &_dotls ($self); 1555 &_dotls ($self);
1542 $self->_drain_rbuf; 1563 $self->_drain_rbuf;
1543 1564
1544 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { 1565 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
1545 return $self->_error ($!, 1); 1566 return $self->_error ($!, 1);
1546 } 1567 }
1547 }); 1568 };
1548 } 1569 }
1549} 1570}
1550 1571
1551our $ERROR_SYSCALL; 1572our $ERROR_SYSCALL;
1552our $ERROR_WANT_READ; 1573our $ERROR_WANT_READ;
1649The TLS connection object will end up in C<< $handle->{tls} >>, the TLS 1670The TLS connection object will end up in C<< $handle->{tls} >>, the TLS
1650context in C<< $handle->{tls_ctx} >> after this call and can be used or 1671context in C<< $handle->{tls_ctx} >> after this call and can be used or
1651changed to your liking. Note that the handshake might have already started 1672changed to your liking. Note that the handshake might have already started
1652when this function returns. 1673when this function returns.
1653 1674
1654If it an error to start a TLS handshake more than once per 1675Due to bugs in OpenSSL, it might or might not be possible to do multiple
1655AnyEvent::Handle object (this is due to bugs in OpenSSL). 1676handshakes on the same stream. Best do not attempt to use the stream after
1677stopping TLS.
1656 1678
1657=cut 1679=cut
1658 1680
1659our %TLS_CACHE; #TODO not yet documented, should we? 1681our %TLS_CACHE; #TODO not yet documented, should we?
1660 1682
1661sub starttls { 1683sub starttls {
1662 my ($self, $ssl, $ctx) = @_; 1684 my ($self, $tls, $ctx) = @_;
1685
1686 Carp::croak "It is an error to call starttls on an AnyEvent::Handle object while TLS is already active, caught"
1687 if $self->{tls};
1688
1689 $self->{tls} = $tls;
1690 $self->{tls_ctx} = $ctx if @_ > 2;
1691
1692 return unless $self->{fh};
1663 1693
1664 require Net::SSLeay; 1694 require Net::SSLeay;
1665
1666 Carp::croak "it is an error to call starttls more than once on an AnyEvent::Handle object"
1667 if $self->{tls};
1668 1695
1669 $ERROR_SYSCALL = Net::SSLeay::ERROR_SYSCALL (); 1696 $ERROR_SYSCALL = Net::SSLeay::ERROR_SYSCALL ();
1670 $ERROR_WANT_READ = Net::SSLeay::ERROR_WANT_READ (); 1697 $ERROR_WANT_READ = Net::SSLeay::ERROR_WANT_READ ();
1671 1698
1699 $tls = $self->{tls};
1672 $ctx ||= $self->{tls_ctx}; 1700 $ctx = $self->{tls_ctx};
1673 1701
1674 local $Carp::CarpLevel = 1; # skip ourselves when creating a new context or session 1702 local $Carp::CarpLevel = 1; # skip ourselves when creating a new context or session
1675 1703
1676 if ("HASH" eq ref $ctx) { 1704 if ("HASH" eq ref $ctx) {
1677 require AnyEvent::TLS; 1705 require AnyEvent::TLS;
1683 $ctx = new AnyEvent::TLS %$ctx; 1711 $ctx = new AnyEvent::TLS %$ctx;
1684 } 1712 }
1685 } 1713 }
1686 1714
1687 $self->{tls_ctx} = $ctx || TLS_CTX (); 1715 $self->{tls_ctx} = $ctx || TLS_CTX ();
1688 $self->{tls} = $ssl = $self->{tls_ctx}->_get_session ($ssl, $self, $self->{peername}); 1716 $self->{tls} = $tls = $self->{tls_ctx}->_get_session ($tls, $self, $self->{peername});
1689 1717
1690 # basically, this is deep magic (because SSL_read should have the same issues) 1718 # basically, this is deep magic (because SSL_read should have the same issues)
1691 # but the openssl maintainers basically said: "trust us, it just works". 1719 # but the openssl maintainers basically said: "trust us, it just works".
1692 # (unfortunately, we have to hardcode constants because the abysmally misdesigned 1720 # (unfortunately, we have to hardcode constants because the abysmally misdesigned
1693 # and mismaintained ssleay-module doesn't even offer them). 1721 # and mismaintained ssleay-module doesn't even offer them).
1700 # and we drive openssl fully in blocking mode here. Or maybe we don't - openssl seems to 1728 # and we drive openssl fully in blocking mode here. Or maybe we don't - openssl seems to
1701 # have identity issues in that area. 1729 # have identity issues in that area.
1702# Net::SSLeay::CTX_set_mode ($ssl, 1730# Net::SSLeay::CTX_set_mode ($ssl,
1703# (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1) 1731# (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1)
1704# | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2)); 1732# | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2));
1705 Net::SSLeay::CTX_set_mode ($ssl, 1|2); 1733 Net::SSLeay::CTX_set_mode ($tls, 1|2);
1706 1734
1707 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 1735 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1708 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 1736 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1709 1737
1738 Net::SSLeay::BIO_write ($self->{_rbio}, delete $self->{rbuf});
1739
1710 Net::SSLeay::set_bio ($ssl, $self->{_rbio}, $self->{_wbio}); 1740 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio});
1711 1741
1712 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) } 1742 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) }
1713 if $self->{on_starttls}; 1743 if $self->{on_starttls};
1714 1744
1715 &_dotls; # need to trigger the initial handshake 1745 &_dotls; # need to trigger the initial handshake
1718 1748
1719=item $handle->stoptls 1749=item $handle->stoptls
1720 1750
1721Shuts down the SSL connection - this makes a proper EOF handshake by 1751Shuts down the SSL connection - this makes a proper EOF handshake by
1722sending a close notify to the other side, but since OpenSSL doesn't 1752sending a close notify to the other side, but since OpenSSL doesn't
1723support non-blocking shut downs, it is not possible to re-use the stream 1753support non-blocking shut downs, it is not guarenteed that you can re-use
1724afterwards. 1754the stream afterwards.
1725 1755
1726=cut 1756=cut
1727 1757
1728sub stoptls { 1758sub stoptls {
1729 my ($self) = @_; 1759 my ($self) = @_;
1742sub _freetls { 1772sub _freetls {
1743 my ($self) = @_; 1773 my ($self) = @_;
1744 1774
1745 return unless $self->{tls}; 1775 return unless $self->{tls};
1746 1776
1747 $self->{tls_ctx}->_put_session (delete $self->{tls}); 1777 $self->{tls_ctx}->_put_session (delete $self->{tls})
1778 if $self->{tls} > 0;
1748 1779
1749 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)}; 1780 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)};
1750} 1781}
1751 1782
1752sub DESTROY { 1783sub DESTROY {
1760 my $fh = delete $self->{fh}; 1791 my $fh = delete $self->{fh};
1761 my $wbuf = delete $self->{wbuf}; 1792 my $wbuf = delete $self->{wbuf};
1762 1793
1763 my @linger; 1794 my @linger;
1764 1795
1765 push @linger, AnyEvent->io (fh => $fh, poll => "w", cb => sub { 1796 push @linger, AE::io $fh, 1, sub {
1766 my $len = syswrite $fh, $wbuf, length $wbuf; 1797 my $len = syswrite $fh, $wbuf, length $wbuf;
1767 1798
1768 if ($len > 0) { 1799 if ($len > 0) {
1769 substr $wbuf, 0, $len, ""; 1800 substr $wbuf, 0, $len, "";
1770 } else { 1801 } else {
1771 @linger = (); # end 1802 @linger = (); # end
1772 } 1803 }
1773 }); 1804 };
1774 push @linger, AnyEvent->timer (after => $linger, cb => sub { 1805 push @linger, AE::timer $linger, 0, sub {
1775 @linger = (); 1806 @linger = ();
1776 }); 1807 };
1777 } 1808 }
1778} 1809}
1779 1810
1780=item $handle->destroy 1811=item $handle->destroy
1781 1812
1782Shuts down the handle object as much as possible - this call ensures that 1813Shuts down the handle object as much as possible - this call ensures that
1783no further callbacks will be invoked and as many resources as possible 1814no further callbacks will be invoked and as many resources as possible
1784will be freed. You must not call any methods on the object afterwards. 1815will be freed. Any method you will call on the handle object after
1816destroying it in this way will be silently ignored (and it will return the
1817empty list).
1785 1818
1786Normally, you can just "forget" any references to an AnyEvent::Handle 1819Normally, you can just "forget" any references to an AnyEvent::Handle
1787object and it will simply shut down. This works in fatal error and EOF 1820object and it will simply shut down. This works in fatal error and EOF
1788callbacks, as well as code outside. It does I<NOT> work in a read or write 1821callbacks, as well as code outside. It does I<NOT> work in a read or write
1789callback, so when you want to destroy the AnyEvent::Handle object from 1822callback, so when you want to destroy the AnyEvent::Handle object from
1803sub destroy { 1836sub destroy {
1804 my ($self) = @_; 1837 my ($self) = @_;
1805 1838
1806 $self->DESTROY; 1839 $self->DESTROY;
1807 %$self = (); 1840 %$self = ();
1841 bless $self, "AnyEvent::Handle::destroyed";
1842}
1843
1844sub AnyEvent::Handle::destroyed::AUTOLOAD {
1845 #nop
1808} 1846}
1809 1847
1810=item AnyEvent::Handle::TLS_CTX 1848=item AnyEvent::Handle::TLS_CTX
1811 1849
1812This function creates and returns the AnyEvent::TLS object used by default 1850This function creates and returns the AnyEvent::TLS object used by default

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines