ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
Revision: 1.12
Committed: Thu May 15 09:03:43 2008 UTC (16 years ago) by elmex
Branch: MAIN
Changes since 1.11: +2 -2 lines
Log Message:
fixed write test and bug in *_read_chunk

File Contents

# User Rev Content
1 elmex 1.1 package AnyEvent::Handle;
2    
3 elmex 1.6 no warnings;
4 elmex 1.1 use strict;
5    
6 root 1.8 use AnyEvent ();
7     use AnyEvent::Util ();
8     use Scalar::Util ();
9     use Carp ();
10     use Fcntl ();
11 elmex 1.1 use Errno qw/EAGAIN EINTR/;
12    
13     =head1 NAME
14    
15     AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent
16    
17     =cut
18    
19 root 1.8 our $VERSION = '0.02';
20 elmex 1.1
21     =head1 SYNOPSIS
22    
23     use AnyEvent;
24     use AnyEvent::Handle;
25    
26     my $cv = AnyEvent->condvar;
27    
28     my $ae_fh = AnyEvent::Handle->new (fh => \*STDIN);
29    
30 root 1.8 #TODO
31 elmex 1.1
32 elmex 1.2 # or use the constructor to pass the callback:
33    
34     my $ae_fh2 =
35     AnyEvent::Handle->new (
36     fh => \*STDIN,
37     on_eof => sub {
38     $cv->broadcast;
39     },
40 root 1.8 #TODO
41 elmex 1.2 );
42    
43 elmex 1.1 $cv->wait;
44    
45     =head1 DESCRIPTION
46    
47 root 1.8 This module is a helper module to make it easier to do event-based I/O on
48     filehandles (and sockets, see L<AnyEvent::Socket> for an easy way to make
49     non-blocking resolves and connects).
50    
51     In the following, when the documentation refers to "bytes", this
52     means characters. As sysread and syswrite are used for all I/O, their
53     treatment of characters applies to this module as well.
54 elmex 1.1
55 root 1.8 All callbacks will be invoked with the handle object as their first
56     argument.
57 elmex 1.1
58     =head1 METHODS
59    
60     =over 4
61    
62     =item B<new (%args)>
63    
64 root 1.8 The constructor supports these arguments (all as key => value pairs).
65 elmex 1.1
66     =over 4
67    
68 root 1.8 =item fh => $filehandle [MANDATORY]
69 elmex 1.1
70     The filehandle this L<AnyEvent::Handle> object will operate on.
71    
72 root 1.8 NOTE: The filehandle will be set to non-blocking (using
73     AnyEvent::Util::fh_nonblocking).
74    
75 root 1.10 =item on_eof => $cb->($self) [MANDATORY]
76    
77     Set the callback to be called on EOF.
78 root 1.8
79 root 1.10 =item on_error => $cb->($self)
80    
81     This is the fatal error callback, that is called when, well, a fatal error
82     occurs, such as not being able to resolve the hostname, failure to connect
83     or a read error.
84 root 1.8
85     The object will not be in a usable state when this callback has been
86     called.
87    
88 root 1.10 On callback entrance, the value of C<$!> contains the operating system
89 root 1.8 error (or C<ENOSPC> or C<EPIPE>).
90    
91 root 1.10 While not mandatory, it is I<highly> recommended to set this callback, as
92     you will not be notified of errors otherwise. The default simply calls
93     die.
94 root 1.8
95     =item on_read => $cb->($self)
96    
97     This sets the default read callback, which is called when data arrives
98 root 1.10 and no read request is in the queue.
99 root 1.8
100     To access (and remove data from) the read buffer, use the C<< ->rbuf >>
101     method or access the C<< $self->{rbuf} >> member directly.
102    
103     When an EOF condition is detected then AnyEvent::Handle will first try to
104     feed all the remaining data to the queued callbacks and C<on_read> before
105     calling the C<on_eof> callback. If no progress can be made, then a fatal
106     error will be raised (with C<$!> set to C<EPIPE>).
107 elmex 1.1
108 root 1.8 =item on_drain => $cb->()
109 elmex 1.1
110 root 1.8 This sets the callback that is called when the write buffer becomes empty
111     (or when the callback is set and the buffer is empty already).
112 elmex 1.1
113 root 1.8 To append to the write buffer, use the C<< ->push_write >> method.
114 elmex 1.2
115 root 1.8 =item rbuf_max => <bytes>
116 elmex 1.2
117 root 1.8 If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>)
118     when the read buffer ever (strictly) exceeds this size. This is useful to
119     avoid denial-of-service attacks.
120 elmex 1.2
121 root 1.8 For example, a server accepting connections from untrusted sources should
122     be configured to accept only so-and-so much data that it cannot act on
123     (for example, when expecting a line, an attacker could send an unlimited
124     amount of data without a callback ever being called as long as the line
125     isn't finished).
126 elmex 1.2
127 root 1.8 =item read_size => <bytes>
128 elmex 1.2
129 root 1.8 The default read block size (the number of bytes this module will try to read
130     on each loop iteration). Default: C<8192>.
131    
132     =item low_water_mark => <bytes>
133    
134     Sets the number of bytes (default: C<0>) that make up an "empty" write
135     buffer: if the write buffer reaches this size or gets even smaller, it is
136     considered empty.
137 elmex 1.2
138 elmex 1.1 =back
139    
140     =cut
141    
# Constructor: AnyEvent::Handle->new (fh => $fh, on_eof => $cb, ...).
#
# All key/value arguments become the initial members of the object. "fh"
# and "on_eof" are mandatory (croaks otherwise). The filehandle is switched
# to non-blocking mode, the callbacks are installed through their setter
# methods and reading is started before the object is returned.
sub new {
   my ($class, @args) = @_;

   my $self = bless { @args }, $class;

   Carp::croak ("mandatory argument fh is missing")
      unless $self->{fh};

   AnyEvent::Util::fh_nonblocking ($self->{fh}, 1);

   # on_eof is required, so it is always routed through its setter
   my $on_eof = delete $self->{on_eof}
      or Carp::croak ("mandatory argument on_eof is missing");
   $self->on_eof ($on_eof);

   # the optional callbacks are only installed when a true value was given
   for my $setter (qw(on_error on_drain on_read)) {
      $self->$setter (delete $self->{$setter}) if $self->{$setter};
   }

   $self->start_read;

   $self
}
161 elmex 1.2
# Drops the read watcher, the write watcher and the filehandle from the
# object, in that order. Returns whatever was stored under "fh".
sub _shutdown {
   my $handle = shift;

   delete @{$handle}{qw(rw ww)};

   return delete $handle->{fh};
}
169    
# Fatal error path: shuts the handle down, then invokes the on_error
# callback with the handle, or dies when no such callback is set.
#
# The error is expected in $! on entry. $! is localised around the
# shutdown, so whatever the shutdown does to it is undone before the
# callback runs or the die message is built.
sub error {
   my $handle = shift;

   do {
      local $!;
      $handle->_shutdown;
   };

   my $on_error = $handle->{on_error}
      or die "AnyEvent::Handle uncaught fatal error: $!";

   $on_error->($handle);
}
184    
185 root 1.8 =item $fh = $handle->fh
186 elmex 1.1
187     This method returns the filehandle of the L<AnyEvent::Handle> object.
188    
189     =cut
190    
# Accessor: returns the value stored in the "fh" member.
sub fh {
   my ($handle) = @_;

   $handle->{fh}
}
192    
193 root 1.8 =item $handle->on_error ($cb)
194 elmex 1.1
195 root 1.8 Replace the current C<on_error> callback (see the C<on_error> constructor argument).
196 elmex 1.1
197 root 1.8 =cut
198    
# Setter: stores $cb as the "on_error" member and returns it.
sub on_error {
   my ($handle, $cb) = @_;

   $handle->{on_error} = $cb;
}
202    
203     =item $handle->on_eof ($cb)
204    
205     Replace the current C<on_eof> callback (see the C<on_eof> constructor argument).
206 elmex 1.1
207     =cut
208    
# Setter: stores $cb as the "on_eof" member and returns it.
sub on_eof {
   my ($handle, $cb) = @_;

   $handle->{on_eof} = $cb;
}
212    
213 root 1.9 #############################################################################
214    
215     =back
216    
217     =head2 WRITE QUEUE
218    
219     AnyEvent::Handle manages two queues per handle, one for writing and one
220     for reading.
221    
222     The write queue is very simple: you can add data to its end, and
223     AnyEvent::Handle will automatically try to get rid of it for you.
224    
225     When data could be written and the write buffer is no longer than the low
226     water mark, the C<on_drain> callback will be invoked.
227    
228     =over 4
229    
230 root 1.8 =item $handle->on_drain ($cb)
231    
232     Sets the C<on_drain> callback or clears it (see the description of
233     C<on_drain> in the constructor).
234    
235     =cut
236    
# Setter for the "on_drain" member.
#
# A true callback is also invoked right away (with the handle as its only
# argument) when the write buffer is currently no longer than the
# low_water_mark member.
sub on_drain {
   my ($handle, $callback) = @_;

   $handle->{on_drain} = $callback;

   $callback && $handle->{low_water_mark} >= length $handle->{wbuf}
      and $callback->($handle);
}
245    
246     =item $handle->push_write ($data)
247    
248     Queues the given scalar to be written. You can push as much data as you
249     want (only limited by the available memory), as C<AnyEvent::Handle>
250     buffers it independently of the kernel.
251    
252     =cut
253    
# Appends $data to the write buffer and makes sure it gets written.
#
# If no write watcher is active and there is something to write, a watcher
# for writability is installed and a first write is attempted immediately.
# The watcher removes itself once the buffer is empty. After each successful
# write the on_drain callback is invoked when the remaining buffer is no
# longer than low_water_mark. A failed syswrite (other than EAGAIN/EINTR)
# goes to ->error.
sub push_write {
   my ($self, $data) = @_;

   $self->{wbuf} .= $data;

   # An active watcher picks the new data up by itself. With an empty
   # buffer there is nothing to write, and a watcher started now would
   # never be removed again, because a zero-length syswrite never reports
   # progress.
   return if $self->{ww} || !length $self->{wbuf};

   # the closure below only holds a weak reference to the handle
   Scalar::Util::weaken ($self);

   my $cb = sub {
      my $len = syswrite $self->{fh}, $self->{wbuf};

      if ($len) {
         substr $self->{wbuf}, 0, $len, "";

         $self->{on_drain}($self)
            if $self->{low_water_mark} >= length $self->{wbuf}
               && $self->{on_drain};

         delete $self->{ww} unless length $self->{wbuf};
      } elsif (!defined $len && $! != EAGAIN && $! != EINTR) {
         # only an undefined result means syswrite failed and $! is current;
         # a zero-byte write is not an error
         $self->error;
      }
   };

   $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb);

   # try to write right away instead of waiting for the first event
   $cb->($self);
}
283    
284     #############################################################################
285    
286 root 1.9 =back
287    
288     =head2 READ QUEUE
289    
290     AnyEvent::Handle manages two queues per handle, one for writing and one
291     for reading.
292    
293     The read queue is more complex than the write queue. It can be used in two
294     ways, the "simple" way, using only C<on_read> and the "complex" way, using
295     a queue.
296    
297     In the simple case, you just install an C<on_read> callback and whenever
298     new data arrives, it will be called. You can then remove some data (if
299     enough is there) from the read buffer (C<< $handle->rbuf >>) if you want
300     or not.
301    
302     In the more complex case, you want to queue multiple callbacks. In this
303     case, AnyEvent::Handle will call the first queued callback each time new
304     data arrives and removes it when it has done its job (see C<push_read>,
305     below).
306    
307     This way you can, for example, push three line-reads, followed by reading
308     a chunk of data, and AnyEvent::Handle will execute them in order.
309    
310     Example 1: EPP protocol parser. EPP sends 4 byte length info, followed by
311     the specified number of bytes which give an XML datagram.
312    
313     # in the default state, expect some header bytes
314     $handle->on_read (sub {
315     # some data is here, now queue the length-header-read (4 octets)
316     shift->unshift_read_chunk (4, sub {
317     # header arrived, decode
318     my $len = unpack "N", $_[1];
319    
320     # now read the payload
321     shift->unshift_read_chunk ($len, sub {
322     my $xml = $_[1];
323     # handle xml
324     });
325     });
326     });
327    
328     Example 2: Implement a client for a protocol that replies either with
329     "OK" and another line or "ERROR" for one request, and 64 bytes for the
330     second request. Due to the availability of a full queue, we can just
331     pipeline sending both requests and manipulate the queue as necessary in
332     the callbacks:
333    
334     # request one
335     $handle->push_write ("request 1\015\012");
336    
337     # we expect "ERROR" or "OK" as response, so push a line read
338     $handle->push_read_line (sub {
339     # if we got an "OK", we have to _prepend_ another line,
340     # so it will be read before the second request reads its 64 bytes
341     # which are already in the queue when this callback is called
342     # we don't do this in case we got an error
343     if ($_[1] eq "OK") {
344     $_[0]->unshift_read_line (sub {
345     my $response = $_[1];
346     ...
347     });
348     }
349     });
350    
351     # request two
352     $handle->push_write ("request 2\015\012");
353    
354     # simply read 64 bytes, always
355     $handle->push_read_chunk (64, sub {
356     my $response = $_[1];
357     ...
358     });
359    
360     =over 4
361    
362 root 1.10 =cut
363    
# Feeds the read buffer to the queued read callbacks and to on_read.
#
# While the buffer is non-empty, the first queued callback is called; a
# false return means "need more data" and the callback is put back at the
# front of the queue. Without queued callbacks on_read is used, and without
# on_read the read watcher is dropped. When EOF was seen and no progress
# can be made, ->error is called with $! set to EPIPE. Once the buffer is
# empty and EOF was seen, the handle is shut down and on_eof is invoked.
sub _drain_rbuf {
   my ($self) = @_;

   # callbacks may get here again (push_read and unshift_read call this
   # method); only the outermost invocation runs the loop
   return if $self->{in_drain};
   local $self->{in_drain} = 1;

   while (my $len = length $self->{rbuf}) {
      no strict 'refs';
      if (my $cb = shift @{ $self->{queue} }) {
         if (!$cb->($self)) {
            if ($self->{eof}) {
               # no progress can be made (not enough data and no data forthcoming)
               $! = &Errno::EPIPE; return $self->error;
            }

            # not enough data yet: retry this callback when more arrives
            unshift @{ $self->{queue} }, $cb;
            return;
         }
      } elsif ($self->{on_read}) {
         $self->{on_read}($self);

         if (
            $len == length $self->{rbuf} # if no data has been consumed
            && !@{ $self->{queue} }      # and the queue is still empty
            && $self->{on_read}          # and we still want to read data
         ) {
            if ($self->{eof}) {
               # no further data will arrive, so no progress can be made
               $! = &Errno::EPIPE; return $self->error;
            }

            # on_read is waiting for more data: calling it again with the
            # same buffer would loop forever
            return;
         }
      } else {
         # read side becomes idle
         delete $self->{rw};
         return;
      }
   }

   if ($self->{eof}) {
      $self->_shutdown;
      $self->{on_eof}($self);
   }
}
406    
407 root 1.8 =item $handle->on_read ($cb)
408 elmex 1.1
409 root 1.8 This replaces the currently set C<on_read> callback, or clears it (when
410     the new callback is C<undef>). See the description of C<on_read> in the
411     constructor.
412 elmex 1.1
413 root 1.8 =cut
414    
# Setter: stores $cb (which may be undef) as the "on_read" member and
# returns it.
sub on_read {
   my $handle = shift;
   my ($cb) = @_;

   $handle->{on_read} = $cb;
}
420    
421 root 1.8 =item $handle->rbuf
422    
423     Returns the read buffer (as a modifiable lvalue).
424 elmex 1.1
425 root 1.8 You can access the read buffer directly as the C<< ->{rbuf} >> member, if
426     you want.
427 elmex 1.1
428 root 1.8 NOTE: The read buffer should only be used or modified if the C<on_read>,
429     C<push_read> or C<unshift_read> methods are used. The other read methods
430     automatically manage the read buffer.
431 elmex 1.1
432     =cut
433    
# Lvalue accessor for the "rbuf" member: the result can be read, assigned
# to and modified in place.
sub rbuf : lvalue {
   my ($handle) = @_;

   $handle->{rbuf}
}
437 elmex 1.1
438 root 1.8 =item $handle->push_read ($cb)
439    
440     =item $handle->unshift_read ($cb)
441    
442     Append the given callback to the end of the queue (C<push_read>) or
443     prepend it (C<unshift_read>).
444    
445     The callback is called each time some additional read data arrives.
446 elmex 1.1
447 root 1.8 It must check whether enough data is in the read buffer already.
448 elmex 1.1
449 root 1.8 If not enough data is available, it must return the empty list or a false
450     value, in which case it will be called repeatedly until enough data is
451     available (or an error condition is detected).
452    
453     If enough data was available, then the callback must remove all data it is
454     interested in (which can be none at all) and return a true value. After returning
455     true, it will be removed from the queue.
456 elmex 1.1
457     =cut
458    
# Appends $cb to the end of the read queue, then runs _drain_rbuf so the
# queue gets a chance at data that is already buffered.
sub push_read {
   my $handle = shift;
   my ($cb) = @_;

   push @{ $handle->{queue} }, $cb;

   $handle->_drain_rbuf;
}
465    
# Prepends $cb to the front of the read queue, so it is the next callback
# to be offered the read buffer, then runs _drain_rbuf so the queue gets a
# chance at data that is already buffered.
sub unshift_read {
   my ($self, $cb) = @_;

   # must be unshift, not push: this callback has to run before everything
   # that is already queued
   unshift @{ $self->{queue} }, $cb;
   $self->_drain_rbuf;
}
472 elmex 1.1
473 root 1.8 =item $handle->push_read_chunk ($len, $cb->($self, $data))
474 elmex 1.1
475 root 1.8 =item $handle->unshift_read_chunk ($len, $cb->($self, $data))
476 elmex 1.1
477 root 1.8 Append the given callback to the end of the queue (C<push_read_chunk>) or
478     prepend it (C<unshift_read_chunk>).
479 elmex 1.1
480 root 1.8 The callback will be called only once C<$len> bytes have been read, and
481     these C<$len> bytes will be passed to the callback.
482 elmex 1.1
483     =cut
484    
# Builds a read-queue callback that waits for $len bytes.
#
# Arguments: ($self, $len, $cb). The returned closure is called with the
# handle as its first argument. It returns false (nothing consumed) while
# the handle's read buffer holds fewer than $len bytes. Otherwise it
# removes the first $len bytes, passes them to $cb as ($handle, $data) and
# returns true.
#
# Deliberately declared without a prototype: the sub takes three arguments
# and is also called as &_read_chunk with the caller's @_.
sub _read_chunk {
   my ($self, $len, $cb) = @_;

   sub {
      $len <= length $_[0]{rbuf} or return;
      $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
      1
   }
}
494    
# Queues a fixed-size read at the end of the read queue. The arguments
# ($self, $len, $cb) are handed to _read_chunk unchanged and the resulting
# reader is passed to ->push_read.
sub push_read_chunk {
   my $handle = $_[0];

   # the & form calls the helper with exactly this argument list
   my $reader = &_read_chunk (@_);

   $handle->push_read ($reader);
}
498 elmex 1.1
499 elmex 1.5
# Queues a fixed-size read through ->unshift_read. The arguments
# ($self, $len, $cb) are handed to _read_chunk unchanged and the resulting
# reader is passed on.
sub unshift_read_chunk {
   my $handle = $_[0];

   # the & form calls the helper with exactly this argument list
   my $reader = &_read_chunk (@_);

   $handle->unshift_read ($reader);
}
503    
504 root 1.8 =item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol))
505 elmex 1.1
506 root 1.8 =item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol))
507 elmex 1.1
508 root 1.8 Append the given callback to the end of the queue (C<push_read_line>) or
509     prepend it (C<unshift_read_line>).
510 elmex 1.1
511 root 1.8 The callback will be called only once a full line (including the end of
512     line marker, C<$eol>) has been read. This line (excluding the end of line
513     marker) will be passed to the callback as second argument (C<$line>), and
514     the end of line marker as the third argument (C<$eol>).
515 elmex 1.1
516 root 1.8 The end of line marker, C<$eol>, can be either a string, in which case it
517     will be interpreted as a fixed record end marker, or it can be a regex
518     object (e.g. created by C<qr>), in which case it is interpreted as a
519     regular expression.
520 elmex 1.1
521 root 1.8 The end of line marker argument C<$eol> is optional, if it is missing (NOT
522     undef), then C<qr|\015?\012|> is used (which is good for most internet
523     protocols).
524 elmex 1.1
525 root 1.8 Partial lines at the end of the stream will never be returned, as they are
526     not marked by the end of line marker.
527 elmex 1.1
528 root 1.8 =cut
529 elmex 1.1
# Builds a read-queue callback that waits for one complete line.
#
# Arguments: ($self, [$eol,] $cb). $eol is either a plain string (matched
# literally) or a regex object; when it is not passed at all,
# qr|(\015?\012)| is used. The returned closure is called with the handle
# as its first argument. It returns false while the read buffer contains no
# end-of-line marker. Otherwise it removes the line and its marker from the
# buffer, calls $cb with ($handle, $line, $marker) and returns true.
#
# Deliberately declared without a prototype: the argument count varies and
# the sub is also called as &_read_line with the caller's @_.
sub _read_line {
   my $self = shift;
   my $cb   = pop;
   my $eol  = @_ ? shift : qr|(\015?\012)|;

   $eol = qr|(\Q$eol\E)| unless ref $eol;

   # /s lets the line part span "\n", which matters whenever the marker
   # itself is not a newline
   $eol = qr|\A(.*?)($eol)|s;

   sub {
      $_[0]{rbuf} =~ s/$eol// or return;

      # copy the captures: passed directly they would be aliases that
      # change as soon as the callback runs a regex of its own
      my ($line, $marker) = ($1, $2);

      $cb->($_[0], $line, $marker);
      1
   }
}
546 elmex 1.1
# Queues a line read at the end of the read queue. The arguments
# ($self, [$eol,] $cb) are handed to _read_line unchanged and the resulting
# reader is passed to ->push_read.
sub push_read_line {
   my $handle = $_[0];

   # the & form calls the helper with exactly this argument list
   my $reader = &_read_line (@_);

   $handle->push_read ($reader);
}
550    
# Queues a line read through ->unshift_read. The arguments
# ($self, [$eol,] $cb) are handed to _read_line unchanged and the resulting
# reader is passed on.
sub unshift_read_line {
   my $handle = $_[0];

   # the & form calls the helper with exactly this argument list
   my $reader = &_read_line (@_);

   $handle->unshift_read ($reader);
}
554    
555     =item $handle->stop_read
556    
557     =item $handle->start_read
558    
559     In rare cases you actually do not want to read anything from the
560     socket. In this case you can call C<stop_read>. Neither C<on_read> nor
561     any queued callbacks will be executed then. To start reading again, call
562     C<start_read>.
563    
564     =cut
565    
# Stops reading by removing the read watcher ("rw" member) from the object.
sub stop_read {
   my $handle = shift;

   delete $handle->{rw};
}
571 elmex 1.1
# Starts reading, unless a read watcher already exists or EOF was seen.
#
# The installed watcher appends up to read_size (or 8192) bytes per event
# to the read buffer and then runs _drain_rbuf. A buffer that grows beyond
# rbuf_max is a fatal error (ENOSPC). A zero-length read marks EOF and
# removes the watcher. A failed read other than EAGAIN/EINTR is a fatal
# error as well.
sub start_read {
   my ($handle) = @_;

   unless ($handle->{rw} || $handle->{eof}) {
      # the watcher callback only holds a weak reference to the handle
      Scalar::Util::weaken ($handle);

      my $on_readable = sub {
         my $want = $handle->{read_size} || 8192;
         my $got  = sysread $handle->{fh}, $handle->{rbuf}, $want, length $handle->{rbuf};

         if (!defined $got) {
            # read failed: only "try again" conditions are tolerated
            return $handle->error
               unless $! == EAGAIN || $! == EINTR;

         } elsif ($got > 0) {
            if (defined $handle->{rbuf_max} && $handle->{rbuf_max} < length $handle->{rbuf}) {
               $! = &Errno::ENOSPC; return $handle->error;
            }

         } else {
            # zero bytes read: end of file
            $handle->{eof} = 1;
            delete $handle->{rw};
         }

         $handle->_drain_rbuf;
      };

      $handle->{rw} = AnyEvent->io (fh => $handle->{fh}, poll => "r", cb => $on_readable);
   }
}
600    
601     =back
602    
603     =head1 AUTHOR
604    
605 root 1.8 Robin Redeker C<< <elmex at ta-sa.org> >>, Marc Lehmann <schmorp@schmorp.de>.
606 elmex 1.1
607     =cut
608    
609     1; # End of AnyEvent::Handle