ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.214 by root, Sun Jan 16 17:12:27 2011 UTC vs.
Revision 1.227 by root, Tue Jan 10 13:32:23 2012 UTC

11 11
12 my $hdl; $hdl = new AnyEvent::Handle 12 my $hdl; $hdl = new AnyEvent::Handle
13 fh => \*STDIN, 13 fh => \*STDIN,
14 on_error => sub { 14 on_error => sub {
15 my ($hdl, $fatal, $msg) = @_; 15 my ($hdl, $fatal, $msg) = @_;
16 warn "got error $msg\n"; 16 AE::log error => "got error $msg\n";
17 $hdl->destroy; 17 $hdl->destroy;
18 $cv->send; 18 $cv->send;
19 }; 19 };
20 20
21 # send some request line 21 # send some request line
22 $hdl->push_write ("getinfo\015\012"); 22 $hdl->push_write ("getinfo\015\012");
23 23
24 # read the response line 24 # read the response line
25 $hdl->push_read (line => sub { 25 $hdl->push_read (line => sub {
26 my ($hdl, $line) = @_; 26 my ($hdl, $line) = @_;
27 warn "got line <$line>\n"; 27 say "got line <$line>";
28 $cv->send; 28 $cv->send;
29 }); 29 });
30 30
31 $cv->recv; 31 $cv->recv;
32 32
247many seconds pass without a successful read or write on the underlying 247many seconds pass without a successful read or write on the underlying
248file handle (or a call to C<timeout_reset>), the C<on_timeout> callback 248file handle (or a call to C<timeout_reset>), the C<on_timeout> callback
249will be invoked (and if that one is missing, a non-fatal C<ETIMEDOUT> 249will be invoked (and if that one is missing, a non-fatal C<ETIMEDOUT>
250error will be raised). 250error will be raised).
251 251
252There are three variants of the timeouts that work independently 252There are three variants of the timeouts that work independently of each
253of each other, for both read and write, just read, and just write: 253other, for both read and write (triggered when nothing was read I<OR>
254written), just read (triggered when nothing was read), and just write:
254C<timeout>, C<rtimeout> and C<wtimeout>, with corresponding callbacks 255C<timeout>, C<rtimeout> and C<wtimeout>, with corresponding callbacks
255C<on_timeout>, C<on_rtimeout> and C<on_wtimeout>, and reset functions 256C<on_timeout>, C<on_rtimeout> and C<on_wtimeout>, and reset functions
256C<timeout_reset>, C<rtimeout_reset>, and C<wtimeout_reset>. 257C<timeout_reset>, C<rtimeout_reset>, and C<wtimeout_reset>.
257 258
258Note that timeout processing is active even when you do not have 259Note that timeout processing is active even when you do not have any
259any outstanding read or write requests: If you plan to keep the connection 260outstanding read or write requests: If you plan to keep the connection
260idle then you should disable the timeout temporarily or ignore the timeout 261idle then you should disable the timeout temporarily or ignore the
261in the C<on_timeout> callback, in which case AnyEvent::Handle will simply 262timeout in the corresponding C<on_timeout> callback, in which case
262restart the timeout. 263AnyEvent::Handle will simply restart the timeout.
263 264
264Zero (the default) disables this timeout. 265Zero (the default) disables the corresponding timeout.
265 266
266=item on_timeout => $cb->($handle) 267=item on_timeout => $cb->($handle)
268
269=item on_rtimeout => $cb->($handle)
270
271=item on_wtimeout => $cb->($handle)
267 272
268Called whenever the inactivity timeout passes. If you return from this 273Called whenever the inactivity timeout passes. If you return from this
269callback, then the timeout will be reset as if some activity had happened, 274callback, then the timeout will be reset as if some activity had happened,
270so this condition is not fatal in any way. 275so this condition is not fatal in any way.
271 276
354already have occured on BSD systems), but at least it will protect you 359already have occured on BSD systems), but at least it will protect you
355from most attacks. 360from most attacks.
356 361
357=item read_size => <bytes> 362=item read_size => <bytes>
358 363
359The initial read block size, the number of bytes this module will try to 364The initial read block size, the number of bytes this module will try
360read during each loop iteration. Each handle object will consume at least 365to read during each loop iteration. Each handle object will consume
361this amount of memory for the read buffer as well, so when handling many 366at least this amount of memory for the read buffer as well, so when
362connections requirements). See also C<max_read_size>. Default: C<2048>. 367handling many connections watch out for memory requirements). See also
368C<max_read_size>. Default: C<2048>.
363 369
364=item max_read_size => <bytes> 370=item max_read_size => <bytes>
365 371
366The maximum read buffer size used by the dynamic adjustment 372The maximum read buffer size used by the dynamic adjustment
367algorithm: Each time AnyEvent::Handle can read C<read_size> bytes in 373algorithm: Each time AnyEvent::Handle can read C<read_size> bytes in
536 }); 542 });
537 543
538 } else { 544 } else {
539 if ($self->{on_connect_error}) { 545 if ($self->{on_connect_error}) {
540 $self->{on_connect_error}($self, "$!"); 546 $self->{on_connect_error}($self, "$!");
541 $self->destroy; 547 $self->destroy if $self;
542 } else { 548 } else {
543 $self->_error ($!, 1); 549 $self->_error ($!, 1);
544 } 550 }
545 } 551 }
546 }, 552 },
765 771
766sub rbuf_max { 772sub rbuf_max {
767 $_[0]{rbuf_max} = $_[1]; 773 $_[0]{rbuf_max} = $_[1];
768} 774}
769 775
770sub rbuf_max { 776sub wbuf_max {
771 $_[0]{wbuf_max} = $_[1]; 777 $_[0]{wbuf_max} = $_[1];
772} 778}
773 779
774############################################################################# 780#############################################################################
775 781
778=item $handle->rtimeout ($seconds) 784=item $handle->rtimeout ($seconds)
779 785
780=item $handle->wtimeout ($seconds) 786=item $handle->wtimeout ($seconds)
781 787
782Configures (or disables) the inactivity timeout. 788Configures (or disables) the inactivity timeout.
789
790The timeout will be checked instantly, so this method might destroy the
791handle before it returns.
783 792
784=item $handle->timeout_reset 793=item $handle->timeout_reset
785 794
786=item $handle->rtimeout_reset 795=item $handle->rtimeout_reset
787 796
1072=cut 1081=cut
1073 1082
1074register_write_type storable => sub { 1083register_write_type storable => sub {
1075 my ($self, $ref) = @_; 1084 my ($self, $ref) = @_;
1076 1085
1077 require Storable; 1086 require Storable unless $Storable::VERSION;
1078 1087
1079 pack "w/a*", Storable::nfreeze ($ref) 1088 pack "w/a*", Storable::nfreeze ($ref)
1080}; 1089};
1081 1090
1082=back 1091=back
1119 1128
1120Whenever the given C<type> is used, C<push_write> will the function with 1129Whenever the given C<type> is used, C<push_write> will the function with
1121the handle object and the remaining arguments. 1130the handle object and the remaining arguments.
1122 1131
1123The function is supposed to return a single octet string that will be 1132The function is supposed to return a single octet string that will be
1124appended to the write buffer, so you cna mentally treat this function as a 1133appended to the write buffer, so you can mentally treat this function as a
1125"arguments to on-the-wire-format" converter. 1134"arguments to on-the-wire-format" converter.
1126 1135
1127Example: implement a custom write type C<join> that joins the remaining 1136Example: implement a custom write type C<join> that joins the remaining
1128arguments using the first one. 1137arguments using the first one.
1129 1138
1423data. 1432data.
1424 1433
1425Example: read 2 bytes. 1434Example: read 2 bytes.
1426 1435
1427 $handle->push_read (chunk => 2, sub { 1436 $handle->push_read (chunk => 2, sub {
1428 warn "yay ", unpack "H*", $_[1]; 1437 say "yay " . unpack "H*", $_[1];
1429 }); 1438 });
1430 1439
1431=cut 1440=cut
1432 1441
1433register_read_type chunk => sub { 1442register_read_type chunk => sub {
1467 if (@_ < 3) { 1476 if (@_ < 3) {
1468 # this is more than twice as fast as the generic code below 1477 # this is more than twice as fast as the generic code below
1469 sub { 1478 sub {
1470 $_[0]{rbuf} =~ s/^([^\015\012]*)(\015?\012)// or return; 1479 $_[0]{rbuf} =~ s/^([^\015\012]*)(\015?\012)// or return;
1471 1480
1472 $cb->($_[0], $1, $2); 1481 $cb->($_[0], "$1", "$2");
1473 1 1482 1
1474 } 1483 }
1475 } else { 1484 } else {
1476 $eol = quotemeta $eol unless ref $eol; 1485 $eol = quotemeta $eol unless ref $eol;
1477 $eol = qr|^(.*?)($eol)|s; 1486 $eol = qr|^(.*?)($eol)|s;
1478 1487
1479 sub { 1488 sub {
1480 $_[0]{rbuf} =~ s/$eol// or return; 1489 $_[0]{rbuf} =~ s/$eol// or return;
1481 1490
1482 $cb->($_[0], $1, $2); 1491 $cb->($_[0], "$1", "$2");
1483 1 1492 1
1484 } 1493 }
1485 } 1494 }
1486}; 1495};
1487 1496
1535 1544
1536 sub { 1545 sub {
1537 # accept 1546 # accept
1538 if ($$rbuf =~ $accept) { 1547 if ($$rbuf =~ $accept) {
1539 $data .= substr $$rbuf, 0, $+[0], ""; 1548 $data .= substr $$rbuf, 0, $+[0], "";
1540 $cb->($self, $data); 1549 $cb->($_[0], $data);
1541 return 1; 1550 return 1;
1542 } 1551 }
1543 1552
1544 # reject 1553 # reject
1545 if ($reject && $$rbuf =~ $reject) { 1554 if ($reject && $$rbuf =~ $reject) {
1546 $self->_error (Errno::EBADMSG); 1555 $_[0]->_error (Errno::EBADMSG);
1547 } 1556 }
1548 1557
1549 # skip 1558 # skip
1550 if ($skip && $$rbuf =~ $skip) { 1559 if ($skip && $$rbuf =~ $skip) {
1551 $data .= substr $$rbuf, 0, $+[0], ""; 1560 $data .= substr $$rbuf, 0, $+[0], "";
1567 my ($self, $cb) = @_; 1576 my ($self, $cb) = @_;
1568 1577
1569 sub { 1578 sub {
1570 unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { 1579 unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) {
1571 if ($_[0]{rbuf} =~ /[^0-9]/) { 1580 if ($_[0]{rbuf} =~ /[^0-9]/) {
1572 $self->_error (Errno::EBADMSG); 1581 $_[0]->_error (Errno::EBADMSG);
1573 } 1582 }
1574 return; 1583 return;
1575 } 1584 }
1576 1585
1577 my $len = $1; 1586 my $len = $1;
1578 1587
1579 $self->unshift_read (chunk => $len, sub { 1588 $_[0]->unshift_read (chunk => $len, sub {
1580 my $string = $_[1]; 1589 my $string = $_[1];
1581 $_[0]->unshift_read (chunk => 1, sub { 1590 $_[0]->unshift_read (chunk => 1, sub {
1582 if ($_[1] eq ",") { 1591 if ($_[1] eq ",") {
1583 $cb->($_[0], $string); 1592 $cb->($_[0], $string);
1584 } else { 1593 } else {
1585 $self->_error (Errno::EBADMSG); 1594 $_[0]->_error (Errno::EBADMSG);
1586 } 1595 }
1587 }); 1596 });
1588 }); 1597 });
1589 1598
1590 1 1599 1
1663 1672
1664 my $data; 1673 my $data;
1665 my $rbuf = \$self->{rbuf}; 1674 my $rbuf = \$self->{rbuf};
1666 1675
1667 sub { 1676 sub {
1668 my $ref = eval { $json->incr_parse ($self->{rbuf}) }; 1677 my $ref = eval { $json->incr_parse ($_[0]{rbuf}) };
1669 1678
1670 if ($ref) { 1679 if ($ref) {
1671 $self->{rbuf} = $json->incr_text; 1680 $_[0]{rbuf} = $json->incr_text;
1672 $json->incr_text = ""; 1681 $json->incr_text = "";
1673 $cb->($self, $ref); 1682 $cb->($_[0], $ref);
1674 1683
1675 1 1684 1
1676 } elsif ($@) { 1685 } elsif ($@) {
1677 # error case 1686 # error case
1678 $json->incr_skip; 1687 $json->incr_skip;
1679 1688
1680 $self->{rbuf} = $json->incr_text; 1689 $_[0]{rbuf} = $json->incr_text;
1681 $json->incr_text = ""; 1690 $json->incr_text = "";
1682 1691
1683 $self->_error (Errno::EBADMSG); 1692 $_[0]->_error (Errno::EBADMSG);
1684 1693
1685 () 1694 ()
1686 } else { 1695 } else {
1687 $self->{rbuf} = ""; 1696 $_[0]{rbuf} = "";
1688 1697
1689 () 1698 ()
1690 } 1699 }
1691 } 1700 }
1692}; 1701};
1702=cut 1711=cut
1703 1712
1704register_read_type storable => sub { 1713register_read_type storable => sub {
1705 my ($self, $cb) = @_; 1714 my ($self, $cb) = @_;
1706 1715
1707 require Storable; 1716 require Storable unless $Storable::VERSION;
1708 1717
1709 sub { 1718 sub {
1710 # when we can use 5.10 we can use ".", but for 5.8 we use the re-pack method 1719 # when we can use 5.10 we can use ".", but for 5.8 we use the re-pack method
1711 defined (my $len = eval { unpack "w", $_[0]{rbuf} }) 1720 defined (my $len = eval { unpack "w", $_[0]{rbuf} })
1712 or return; 1721 or return;
1725 # read remaining chunk 1734 # read remaining chunk
1726 $_[0]->unshift_read (chunk => $len, sub { 1735 $_[0]->unshift_read (chunk => $len, sub {
1727 if (my $ref = eval { Storable::thaw ($_[1]) }) { 1736 if (my $ref = eval { Storable::thaw ($_[1]) }) {
1728 $cb->($_[0], $ref); 1737 $cb->($_[0], $ref);
1729 } else { 1738 } else {
1730 $self->_error (Errno::EBADMSG); 1739 $_[0]->_error (Errno::EBADMSG);
1731 } 1740 }
1732 }); 1741 });
1733 } 1742 }
1734 1743
1735 1 1744 1
1783some readings of the the SSL/TLS specifications basically require this 1792some readings of the the SSL/TLS specifications basically require this
1784attack to be working, as SSL/TLS implementations might stall sending data 1793attack to be working, as SSL/TLS implementations might stall sending data
1785during a rehandshake. 1794during a rehandshake.
1786 1795
1787As a guideline, during the initial handshake, you should not stop reading, 1796As a guideline, during the initial handshake, you should not stop reading,
1788and as a client, it might cause problems, depending on your applciation. 1797and as a client, it might cause problems, depending on your application.
1789 1798
1790=cut 1799=cut
1791 1800
1792sub stop_read { 1801sub stop_read {
1793 my ($self) = @_; 1802 my ($self) = @_;
2002 Net::SSLeay::CTX_set_mode ($tls, 1|2); 2011 Net::SSLeay::CTX_set_mode ($tls, 1|2);
2003 2012
2004 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 2013 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
2005 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 2014 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
2006 2015
2007 Net::SSLeay::BIO_write ($self->{_rbio}, delete $self->{rbuf}); 2016 Net::SSLeay::BIO_write ($self->{_rbio}, $self->{rbuf});
2017 $self->{rbuf} = "";
2008 2018
2009 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio}); 2019 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio});
2010 2020
2011 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) } 2021 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) }
2012 if $self->{on_starttls}; 2022 if $self->{on_starttls};
2049 $self->{tls_ctx}->_put_session (delete $self->{tls}) 2059 $self->{tls_ctx}->_put_session (delete $self->{tls})
2050 if $self->{tls} > 0; 2060 if $self->{tls} > 0;
2051 2061
2052 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)}; 2062 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)};
2053} 2063}
2064
2065=item $handle->resettls
2066
2067This rarely-used method simply resets and TLS state on the handle, usually
2068causing data loss.
2069
2070One case where it may be useful is when you want to skip over the data in
2071the stream but you are not interested in interpreting it, so data loss is
2072no concern.
2073
2074=cut
2075
2076*resettls = \&_freetls;
2054 2077
2055sub DESTROY { 2078sub DESTROY {
2056 my ($self) = @_; 2079 my ($self) = @_;
2057 2080
2058 &_freetls; 2081 &_freetls;
2181Probably because your C<on_error> callback is being called instead: When 2204Probably because your C<on_error> callback is being called instead: When
2182you have outstanding requests in your read queue, then an EOF is 2205you have outstanding requests in your read queue, then an EOF is
2183considered an error as you clearly expected some data. 2206considered an error as you clearly expected some data.
2184 2207
2185To avoid this, make sure you have an empty read queue whenever your handle 2208To avoid this, make sure you have an empty read queue whenever your handle
2186is supposed to be "idle" (i.e. connection closes are O.K.). You cna set 2209is supposed to be "idle" (i.e. connection closes are O.K.). You can set
2187an C<on_read> handler that simply pushes the first read requests in the 2210an C<on_read> handler that simply pushes the first read requests in the
2188queue. 2211queue.
2189 2212
2190See also the next question, which explains this in a bit more detail. 2213See also the next question, which explains this in a bit more detail.
2191 2214
2222some data and raises the C<EPIPE> error when the connction is dropped 2245some data and raises the C<EPIPE> error when the connction is dropped
2223unexpectedly. 2246unexpectedly.
2224 2247
2225The second variant is a protocol where the client can drop the connection 2248The second variant is a protocol where the client can drop the connection
2226at any time. For TCP, this means that the server machine may run out of 2249at any time. For TCP, this means that the server machine may run out of
2227sockets easier, and in general, it means you cnanot distinguish a protocl 2250sockets easier, and in general, it means you cannot distinguish a protocl
2228failure/client crash from a normal connection close. Nevertheless, these 2251failure/client crash from a normal connection close. Nevertheless, these
2229kinds of protocols are common (and sometimes even the best solution to the 2252kinds of protocols are common (and sometimes even the best solution to the
2230problem). 2253problem).
2231 2254
2232Having an outstanding read request at all times is possible if you ignore 2255Having an outstanding read request at all times is possible if you ignore
2284 $handle->on_eof (undef); 2307 $handle->on_eof (undef);
2285 $handle->on_error (sub { 2308 $handle->on_error (sub {
2286 my $data = delete $_[0]{rbuf}; 2309 my $data = delete $_[0]{rbuf};
2287 }); 2310 });
2288 2311
2312Note that this example removes the C<rbuf> member from the handle object,
2313which is not normally allowed by the API. It is expressly permitted in
2314this case only, as the handle object needs to be destroyed afterwards.
2315
2289The reason to use C<on_error> is that TCP connections, due to latencies 2316The reason to use C<on_error> is that TCP connections, due to latencies
2290and packets loss, might get closed quite violently with an error, when in 2317and packets loss, might get closed quite violently with an error, when in
2291fact all data has been received. 2318fact all data has been received.
2292 2319
2293It is usually better to use acknowledgements when transferring data, 2320It is usually better to use acknowledgements when transferring data,
2303C<low_water_mark> this will be called precisely when all data has been 2330C<low_water_mark> this will be called precisely when all data has been
2304written to the socket: 2331written to the socket:
2305 2332
2306 $handle->push_write (...); 2333 $handle->push_write (...);
2307 $handle->on_drain (sub { 2334 $handle->on_drain (sub {
2308 warn "all data submitted to the kernel\n"; 2335 AE::log debug => "all data submitted to the kernel\n";
2309 undef $handle; 2336 undef $handle;
2310 }); 2337 });
2311 2338
2312If you just want to queue some data and then signal EOF to the other side, 2339If you just want to queue some data and then signal EOF to the other side,
2313consider using C<< ->push_shutdown >> instead. 2340consider using C<< ->push_shutdown >> instead.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines