… | |
… | |
12 | |
12 | |
13 | =head1 NAME |
13 | =head1 NAME |
14 | |
14 | |
15 | AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent |
15 | AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent |
16 | |
16 | |
17 | =cut |
17 | This module is experimental. |
18 | |
18 | |
|
|
19 | =cut |
|
|
20 | |
19 | our $VERSION = '0.02'; |
21 | our $VERSION = '0.04'; |
20 | |
22 | |
21 | =head1 SYNOPSIS |
23 | =head1 SYNOPSIS |
22 | |
24 | |
23 | use AnyEvent; |
25 | use AnyEvent; |
24 | use AnyEvent::Handle; |
26 | use AnyEvent::Handle; |
… | |
… | |
70 | The filehandle this L<AnyEvent::Handle> object will operate on. |
72 | The filehandle this L<AnyEvent::Handle> object will operate on. |
71 | |
73 | |
72 | NOTE: The filehandle will be set to non-blocking (using |
74 | NOTE: The filehandle will be set to non-blocking (using |
73 | AnyEvent::Util::fh_nonblocking). |
75 | AnyEvent::Util::fh_nonblocking). |
74 | |
76 | |
75 | =item on_eof => $cb->($self) [MANDATORY] |
77 | =item on_eof => $cb->($self) |
76 | |
78 | |
77 | Set the callback to be called on EOF. |
79 | Set the callback to be called on EOF. |
|
|
80 | |
|
|
81 | While not mandatory, it is highly recommended to set an eof callback, |
|
|
82 | otherwise you might end up with a closed socket while you are still |
|
|
83 | waiting for data. |
78 | |
84 | |
79 | =item on_error => $cb->($self) |
85 | =item on_error => $cb->($self) |
80 | |
86 | |
81 | This is the fatal error callback, that is called when, well, a fatal error |
87 | This is the fatal error callback, that is called when, well, a fatal error |
82 | ocurs, such as not being able to resolve the hostname, failure to connect |
88 | ocurs, such as not being able to resolve the hostname, failure to connect |
… | |
… | |
146 | |
152 | |
147 | $self->{fh} or Carp::croak "mandatory argument fh is missing"; |
153 | $self->{fh} or Carp::croak "mandatory argument fh is missing"; |
148 | |
154 | |
149 | AnyEvent::Util::fh_nonblocking $self->{fh}, 1; |
155 | AnyEvent::Util::fh_nonblocking $self->{fh}, 1; |
150 | |
156 | |
151 | $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing"); |
157 | $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; |
152 | |
|
|
153 | $self->on_error (delete $self->{on_error}) if $self->{on_error}; |
158 | $self->on_error (delete $self->{on_error}) if $self->{on_error}; |
154 | $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; |
159 | $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; |
155 | $self->on_read (delete $self->{on_read} ) if $self->{on_read}; |
160 | $self->on_read (delete $self->{on_read} ) if $self->{on_read}; |
156 | |
161 | |
157 | $self->start_read; |
162 | $self->start_read; |
… | |
… | |
249 | want (only limited by the available memory), as C<AnyEvent::Handle> |
254 | want (only limited by the available memory), as C<AnyEvent::Handle> |
250 | buffers it independently of the kernel. |
255 | buffers it independently of the kernel. |
251 | |
256 | |
252 | =cut |
257 | =cut |
253 | |
258 | |
254 | sub push_write { |
259 | sub _drain_wbuf { |
255 | my ($self, $data) = @_; |
260 | my ($self) = @_; |
256 | |
|
|
257 | $self->{wbuf} .= $data; |
|
|
258 | |
261 | |
259 | unless ($self->{ww}) { |
262 | unless ($self->{ww}) { |
260 | Scalar::Util::weaken $self; |
263 | Scalar::Util::weaken $self; |
261 | my $cb = sub { |
264 | my $cb = sub { |
262 | my $len = syswrite $self->{fh}, $self->{wbuf}; |
265 | my $len = syswrite $self->{fh}, $self->{wbuf}; |
263 | |
266 | |
264 | if ($len > 0) { |
267 | if ($len > 0) { |
265 | substr $self->{wbuf}, 0, $len, ""; |
268 | substr $self->{wbuf}, 0, $len, ""; |
266 | |
|
|
267 | |
269 | |
268 | $self->{on_drain}($self) |
270 | $self->{on_drain}($self) |
269 | if $self->{low_water_mark} >= length $self->{wbuf} |
271 | if $self->{low_water_mark} >= length $self->{wbuf} |
270 | && $self->{on_drain}; |
272 | && $self->{on_drain}; |
271 | |
273 | |
… | |
… | |
277 | |
279 | |
278 | $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb); |
280 | $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb); |
279 | |
281 | |
280 | $cb->($self); |
282 | $cb->($self); |
281 | }; |
283 | }; |
|
|
284 | } |
|
|
285 | |
|
|
286 | sub push_write { |
|
|
287 | my $self = shift; |
|
|
288 | |
|
|
289 | if ($self->{filter_w}) { |
|
|
290 | $self->{filter_w}->(\$_[0]); |
|
|
291 | } else { |
|
|
292 | $self->{wbuf} .= $_[0]; |
|
|
293 | $self->_drain_wbuf; |
|
|
294 | } |
282 | } |
295 | } |
283 | |
296 | |
284 | ############################################################################# |
297 | ############################################################################# |
285 | |
298 | |
286 | =back |
299 | =back |
… | |
… | |
361 | |
374 | |
362 | =cut |
375 | =cut |
363 | |
376 | |
364 | sub _drain_rbuf { |
377 | sub _drain_rbuf { |
365 | my ($self) = @_; |
378 | my ($self) = @_; |
|
|
379 | |
|
|
380 | if ( |
|
|
381 | defined $self->{rbuf_max} |
|
|
382 | && $self->{rbuf_max} < length $self->{rbuf} |
|
|
383 | ) { |
|
|
384 | $! = &Errno::ENOSPC; return $self->error; |
|
|
385 | } |
366 | |
386 | |
367 | return if $self->{in_drain}; |
387 | return if $self->{in_drain}; |
368 | local $self->{in_drain} = 1; |
388 | local $self->{in_drain} = 1; |
369 | |
389 | |
370 | while (my $len = length $self->{rbuf}) { |
390 | while (my $len = length $self->{rbuf}) { |
… | |
… | |
398 | } |
418 | } |
399 | } |
419 | } |
400 | |
420 | |
401 | if ($self->{eof}) { |
421 | if ($self->{eof}) { |
402 | $self->_shutdown; |
422 | $self->_shutdown; |
403 | $self->{on_eof}($self); |
423 | $self->{on_eof}($self) |
|
|
424 | if $self->{on_eof}; |
404 | } |
425 | } |
405 | } |
426 | } |
406 | |
427 | |
407 | =item $handle->on_read ($cb) |
428 | =item $handle->on_read ($cb) |
408 | |
429 | |
… | |
… | |
574 | |
595 | |
575 | unless ($self->{rw} || $self->{eof}) { |
596 | unless ($self->{rw} || $self->{eof}) { |
576 | Scalar::Util::weaken $self; |
597 | Scalar::Util::weaken $self; |
577 | |
598 | |
578 | $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { |
599 | $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { |
|
|
600 | my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf}; |
579 | my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf}; |
601 | my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; |
580 | |
602 | |
581 | if ($len > 0) { |
603 | if ($len > 0) { |
582 | if (defined $self->{rbuf_max}) { |
604 | $self->{filter_r} |
583 | if ($self->{rbuf_max} < length $self->{rbuf}) { |
605 | ? $self->{filter_r}->($rbuf) |
584 | $! = &Errno::ENOSPC; return $self->error; |
606 | : $self->_drain_rbuf; |
585 | } |
|
|
586 | } |
|
|
587 | |
607 | |
588 | } elsif (defined $len) { |
608 | } elsif (defined $len) { |
|
|
609 | delete $self->{rw}; |
589 | $self->{eof} = 1; |
610 | $self->{eof} = 1; |
590 | delete $self->{rw}; |
611 | $self->_drain_rbuf; |
591 | |
612 | |
592 | } elsif ($! != EAGAIN && $! != EINTR) { |
613 | } elsif ($! != EAGAIN && $! != EINTR) { |
593 | return $self->error; |
614 | return $self->error; |
594 | } |
615 | } |
595 | |
|
|
596 | $self->_drain_rbuf; |
|
|
597 | }); |
616 | }); |
598 | } |
617 | } |
599 | } |
618 | } |
600 | |
619 | |
601 | =back |
620 | =back |