… | |
… | |
2 | |
2 | |
3 | no warnings; |
3 | no warnings; |
4 | use strict; |
4 | use strict; |
5 | |
5 | |
6 | use AnyEvent (); |
6 | use AnyEvent (); |
7 | use AnyEvent::Util qw(WSAWOULDBLOCK); |
7 | use AnyEvent::Util qw(WSAEWOULDBLOCK); |
8 | use Scalar::Util (); |
8 | use Scalar::Util (); |
9 | use Carp (); |
9 | use Carp (); |
10 | use Fcntl (); |
10 | use Fcntl (); |
11 | use Errno qw/EAGAIN EINTR/; |
11 | use Errno qw(EAGAIN EINTR); |
|
|
12 | use Time::HiRes qw(time); |
12 | |
13 | |
13 | =head1 NAME |
14 | =head1 NAME |
14 | |
15 | |
15 | AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent |
16 | AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent |
16 | |
17 | |
… | |
… | |
91 | |
92 | |
92 | The object will not be in a usable state when this callback has been |
93 | The object will not be in a usable state when this callback has been |
93 | called. |
94 | called. |
94 | |
95 | |
95 | On callback entrance, the value of C<$!> contains the operating system |
96 | On callback entrance, the value of C<$!> contains the operating system |
96 | error (or C<ENOSPC>, C<EPIPE> or C<EBADMSG>). |
97 | error (or C<ENOSPC>, C<EPIPE>, C<ETIMEDOUT> or C<EBADMSG>). |
97 | |
98 | |
98 | The callback should throw an exception. If it returns, then |
99 | The callback should throw an exception. If it returns, then |
99 | AnyEvent::Handle will C<croak> for you. |
100 | AnyEvent::Handle will C<croak> for you. |
100 | |
101 | |
101 | While not mandatory, it is I<highly> recommended to set this callback, as |
102 | While not mandatory, it is I<highly> recommended to set this callback, as |
… | |
… | |
119 | |
120 | |
120 | This sets the callback that is called when the write buffer becomes empty |
121 | This sets the callback that is called when the write buffer becomes empty |
121 | (or when the callback is set and the buffer is empty already). |
122 | (or when the callback is set and the buffer is empty already). |
122 | |
123 | |
123 | To append to the write buffer, use the C<< ->push_write >> method. |
124 | To append to the write buffer, use the C<< ->push_write >> method. |
|
|
125 | |
|
|
126 | =item timeout => $fractional_seconds |
|
|
127 | |
|
|
128 | If non-zero, then this enables an "inactivity" timeout: whenever this many |
|
|
129 | seconds pass without a successful read or write on the underlying file |
|
|
130 | handle, the C<on_timeout> callback will be invoked (and if that one is |
|
|
131 | missing, an C<ETIMEDOUT> errror will be raised). |
|
|
132 | |
|
|
133 | Note that timeout processing is also active when you currently do not have |
|
|
134 | any outstanding read or write requests: If you plan to keep the connection |
|
|
135 | idle then you should disable the timout temporarily or ignore the timeout |
|
|
136 | in the C<on_timeout> callback. |
|
|
137 | |
|
|
138 | Zero (the default) disables this timeout. |
|
|
139 | |
|
|
140 | =item on_timeout => $cb->($handle) |
|
|
141 | |
|
|
142 | Called whenever the inactivity timeout passes. If you return from this |
|
|
143 | callback, then the timeout will be reset as if some activity had happened, |
|
|
144 | so this condition is not fatal in any way. |
124 | |
145 | |
125 | =item rbuf_max => <bytes> |
146 | =item rbuf_max => <bytes> |
126 | |
147 | |
127 | If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>) |
148 | If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>) |
128 | when the read buffer ever (strictly) exceeds this size. This is useful to |
149 | when the read buffer ever (strictly) exceeds this size. This is useful to |
… | |
… | |
202 | if ($self->{tls}) { |
223 | if ($self->{tls}) { |
203 | require Net::SSLeay; |
224 | require Net::SSLeay; |
204 | $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); |
225 | $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); |
205 | } |
226 | } |
206 | |
227 | |
207 | $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; |
228 | # $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; # nop |
208 | $self->on_error (delete $self->{on_error}) if $self->{on_error}; |
229 | # $self->on_error (delete $self->{on_error}) if $self->{on_error}; # nop |
|
|
230 | # $self->on_read (delete $self->{on_read} ) if $self->{on_read}; # nop |
209 | $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; |
231 | $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; |
210 | $self->on_read (delete $self->{on_read} ) if $self->{on_read}; |
232 | |
|
|
233 | $self->{_activity} = time; |
|
|
234 | $self->_timeout; |
211 | |
235 | |
212 | $self->start_read; |
236 | $self->start_read; |
213 | |
237 | |
214 | $self |
238 | $self |
215 | } |
239 | } |
… | |
… | |
262 | |
286 | |
263 | sub on_eof { |
287 | sub on_eof { |
264 | $_[0]{on_eof} = $_[1]; |
288 | $_[0]{on_eof} = $_[1]; |
265 | } |
289 | } |
266 | |
290 | |
|
|
291 | =item $handle->on_timeout ($cb) |
|
|
292 | |
|
|
293 | Replace the current C<on_timeout> callback, or disables the callback |
|
|
294 | (but not the timeout) if C<$cb> = C<undef>. See C<timeout> constructor |
|
|
295 | argument. |
|
|
296 | |
|
|
297 | =cut |
|
|
298 | |
|
|
299 | sub on_timeout { |
|
|
300 | $_[0]{on_timeout} = $_[1]; |
|
|
301 | } |
|
|
302 | |
|
|
303 | ############################################################################# |
|
|
304 | |
|
|
305 | =item $handle->timeout ($seconds) |
|
|
306 | |
|
|
307 | Configures (or disables) the inactivity timeout. |
|
|
308 | |
|
|
309 | =cut |
|
|
310 | |
|
|
311 | sub timeout { |
|
|
312 | my ($self, $timeout) = @_; |
|
|
313 | |
|
|
314 | $self->{timeout} = $timeout; |
|
|
315 | $self->_timeout; |
|
|
316 | } |
|
|
317 | |
|
|
318 | # reset the timeout watcher, as neccessary |
|
|
319 | # also check for time-outs |
|
|
320 | sub _timeout { |
|
|
321 | my ($self) = @_; |
|
|
322 | |
|
|
323 | if ($self->{timeout}) { |
|
|
324 | my $NOW = time; |
|
|
325 | |
|
|
326 | # when would the timeout trigger? |
|
|
327 | my $after = $self->{_activity} + $self->{timeout} - $NOW; |
|
|
328 | |
|
|
329 | warn "next to in $after\n";#d# |
|
|
330 | |
|
|
331 | # now or in the past already? |
|
|
332 | if ($after <= 0) { |
|
|
333 | $self->{_activity} = $NOW; |
|
|
334 | |
|
|
335 | if ($self->{on_timeout}) { |
|
|
336 | $self->{on_timeout}->($self); |
|
|
337 | } else { |
|
|
338 | $! = Errno::ETIMEDOUT; |
|
|
339 | $self->error; |
|
|
340 | } |
|
|
341 | |
|
|
342 | # callbakx could have changed timeout value, optimise |
|
|
343 | return unless $self->{timeout}; |
|
|
344 | |
|
|
345 | # calculate new after |
|
|
346 | $after = $self->{timeout}; |
|
|
347 | } |
|
|
348 | |
|
|
349 | Scalar::Util::weaken $self; |
|
|
350 | |
|
|
351 | warn "after $after\n";#d# |
|
|
352 | $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub { |
|
|
353 | delete $self->{_tw}; |
|
|
354 | $self->_timeout; |
|
|
355 | }); |
|
|
356 | } else { |
|
|
357 | delete $self->{_tw}; |
|
|
358 | } |
|
|
359 | } |
|
|
360 | |
267 | ############################################################################# |
361 | ############################################################################# |
268 | |
362 | |
269 | =back |
363 | =back |
270 | |
364 | |
271 | =head2 WRITE QUEUE |
365 | =head2 WRITE QUEUE |
… | |
… | |
316 | my $len = syswrite $self->{fh}, $self->{wbuf}; |
410 | my $len = syswrite $self->{fh}, $self->{wbuf}; |
317 | |
411 | |
318 | if ($len >= 0) { |
412 | if ($len >= 0) { |
319 | substr $self->{wbuf}, 0, $len, ""; |
413 | substr $self->{wbuf}, 0, $len, ""; |
320 | |
414 | |
|
|
415 | $self->{_activity} = time; |
|
|
416 | |
321 | $self->{on_drain}($self) |
417 | $self->{on_drain}($self) |
322 | if $self->{low_water_mark} >= length $self->{wbuf} |
418 | if $self->{low_water_mark} >= length $self->{wbuf} |
323 | && $self->{on_drain}; |
419 | && $self->{on_drain}; |
324 | |
420 | |
325 | delete $self->{_ww} unless length $self->{wbuf}; |
421 | delete $self->{_ww} unless length $self->{wbuf}; |
326 | } elsif ($! != EAGAIN && $! != EINTR && $! != WSAWOULDBLOCK) { |
422 | } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { |
327 | $self->error; |
423 | $self->error; |
328 | } |
424 | } |
329 | }; |
425 | }; |
330 | |
426 | |
331 | # try to write data immediately |
427 | # try to write data immediately |
… | |
… | |
960 | $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { |
1056 | $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { |
961 | my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf}; |
1057 | my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf}; |
962 | my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; |
1058 | my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; |
963 | |
1059 | |
964 | if ($len > 0) { |
1060 | if ($len > 0) { |
|
|
1061 | $self->{_activity} = time; |
|
|
1062 | |
965 | $self->{filter_r} |
1063 | $self->{filter_r} |
966 | ? $self->{filter_r}->($self, $rbuf) |
1064 | ? $self->{filter_r}->($self, $rbuf) |
967 | : $self->_drain_rbuf; |
1065 | : $self->_drain_rbuf; |
968 | |
1066 | |
969 | } elsif (defined $len) { |
1067 | } elsif (defined $len) { |
970 | delete $self->{_rw}; |
1068 | delete $self->{_rw}; |
|
|
1069 | delete $self->{_ww}; |
|
|
1070 | delete $self->{_tw}; |
971 | $self->{_eof} = 1; |
1071 | $self->{_eof} = 1; |
972 | $self->_drain_rbuf; |
1072 | $self->_drain_rbuf; |
973 | |
1073 | |
974 | } elsif ($! != EAGAIN && $! != EINTR && $! != &AnyEvent::Util::WSAWOULDBLOCK) { |
1074 | } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { |
975 | return $self->error; |
1075 | return $self->error; |
976 | } |
1076 | } |
977 | }); |
1077 | }); |
978 | } |
1078 | } |
979 | } |
1079 | } |