ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.9 by root, Fri May 2 16:07:46 2008 UTC vs.
Revision 1.15 by root, Sat May 17 21:34:15 2008 UTC

12 12
13=head1 NAME 13=head1 NAME
14 14
15AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent 15AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent
16 16
17=cut 17This module is experimental.
18 18
19=cut
20
19our $VERSION = '0.02'; 21our $VERSION = '0.04';
20 22
21=head1 SYNOPSIS 23=head1 SYNOPSIS
22 24
23 use AnyEvent; 25 use AnyEvent;
24 use AnyEvent::Handle; 26 use AnyEvent::Handle;
43 $cv->wait; 45 $cv->wait;
44 46
45=head1 DESCRIPTION 47=head1 DESCRIPTION
46 48
47This module is a helper module to make it easier to do event-based I/O on 49This module is a helper module to make it easier to do event-based I/O on
48filehandles (and sockets, see L<AnyEvent::Socket> for an easy way to make 50filehandles. For utility functions for doing non-blocking connects and accepts
49non-blocking resolves and connects). 51on sockets see L<AnyEvent::Util>.
50 52
51In the following, when the documentation refers to of "bytes" then this 53In the following, when the documentation refers to of "bytes" then this
52means characters. As sysread and syswrite are used for all I/O, their 54means characters. As sysread and syswrite are used for all I/O, their
53treatment of characters applies to this module as well. 55treatment of characters applies to this module as well.
54 56
70The filehandle this L<AnyEvent::Handle> object will operate on. 72The filehandle this L<AnyEvent::Handle> object will operate on.
71 73
72NOTE: The filehandle will be set to non-blocking (using 74NOTE: The filehandle will be set to non-blocking (using
73AnyEvent::Util::fh_nonblocking). 75AnyEvent::Util::fh_nonblocking).
74 76
75=item on_error => $cb->($self) [MANDATORY] 77=item on_eof => $cb->($self) [MANDATORY]
76 78
79Set the callback to be called on EOF.
80
81=item on_error => $cb->($self)
82
77This is the fatal error callback, that is called when a fatal error ocurs, 83This is the fatal error callback, that is called when, well, a fatal error
78such as not being able to resolve the hostname, failure to connect or a 84ocurs, such as not being able to resolve the hostname, failure to connect
79read error. 85or a read error.
80 86
81The object will not be in a usable state when this callback has been 87The object will not be in a usable state when this callback has been
82called. 88called.
83 89
84On callback entrance, the value of C<$!> contains the opertaing system 90On callback entrance, the value of C<$!> contains the operating system
85error (or C<ENOSPC> or C<EPIPE>). 91error (or C<ENOSPC> or C<EPIPE>).
86 92
87=item on_eof => $cb->($self) [MANDATORY] 93While not mandatory, it is I<highly> recommended to set this callback, as
88 94you will not be notified of errors otherwise. The default simply calls
89Set the callback to be called on EOF. 95die.
90 96
91=item on_read => $cb->($self) 97=item on_read => $cb->($self)
92 98
93This sets the default read callback, which is called when data arrives 99This sets the default read callback, which is called when data arrives
94and no read request is in the queue. If the read callback is C<undef> 100and no read request is in the queue.
95or has never been set, than AnyEvent::Handle will cease reading from the
96filehandle.
97 101
98To access (and remove data from) the read buffer, use the C<< ->rbuf >> 102To access (and remove data from) the read buffer, use the C<< ->rbuf >>
99method or acces sthe C<$self->{rbuf}> member directly. 103method or acces sthe C<$self->{rbuf}> member directly.
100 104
101When an EOF condition is detected then AnyEvent::Handle will first try to 105When an EOF condition is detected then AnyEvent::Handle will first try to
144 148
145 $self->{fh} or Carp::croak "mandatory argument fh is missing"; 149 $self->{fh} or Carp::croak "mandatory argument fh is missing";
146 150
147 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 151 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
148 152
149 $self->on_error ((delete $self->{on_error}) or Carp::croak "mandatory argument on_error is missing");
150 $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing"); 153 $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing");
151 154
155 $self->on_error (delete $self->{on_error}) if $self->{on_error};
152 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 156 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
153 $self->on_read (delete $self->{on_read} ) if $self->{on_read}; 157 $self->on_read (delete $self->{on_read} ) if $self->{on_read};
158
159 $self->start_read;
154 160
155 $self 161 $self
156} 162}
157 163
158sub _shutdown { 164sub _shutdown {
169 { 175 {
170 local $!; 176 local $!;
171 $self->_shutdown; 177 $self->_shutdown;
172 } 178 }
173 179
180 if ($self->{on_error}) {
174 $self->{on_error}($self); 181 $self->{on_error}($self);
182 } else {
183 die "AnyEvent::Handle uncaught fatal error: $!";
184 }
175} 185}
176 186
177=item $fh = $handle->fh 187=item $fh = $handle->fh
178 188
179This method returns the filehandle of the L<AnyEvent::Handle> object. 189This method returns the filehandle of the L<AnyEvent::Handle> object.
349 ... 359 ...
350 }); 360 });
351 361
352=over 4 362=over 4
353 363
364=cut
365
354sub _drain_rbuf { 366sub _drain_rbuf {
355 my ($self) = @_; 367 my ($self) = @_;
356 368
357 return if exists $self->{in_drain}; 369 return if $self->{in_drain};
358 local $self->{in_drain} = 1; 370 local $self->{in_drain} = 1;
359 371
360 while (my $len = length $self->{rbuf}) { 372 while (my $len = length $self->{rbuf}) {
361 no strict 'refs'; 373 no strict 'refs';
362 if (@{ $self->{queue} }) { 374 if (my $cb = shift @{ $self->{queue} }) {
363 if ($self->{queue}[0]($self)) { 375 if (!$cb->($self)) {
364 shift @{ $self->{queue} };
365 } elsif ($self->{eof}) { 376 if ($self->{eof}) {
366 # no progress can be made (not enough data and no data forthcoming) 377 # no progress can be made (not enough data and no data forthcoming)
367 $! = &Errno::EPIPE; return $self->error; 378 $! = &Errno::EPIPE; return $self->error;
368 } else { 379 }
380
381 unshift @{ $self->{queue} }, $cb;
369 return; 382 return;
370 } 383 }
371 } elsif ($self->{on_read}) { 384 } elsif ($self->{on_read}) {
372 $self->{on_read}($self); 385 $self->{on_read}($self);
373 386
403 416
404sub on_read { 417sub on_read {
405 my ($self, $cb) = @_; 418 my ($self, $cb) = @_;
406 419
407 $self->{on_read} = $cb; 420 $self->{on_read} = $cb;
408
409 unless ($self->{rw} || $self->{eof}) {
410 Scalar::Util::weaken $self;
411
412 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
413 my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf};
414
415 if ($len > 0) {
416 if (exists $self->{rbuf_max}) {
417 if ($self->{rbuf_max} < length $self->{rbuf}) {
418 $! = &Errno::ENOSPC; return $self->error;
419 }
420 }
421
422 } elsif (defined $len) {
423 $self->{eof} = 1;
424 delete $self->{rw};
425
426 } elsif ($! != EAGAIN && $! != EINTR) {
427 return $self->error;
428 }
429
430 $self->_drain_rbuf;
431 });
432 }
433} 421}
434 422
435=item $handle->rbuf 423=item $handle->rbuf
436 424
437Returns the read buffer (as a modifiable lvalue). 425Returns the read buffer (as a modifiable lvalue).
495these C<$len> bytes will be passed to the callback. 483these C<$len> bytes will be passed to the callback.
496 484
497=cut 485=cut
498 486
499sub _read_chunk($$) { 487sub _read_chunk($$) {
500 my ($len, $cb) = @_; 488 my ($self, $len, $cb) = @_;
501 489
502 sub { 490 sub {
503 $len <= length $_[0]{rbuf} or return; 491 $len <= length $_[0]{rbuf} or return;
504 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, ""); 492 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
505 1 493 1
506 } 494 }
507} 495}
508 496
509sub push_read_chunk { 497sub push_read_chunk {
510 my ($self, $len, $cb) = @_;
511
512 $self->push_read (_read_chunk $len, $cb); 498 $_[0]->push_read (&_read_chunk);
513} 499}
514 500
515 501
516sub unshift_read_chunk { 502sub unshift_read_chunk {
517 my ($self, $len, $cb) = @_;
518
519 $self->unshift_read (_read_chunk $len, $cb); 503 $_[0]->unshift_read (&_read_chunk);
520} 504}
521 505
522=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol)) 506=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol))
523 507
524=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol)) 508=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol))
544not marked by the end of line marker. 528not marked by the end of line marker.
545 529
546=cut 530=cut
547 531
548sub _read_line($$) { 532sub _read_line($$) {
533 my $self = shift;
549 my $cb = pop; 534 my $cb = pop;
550 my $eol = @_ ? shift : qr|(\015?\012)|; 535 my $eol = @_ ? shift : qr|(\015?\012)|;
551 my $pos; 536 my $pos;
552 537
553 $eol = qr|(\Q$eol\E)| unless ref $eol; 538 $eol = quotemeta $eol unless ref $eol;
554 $eol = qr|^(.*?)($eol)|; 539 $eol = qr|^(.*?)($eol)|s;
555 540
556 sub { 541 sub {
557 $_[0]{rbuf} =~ s/$eol// or return; 542 $_[0]{rbuf} =~ s/$eol// or return;
558 543
559 $cb->($1, $2); 544 $cb->($_[0], $1, $2);
560 1 545 1
561 } 546 }
562} 547}
563 548
564sub push_read_line { 549sub push_read_line {
565 my $self = shift;
566
567 $self->push_read (&_read_line); 550 $_[0]->push_read (&_read_line);
568} 551}
569 552
570sub unshift_read_line { 553sub unshift_read_line {
571 my $self = shift;
572
573 $self->unshift_read (&_read_line); 554 $_[0]->unshift_read (&_read_line);
555}
556
557=item $handle->stop_read
558
559=item $handle->start_read
560
561In rare cases you actually do not want to read anything form the
562socket. In this case you can call C<stop_read>. Neither C<on_read> no
563any queued callbacks will be executed then. To start readign again, call
564C<start_read>.
565
566=cut
567
568sub stop_read {
569 my ($self) = @_;
570
571 delete $self->{rw};
572}
573
574sub start_read {
575 my ($self) = @_;
576
577 unless ($self->{rw} || $self->{eof}) {
578 Scalar::Util::weaken $self;
579
580 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
581 my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf};
582
583 if ($len > 0) {
584 if (defined $self->{rbuf_max}) {
585 if ($self->{rbuf_max} < length $self->{rbuf}) {
586 $! = &Errno::ENOSPC; return $self->error;
587 }
588 }
589
590 } elsif (defined $len) {
591 $self->{eof} = 1;
592 delete $self->{rw};
593
594 } elsif ($! != EAGAIN && $! != EINTR) {
595 return $self->error;
596 }
597
598 $self->_drain_rbuf;
599 });
600 }
574} 601}
575 602
576=back 603=back
577 604
578=head1 AUTHOR 605=head1 AUTHOR

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines