ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
Revision: 1.33
Committed: Sun May 25 03:03:51 2008 UTC (15 years, 11 months ago) by root
Branch: MAIN
CVS Tags: rel-4_03
Changes since 1.32: +3 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 elmex 1.1 package AnyEvent::Handle;
2    
3 elmex 1.6 no warnings;
4 elmex 1.1 use strict;
5    
6 root 1.8 use AnyEvent ();
7 root 1.33 use AnyEvent::Util qw(WSAWOULDBLOCK);
8 root 1.8 use Scalar::Util ();
9     use Carp ();
10     use Fcntl ();
11 elmex 1.1 use Errno qw/EAGAIN EINTR/;
12    
13     =head1 NAME
14    
15 root 1.22 AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
16 elmex 1.1
17     =cut
18    
19 root 1.15 our $VERSION = '0.04';
20 elmex 1.1
21     =head1 SYNOPSIS
22    
23     use AnyEvent;
24     use AnyEvent::Handle;
25    
26     my $cv = AnyEvent->condvar;
27    
28 root 1.31 my $handle =
29 elmex 1.2 AnyEvent::Handle->new (
30     fh => \*STDIN,
31     on_eof => sub {
32     $cv->broadcast;
33     },
34     );
35    
36 root 1.31 # send some request line
37     $handle->push_write ("getinfo\015\012");
38    
39     # read the response line
40     $handle->push_read (line => sub {
41     my ($handle, $line) = @_;
42     warn "read line <$line>\n";
43     $cv->send;
44     });
45    
46     $cv->recv;
47 elmex 1.1
48     =head1 DESCRIPTION
49    
50 root 1.8 This module is a helper module to make it easier to do event-based I/O on
51 elmex 1.13 filehandles. For utility functions for doing non-blocking connects and accepts
52     on sockets see L<AnyEvent::Util>.
53 root 1.8
54     In the following, when the documentation refers to of "bytes" then this
55     means characters. As sysread and syswrite are used for all I/O, their
56     treatment of characters applies to this module as well.
57 elmex 1.1
58 root 1.8 All callbacks will be invoked with the handle object as their first
59     argument.
60 elmex 1.1
61     =head1 METHODS
62    
63     =over 4
64    
65     =item B<new (%args)>
66    
67 root 1.8 The constructor supports these arguments (all as key => value pairs).
68 elmex 1.1
69     =over 4
70    
71 root 1.8 =item fh => $filehandle [MANDATORY]
72 elmex 1.1
73     The filehandle this L<AnyEvent::Handle> object will operate on.
74    
75 root 1.8 NOTE: The filehandle will be set to non-blocking (using
76     AnyEvent::Util::fh_nonblocking).
77    
78 root 1.16 =item on_eof => $cb->($self)
79 root 1.10
80     Set the callback to be called on EOF.
81 root 1.8
82 root 1.16 While not mandatory, it is highly recommended to set an eof callback,
83     otherwise you might end up with a closed socket while you are still
84     waiting for data.
85    
86 root 1.10 =item on_error => $cb->($self)
87    
88     This is the fatal error callback, that is called when, well, a fatal error
89 elmex 1.20 occurs, such as not being able to resolve the hostname, failure to connect
90 root 1.10 or a read error.
91 root 1.8
92     The object will not be in a usable state when this callback has been
93     called.
94    
95 root 1.10 On callback entrance, the value of C<$!> contains the operating system
96 root 1.29 error (or C<ENOSPC>, C<EPIPE> or C<EBADMSG>).
97 root 1.8
98 root 1.10 While not mandatory, it is I<highly> recommended to set this callback, as
99     you will not be notified of errors otherwise. The default simply calls
100     die.
101 root 1.8
102     =item on_read => $cb->($self)
103    
104     This sets the default read callback, which is called when data arrives
105 root 1.10 and no read request is in the queue.
106 root 1.8
107     To access (and remove data from) the read buffer, use the C<< ->rbuf >>
108 elmex 1.20 method or access the C<$self->{rbuf}> member directly.
109 root 1.8
110     When an EOF condition is detected then AnyEvent::Handle will first try to
111     feed all the remaining data to the queued callbacks and C<on_read> before
112     calling the C<on_eof> callback. If no progress can be made, then a fatal
113     error will be raised (with C<$!> set to C<EPIPE>).
114 elmex 1.1
115 root 1.8 =item on_drain => $cb->()
116 elmex 1.1
117 root 1.8 This sets the callback that is called when the write buffer becomes empty
118     (or when the callback is set and the buffer is empty already).
119 elmex 1.1
120 root 1.8 To append to the write buffer, use the C<< ->push_write >> method.
121 elmex 1.2
122 root 1.8 =item rbuf_max => <bytes>
123 elmex 1.2
124 root 1.8 If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>)
125     when the read buffer ever (strictly) exceeds this size. This is useful to
126     avoid denial-of-service attacks.
127 elmex 1.2
128 root 1.8 For example, a server accepting connections from untrusted sources should
129     be configured to accept only so-and-so much data that it cannot act on
130     (for example, when expecting a line, an attacker could send an unlimited
131     amount of data without a callback ever being called as long as the line
132     isn't finished).
133 elmex 1.2
134 root 1.8 =item read_size => <bytes>
135 elmex 1.2
136 root 1.8 The default read block size (the amount of bytes this module will try to read
137     on each [loop iteration). Default: C<4096>.
138    
139     =item low_water_mark => <bytes>
140    
141     Sets the amount of bytes (default: C<0>) that make up an "empty" write
142     buffer: If the write reaches this size or gets even samller it is
143     considered empty.
144 elmex 1.2
145 root 1.19 =item tls => "accept" | "connect" | Net::SSLeay::SSL object
146    
147     When this parameter is given, it enables TLS (SSL) mode, that means it
148     will start making tls handshake and will transparently encrypt/decrypt
149     data.
150    
151 root 1.26 TLS mode requires Net::SSLeay to be installed (it will be loaded
152     automatically when you try to create a TLS handle).
153    
154 root 1.19 For the TLS server side, use C<accept>, and for the TLS client side of a
155     connection, use C<connect> mode.
156    
157     You can also provide your own TLS connection object, but you have
158     to make sure that you call either C<Net::SSLeay::set_connect_state>
159     or C<Net::SSLeay::set_accept_state> on it before you pass it to
160     AnyEvent::Handle.
161    
162 root 1.26 See the C<starttls> method if you need to start TLs negotiation later.
163    
164 root 1.19 =item tls_ctx => $ssl_ctx
165    
166     Use the given Net::SSLeay::CTX object to create the new TLS connection
167     (unless a connection object was specified directly). If this parameter is
168     missing, then AnyEvent::Handle will use C<AnyEvent::Handle::TLS_CTX>.
169    
170 elmex 1.1 =back
171    
172     =cut
173    
174     sub new {
175 root 1.8 my $class = shift;
176    
177     my $self = bless { @_ }, $class;
178    
179     $self->{fh} or Carp::croak "mandatory argument fh is missing";
180    
181     AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
182 elmex 1.1
183 root 1.19 if ($self->{tls}) {
184     require Net::SSLeay;
185     $self->starttls (delete $self->{tls}, delete $self->{tls_ctx});
186     }
187    
188 root 1.16 $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof};
189 root 1.10 $self->on_error (delete $self->{on_error}) if $self->{on_error};
190 root 1.8 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
191     $self->on_read (delete $self->{on_read} ) if $self->{on_read};
192 elmex 1.1
193 root 1.10 $self->start_read;
194    
195 root 1.8 $self
196     }
197 elmex 1.2
198 root 1.8 sub _shutdown {
199     my ($self) = @_;
200 elmex 1.2
201 root 1.8 delete $self->{rw};
202     delete $self->{ww};
203     delete $self->{fh};
204     }
205    
206     sub error {
207     my ($self) = @_;
208    
209     {
210     local $!;
211     $self->_shutdown;
212 elmex 1.1 }
213    
214 root 1.10 if ($self->{on_error}) {
215     $self->{on_error}($self);
216     } else {
217 root 1.29 Carp::croak "AnyEvent::Handle uncaught fatal error: $!";
218 root 1.10 }
219 elmex 1.1 }
220    
221 root 1.8 =item $fh = $handle->fh
222 elmex 1.1
223 root 1.22 This method returns the file handle of the L<AnyEvent::Handle> object.
224 elmex 1.1
225     =cut
226    
227     sub fh { $_[0]->{fh} }
228    
229 root 1.8 =item $handle->on_error ($cb)
230 elmex 1.1
231 root 1.8 Replace the current C<on_error> callback (see the C<on_error> constructor argument).
232 elmex 1.1
233 root 1.8 =cut
234    
235     sub on_error {
236     $_[0]{on_error} = $_[1];
237     }
238    
239     =item $handle->on_eof ($cb)
240    
241     Replace the current C<on_eof> callback (see the C<on_eof> constructor argument).
242 elmex 1.1
243     =cut
244    
245 root 1.8 sub on_eof {
246     $_[0]{on_eof} = $_[1];
247     }
248    
249 root 1.9 #############################################################################
250    
251     =back
252    
253     =head2 WRITE QUEUE
254    
255     AnyEvent::Handle manages two queues per handle, one for writing and one
256     for reading.
257    
258     The write queue is very simple: you can add data to its end, and
259     AnyEvent::Handle will automatically try to get rid of it for you.
260    
261 elmex 1.20 When data could be written and the write buffer is shorter then the low
262 root 1.9 water mark, the C<on_drain> callback will be invoked.
263    
264     =over 4
265    
266 root 1.8 =item $handle->on_drain ($cb)
267    
268     Sets the C<on_drain> callback or clears it (see the description of
269     C<on_drain> in the constructor).
270    
271     =cut
272    
273     sub on_drain {
274 elmex 1.1 my ($self, $cb) = @_;
275    
276 root 1.8 $self->{on_drain} = $cb;
277    
278     $cb->($self)
279     if $cb && $self->{low_water_mark} >= length $self->{wbuf};
280     }
281    
282     =item $handle->push_write ($data)
283    
284     Queues the given scalar to be written. You can push as much data as you
285     want (only limited by the available memory), as C<AnyEvent::Handle>
286     buffers it independently of the kernel.
287    
288     =cut
289    
290 root 1.17 sub _drain_wbuf {
291     my ($self) = @_;
292 root 1.8
293 root 1.29 if (!$self->{ww} && length $self->{wbuf}) {
294 root 1.8 Scalar::Util::weaken $self;
295     my $cb = sub {
296     my $len = syswrite $self->{fh}, $self->{wbuf};
297    
298 root 1.29 if ($len >= 0) {
299 root 1.8 substr $self->{wbuf}, 0, $len, "";
300    
301     $self->{on_drain}($self)
302     if $self->{low_water_mark} >= length $self->{wbuf}
303     && $self->{on_drain};
304    
305     delete $self->{ww} unless length $self->{wbuf};
306 root 1.33 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAWOULDBLOCK) {
307 root 1.8 $self->error;
308 elmex 1.1 }
309 root 1.8 };
310    
311     $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb);
312    
313     $cb->($self);
314     };
315     }
316    
317 root 1.30 our %WH;
318    
319     sub register_write_type($$) {
320     $WH{$_[0]} = $_[1];
321     }
322    
323 root 1.17 sub push_write {
324     my $self = shift;
325    
326 root 1.29 if (@_ > 1) {
327     my $type = shift;
328    
329     @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write")
330     ->($self, @_);
331     }
332    
333 root 1.17 if ($self->{filter_w}) {
334 root 1.18 $self->{filter_w}->($self, \$_[0]);
335 root 1.17 } else {
336     $self->{wbuf} .= $_[0];
337     $self->_drain_wbuf;
338     }
339     }
340    
341 root 1.29 =item $handle->push_write (type => @args)
342    
343     =item $handle->unshift_write (type => @args)
344    
345     Instead of formatting your data yourself, you can also let this module do
346     the job by specifying a type and type-specific arguments.
347    
348 root 1.30 Predefined types are (if you have ideas for additional types, feel free to
349     drop by and tell us):
350 root 1.29
351     =over 4
352    
353     =item netstring => $string
354    
355     Formats the given value as netstring
356     (http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them).
357    
358 root 1.30 =back
359    
360 root 1.29 =cut
361    
362     register_write_type netstring => sub {
363     my ($self, $string) = @_;
364    
365     sprintf "%d:%s,", (length $string), $string
366     };
367    
368 root 1.30 =item AnyEvent::Handle::register_write_type type => $coderef->($self, @args)
369    
370     This function (not method) lets you add your own types to C<push_write>.
371     Whenever the given C<type> is used, C<push_write> will invoke the code
372     reference with the handle object and the remaining arguments.
373 root 1.29
374 root 1.30 The code reference is supposed to return a single octet string that will
375     be appended to the write buffer.
376 root 1.29
377 root 1.30 Note that this is a function, and all types registered this way will be
378     global, so try to use unique names.
379 root 1.29
380 root 1.30 =cut
381 root 1.29
382 root 1.8 #############################################################################
383    
384 root 1.9 =back
385    
386     =head2 READ QUEUE
387    
388     AnyEvent::Handle manages two queues per handle, one for writing and one
389     for reading.
390    
391     The read queue is more complex than the write queue. It can be used in two
392     ways, the "simple" way, using only C<on_read> and the "complex" way, using
393     a queue.
394    
395     In the simple case, you just install an C<on_read> callback and whenever
396     new data arrives, it will be called. You can then remove some data (if
397     enough is there) from the read buffer (C<< $handle->rbuf >>) if you want
398     or not.
399    
400     In the more complex case, you want to queue multiple callbacks. In this
401     case, AnyEvent::Handle will call the first queued callback each time new
402     data arrives and removes it when it has done its job (see C<push_read>,
403     below).
404    
405     This way you can, for example, push three line-reads, followed by reading
406     a chunk of data, and AnyEvent::Handle will execute them in order.
407    
408     Example 1: EPP protocol parser. EPP sends 4 byte length info, followed by
409     the specified number of bytes which give an XML datagram.
410    
411     # in the default state, expect some header bytes
412     $handle->on_read (sub {
413     # some data is here, now queue the length-header-read (4 octets)
414     shift->unshift_read_chunk (4, sub {
415     # header arrived, decode
416     my $len = unpack "N", $_[1];
417    
418     # now read the payload
419     shift->unshift_read_chunk ($len, sub {
420     my $xml = $_[1];
421     # handle xml
422     });
423     });
424     });
425    
426     Example 2: Implement a client for a protocol that replies either with
427     "OK" and another line or "ERROR" for one request, and 64 bytes for the
428     second request. Due tot he availability of a full queue, we can just
429     pipeline sending both requests and manipulate the queue as necessary in
430     the callbacks:
431    
432     # request one
433     $handle->push_write ("request 1\015\012");
434    
435     # we expect "ERROR" or "OK" as response, so push a line read
436     $handle->push_read_line (sub {
437     # if we got an "OK", we have to _prepend_ another line,
438     # so it will be read before the second request reads its 64 bytes
439     # which are already in the queue when this callback is called
440     # we don't do this in case we got an error
441     if ($_[1] eq "OK") {
442     $_[0]->unshift_read_line (sub {
443     my $response = $_[1];
444     ...
445     });
446     }
447     });
448    
449     # request two
450     $handle->push_write ("request 2\015\012");
451    
452     # simply read 64 bytes, always
453     $handle->push_read_chunk (64, sub {
454     my $response = $_[1];
455     ...
456     });
457    
458     =over 4
459    
460 root 1.10 =cut
461    
462 root 1.8 sub _drain_rbuf {
463     my ($self) = @_;
464 elmex 1.1
465 root 1.17 if (
466     defined $self->{rbuf_max}
467     && $self->{rbuf_max} < length $self->{rbuf}
468     ) {
469     $! = &Errno::ENOSPC; return $self->error;
470     }
471    
472 root 1.11 return if $self->{in_drain};
473 root 1.8 local $self->{in_drain} = 1;
474 elmex 1.1
475 root 1.8 while (my $len = length $self->{rbuf}) {
476     no strict 'refs';
477 root 1.10 if (my $cb = shift @{ $self->{queue} }) {
478 root 1.29 unless ($cb->($self)) {
479 root 1.10 if ($self->{eof}) {
480     # no progress can be made (not enough data and no data forthcoming)
481     $! = &Errno::EPIPE; return $self->error;
482     }
483    
484     unshift @{ $self->{queue} }, $cb;
485 root 1.8 return;
486     }
487     } elsif ($self->{on_read}) {
488     $self->{on_read}($self);
489    
490     if (
491     $self->{eof} # if no further data will arrive
492     && $len == length $self->{rbuf} # and no data has been consumed
493     && !@{ $self->{queue} } # and the queue is still empty
494     && $self->{on_read} # and we still want to read data
495     ) {
496     # then no progress can be made
497     $! = &Errno::EPIPE; return $self->error;
498 elmex 1.1 }
499 root 1.8 } else {
500     # read side becomes idle
501     delete $self->{rw};
502     return;
503     }
504     }
505    
506     if ($self->{eof}) {
507     $self->_shutdown;
508 root 1.16 $self->{on_eof}($self)
509     if $self->{on_eof};
510 root 1.8 }
511 elmex 1.1 }
512    
513 root 1.8 =item $handle->on_read ($cb)
514 elmex 1.1
515 root 1.8 This replaces the currently set C<on_read> callback, or clears it (when
516     the new callback is C<undef>). See the description of C<on_read> in the
517     constructor.
518 elmex 1.1
519 root 1.8 =cut
520    
521     sub on_read {
522     my ($self, $cb) = @_;
523 elmex 1.1
524 root 1.8 $self->{on_read} = $cb;
525 elmex 1.1 }
526    
527 root 1.8 =item $handle->rbuf
528    
529     Returns the read buffer (as a modifiable lvalue).
530 elmex 1.1
531 root 1.8 You can access the read buffer directly as the C<< ->{rbuf} >> member, if
532     you want.
533 elmex 1.1
534 root 1.8 NOTE: The read buffer should only be used or modified if the C<on_read>,
535     C<push_read> or C<unshift_read> methods are used. The other read methods
536     automatically manage the read buffer.
537 elmex 1.1
538     =cut
539    
540 elmex 1.2 sub rbuf : lvalue {
541 root 1.8 $_[0]{rbuf}
542 elmex 1.2 }
543 elmex 1.1
544 root 1.8 =item $handle->push_read ($cb)
545    
546     =item $handle->unshift_read ($cb)
547    
548     Append the given callback to the end of the queue (C<push_read>) or
549     prepend it (C<unshift_read>).
550    
551     The callback is called each time some additional read data arrives.
552 elmex 1.1
553 elmex 1.20 It must check whether enough data is in the read buffer already.
554 elmex 1.1
555 root 1.8 If not enough data is available, it must return the empty list or a false
556     value, in which case it will be called repeatedly until enough data is
557     available (or an error condition is detected).
558    
559     If enough data was available, then the callback must remove all data it is
560     interested in (which can be none at all) and return a true value. After returning
561     true, it will be removed from the queue.
562 elmex 1.1
563     =cut
564    
565 root 1.30 our %RH;
566    
567     sub register_read_type($$) {
568     $RH{$_[0]} = $_[1];
569     }
570    
571 root 1.8 sub push_read {
572 root 1.28 my $self = shift;
573     my $cb = pop;
574    
575     if (@_) {
576     my $type = shift;
577    
578     $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read")
579     ->($self, $cb, @_);
580     }
581 elmex 1.1
582 root 1.8 push @{ $self->{queue} }, $cb;
583     $self->_drain_rbuf;
584 elmex 1.1 }
585    
586 root 1.8 sub unshift_read {
587 root 1.28 my $self = shift;
588     my $cb = pop;
589    
590     if (@_) {
591     my $type = shift;
592    
593     $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read")
594     ->($self, $cb, @_);
595     }
596    
597 root 1.8
598 root 1.28 unshift @{ $self->{queue} }, $cb;
599 root 1.8 $self->_drain_rbuf;
600     }
601 elmex 1.1
602 root 1.28 =item $handle->push_read (type => @args, $cb)
603 elmex 1.1
604 root 1.28 =item $handle->unshift_read (type => @args, $cb)
605 elmex 1.1
606 root 1.28 Instead of providing a callback that parses the data itself you can chose
607     between a number of predefined parsing formats, for chunks of data, lines
608     etc.
609 elmex 1.1
610 root 1.30 Predefined types are (if you have ideas for additional types, feel free to
611     drop by and tell us):
612 root 1.28
613     =over 4
614    
615     =item chunk => $octets, $cb->($self, $data)
616    
617     Invoke the callback only once C<$octets> bytes have been read. Pass the
618     data read to the callback. The callback will never be called with less
619     data.
620    
621     Example: read 2 bytes.
622    
623     $handle->push_read (chunk => 2, sub {
624     warn "yay ", unpack "H*", $_[1];
625     });
626 elmex 1.1
627     =cut
628    
629 root 1.28 register_read_type chunk => sub {
630     my ($self, $cb, $len) = @_;
631 elmex 1.1
632 root 1.8 sub {
633     $len <= length $_[0]{rbuf} or return;
634 elmex 1.12 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
635 root 1.8 1
636     }
637 root 1.28 };
638 root 1.8
639 root 1.28 # compatibility with older API
640 root 1.8 sub push_read_chunk {
641 root 1.28 $_[0]->push_read (chunk => $_[1], $_[2]);
642 root 1.8 }
643 elmex 1.1
644 root 1.8 sub unshift_read_chunk {
645 root 1.28 $_[0]->unshift_read (chunk => $_[1], $_[2]);
646 elmex 1.1 }
647    
648 root 1.28 =item line => [$eol, ]$cb->($self, $line, $eol)
649 elmex 1.1
650 root 1.8 The callback will be called only once a full line (including the end of
651     line marker, C<$eol>) has been read. This line (excluding the end of line
652     marker) will be passed to the callback as second argument (C<$line>), and
653     the end of line marker as the third argument (C<$eol>).
654 elmex 1.1
655 root 1.8 The end of line marker, C<$eol>, can be either a string, in which case it
656     will be interpreted as a fixed record end marker, or it can be a regex
657     object (e.g. created by C<qr>), in which case it is interpreted as a
658     regular expression.
659 elmex 1.1
660 root 1.8 The end of line marker argument C<$eol> is optional, if it is missing (NOT
661     undef), then C<qr|\015?\012|> is used (which is good for most internet
662     protocols).
663 elmex 1.1
664 root 1.8 Partial lines at the end of the stream will never be returned, as they are
665     not marked by the end of line marker.
666 elmex 1.1
667 root 1.8 =cut
668 elmex 1.1
669 root 1.28 register_read_type line => sub {
670     my ($self, $cb, $eol) = @_;
671 elmex 1.1
672 root 1.28 $eol = qr|(\015?\012)| if @_ < 3;
673 root 1.14 $eol = quotemeta $eol unless ref $eol;
674     $eol = qr|^(.*?)($eol)|s;
675 elmex 1.1
676 root 1.8 sub {
677     $_[0]{rbuf} =~ s/$eol// or return;
678 elmex 1.1
679 elmex 1.12 $cb->($_[0], $1, $2);
680 root 1.8 1
681     }
682 root 1.28 };
683 elmex 1.1
684 root 1.28 # compatibility with older API
685 root 1.8 sub push_read_line {
686 root 1.28 my $self = shift;
687     $self->push_read (line => @_);
688 root 1.10 }
689    
690     sub unshift_read_line {
691 root 1.28 my $self = shift;
692     $self->unshift_read (line => @_);
693 root 1.10 }
694    
695 root 1.29 =item netstring => $cb->($string)
696    
697     A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement).
698    
699     Throws an error with C<$!> set to EBADMSG on format violations.
700    
701     =cut
702    
703     register_read_type netstring => sub {
704     my ($self, $cb) = @_;
705    
706     sub {
707     unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) {
708     if ($_[0]{rbuf} =~ /[^0-9]/) {
709     $! = &Errno::EBADMSG;
710     $self->error;
711     }
712     return;
713     }
714    
715     my $len = $1;
716    
717     $self->unshift_read (chunk => $len, sub {
718     my $string = $_[1];
719     $_[0]->unshift_read (chunk => 1, sub {
720     if ($_[1] eq ",") {
721     $cb->($_[0], $string);
722     } else {
723     $! = &Errno::EBADMSG;
724     $self->error;
725     }
726     });
727     });
728    
729     1
730     }
731     };
732    
733 root 1.28 =back
734    
735 root 1.30 =item AnyEvent::Handle::register_read_type type => $coderef->($self, $cb, @args)
736    
737     This function (not method) lets you add your own types to C<push_read>.
738    
739     Whenever the given C<type> is used, C<push_read> will invoke the code
740     reference with the handle object, the callback and the remaining
741     arguments.
742    
743     The code reference is supposed to return a callback (usually a closure)
744     that works as a plain read callback (see C<< ->push_read ($cb) >>).
745    
746     It should invoke the passed callback when it is done reading (remember to
747     pass C<$self> as first argument as all other callbacks do that).
748    
749     Note that this is a function, and all types registered this way will be
750     global, so try to use unique names.
751    
752     For examples, see the source of this module (F<perldoc -m AnyEvent::Handle>,
753     search for C<register_read_type>)).
754    
755 root 1.10 =item $handle->stop_read
756    
757     =item $handle->start_read
758    
759 root 1.18 In rare cases you actually do not want to read anything from the
760 root 1.10 socket. In this case you can call C<stop_read>. Neither C<on_read> no
761 root 1.22 any queued callbacks will be executed then. To start reading again, call
762 root 1.10 C<start_read>.
763    
764     =cut
765    
766     sub stop_read {
767     my ($self) = @_;
768 elmex 1.1
769 root 1.10 delete $self->{rw};
770 root 1.8 }
771 elmex 1.1
772 root 1.10 sub start_read {
773     my ($self) = @_;
774    
775     unless ($self->{rw} || $self->{eof}) {
776     Scalar::Util::weaken $self;
777    
778     $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
779 root 1.17 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf};
780     my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
781 root 1.10
782     if ($len > 0) {
783 root 1.17 $self->{filter_r}
784 root 1.18 ? $self->{filter_r}->($self, $rbuf)
785 root 1.17 : $self->_drain_rbuf;
786 root 1.10
787     } elsif (defined $len) {
788 root 1.17 delete $self->{rw};
789 root 1.10 $self->{eof} = 1;
790 root 1.17 $self->_drain_rbuf;
791 root 1.10
792 root 1.33 } elsif ($! != EAGAIN && $! != EINTR && $! != &AnyEvent::Util::WSAWOULDBLOCK) {
793 root 1.10 return $self->error;
794     }
795     });
796     }
797 elmex 1.1 }
798    
799 root 1.19 sub _dotls {
800     my ($self) = @_;
801    
802     if (length $self->{tls_wbuf}) {
803 root 1.22 while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{tls_wbuf})) > 0) {
804     substr $self->{tls_wbuf}, 0, $len, "";
805     }
806 root 1.19 }
807    
808     if (defined (my $buf = Net::SSLeay::BIO_read ($self->{tls_wbio}))) {
809     $self->{wbuf} .= $buf;
810     $self->_drain_wbuf;
811     }
812    
813 root 1.23 while (defined (my $buf = Net::SSLeay::read ($self->{tls}))) {
814     $self->{rbuf} .= $buf;
815     $self->_drain_rbuf;
816     }
817    
818 root 1.24 my $err = Net::SSLeay::get_error ($self->{tls}, -1);
819    
820     if ($err!= Net::SSLeay::ERROR_WANT_READ ()) {
821 root 1.23 if ($err == Net::SSLeay::ERROR_SYSCALL ()) {
822     $self->error;
823     } elsif ($err == Net::SSLeay::ERROR_SSL ()) {
824     $! = &Errno::EIO;
825     $self->error;
826 root 1.19 }
827 root 1.23
828     # all others are fine for our purposes
829 root 1.19 }
830     }
831    
832 root 1.25 =item $handle->starttls ($tls[, $tls_ctx])
833    
834     Instead of starting TLS negotiation immediately when the AnyEvent::Handle
835     object is created, you can also do that at a later time by calling
836     C<starttls>.
837    
838     The first argument is the same as the C<tls> constructor argument (either
839     C<"connect">, C<"accept"> or an existing Net::SSLeay object).
840    
841     The second argument is the optional C<Net::SSLeay::CTX> object that is
842     used when AnyEvent::Handle has to create its own TLS connection object.
843    
844     =cut
845    
846 root 1.19 # TODO: maybe document...
847     sub starttls {
848     my ($self, $ssl, $ctx) = @_;
849    
850 root 1.25 $self->stoptls;
851    
852 root 1.19 if ($ssl eq "accept") {
853     $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
854     Net::SSLeay::set_accept_state ($ssl);
855     } elsif ($ssl eq "connect") {
856     $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
857     Net::SSLeay::set_connect_state ($ssl);
858     }
859    
860     $self->{tls} = $ssl;
861    
862 root 1.21 # basically, this is deep magic (because SSL_read should have the same issues)
863     # but the openssl maintainers basically said: "trust us, it just works".
864     # (unfortunately, we have to hardcode constants because the abysmally misdesigned
865     # and mismaintained ssleay-module doesn't even offer them).
866 root 1.27 # http://www.mail-archive.com/openssl-dev@openssl.org/msg22420.html
867 root 1.21 Net::SSLeay::CTX_set_mode ($self->{tls},
868     (eval { Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1)
869     | (eval { Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2));
870    
871 root 1.19 $self->{tls_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
872     $self->{tls_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
873    
874     Net::SSLeay::set_bio ($ssl, $self->{tls_rbio}, $self->{tls_wbio});
875    
876     $self->{filter_w} = sub {
877     $_[0]{tls_wbuf} .= ${$_[1]};
878     &_dotls;
879     };
880     $self->{filter_r} = sub {
881     Net::SSLeay::BIO_write ($_[0]{tls_rbio}, ${$_[1]});
882     &_dotls;
883     };
884     }
885    
886 root 1.25 =item $handle->stoptls
887    
888     Destroys the SSL connection, if any. Partial read or write data will be
889     lost.
890    
891     =cut
892    
893     sub stoptls {
894     my ($self) = @_;
895    
896     Net::SSLeay::free (delete $self->{tls}) if $self->{tls};
897     delete $self->{tls_rbio};
898     delete $self->{tls_wbio};
899     delete $self->{tls_wbuf};
900     delete $self->{filter_r};
901     delete $self->{filter_w};
902     }
903    
904 root 1.19 sub DESTROY {
905     my $self = shift;
906    
907 root 1.25 $self->stoptls;
908 root 1.19 }
909    
910     =item AnyEvent::Handle::TLS_CTX
911    
912     This function creates and returns the Net::SSLeay::CTX object used by
913     default for TLS mode.
914    
915     The context is created like this:
916    
917     Net::SSLeay::load_error_strings;
918     Net::SSLeay::SSLeay_add_ssl_algorithms;
919     Net::SSLeay::randomize;
920    
921     my $CTX = Net::SSLeay::CTX_new;
922    
923     Net::SSLeay::CTX_set_options $CTX, Net::SSLeay::OP_ALL
924    
925     =cut
926    
927     our $TLS_CTX;
928    
929     sub TLS_CTX() {
930     $TLS_CTX || do {
931     require Net::SSLeay;
932    
933     Net::SSLeay::load_error_strings ();
934     Net::SSLeay::SSLeay_add_ssl_algorithms ();
935     Net::SSLeay::randomize ();
936    
937     $TLS_CTX = Net::SSLeay::CTX_new ();
938    
939     Net::SSLeay::CTX_set_options ($TLS_CTX, Net::SSLeay::OP_ALL ());
940    
941     $TLS_CTX
942     }
943     }
944    
945 elmex 1.1 =back
946    
947     =head1 AUTHOR
948    
949 root 1.8 Robin Redeker C<< <elmex at ta-sa.org> >>, Marc Lehmann <schmorp@schmorp.de>.
950 elmex 1.1
951     =cut
952    
953     1; # End of AnyEvent::Handle