/cvs/AnyEvent/lib/AnyEvent/Handle.pm
Revision: 1.88
Committed: Thu Aug 21 23:48:35 2008 UTC (15 years, 8 months ago) by root
Branch: MAIN
CVS Tags: rel-4_233
Changes since 1.87: +42 -31 lines
Log Message:
4.233

File Contents

# User Rev Content
1 elmex 1.1 package AnyEvent::Handle;
2    
3 elmex 1.6 no warnings;
4 root 1.79 use strict qw(subs vars);
5 elmex 1.1
6 root 1.8 use AnyEvent ();
7 root 1.42 use AnyEvent::Util qw(WSAEWOULDBLOCK);
8 root 1.8 use Scalar::Util ();
9     use Carp ();
10     use Fcntl ();
11 root 1.43 use Errno qw(EAGAIN EINTR);
12 elmex 1.1
13     =head1 NAME
14    
15 root 1.22 AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
16 elmex 1.1
17     =cut
18    
19 root 1.88 our $VERSION = 4.233;
20 elmex 1.1
21     =head1 SYNOPSIS
22    
23     use AnyEvent;
24     use AnyEvent::Handle;
25    
26     my $cv = AnyEvent->condvar;
27    
28 root 1.31 my $handle =
29 elmex 1.2 AnyEvent::Handle->new (
30     fh => \*STDIN,
31     on_eof => sub {
32     $cv->broadcast;
33     },
34     );
35    
36 root 1.31 # send some request line
37     $handle->push_write ("getinfo\015\012");
38    
39     # read the response line
40     $handle->push_read (line => sub {
41     my ($handle, $line) = @_;
42     warn "read line <$line>\n";
43     $cv->send;
44     });
45    
46     $cv->recv;
47 elmex 1.1
48     =head1 DESCRIPTION
49    
50 root 1.8 This module is a helper module to make it easier to do event-based I/O on
51 elmex 1.13 filehandles. For utility functions for doing non-blocking connects and accepts
52     on sockets see L<AnyEvent::Util>.
53 root 1.8
54 root 1.84 The L<AnyEvent::Intro> tutorial contains some well-documented
55     AnyEvent::Handle examples.
56    
57 root 1.8 In the following, when the documentation refers to "bytes", this
58     means characters. As sysread and syswrite are used for all I/O, their
59     treatment of characters applies to this module as well.
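Here is a minimal sketch of what this means in practice. It uses only the core Encode module, and the variable names are illustrative rather than part of AnyEvent::Handle. length() counts characters, but the handle moves octets, so text should be encoded (here to UTF-8) before it is pushed and decoded after it is read.

```perl
use strict;
use warnings;
use Encode ();

# Six characters, two of them outside ASCII.
my $text = "caf\x{e9} \x{263a}";

# UTF-8 octets: what an octet-oriented handle should be given.
my $octets = Encode::encode("UTF-8", $text);

# The reverse step after octets have been read back from a handle.
my $decoded = Encode::decode("UTF-8", $octets);

printf "%d characters, %d octets\n", length $text, length $octets;   # 6 characters, 9 octets
```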
60 elmex 1.1
61 root 1.8 All callbacks will be invoked with the handle object as their first
62     argument.
63 elmex 1.1
64     =head1 METHODS
65    
66     =over 4
67    
68     =item B<new (%args)>
69    
70 root 1.8 The constructor supports these arguments (all as key => value pairs).
71 elmex 1.1
72     =over 4
73    
74 root 1.8 =item fh => $filehandle [MANDATORY]
75 elmex 1.1
76     The filehandle this L<AnyEvent::Handle> object will operate on.
77    
78 root 1.83 NOTE: The filehandle will be set to non-blocking mode (using
79     C<AnyEvent::Util::fh_nonblocking>) by the constructor and needs to stay in
80     that mode.
81 root 1.8
82 root 1.40 =item on_eof => $cb->($handle)
83 root 1.10
84 root 1.74 Set the callback to be called when an end-of-file condition is detected,
85 root 1.52 i.e. in the case of a socket, when the other side has closed the
86     connection cleanly.
87 root 1.8
88 root 1.82 For sockets, this just means that the other side has stopped sending data,
89     you can still try to write data, and, in fact, one can return from the eof
90     callback and continue writing data, as only the read part has been shut
91     down.
92    
93 root 1.80 While not mandatory, it is I<highly> recommended to set an eof callback,
94 root 1.16 otherwise you might end up with a closed socket while you are still
95     waiting for data.
96    
97 root 1.80 If an EOF condition has been detected but no C<on_eof> callback has been
98     set, then a fatal error will be raised with C<$!> set to C<0>.
99    
100 root 1.52 =item on_error => $cb->($handle, $fatal)
101 root 1.10
102 root 1.52 This is the error callback, which is called when, well, some error
103     occurred, such as not being able to resolve the hostname, failure to
104     connect or a read error.
105    
106     Some errors are fatal (which is indicated by C<$fatal> being true). On
107 root 1.82 fatal errors the handle object will be shut down and will not be usable
108 root 1.88 (but you are free to look at the current C<< ->rbuf >>). Examples of fatal
109 root 1.82 errors are an EOF condition with active (but unsatisfiable) read watchers
110     (C<EPIPE>) or I/O errors.
111    
112     Non-fatal errors can be retried by simply returning, but it is recommended
113     to ignore this parameter and instead abandon the handle object
114     when this callback is invoked. Examples of non-fatal errors are timeouts
115     (C<ETIMEDOUT>) or badly-formatted data (C<EBADMSG>).
116 root 1.8
117 root 1.10 On callback entrance, the value of C<$!> contains the operating system
118 root 1.43 error (or C<ENOSPC>, C<EPIPE>, C<ETIMEDOUT> or C<EBADMSG>).
119 root 1.8
120 root 1.10 While not mandatory, it is I<highly> recommended to set this callback, as
121     you will not be notified of errors otherwise. The default simply calls
122 root 1.52 C<croak>.
123 root 1.8
124 root 1.40 =item on_read => $cb->($handle)
125 root 1.8
126     This sets the default read callback, which is called when data arrives
127 root 1.61 and no read request is in the queue (unlike read queue callbacks, this
128     callback will only be called when at least one octet of data is in the
129     read buffer).
130 root 1.8
131     To access (and remove data from) the read buffer, use the C<< ->rbuf >>
132 root 1.40 method or access the C<< $handle->{rbuf} >> member directly.
133 root 1.8
134     When an EOF condition is detected then AnyEvent::Handle will first try to
135     feed all the remaining data to the queued callbacks and C<on_read> before
136     calling the C<on_eof> callback. If no progress can be made, then a fatal
137     error will be raised (with C<$!> set to C<EPIPE>).
138 elmex 1.1
139 root 1.40 =item on_drain => $cb->($handle)
140 elmex 1.1
141 root 1.8 This sets the callback that is called when the write buffer becomes empty
142     (or when the callback is set and the buffer is empty already).
143 elmex 1.1
144 root 1.8 To append to the write buffer, use the C<< ->push_write >> method.
145 elmex 1.2
146 root 1.69 This callback is useful when you don't want to put all of your write data
147     into the queue at once, for example, when you want to write the contents
148     of some file to the socket you might not want to read the whole file into
149     memory and push it into the queue, but instead only read more data from
150     the file when the write queue becomes empty.
151    
152 root 1.43 =item timeout => $fractional_seconds
153    
154     If non-zero, then this enables an "inactivity" timeout: whenever this many
155     seconds pass without a successful read or write on the underlying file
156     handle, the C<on_timeout> callback will be invoked (and if that one is
157 root 1.88 missing, a non-fatal C<ETIMEDOUT> error will be raised).
158 root 1.43
159     Note that timeout processing is also active when you currently do not have
160     any outstanding read or write requests: If you plan to keep the connection
161     idle then you should disable the timeout temporarily or ignore the timeout
162 root 1.88 in the C<on_timeout> callback, in which case AnyEvent::Handle will simply
163     restart the timeout.
164 root 1.43
165     Zero (the default) disables this timeout.
166    
167     =item on_timeout => $cb->($handle)
168    
169     Called whenever the inactivity timeout passes. If you return from this
170     callback, then the timeout will be reset as if some activity had happened,
171     so this condition is not fatal in any way.
172    
173 root 1.8 =item rbuf_max => <bytes>
174 elmex 1.2
175 root 1.8 If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>)
176     when the read buffer ever (strictly) exceeds this size. This is useful to
177 root 1.88 avoid some forms of denial-of-service attacks.
178 elmex 1.2
179 root 1.8 For example, a server accepting connections from untrusted sources should
180     be configured to accept only a limited amount of data that it cannot act on
181     (for example, when expecting a line, an attacker could send an unlimited
182     amount of data without a callback ever being called as long as the line
183     isn't finished).
184 elmex 1.2
185 root 1.70 =item autocork => <boolean>
186    
187     When disabled (the default), then C<push_write> will try to immediately
188 root 1.88 write the data to the handle, if possible. This avoids having to register
189     a write watcher and wait for the next event loop iteration, but can
190     be inefficient if you write multiple small chunks (on the wire, this
191     disadvantage is usually avoided by your kernel's nagle algorithm, see
192     C<no_delay>, but this option can save costly syscalls).
193 root 1.70
194     When enabled, then writes will always be queued till the next event loop
195     iteration. This is efficient when you do many small writes per iteration,
196 root 1.88 but less efficient when you do a single write only per iteration (or when
197     the write buffer often is full). It also increases write latency.
198 root 1.70
199     =item no_delay => <boolean>
200    
201     When doing small writes on sockets, your operating system kernel might
202     wait a bit for more data before actually sending it out. This is called
203     the Nagle algorithm, and usually it is beneficial.
204    
205 root 1.88 In some situations you want as low a delay as possible, which can be
206     accomplishd by setting this option to a true value.
207 root 1.70
208 root 1.88 The default is your opertaing system's default behaviour (most likely
209     enabled), this option explicitly enables or disables it, if possible.
210 root 1.70
211 root 1.8 =item read_size => <bytes>
212 elmex 1.2
213 root 1.88 The default read block size (the amount of bytes this module will
214     try to read during each loop iteration, which affects memory
215     requirements). Default: C<8192>.
216 root 1.8
217     =item low_water_mark => <bytes>
218    
219     Sets the amount of bytes (default: C<0>) that make up an "empty" write
220     buffer: If the write reaches this size or gets even samller it is
221     considered empty.
222 elmex 1.2
223 root 1.88 Sometimes it can be beneficial (for performance reasons) to add data to
224     the write buffer before it is fully drained, but this is a rare case, as
225     the operating system kernel usually buffers data as well, so the default
226     is good in almost all cases.
227    
228 root 1.62 =item linger => <seconds>
229    
230     If non-zero (default: C<3600>), then the destructor of the
231 root 1.88 AnyEvent::Handle object will check whether there is still outstanding
232     write data and will install a watcher that will write this data to the
233     socket. No errors will be reported (this mostly matches how the operating
234     system treats outstanding data at socket close time).
235 root 1.62
236 root 1.88 This will not work for partial TLS data that could not be encoded
237     yet. This data will be lost.
238 root 1.62
239 root 1.19 =item tls => "accept" | "connect" | Net::SSLeay::SSL object
240    
241 root 1.85 When this parameter is given, it enables TLS (SSL) mode, that means
242 root 1.88 AnyEvent will start a TLS handshake as soon as the conenction has been
243     established and will transparently encrypt/decrypt data afterwards.
244 root 1.19
245 root 1.26 TLS mode requires Net::SSLeay to be installed (it will be loaded
246 root 1.88 automatically when you try to create a TLS handle): this module doesn't
247     have a dependency on that module, so if your module requires it, you have
248     to add the dependency yourself.
249 root 1.26
250 root 1.85 Unlike TCP, TLS has a server and client side: for the TLS server side, use
251     C<accept>, and for the TLS client side of a connection, use C<connect>
252     mode.
253 root 1.19
254     You can also provide your own TLS connection object, but you have
255     to make sure that you call either C<Net::SSLeay::set_connect_state>
256     or C<Net::SSLeay::set_accept_state> on it before you pass it to
257     AnyEvent::Handle.
258    
259 root 1.88 See the C<< ->starttls >> method for when need to start TLS negotiation later.
260 root 1.26
261 root 1.19 =item tls_ctx => $ssl_ctx
262    
263 root 1.88 Use the given C<Net::SSLeay::CTX> object to create the new TLS connection
264 root 1.19 (unless a connection object was specified directly). If this parameter is
265     missing, then AnyEvent::Handle will use C<AnyEvent::Handle::TLS_CTX>.
266    
267 root 1.40 =item json => JSON or JSON::XS object
268    
269     This is the json coder object used by the C<json> read and write types.
270    
271 root 1.41 If you don't supply it, then AnyEvent::Handle will create and use a
272 root 1.86 suitable one (on demand), which will write and expect UTF-8 encoded JSON
273     texts.
274 root 1.40
275     Note that you are responsible to depend on the JSON module if you want to
276     use this functionality, as AnyEvent does not have a dependency itself.
277    
278 root 1.38 =item filter_r => $cb
279    
280     =item filter_w => $cb
281    
282 root 1.87 These exist, but are undocumented at this time. (They are used internally
283     by the TLS code).
284 root 1.38
285 elmex 1.1 =back
286    
287     =cut
288    
289     sub new {
290 root 1.8 my $class = shift;
291    
292     my $self = bless { @_ }, $class;
293    
294     $self->{fh} or Carp::croak "mandatory argument fh is missing";
295    
296     AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
297 elmex 1.1
298 root 1.19 if ($self->{tls}) {
299     require Net::SSLeay;
300     $self->starttls (delete $self->{tls}, delete $self->{tls_ctx});
301     }
302    
303 root 1.44 $self->{_activity} = AnyEvent->now;
304 root 1.43 $self->_timeout;
305 elmex 1.1
306 root 1.70 $self->on_drain (delete $self->{on_drain}) if exists $self->{on_drain};
307     $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay};
308 root 1.10
309 root 1.66 $self->start_read
310 root 1.67 if $self->{on_read};
311 root 1.66
312 root 1.8 $self
313     }
314 elmex 1.2
315 root 1.8 sub _shutdown {
316     my ($self) = @_;
317 elmex 1.2
318 root 1.46 delete $self->{_tw};
319 root 1.38 delete $self->{_rw};
320     delete $self->{_ww};
321 root 1.8 delete $self->{fh};
322 root 1.52
323     $self->stoptls;
324 root 1.82
325     delete $self->{on_read};
326     delete $self->{_queue};
327 root 1.8 }
328    
329 root 1.52 sub _error {
330     my ($self, $errno, $fatal) = @_;
331 root 1.8
332 root 1.52 $self->_shutdown
333     if $fatal;
334 elmex 1.1
335 root 1.52 $! = $errno;
336 root 1.37
337 root 1.52 if ($self->{on_error}) {
338     $self->{on_error}($self, $fatal);
339     } else {
340     Carp::croak "AnyEvent::Handle uncaught error: $!";
341     }
342 elmex 1.1 }
343    
344 root 1.8 =item $fh = $handle->fh
345 elmex 1.1
346 root 1.88 This method returns the file handle used to create the L<AnyEvent::Handle> object.
347 elmex 1.1
348     =cut
349    
350 root 1.38 sub fh { $_[0]{fh} }
351 elmex 1.1
352 root 1.8 =item $handle->on_error ($cb)
353 elmex 1.1
354 root 1.8 Replace the current C<on_error> callback (see the C<on_error> constructor argument).
355 elmex 1.1
356 root 1.8 =cut
357    
358     sub on_error {
359     $_[0]{on_error} = $_[1];
360     }
361    
362     =item $handle->on_eof ($cb)
363    
364     Replace the current C<on_eof> callback (see the C<on_eof> constructor argument).
365 elmex 1.1
366     =cut
367    
368 root 1.8 sub on_eof {
369     $_[0]{on_eof} = $_[1];
370     }
371    
372 root 1.43 =item $handle->on_timeout ($cb)
373    
374 root 1.88 Replace the current C<on_timeout> callback, or disables the callback (but
375     not the timeout) if C<$cb> = C<undef>. See the C<timeout> constructor
376     argument and method.
377 root 1.43
378     =cut
379    
380     sub on_timeout {
381     $_[0]{on_timeout} = $_[1];
382     }
383    
384 root 1.70 =item $handle->autocork ($boolean)
385    
386     Enables or disables the current autocork behaviour (see C<autocork>
387     constructor argument).
388    
389     =cut
390    
391     =item $handle->no_delay ($boolean)
392    
393     Enables or disables the C<no_delay> setting (see constructor argument of
394     the same name for details).
395    
396     =cut
397    
398     sub no_delay {
399     $_[0]{no_delay} = $_[1];
400    
401     eval {
402     local $SIG{__DIE__};
403     setsockopt $_[0]{fh}, &Socket::IPPROTO_TCP, &Socket::TCP_NODELAY, int $_[1];
404     };
405     }
406    
407 root 1.43 #############################################################################
408    
409     =item $handle->timeout ($seconds)
410    
411     Configures (or disables) the inactivity timeout.
412    
413     =cut
414    
415     sub timeout {
416     my ($self, $timeout) = @_;
417    
418     $self->{timeout} = $timeout;
419     $self->_timeout;
420     }
421    
422     # reset the timeout watcher, as neccessary
423     # also check for time-outs
424     sub _timeout {
425     my ($self) = @_;
426    
427     if ($self->{timeout}) {
428 root 1.44 my $NOW = AnyEvent->now;
429 root 1.43
430     # when would the timeout trigger?
431     my $after = $self->{_activity} + $self->{timeout} - $NOW;
432    
433     # now or in the past already?
434     if ($after <= 0) {
435     $self->{_activity} = $NOW;
436    
437     if ($self->{on_timeout}) {
438 root 1.48 $self->{on_timeout}($self);
439 root 1.43 } else {
440 root 1.52 $self->_error (&Errno::ETIMEDOUT);
441 root 1.43 }
442    
443 root 1.56 # callback could have changed timeout value, optimise
444 root 1.43 return unless $self->{timeout};
445    
446     # calculate new after
447     $after = $self->{timeout};
448     }
449    
450     Scalar::Util::weaken $self;
451 root 1.56 return unless $self; # ->error could have destroyed $self
452 root 1.43
453     $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub {
454     delete $self->{_tw};
455     $self->_timeout;
456     });
457     } else {
458     delete $self->{_tw};
459     }
460     }
461    
462 root 1.9 #############################################################################
463    
464     =back
465    
466     =head2 WRITE QUEUE
467    
468     AnyEvent::Handle manages two queues per handle, one for writing and one
469     for reading.
470    
471     The write queue is very simple: you can add data to its end, and
472     AnyEvent::Handle will automatically try to get rid of it for you.
473    
474 elmex 1.20 When data could be written and the write buffer is shorter then the low
475 root 1.9 water mark, the C<on_drain> callback will be invoked.
476    
477     =over 4
478    
479 root 1.8 =item $handle->on_drain ($cb)
480    
481     Sets the C<on_drain> callback or clears it (see the description of
482     C<on_drain> in the constructor).
483    
484     =cut
485    
486     sub on_drain {
487 elmex 1.1 my ($self, $cb) = @_;
488    
489 root 1.8 $self->{on_drain} = $cb;
490    
491     $cb->($self)
492     if $cb && $self->{low_water_mark} >= length $self->{wbuf};
493     }
494    
495     =item $handle->push_write ($data)
496    
497     Queues the given scalar to be written. You can push as much data as you
498     want (only limited by the available memory), as C<AnyEvent::Handle>
499     buffers it independently of the kernel.
500    
501     =cut
502    
503 root 1.17 sub _drain_wbuf {
504     my ($self) = @_;
505 root 1.8
506 root 1.38 if (!$self->{_ww} && length $self->{wbuf}) {
507 root 1.35
508 root 1.8 Scalar::Util::weaken $self;
509 root 1.35
510 root 1.8 my $cb = sub {
511     my $len = syswrite $self->{fh}, $self->{wbuf};
512    
513 root 1.29 if ($len >= 0) {
514 root 1.8 substr $self->{wbuf}, 0, $len, "";
515    
516 root 1.44 $self->{_activity} = AnyEvent->now;
517 root 1.43
518 root 1.8 $self->{on_drain}($self)
519     if $self->{low_water_mark} >= length $self->{wbuf}
520     && $self->{on_drain};
521    
522 root 1.38 delete $self->{_ww} unless length $self->{wbuf};
523 root 1.42 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
524 root 1.52 $self->_error ($!, 1);
525 elmex 1.1 }
526 root 1.8 };
527    
528 root 1.35 # try to write data immediately
529 root 1.70 $cb->() unless $self->{autocork};
530 root 1.8
531 root 1.35 # if still data left in wbuf, we need to poll
532 root 1.38 $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb)
533 root 1.35 if length $self->{wbuf};
534 root 1.8 };
535     }
536    
537 root 1.30 our %WH;
538    
539     sub register_write_type($$) {
540     $WH{$_[0]} = $_[1];
541     }
542    
543 root 1.17 sub push_write {
544     my $self = shift;
545    
546 root 1.29 if (@_ > 1) {
547     my $type = shift;
548    
549     @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write")
550     ->($self, @_);
551     }
552    
553 root 1.17 if ($self->{filter_w}) {
554 root 1.48 $self->{filter_w}($self, \$_[0]);
555 root 1.17 } else {
556     $self->{wbuf} .= $_[0];
557     $self->_drain_wbuf;
558     }
559     }
560    
561 root 1.29 =item $handle->push_write (type => @args)
562    
563     Instead of formatting your data yourself, you can also let this module do
564     the job by specifying a type and type-specific arguments.
565    
566 root 1.30 Predefined types are (if you have ideas for additional types, feel free to
567     drop by and tell us):
568 root 1.29
569     =over 4
570    
571     =item netstring => $string
572    
573     Formats the given value as netstring
574     (http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them).
575    
576     =cut
577    
578     register_write_type netstring => sub {
579     my ($self, $string) = @_;
580    
581     sprintf "%d:%s,", (length $string), $string
582     };
583    
584 root 1.61 =item packstring => $format, $data
585    
586     An octet string prefixed with an encoded length. The encoding C<$format>
587     uses the same format as a Perl C<pack> format, but must specify a single
588     integer only (only one of C<cCsSlLqQiInNvVjJw> is allowed, plus an
589     optional C<!>, C<< < >> or C<< > >> modifier).
590    
591     =cut
592    
593     register_write_type packstring => sub {
594     my ($self, $format, $string) = @_;
595    
596 root 1.65 pack "$format/a*", $string
597 root 1.61 };
598    
599 root 1.39 =item json => $array_or_hashref
600    
601 root 1.40 Encodes the given hash or array reference into a JSON object. Unless you
602     provide your own JSON object, this means it will be encoded to JSON text
603     in UTF-8.
604    
605     JSON objects (and arrays) are self-delimiting, so you can write JSON at
606     one end of a handle and read them at the other end without using any
607     additional framing.
608    
609 root 1.41 The generated JSON text is guaranteed not to contain any newlines: While
610     this module doesn't need delimiters after or between JSON texts to be
611     able to read them, many other languages depend on that.
612    
613     A simple RPC protocol that interoperates easily with others is to send
614     JSON arrays (or objects, although arrays are usually the better choice as
615     they mimic how function argument passing works) and a newline after each
616     JSON text:
617    
618     $handle->push_write (json => ["method", "arg1", "arg2"]); # whatever
619     $handle->push_write ("\012");
620    
621     An AnyEvent::Handle receiver would simply use the C<json> read type and
622     rely on the fact that the newline will be skipped as leading whitespace:
623    
624     $handle->push_read (json => sub { my $array = $_[1]; ... });
625    
626     Other languages could read single lines terminated by a newline and pass
627     this line into their JSON decoder of choice.
628    
629 root 1.40 =cut
630    
631     register_write_type json => sub {
632     my ($self, $ref) = @_;
633    
634     require JSON;
635    
636     $self->{json} ? $self->{json}->encode ($ref)
637     : JSON::encode_json ($ref)
638     };
639    
640 root 1.63 =item storable => $reference
641    
642     Freezes the given reference using L<Storable> and writes it to the
643     handle. Uses the C<nfreeze> format.
644    
645     =cut
646    
647     register_write_type storable => sub {
648     my ($self, $ref) = @_;
649    
650     require Storable;
651    
652 root 1.65 pack "w/a*", Storable::nfreeze ($ref)
653 root 1.63 };
654    
655 root 1.53 =back
656    
657 root 1.40 =item AnyEvent::Handle::register_write_type type => $coderef->($handle, @args)
658 root 1.30
659     This function (not method) lets you add your own types to C<push_write>.
660     Whenever the given C<type> is used, C<push_write> will invoke the code
661     reference with the handle object and the remaining arguments.
662 root 1.29
663 root 1.30 The code reference is supposed to return a single octet string that will
664     be appended to the write buffer.
665 root 1.29
666 root 1.30 Note that this is a function, and all types registered this way will be
667     global, so try to use unique names.
668 root 1.29
669 root 1.30 =cut
670 root 1.29
671 root 1.8 #############################################################################
672    
673 root 1.9 =back
674    
675     =head2 READ QUEUE
676    
677     AnyEvent::Handle manages two queues per handle, one for writing and one
678     for reading.
679    
680     The read queue is more complex than the write queue. It can be used in two
681     ways, the "simple" way, using only C<on_read> and the "complex" way, using
682     a queue.
683    
684     In the simple case, you just install an C<on_read> callback and whenever
685     new data arrives, it will be called. You can then remove some data (if
686 root 1.69 enough is there) from the read buffer (C<< $handle->rbuf >>). Or you cna
687     leave the data there if you want to accumulate more (e.g. when only a
688     partial message has been received so far).
689 root 1.9
690     In the more complex case, you want to queue multiple callbacks. In this
691     case, AnyEvent::Handle will call the first queued callback each time new
692 root 1.61 data arrives (also the first time it is queued) and removes it when it has
693     done its job (see C<push_read>, below).
694 root 1.9
695     This way you can, for example, push three line-reads, followed by reading
696     a chunk of data, and AnyEvent::Handle will execute them in order.
697    
698     Example 1: EPP protocol parser. EPP sends 4 byte length info, followed by
699     the specified number of bytes which give an XML datagram.
700    
701     # in the default state, expect some header bytes
702     $handle->on_read (sub {
703     # some data is here, now queue the length-header-read (4 octets)
704 root 1.52 shift->unshift_read (chunk => 4, sub {
705 root 1.9 # header arrived, decode
706     my $len = unpack "N", $_[1];
707    
708     # now read the payload
709 root 1.52 shift->unshift_read (chunk => $len, sub {
710 root 1.9 my $xml = $_[1];
711     # handle xml
712     });
713     });
714     });
715    
716 root 1.69 Example 2: Implement a client for a protocol that replies either with "OK"
717     and another line or "ERROR" for the first request that is sent, and 64
718     bytes for the second request. Due to the availability of a queue, we can
719     just pipeline sending both requests and manipulate the queue as necessary
720     in the callbacks.
721    
722     When the first callback is called and sees an "OK" response, it will
723     C<unshift> another line-read. This line-read will be queued I<before> the
724     64-byte chunk callback.
725 root 1.9
726 root 1.69 # request one, returns either "OK + extra line" or "ERROR"
727 root 1.9 $handle->push_write ("request 1\015\012");
728    
729     # we expect "ERROR" or "OK" as response, so push a line read
730 root 1.52 $handle->push_read (line => sub {
731 root 1.9 # if we got an "OK", we have to _prepend_ another line,
732     # so it will be read before the second request reads its 64 bytes
733     # which are already in the queue when this callback is called
734     # we don't do this in case we got an error
735     if ($_[1] eq "OK") {
736 root 1.52 $_[0]->unshift_read (line => sub {
737 root 1.9 my $response = $_[1];
738     ...
739     });
740     }
741     });
742    
743 root 1.69 # request two, simply returns 64 octets
744 root 1.9 $handle->push_write ("request 2\015\012");
745    
746     # simply read 64 bytes, always
747 root 1.52 $handle->push_read (chunk => 64, sub {
748 root 1.9 my $response = $_[1];
749     ...
750     });
751    
752     =over 4
753    
754 root 1.10 =cut
755    
756 root 1.8 sub _drain_rbuf {
757     my ($self) = @_;
758 elmex 1.1
759 root 1.59 local $self->{_in_drain} = 1;
760    
761 root 1.17 if (
762     defined $self->{rbuf_max}
763     && $self->{rbuf_max} < length $self->{rbuf}
764     ) {
765 root 1.82 $self->_error (&Errno::ENOSPC, 1), return;
766 root 1.17 }
767    
768 root 1.59 while () {
769     my $len = length $self->{rbuf};
770 elmex 1.1
771 root 1.38 if (my $cb = shift @{ $self->{_queue} }) {
772 root 1.29 unless ($cb->($self)) {
773 root 1.38 if ($self->{_eof}) {
774 root 1.10 # no progress can be made (not enough data and no data forthcoming)
775 root 1.82 $self->_error (&Errno::EPIPE, 1), return;
776 root 1.10 }
777    
778 root 1.38 unshift @{ $self->{_queue} }, $cb;
779 root 1.55 last;
780 root 1.8 }
781     } elsif ($self->{on_read}) {
782 root 1.61 last unless $len;
783    
784 root 1.8 $self->{on_read}($self);
785    
786     if (
787 root 1.55 $len == length $self->{rbuf} # if no data has been consumed
788     && !@{ $self->{_queue} } # and the queue is still empty
789     && $self->{on_read} # but we still have on_read
790 root 1.8 ) {
791 root 1.55 # no further data will arrive
792     # so no progress can be made
793 root 1.82 $self->_error (&Errno::EPIPE, 1), return
794 root 1.55 if $self->{_eof};
795    
796     last; # more data might arrive
797 elmex 1.1 }
798 root 1.8 } else {
799     # read side becomes idle
800 root 1.38 delete $self->{_rw};
801 root 1.55 last;
802 root 1.8 }
803     }
804    
805 root 1.80 if ($self->{_eof}) {
806     if ($self->{on_eof}) {
807     $self->{on_eof}($self)
808     } else {
809     $self->_error (0, 1);
810     }
811     }
812 root 1.55
813     # may need to restart read watcher
814     unless ($self->{_rw}) {
815     $self->start_read
816     if $self->{on_read} || @{ $self->{_queue} };
817     }
818 elmex 1.1 }
819    
820 root 1.8 =item $handle->on_read ($cb)
821 elmex 1.1
822 root 1.8 This replaces the currently set C<on_read> callback, or clears it (when
823     the new callback is C<undef>). See the description of C<on_read> in the
824     constructor.
825 elmex 1.1
826 root 1.8 =cut
827    
828     sub on_read {
829     my ($self, $cb) = @_;
830 elmex 1.1
831 root 1.8 $self->{on_read} = $cb;
832 root 1.59 $self->_drain_rbuf if $cb && !$self->{_in_drain};
833 elmex 1.1 }
834    
835 root 1.8 =item $handle->rbuf
836    
837     Returns the read buffer (as a modifiable lvalue).
838 elmex 1.1
839 root 1.8 You can access the read buffer directly as the C<< ->{rbuf} >> member, if
840     you want.
841 elmex 1.1
842 root 1.8 NOTE: The read buffer should only be used or modified if the C<on_read>,
843     C<push_read> or C<unshift_read> methods are used. The other read methods
844     automatically manage the read buffer.
845 elmex 1.1
846     =cut
847    
848 elmex 1.2 sub rbuf : lvalue {
849 root 1.8 $_[0]{rbuf}
850 elmex 1.2 }
851 elmex 1.1
852 root 1.8 =item $handle->push_read ($cb)
853    
854     =item $handle->unshift_read ($cb)
855    
856     Append the given callback to the end of the queue (C<push_read>) or
857     prepend it (C<unshift_read>).
858    
859     The callback is called each time some additional read data arrives.
860 elmex 1.1
861 elmex 1.20 It must check whether enough data is in the read buffer already.
862 elmex 1.1
863 root 1.8 If not enough data is available, it must return the empty list or a false
864     value, in which case it will be called repeatedly until enough data is
865     available (or an error condition is detected).
866    
867     If enough data was available, then the callback must remove all data it is
868     interested in (which can be none at all) and return a true value. After returning
869     true, it will be removed from the queue.
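
A minimal sketch of a hand-written read callback. It assumes a made-up
protocol whose records end in C<;>, and a C<socketpair> standing in for
the connection. The callback returns false until a complete record is
buffered, then removes that record from the buffer and returns true.

   use strict;
   use warnings;
   use Socket;
   use AnyEvent;
   use AnyEvent::Handle;

   # assumption: a socketpair stands in for a real connection
   socketpair (my $rfh, my $wfh, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
      or die "socketpair: $!";

   my $cv = AnyEvent->condvar;
   my $handle = AnyEvent::Handle->new (fh => $rfh);

   $handle->push_read (sub {
      my ($handle) = @_;

      # no complete record yet: return false, we will be called again
      $handle->{rbuf} =~ s/^([^;]*);// or return;

      $cv->send ($1); # the record, without its terminator
      1               # done: remove this callback from the queue
   });

   syswrite $wfh, "key=value;rest";
   my $record = $cv->recv; # "key=value"

Any data after the first C<;> stays in the read buffer for whatever read
request comes next.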

=cut

our %RH;

sub register_read_type($$) {
   $RH{$_[0]} = $_[1];
}

sub push_read {
   my $self = shift;
   my $cb = pop;

   if (@_) {
      my $type = shift;

      $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read")
            ->($self, $cb, @_);
   }

   push @{ $self->{_queue} }, $cb;
   $self->_drain_rbuf unless $self->{_in_drain};
}

sub unshift_read {
   my $self = shift;
   my $cb = pop;

   if (@_) {
      my $type = shift;

      $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read")
            ->($self, $cb, @_);
   }

   unshift @{ $self->{_queue} }, $cb;
   $self->_drain_rbuf unless $self->{_in_drain};
}

=item $handle->push_read (type => @args, $cb)

=item $handle->unshift_read (type => @args, $cb)

Instead of providing a callback that parses the data itself, you can choose
from a number of predefined parsing formats, for chunks of data, lines,
etc.

Predefined types are (if you have ideas for additional types, feel free to
drop by and tell us):

=over 4

=item chunk => $octets, $cb->($handle, $data)

Invoke the callback only once C<$octets> bytes have been read. Pass the
data read to the callback. The callback will never be called with less
data.

Example: read 2 bytes.

   $handle->push_read (chunk => 2, sub {
      warn "yay ", unpack "H*", $_[1];
   });

=cut

register_read_type chunk => sub {
   my ($self, $cb, $len) = @_;

   sub {
      $len <= length $_[0]{rbuf} or return;
      $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
      1
   }
};

=item line => [$eol, ]$cb->($handle, $line, $eol)

The callback will be called only once a full line (including the end of
line marker, C<$eol>) has been read. This line (excluding the end of line
marker) will be passed to the callback as the second argument (C<$line>),
and the end of line marker as the third argument (C<$eol>).

The end of line marker, C<$eol>, can be either a string, in which case it
will be interpreted as a fixed record end marker, or it can be a regex
object (e.g. created by C<qr>), in which case it is interpreted as a
regular expression.

The end of line marker argument C<$eol> is optional. If it is omitted
altogether (passing C<undef> is NOT the same), then C<qr|\015?\012|> is
used, which is good for most internet protocols.

Partial lines at the end of the stream will never be returned, as they are
not marked by the end of line marker.
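
A sketch of the string form of C<$eol>. It assumes NUL-terminated records
(an invented format) and a C<socketpair> standing in for the connection.
Two C<line> requests are queued up front, and each one receives one record,
in order.

   use strict;
   use warnings;
   use Socket;
   use AnyEvent;
   use AnyEvent::Handle;

   # assumption: a socketpair stands in for a real connection
   socketpair (my $rfh, my $wfh, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
      or die "socketpair: $!";

   my $cv = AnyEvent->condvar;
   my @records;
   my $handle = AnyEvent::Handle->new (fh => $rfh);

   # "\0" is a plain string, so it is matched literally, not as a regex
   $handle->push_read (line => "\0", sub { push @records, $_[1] });
   $handle->push_read (line => "\0", sub { push @records, $_[1]; $cv->send });

   syswrite $wfh, "foo\0bar\0";
   $cv->recv; # @records is now ("foo", "bar")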

=cut

register_read_type line => sub {
   my ($self, $cb, $eol) = @_;

   if (@_ < 3) {
      # this is more than twice as fast as the generic code below
      sub {
         $_[0]{rbuf} =~ s/^([^\015\012]*)(\015?\012)// or return;

         $cb->($_[0], $1, $2);
         1
      }
   } else {
      $eol = quotemeta $eol unless ref $eol;
      $eol = qr|^(.*?)($eol)|s;

      sub {
         $_[0]{rbuf} =~ s/$eol// or return;

         $cb->($_[0], $1, $2);
         1
      }
   }
};

=item regex => $accept[, $reject[, $skip]], $cb->($handle, $data)

Makes a regex match against the regex object C<$accept> and returns
everything up to and including the match.

Example: read a single line terminated by '\n'.

   $handle->push_read (regex => qr<\n>, sub { ... });

If C<$reject> is given and not undef, then it determines when the data is
to be rejected: it is matched against the data when the C<$accept> regex
does not match and generates an C<EBADMSG> error when it matches. This is
useful to quickly reject wrong data (to avoid waiting for a timeout or a
receive buffer overflow).

Example: expect a single decimal number followed by whitespace, reject
anything else (note the use of an anchor).

   $handle->push_read (regex => qr<^[0-9]+\s>, qr<[^0-9]>, sub { ... });

If C<$skip> is given and not C<undef>, then it will be matched against
the receive buffer when neither C<$accept> nor C<$reject> match,
and everything preceding and including the match will be accepted
unconditionally. This is useful to skip large amounts of data that you
know cannot be matched, so that the C<$accept> or C<$reject> regex do not
have to start matching from the beginning. This is purely an optimisation
and is usually only worthwhile when you expect more than a few kilobytes.

Example: expect an HTTP header, which ends at C<\015\012\015\012>. Since we
expect the header to be very large (it isn't in practice, but...), we use
a skip regex to skip initial portions. The skip regex is tricky in that
it only accepts something not ending in either C<\015> or C<\012>, as these
are required for the accept regex.

   $handle->push_read (regex =>
      qr<\015\012\015\012>,
      undef, # no reject
      qr<^.*[^\015\012]>,
      sub { ... });

=cut

register_read_type regex => sub {
   my ($self, $cb, $accept, $reject, $skip) = @_;

   my $data;
   my $rbuf = \$self->{rbuf};

   sub {
      # accept
      if ($$rbuf =~ $accept) {
         $data .= substr $$rbuf, 0, $+[0], "";
         $cb->($self, $data);
         return 1;
      }

      # reject
      if ($reject && $$rbuf =~ $reject) {
         $self->_error (&Errno::EBADMSG);
      }

      # skip
      if ($skip && $$rbuf =~ $skip) {
         $data .= substr $$rbuf, 0, $+[0], "";
      }

      ()
   }
};

=item netstring => $cb->($handle, $string)

Reads a netstring (see http://cr.yp.to/proto/netstrings.txt; this is not
an endorsement) and passes its payload, without the length prefix and the
trailing comma, to the callback.

Throws an error with C<$!> set to EBADMSG on format violations.
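
A sketch of both sides, assuming a C<socketpair> in place of a real
connection. The sending side writes to the raw socket using C<netstring>,
a hypothetical helper that is not part of this module. Since C<length>
counts characters, the helper should only be given byte strings.

   use strict;
   use warnings;
   use Socket;
   use AnyEvent;
   use AnyEvent::Handle;

   # assumption: a socketpair stands in for a real connection
   socketpair (my $rfh, my $wfh, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
      or die "socketpair: $!";

   # hypothetical helper: encode a byte string as a netstring
   sub netstring { length ($_[0]) . ":$_[0]," }

   my $cv = AnyEvent->condvar;
   my $handle = AnyEvent::Handle->new (fh => $rfh);

   $handle->push_read (netstring => sub {
      my ($handle, $string) = @_;
      $cv->send ($string);
   });

   syswrite $wfh, netstring ("hello world"); # writes "11:hello world,"
   my $payload = $cv->recv;                  # "hello world"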

=cut

register_read_type netstring => sub {
   my ($self, $cb) = @_;

   sub {
      unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) {
         if ($_[0]{rbuf} =~ /[^0-9]/) {
            $self->_error (&Errno::EBADMSG);
         }
         return;
      }

      my $len = $1;

      $self->unshift_read (chunk => $len, sub {
         my $string = $_[1];
         $_[0]->unshift_read (chunk => 1, sub {
            if ($_[1] eq ",") {
               $cb->($_[0], $string);
            } else {
               $self->_error (&Errno::EBADMSG);
            }
         });
      });

      1
   }
};

=item packstring => $format, $cb->($handle, $string)

Reads an octet string prefixed with an encoded length and passes the
string, without the prefix, to the callback. The encoding C<$format>
uses the same format as a Perl C<pack> format, but must specify a single
integer only (only one of C<cCsSlLqQiInNvVjJw> is allowed, plus an
optional C<!>, C<< < >> or C<< > >> modifier).

DNS over TCP uses a prefix of C<n>, EPP uses a prefix of C<N>.

Example: read a block of data prefixed by its length in BER-encoded
format (very efficient).

   $handle->push_read (packstring => "w", sub {
      my ($handle, $data) = @_;
   });
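
The sending side must produce the same framing. With Perl's C<pack>, the
C</> construct does this: for example, C<"w/a*"> writes a BER-encoded length
followed by the string itself. The following is a minimal sketch that
assumes the peer writes to the raw socket of a C<socketpair>:

   use strict;
   use warnings;
   use Socket;
   use AnyEvent;
   use AnyEvent::Handle;

   # assumption: a socketpair stands in for a real connection
   socketpair (my $rfh, my $wfh, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
      or die "socketpair: $!";

   my $cv = AnyEvent->condvar;
   my $handle = AnyEvent::Handle->new (fh => $rfh);

   $handle->push_read (packstring => "w", sub {
      my ($handle, $data) = @_;
      $cv->send ($data);
   });

   # "w/a*": BER-encoded length, then the string itself
   syswrite $wfh, pack "w/a*", "some data";
   my $data = $cv->recv; # "some data"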

=cut

register_read_type packstring => sub {
   my ($self, $cb, $format) = @_;

   sub {
      # when we can use 5.10 we can use ".", but for 5.8 we use the re-pack method
      defined (my $len = eval { unpack $format, $_[0]{rbuf} })
         or return;

      $format = length pack $format, $len;

      # bypass unshift if we already have the remaining chunk
      if ($format + $len <= length $_[0]{rbuf}) {
         my $data = substr $_[0]{rbuf}, $format, $len;
         substr $_[0]{rbuf}, 0, $format + $len, "";
         $cb->($_[0], $data);
      } else {
         # remove prefix
         substr $_[0]{rbuf}, 0, $format, "";

         # read remaining chunk
         $_[0]->unshift_read (chunk => $len, $cb);
      }

      1
   }
};

=item json => $cb->($handle, $hash_or_arrayref)

Reads a JSON object or array, decodes it and passes it to the callback.

If a C<json> object was passed to the constructor, then that will be used
for the final decode, otherwise it will create a JSON coder expecting UTF-8.

This read type uses the incremental parser available with JSON version
2.09 (and JSON::XS version 2.2) and above. You have to add that dependency
yourself: this module will load the JSON module, but AnyEvent does not
depend on it itself.

Since JSON texts are fully self-delimiting, the C<json> read and write
types are an ideal simple RPC protocol: just exchange JSON datagrams. See
the C<json> write type description, above, for an actual example.
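
A sketch of reading a stream of JSON texts. It assumes the JSON module
(2.09 or newer) is installed and a C<socketpair> stands in for the
connection. Because the texts are self-delimiting, two queued C<json>
requests each get one text, even if both texts arrive in a single read.

   use strict;
   use warnings;
   use Socket;
   use AnyEvent;
   use AnyEvent::Handle;

   # assumption: a socketpair stands in for a real connection
   socketpair (my $rfh, my $wfh, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
      or die "socketpair: $!";

   my $cv = AnyEvent->condvar;
   my @msgs;
   my $handle = AnyEvent::Handle->new (fh => $rfh);

   $handle->push_read (json => sub { push @msgs, $_[1] });
   $handle->push_read (json => sub { push @msgs, $_[1]; $cv->send });

   syswrite $wfh, qq<{"id":1}\n[2,3]\n>;
   $cv->recv; # $msgs[0] is { id => 1 }, $msgs[1] is [2, 3]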

=cut

register_read_type json => sub {
   my ($self, $cb) = @_;

   require JSON;

   my $data;
   my $rbuf = \$self->{rbuf};

   my $json = $self->{json} ||= JSON->new->utf8;

   sub {
      my $ref = $json->incr_parse ($self->{rbuf});

      if ($ref) {
         $self->{rbuf} = $json->incr_text;
         $json->incr_text = "";
         $cb->($self, $ref);

         1
      } else {
         $self->{rbuf} = "";
         ()
      }
   }
};

=item storable => $cb->($handle, $ref)

Deserialises a L<Storable> frozen representation as written by the
C<storable> write type (BER-encoded length prefix followed by nfreeze'd
data).

Raises an C<EBADMSG> error if the data could not be decoded.
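
A sketch of the sending side. It assumes the peer builds the frame described
above by hand, using C<Storable::nfreeze> for the data and C<pack "w/a*">
for the BER length prefix, and that a C<socketpair> stands in for the
connection:

   use strict;
   use warnings;
   use Socket;
   use Storable ();
   use AnyEvent;
   use AnyEvent::Handle;

   # assumption: a socketpair stands in for a real connection
   socketpair (my $rfh, my $wfh, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
      or die "socketpair: $!";

   my $cv = AnyEvent->condvar;
   my $handle = AnyEvent::Handle->new (fh => $rfh);

   $handle->push_read (storable => sub {
      my ($handle, $ref) = @_;
      $cv->send ($ref);
   });

   # BER-encoded length prefix, then the nfreeze'd data
   syswrite $wfh, pack "w/a*", Storable::nfreeze ({ answer => 42 });
   my $ref = $cv->recv; # { answer => 42 }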

=cut

register_read_type storable => sub {
   my ($self, $cb) = @_;

   require Storable;

   sub {
      # when we can use 5.10 we can use ".", but for 5.8 we use the re-pack method
      defined (my $len = eval { unpack "w", $_[0]{rbuf} })
         or return;

      my $format = length pack "w", $len;

      # bypass unshift if we already have the remaining chunk
      if ($format + $len <= length $_[0]{rbuf}) {
         my $data = substr $_[0]{rbuf}, $format, $len;
         substr $_[0]{rbuf}, 0, $format + $len, "";

         # decode errors raise EBADMSG here too, just like in the unshift path below
         if (my $ref = eval { Storable::thaw ($data) }) {
            $cb->($_[0], $ref);
         } else {
            $self->_error (&Errno::EBADMSG);
         }
      } else {
         # remove prefix
         substr $_[0]{rbuf}, 0, $format, "";

         # read remaining chunk
         $_[0]->unshift_read (chunk => $len, sub {
            if (my $ref = eval { Storable::thaw ($_[1]) }) {
               $cb->($_[0], $ref);
            } else {
               $self->_error (&Errno::EBADMSG);
            }
         });
      }

      1
   }
};

=back

=item AnyEvent::Handle::register_read_type type => $coderef->($handle, $cb, @args)

This function (not method) lets you add your own types to C<push_read>
(and C<unshift_read>).

Whenever the given C<type> is used, C<push_read> will invoke the code
reference with the handle object, the callback and the remaining
arguments.

The code reference is supposed to return a callback (usually a closure)
that works as a plain read callback (see C<< ->push_read ($cb) >>).

It should invoke the passed callback when it is done reading (remember to
pass C<$handle> as the first argument, as all other callbacks do).

Note that this is a function, and all types registered this way will be
global, so try to use unique names.

For examples, see the source of this module (F<perldoc -m AnyEvent::Handle>,
search for C<register_read_type>).
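
A sketch of a custom type. It assumes a made-up tag-length-value framing:
one tag byte, a 16-bit big-endian length, then the value. The type name
C<tlv> is hypothetical (and not very unique). The returned closure follows
the plain read callback rules: it returns false until the whole record is
buffered, then consumes it and returns true. A C<socketpair> stands in for
a real connection.

   use strict;
   use warnings;
   use Socket;
   use AnyEvent;
   use AnyEvent::Handle;

   # hypothetical type: 1-byte tag, 16-bit big-endian length, value
   AnyEvent::Handle::register_read_type (tlv => sub {
      my ($self, $cb) = @_;

      sub {
         my $rbuf = \$_[0]{rbuf};

         return if length $$rbuf < 3;        # header incomplete
         my ($tag, $len) = unpack "Cn", $$rbuf;
         return if length $$rbuf < 3 + $len; # value incomplete

         my $value = substr $$rbuf, 3, $len;
         substr $$rbuf, 0, 3 + $len, "";     # consume the whole record
         $cb->($_[0], $tag, $value);
         1
      }
   });

   # assumption: a socketpair stands in for a real connection
   socketpair (my $rfh, my $wfh, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
      or die "socketpair: $!";

   my $cv = AnyEvent->condvar;
   my $handle = AnyEvent::Handle->new (fh => $rfh);

   $handle->push_read (tlv => sub {
      my ($handle, $tag, $value) = @_;
      $cv->send ($tag, $value);
   });

   syswrite $wfh, pack "Cn/a*", 7, "hi";
   my ($tag, $value) = $cv->recv; # (7, "hi")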

=item $handle->stop_read

=item $handle->start_read

In rare cases you may not want to read anything from the socket for a
while. In this case you can call C<stop_read>; neither C<on_read> nor
any queued callbacks will be executed then. To start reading again, call
C<start_read>.

Note that AnyEvent::Handle will automatically C<start_read> for you when
you change the C<on_read> callback or push/unshift a read callback, and it
will automatically C<stop_read> for you when C<on_read> is not set and
there are no read requests in the queue.
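
A sketch showing that no read callbacks run while reading is stopped. It
assumes a C<socketpair> in place of a real connection, and C<wait_a_bit> is
a hypothetical helper that just runs the event loop for a short time. Note
that C<stop_read> is called outside the read callbacks here. In this
version, the end of the internal drain loop restarts the read watcher
whenever C<on_read> is set or requests are queued, so a C<stop_read> issued
from inside a read callback may not stick.

   use strict;
   use warnings;
   use Socket;
   use AnyEvent;
   use AnyEvent::Handle;

   # assumption: a socketpair stands in for a real connection
   socketpair (my $rfh, my $wfh, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
      or die "socketpair: $!";

   # hypothetical helper: run the event loop for 0.2 seconds
   sub wait_a_bit {
      my $cv = AnyEvent->condvar;
      my $t  = AnyEvent->timer (after => 0.2, cb => sub { $cv->send });
      $cv->recv;
   }

   my $got = "";
   my $handle = AnyEvent::Handle->new (fh => $rfh);
   $handle->on_read (sub {
      $got .= substr $_[0]{rbuf}, 0, length $_[0]{rbuf}, "";
   });

   $handle->stop_read;       # pause: the data below is not read yet
   syswrite $wfh, "ping";
   wait_a_bit ();
   my $while_stopped = $got; # still ""

   $handle->start_read;      # resume
   wait_a_bit ();            # $got is now "ping"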

=cut

sub stop_read {
   my ($self) = @_;

   delete $self->{_rw};
}

sub start_read {
   my ($self) = @_;

   unless ($self->{_rw} || $self->{_eof}) {
      Scalar::Util::weaken $self;

      $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
         my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf};
         my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;

         if ($len > 0) {
            $self->{_activity} = AnyEvent->now;

            $self->{filter_r}
               ? $self->{filter_r}($self, $rbuf)
               : $self->{_in_drain} || $self->_drain_rbuf;

         } elsif (defined $len) {
            delete $self->{_rw};
            $self->{_eof} = 1;
            $self->_drain_rbuf unless $self->{_in_drain};

         } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
            return $self->_error ($!, 1);
         }
      });
   }
}

sub _dotls {
   my ($self) = @_;

   my $buf;

   if (length $self->{_tls_wbuf}) {
      while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) {
         substr $self->{_tls_wbuf}, 0, $len, "";
      }
   }

   if (length ($buf = Net::SSLeay::BIO_read ($self->{_wbio}))) {
      $self->{wbuf} .= $buf;
      $self->_drain_wbuf;
   }

   while (defined ($buf = Net::SSLeay::read ($self->{tls}))) {
      if (length $buf) {
         $self->{rbuf} .= $buf;
         $self->_drain_rbuf unless $self->{_in_drain};
      } else {
         # let's treat SSL-eof as we treat normal EOF
         $self->{_eof} = 1;
         $self->_shutdown;
         return;
      }
   }

   my $err = Net::SSLeay::get_error ($self->{tls}, -1);

   if ($err != Net::SSLeay::ERROR_WANT_READ ()) {
      if ($err == Net::SSLeay::ERROR_SYSCALL ()) {
         return $self->_error ($!, 1);
      } elsif ($err == Net::SSLeay::ERROR_SSL ()) {
         return $self->_error (&Errno::EIO, 1);
      }

      # all others are fine for our purposes
   }
}

=item $handle->starttls ($tls[, $tls_ctx])

Instead of starting TLS negotiation immediately when the AnyEvent::Handle
object is created, you can also do that at a later time by calling
C<starttls>. Any TLS connection that is already active is destroyed first,
as with C<stoptls>.

The first argument is the same as the C<tls> constructor argument (either
C<"connect">, C<"accept"> or an existing Net::SSLeay object).

The second argument is the optional C<Net::SSLeay::CTX> object that is
used when AnyEvent::Handle has to create its own TLS connection object. If
it is omitted, the context returned by C<AnyEvent::Handle::TLS_CTX> is used.

The TLS connection object will end up in C<< $handle->{tls} >> after this
call and can be used or changed to your liking. Note that the handshake
might have already started when this function returns.

=cut

sub starttls {
   my ($self, $ssl, $ctx) = @_;

   $self->stoptls;

   if ($ssl eq "accept") {
      $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
      Net::SSLeay::set_accept_state ($ssl);
   } elsif ($ssl eq "connect") {
      $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
      Net::SSLeay::set_connect_state ($ssl);
   }

   $self->{tls} = $ssl;

   # basically, this is deep magic (because SSL_read should have the same issues)
   # but the openssl maintainers basically said: "trust us, it just works".
   # (unfortunately, we have to hardcode constants because the abysmally misdesigned
   # and mismaintained ssleay-module doesn't even offer them).
   # http://www.mail-archive.com/openssl-dev@openssl.org/msg22420.html
   #
   # in short: this is a mess.
   #
   # note that we do not try to keep the length constant between writes as we are required to do.
   # we assume that most (but not all) of this insanity only applies to non-blocking cases,
   # and we drive openssl fully in blocking mode here.
   Net::SSLeay::CTX_set_mode ($self->{tls},
        (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1)
      | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2));

   $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
   $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());

   Net::SSLeay::set_bio ($ssl, $self->{_rbio}, $self->{_wbio});

   $self->{filter_w} = sub {
      $_[0]{_tls_wbuf} .= ${$_[1]};
      &_dotls;
   };
   $self->{filter_r} = sub {
      Net::SSLeay::BIO_write ($_[0]{_rbio}, ${$_[1]});
      &_dotls;
   };
}

=item $handle->stoptls

Destroys the SSL connection, if any. Partial read or write data will be
lost.

=cut

sub stoptls {
   my ($self) = @_;

   Net::SSLeay::free (delete $self->{tls}) if $self->{tls};

   delete $self->{_rbio};
   delete $self->{_wbio};
   delete $self->{_tls_wbuf};
   delete $self->{filter_r};
   delete $self->{filter_w};
}

sub DESTROY {
   my $self = shift;

   $self->stoptls;

   my $linger = exists $self->{linger} ? $self->{linger} : 3600;

   if ($linger && length $self->{wbuf}) {
      my $fh   = delete $self->{fh};
      my $wbuf = delete $self->{wbuf};

      my @linger;

      push @linger, AnyEvent->io (fh => $fh, poll => "w", cb => sub {
         my $len = syswrite $fh, $wbuf, length $wbuf;

         if ($len > 0) {
            substr $wbuf, 0, $len, "";
         } else {
            @linger = (); # end
         }
      });
      push @linger, AnyEvent->timer (after => $linger, cb => sub {
         @linger = ();
      });
   }
}

=item AnyEvent::Handle::TLS_CTX

This function creates (on first use) and returns the Net::SSLeay::CTX
object used by default for TLS mode. Later calls return the same object.

The context is created like this:

   Net::SSLeay::load_error_strings;
   Net::SSLeay::SSLeay_add_ssl_algorithms;
   Net::SSLeay::randomize;

   my $CTX = Net::SSLeay::CTX_new;

   Net::SSLeay::CTX_set_options $CTX, Net::SSLeay::OP_ALL;

=cut

our $TLS_CTX;

sub TLS_CTX() {
   $TLS_CTX || do {
      require Net::SSLeay;

      Net::SSLeay::load_error_strings ();
      Net::SSLeay::SSLeay_add_ssl_algorithms ();
      Net::SSLeay::randomize ();

      $TLS_CTX = Net::SSLeay::CTX_new ();

      Net::SSLeay::CTX_set_options ($TLS_CTX, Net::SSLeay::OP_ALL ());

      $TLS_CTX
   }
}

=back

=head1 SUBCLASSING AnyEvent::Handle

In many cases, you might want to subclass AnyEvent::Handle.

To make this easier, a given version of AnyEvent::Handle uses these
conventions:

=over 4

=item * all constructor arguments become object members.

At least initially, when you pass a C<tls>-argument to the constructor it
will end up in C<< $handle->{tls} >>. Those members might be changed or
mutated later on (for example C<tls> will hold the TLS connection object).

=item * other object member names are prefixed with an C<_>.

All object members not explicitly documented (internal use) are prefixed
with an underscore character, so the remaining non-C<_>-namespace is free
for use by subclasses.

=item * all members not documented here and not prefixed with an underscore
are free to use in subclasses.

Of course, new versions of AnyEvent::Handle may introduce more "public"
member variables, but that's just life. At least it is documented.

=back
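
A minimal sketch of a subclass that follows these conventions. The package
name C<My::Handle>, the member C<lines_seen> and the method
C<push_counted_line> are hypothetical. The sketch relies on the first
convention above: an extra constructor argument (here C<lines_seen>)
simply becomes a member, and its name has no leading underscore. A
C<socketpair> stands in for a real connection.

   use strict;
   use warnings;
   use Socket;
   use AnyEvent;
   use AnyEvent::Handle;

   package My::Handle;
   our @ISA = ("AnyEvent::Handle");

   # hypothetical method: read a line and count it in our own member
   sub push_counted_line {
      my ($self, $cb) = @_;

      $self->push_read (line => sub {
         $_[0]{lines_seen}++;
         $cb->(@_);
      });
   }

   package main;

   # assumption: a socketpair stands in for a real connection
   socketpair (my $rfh, my $wfh, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
      or die "socketpair: $!";

   my $cv = AnyEvent->condvar;
   my $handle = My::Handle->new (fh => $rfh, lines_seen => 0);

   $handle->push_counted_line (sub { $cv->send ($_[1]) });

   syswrite $wfh, "hello\015\012";
   my $line = $cv->recv; # "hello", and $handle->{lines_seen} is now 1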

=head1 AUTHOR

Robin Redeker C<< <elmex at ta-sa.org> >>, Marc Lehmann <schmorp@schmorp.de>.

=cut

1; # End of AnyEvent::Handle