/cvs/AnyEvent/lib/AnyEvent/Handle.pm
Revision: 1.40
Committed: Tue May 27 05:36:27 2008 UTC (16 years ago) by root
Branch: MAIN
Changes since 1.39: +84 -12 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 elmex 1.1 package AnyEvent::Handle;
2    
3 elmex 1.6 no warnings;
4 elmex 1.1 use strict;
5    
6 root 1.8 use AnyEvent ();
7 root 1.33 use AnyEvent::Util qw(WSAWOULDBLOCK);
8 root 1.8 use Scalar::Util ();
9     use Carp ();
10     use Fcntl ();
11 elmex 1.1 use Errno qw/EAGAIN EINTR/;
12    
13     =head1 NAME
14    
15 root 1.22 AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
16 elmex 1.1
17     =cut
18    
19 root 1.15 our $VERSION = '0.04';
20 elmex 1.1
21     =head1 SYNOPSIS
22    
23     use AnyEvent;
24     use AnyEvent::Handle;
25    
26     my $cv = AnyEvent->condvar;
27    
28 root 1.31 my $handle =
29 elmex 1.2 AnyEvent::Handle->new (
30     fh => \*STDIN,
31     on_eof => sub {
32     $cv->broadcast;
33     },
34     );
35    
36 root 1.31 # send some request line
37     $handle->push_write ("getinfo\015\012");
38    
39     # read the response line
40     $handle->push_read (line => sub {
41     my ($handle, $line) = @_;
42     warn "read line <$line>\n";
43     $cv->send;
44     });
45    
46     $cv->recv;
47 elmex 1.1
48     =head1 DESCRIPTION
49    
50 root 1.8 This module is a helper module to make it easier to do event-based I/O on
51 elmex 1.13 filehandles. For utility functions for doing non-blocking connects and accepts
52     on sockets see L<AnyEvent::Util>.
53 root 1.8
54     In the following, when the documentation refers to "bytes" then this
55     means characters. As sysread and syswrite are used for all I/O, their
56     treatment of characters applies to this module as well.
57 elmex 1.1
58 root 1.8 All callbacks will be invoked with the handle object as their first
59     argument.
60 elmex 1.1
61     =head1 METHODS
62    
63     =over 4
64    
65     =item B<new (%args)>
66    
67 root 1.8 The constructor supports these arguments (all as key => value pairs).
68 elmex 1.1
69     =over 4
70    
71 root 1.8 =item fh => $filehandle [MANDATORY]
72 elmex 1.1
73     The filehandle this L<AnyEvent::Handle> object will operate on.
74    
75 root 1.8 NOTE: The filehandle will be set to non-blocking (using
76     AnyEvent::Util::fh_nonblocking).
77    
78 root 1.40 =item on_eof => $cb->($handle)
79 root 1.10
80     Set the callback to be called on EOF.
81 root 1.8
82 root 1.16 While not mandatory, it is highly recommended to set an eof callback,
83     otherwise you might end up with a closed socket while you are still
84     waiting for data.
85    
86 root 1.40 =item on_error => $cb->($handle)
87 root 1.10
88     This is the fatal error callback, that is called when, well, a fatal error
89 elmex 1.20 occurs, such as not being able to resolve the hostname, failure to connect
90 root 1.10 or a read error.
91 root 1.8
92     The object will not be in a usable state when this callback has been
93     called.
94    
95 root 1.10 On callback entrance, the value of C<$!> contains the operating system
96 root 1.29 error (or C<ENOSPC>, C<EPIPE> or C<EBADMSG>).
97 root 1.8
98 root 1.38 The callback should throw an exception. If it returns, then
99 root 1.37 AnyEvent::Handle will C<croak> for you.
100    
101 root 1.10 While not mandatory, it is I<highly> recommended to set this callback, as
102     you will not be notified of errors otherwise. The default simply calls
103     die.
104 root 1.8
105 root 1.40 =item on_read => $cb->($handle)
106 root 1.8
107     This sets the default read callback, which is called when data arrives
108 root 1.10 and no read request is in the queue.
109 root 1.8
110     To access (and remove data from) the read buffer, use the C<< ->rbuf >>
111 root 1.40 method or access the C<< $handle->{rbuf} >> member directly.
112 root 1.8
113     When an EOF condition is detected then AnyEvent::Handle will first try to
114     feed all the remaining data to the queued callbacks and C<on_read> before
115     calling the C<on_eof> callback. If no progress can be made, then a fatal
116     error will be raised (with C<$!> set to C<EPIPE>).
117 elmex 1.1
118 root 1.40 =item on_drain => $cb->($handle)
119 elmex 1.1
120 root 1.8 This sets the callback that is called when the write buffer becomes empty
121     (or when the callback is set and the buffer is empty already).
122 elmex 1.1
123 root 1.8 To append to the write buffer, use the C<< ->push_write >> method.
124 elmex 1.2
125 root 1.8 =item rbuf_max => <bytes>
126 elmex 1.2
127 root 1.8 If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>)
128     when the read buffer ever (strictly) exceeds this size. This is useful to
129     avoid denial-of-service attacks.
130 elmex 1.2
131 root 1.8 For example, a server accepting connections from untrusted sources should
132     be configured to accept only so-and-so much data that it cannot act on
133     (for example, when expecting a line, an attacker could send an unlimited
134     amount of data without a callback ever being called as long as the line
135     isn't finished).
136 elmex 1.2
137 root 1.8 =item read_size => <bytes>
138 elmex 1.2
139 root 1.8 The default read block size (the number of bytes this module will try to read
140     on each loop iteration). Default: C<8192>.
141    
142     =item low_water_mark => <bytes>
143    
144     Sets the amount of bytes (default: C<0>) that make up an "empty" write
145     buffer: If the write buffer reaches this size or gets even smaller, it is
146     considered empty.
147 elmex 1.2
148 root 1.19 =item tls => "accept" | "connect" | Net::SSLeay::SSL object
149    
150     When this parameter is given, it enables TLS (SSL) mode, which means it
151     will start a TLS handshake and will transparently encrypt/decrypt
152     data.
153    
154 root 1.26 TLS mode requires Net::SSLeay to be installed (it will be loaded
155     automatically when you try to create a TLS handle).
156    
157 root 1.19 For the TLS server side, use C<accept>, and for the TLS client side of a
158     connection, use C<connect> mode.
159    
160     You can also provide your own TLS connection object, but you have
161     to make sure that you call either C<Net::SSLeay::set_connect_state>
162     or C<Net::SSLeay::set_accept_state> on it before you pass it to
163     AnyEvent::Handle.
164    
165 root 1.26 See the C<starttls> method if you need to start TLS negotiation later.
166    
167 root 1.19 =item tls_ctx => $ssl_ctx
168    
169     Use the given Net::SSLeay::CTX object to create the new TLS connection
170     (unless a connection object was specified directly). If this parameter is
171     missing, then AnyEvent::Handle will use C<AnyEvent::Handle::TLS_CTX>.
172    
173 root 1.40 =item json => JSON or JSON::XS object
174    
175     This is the json coder object used by the C<json> read and write types.
176    
177     If you don't supply it, then AnyEvent::Handle will use C<encode_json> and
178     C<decode_json>.
179    
180     Note that you are responsible to depend on the JSON module if you want to
181     use this functionality, as AnyEvent does not have a dependency itself.
182    
183 root 1.38 =item filter_r => $cb
184    
185     =item filter_w => $cb
186    
187     These exist, but are undocumented at this time.
188    
189 elmex 1.1 =back
190    
191     =cut
192    
193     sub new {
194 root 1.8 my $class = shift;
195    
196     my $self = bless { @_ }, $class;
197    
198     $self->{fh} or Carp::croak "mandatory argument fh is missing";
199    
200     AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
201 elmex 1.1
202 root 1.19 if ($self->{tls}) {
203     require Net::SSLeay;
204     $self->starttls (delete $self->{tls}, delete $self->{tls_ctx});
205     }
206    
207 root 1.16 $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof};
208 root 1.10 $self->on_error (delete $self->{on_error}) if $self->{on_error};
209 root 1.8 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
210     $self->on_read (delete $self->{on_read} ) if $self->{on_read};
211 elmex 1.1
212 root 1.10 $self->start_read;
213    
214 root 1.8 $self
215     }
216 elmex 1.2
217 root 1.8 sub _shutdown {
218     my ($self) = @_;
219 elmex 1.2
220 root 1.38 delete $self->{_rw};
221     delete $self->{_ww};
222 root 1.8 delete $self->{fh};
223     }
224    
225     sub error {
226     my ($self) = @_;
227    
228     {
229     local $!;
230     $self->_shutdown;
231 elmex 1.1 }
232    
233 root 1.37 $self->{on_error}($self)
234     if $self->{on_error};
235    
236     Carp::croak "AnyEvent::Handle uncaught fatal error: $!";
237 elmex 1.1 }
238    
239 root 1.8 =item $fh = $handle->fh
240 elmex 1.1
241 root 1.22 This method returns the file handle of the L<AnyEvent::Handle> object.
242 elmex 1.1
243     =cut
244    
245 root 1.38 sub fh { $_[0]{fh} }
246 elmex 1.1
247 root 1.8 =item $handle->on_error ($cb)
248 elmex 1.1
249 root 1.8 Replace the current C<on_error> callback (see the C<on_error> constructor argument).
250 elmex 1.1
251 root 1.8 =cut
252    
253     sub on_error {
254     $_[0]{on_error} = $_[1];
255     }
256    
257     =item $handle->on_eof ($cb)
258    
259     Replace the current C<on_eof> callback (see the C<on_eof> constructor argument).
260 elmex 1.1
261     =cut
262    
263 root 1.8 sub on_eof {
264     $_[0]{on_eof} = $_[1];
265     }
266    
267 root 1.9 #############################################################################
268    
269     =back
270    
271     =head2 WRITE QUEUE
272    
273     AnyEvent::Handle manages two queues per handle, one for writing and one
274     for reading.
275    
276     The write queue is very simple: you can add data to its end, and
277     AnyEvent::Handle will automatically try to get rid of it for you.
278    
279 elmex 1.20 When data could be written and the write buffer is no longer than the low
280 root 1.9 water mark, the C<on_drain> callback will be invoked.
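The drain condition described above can be sketched in plain Perl. This is only an illustration of the comparison the module makes; the helper name C<wbuf_is_drained> is hypothetical and not part of AnyEvent::Handle.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of AnyEvent::Handle): decides whether a
# write buffer counts as "empty" for the purposes of on_drain.
sub wbuf_is_drained {
    my ($wbuf, $low_water_mark) = @_;
    $low_water_mark = 0 unless defined $low_water_mark;   # documented default
    return $low_water_mark >= length($wbuf);
}

# try a few buffer contents against different low water marks
for my $case (["", 0], ["abc", 0], ["abc", 16]) {
    my ($wbuf, $mark) = @$case;
    my $state = wbuf_is_drained($wbuf, $mark) ? "drained" : "busy";
    printf "%d byte(s) pending, mark %d: %s\n", length($wbuf), $mark, $state;
}
```

With the default mark of C<0>, only a completely empty buffer counts as drained; a larger mark lets C<on_drain> fire while a little data is still pending.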
281    
282     =over 4
283    
284 root 1.8 =item $handle->on_drain ($cb)
285    
286     Sets the C<on_drain> callback or clears it (see the description of
287     C<on_drain> in the constructor).
288    
289     =cut
290    
291     sub on_drain {
292 elmex 1.1 my ($self, $cb) = @_;
293    
294 root 1.8 $self->{on_drain} = $cb;
295    
296     $cb->($self)
297     if $cb && $self->{low_water_mark} >= length $self->{wbuf};
298     }
299    
300     =item $handle->push_write ($data)
301    
302     Queues the given scalar to be written. You can push as much data as you
303     want (only limited by the available memory), as C<AnyEvent::Handle>
304     buffers it independently of the kernel.
305    
306     =cut
307    
308 root 1.17 sub _drain_wbuf {
309     my ($self) = @_;
310 root 1.8
311 root 1.38 if (!$self->{_ww} && length $self->{wbuf}) {
312 root 1.35
313 root 1.8 Scalar::Util::weaken $self;
314 root 1.35
315 root 1.8 my $cb = sub {
316     my $len = syswrite $self->{fh}, $self->{wbuf};
317    
318 root 1.29 if (defined $len) {
319 root 1.8 substr $self->{wbuf}, 0, $len, "";
320    
321     $self->{on_drain}($self)
322     if $self->{low_water_mark} >= length $self->{wbuf}
323     && $self->{on_drain};
324    
325 root 1.38 delete $self->{_ww} unless length $self->{wbuf};
326 root 1.33 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAWOULDBLOCK) {
327 root 1.8 $self->error;
328 elmex 1.1 }
329 root 1.8 };
330    
331 root 1.35 # try to write data immediately
332     $cb->();
333 root 1.8
334 root 1.35 # if still data left in wbuf, we need to poll
335 root 1.38 $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb)
336 root 1.35 if length $self->{wbuf};
337 root 1.8 };
338     }
339    
340 root 1.30 our %WH;
341    
342     sub register_write_type($$) {
343     $WH{$_[0]} = $_[1];
344     }
345    
346 root 1.17 sub push_write {
347     my $self = shift;
348    
349 root 1.29 if (@_ > 1) {
350     my $type = shift;
351    
352     @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write")
353     ->($self, @_);
354     }
355    
356 root 1.17 if ($self->{filter_w}) {
357 root 1.18 $self->{filter_w}->($self, \$_[0]);
358 root 1.17 } else {
359     $self->{wbuf} .= $_[0];
360     $self->_drain_wbuf;
361     }
362     }
363    
364 root 1.29 =item $handle->push_write (type => @args)
365    
366     =item $handle->unshift_write (type => @args)
367    
368     Instead of formatting your data yourself, you can also let this module do
369     the job by specifying a type and type-specific arguments.
370    
371 root 1.30 Predefined types are (if you have ideas for additional types, feel free to
372     drop by and tell us):
373 root 1.29
374     =over 4
375    
376     =item netstring => $string
377    
378     Formats the given value as netstring
379     (http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them).
380    
381 root 1.30 =back
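As a small illustration of the netstring format, the following sketch applies the same C<sprintf> rule in a standalone function. The name C<format_netstring> is made up for this example.

```perl
use strict;
use warnings;

# Same formatting rule as the netstring write type: "<length>:<data>,"
# (format_netstring is a hypothetical name used only in this sketch)
sub format_netstring {
    my ($string) = @_;
    return sprintf "%d:%s,", length($string), $string;
}

print format_netstring("hello world!"), "\n";   # prints 12:hello world!,
```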
382    
383 root 1.29 =cut
384    
385     register_write_type netstring => sub {
386     my ($self, $string) = @_;
387    
388     sprintf "%d:%s,", (length $string), $string
389     };
390    
391 root 1.39 =item json => $array_or_hashref
392    
393 root 1.40 Encodes the given hash or array reference into a JSON object. Unless you
394     provide your own JSON object, this means it will be encoded to JSON text
395     in UTF-8.
396    
397     JSON objects (and arrays) are self-delimiting, so you can write JSON at
398     one end of a handle and read it at the other end without using any
399     additional framing.
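The self-delimiting property can be demonstrated without any handle at all. The sketch below is an assumption-laden illustration: it uses the core C<JSON::PP> module as a stand-in for the JSON module so that it runs without extra dependencies, writes two JSON texts back to back, and recovers them with the incremental parser from arbitrarily sized pieces.

```perl
use strict;
use warnings;
use JSON::PP ();

my $coder = JSON::PP->new->utf8;

# writer side: two JSON texts written back to back, with no separator
my $stream = $coder->encode ({ id => 1 }) . $coder->encode ([1, 2, 3]);

# reader side: feed the bytes in 5 byte pieces to the incremental parser
my @received;
my $parser = JSON::PP->new->utf8;

for my $piece (unpack "(a5)*", $stream) {
    $parser->incr_parse ($piece);              # void context: only accumulate
    while (my $obj = $parser->incr_parse) {    # scalar context: extract one text
        push @received, $obj;
    }
}

printf "received %d JSON texts\n", scalar @received;
```

The piece size of 5 is arbitrary; it only shows that the boundaries of the pieces need not coincide with the boundaries of the JSON texts.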
400    
401     =cut
402    
403     register_write_type json => sub {
404     my ($self, $ref) = @_;
405    
406     require JSON;
407    
408     $self->{json} ? $self->{json}->encode ($ref)
409     : JSON::encode_json ($ref)
410     };
411    
412     =item AnyEvent::Handle::register_write_type type => $coderef->($handle, @args)
413 root 1.30
414     This function (not method) lets you add your own types to C<push_write>.
415     Whenever the given C<type> is used, C<push_write> will invoke the code
416     reference with the handle object and the remaining arguments.
417 root 1.29
418 root 1.30 The code reference is supposed to return a single octet string that will
419     be appended to the write buffer.
420 root 1.29
421 root 1.30 Note that this is a function, and all types registered this way will be
422     global, so try to use unique names.
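The dispatch behind typed writes can be sketched with a plain hash of formatters. Everything in this block is a stand-in written for illustration: C<register_type> and C<format_write> mimic the registry and the type lookup in C<push_write>, and the C<packet32> type is a hypothetical custom type, not one the module provides.

```perl
use strict;
use warnings;

my %write_type;   # stand-in for the module's global type registry

sub register_type { $write_type{$_[0]} = $_[1] }

# stand-in for push_write: returns the octets that would be appended to
# the write buffer
sub format_write {
    my $handle = shift;
    return $_[0] if @_ == 1;                 # plain data, no type given
    my $type = shift;
    my $formatter = $write_type{$type}
        or die "unsupported type '$type'\n";
    return $formatter->($handle, @_);
}

# hypothetical custom type: payload prefixed by its length as a 32 bit
# big-endian integer
register_type(packet32 => sub {
    my ($handle, $payload) = @_;
    pack "N/a*", $payload
});

printf "%d octets\n", length format_write(undef, packet32 => "abc");
```

Because the registry is a single global hash, two modules registering the same type name would silently overwrite each other, which is why unique names matter.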
423 root 1.29
424 root 1.30 =cut
425 root 1.29
426 root 1.8 #############################################################################
427    
428 root 1.9 =back
429    
430     =head2 READ QUEUE
431    
432     AnyEvent::Handle manages two queues per handle, one for writing and one
433     for reading.
434    
435     The read queue is more complex than the write queue. It can be used in two
436     ways, the "simple" way, using only C<on_read> and the "complex" way, using
437     a queue.
438    
439     In the simple case, you just install an C<on_read> callback and whenever
440     new data arrives, it will be called. You can then remove some data (if
441     enough is there) from the read buffer (C<< $handle->rbuf >>), or leave it
442     in place.
443    
444     In the more complex case, you want to queue multiple callbacks. In this
445     case, AnyEvent::Handle will call the first queued callback each time new
446     data arrives and removes it when it has done its job (see C<push_read>,
447     below).
448    
449     This way you can, for example, push three line-reads, followed by reading
450     a chunk of data, and AnyEvent::Handle will execute them in order.
451    
452     Example 1: EPP protocol parser. EPP sends 4 byte length info, followed by
453     the specified number of bytes which give an XML datagram.
454    
455     # in the default state, expect some header bytes
456     $handle->on_read (sub {
457     # some data is here, now queue the length-header-read (4 octets)
458     shift->unshift_read_chunk (4, sub {
459     # header arrived, decode
460     my $len = unpack "N", $_[1];
461    
462     # now read the payload
463     shift->unshift_read_chunk ($len, sub {
464     my $xml = $_[1];
465     # handle xml
466     });
467     });
468     });
469    
470     Example 2: Implement a client for a protocol that replies either with
471     "OK" and another line or "ERROR" for one request, and 64 bytes for the
472     second request. Due to the availability of a full queue, we can just
473     pipeline sending both requests and manipulate the queue as necessary in
474     the callbacks:
475    
476     # request one
477     $handle->push_write ("request 1\015\012");
478    
479     # we expect "ERROR" or "OK" as response, so push a line read
480     $handle->push_read_line (sub {
481     # if we got an "OK", we have to _prepend_ another line,
482     # so it will be read before the second request reads its 64 bytes
483     # which are already in the queue when this callback is called
484     # we don't do this in case we got an error
485     if ($_[1] eq "OK") {
486     $_[0]->unshift_read_line (sub {
487     my $response = $_[1];
488     ...
489     });
490     }
491     });
492    
493     # request two
494     $handle->push_write ("request 2\015\012");
495    
496     # simply read 64 bytes, always
497     $handle->push_read_chunk (64, sub {
498     my $response = $_[1];
499     ...
500     });
501    
502     =over 4
503    
504 root 1.10 =cut
505    
506 root 1.8 sub _drain_rbuf {
507     my ($self) = @_;
508 elmex 1.1
509 root 1.17 if (
510     defined $self->{rbuf_max}
511     && $self->{rbuf_max} < length $self->{rbuf}
512     ) {
513 root 1.37 $! = &Errno::ENOSPC;
514     $self->error;
515 root 1.17 }
516    
517 root 1.11 return if $self->{in_drain};
518 root 1.8 local $self->{in_drain} = 1;
519 elmex 1.1
520 root 1.8 while (my $len = length $self->{rbuf}) {
521     no strict 'refs';
522 root 1.38 if (my $cb = shift @{ $self->{_queue} }) {
523 root 1.29 unless ($cb->($self)) {
524 root 1.38 if ($self->{_eof}) {
525 root 1.10 # no progress can be made (not enough data and no data forthcoming)
526 root 1.37 $! = &Errno::EPIPE;
527     $self->error;
528 root 1.10 }
529    
530 root 1.38 unshift @{ $self->{_queue} }, $cb;
531 root 1.8 return;
532     }
533     } elsif ($self->{on_read}) {
534     $self->{on_read}($self);
535    
536     if (
537 root 1.38 $self->{_eof} # if no further data will arrive
538 root 1.8 && $len == length $self->{rbuf} # and no data has been consumed
539 root 1.38 && !@{ $self->{_queue} } # and the queue is still empty
540 root 1.8 && $self->{on_read} # and we still want to read data
541     ) {
542     # then no progress can be made
543 root 1.37 $! = &Errno::EPIPE;
544     $self->error;
545 elmex 1.1 }
546 root 1.8 } else {
547     # read side becomes idle
548 root 1.38 delete $self->{_rw};
549 root 1.8 return;
550     }
551     }
552    
553 root 1.38 if ($self->{_eof}) {
554 root 1.8 $self->_shutdown;
555 root 1.16 $self->{on_eof}($self)
556     if $self->{on_eof};
557 root 1.8 }
558 elmex 1.1 }
559    
560 root 1.8 =item $handle->on_read ($cb)
561 elmex 1.1
562 root 1.8 This replaces the currently set C<on_read> callback, or clears it (when
563     the new callback is C<undef>). See the description of C<on_read> in the
564     constructor.
565 elmex 1.1
566 root 1.8 =cut
567    
568     sub on_read {
569     my ($self, $cb) = @_;
570 elmex 1.1
571 root 1.8 $self->{on_read} = $cb;
572 elmex 1.1 }
573    
574 root 1.8 =item $handle->rbuf
575    
576     Returns the read buffer (as a modifiable lvalue).
577 elmex 1.1
578 root 1.8 You can access the read buffer directly as the C<< ->{rbuf} >> member, if
579     you want.
580 elmex 1.1
581 root 1.8 NOTE: The read buffer should only be used or modified if the C<on_read>,
582     C<push_read> or C<unshift_read> methods are used. The other read methods
583     automatically manage the read buffer.
584 elmex 1.1
585     =cut
586    
587 elmex 1.2 sub rbuf : lvalue {
588 root 1.8 $_[0]{rbuf}
589 elmex 1.2 }
590 elmex 1.1
591 root 1.8 =item $handle->push_read ($cb)
592    
593     =item $handle->unshift_read ($cb)
594    
595     Append the given callback to the end of the queue (C<push_read>) or
596     prepend it (C<unshift_read>).
597    
598     The callback is called each time some additional read data arrives.
599 elmex 1.1
600 elmex 1.20 It must check whether enough data is in the read buffer already.
601 elmex 1.1
602 root 1.8 If not enough data is available, it must return the empty list or a false
603     value, in which case it will be called repeatedly until enough data is
604     available (or an error condition is detected).
605    
606     If enough data was available, then the callback must remove all data it is
607     interested in (which can be none at all) and return a true value. After returning
608     true, it will be removed from the queue.
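The callback contract above (return false to be called again, return true to be removed) can be illustrated with a simplified, self-contained model. The hash C<%handle> and the function C<drain> are stand-ins invented for this sketch; the real drain loop also handles C<on_read>, EOF and errors.

```perl
use strict;
use warnings;

# Minimal stand-in for a handle: a read buffer plus a queue of callbacks.
my %handle = (rbuf => "", queue => []);

# Simplified drain loop: run the first queued callback, drop it when it
# returns true, put it back when it returns false.
sub drain {
    my ($h) = @_;
    while (length $h->{rbuf}) {
        my $cb = shift @{ $h->{queue} } or last;   # queue empty: nothing to do
        next if $cb->($h);                         # done: callback stays removed
        unshift @{ $h->{queue} }, $cb;             # needs more data: requeue
        last;
    }
}

my @records;
push @{ $handle{queue} }, sub {
    my ($h) = @_;
    return unless length($h->{rbuf}) >= 4;         # not enough data yet
    push @records, substr($h->{rbuf}, 0, 4, "");   # consume exactly 4 bytes
    1
};

# data arrives in two pieces
for my $piece ("AB", "CDEF") {
    $handle{rbuf} .= $piece;
    drain(\%handle);
}

print "records: @records, left in buffer: $handle{rbuf}\n";
```

The callback is invoked twice: once with only two bytes available, where it returns false and stays queued, and once with six bytes, where it consumes four and is removed.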
609 elmex 1.1
610     =cut
611    
612 root 1.30 our %RH;
613    
614     sub register_read_type($$) {
615     $RH{$_[0]} = $_[1];
616     }
617    
618 root 1.8 sub push_read {
619 root 1.28 my $self = shift;
620     my $cb = pop;
621    
622     if (@_) {
623     my $type = shift;
624    
625     $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read")
626     ->($self, $cb, @_);
627     }
628 elmex 1.1
629 root 1.38 push @{ $self->{_queue} }, $cb;
630 root 1.8 $self->_drain_rbuf;
631 elmex 1.1 }
632    
633 root 1.8 sub unshift_read {
634 root 1.28 my $self = shift;
635     my $cb = pop;
636    
637     if (@_) {
638     my $type = shift;
639    
640     $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read")
641     ->($self, $cb, @_);
642     }
643    
644 root 1.8
645 root 1.38 unshift @{ $self->{_queue} }, $cb;
646 root 1.8 $self->_drain_rbuf;
647     }
648 elmex 1.1
649 root 1.28 =item $handle->push_read (type => @args, $cb)
650 elmex 1.1
651 root 1.28 =item $handle->unshift_read (type => @args, $cb)
652 elmex 1.1
653 root 1.28 Instead of providing a callback that parses the data itself you can choose
654     between a number of predefined parsing formats, for chunks of data, lines
655     etc.
656 elmex 1.1
657 root 1.30 Predefined types are (if you have ideas for additional types, feel free to
658     drop by and tell us):
659 root 1.28
660     =over 4
661    
662 root 1.40 =item chunk => $octets, $cb->($handle, $data)
663 root 1.28
664     Invoke the callback only once C<$octets> bytes have been read. Pass the
665     data read to the callback. The callback will never be called with less
666     data.
667    
668     Example: read 2 bytes.
669    
670     $handle->push_read (chunk => 2, sub {
671     warn "yay ", unpack "H*", $_[1];
672     });
673 elmex 1.1
674     =cut
675    
676 root 1.28 register_read_type chunk => sub {
677     my ($self, $cb, $len) = @_;
678 elmex 1.1
679 root 1.8 sub {
680     $len <= length $_[0]{rbuf} or return;
681 elmex 1.12 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
682 root 1.8 1
683     }
684 root 1.28 };
685 root 1.8
686 root 1.28 # compatibility with older API
687 root 1.8 sub push_read_chunk {
688 root 1.28 $_[0]->push_read (chunk => $_[1], $_[2]);
689 root 1.8 }
690 elmex 1.1
691 root 1.8 sub unshift_read_chunk {
692 root 1.28 $_[0]->unshift_read (chunk => $_[1], $_[2]);
693 elmex 1.1 }
694    
695 root 1.40 =item line => [$eol, ]$cb->($handle, $line, $eol)
696 elmex 1.1
697 root 1.8 The callback will be called only once a full line (including the end of
698     line marker, C<$eol>) has been read. This line (excluding the end of line
699     marker) will be passed to the callback as second argument (C<$line>), and
700     the end of line marker as the third argument (C<$eol>).
701 elmex 1.1
702 root 1.8 The end of line marker, C<$eol>, can be either a string, in which case it
703     will be interpreted as a fixed record end marker, or it can be a regex
704     object (e.g. created by C<qr>), in which case it is interpreted as a
705     regular expression.
706 elmex 1.1
707 root 1.8 The end of line marker argument C<$eol> is optional, if it is missing (NOT
708     undef), then C<qr|\015?\012|> is used (which is good for most internet
709     protocols).
710 elmex 1.1
711 root 1.8 Partial lines at the end of the stream will never be returned, as they are
712     not marked by the end of line marker.
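How the end of line marker is turned into a matcher can be sketched outside the module. The function C<make_line_reader> below is a hypothetical helper that follows the same three steps (default marker, literal quoting of strings, anchored non-greedy match) and works on a plain scalar buffer instead of a handle.

```perl
use strict;
use warnings;

# A string marker is quoted literally, a qr// object is used as a regular
# expression, and a missing marker defaults to an optional CR followed by LF.
sub make_line_reader {
    my ($eol) = @_;
    $eol = qr|\015?\012| unless @_;
    $eol = quotemeta $eol unless ref $eol;
    my $re = qr|^(.*?)($eol)|s;

    return sub {
        my ($bufref) = @_;
        $$bufref =~ s/$re// or return;   # no complete line yet
        return ($1, $2);                 # line without marker, and the marker
    };
}

my $buf = "GET / HTTP/1.0\015\012Host: example\012partial";
my $next_line = make_line_reader();

my @lines;
while (my ($line, $eol) = $next_line->(\$buf)) {
    push @lines, $line;
}

print scalar(@lines), " complete lines, rest: $buf\n";
```

The trailing text without a marker stays in the buffer, which matches the note above that partial lines are never returned.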
713 elmex 1.1
714 root 1.8 =cut
715 elmex 1.1
716 root 1.28 register_read_type line => sub {
717     my ($self, $cb, $eol) = @_;
718 elmex 1.1
719 root 1.28 $eol = qr|(\015?\012)| if @_ < 3;
720 root 1.14 $eol = quotemeta $eol unless ref $eol;
721     $eol = qr|^(.*?)($eol)|s;
722 elmex 1.1
723 root 1.8 sub {
724     $_[0]{rbuf} =~ s/$eol// or return;
725 elmex 1.1
726 elmex 1.12 $cb->($_[0], $1, $2);
727 root 1.8 1
728     }
729 root 1.28 };
730 elmex 1.1
731 root 1.28 # compatibility with older API
732 root 1.8 sub push_read_line {
733 root 1.28 my $self = shift;
734     $self->push_read (line => @_);
735 root 1.10 }
736    
737     sub unshift_read_line {
738 root 1.28 my $self = shift;
739     $self->unshift_read (line => @_);
740 root 1.10 }
741    
742 root 1.40 =item netstring => $cb->($handle, $string)
743 root 1.29
744     A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement).
745    
746     Throws an error with C<$!> set to EBADMSG on format violations.
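The format checks can be sketched as a single function working on a scalar buffer. This is a simplified illustration, not the module's implementation (which queues chunk reads instead); C<take_netstring> is a hypothetical name, and a plain C<die> stands in for the C<EBADMSG> error.

```perl
use strict;
use warnings;

# Tries to extract one netstring from the front of the buffer. Returns the
# payload, or an empty list if more data is needed; dies on format violations.
sub take_netstring {
    my ($bufref) = @_;

    unless ($$bufref =~ /^(0|[1-9][0-9]*):/) {
        die "malformed netstring\n" if $$bufref =~ /[^0-9]/;
        return;                                        # length prefix incomplete
    }

    my ($len, $start) = ($1, $+[0]);
    return if length($$bufref) < $start + $len + 1;    # payload or "," missing

    die "malformed netstring\n"
        unless substr($$bufref, $start + $len, 1) eq ",";

    my $payload = substr($$bufref, $start, $len);
    substr($$bufref, 0, $start + $len + 1, "");        # consume "<len>:<data>,"
    return $payload;
}

my $buf = "5:hello,3:abc,2:x";
my @got;
while (my ($string) = take_netstring(\$buf)) {
    push @got, $string;
}

print "got: @got, still buffered: $buf\n";
```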
747    
748     =cut
749    
750     register_read_type netstring => sub {
751     my ($self, $cb) = @_;
752    
753     sub {
754     unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) {
755     if ($_[0]{rbuf} =~ /[^0-9]/) {
756     $! = &Errno::EBADMSG;
757     $self->error;
758     }
759     return;
760     }
761    
762     my $len = $1;
763    
764     $self->unshift_read (chunk => $len, sub {
765     my $string = $_[1];
766     $_[0]->unshift_read (chunk => 1, sub {
767     if ($_[1] eq ",") {
768     $cb->($_[0], $string);
769     } else {
770     $! = &Errno::EBADMSG;
771     $self->error;
772     }
773     });
774     });
775    
776     1
777     }
778     };
779    
780 root 1.40 =item regex => $accept[, $reject[, $skip]], $cb->($handle, $data)
781 root 1.36
782     Makes a regex match against the regex object C<$accept> and returns
783     everything up to and including the match.
784    
785     Example: read a single line terminated by '\n'.
786    
787     $handle->push_read (regex => qr<\n>, sub { ... });
788    
789     If C<$reject> is given and not undef, then it determines when the data is
790     to be rejected: it is matched against the data when the C<$accept> regex
791     does not match and generates an C<EBADMSG> error when it matches. This is
792     useful to quickly reject wrong data (to avoid waiting for a timeout or a
793     receive buffer overflow).
794    
795     Example: expect a single decimal number followed by whitespace, reject
796     anything else (note the use of an anchor).
797    
798     $handle->push_read (regex => qr<^[0-9]+\s>, qr<[^0-9]>, sub { ... });
799    
800     If C<$skip> is given and not C<undef>, then it will be matched against
801     the receive buffer when neither C<$accept> nor C<$reject> match,
802     and everything preceding and including the match will be accepted
803     unconditionally. This is useful to skip large amounts of data that you
804     know cannot be matched, so that the C<$accept> or C<$reject> regex do not
805     have to start matching from the beginning. This is purely an optimisation
806     and is usually only worthwhile when you expect more than a few kilobytes.
807    
808     Example: expect a http header, which ends at C<\015\012\015\012>. Since we
809     expect the header to be very large (it isn't in practise, but...), we use
810     a skip regex to skip initial portions. The skip regex is tricky in that
811     it only accepts something not ending in either \015 or \012, as these are
812     required for the accept regex.
813    
814     $handle->push_read (regex =>
815     qr<\015\012\015\012>,
816     undef, # no reject
817     qr<^.*[^\015\012]>,
818     sub { ... });
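The interplay of the three regexes can be tried on a plain scalar buffer. In this sketch C<make_regex_reader> is a hypothetical helper, C<die> stands in for the C<EBADMSG> error, and the skip regex carries an extra C</s> flag (an assumption of this example) so that C<.> also crosses line breaks.

```perl
use strict;
use warnings;

# Returns a reader that yields the accepted data, or an empty list while
# more input is needed. Skipped data is accumulated and returned together
# with the accepted part.
sub make_regex_reader {
    my ($accept, $reject, $skip) = @_;
    my $data = "";

    return sub {
        my ($bufref) = @_;

        if ($$bufref =~ $accept) {
            $data .= substr($$bufref, 0, $+[0], "");
            my $result = $data;
            $data = "";
            return $result;
        }

        die "rejected\n" if $reject && $$bufref =~ $reject;

        $data .= substr($$bufref, 0, $+[0], "")
            if $skip && $$bufref =~ $skip;

        return;
    };
}

my $reader = make_regex_reader(qr<\015\012\015\012>, undef, qr<^.*[^\015\012]>s);

my $buf = "Host: a\015\012Accept: b\015\012";
my @first = $reader->(\$buf);           # header not complete yet, data skipped

$buf .= "\015\012BODY";                 # the rest of the terminator arrives
my ($header) = $reader->(\$buf);

printf "header has %d octets, %d left in buffer\n", length($header), length($buf);
```

After the first call only the trailing C<\015\012> remains in the buffer, so the accept regex has very little to scan when the next piece arrives.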
819    
820     =cut
821    
822     register_read_type regex => sub {
823     my ($self, $cb, $accept, $reject, $skip) = @_;
824    
825     my $data;
826     my $rbuf = \$self->{rbuf};
827    
828     sub {
829     # accept
830     if ($$rbuf =~ $accept) {
831     $data .= substr $$rbuf, 0, $+[0], "";
832     $cb->($self, $data);
833     return 1;
834     }
835    
836     # reject
837     if ($reject && $$rbuf =~ $reject) {
838     $! = &Errno::EBADMSG;
839     $self->error;
840     }
841    
842     # skip
843     if ($skip && $$rbuf =~ $skip) {
844     $data .= substr $$rbuf, 0, $+[0], "";
845     }
846    
847     ()
848     }
849     };
850    
851 root 1.40 =item json => $cb->($handle, $hash_or_arrayref)
852    
853     Reads a JSON object or array, decodes it and passes it to the callback.
854    
855     If a C<json> object was passed to the constructor, then that will be used
856     for the final decode, otherwise it will create a JSON coder expecting UTF-8.
857    
858     This read type uses the incremental parser available with JSON version
859     2.09 (and JSON::XS version 2.2) and above. You have to provide a
860     dependency on your own: this module will load the JSON module, but
861     AnyEvent does not depend on it itself.
862    
863     Since JSON texts are fully self-delimiting, the C<json> read and write
864     types are an ideal simple RPC protocol: just exchange JSON datagrams.
865    
866     =cut
867    
868     register_read_type json => sub {
869     my ($self, $cb, $accept, $reject, $skip) = @_;
870    
871     require JSON;
872    
873     my $data;
874     my $rbuf = \$self->{rbuf};
875    
876     my $json = $self->{json} ||= JSON->new->utf8;
877    
878     sub {
879     my $ref = $json->incr_parse ($self->{rbuf});
880    
881     if ($ref) {
882     $self->{rbuf} = $json->incr_text;
883     $json->incr_text = "";
884     $cb->($self, $ref);
885    
886     1
887     } else {
888     $self->{rbuf} = "";
889     ()
890     }
891     }
892     };
893    
894 root 1.28 =back
895    
896 root 1.40 =item AnyEvent::Handle::register_read_type type => $coderef->($handle, $cb, @args)
897 root 1.30
898     This function (not method) lets you add your own types to C<push_read>.
899    
900     Whenever the given C<type> is used, C<push_read> will invoke the code
901     reference with the handle object, the callback and the remaining
902     arguments.
903    
904     The code reference is supposed to return a callback (usually a closure)
905     that works as a plain read callback (see C<< ->push_read ($cb) >>).
906    
907     It should invoke the passed callback when it is done reading (remember to
908 root 1.40 pass C<$handle> as first argument as all other callbacks do that).
909 root 1.30
910     Note that this is a function, and all types registered this way will be
911     global, so try to use unique names.
912    
913     For examples, see the source of this module (F<perldoc -m AnyEvent::Handle>,
914     search for C<register_read_type>).
915    
916 root 1.10 =item $handle->stop_read
917    
918     =item $handle->start_read
919    
920 root 1.18 In rare cases you actually do not want to read anything from the
921 root 1.10 socket. In this case you can call C<stop_read>. Neither C<on_read> nor
922 root 1.22 any queued callbacks will be executed then. To start reading again, call
923 root 1.10 C<start_read>.
924    
925     =cut
926    
927     sub stop_read {
928     my ($self) = @_;
929 elmex 1.1
930 root 1.38 delete $self->{_rw};
931 root 1.8 }
932 elmex 1.1
933 root 1.10 sub start_read {
934     my ($self) = @_;
935    
936 root 1.38 unless ($self->{_rw} || $self->{_eof}) {
937 root 1.10 Scalar::Util::weaken $self;
938    
939 root 1.38 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
940 root 1.17 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf};
941     my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
942 root 1.10
943     if ($len > 0) {
944 root 1.17 $self->{filter_r}
945 root 1.18 ? $self->{filter_r}->($self, $rbuf)
946 root 1.17 : $self->_drain_rbuf;
947 root 1.10
948     } elsif (defined $len) {
949 root 1.38 delete $self->{_rw};
950     $self->{_eof} = 1;
951 root 1.17 $self->_drain_rbuf;
952 root 1.10
953 root 1.33 } elsif ($! != EAGAIN && $! != EINTR && $! != &AnyEvent::Util::WSAWOULDBLOCK) {
954 root 1.10 return $self->error;
955     }
956     });
957     }
958 elmex 1.1 }
959    
960 root 1.19 sub _dotls {
961     my ($self) = @_;
962    
963 root 1.38 if (length $self->{_tls_wbuf}) {
964     while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) {
965     substr $self->{_tls_wbuf}, 0, $len, "";
966 root 1.22 }
967 root 1.19 }
968    
969 root 1.38 if (defined (my $buf = Net::SSLeay::BIO_read ($self->{_wbio}))) {
970 root 1.19 $self->{wbuf} .= $buf;
971     $self->_drain_wbuf;
972     }
973    
974 root 1.23 while (defined (my $buf = Net::SSLeay::read ($self->{tls}))) {
975     $self->{rbuf} .= $buf;
976     $self->_drain_rbuf;
977     }
978    
979 root 1.24 my $err = Net::SSLeay::get_error ($self->{tls}, -1);
980    
981     if ($err != Net::SSLeay::ERROR_WANT_READ ()) {
982 root 1.23 if ($err == Net::SSLeay::ERROR_SYSCALL ()) {
983     $self->error;
984     } elsif ($err == Net::SSLeay::ERROR_SSL ()) {
985     $! = &Errno::EIO;
986     $self->error;
987 root 1.19 }
988 root 1.23
989     # all others are fine for our purposes
990 root 1.19 }
991     }
992    
993 root 1.25 =item $handle->starttls ($tls[, $tls_ctx])
994    
995     Instead of starting TLS negotiation immediately when the AnyEvent::Handle
996     object is created, you can also do that at a later time by calling
997     C<starttls>.
998    
999     The first argument is the same as the C<tls> constructor argument (either
1000     C<"connect">, C<"accept"> or an existing Net::SSLeay object).
1001    
1002     The second argument is the optional C<Net::SSLeay::CTX> object that is
1003     used when AnyEvent::Handle has to create its own TLS connection object.
1004    
1005 root 1.38 The TLS connection object will end up in C<< $handle->{tls} >> after this
1006     call and can be used or changed to your liking. Note that the handshake
1007     might have already started when this function returns.
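
A rough sketch of a delayed TLS upgrade follows. It assumes a hypothetical
line-based protocol in which the peer answers a C<STARTTLS> request with a
line beginning with C<OK>; neither the request nor the reply format is
defined by this module:

   $handle->push_write ("STARTTLS\015\012");

   $handle->push_read (line => sub {
      my ($handle, $line) = @_;

      # switch to TLS in client mode once the peer has agreed
      $handle->starttls ("connect")
         if $line =~ /^OK/;
   });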
1008    
1009 root 1.25 =cut
1010    
1011 root 1.19 # TODO: maybe document...
1012     sub starttls {
1013     my ($self, $ssl, $ctx) = @_;
1014    
1015 root 1.25 $self->stoptls;
1016    
1017 root 1.19 if ($ssl eq "accept") {
1018     $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
1019     Net::SSLeay::set_accept_state ($ssl);
1020     } elsif ($ssl eq "connect") {
1021     $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
1022     Net::SSLeay::set_connect_state ($ssl);
1023     }
1024    
1025     $self->{tls} = $ssl;
1026    
1027 root 1.21 # basically, this is deep magic (because SSL_read should have the same issues)
1028     # but the openssl maintainers basically said: "trust us, it just works".
1029     # (unfortunately, we have to hardcode constants because the abysmally misdesigned
1030     # and mismaintained ssleay-module doesn't even offer them).
1031 root 1.27 # http://www.mail-archive.com/openssl-dev@openssl.org/msg22420.html
1032 root 1.21 Net::SSLeay::CTX_set_mode ($self->{tls},
1033 root 1.34 (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1)
1034     | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2));
1035 root 1.21
1036 root 1.38 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1037     $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1038 root 1.19
1039 root 1.38 Net::SSLeay::set_bio ($ssl, $self->{_rbio}, $self->{_wbio});
1040 root 1.19
1041     $self->{filter_w} = sub {
1042 root 1.38 $_[0]{_tls_wbuf} .= ${$_[1]};
1043 root 1.19 &_dotls;
1044     };
1045     $self->{filter_r} = sub {
1046 root 1.38 Net::SSLeay::BIO_write ($_[0]{_rbio}, ${$_[1]});
1047 root 1.19 &_dotls;
1048     };
1049     }
1050    
1051 root 1.25 =item $handle->stoptls
1052    
1053     Destroys the SSL connection, if any. Partial read or write data will be
1054     lost.
1055    
1056     =cut
1057    
1058     sub stoptls {
1059     my ($self) = @_;
1060    
1061     Net::SSLeay::free (delete $self->{tls}) if $self->{tls};
1062 root 1.38
1063     delete $self->{_rbio};
1064     delete $self->{_wbio};
1065     delete $self->{_tls_wbuf};
1066 root 1.25 delete $self->{filter_r};
1067     delete $self->{filter_w};
1068     }
1069    
1070 root 1.19 sub DESTROY {
1071     my $self = shift;
1072    
1073 root 1.25 $self->stoptls;
1074 root 1.19 }
1075    
1076     =item AnyEvent::Handle::TLS_CTX
1077    
1078     This function creates and returns the Net::SSLeay::CTX object used by
1079     default for TLS mode.
1080    
1081     The context is created like this:
1082    
1083     Net::SSLeay::load_error_strings;
1084     Net::SSLeay::SSLeay_add_ssl_algorithms;
1085     Net::SSLeay::randomize;
1086    
1087     my $CTX = Net::SSLeay::CTX_new;
1088    
1089     Net::SSLeay::CTX_set_options $CTX, Net::SSLeay::OP_ALL;
1090    
1091     =cut
1092    
1093     our $TLS_CTX;
1094    
1095     sub TLS_CTX() {
1096     $TLS_CTX || do {
1097     require Net::SSLeay;
1098    
1099     Net::SSLeay::load_error_strings ();
1100     Net::SSLeay::SSLeay_add_ssl_algorithms ();
1101     Net::SSLeay::randomize ();
1102    
1103     $TLS_CTX = Net::SSLeay::CTX_new ();
1104    
1105     Net::SSLeay::CTX_set_options ($TLS_CTX, Net::SSLeay::OP_ALL ());
1106    
1107     $TLS_CTX
1108     }
1109     }
1110    
1111 elmex 1.1 =back
1112    
1113 root 1.38 =head1 SUBCLASSING AnyEvent::Handle
1114    
1115     In many cases, you might want to subclass AnyEvent::Handle.
1116    
1117     To make this easier, a given version of AnyEvent::Handle uses these
1118     conventions:
1119    
1120     =over 4
1121    
1122     =item * all constructor arguments become object members.
1123    
1124     At least initially, when you pass a C<tls>-argument to the constructor it
1125     will end up in C<< $handle->{tls} >>. Those members might be changed or
1126     mutated later on (for example C<tls> will hold the TLS connection object).
1127    
1128     =item * other object member names are prefixed with an C<_>.
1129    
1130     All object members not explicitly documented (internal use) are prefixed
1131     with an underscore character, so the remaining non-C<_>-namespace is free
1132     for use for subclasses.
1133    
1134     =item * all members not documented here and not prefixed with an underscore
1135     are free to use in subclasses.
1136    
1137     Of course, new versions of AnyEvent::Handle may introduce more "public"
1138     member variables, but that's just life, at least it is documented.
1139    
1140     =back
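
As an illustration of these conventions, a subclass might keep its own
state in a member that is neither documented here nor prefixed with an
underscore. The package name C<My::LineCounter> and the member
C<lines_seen> are hypothetical:

   package My::LineCounter;

   use base "AnyEvent::Handle";

   # "lines_seen" lives in the namespace left free for subclasses
   sub count_line {
      my ($self) = @_;

      ++$self->{lines_seen}
   }

   package main;

   # constructor arguments become object members, so the counter
   # can be initialised directly
   my $handle = My::LineCounter->new (fh => \*STDIN, lines_seen => 0);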
1141    
1142 elmex 1.1 =head1 AUTHOR
1143    
1144 root 1.8 Robin Redeker C<< <elmex at ta-sa.org> >>, Marc Lehmann <schmorp@schmorp.de>.
1145 elmex 1.1
1146     =cut
1147    
1148     1; # End of AnyEvent::Handle