ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.12 by elmex, Thu May 15 09:03:43 2008 UTC vs.
Revision 1.17 by root, Sat May 24 04:17:45 2008 UTC

12 12
13=head1 NAME 13=head1 NAME
14 14
15AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent 15AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent
16 16
17=cut 17This module is experimental.
18 18
19=cut
20
19our $VERSION = '0.02'; 21our $VERSION = '0.04';
20 22
21=head1 SYNOPSIS 23=head1 SYNOPSIS
22 24
23 use AnyEvent; 25 use AnyEvent;
24 use AnyEvent::Handle; 26 use AnyEvent::Handle;
43 $cv->wait; 45 $cv->wait;
44 46
45=head1 DESCRIPTION 47=head1 DESCRIPTION
46 48
47This module is a helper module to make it easier to do event-based I/O on 49This module is a helper module to make it easier to do event-based I/O on
48filehandles (and sockets, see L<AnyEvent::Socket> for an easy way to make 50filehandles. For utility functions for doing non-blocking connects and accepts
49non-blocking resolves and connects). 51on sockets see L<AnyEvent::Util>.
50 52
51In the following, when the documentation refers to of "bytes" then this 53In the following, when the documentation refers to of "bytes" then this
52means characters. As sysread and syswrite are used for all I/O, their 54means characters. As sysread and syswrite are used for all I/O, their
53treatment of characters applies to this module as well. 55treatment of characters applies to this module as well.
54 56
70The filehandle this L<AnyEvent::Handle> object will operate on. 72The filehandle this L<AnyEvent::Handle> object will operate on.
71 73
72NOTE: The filehandle will be set to non-blocking (using 74NOTE: The filehandle will be set to non-blocking (using
73AnyEvent::Util::fh_nonblocking). 75AnyEvent::Util::fh_nonblocking).
74 76
75=item on_eof => $cb->($self) [MANDATORY] 77=item on_eof => $cb->($self)
76 78
77Set the callback to be called on EOF. 79Set the callback to be called on EOF.
80
81While not mandatory, it is highly recommended to set an eof callback,
82otherwise you might end up with a closed socket while you are still
83waiting for data.
78 84
79=item on_error => $cb->($self) 85=item on_error => $cb->($self)
80 86
81This is the fatal error callback, that is called when, well, a fatal error 87This is the fatal error callback, that is called when, well, a fatal error
82ocurs, such as not being able to resolve the hostname, failure to connect 88ocurs, such as not being able to resolve the hostname, failure to connect
146 152
147 $self->{fh} or Carp::croak "mandatory argument fh is missing"; 153 $self->{fh} or Carp::croak "mandatory argument fh is missing";
148 154
149 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 155 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
150 156
151 $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing"); 157 $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof};
152
153 $self->on_error (delete $self->{on_error}) if $self->{on_error}; 158 $self->on_error (delete $self->{on_error}) if $self->{on_error};
154 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 159 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
155 $self->on_read (delete $self->{on_read} ) if $self->{on_read}; 160 $self->on_read (delete $self->{on_read} ) if $self->{on_read};
156 161
157 $self->start_read; 162 $self->start_read;
249want (only limited by the available memory), as C<AnyEvent::Handle> 254want (only limited by the available memory), as C<AnyEvent::Handle>
250buffers it independently of the kernel. 255buffers it independently of the kernel.
251 256
252=cut 257=cut
253 258
254sub push_write { 259sub _drain_wbuf {
255 my ($self, $data) = @_; 260 my ($self) = @_;
256
257 $self->{wbuf} .= $data;
258 261
259 unless ($self->{ww}) { 262 unless ($self->{ww}) {
260 Scalar::Util::weaken $self; 263 Scalar::Util::weaken $self;
261 my $cb = sub { 264 my $cb = sub {
262 my $len = syswrite $self->{fh}, $self->{wbuf}; 265 my $len = syswrite $self->{fh}, $self->{wbuf};
263 266
264 if ($len > 0) { 267 if ($len > 0) {
265 substr $self->{wbuf}, 0, $len, ""; 268 substr $self->{wbuf}, 0, $len, "";
266
267 269
268 $self->{on_drain}($self) 270 $self->{on_drain}($self)
269 if $self->{low_water_mark} >= length $self->{wbuf} 271 if $self->{low_water_mark} >= length $self->{wbuf}
270 && $self->{on_drain}; 272 && $self->{on_drain};
271 273
277 279
278 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb); 280 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb);
279 281
280 $cb->($self); 282 $cb->($self);
281 }; 283 };
284}
285
286sub push_write {
287 my $self = shift;
288
289 if ($self->{filter_w}) {
290 $self->{filter_w}->(\$_[0]);
291 } else {
292 $self->{wbuf} .= $_[0];
293 $self->_drain_wbuf;
294 }
282} 295}
283 296
284############################################################################# 297#############################################################################
285 298
286=back 299=back
361 374
362=cut 375=cut
363 376
364sub _drain_rbuf { 377sub _drain_rbuf {
365 my ($self) = @_; 378 my ($self) = @_;
379
380 if (
381 defined $self->{rbuf_max}
382 && $self->{rbuf_max} < length $self->{rbuf}
383 ) {
384 $! = &Errno::ENOSPC; return $self->error;
385 }
366 386
367 return if $self->{in_drain}; 387 return if $self->{in_drain};
368 local $self->{in_drain} = 1; 388 local $self->{in_drain} = 1;
369 389
370 while (my $len = length $self->{rbuf}) { 390 while (my $len = length $self->{rbuf}) {
398 } 418 }
399 } 419 }
400 420
401 if ($self->{eof}) { 421 if ($self->{eof}) {
402 $self->_shutdown; 422 $self->_shutdown;
403 $self->{on_eof}($self); 423 $self->{on_eof}($self)
424 if $self->{on_eof};
404 } 425 }
405} 426}
406 427
407=item $handle->on_read ($cb) 428=item $handle->on_read ($cb)
408 429
531 my $self = shift; 552 my $self = shift;
532 my $cb = pop; 553 my $cb = pop;
533 my $eol = @_ ? shift : qr|(\015?\012)|; 554 my $eol = @_ ? shift : qr|(\015?\012)|;
534 my $pos; 555 my $pos;
535 556
536 $eol = qr|(\Q$eol\E)| unless ref $eol; 557 $eol = quotemeta $eol unless ref $eol;
537 $eol = qr|^(.*?)($eol)|; 558 $eol = qr|^(.*?)($eol)|s;
538 559
539 sub { 560 sub {
540 $_[0]{rbuf} =~ s/$eol// or return; 561 $_[0]{rbuf} =~ s/$eol// or return;
541 562
542 $cb->($_[0], $1, $2); 563 $cb->($_[0], $1, $2);
574 595
575 unless ($self->{rw} || $self->{eof}) { 596 unless ($self->{rw} || $self->{eof}) {
576 Scalar::Util::weaken $self; 597 Scalar::Util::weaken $self;
577 598
578 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { 599 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
600 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf};
579 my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf}; 601 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
580 602
581 if ($len > 0) { 603 if ($len > 0) {
582 if (defined $self->{rbuf_max}) { 604 $self->{filter_r}
583 if ($self->{rbuf_max} < length $self->{rbuf}) { 605 ? $self->{filter_r}->($rbuf)
584 $! = &Errno::ENOSPC; return $self->error; 606 : $self->_drain_rbuf;
585 }
586 }
587 607
588 } elsif (defined $len) { 608 } elsif (defined $len) {
609 delete $self->{rw};
589 $self->{eof} = 1; 610 $self->{eof} = 1;
590 delete $self->{rw}; 611 $self->_drain_rbuf;
591 612
592 } elsif ($! != EAGAIN && $! != EINTR) { 613 } elsif ($! != EAGAIN && $! != EINTR) {
593 return $self->error; 614 return $self->error;
594 } 615 }
595
596 $self->_drain_rbuf;
597 }); 616 });
598 } 617 }
599} 618}
600 619
601=back 620=back

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines