ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
Revision: 1.37
Committed: Mon May 26 20:02:22 2008 UTC (15 years, 11 months ago) by root
Branch: MAIN
Changes since 1.36: +13 -8 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 elmex 1.1 package AnyEvent::Handle;
2    
3 elmex 1.6 no warnings;
4 elmex 1.1 use strict;
5    
6 root 1.8 use AnyEvent ();
7 root 1.33 use AnyEvent::Util qw(WSAWOULDBLOCK);
8 root 1.8 use Scalar::Util ();
9     use Carp ();
10     use Fcntl ();
11 elmex 1.1 use Errno qw/EAGAIN EINTR/;
12    
13     =head1 NAME
14    
15 root 1.22 AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
16 elmex 1.1
17     =cut
18    
19 root 1.15 our $VERSION = '0.04';
20 elmex 1.1
21     =head1 SYNOPSIS
22    
23     use AnyEvent;
24     use AnyEvent::Handle;
25    
26     my $cv = AnyEvent->condvar;
27    
28 root 1.31 my $handle =
29 elmex 1.2 AnyEvent::Handle->new (
30     fh => \*STDIN,
31     on_eof => sub {
32     $cv->broadcast;
33     },
34     );
35    
36 root 1.31 # send some request line
37     $handle->push_write ("getinfo\015\012");
38    
39     # read the response line
40     $handle->push_read (line => sub {
41     my ($handle, $line) = @_;
42     warn "read line <$line>\n";
43     $cv->send;
44     });
45    
46     $cv->recv;
47 elmex 1.1
48     =head1 DESCRIPTION
49    
50 root 1.8 This module is a helper module to make it easier to do event-based I/O on
51 elmex 1.13 filehandles. For utility functions for doing non-blocking connects and accepts
52     on sockets see L<AnyEvent::Util>.
53 root 1.8
54     In the following, when the documentation refers to "bytes" then this
55     means characters. As sysread and syswrite are used for all I/O, their
56     treatment of characters applies to this module as well.
57 elmex 1.1
58 root 1.8 All callbacks will be invoked with the handle object as their first
59     argument.
60 elmex 1.1
61     =head1 METHODS
62    
63     =over 4
64    
65     =item B<new (%args)>
66    
67 root 1.8 The constructor supports these arguments (all as key => value pairs).
68 elmex 1.1
69     =over 4
70    
71 root 1.8 =item fh => $filehandle [MANDATORY]
72 elmex 1.1
73     The filehandle this L<AnyEvent::Handle> object will operate on.
74    
75 root 1.8 NOTE: The filehandle will be set to non-blocking (using
76     AnyEvent::Util::fh_nonblocking).
77    
78 root 1.16 =item on_eof => $cb->($self)
79 root 1.10
80     Set the callback to be called on EOF.
81 root 1.8
82 root 1.16 While not mandatory, it is highly recommended to set an eof callback,
83     otherwise you might end up with a closed socket while you are still
84     waiting for data.
85    
86 root 1.10 =item on_error => $cb->($self)
87    
88     This is the fatal error callback, that is called when, well, a fatal error
89 elmex 1.20 occurs, such as not being able to resolve the hostname, failure to connect
90 root 1.10 or a read error.
91 root 1.8
92     The object will not be in a usable state when this callback has been
93     called.
94    
95 root 1.10 On callback entrance, the value of C<$!> contains the operating system
96 root 1.29 error (or C<ENOSPC>, C<EPIPE> or C<EBADMSG>).
97 root 1.8
98 root 1.37 The callback should throw an exception. If it returns, then
99     AnyEvent::Handle will C<croak> for you.
100    
101 root 1.10 While not mandatory, it is I<highly> recommended to set this callback, as
102     you will not be notified of errors otherwise. The default simply calls
103     die.
104 root 1.8
105     =item on_read => $cb->($self)
106    
107     This sets the default read callback, which is called when data arrives
108 root 1.10 and no read request is in the queue.
109 root 1.8
110     To access (and remove data from) the read buffer, use the C<< ->rbuf >>
111 elmex 1.20 method or access the C<< $self->{rbuf} >> member directly.
112 root 1.8
113     When an EOF condition is detected then AnyEvent::Handle will first try to
114     feed all the remaining data to the queued callbacks and C<on_read> before
115     calling the C<on_eof> callback. If no progress can be made, then a fatal
116     error will be raised (with C<$!> set to C<EPIPE>).
117 elmex 1.1
118 root 1.8 =item on_drain => $cb->($self)
119 elmex 1.1
120 root 1.8 This sets the callback that is called when the write buffer becomes empty
121     (or when the callback is set and the buffer is empty already).
122 elmex 1.1
123 root 1.8 To append to the write buffer, use the C<< ->push_write >> method.
124 elmex 1.2
125 root 1.8 =item rbuf_max => <bytes>
126 elmex 1.2
127 root 1.8 If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>)
128     when the read buffer ever (strictly) exceeds this size. This is useful to
129     avoid denial-of-service attacks.
130 elmex 1.2
131 root 1.8 For example, a server accepting connections from untrusted sources should
132     be configured to accept only so-and-so much data that it cannot act on
133     (for example, when expecting a line, an attacker could send an unlimited
134     amount of data without a callback ever being called as long as the line
135     isn't finished).
136 elmex 1.2
137 root 1.8 =item read_size => <bytes>
138 elmex 1.2
139 root 1.8 The default read block size (the amount of bytes this module will try to read
140     on each loop iteration). Default: C<8192>.
141    
142     =item low_water_mark => <bytes>
143    
144     Sets the amount of bytes (default: C<0>) that make up an "empty" write
145     buffer: If the write buffer reaches this size or gets even smaller it is
146     considered empty.
147 elmex 1.2
148 root 1.19 =item tls => "accept" | "connect" | Net::SSLeay::SSL object
149    
150     When this parameter is given, it enables TLS (SSL) mode, that means it
151     will start making tls handshake and will transparently encrypt/decrypt
152     data.
153    
154 root 1.26 TLS mode requires Net::SSLeay to be installed (it will be loaded
155     automatically when you try to create a TLS handle).
156    
157 root 1.19 For the TLS server side, use C<accept>, and for the TLS client side of a
158     connection, use C<connect> mode.
159    
160     You can also provide your own TLS connection object, but you have
161     to make sure that you call either C<Net::SSLeay::set_connect_state>
162     or C<Net::SSLeay::set_accept_state> on it before you pass it to
163     AnyEvent::Handle.
164    
165 root 1.26 See the C<starttls> method if you need to start TLS negotiation later.
166    
167 root 1.19 =item tls_ctx => $ssl_ctx
168    
169     Use the given Net::SSLeay::CTX object to create the new TLS connection
170     (unless a connection object was specified directly). If this parameter is
171     missing, then AnyEvent::Handle will use C<AnyEvent::Handle::TLS_CTX>.
172    
173 elmex 1.1 =back
174    
175     =cut
176    
# Constructor: all key/value pairs become object fields; TLS setup and the
# callback setters consume (delete) their arguments before reading starts.
sub new {
   my ($class, %arg) = @_;

   my $self = bless \%arg, $class;

   Carp::croak "mandatory argument fh is missing"
      unless $self->{fh};

   AnyEvent::Util::fh_nonblocking ($self->{fh}, 1);

   if ($self->{tls}) {
      require Net::SSLeay;

      my $tls     = delete $self->{tls};
      my $tls_ctx = delete $self->{tls_ctx};
      $self->starttls ($tls, $tls_ctx);
   }

   # route constructor-supplied callbacks through their setter methods,
   # in the same order as before (on_drain may fire immediately)
   for my $event (qw(on_eof on_error on_drain on_read)) {
      $self->$event (delete $self->{$event})
         if $self->{$event};
   }

   $self->start_read;

   return $self;
}
200 elmex 1.2
# Drop both I/O watchers and the file handle, in that order.
sub _shutdown {
   my $self = shift;

   delete @{$self}{qw(rw ww fh)};
}
208    
# Fatal error path: shut the handle down, give on_error a chance to throw,
# and croak with the current $! if the callback returned normally.
sub error {
   my $self = shift;

   # shutting down may run destructors; keep the caller's $! intact
   do {
      local $!;
      $self->_shutdown;
   };

   if (my $on_error = $self->{on_error}) {
      $on_error->($self);
   }

   Carp::croak "AnyEvent::Handle uncaught fatal error: $!";
}
222    
223 root 1.8 =item $fh = $handle->fh
224 elmex 1.1
225 root 1.22 This method returns the file handle of the L<AnyEvent::Handle> object.
226 elmex 1.1
227     =cut
228    
# Accessor for the underlying file handle.
sub fh {
   my ($self) = @_;
   return $self->{fh};
}
230    
231 root 1.8 =item $handle->on_error ($cb)
232 elmex 1.1
233 root 1.8 Replace the current C<on_error> callback (see the C<on_error> constructor argument).
234 elmex 1.1
235 root 1.8 =cut
236    
# Setter for the fatal-error callback; returns the new value.
sub on_error {
   my ($self, $cb) = @_;
   $self->{on_error} = $cb;
}
240    
241     =item $handle->on_eof ($cb)
242    
243     Replace the current C<on_eof> callback (see the C<on_eof> constructor argument).
244 elmex 1.1
245     =cut
246    
# Setter for the EOF callback; returns the new value.
sub on_eof {
   my ($self, $cb) = @_;
   $self->{on_eof} = $cb;
}
250    
251 root 1.9 #############################################################################
252    
253     =back
254    
255     =head2 WRITE QUEUE
256    
257     AnyEvent::Handle manages two queues per handle, one for writing and one
258     for reading.
259    
260     The write queue is very simple: you can add data to its end, and
261     AnyEvent::Handle will automatically try to get rid of it for you.
262    
263 elmex 1.20 When data could be written and the write buffer is shorter than the low
264 root 1.9 water mark, the C<on_drain> callback will be invoked.
265    
266     =over 4
267    
268 root 1.8 =item $handle->on_drain ($cb)
269    
270     Sets the C<on_drain> callback or clears it (see the description of
271     C<on_drain> in the constructor).
272    
273     =cut
274    
# Set (or clear) the drain callback. If a callback is given and the write
# buffer is already at or below the low-water mark, it is invoked at once.
sub on_drain {
   my ($self, $cb) = @_;

   $self->{on_drain} = $cb;

   if ($cb && length $self->{wbuf} <= $self->{low_water_mark}) {
      $cb->($self);
   }
}
283    
284     =item $handle->push_write ($data)
285    
286     Queues the given scalar to be written. You can push as much data as you
287     want (only limited by the available memory), as C<AnyEvent::Handle>
288     buffers it independently of the kernel.
289    
290     =cut
291    
# Start flushing the write buffer unless a write watcher is already active.
# Data is written immediately where possible; only the remainder (if any)
# is handed to an AnyEvent write watcher stored in $self->{ww}.
sub _drain_wbuf {
   my ($self) = @_;

   if (!$self->{ww} && length $self->{wbuf}) {

      # the watcher closure is stored inside $self - don't keep $self alive
      Scalar::Util::weaken $self;

      my $cb = sub {
         my $len = syswrite $self->{fh}, $self->{wbuf};

         # syswrite returns undef on failure; a numeric test like
         # "$len >= 0" is true for undef, which would hide every error
         if (defined $len) {
            substr $self->{wbuf}, 0, $len, "";

            $self->{on_drain}($self)
               if $self->{low_water_mark} >= length $self->{wbuf}
                  && $self->{on_drain};

            delete $self->{ww} unless length $self->{wbuf};
         } elsif ($! != EAGAIN && $! != EINTR && $! != WSAWOULDBLOCK) {
            # anything but a transient "try again" condition is fatal
            $self->error;
         }
      };

      # try to write data immediately
      $cb->();

      # if still data left in wbuf, we need to poll
      $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb)
         if length $self->{wbuf};
   }
}
323    
# Registry of push_write formatters: type name => coderef($self, @args).
our %WH;

