… | |
… | |
7 | use AnyEvent::Util qw(WSAEWOULDBLOCK); |
7 | use AnyEvent::Util qw(WSAEWOULDBLOCK); |
8 | use Scalar::Util (); |
8 | use Scalar::Util (); |
9 | use Carp (); |
9 | use Carp (); |
10 | use Fcntl (); |
10 | use Fcntl (); |
11 | use Errno qw(EAGAIN EINTR); |
11 | use Errno qw(EAGAIN EINTR); |
12 | use Time::HiRes qw(time); |
|
|
13 | |
12 | |
14 | =head1 NAME |
13 | =head1 NAME |
15 | |
14 | |
16 | AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent |
15 | AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent |
17 | |
16 | |
… | |
… | |
126 | =item timeout => $fractional_seconds |
125 | =item timeout => $fractional_seconds |
127 | |
126 | |
128 | If non-zero, then this enables an "inactivity" timeout: whenever this many |
127 | If non-zero, then this enables an "inactivity" timeout: whenever this many |
129 | seconds pass without a successful read or write on the underlying file |
128 | seconds pass without a successful read or write on the underlying file |
130 | handle, the C<on_timeout> callback will be invoked (and if that one is |
129 | handle, the C<on_timeout> callback will be invoked (and if that one is |
131 | missing, an C<ETIMEDOUT> errror will be raised). |
130 | missing, an C<ETIMEDOUT> error will be raised). |
132 | |
131 | |
133 | Note that timeout processing is also active when you currently do not have |
132 | Note that timeout processing is also active when you currently do not have |
134 | any outstanding read or write requests: If you plan to keep the connection |
133 | any outstanding read or write requests: If you plan to keep the connection |
135 | idle then you should disable the timout temporarily or ignore the timeout |
134 | idle then you should disable the timout temporarily or ignore the timeout |
136 | in the C<on_timeout> callback. |
135 | in the C<on_timeout> callback. |
… | |
… | |
156 | isn't finished). |
155 | isn't finished). |
157 | |
156 | |
158 | =item read_size => <bytes> |
157 | =item read_size => <bytes> |
159 | |
158 | |
160 | The default read block size (the amount of bytes this module will try to read |
159 | The default read block size (the amount of bytes this module will try to read |
161 | on each [loop iteration). Default: C<4096>. |
160 | during each (loop iteration). Default: C<8192>. |
162 | |
161 | |
163 | =item low_water_mark => <bytes> |
162 | =item low_water_mark => <bytes> |
164 | |
163 | |
165 | Sets the amount of bytes (default: C<0>) that make up an "empty" write |
164 | Sets the amount of bytes (default: C<0>) that make up an "empty" write |
166 | buffer: If the write reaches this size or gets even samller it is |
165 | buffer: If the write reaches this size or gets even samller it is |
… | |
… | |
228 | # $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; # nop |
227 | # $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; # nop |
229 | # $self->on_error (delete $self->{on_error}) if $self->{on_error}; # nop |
228 | # $self->on_error (delete $self->{on_error}) if $self->{on_error}; # nop |
230 | # $self->on_read (delete $self->{on_read} ) if $self->{on_read}; # nop |
229 | # $self->on_read (delete $self->{on_read} ) if $self->{on_read}; # nop |
231 | $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; |
230 | $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; |
232 | |
231 | |
233 | $self->{_activity} = time; |
232 | $self->{_activity} = AnyEvent->now; |
234 | $self->_timeout; |
233 | $self->_timeout; |
235 | |
234 | |
236 | $self->start_read; |
235 | $self->start_read; |
237 | |
236 | |
238 | $self |
237 | $self |
239 | } |
238 | } |
240 | |
239 | |
241 | sub _shutdown { |
240 | sub _shutdown { |
242 | my ($self) = @_; |
241 | my ($self) = @_; |
243 | |
242 | |
|
|
243 | delete $self->{_tw}; |
244 | delete $self->{_rw}; |
244 | delete $self->{_rw}; |
245 | delete $self->{_ww}; |
245 | delete $self->{_ww}; |
246 | delete $self->{fh}; |
246 | delete $self->{fh}; |
247 | } |
247 | } |
248 | |
248 | |
… | |
… | |
319 | # also check for time-outs |
319 | # also check for time-outs |
320 | sub _timeout { |
320 | sub _timeout { |
321 | my ($self) = @_; |
321 | my ($self) = @_; |
322 | |
322 | |
323 | if ($self->{timeout}) { |
323 | if ($self->{timeout}) { |
324 | my $NOW = time; |
324 | my $NOW = AnyEvent->now; |
325 | |
325 | |
326 | # when would the timeout trigger? |
326 | # when would the timeout trigger? |
327 | my $after = $self->{_activity} + $self->{timeout} - $NOW; |
327 | my $after = $self->{_activity} + $self->{timeout} - $NOW; |
328 | |
|
|
329 | warn "next to in $after\n";#d# |
|
|
330 | |
328 | |
331 | # now or in the past already? |
329 | # now or in the past already? |
332 | if ($after <= 0) { |
330 | if ($after <= 0) { |
333 | $self->{_activity} = $NOW; |
331 | $self->{_activity} = $NOW; |
334 | |
332 | |
335 | if ($self->{on_timeout}) { |
333 | if ($self->{on_timeout}) { |
336 | $self->{on_timeout}->($self); |
334 | $self->{on_timeout}($self); |
337 | } else { |
335 | } else { |
338 | $! = Errno::ETIMEDOUT; |
336 | $! = Errno::ETIMEDOUT; |
339 | $self->error; |
337 | $self->error; |
340 | } |
338 | } |
341 | |
339 | |
… | |
… | |
346 | $after = $self->{timeout}; |
344 | $after = $self->{timeout}; |
347 | } |
345 | } |
348 | |
346 | |
349 | Scalar::Util::weaken $self; |
347 | Scalar::Util::weaken $self; |
350 | |
348 | |
351 | warn "after $after\n";#d# |
|
|
352 | $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub { |
349 | $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub { |
353 | delete $self->{_tw}; |
350 | delete $self->{_tw}; |
354 | $self->_timeout; |
351 | $self->_timeout; |
355 | }); |
352 | }); |
356 | } else { |
353 | } else { |
… | |
… | |
410 | my $len = syswrite $self->{fh}, $self->{wbuf}; |
407 | my $len = syswrite $self->{fh}, $self->{wbuf}; |
411 | |
408 | |
412 | if ($len >= 0) { |
409 | if ($len >= 0) { |
413 | substr $self->{wbuf}, 0, $len, ""; |
410 | substr $self->{wbuf}, 0, $len, ""; |
414 | |
411 | |
415 | $self->{_activity} = time; |
412 | $self->{_activity} = AnyEvent->now; |
416 | |
413 | |
417 | $self->{on_drain}($self) |
414 | $self->{on_drain}($self) |
418 | if $self->{low_water_mark} >= length $self->{wbuf} |
415 | if $self->{low_water_mark} >= length $self->{wbuf} |
419 | && $self->{on_drain}; |
416 | && $self->{on_drain}; |
420 | |
417 | |
… | |
… | |
448 | @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write") |
445 | @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write") |
449 | ->($self, @_); |
446 | ->($self, @_); |
450 | } |
447 | } |
451 | |
448 | |
452 | if ($self->{filter_w}) { |
449 | if ($self->{filter_w}) { |
453 | $self->{filter_w}->($self, \$_[0]); |
450 | $self->{filter_w}($self, \$_[0]); |
454 | } else { |
451 | } else { |
455 | $self->{wbuf} .= $_[0]; |
452 | $self->{wbuf} .= $_[0]; |
456 | $self->_drain_wbuf; |
453 | $self->_drain_wbuf; |
457 | } |
454 | } |
458 | } |
455 | } |
… | |
… | |
664 | delete $self->{_rw}; |
661 | delete $self->{_rw}; |
665 | return; |
662 | return; |
666 | } |
663 | } |
667 | } |
664 | } |
668 | |
665 | |
669 | if ($self->{_eof}) { |
|
|
670 | $self->_shutdown; |
|
|
671 | $self->{on_eof}($self) |
666 | $self->{on_eof}($self) |
672 | if $self->{on_eof}; |
667 | if $self->{_eof} && $self->{on_eof}; |
673 | } |
|
|
674 | } |
668 | } |
675 | |
669 | |
676 | =item $handle->on_read ($cb) |
670 | =item $handle->on_read ($cb) |
677 | |
671 | |
678 | This replaces the currently set C<on_read> callback, or clears it (when |
672 | This replaces the currently set C<on_read> callback, or clears it (when |
… | |
… | |
1056 | $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { |
1050 | $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { |
1057 | my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf}; |
1051 | my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf}; |
1058 | my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; |
1052 | my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; |
1059 | |
1053 | |
1060 | if ($len > 0) { |
1054 | if ($len > 0) { |
1061 | $self->{_activity} = time; |
1055 | $self->{_activity} = AnyEvent->now; |
1062 | |
1056 | |
1063 | $self->{filter_r} |
1057 | $self->{filter_r} |
1064 | ? $self->{filter_r}->($self, $rbuf) |
1058 | ? $self->{filter_r}($self, $rbuf) |
1065 | : $self->_drain_rbuf; |
1059 | : $self->_drain_rbuf; |
1066 | |
1060 | |
1067 | } elsif (defined $len) { |
1061 | } elsif (defined $len) { |
1068 | delete $self->{_rw}; |
1062 | delete $self->{_rw}; |
1069 | delete $self->{_ww}; |
|
|
1070 | delete $self->{_tw}; |
|
|
1071 | $self->{_eof} = 1; |
1063 | $self->{_eof} = 1; |
1072 | $self->_drain_rbuf; |
1064 | $self->_drain_rbuf; |
1073 | |
1065 | |
1074 | } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { |
1066 | } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { |
1075 | return $self->error; |
1067 | return $self->error; |