ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.54 by root, Tue Jun 3 09:02:46 2008 UTC vs.
Revision 1.58 by root, Wed Jun 4 22:51:15 2008 UTC

14 14
15AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent 15AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
16 16
17=cut 17=cut
18 18
19our $VERSION = 4.12; 19our $VERSION = 4.13;
20 20
21=head1 SYNOPSIS 21=head1 SYNOPSIS
22 22
23 use AnyEvent; 23 use AnyEvent;
24 use AnyEvent::Handle; 24 use AnyEvent::Handle;
224 if ($self->{tls}) { 224 if ($self->{tls}) {
225 require Net::SSLeay; 225 require Net::SSLeay;
226 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); 226 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx});
227 } 227 }
228 228
229# $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; # nop
230# $self->on_error (delete $self->{on_error}) if $self->{on_error}; # nop
231# $self->on_read (delete $self->{on_read} ) if $self->{on_read}; # nop
232 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
233
234 $self->{_activity} = AnyEvent->now; 229 $self->{_activity} = AnyEvent->now;
235 $self->_timeout; 230 $self->_timeout;
236 231
237 $self->start_read; 232 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
233 $self->on_read (delete $self->{on_read} ) if $self->{on_read};
238 234
239 $self 235 $self
240} 236}
241 237
242sub _shutdown { 238sub _shutdown {
339 $self->{on_timeout}($self); 335 $self->{on_timeout}($self);
340 } else { 336 } else {
341 $self->_error (&Errno::ETIMEDOUT); 337 $self->_error (&Errno::ETIMEDOUT);
342 } 338 }
343 339
344 # callbakx could have changed timeout value, optimise 340 # callback could have changed timeout value, optimise
345 return unless $self->{timeout}; 341 return unless $self->{timeout};
346 342
347 # calculate new after 343 # calculate new after
348 $after = $self->{timeout}; 344 $after = $self->{timeout};
349 } 345 }
350 346
351 Scalar::Util::weaken $self; 347 Scalar::Util::weaken $self;
348 return unless $self; # ->error could have destroyed $self
352 349
353 $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub { 350 $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub {
354 delete $self->{_tw}; 351 delete $self->{_tw};
355 $self->_timeout; 352 $self->_timeout;
356 }); 353 });
639 # no progress can be made (not enough data and no data forthcoming) 636 # no progress can be made (not enough data and no data forthcoming)
640 return $self->_error (&Errno::EPIPE, 1); 637 return $self->_error (&Errno::EPIPE, 1);
641 } 638 }
642 639
643 unshift @{ $self->{_queue} }, $cb; 640 unshift @{ $self->{_queue} }, $cb;
644 return; 641 last;
645 } 642 }
646 } elsif ($self->{on_read}) { 643 } elsif ($self->{on_read}) {
647 $self->{on_read}($self); 644 $self->{on_read}($self);
648 645
649 if ( 646 if (
650 $self->{_eof} # if no further data will arrive
651 && $len == length $self->{rbuf} # and no data has been consumed 647 $len == length $self->{rbuf} # if no data has been consumed
652 && !@{ $self->{_queue} } # and the queue is still empty 648 && !@{ $self->{_queue} } # and the queue is still empty
653 && $self->{on_read} # and we still want to read data 649 && $self->{on_read} # but we still have on_read
654 ) { 650 ) {
651 # no further data will arrive
655 # then no progress can be made 652 # so no progress can be made
656 return $self->_error (&Errno::EPIPE, 1); 653 return $self->_error (&Errno::EPIPE, 1)
654 if $self->{_eof};
655
656 last; # more data might arrive
657 } 657 }
658 } else { 658 } else {
659 # read side becomes idle 659 # read side becomes idle
660 delete $self->{_rw}; 660 delete $self->{_rw};
661 return; 661 last;
662 } 662 }
663 } 663 }
664 664
665 $self->{on_eof}($self) 665 $self->{on_eof}($self)
666 if $self->{_eof} && $self->{on_eof}; 666 if $self->{_eof} && $self->{on_eof};
667
668 # may need to restart read watcher
669 unless ($self->{_rw}) {
670 $self->start_read
671 if $self->{on_read} || @{ $self->{_queue} };
672 }
667} 673}
668 674
669=item $handle->on_read ($cb) 675=item $handle->on_read ($cb)
670 676
671This replaces the currently set C<on_read> callback, or clears it (when 677This replaces the currently set C<on_read> callback, or clears it (when
676 682
677sub on_read { 683sub on_read {
678 my ($self, $cb) = @_; 684 my ($self, $cb) = @_;
679 685
680 $self->{on_read} = $cb; 686 $self->{on_read} = $cb;
687 $self->_drain_rbuf if $cb;
681} 688}
682 689
683=item $handle->rbuf 690=item $handle->rbuf
684 691
685Returns the read buffer (as a modifiable lvalue). 692Returns the read buffer (as a modifiable lvalue).
1023=item $handle->stop_read 1030=item $handle->stop_read
1024 1031
1025=item $handle->start_read 1032=item $handle->start_read
1026 1033
1027In rare cases you actually do not want to read anything from the 1034In rare cases you actually do not want to read anything from the
1028socket. In this case you can call C<stop_read>. Neither C<on_read> no 1035socket. In this case you can call C<stop_read>. Neither C<on_read> nor
1029any queued callbacks will be executed then. To start reading again, call 1036any queued callbacks will be executed then. To start reading again, call
1030C<start_read>. 1037C<start_read>.
1038
1039Note that AnyEvent::Handle will automatically C<start_read> for you when
1040you change the C<on_read> callback or push/unshift a read callback, and it
1041will automatically C<stop_read> for you when neither C<on_read> is set nor
1042there are any read requests in the queue.
1031 1043
1032=cut 1044=cut
1033 1045
1034sub stop_read { 1046sub stop_read {
1035 my ($self) = @_; 1047 my ($self) = @_;
1067} 1079}
1068 1080
1069sub _dotls { 1081sub _dotls {
1070 my ($self) = @_; 1082 my ($self) = @_;
1071 1083
1084 my $buf;
1085
1072 if (length $self->{_tls_wbuf}) { 1086 if (length $self->{_tls_wbuf}) {
1073 while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) { 1087 while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) {
1074 substr $self->{_tls_wbuf}, 0, $len, ""; 1088 substr $self->{_tls_wbuf}, 0, $len, "";
1075 } 1089 }
1076 } 1090 }
1077 1091
1078 if (defined (my $buf = Net::SSLeay::BIO_read ($self->{_wbio}))) { 1092 if (length ($buf = Net::SSLeay::BIO_read ($self->{_wbio}))) {
1079 $self->{wbuf} .= $buf; 1093 $self->{wbuf} .= $buf;
1080 $self->_drain_wbuf; 1094 $self->_drain_wbuf;
1081 } 1095 }
1082 1096
1083 while (defined (my $buf = Net::SSLeay::read ($self->{tls}))) { 1097 while (defined ($buf = Net::SSLeay::read ($self->{tls}))) {
1098 if (length $buf) {
1084 $self->{rbuf} .= $buf; 1099 $self->{rbuf} .= $buf;
1085 $self->_drain_rbuf; 1100 $self->_drain_rbuf;
1101 } else {
1102 # let's treat SSL-eof as we treat normal EOF
1103 $self->{_eof} = 1;
1104 $self->_shutdown;
1105 return;
1106 }
1086 } 1107 }
1087 1108
1088 my $err = Net::SSLeay::get_error ($self->{tls}, -1); 1109 my $err = Net::SSLeay::get_error ($self->{tls}, -1);
1089 1110
1090 if ($err!= Net::SSLeay::ERROR_WANT_READ ()) { 1111 if ($err!= Net::SSLeay::ERROR_WANT_READ ()) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines