ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
Revision: 1.15
Committed: Sat May 17 21:34:15 2008 UTC (16 years ago) by root
Branch: MAIN
CVS Tags: rel-3_5
Changes since 1.14: +3 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 elmex 1.1 package AnyEvent::Handle;
2    
3 elmex 1.6 no warnings;
4 elmex 1.1 use strict;
5    
6 root 1.8 use AnyEvent ();
7     use AnyEvent::Util ();
8     use Scalar::Util ();
9     use Carp ();
10     use Fcntl ();
11 elmex 1.1 use Errno qw/EAGAIN EINTR/;
12    
13     =head1 NAME
14    
15     AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent
16    
17 root 1.15 This module is experimental.
18    
19 elmex 1.1 =cut
20    
21 root 1.15 our $VERSION = '0.04';
22 elmex 1.1
23     =head1 SYNOPSIS
24    
25     use AnyEvent;
26     use AnyEvent::Handle;
27    
28     my $cv = AnyEvent->condvar;
29    
30     my $ae_fh = AnyEvent::Handle->new (fh => \*STDIN);
31    
32 root 1.8 #TODO
33 elmex 1.1
34 elmex 1.2 # or use the constructor to pass the callback:
35    
36     my $ae_fh2 =
37     AnyEvent::Handle->new (
38     fh => \*STDIN,
39     on_eof => sub {
40     $cv->broadcast;
41     },
42 root 1.8 #TODO
43 elmex 1.2 );
44    
45 elmex 1.1 $cv->wait;
46    
47     =head1 DESCRIPTION
48    
49 root 1.8 This module is a helper module to make it easier to do event-based I/O on
50 elmex 1.13 filehandles. For utility functions for doing non-blocking connects and accepts
51     on sockets see L<AnyEvent::Util>.
52 root 1.8
53     In the following, when the documentation refers to "bytes" then this
54     means characters. As sysread and syswrite are used for all I/O, their
55     treatment of characters applies to this module as well.
56 elmex 1.1
57 root 1.8 All callbacks will be invoked with the handle object as their first
58     argument.
59 elmex 1.1
60     =head1 METHODS
61    
62     =over 4
63    
64     =item B<new (%args)>
65    
66 root 1.8 The constructor supports these arguments (all as key => value pairs).
67 elmex 1.1
68     =over 4
69    
70 root 1.8 =item fh => $filehandle [MANDATORY]
71 elmex 1.1
72     The filehandle this L<AnyEvent::Handle> object will operate on.
73    
74 root 1.8 NOTE: The filehandle will be set to non-blocking (using
75     AnyEvent::Util::fh_nonblocking).
76    
77 root 1.10 =item on_eof => $cb->($self) [MANDATORY]
78    
79     Set the callback to be called on EOF.
80 root 1.8
81 root 1.10 =item on_error => $cb->($self)
82    
83     This is the fatal error callback, that is called when, well, a fatal error
84     occurs, such as not being able to resolve the hostname, failure to connect
85     or a read error.
86 root 1.8
87     The object will not be in a usable state when this callback has been
88     called.
89    
90 root 1.10 On callback entrance, the value of C<$!> contains the operating system
91 root 1.8 error (or C<ENOSPC> or C<EPIPE>).
92    
93 root 1.10 While not mandatory, it is I<highly> recommended to set this callback, as
94     you will not be notified of errors otherwise. The default simply calls
95     die.
96 root 1.8
97     =item on_read => $cb->($self)
98    
99     This sets the default read callback, which is called when data arrives
100 root 1.10 and no read request is in the queue.
101 root 1.8
102     To access (and remove data from) the read buffer, use the C<< ->rbuf >>
103     method or access the C<< $self->{rbuf} >> member directly.
104    
105     When an EOF condition is detected then AnyEvent::Handle will first try to
106     feed all the remaining data to the queued callbacks and C<on_read> before
107     calling the C<on_eof> callback. If no progress can be made, then a fatal
108     error will be raised (with C<$!> set to C<EPIPE>).
109 elmex 1.1
110 root 1.8 =item on_drain => $cb->()
111 elmex 1.1
112 root 1.8 This sets the callback that is called when the write buffer becomes empty
113     (or when the callback is set and the buffer is empty already).
114 elmex 1.1
115 root 1.8 To append to the write buffer, use the C<< ->push_write >> method.
116 elmex 1.2
117 root 1.8 =item rbuf_max => <bytes>
118 elmex 1.2
119 root 1.8 If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>)
120     when the read buffer ever (strictly) exceeds this size. This is useful to
121     avoid denial-of-service attacks.
122 elmex 1.2
123 root 1.8 For example, a server accepting connections from untrusted sources should
124     be configured to accept only so-and-so much data that it cannot act on
125     (for example, when expecting a line, an attacker could send an unlimited
126     amount of data without a callback ever being called as long as the line
127     isn't finished).
128 elmex 1.2
129 root 1.8 =item read_size => <bytes>
130 elmex 1.2
131 root 1.8 The default read block size (the amount of bytes this module will try to read
132     on each loop iteration). Default: C<8192>.
133    
134     =item low_water_mark => <bytes>
135    
136     Sets the amount of bytes (default: C<0>) that make up an "empty" write
137     buffer: If the write buffer reaches this size or gets even smaller it is
138     considered empty.
139 elmex 1.2
140 elmex 1.1 =back
141    
142     =cut
143    
# Constructor: builds the handle object from key => value arguments.
#
# Croaks when the mandatory "fh" or "on_eof" arguments are missing, switches
# the filehandle to non-blocking mode, installs the callbacks through their
# setter methods and finally starts reading. Returns the new object.
sub new {
    my ($class, @args) = @_;

    my $self = bless { @args }, $class;

    Carp::croak("mandatory argument fh is missing")
        unless $self->{fh};

    AnyEvent::Util::fh_nonblocking($self->{fh}, 1);

    # on_eof is mandatory: validate it first, then hand it to the setter.
    my $on_eof = delete $self->{on_eof}
        or Carp::croak("mandatory argument on_eof is missing");
    $self->on_eof($on_eof);

    # The optional callbacks are routed through their setters as well, in
    # this fixed order, and only when a true value was supplied.
    for my $name (qw(on_error on_drain on_read)) {
        $self->$name(delete $self->{$name}) if $self->{$name};
    }

    $self->start_read;

    return $self;
}
163 elmex 1.2
# Tears the handle down: removes the read watcher (rw), the write watcher
# (ww) and, last, the filehandle (fh) from the object.
sub _shutdown {
    my $self = shift;

    delete $self->{$_} for qw(rw ww);
    delete $self->{fh};
}
171    
# Fatal error handler. Shuts the handle down (preserving the caller's $!)
# and then reports the error: through the on_error callback when one is
# set, otherwise by dying with a message that includes $!.
sub error {
    my $self = shift;

    {
        # _shutdown must not disturb the error code the caller put into $!.
        local $!;
        $self->_shutdown;
    }

    my $handler = $self->{on_error};

    die "AnyEvent::Handle uncaught fatal error: $!"
        unless $handler;

    $handler->($self);
}
186    
187 root 1.8 =item $fh = $handle->fh
188 elmex 1.1
189     This method returns the filehandle of the L<AnyEvent::Handle> object.
190    
191     =cut
192    
# Accessor: returns the filehandle stored in the object.
sub fh {
    my ($self) = @_;

    return $self->{fh};
}
194    
195 root 1.8 =item $handle->on_error ($cb)
196 elmex 1.1
197 root 1.8 Replace the current C<on_error> callback (see the C<on_error> constructor argument).
198 elmex 1.1
199 root 1.8 =cut
200    
# Setter: stores $cb as the on_error callback and returns it.
sub on_error {
    my ($self, $cb) = @_;

    return $self->{on_error} = $cb;
}
204    
205     =item $handle->on_eof ($cb)
206    
207     Replace the current C<on_eof> callback (see the C<on_eof> constructor argument).
208 elmex 1.1
209     =cut
210    
# Setter: stores $cb as the on_eof callback and returns it.
sub on_eof {
    my ($self, $cb) = @_;

    return $self->{on_eof} = $cb;
}
214    
215 root 1.9 #############################################################################
216    
217     =back
218    
219     =head2 WRITE QUEUE
220    
221     AnyEvent::Handle manages two queues per handle, one for writing and one
222     for reading.
223    
224     The write queue is very simple: you can add data to its end, and
225     AnyEvent::Handle will automatically try to get rid of it for you.
226    
227     When data could be written and the write buffer is no longer than the low
228     water mark, the C<on_drain> callback will be invoked.
229    
230     =over 4
231    
232 root 1.8 =item $handle->on_drain ($cb)
233    
234     Sets the C<on_drain> callback or clears it (see the description of
235     C<on_drain> in the constructor).
236    
237     =cut
238    
# Setter for the on_drain callback. After storing $cb it is invoked right
# away (with the handle as argument) when the write buffer is already at or
# below the low water mark.
sub on_drain {
    my $self = shift;
    my ($cb) = @_;

    $self->{on_drain} = $cb;

    if ($cb && $self->{low_water_mark} >= length $self->{wbuf}) {
        $cb->($self);
    }
}
247    
248     =item $handle->push_write ($data)
249    
250     Queues the given scalar to be written. You can push as much data as you
251     want (only limited by the available memory), as C<AnyEvent::Handle>
252     buffers it independently of the kernel.
253    
254     =cut
255    
# Appends $data to the write buffer and, unless a write watcher is already
# active, installs one and immediately attempts a first write.
#
# The watcher callback writes as much of the buffer as the kernel accepts,
# invokes on_drain when the buffer is at or below the low water mark, and
# removes the watcher once the buffer is empty. Write failures other than
# EAGAIN/EINTR are reported through ->error.
sub push_write {
    my ($self, $data) = @_;

    $self->{wbuf} .= $data;

    unless ($self->{ww}) {
        # The callback is stored (via the watcher) inside $self, so the
        # closed-over reference is weakened to avoid a reference cycle.
        Scalar::Util::weaken($self);

        my $cb = sub {
            my $len = syswrite $self->{fh}, $self->{wbuf};

            if (defined $len) {
                # Any defined result is a success, including 0 (nothing to
                # write). Only an undefined result may be judged by $!,
                # which would otherwise hold a stale value here.
                substr $self->{wbuf}, 0, $len, "";

                $self->{on_drain}($self)
                    if $self->{low_water_mark} >= length $self->{wbuf}
                    && $self->{on_drain};

                delete $self->{ww} unless length $self->{wbuf};
            } elsif ($! != EAGAIN && $! != EINTR) {
                $self->error;
            }
        };

        $self->{ww} = AnyEvent->io(fh => $self->{fh}, poll => "w", cb => $cb);

        # Try to write right away instead of waiting for the event loop.
        $cb->($self);
    }
}
285    
286     #############################################################################
287    
288 root 1.9 =back
289    
290     =head2 READ QUEUE
291    
292     AnyEvent::Handle manages two queues per handle, one for writing and one
293     for reading.
294    
295     The read queue is more complex than the write queue. It can be used in two
296     ways, the "simple" way, using only C<on_read> and the "complex" way, using
297     a queue.
298    
299     In the simple case, you just install an C<on_read> callback and whenever
300     new data arrives, it will be called. You can then remove some data (if
301     enough is there) from the read buffer (C<< $handle->rbuf >>) if you want
302     or not.
303    
304     In the more complex case, you want to queue multiple callbacks. In this
305     case, AnyEvent::Handle will call the first queued callback each time new
306     data arrives and removes it when it has done its job (see C<push_read>,
307     below).
308    
309     This way you can, for example, push three line-reads, followed by reading
310     a chunk of data, and AnyEvent::Handle will execute them in order.
311    
312     Example 1: EPP protocol parser. EPP sends 4 byte length info, followed by
313     the specified number of bytes which give an XML datagram.
314    
315     # in the default state, expect some header bytes
316     $handle->on_read (sub {
317     # some data is here, now queue the length-header-read (4 octets)
318     shift->unshift_read_chunk (4, sub {
319     # header arrived, decode
320     my $len = unpack "N", $_[1];
321    
322     # now read the payload
323     shift->unshift_read_chunk ($len, sub {
324     my $xml = $_[1];
325     # handle xml
326     });
327     });
328     });
329    
330     Example 2: Implement a client for a protocol that replies either with
331     "OK" and another line or "ERROR" for one request, and 64 bytes for the
332     second request. Due to the availability of a full queue, we can just
333     pipeline sending both requests and manipulate the queue as necessary in
334     the callbacks:
335    
336     # request one
337     $handle->push_write ("request 1\015\012");
338    
339     # we expect "ERROR" or "OK" as response, so push a line read
340     $handle->push_read_line (sub {
341     # if we got an "OK", we have to _prepend_ another line,
342     # so it will be read before the second request reads its 64 bytes
343     # which are already in the queue when this callback is called
344     # we don't do this in case we got an error
345     if ($_[1] eq "OK") {
346     $_[0]->unshift_read_line (sub {
347     my $response = $_[1];
348     ...
349     });
350     }
351     });
352    
353     # request two
354     $handle->push_write ("request 2\015\012");
355    
356     # simply read 64 bytes, always
357     $handle->push_read_chunk (64, sub {
358     my $response = $_[1];
359     ...
360     });
361    
362     =over 4
363    
364 root 1.10 =cut
365    
# Feeds the read buffer to the consumers: first to the queued read
# callbacks (in order), then to the on_read callback. Once the buffer is
# empty and EOF has been seen, shuts down and invokes on_eof.
#
# A queued callback returning false stays at the head of the queue until
# more data arrives. When no consumer can make progress and EOF has been
# reached, a fatal EPIPE error is raised through ->error.
sub _drain_rbuf {
    my ($self) = @_;

    # Not reentrant: the in_drain flag makes nested calls return at once.
    return if $self->{in_drain};
    local $self->{in_drain} = 1;

    while (my $len = length $self->{rbuf}) {
        # Kept from the original: strict refs is relaxed inside this loop.
        no strict 'refs';

        if (my $cb = shift @{ $self->{queue} }) {
            if (!$cb->($self)) {
                if ($self->{eof}) {
                    # no progress can be made (not enough data and no data forthcoming)
                    $! = &Errno::EPIPE; return $self->error;
                }

                # Not enough data yet: put the callback back and wait.
                unshift @{ $self->{queue} }, $cb;
                return;
            }
        } elsif ($self->{on_read}) {
            $self->{on_read}($self);

            if (
                $len == length $self->{rbuf} # no data has been consumed
                && !@{ $self->{queue} }      # and the queue is still empty
                && $self->{on_read}          # and we still want to read data
            ) {
                if ($self->{eof}) {
                    # no further data will arrive, so no progress can be made
                    $! = &Errno::EPIPE; return $self->error;
                }

                # More data may still arrive. Stop here instead of calling
                # on_read again and again on an unchanged buffer.
                return;
            }
        } else {
            # read side becomes idle
            delete $self->{rw};
            return;
        }
    }

    if ($self->{eof}) {
        $self->_shutdown;
        $self->{on_eof}($self);
    }
}
408    
409 root 1.8 =item $handle->on_read ($cb)
410 elmex 1.1
411 root 1.8 This replaces the currently set C<on_read> callback, or clears it (when
412     the new callback is C<undef>). See the description of C<on_read> in the
413     constructor.
414 elmex 1.1
415 root 1.8 =cut
416    
# Setter: stores $cb as the default read callback (undef clears it) and
# returns it.
sub on_read {
    my $self = shift;

    return $self->{on_read} = $_[0];
}
422    
423 root 1.8 =item $handle->rbuf
424    
425     Returns the read buffer (as a modifiable lvalue).
426 elmex 1.1
427 root 1.8 You can access the read buffer directly as the C<< ->{rbuf} >> member, if
428     you want.
429 elmex 1.1
430 root 1.8 NOTE: The read buffer should only be used or modified if the C<on_read>,
431     C<push_read> or C<unshift_read> methods are used. The other read methods
432     automatically manage the read buffer.
433 elmex 1.1
434     =cut
435    
# Lvalue accessor for the read buffer: the result can be read, assigned to
# or modified in place.
sub rbuf : lvalue {
    my $self = shift;

    # Must be the last expression (no "return") so it stays assignable.
    $self->{rbuf}
}
439 elmex 1.1
440 root 1.8 =item $handle->push_read ($cb)
441    
442     =item $handle->unshift_read ($cb)
443    
444     Append the given callback to the end of the queue (C<push_read>) or
445     prepend it (C<unshift_read>).
446    
447     The callback is called each time some additional read data arrives.
448 elmex 1.1
449 root 1.8 It must check whether enough data is in the read buffer already.
450 elmex 1.1
451 root 1.8 If not enough data is available, it must return the empty list or a false
452     value, in which case it will be called repeatedly until enough data is
453     available (or an error condition is detected).
454    
455     If enough data was available, then the callback must remove all data it is
456     interested in (which can be none at all) and return a true value. After returning
457     true, it will be removed from the queue.
458 elmex 1.1
459     =cut
460    
# Appends the read callback $cb to the end of the read queue, then tries to
# drain the read buffer.
sub push_read {
    my $self = shift;
    my ($cb) = @_;

    push @{ $self->{queue} }, $cb;

    return $self->_drain_rbuf;
}
467    
# Prepends the read callback $cb to the front of the read queue, so it runs
# before all callbacks that are already queued, then tries to drain the
# read buffer.
sub unshift_read {
    my ($self, $cb) = @_;

    # Must be unshift, not push: the callback goes to the head of the queue.
    unshift @{ $self->{queue} }, $cb;
    $self->_drain_rbuf;
}
474 elmex 1.1
475 root 1.8 =item $handle->push_read_chunk ($len, $cb->($self, $data))
476 elmex 1.1
477 root 1.8 =item $handle->unshift_read_chunk ($len, $cb->($self, $data))
478 elmex 1.1
479 root 1.8 Append the given callback to the end of the queue (C<push_read_chunk>) or
480     prepend it (C<unshift_read_chunk>).
481 elmex 1.1
482 root 1.8 The callback will be called only once C<$len> bytes have been read, and
483     these C<$len> bytes will be passed to the callback.
484 elmex 1.1
485     =cut
486    
# Builds a read-queue callback that waits until at least $len bytes are in
# the read buffer, then removes exactly $len bytes and passes them to $cb
# as ($handle, $data).
#
# Takes ($self, $len, $cb). There is deliberately no prototype: the function
# takes three arguments, so a two-scalar prototype would reject every normal
# call at compile time.
sub _read_chunk {
    my ($self, $len, $cb) = @_;

    sub {
        # Not enough data yet: return false to stay in the queue.
        $len <= length $_[0]{rbuf} or return;

        $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
        1
    }
}
496    
# Appends a chunk reader for ($len, $cb) to the end of the read queue.
sub push_read_chunk {
    my ($self) = @_;

    # Forward all arguments unchanged; the & form bypasses any prototype.
    $self->push_read(&_read_chunk(@_));
}
500 elmex 1.1
501 elmex 1.5
# Queues a chunk reader for ($len, $cb) through unshift_read.
sub unshift_read_chunk {
    my ($self) = @_;

    # Forward all arguments unchanged; the & form bypasses any prototype.
    $self->unshift_read(&_read_chunk(@_));
}
505    
506 root 1.8 =item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol))
507 elmex 1.1
508 root 1.8 =item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol))
509 elmex 1.1
510 root 1.8 Append the given callback to the end of the queue (C<push_read_line>) or
511     prepend it (C<unshift_read_line>).
512 elmex 1.1
513 root 1.8 The callback will be called only once a full line (including the end of
514     line marker, C<$eol>) has been read. This line (excluding the end of line
515     marker) will be passed to the callback as second argument (C<$line>), and
516     the end of line marker as the third argument (C<$eol>).
517 elmex 1.1
518 root 1.8 The end of line marker, C<$eol>, can be either a string, in which case it
519     will be interpreted as a fixed record end marker, or it can be a regex
520     object (e.g. created by C<qr>), in which case it is interpreted as a
521     regular expression.
522 elmex 1.1
523 root 1.8 The end of line marker argument C<$eol> is optional, if it is missing (NOT
524     undef), then C<qr|\015?\012|> is used (which is good for most internet
525     protocols).
526 elmex 1.1
527 root 1.8 Partial lines at the end of the stream will never be returned, as they are
528     not marked by the end of line marker.
529 elmex 1.1
530 root 1.8 =cut
531 elmex 1.1
# Builds a read-queue callback that waits for a complete line in the read
# buffer, removes it (including the end-of-line marker) and passes it to
# $cb as ($handle, $line, $eol).
#
# Takes ($self, [$eol,] $cb). $eol is either a plain string (matched
# literally) or a regex object; when omitted, CRLF or LF is used. There is
# deliberately no prototype, because the argument count is variable.
sub _read_line {
    my $self = shift;
    my $cb   = pop;
    my $eol  = @_ ? shift : qr|(\015?\012)|;

    # Plain strings are matched literally, regex objects are used as-is.
    $eol = quotemeta $eol unless ref $eol;
    $eol = qr|^(.*?)($eol)|s;

    sub {
        # No complete line yet: return false to stay in the queue.
        $_[0]{rbuf} =~ s/$eol// or return;

        # Copy the captures before calling out. Passing $1/$2 directly would
        # alias the match variables, and any regex match inside the callback
        # would then change what it sees in @_.
        my ($line, $marker) = ($1, $2);

        $cb->($_[0], $line, $marker);
        1
    }
}
548 elmex 1.1
# Appends a line reader for ([$eol,] $cb) to the end of the read queue.
sub push_read_line {
    my ($self) = @_;

    # Forward all arguments unchanged; the & form bypasses any prototype.
    $self->push_read(&_read_line(@_));
}
552    
# Queues a line reader for ([$eol,] $cb) through unshift_read.
sub unshift_read_line {
    my ($self) = @_;

    # Forward all arguments unchanged; the & form bypasses any prototype.
    $self->unshift_read(&_read_line(@_));
}
556    
557     =item $handle->stop_read
558    
559     =item $handle->start_read
560    
561     In rare cases you actually do not want to read anything from the
562     socket. In this case you can call C<stop_read>. Neither C<on_read> nor
563     any queued callbacks will be executed then. To start reading again, call
564     C<start_read>.
565    
566     =cut
567    
# Stops reading by removing the read watcher (rw) from the object.
sub stop_read {
    my $self = shift;

    delete $self->{rw};
}
573 elmex 1.1
# Installs the read watcher unless one is already active or EOF has been
# seen.
#
# The watcher appends newly read data to the read buffer, raises a fatal
# ENOSPC error when the buffer grows beyond rbuf_max, records EOF, reports
# read errors other than EAGAIN/EINTR through ->error and otherwise hands
# the buffer to _drain_rbuf.
sub start_read {
    my $self = shift;

    unless ($self->{rw} || $self->{eof}) {
        # The callback ends up stored inside $self (via the watcher), so the
        # closed-over reference is weakened.
        Scalar::Util::weaken($self);

        my $on_readable = sub {
            my $want = $self->{read_size} || 8192;
            my $got  = sysread $self->{fh}, $self->{rbuf}, $want, length $self->{rbuf};

            if ($got > 0) {
                # Enforce the optional upper bound on the read buffer.
                if (defined $self->{rbuf_max} && $self->{rbuf_max} < length $self->{rbuf}) {
                    $! = &Errno::ENOSPC;
                    return $self->error;
                }
            } elsif (defined $got) {
                # A defined, non-positive result means end of file.
                $self->{eof} = 1;
                delete $self->{rw};
            } elsif ($! != EAGAIN && $! != EINTR) {
                return $self->error;
            }

            $self->_drain_rbuf;
        };

        $self->{rw} = AnyEvent->io(fh => $self->{fh}, poll => "r", cb => $on_readable);
    }
}
602    
603     =back
604    
605     =head1 AUTHOR
606    
607 root 1.8 Robin Redeker C<< <elmex at ta-sa.org> >>, Marc Lehmann <schmorp@schmorp.de>.
608 elmex 1.1
609     =cut
610    
611     1; # End of AnyEvent::Handle