ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.16 by root, Fri May 23 05:16:57 2008 UTC vs.
Revision 1.17 by root, Sat May 24 04:17:45 2008 UTC

254want (only limited by the available memory), as C<AnyEvent::Handle> 254want (only limited by the available memory), as C<AnyEvent::Handle>
255buffers it independently of the kernel. 255buffers it independently of the kernel.
256 256
257=cut 257=cut
258 258
259sub push_write { 259sub _drain_wbuf {
260 my ($self, $data) = @_; 260 my ($self) = @_;
261
262 $self->{wbuf} .= $data;
263 261
264 unless ($self->{ww}) { 262 unless ($self->{ww}) {
265 Scalar::Util::weaken $self; 263 Scalar::Util::weaken $self;
266 my $cb = sub { 264 my $cb = sub {
267 my $len = syswrite $self->{fh}, $self->{wbuf}; 265 my $len = syswrite $self->{fh}, $self->{wbuf};
268 266
269 if ($len > 0) { 267 if ($len > 0) {
270 substr $self->{wbuf}, 0, $len, ""; 268 substr $self->{wbuf}, 0, $len, "";
271
272 269
273 $self->{on_drain}($self) 270 $self->{on_drain}($self)
274 if $self->{low_water_mark} >= length $self->{wbuf} 271 if $self->{low_water_mark} >= length $self->{wbuf}
275 && $self->{on_drain}; 272 && $self->{on_drain};
276 273
282 279
283 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb); 280 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb);
284 281
285 $cb->($self); 282 $cb->($self);
286 }; 283 };
284}
285
286sub push_write {
287 my $self = shift;
288
289 if ($self->{filter_w}) {
290 $self->{filter_w}->(\$_[0]);
291 } else {
292 $self->{wbuf} .= $_[0];
293 $self->_drain_wbuf;
294 }
287} 295}
288 296
289############################################################################# 297#############################################################################
290 298
291=back 299=back
366 374
367=cut 375=cut
368 376
369sub _drain_rbuf { 377sub _drain_rbuf {
370 my ($self) = @_; 378 my ($self) = @_;
379
380 if (
381 defined $self->{rbuf_max}
382 && $self->{rbuf_max} < length $self->{rbuf}
383 ) {
384 $! = &Errno::ENOSPC; return $self->error;
385 }
371 386
372 return if $self->{in_drain}; 387 return if $self->{in_drain};
373 local $self->{in_drain} = 1; 388 local $self->{in_drain} = 1;
374 389
375 while (my $len = length $self->{rbuf}) { 390 while (my $len = length $self->{rbuf}) {
580 595
581 unless ($self->{rw} || $self->{eof}) { 596 unless ($self->{rw} || $self->{eof}) {
582 Scalar::Util::weaken $self; 597 Scalar::Util::weaken $self;
583 598
584 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { 599 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
600 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf};
585 my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf}; 601 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
586 602
587 if ($len > 0) { 603 if ($len > 0) {
588 if (defined $self->{rbuf_max}) { 604 $self->{filter_r}
589 if ($self->{rbuf_max} < length $self->{rbuf}) { 605 ? $self->{filter_r}->($rbuf)
590 $! = &Errno::ENOSPC; return $self->error; 606 : $self->_drain_rbuf;
591 }
592 }
593 607
594 } elsif (defined $len) { 608 } elsif (defined $len) {
609 delete $self->{rw};
595 $self->{eof} = 1; 610 $self->{eof} = 1;
596 delete $self->{rw}; 611 $self->_drain_rbuf;
597 612
598 } elsif ($! != EAGAIN && $! != EINTR) { 613 } elsif ($! != EAGAIN && $! != EINTR) {
599 return $self->error; 614 return $self->error;
600 } 615 }
601
602 $self->_drain_rbuf;
603 }); 616 });
604 } 617 }
605} 618}
606 619
607=back 620=back

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines