/cvs/AnyEvent/lib/AnyEvent/Handle.pm
Revision: 1.41
Committed: Tue May 27 05:47:36 2008 UTC by root
Branch: MAIN
Changes since 1.40: +25 -4 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 elmex 1.1 package AnyEvent::Handle;
2    
3 elmex 1.6 no warnings;
4 elmex 1.1 use strict;
5    
6 root 1.8 use AnyEvent ();
7 root 1.33 use AnyEvent::Util qw(WSAWOULDBLOCK);
8 root 1.8 use Scalar::Util ();
9     use Carp ();
10     use Fcntl ();
11 elmex 1.1 use Errno qw/EAGAIN EINTR/;
12    
13     =head1 NAME
14    
15 root 1.22 AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
16 elmex 1.1
17     =cut
18    
19 root 1.15 our $VERSION = '0.04';
20 elmex 1.1
21     =head1 SYNOPSIS
22    
23     use AnyEvent;
24     use AnyEvent::Handle;
25    
26     my $cv = AnyEvent->condvar;
27    
28 root 1.31 my $handle =
29 elmex 1.2 AnyEvent::Handle->new (
30     fh => \*STDIN,
31     on_eof => sub {
32     $cv->send;
33     },
34     );
35    
36 root 1.31 # send some request line
37     $handle->push_write ("getinfo\015\012");
38    
39     # read the response line
40     $handle->push_read (line => sub {
41     my ($handle, $line) = @_;
42     warn "read line <$line>\n";
43     $cv->send;
44     });
45    
46     $cv->recv;
47 elmex 1.1
48     =head1 DESCRIPTION
49    
50 root 1.8 This module is a helper module to make it easier to do event-based I/O on
51 elmex 1.13 filehandles. For utility functions for doing non-blocking connects and accepts
52     on sockets see L<AnyEvent::Util>.
53 root 1.8
54     In the following, when the documentation refers to "bytes", this
55     means characters. As sysread and syswrite are used for all I/O, their
56     treatment of characters applies to this module as well.
57 elmex 1.1
58 root 1.8 All callbacks will be invoked with the handle object as their first
59     argument.
60 elmex 1.1
61     =head1 METHODS
62    
63     =over 4
64    
65     =item B<new (%args)>
66    
67 root 1.8 The constructor supports these arguments (all as key => value pairs).
68 elmex 1.1
69     =over 4
70    
71 root 1.8 =item fh => $filehandle [MANDATORY]
72 elmex 1.1
73     The filehandle this L<AnyEvent::Handle> object will operate on.
74    
75 root 1.8 NOTE: The filehandle will be set to non-blocking (using
76     AnyEvent::Util::fh_nonblocking).
77    
78 root 1.40 =item on_eof => $cb->($handle)
79 root 1.10
80     Set the callback to be called on EOF.
81 root 1.8
82 root 1.16 While not mandatory, it is highly recommended to set an eof callback,
83     otherwise you might end up with a closed socket while you are still
84     waiting for data.
85    
86 root 1.40 =item on_error => $cb->($handle)
87 root 1.10
88     This is the fatal error callback, which is called when, well, a fatal error
89 elmex 1.20 occurs, such as not being able to resolve the hostname, failure to connect
90 root 1.10 or a read error.
91 root 1.8
92     The object will not be in a usable state when this callback has been
93     called.
94    
95 root 1.10 On callback entrance, the value of C<$!> contains the operating system
96 root 1.29 error (or C<ENOSPC>, C<EPIPE> or C<EBADMSG>).
97 root 1.8
98 root 1.38 The callback should throw an exception. If it returns, then
99 root 1.37 AnyEvent::Handle will C<croak> for you.
100    
101 root 1.10 While not mandatory, it is I<highly> recommended to set this callback, as
102     you will not be notified of errors otherwise. The default simply calls
103     die.
104 root 1.8
105 root 1.40 =item on_read => $cb->($handle)
106 root 1.8
107     This sets the default read callback, which is called when data arrives
108 root 1.10 and no read request is in the queue.
109 root 1.8
110     To access (and remove data from) the read buffer, use the C<< ->rbuf >>
111 root 1.40 method or access the C<< $handle->{rbuf} >> member directly.
112 root 1.8
113     When an EOF condition is detected then AnyEvent::Handle will first try to
114     feed all the remaining data to the queued callbacks and C<on_read> before
115     calling the C<on_eof> callback. If no progress can be made, then a fatal
116     error will be raised (with C<$!> set to C<EPIPE>).
117 elmex 1.1
118 root 1.40 =item on_drain => $cb->($handle)
119 elmex 1.1
120 root 1.8 This sets the callback that is called when the write buffer becomes empty
121     (or when the callback is set and the buffer is empty already).
122 elmex 1.1
123 root 1.8 To append to the write buffer, use the C<< ->push_write >> method.
124 elmex 1.2
125 root 1.8 =item rbuf_max => <bytes>
126 elmex 1.2
127 root 1.8 If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>)
128     when the read buffer ever (strictly) exceeds this size. This is useful to
129     avoid denial-of-service attacks.
130 elmex 1.2
131 root 1.8 For example, a server accepting connections from untrusted sources should
132     be configured to accept only a limited amount of data that it cannot act on
133     (for example, when expecting a line, an attacker could send an unlimited
134     amount of data without a callback ever being called as long as the line
135     isn't finished).
136 elmex 1.2
137 root 1.8 =item read_size => <bytes>
138 elmex 1.2
139 root 1.8 The default read block size (the number of bytes this module will try to read
140     on each loop iteration). Default: C<8192>.
141    
142     =item low_water_mark => <bytes>
143    
144     Sets the number of bytes (default: C<0>) that make up an "empty" write
145     buffer: if the write buffer reaches this size or gets even smaller, it is
146     considered empty.
147 elmex 1.2
148 root 1.19 =item tls => "accept" | "connect" | Net::SSLeay::SSL object
149    
150     When this parameter is given, it enables TLS (SSL) mode, which means it
151     will start a TLS handshake and will transparently encrypt/decrypt
152     data.
153    
154 root 1.26 TLS mode requires Net::SSLeay to be installed (it will be loaded
155     automatically when you try to create a TLS handle).
156    
157 root 1.19 For the TLS server side, use C<accept>, and for the TLS client side of a
158     connection, use C<connect> mode.
159    
160     You can also provide your own TLS connection object, but you have
161     to make sure that you call either C<Net::SSLeay::set_connect_state>
162     or C<Net::SSLeay::set_accept_state> on it before you pass it to
163     AnyEvent::Handle.
164    
165 root 1.26 See the C<starttls> method if you need to start TLS negotiation later.
166    
167 root 1.19 =item tls_ctx => $ssl_ctx
168    
169     Use the given Net::SSLeay::CTX object to create the new TLS connection
170     (unless a connection object was specified directly). If this parameter is
171     missing, then AnyEvent::Handle will use C<AnyEvent::Handle::TLS_CTX>.
172    
173 root 1.40 =item json => JSON or JSON::XS object
174    
175     This is the json coder object used by the C<json> read and write types.
176    
177 root 1.41 If you don't supply it, then AnyEvent::Handle will create and use a
178     suitable one, which will write and expect UTF-8 encoded JSON texts.
179 root 1.40
180     Note that you are responsible for depending on the JSON module if you want to
181     use this functionality, as AnyEvent does not depend on it itself.
182    
183 root 1.38 =item filter_r => $cb
184    
185     =item filter_w => $cb
186    
187     These exist, but are undocumented at this time.
188    
189 elmex 1.1 =back
190    
191     =cut
192    
193     sub new {
194 root 1.8 my $class = shift;
195    
196     my $self = bless { @_ }, $class;
197    
198     $self->{fh} or Carp::croak "mandatory argument fh is missing";
199    
200     AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
201 elmex 1.1
202 root 1.19 if ($self->{tls}) {
203     require Net::SSLeay;
204     $self->starttls (delete $self->{tls}, delete $self->{tls_ctx});
205     }
206    
207 root 1.16 $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof};
208 root 1.10 $self->on_error (delete $self->{on_error}) if $self->{on_error};
209 root 1.8 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
210     $self->on_read (delete $self->{on_read} ) if $self->{on_read};
211 elmex 1.1
212 root 1.10 $self->start_read;
213    
214 root 1.8 $self
215     }
216 elmex 1.2
217 root 1.8 sub _shutdown {
218     my ($self) = @_;
219 elmex 1.2
220 root 1.38 delete $self->{_rw};
221     delete $self->{_ww};
222 root 1.8 delete $self->{fh};
223     }
224    
225     sub error {
226     my ($self) = @_;
227    
228     {
229     local $!;
230     $self->_shutdown;
231 elmex 1.1 }
232    
233 root 1.37 $self->{on_error}($self)
234     if $self->{on_error};
235    
236     Carp::croak "AnyEvent::Handle uncaught fatal error: $!";
237 elmex 1.1 }
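The `{ local $!; ... }` block in `error` keeps the cleanup in `_shutdown` from clobbering the errno value that `on_error` is documented to receive. A minimal sketch of that idiom, runnable without AnyEvent; `fake_shutdown` and `report_error` are hypothetical names used only for illustration.

```perl
use strict;
use warnings;
use Errno qw(EPIPE EBADF);

# Hypothetical cleanup step that overwrites $! as a side effect, the way
# closing a handle or destroying a watcher might.
sub fake_shutdown {
    $! = EBADF;
}

# Hypothetical reporter shaped like AnyEvent::Handle::error: run cleanup
# with $! localised, then read the errno the caller originally set.
sub report_error {
    {
        local $!;                # $! is restored when this block exits
        fake_shutdown();
    }
    return 0 + $!;               # numeric errno the callback would see
}

$! = EPIPE;
my $seen = report_error();
```

Without the `local`, `report_error` would return `EBADF` and the error callback would report the wrong cause.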
238    
239 root 1.8 =item $fh = $handle->fh
240 elmex 1.1
241 root 1.22 This method returns the file handle of the L<AnyEvent::Handle> object.
242 elmex 1.1
243     =cut
244    
245 root 1.38 sub fh { $_[0]{fh} }
246 elmex 1.1
247 root 1.8 =item $handle->on_error ($cb)
248 elmex 1.1
249 root 1.8 Replace the current C<on_error> callback (see the C<on_error> constructor argument).
250 elmex 1.1
251 root 1.8 =cut
252    
253     sub on_error {
254     $_[0]{on_error} = $_[1];
255     }
256    
257     =item $handle->on_eof ($cb)
258    
259     Replace the current C<on_eof> callback (see the C<on_eof> constructor argument).
260 elmex 1.1
261     =cut
262    
263 root 1.8 sub on_eof {
264     $_[0]{on_eof} = $_[1];
265     }
266    
267 root 1.9 #############################################################################
268    
269     =back
270    
271     =head2 WRITE QUEUE
272    
273     AnyEvent::Handle manages two queues per handle, one for writing and one
274     for reading.
275    
276     The write queue is very simple: you can add data to its end, and
277     AnyEvent::Handle will automatically try to get rid of it for you.
278    
279 elmex 1.20 When data could be written and the write buffer is no longer than the low
280 root 1.9 water mark, the C<on_drain> callback will be invoked.
281    
282     =over 4
283    
284 root 1.8 =item $handle->on_drain ($cb)
285    
286     Sets the C<on_drain> callback or clears it (see the description of
287     C<on_drain> in the constructor).
288    
289     =cut
290    
291     sub on_drain {
292 elmex 1.1 my ($self, $cb) = @_;
293    
294 root 1.8 $self->{on_drain} = $cb;
295    
296     $cb->($self)
297     if $cb && $self->{low_water_mark} >= length $self->{wbuf};
298     }
299    
300     =item $handle->push_write ($data)
301    
302     Queues the given scalar to be written. You can push as much data as you
303     want (only limited by the available memory), as C<AnyEvent::Handle>
304     buffers it independently of the kernel.
305    
306     =cut
307    
308 root 1.17 sub _drain_wbuf {
309     my ($self) = @_;
310 root 1.8
311 root 1.38 if (!$self->{_ww} && length $self->{wbuf}) {
312 root 1.35
313 root 1.8 Scalar::Util::weaken $self;
314 root 1.35
315 root 1.8 my $cb = sub {
316     my $len = syswrite $self->{fh}, $self->{wbuf};
317    
318 root 1.29 if (defined $len) { # syswrite returns undef on error
319 root 1.8 substr $self->{wbuf}, 0, $len, "";
320    
321     $self->{on_drain}($self)
322     if $self->{low_water_mark} >= length $self->{wbuf}
323     && $self->{on_drain};
324    
325 root 1.38 delete $self->{_ww} unless length $self->{wbuf};
326 root 1.33 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAWOULDBLOCK) {
327 root 1.8 $self->error;
328 elmex 1.1 }
329 root 1.8 };
330    
331 root 1.35 # try to write data immediately
332     $cb->();
333 root 1.8
334 root 1.35 # if still data left in wbuf, we need to poll
335 root 1.38 $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb)
336 root 1.35 if length $self->{wbuf};
337 root 1.8 };
338     }
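`_drain_wbuf` has to cope with `syswrite` accepting only part of the buffer. The sketch below isolates that bookkeeping under stated assumptions: a hypothetical `make_writer` closure stands in for a non-blocking `syswrite` that takes at most a few bytes per call, and the hypothetical `drain_once` mirrors one writable event, trimming the written prefix with four-argument `substr` and firing an `on_drain`-style callback once the buffer is no longer than the low water mark.

```perl
use strict;
use warnings;

# Hypothetical stand-in for syswrite on a non-blocking handle: accepts at
# most $limit bytes per call and returns how many it took.
sub make_writer {
    my ($limit, $sink) = @_;
    return sub {
        my ($buf) = @_;
        my $n = length($buf) < $limit ? length($buf) : $limit;
        $$sink .= substr $buf, 0, $n;
        return $n;
    };
}

# One "handle is writable" event, modelled on _drain_wbuf's callback.
sub drain_once {
    my ($state, $writer) = @_;
    my $len = $writer->($state->{wbuf});
    return unless defined $len;                  # error handling omitted
    substr $state->{wbuf}, 0, $len, "";          # drop the written prefix
    $state->{on_drain}->($state)
        if $state->{on_drain}
        && ($state->{low_water_mark} || 0) >= length $state->{wbuf};
}

my $sent   = "";
my $drains = 0;
my %state  = (
    wbuf           => "hello, world\n",
    low_water_mark => 0,
    on_drain       => sub { $drains++ },
);
my $writer = make_writer(5, \$sent);
drain_once(\%state, $writer) while length $state{wbuf};
```

With a low water mark of zero the callback fires only once, after the third partial write empties the buffer.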
339    
340 root 1.30 our %WH;
341    
342     sub register_write_type($$) {
343     $WH{$_[0]} = $_[1];
344     }
345    
346 root 1.17 sub push_write {
347     my $self = shift;
348    
349 root 1.29 if (@_ > 1) {
350     my $type = shift;
351    
352     @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write")
353     ->($self, @_);
354     }
355    
356 root 1.17 if ($self->{filter_w}) {
357 root 1.18 $self->{filter_w}->($self, \$_[0]);
358 root 1.17 } else {
359     $self->{wbuf} .= $_[0];
360     $self->_drain_wbuf;
361     }
362     }
363    
364 root 1.29 =item $handle->push_write (type => @args)
365    
366     =item $handle->unshift_write (type => @args)
367    
368     Instead of formatting your data yourself, you can also let this module do
369     the job by specifying a type and type-specific arguments.
370    
371 root 1.30 Predefined types are (if you have ideas for additional types, feel free to
372     drop by and tell us):
373 root 1.29
374     =over 4
375    
376     =item netstring => $string
377    
378     Formats the given value as netstring
379     (http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them).
380    
381 root 1.30 =back
382    
383 root 1.29 =cut
384    
385     register_write_type netstring => sub {
386     my ($self, $string) = @_;
387    
388     sprintf "%d:%s,", (length $string), $string
389     };
390    
391 root 1.39 =item json => $array_or_hashref
392    
393 root 1.40 Encodes the given hash or array reference into a JSON object. Unless you
394     provide your own JSON object, this means it will be encoded to JSON text
395     in UTF-8.
396    
397     JSON objects (and arrays) are self-delimiting, so you can write JSON at
398     one end of a handle and read them at the other end without using any
399     additional framing.
400    
401 root 1.41 The generated JSON text is guaranteed not to contain any newlines: While
402     this module doesn't need delimiters after or between JSON texts to be
403     able to read them, many other languages depend on that.
404    
405     A simple RPC protocol that interoperates easily with others is to send
406     JSON arrays (or objects, although arrays are usually the better choice as
407     they mimic how function argument passing works) and a newline after each
408     JSON text:
409    
410     $handle->push_write (json => ["method", "arg1", "arg2"]); # whatever
411     $handle->push_write ("\012");
412    
413     An AnyEvent::Handle receiver would simply use the C<json> read type and
414     rely on the fact that the newline will be skipped as leading whitespace:
415    
416     $handle->push_read (json => sub { my $array = $_[1]; ... });
417    
418     Other languages could read single lines terminated by a newline and pass
419     this line into their JSON decoder of choice.
420    
421 root 1.40 =cut
422    
423     register_write_type json => sub {
424     my ($self, $ref) = @_;
425    
426     require JSON;
427    
428     $self->{json} ? $self->{json}->encode ($ref)
429     : JSON::encode_json ($ref)
430     };
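Following the suggestion above for interoperable RPC, a receiver that does not use the `json` read type can split on newlines and decode one line at a time. A sketch assuming the core JSON::PP module (not the JSON module that AnyEvent::Handle itself loads); `frame_json` and `extract_json_lines` are hypothetical helpers.

```perl
use strict;
use warnings;
use JSON::PP ();

my $coder = JSON::PP->new->utf8;

# Sender side: one JSON text per line. In non-pretty mode JSON::PP emits
# no raw newlines (newlines inside strings are escaped as \n).
sub frame_json { return $coder->encode($_[0]) . "\012" }

# Receiver side: decode every complete line in $$buf and leave any
# trailing partial line in the buffer for the next read.
sub extract_json_lines {
    my ($buf) = @_;
    my @msgs;
    while ($$buf =~ s/^([^\012]*)\012//) {
        my $line = $1;                   # copy before calling into JSON::PP
        push @msgs, $coder->decode($line) if length $line;
    }
    return @msgs;
}

my $wire = frame_json(["method", "arg1", "arg2"]) . frame_json({ id => 7 });
my $buf  = substr $wire, 0, length($wire) - 3;   # simulate a short read
my @got  = extract_json_lines(\$buf);
$buf    .= substr $wire, length($wire) - 3;      # the rest arrives later
push @got, extract_json_lines(\$buf);
```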
431    
432     =item AnyEvent::Handle::register_write_type type => $coderef->($handle, @args)
433 root 1.30
434     This function (not method) lets you add your own types to C<push_write>.
435     Whenever the given C<type> is used, C<push_write> will invoke the code
436     reference with the handle object and the remaining arguments.
437 root 1.29
438 root 1.30 The code reference is supposed to return a single octet string that will
439     be appended to the write buffer.
440 root 1.29
441 root 1.30 Note that this is a function, and all types registered this way will be
442     global, so try to use unique names.
443 root 1.29
444 root 1.30 =cut
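To illustrate the registry, here is a self-contained sketch that mimics the `%WH` dispatch of `push_write` without needing AnyEvent installed. `MockHandle` and the `hexline` type are hypothetical; the `netstring` formatter is copied from the module, and unlike the real `push_write` the mock only appends to `wbuf` and never writes.

```perl
use strict;
use warnings;
use Carp ();

package MockHandle;

our %WH;                                   # type name => formatter coderef

sub register_write_type { $WH{$_[0]} = $_[1] }

sub new { bless { wbuf => "" }, shift }

# Same calling convention as push_write: optional type, then its arguments.
sub push_write {
    my $self = shift;
    if (@_ > 1) {
        my $type = shift;
        my $fmt  = $WH{$type}
            or Carp::croak "unsupported type passed to push_write";
        @_ = $fmt->($self, @_);
    }
    $self->{wbuf} .= $_[0];
}

package main;

# The module's netstring formatter, plus a hypothetical "hexline" type.
MockHandle::register_write_type(netstring => sub {
    my ($self, $string) = @_;
    sprintf "%d:%s,", (length $string), $string;
});
MockHandle::register_write_type(hexline => sub {
    my ($self, $data) = @_;
    unpack("H*", $data) . "\012";
});

my $h = MockHandle->new;
$h->push_write(netstring => "hello");
$h->push_write(hexline => "AB");
$h->push_write("raw");
```

Because the table is a package global, a second registration under the same name silently replaces the first, which is why unique type names matter.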
445 root 1.29
446 root 1.8 #############################################################################
447    
448 root 1.9 =back
449    
450     =head2 READ QUEUE
451    
452     AnyEvent::Handle manages two queues per handle, one for writing and one
453     for reading.
454    
455     The read queue is more complex than the write queue. It can be used in two
456     ways, the "simple" way, using only C<on_read> and the "complex" way, using
457     a queue.
458    
459     In the simple case, you just install an C<on_read> callback and whenever
460     new data arrives, it will be called. You can then remove some data (if
461     enough is there) from the read buffer (C<< $handle->rbuf >>), or leave it
462     there until more data arrives.
463    
464     In the more complex case, you want to queue multiple callbacks. In this
465     case, AnyEvent::Handle will call the first queued callback each time new
466     data arrives and remove it when it has done its job (see C<push_read>,
467     below).
468    
469     This way you can, for example, push three line-reads, followed by reading
470     a chunk of data, and AnyEvent::Handle will execute them in order.
471    
472     Example 1: EPP protocol parser. EPP sends 4 byte length info, followed by
473     the specified number of bytes which give an XML datagram.
474    
475     # in the default state, expect some header bytes
476     $handle->on_read (sub {
477     # some data is here, now queue the length-header-read (4 octets)
478     shift->unshift_read_chunk (4, sub {
479     # header arrived, decode
480     my $len = unpack "N", $_[1];
481    
482     # now read the payload
483     shift->unshift_read_chunk ($len, sub {
484     my $xml = $_[1];
485     # handle xml
486     });
487     });
488     });
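A standalone sketch of the 4-byte big-endian length framing used in Example 1. One caveat, stated as an assumption about the EPP protocol rather than about this module: RFC 5734 defines the 32-bit header as the total length of the data unit, including the header's own 4 octets, so this sketch encodes and decodes it that way; the example handler above reads `$len` payload bytes, which matches a payload-only length instead. `epp_frame` and `epp_unframe` are hypothetical names.

```perl
use strict;
use warnings;

# Build one frame: 32-bit big-endian total length (header included, per
# RFC 5734), followed by the XML payload.
sub epp_frame {
    my ($xml) = @_;
    return pack("N", 4 + length($xml)) . $xml;
}

# Remove one complete frame from the front of $$buf and return its payload,
# or return undef if not enough data has arrived yet.
sub epp_unframe {
    my ($buf) = @_;
    return undef if length($$buf) < 4;
    my $total = unpack "N", $$buf;          # reads the first 4 bytes
    die "bad EPP length $total\n" if $total < 4;
    return undef if length($$buf) < $total;
    my $frame = substr $$buf, 0, $total, "";
    return substr $frame, 4;                # strip the header
}

my $buf = epp_frame("<epp/>") . epp_frame("<hello/>");
my @payloads;
while (defined(my $xml = epp_unframe(\$buf))) {
    push @payloads, $xml;
}
```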
489    
490     Example 2: Implement a client for a protocol that replies either with
491     "OK" and another line or "ERROR" for one request, and 64 bytes for the
492     second request. Due to the availability of a full queue, we can just
493     pipeline sending both requests and manipulate the queue as necessary in
494     the callbacks:
495    
496     # request one
497     $handle->push_write ("request 1\015\012");
498    
499     # we expect "ERROR" or "OK" as response, so push a line read
500     $handle->push_read_line (sub {
501     # if we got an "OK", we have to _prepend_ another line,
502     # so it will be read before the second request reads its 64 bytes
503     # which are already in the queue when this callback is called
504     # we don't do this in case we got an error
505     if ($_[1] eq "OK") {
506     $_[0]->unshift_read_line (sub {
507     my $response = $_[1];
508     ...
509     });
510     }
511     });
512    
513     # request two
514     $handle->push_write ("request 2\015\012");
515    
516     # simply read 64 bytes, always
517     $handle->push_read_chunk (64, sub {
518     my $response = $_[1];
519     ...
520     });
521    
522     =over 4
523    
524 root 1.10 =cut
525    
526 root 1.8 sub _drain_rbuf {
527     my ($self) = @_;
528 elmex 1.1
529 root 1.17 if (
530     defined $self->{rbuf_max}
531     && $self->{rbuf_max} < length $self->{rbuf}
532     ) {
533 root 1.37 $! = &Errno::ENOSPC;
534     $self->error;
535 root 1.17 }
536    
537 root 1.11 return if $self->{in_drain};
538 root 1.8 local $self->{in_drain} = 1;
539 elmex 1.1
540 root 1.8 while (my $len = length $self->{rbuf}) {
541     no strict 'refs';
542 root 1.38 if (my $cb = shift @{ $self->{_queue} }) {
543 root 1.29 unless ($cb->($self)) {
544 root 1.38 if ($self->{_eof}) {
545 root 1.10 # no progress can be made (not enough data and no data forthcoming)
546 root 1.37 $! = &Errno::EPIPE;
547     $self->error;
548 root 1.10 }
549    
550 root 1.38 unshift @{ $self->{_queue} }, $cb;
551 root 1.8 return;
552     }
553     } elsif ($self->{on_read}) {
554     $self->{on_read}($self);
555    
556     if (
557 root 1.38 $self->{_eof} # if no further data will arrive
558 root 1.8 && $len == length $self->{rbuf} # and no data has been consumed
559 root 1.38 && !@{ $self->{_queue} } # and the queue is still empty
560 root 1.8 && $self->{on_read} # and we still want to read data
561     ) {
562     # then no progress can be made
563 root 1.37 $! = &Errno::EPIPE;
564     $self->error;
565 elmex 1.1 }
566 root 1.8 } else {
567     # read side becomes idle
568 root 1.38 delete $self->{_rw};
569 root 1.8 return;
570     }
571     }
572    
573 root 1.38 if ($self->{_eof}) {
574 root 1.8 $self->_shutdown;
575 root 1.16 $self->{on_eof}($self)
576     if $self->{on_eof};
577 root 1.8 }
578 elmex 1.1 }
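The dispatch rules of `_drain_rbuf` (take the first queued callback, put it back at the head of the queue if it returns false, and treat an oversized buffer as fatal) can be modelled in a few lines. This is a simplified sketch, not the module's code: it ignores `on_read`, EOF and re-entrancy, dies instead of calling `error`, and the `want_bytes` helper is hypothetical.

```perl
use strict;
use warnings;

# Simplified model of _drain_rbuf: each queued callback consumes what it
# needs from $h->{rbuf} and returns true, or returns false to wait.
sub drain_rbuf {
    my ($h) = @_;
    die "rbuf_max exceeded\n"
        if defined $h->{rbuf_max} && $h->{rbuf_max} < length $h->{rbuf};

    while (length $h->{rbuf}) {
        my $cb = shift @{ $h->{queue} } or return;   # queue empty: idle
        unless ($cb->($h)) {
            unshift @{ $h->{queue} }, $cb;           # not enough data yet
            return;
        }
    }
}

# Hypothetical helper: a callback that waits for exactly $n bytes.
sub want_bytes {
    my ($n, $out) = @_;
    return sub {
        my ($h) = @_;
        return 0 if length($h->{rbuf}) < $n;
        push @$out, substr $h->{rbuf}, 0, $n, "";
        return 1;
    };
}

my @got;
my $h = { rbuf => "", queue => [], rbuf_max => 64 };
push @{ $h->{queue} }, want_bytes(3, \@got), want_bytes(2, \@got);

for my $piece ("a", "bcd", "e") {        # data arrives in small pieces
    $h->{rbuf} .= $piece;
    drain_rbuf($h);
}
```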
579    
580 root 1.8 =item $handle->on_read ($cb)
581 elmex 1.1
582 root 1.8 This replaces the currently set C<on_read> callback, or clears it (when
583     the new callback is C<undef>). See the description of C<on_read> in the
584     constructor.
585 elmex 1.1
586 root 1.8 =cut
587    
588     sub on_read {
589     my ($self, $cb) = @_;
590 elmex 1.1
591 root 1.8 $self->{on_read} = $cb;
592 elmex 1.1 }
593    
594 root 1.8 =item $handle->rbuf
595    
596     Returns the read buffer (as a modifiable lvalue).
597 elmex 1.1
598 root 1.8 You can access the read buffer directly as the C<< ->{rbuf} >> member, if
599     you want.
600 elmex 1.1
601 root 1.8 NOTE: The read buffer should only be used or modified if the C<on_read>,
602     C<push_read> or C<unshift_read> methods are used. The other read methods
603     automatically manage the read buffer.
604 elmex 1.1
605     =cut
606    
607 elmex 1.2 sub rbuf : lvalue {
608 root 1.8 $_[0]{rbuf}
609 elmex 1.2 }
610 elmex 1.1
611 root 1.8 =item $handle->push_read ($cb)
612    
613     =item $handle->unshift_read ($cb)
614    
615     Append the given callback to the end of the queue (C<push_read>) or
616     prepend it (C<unshift_read>).
617    
618     The callback is called each time some additional read data arrives.
619 elmex 1.1
620 elmex 1.20 It must check whether enough data is in the read buffer already.
621 elmex 1.1
622 root 1.8 If not enough data is available, it must return the empty list or a false
623     value, in which case it will be called repeatedly until enough data is
624     available (or an error condition is detected).
625    
626     If enough data was available, then the callback must remove all data it is
627     interested in (which can be none at all) and return a true value. After returning
628     true, it will be removed from the queue.
629 elmex 1.1
630     =cut
631    
632 root 1.30 our %RH;
633    
634     sub register_read_type($$) {
635     $RH{$_[0]} = $_[1];
636     }
637    
638 root 1.8 sub push_read {
639 root 1.28 my $self = shift;
640     my $cb = pop;
641    
642     if (@_) {
643     my $type = shift;
644    
645     $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read")
646     ->($self, $cb, @_);
647     }
648 elmex 1.1
649 root 1.38 push @{ $self->{_queue} }, $cb;
650 root 1.8 $self->_drain_rbuf;
651 elmex 1.1 }
652    
653 root 1.8 sub unshift_read {
654 root 1.28 my $self = shift;
655     my $cb = pop;
656    
657     if (@_) {
658     my $type = shift;
659    
660     $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read")
661     ->($self, $cb, @_);
662     }
663    
664 root 1.8
665 root 1.38 unshift @{ $self->{_queue} }, $cb;
666 root 1.8 $self->_drain_rbuf;
667     }
668 elmex 1.1
669 root 1.28 =item $handle->push_read (type => @args, $cb)
670 elmex 1.1
671 root 1.28 =item $handle->unshift_read (type => @args, $cb)
672 elmex 1.1
673 root 1.28 Instead of providing a callback that parses the data itself, you can choose
674     between a number of predefined parsing formats, for chunks of data, lines
675     etc.
676 elmex 1.1
677 root 1.30 Predefined types are (if you have ideas for additional types, feel free to
678     drop by and tell us):
679 root 1.28
680     =over 4
681    
682 root 1.40 =item chunk => $octets, $cb->($handle, $data)
683 root 1.28
684     Invoke the callback only once C<$octets> bytes have been read. Pass the
685     data read to the callback. The callback will never be called with less
686     data.
687    
688     Example: read 2 bytes.
689    
690     $handle->push_read (chunk => 2, sub {
691     warn "yay ", unpack "H*", $_[1];
692     });
693 elmex 1.1
694     =cut
695    
696 root 1.28 register_read_type chunk => sub {
697     my ($self, $cb, $len) = @_;
698 elmex 1.1
699 root 1.8 sub {
700     $len <= length $_[0]{rbuf} or return;
701 elmex 1.12 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
702 root 1.8 1
703     }
704 root 1.28 };
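The `chunk` type is a closure factory: it captures the user callback and the length, and returns a queue callback that stays pending until enough bytes are buffered. A standalone sketch of the same shape, with a plain hash standing in for the handle; `make_chunk_reader` is a hypothetical name.

```perl
use strict;
use warnings;

# Same shape as the chunk read type: given the user callback and a length,
# build a queue callback that fires once $len bytes are buffered.
sub make_chunk_reader {
    my ($cb, $len) = @_;
    return sub {
        my ($handle) = @_;
        $len <= length $handle->{rbuf} or return;      # wait for more
        $cb->($handle, substr $handle->{rbuf}, 0, $len, "");
        1;
    };
}

my $seen;
my $handle = { rbuf => "\x01" };                        # mock handle
my $reader = make_chunk_reader(sub { $seen = unpack "H*", $_[1] }, 2);

my $first  = $reader->($handle);                        # only 1 byte so far
$handle->{rbuf} .= "\xffrest";
my $second = $reader->($handle);                        # now 2 bytes are there
```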
705 root 1.8
706 root 1.28 # compatibility with older API
707 root 1.8 sub push_read_chunk {
708 root 1.28 $_[0]->push_read (chunk => $_[1], $_[2]);
709 root 1.8 }
710 elmex 1.1
711 root 1.8 sub unshift_read_chunk {
712 root 1.28 $_[0]->unshift_read (chunk => $_[1], $_[2]);
713 elmex 1.1 }
714    
715 root 1.40 =item line => [$eol, ]$cb->($handle, $line, $eol)
716 elmex 1.1
717 root 1.8 The callback will be called only once a full line (including the end of
718     line marker, C<$eol>) has been read. This line (excluding the end of line
719     marker) will be passed to the callback as second argument (C<$line>), and
720     the end of line marker as the third argument (C<$eol>).
721 elmex 1.1
722 root 1.8 The end of line marker, C<$eol>, can be either a string, in which case it
723     will be interpreted as a fixed record end marker, or it can be a regex
724     object (e.g. created by C<qr>), in which case it is interpreted as a
725     regular expression.
726 elmex 1.1
727     The end of line marker argument C<$eol> is optional. If it is missing (NOT
728     undef), then C<qr|\015?\012|> is used (which is good for most internet
729     protocols).
730 elmex 1.1
731 root 1.8 Partial lines at the end of the stream will never be returned, as they are
732     not marked by the end of line marker.
733 elmex 1.1
734 root 1.8 =cut
735 elmex 1.1
736 root 1.28 register_read_type line => sub {
737     my ($self, $cb, $eol) = @_;
738 elmex 1.1
739 root 1.28 $eol = qr|(\015?\012)| if @_ < 3;
740 root 1.14 $eol = quotemeta $eol unless ref $eol;
741     $eol = qr|^(.*?)($eol)|s;
742 elmex 1.1
743 root 1.8 sub {
744     $_[0]{rbuf} =~ s/$eol// or return;
745 elmex 1.1
746 elmex 1.12 $cb->($_[0], $1, $2);
747 root 1.8 1
748     }
749 root 1.28 };
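The `line` type compiles its end-of-line marker into a single regex, `^(.*?)($eol)` with `/s`, quoting plain strings with `quotemeta`. The sketch below reproduces that construction outside the module so you can try markers directly; `line_regex` and `read_line` are hypothetical helpers.

```perl
use strict;
use warnings;

# Build the line-matching regex the way the line read type does:
# default is CR?LF, a plain string is matched literally, a qr// is used as-is.
sub line_regex {
    my $eol = @_ ? $_[0] : qr|(\015?\012)|;
    $eol = quotemeta $eol unless ref $eol;
    return qr|^(.*?)($eol)|s;
}

# Remove and return (line, eol) from the front of $$buf, or () if no
# complete line is buffered yet.
sub read_line {
    my ($buf, $re) = @_;
    $$buf =~ s/$re// or return;
    return ($1, $2);
}

my $buf = "HELO example\015\012MAIL FROM:<a\@b>\012partial";
my $re  = line_regex();
my @first  = read_line(\$buf, $re);
my @second = read_line(\$buf, $re);
my @third  = read_line(\$buf, $re);       # "partial" has no terminator
```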
750 elmex 1.1
751 root 1.28 # compatibility with older API
752 root 1.8 sub push_read_line {
753 root 1.28 my $self = shift;
754     $self->push_read (line => @_);
755 root 1.10 }
756    
757     sub unshift_read_line {
758 root 1.28 my $self = shift;
759     $self->unshift_read (line => @_);
760 root 1.10 }
761    
762 root 1.40 =item netstring => $cb->($handle, $string)
763 root 1.29
764     A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement).
765    
766     Throws an error with C<$!> set to C<EBADMSG> on format violations.
767    
768     =cut
769    
770     register_read_type netstring => sub {
771     my ($self, $cb) = @_;
772    
773     sub {
774     unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) {
775     if ($_[0]{rbuf} =~ /[^0-9]/) {
776     $! = &Errno::EBADMSG;
777     $self->error;
778     }
779     return;
780     }
781    
782     my $len = $1;
783    
784     $self->unshift_read (chunk => $len, sub {
785     my $string = $_[1];
786     $_[0]->unshift_read (chunk => 1, sub {
787     if ($_[1] eq ",") {
788     $cb->($_[0], $string);
789     } else {
790     $! = &Errno::EBADMSG;
791     $self->error;
792     }
793     });
794     });
795    
796     1
797     }
798     };
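For reference, a standalone encoder and decoder for the netstring format handled by the two `netstring` types. It is a sketch: `netstring_encode` and `netstring_decode` are hypothetical names, the decoder dies where the module would set `$!` to `EBADMSG`, and unlike the module it leaves the length prefix in the buffer until the whole netstring has arrived.

```perl
use strict;
use warnings;

# Encode like the netstring write type: "<len>:<data>,".
sub netstring_encode {
    my ($string) = @_;
    return sprintf "%d:%s,", length($string), $string;
}

# Decode one netstring from the front of $$buf. Returns the payload, or
# undef when more data is needed; dies on a format violation.
sub netstring_decode {
    my ($buf) = @_;
    unless ($$buf =~ /^(0|[1-9][0-9]*):/) {
        die "EBADMSG: bad length prefix\n" if $$buf =~ /[^0-9]/;
        return undef;                        # only digits so far
    }
    my ($len, $hdr) = ($1, length($1) + 1);  # $hdr covers digits and ':'
    return undef if length($$buf) < $hdr + $len + 1;
    die "EBADMSG: missing comma\n"
        unless substr($$buf, $hdr + $len, 1) eq ",";
    my $data = substr $$buf, $hdr, $len;
    substr $$buf, 0, $hdr + $len + 1, "";    # consume the whole netstring
    return $data;
}

my $wire = netstring_encode("hello") . netstring_encode("");
my @got;
while (defined(my $s = netstring_decode(\$wire))) { push @got, $s }
```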
799    
800 root 1.40 =item regex => $accept[, $reject[, $skip]], $cb->($handle, $data)
801 root 1.36
802     Makes a regex match against the regex object C<$accept> and returns
803     everything up to and including the match.
804    
805     Example: read a single line terminated by '\n'.
806    
807     $handle->push_read (regex => qr<\n>, sub { ... });
808    
809     If C<$reject> is given and not undef, then it determines when the data is
810     to be rejected: it is matched against the data when the C<$accept> regex
811     does not match and generates an C<EBADMSG> error when it matches. This is
812     useful to quickly reject wrong data (to avoid waiting for a timeout or a
813     receive buffer overflow).
814    
815     Example: expect a single decimal number followed by whitespace, reject
816     anything else (note the use of an anchor).
817    
818     $handle->push_read (regex => qr<^[0-9]+\s>, qr<[^0-9]>, sub { ... });
819    
820     If C<$skip> is given and not C<undef>, then it will be matched against
821     the receive buffer when neither C<$accept> nor C<$reject> match,
822     and everything preceding and including the match will be accepted
823     unconditionally. This is useful to skip large amounts of data that you
824     know cannot be matched, so that the C<$accept> or C<$reject> regex do not
825     have to start matching from the beginning. This is purely an optimisation
826     and is usually only worthwhile when you expect more than a few kilobytes.
827    
828     Example: expect an HTTP header, which ends at C<\015\012\015\012>. Since we
829     expect the header to be very large (it isn't in practice, but...), we use
830     a skip regex to skip initial portions. The skip regex is tricky in that
831     it only accepts something not ending in either \015 or \012, as these are
832     required for the accept regex.
833    
834     $handle->push_read (regex =>
835     qr<\015\012\015\012>,
836     undef, # no reject
837     qr<^.*[^\015\012]>,
838     sub { ... });
839    
840     =cut
841    
842     register_read_type regex => sub {
843     my ($self, $cb, $accept, $reject, $skip) = @_;
844    
845     my $data;
846     my $rbuf = \$self->{rbuf};
847    
848     sub {
849     # accept
850     if ($$rbuf =~ $accept) {
851     $data .= substr $$rbuf, 0, $+[0], "";
852     $cb->($self, $data);
853     return 1;
854     }
855    
856     # reject
857     if ($reject && $$rbuf =~ $reject) {
858     $! = &Errno::EBADMSG;
859     $self->error;
860     }
861    
862     # skip
863     if ($skip && $$rbuf =~ $skip) {
864     $data .= substr $$rbuf, 0, $+[0], "";
865     }
866    
867     ()
868     }
869     };
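The accept/reject/skip order can be tried outside an event loop. This sketch mirrors the closure above but takes a buffer reference, returns the accepted data (or `undef` to wait), and dies instead of raising `EBADMSG`; `make_regex_reader` is a hypothetical name. It replays the HTTP header example from the documentation above.

```perl
use strict;
use warnings;

sub make_regex_reader {
    my ($accept, $reject, $skip) = @_;
    my $data = "";
    return sub {
        my ($rbuf) = @_;

        # accept: take everything up to and including the match
        if ($$rbuf =~ $accept) {
            $data .= substr $$rbuf, 0, $+[0], "";
            my $out = $data;
            $data = "";
            return $out;
        }

        # reject: the module sets $! = EBADMSG and calls ->error instead
        die "EBADMSG\n" if $reject && $$rbuf =~ $reject;

        # skip: move a known-uninteresting prefix into $data
        if ($skip && $$rbuf =~ $skip) {
            $data .= substr $$rbuf, 0, $+[0], "";
        }

        return undef;                       # wait for more data
    };
}

my $reader = make_regex_reader(qr<\015\012\015\012>, undef, qr<^.*[^\015\012]>);
my $buf = "GET / HTTP/1.0\015\012Host: x\015\012";
my $r1  = $reader->(\$buf);                # incomplete: skip trims a prefix
$buf   .= "\015\012body";
my $r2  = $reader->(\$buf);                # header now complete
```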
870    
871 root 1.40 =item json => $cb->($handle, $hash_or_arrayref)
872    
873     Reads a JSON object or array, decodes it and passes it to the callback.
874    
875     If a C<json> object was passed to the constructor, then that will be used
876     for the final decode, otherwise it will create a JSON coder expecting UTF-8.
877    
878     This read type uses the incremental parser available with JSON version
879     2.09 (and JSON::XS version 2.2) and above. You have to provide a
880     dependency on your own: this module will load the JSON module, but
881     AnyEvent does not depend on it itself.
882    
883     Since JSON texts are fully self-delimiting, the C<json> read and write
884 root 1.41 types are an ideal simple RPC protocol: just exchange JSON datagrams. See
885     the C<json> write type description, above, for an actual example.
886 root 1.40
887     =cut
888    
889     register_read_type json => sub {
890     my ($self, $cb) = @_;
891   
892     require JSON;
893   
897 root 1.41 my $json = $self->{json} ||= JSON->new->utf8;
898 root 1.40
899     sub {
900     my $ref = $json->incr_parse ($self->{rbuf});
901    
902     if ($ref) {
903     $self->{rbuf} = $json->incr_text;
904     $json->incr_text = "";
905     $cb->($self, $ref);
906    
907     1
908     } else {
909     $self->{rbuf} = "";
910     ()
911     }
912     }
913     };
914    
915 root 1.28 =back
916    
917 root 1.40 =item AnyEvent::Handle::register_read_type type => $coderef->($handle, $cb, @args)
918 root 1.30
919     This function (not method) lets you add your own types to C<push_read>.
920    
921     Whenever the given C<type> is used, C<push_read> will invoke the code
922     reference with the handle object, the callback and the remaining
923     arguments.
924    
925     The code reference is supposed to return a callback (usually a closure)
926     that works as a plain read callback (see C<< ->push_read ($cb) >>).
927    
928     It should invoke the passed callback when it is done reading (remember to
929 root 1.40 pass C<$handle> as the first argument, as all other callbacks do).
930 root 1.30
931     Note that this is a function, and all types registered this way will be
932     global, so try to use unique names.
933    
934     For examples, see the source of this module (C<perldoc -m AnyEvent::Handle>,
935     search for C<register_read_type>).
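The following sketch illustrates that contract with a hypothetical C<netstring> read type, which reads frames of the form C<< <length>:<data>, >>. It is written as a plain code reference and exercised against a bare hash standing in for the handle, so it runs without AnyEvent. The registration and C<push_read> calls appear only in comments. The type name and the framing format are assumptions for illustration, not part of this module.

```perl
use strict;
use warnings;

# Hypothetical read type "netstring": reads one "<len>:<data>," frame and
# passes <data> to the user callback. Follows the contract above:
# ($handle, $cb, @args) in, plain read callback out.
my $netstring_type = sub {
    my ($handle, $cb) = @_;

    # The returned closure is the plain read callback. It is called with the
    # handle, looks at $handle->{rbuf} and returns true only after it has
    # consumed a complete frame; returning false means "wait for more data".
    sub {
        my $rbuf = \$_[0]{rbuf};

        # need the complete length prefix, including the colon
        return unless $$rbuf =~ /^(\d+):/;
        my $len = $1;
        my $hdr = length ($len) + 1;

        # wait until the payload and the trailing comma have arrived
        return if length ($$rbuf) < $hdr + $len + 1;

        die "netstring: missing trailing comma\n"
            unless substr ($$rbuf, $hdr + $len, 1) eq ",";

        my $data = substr $$rbuf, $hdr, $len;
        substr $$rbuf, 0, $hdr + $len + 1, "";    # consume the frame

        $cb->($_[0], $data);    # handle first, like every other callback
        1
    }
};

# With AnyEvent::Handle loaded, registration and use would look like:
#   AnyEvent::Handle::register_read_type (netstring => $netstring_type);
#   $handle->push_read (netstring => sub { my ($hdl, $data) = @_; ... });
```

Returning false leaves the partial frame in the buffer, so the same closure is simply called again when more data has been read.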
936    
937 root 1.10 =item $handle->stop_read
938    
939     =item $handle->start_read
940    
941 root 1.18 In rare cases you do not want to read anything from the socket at all.
942 root 1.10 In this case you can call C<stop_read>: neither C<on_read> nor any
943 root 1.22 queued callbacks will be executed until reading is resumed. To start
944 root 1.10 reading again, call C<start_read> (this has no effect after EOF).
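Internally, C<start_read> installs an I/O watcher whose callback performs one non-blocking C<sysread> per readiness event. Each read is appended to the read buffer. Zero bytes is treated as EOF, and C<EAGAIN>/C<EINTR> as "nothing to read yet". The sketch below reproduces just that step with core Perl and a pipe, without any event loop. The helper C<read_once> and its string return values are hypothetical.

```perl
use strict;
use warnings;
use Errno qw(EAGAIN EINTR);
use IO::Handle ();

# Hypothetical helper mirroring one run of start_read's watcher callback.
# Returns "data", "eof" or "again"; dies on any other read error.
sub read_once {
    my ($fh, $rbuf, $read_size) = @_;

    # 4-argument sysread: append at the current end of the buffer
    my $len = sysread $fh, $$rbuf, $read_size || 8192, length $$rbuf;

    return "data"  if $len;                          # bytes arrived
    return "eof"   if defined $len;                  # 0 bytes: end of file
    return "again" if $! == EAGAIN || $! == EINTR;   # nothing to read yet
    die "read error: $!\n";
}

# a pipe with a non-blocking read end stands in for the handle's fh
pipe (my $r, my $w) or die "pipe: $!";
$r->blocking (0);
```

Where this sketch just returns, the real watcher passes the buffer on to C<_drain_rbuf>, or to C<filter_r> when TLS is active.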
945    
946     =cut
947    
948     sub stop_read {
949     my ($self) = @_;
950 elmex 1.1
951 root 1.38 delete $self->{_rw};
952 root 1.8 }
953 elmex 1.1
954 root 1.10 sub start_read {
955     my ($self) = @_;
956    
957 root 1.38 unless ($self->{_rw} || $self->{_eof}) {
958 root 1.10 Scalar::Util::weaken $self;
959    
960 root 1.38 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
961 root 1.17 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf};
962     my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
963 root 1.10
964     if ($len > 0) {
965 root 1.17 $self->{filter_r}
966 root 1.18 ? $self->{filter_r}->($self, $rbuf)
967 root 1.17 : $self->_drain_rbuf;
968 root 1.10
969     } elsif (defined $len) {
970 root 1.38 delete $self->{_rw};
971     $self->{_eof} = 1;
972 root 1.17 $self->_drain_rbuf;
973 root 1.10
974 root 1.33 } elsif ($! != EAGAIN && $! != EINTR && $! != &AnyEvent::Util::WSAWOULDBLOCK) {
975 root 1.10 return $self->error;
976     }
977     });
978     }
979 elmex 1.1 }
980    
981 root 1.19 sub _dotls {
982     my ($self) = @_;
983    
984 root 1.38 if (length $self->{_tls_wbuf}) {
985     while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) {
986     substr $self->{_tls_wbuf}, 0, $len, "";
987 root 1.22 }
988 root 1.19 }
989    
990 root 1.38 if (defined (my $buf = Net::SSLeay::BIO_read ($self->{_wbio}))) {
991 root 1.19 $self->{wbuf} .= $buf;
992     $self->_drain_wbuf;
993     }
994    
995 root 1.23 while (defined (my $buf = Net::SSLeay::read ($self->{tls}))) {
996     $self->{rbuf} .= $buf;
997     $self->_drain_rbuf;
998     }
999    
1000 root 1.24 my $err = Net::SSLeay::get_error ($self->{tls}, -1);
1001    
1002     if ($err != Net::SSLeay::ERROR_WANT_READ ()) {
1003 root 1.23 if ($err == Net::SSLeay::ERROR_SYSCALL ()) {
1004     $self->error;
1005     } elsif ($err == Net::SSLeay::ERROR_SSL ()) {
1006     $! = &Errno::EIO;
1007     $self->error;
1008 root 1.19 }
1009 root 1.23
1010     # all others are fine for our purposes
1011 root 1.19 }
1012     }
1013    
1014 root 1.25 =item $handle->starttls ($tls[, $tls_ctx])
1015    
1016     Instead of starting TLS negotiation immediately when the AnyEvent::Handle
1017     object is created, you can also do that at a later time by calling
1018     C<starttls>.
1019    
1020     The first argument is the same as the C<tls> constructor argument (either
1021     C<"connect">, C<"accept"> or an existing Net::SSLeay object).
1022    
1023     The second argument is the optional C<Net::SSLeay::CTX> object that is
1024     used when AnyEvent::Handle has to create its own TLS connection object.
1025    
1026 root 1.38 The TLS connection object will end up in C<< $handle->{tls} >> after this
1027     call and can be used or changed to your liking. Note that the handshake
1028     might have already started when this function returns.
1029    
1030 root 1.25 =cut
1031    
1032 root 1.19 # TODO: maybe document...
1033     sub starttls {
1034     my ($self, $ssl, $ctx) = @_;
1035    
1036 root 1.25 $self->stoptls;
1037    
1038 root 1.19 if ($ssl eq "accept") {
1039     $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
1040     Net::SSLeay::set_accept_state ($ssl);
1041     } elsif ($ssl eq "connect") {
1042     $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
1043     Net::SSLeay::set_connect_state ($ssl);
1044     }
1045    
1046     $self->{tls} = $ssl;
1047    
1048 root 1.21 # basically, this is deep magic (because SSL_read should have the same issues)
1049     # but the openssl maintainers basically said: "trust us, it just works".
1050     # (unfortunately, we have to hardcode constants because the abysmally misdesigned
1051     # and mismaintained ssleay-module doesn't even offer them).
1052 root 1.27 # http://www.mail-archive.com/openssl-dev@openssl.org/msg22420.html
1053 root 1.21 Net::SSLeay::CTX_set_mode ($self->{tls},
1054 root 1.34 (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1)
1055     | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2));
1056 root 1.21
1057 root 1.38 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1058     $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1059 root 1.19
1060 root 1.38 Net::SSLeay::set_bio ($ssl, $self->{_rbio}, $self->{_wbio});
1061 root 1.19
1062     $self->{filter_w} = sub {
1063 root 1.38 $_[0]{_tls_wbuf} .= ${$_[1]};
1064 root 1.19 &_dotls;
1065     };
1066     $self->{filter_r} = sub {
1067 root 1.38 Net::SSLeay::BIO_write ($_[0]{_rbio}, ${$_[1]});
1068 root 1.19 &_dotls;
1069     };
1070     }
1071    
1072 root 1.25 =item $handle->stoptls
1073    
1074     Destroys the SSL connection, if any. Any partially read or written data
1075     will be lost.
1076    
1077     =cut
1078    
1079     sub stoptls {
1080     my ($self) = @_;
1081    
1082     Net::SSLeay::free (delete $self->{tls}) if $self->{tls};
1083 root 1.38
1084     delete $self->{_rbio};
1085     delete $self->{_wbio};
1086     delete $self->{_tls_wbuf};
1087 root 1.25 delete $self->{filter_r};
1088     delete $self->{filter_w};
1089     }
1090    
1091 root 1.19 sub DESTROY {
1092     my $self = shift;
1093    
1094 root 1.25 $self->stoptls;
1095 root 1.19 }
1096    
1097     =item AnyEvent::Handle::TLS_CTX
1098    
1099     This function creates (on the first call; the result is cached) and
1100     returns the Net::SSLeay::CTX object used by default for TLS mode.
1101    
1102     The context is created like this:
1103    
1104     Net::SSLeay::load_error_strings;
1105     Net::SSLeay::SSLeay_add_ssl_algorithms;
1106     Net::SSLeay::randomize;
1107    
1108     my $CTX = Net::SSLeay::CTX_new;
1109    
1110     Net::SSLeay::CTX_set_options $CTX, Net::SSLeay::OP_ALL;
1111    
1112     =cut
1113    
1114     our $TLS_CTX;
1115    
1116     sub TLS_CTX() {
1117     $TLS_CTX || do {
1118     require Net::SSLeay;
1119    
1120     Net::SSLeay::load_error_strings ();
1121     Net::SSLeay::SSLeay_add_ssl_algorithms ();
1122     Net::SSLeay::randomize ();
1123    
1124     $TLS_CTX = Net::SSLeay::CTX_new ();
1125    
1126     Net::SSLeay::CTX_set_options ($TLS_CTX, Net::SSLeay::OP_ALL ());
1127    
1128     $TLS_CTX
1129     }
1130     }
1131    
1132 elmex 1.1 =back
1133    
1134 root 1.38 =head1 SUBCLASSING AnyEvent::Handle
1135    
1136     In many cases, you might want to subclass AnyEvent::Handle.
1137    
1138     To make this easier, a given version of AnyEvent::Handle uses these
1139     conventions:
1140    
1141     =over 4
1142    
1143     =item * all constructor arguments become object members.
1144    
1145     At least initially, when you pass a C<tls> argument to the constructor, it
1146     will end up in C<< $handle->{tls} >>. Those members might be replaced or
1147     mutated later on (for example, C<tls> will hold the TLS connection object).
1148    
1149     =item * other object member names are prefixed with an C<_>.
1150    
1151     All object members not explicitly documented (internal use) are prefixed
1152     with an underscore character, so the remaining non-C<_>-namespace is free
1153     for use by subclasses.
1154    
1155     =item * all members not documented here and not prefixed with an underscore
1156     are free to use in subclasses.
1157    
1158     Of course, new versions of AnyEvent::Handle may introduce more "public"
1159     member variables, but that's just life; at least it is documented.
1160    
1161     =back
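Here is a sketch of a subclass that follows these conventions. The hypothetical C<max_write> constructor argument simply becomes C<< $self->{max_write} >>. Because it has no leading underscore, it stays clear of the underscore-prefixed internal members. The package name, the argument and the length check are assumptions for illustration. C<-norequire> is used only so the snippet compiles on its own; a real subclass would load AnyEvent::Handle first.

```perl
package My::LimitedHandle;    # hypothetical subclass name
use strict;
use warnings;
use Carp ();

# A real subclass would "use AnyEvent::Handle;" first; -norequire only
# lets this sketch compile without AnyEvent installed.
use parent -norequire, 'AnyEvent::Handle';

# "max_write" is a hypothetical extra constructor argument. Since every
# constructor argument becomes a member, it is available as
# $self->{max_write}; having no leading underscore, it stays out of the
# namespace AnyEvent::Handle uses for its internals.
sub push_write {
    my ($self, @args) = @_;

    # only the plain one-argument form is checked in this sketch
    if (@args == 1 && defined $self->{max_write}
        && length ($args[0]) > $self->{max_write}) {
        Carp::croak ("push_write: data exceeds max_write ($self->{max_write})");
    }

    $self->SUPER::push_write (@args);
}

1;
```

Such a handle would be created as C<< My::LimitedHandle->new (fh => $fh, max_write => 4096) >>, with C<max_write> passed alongside the regular arguments.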
1162    
1163 elmex 1.1 =head1 AUTHOR
1164    
1165 root 1.8 Robin Redeker C<< <elmex at ta-sa.org> >>, Marc Lehmann <schmorp@schmorp.de>.
1166 elmex 1.1
1167     =cut
1168    
1169     1; # End of AnyEvent::Handle