# Register (or replace) the formatter used for push_write (TYPE => @args).
sub register_write_type($$) {
   my ($type, $formatter) = @_;
   $WH{$type} = $formatter;
}
329    
# Queue data for writing. With more than one argument the first is a type
# name whose registered formatter produces the octets to write. Data goes
# through the write filter (TLS) when one is installed, else straight into
# wbuf.
sub push_write {
   my $self = shift;

   if (@_ > 1) {
      my $type      = shift;
      my $formatter = $WH{$type}
         or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write";
      @_ = $formatter->($self, @_);
   }

   if (my $filter = $self->{filter_w}) {
      $filter->($self, \$_[0]);
   } else {
      $self->{wbuf} .= $_[0];
      $self->_drain_wbuf;
   }
}
347    
348 root 1.29 =item $handle->push_write (type => @args)
349    
350     =item $handle->unshift_write (type => @args)
351    
352     Instead of formatting your data yourself, you can also let this module do
353     the job by specifying a type and type-specific arguments.
354    
355 root 1.30 Predefined types are (if you have ideas for additional types, feel free to
356     drop by and tell us):
357 root 1.29
358     =over 4
359    
360     =item netstring => $string
361    
362     Formats the given value as netstring
363     (http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them).
364    
365 root 1.30 =back
366    
367 root 1.29 =cut
368    
369     register_write_type netstring => sub {
370     my ($self, $string) = @_;
371    
372     sprintf "%d:%s,", (length $string), $string
373     };
374    
375 root 1.30 =item AnyEvent::Handle::register_write_type type => $coderef->($self, @args)
376    
377     This function (not method) lets you add your own types to C<push_write>.
378     Whenever the given C<type> is used, C<push_write> will invoke the code
379     reference with the handle object and the remaining arguments.
380 root 1.29
381 root 1.30 The code reference is supposed to return a single octet string that will
382     be appended to the write buffer.
383 root 1.29
384 root 1.30 Note that this is a function, and all types registered this way will be
385     global, so try to use unique names.
386 root 1.29
387 root 1.30 =cut
388 root 1.29
389 root 1.8 #############################################################################
390    
391 root 1.9 =back
392    
393     =head2 READ QUEUE
394    
395     AnyEvent::Handle manages two queues per handle, one for writing and one
396     for reading.
397    
398     The read queue is more complex than the write queue. It can be used in two
399     ways, the "simple" way, using only C<on_read> and the "complex" way, using
400     a queue.
401    
402     In the simple case, you just install an C<on_read> callback and whenever
403     new data arrives, it will be called. You can then remove some data (if
404     enough is there) from the read buffer (C<< $handle->rbuf >>) if you want
405     or not.
406    
407     In the more complex case, you want to queue multiple callbacks. In this
408     case, AnyEvent::Handle will call the first queued callback each time new
409     data arrives and removes it when it has done its job (see C<push_read>,
410     below).
411    
412     This way you can, for example, push three line-reads, followed by reading
413     a chunk of data, and AnyEvent::Handle will execute them in order.
414    
415     Example 1: EPP protocol parser. EPP sends 4 byte length info, followed by
416     the specified number of bytes which give an XML datagram.
417    
418     # in the default state, expect some header bytes
419     $handle->on_read (sub {
420     # some data is here, now queue the length-header-read (4 octets)
421     shift->unshift_read_chunk (4, sub {
422     # header arrived, decode
423     my $len = unpack "N", $_[1];
424    
425     # now read the payload
426     shift->unshift_read_chunk ($len, sub {
427     my $xml = $_[1];
428     # handle xml
429     });
430     });
431     });
432    
433     Example 2: Implement a client for a protocol that replies either with
434     "OK" and another line or "ERROR" for one request, and 64 bytes for the
435     second request. Due to the availability of a full queue, we can just
436     pipeline sending both requests and manipulate the queue as necessary in
437     the callbacks:
438    
439     # request one
440     $handle->push_write ("request 1\015\012");
441    
442     # we expect "ERROR" or "OK" as response, so push a line read
443     $handle->push_read_line (sub {
444     # if we got an "OK", we have to _prepend_ another line,
445     # so it will be read before the second request reads its 64 bytes
446     # which are already in the queue when this callback is called
447     # we don't do this in case we got an error
448     if ($_[1] eq "OK") {
449     $_[0]->unshift_read_line (sub {
450     my $response = $_[1];
451     ...
452     });
453     }
454     });
455    
456     # request two
457     $handle->push_write ("request 2\015\012");
458    
459     # simply read 64 bytes, always
460     $handle->push_read_chunk (64, sub {
461     my $response = $_[1];
462     ...
463     });
464    
465     =over 4
466    
467 root 1.10 =cut
468    
# Feed buffered read data to the queued readers (or on_read). Raises ENOSPC
# if rbuf exceeds rbuf_max, EPIPE if EOF was seen and no progress can be
# made, and calls on_eof once all data has been consumed after EOF.
sub _drain_rbuf {
   my ($self) = @_;

   if (
      defined $self->{rbuf_max}
      && $self->{rbuf_max} < length $self->{rbuf}
   ) {
      $! = &Errno::ENOSPC;
      $self->error;
   }

   # callbacks may push/unshift readers, which re-enters this function;
   # only the outermost invocation processes the buffer
   return if $self->{in_drain};
   local $self->{in_drain} = 1;

   while (my $len = length $self->{rbuf}) {
      # note: shift autovivifies $self->{queue}, so it is an array ref below
      if (my $cb = shift @{ $self->{queue} }) {
         # a queued reader returns false when it needs more data
         unless ($cb->($self)) {
            if ($self->{eof}) {
               # no progress can be made (not enough data and no data forthcoming)
               $! = &Errno::EPIPE;
               $self->error;
            }

            unshift @{ $self->{queue} }, $cb;
            return;
         }
      } elsif ($self->{on_read}) {
         $self->{on_read}($self);

         if (
            $len == length $self->{rbuf} # if no data has been consumed
            && !@{ $self->{queue} }      # and the queue is still empty
            && $self->{on_read}          # and we still want to read data
         ) {
            if ($self->{eof}) {
               # no further data will arrive, so no progress can be made
               $! = &Errno::EPIPE;
               $self->error;
            }

            # on_read is waiting for more data: stop here instead of
            # calling it again and again in a busy loop
            last;
         }
      } else {
         # read side becomes idle
         # NOTE(review): push_read/unshift_read do not recreate this watcher;
         # confirm whether callers are expected to call start_read.
         delete $self->{rw};
         return;
      }
   }

   if ($self->{eof}) {
      $self->_shutdown;
      $self->{on_eof}($self)
         if $self->{on_eof};
   }
}
522    
523 root 1.8 =item $handle->on_read ($cb)
524 elmex 1.1
525 root 1.8 This replaces the currently set C<on_read> callback, or clears it (when
526     the new callback is C<undef>). See the description of C<on_read> in the
527     constructor.
528 elmex 1.1
529 root 1.8 =cut
530    
# Replace (or clear, with undef) the default on_read callback.
sub on_read {
   my $self = shift;
   $self->{on_read} = shift;
}
536    
537 root 1.8 =item $handle->rbuf
538    
539     Returns the read buffer (as a modifiable lvalue).
540 elmex 1.1
541 root 1.8 You can access the read buffer directly as the C<< ->{rbuf} >> member, if
542     you want.
543 elmex 1.1
544 root 1.8 NOTE: The read buffer should only be used or modified if the C<on_read>,
545     C<push_read> or C<unshift_read> methods are used. The other read methods
546     automatically manage the read buffer.
547 elmex 1.1
548     =cut
549    
# The read buffer as a modifiable lvalue.
sub rbuf : lvalue {
   my $self = shift;
   $self->{rbuf}
}
553 elmex 1.1
554 root 1.8 =item $handle->push_read ($cb)
555    
556     =item $handle->unshift_read ($cb)
557    
558     Append the given callback to the end of the queue (C<push_read>) or
559     prepend it (C<unshift_read>).
560    
561     The callback is called each time some additional read data arrives.
562 elmex 1.1
563 elmex 1.20 It must check whether enough data is in the read buffer already.
564 elmex 1.1
565 root 1.8 If not enough data is available, it must return the empty list or a false
566     value, in which case it will be called repeatedly until enough data is
567     available (or an error condition is detected).
568    
569     If enough data was available, then the callback must remove all data it is
570     interested in (which can be none at all) and return a true value. After returning
571     true, it will be removed from the queue.
572 elmex 1.1
573     =cut
574    
# Registry of push_read reader generators: type name => coderef($self, $cb, @args).
our %RH;

# Register (or replace) the generator used for push_read (TYPE => @args, $cb).
sub register_read_type($$) {
   my ($type, $generator) = @_;
   $RH{$type} = $generator;
}
580    
# Append a reader callback to the read queue. In the typed form
# push_read (type => @args, $cb) the registered generator builds the
# actual reader from @args and $cb.
sub push_read {
   my $self = shift;
   my $cb   = pop;

   if (@_) {
      my $type      = shift;
      my $generator = $RH{$type}
         or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read";
      $cb = $generator->($self, $cb, @_);
   }

   push @{ $self->{queue} }, $cb;
   $self->_drain_rbuf;
}
595    
# Prepend a reader callback to the read queue; typed form as for push_read.
sub unshift_read {
   my $self = shift;
   my $cb   = pop;

   if (@_) {
      my $type      = shift;
      my $generator = $RH{$type}
         or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read";
      $cb = $generator->($self, $cb, @_);
   }

   unshift @{ $self->{queue} }, $cb;
   $self->_drain_rbuf;
}
611 elmex 1.1
612 root 1.28 =item $handle->push_read (type => @args, $cb)
613 elmex 1.1
614 root 1.28 =item $handle->unshift_read (type => @args, $cb)
615 elmex 1.1
616 root 1.28 Instead of providing a callback that parses the data itself you can choose
617     between a number of predefined parsing formats, for chunks of data, lines
618     etc.
619 elmex 1.1
620 root 1.30 Predefined types are (if you have ideas for additional types, feel free to
621     drop by and tell us):
622 root 1.28
623     =over 4
624    
625     =item chunk => $octets, $cb->($self, $data)
626    
627     Invoke the callback only once C<$octets> bytes have been read. Pass the
628     data read to the callback. The callback will never be called with less
629     data.
630    
631     Example: read 2 bytes.
632    
633     $handle->push_read (chunk => 2, sub {
634     warn "yay ", unpack "H*", $_[1];
635     });
636 elmex 1.1
637     =cut
638    
# chunk => $len: wait until $len bytes are buffered, then hand exactly
# those bytes (removed from rbuf) to the callback.
register_read_type chunk => sub {
   my (undef, $cb, $len) = @_;

   sub {
      my ($handle) = @_;

      return if length $handle->{rbuf} < $len;   # need more data

      $cb->($handle, substr $handle->{rbuf}, 0, $len, "");
      1
   }
};
648 root 1.8
649 root 1.28 # compatibility with older API
# Old API: equivalent to push_read (chunk => $len, $cb).
sub push_read_chunk {
   my ($self, $len, $cb) = @_;
   $self->push_read (chunk => $len, $cb);
}
653 elmex 1.1
# Old API: equivalent to unshift_read (chunk => $len, $cb).
sub unshift_read_chunk {
   my ($self, $len, $cb) = @_;
   $self->unshift_read (chunk => $len, $cb);
}
657    
658 root 1.28 =item line => [$eol, ]$cb->($self, $line, $eol)
659 elmex 1.1
660 root 1.8 The callback will be called only once a full line (including the end of
661     line marker, C<$eol>) has been read. This line (excluding the end of line
662     marker) will be passed to the callback as second argument (C<$line>), and
663     the end of line marker as the third argument (C<$eol>).
664 elmex 1.1
665 root 1.8 The end of line marker, C<$eol>, can be either a string, in which case it
666     will be interpreted as a fixed record end marker, or it can be a regex
667     object (e.g. created by C<qr>), in which case it is interpreted as a
668     regular expression.
669 elmex 1.1
670 root 1.8 The end of line marker argument C<$eol> is optional, if it is missing (NOT
671     undef), then C<qr|\015?\012|> is used (which is good for most internet
672     protocols).
673 elmex 1.1
674 root 1.8 Partial lines at the end of the stream will never be returned, as they are
675     not marked by the end of line marker.
676 elmex 1.1
677 root 1.8 =cut
678 elmex 1.1
# line => [$eol,] $cb: wait for a full line terminated by $eol (a string
# taken literally, or a regex); defaults to CRLF/LF when $eol is omitted.
register_read_type line => sub {
   my (undef, $cb, @eol) = @_;

   my $eol = @eol ? $eol[0] : qr|(\015?\012)|;
   $eol = quotemeta $eol unless ref $eol;

   my $line_re = qr|^(.*?)($eol)|s;

   sub {
      my ($handle) = @_;

      $handle->{rbuf} =~ s/$line_re//
         or return;

      $cb->($handle, $1, $2);
      1
   }
};
693 elmex 1.1
694 root 1.28 # compatibility with older API
# Old API: equivalent to push_read (line => ...).
sub push_read_line {
   my ($self, @args) = @_;
   $self->push_read (line => @args);
}
699    
# Old API: equivalent to unshift_read (line => ...).
sub unshift_read_line {
   my ($self, @args) = @_;
   $self->unshift_read (line => @args);
}
704    
705 root 1.29 =item netstring => $cb->($self, $string)
706    
707     A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement).
708    
709     Throws an error with C<$!> set to EBADMSG on format violations.
710    
711     =cut
712    
# netstring => $cb: parse "<len>:<data>," and pass <data> to $cb.
# Raises EBADMSG (via the handle's error method) on malformed input.
#
# The reader closures are stored in the handle's own read queue, so they
# must take the handle from their arguments ($_[0]) rather than capturing
# it: a captured handle forms a reference cycle that leaks the handle when
# the read never completes (e.g. on EOF or error).
register_read_type netstring => sub {
   my (undef, $cb) = @_;

   sub {
      my ($handle) = @_;

      unless ($handle->{rbuf} =~ s/^(0|[1-9][0-9]*)://) {
         # anything other than (a prefix of) a decimal length is malformed
         if ($handle->{rbuf} =~ /[^0-9]/) {
            $! = &Errno::EBADMSG;
            $handle->error;
         }
         return;
      }

      my $len = $1;

      # read the payload, then the mandatory trailing ","
      $handle->unshift_read (chunk => $len, sub {
         my $string = $_[1];
         $_[0]->unshift_read (chunk => 1, sub {
            if ($_[1] eq ",") {
               $cb->($_[0], $string);
            } else {
               $! = &Errno::EBADMSG;
               $_[0]->error;
            }
         });
      });

      1
   }
};
742    
743 root 1.36 =item regex => $accept[, $reject[, $skip]], $cb->($self, $data)
744    
745     Makes a regex match against the regex object C<$accept> and returns
746     everything up to and including the match.
747    
748     Example: read a single line terminated by '\n'.
749    
750     $handle->push_read (regex => qr<\n>, sub { ... });
751    
752     If C<$reject> is given and not undef, then it determines when the data is
753     to be rejected: it is matched against the data when the C<$accept> regex
754     does not match and generates an C<EBADMSG> error when it matches. This is
755     useful to quickly reject wrong data (to avoid waiting for a timeout or a
756     receive buffer overflow).
757    
758     Example: expect a single decimal number followed by whitespace, reject
759     anything else (note the use of an anchor).
760    
761     $handle->push_read (regex => qr<^[0-9]+\s>, qr<[^0-9]>, sub { ... });
762    
763     If C<$skip> is given and not C<undef>, then it will be matched against
764     the receive buffer when neither C<$accept> nor C<$reject> match,
765     and everything preceding and including the match will be accepted
766     unconditionally. This is useful to skip large amounts of data that you
767     know cannot be matched, so that the C<$accept> or C<$reject> regex do not
768     have to start matching from the beginning. This is purely an optimisation
769     and is usually worth only when you expect more than a few kilobytes.
770    
771     Example: expect a http header, which ends at C<\015\012\015\012>. Since we
772     expect the header to be very large (it isn't in practise, but...), we use
773     a skip regex to skip initial portions. The skip regex is tricky in that
774     it only accepts something not ending in either \015 or \012, as these are
775     required for the accept regex.
776    
777     $handle->push_read (regex =>
778     qr<\015\012\015\012>,
779     undef, # no reject
780     qr<^.*[^\015\012]>,
781     sub { ... });
782    
783     =cut
784    
# regex => $accept[, $reject[, $skip]], $cb: collect data up to and
# including the first $accept match and pass it to $cb. $reject raises
# EBADMSG; $skip moves matched data into the result early.
#
# The reader takes the handle from $_[0] instead of capturing it, so the
# closure (stored in the handle's own queue) does not create a reference
# cycle that would leak the handle if the read never completes.
register_read_type regex => sub {
   my (undef, $cb, $accept, $reject, $skip) = @_;

   # data already moved out of rbuf by $skip matches
   my $data;

   sub {
      my ($handle) = @_;
      my $rbuf = \$handle->{rbuf};

      # accept
      if ($$rbuf =~ $accept) {
         $data .= substr $$rbuf, 0, $+[0], "";
         $cb->($handle, $data);
         return 1;
      }

      # reject
      if ($reject && $$rbuf =~ $reject) {
         $! = &Errno::EBADMSG;
         $handle->error;
      }

      # skip
      if ($skip && $$rbuf =~ $skip) {
         $data .= substr $$rbuf, 0, $+[0], "";
      }

      ()
   }
};
813    
814 root 1.28 =back
815    
816 root 1.30 =item AnyEvent::Handle::register_read_type type => $coderef->($self, $cb, @args)
817    
818     This function (not method) lets you add your own types to C<push_read>.
819    
820     Whenever the given C<type> is used, C<push_read> will invoke the code
821     reference with the handle object, the callback and the remaining
822     arguments.
823    
824     The code reference is supposed to return a callback (usually a closure)
825     that works as a plain read callback (see C<< ->push_read ($cb) >>).
826    
827     It should invoke the passed callback when it is done reading (remember to
828     pass C<$self> as first argument as all other callbacks do that).
829    
830     Note that this is a function, and all types registered this way will be
831     global, so try to use unique names.
832    
833     For examples, see the source of this module (F<perldoc -m AnyEvent::Handle>,
834     search for C<register_read_type>).
835    
836 root 1.10 =item $handle->stop_read
837    
838     =item $handle->start_read
839    
840 root 1.18 In rare cases you actually do not want to read anything from the
841 root 1.10 socket. In this case you can call C<stop_read>. Neither C<on_read> nor
842 root 1.22 any queued callbacks will be executed then. To start reading again, call
843 root 1.10 C<start_read>.
844    
845     =cut
846    
# Stop reading by dropping the read watcher; start_read creates a new one.
sub stop_read {
   delete $_[0]{rw};
}
852 elmex 1.1
# Install a read watcher (unless one exists or EOF was already seen). Each
# readiness event reads up to read_size bytes (default 8192) and either
# hands them to the read filter or drains rbuf; a zero-length read marks EOF.
sub start_read {
   my ($self) = @_;

   return if $self->{rw} || $self->{eof};

   # the watcher lives inside $self; the closure must not own $self back
   Scalar::Util::weaken $self;

   $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
      # with a read filter (TLS) raw bytes go to a scratch buffer that is
      # passed to the filter; otherwise they are appended to rbuf directly
      my $dest = $self->{filter_r} ? \my $raw : \$self->{rbuf};
      my $got  = sysread $self->{fh}, $$dest, $self->{read_size} || 8192, length $$dest;

      if (!defined $got) {
         # transient conditions: wait for the next readiness event
         return if $! == EAGAIN || $! == EINTR || $! == &AnyEvent::Util::WSAWOULDBLOCK;
         return $self->error;
      }

      if ($got) {
         if (my $filter = $self->{filter_r}) {
            $filter->($self, $dest);
         } else {
            $self->_drain_rbuf;
         }
      } else {
         # EOF: stop watching, then let the read queue finish up
         delete $self->{rw};
         $self->{eof} = 1;
         $self->_drain_rbuf;
      }
   });
}
879    
# Pump the TLS engine: push pending plaintext into it, ship produced
# ciphertext to wbuf, and move decrypted data into rbuf.
sub _dotls {
   my ($self) = @_;

   if (length $self->{tls_wbuf}) {
      while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{tls_wbuf})) > 0) {
         substr $self->{tls_wbuf}, 0, $len, "";
      }
   }

   $self->_tls_flush_wbio;

   while (defined (my $buf = Net::SSLeay::read ($self->{tls}))) {
      unless (length $buf) {
         # SSL_read returned 0: the TLS session was closed. Treat it like a
         # plain EOF instead of looping forever on empty reads.
         delete $self->{rw};
         $self->{eof} = 1;
         $self->_drain_rbuf;
         return;
      }

      $self->{rbuf} .= $buf;
      $self->_drain_rbuf;
   }

   my $err = Net::SSLeay::get_error ($self->{tls}, -1);

   if ($err != Net::SSLeay::ERROR_WANT_READ ()) {
      if ($err == Net::SSLeay::ERROR_SYSCALL ()) {
         $self->error;
      } elsif ($err == Net::SSLeay::ERROR_SSL ()) {
         $! = &Errno::EIO;
         $self->error;
      }

      # all others are fine for our purposes
   }

   # SSL_read may itself have produced output (e.g. handshake records);
   # send it now instead of waiting for the next unrelated _dotls call
   $self->_tls_flush_wbio;
}

# Move all ciphertext queued in the TLS write BIO into wbuf and start writing.
sub _tls_flush_wbio {
   my ($self) = @_;

   while (length (my $buf = Net::SSLeay::BIO_read ($self->{tls_wbio}))) {
      $self->{wbuf} .= $buf;
      $self->_drain_wbuf;
   }
}
912    
913 root 1.25 =item $handle->starttls ($tls[, $tls_ctx])
914    
915     Instead of starting TLS negotiation immediately when the AnyEvent::Handle
916     object is created, you can also do that at a later time by calling
917     C<starttls>.
918    
919     The first argument is the same as the C<tls> constructor argument (either
920     C<"connect">, C<"accept"> or an existing Net::SSLeay object).
921    
922     The second argument is the optional C<Net::SSLeay::CTX> object that is
923     used when AnyEvent::Handle has to create its own TLS connection object.
924    
925     =cut
926    
# Begin TLS on this handle. $ssl is "accept", "connect" or an existing
# Net::SSLeay connection object; $ctx optionally overrides the default
# context (TLS_CTX) when a new connection object has to be created.
sub starttls {
   my ($self, $ssl, $ctx) = @_;

   # drop any previous TLS session first
   $self->stoptls;

   if ($ssl eq "accept") {
      $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
      Net::SSLeay::set_accept_state ($ssl);
   } elsif ($ssl eq "connect") {
      $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
      Net::SSLeay::set_connect_state ($ssl);
   }

   $self->{tls} = $ssl;

   # basically, this is deep magic (because SSL_read should have the same issues)
   # but the openssl maintainers basically said: "trust us, it just works".
   # (unfortunately, we have to hardcode constants because the abysmally misdesigned
   # and mismaintained ssleay-module doesn't even offer them).
   # http://www.mail-archive.com/openssl-dev@openssl.org/msg22420.html
   #
   # $ssl is a connection (SSL *), not a context (SSL_CTX *), so the mode has to
   # be set with SSL_set_mode; CTX_set_mode would treat the connection as a context.
   Net::SSLeay::set_mode ($ssl,
      (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1)
      | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2));

   # memory BIOs: we shuttle ciphertext between them and the real fh ourselves
   $self->{tls_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
   $self->{tls_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());

   Net::SSLeay::set_bio ($ssl, $self->{tls_rbio}, $self->{tls_wbio});

   # plaintext from push_write is encrypted before it reaches wbuf
   $self->{filter_w} = sub {
      $_[0]{tls_wbuf} .= ${$_[1]};
      &_dotls;
   };
   # raw bytes from the socket are fed into the TLS engine for decryption
   $self->{filter_r} = sub {
      Net::SSLeay::BIO_write ($_[0]{tls_rbio}, ${$_[1]});
      &_dotls;
   };
}
966    
967 root 1.25 =item $handle->stoptls
968    
969     Destroys the SSL connection, if any. Partial read or write data will be
970     lost.
971    
972     =cut
973    
# Tear down the TLS session (if any) and all TLS-related state; any
# partially processed TLS data is discarded.
sub stoptls {
   my $self = shift;

   if ($self->{tls}) {
      Net::SSLeay::free (delete $self->{tls});
   }

   delete @{$self}{qw(tls_rbio tls_wbio tls_wbuf filter_r filter_w)};
}
984    
# Release the TLS session (if any) when the handle object goes away.
sub DESTROY {
   $_[0]->stoptls;
}
990    
991     =item AnyEvent::Handle::TLS_CTX
992    
993     This function creates and returns the Net::SSLeay::CTX object used by
994     default for TLS mode.
995    
996     The context is created like this:
997    
998     Net::SSLeay::load_error_strings;
999     Net::SSLeay::SSLeay_add_ssl_algorithms;
1000     Net::SSLeay::randomize;
1001    
1002     my $CTX = Net::SSLeay::CTX_new;
1003    
1004     Net::SSLeay::CTX_set_options $CTX, Net::SSLeay::OP_ALL
1005    
1006     =cut
1007    
# Cached default Net::SSLeay context, created on first use.
our $TLS_CTX;

sub TLS_CTX() {
   unless ($TLS_CTX) {
      require Net::SSLeay;

      Net::SSLeay::load_error_strings ();
      Net::SSLeay::SSLeay_add_ssl_algorithms ();
      Net::SSLeay::randomize ();

      $TLS_CTX = Net::SSLeay::CTX_new ();

      Net::SSLeay::CTX_set_options ($TLS_CTX, Net::SSLeay::OP_ALL ());
   }

   $TLS_CTX
}
1025    
1026 elmex 1.1 =back
1027    
1028     =head1 AUTHOR
1029    
1030 root 1.8 Robin Redeker C<< <elmex at ta-sa.org> >>, Marc Lehmann <schmorp@schmorp.de>.
1031 elmex 1.1
1032     =cut
1033    
1034     1; # End of AnyEvent::Handle