ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.211 by root, Fri Dec 31 04:47:41 2010 UTC vs.
Revision 1.222 by root, Thu Aug 25 03:08:48 2011 UTC

11 11
12 my $hdl; $hdl = new AnyEvent::Handle 12 my $hdl; $hdl = new AnyEvent::Handle
13 fh => \*STDIN, 13 fh => \*STDIN,
14 on_error => sub { 14 on_error => sub {
15 my ($hdl, $fatal, $msg) = @_; 15 my ($hdl, $fatal, $msg) = @_;
16 warn "got error $msg\n"; 16 AE::log warn => "got error $msg\n";
17 $hdl->destroy; 17 $hdl->destroy;
18 $cv->send; 18 $cv->send;
19 }; 19 };
20 20
21 # send some request line 21 # send some request line
22 $hdl->push_write ("getinfo\015\012"); 22 $hdl->push_write ("getinfo\015\012");
23 23
24 # read the response line 24 # read the response line
25 $hdl->push_read (line => sub { 25 $hdl->push_read (line => sub {
26 my ($hdl, $line) = @_; 26 my ($hdl, $line) = @_;
27 warn "got line <$line>\n"; 27 AE::log warn => "got line <$line>\n";
28 $cv->send; 28 $cv->send;
29 }); 29 });
30 30
31 $cv->recv; 31 $cv->recv;
32 32
247many seconds pass without a successful read or write on the underlying 247many seconds pass without a successful read or write on the underlying
248file handle (or a call to C<timeout_reset>), the C<on_timeout> callback 248file handle (or a call to C<timeout_reset>), the C<on_timeout> callback
249will be invoked (and if that one is missing, a non-fatal C<ETIMEDOUT> 249will be invoked (and if that one is missing, a non-fatal C<ETIMEDOUT>
250error will be raised). 250error will be raised).
251 251
252There are three variants of the timeouts that work independently 252There are three variants of the timeouts that work independently of each
253of each other, for both read and write, just read, and just write: 253other, for both read and write (triggered when nothing was read I<OR>
254written), just read (triggered when nothing was read), and just write:
254C<timeout>, C<rtimeout> and C<wtimeout>, with corresponding callbacks 255C<timeout>, C<rtimeout> and C<wtimeout>, with corresponding callbacks
255C<on_timeout>, C<on_rtimeout> and C<on_wtimeout>, and reset functions 256C<on_timeout>, C<on_rtimeout> and C<on_wtimeout>, and reset functions
256C<timeout_reset>, C<rtimeout_reset>, and C<wtimeout_reset>. 257C<timeout_reset>, C<rtimeout_reset>, and C<wtimeout_reset>.
257 258
258Note that timeout processing is active even when you do not have 259Note that timeout processing is active even when you do not have any
259any outstanding read or write requests: If you plan to keep the connection 260outstanding read or write requests: If you plan to keep the connection
260idle then you should disable the timeout temporarily or ignore the timeout 261idle then you should disable the timeout temporarily or ignore the
261in the C<on_timeout> callback, in which case AnyEvent::Handle will simply 262timeout in the corresponding C<on_timeout> callback, in which case
262restart the timeout. 263AnyEvent::Handle will simply restart the timeout.
263 264
264Zero (the default) disables this timeout. 265Zero (the default) disables the corresponding timeout.
265 266
266=item on_timeout => $cb->($handle) 267=item on_timeout => $cb->($handle)
268
269=item on_rtimeout => $cb->($handle)
270
271=item on_wtimeout => $cb->($handle)
267 272
268Called whenever the inactivity timeout passes. If you return from this 273Called whenever the inactivity timeout passes. If you return from this
269callback, then the timeout will be reset as if some activity had happened, 274callback, then the timeout will be reset as if some activity had happened,
270so this condition is not fatal in any way. 275so this condition is not fatal in any way.
271 276
354already have occured on BSD systems), but at least it will protect you 359already have occured on BSD systems), but at least it will protect you
355from most attacks. 360from most attacks.
356 361
357=item read_size => <bytes> 362=item read_size => <bytes>
358 363
359The initial read block size, the number of bytes this module will try to 364The initial read block size, the number of bytes this module will try
360read during each loop iteration. Each handle object will consume at least 365to read during each loop iteration. Each handle object will consume
361this amount of memory for the read buffer as well, so when handling many 366at least this amount of memory for the read buffer as well, so when
362connections requirements). See also C<max_read_size>. Default: C<2048>. 367handling many connections watch out for memory requirements). See also
368C<max_read_size>. Default: C<2048>.
363 369
364=item max_read_size => <bytes> 370=item max_read_size => <bytes>
365 371
366The maximum read buffer size used by the dynamic adjustment 372The maximum read buffer size used by the dynamic adjustment
367algorithm: Each time AnyEvent::Handle can read C<read_size> bytes in 373algorithm: Each time AnyEvent::Handle can read C<read_size> bytes in
536 }); 542 });
537 543
538 } else { 544 } else {
539 if ($self->{on_connect_error}) { 545 if ($self->{on_connect_error}) {
540 $self->{on_connect_error}($self, "$!"); 546 $self->{on_connect_error}($self, "$!");
541 $self->destroy; 547 $self->destroy if $self;
542 } else { 548 } else {
543 $self->_error ($!, 1); 549 $self->_error ($!, 1);
544 } 550 }
545 } 551 }
546 }, 552 },
765 771
766sub rbuf_max { 772sub rbuf_max {
767 $_[0]{rbuf_max} = $_[1]; 773 $_[0]{rbuf_max} = $_[1];
768} 774}
769 775
770sub rbuf_max { 776sub wbuf_max {
771 $_[0]{wbuf_max} = $_[1]; 777 $_[0]{wbuf_max} = $_[1];
772} 778}
773 779
774############################################################################# 780#############################################################################
775 781
778=item $handle->rtimeout ($seconds) 784=item $handle->rtimeout ($seconds)
779 785
780=item $handle->wtimeout ($seconds) 786=item $handle->wtimeout ($seconds)
781 787
782Configures (or disables) the inactivity timeout. 788Configures (or disables) the inactivity timeout.
789
790The timeout will be checked instantly, so this method might destroy the
791handle before it returns.
783 792
784=item $handle->timeout_reset 793=item $handle->timeout_reset
785 794
786=item $handle->rtimeout_reset 795=item $handle->rtimeout_reset
787 796
1087before it was actually written. One way to do that is to replace your 1096before it was actually written. One way to do that is to replace your
1088C<on_drain> handler by a callback that shuts down the socket (and set 1097C<on_drain> handler by a callback that shuts down the socket (and set
1089C<low_water_mark> to C<0>). This method is a shorthand for just that, and 1098C<low_water_mark> to C<0>). This method is a shorthand for just that, and
1090replaces the C<on_drain> callback with: 1099replaces the C<on_drain> callback with:
1091 1100
1092 sub { shutdown $_[0]{fh}, 1 } # for push_shutdown 1101 sub { shutdown $_[0]{fh}, 1 }
1093 1102
1094This simply shuts down the write side and signals an EOF condition to the 1103This simply shuts down the write side and signals an EOF condition to the
1095the peer. 1104the peer.
1096 1105
1097You can rely on the normal read queue and C<on_eof> handling 1106You can rely on the normal read queue and C<on_eof> handling
1423data. 1432data.
1424 1433
1425Example: read 2 bytes. 1434Example: read 2 bytes.
1426 1435
1427 $handle->push_read (chunk => 2, sub { 1436 $handle->push_read (chunk => 2, sub {
1428 warn "yay ", unpack "H*", $_[1]; 1437 AE::log debug => "yay " . unpack "H*", $_[1];
1429 }); 1438 });
1430 1439
1431=cut 1440=cut
1432 1441
1433register_read_type chunk => sub { 1442register_read_type chunk => sub {
1535 1544
1536 sub { 1545 sub {
1537 # accept 1546 # accept
1538 if ($$rbuf =~ $accept) { 1547 if ($$rbuf =~ $accept) {
1539 $data .= substr $$rbuf, 0, $+[0], ""; 1548 $data .= substr $$rbuf, 0, $+[0], "";
1540 $cb->($self, $data); 1549 $cb->($_[0], $data);
1541 return 1; 1550 return 1;
1542 } 1551 }
1543 1552
1544 # reject 1553 # reject
1545 if ($reject && $$rbuf =~ $reject) { 1554 if ($reject && $$rbuf =~ $reject) {
1546 $self->_error (Errno::EBADMSG); 1555 $_[0]->_error (Errno::EBADMSG);
1547 } 1556 }
1548 1557
1549 # skip 1558 # skip
1550 if ($skip && $$rbuf =~ $skip) { 1559 if ($skip && $$rbuf =~ $skip) {
1551 $data .= substr $$rbuf, 0, $+[0], ""; 1560 $data .= substr $$rbuf, 0, $+[0], "";
1567 my ($self, $cb) = @_; 1576 my ($self, $cb) = @_;
1568 1577
1569 sub { 1578 sub {
1570 unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { 1579 unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) {
1571 if ($_[0]{rbuf} =~ /[^0-9]/) { 1580 if ($_[0]{rbuf} =~ /[^0-9]/) {
1572 $self->_error (Errno::EBADMSG); 1581 $_[0]->_error (Errno::EBADMSG);
1573 } 1582 }
1574 return; 1583 return;
1575 } 1584 }
1576 1585
1577 my $len = $1; 1586 my $len = $1;
1578 1587
1579 $self->unshift_read (chunk => $len, sub { 1588 $_[0]->unshift_read (chunk => $len, sub {
1580 my $string = $_[1]; 1589 my $string = $_[1];
1581 $_[0]->unshift_read (chunk => 1, sub { 1590 $_[0]->unshift_read (chunk => 1, sub {
1582 if ($_[1] eq ",") { 1591 if ($_[1] eq ",") {
1583 $cb->($_[0], $string); 1592 $cb->($_[0], $string);
1584 } else { 1593 } else {
1585 $self->_error (Errno::EBADMSG); 1594 $_[0]->_error (Errno::EBADMSG);
1586 } 1595 }
1587 }); 1596 });
1588 }); 1597 });
1589 1598
1590 1 1599 1
1616 sub { 1625 sub {
1617 # when we can use 5.10 we can use ".", but for 5.8 we use the re-pack method 1626 # when we can use 5.10 we can use ".", but for 5.8 we use the re-pack method
1618 defined (my $len = eval { unpack $format, $_[0]{rbuf} }) 1627 defined (my $len = eval { unpack $format, $_[0]{rbuf} })
1619 or return; 1628 or return;
1620 1629
1621 warn "len $len\n";#d#
1622 $format = length pack $format, $len; 1630 $format = length pack $format, $len;
1623 warn "len2 $format\n";#d#
1624 1631
1625 # bypass unshift if we already have the remaining chunk 1632 # bypass unshift if we already have the remaining chunk
1626 if ($format + $len <= length $_[0]{rbuf}) { 1633 if ($format + $len <= length $_[0]{rbuf}) {
1627 my $data = substr $_[0]{rbuf}, $format, $len; 1634 my $data = substr $_[0]{rbuf}, $format, $len;
1628 substr $_[0]{rbuf}, 0, $format + $len, ""; 1635 substr $_[0]{rbuf}, 0, $format + $len, "";
1665 1672
1666 my $data; 1673 my $data;
1667 my $rbuf = \$self->{rbuf}; 1674 my $rbuf = \$self->{rbuf};
1668 1675
1669 sub { 1676 sub {
1670 my $ref = eval { $json->incr_parse ($self->{rbuf}) }; 1677 my $ref = eval { $json->incr_parse ($_[0]{rbuf}) };
1671 1678
1672 if ($ref) { 1679 if ($ref) {
1673 $self->{rbuf} = $json->incr_text; 1680 $_[0]{rbuf} = $json->incr_text;
1674 $json->incr_text = ""; 1681 $json->incr_text = "";
1675 $cb->($self, $ref); 1682 $cb->($_[0], $ref);
1676 1683
1677 1 1684 1
1678 } elsif ($@) { 1685 } elsif ($@) {
1679 # error case 1686 # error case
1680 $json->incr_skip; 1687 $json->incr_skip;
1681 1688
1682 $self->{rbuf} = $json->incr_text; 1689 $_[0]{rbuf} = $json->incr_text;
1683 $json->incr_text = ""; 1690 $json->incr_text = "";
1684 1691
1685 $self->_error (Errno::EBADMSG); 1692 $_[0]->_error (Errno::EBADMSG);
1686 1693
1687 () 1694 ()
1688 } else { 1695 } else {
1689 $self->{rbuf} = ""; 1696 $_[0]{rbuf} = "";
1690 1697
1691 () 1698 ()
1692 } 1699 }
1693 } 1700 }
1694}; 1701};
1727 # read remaining chunk 1734 # read remaining chunk
1728 $_[0]->unshift_read (chunk => $len, sub { 1735 $_[0]->unshift_read (chunk => $len, sub {
1729 if (my $ref = eval { Storable::thaw ($_[1]) }) { 1736 if (my $ref = eval { Storable::thaw ($_[1]) }) {
1730 $cb->($_[0], $ref); 1737 $cb->($_[0], $ref);
1731 } else { 1738 } else {
1732 $self->_error (Errno::EBADMSG); 1739 $_[0]->_error (Errno::EBADMSG);
1733 } 1740 }
1734 }); 1741 });
1735 } 1742 }
1736 1743
1737 1 1744 1
1775Note that AnyEvent::Handle will automatically C<start_read> for you when 1782Note that AnyEvent::Handle will automatically C<start_read> for you when
1776you change the C<on_read> callback or push/unshift a read callback, and it 1783you change the C<on_read> callback or push/unshift a read callback, and it
1777will automatically C<stop_read> for you when neither C<on_read> is set nor 1784will automatically C<stop_read> for you when neither C<on_read> is set nor
1778there are any read requests in the queue. 1785there are any read requests in the queue.
1779 1786
1780These methods will have no effect when in TLS mode (as TLS doesn't support 1787In older versions of this module (<= 5.3), these methods had no effect,
1781half-duplex connections). 1788as TLS does not support half-duplex connections. In current versions they
1789work as expected, as this behaviour is required to avoid certain resource
1790attacks, where the program would be forced to read (and buffer) arbitrary
1791amounts of data before being able to send some data. The drawback is that
1792some readings of the the SSL/TLS specifications basically require this
1793attack to be working, as SSL/TLS implementations might stall sending data
1794during a rehandshake.
1795
1796As a guideline, during the initial handshake, you should not stop reading,
1797and as a client, it might cause problems, depending on your applciation.
1782 1798
1783=cut 1799=cut
1784 1800
1785sub stop_read { 1801sub stop_read {
1786 my ($self) = @_; 1802 my ($self) = @_;
1787 1803
1788 delete $self->{_rw} unless $self->{tls}; 1804 delete $self->{_rw};
1789} 1805}
1790 1806
1791sub start_read { 1807sub start_read {
1792 my ($self) = @_; 1808 my ($self) = @_;
1793 1809
1995 Net::SSLeay::CTX_set_mode ($tls, 1|2); 2011 Net::SSLeay::CTX_set_mode ($tls, 1|2);
1996 2012
1997 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 2013 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1998 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 2014 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1999 2015
2000 Net::SSLeay::BIO_write ($self->{_rbio}, delete $self->{rbuf}); 2016 Net::SSLeay::BIO_write ($self->{_rbio}, $self->{rbuf});
2017 $self->{rbuf} = "";
2001 2018
2002 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio}); 2019 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio});
2003 2020
2004 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) } 2021 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) }
2005 if $self->{on_starttls}; 2022 if $self->{on_starttls};
2042 $self->{tls_ctx}->_put_session (delete $self->{tls}) 2059 $self->{tls_ctx}->_put_session (delete $self->{tls})
2043 if $self->{tls} > 0; 2060 if $self->{tls} > 0;
2044 2061
2045 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)}; 2062 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)};
2046} 2063}
2064
2065=item $handle->resettls
2066
2067This rarely-used method simply resets and TLS state on the handle, usually
2068causing data loss.
2069
2070One case where it may be useful is when you want to skip over the data in
2071the stream but you are not interested in interpreting it, so data loss is
2072no concern.
2073
2074=cut
2075
2076*resettls = \&_freetls;
2047 2077
2048sub DESTROY { 2078sub DESTROY {
2049 my ($self) = @_; 2079 my ($self) = @_;
2050 2080
2051 &_freetls; 2081 &_freetls;
2277 $handle->on_eof (undef); 2307 $handle->on_eof (undef);
2278 $handle->on_error (sub { 2308 $handle->on_error (sub {
2279 my $data = delete $_[0]{rbuf}; 2309 my $data = delete $_[0]{rbuf};
2280 }); 2310 });
2281 2311
2312Note that this example removes the C<rbuf> member from the handle object,
2313which is not normally allowed by the API. It is expressly permitted in
2314this case only, as the handle object needs to be destroyed afterwards.
2315
2282The reason to use C<on_error> is that TCP connections, due to latencies 2316The reason to use C<on_error> is that TCP connections, due to latencies
2283and packets loss, might get closed quite violently with an error, when in 2317and packets loss, might get closed quite violently with an error, when in
2284fact all data has been received. 2318fact all data has been received.
2285 2319
2286It is usually better to use acknowledgements when transferring data, 2320It is usually better to use acknowledgements when transferring data,
2296C<low_water_mark> this will be called precisely when all data has been 2330C<low_water_mark> this will be called precisely when all data has been
2297written to the socket: 2331written to the socket:
2298 2332
2299 $handle->push_write (...); 2333 $handle->push_write (...);
2300 $handle->on_drain (sub { 2334 $handle->on_drain (sub {
2301 warn "all data submitted to the kernel\n"; 2335 AE::log debug => "all data submitted to the kernel\n";
2302 undef $handle; 2336 undef $handle;
2303 }); 2337 });
2304 2338
2305If you just want to queue some data and then signal EOF to the other side, 2339If you just want to queue some data and then signal EOF to the other side,
2306consider using C<< ->push_shutdown >> instead. 2340consider using C<< ->push_shutdown >> instead.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines