… | |
… | |
72 | The filehandle this L<AnyEvent::Handle> object will operate on. |
72 | The filehandle this L<AnyEvent::Handle> object will operate on. |
73 | |
73 | |
74 | NOTE: The filehandle will be set to non-blocking (using |
74 | NOTE: The filehandle will be set to non-blocking (using |
75 | AnyEvent::Util::fh_nonblocking). |
75 | AnyEvent::Util::fh_nonblocking). |
76 | |
76 | |
77 | =item on_eof => $cb->($self) [MANDATORY] |
77 | =item on_eof => $cb->($self) |
78 | |
78 | |
79 | Set the callback to be called on EOF. |
79 | Set the callback to be called on EOF. |
|
|
80 | |
|
|
81 | While not mandatory, it is highly recommended to set an eof callback, |
|
|
82 | otherwise you might end up with a closed socket while you are still |
|
|
83 | waiting for data. |
80 | |
84 | |
81 | =item on_error => $cb->($self) |
85 | =item on_error => $cb->($self) |
82 | |
86 | |
83 | This is the fatal error callback, that is called when, well, a fatal error |
87 | This is the fatal error callback, that is called when, well, a fatal error |
84 | ocurs, such as not being able to resolve the hostname, failure to connect |
88 | ocurs, such as not being able to resolve the hostname, failure to connect |
… | |
… | |
148 | |
152 | |
149 | $self->{fh} or Carp::croak "mandatory argument fh is missing"; |
153 | $self->{fh} or Carp::croak "mandatory argument fh is missing"; |
150 | |
154 | |
151 | AnyEvent::Util::fh_nonblocking $self->{fh}, 1; |
155 | AnyEvent::Util::fh_nonblocking $self->{fh}, 1; |
152 | |
156 | |
153 | $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing"); |
157 | $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; |
154 | |
|
|
155 | $self->on_error (delete $self->{on_error}) if $self->{on_error}; |
158 | $self->on_error (delete $self->{on_error}) if $self->{on_error}; |
156 | $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; |
159 | $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; |
157 | $self->on_read (delete $self->{on_read} ) if $self->{on_read}; |
160 | $self->on_read (delete $self->{on_read} ) if $self->{on_read}; |
158 | |
161 | |
159 | $self->start_read; |
162 | $self->start_read; |
… | |
… | |
251 | want (only limited by the available memory), as C<AnyEvent::Handle> |
254 | want (only limited by the available memory), as C<AnyEvent::Handle> |
252 | buffers it independently of the kernel. |
255 | buffers it independently of the kernel. |
253 | |
256 | |
254 | =cut |
257 | =cut |
255 | |
258 | |
256 | sub push_write { |
259 | sub _drain_wbuf { |
257 | my ($self, $data) = @_; |
260 | my ($self) = @_; |
258 | |
|
|
259 | $self->{wbuf} .= $data; |
|
|
260 | |
261 | |
261 | unless ($self->{ww}) { |
262 | unless ($self->{ww}) { |
262 | Scalar::Util::weaken $self; |
263 | Scalar::Util::weaken $self; |
263 | my $cb = sub { |
264 | my $cb = sub { |
264 | my $len = syswrite $self->{fh}, $self->{wbuf}; |
265 | my $len = syswrite $self->{fh}, $self->{wbuf}; |
265 | |
266 | |
266 | if ($len > 0) { |
267 | if ($len > 0) { |
267 | substr $self->{wbuf}, 0, $len, ""; |
268 | substr $self->{wbuf}, 0, $len, ""; |
268 | |
|
|
269 | |
269 | |
270 | $self->{on_drain}($self) |
270 | $self->{on_drain}($self) |
271 | if $self->{low_water_mark} >= length $self->{wbuf} |
271 | if $self->{low_water_mark} >= length $self->{wbuf} |
272 | && $self->{on_drain}; |
272 | && $self->{on_drain}; |
273 | |
273 | |
… | |
… | |
279 | |
279 | |
280 | $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb); |
280 | $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb); |
281 | |
281 | |
282 | $cb->($self); |
282 | $cb->($self); |
283 | }; |
283 | }; |
|
|
284 | } |
|
|
285 | |
|
|
286 | sub push_write { |
|
|
287 | my $self = shift; |
|
|
288 | |
|
|
289 | if ($self->{filter_w}) { |
|
|
290 | $self->{filter_w}->(\$_[0]); |
|
|
291 | } else { |
|
|
292 | $self->{wbuf} .= $_[0]; |
|
|
293 | $self->_drain_wbuf; |
|
|
294 | } |
284 | } |
295 | } |
285 | |
296 | |
286 | ############################################################################# |
297 | ############################################################################# |
287 | |
298 | |
288 | =back |
299 | =back |
… | |
… | |
363 | |
374 | |
364 | =cut |
375 | =cut |
365 | |
376 | |
366 | sub _drain_rbuf { |
377 | sub _drain_rbuf { |
367 | my ($self) = @_; |
378 | my ($self) = @_; |
|
|
379 | |
|
|
380 | if ( |
|
|
381 | defined $self->{rbuf_max} |
|
|
382 | && $self->{rbuf_max} < length $self->{rbuf} |
|
|
383 | ) { |
|
|
384 | $! = &Errno::ENOSPC; return $self->error; |
|
|
385 | } |
368 | |
386 | |
369 | return if $self->{in_drain}; |
387 | return if $self->{in_drain}; |
370 | local $self->{in_drain} = 1; |
388 | local $self->{in_drain} = 1; |
371 | |
389 | |
372 | while (my $len = length $self->{rbuf}) { |
390 | while (my $len = length $self->{rbuf}) { |
… | |
… | |
400 | } |
418 | } |
401 | } |
419 | } |
402 | |
420 | |
403 | if ($self->{eof}) { |
421 | if ($self->{eof}) { |
404 | $self->_shutdown; |
422 | $self->_shutdown; |
405 | $self->{on_eof}($self); |
423 | $self->{on_eof}($self) |
|
|
424 | if $self->{on_eof}; |
406 | } |
425 | } |
407 | } |
426 | } |
408 | |
427 | |
409 | =item $handle->on_read ($cb) |
428 | =item $handle->on_read ($cb) |
410 | |
429 | |
… | |
… | |
576 | |
595 | |
577 | unless ($self->{rw} || $self->{eof}) { |
596 | unless ($self->{rw} || $self->{eof}) { |
578 | Scalar::Util::weaken $self; |
597 | Scalar::Util::weaken $self; |
579 | |
598 | |
580 | $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { |
599 | $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { |
|
|
600 | my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf}; |
581 | my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf}; |
601 | my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; |
582 | |
602 | |
583 | if ($len > 0) { |
603 | if ($len > 0) { |
584 | if (defined $self->{rbuf_max}) { |
604 | $self->{filter_r} |
585 | if ($self->{rbuf_max} < length $self->{rbuf}) { |
605 | ? $self->{filter_r}->($rbuf) |
586 | $! = &Errno::ENOSPC; return $self->error; |
606 | : $self->_drain_rbuf; |
587 | } |
|
|
588 | } |
|
|
589 | |
607 | |
590 | } elsif (defined $len) { |
608 | } elsif (defined $len) { |
|
|
609 | delete $self->{rw}; |
591 | $self->{eof} = 1; |
610 | $self->{eof} = 1; |
592 | delete $self->{rw}; |
611 | $self->_drain_rbuf; |
593 | |
612 | |
594 | } elsif ($! != EAGAIN && $! != EINTR) { |
613 | } elsif ($! != EAGAIN && $! != EINTR) { |
595 | return $self->error; |
614 | return $self->error; |
596 | } |
615 | } |
597 | |
|
|
598 | $self->_drain_rbuf; |
|
|
599 | }); |
616 | }); |
600 | } |
617 | } |
601 | } |
618 | } |
602 | |
619 | |
603 | =back |
620 | =back |