ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.15 by root, Sat May 17 21:34:15 2008 UTC vs.
Revision 1.17 by root, Sat May 24 04:17:45 2008 UTC

72The filehandle this L<AnyEvent::Handle> object will operate on. 72The filehandle this L<AnyEvent::Handle> object will operate on.
73 73
74NOTE: The filehandle will be set to non-blocking (using 74NOTE: The filehandle will be set to non-blocking (using
75AnyEvent::Util::fh_nonblocking). 75AnyEvent::Util::fh_nonblocking).
76 76
77=item on_eof => $cb->($self) [MANDATORY] 77=item on_eof => $cb->($self)
78 78
79Set the callback to be called on EOF. 79Set the callback to be called on EOF.
80
81While not mandatory, it is highly recommended to set an eof callback,
82otherwise you might end up with a closed socket while you are still
83waiting for data.
80 84
81=item on_error => $cb->($self) 85=item on_error => $cb->($self)
82 86
83This is the fatal error callback, that is called when, well, a fatal error 87This is the fatal error callback, that is called when, well, a fatal error
84ocurs, such as not being able to resolve the hostname, failure to connect 88ocurs, such as not being able to resolve the hostname, failure to connect
148 152
149 $self->{fh} or Carp::croak "mandatory argument fh is missing"; 153 $self->{fh} or Carp::croak "mandatory argument fh is missing";
150 154
151 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 155 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
152 156
153 $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing"); 157 $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof};
154
155 $self->on_error (delete $self->{on_error}) if $self->{on_error}; 158 $self->on_error (delete $self->{on_error}) if $self->{on_error};
156 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 159 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
157 $self->on_read (delete $self->{on_read} ) if $self->{on_read}; 160 $self->on_read (delete $self->{on_read} ) if $self->{on_read};
158 161
159 $self->start_read; 162 $self->start_read;
251want (only limited by the available memory), as C<AnyEvent::Handle> 254want (only limited by the available memory), as C<AnyEvent::Handle>
252buffers it independently of the kernel. 255buffers it independently of the kernel.
253 256
254=cut 257=cut
255 258
256sub push_write { 259sub _drain_wbuf {
257 my ($self, $data) = @_; 260 my ($self) = @_;
258
259 $self->{wbuf} .= $data;
260 261
261 unless ($self->{ww}) { 262 unless ($self->{ww}) {
262 Scalar::Util::weaken $self; 263 Scalar::Util::weaken $self;
263 my $cb = sub { 264 my $cb = sub {
264 my $len = syswrite $self->{fh}, $self->{wbuf}; 265 my $len = syswrite $self->{fh}, $self->{wbuf};
265 266
266 if ($len > 0) { 267 if ($len > 0) {
267 substr $self->{wbuf}, 0, $len, ""; 268 substr $self->{wbuf}, 0, $len, "";
268
269 269
270 $self->{on_drain}($self) 270 $self->{on_drain}($self)
271 if $self->{low_water_mark} >= length $self->{wbuf} 271 if $self->{low_water_mark} >= length $self->{wbuf}
272 && $self->{on_drain}; 272 && $self->{on_drain};
273 273
279 279
280 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb); 280 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb);
281 281
282 $cb->($self); 282 $cb->($self);
283 }; 283 };
284}
285
286sub push_write {
287 my $self = shift;
288
289 if ($self->{filter_w}) {
290 $self->{filter_w}->(\$_[0]);
291 } else {
292 $self->{wbuf} .= $_[0];
293 $self->_drain_wbuf;
294 }
284} 295}
285 296
286############################################################################# 297#############################################################################
287 298
288=back 299=back
363 374
364=cut 375=cut
365 376
366sub _drain_rbuf { 377sub _drain_rbuf {
367 my ($self) = @_; 378 my ($self) = @_;
379
380 if (
381 defined $self->{rbuf_max}
382 && $self->{rbuf_max} < length $self->{rbuf}
383 ) {
384 $! = &Errno::ENOSPC; return $self->error;
385 }
368 386
369 return if $self->{in_drain}; 387 return if $self->{in_drain};
370 local $self->{in_drain} = 1; 388 local $self->{in_drain} = 1;
371 389
372 while (my $len = length $self->{rbuf}) { 390 while (my $len = length $self->{rbuf}) {
400 } 418 }
401 } 419 }
402 420
403 if ($self->{eof}) { 421 if ($self->{eof}) {
404 $self->_shutdown; 422 $self->_shutdown;
405 $self->{on_eof}($self); 423 $self->{on_eof}($self)
424 if $self->{on_eof};
406 } 425 }
407} 426}
408 427
409=item $handle->on_read ($cb) 428=item $handle->on_read ($cb)
410 429
576 595
577 unless ($self->{rw} || $self->{eof}) { 596 unless ($self->{rw} || $self->{eof}) {
578 Scalar::Util::weaken $self; 597 Scalar::Util::weaken $self;
579 598
580 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { 599 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
600 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf};
581 my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf}; 601 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
582 602
583 if ($len > 0) { 603 if ($len > 0) {
584 if (defined $self->{rbuf_max}) { 604 $self->{filter_r}
585 if ($self->{rbuf_max} < length $self->{rbuf}) { 605 ? $self->{filter_r}->($rbuf)
586 $! = &Errno::ENOSPC; return $self->error; 606 : $self->_drain_rbuf;
587 }
588 }
589 607
590 } elsif (defined $len) { 608 } elsif (defined $len) {
609 delete $self->{rw};
591 $self->{eof} = 1; 610 $self->{eof} = 1;
592 delete $self->{rw}; 611 $self->_drain_rbuf;
593 612
594 } elsif ($! != EAGAIN && $! != EINTR) { 613 } elsif ($! != EAGAIN && $! != EINTR) {
595 return $self->error; 614 return $self->error;
596 } 615 }
597
598 $self->_drain_rbuf;
599 }); 616 });
600 } 617 }
601} 618}
602 619
603=back 620=back

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines