ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.149 by root, Thu Jul 16 03:48:33 2009 UTC vs.
Revision 1.158 by root, Fri Jul 24 08:40:35 2009 UTC

1package AnyEvent::Handle; 1package AnyEvent::Handle;
2 2
3no warnings;
4use strict qw(subs vars);
5
6use AnyEvent ();
7use AnyEvent::Util qw(WSAEWOULDBLOCK);
8use Scalar::Util (); 3use Scalar::Util ();
9use Carp (); 4use Carp ();
10use Fcntl ();
11use Errno qw(EAGAIN EINTR); 5use Errno qw(EAGAIN EINTR);
12 6
7use AnyEvent (); BEGIN { AnyEvent::common_sense }
8use AnyEvent::Util qw(WSAEWOULDBLOCK);
9
13=head1 NAME 10=head1 NAME
14 11
15AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent 12AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
16 13
17=cut 14=cut
18 15
19our $VERSION = 4.82; 16our $VERSION = 4.86;
20 17
21=head1 SYNOPSIS 18=head1 SYNOPSIS
22 19
23 use AnyEvent; 20 use AnyEvent;
24 use AnyEvent::Handle; 21 use AnyEvent::Handle;
26 my $cv = AnyEvent->condvar; 23 my $cv = AnyEvent->condvar;
27 24
28 my $hdl; $hdl = new AnyEvent::Handle 25 my $hdl; $hdl = new AnyEvent::Handle
29 fh => \*STDIN, 26 fh => \*STDIN,
30 on_error => sub { 27 on_error => sub {
28 my ($hdl, $fatal, $msg) = @_;
31 warn "got error $_[2]\n"; 29 warn "got error $msg\n";
30 $hdl->destroy;
32 $cv->send; 31 $cv->send;
33 ); 32 );
34 33
35 # send some request line 34 # send some request line
36 $hdl->push_write ("getinfo\015\012"); 35 $hdl->push_write ("getinfo\015\012");
70 69
71=over 4 70=over 4
72 71
73=item fh => $filehandle [MANDATORY] 72=item fh => $filehandle [MANDATORY]
74 73
74#=item fh => $filehandle [C<fh> or C<connect> MANDATORY]
75
75The filehandle this L<AnyEvent::Handle> object will operate on. 76The filehandle this L<AnyEvent::Handle> object will operate on.
76
77NOTE: The filehandle will be set to non-blocking mode (using 77NOTE: The filehandle will be set to non-blocking mode (using
78C<AnyEvent::Util::fh_nonblocking>) by the constructor and needs to stay in 78C<AnyEvent::Util::fh_nonblocking>) by the constructor and needs to stay in
79that mode. 79that mode.
80 80
81#=item connect => [$host, $service]
82#
83# You have to specify either this parameter, or C<connect>, below.
84#Try to connect to the specified host and service (port), using
85#C<AnyEvent::Socket::tcp_connect>.
86#
87#When this
88
81=item on_eof => $cb->($handle) 89=item on_eof => $cb->($handle)
82 90
83Set the callback to be called when an end-of-file condition is detected, 91Set the callback to be called when an end-of-file condition is detected,
84i.e. in the case of a socket, when the other side has closed the 92i.e. in the case of a socket, when the other side has closed the
85connection cleanly. 93connection cleanly, and there are no outstanding read requests in the
94queue (if there are read requests, then an EOF counts as an unexpected
95connection close and will be flagged as an error).
86 96
87For sockets, this just means that the other side has stopped sending data, 97For sockets, this just means that the other side has stopped sending data,
88you can still try to write data, and, in fact, one can return from the EOF 98you can still try to write data, and, in fact, one can return from the EOF
89callback and continue writing data, as only the read part has been shut 99callback and continue writing data, as only the read part has been shut
90down. 100down.
91
92While not mandatory, it is I<highly> recommended to set an EOF callback,
93otherwise you might end up with a closed socket while you are still
94waiting for data.
95 101
96If an EOF condition has been detected but no C<on_eof> callback has been 102If an EOF condition has been detected but no C<on_eof> callback has been
97set, then a fatal error will be raised with C<$!> set to <0>. 103set, then a fatal error will be raised with C<$!> set to <0>.
98 104
99=item on_error => $cb->($handle, $fatal, $message) 105=item on_error => $cb->($handle, $fatal, $message)
140 146
141When an EOF condition is detected then AnyEvent::Handle will first try to 147When an EOF condition is detected then AnyEvent::Handle will first try to
142feed all the remaining data to the queued callbacks and C<on_read> before 148feed all the remaining data to the queued callbacks and C<on_read> before
143calling the C<on_eof> callback. If no progress can be made, then a fatal 149calling the C<on_eof> callback. If no progress can be made, then a fatal
144error will be raised (with C<$!> set to C<EPIPE>). 150error will be raised (with C<$!> set to C<EPIPE>).
151
152Note that, unlike requests in the read queue, an C<on_read> callback
153doesn't mean you I<require> some data: if there is an EOF and there
154are outstanding read requests then an error will be flagged. With an
155C<on_read> callback, the C<on_eof> callback will be invoked.
145 156
146=item on_drain => $cb->($handle) 157=item on_drain => $cb->($handle)
147 158
148This sets the callback that is called when the write buffer becomes empty 159This sets the callback that is called when the write buffer becomes empty
149(or when the callback is set and the buffer is empty already). 160(or when the callback is set and the buffer is empty already).
385 $! = $errno; 396 $! = $errno;
386 $message ||= "$!"; 397 $message ||= "$!";
387 398
388 if ($self->{on_error}) { 399 if ($self->{on_error}) {
389 $self->{on_error}($self, $fatal, $message); 400 $self->{on_error}($self, $fatal, $message);
390 $self->destroy; 401 $self->destroy if $fatal;
391 } elsif ($self->{fh}) { 402 } elsif ($self->{fh}) {
392 $self->destroy; 403 $self->destroy;
393 Carp::croak "AnyEvent::Handle uncaught error: $message"; 404 Carp::croak "AnyEvent::Handle uncaught error: $message";
394 } 405 }
395} 406}
512 $self->{_activity} = $NOW; 523 $self->{_activity} = $NOW;
513 524
514 if ($self->{on_timeout}) { 525 if ($self->{on_timeout}) {
515 $self->{on_timeout}($self); 526 $self->{on_timeout}($self);
516 } else { 527 } else {
517 $self->_error (&Errno::ETIMEDOUT); 528 $self->_error (Errno::ETIMEDOUT);
518 } 529 }
519 530
520 # callback could have changed timeout value, optimise 531 # callback could have changed timeout value, optimise
521 return unless $self->{timeout}; 532 return unless $self->{timeout};
522 533
864 875
865 if ( 876 if (
866 defined $self->{rbuf_max} 877 defined $self->{rbuf_max}
867 && $self->{rbuf_max} < length $self->{rbuf} 878 && $self->{rbuf_max} < length $self->{rbuf}
868 ) { 879 ) {
869 $self->_error (&Errno::ENOSPC, 1), return; 880 $self->_error (Errno::ENOSPC, 1), return;
870 } 881 }
871 882
872 while () { 883 while () {
873 # we need to use a separate tls read buffer, as we must not receive data while 884 # we need to use a separate tls read buffer, as we must not receive data while
874 # we are draining the buffer, and this can only happen with TLS. 885 # we are draining the buffer, and this can only happen with TLS.
878 889
879 if (my $cb = shift @{ $self->{_queue} }) { 890 if (my $cb = shift @{ $self->{_queue} }) {
880 unless ($cb->($self)) { 891 unless ($cb->($self)) {
881 if ($self->{_eof}) { 892 if ($self->{_eof}) {
882 # no progress can be made (not enough data and no data forthcoming) 893 # no progress can be made (not enough data and no data forthcoming)
883 $self->_error (&Errno::EPIPE, 1), return; 894 $self->_error (Errno::EPIPE, 1), return;
884 } 895 }
885 896
886 unshift @{ $self->{_queue} }, $cb; 897 unshift @{ $self->{_queue} }, $cb;
887 last; 898 last;
888 } 899 }
896 && !@{ $self->{_queue} } # and the queue is still empty 907 && !@{ $self->{_queue} } # and the queue is still empty
897 && $self->{on_read} # but we still have on_read 908 && $self->{on_read} # but we still have on_read
898 ) { 909 ) {
899 # no further data will arrive 910 # no further data will arrive
900 # so no progress can be made 911 # so no progress can be made
901 $self->_error (&Errno::EPIPE, 1), return 912 $self->_error (Errno::EPIPE, 1), return
902 if $self->{_eof}; 913 if $self->{_eof};
903 914
904 last; # more data might arrive 915 last; # more data might arrive
905 } 916 }
906 } else { 917 } else {
1156 return 1; 1167 return 1;
1157 } 1168 }
1158 1169
1159 # reject 1170 # reject
1160 if ($reject && $$rbuf =~ $reject) { 1171 if ($reject && $$rbuf =~ $reject) {
1161 $self->_error (&Errno::EBADMSG); 1172 $self->_error (Errno::EBADMSG);
1162 } 1173 }
1163 1174
1164 # skip 1175 # skip
1165 if ($skip && $$rbuf =~ $skip) { 1176 if ($skip && $$rbuf =~ $skip) {
1166 $data .= substr $$rbuf, 0, $+[0], ""; 1177 $data .= substr $$rbuf, 0, $+[0], "";
1182 my ($self, $cb) = @_; 1193 my ($self, $cb) = @_;
1183 1194
1184 sub { 1195 sub {
1185 unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { 1196 unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) {
1186 if ($_[0]{rbuf} =~ /[^0-9]/) { 1197 if ($_[0]{rbuf} =~ /[^0-9]/) {
1187 $self->_error (&Errno::EBADMSG); 1198 $self->_error (Errno::EBADMSG);
1188 } 1199 }
1189 return; 1200 return;
1190 } 1201 }
1191 1202
1192 my $len = $1; 1203 my $len = $1;
1195 my $string = $_[1]; 1206 my $string = $_[1];
1196 $_[0]->unshift_read (chunk => 1, sub { 1207 $_[0]->unshift_read (chunk => 1, sub {
1197 if ($_[1] eq ",") { 1208 if ($_[1] eq ",") {
1198 $cb->($_[0], $string); 1209 $cb->($_[0], $string);
1199 } else { 1210 } else {
1200 $self->_error (&Errno::EBADMSG); 1211 $self->_error (Errno::EBADMSG);
1201 } 1212 }
1202 }); 1213 });
1203 }); 1214 });
1204 1215
1205 1 1216 1
1295 $json->incr_skip; 1306 $json->incr_skip;
1296 1307
1297 $self->{rbuf} = $json->incr_text; 1308 $self->{rbuf} = $json->incr_text;
1298 $json->incr_text = ""; 1309 $json->incr_text = "";
1299 1310
1300 $self->_error (&Errno::EBADMSG); 1311 $self->_error (Errno::EBADMSG);
1301 1312
1302 () 1313 ()
1303 } else { 1314 } else {
1304 $self->{rbuf} = ""; 1315 $self->{rbuf} = "";
1305 1316
1342 # read remaining chunk 1353 # read remaining chunk
1343 $_[0]->unshift_read (chunk => $len, sub { 1354 $_[0]->unshift_read (chunk => $len, sub {
1344 if (my $ref = eval { Storable::thaw ($_[1]) }) { 1355 if (my $ref = eval { Storable::thaw ($_[1]) }) {
1345 $cb->($_[0], $ref); 1356 $cb->($_[0], $ref);
1346 } else { 1357 } else {
1347 $self->_error (&Errno::EBADMSG); 1358 $self->_error (Errno::EBADMSG);
1348 } 1359 }
1349 }); 1360 });
1350 } 1361 }
1351 1362
1352 1 1363 1
1450 if ($self->{_on_starttls}) { 1461 if ($self->{_on_starttls}) {
1451 (delete $self->{_on_starttls})->($self, undef, $err); 1462 (delete $self->{_on_starttls})->($self, undef, $err);
1452 &_freetls; 1463 &_freetls;
1453 } else { 1464 } else {
1454 &_freetls; 1465 &_freetls;
1455 $self->_error (&Errno::EPROTO, 1, $err); 1466 $self->_error (Errno::EPROTO, 1, $err);
1456 } 1467 }
1457} 1468}
1458 1469
1459# poll the write BIO and send the data if applicable 1470# poll the write BIO and send the data if applicable
1460# also decode read data if possible 1471# also decode read data if possible
1517 1528
1518Instead of starting TLS negotiation immediately when the AnyEvent::Handle 1529Instead of starting TLS negotiation immediately when the AnyEvent::Handle
1519object is created, you can also do that at a later time by calling 1530object is created, you can also do that at a later time by calling
1520C<starttls>. 1531C<starttls>.
1521 1532
1533Starting TLS is currently an asynchronous operation - when you push some
1534write data and then call C<< ->starttls >> then TLS negotiation will start
1535immediately, after which the queued write data is then sent.
1536
1522The first argument is the same as the C<tls> constructor argument (either 1537The first argument is the same as the C<tls> constructor argument (either
1523C<"connect">, C<"accept"> or an existing Net::SSLeay object). 1538C<"connect">, C<"accept"> or an existing Net::SSLeay object).
1524 1539
1525The second argument is the optional C<AnyEvent::TLS> object that is used 1540The second argument is the optional C<AnyEvent::TLS> object that is used
1526when AnyEvent::Handle has to create its own TLS connection object, or 1541when AnyEvent::Handle has to create its own TLS connection object, or
1550 $ERROR_SYSCALL = Net::SSLeay::ERROR_SYSCALL (); 1565 $ERROR_SYSCALL = Net::SSLeay::ERROR_SYSCALL ();
1551 $ERROR_WANT_READ = Net::SSLeay::ERROR_WANT_READ (); 1566 $ERROR_WANT_READ = Net::SSLeay::ERROR_WANT_READ ();
1552 1567
1553 $ctx ||= $self->{tls_ctx}; 1568 $ctx ||= $self->{tls_ctx};
1554 1569
1570 local $Carp::CarpLevel = 1; # skip ourselves when creating a new context or session
1571
1555 if ("HASH" eq ref $ctx) { 1572 if ("HASH" eq ref $ctx) {
1556 require AnyEvent::TLS; 1573 require AnyEvent::TLS;
1557
1558 local $Carp::CarpLevel = 1; # skip ourselves when creating a new context
1559 1574
1560 if ($ctx->{cache}) { 1575 if ($ctx->{cache}) {
1561 my $key = $ctx+0; 1576 my $key = $ctx+0;
1562 $ctx = $TLS_CACHE{$key} ||= new AnyEvent::TLS %$ctx; 1577 $ctx = $TLS_CACHE{$key} ||= new AnyEvent::TLS %$ctx;
1563 } else { 1578 } else {
1635 1650
1636 &_freetls; 1651 &_freetls;
1637 1652
1638 my $linger = exists $self->{linger} ? $self->{linger} : 3600; 1653 my $linger = exists $self->{linger} ? $self->{linger} : 3600;
1639 1654
1640 if ($linger && length $self->{wbuf}) { 1655 if ($linger && length $self->{wbuf} && $self->{fh}) {
1641 my $fh = delete $self->{fh}; 1656 my $fh = delete $self->{fh};
1642 my $wbuf = delete $self->{wbuf}; 1657 my $wbuf = delete $self->{wbuf};
1643 1658
1644 my @linger; 1659 my @linger;
1645 1660

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines