ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.55 by root, Tue Jun 3 16:15:30 2008 UTC vs.
Revision 1.59 by root, Thu Jun 5 16:53:11 2008 UTC

14 14
15AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent 15AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
16 16
17=cut 17=cut
18 18
19our $VERSION = 4.12; 19our $VERSION = 4.13;
20 20
21=head1 SYNOPSIS 21=head1 SYNOPSIS
22 22
23 use AnyEvent; 23 use AnyEvent;
24 use AnyEvent::Handle; 24 use AnyEvent::Handle;
224 if ($self->{tls}) { 224 if ($self->{tls}) {
225 require Net::SSLeay; 225 require Net::SSLeay;
226 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); 226 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx});
227 } 227 }
228 228
229# $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; # nop
230# $self->on_error (delete $self->{on_error}) if $self->{on_error}; # nop
231# $self->on_read (delete $self->{on_read} ) if $self->{on_read}; # nop
232 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
233
234 $self->{_activity} = AnyEvent->now; 229 $self->{_activity} = AnyEvent->now;
235 $self->_timeout; 230 $self->_timeout;
236 231
237 $self->start_read; 232 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
233 $self->on_read (delete $self->{on_read} ) if $self->{on_read};
238 234
239 $self 235 $self
240} 236}
241 237
242sub _shutdown { 238sub _shutdown {
339 $self->{on_timeout}($self); 335 $self->{on_timeout}($self);
340 } else { 336 } else {
341 $self->_error (&Errno::ETIMEDOUT); 337 $self->_error (&Errno::ETIMEDOUT);
342 } 338 }
343 339
344 # callbakx could have changed timeout value, optimise 340 # callback could have changed timeout value, optimise
345 return unless $self->{timeout}; 341 return unless $self->{timeout};
346 342
347 # calculate new after 343 # calculate new after
348 $after = $self->{timeout}; 344 $after = $self->{timeout};
349 } 345 }
350 346
351 Scalar::Util::weaken $self; 347 Scalar::Util::weaken $self;
348 return unless $self; # ->error could have destroyed $self
352 349
353 $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub { 350 $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub {
354 delete $self->{_tw}; 351 delete $self->{_tw};
355 $self->_timeout; 352 $self->_timeout;
356 }); 353 });
619=cut 616=cut
620 617
621sub _drain_rbuf { 618sub _drain_rbuf {
622 my ($self) = @_; 619 my ($self) = @_;
623 620
621 local $self->{_in_drain} = 1;
622
624 if ( 623 if (
625 defined $self->{rbuf_max} 624 defined $self->{rbuf_max}
626 && $self->{rbuf_max} < length $self->{rbuf} 625 && $self->{rbuf_max} < length $self->{rbuf}
627 ) { 626 ) {
628 return $self->_error (&Errno::ENOSPC, 1); 627 return $self->_error (&Errno::ENOSPC, 1);
629 } 628 }
630 629
631 return if $self->{in_drain}; 630 while () {
632 local $self->{in_drain} = 1;
633
634 while (my $len = length $self->{rbuf}) {
635 no strict 'refs'; 631 no strict 'refs';
632
633 my $len = length $self->{rbuf};
634
636 if (my $cb = shift @{ $self->{_queue} }) { 635 if (my $cb = shift @{ $self->{_queue} }) {
637 unless ($cb->($self)) { 636 unless ($cb->($self)) {
638 if ($self->{_eof}) { 637 if ($self->{_eof}) {
639 # no progress can be made (not enough data and no data forthcoming) 638 # no progress can be made (not enough data and no data forthcoming)
640 return $self->_error (&Errno::EPIPE, 1); 639 return $self->_error (&Errno::EPIPE, 1);
685 684
686sub on_read { 685sub on_read {
687 my ($self, $cb) = @_; 686 my ($self, $cb) = @_;
688 687
689 $self->{on_read} = $cb; 688 $self->{on_read} = $cb;
689 $self->_drain_rbuf if $cb && !$self->{_in_drain};
690} 690}
691 691
692=item $handle->rbuf 692=item $handle->rbuf
693 693
694Returns the read buffer (as a modifiable lvalue). 694Returns the read buffer (as a modifiable lvalue).
743 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read") 743 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read")
744 ->($self, $cb, @_); 744 ->($self, $cb, @_);
745 } 745 }
746 746
747 push @{ $self->{_queue} }, $cb; 747 push @{ $self->{_queue} }, $cb;
748 $self->_drain_rbuf; 748 $self->_drain_rbuf unless $self->{_in_drain};
749} 749}
750 750
751sub unshift_read { 751sub unshift_read {
752 my $self = shift; 752 my $self = shift;
753 my $cb = pop; 753 my $cb = pop;
759 ->($self, $cb, @_); 759 ->($self, $cb, @_);
760 } 760 }
761 761
762 762
763 unshift @{ $self->{_queue} }, $cb; 763 unshift @{ $self->{_queue} }, $cb;
764 $self->_drain_rbuf; 764 $self->_drain_rbuf unless $self->{_in_drain};
765} 765}
766 766
767=item $handle->push_read (type => @args, $cb) 767=item $handle->push_read (type => @args, $cb)
768 768
769=item $handle->unshift_read (type => @args, $cb) 769=item $handle->unshift_read (type => @args, $cb)
1032=item $handle->stop_read 1032=item $handle->stop_read
1033 1033
1034=item $handle->start_read 1034=item $handle->start_read
1035 1035
1036In rare cases you actually do not want to read anything from the 1036In rare cases you actually do not want to read anything from the
1037socket. In this case you can call C<stop_read>. Neither C<on_read> no 1037socket. In this case you can call C<stop_read>. Neither C<on_read> nor
1038any queued callbacks will be executed then. To start reading again, call 1038any queued callbacks will be executed then. To start reading again, call
1039C<start_read>. 1039C<start_read>.
1040
1041Note that AnyEvent::Handle will automatically C<start_read> for you when
1042you change the C<on_read> callback or push/unshift a read callback, and it
1043will automatically C<stop_read> for you when neither C<on_read> is set nor
1044there are any read requests in the queue.
1040 1045
1041=cut 1046=cut
1042 1047
1043sub stop_read { 1048sub stop_read {
1044 my ($self) = @_; 1049 my ($self) = @_;
1059 if ($len > 0) { 1064 if ($len > 0) {
1060 $self->{_activity} = AnyEvent->now; 1065 $self->{_activity} = AnyEvent->now;
1061 1066
1062 $self->{filter_r} 1067 $self->{filter_r}
1063 ? $self->{filter_r}($self, $rbuf) 1068 ? $self->{filter_r}($self, $rbuf)
1064 : $self->_drain_rbuf; 1069 : $self->{_in_drain} || $self->_drain_rbuf;
1065 1070
1066 } elsif (defined $len) { 1071 } elsif (defined $len) {
1067 delete $self->{_rw}; 1072 delete $self->{_rw};
1068 $self->{_eof} = 1; 1073 $self->{_eof} = 1;
1069 $self->_drain_rbuf; 1074 $self->_drain_rbuf unless $self->{_in_drain};
1070 1075
1071 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { 1076 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
1072 return $self->_error ($!, 1); 1077 return $self->_error ($!, 1);
1073 } 1078 }
1074 }); 1079 });
1076} 1081}
1077 1082
1078sub _dotls { 1083sub _dotls {
1079 my ($self) = @_; 1084 my ($self) = @_;
1080 1085
1086 my $buf;
1087
1081 if (length $self->{_tls_wbuf}) { 1088 if (length $self->{_tls_wbuf}) {
1082 while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) { 1089 while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) {
1083 substr $self->{_tls_wbuf}, 0, $len, ""; 1090 substr $self->{_tls_wbuf}, 0, $len, "";
1084 } 1091 }
1085 } 1092 }
1086 1093
1087 if (defined (my $buf = Net::SSLeay::BIO_read ($self->{_wbio}))) { 1094 if (length ($buf = Net::SSLeay::BIO_read ($self->{_wbio}))) {
1088 $self->{wbuf} .= $buf; 1095 $self->{wbuf} .= $buf;
1089 $self->_drain_wbuf; 1096 $self->_drain_wbuf;
1090 } 1097 }
1091 1098
1092 while (defined (my $buf = Net::SSLeay::read ($self->{tls}))) { 1099 while (defined ($buf = Net::SSLeay::read ($self->{tls}))) {
1100 if (length $buf) {
1093 $self->{rbuf} .= $buf; 1101 $self->{rbuf} .= $buf;
1094 $self->_drain_rbuf; 1102 $self->_drain_rbuf unless $self->{_in_drain};
1103 } else {
1104 # let's treat SSL-eof as we treat normal EOF
1105 $self->{_eof} = 1;
1106 $self->_shutdown;
1107 return;
1108 }
1095 } 1109 }
1096 1110
1097 my $err = Net::SSLeay::get_error ($self->{tls}, -1); 1111 my $err = Net::SSLeay::get_error ($self->{tls}, -1);
1098 1112
1099 if ($err!= Net::SSLeay::ERROR_WANT_READ ()) { 1113 if ($err!= Net::SSLeay::ERROR_WANT_READ ()) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines