… | |
… | |
12 | |
12 | |
13 | =head1 NAME |
13 | =head1 NAME |
14 | |
14 | |
15 | AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent |
15 | AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent |
16 | |
16 | |
17 | =cut |
17 | This module is experimental. |
18 | |
18 | |
|
|
19 | =cut |
|
|
20 | |
19 | our $VERSION = '0.02'; |
21 | our $VERSION = '0.04'; |
20 | |
22 | |
21 | =head1 SYNOPSIS |
23 | =head1 SYNOPSIS |
22 | |
24 | |
23 | use AnyEvent; |
25 | use AnyEvent; |
24 | use AnyEvent::Handle; |
26 | use AnyEvent::Handle; |
… | |
… | |
43 | $cv->wait; |
45 | $cv->wait; |
44 | |
46 | |
45 | =head1 DESCRIPTION |
47 | =head1 DESCRIPTION |
46 | |
48 | |
47 | This module is a helper module to make it easier to do event-based I/O on |
49 | This module is a helper module to make it easier to do event-based I/O on |
48 | filehandles (and sockets, see L<AnyEvent::Socket> for an easy way to make |
50 | filehandles. For utility functions for doing non-blocking connects and accepts |
49 | non-blocking resolves and connects). |
51 | on sockets see L<AnyEvent::Util>. |
50 | |
52 | |
51 | In the following, when the documentation refers to of "bytes" then this |
53 | In the following, when the documentation refers to of "bytes" then this |
52 | means characters. As sysread and syswrite are used for all I/O, their |
54 | means characters. As sysread and syswrite are used for all I/O, their |
53 | treatment of characters applies to this module as well. |
55 | treatment of characters applies to this module as well. |
54 | |
56 | |
… | |
… | |
70 | The filehandle this L<AnyEvent::Handle> object will operate on. |
72 | The filehandle this L<AnyEvent::Handle> object will operate on. |
71 | |
73 | |
72 | NOTE: The filehandle will be set to non-blocking (using |
74 | NOTE: The filehandle will be set to non-blocking (using |
73 | AnyEvent::Util::fh_nonblocking). |
75 | AnyEvent::Util::fh_nonblocking). |
74 | |
76 | |
|
|
77 | =item on_eof => $cb->($self) |
|
|
78 | |
|
|
79 | Set the callback to be called on EOF. |
|
|
80 | |
|
|
81 | While not mandatory, it is highly recommended to set an eof callback, |
|
|
82 | otherwise you might end up with a closed socket while you are still |
|
|
83 | waiting for data. |
|
|
84 | |
75 | =item on_error => $cb->($self) [MANDATORY] |
85 | =item on_error => $cb->($self) |
76 | |
86 | |
77 | This is the fatal error callback, that is called when a fatal error ocurs, |
87 | This is the fatal error callback, that is called when, well, a fatal error |
78 | such as not being able to resolve the hostname, failure to connect or a |
88 | ocurs, such as not being able to resolve the hostname, failure to connect |
79 | read error. |
89 | or a read error. |
80 | |
90 | |
81 | The object will not be in a usable state when this callback has been |
91 | The object will not be in a usable state when this callback has been |
82 | called. |
92 | called. |
83 | |
93 | |
84 | On callback entrance, the value of C<$!> contains the opertaing system |
94 | On callback entrance, the value of C<$!> contains the operating system |
85 | error (or C<ENOSPC> or C<EPIPE>). |
95 | error (or C<ENOSPC> or C<EPIPE>). |
86 | |
96 | |
87 | =item on_eof => $cb->($self) [MANDATORY] |
97 | While not mandatory, it is I<highly> recommended to set this callback, as |
88 | |
98 | you will not be notified of errors otherwise. The default simply calls |
89 | Set the callback to be called on EOF. |
99 | die. |
90 | |
100 | |
91 | =item on_read => $cb->($self) |
101 | =item on_read => $cb->($self) |
92 | |
102 | |
93 | This sets the default read callback, which is called when data arrives |
103 | This sets the default read callback, which is called when data arrives |
94 | and no read request is in the queue. If the read callback is C<undef> |
104 | and no read request is in the queue. |
95 | or has never been set, than AnyEvent::Handle will cease reading from the |
|
|
96 | filehandle. |
|
|
97 | |
105 | |
98 | To access (and remove data from) the read buffer, use the C<< ->rbuf >> |
106 | To access (and remove data from) the read buffer, use the C<< ->rbuf >> |
99 | method or acces sthe C<$self->{rbuf}> member directly. |
107 | method or acces sthe C<$self->{rbuf}> member directly. |
100 | |
108 | |
101 | When an EOF condition is detected then AnyEvent::Handle will first try to |
109 | When an EOF condition is detected then AnyEvent::Handle will first try to |
… | |
… | |
144 | |
152 | |
145 | $self->{fh} or Carp::croak "mandatory argument fh is missing"; |
153 | $self->{fh} or Carp::croak "mandatory argument fh is missing"; |
146 | |
154 | |
147 | AnyEvent::Util::fh_nonblocking $self->{fh}, 1; |
155 | AnyEvent::Util::fh_nonblocking $self->{fh}, 1; |
148 | |
156 | |
149 | $self->on_error ((delete $self->{on_error}) or Carp::croak "mandatory argument on_error is missing"); |
157 | $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; |
150 | $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing"); |
158 | $self->on_error (delete $self->{on_error}) if $self->{on_error}; |
151 | |
|
|
152 | $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; |
159 | $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; |
153 | $self->on_read (delete $self->{on_read} ) if $self->{on_read}; |
160 | $self->on_read (delete $self->{on_read} ) if $self->{on_read}; |
|
|
161 | |
|
|
162 | $self->start_read; |
154 | |
163 | |
155 | $self |
164 | $self |
156 | } |
165 | } |
157 | |
166 | |
158 | sub _shutdown { |
167 | sub _shutdown { |
… | |
… | |
169 | { |
178 | { |
170 | local $!; |
179 | local $!; |
171 | $self->_shutdown; |
180 | $self->_shutdown; |
172 | } |
181 | } |
173 | |
182 | |
|
|
183 | if ($self->{on_error}) { |
174 | $self->{on_error}($self); |
184 | $self->{on_error}($self); |
|
|
185 | } else { |
|
|
186 | die "AnyEvent::Handle uncaught fatal error: $!"; |
|
|
187 | } |
175 | } |
188 | } |
176 | |
189 | |
177 | =item $fh = $handle->fh |
190 | =item $fh = $handle->fh |
178 | |
191 | |
179 | This method returns the filehandle of the L<AnyEvent::Handle> object. |
192 | This method returns the filehandle of the L<AnyEvent::Handle> object. |
… | |
… | |
349 | ... |
362 | ... |
350 | }); |
363 | }); |
351 | |
364 | |
352 | =over 4 |
365 | =over 4 |
353 | |
366 | |
|
|
367 | =cut |
|
|
368 | |
354 | sub _drain_rbuf { |
369 | sub _drain_rbuf { |
355 | my ($self) = @_; |
370 | my ($self) = @_; |
356 | |
371 | |
357 | return if exists $self->{in_drain}; |
372 | return if $self->{in_drain}; |
358 | local $self->{in_drain} = 1; |
373 | local $self->{in_drain} = 1; |
359 | |
374 | |
360 | while (my $len = length $self->{rbuf}) { |
375 | while (my $len = length $self->{rbuf}) { |
361 | no strict 'refs'; |
376 | no strict 'refs'; |
362 | if (@{ $self->{queue} }) { |
377 | if (my $cb = shift @{ $self->{queue} }) { |
363 | if ($self->{queue}[0]($self)) { |
378 | if (!$cb->($self)) { |
364 | shift @{ $self->{queue} }; |
|
|
365 | } elsif ($self->{eof}) { |
379 | if ($self->{eof}) { |
366 | # no progress can be made (not enough data and no data forthcoming) |
380 | # no progress can be made (not enough data and no data forthcoming) |
367 | $! = &Errno::EPIPE; return $self->error; |
381 | $! = &Errno::EPIPE; return $self->error; |
368 | } else { |
382 | } |
|
|
383 | |
|
|
384 | unshift @{ $self->{queue} }, $cb; |
369 | return; |
385 | return; |
370 | } |
386 | } |
371 | } elsif ($self->{on_read}) { |
387 | } elsif ($self->{on_read}) { |
372 | $self->{on_read}($self); |
388 | $self->{on_read}($self); |
373 | |
389 | |
… | |
… | |
387 | } |
403 | } |
388 | } |
404 | } |
389 | |
405 | |
390 | if ($self->{eof}) { |
406 | if ($self->{eof}) { |
391 | $self->_shutdown; |
407 | $self->_shutdown; |
392 | $self->{on_eof}($self); |
408 | $self->{on_eof}($self) |
|
|
409 | if $self->{on_eof}; |
393 | } |
410 | } |
394 | } |
411 | } |
395 | |
412 | |
396 | =item $handle->on_read ($cb) |
413 | =item $handle->on_read ($cb) |
397 | |
414 | |
… | |
… | |
403 | |
420 | |
404 | sub on_read { |
421 | sub on_read { |
405 | my ($self, $cb) = @_; |
422 | my ($self, $cb) = @_; |
406 | |
423 | |
407 | $self->{on_read} = $cb; |
424 | $self->{on_read} = $cb; |
408 | |
|
|
409 | unless ($self->{rw} || $self->{eof}) { |
|
|
410 | Scalar::Util::weaken $self; |
|
|
411 | |
|
|
412 | $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { |
|
|
413 | my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf}; |
|
|
414 | |
|
|
415 | if ($len > 0) { |
|
|
416 | if (exists $self->{rbuf_max}) { |
|
|
417 | if ($self->{rbuf_max} < length $self->{rbuf}) { |
|
|
418 | $! = &Errno::ENOSPC; return $self->error; |
|
|
419 | } |
|
|
420 | } |
|
|
421 | |
|
|
422 | } elsif (defined $len) { |
|
|
423 | $self->{eof} = 1; |
|
|
424 | delete $self->{rw}; |
|
|
425 | |
|
|
426 | } elsif ($! != EAGAIN && $! != EINTR) { |
|
|
427 | return $self->error; |
|
|
428 | } |
|
|
429 | |
|
|
430 | $self->_drain_rbuf; |
|
|
431 | }); |
|
|
432 | } |
|
|
433 | } |
425 | } |
434 | |
426 | |
435 | =item $handle->rbuf |
427 | =item $handle->rbuf |
436 | |
428 | |
437 | Returns the read buffer (as a modifiable lvalue). |
429 | Returns the read buffer (as a modifiable lvalue). |
… | |
… | |
495 | these C<$len> bytes will be passed to the callback. |
487 | these C<$len> bytes will be passed to the callback. |
496 | |
488 | |
497 | =cut |
489 | =cut |
498 | |
490 | |
499 | sub _read_chunk($$) { |
491 | sub _read_chunk($$) { |
500 | my ($len, $cb) = @_; |
492 | my ($self, $len, $cb) = @_; |
501 | |
493 | |
502 | sub { |
494 | sub { |
503 | $len <= length $_[0]{rbuf} or return; |
495 | $len <= length $_[0]{rbuf} or return; |
504 | $cb->($_[0], substr $_[0]{rbuf}, 0, $len, ""); |
496 | $cb->($_[0], substr $_[0]{rbuf}, 0, $len, ""); |
505 | 1 |
497 | 1 |
506 | } |
498 | } |
507 | } |
499 | } |
508 | |
500 | |
509 | sub push_read_chunk { |
501 | sub push_read_chunk { |
510 | my ($self, $len, $cb) = @_; |
|
|
511 | |
|
|
512 | $self->push_read (_read_chunk $len, $cb); |
502 | $_[0]->push_read (&_read_chunk); |
513 | } |
503 | } |
514 | |
504 | |
515 | |
505 | |
516 | sub unshift_read_chunk { |
506 | sub unshift_read_chunk { |
517 | my ($self, $len, $cb) = @_; |
|
|
518 | |
|
|
519 | $self->unshift_read (_read_chunk $len, $cb); |
507 | $_[0]->unshift_read (&_read_chunk); |
520 | } |
508 | } |
521 | |
509 | |
522 | =item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol)) |
510 | =item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol)) |
523 | |
511 | |
524 | =item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol)) |
512 | =item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol)) |
… | |
… | |
544 | not marked by the end of line marker. |
532 | not marked by the end of line marker. |
545 | |
533 | |
546 | =cut |
534 | =cut |
547 | |
535 | |
548 | sub _read_line($$) { |
536 | sub _read_line($$) { |
|
|
537 | my $self = shift; |
549 | my $cb = pop; |
538 | my $cb = pop; |
550 | my $eol = @_ ? shift : qr|(\015?\012)|; |
539 | my $eol = @_ ? shift : qr|(\015?\012)|; |
551 | my $pos; |
540 | my $pos; |
552 | |
541 | |
553 | $eol = qr|(\Q$eol\E)| unless ref $eol; |
542 | $eol = quotemeta $eol unless ref $eol; |
554 | $eol = qr|^(.*?)($eol)|; |
543 | $eol = qr|^(.*?)($eol)|s; |
555 | |
544 | |
556 | sub { |
545 | sub { |
557 | $_[0]{rbuf} =~ s/$eol// or return; |
546 | $_[0]{rbuf} =~ s/$eol// or return; |
558 | |
547 | |
559 | $cb->($1, $2); |
548 | $cb->($_[0], $1, $2); |
560 | 1 |
549 | 1 |
561 | } |
550 | } |
562 | } |
551 | } |
563 | |
552 | |
564 | sub push_read_line { |
553 | sub push_read_line { |
565 | my $self = shift; |
|
|
566 | |
|
|
567 | $self->push_read (&_read_line); |
554 | $_[0]->push_read (&_read_line); |
568 | } |
555 | } |
569 | |
556 | |
570 | sub unshift_read_line { |
557 | sub unshift_read_line { |
571 | my $self = shift; |
|
|
572 | |
|
|
573 | $self->unshift_read (&_read_line); |
558 | $_[0]->unshift_read (&_read_line); |
|
|
559 | } |
|
|
560 | |
|
|
561 | =item $handle->stop_read |
|
|
562 | |
|
|
563 | =item $handle->start_read |
|
|
564 | |
|
|
565 | In rare cases you actually do not want to read anything form the |
|
|
566 | socket. In this case you can call C<stop_read>. Neither C<on_read> no |
|
|
567 | any queued callbacks will be executed then. To start readign again, call |
|
|
568 | C<start_read>. |
|
|
569 | |
|
|
570 | =cut |
|
|
571 | |
|
|
572 | sub stop_read { |
|
|
573 | my ($self) = @_; |
|
|
574 | |
|
|
575 | delete $self->{rw}; |
|
|
576 | } |
|
|
577 | |
|
|
578 | sub start_read { |
|
|
579 | my ($self) = @_; |
|
|
580 | |
|
|
581 | unless ($self->{rw} || $self->{eof}) { |
|
|
582 | Scalar::Util::weaken $self; |
|
|
583 | |
|
|
584 | $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { |
|
|
585 | my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf}; |
|
|
586 | |
|
|
587 | if ($len > 0) { |
|
|
588 | if (defined $self->{rbuf_max}) { |
|
|
589 | if ($self->{rbuf_max} < length $self->{rbuf}) { |
|
|
590 | $! = &Errno::ENOSPC; return $self->error; |
|
|
591 | } |
|
|
592 | } |
|
|
593 | |
|
|
594 | } elsif (defined $len) { |
|
|
595 | $self->{eof} = 1; |
|
|
596 | delete $self->{rw}; |
|
|
597 | |
|
|
598 | } elsif ($! != EAGAIN && $! != EINTR) { |
|
|
599 | return $self->error; |
|
|
600 | } |
|
|
601 | |
|
|
602 | $self->_drain_rbuf; |
|
|
603 | }); |
|
|
604 | } |
574 | } |
605 | } |
575 | |
606 | |
576 | =back |
607 | =back |
577 | |
608 | |
578 | =head1 AUTHOR |
609 | =head1 AUTHOR |