… | |
… | |
6 | use AnyEvent (); |
6 | use AnyEvent (); |
7 | use AnyEvent::Util qw(WSAEWOULDBLOCK); |
7 | use AnyEvent::Util qw(WSAEWOULDBLOCK); |
8 | use Scalar::Util (); |
8 | use Scalar::Util (); |
9 | use Carp (); |
9 | use Carp (); |
10 | use Fcntl (); |
10 | use Fcntl (); |
11 | use Errno qw/EAGAIN EINTR/; |
11 | use Errno qw(EAGAIN EINTR); |
12 | |
12 | |
13 | =head1 NAME |
13 | =head1 NAME |
14 | |
14 | |
15 | AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent |
15 | AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent |
16 | |
16 | |
… | |
… | |
91 | |
91 | |
92 | The object will not be in a usable state when this callback has been |
92 | The object will not be in a usable state when this callback has been |
93 | called. |
93 | called. |
94 | |
94 | |
95 | On callback entrance, the value of C<$!> contains the operating system |
95 | On callback entrance, the value of C<$!> contains the operating system |
96 | error (or C<ENOSPC>, C<EPIPE> or C<EBADMSG>). |
96 | error (or C<ENOSPC>, C<EPIPE>, C<ETIMEDOUT> or C<EBADMSG>). |
97 | |
97 | |
98 | The callback should throw an exception. If it returns, then |
98 | The callback should throw an exception. If it returns, then |
99 | AnyEvent::Handle will C<croak> for you. |
99 | AnyEvent::Handle will C<croak> for you. |
100 | |
100 | |
101 | While not mandatory, it is I<highly> recommended to set this callback, as |
101 | While not mandatory, it is I<highly> recommended to set this callback, as |
… | |
… | |
119 | |
119 | |
120 | This sets the callback that is called when the write buffer becomes empty |
120 | This sets the callback that is called when the write buffer becomes empty |
121 | (or when the callback is set and the buffer is empty already). |
121 | (or when the callback is set and the buffer is empty already). |
122 | |
122 | |
123 | To append to the write buffer, use the C<< ->push_write >> method. |
123 | To append to the write buffer, use the C<< ->push_write >> method. |
|
|
124 | |
|
|
125 | =item timeout => $fractional_seconds |
|
|
126 | |
|
|
127 | If non-zero, then this enables an "inactivity" timeout: whenever this many |
|
|
128 | seconds pass without a successful read or write on the underlying file |
|
|
129 | handle, the C<on_timeout> callback will be invoked (and if that one is |
|
|
130 | missing, an C<ETIMEDOUT> error will be raised). |
|
|
131 | |
|
|
132 | Note that timeout processing is also active when you currently do not have |
|
|
133 | any outstanding read or write requests: If you plan to keep the connection |
|
|
134 | idle then you should disable the timout temporarily or ignore the timeout |
|
|
135 | in the C<on_timeout> callback. |
|
|
136 | |
|
|
137 | Zero (the default) disables this timeout. |
|
|
138 | |
|
|
139 | =item on_timeout => $cb->($handle) |
|
|
140 | |
|
|
141 | Called whenever the inactivity timeout passes. If you return from this |
|
|
142 | callback, then the timeout will be reset as if some activity had happened, |
|
|
143 | so this condition is not fatal in any way. |
124 | |
144 | |
125 | =item rbuf_max => <bytes> |
145 | =item rbuf_max => <bytes> |
126 | |
146 | |
127 | If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>) |
147 | If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>) |
128 | when the read buffer ever (strictly) exceeds this size. This is useful to |
148 | when the read buffer ever (strictly) exceeds this size. This is useful to |
… | |
… | |
135 | isn't finished). |
155 | isn't finished). |
136 | |
156 | |
137 | =item read_size => <bytes> |
157 | =item read_size => <bytes> |
138 | |
158 | |
139 | The default read block size (the amount of bytes this module will try to read |
159 | The default read block size (the amount of bytes this module will try to read |
140 | on each [loop iteration). Default: C<4096>. |
160 | during each (loop iteration). Default: C<8192>. |
141 | |
161 | |
142 | =item low_water_mark => <bytes> |
162 | =item low_water_mark => <bytes> |
143 | |
163 | |
144 | Sets the amount of bytes (default: C<0>) that make up an "empty" write |
164 | Sets the amount of bytes (default: C<0>) that make up an "empty" write |
145 | buffer: If the write reaches this size or gets even samller it is |
165 | buffer: If the write reaches this size or gets even samller it is |
… | |
… | |
202 | if ($self->{tls}) { |
222 | if ($self->{tls}) { |
203 | require Net::SSLeay; |
223 | require Net::SSLeay; |
204 | $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); |
224 | $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); |
205 | } |
225 | } |
206 | |
226 | |
207 | $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; |
227 | # $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; # nop |
208 | $self->on_error (delete $self->{on_error}) if $self->{on_error}; |
228 | # $self->on_error (delete $self->{on_error}) if $self->{on_error}; # nop |
|
|
229 | # $self->on_read (delete $self->{on_read} ) if $self->{on_read}; # nop |
209 | $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; |
230 | $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; |
210 | $self->on_read (delete $self->{on_read} ) if $self->{on_read}; |
231 | |
|
|
232 | $self->{_activity} = AnyEvent->now; |
|
|
233 | $self->_timeout; |
211 | |
234 | |
212 | $self->start_read; |
235 | $self->start_read; |
213 | |
236 | |
214 | $self |
237 | $self |
215 | } |
238 | } |
216 | |
239 | |
217 | sub _shutdown { |
240 | sub _shutdown { |
218 | my ($self) = @_; |
241 | my ($self) = @_; |
219 | |
242 | |
|
|
243 | delete $self->{_tw}; |
220 | delete $self->{_rw}; |
244 | delete $self->{_rw}; |
221 | delete $self->{_ww}; |
245 | delete $self->{_ww}; |
222 | delete $self->{fh}; |
246 | delete $self->{fh}; |
223 | } |
247 | } |
224 | |
248 | |
… | |
… | |
262 | |
286 | |
263 | sub on_eof { |
287 | sub on_eof { |
264 | $_[0]{on_eof} = $_[1]; |
288 | $_[0]{on_eof} = $_[1]; |
265 | } |
289 | } |
266 | |
290 | |
|
|
291 | =item $handle->on_timeout ($cb) |
|
|
292 | |
|
|
293 | Replace the current C<on_timeout> callback, or disables the callback |
|
|
294 | (but not the timeout) if C<$cb> = C<undef>. See C<timeout> constructor |
|
|
295 | argument. |
|
|
296 | |
|
|
297 | =cut |
|
|
298 | |
|
|
299 | sub on_timeout { |
|
|
300 | $_[0]{on_timeout} = $_[1]; |
|
|
301 | } |
|
|
302 | |
|
|
303 | ############################################################################# |
|
|
304 | |
|
|
305 | =item $handle->timeout ($seconds) |
|
|
306 | |
|
|
307 | Configures (or disables) the inactivity timeout. |
|
|
308 | |
|
|
309 | =cut |
|
|
310 | |
|
|
311 | sub timeout { |
|
|
312 | my ($self, $timeout) = @_; |
|
|
313 | |
|
|
314 | $self->{timeout} = $timeout; |
|
|
315 | $self->_timeout; |
|
|
316 | } |
|
|
317 | |
|
|
318 | # reset the timeout watcher, as neccessary |
|
|
319 | # also check for time-outs |
|
|
320 | sub _timeout { |
|
|
321 | my ($self) = @_; |
|
|
322 | |
|
|
323 | if ($self->{timeout}) { |
|
|
324 | my $NOW = AnyEvent->now; |
|
|
325 | |
|
|
326 | # when would the timeout trigger? |
|
|
327 | my $after = $self->{_activity} + $self->{timeout} - $NOW; |
|
|
328 | |
|
|
329 | # now or in the past already? |
|
|
330 | if ($after <= 0) { |
|
|
331 | $self->{_activity} = $NOW; |
|
|
332 | |
|
|
333 | if ($self->{on_timeout}) { |
|
|
334 | $self->{on_timeout}->($self); |
|
|
335 | } else { |
|
|
336 | $! = Errno::ETIMEDOUT; |
|
|
337 | $self->error; |
|
|
338 | } |
|
|
339 | |
|
|
340 | # callbakx could have changed timeout value, optimise |
|
|
341 | return unless $self->{timeout}; |
|
|
342 | |
|
|
343 | # calculate new after |
|
|
344 | $after = $self->{timeout}; |
|
|
345 | } |
|
|
346 | |
|
|
347 | Scalar::Util::weaken $self; |
|
|
348 | |
|
|
349 | $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub { |
|
|
350 | delete $self->{_tw}; |
|
|
351 | $self->_timeout; |
|
|
352 | }); |
|
|
353 | } else { |
|
|
354 | delete $self->{_tw}; |
|
|
355 | } |
|
|
356 | } |
|
|
357 | |
267 | ############################################################################# |
358 | ############################################################################# |
268 | |
359 | |
269 | =back |
360 | =back |
270 | |
361 | |
271 | =head2 WRITE QUEUE |
362 | =head2 WRITE QUEUE |
… | |
… | |
315 | my $cb = sub { |
406 | my $cb = sub { |
316 | my $len = syswrite $self->{fh}, $self->{wbuf}; |
407 | my $len = syswrite $self->{fh}, $self->{wbuf}; |
317 | |
408 | |
318 | if ($len >= 0) { |
409 | if ($len >= 0) { |
319 | substr $self->{wbuf}, 0, $len, ""; |
410 | substr $self->{wbuf}, 0, $len, ""; |
|
|
411 | |
|
|
412 | $self->{_activity} = AnyEvent->now; |
320 | |
413 | |
321 | $self->{on_drain}($self) |
414 | $self->{on_drain}($self) |
322 | if $self->{low_water_mark} >= length $self->{wbuf} |
415 | if $self->{low_water_mark} >= length $self->{wbuf} |
323 | && $self->{on_drain}; |
416 | && $self->{on_drain}; |
324 | |
417 | |
… | |
… | |
960 | $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { |
1053 | $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { |
961 | my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf}; |
1054 | my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf}; |
962 | my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; |
1055 | my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; |
963 | |
1056 | |
964 | if ($len > 0) { |
1057 | if ($len > 0) { |
|
|
1058 | $self->{_activity} = AnyEvent->now; |
|
|
1059 | |
965 | $self->{filter_r} |
1060 | $self->{filter_r} |
966 | ? $self->{filter_r}->($self, $rbuf) |
1061 | ? $self->{filter_r}->($self, $rbuf) |
967 | : $self->_drain_rbuf; |
1062 | : $self->_drain_rbuf; |
968 | |
1063 | |
969 | } elsif (defined $len) { |
1064 | } elsif (defined $len) { |