ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
Revision: 1.30
Committed: Sat May 24 23:56:26 2008 UTC (16 years ago) by root
Branch: MAIN
Changes since 1.29: +48 -14 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 elmex 1.1 package AnyEvent::Handle;
2    
3 elmex 1.6 no warnings;
4 elmex 1.1 use strict;
5    
6 root 1.8 use AnyEvent ();
7     use AnyEvent::Util ();
8     use Scalar::Util ();
9     use Carp ();
10     use Fcntl ();
11 elmex 1.1 use Errno qw/EAGAIN EINTR/;
12    
13     =head1 NAME
14    
15 root 1.22 AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
16 elmex 1.1
17 root 1.15 This module is experimental.
18    
19 elmex 1.1 =cut
20    
21 root 1.15 our $VERSION = '0.04';
22 elmex 1.1
23     =head1 SYNOPSIS
24    
25     use AnyEvent;
26     use AnyEvent::Handle;
27    
28     my $cv = AnyEvent->condvar;
29    
30     my $ae_fh = AnyEvent::Handle->new (fh => \*STDIN);
31    
32 root 1.8 #TODO
33 elmex 1.1
34 elmex 1.2 # or use the constructor to pass the callback:
35    
36     my $ae_fh2 =
37     AnyEvent::Handle->new (
38     fh => \*STDIN,
39     on_eof => sub {
40     $cv->broadcast;
41     },
42 root 1.8 #TODO
43 elmex 1.2 );
44    
45 elmex 1.1 $cv->wait;
46    
47     =head1 DESCRIPTION
48    
49 root 1.8 This module is a helper module to make it easier to do event-based I/O on
50 elmex 1.13 filehandles. For utility functions for doing non-blocking connects and accepts
51     on sockets see L<AnyEvent::Util>.
52 root 1.8
53     In the following, when the documentation refers to of "bytes" then this
54     means characters. As sysread and syswrite are used for all I/O, their
55     treatment of characters applies to this module as well.
56 elmex 1.1
57 root 1.8 All callbacks will be invoked with the handle object as their first
58     argument.
59 elmex 1.1
60     =head1 METHODS
61    
62     =over 4
63    
64     =item B<new (%args)>
65    
66 root 1.8 The constructor supports these arguments (all as key => value pairs).
67 elmex 1.1
68     =over 4
69    
70 root 1.8 =item fh => $filehandle [MANDATORY]
71 elmex 1.1
72     The filehandle this L<AnyEvent::Handle> object will operate on.
73    
74 root 1.8 NOTE: The filehandle will be set to non-blocking (using
75     AnyEvent::Util::fh_nonblocking).
76    
77 root 1.16 =item on_eof => $cb->($self)
78 root 1.10
79     Set the callback to be called on EOF.
80 root 1.8
81 root 1.16 While not mandatory, it is highly recommended to set an eof callback,
82     otherwise you might end up with a closed socket while you are still
83     waiting for data.
84    
85 root 1.10 =item on_error => $cb->($self)
86    
87     This is the fatal error callback, that is called when, well, a fatal error
88 elmex 1.20 occurs, such as not being able to resolve the hostname, failure to connect
89 root 1.10 or a read error.
90 root 1.8
91     The object will not be in a usable state when this callback has been
92     called.
93    
94 root 1.10 On callback entrance, the value of C<$!> contains the operating system
95 root 1.29 error (or C<ENOSPC>, C<EPIPE> or C<EBADMSG>).
96 root 1.8
97 root 1.10 While not mandatory, it is I<highly> recommended to set this callback, as
98     you will not be notified of errors otherwise. The default simply calls
99     die.
100 root 1.8
101     =item on_read => $cb->($self)
102    
103     This sets the default read callback, which is called when data arrives
104 root 1.10 and no read request is in the queue.
105 root 1.8
106     To access (and remove data from) the read buffer, use the C<< ->rbuf >>
107 elmex 1.20 method or access the C<$self->{rbuf}> member directly.
108 root 1.8
109     When an EOF condition is detected then AnyEvent::Handle will first try to
110     feed all the remaining data to the queued callbacks and C<on_read> before
111     calling the C<on_eof> callback. If no progress can be made, then a fatal
112     error will be raised (with C<$!> set to C<EPIPE>).
113 elmex 1.1
114 root 1.8 =item on_drain => $cb->()
115 elmex 1.1
116 root 1.8 This sets the callback that is called when the write buffer becomes empty
117     (or when the callback is set and the buffer is empty already).
118 elmex 1.1
119 root 1.8 To append to the write buffer, use the C<< ->push_write >> method.
120 elmex 1.2
121 root 1.8 =item rbuf_max => <bytes>
122 elmex 1.2
123 root 1.8 If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>)
124     when the read buffer ever (strictly) exceeds this size. This is useful to
125     avoid denial-of-service attacks.
126 elmex 1.2
127 root 1.8 For example, a server accepting connections from untrusted sources should
128     be configured to accept only so-and-so much data that it cannot act on
129     (for example, when expecting a line, an attacker could send an unlimited
130     amount of data without a callback ever being called as long as the line
131     isn't finished).
132 elmex 1.2
133 root 1.8 =item read_size => <bytes>
134 elmex 1.2
135 root 1.8 The default read block size (the amount of bytes this module will try to read
136     on each [loop iteration). Default: C<4096>.
137    
138     =item low_water_mark => <bytes>
139    
140     Sets the amount of bytes (default: C<0>) that make up an "empty" write
141     buffer: If the write reaches this size or gets even samller it is
142     considered empty.
143 elmex 1.2
144 root 1.19 =item tls => "accept" | "connect" | Net::SSLeay::SSL object
145    
146     When this parameter is given, it enables TLS (SSL) mode, that means it
147     will start making tls handshake and will transparently encrypt/decrypt
148     data.
149    
150 root 1.26 TLS mode requires Net::SSLeay to be installed (it will be loaded
151     automatically when you try to create a TLS handle).
152    
153 root 1.19 For the TLS server side, use C<accept>, and for the TLS client side of a
154     connection, use C<connect> mode.
155    
156     You can also provide your own TLS connection object, but you have
157     to make sure that you call either C<Net::SSLeay::set_connect_state>
158     or C<Net::SSLeay::set_accept_state> on it before you pass it to
159     AnyEvent::Handle.
160    
161 root 1.26 See the C<starttls> method if you need to start TLs negotiation later.
162    
163 root 1.19 =item tls_ctx => $ssl_ctx
164    
165     Use the given Net::SSLeay::CTX object to create the new TLS connection
166     (unless a connection object was specified directly). If this parameter is
167     missing, then AnyEvent::Handle will use C<AnyEvent::Handle::TLS_CTX>.
168    
169 elmex 1.1 =back
170    
171     =cut
172    
173     sub new {
174 root 1.8 my $class = shift;
175    
176     my $self = bless { @_ }, $class;
177    
178     $self->{fh} or Carp::croak "mandatory argument fh is missing";
179    
180     AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
181 elmex 1.1
182 root 1.19 if ($self->{tls}) {
183     require Net::SSLeay;
184     $self->starttls (delete $self->{tls}, delete $self->{tls_ctx});
185     }
186    
187 root 1.16 $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof};
188 root 1.10 $self->on_error (delete $self->{on_error}) if $self->{on_error};
189 root 1.8 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
190     $self->on_read (delete $self->{on_read} ) if $self->{on_read};
191 elmex 1.1
192 root 1.10 $self->start_read;
193    
194 root 1.8 $self
195     }
196 elmex 1.2
197 root 1.8 sub _shutdown {
198     my ($self) = @_;
199 elmex 1.2
200 root 1.8 delete $self->{rw};
201     delete $self->{ww};
202     delete $self->{fh};
203     }
204    
205     sub error {
206     my ($self) = @_;
207    
208     {
209     local $!;
210     $self->_shutdown;
211 elmex 1.1 }
212    
213 root 1.10 if ($self->{on_error}) {
214     $self->{on_error}($self);
215     } else {
216 root 1.29 Carp::croak "AnyEvent::Handle uncaught fatal error: $!";
217 root 1.10 }
218 elmex 1.1 }
219    
220 root 1.8 =item $fh = $handle->fh
221 elmex 1.1
222 root 1.22 This method returns the file handle of the L<AnyEvent::Handle> object.
223 elmex 1.1
224     =cut
225    
226     sub fh { $_[0]->{fh} }
227    
228 root 1.8 =item $handle->on_error ($cb)
229 elmex 1.1
230 root 1.8 Replace the current C<on_error> callback (see the C<on_error> constructor argument).
231 elmex 1.1
232 root 1.8 =cut
233    
234     sub on_error {
235     $_[0]{on_error} = $_[1];
236     }
237    
238     =item $handle->on_eof ($cb)
239    
240     Replace the current C<on_eof> callback (see the C<on_eof> constructor argument).
241 elmex 1.1
242     =cut
243    
244 root 1.8 sub on_eof {
245     $_[0]{on_eof} = $_[1];
246     }
247    
248 root 1.9 #############################################################################
249    
250     =back
251    
252     =head2 WRITE QUEUE
253    
254     AnyEvent::Handle manages two queues per handle, one for writing and one
255     for reading.
256    
257     The write queue is very simple: you can add data to its end, and
258     AnyEvent::Handle will automatically try to get rid of it for you.
259    
260 elmex 1.20 When data could be written and the write buffer is shorter then the low
261 root 1.9 water mark, the C<on_drain> callback will be invoked.
262    
263     =over 4
264    
265 root 1.8 =item $handle->on_drain ($cb)
266    
267     Sets the C<on_drain> callback or clears it (see the description of
268     C<on_drain> in the constructor).
269    
270     =cut
271    
272     sub on_drain {
273 elmex 1.1 my ($self, $cb) = @_;
274    
275 root 1.8 $self->{on_drain} = $cb;
276    
277     $cb->($self)
278     if $cb && $self->{low_water_mark} >= length $self->{wbuf};
279     }
280    
281     =item $handle->push_write ($data)
282    
283     Queues the given scalar to be written. You can push as much data as you
284     want (only limited by the available memory), as C<AnyEvent::Handle>
285     buffers it independently of the kernel.
286    
287     =cut
288    
289 root 1.17 sub _drain_wbuf {
290     my ($self) = @_;
291 root 1.8
292 root 1.29 if (!$self->{ww} && length $self->{wbuf}) {
293 root 1.8 Scalar::Util::weaken $self;
294     my $cb = sub {
295     my $len = syswrite $self->{fh}, $self->{wbuf};
296    
297 root 1.29 if ($len >= 0) {
298 root 1.8 substr $self->{wbuf}, 0, $len, "";
299    
300     $self->{on_drain}($self)
301     if $self->{low_water_mark} >= length $self->{wbuf}
302     && $self->{on_drain};
303    
304     delete $self->{ww} unless length $self->{wbuf};
305     } elsif ($! != EAGAIN && $! != EINTR) {
306     $self->error;
307 elmex 1.1 }
308 root 1.8 };
309    
310     $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb);
311    
312     $cb->($self);
313     };
314     }
315    
316 root 1.30 our %WH;
317    
318     sub register_write_type($$) {
319     $WH{$_[0]} = $_[1];
320     }
321    
322 root 1.17 sub push_write {
323     my $self = shift;
324    
325 root 1.29 if (@_ > 1) {
326     my $type = shift;
327    
328     @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write")
329     ->($self, @_);
330     }
331    
332 root 1.17 if ($self->{filter_w}) {
333 root 1.18 $self->{filter_w}->($self, \$_[0]);
334 root 1.17 } else {
335     $self->{wbuf} .= $_[0];
336     $self->_drain_wbuf;
337     }
338     }
339    
340 root 1.29 =item $handle->push_write (type => @args)
341    
342     =item $handle->unshift_write (type => @args)
343    
344     Instead of formatting your data yourself, you can also let this module do
345     the job by specifying a type and type-specific arguments.
346    
347 root 1.30 Predefined types are (if you have ideas for additional types, feel free to
348     drop by and tell us):
349 root 1.29
350     =over 4
351    
352     =item netstring => $string
353    
354     Formats the given value as netstring
355     (http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them).
356    
357 root 1.30 =back
358    
359 root 1.29 =cut
360    
361     register_write_type netstring => sub {
362     my ($self, $string) = @_;
363    
364     sprintf "%d:%s,", (length $string), $string
365     };
366    
367 root 1.30 =item AnyEvent::Handle::register_write_type type => $coderef->($self, @args)
368    
369     This function (not method) lets you add your own types to C<push_write>.
370     Whenever the given C<type> is used, C<push_write> will invoke the code
371     reference with the handle object and the remaining arguments.
372 root 1.29
373 root 1.30 The code reference is supposed to return a single octet string that will
374     be appended to the write buffer.
375 root 1.29
376 root 1.30 Note that this is a function, and all types registered this way will be
377     global, so try to use unique names.
378 root 1.29
379 root 1.30 =cut
380 root 1.29
381 root 1.8 #############################################################################
382    
383 root 1.9 =back
384    
385     =head2 READ QUEUE
386    
387     AnyEvent::Handle manages two queues per handle, one for writing and one
388     for reading.
389    
390     The read queue is more complex than the write queue. It can be used in two
391     ways, the "simple" way, using only C<on_read> and the "complex" way, using
392     a queue.
393    
394     In the simple case, you just install an C<on_read> callback and whenever
395     new data arrives, it will be called. You can then remove some data (if
396     enough is there) from the read buffer (C<< $handle->rbuf >>) if you want
397     or not.
398    
399     In the more complex case, you want to queue multiple callbacks. In this
400     case, AnyEvent::Handle will call the first queued callback each time new
401     data arrives and removes it when it has done its job (see C<push_read>,
402     below).
403    
404     This way you can, for example, push three line-reads, followed by reading
405     a chunk of data, and AnyEvent::Handle will execute them in order.
406    
407     Example 1: EPP protocol parser. EPP sends 4 byte length info, followed by
408     the specified number of bytes which give an XML datagram.
409    
410     # in the default state, expect some header bytes
411     $handle->on_read (sub {
412     # some data is here, now queue the length-header-read (4 octets)
413     shift->unshift_read_chunk (4, sub {
414     # header arrived, decode
415     my $len = unpack "N", $_[1];
416    
417     # now read the payload
418     shift->unshift_read_chunk ($len, sub {
419     my $xml = $_[1];
420     # handle xml
421     });
422     });
423     });
424    
425     Example 2: Implement a client for a protocol that replies either with
426     "OK" and another line or "ERROR" for one request, and 64 bytes for the
427     second request. Due tot he availability of a full queue, we can just
428     pipeline sending both requests and manipulate the queue as necessary in
429     the callbacks:
430    
431     # request one
432     $handle->push_write ("request 1\015\012");
433    
434     # we expect "ERROR" or "OK" as response, so push a line read
435     $handle->push_read_line (sub {
436     # if we got an "OK", we have to _prepend_ another line,
437     # so it will be read before the second request reads its 64 bytes
438     # which are already in the queue when this callback is called
439     # we don't do this in case we got an error
440     if ($_[1] eq "OK") {
441     $_[0]->unshift_read_line (sub {
442     my $response = $_[1];
443     ...
444     });
445     }
446     });
447    
448     # request two
449     $handle->push_write ("request 2\015\012");
450    
451     # simply read 64 bytes, always
452     $handle->push_read_chunk (64, sub {
453     my $response = $_[1];
454     ...
455     });
456    
457     =over 4
458    
459 root 1.10 =cut
460    
461 root 1.8 sub _drain_rbuf {
462     my ($self) = @_;
463 elmex 1.1
464 root 1.17 if (
465     defined $self->{rbuf_max}
466     && $self->{rbuf_max} < length $self->{rbuf}
467     ) {
468     $! = &Errno::ENOSPC; return $self->error;
469     }
470    
471 root 1.11 return if $self->{in_drain};
472 root 1.8 local $self->{in_drain} = 1;
473 elmex 1.1
474 root 1.8 while (my $len = length $self->{rbuf}) {
475     no strict 'refs';
476 root 1.10 if (my $cb = shift @{ $self->{queue} }) {
477 root 1.29 unless ($cb->($self)) {
478 root 1.10 if ($self->{eof}) {
479     # no progress can be made (not enough data and no data forthcoming)
480     $! = &Errno::EPIPE; return $self->error;
481     }
482    
483     unshift @{ $self->{queue} }, $cb;
484 root 1.8 return;
485     }
486     } elsif ($self->{on_read}) {
487     $self->{on_read}($self);
488    
489     if (
490     $self->{eof} # if no further data will arrive
491     && $len == length $self->{rbuf} # and no data has been consumed
492     && !@{ $self->{queue} } # and the queue is still empty
493     && $self->{on_read} # and we still want to read data
494     ) {
495     # then no progress can be made
496     $! = &Errno::EPIPE; return $self->error;
497 elmex 1.1 }
498 root 1.8 } else {
499     # read side becomes idle
500     delete $self->{rw};
501     return;
502     }
503     }
504    
505     if ($self->{eof}) {
506     $self->_shutdown;
507 root 1.16 $self->{on_eof}($self)
508     if $self->{on_eof};
509 root 1.8 }
510 elmex 1.1 }
511    
512 root 1.8 =item $handle->on_read ($cb)
513 elmex 1.1
514 root 1.8 This replaces the currently set C<on_read> callback, or clears it (when
515     the new callback is C<undef>). See the description of C<on_read> in the
516     constructor.
517 elmex 1.1
518 root 1.8 =cut
519    
520     sub on_read {
521     my ($self, $cb) = @_;
522 elmex 1.1
523 root 1.8 $self->{on_read} = $cb;
524 elmex 1.1 }
525    
526 root 1.8 =item $handle->rbuf
527    
528     Returns the read buffer (as a modifiable lvalue).
529 elmex 1.1
530 root 1.8 You can access the read buffer directly as the C<< ->{rbuf} >> member, if
531     you want.
532 elmex 1.1
533 root 1.8 NOTE: The read buffer should only be used or modified if the C<on_read>,
534     C<push_read> or C<unshift_read> methods are used. The other read methods
535     automatically manage the read buffer.
536 elmex 1.1
537     =cut
538    
539 elmex 1.2 sub rbuf : lvalue {
540 root 1.8 $_[0]{rbuf}
541 elmex 1.2 }
542 elmex 1.1
543 root 1.8 =item $handle->push_read ($cb)
544    
545     =item $handle->unshift_read ($cb)
546    
547     Append the given callback to the end of the queue (C<push_read>) or
548     prepend it (C<unshift_read>).
549    
550     The callback is called each time some additional read data arrives.
551 elmex 1.1
552 elmex 1.20 It must check whether enough data is in the read buffer already.
553 elmex 1.1
554 root 1.8 If not enough data is available, it must return the empty list or a false
555     value, in which case it will be called repeatedly until enough data is
556     available (or an error condition is detected).
557    
558     If enough data was available, then the callback must remove all data it is
559     interested in (which can be none at all) and return a true value. After returning
560     true, it will be removed from the queue.
561 elmex 1.1
562     =cut
563    
564 root 1.30 our %RH;
565    
566     sub register_read_type($$) {
567     $RH{$_[0]} = $_[1];
568     }
569    
570 root 1.8 sub push_read {
571 root 1.28 my $self = shift;
572     my $cb = pop;
573    
574     if (@_) {
575     my $type = shift;
576    
577     $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read")
578     ->($self, $cb, @_);
579     }
580 elmex 1.1
581 root 1.8 push @{ $self->{queue} }, $cb;
582     $self->_drain_rbuf;
583 elmex 1.1 }
584    
585 root 1.8 sub unshift_read {
586 root 1.28 my $self = shift;
587     my $cb = pop;
588    
589     if (@_) {
590     my $type = shift;
591    
592     $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read")
593     ->($self, $cb, @_);
594     }
595    
596 root 1.8
597 root 1.28 unshift @{ $self->{queue} }, $cb;
598 root 1.8 $self->_drain_rbuf;
599     }
600 elmex 1.1
601 root 1.28 =item $handle->push_read (type => @args, $cb)
602 elmex 1.1
603 root 1.28 =item $handle->unshift_read (type => @args, $cb)
604 elmex 1.1
605 root 1.28 Instead of providing a callback that parses the data itself you can chose
606     between a number of predefined parsing formats, for chunks of data, lines
607     etc.
608 elmex 1.1
609 root 1.30 Predefined types are (if you have ideas for additional types, feel free to
610     drop by and tell us):
611 root 1.28
612     =over 4
613    
614     =item chunk => $octets, $cb->($self, $data)
615    
616     Invoke the callback only once C<$octets> bytes have been read. Pass the
617     data read to the callback. The callback will never be called with less
618     data.
619    
620     Example: read 2 bytes.
621    
622     $handle->push_read (chunk => 2, sub {
623     warn "yay ", unpack "H*", $_[1];
624     });
625 elmex 1.1
626     =cut
627    
628 root 1.28 register_read_type chunk => sub {
629     my ($self, $cb, $len) = @_;
630 elmex 1.1
631 root 1.8 sub {
632     $len <= length $_[0]{rbuf} or return;
633 elmex 1.12 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
634 root 1.8 1
635     }
636 root 1.28 };
637 root 1.8
638 root 1.28 # compatibility with older API
639 root 1.8 sub push_read_chunk {
640 root 1.28 $_[0]->push_read (chunk => $_[1], $_[2]);
641 root 1.8 }
642 elmex 1.1
643 root 1.8 sub unshift_read_chunk {
644 root 1.28 $_[0]->unshift_read (chunk => $_[1], $_[2]);
645 elmex 1.1 }
646    
647 root 1.28 =item line => [$eol, ]$cb->($self, $line, $eol)
648 elmex 1.1
649 root 1.8 The callback will be called only once a full line (including the end of
650     line marker, C<$eol>) has been read. This line (excluding the end of line
651     marker) will be passed to the callback as second argument (C<$line>), and
652     the end of line marker as the third argument (C<$eol>).
653 elmex 1.1
654 root 1.8 The end of line marker, C<$eol>, can be either a string, in which case it
655     will be interpreted as a fixed record end marker, or it can be a regex
656     object (e.g. created by C<qr>), in which case it is interpreted as a
657     regular expression.
658 elmex 1.1
659 root 1.8 The end of line marker argument C<$eol> is optional, if it is missing (NOT
660     undef), then C<qr|\015?\012|> is used (which is good for most internet
661     protocols).
662 elmex 1.1
663 root 1.8 Partial lines at the end of the stream will never be returned, as they are
664     not marked by the end of line marker.
665 elmex 1.1
666 root 1.8 =cut
667 elmex 1.1
668 root 1.28 register_read_type line => sub {
669     my ($self, $cb, $eol) = @_;
670 elmex 1.1
671 root 1.28 $eol = qr|(\015?\012)| if @_ < 3;
672 root 1.14 $eol = quotemeta $eol unless ref $eol;
673     $eol = qr|^(.*?)($eol)|s;
674 elmex 1.1
675 root 1.8 sub {
676     $_[0]{rbuf} =~ s/$eol// or return;
677 elmex 1.1
678 elmex 1.12 $cb->($_[0], $1, $2);
679 root 1.8 1
680     }
681 root 1.28 };
682 elmex 1.1
683 root 1.28 # compatibility with older API
684 root 1.8 sub push_read_line {
685 root 1.28 my $self = shift;
686     $self->push_read (line => @_);
687 root 1.10 }
688    
689     sub unshift_read_line {
690 root 1.28 my $self = shift;
691     $self->unshift_read (line => @_);
692 root 1.10 }
693    
694 root 1.29 =item netstring => $cb->($string)
695    
696     A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement).
697    
698     Throws an error with C<$!> set to EBADMSG on format violations.
699    
700     =cut
701    
702     register_read_type netstring => sub {
703     my ($self, $cb) = @_;
704    
705     sub {
706     unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) {
707     if ($_[0]{rbuf} =~ /[^0-9]/) {
708     $! = &Errno::EBADMSG;
709     $self->error;
710     }
711     return;
712     }
713    
714     my $len = $1;
715    
716     $self->unshift_read (chunk => $len, sub {
717     my $string = $_[1];
718     $_[0]->unshift_read (chunk => 1, sub {
719     if ($_[1] eq ",") {
720     $cb->($_[0], $string);
721     } else {
722     $! = &Errno::EBADMSG;
723     $self->error;
724     }
725     });
726     });
727    
728     1
729     }
730     };
731    
732 root 1.28 =back
733    
734 root 1.30 =item AnyEvent::Handle::register_read_type type => $coderef->($self, $cb, @args)
735    
736     This function (not method) lets you add your own types to C<push_read>.
737    
738     Whenever the given C<type> is used, C<push_read> will invoke the code
739     reference with the handle object, the callback and the remaining
740     arguments.
741    
742     The code reference is supposed to return a callback (usually a closure)
743     that works as a plain read callback (see C<< ->push_read ($cb) >>).
744    
745     It should invoke the passed callback when it is done reading (remember to
746     pass C<$self> as first argument as all other callbacks do that).
747    
748     Note that this is a function, and all types registered this way will be
749     global, so try to use unique names.
750    
751     For examples, see the source of this module (F<perldoc -m AnyEvent::Handle>,
752     search for C<register_read_type>)).
753    
754 root 1.10 =item $handle->stop_read
755    
756     =item $handle->start_read
757    
758 root 1.18 In rare cases you actually do not want to read anything from the
759 root 1.10 socket. In this case you can call C<stop_read>. Neither C<on_read> no
760 root 1.22 any queued callbacks will be executed then. To start reading again, call
761 root 1.10 C<start_read>.
762    
763     =cut
764    
765     sub stop_read {
766     my ($self) = @_;
767 elmex 1.1
768 root 1.10 delete $self->{rw};
769 root 1.8 }
770 elmex 1.1
771 root 1.10 sub start_read {
772     my ($self) = @_;
773    
774     unless ($self->{rw} || $self->{eof}) {
775     Scalar::Util::weaken $self;
776    
777     $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
778 root 1.17 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf};
779     my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
780 root 1.10
781     if ($len > 0) {
782 root 1.17 $self->{filter_r}
783 root 1.18 ? $self->{filter_r}->($self, $rbuf)
784 root 1.17 : $self->_drain_rbuf;
785 root 1.10
786     } elsif (defined $len) {
787 root 1.17 delete $self->{rw};
788 root 1.10 $self->{eof} = 1;
789 root 1.17 $self->_drain_rbuf;
790 root 1.10
791     } elsif ($! != EAGAIN && $! != EINTR) {
792     return $self->error;
793     }
794     });
795     }
796 elmex 1.1 }
797    
798 root 1.19 sub _dotls {
799     my ($self) = @_;
800    
801     if (length $self->{tls_wbuf}) {
802 root 1.22 while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{tls_wbuf})) > 0) {
803     substr $self->{tls_wbuf}, 0, $len, "";
804     }
805 root 1.19 }
806    
807     if (defined (my $buf = Net::SSLeay::BIO_read ($self->{tls_wbio}))) {
808     $self->{wbuf} .= $buf;
809     $self->_drain_wbuf;
810     }
811    
812 root 1.23 while (defined (my $buf = Net::SSLeay::read ($self->{tls}))) {
813     $self->{rbuf} .= $buf;
814     $self->_drain_rbuf;
815     }
816    
817 root 1.24 my $err = Net::SSLeay::get_error ($self->{tls}, -1);
818    
819     if ($err!= Net::SSLeay::ERROR_WANT_READ ()) {
820 root 1.23 if ($err == Net::SSLeay::ERROR_SYSCALL ()) {
821     $self->error;
822     } elsif ($err == Net::SSLeay::ERROR_SSL ()) {
823     $! = &Errno::EIO;
824     $self->error;
825 root 1.19 }
826 root 1.23
827     # all others are fine for our purposes
828 root 1.19 }
829     }
830    
831 root 1.25 =item $handle->starttls ($tls[, $tls_ctx])
832    
833     Instead of starting TLS negotiation immediately when the AnyEvent::Handle
834     object is created, you can also do that at a later time by calling
835     C<starttls>.
836    
837     The first argument is the same as the C<tls> constructor argument (either
838     C<"connect">, C<"accept"> or an existing Net::SSLeay object).
839    
840     The second argument is the optional C<Net::SSLeay::CTX> object that is
841     used when AnyEvent::Handle has to create its own TLS connection object.
842    
843     =cut
844    
845 root 1.19 # TODO: maybe document...
846     sub starttls {
847     my ($self, $ssl, $ctx) = @_;
848    
849 root 1.25 $self->stoptls;
850    
851 root 1.19 if ($ssl eq "accept") {
852     $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
853     Net::SSLeay::set_accept_state ($ssl);
854     } elsif ($ssl eq "connect") {
855     $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
856     Net::SSLeay::set_connect_state ($ssl);
857     }
858    
859     $self->{tls} = $ssl;
860    
861 root 1.21 # basically, this is deep magic (because SSL_read should have the same issues)
862     # but the openssl maintainers basically said: "trust us, it just works".
863     # (unfortunately, we have to hardcode constants because the abysmally misdesigned
864     # and mismaintained ssleay-module doesn't even offer them).
865 root 1.27 # http://www.mail-archive.com/openssl-dev@openssl.org/msg22420.html
866 root 1.21 Net::SSLeay::CTX_set_mode ($self->{tls},
867     (eval { Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1)
868     | (eval { Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2));
869    
870 root 1.19 $self->{tls_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
871     $self->{tls_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
872    
873     Net::SSLeay::set_bio ($ssl, $self->{tls_rbio}, $self->{tls_wbio});
874    
875     $self->{filter_w} = sub {
876     $_[0]{tls_wbuf} .= ${$_[1]};
877     &_dotls;
878     };
879     $self->{filter_r} = sub {
880     Net::SSLeay::BIO_write ($_[0]{tls_rbio}, ${$_[1]});
881     &_dotls;
882     };
883     }
884    
885 root 1.25 =item $handle->stoptls
886    
887     Destroys the SSL connection, if any. Partial read or write data will be
888     lost.
889    
890     =cut
891    
892     sub stoptls {
893     my ($self) = @_;
894    
895     Net::SSLeay::free (delete $self->{tls}) if $self->{tls};
896     delete $self->{tls_rbio};
897     delete $self->{tls_wbio};
898     delete $self->{tls_wbuf};
899     delete $self->{filter_r};
900     delete $self->{filter_w};
901     }
902    
903 root 1.19 sub DESTROY {
904     my $self = shift;
905    
906 root 1.25 $self->stoptls;
907 root 1.19 }
908    
909     =item AnyEvent::Handle::TLS_CTX
910    
911     This function creates and returns the Net::SSLeay::CTX object used by
912     default for TLS mode.
913    
914     The context is created like this:
915    
916     Net::SSLeay::load_error_strings;
917     Net::SSLeay::SSLeay_add_ssl_algorithms;
918     Net::SSLeay::randomize;
919    
920     my $CTX = Net::SSLeay::CTX_new;
921    
922     Net::SSLeay::CTX_set_options $CTX, Net::SSLeay::OP_ALL
923    
924     =cut
925    
926     our $TLS_CTX;
927    
928     sub TLS_CTX() {
929     $TLS_CTX || do {
930     require Net::SSLeay;
931    
932     Net::SSLeay::load_error_strings ();
933     Net::SSLeay::SSLeay_add_ssl_algorithms ();
934     Net::SSLeay::randomize ();
935    
936     $TLS_CTX = Net::SSLeay::CTX_new ();
937    
938     Net::SSLeay::CTX_set_options ($TLS_CTX, Net::SSLeay::OP_ALL ());
939    
940     $TLS_CTX
941     }
942     }
943    
944 elmex 1.1 =back
945    
946     =head1 AUTHOR
947    
948 root 1.8 Robin Redeker C<< <elmex at ta-sa.org> >>, Marc Lehmann <schmorp@schmorp.de>.
949 elmex 1.1
950     =cut
951    
952     1; # End of AnyEvent::Handle