ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.42 by root, Tue May 27 06:23:15 2008 UTC vs.
Revision 1.43 by root, Wed May 28 23:57:38 2008 UTC

6use AnyEvent (); 6use AnyEvent ();
7use AnyEvent::Util qw(WSAEWOULDBLOCK); 7use AnyEvent::Util qw(WSAEWOULDBLOCK);
8use Scalar::Util (); 8use Scalar::Util ();
9use Carp (); 9use Carp ();
10use Fcntl (); 10use Fcntl ();
11use Errno qw/EAGAIN EINTR/; 11use Errno qw(EAGAIN EINTR);
12use Time::HiRes qw(time);
12 13
13=head1 NAME 14=head1 NAME
14 15
15AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent 16AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
16 17
91 92
92The object will not be in a usable state when this callback has been 93The object will not be in a usable state when this callback has been
93called. 94called.
94 95
95On callback entrance, the value of C<$!> contains the operating system 96On callback entrance, the value of C<$!> contains the operating system
96error (or C<ENOSPC>, C<EPIPE> or C<EBADMSG>). 97error (or C<ENOSPC>, C<EPIPE>, C<ETIMEDOUT> or C<EBADMSG>).
97 98
98The callback should throw an exception. If it returns, then 99The callback should throw an exception. If it returns, then
99AnyEvent::Handle will C<croak> for you. 100AnyEvent::Handle will C<croak> for you.
100 101
101While not mandatory, it is I<highly> recommended to set this callback, as 102While not mandatory, it is I<highly> recommended to set this callback, as
119 120
120This sets the callback that is called when the write buffer becomes empty 121This sets the callback that is called when the write buffer becomes empty
121(or when the callback is set and the buffer is empty already). 122(or when the callback is set and the buffer is empty already).
122 123
123To append to the write buffer, use the C<< ->push_write >> method. 124To append to the write buffer, use the C<< ->push_write >> method.
125
126=item timeout => $fractional_seconds
127
128If non-zero, then this enables an "inactivity" timeout: whenever this many
129seconds pass without a successful read or write on the underlying file
130handle, the C<on_timeout> callback will be invoked (and if that one is
131missing, an C<ETIMEDOUT> errror will be raised).
132
133Note that timeout processing is also active when you currently do not have
134any outstanding read or write requests: If you plan to keep the connection
135idle then you should disable the timout temporarily or ignore the timeout
136in the C<on_timeout> callback.
137
138Zero (the default) disables this timeout.
139
140=item on_timeout => $cb->($handle)
141
142Called whenever the inactivity timeout passes. If you return from this
143callback, then the timeout will be reset as if some activity had happened,
144so this condition is not fatal in any way.
124 145
125=item rbuf_max => <bytes> 146=item rbuf_max => <bytes>
126 147
127If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>) 148If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>)
128when the read buffer ever (strictly) exceeds this size. This is useful to 149when the read buffer ever (strictly) exceeds this size. This is useful to
202 if ($self->{tls}) { 223 if ($self->{tls}) {
203 require Net::SSLeay; 224 require Net::SSLeay;
204 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); 225 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx});
205 } 226 }
206 227
207 $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; 228# $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; # nop
208 $self->on_error (delete $self->{on_error}) if $self->{on_error}; 229# $self->on_error (delete $self->{on_error}) if $self->{on_error}; # nop
230# $self->on_read (delete $self->{on_read} ) if $self->{on_read}; # nop
209 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 231 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
210 $self->on_read (delete $self->{on_read} ) if $self->{on_read}; 232
233 $self->{_activity} = time;
234 $self->_timeout;
211 235
212 $self->start_read; 236 $self->start_read;
213 237
214 $self 238 $self
215} 239}
262 286
263sub on_eof { 287sub on_eof {
264 $_[0]{on_eof} = $_[1]; 288 $_[0]{on_eof} = $_[1];
265} 289}
266 290
291=item $handle->on_timeout ($cb)
292
293Replace the current C<on_timeout> callback, or disables the callback
294(but not the timeout) if C<$cb> = C<undef>. See C<timeout> constructor
295argument.
296
297=cut
298
299sub on_timeout {
300 $_[0]{on_timeout} = $_[1];
301}
302
303#############################################################################
304
305=item $handle->timeout ($seconds)
306
307Configures (or disables) the inactivity timeout.
308
309=cut
310
311sub timeout {
312 my ($self, $timeout) = @_;
313
314 $self->{timeout} = $timeout;
315 $self->_timeout;
316}
317
318# reset the timeout watcher, as neccessary
319# also check for time-outs
320sub _timeout {
321 my ($self) = @_;
322
323 if ($self->{timeout}) {
324 my $NOW = time;
325
326 # when would the timeout trigger?
327 my $after = $self->{_activity} + $self->{timeout} - $NOW;
328
329 warn "next to in $after\n";#d#
330
331 # now or in the past already?
332 if ($after <= 0) {
333 $self->{_activity} = $NOW;
334
335 if ($self->{on_timeout}) {
336 $self->{on_timeout}->($self);
337 } else {
338 $! = Errno::ETIMEDOUT;
339 $self->error;
340 }
341
342 # callbakx could have changed timeout value, optimise
343 return unless $self->{timeout};
344
345 # calculate new after
346 $after = $self->{timeout};
347 }
348
349 Scalar::Util::weaken $self;
350
351 warn "after $after\n";#d#
352 $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub {
353 delete $self->{_tw};
354 $self->_timeout;
355 });
356 } else {
357 delete $self->{_tw};
358 }
359}
360
267############################################################################# 361#############################################################################
268 362
269=back 363=back
270 364
271=head2 WRITE QUEUE 365=head2 WRITE QUEUE
315 my $cb = sub { 409 my $cb = sub {
316 my $len = syswrite $self->{fh}, $self->{wbuf}; 410 my $len = syswrite $self->{fh}, $self->{wbuf};
317 411
318 if ($len >= 0) { 412 if ($len >= 0) {
319 substr $self->{wbuf}, 0, $len, ""; 413 substr $self->{wbuf}, 0, $len, "";
414
415 $self->{_activity} = time;
320 416
321 $self->{on_drain}($self) 417 $self->{on_drain}($self)
322 if $self->{low_water_mark} >= length $self->{wbuf} 418 if $self->{low_water_mark} >= length $self->{wbuf}
323 && $self->{on_drain}; 419 && $self->{on_drain};
324 420
960 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { 1056 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
961 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf}; 1057 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf};
962 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; 1058 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
963 1059
964 if ($len > 0) { 1060 if ($len > 0) {
1061 $self->{_activity} = time;
1062
965 $self->{filter_r} 1063 $self->{filter_r}
966 ? $self->{filter_r}->($self, $rbuf) 1064 ? $self->{filter_r}->($self, $rbuf)
967 : $self->_drain_rbuf; 1065 : $self->_drain_rbuf;
968 1066
969 } elsif (defined $len) { 1067 } elsif (defined $len) {
970 delete $self->{_rw}; 1068 delete $self->{_rw};
1069 delete $self->{_ww};
1070 delete $self->{_tw};
971 $self->{_eof} = 1; 1071 $self->{_eof} = 1;
972 $self->_drain_rbuf; 1072 $self->_drain_rbuf;
973 1073
974 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { 1074 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
975 return $self->error; 1075 return $self->error;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines