ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
Revision: 1.29
Committed: Sat May 24 23:10:18 2008 UTC (16 years, 1 month ago) by root
Branch: MAIN
Changes since 1.28: +80 -5 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 elmex 1.1 package AnyEvent::Handle;
2    
3 elmex 1.6 no warnings;
4 elmex 1.1 use strict;
5    
6 root 1.8 use AnyEvent ();
7     use AnyEvent::Util ();
8     use Scalar::Util ();
9     use Carp ();
10     use Fcntl ();
11 elmex 1.1 use Errno qw/EAGAIN EINTR/;
12    
13     =head1 NAME
14    
15 root 1.22 AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
16 elmex 1.1
17 root 1.15 This module is experimental.
18    
19 elmex 1.1 =cut
20    
21 root 1.15 our $VERSION = '0.04';
22 elmex 1.1
23     =head1 SYNOPSIS
24    
25     use AnyEvent;
26     use AnyEvent::Handle;
27    
28     my $cv = AnyEvent->condvar;
29    
30     my $ae_fh = AnyEvent::Handle->new (fh => \*STDIN);
31    
32 root 1.8 #TODO
33 elmex 1.1
34 elmex 1.2 # or use the constructor to pass the callback:
35    
36     my $ae_fh2 =
37     AnyEvent::Handle->new (
38     fh => \*STDIN,
39     on_eof => sub {
40     $cv->broadcast;
41     },
42 root 1.8 #TODO
43 elmex 1.2 );
44    
45 elmex 1.1 $cv->wait;
46    
47     =head1 DESCRIPTION
48    
49 root 1.8 This module is a helper module to make it easier to do event-based I/O on
50 elmex 1.13 filehandles. For utility functions for doing non-blocking connects and accepts
51     on sockets see L<AnyEvent::Util>.
52 root 1.8
53     In the following, when the documentation refers to of "bytes" then this
54     means characters. As sysread and syswrite are used for all I/O, their
55     treatment of characters applies to this module as well.
56 elmex 1.1
57 root 1.8 All callbacks will be invoked with the handle object as their first
58     argument.
59 elmex 1.1
60     =head1 METHODS
61    
62     =over 4
63    
64     =item B<new (%args)>
65    
66 root 1.8 The constructor supports these arguments (all as key => value pairs).
67 elmex 1.1
68     =over 4
69    
70 root 1.8 =item fh => $filehandle [MANDATORY]
71 elmex 1.1
72     The filehandle this L<AnyEvent::Handle> object will operate on.
73    
74 root 1.8 NOTE: The filehandle will be set to non-blocking (using
75     AnyEvent::Util::fh_nonblocking).
76    
77 root 1.16 =item on_eof => $cb->($self)
78 root 1.10
79     Set the callback to be called on EOF.
80 root 1.8
81 root 1.16 While not mandatory, it is highly recommended to set an eof callback,
82     otherwise you might end up with a closed socket while you are still
83     waiting for data.
84    
85 root 1.10 =item on_error => $cb->($self)
86    
87     This is the fatal error callback, that is called when, well, a fatal error
88 elmex 1.20 occurs, such as not being able to resolve the hostname, failure to connect
89 root 1.10 or a read error.
90 root 1.8
91     The object will not be in a usable state when this callback has been
92     called.
93    
94 root 1.10 On callback entrance, the value of C<$!> contains the operating system
95 root 1.29 error (or C<ENOSPC>, C<EPIPE> or C<EBADMSG>).
96 root 1.8
97 root 1.10 While not mandatory, it is I<highly> recommended to set this callback, as
98     you will not be notified of errors otherwise. The default simply calls
99     die.
100 root 1.8
101     =item on_read => $cb->($self)
102    
103     This sets the default read callback, which is called when data arrives
104 root 1.10 and no read request is in the queue.
105 root 1.8
106     To access (and remove data from) the read buffer, use the C<< ->rbuf >>
107 elmex 1.20 method or access the C<$self->{rbuf}> member directly.
108 root 1.8
109     When an EOF condition is detected then AnyEvent::Handle will first try to
110     feed all the remaining data to the queued callbacks and C<on_read> before
111     calling the C<on_eof> callback. If no progress can be made, then a fatal
112     error will be raised (with C<$!> set to C<EPIPE>).
113 elmex 1.1
114 root 1.8 =item on_drain => $cb->()
115 elmex 1.1
116 root 1.8 This sets the callback that is called when the write buffer becomes empty
117     (or when the callback is set and the buffer is empty already).
118 elmex 1.1
119 root 1.8 To append to the write buffer, use the C<< ->push_write >> method.
120 elmex 1.2
121 root 1.8 =item rbuf_max => <bytes>
122 elmex 1.2
123 root 1.8 If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>)
124     when the read buffer ever (strictly) exceeds this size. This is useful to
125     avoid denial-of-service attacks.
126 elmex 1.2
127 root 1.8 For example, a server accepting connections from untrusted sources should
128     be configured to accept only so-and-so much data that it cannot act on
129     (for example, when expecting a line, an attacker could send an unlimited
130     amount of data without a callback ever being called as long as the line
131     isn't finished).
132 elmex 1.2
133 root 1.8 =item read_size => <bytes>
134 elmex 1.2
135 root 1.8 The default read block size (the amount of bytes this module will try to read
136     on each [loop iteration). Default: C<4096>.
137    
138     =item low_water_mark => <bytes>
139    
140     Sets the amount of bytes (default: C<0>) that make up an "empty" write
141     buffer: If the write reaches this size or gets even samller it is
142     considered empty.
143 elmex 1.2
144 root 1.19 =item tls => "accept" | "connect" | Net::SSLeay::SSL object
145    
146     When this parameter is given, it enables TLS (SSL) mode, that means it
147     will start making tls handshake and will transparently encrypt/decrypt
148     data.
149    
150 root 1.26 TLS mode requires Net::SSLeay to be installed (it will be loaded
151     automatically when you try to create a TLS handle).
152    
153 root 1.19 For the TLS server side, use C<accept>, and for the TLS client side of a
154     connection, use C<connect> mode.
155    
156     You can also provide your own TLS connection object, but you have
157     to make sure that you call either C<Net::SSLeay::set_connect_state>
158     or C<Net::SSLeay::set_accept_state> on it before you pass it to
159     AnyEvent::Handle.
160    
161 root 1.26 See the C<starttls> method if you need to start TLs negotiation later.
162    
163 root 1.19 =item tls_ctx => $ssl_ctx
164    
165     Use the given Net::SSLeay::CTX object to create the new TLS connection
166     (unless a connection object was specified directly). If this parameter is
167     missing, then AnyEvent::Handle will use C<AnyEvent::Handle::TLS_CTX>.
168    
169 elmex 1.1 =back
170    
171     =cut
172    
173 root 1.28 our (%RH, %WH);
174    
175     sub register_read_type($$) {
176     $RH{$_[0]} = $_[1];
177     }
178    
179     sub register_write_type($$) {
180     $WH{$_[0]} = $_[1];
181     }
182    
183 elmex 1.1 sub new {
184 root 1.8 my $class = shift;
185    
186     my $self = bless { @_ }, $class;
187    
188     $self->{fh} or Carp::croak "mandatory argument fh is missing";
189    
190     AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
191 elmex 1.1
192 root 1.19 if ($self->{tls}) {
193     require Net::SSLeay;
194     $self->starttls (delete $self->{tls}, delete $self->{tls_ctx});
195     }
196    
197 root 1.16 $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof};
198 root 1.10 $self->on_error (delete $self->{on_error}) if $self->{on_error};
199 root 1.8 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
200     $self->on_read (delete $self->{on_read} ) if $self->{on_read};
201 elmex 1.1
202 root 1.10 $self->start_read;
203    
204 root 1.8 $self
205     }
206 elmex 1.2
207 root 1.8 sub _shutdown {
208     my ($self) = @_;
209 elmex 1.2
210 root 1.8 delete $self->{rw};
211     delete $self->{ww};
212     delete $self->{fh};
213     }
214    
215     sub error {
216     my ($self) = @_;
217    
218     {
219     local $!;
220     $self->_shutdown;
221 elmex 1.1 }
222    
223 root 1.10 if ($self->{on_error}) {
224     $self->{on_error}($self);
225     } else {
226 root 1.29 Carp::croak "AnyEvent::Handle uncaught fatal error: $!";
227 root 1.10 }
228 elmex 1.1 }
229    
230 root 1.8 =item $fh = $handle->fh
231 elmex 1.1
232 root 1.22 This method returns the file handle of the L<AnyEvent::Handle> object.
233 elmex 1.1
234     =cut
235    
236     sub fh { $_[0]->{fh} }
237    
238 root 1.8 =item $handle->on_error ($cb)
239 elmex 1.1
240 root 1.8 Replace the current C<on_error> callback (see the C<on_error> constructor argument).
241 elmex 1.1
242 root 1.8 =cut
243    
244     sub on_error {
245     $_[0]{on_error} = $_[1];
246     }
247    
248     =item $handle->on_eof ($cb)
249    
250     Replace the current C<on_eof> callback (see the C<on_eof> constructor argument).
251 elmex 1.1
252     =cut
253    
254 root 1.8 sub on_eof {
255     $_[0]{on_eof} = $_[1];
256     }
257    
258 root 1.9 #############################################################################
259    
260     =back
261    
262     =head2 WRITE QUEUE
263    
264     AnyEvent::Handle manages two queues per handle, one for writing and one
265     for reading.
266    
267     The write queue is very simple: you can add data to its end, and
268     AnyEvent::Handle will automatically try to get rid of it for you.
269    
270 elmex 1.20 When data could be written and the write buffer is shorter then the low
271 root 1.9 water mark, the C<on_drain> callback will be invoked.
272    
273     =over 4
274    
275 root 1.8 =item $handle->on_drain ($cb)
276    
277     Sets the C<on_drain> callback or clears it (see the description of
278     C<on_drain> in the constructor).
279    
280     =cut
281    
282     sub on_drain {
283 elmex 1.1 my ($self, $cb) = @_;
284    
285 root 1.8 $self->{on_drain} = $cb;
286    
287     $cb->($self)
288     if $cb && $self->{low_water_mark} >= length $self->{wbuf};
289     }
290    
291     =item $handle->push_write ($data)
292    
293     Queues the given scalar to be written. You can push as much data as you
294     want (only limited by the available memory), as C<AnyEvent::Handle>
295     buffers it independently of the kernel.
296    
297     =cut
298    
299 root 1.17 sub _drain_wbuf {
300     my ($self) = @_;
301 root 1.8
302 root 1.29 if (!$self->{ww} && length $self->{wbuf}) {
303 root 1.8 Scalar::Util::weaken $self;
304     my $cb = sub {
305     my $len = syswrite $self->{fh}, $self->{wbuf};
306    
307 root 1.29 if ($len >= 0) {
308 root 1.8 substr $self->{wbuf}, 0, $len, "";
309    
310     $self->{on_drain}($self)
311     if $self->{low_water_mark} >= length $self->{wbuf}
312     && $self->{on_drain};
313    
314     delete $self->{ww} unless length $self->{wbuf};
315     } elsif ($! != EAGAIN && $! != EINTR) {
316     $self->error;
317 elmex 1.1 }
318 root 1.8 };
319    
320     $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb);
321    
322     $cb->($self);
323     };
324     }
325    
326 root 1.17 sub push_write {
327     my $self = shift;
328    
329 root 1.29 if (@_ > 1) {
330     my $type = shift;
331    
332     @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write")
333     ->($self, @_);
334     }
335    
336 root 1.17 if ($self->{filter_w}) {
337 root 1.18 $self->{filter_w}->($self, \$_[0]);
338 root 1.17 } else {
339     $self->{wbuf} .= $_[0];
340     $self->_drain_wbuf;
341     }
342     }
343    
344 root 1.29 =item $handle->push_write (type => @args)
345    
346     =item $handle->unshift_write (type => @args)
347    
348     Instead of formatting your data yourself, you can also let this module do
349     the job by specifying a type and type-specific arguments.
350    
351     Predefined types are:
352    
353     =over 4
354    
355     =item netstring => $string
356    
357     Formats the given value as netstring
358     (http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them).
359    
360     =cut
361    
362     register_write_type netstring => sub {
363     my ($self, $string) = @_;
364    
365     sprintf "%d:%s,", (length $string), $string
366     };
367    
368     =back
369    
370     =cut
371    
372    
373    
374 root 1.8 #############################################################################
375    
376 root 1.9 =back
377    
378     =head2 READ QUEUE
379    
380     AnyEvent::Handle manages two queues per handle, one for writing and one
381     for reading.
382    
383     The read queue is more complex than the write queue. It can be used in two
384     ways, the "simple" way, using only C<on_read> and the "complex" way, using
385     a queue.
386    
387     In the simple case, you just install an C<on_read> callback and whenever
388     new data arrives, it will be called. You can then remove some data (if
389     enough is there) from the read buffer (C<< $handle->rbuf >>) if you want
390     or not.
391    
392     In the more complex case, you want to queue multiple callbacks. In this
393     case, AnyEvent::Handle will call the first queued callback each time new
394     data arrives and removes it when it has done its job (see C<push_read>,
395     below).
396    
397     This way you can, for example, push three line-reads, followed by reading
398     a chunk of data, and AnyEvent::Handle will execute them in order.
399    
400     Example 1: EPP protocol parser. EPP sends 4 byte length info, followed by
401     the specified number of bytes which give an XML datagram.
402    
403     # in the default state, expect some header bytes
404     $handle->on_read (sub {
405     # some data is here, now queue the length-header-read (4 octets)
406     shift->unshift_read_chunk (4, sub {
407     # header arrived, decode
408     my $len = unpack "N", $_[1];
409    
410     # now read the payload
411     shift->unshift_read_chunk ($len, sub {
412     my $xml = $_[1];
413     # handle xml
414     });
415     });
416     });
417    
418     Example 2: Implement a client for a protocol that replies either with
419     "OK" and another line or "ERROR" for one request, and 64 bytes for the
420     second request. Due tot he availability of a full queue, we can just
421     pipeline sending both requests and manipulate the queue as necessary in
422     the callbacks:
423    
424     # request one
425     $handle->push_write ("request 1\015\012");
426    
427     # we expect "ERROR" or "OK" as response, so push a line read
428     $handle->push_read_line (sub {
429     # if we got an "OK", we have to _prepend_ another line,
430     # so it will be read before the second request reads its 64 bytes
431     # which are already in the queue when this callback is called
432     # we don't do this in case we got an error
433     if ($_[1] eq "OK") {
434     $_[0]->unshift_read_line (sub {
435     my $response = $_[1];
436     ...
437     });
438     }
439     });
440    
441     # request two
442     $handle->push_write ("request 2\015\012");
443    
444     # simply read 64 bytes, always
445     $handle->push_read_chunk (64, sub {
446     my $response = $_[1];
447     ...
448     });
449    
450     =over 4
451    
452 root 1.10 =cut
453    
454 root 1.8 sub _drain_rbuf {
455     my ($self) = @_;
456 elmex 1.1
457 root 1.17 if (
458     defined $self->{rbuf_max}
459     && $self->{rbuf_max} < length $self->{rbuf}
460     ) {
461     $! = &Errno::ENOSPC; return $self->error;
462     }
463    
464 root 1.11 return if $self->{in_drain};
465 root 1.8 local $self->{in_drain} = 1;
466 elmex 1.1
467 root 1.8 while (my $len = length $self->{rbuf}) {
468     no strict 'refs';
469 root 1.10 if (my $cb = shift @{ $self->{queue} }) {
470 root 1.29 unless ($cb->($self)) {
471 root 1.10 if ($self->{eof}) {
472     # no progress can be made (not enough data and no data forthcoming)
473     $! = &Errno::EPIPE; return $self->error;
474     }
475    
476     unshift @{ $self->{queue} }, $cb;
477 root 1.8 return;
478     }
479     } elsif ($self->{on_read}) {
480     $self->{on_read}($self);
481    
482     if (
483     $self->{eof} # if no further data will arrive
484     && $len == length $self->{rbuf} # and no data has been consumed
485     && !@{ $self->{queue} } # and the queue is still empty
486     && $self->{on_read} # and we still want to read data
487     ) {
488     # then no progress can be made
489     $! = &Errno::EPIPE; return $self->error;
490 elmex 1.1 }
491 root 1.8 } else {
492     # read side becomes idle
493     delete $self->{rw};
494     return;
495     }
496     }
497    
498     if ($self->{eof}) {
499     $self->_shutdown;
500 root 1.16 $self->{on_eof}($self)
501     if $self->{on_eof};
502 root 1.8 }
503 elmex 1.1 }
504    
505 root 1.8 =item $handle->on_read ($cb)
506 elmex 1.1
507 root 1.8 This replaces the currently set C<on_read> callback, or clears it (when
508     the new callback is C<undef>). See the description of C<on_read> in the
509     constructor.
510 elmex 1.1
511 root 1.8 =cut
512    
513     sub on_read {
514     my ($self, $cb) = @_;
515 elmex 1.1
516 root 1.8 $self->{on_read} = $cb;
517 elmex 1.1 }
518    
519 root 1.8 =item $handle->rbuf
520    
521     Returns the read buffer (as a modifiable lvalue).
522 elmex 1.1
523 root 1.8 You can access the read buffer directly as the C<< ->{rbuf} >> member, if
524     you want.
525 elmex 1.1
526 root 1.8 NOTE: The read buffer should only be used or modified if the C<on_read>,
527     C<push_read> or C<unshift_read> methods are used. The other read methods
528     automatically manage the read buffer.
529 elmex 1.1
530     =cut
531    
532 elmex 1.2 sub rbuf : lvalue {
533 root 1.8 $_[0]{rbuf}
534 elmex 1.2 }
535 elmex 1.1
536 root 1.8 =item $handle->push_read ($cb)
537    
538     =item $handle->unshift_read ($cb)
539    
540     Append the given callback to the end of the queue (C<push_read>) or
541     prepend it (C<unshift_read>).
542    
543     The callback is called each time some additional read data arrives.
544 elmex 1.1
545 elmex 1.20 It must check whether enough data is in the read buffer already.
546 elmex 1.1
547 root 1.8 If not enough data is available, it must return the empty list or a false
548     value, in which case it will be called repeatedly until enough data is
549     available (or an error condition is detected).
550    
551     If enough data was available, then the callback must remove all data it is
552     interested in (which can be none at all) and return a true value. After returning
553     true, it will be removed from the queue.
554 elmex 1.1
555     =cut
556    
557 root 1.8 sub push_read {
558 root 1.28 my $self = shift;
559     my $cb = pop;
560    
561     if (@_) {
562     my $type = shift;
563    
564     $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read")
565     ->($self, $cb, @_);
566     }
567 elmex 1.1
568 root 1.8 push @{ $self->{queue} }, $cb;
569     $self->_drain_rbuf;
570 elmex 1.1 }
571    
572 root 1.8 sub unshift_read {
573 root 1.28 my $self = shift;
574     my $cb = pop;
575    
576     if (@_) {
577     my $type = shift;
578    
579     $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read")
580     ->($self, $cb, @_);
581     }
582    
583 root 1.8
584 root 1.28 unshift @{ $self->{queue} }, $cb;
585 root 1.8 $self->_drain_rbuf;
586     }
587 elmex 1.1
588 root 1.28 =item $handle->push_read (type => @args, $cb)
589 elmex 1.1
590 root 1.28 =item $handle->unshift_read (type => @args, $cb)
591 elmex 1.1
592 root 1.28 Instead of providing a callback that parses the data itself you can chose
593     between a number of predefined parsing formats, for chunks of data, lines
594     etc.
595 elmex 1.1
596 root 1.28 The types currently supported are:
597    
598     =over 4
599    
600     =item chunk => $octets, $cb->($self, $data)
601    
602     Invoke the callback only once C<$octets> bytes have been read. Pass the
603     data read to the callback. The callback will never be called with less
604     data.
605    
606     Example: read 2 bytes.
607    
608     $handle->push_read (chunk => 2, sub {
609     warn "yay ", unpack "H*", $_[1];
610     });
611 elmex 1.1
612     =cut
613    
614 root 1.28 register_read_type chunk => sub {
615     my ($self, $cb, $len) = @_;
616 elmex 1.1
617 root 1.8 sub {
618     $len <= length $_[0]{rbuf} or return;
619 elmex 1.12 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
620 root 1.8 1
621     }
622 root 1.28 };
623 root 1.8
624 root 1.28 # compatibility with older API
625 root 1.8 sub push_read_chunk {
626 root 1.28 $_[0]->push_read (chunk => $_[1], $_[2]);
627 root 1.8 }
628 elmex 1.1
629 root 1.8 sub unshift_read_chunk {
630 root 1.28 $_[0]->unshift_read (chunk => $_[1], $_[2]);
631 elmex 1.1 }
632    
633 root 1.28 =item line => [$eol, ]$cb->($self, $line, $eol)
634 elmex 1.1
635 root 1.8 The callback will be called only once a full line (including the end of
636     line marker, C<$eol>) has been read. This line (excluding the end of line
637     marker) will be passed to the callback as second argument (C<$line>), and
638     the end of line marker as the third argument (C<$eol>).
639 elmex 1.1
640 root 1.8 The end of line marker, C<$eol>, can be either a string, in which case it
641     will be interpreted as a fixed record end marker, or it can be a regex
642     object (e.g. created by C<qr>), in which case it is interpreted as a
643     regular expression.
644 elmex 1.1
645 root 1.8 The end of line marker argument C<$eol> is optional, if it is missing (NOT
646     undef), then C<qr|\015?\012|> is used (which is good for most internet
647     protocols).
648 elmex 1.1
649 root 1.8 Partial lines at the end of the stream will never be returned, as they are
650     not marked by the end of line marker.
651 elmex 1.1
652 root 1.8 =cut
653 elmex 1.1
654 root 1.28 register_read_type line => sub {
655     my ($self, $cb, $eol) = @_;
656 elmex 1.1
657 root 1.28 $eol = qr|(\015?\012)| if @_ < 3;
658 root 1.14 $eol = quotemeta $eol unless ref $eol;
659     $eol = qr|^(.*?)($eol)|s;
660 elmex 1.1
661 root 1.8 sub {
662     $_[0]{rbuf} =~ s/$eol// or return;
663 elmex 1.1
664 elmex 1.12 $cb->($_[0], $1, $2);
665 root 1.8 1
666     }
667 root 1.28 };
668 elmex 1.1
669 root 1.28 # compatibility with older API
670 root 1.8 sub push_read_line {
671 root 1.28 my $self = shift;
672     $self->push_read (line => @_);
673 root 1.10 }
674    
675     sub unshift_read_line {
676 root 1.28 my $self = shift;
677     $self->unshift_read (line => @_);
678 root 1.10 }
679    
680 root 1.29 =item netstring => $cb->($string)
681    
682     A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement).
683    
684     Throws an error with C<$!> set to EBADMSG on format violations.
685    
686     =cut
687    
688     register_read_type netstring => sub {
689     my ($self, $cb) = @_;
690    
691     sub {
692     unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) {
693     if ($_[0]{rbuf} =~ /[^0-9]/) {
694     $! = &Errno::EBADMSG;
695     $self->error;
696     }
697     return;
698     }
699    
700     my $len = $1;
701    
702     $self->unshift_read (chunk => $len, sub {
703     my $string = $_[1];
704     $_[0]->unshift_read (chunk => 1, sub {
705     if ($_[1] eq ",") {
706     $cb->($_[0], $string);
707     } else {
708     $! = &Errno::EBADMSG;
709     $self->error;
710     }
711     });
712     });
713    
714     1
715     }
716     };
717    
718 root 1.28 =back
719    
720 root 1.10 =item $handle->stop_read
721    
722     =item $handle->start_read
723    
724 root 1.18 In rare cases you actually do not want to read anything from the
725 root 1.10 socket. In this case you can call C<stop_read>. Neither C<on_read> no
726 root 1.22 any queued callbacks will be executed then. To start reading again, call
727 root 1.10 C<start_read>.
728    
729     =cut
730    
731     sub stop_read {
732     my ($self) = @_;
733 elmex 1.1
734 root 1.10 delete $self->{rw};
735 root 1.8 }
736 elmex 1.1
737 root 1.10 sub start_read {
738     my ($self) = @_;
739    
740     unless ($self->{rw} || $self->{eof}) {
741     Scalar::Util::weaken $self;
742    
743     $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
744 root 1.17 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf};
745     my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
746 root 1.10
747     if ($len > 0) {
748 root 1.17 $self->{filter_r}
749 root 1.18 ? $self->{filter_r}->($self, $rbuf)
750 root 1.17 : $self->_drain_rbuf;
751 root 1.10
752     } elsif (defined $len) {
753 root 1.17 delete $self->{rw};
754 root 1.10 $self->{eof} = 1;
755 root 1.17 $self->_drain_rbuf;
756 root 1.10
757     } elsif ($! != EAGAIN && $! != EINTR) {
758     return $self->error;
759     }
760     });
761     }
762 elmex 1.1 }
763    
764 root 1.19 sub _dotls {
765     my ($self) = @_;
766    
767     if (length $self->{tls_wbuf}) {
768 root 1.22 while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{tls_wbuf})) > 0) {
769     substr $self->{tls_wbuf}, 0, $len, "";
770     }
771 root 1.19 }
772    
773     if (defined (my $buf = Net::SSLeay::BIO_read ($self->{tls_wbio}))) {
774     $self->{wbuf} .= $buf;
775     $self->_drain_wbuf;
776     }
777    
778 root 1.23 while (defined (my $buf = Net::SSLeay::read ($self->{tls}))) {
779     $self->{rbuf} .= $buf;
780     $self->_drain_rbuf;
781     }
782    
783 root 1.24 my $err = Net::SSLeay::get_error ($self->{tls}, -1);
784    
785     if ($err!= Net::SSLeay::ERROR_WANT_READ ()) {
786 root 1.23 if ($err == Net::SSLeay::ERROR_SYSCALL ()) {
787     $self->error;
788     } elsif ($err == Net::SSLeay::ERROR_SSL ()) {
789     $! = &Errno::EIO;
790     $self->error;
791 root 1.19 }
792 root 1.23
793     # all others are fine for our purposes
794 root 1.19 }
795     }
796    
797 root 1.25 =item $handle->starttls ($tls[, $tls_ctx])
798    
799     Instead of starting TLS negotiation immediately when the AnyEvent::Handle
800     object is created, you can also do that at a later time by calling
801     C<starttls>.
802    
803     The first argument is the same as the C<tls> constructor argument (either
804     C<"connect">, C<"accept"> or an existing Net::SSLeay object).
805    
806     The second argument is the optional C<Net::SSLeay::CTX> object that is
807     used when AnyEvent::Handle has to create its own TLS connection object.
808    
809     =cut
810    
811 root 1.19 # TODO: maybe document...
812     sub starttls {
813     my ($self, $ssl, $ctx) = @_;
814    
815 root 1.25 $self->stoptls;
816    
817 root 1.19 if ($ssl eq "accept") {
818     $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
819     Net::SSLeay::set_accept_state ($ssl);
820     } elsif ($ssl eq "connect") {
821     $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
822     Net::SSLeay::set_connect_state ($ssl);
823     }
824    
825     $self->{tls} = $ssl;
826    
827 root 1.21 # basically, this is deep magic (because SSL_read should have the same issues)
828     # but the openssl maintainers basically said: "trust us, it just works".
829     # (unfortunately, we have to hardcode constants because the abysmally misdesigned
830     # and mismaintained ssleay-module doesn't even offer them).
831 root 1.27 # http://www.mail-archive.com/openssl-dev@openssl.org/msg22420.html
832 root 1.21 Net::SSLeay::CTX_set_mode ($self->{tls},
833     (eval { Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1)
834     | (eval { Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2));
835    
836 root 1.19 $self->{tls_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
837     $self->{tls_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
838    
839     Net::SSLeay::set_bio ($ssl, $self->{tls_rbio}, $self->{tls_wbio});
840    
841     $self->{filter_w} = sub {
842     $_[0]{tls_wbuf} .= ${$_[1]};
843     &_dotls;
844     };
845     $self->{filter_r} = sub {
846     Net::SSLeay::BIO_write ($_[0]{tls_rbio}, ${$_[1]});
847     &_dotls;
848     };
849     }
850    
851 root 1.25 =item $handle->stoptls
852    
853     Destroys the SSL connection, if any. Partial read or write data will be
854     lost.
855    
856     =cut
857    
858     sub stoptls {
859     my ($self) = @_;
860    
861     Net::SSLeay::free (delete $self->{tls}) if $self->{tls};
862     delete $self->{tls_rbio};
863     delete $self->{tls_wbio};
864     delete $self->{tls_wbuf};
865     delete $self->{filter_r};
866     delete $self->{filter_w};
867     }
868    
869 root 1.19 sub DESTROY {
870     my $self = shift;
871    
872 root 1.25 $self->stoptls;
873 root 1.19 }
874    
875     =item AnyEvent::Handle::TLS_CTX
876    
877     This function creates and returns the Net::SSLeay::CTX object used by
878     default for TLS mode.
879    
880     The context is created like this:
881    
882     Net::SSLeay::load_error_strings;
883     Net::SSLeay::SSLeay_add_ssl_algorithms;
884     Net::SSLeay::randomize;
885    
886     my $CTX = Net::SSLeay::CTX_new;
887    
888     Net::SSLeay::CTX_set_options $CTX, Net::SSLeay::OP_ALL
889    
890     =cut
891    
892     our $TLS_CTX;
893    
894     sub TLS_CTX() {
895     $TLS_CTX || do {
896     require Net::SSLeay;
897    
898     Net::SSLeay::load_error_strings ();
899     Net::SSLeay::SSLeay_add_ssl_algorithms ();
900     Net::SSLeay::randomize ();
901    
902     $TLS_CTX = Net::SSLeay::CTX_new ();
903    
904     Net::SSLeay::CTX_set_options ($TLS_CTX, Net::SSLeay::OP_ALL ());
905    
906     $TLS_CTX
907     }
908     }
909    
910 elmex 1.1 =back
911    
912     =head1 AUTHOR
913    
914 root 1.8 Robin Redeker C<< <elmex at ta-sa.org> >>, Marc Lehmann <schmorp@schmorp.de>.
915 elmex 1.1
916     =cut
917    
918     1; # End of AnyEvent::Handle