ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.109 by root, Wed Jan 14 02:03:43 2009 UTC vs.
Revision 1.129 by root, Mon Jun 29 11:04:09 2009 UTC

14 14
15AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent 15AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
16 16
17=cut 17=cut
18 18
19our $VERSION = 4.331; 19our $VERSION = 4.42;
20 20
21=head1 SYNOPSIS 21=head1 SYNOPSIS
22 22
23 use AnyEvent; 23 use AnyEvent;
24 use AnyEvent::Handle; 24 use AnyEvent::Handle;
127and no read request is in the queue (unlike read queue callbacks, this 127and no read request is in the queue (unlike read queue callbacks, this
128callback will only be called when at least one octet of data is in the 128callback will only be called when at least one octet of data is in the
129read buffer). 129read buffer).
130 130
131To access (and remove data from) the read buffer, use the C<< ->rbuf >> 131To access (and remove data from) the read buffer, use the C<< ->rbuf >>
132method or access the C<$handle->{rbuf}> member directly. 132method or access the C<$handle->{rbuf}> member directly. Note that you
133must not enlarge or modify the read buffer, you can only remove data at
134the beginning from it.
133 135
134When an EOF condition is detected then AnyEvent::Handle will first try to 136When an EOF condition is detected then AnyEvent::Handle will first try to
135feed all the remaining data to the queued callbacks and C<on_read> before 137feed all the remaining data to the queued callbacks and C<on_read> before
136calling the C<on_eof> callback. If no progress can be made, then a fatal 138calling the C<on_eof> callback. If no progress can be made, then a fatal
137error will be raised (with C<$!> set to C<EPIPE>). 139error will be raised (with C<$!> set to C<EPIPE>).
310} 312}
311 313
312sub _shutdown { 314sub _shutdown {
313 my ($self) = @_; 315 my ($self) = @_;
314 316
315 delete $self->{_tw}; 317 delete @$self{qw(_tw _rw _ww fh rbuf wbuf on_read _queue)};
316 delete $self->{_rw};
317 delete $self->{_ww};
318 delete $self->{fh};
319 318
320 &_freetls; 319 &_freetls;
321
322 delete $self->{on_read};
323 delete $self->{_queue};
324} 320}
325 321
326sub _error { 322sub _error {
327 my ($self, $errno, $fatal) = @_; 323 my ($self, $errno, $fatal) = @_;
328 324
767 ) { 763 ) {
768 $self->_error (&Errno::ENOSPC, 1), return; 764 $self->_error (&Errno::ENOSPC, 1), return;
769 } 765 }
770 766
771 while () { 767 while () {
768 # we need to use a separate tls read buffer, as we must not receive data while
769 # we are draining the buffer, and this can only happen with TLS.
770 $self->{rbuf} .= delete $self->{_tls_rbuf} if exists $self->{_tls_rbuf};
771
772 my $len = length $self->{rbuf}; 772 my $len = length $self->{rbuf};
773 773
774 if (my $cb = shift @{ $self->{_queue} }) { 774 if (my $cb = shift @{ $self->{_queue} }) {
775 unless ($cb->($self)) { 775 unless ($cb->($self)) {
776 if ($self->{_eof}) { 776 if ($self->{_eof}) {
837 837
838=item $handle->rbuf 838=item $handle->rbuf
839 839
840Returns the read buffer (as a modifiable lvalue). 840Returns the read buffer (as a modifiable lvalue).
841 841
842You can access the read buffer directly as the C<< ->{rbuf} >> member, if 842You can access the read buffer directly as the C<< ->{rbuf} >>
843you want. 843member, if you want. However, the only operation allowed on the
844read buffer (apart from looking at it) is removing data from its
845beginning. Otherwise modifying or appending to it is not allowed and will
846lead to hard-to-track-down bugs.
844 847
845NOTE: The read buffer should only be used or modified if the C<on_read>, 848NOTE: The read buffer should only be used or modified if the C<on_read>,
846C<push_read> or C<unshift_read> methods are used. The other read methods 849C<push_read> or C<unshift_read> methods are used. The other read methods
847automatically manage the read buffer. 850automatically manage the read buffer.
848 851
1144 } 1147 }
1145}; 1148};
1146 1149
1147=item json => $cb->($handle, $hash_or_arrayref) 1150=item json => $cb->($handle, $hash_or_arrayref)
1148 1151
1149Reads a JSON object or array, decodes it and passes it to the callback. 1152Reads a JSON object or array, decodes it and passes it to the
1153callback. When a parse error occurs, an C<EBADMSG> error will be raised.
1150 1154
1151If a C<json> object was passed to the constructor, then that will be used 1155If a C<json> object was passed to the constructor, then that will be used
1152for the final decode, otherwise it will create a JSON coder expecting UTF-8. 1156for the final decode, otherwise it will create a JSON coder expecting UTF-8.
1153 1157
1154This read type uses the incremental parser available with JSON version 1158This read type uses the incremental parser available with JSON version
1171 my $rbuf = \$self->{rbuf}; 1175 my $rbuf = \$self->{rbuf};
1172 1176
1173 my $json = $self->{json} ||= JSON->new->utf8; 1177 my $json = $self->{json} ||= JSON->new->utf8;
1174 1178
1175 sub { 1179 sub {
1176 my $ref = $json->incr_parse ($self->{rbuf}); 1180 my $ref = eval { $json->incr_parse ($self->{rbuf}) };
1177 1181
1178 if ($ref) { 1182 if ($ref) {
1179 $self->{rbuf} = $json->incr_text; 1183 $self->{rbuf} = $json->incr_text;
1180 $json->incr_text = ""; 1184 $json->incr_text = "";
1181 $cb->($self, $ref); 1185 $cb->($self, $ref);
1182 1186
1183 1 1187 1
1188 } elsif ($@) {
1189 # error case
1190 $json->incr_skip;
1191
1192 $self->{rbuf} = $json->incr_text;
1193 $json->incr_text = "";
1194
1195 $self->_error (&Errno::EBADMSG);
1196
1197 ()
1184 } else { 1198 } else {
1185 $self->{rbuf} = ""; 1199 $self->{rbuf} = "";
1200
1186 () 1201 ()
1187 } 1202 }
1188 } 1203 }
1189}; 1204};
1190 1205
1331 delete $self->{_rw}; 1346 delete $self->{_rw};
1332 $self->{_eof} = 1; 1347 $self->{_eof} = 1;
1333 &_freetls; 1348 &_freetls;
1334 } 1349 }
1335 1350
1336 $self->{rbuf} .= $tmp; 1351 $self->{_tls_rbuf} .= $tmp;
1337 $self->_drain_rbuf unless $self->{_in_drain}; 1352 $self->_drain_rbuf unless $self->{_in_drain};
1338 $self->{tls} or return; # tls session might have gone away in callback 1353 $self->{tls} or return; # tls session might have gone away in callback
1339 } 1354 }
1340 1355
1341 $tmp = Net::SSLeay::get_error ($self->{tls}, -1); 1356 $tmp = Net::SSLeay::get_error ($self->{tls}, -1);
1342 1357
1343 if ($tmp != Net::SSLeay::ERROR_WANT_READ ()) { 1358 if ($tmp != Net::SSLeay::ERROR_WANT_READ ()) {
1344 if ($tmp == Net::SSLeay::ERROR_SYSCALL ()) { 1359 if ($tmp == Net::SSLeay::ERROR_SYSCALL ()) {
1345 return $self->_error ($!, 1); 1360 return $self->_error ($!, 1);
1346 } elsif ($tmp == Net::SSLeay::ERROR_SSL ()) { 1361 } elsif ($tmp == Net::SSLeay::ERROR_SSL ()) {
1347 return $self->_error (&Errno::EIO, 1); 1362 return $self->_error (&Errno::EIO, 1);
1348 } 1363 }
1349 1364
1350 # all other errors are fine for our purposes 1365 # all other errors are fine for our purposes
1351 } 1366 }
1452 1467
1453 delete @$self{qw(_rbio _wbio _tls_wbuf)}; 1468 delete @$self{qw(_rbio _wbio _tls_wbuf)};
1454} 1469}
1455 1470
1456sub DESTROY { 1471sub DESTROY {
1457 my $self = shift; 1472 my ($self) = @_;
1458 1473
1459 &_freetls; 1474 &_freetls;
1460 1475
1461 my $linger = exists $self->{linger} ? $self->{linger} : 3600; 1476 my $linger = exists $self->{linger} ? $self->{linger} : 3600;
1462 1477

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines