ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.43 by root, Wed May 28 23:57:38 2008 UTC vs.
Revision 1.48 by root, Thu May 29 00:27:06 2008 UTC

7use AnyEvent::Util qw(WSAEWOULDBLOCK); 7use AnyEvent::Util qw(WSAEWOULDBLOCK);
8use Scalar::Util (); 8use Scalar::Util ();
9use Carp (); 9use Carp ();
10use Fcntl (); 10use Fcntl ();
11use Errno qw(EAGAIN EINTR); 11use Errno qw(EAGAIN EINTR);
12use Time::HiRes qw(time);
13 12
14=head1 NAME 13=head1 NAME
15 14
16AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent 15AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
17 16
126=item timeout => $fractional_seconds 125=item timeout => $fractional_seconds
127 126
128If non-zero, then this enables an "inactivity" timeout: whenever this many 127If non-zero, then this enables an "inactivity" timeout: whenever this many
129seconds pass without a successful read or write on the underlying file 128seconds pass without a successful read or write on the underlying file
130handle, the C<on_timeout> callback will be invoked (and if that one is 129handle, the C<on_timeout> callback will be invoked (and if that one is
131missing, an C<ETIMEDOUT> errror will be raised). 130missing, an C<ETIMEDOUT> error will be raised).
132 131
133Note that timeout processing is also active when you currently do not have 132Note that timeout processing is also active when you currently do not have
134any outstanding read or write requests: If you plan to keep the connection 133any outstanding read or write requests: If you plan to keep the connection
135idle then you should disable the timout temporarily or ignore the timeout 134idle then you should disable the timout temporarily or ignore the timeout
136in the C<on_timeout> callback. 135in the C<on_timeout> callback.
156isn't finished). 155isn't finished).
157 156
158=item read_size => <bytes> 157=item read_size => <bytes>
159 158
160The default read block size (the amount of bytes this module will try to read 159The default read block size (the amount of bytes this module will try to read
161on each [loop iteration). Default: C<4096>. 160during each (loop iteration). Default: C<8192>.
162 161
163=item low_water_mark => <bytes> 162=item low_water_mark => <bytes>
164 163
165Sets the amount of bytes (default: C<0>) that make up an "empty" write 164Sets the amount of bytes (default: C<0>) that make up an "empty" write
166buffer: If the write reaches this size or gets even samller it is 165buffer: If the write reaches this size or gets even samller it is
228# $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; # nop 227# $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; # nop
229# $self->on_error (delete $self->{on_error}) if $self->{on_error}; # nop 228# $self->on_error (delete $self->{on_error}) if $self->{on_error}; # nop
230# $self->on_read (delete $self->{on_read} ) if $self->{on_read}; # nop 229# $self->on_read (delete $self->{on_read} ) if $self->{on_read}; # nop
231 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 230 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
232 231
233 $self->{_activity} = time; 232 $self->{_activity} = AnyEvent->now;
234 $self->_timeout; 233 $self->_timeout;
235 234
236 $self->start_read; 235 $self->start_read;
237 236
238 $self 237 $self
239} 238}
240 239
241sub _shutdown { 240sub _shutdown {
242 my ($self) = @_; 241 my ($self) = @_;
243 242
243 delete $self->{_tw};
244 delete $self->{_rw}; 244 delete $self->{_rw};
245 delete $self->{_ww}; 245 delete $self->{_ww};
246 delete $self->{fh}; 246 delete $self->{fh};
247} 247}
248 248
319# also check for time-outs 319# also check for time-outs
320sub _timeout { 320sub _timeout {
321 my ($self) = @_; 321 my ($self) = @_;
322 322
323 if ($self->{timeout}) { 323 if ($self->{timeout}) {
324 my $NOW = time; 324 my $NOW = AnyEvent->now;
325 325
326 # when would the timeout trigger? 326 # when would the timeout trigger?
327 my $after = $self->{_activity} + $self->{timeout} - $NOW; 327 my $after = $self->{_activity} + $self->{timeout} - $NOW;
328
329 warn "next to in $after\n";#d#
330 328
331 # now or in the past already? 329 # now or in the past already?
332 if ($after <= 0) { 330 if ($after <= 0) {
333 $self->{_activity} = $NOW; 331 $self->{_activity} = $NOW;
334 332
335 if ($self->{on_timeout}) { 333 if ($self->{on_timeout}) {
336 $self->{on_timeout}->($self); 334 $self->{on_timeout}($self);
337 } else { 335 } else {
338 $! = Errno::ETIMEDOUT; 336 $! = Errno::ETIMEDOUT;
339 $self->error; 337 $self->error;
340 } 338 }
341 339
346 $after = $self->{timeout}; 344 $after = $self->{timeout};
347 } 345 }
348 346
349 Scalar::Util::weaken $self; 347 Scalar::Util::weaken $self;
350 348
351 warn "after $after\n";#d#
352 $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub { 349 $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub {
353 delete $self->{_tw}; 350 delete $self->{_tw};
354 $self->_timeout; 351 $self->_timeout;
355 }); 352 });
356 } else { 353 } else {
410 my $len = syswrite $self->{fh}, $self->{wbuf}; 407 my $len = syswrite $self->{fh}, $self->{wbuf};
411 408
412 if ($len >= 0) { 409 if ($len >= 0) {
413 substr $self->{wbuf}, 0, $len, ""; 410 substr $self->{wbuf}, 0, $len, "";
414 411
415 $self->{_activity} = time; 412 $self->{_activity} = AnyEvent->now;
416 413
417 $self->{on_drain}($self) 414 $self->{on_drain}($self)
418 if $self->{low_water_mark} >= length $self->{wbuf} 415 if $self->{low_water_mark} >= length $self->{wbuf}
419 && $self->{on_drain}; 416 && $self->{on_drain};
420 417
448 @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write") 445 @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write")
449 ->($self, @_); 446 ->($self, @_);
450 } 447 }
451 448
452 if ($self->{filter_w}) { 449 if ($self->{filter_w}) {
453 $self->{filter_w}->($self, \$_[0]); 450 $self->{filter_w}($self, \$_[0]);
454 } else { 451 } else {
455 $self->{wbuf} .= $_[0]; 452 $self->{wbuf} .= $_[0];
456 $self->_drain_wbuf; 453 $self->_drain_wbuf;
457 } 454 }
458} 455}
664 delete $self->{_rw}; 661 delete $self->{_rw};
665 return; 662 return;
666 } 663 }
667 } 664 }
668 665
669 if ($self->{_eof}) {
670 $self->_shutdown;
671 $self->{on_eof}($self) 666 $self->{on_eof}($self)
672 if $self->{on_eof}; 667 if $self->{_eof} && $self->{on_eof};
673 }
674} 668}
675 669
676=item $handle->on_read ($cb) 670=item $handle->on_read ($cb)
677 671
678This replaces the currently set C<on_read> callback, or clears it (when 672This replaces the currently set C<on_read> callback, or clears it (when
1056 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { 1050 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
1057 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf}; 1051 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf};
1058 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; 1052 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
1059 1053
1060 if ($len > 0) { 1054 if ($len > 0) {
1061 $self->{_activity} = time; 1055 $self->{_activity} = AnyEvent->now;
1062 1056
1063 $self->{filter_r} 1057 $self->{filter_r}
1064 ? $self->{filter_r}->($self, $rbuf) 1058 ? $self->{filter_r}($self, $rbuf)
1065 : $self->_drain_rbuf; 1059 : $self->_drain_rbuf;
1066 1060
1067 } elsif (defined $len) { 1061 } elsif (defined $len) {
1068 delete $self->{_rw}; 1062 delete $self->{_rw};
1069 delete $self->{_ww};
1070 delete $self->{_tw};
1071 $self->{_eof} = 1; 1063 $self->{_eof} = 1;
1072 $self->_drain_rbuf; 1064 $self->_drain_rbuf;
1073 1065
1074 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { 1066 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
1075 return $self->error; 1067 return $self->error;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines