/cvs/AnyEvent/lib/AnyEvent/Handle.pm
Revision: 1.61
Committed: Fri Jun 6 10:23:50 2008 UTC (15 years, 11 months ago) by root
Branch: MAIN
Changes since 1.60: +96 -42 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 elmex 1.1 package AnyEvent::Handle;
2    
3 elmex 1.6 no warnings;
4 elmex 1.1 use strict;
5    
6 root 1.8 use AnyEvent ();
7 root 1.42 use AnyEvent::Util qw(WSAEWOULDBLOCK);
8 root 1.8 use Scalar::Util ();
9     use Carp ();
10     use Fcntl ();
11 root 1.43 use Errno qw(EAGAIN EINTR);
12 elmex 1.1
13     =head1 NAME
14    
15 root 1.22 AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
16 elmex 1.1
17     =cut
18    
19 root 1.60 our $VERSION = 4.14;
20 elmex 1.1
21     =head1 SYNOPSIS
22    
23     use AnyEvent;
24     use AnyEvent::Handle;
25    
26     my $cv = AnyEvent->condvar;
27    
28 root 1.31 my $handle =
29 elmex 1.2 AnyEvent::Handle->new (
30     fh => \*STDIN,
31     on_eof => sub {
32     $cv->broadcast;
33     },
34     );
35    
36 root 1.31 # send some request line
37     $handle->push_write ("getinfo\015\012");
38    
39     # read the response line
40     $handle->push_read (line => sub {
41     my ($handle, $line) = @_;
42     warn "read line <$line>\n";
43     $cv->send;
44     });
45    
46     $cv->recv;
47 elmex 1.1
48     =head1 DESCRIPTION
49    
50 root 1.8 This module is a helper module to make it easier to do event-based I/O on
51 elmex 1.13 filehandles. For utility functions for doing non-blocking connects and accepts
52     on sockets see L<AnyEvent::Util>.
53 root 1.8
54     In the following, when the documentation refers to "bytes" then this
55     means characters. As sysread and syswrite are used for all I/O, their
56     treatment of characters applies to this module as well.
57 elmex 1.1
58 root 1.8 All callbacks will be invoked with the handle object as their first
59     argument.
60 elmex 1.1
61     =head1 METHODS
62    
63     =over 4
64    
65     =item B<new (%args)>
66    
67 root 1.8 The constructor supports these arguments (all as key => value pairs).
68 elmex 1.1
69     =over 4
70    
71 root 1.8 =item fh => $filehandle [MANDATORY]
72 elmex 1.1
73     The filehandle this L<AnyEvent::Handle> object will operate on.
74    
75 root 1.8 NOTE: The filehandle will be set to non-blocking (using
76     AnyEvent::Util::fh_nonblocking).
77    
78 root 1.40 =item on_eof => $cb->($handle)
79 root 1.10
80 root 1.52 Set the callback to be called when an end-of-file condition is detected,
81     i.e. in the case of a socket, when the other side has closed the
82     connection cleanly.
83 root 1.8
84 root 1.16 While not mandatory, it is highly recommended to set an eof callback,
85     otherwise you might end up with a closed socket while you are still
86     waiting for data.
87    
88 root 1.52 =item on_error => $cb->($handle, $fatal)
89 root 1.10
90 root 1.52 This is the error callback, which is called when, well, some error
91     occurred, such as not being able to resolve the hostname, failure to
92     connect or a read error.
93    
94     Some errors are fatal (which is indicated by C<$fatal> being true). On
95     fatal errors the handle object will be shut down and will not be
96     usable. Non-fatal errors can be retried by simply returning, but it is
97     recommended to simply ignore this parameter and instead abandon the handle
98     object when this callback is invoked.
99 root 1.8
100 root 1.10 On callback entrance, the value of C<$!> contains the operating system
101 root 1.43 error (or C<ENOSPC>, C<EPIPE>, C<ETIMEDOUT> or C<EBADMSG>).
102 root 1.8
103 root 1.10 While not mandatory, it is I<highly> recommended to set this callback, as
104     you will not be notified of errors otherwise. The default simply calls
105 root 1.52 C<croak>.
106 root 1.8
107 root 1.40 =item on_read => $cb->($handle)
108 root 1.8
109     This sets the default read callback, which is called when data arrives
110 root 1.61 and no read request is in the queue (unlike read queue callbacks, this
111     callback will only be called when at least one octet of data is in the
112     read buffer).
113 root 1.8
114     To access (and remove data from) the read buffer, use the C<< ->rbuf >>
115 root 1.40 method or access the C<< $handle->{rbuf} >> member directly.
116 root 1.8
117     When an EOF condition is detected then AnyEvent::Handle will first try to
118     feed all the remaining data to the queued callbacks and C<on_read> before
119     calling the C<on_eof> callback. If no progress can be made, then a fatal
120     error will be raised (with C<$!> set to C<EPIPE>).
121 elmex 1.1
122 root 1.40 =item on_drain => $cb->($handle)
123 elmex 1.1
124 root 1.8 This sets the callback that is called when the write buffer becomes empty
125     (or when the callback is set and the buffer is empty already).
126 elmex 1.1
127 root 1.8 To append to the write buffer, use the C<< ->push_write >> method.
128 elmex 1.2
129 root 1.43 =item timeout => $fractional_seconds
130    
131     If non-zero, then this enables an "inactivity" timeout: whenever this many
132     seconds pass without a successful read or write on the underlying file
133     handle, the C<on_timeout> callback will be invoked (and if that one is
134 root 1.45 missing, an C<ETIMEDOUT> error will be raised).
135 root 1.43
136     Note that timeout processing is also active when you currently do not have
137     any outstanding read or write requests: If you plan to keep the connection
138     idle then you should disable the timeout temporarily or ignore the timeout
139     in the C<on_timeout> callback.
140    
141     Zero (the default) disables this timeout.
142    
143     =item on_timeout => $cb->($handle)
144    
145     Called whenever the inactivity timeout passes. If you return from this
146     callback, then the timeout will be reset as if some activity had happened,
147     so this condition is not fatal in any way.
148    
149 root 1.8 =item rbuf_max => <bytes>
150 elmex 1.2
151 root 1.8 If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>)
152     when the read buffer ever (strictly) exceeds this size. This is useful to
153     avoid denial-of-service attacks.
154 elmex 1.2
155 root 1.8 For example, a server accepting connections from untrusted sources should
156     be configured to accept only so-and-so much data that it cannot act on
157     (for example, when expecting a line, an attacker could send an unlimited
158     amount of data without a callback ever being called as long as the line
159     isn't finished).
160 elmex 1.2
161 root 1.8 =item read_size => <bytes>
162 elmex 1.2
163 root 1.8 The default read block size (the amount of bytes this module will try to read
164 root 1.46 during each loop iteration). Default: C<8192>.
165 root 1.8
166     =item low_water_mark => <bytes>
167    
168     Sets the amount of bytes (default: C<0>) that make up an "empty" write
169     buffer: If the write buffer reaches this size or gets even smaller it is
170     considered empty.
171 elmex 1.2
172 root 1.19 =item tls => "accept" | "connect" | Net::SSLeay::SSL object
173    
174     When this parameter is given, it enables TLS (SSL) mode, that means it
175     will start a TLS handshake and will transparently encrypt/decrypt
176     data.
177    
178 root 1.26 TLS mode requires Net::SSLeay to be installed (it will be loaded
179     automatically when you try to create a TLS handle).
180    
181 root 1.19 For the TLS server side, use C<accept>, and for the TLS client side of a
182     connection, use C<connect> mode.
183    
184     You can also provide your own TLS connection object, but you have
185     to make sure that you call either C<Net::SSLeay::set_connect_state>
186     or C<Net::SSLeay::set_accept_state> on it before you pass it to
187     AnyEvent::Handle.
188    
189 root 1.26 See the C<starttls> method if you need to start TLS negotiation later.
190    
191 root 1.19 =item tls_ctx => $ssl_ctx
192    
193     Use the given Net::SSLeay::CTX object to create the new TLS connection
194     (unless a connection object was specified directly). If this parameter is
195     missing, then AnyEvent::Handle will use C<AnyEvent::Handle::TLS_CTX>.
196    
197 root 1.40 =item json => JSON or JSON::XS object
198    
199     This is the json coder object used by the C<json> read and write types.
200    
201 root 1.41 If you don't supply it, then AnyEvent::Handle will create and use a
202     suitable one, which will write and expect UTF-8 encoded JSON texts.
203 root 1.40
204     Note that you are responsible to depend on the JSON module if you want to
205     use this functionality, as AnyEvent does not have a dependency itself.
206    
207 root 1.38 =item filter_r => $cb
208    
209     =item filter_w => $cb
210    
211     These exist, but are undocumented at this time.
212    
213 elmex 1.1 =back
214    
215     =cut
216    
217     sub new {
218 root 1.8 my $class = shift;
219    
220     my $self = bless { @_ }, $class;
221    
222     $self->{fh} or Carp::croak "mandatory argument fh is missing";
223    
224     AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
225 elmex 1.1
226 root 1.19 if ($self->{tls}) {
227     require Net::SSLeay;
228     $self->starttls (delete $self->{tls}, delete $self->{tls_ctx});
229     }
230    
231 root 1.44 $self->{_activity} = AnyEvent->now;
232 root 1.43 $self->_timeout;
233 elmex 1.1
234 root 1.58 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
235 root 1.10
236 root 1.8 $self
237     }
238 elmex 1.2
239 root 1.8 sub _shutdown {
240     my ($self) = @_;
241 elmex 1.2
242 root 1.46 delete $self->{_tw};
243 root 1.38 delete $self->{_rw};
244     delete $self->{_ww};
245 root 1.8 delete $self->{fh};
246 root 1.52
247     $self->stoptls;
248 root 1.8 }
249    
250 root 1.52 sub _error {
251     my ($self, $errno, $fatal) = @_;
252 root 1.8
253 root 1.52 $self->_shutdown
254     if $fatal;
255 elmex 1.1
256 root 1.52 $! = $errno;
257 root 1.37
258 root 1.52 if ($self->{on_error}) {
259     $self->{on_error}($self, $fatal);
260     } else {
261     Carp::croak "AnyEvent::Handle uncaught error: $!";
262     }
263 elmex 1.1 }
264    
265 root 1.8 =item $fh = $handle->fh
266 elmex 1.1
267 root 1.22 This method returns the file handle of the L<AnyEvent::Handle> object.
268 elmex 1.1
269     =cut
270    
271 root 1.38 sub fh { $_[0]{fh} }
272 elmex 1.1
273 root 1.8 =item $handle->on_error ($cb)
274 elmex 1.1
275 root 1.8 Replace the current C<on_error> callback (see the C<on_error> constructor argument).
276 elmex 1.1
277 root 1.8 =cut
278    
279     sub on_error {
280     $_[0]{on_error} = $_[1];
281     }
282    
283     =item $handle->on_eof ($cb)
284    
285     Replace the current C<on_eof> callback (see the C<on_eof> constructor argument).
286 elmex 1.1
287     =cut
288    
289 root 1.8 sub on_eof {
290     $_[0]{on_eof} = $_[1];
291     }
292    
293 root 1.43 =item $handle->on_timeout ($cb)
294    
295     Replace the current C<on_timeout> callback, or disable the callback
296     (but not the timeout) if C<$cb> is C<undef>. See the C<timeout> constructor
297     argument.
298    
299     =cut
300    
301     sub on_timeout {
302     $_[0]{on_timeout} = $_[1];
303     }
304    
305     #############################################################################
306    
307     =item $handle->timeout ($seconds)
308    
309     Configures (or disables) the inactivity timeout.
310    
311     =cut
312    
313     sub timeout {
314     my ($self, $timeout) = @_;
315    
316     $self->{timeout} = $timeout;
317     $self->_timeout;
318     }
319    
320     # reset the timeout watcher, as necessary
321     # also check for time-outs
322     sub _timeout {
323     my ($self) = @_;
324    
325     if ($self->{timeout}) {
326 root 1.44 my $NOW = AnyEvent->now;
327 root 1.43
328     # when would the timeout trigger?
329     my $after = $self->{_activity} + $self->{timeout} - $NOW;
330    
331     # now or in the past already?
332     if ($after <= 0) {
333     $self->{_activity} = $NOW;
334    
335     if ($self->{on_timeout}) {
336 root 1.48 $self->{on_timeout}($self);
337 root 1.43 } else {
338 root 1.52 $self->_error (&Errno::ETIMEDOUT);
339 root 1.43 }
340    
341 root 1.56 # callback could have changed timeout value, optimise
342 root 1.43 return unless $self->{timeout};
343    
344     # calculate new after
345     $after = $self->{timeout};
346     }
347    
348     Scalar::Util::weaken $self;
349 root 1.56 return unless $self; # ->error could have destroyed $self
350 root 1.43
351     $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub {
352     delete $self->{_tw};
353     $self->_timeout;
354     });
355     } else {
356     delete $self->{_tw};
357     }
358     }
359    
360 root 1.9 #############################################################################
361    
362     =back
363    
364     =head2 WRITE QUEUE
365    
366     AnyEvent::Handle manages two queues per handle, one for writing and one
367     for reading.
368    
369     The write queue is very simple: you can add data to its end, and
370     AnyEvent::Handle will automatically try to get rid of it for you.
371    
372 elmex 1.20 When data could be written and the write buffer is shorter than the low
373 root 1.9 water mark, the C<on_drain> callback will be invoked.
374    
375     =over 4
376    
377 root 1.8 =item $handle->on_drain ($cb)
378    
379     Sets the C<on_drain> callback or clears it (see the description of
380     C<on_drain> in the constructor).
381    
382     =cut
383    
384     sub on_drain {
385 elmex 1.1 my ($self, $cb) = @_;
386    
387 root 1.8 $self->{on_drain} = $cb;
388    
389     $cb->($self)
390     if $cb && $self->{low_water_mark} >= length $self->{wbuf};
391     }
392    
393     =item $handle->push_write ($data)
394    
395     Queues the given scalar to be written. You can push as much data as you
396     want (only limited by the available memory), as C<AnyEvent::Handle>
397     buffers it independently of the kernel.
398    
399     =cut
400    
401 root 1.17 sub _drain_wbuf {
402     my ($self) = @_;
403 root 1.8
404 root 1.38 if (!$self->{_ww} && length $self->{wbuf}) {
405 root 1.35
406 root 1.8 Scalar::Util::weaken $self;
407 root 1.35
408 root 1.8 my $cb = sub {
409     my $len = syswrite $self->{fh}, $self->{wbuf};
410    
411 root 1.29 if (defined $len) { # syswrite returns undef on error, and undef >= 0 is true
412 root 1.8 substr $self->{wbuf}, 0, $len, "";
413    
414 root 1.44 $self->{_activity} = AnyEvent->now;
415 root 1.43
416 root 1.8 $self->{on_drain}($self)
417     if $self->{low_water_mark} >= length $self->{wbuf}
418     && $self->{on_drain};
419    
420 root 1.38 delete $self->{_ww} unless length $self->{wbuf};
421 root 1.42 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
422 root 1.52 $self->_error ($!, 1);
423 elmex 1.1 }
424 root 1.8 };
425    
426 root 1.35 # try to write data immediately
427     $cb->();
428 root 1.8
429 root 1.35 # if still data left in wbuf, we need to poll
430 root 1.38 $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb)
431 root 1.35 if length $self->{wbuf};
432 root 1.8 };
433     }
434    
435 root 1.30 our %WH;
436    
437     sub register_write_type($$) {
438     $WH{$_[0]} = $_[1];
439     }
440    
441 root 1.17 sub push_write {
442     my $self = shift;
443    
444 root 1.29 if (@_ > 1) {
445     my $type = shift;
446    
447     @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write")
448     ->($self, @_);
449     }
450    
451 root 1.17 if ($self->{filter_w}) {
452 root 1.48 $self->{filter_w}($self, \$_[0]);
453 root 1.17 } else {
454     $self->{wbuf} .= $_[0];
455     $self->_drain_wbuf;
456     }
457     }
458    
459 root 1.29 =item $handle->push_write (type => @args)
460    
461     Instead of formatting your data yourself, you can also let this module do
462     the job by specifying a type and type-specific arguments.
463    
464 root 1.30 Predefined types are (if you have ideas for additional types, feel free to
465     drop by and tell us):
466 root 1.29
467     =over 4
468    
469     =item netstring => $string
470    
471     Formats the given value as a netstring
472     (http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them).
473    
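As an illustration of the wire format, here is a standalone sketch that does not use this module. The helper name netstring_encode is made up for this example; it only mirrors the sprintf call used by the write type.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of AnyEvent::Handle): decimal length,
# a colon, the payload, and a trailing comma.
sub netstring_encode {
    my ($string) = @_;
    return sprintf "%d:%s,", length ($string), $string;
}

print netstring_encode ("hello world!"), "\n";   # prints 12:hello world!,
```

With the module itself, the equivalent call would be C<< $handle->push_write (netstring => "hello world!") >>.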
474     =cut
475    
476     register_write_type netstring => sub {
477     my ($self, $string) = @_;
478    
479     sprintf "%d:%s,", (length $string), $string
480     };
481    
482 root 1.61 =item packstring => $format, $data
483    
484     An octet string prefixed with an encoded length. The encoding C<$format>
485     uses the same format as a Perl C<pack> format, but must specify a single
486     integer only (only one of C<cCsSlLqQiInNvVjJw> is allowed, plus an
487     optional C<!>, C<< < >> or C<< > >> modifier).
488    
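To see what such a length-prefixed string looks like on the wire, the following standalone sketch packs and unpacks a value with the format C<N> (32-bit big-endian length), chosen here only as an example. It uses plain C<pack> and C<unpack> and does not touch a handle.

```perl
use strict;
use warnings;

# "N/a" packs the string length as a 32-bit big-endian integer,
# followed by the string bytes themselves.
my $framed = pack "N/a", "hello";

print unpack ("H*", $framed), "\n";   # prints 0000000568656c6c6f

# The same template reads the length and then that many bytes back.
my ($payload) = unpack "N/a", $framed;
print "$payload\n";                   # prints hello
```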
489     =cut
490    
491     register_write_type packstring => sub {
492     my ($self, $format, $string) = @_;
493    
494     pack "$format/a", $string
495     };
496    
497 root 1.39 =item json => $array_or_hashref
498    
499 root 1.40 Encodes the given hash or array reference into a JSON object. Unless you
500     provide your own JSON object, this means it will be encoded to JSON text
501     in UTF-8.
502    
503     JSON objects (and arrays) are self-delimiting, so you can write JSON at
504     one end of a handle and read it at the other end without using any
505     additional framing.
506    
507 root 1.41 The generated JSON text is guaranteed not to contain any newlines: While
508     this module doesn't need delimiters after or between JSON texts to be
509     able to read them, many other languages depend on that.
510    
511     A simple RPC protocol that interoperates easily with others is to send
512     JSON arrays (or objects, although arrays are usually the better choice as
513     they mimic how function argument passing works) and a newline after each
514     JSON text:
515    
516     $handle->push_write (json => ["method", "arg1", "arg2"]); # whatever
517     $handle->push_write ("\012");
518    
519     An AnyEvent::Handle receiver would simply use the C<json> read type and
520     rely on the fact that the newline will be skipped as leading whitespace:
521    
522     $handle->push_read (json => sub { my $array = $_[1]; ... });
523    
524     Other languages could read single lines terminated by a newline and pass
525     this line into their JSON decoder of choice.
526    
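The line-based receiving side can be sketched without any event loop. This example is an assumption-laden illustration: it uses the core JSON::PP module instead of JSON so that it runs standalone, and it keeps the "wire" in a plain scalar rather than a handle.

```perl
use strict;
use warnings;
use JSON::PP ();

# Assumption: JSON::PP stands in for the JSON module used by AnyEvent::Handle.
my $json = JSON::PP->new->utf8;

# Sender side: one JSON text followed by a newline.
my $wire = $json->encode (["method", "arg1", "arg2"]) . "\012";

# Receiver side in a line-oriented program: split off one line, then decode it.
my ($line) = $wire =~ /^(.*)\012/;
my $request = $json->decode ($line);

print "method: $request->[0], args: @{$request}[1, 2]\n";
```

Because the encoded text contains no newline of its own, splitting on the first newline always yields one complete JSON text.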
527 root 1.40 =cut
528    
529     register_write_type json => sub {
530     my ($self, $ref) = @_;
531    
532     require JSON;
533    
534     $self->{json} ? $self->{json}->encode ($ref)
535     : JSON::encode_json ($ref)
536     };
537    
538 root 1.53 =back
539    
540 root 1.40 =item AnyEvent::Handle::register_write_type type => $coderef->($handle, @args)
541 root 1.30
542     This function (not method) lets you add your own types to C<push_write>.
543     Whenever the given C<type> is used, C<push_write> will invoke the code
544     reference with the handle object and the remaining arguments.
545 root 1.29
546 root 1.30 The code reference is supposed to return a single octet string that will
547     be appended to the write buffer.
548 root 1.29
549 root 1.30 Note that this is a function, and all types registered this way will be
550     global, so try to use unique names.
551 root 1.29
552 root 1.30 =cut
553 root 1.29
554 root 1.8 #############################################################################
555    
556 root 1.9 =back
557    
558     =head2 READ QUEUE
559    
560     AnyEvent::Handle manages two queues per handle, one for writing and one
561     for reading.
562    
563     The read queue is more complex than the write queue. It can be used in two
564     ways, the "simple" way, using only C<on_read> and the "complex" way, using
565     a queue.
566    
567     In the simple case, you just install an C<on_read> callback and whenever
568     new data arrives, it will be called. You can then remove some data (if
569     enough is there) from the read buffer (C<< $handle->rbuf >>), or leave it
570     in place.
571    
572     In the more complex case, you want to queue multiple callbacks. In this
573     case, AnyEvent::Handle will call the first queued callback each time new
574 root 1.61 data arrives (also the first time it is queued) and removes it when it has
575     done its job (see C<push_read>, below).
576 root 1.9
577     This way you can, for example, push three line-reads, followed by reading
578     a chunk of data, and AnyEvent::Handle will execute them in order.
579    
580     Example 1: EPP protocol parser. EPP sends 4 byte length info, followed by
581     the specified number of bytes which give an XML datagram.
582    
583     # in the default state, expect some header bytes
584     $handle->on_read (sub {
585     # some data is here, now queue the length-header-read (4 octets)
586 root 1.52 shift->unshift_read (chunk => 4, sub {
587 root 1.9 # header arrived, decode
588     my $len = unpack "N", $_[1];
589    
590     # now read the payload
591 root 1.52 shift->unshift_read (chunk => $len, sub {
592 root 1.9 my $xml = $_[1];
593     # handle xml
594     });
595     });
596     });
597    
598     Example 2: Implement a client for a protocol that replies either with
599     "OK" and another line or "ERROR" for one request, and 64 bytes for the
600     second request. Due to the availability of a full queue, we can just
601     pipeline sending both requests and manipulate the queue as necessary in
602     the callbacks:
603    
604     # request one
605     $handle->push_write ("request 1\015\012");
606    
607     # we expect "ERROR" or "OK" as response, so push a line read
608 root 1.52 $handle->push_read (line => sub {
609 root 1.9 # if we got an "OK", we have to _prepend_ another line,
610     # so it will be read before the second request reads its 64 bytes
611     # which are already in the queue when this callback is called
612     # we don't do this in case we got an error
613     if ($_[1] eq "OK") {
614 root 1.52 $_[0]->unshift_read (line => sub {
615 root 1.9 my $response = $_[1];
616     ...
617     });
618     }
619     });
620    
621     # request two
622     $handle->push_write ("request 2\015\012");
623    
624     # simply read 64 bytes, always
625 root 1.52 $handle->push_read (chunk => 64, sub {
626 root 1.9 my $response = $_[1];
627     ...
628     });
629    
630     =over 4
631    
632 root 1.10 =cut
633    
634 root 1.8 sub _drain_rbuf {
635     my ($self) = @_;
636 elmex 1.1
637 root 1.59 local $self->{_in_drain} = 1;
638    
639 root 1.17 if (
640     defined $self->{rbuf_max}
641     && $self->{rbuf_max} < length $self->{rbuf}
642     ) {
643 root 1.52 return $self->_error (&Errno::ENOSPC, 1);
644 root 1.17 }
645    
646 root 1.59 while () {
647     no strict 'refs';
648    
649     my $len = length $self->{rbuf};
650 elmex 1.1
651 root 1.38 if (my $cb = shift @{ $self->{_queue} }) {
652 root 1.29 unless ($cb->($self)) {
653 root 1.38 if ($self->{_eof}) {
654 root 1.10 # no progress can be made (not enough data and no data forthcoming)
655 root 1.61 $self->_error (&Errno::EPIPE, 1), last;
656 root 1.10 }
657    
658 root 1.38 unshift @{ $self->{_queue} }, $cb;
659 root 1.55 last;
660 root 1.8 }
661     } elsif ($self->{on_read}) {
662 root 1.61 last unless $len;
663    
664 root 1.8 $self->{on_read}($self);
665    
666     if (
667 root 1.55 $len == length $self->{rbuf} # if no data has been consumed
668     && !@{ $self->{_queue} } # and the queue is still empty
669     && $self->{on_read} # but we still have on_read
670 root 1.8 ) {
671 root 1.55 # no further data will arrive
672     # so no progress can be made
673 root 1.61 $self->_error (&Errno::EPIPE, 1), last
674 root 1.55 if $self->{_eof};
675    
676     last; # more data might arrive
677 elmex 1.1 }
678 root 1.8 } else {
679     # read side becomes idle
680 root 1.38 delete $self->{_rw};
681 root 1.55 last;
682 root 1.8 }
683     }
684    
685 root 1.48 $self->{on_eof}($self)
686     if $self->{_eof} && $self->{on_eof};
687 root 1.55
688     # may need to restart read watcher
689     unless ($self->{_rw}) {
690     $self->start_read
691     if $self->{on_read} || @{ $self->{_queue} };
692     }
693 elmex 1.1 }
694    
695 root 1.8 =item $handle->on_read ($cb)
696 elmex 1.1
697 root 1.8 This replaces the currently set C<on_read> callback, or clears it (when
698     the new callback is C<undef>). See the description of C<on_read> in the
699     constructor.
700 elmex 1.1
701 root 1.8 =cut
702    
703     sub on_read {
704     my ($self, $cb) = @_;
705 elmex 1.1
706 root 1.8 $self->{on_read} = $cb;
707 root 1.59 $self->_drain_rbuf if $cb && !$self->{_in_drain};
708 elmex 1.1 }
709    
710 root 1.8 =item $handle->rbuf
711    
712     Returns the read buffer (as a modifiable lvalue).
713 elmex 1.1
714 root 1.8 You can access the read buffer directly as the C<< ->{rbuf} >> member, if
715     you want.
716 elmex 1.1
717 root 1.8 NOTE: The read buffer should only be used or modified if the C<on_read>,
718     C<push_read> or C<unshift_read> methods are used. The other read methods
719     automatically manage the read buffer.
720 elmex 1.1
721     =cut
722    
723 elmex 1.2 sub rbuf : lvalue {
724 root 1.8 $_[0]{rbuf}
725 elmex 1.2 }
726 elmex 1.1
727 root 1.8 =item $handle->push_read ($cb)
728    
729     =item $handle->unshift_read ($cb)
730    
731     Append the given callback to the end of the queue (C<push_read>) or
732     prepend it (C<unshift_read>).
733    
734     The callback is called each time some additional read data arrives.
735 elmex 1.1
736 elmex 1.20 It must check whether enough data is in the read buffer already.
737 elmex 1.1
738 root 1.8 If not enough data is available, it must return the empty list or a false
739     value, in which case it will be called repeatedly until enough data is
740     available (or an error condition is detected).
741    
742     If enough data was available, then the callback must remove all data it is
743     interested in (which can be none at all) and return a true value. After returning
744     true, it will be removed from the queue.
745 elmex 1.1
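The return-value contract can be modelled without a real handle. The sketch below is a simplified stand-in, not the module's implementation: C<$rbuf>, C<@queue> and C<@got> are plain variables invented for the example, and the loop only imitates how the head of the queue is retried as data arrives.

```perl
use strict;
use warnings;

my $rbuf = "";   # stand-in for the read buffer
my @got;         # values extracted by the callback

# Read callback following the contract: return false until 4 bytes are
# buffered, then consume exactly those 4 bytes and return true.
my $want4 = sub {
    return unless length ($rbuf) >= 4;
    push @got, substr ($rbuf, 0, 4, "");
    1
};

my @queue = ($want4);

# Simulate data arriving in small pieces; retry the head of the queue each time.
for my $piece ("ab", "c", "defg") {
    $rbuf .= $piece;

    while (my $cb = shift @queue) {
        next if $cb->();        # true: the callback is done and stays removed
        unshift @queue, $cb;    # false: not enough data yet, keep it queued
        last;
    }
}

print "got: @got, left in buffer: $rbuf\n";   # prints got: abcd, left in buffer: efg
```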
746     =cut
747    
748 root 1.30 our %RH;
749    
750     sub register_read_type($$) {
751     $RH{$_[0]} = $_[1];
752     }
753    
754 root 1.8 sub push_read {
755 root 1.28 my $self = shift;
756     my $cb = pop;
757    
758     if (@_) {
759     my $type = shift;
760    
761     $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read")
762     ->($self, $cb, @_);
763     }
764 elmex 1.1
765 root 1.38 push @{ $self->{_queue} }, $cb;
766 root 1.59 $self->_drain_rbuf unless $self->{_in_drain};
767 elmex 1.1 }
768    
769 root 1.8 sub unshift_read {
770 root 1.28 my $self = shift;
771     my $cb = pop;
772    
773     if (@_) {
774     my $type = shift;
775    
776     $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read")
777     ->($self, $cb, @_);
778     }
779    
780 root 1.8
781 root 1.38 unshift @{ $self->{_queue} }, $cb;
782 root 1.59 $self->_drain_rbuf unless $self->{_in_drain};
783 root 1.8 }
784 elmex 1.1
785 root 1.28 =item $handle->push_read (type => @args, $cb)
786 elmex 1.1
787 root 1.28 =item $handle->unshift_read (type => @args, $cb)
788 elmex 1.1
789 root 1.28 Instead of providing a callback that parses the data itself you can choose
790     between a number of predefined parsing formats, for chunks of data, lines
791     etc.
792 elmex 1.1
793 root 1.30 Predefined types are (if you have ideas for additional types, feel free to
794     drop by and tell us):
795 root 1.28
796     =over 4
797    
798 root 1.40 =item chunk => $octets, $cb->($handle, $data)
799 root 1.28
800     Invoke the callback only once C<$octets> bytes have been read. Pass the
801     data read to the callback. The callback will never be called with less
802     data.
803    
804     Example: read 2 bytes.
805    
806     $handle->push_read (chunk => 2, sub {
807     warn "yay ", unpack "H*", $_[1];
808     });
809 elmex 1.1
810     =cut
811    
812 root 1.28 register_read_type chunk => sub {
813     my ($self, $cb, $len) = @_;
814 elmex 1.1
815 root 1.8 sub {
816     $len <= length $_[0]{rbuf} or return;
817 elmex 1.12 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
818 root 1.8 1
819     }
820 root 1.28 };
821 root 1.8
822 root 1.28 # compatibility with older API
823 root 1.8 sub push_read_chunk {
824 root 1.28 $_[0]->push_read (chunk => $_[1], $_[2]);
825 root 1.8 }
826 elmex 1.1
827 root 1.8 sub unshift_read_chunk {
828 root 1.28 $_[0]->unshift_read (chunk => $_[1], $_[2]);
829 elmex 1.1 }
830    
831 root 1.40 =item line => [$eol, ]$cb->($handle, $line, $eol)
832 elmex 1.1
833 root 1.8 The callback will be called only once a full line (including the end of
834     line marker, C<$eol>) has been read. This line (excluding the end of line
835     marker) will be passed to the callback as second argument (C<$line>), and
836     the end of line marker as the third argument (C<$eol>).
837 elmex 1.1
838 root 1.8 The end of line marker, C<$eol>, can be either a string, in which case it
839     will be interpreted as a fixed record end marker, or it can be a regex
840     object (e.g. created by C<qr>), in which case it is interpreted as a
841     regular expression.
842 elmex 1.1
843 root 1.8 The end of line marker argument C<$eol> is optional, if it is missing (NOT
844     undef), then C<qr|\015?\012|> is used (which is good for most internet
845     protocols).
846 elmex 1.1
847 root 1.8 Partial lines at the end of the stream will never be returned, as they are
848     not marked by the end of line marker.
849 elmex 1.1
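The effect of the default end of line marker can be tried on a plain string. This is a standalone sketch: C<$rbuf> here is an ordinary scalar standing in for the read buffer, and the pattern has the same shape as the one the C<line> read type builds.

```perl
use strict;
use warnings;

my $rbuf = "first\015\012second\012partial";

# Lazily capture the line, then the end of line marker.
my $eol = qr|^(.*?)(\015?\012)|s;

my @lines;
while ($rbuf =~ s/$eol//) {
    push @lines, $1;    # the line without its marker
}

print "lines: @lines\n";       # prints lines: first second
print "remaining: $rbuf\n";    # prints remaining: partial
```

The trailing C<partial> stays in the buffer because no marker follows it, which matches the rule that partial lines are never returned.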
850 root 1.8 =cut
851 elmex 1.1
852 root 1.28 register_read_type line => sub {
853     my ($self, $cb, $eol) = @_;
854 elmex 1.1
855 root 1.28 $eol = qr|(\015?\012)| if @_ < 3;
856 root 1.14 $eol = quotemeta $eol unless ref $eol;
857     $eol = qr|^(.*?)($eol)|s;
858 elmex 1.1
859 root 1.8 sub {
860     $_[0]{rbuf} =~ s/$eol// or return;
861 elmex 1.1
862 elmex 1.12 $cb->($_[0], $1, $2);
863 root 1.8 1
864     }
865 root 1.28 };
866 elmex 1.1
867 root 1.28 # compatibility with older API
868 root 1.8 sub push_read_line {
869 root 1.28 my $self = shift;
870     $self->push_read (line => @_);
871 root 1.10 }
872    
873     sub unshift_read_line {
874 root 1.28 my $self = shift;
875     $self->unshift_read (line => @_);
876 root 1.10 }
877    
878 root 1.40 =item regex => $accept[, $reject[, $skip]], $cb->($handle, $data)
879 root 1.36
880     Makes a regex match against the regex object C<$accept> and returns
881     everything up to and including the match.
882    
883     Example: read a single line terminated by '\n'.
884    
885     $handle->push_read (regex => qr<\n>, sub { ... });
886    
887     If C<$reject> is given and not undef, then it determines when the data is
888     to be rejected: it is matched against the data when the C<$accept> regex
889     does not match and generates an C<EBADMSG> error when it matches. This is
890     useful to quickly reject wrong data (to avoid waiting for a timeout or a
891     receive buffer overflow).
892    
893     Example: expect a single decimal number followed by whitespace, reject
894     anything else (note the use of an anchor).
895    
896     $handle->push_read (regex => qr<^[0-9]+\s>, qr<[^0-9]>, sub { ... });
897    
898     If C<$skip> is given and not C<undef>, then it will be matched against
899     the receive buffer when neither C<$accept> nor C<$reject> match,
900     and everything preceding and including the match will be accepted
901     unconditionally. This is useful to skip large amounts of data that you
902     know cannot be matched, so that the C<$accept> or C<$reject> regex do not
903     have to start matching from the beginning. This is purely an optimisation
904     and is usually worthwhile only when you expect more than a few kilobytes.
905    
906     Example: expect an HTTP header, which ends at C<\015\012\015\012>. Since we
907     expect the header to be very large (it isn't in practice, but...), we use
908     a skip regex to skip initial portions. The skip regex is tricky in that
909     it only accepts something not ending in either \015 or \012, as these are
910     required for the accept regex.
911    
912     $handle->push_read (regex =>
913     qr<\015\012\015\012>,
914     undef, # no reject
915     qr<^.*[^\015\012]>,
916     sub { ... });
917    
918     =cut
919    
920     register_read_type regex => sub {
921     my ($self, $cb, $accept, $reject, $skip) = @_;
922    
923     my $data;
924     my $rbuf = \$self->{rbuf};
925    
926     sub {
927     # accept
928     if ($$rbuf =~ $accept) {
929     $data .= substr $$rbuf, 0, $+[0], "";
930     $cb->($self, $data);
931     return 1;
932     }
933    
934     # reject
935     if ($reject && $$rbuf =~ $reject) {
936 root 1.52 $self->_error (&Errno::EBADMSG);
937 root 1.36 }
938    
939     # skip
940     if ($skip && $$rbuf =~ $skip) {
941     $data .= substr $$rbuf, 0, $+[0], "";
942     }
943    
944     ()
945     }
946     };
947    
948 root 1.61 =item netstring => $cb->($handle, $string)
949    
950     Reads a netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement) and passes its contents to the callback.
951    
952     Throws an error with C<$!> set to EBADMSG on format violations.
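
Example: the wire format is simple enough to sketch in plain Perl. The helper names C<netstring_encode> and C<netstring_decode> below are hypothetical and not part of this module. The decoder mirrors the checks made by this read type, but signals a format violation with C<die> instead of an C<EBADMSG> error.

```perl
use strict;
use warnings;

# Hypothetical helper: wrap a payload as "<decimal length>:<payload>,".
sub netstring_encode {
    my ($payload) = @_;
    return length ($payload) . ":" . $payload . ",";
}

# Hypothetical helper: remove one complete netstring from the front of
# $$bufref and return its payload. Returns undef when more data is
# needed and dies on a format violation.
sub netstring_decode {
    my ($bufref) = @_;

    unless ($$bufref =~ /^(0|[1-9][0-9]*):/) {
        # a non-digit without a valid length prefix can never become valid
        die "malformed netstring\n" if $$bufref =~ /[^0-9]/;
        return;
    }

    my $len = $1;
    my $hdr = length ($len) + 1;    # the digits plus the colon

    # wait until payload and trailing comma have arrived
    return if length ($$bufref) < $hdr + $len + 1;

    die "malformed netstring\n"
        unless substr ($$bufref, $hdr + $len, 1) eq ",";

    my $payload = substr $$bufref, $hdr, $len;
    substr $$bufref, 0, $hdr + $len + 1, "";

    return $payload;
}

my $buf    = netstring_encode ("hello") . "3:ab";
my $first  = netstring_decode (\$buf);    # a complete netstring
my $second = netstring_decode (\$buf);    # incomplete, stays buffered
```

Because an empty payload is a valid netstring, callers of such a decoder should test the result with C<defined> rather than for truth.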
953    
954     =cut
955    
956     register_read_type netstring => sub {
957     my ($self, $cb) = @_;
958    
959     sub {
960     unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) {
961     if ($_[0]{rbuf} =~ /[^0-9]/) {
962     $self->_error (&Errno::EBADMSG);
963     }
964     return;
965     }
966    
967     my $len = $1;
968    
969     $self->unshift_read (chunk => $len, sub {
970     my $string = $_[1];
971     $_[0]->unshift_read (chunk => 1, sub {
972     if ($_[1] eq ",") {
973     $cb->($_[0], $string);
974     } else {
975     $self->_error (&Errno::EBADMSG);
976     }
977     });
978     });
979    
980     1
981     }
982     };
983    
984     =item packstring => $format, $cb->($handle, $string)
985    
986     An octet string prefixed with an encoded length. The encoding C<$format>
987     uses the same format as a Perl C<pack> format, but must specify a single
988     integer only (only one of C<cCsSlLqQiInNvVjJw> is allowed, plus an
989     optional C<!>, C<< < >> or C<< > >> modifier).
990    
991     DNS over TCP uses a prefix of C<n>, EPP uses a prefix of C<N>.
992    
993     Example: read a block of data prefixed by its length in BER-encoded
994     format (very efficient).
995    
996     $handle->push_read (packstring => "w", sub {
997     my ($handle, $data) = @_;
998     });
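
To make the framing concrete, the following sketch builds a frame with a 16-bit big-endian length prefix (C<n>, as used by DNS over TCP) and takes it apart again the way this read type does: it decodes the length, then re-packs it to find out how long the prefix is. It uses only Perl's built-in C<pack> and C<unpack>; the variable names are illustrative.

```perl
use strict;
use warnings;

my $format  = "n";
my $payload = "hello";

# sender: the "/" template packs the length first, then the string
my $frame = pack "$format/a*", $payload;

# receiver: decode the length, then measure the prefix by re-packing it
my $len    = unpack $format, $frame;
my $prefix = length (pack $format, $len);
my $data   = substr $frame, $prefix, $len;
```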
999    
1000     =cut
1001    
1002     register_read_type packstring => sub {
1003     my ($self, $cb, $format) = @_;
1004    
1005     sub {
1006     # when we can use 5.10 we can use ".", but for 5.8 we use the re-pack method
1007     defined (my $len = eval { unpack $format, $_[0]->{rbuf} })
1008     or return;
1009    
1010     # remove prefix
1011     substr $_[0]->{rbuf}, 0, (length pack $format, $len), "";
1012    
1013     # read rest
1014     $_[0]->unshift_read (chunk => $len, $cb);
1015    
1016     1
1017     }
1018     };
1019    
1020 root 1.40 =item json => $cb->($handle, $hash_or_arrayref)
1021    
1022     Reads a JSON object or array, decodes it and passes it to the callback.
1023    
1024     If a C<json> object was passed to the constructor, then that will be used
1025     for the final decode, otherwise it will create a JSON coder expecting UTF-8.
1026    
1027     This read type uses the incremental parser available with JSON version
1028     2.09 (and JSON::XS version 2.2) and above. You have to provide a
1029     dependency on your own: this module will load the JSON module, but
1030     AnyEvent does not depend on it itself.
1031    
1032     Since JSON texts are fully self-delimiting, the C<json> read and write
1033 root 1.41 types are an ideal simple RPC protocol: just exchange JSON datagrams. See
1034     the C<json> write type description, above, for an actual example.
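
The incremental parsing this read type relies on can be tried without a handle. The sketch below feeds one JSON text in two fragments, the way it might arrive from a socket. It assumes the core C<JSON::PP> module as a stand-in for C<JSON>, since it documents the same C<incr_parse> interface.

```perl
use strict;
use warnings;
use JSON::PP;

my $json = JSON::PP->new->utf8;

# a single JSON text, split at an arbitrary point
my @fragments = ('{"method":"pi', 'ng","id":1}');

my @decoded;
for my $fragment (@fragments) {
    # in scalar context incr_parse returns a decoded value once a
    # complete JSON text has been seen, and undef before that
    my $ref = $json->incr_parse ($fragment);
    push @decoded, $ref if $ref;
}
```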
1035 root 1.40
1036     =cut
1037    
1038     register_read_type json => sub {
1039     my ($self, $cb, $accept, $reject, $skip) = @_;
1040    
1041     require JSON;
1042    
1043     my $data;
1044     my $rbuf = \$self->{rbuf};
1045    
1046 root 1.41 my $json = $self->{json} ||= JSON->new->utf8;
1047 root 1.40
1048     sub {
1049     my $ref = $json->incr_parse ($self->{rbuf});
1050    
1051     if ($ref) {
1052     $self->{rbuf} = $json->incr_text;
1053     $json->incr_text = "";
1054     $cb->($self, $ref);
1055    
1056     1
1057     } else {
1058     $self->{rbuf} = "";
1059     ()
1060     }
1061     }
1062     };
1063    
1064 root 1.28 =back
1065    
1066 root 1.40 =item AnyEvent::Handle::register_read_type type => $coderef->($handle, $cb, @args)
1067 root 1.30
1068     This function (not method) lets you add your own types to C<push_read>.
1069    
1070     Whenever the given C<type> is used, C<push_read> will invoke the code
1071     reference with the handle object, the callback and the remaining
1072     arguments.
1073    
1074     The code reference is supposed to return a callback (usually a closure)
1075     that works as a plain read callback (see C<< ->push_read ($cb) >>).
1076    
1077     It should invoke the passed callback when it is done reading (remember to
1078 root 1.40 pass C<$handle> as first argument as all other callbacks do that).
1079 root 1.30
1080     Note that this is a function, and all types registered this way will be
1081     global, so try to use unique names.
1082    
1083     For examples, see the source of this module (F<perldoc -m AnyEvent::Handle>,
1084     search for C<register_read_type>).
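
As a smaller illustration than the built-in types, the following sketch registers a read type that delivers fixed-size records. The type name C<fixed_record> and its C<$size> argument are made up for this example; the structure follows the conventions described in this section.

```perl
use strict;
use warnings;
use AnyEvent::Handle;

# "fixed_record" is a hypothetical type name, chosen for this example
AnyEvent::Handle::register_read_type (fixed_record => sub {
    my ($self, $cb, $size) = @_;

    # the returned closure is used as a plain read callback
    sub {
        # not enough data yet: return false to be called again later
        return unless length ($_[0]{rbuf}) >= $size;

        # remove one record from the read buffer and hand it over
        $cb->($_[0], substr ($_[0]{rbuf}, 0, $size, ""));
        1
    }
});
```

With that in place, C<< $handle->push_read (fixed_record => 16, sub { my ($handle, $record) = @_; ... }) >> would queue a read for one 16-byte record.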
1085    
1086 root 1.10 =item $handle->stop_read
1087    
1088     =item $handle->start_read
1089    
1090 root 1.18 In rare cases you actually do not want to read anything from the
1091 root 1.58 socket. In this case you can call C<stop_read>. Neither C<on_read> nor
1092 root 1.22 any queued callbacks will be executed then. To start reading again, call
1093 root 1.10 C<start_read>.
1094    
1095 root 1.56 Note that AnyEvent::Handle will automatically C<start_read> for you when
1096     you change the C<on_read> callback or push/unshift a read callback, and it
1097     will automatically C<stop_read> for you when neither C<on_read> is set nor
1098     there are any read requests in the queue.
1099    
1100 root 1.10 =cut
1101    
1102     sub stop_read {
1103     my ($self) = @_;
1104 elmex 1.1
1105 root 1.38 delete $self->{_rw};
1106 root 1.8 }
1107 elmex 1.1
1108 root 1.10 sub start_read {
1109     my ($self) = @_;
1110    
1111 root 1.38 unless ($self->{_rw} || $self->{_eof}) {
1112 root 1.10 Scalar::Util::weaken $self;
1113    
1114 root 1.38 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
1115 root 1.17 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf};
1116     my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
1117 root 1.10
1118     if ($len > 0) {
1119 root 1.44 $self->{_activity} = AnyEvent->now;
1120 root 1.43
1121 root 1.17 $self->{filter_r}
1122 root 1.48 ? $self->{filter_r}($self, $rbuf)
1123 root 1.59 : $self->{_in_drain} || $self->_drain_rbuf;
1124 root 1.10
1125     } elsif (defined $len) {
1126 root 1.38 delete $self->{_rw};
1127     $self->{_eof} = 1;
1128 root 1.59 $self->_drain_rbuf unless $self->{_in_drain};
1129 root 1.10
1130 root 1.42 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
1131 root 1.52 return $self->_error ($!, 1);
1132 root 1.10 }
1133     });
1134     }
1135 elmex 1.1 }
1136    
1137 root 1.19 sub _dotls {
1138     my ($self) = @_;
1139    
1140 root 1.56 my $buf;
1141    
1142 root 1.38 if (length $self->{_tls_wbuf}) {
1143     while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) {
1144     substr $self->{_tls_wbuf}, 0, $len, "";
1145 root 1.22 }
1146 root 1.19 }
1147    
1148 root 1.56 if (length ($buf = Net::SSLeay::BIO_read ($self->{_wbio}))) {
1149 root 1.19 $self->{wbuf} .= $buf;
1150     $self->_drain_wbuf;
1151     }
1152    
1153 root 1.56 while (defined ($buf = Net::SSLeay::read ($self->{tls}))) {
1154     if (length $buf) {
1155     $self->{rbuf} .= $buf;
1156 root 1.59 $self->_drain_rbuf unless $self->{_in_drain};
1157 root 1.56 } else {
1158     # let's treat SSL-eof as we treat normal EOF
1159     $self->{_eof} = 1;
1160     $self->_shutdown;
1161     return;
1162     }
1163 root 1.23 }
1164    
1165 root 1.24 my $err = Net::SSLeay::get_error ($self->{tls}, -1);
1166    
1167     if ($err != Net::SSLeay::ERROR_WANT_READ ()) {
1168 root 1.23 if ($err == Net::SSLeay::ERROR_SYSCALL ()) {
1169 root 1.52 return $self->_error ($!, 1);
1170 root 1.23 } elsif ($err == Net::SSLeay::ERROR_SSL ()) {
1171 root 1.52 return $self->_error (&Errno::EIO, 1);
1172 root 1.19 }
1173 root 1.23
1174     # all others are fine for our purposes
1175 root 1.19 }
1176     }
1177    
1178 root 1.25 =item $handle->starttls ($tls[, $tls_ctx])
1179    
1180     Instead of starting TLS negotiation immediately when the AnyEvent::Handle
1181     object is created, you can also do that at a later time by calling
1182     C<starttls>.
1183    
1184     The first argument is the same as the C<tls> constructor argument (either
1185     C<"connect">, C<"accept"> or an existing Net::SSLeay object).
1186    
1187     The second argument is the optional C<Net::SSLeay::CTX> object that is
1188     used when AnyEvent::Handle has to create its own TLS connection object.
1189    
1190 root 1.38 The TLS connection object will end up in C<< $handle->{tls} >> after this
1191     call and can be used or changed to your liking. Note that the handshake
1192     might have already started when this function returns.
1193    
1194 root 1.25 =cut
1195    
1196 root 1.19 sub starttls {
1197     my ($self, $ssl, $ctx) = @_;
1198    
1199 root 1.25 $self->stoptls;
1200    
1201 root 1.19 if ($ssl eq "accept") {
1202     $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
1203     Net::SSLeay::set_accept_state ($ssl);
1204     } elsif ($ssl eq "connect") {
1205     $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
1206     Net::SSLeay::set_connect_state ($ssl);
1207     }
1208    
1209     $self->{tls} = $ssl;
1210    
1211 root 1.21 # basically, this is deep magic (because SSL_read should have the same issues)
1212     # but the openssl maintainers basically said: "trust us, it just works".
1213     # (unfortunately, we have to hardcode constants because the abysmally misdesigned
1214     # and mismaintained ssleay-module doesn't even offer them).
1215 root 1.27 # http://www.mail-archive.com/openssl-dev@openssl.org/msg22420.html
1216 root 1.21 Net::SSLeay::CTX_set_mode ($self->{tls},
1217 root 1.34 (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1)
1218     | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2));
1219 root 1.21
1220 root 1.38 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1221     $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1222 root 1.19
1223 root 1.38 Net::SSLeay::set_bio ($ssl, $self->{_rbio}, $self->{_wbio});
1224 root 1.19
1225     $self->{filter_w} = sub {
1226 root 1.38 $_[0]{_tls_wbuf} .= ${$_[1]};
1227 root 1.19 &_dotls;
1228     };
1229     $self->{filter_r} = sub {
1230 root 1.38 Net::SSLeay::BIO_write ($_[0]{_rbio}, ${$_[1]});
1231 root 1.19 &_dotls;
1232     };
1233     }
1234    
1235 root 1.25 =item $handle->stoptls
1236    
1237     Destroys the SSL connection, if any. Partial read or write data will be
1238     lost.
1239    
1240     =cut
1241    
1242     sub stoptls {
1243     my ($self) = @_;
1244    
1245     Net::SSLeay::free (delete $self->{tls}) if $self->{tls};
1246 root 1.38
1247     delete $self->{_rbio};
1248     delete $self->{_wbio};
1249     delete $self->{_tls_wbuf};
1250 root 1.25 delete $self->{filter_r};
1251     delete $self->{filter_w};
1252     }
1253    
1254 root 1.19 sub DESTROY {
1255     my $self = shift;
1256    
1257 root 1.25 $self->stoptls;
1258 root 1.19 }
1259    
1260     =item AnyEvent::Handle::TLS_CTX
1261    
1262     This function creates and returns the Net::SSLeay::CTX object used by
1263     default for TLS mode.
1264    
1265     The context is created like this:
1266    
1267     Net::SSLeay::load_error_strings;
1268     Net::SSLeay::SSLeay_add_ssl_algorithms;
1269     Net::SSLeay::randomize;
1270    
1271     my $CTX = Net::SSLeay::CTX_new;
1272    
1273     Net::SSLeay::CTX_set_options $CTX, Net::SSLeay::OP_ALL;
1274    
1275     =cut
1276    
1277     our $TLS_CTX;
1278    
1279     sub TLS_CTX() {
1280     $TLS_CTX || do {
1281     require Net::SSLeay;
1282    
1283     Net::SSLeay::load_error_strings ();
1284     Net::SSLeay::SSLeay_add_ssl_algorithms ();
1285     Net::SSLeay::randomize ();
1286    
1287     $TLS_CTX = Net::SSLeay::CTX_new ();
1288    
1289     Net::SSLeay::CTX_set_options ($TLS_CTX, Net::SSLeay::OP_ALL ());
1290    
1291     $TLS_CTX
1292     }
1293     }
1294    
1295 elmex 1.1 =back
1296    
1297 root 1.38 =head1 SUBCLASSING AnyEvent::Handle
1298    
1299     In many cases, you might want to subclass AnyEvent::Handle.
1300    
1301     To make this easier, a given version of AnyEvent::Handle uses these
1302     conventions:
1303    
1304     =over 4
1305    
1306     =item * all constructor arguments become object members.
1307    
1308     At least initially, when you pass a C<tls>-argument to the constructor it
1309     will end up in C<< $handle->{tls} >>. Those members might be changed or
1310     mutated later on (for example C<tls> will hold the TLS connection object).
1311    
1312     =item * other object member names are prefixed with an C<_>.
1313    
1314     All object members not explicitly documented (internal use) are prefixed
1315     with an underscore character, so the remaining non-C<_>-namespace is free
1316     for use by subclasses.
1317    
1318     =item * all members not documented here and not prefixed with an underscore
1319     are free to use in subclasses.
1320    
1321     Of course, new versions of AnyEvent::Handle may introduce more "public"
1322     member variables, but that's just life; at least it is documented.
1323    
1324     =back
1325    
1326 elmex 1.1 =head1 AUTHOR
1327    
1328 root 1.8 Robin Redeker C<< <elmex at ta-sa.org> >>, Marc Lehmann <schmorp@schmorp.de>.
1329 elmex 1.1
1330     =cut
1331    
1332     1; # End of AnyEvent::Handle