ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.9 by root, Fri May 2 16:07:46 2008 UTC vs.
Revision 1.16 by root, Fri May 23 05:16:57 2008 UTC

12 12
13=head1 NAME 13=head1 NAME
14 14
15AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent 15AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent
16 16
17=cut 17This module is experimental.
18 18
19=cut
20
19our $VERSION = '0.02'; 21our $VERSION = '0.04';
20 22
21=head1 SYNOPSIS 23=head1 SYNOPSIS
22 24
23 use AnyEvent; 25 use AnyEvent;
24 use AnyEvent::Handle; 26 use AnyEvent::Handle;
43 $cv->wait; 45 $cv->wait;
44 46
45=head1 DESCRIPTION 47=head1 DESCRIPTION
46 48
47This module is a helper module to make it easier to do event-based I/O on 49This module is a helper module to make it easier to do event-based I/O on
48filehandles (and sockets, see L<AnyEvent::Socket> for an easy way to make 50filehandles. For utility functions for doing non-blocking connects and accepts
49non-blocking resolves and connects). 51on sockets see L<AnyEvent::Util>.
50 52
51In the following, when the documentation refers to of "bytes" then this 53In the following, when the documentation refers to of "bytes" then this
52means characters. As sysread and syswrite are used for all I/O, their 54means characters. As sysread and syswrite are used for all I/O, their
53treatment of characters applies to this module as well. 55treatment of characters applies to this module as well.
54 56
70The filehandle this L<AnyEvent::Handle> object will operate on. 72The filehandle this L<AnyEvent::Handle> object will operate on.
71 73
72NOTE: The filehandle will be set to non-blocking (using 74NOTE: The filehandle will be set to non-blocking (using
73AnyEvent::Util::fh_nonblocking). 75AnyEvent::Util::fh_nonblocking).
74 76
77=item on_eof => $cb->($self)
78
79Set the callback to be called on EOF.
80
81While not mandatory, it is highly recommended to set an eof callback,
82otherwise you might end up with a closed socket while you are still
83waiting for data.
84
75=item on_error => $cb->($self) [MANDATORY] 85=item on_error => $cb->($self)
76 86
77This is the fatal error callback, that is called when a fatal error ocurs, 87This is the fatal error callback, that is called when, well, a fatal error
78such as not being able to resolve the hostname, failure to connect or a 88ocurs, such as not being able to resolve the hostname, failure to connect
79read error. 89or a read error.
80 90
81The object will not be in a usable state when this callback has been 91The object will not be in a usable state when this callback has been
82called. 92called.
83 93
84On callback entrance, the value of C<$!> contains the opertaing system 94On callback entrance, the value of C<$!> contains the operating system
85error (or C<ENOSPC> or C<EPIPE>). 95error (or C<ENOSPC> or C<EPIPE>).
86 96
87=item on_eof => $cb->($self) [MANDATORY] 97While not mandatory, it is I<highly> recommended to set this callback, as
88 98you will not be notified of errors otherwise. The default simply calls
89Set the callback to be called on EOF. 99die.
90 100
91=item on_read => $cb->($self) 101=item on_read => $cb->($self)
92 102
93This sets the default read callback, which is called when data arrives 103This sets the default read callback, which is called when data arrives
94and no read request is in the queue. If the read callback is C<undef> 104and no read request is in the queue.
95or has never been set, than AnyEvent::Handle will cease reading from the
96filehandle.
97 105
98To access (and remove data from) the read buffer, use the C<< ->rbuf >> 106To access (and remove data from) the read buffer, use the C<< ->rbuf >>
99method or acces sthe C<$self->{rbuf}> member directly. 107method or acces sthe C<$self->{rbuf}> member directly.
100 108
101When an EOF condition is detected then AnyEvent::Handle will first try to 109When an EOF condition is detected then AnyEvent::Handle will first try to
144 152
145 $self->{fh} or Carp::croak "mandatory argument fh is missing"; 153 $self->{fh} or Carp::croak "mandatory argument fh is missing";
146 154
147 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 155 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
148 156
149 $self->on_error ((delete $self->{on_error}) or Carp::croak "mandatory argument on_error is missing"); 157 $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof};
150 $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing"); 158 $self->on_error (delete $self->{on_error}) if $self->{on_error};
151
152 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 159 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
153 $self->on_read (delete $self->{on_read} ) if $self->{on_read}; 160 $self->on_read (delete $self->{on_read} ) if $self->{on_read};
161
162 $self->start_read;
154 163
155 $self 164 $self
156} 165}
157 166
158sub _shutdown { 167sub _shutdown {
169 { 178 {
170 local $!; 179 local $!;
171 $self->_shutdown; 180 $self->_shutdown;
172 } 181 }
173 182
183 if ($self->{on_error}) {
174 $self->{on_error}($self); 184 $self->{on_error}($self);
185 } else {
186 die "AnyEvent::Handle uncaught fatal error: $!";
187 }
175} 188}
176 189
177=item $fh = $handle->fh 190=item $fh = $handle->fh
178 191
179This method returns the filehandle of the L<AnyEvent::Handle> object. 192This method returns the filehandle of the L<AnyEvent::Handle> object.
349 ... 362 ...
350 }); 363 });
351 364
352=over 4 365=over 4
353 366
367=cut
368
354sub _drain_rbuf { 369sub _drain_rbuf {
355 my ($self) = @_; 370 my ($self) = @_;
356 371
357 return if exists $self->{in_drain}; 372 return if $self->{in_drain};
358 local $self->{in_drain} = 1; 373 local $self->{in_drain} = 1;
359 374
360 while (my $len = length $self->{rbuf}) { 375 while (my $len = length $self->{rbuf}) {
361 no strict 'refs'; 376 no strict 'refs';
362 if (@{ $self->{queue} }) { 377 if (my $cb = shift @{ $self->{queue} }) {
363 if ($self->{queue}[0]($self)) { 378 if (!$cb->($self)) {
364 shift @{ $self->{queue} };
365 } elsif ($self->{eof}) { 379 if ($self->{eof}) {
366 # no progress can be made (not enough data and no data forthcoming) 380 # no progress can be made (not enough data and no data forthcoming)
367 $! = &Errno::EPIPE; return $self->error; 381 $! = &Errno::EPIPE; return $self->error;
368 } else { 382 }
383
384 unshift @{ $self->{queue} }, $cb;
369 return; 385 return;
370 } 386 }
371 } elsif ($self->{on_read}) { 387 } elsif ($self->{on_read}) {
372 $self->{on_read}($self); 388 $self->{on_read}($self);
373 389
387 } 403 }
388 } 404 }
389 405
390 if ($self->{eof}) { 406 if ($self->{eof}) {
391 $self->_shutdown; 407 $self->_shutdown;
392 $self->{on_eof}($self); 408 $self->{on_eof}($self)
409 if $self->{on_eof};
393 } 410 }
394} 411}
395 412
396=item $handle->on_read ($cb) 413=item $handle->on_read ($cb)
397 414
403 420
404sub on_read { 421sub on_read {
405 my ($self, $cb) = @_; 422 my ($self, $cb) = @_;
406 423
407 $self->{on_read} = $cb; 424 $self->{on_read} = $cb;
408
409 unless ($self->{rw} || $self->{eof}) {
410 Scalar::Util::weaken $self;
411
412 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
413 my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf};
414
415 if ($len > 0) {
416 if (exists $self->{rbuf_max}) {
417 if ($self->{rbuf_max} < length $self->{rbuf}) {
418 $! = &Errno::ENOSPC; return $self->error;
419 }
420 }
421
422 } elsif (defined $len) {
423 $self->{eof} = 1;
424 delete $self->{rw};
425
426 } elsif ($! != EAGAIN && $! != EINTR) {
427 return $self->error;
428 }
429
430 $self->_drain_rbuf;
431 });
432 }
433} 425}
434 426
435=item $handle->rbuf 427=item $handle->rbuf
436 428
437Returns the read buffer (as a modifiable lvalue). 429Returns the read buffer (as a modifiable lvalue).
495these C<$len> bytes will be passed to the callback. 487these C<$len> bytes will be passed to the callback.
496 488
497=cut 489=cut
498 490
499sub _read_chunk($$) { 491sub _read_chunk($$) {
500 my ($len, $cb) = @_; 492 my ($self, $len, $cb) = @_;
501 493
502 sub { 494 sub {
503 $len <= length $_[0]{rbuf} or return; 495 $len <= length $_[0]{rbuf} or return;
504 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, ""); 496 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
505 1 497 1
506 } 498 }
507} 499}
508 500
509sub push_read_chunk { 501sub push_read_chunk {
510 my ($self, $len, $cb) = @_;
511
512 $self->push_read (_read_chunk $len, $cb); 502 $_[0]->push_read (&_read_chunk);
513} 503}
514 504
515 505
516sub unshift_read_chunk { 506sub unshift_read_chunk {
517 my ($self, $len, $cb) = @_;
518
519 $self->unshift_read (_read_chunk $len, $cb); 507 $_[0]->unshift_read (&_read_chunk);
520} 508}
521 509
522=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol)) 510=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol))
523 511
524=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol)) 512=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol))
544not marked by the end of line marker. 532not marked by the end of line marker.
545 533
546=cut 534=cut
547 535
548sub _read_line($$) { 536sub _read_line($$) {
537 my $self = shift;
549 my $cb = pop; 538 my $cb = pop;
550 my $eol = @_ ? shift : qr|(\015?\012)|; 539 my $eol = @_ ? shift : qr|(\015?\012)|;
551 my $pos; 540 my $pos;
552 541
553 $eol = qr|(\Q$eol\E)| unless ref $eol; 542 $eol = quotemeta $eol unless ref $eol;
554 $eol = qr|^(.*?)($eol)|; 543 $eol = qr|^(.*?)($eol)|s;
555 544
556 sub { 545 sub {
557 $_[0]{rbuf} =~ s/$eol// or return; 546 $_[0]{rbuf} =~ s/$eol// or return;
558 547
559 $cb->($1, $2); 548 $cb->($_[0], $1, $2);
560 1 549 1
561 } 550 }
562} 551}
563 552
564sub push_read_line { 553sub push_read_line {
565 my $self = shift;
566
567 $self->push_read (&_read_line); 554 $_[0]->push_read (&_read_line);
568} 555}
569 556
570sub unshift_read_line { 557sub unshift_read_line {
571 my $self = shift;
572
573 $self->unshift_read (&_read_line); 558 $_[0]->unshift_read (&_read_line);
559}
560
561=item $handle->stop_read
562
563=item $handle->start_read
564
565In rare cases you actually do not want to read anything form the
566socket. In this case you can call C<stop_read>. Neither C<on_read> no
567any queued callbacks will be executed then. To start readign again, call
568C<start_read>.
569
570=cut
571
572sub stop_read {
573 my ($self) = @_;
574
575 delete $self->{rw};
576}
577
578sub start_read {
579 my ($self) = @_;
580
581 unless ($self->{rw} || $self->{eof}) {
582 Scalar::Util::weaken $self;
583
584 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
585 my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf};
586
587 if ($len > 0) {
588 if (defined $self->{rbuf_max}) {
589 if ($self->{rbuf_max} < length $self->{rbuf}) {
590 $! = &Errno::ENOSPC; return $self->error;
591 }
592 }
593
594 } elsif (defined $len) {
595 $self->{eof} = 1;
596 delete $self->{rw};
597
598 } elsif ($! != EAGAIN && $! != EINTR) {
599 return $self->error;
600 }
601
602 $self->_drain_rbuf;
603 });
604 }
574} 605}
575 606
576=back 607=back
577 608
578=head1 AUTHOR 609=head1 AUTHOR

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines