Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.4 by elmex, Sun Apr 27 20:20:20 2008 UTC vs.
Revision 1.98 by root, Thu Oct 2 15:11:01 2008 UTC

1package AnyEvent::Handle; 1package AnyEvent::Handle;
2 2
3use warnings; 3no warnings;
4use strict; 4use strict qw(subs vars);
5 5
6use AnyEvent; 6use AnyEvent ();
7use IO::Handle; 7use AnyEvent::Util qw(WSAEWOULDBLOCK);
8use Scalar::Util ();
9use Carp ();
10use Fcntl ();
8use Errno qw/EAGAIN EINTR/; 11use Errno qw(EAGAIN EINTR);
9 12
10=head1 NAME 13=head1 NAME
11 14
12AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent 15AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
13 16
14=head1 VERSION
15
16Version 0.01
17
18=cut 17=cut
19 18
20our $VERSION = '0.01'; 19our $VERSION = 4.3;
21 20
22=head1 SYNOPSIS 21=head1 SYNOPSIS
23 22
24 use AnyEvent; 23 use AnyEvent;
25 use AnyEvent::Handle; 24 use AnyEvent::Handle;
26 25
27 my $cv = AnyEvent->condvar; 26 my $cv = AnyEvent->condvar;
28 27
29 my $ae_fh = AnyEvent::Handle->new (fh => \*STDIN); 28 my $handle =
30
31 $ae_fh->on_eof (sub { $cv->broadcast });
32
33 $ae_fh->readlines (sub {
34 my ($ae_fh, @lines) = @_;
35 for (@lines) {
36 chomp;
37 print "Line: $_";
38 }
39 });
40
41 # or use the constructor to pass the callback:
42
43 my $ae_fh2 =
44 AnyEvent::Handle->new ( 29 AnyEvent::Handle->new (
45 fh => \*STDIN, 30 fh => \*STDIN,
46 on_eof => sub { 31 on_eof => sub {
47 $cv->broadcast; 32 $cv->broadcast;
48 }, 33 },
34 );
35
36 # send some request line
37 $handle->push_write ("getinfo\015\012");
38
39 # read the response line
40 $handle->push_read (line => sub {
41 my ($handle, $line) = @_;
42 warn "read line <$line>\n";
43 $cv->send;
44 });
45
46 $cv->recv;
47
48=head1 DESCRIPTION
49
50This module is a helper module to make it easier to do event-based I/O on
51filehandles. For utility functions for doing non-blocking connects and accepts
52on sockets see L<AnyEvent::Util>.
53
54The L<AnyEvent::Intro> tutorial contains some well-documented
55AnyEvent::Handle examples.
56
57In the following, when the documentation refers to "bytes", then this
58means characters. As sysread and syswrite are used for all I/O, their
59treatment of characters applies to this module as well.
60
61All callbacks will be invoked with the handle object as their first
62argument.
63
64=head1 METHODS
65
66=over 4
67
68=item B<new (%args)>
69
70The constructor supports these arguments (all as key => value pairs).
71
72=over 4
73
74=item fh => $filehandle [MANDATORY]
75
76The filehandle this L<AnyEvent::Handle> object will operate on.
77
78NOTE: The filehandle will be set to non-blocking mode (using
79C<AnyEvent::Util::fh_nonblocking>) by the constructor and needs to stay in
80that mode.
81
82=item on_eof => $cb->($handle)
83
84Set the callback to be called when an end-of-file condition is detected,
85i.e. in the case of a socket, when the other side has closed the
86connection cleanly.
87
88For sockets, this just means that the other side has stopped sending data,
89you can still try to write data, and, in fact, one can return from the eof
90callback and continue writing data, as only the read part has been shut
91down.
92
93While not mandatory, it is I<highly> recommended to set an eof callback,
94otherwise you might end up with a closed socket while you are still
95waiting for data.
96
97If an EOF condition has been detected but no C<on_eof> callback has been
98set, then a fatal error will be raised with C<$!> set to C<0>.
99
100=item on_error => $cb->($handle, $fatal)
101
102This is the error callback, which is called when, well, some error
103occurred, such as not being able to resolve the hostname, failure to
104connect or a read error.
105
106Some errors are fatal (which is indicated by C<$fatal> being true). On
107fatal errors the handle object will be shut down and will not be usable
108(but you are free to look at the current C<< ->rbuf >>). Examples of fatal
109errors are an EOF condition with active (but unsatisfiable) read watchers
110(C<EPIPE>) or I/O errors.
111
112Non-fatal errors can be retried by simply returning, but it is recommended
113to simply ignore this parameter and instead abandon the handle object
114when this callback is invoked. Examples of non-fatal errors are timeouts
115(C<ETIMEDOUT>) or badly-formatted data (C<EBADMSG>).
116
117On callback entrance, the value of C<$!> contains the operating system
118error (or C<ENOSPC>, C<EPIPE>, C<ETIMEDOUT> or C<EBADMSG>).
119
120While not mandatory, it is I<highly> recommended to set this callback, as
121you will not be notified of errors otherwise. The default simply calls
122C<croak>.
123
124=item on_read => $cb->($handle)
125
126This sets the default read callback, which is called when data arrives
127and no read request is in the queue (unlike read queue callbacks, this
128callback will only be called when at least one octet of data is in the
129read buffer).
130
131To access (and remove data from) the read buffer, use the C<< ->rbuf >>
132method or access the C<$handle->{rbuf}> member directly.
133
134When an EOF condition is detected then AnyEvent::Handle will first try to
135feed all the remaining data to the queued callbacks and C<on_read> before
136calling the C<on_eof> callback. If no progress can be made, then a fatal
137error will be raised (with C<$!> set to C<EPIPE>).
138
139=item on_drain => $cb->($handle)
140
141This sets the callback that is called when the write buffer becomes empty
142(or when the callback is set and the buffer is empty already).
143
144To append to the write buffer, use the C<< ->push_write >> method.
145
146This callback is useful when you don't want to put all of your write data
147into the queue at once, for example, when you want to write the contents
148of some file to the socket you might not want to read the whole file into
149memory and push it into the queue, but instead only read more data from
150the file when the write queue becomes empty.
151
152=item timeout => $fractional_seconds
153
154If non-zero, then this enables an "inactivity" timeout: whenever this many
155seconds pass without a successful read or write on the underlying file
156handle, the C<on_timeout> callback will be invoked (and if that one is
157missing, a non-fatal C<ETIMEDOUT> error will be raised).
158
159Note that timeout processing is also active when you currently do not have
160any outstanding read or write requests: If you plan to keep the connection
161idle then you should disable the timeout temporarily or ignore the timeout
162in the C<on_timeout> callback, in which case AnyEvent::Handle will simply
163restart the timeout.
164
165Zero (the default) disables this timeout.
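
The inactivity check described above can be sketched in plain Perl. This is only an illustration: C<remaining_timeout> is a hypothetical helper, not part of the module, and it assumes the same arithmetic as the C<_timeout> sub (last activity plus timeout minus the current time).

```perl
use strict;
use warnings;

# Hypothetical helper (not part of AnyEvent::Handle): seconds left
# before the inactivity timeout fires, or 0 if it is already due.
sub remaining_timeout {
    my ($last_activity, $timeout, $now) = @_;

    return undef unless $timeout;    # zero disables the timeout

    my $after = $last_activity + $timeout - $now;
    return $after <= 0 ? 0 : $after;
}

# last activity at t=100, timeout of 30 seconds
print remaining_timeout(100, 30, 110), "\n";    # 20 seconds left
print remaining_timeout(100, 30, 135), "\n";    # 0: the timeout is due
```

Any successful read or write would move the last-activity time forward, which is why an idle but healthy connection still times out.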
166
167=item on_timeout => $cb->($handle)
168
169Called whenever the inactivity timeout passes. If you return from this
170callback, then the timeout will be reset as if some activity had happened,
171so this condition is not fatal in any way.
172
173=item rbuf_max => <bytes>
174
175If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>)
176when the read buffer ever (strictly) exceeds this size. This is useful to
177avoid some forms of denial-of-service attacks.
178
179For example, a server accepting connections from untrusted sources should
180be configured to accept only so-and-so much data that it cannot act on
181(for example, when expecting a line, an attacker could send an unlimited
182amount of data without a callback ever being called as long as the line
183isn't finished).
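
As a rough sketch of the "strictly exceeds" rule, assuming a hypothetical helper named C<rbuf_overflow> (the module performs the equivalent check inside its read-buffer drain code):

```perl
use strict;
use warnings;

# Hypothetical helper: true (1) when the buffer strictly exceeds the
# limit. An undefined limit means "no limit".
sub rbuf_overflow {
    my ($rbuf, $rbuf_max) = @_;

    my $over = defined $rbuf_max && $rbuf_max < length $rbuf;
    return $over ? 1 : 0;
}

print rbuf_overflow("abcd",  4), "\n";    # 0: exactly at the limit is fine
print rbuf_overflow("abcde", 4), "\n";    # 1: one octet too many
```

A buffer that holds exactly C<rbuf_max> octets is still accepted; only the next octet triggers the error.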
184
185=item autocork => <boolean>
186
187When disabled (the default), then C<push_write> will try to immediately
188write the data to the handle, if possible. This avoids having to register
189a write watcher and wait for the next event loop iteration, but can
190be inefficient if you write multiple small chunks (on the wire, this
191disadvantage is usually avoided by your kernel's Nagle algorithm, see
192C<no_delay>, but this option can save costly syscalls).
193
194When enabled, then writes will always be queued till the next event loop
195iteration. This is efficient when you do many small writes per iteration,
196but less efficient when you do a single write only per iteration (or when
197the write buffer often is full). It also increases write latency.
198
199=item no_delay => <boolean>
200
201When doing small writes on sockets, your operating system kernel might
202wait a bit for more data before actually sending it out. This is called
203the Nagle algorithm, and usually it is beneficial.
204
205In some situations you want as low a delay as possible, which can be
206accomplished by setting this option to a true value.
207
208The default is your operating system's default behaviour (most likely
209enabled), this option explicitly enables or disables it, if possible.
210
211=item read_size => <bytes>
212
213The default read block size (the amount of bytes this module will
214try to read during each loop iteration, which affects memory
215requirements). Default: C<8192>.
216
217=item low_water_mark => <bytes>
218
219Sets the amount of bytes (default: C<0>) that make up an "empty" write
220buffer: If the write buffer reaches this size or gets even smaller, it is
221considered empty.
222
223Sometimes it can be beneficial (for performance reasons) to add data to
224the write buffer before it is fully drained, but this is a rare case, as
225the operating system kernel usually buffers data as well, so the default
226is good in almost all cases.
227
228=item linger => <seconds>
229
230If non-zero (default: C<3600>), then the destructor of the
231AnyEvent::Handle object will check whether there is still outstanding
232write data and will install a watcher that will write this data to the
233socket. No errors will be reported (this mostly matches how the operating
234system treats outstanding data at socket close time).
235
236This will not work for partial TLS data that could not be encoded
237yet. This data will be lost. Calling the C<stoptls> method in time might
238help.
239
240=item tls => "accept" | "connect" | Net::SSLeay::SSL object
241
242When this parameter is given, it enables TLS (SSL) mode, that means
243AnyEvent will start a TLS handshake as soon as the connection has been
244established and will transparently encrypt/decrypt data afterwards.
245
246TLS mode requires Net::SSLeay to be installed (it will be loaded
247automatically when you try to create a TLS handle): this module doesn't
248have a dependency on that module, so if your module requires it, you have
249to add the dependency yourself.
250
251Unlike TCP, TLS has a server and client side: for the TLS server side, use
252C<accept>, and for the TLS client side of a connection, use C<connect>
253mode.
254
255You can also provide your own TLS connection object, but you have
256to make sure that you call either C<Net::SSLeay::set_connect_state>
257or C<Net::SSLeay::set_accept_state> on it before you pass it to
258AnyEvent::Handle.
259
260See the C<< ->starttls >> method for when you need to start TLS negotiation later.
261
262=item tls_ctx => $ssl_ctx
263
264Use the given C<Net::SSLeay::CTX> object to create the new TLS connection
265(unless a connection object was specified directly). If this parameter is
266missing, then AnyEvent::Handle will use C<AnyEvent::Handle::TLS_CTX>.
267
268=item json => JSON or JSON::XS object
269
270This is the json coder object used by the C<json> read and write types.
271
272If you don't supply it, then AnyEvent::Handle will create and use a
273suitable one (on demand), which will write and expect UTF-8 encoded JSON
274texts.
275
276Note that you are responsible to depend on the JSON module if you want to
277use this functionality, as AnyEvent does not have a dependency itself.
278
279=back
280
281=cut
282
283sub new {
284 my $class = shift;
285
286 my $self = bless { @_ }, $class;
287
288 $self->{fh} or Carp::croak "mandatory argument fh is missing";
289
290 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
291
292 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx})
293 if $self->{tls};
294
295 $self->{_activity} = AnyEvent->now;
296 $self->_timeout;
297
298 $self->on_drain (delete $self->{on_drain}) if exists $self->{on_drain};
299 $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay};
300
301 $self->start_read
302 if $self->{on_read};
303
304 $self
305}
306
307sub _shutdown {
308 my ($self) = @_;
309
310 delete $self->{_tw};
311 delete $self->{_rw};
312 delete $self->{_ww};
313 delete $self->{fh};
314
315 &_freetls;
316
317 delete $self->{on_read};
318 delete $self->{_queue};
319}
320
321sub _error {
322 my ($self, $errno, $fatal) = @_;
323
324 $self->_shutdown
325 if $fatal;
326
327 $! = $errno;
328
329 if ($self->{on_error}) {
330 $self->{on_error}($self, $fatal);
331 } else {
332 Carp::croak "AnyEvent::Handle uncaught error: $!";
333 }
334}
335
336=item $fh = $handle->fh
337
338This method returns the file handle used to create the L<AnyEvent::Handle> object.
339
340=cut
341
342sub fh { $_[0]{fh} }
343
344=item $handle->on_error ($cb)
345
346Replace the current C<on_error> callback (see the C<on_error> constructor argument).
347
348=cut
349
350sub on_error {
351 $_[0]{on_error} = $_[1];
352}
353
354=item $handle->on_eof ($cb)
355
356Replace the current C<on_eof> callback (see the C<on_eof> constructor argument).
357
358=cut
359
360sub on_eof {
361 $_[0]{on_eof} = $_[1];
362}
363
364=item $handle->on_timeout ($cb)
365
366Replaces the current C<on_timeout> callback, or disables the callback (but
367not the timeout) if C<$cb> = C<undef>. See the C<timeout> constructor
368argument and method.
369
370=cut
371
372sub on_timeout {
373 $_[0]{on_timeout} = $_[1];
374}
375
376=item $handle->autocork ($boolean)
377
378Enables or disables the current autocork behaviour (see C<autocork>
379constructor argument).
380
381=cut
382
383=item $handle->no_delay ($boolean)
384
385Enables or disables the C<no_delay> setting (see constructor argument of
386the same name for details).
387
388=cut
389
390sub no_delay {
391 $_[0]{no_delay} = $_[1];
392
393 eval {
394 local $SIG{__DIE__};
395 setsockopt $_[0]{fh}, &Socket::IPPROTO_TCP, &Socket::TCP_NODELAY, int $_[1];
396 };
397}
398
399#############################################################################
400
401=item $handle->timeout ($seconds)
402
403Configures (or disables) the inactivity timeout.
404
405=cut
406
407sub timeout {
408 my ($self, $timeout) = @_;
409
410 $self->{timeout} = $timeout;
411 $self->_timeout;
412}
413
414# reset the timeout watcher, as necessary
415# also check for time-outs
416sub _timeout {
417 my ($self) = @_;
418
419 if ($self->{timeout}) {
420 my $NOW = AnyEvent->now;
421
422 # when would the timeout trigger?
423 my $after = $self->{_activity} + $self->{timeout} - $NOW;
424
425 # now or in the past already?
426 if ($after <= 0) {
427 $self->{_activity} = $NOW;
428
429 if ($self->{on_timeout}) {
430 $self->{on_timeout}($self);
431 } else {
432 $self->_error (&Errno::ETIMEDOUT);
433 }
434
435 # callback could have changed timeout value, optimise
436 return unless $self->{timeout};
437
438 # calculate new after
439 $after = $self->{timeout};
440 }
441
442 Scalar::Util::weaken $self;
443 return unless $self; # ->error could have destroyed $self
444
445 $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub {
446 delete $self->{_tw};
447 $self->_timeout;
448 });
449 } else {
450 delete $self->{_tw};
451 }
452}
453
454#############################################################################
455
456=back
457
458=head2 WRITE QUEUE
459
460AnyEvent::Handle manages two queues per handle, one for writing and one
461for reading.
462
463The write queue is very simple: you can add data to its end, and
464AnyEvent::Handle will automatically try to get rid of it for you.
465
466When data could be written and the write buffer is no longer than the low
467water mark, the C<on_drain> callback will be invoked.
468
469=over 4
470
471=item $handle->on_drain ($cb)
472
473Sets the C<on_drain> callback or clears it (see the description of
474C<on_drain> in the constructor).
475
476=cut
477
478sub on_drain {
479 my ($self, $cb) = @_;
480
481 $self->{on_drain} = $cb;
482
483 $cb->($self)
484 if $cb && $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf});
485}
486
487=item $handle->push_write ($data)
488
489Queues the given scalar to be written. You can push as much data as you
490want (only limited by the available memory), as C<AnyEvent::Handle>
491buffers it independently of the kernel.
492
493=cut
494
495sub _drain_wbuf {
496 my ($self) = @_;
497
498 if (!$self->{_ww} && length $self->{wbuf}) {
499
500 Scalar::Util::weaken $self;
501
502 my $cb = sub {
503 my $len = syswrite $self->{fh}, $self->{wbuf};
504
505 if ($len >= 0) {
506 substr $self->{wbuf}, 0, $len, "";
507
508 $self->{_activity} = AnyEvent->now;
509
510 $self->{on_drain}($self)
511 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf})
512 && $self->{on_drain};
513
514 delete $self->{_ww} unless length $self->{wbuf};
515 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
516 $self->_error ($!, 1);
517 }
518 };
519
520 # try to write data immediately
521 $cb->() unless $self->{autocork};
522
523 # if still data left in wbuf, we need to poll
524 $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb)
525 if length $self->{wbuf};
526 };
527}
528
529our %WH;
530
531sub register_write_type($$) {
532 $WH{$_[0]} = $_[1];
533}
534
535sub push_write {
536 my $self = shift;
537
538 if (@_ > 1) {
539 my $type = shift;
540
541 @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write")
542 ->($self, @_);
543 }
544
545 if ($self->{tls}) {
546 $self->{_tls_wbuf} .= $_[0];
547
548 &_dotls ($self);
549 } else {
550 $self->{wbuf} .= $_[0];
551 $self->_drain_wbuf;
552 }
553}
554
555=item $handle->push_write (type => @args)
556
557Instead of formatting your data yourself, you can also let this module do
558the job by specifying a type and type-specific arguments.
559
560Predefined types are (if you have ideas for additional types, feel free to
561drop by and tell us):
562
563=over 4
564
565=item netstring => $string
566
567Formats the given value as netstring
568(http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them).
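
For illustration, a minimal sketch of the netstring framing. The encoder follows the one-line formatter registered below; C<netstring_decode> is a hypothetical counterpart written for this example and is not taken from the module.

```perl
use strict;
use warnings;

# Encode: "<length>:<payload>,"
sub netstring_encode {
    my ($string) = @_;
    return length($string) . ":$string,";
}

# Decode one netstring from the front of a buffer (modified in place).
# Returns undef if the buffer is incomplete or malformed.
sub netstring_decode {
    my ($bufref) = @_;

    $$bufref =~ /^(\d+):/ or return undef;
    my ($len, $hdr) = ($1, length($1) + 1);

    return undef if length($$bufref) < $hdr + $len + 1;
    return undef if substr($$bufref, $hdr + $len, 1) ne ",";

    my $payload = substr($$bufref, $hdr, $len);
    substr($$bufref, 0, $hdr + $len + 1, "");    # consume the frame
    return $payload;
}

my $buf = netstring_encode("hello") . netstring_encode("");
print "$buf\n";                                  # 5:hello,0:,

my $first  = netstring_decode(\$buf);
my $second = netstring_decode(\$buf);
print "'$first' '$second'\n";                    # 'hello' ''
```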
569
570=cut
571
572register_write_type netstring => sub {
573 my ($self, $string) = @_;
574
575 (length $string) . ":$string,"
576};
577
578=item packstring => $format, $data
579
580An octet string prefixed with an encoded length. The encoding C<$format>
581uses the same format as a Perl C<pack> format, but must specify a single
582integer only (only one of C<cCsSlLqQiInNvVjJw> is allowed, plus an
583optional C<!>, C<< < >> or C<< > >> modifier).
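
A short sketch of what such a frame looks like on the wire, using the C<N> format (a 32-bit big-endian length) as an arbitrarily chosen example:

```perl
use strict;
use warnings;

# "N/a*" means: a 32-bit big-endian length, followed by that many octets.
my $frame = pack "N/a*", "abc";

print unpack("H*", $frame), "\n";    # 00000003616263

# The same template decodes the frame again.
my ($payload) = unpack "N/a*", $frame;
print "$payload\n";                  # abc
```

Both sides must agree on the format character, since nothing in the frame itself says how the length was encoded.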
584
585=cut
586
587register_write_type packstring => sub {
588 my ($self, $format, $string) = @_;
589
590 pack "$format/a*", $string
591};
592
593=item json => $array_or_hashref
594
595Encodes the given hash or array reference into a JSON object. Unless you
596provide your own JSON object, this means it will be encoded to JSON text
597in UTF-8.
598
599JSON objects (and arrays) are self-delimiting, so you can write JSON at
600one end of a handle and read it at the other end without using any
601additional framing.
602
603The generated JSON text is guaranteed not to contain any newlines: While
604this module doesn't need delimiters after or between JSON texts to be
605able to read them, many other languages depend on that.
606
607A simple RPC protocol that interoperates easily with others is to send
608JSON arrays (or objects, although arrays are usually the better choice as
609they mimic how function argument passing works) and a newline after each
610JSON text:
611
612 $handle->push_write (json => ["method", "arg1", "arg2"]); # whatever
613 $handle->push_write ("\012");
614
615An AnyEvent::Handle receiver would simply use the C<json> read type and
616rely on the fact that the newline will be skipped as leading whitespace:
617
618 $handle->push_read (json => sub { my $array = $_[1]; ... });
619
620Other languages could read single lines terminated by a newline and pass
621this line into their JSON decoder of choice.
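
The line-oriented receiver described above might look roughly like this. The sketch uses the core C<JSON::PP> module purely as a stand-in coder (an assumption made so the example needs nothing beyond core Perl), and it works on an in-memory string instead of a real handle.

```perl
use strict;
use warnings;
use JSON::PP;    # core module, used here only as a stand-in coder

# Sender: one JSON array followed by a newline.
my $wire = encode_json(["method", "arg1", "arg2"]) . "\012";

# Receiver in a line-oriented style: take one line, then decode it.
my ($line) = split /\012/, $wire;
my $call   = decode_json($line);

my ($method, @args) = @$call;
print "$method with @args\n";    # method with arg1 arg2
```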
622
623=cut
624
625register_write_type json => sub {
626 my ($self, $ref) = @_;
627
628 require JSON;
629
630 $self->{json} ? $self->{json}->encode ($ref)
631 : JSON::encode_json ($ref)
632};
633
634=item storable => $reference
635
636Freezes the given reference using L<Storable> and writes it to the
637handle. Uses the C<nfreeze> format.
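
A minimal round trip of this framing, as a sketch on an in-memory string. The C<w/a*> template matches the formatter registered below; the decoding half is written for this example only.

```perl
use strict;
use warnings;
use Storable ();

# Frame: BER-compressed length ("w"), then the nfreeze image.
my $frame = pack "w/a*", Storable::nfreeze({ answer => 42 });

# A receiver reverses both steps.
my ($image) = unpack "w/a*", $frame;
my $data    = Storable::thaw($image);

print $data->{answer}, "\n";    # 42
```

As with any use of L<Storable>, thawing data from an untrusted peer is risky, so this framing is best kept to trusted connections.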
638
639=cut
640
641register_write_type storable => sub {
642 my ($self, $ref) = @_;
643
644 require Storable;
645
646 pack "w/a*", Storable::nfreeze ($ref)
647};
648
649=back
650
651=item AnyEvent::Handle::register_write_type type => $coderef->($handle, @args)
652
653This function (not method) lets you add your own types to C<push_write>.
654Whenever the given C<type> is used, C<push_write> will invoke the code
655reference with the handle object and the remaining arguments.
656
657The code reference is supposed to return a single octet string that will
658be appended to the write buffer.
659
660Note that this is a function, and all types registered this way will be
661global, so try to use unique names.
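
The registry mechanism can be sketched without the module itself. Everything here is a stand-in: C<format_write> is a hypothetical substitute for C<push_write> that returns the formatted octets instead of queueing them, and C<crlf_line> is a made-up type name.

```perl
use strict;
use warnings;
use Carp ();

my %WH;    # global registry: type name => formatter

sub register_write_type { $WH{ $_[0] } = $_[1] }

# Hypothetical stand-in for push_write: returns the octets that would
# be appended to the write buffer.
sub format_write {
    my $handle = shift;

    if (@_ > 1) {
        my $type = shift;
        my $fmt  = $WH{$type} or Carp::croak "unsupported type '$type'";
        return $fmt->($handle, @_);
    }

    return $_[0];
}

# A made-up "crlf_line" type that appends CRLF to its argument.
register_write_type(crlf_line => sub {
    my ($handle, $line) = @_;
    "$line\015\012"
});

my $octets = format_write(undef, crlf_line => "getinfo");
print length($octets), "\n";    # 9
```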
662
663=cut
664
665#############################################################################
666
667=back
668
669=head2 READ QUEUE
670
671AnyEvent::Handle manages two queues per handle, one for writing and one
672for reading.
673
674The read queue is more complex than the write queue. It can be used in two
675ways, the "simple" way, using only C<on_read> and the "complex" way, using
676a queue.
677
678In the simple case, you just install an C<on_read> callback and whenever
679new data arrives, it will be called. You can then remove some data (if
680enough is there) from the read buffer (C<< $handle->rbuf >>). Or you can
681leave the data there if you want to accumulate more (e.g. when only a
682partial message has been received so far).
683
684In the more complex case, you want to queue multiple callbacks. In this
685case, AnyEvent::Handle will call the first queued callback each time new
686data arrives (also the first time it is queued) and removes it when it has
687done its job (see C<push_read>, below).
688
689This way you can, for example, push three line-reads, followed by reading
690a chunk of data, and AnyEvent::Handle will execute them in order.
691
692Example 1: EPP protocol parser. EPP sends 4 byte length info, followed by
693the specified number of bytes which give an XML datagram.
694
695 # in the default state, expect some header bytes
696 $handle->on_read (sub {
697 # some data is here, now queue the length-header-read (4 octets)
698 shift->unshift_read (chunk => 4, sub {
699 # header arrived, decode
700 my $len = unpack "N", $_[1];
701
702 # now read the payload
703 shift->unshift_read (chunk => $len, sub {
704 my $xml = $_[1];
705 # handle xml
706 });
707 });
708 });
709
710Example 2: Implement a client for a protocol that replies either with "OK"
711and another line or "ERROR" for the first request that is sent, and 64
712bytes for the second request. Due to the availability of a queue, we can
713just pipeline sending both requests and manipulate the queue as necessary
714in the callbacks.
715
716When the first callback is called and sees an "OK" response, it will
717C<unshift> another line-read. This line-read will be queued I<before> the
71864-byte chunk callback.
719
720 # request one, returns either "OK + extra line" or "ERROR"
721 $handle->push_write ("request 1\015\012");
722
723 # we expect "ERROR" or "OK" as response, so push a line read
724 $handle->push_read (line => sub {
725 # if we got an "OK", we have to _prepend_ another line,
726 # so it will be read before the second request reads its 64 bytes
727 # which are already in the queue when this callback is called
728 # we don't do this in case we got an error
729 if ($_[1] eq "OK") {
49 on_readline => sub { 730 $_[0]->unshift_read (line => sub {
50 my ($ae_fh, @lines) = @_; 731 my $response = $_[1];
51 for (@lines) { 732 ...
52 chomp; 733 });
53 print "Line: $_"; 734 }
735 });
736
737 # request two, simply returns 64 octets
738 $handle->push_write ("request 2\015\012");
739
740 # simply read 64 bytes, always
741 $handle->push_read (chunk => 64, sub {
742 my $response = $_[1];
743 ...
744 });
745
746=over 4
747
748=cut
749
750sub _drain_rbuf {
751 my ($self) = @_;
752
753 local $self->{_in_drain} = 1;
754
755 if (
756 defined $self->{rbuf_max}
757 && $self->{rbuf_max} < length $self->{rbuf}
758 ) {
759 $self->_error (&Errno::ENOSPC, 1), return;
760 }
761
762 while () {
763 my $len = length $self->{rbuf};
764
765 if (my $cb = shift @{ $self->{_queue} }) {
766 unless ($cb->($self)) {
767 if ($self->{_eof}) {
768 # no progress can be made (not enough data and no data forthcoming)
769 $self->_error (&Errno::EPIPE, 1), return;
54 } 770 }
771
772 unshift @{ $self->{_queue} }, $cb;
773 last;
55 } 774 }
56 );
57
58 $cv->wait;
59
60=head1 DESCRIPTION
61
62This module is a helper module to make it easier to do non-blocking I/O
63on filehandles (and sockets, see L<AnyEvent::Socket>).
64
65The event loop is provided by L<AnyEvent>.
66
67=head1 METHODS
68
69=over 4
70
71=item B<new (%args)>
72
73The constructor has these arguments:
74
75=over 4
76
77=item fh => $filehandle
78
79The filehandle this L<AnyEvent::Handle> object will operate on.
80
81NOTE: The filehandle will be set to non-blocking.
82
83=item read_block_size => $size
84
85The default read block size used for reads via the C<on_read>
86method.
87
88=item on_read => $cb
89
90=item on_eof => $cb
91
92=item on_error => $cb
93
94These are shortcuts, that will call the corresponding method and set the callback to C<$cb>.
95
96=item on_readline => $cb
97
98The C<readlines> method is called with the default separator and C<$cb> as callback
99for you.
100
101=back
102
103=cut
104
105sub new {
106 my $this = shift;
107 my $class = ref($this) || $this;
108 my $self = {
109 read_block_size => 4096,
110 rbuf => '',
111 @_
112 };
113 bless $self, $class;
114
115 $self->{fh}->blocking (0) if $self->{fh};
116
117 if ($self->{on_read}) {
118 $self->on_read ($self->{on_read});
119
120 } elsif ($self->{on_readline}) { 775 } elsif ($self->{on_read}) {
121 $self->readlines ($self->{on_readline}); 776 last unless $len;
122 777
778 $self->{on_read}($self);
779
780 if (
781 $len == length $self->{rbuf} # if no data has been consumed
782 && !@{ $self->{_queue} } # and the queue is still empty
783 && $self->{on_read} # but we still have on_read
784 ) {
785 # no further data will arrive
786 # so no progress can be made
787 $self->_error (&Errno::EPIPE, 1), return
788 if $self->{_eof};
789
790 last; # more data might arrive
791 }
792 } else {
793 # read side becomes idle
794 delete $self->{_rw} unless $self->{tls};
795 last;
796 }
797 }
798
123 } elsif ($self->{on_eof}) { 799 if ($self->{_eof}) {
124 $self->on_eof ($self->{on_eof});
125
126 } elsif ($self->{on_error}) { 800 if ($self->{on_eof}) {
127 $self->on_eof ($self->{on_error}); 801 $self->{on_eof}($self)
802 } else {
803 $self->_error (0, 1);
804 }
128 } 805 }
129 806
130 return $self 807 # may need to restart read watcher
808 unless ($self->{_rw}) {
809 $self->start_read
810 if $self->{on_read} || @{ $self->{_queue} };
811 }
131} 812}
132 813
133=item B<fh> 814=item $handle->on_read ($cb)
134 815
135This method returns the filehandle of the L<AnyEvent::Handle> object. 816This replaces the currently set C<on_read> callback, or clears it (when
136 817the new callback is C<undef>). See the description of C<on_read> in the
137=cut 818constructor.
138
139sub fh { $_[0]->{fh} }
140
141=item B<on_read ($callback)>
142
143This method installs a C<$callback> that will be called
144when new data arrived. You can access the read buffer via the C<rbuf>
145method (see below).
146
147The first argument of the C<$callback> will be the L<AnyEvent::Handle> object.
148 819
149=cut 820=cut
150 821
151sub on_read { 822sub on_read {
152 my ($self, $cb) = @_; 823 my ($self, $cb) = @_;
824
153 $self->{on_read} = $cb; 825 $self->{on_read} = $cb;
826 $self->_drain_rbuf if $cb && !$self->{_in_drain};
827}
154 828
155 unless (defined $self->{on_read}) { 829=item $handle->rbuf
156 delete $self->{on_read_w}; 830
831Returns the read buffer (as a modifiable lvalue).
832
833You can access the read buffer directly as the C<< ->{rbuf} >> member, if
834you want.
835
836NOTE: The read buffer should only be used or modified if the C<on_read>,
837C<push_read> or C<unshift_read> methods are used. The other read methods
838automatically manage the read buffer.
839
840=cut
841
842sub rbuf : lvalue {
843 $_[0]{rbuf}
844}
845
846=item $handle->push_read ($cb)
847
848=item $handle->unshift_read ($cb)
849
850Append the given callback to the end of the queue (C<push_read>) or
851prepend it (C<unshift_read>).
852
853The callback is called each time some additional read data arrives.
854
855It must check whether enough data is in the read buffer already.
856
857If not enough data is available, it must return the empty list or a false
858value, in which case it will be called repeatedly until enough data is
859available (or an error condition is detected).
860
861If enough data was available, then the callback must remove all data it is
862interested in (which can be none at all) and return a true value. After returning
863true, it will be removed from the queue.
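
The callback contract above can be illustrated with a stripped-down queue that works on a plain string. This is a sketch only: C<drain>, C<$rbuf> and C<@queue> are stand-ins for the module's internals, and no event loop is involved.

```perl
use strict;
use warnings;

my $rbuf = "";
my @queue;

# Call the first queued callback; if it reports progress (true), drop
# it and continue, otherwise put it back and wait for more data.
sub drain {
    while (my $cb = shift @queue) {
        next if $cb->();
        unshift @queue, $cb;    # not enough data yet, keep it queued
        last;
    }
}

my @got;

# Callback 1: wants exactly 4 octets.
push @queue, sub {
    length($rbuf) >= 4 or return;
    push @got, substr($rbuf, 0, 4, "");
    1
};

# Callback 2: wants everything up to a newline.
push @queue, sub {
    $rbuf =~ s/^([^\012]*)\012// or return;
    push @got, $1;
    1
};

$rbuf .= "ABC";     drain();    # nothing complete yet
$rbuf .= "Dhel";    drain();    # chunk done, line still partial
$rbuf .= "lo\012";  drain();    # line done

print join("|", @got), "\n";    # ABCD|hello
```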
864
865=cut
866
867our %RH;
868
869sub register_read_type($$) {
870 $RH{$_[0]} = $_[1];
871}
872
873sub push_read {
874 my $self = shift;
875 my $cb = pop;
876
877 if (@_) {
878 my $type = shift;
879
880 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read")
881 ->($self, $cb, @_);
882 }
883
884 push @{ $self->{_queue} }, $cb;
885 $self->_drain_rbuf unless $self->{_in_drain};
886}
887
888sub unshift_read {
889 my $self = shift;
890 my $cb = pop;
891
892 if (@_) {
893 my $type = shift;
894
895 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read")
896 ->($self, $cb, @_);
897 }
898
899
900 unshift @{ $self->{_queue} }, $cb;
901 $self->_drain_rbuf unless $self->{_in_drain};
902}
903
904=item $handle->push_read (type => @args, $cb)
905
906=item $handle->unshift_read (type => @args, $cb)
907
908Instead of providing a callback that parses the data itself you can choose
909between a number of predefined parsing formats, for chunks of data, lines
910etc.
911
912Predefined types are (if you have ideas for additional types, feel free to
913drop by and tell us):
914
915=over 4
916
917=item chunk => $octets, $cb->($handle, $data)
918
919Invoke the callback only once C<$octets> bytes have been read. Pass the
920data read to the callback. The callback will never be called with less
921data.
922
923Example: read 2 bytes.
924
925 $handle->push_read (chunk => 2, sub {
926 warn "yay ", unpack "H*", $_[1];
927 });
928
929=cut
930
931register_read_type chunk => sub {
932 my ($self, $cb, $len) = @_;
933
934 sub {
935 $len <= length $_[0]{rbuf} or return;
936 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
937 1
938 }
939};
940
941=item line => [$eol, ]$cb->($handle, $line, $eol)
942
943The callback will be called only once a full line (including the end of
944line marker, C<$eol>) has been read. This line (excluding the end of line
945marker) will be passed to the callback as second argument (C<$line>), and
946the end of line marker as the third argument (C<$eol>).
947
948The end of line marker, C<$eol>, can be either a string, in which case it
949will be interpreted as a fixed record end marker, or it can be a regex
950object (e.g. created by C<qr>), in which case it is interpreted as a
951regular expression.
952
953The end of line marker argument C<$eol> is optional, if it is missing (NOT
954undef), then C<qr|\015?\012|> is used (which is good for most internet
955protocols).
956
957Partial lines at the end of the stream will never be returned, as they are
958not marked by the end of line marker.
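
The two matching strategies described above can be tried outside of an
event loop. The following is a minimal stand-alone sketch, not part of this
module: C<take_line> is a hypothetical helper that applies the same two
substitutions to a plain scalar buffer, assuming the buffer is passed by
reference.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of AnyEvent::Handle): remove one line from
# the buffer and return (line, eol), or an empty list if no complete line
# is buffered yet.
sub take_line {
    my $bufref = shift;

    if (@_) {
        my $eol = shift;
        $eol = quotemeta $eol unless ref $eol;    # plain strings match literally
        return $$bufref =~ s/^(.*?)($eol)//s ? ($1, $2) : ();
    }

    # no $eol passed at all: accept LF or CRLF
    return $$bufref =~ s/^([^\015\012]*)(\015?\012)// ? ($1, $2) : ();
}

my $buf    = "HELO example\015\012partial";
my @first  = take_line (\$buf);    # complete line, the CRLF is split off
my @second = take_line (\$buf);    # "partial" has no end of line marker yet

my $rec   = "a.b.c";
my @third = take_line (\$rec, ".");    # "." is a fixed marker, not "any character"

my $list   = "x ; y";
my @fourth = take_line (\$list, qr/\s*;\s*/);    # regex object as marker
```

The second call returns nothing and leaves C<"partial"> in the buffer,
which is the behaviour described for partial lines at the end of a stream.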
959
960=cut
961
962register_read_type line => sub {
963 my ($self, $cb, $eol) = @_;
964
965 if (@_ < 3) {
966 # this is more than twice as fast as the generic code below
967 sub {
968 $_[0]{rbuf} =~ s/^([^\015\012]*)(\015?\012)// or return;
969
970 $cb->($_[0], $1, $2);
971 1
972 }
973 } else {
974 $eol = quotemeta $eol unless ref $eol;
975 $eol = qr|^(.*?)($eol)|s;
976
977 sub {
978 $_[0]{rbuf} =~ s/$eol// or return;
979
980 $cb->($_[0], $1, $2);
981 1
982 }
983 }
984};
985
986=item regex => $accept[, $reject[, $skip]], $cb->($handle, $data)
987
988Makes a regex match against the regex object C<$accept> and returns
989everything up to and including the match.
990
991Example: read a single line terminated by '\n'.
992
993 $handle->push_read (regex => qr<\n>, sub { ... });
994
995If C<$reject> is given and not undef, then it determines when the data is
996to be rejected: it is matched against the data when the C<$accept> regex
997does not match and generates an C<EBADMSG> error when it matches. This is
998useful to quickly reject wrong data (to avoid waiting for a timeout or a
999receive buffer overflow).
1000
1001Example: expect a single decimal number followed by whitespace, reject
1002anything else (note the use of an anchor).
1003
1004 $handle->push_read (regex => qr<^[0-9]+\s>, qr<[^0-9]>, sub { ... });
1005
1006If C<$skip> is given and not C<undef>, then it will be matched against
1007the receive buffer when neither C<$accept> nor C<$reject> match,
1008and everything preceding and including the match will be accepted
1009unconditionally. This is useful to skip large amounts of data that you
1010know cannot be matched, so that the C<$accept> or C<$reject> regex do not
1011have to start matching from the beginning. This is purely an optimisation
1012and is usually only worthwhile when you expect more than a few kilobytes.
1013
1014Example: expect a http header, which ends at C<\015\012\015\012>. Since we
1015expect the header to be very large (it isn't in practice, but...), we use
1016a skip regex to skip initial portions. The skip regex is tricky in that
1017it only accepts something not ending in either \015 or \012, as these are
1018required for the accept regex.
1019
1020 $handle->push_read (regex =>
1021 qr<\015\012\015\012>,
1022 undef, # no reject
1023 qr<^.*[^\015\012]>,
1024 sub { ... });
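
How the three regexes interact is easiest to see on a plain buffer. The
sketch below is an illustration only, not the module's code:
C<try_regex_read> is a hypothetical function that performs one matching
attempt and reports the outcome as a string, whereas the real read type
invokes the callback or raises C<EBADMSG> instead.

```perl
use strict;
use warnings;

# Hypothetical stand-alone model of one matching attempt. $bufref is the
# receive buffer, $dataref collects everything consumed so far.
sub try_regex_read {
    my ($bufref, $dataref, $accept, $reject, $skip) = @_;

    # accept: consume everything up to and including the match
    if ($$bufref =~ $accept) {
        $$dataref .= substr $$bufref, 0, $+[0], "";
        return "accept";
    }

    # reject: only consulted when accept did not match
    return "reject" if $reject && $$bufref =~ $reject;

    # skip: move data that cannot be part of a later accept match
    if ($skip && $$bufref =~ $skip) {
        $$dataref .= substr $$bufref, 0, $+[0], "";
    }

    return "wait";
}

# the http header example: the header arrives in two pieces
my ($buf, $data) = ("GET / HTTP/1.0\015\012Host: x\015", "");
my @args = (\$buf, \$data, qr<\015\012\015\012>, undef, qr<^.*[^\015\012]>);

my $s1      = try_regex_read (@args);    # incomplete, but skip consumed a prefix
my $skipped = $data;
$buf .= "\012\015\012";
my $s2      = try_regex_read (@args);    # header is complete now

# the decimal number example: "x" can never become a valid number
my ($nbuf, $ndata) = ("12x", "");
my $s3 = try_regex_read (\$nbuf, \$ndata, qr<^[0-9]+\s>, qr<[^0-9]>);
```

Note that the skipped prefix is not lost: it is prepended to the data that
is eventually accepted.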
1025
1026=cut
1027
1028register_read_type regex => sub {
1029 my ($self, $cb, $accept, $reject, $skip) = @_;
1030
1031 my $data;
1032 my $rbuf = \$self->{rbuf};
1033
1034 sub {
1035 # accept
1036 if ($$rbuf =~ $accept) {
1037 $data .= substr $$rbuf, 0, $+[0], "";
1038 $cb->($self, $data);
157 return; 1039 return 1;
1040 }
1041
1042 # reject
1043 if ($reject && $$rbuf =~ $reject) {
1044 $self->_error (&Errno::EBADMSG);
1045 }
1046
1047 # skip
1048 if ($skip && $$rbuf =~ $skip) {
1049 $data .= substr $$rbuf, 0, $+[0], "";
1050 }
1051
1052 ()
158 } 1053 }
159 1054};
160 $self->{on_read_w} = 1055
161 AnyEvent->io (poll => 'r', fh => $self->{fh}, cb => sub { 1056=item netstring => $cb->($handle, $string)
162 #d# warn "READ:[$self->{read_size}] $self->{read_block_size} : ".length ($self->{rbuf})."\n"; 1057
163 my $rbuf_len = length $self->{rbuf}; 1058A netstring (http://cr.yp.to/proto/netstrings.txt, this is not an endorsement).
164 my $l; 1059
165 if (defined $self->{read_size}) { 1060Throws an error with C<$!> set to EBADMSG on format violations.
166 $l = sysread $self->{fh}, $self->{rbuf}, 1061
167 ($self->{read_size} - $rbuf_len), $rbuf_len; 1062=cut
168 } else { 1063
169 $l = sysread $self->{fh}, $self->{rbuf}, $self->{read_block_size}, $rbuf_len; 1064register_read_type netstring => sub {
1065 my ($self, $cb) = @_;
1066
1067 sub {
1068 unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) {
1069 if ($_[0]{rbuf} =~ /[^0-9]/) {
1070 $self->_error (&Errno::EBADMSG);
170 } 1071 }
171 #d# warn "READL $l [$self->{rbuf}]\n"; 1072 return;
1073 }
172 1074
173 if (not defined $l) { 1075 my $len = $1;
174 return if $! == EAGAIN || $! == EINTR;
175 $self->{on_error}->($self) if $self->{on_error};
176 delete $self->{on_read_w};
177 1076
178 } elsif ($l == 0) { 1077 $self->unshift_read (chunk => $len, sub {
179 $self->{on_eof}->($self) if $self->{on_eof}; 1078 my $string = $_[1];
180 delete $self->{on_read_w}; 1079 $_[0]->unshift_read (chunk => 1, sub {
181 1080 if ($_[1] eq ",") {
1081 $cb->($_[0], $string);
182 } else { 1082 } else {
183 $self->{on_read}->($self); 1083 $self->_error (&Errno::EBADMSG);
1084 }
1085 });
1086 });
1087
1088 1
1089 }
1090};
1091
1092=item packstring => $format, $cb->($handle, $string)
1093
1094An octet string prefixed with an encoded length. The encoding C<$format>
1095uses the same format as a Perl C<pack> format, but must specify a single
1096integer only (only one of C<cCsSlLqQiInNvVjJw> is allowed, plus an
1097optional C<!>, C<< < >> or C<< > >> modifier).
1098
1099For example, DNS over TCP uses a prefix of C<n> (2 octet network order),
1100EPP uses a prefix of C<N> (4 octets).
1101
1102Example: read a block of data prefixed by its length in BER-encoded
1103format (very efficient).
1104
1105 $handle->push_read (packstring => "w", sub {
1106 my ($handle, $data) = @_;
1107 });
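
The length of the prefix itself is not known in advance for variable-width
formats such as C<w>. One way to find it is to unpack the length and pack it
again. The following stand-alone sketch shows the idea on a plain buffer;
C<extract_packstring> is a hypothetical helper, and unlike the read type it
simply waits until prefix and payload are both completely buffered.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of AnyEvent::Handle): remove one
# length-prefixed record from the buffer, or return nothing if the
# record is not complete yet.
sub extract_packstring {
    my ($format, $bufref) = @_;

    # unpack can die on a truncated prefix (e.g. BER), hence the eval
    my $len = eval { unpack $format, $$bufref };
    return unless defined $len;

    # re-pack the length to learn how many octets the prefix occupies
    my $prefix = length pack $format, $len;

    return if $prefix + $len > length $$bufref;    # payload incomplete

    my $data = substr $$bufref, $prefix, $len;
    substr $$bufref, 0, $prefix + $len, "";
    return $data;
}

my $buf    = pack ("w", 5) . "hello" . pack ("n", 3) . "ab";
my $first  = extract_packstring ("w", \$buf);    # complete record
my $second = extract_packstring ("n", \$buf);    # one payload octet missing
$buf .= "c";
my $third  = extract_packstring ("n", \$buf);    # complete now

my $trunc   = "\x81";                              # BER integer cut off mid-way
my $nothing = extract_packstring ("w", \$trunc);
```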
1108
1109=cut
1110
1111register_read_type packstring => sub {
1112 my ($self, $cb, $format) = @_;
1113
1114 sub {
1115 # when we can use 5.10 we can use ".", but for 5.8 we use the re-pack method
1116 defined (my $len = eval { unpack $format, $_[0]{rbuf} })
1117 or return;
1118
1119 $format = length pack $format, $len;
1120
1121 # bypass unshift if we already have the remaining chunk
1122 if ($format + $len <= length $_[0]{rbuf}) {
1123 my $data = substr $_[0]{rbuf}, $format, $len;
1124 substr $_[0]{rbuf}, 0, $format + $len, "";
1125 $cb->($_[0], $data);
1126 } else {
1127 # remove prefix
1128 substr $_[0]{rbuf}, 0, $format, "";
1129
1130 # read remaining chunk
1131 $_[0]->unshift_read (chunk => $len, $cb);
1132 }
1133
1134 1
1135 }
1136};
1137
1138=item json => $cb->($handle, $hash_or_arrayref)
1139
1140Reads a JSON object or array, decodes it and passes it to the callback.
1141
1142If a C<json> object was passed to the constructor, then that will be used
1143for the final decode, otherwise it will create a JSON coder expecting UTF-8.
1144
1145This read type uses the incremental parser available with JSON version
11462.09 (and JSON::XS version 2.2) and above. You have to provide a
1147dependency on your own: this module will load the JSON module, but
1148AnyEvent does not depend on it itself.
1149
1150Since JSON texts are fully self-delimiting, the C<json> read and write
1151types are an ideal simple RPC protocol: just exchange JSON datagrams. See
1152the C<json> write type description, above, for an actual example.
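
To see why self-delimiting texts need no extra framing, here is a small
stand-alone sketch of incremental parsing. It assumes the core module
L<JSON::PP>, which offers the same C<incr_parse> interface, instead of the
L<JSON> module this read type loads; the chunk boundaries are made up for
illustration.

```perl
use strict;
use warnings;
use JSON::PP ();

my $json = JSON::PP->new->utf8;
my @got;

# two JSON texts, split at arbitrary positions as a stream socket might do
for my $chunk ('{"id":1,"met', 'hod":"ping"}[1,2', ',3]') {
    # scalar context: append the chunk, then try to extract one text
    my $ref = $json->incr_parse ($chunk);

    while (defined $ref) {
        push @got, $ref;
        $ref = $json->incr_parse;    # anything else already complete?
    }
}
```

The parser returns a reference only once a text is complete, so the first
chunk yields nothing, the second yields the hash, and the third yields the
array.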
1153
1154=cut
1155
1156register_read_type json => sub {
1157 my ($self, $cb) = @_;
1158
1159 require JSON;
1160
1161 my $data;
1162 my $rbuf = \$self->{rbuf};
1163
1164 my $json = $self->{json} ||= JSON->new->utf8;
1165
1166 sub {
1167 my $ref = $json->incr_parse ($self->{rbuf});
1168
1169 if ($ref) {
1170 $self->{rbuf} = $json->incr_text;
1171 $json->incr_text = "";
1172 $cb->($self, $ref);
1173
1174 1
1175 } else {
1176 $self->{rbuf} = "";
1177 ()
1178 }
1179 }
1180};
1181
1182=item storable => $cb->($handle, $ref)
1183
1184Deserialises a L<Storable> frozen representation as written by the
1185C<storable> write type (BER-encoded length prefix followed by nfreeze'd
1186data).
1187
1188Raises C<EBADMSG> error if the data could not be decoded.
1189
1190=cut
1191
1192register_read_type storable => sub {
1193 my ($self, $cb) = @_;
1194
1195 require Storable;
1196
1197 sub {
1198 # when we can use 5.10 we can use ".", but for 5.8 we use the re-pack method
1199 defined (my $len = eval { unpack "w", $_[0]{rbuf} })
1200 or return;
1201
1202 my $format = length pack "w", $len;
1203
1204 # bypass unshift if we already have the remaining chunk
1205 if ($format + $len <= length $_[0]{rbuf}) {
1206 my $data = substr $_[0]{rbuf}, $format, $len;
1207 substr $_[0]{rbuf}, 0, $format + $len, "";
1208 $cb->($_[0], Storable::thaw ($data));
1209 } else {
1210 # remove prefix
1211 substr $_[0]{rbuf}, 0, $format, "";
1212
1213 # read remaining chunk
1214 $_[0]->unshift_read (chunk => $len, sub {
1215 if (my $ref = eval { Storable::thaw ($_[1]) }) {
1216 $cb->($_[0], $ref);
1217 } else {
1218 $self->_error (&Errno::EBADMSG);
1219 }
1220 });
1221 }
1222
1223 1
1224 }
1225};
1226
1227=back
1228
1229=item AnyEvent::Handle::register_read_type type => $coderef->($handle, $cb, @args)
1230
1231This function (not method) lets you add your own types to C<push_read>.
1232
1233Whenever the given C<type> is used, C<push_read> will invoke the code
1234reference with the handle object, the callback and the remaining
1235arguments.
1236
1237The code reference is supposed to return a callback (usually a closure)
1238that works as a plain read callback (see C<< ->push_read ($cb) >>).
1239
1240It should invoke the passed callback when it is done reading (remember to
1241pass C<$handle> as first argument as all other callbacks do that).
1242
1243Note that this is a function, and all types registered this way will be
1244global, so try to use unique names.
1245
1246For examples, see the source of this module (F<perldoc -m AnyEvent::Handle>,
1247search for C<register_read_type>).
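
The contract between a read type and the read queue can also be shown
without an event loop. The following is a toy model under stated
assumptions: C<%READ_TYPE>, the local C<register_read_type> and the plain
hash used as handle are stand-ins, and C<pascal_string> is a made-up type
(one length octet followed by that many octets). Only the calling
convention follows the description above.

```perl
use strict;
use warnings;

# toy registry standing in for the module's global type table
my %READ_TYPE;

sub register_read_type {
    my ($type, $factory) = @_;
    $READ_TYPE{$type} = $factory;
}

# the factory receives the handle and the user callback and returns a
# plain read callback
register_read_type (pascal_string => sub {
    my ($handle, $cb) = @_;

    sub {
        my $rbuf = \$_[0]{rbuf};

        return unless length $$rbuf;           # no length octet yet
        my $len = ord $$rbuf;                  # value of the first octet
        return if length $$rbuf < 1 + $len;    # need more data

        my $data = substr $$rbuf, 1, $len;
        substr $$rbuf, 0, 1 + $len, "";

        $cb->($_[0], $data);                   # handle goes first
        1                                      # request satisfied
    }
});

my $handle = { rbuf => "" };                   # stand-in for a real handle
my @seen;
my $reader = $READ_TYPE{pascal_string}->($handle, sub { push @seen, $_[1] });

# feed data in arbitrary pieces; the real queue would drop a read
# callback once it returns true, here it is reused for brevity
my @status;
for my $octets ("\x05he", "llo\x02", "ok") {
    $handle->{rbuf} .= $octets;
    push @status, $reader->($handle) ? 1 : 0;
}
```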
1248
1249=item $handle->stop_read
1250
1251=item $handle->start_read
1252
1253In rare cases you actually do not want to read anything from the
1254socket. In this case you can call C<stop_read>. Neither C<on_read> nor
1255any queued callbacks will be executed then. To start reading again, call
1256C<start_read>.
1257
1258Note that AnyEvent::Handle will automatically C<start_read> for you when
1259you change the C<on_read> callback or push/unshift a read callback, and it
1260will automatically C<stop_read> for you when C<on_read> is not set and
1261there are no read requests in the queue.
1262
1263These methods will have no effect when in TLS mode (as TLS doesn't support
1264half-duplex connections).
1265
1266=cut
1267
1268sub stop_read {
1269 my ($self) = @_;
1270
1271 delete $self->{_rw} unless $self->{tls};
1272}
1273
1274sub start_read {
1275 my ($self) = @_;
1276
1277 unless ($self->{_rw} || $self->{_eof}) {
1278 Scalar::Util::weaken $self;
1279
1280 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
1281 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf});
1282 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
1283
1284 if ($len > 0) {
1285 $self->{_activity} = AnyEvent->now;
1286
1287 if ($self->{tls}) {
1288 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf);
1289
1290 &_dotls ($self);
1291 } else {
1292 $self->_drain_rbuf unless $self->{_in_drain};
1293 }
1294
1295 } elsif (defined $len) {
1296 delete $self->{_rw};
1297 $self->{_eof} = 1;
1298 $self->_drain_rbuf unless $self->{_in_drain};
1299
1300 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
1301 return $self->_error ($!, 1);
184 } 1302 }
185 }); 1303 });
1304 }
186} 1305}
187 1306
188=item B<on_error ($callback)> 1307# poll the write BIO and send the data if applicable
189 1308sub _dotls {
190Whenever a read or write operation resulted in an error the C<$callback>
191will be called.
192
193The first argument of C<$callback> will be the L<AnyEvent::Handle> object itself.
194The error is given as errno in C<$!>.
195
196=cut
197
198sub on_error {
199 $_[0]->{on_error} = $_[1];
200}
201
202=item B<on_eof ($callback)>
203
204Installs the C<$callback> that will be called when the end of file is
205encountered in a read operation this C<$callback> will be called. The first
206argument will be the L<AnyEvent::Handle> object itself.
207
208=cut
209
210sub on_eof {
211 $_[0]->{on_eof} = $_[1];
212}
213
214=item B<rbuf>
215
216Returns a reference to the read buffer.
217
218NOTE: The read buffer should only be used or modified if the C<on_read>
219method is used directly. The C<read> and C<readlines> methods will provide
220the read data to their callbacks.
221
222=cut
223
224sub rbuf : lvalue {
225 $_[0]->{rbuf}
226}
227
228=item B<read ($len, $callback)>
229
230Will read exactly C<$len> bytes from the filehandle and call the C<$callback>
231if done so. The first argument to the C<$callback> will be the L<AnyEvent::Handle>
232object itself and the second argument the read data.
233
234NOTE: This method will override any callbacks installed via the C<on_read> method.
235
236=cut
237
238sub read {
239 my ($self, $len, $cb) = @_; 1309 my ($self) = @_;
240 1310
241 $self->{read_cb} = $cb; 1311 my $tmp;
242 my $old_blk_size = $self->{read_block_size};
243 $self->{read_block_size} = $len;
244 1312
245 $self->on_read (sub { 1313 if (length $self->{_tls_wbuf}) {
246 #d# warn "OFOFO $len || ".length($_[0]->{rbuf})."||\n"; 1314 while (($tmp = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) {
247 1315 substr $self->{_tls_wbuf}, 0, $tmp, "";
248 if ($len == length $_[0]->{rbuf}) {
249 $_[0]->{read_block_size} = $old_blk_size;
250 $_[0]->on_read (undef);
251 $_[0]->{read_cb}->($_[0], (substr $self->{rbuf}, 0, $len, ''));
252 } 1316 }
253 }); 1317 }
254}
255 1318
256=item B<readlines ($callback)> 1319 while (defined ($tmp = Net::SSLeay::read ($self->{tls}))) {
1320 unless (length $tmp) {
1321 # let's treat SSL-eof as we treat normal EOF
1322 delete $self->{_rw};
1323 $self->{_eof} = 1;
1324 &_freetls;
1325 }
257 1326
258=item B<readlines ($sep, $callback)> 1327 $self->{rbuf} .= $tmp;
1328 $self->_drain_rbuf unless $self->{_in_drain};
1329 $self->{tls} or return; # tls session might have gone away in callback
1330 }
259 1331
260This method will read lines from the filehandle, seperated by C<$sep> or C<"\n"> 1332 $tmp = Net::SSLeay::get_error ($self->{tls}, -1);
261if C<$sep> is not provided. C<$sep> will be used as part of a regex, so it can be
262a regex itself and won't be quoted!
263 1333
264The C<$callback> will be called when at least one 1334 if ($tmp != Net::SSLeay::ERROR_WANT_READ ()) {
265line could be read. The first argument to the C<$callback> will be the L<AnyEvent::Handle> 1335 if ($tmp == Net::SSLeay::ERROR_SYSCALL ()) {
266object itself and the rest of the arguments will be the read lines. 1336 return $self->_error ($!, 1);
1337 } elsif ($tmp == Net::SSLeay::ERROR_SSL ()) {
1338 return $self->_error (&Errno::EIO, 1);
1339 }
267 1340
268NOTE: This method will override any callbacks installed via the C<on_read> method. 1341 # all other errors are fine for our purposes
1342 }
269 1343
270=cut 1344 while (length ($tmp = Net::SSLeay::BIO_read ($self->{_wbio}))) {
1345 $self->{wbuf} .= $tmp;
1346 $self->_drain_wbuf;
1347 }
1348}
271 1349
272sub readlines { 1350=item $handle->starttls ($tls[, $tls_ctx])
1351
1352Instead of starting TLS negotiation immediately when the AnyEvent::Handle
1353object is created, you can also do that at a later time by calling
1354C<starttls>.
1355
1356The first argument is the same as the C<tls> constructor argument (either
1357C<"connect">, C<"accept"> or an existing Net::SSLeay object).
1358
1359The second argument is the optional C<Net::SSLeay::CTX> object that is
1360used when AnyEvent::Handle has to create its own TLS connection object.
1361
1362The TLS connection object will end up in C<< $handle->{tls} >> after this
1363call and can be used or changed to your liking. Note that the handshake
1364might have already started when this function returns.
1365
1366It is an error to start a TLS handshake more than once per
1367AnyEvent::Handle object (this is due to bugs in OpenSSL).
1368
1369=cut
1370
1371sub starttls {
273 my ($self, $NL, $cb) = @_; 1372 my ($self, $ssl, $ctx) = @_;
274 1373
275 if (ref $NL) { 1374 require Net::SSLeay;
276 $cb = $NL; 1375
277 $NL = "\n"; 1376 Carp::croak "it is an error to call starttls more than once on an AnyEvent::Handle object"
1377 if $self->{tls};
278 } 1378
279 1379 if ($ssl eq "accept") {
280 $self->{on_readline} = $cb; 1380 $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
281 1381 Net::SSLeay::set_accept_state ($ssl);
282 $self->on_read (sub { 1382 } elsif ($ssl eq "connect") {
283 my @lines; 1383 $ssl = Net::SSLeay::new ($ctx || TLS_CTX ());
284 push @lines, $1 while $_[0]->{rbuf} =~ s/(.*)$NL//; 1384 Net::SSLeay::set_connect_state ($ssl);
285 $self->{on_readline}->($_[0], @lines);
286 }); 1385 }
287}
288 1386
289=item B<write ($data)> 1387 $self->{tls} = $ssl;
290 1388
291=item B<write ($callback)> 1389 # basically, this is deep magic (because SSL_read should have the same issues)
1390 # but the openssl maintainers basically said: "trust us, it just works".
1391 # (unfortunately, we have to hardcode constants because the abysmally misdesigned
1392 # and mismaintained ssleay-module doesn't even offer them).
1393 # http://www.mail-archive.com/openssl-dev@openssl.org/msg22420.html
1394 #
1395 # in short: this is a mess.
1396 #
1397 # note that we do not try to keep the length constant between writes as we are required to do.
1398 # we assume that most (but not all) of this insanity only applies to non-blocking cases,
1399 # and we drive openssl fully in blocking mode here. Or maybe we don't - openssl seems to
1400 # have identity issues in that area.
1401 Net::SSLeay::CTX_set_mode ($self->{tls},
1402 (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1)
1403 | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2));
292 1404
293=item B<write ($data, $callback)> 1405 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1406 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
294 1407
295This method will write C<$data> to the filehandle and call the C<$callback> 1408 Net::SSLeay::set_bio ($ssl, $self->{_rbio}, $self->{_wbio});
296afterwards. If only C<$callback> is provided it will be called when the
297write buffer becomes empty the next time (or immediately if it already is empty).
298 1409
299=cut 1410 &_dotls; # need to trigger the initial handshake
300 1411 $self->start_read; # make sure we actually do read
301sub write {
302 my ($self, $data, $cb) = @_;
303 if (ref $data) { $cb = $data; undef $data }
304 push @{$self->{write_bufs}}, [$data, $cb];
305 $self->_check_writer;
306} 1412}
307 1413
308sub _check_writer { 1414=item $handle->stoptls
1415
1416Shuts down the SSL connection - this makes a proper EOF handshake by
1417sending a close notify to the other side, but since OpenSSL doesn't
1418support non-blocking shut downs, it is not possible to re-use the stream
1419afterwards.
1420
1421=cut
1422
1423sub stoptls {
309 my ($self) = @_; 1424 my ($self) = @_;
310 1425
311 if ($self->{write_w}) { 1426 if ($self->{tls}) {
312 unless ($self->{write_cb}) { 1427 Net::SSLeay::shutdown ($self->{tls});
313 while (@{$self->{write_bufs}} && not defined $self->{write_bufs}->[0]->[1]) { 1428
314 my $wba = shift @{$self->{write_bufs}}; 1429 &_dotls;
315 $self->{wbuf} .= $wba->[0]; 1430
316 } 1431 # we don't give a shit. no, we do, but we can't. no...
317 } 1432 # we, we... have to use openssl :/
318 return; 1433 &_freetls;
1434 }
1435}
1436
1437sub _freetls {
1438 my ($self) = @_;
1439
1440 return unless $self->{tls};
1441
1442 Net::SSLeay::free (delete $self->{tls});
319 } 1443
1444 delete @$self{qw(_rbio _wbio _tls_wbuf)};
1445}
320 1446
321 my $wba = shift @{$self->{write_bufs}} 1447sub DESTROY {
322 or return; 1448 my $self = shift;
323 1449
324 unless (defined $wba->[0]) { 1450 &_freetls;
325 $wba->[1]->($self) if $wba->[1];
326 $self->_check_writer;
327 return;
328 }
329 1451
330 $self->{wbuf} = $wba->[0]; 1452 my $linger = exists $self->{linger} ? $self->{linger} : 3600;
331 $self->{write_cb} = $wba->[1];
332 1453
333 $self->{write_w} = 1454 if ($linger && length $self->{wbuf}) {
334 AnyEvent->io (poll => 'w', fh => $self->{fh}, cb => sub { 1455 my $fh = delete $self->{fh};
1456 my $wbuf = delete $self->{wbuf};
1457
1458 my @linger;
1459
1460 push @linger, AnyEvent->io (fh => $fh, poll => "w", cb => sub {
335 my $l = syswrite $self->{fh}, $self->{wbuf}, length $self->{wbuf}; 1461 my $len = syswrite $fh, $wbuf, length $wbuf;
336 1462
337 if (not defined $l) { 1463 if ($len > 0) {
338 return if $! == EAGAIN || $! == EINTR; 1464 substr $wbuf, 0, $len, "";
339 delete $self->{write_w};
340 $self->{on_error}->($self) if $self->{on_error};
341
342 } else { 1465 } else {
343 substr $self->{wbuf}, 0, $l, ''; 1466 @linger = (); # end
344
345 if (length ($self->{wbuf}) == 0) {
346 $self->{write_cb}->($self) if $self->{write_cb};
347
348 delete $self->{write_w};
349 delete $self->{wbuf};
350 delete $self->{write_cb};
351
352 $self->_check_writer;
353 }
354 } 1467 }
355 }); 1468 });
1469 push @linger, AnyEvent->timer (after => $linger, cb => sub {
1470 @linger = ();
1471 });
1472 }
1473}
1474
1475=item AnyEvent::Handle::TLS_CTX
1476
1477This function creates and returns the Net::SSLeay::CTX object used by
1478default for TLS mode.
1479
1480The context is created like this:
1481
1482 Net::SSLeay::load_error_strings;
1483 Net::SSLeay::SSLeay_add_ssl_algorithms;
1484 Net::SSLeay::randomize;
1485
1486 my $CTX = Net::SSLeay::CTX_new;
1487
1488 Net::SSLeay::CTX_set_options $CTX, Net::SSLeay::OP_ALL
1489
1490=cut
1491
1492our $TLS_CTX;
1493
1494sub TLS_CTX() {
1495 $TLS_CTX || do {
1496 require Net::SSLeay;
1497
1498 Net::SSLeay::load_error_strings ();
1499 Net::SSLeay::SSLeay_add_ssl_algorithms ();
1500 Net::SSLeay::randomize ();
1501
1502 $TLS_CTX = Net::SSLeay::CTX_new ();
1503
1504 Net::SSLeay::CTX_set_options ($TLS_CTX, Net::SSLeay::OP_ALL ());
1505
1506 $TLS_CTX
1507 }
356} 1508}
357 1509
358=back 1510=back
359 1511
1512
1513=head1 NONFREQUENTLY ASKED QUESTIONS
1514
1515=over 4
1516
1517=item How do I read data until the other side closes the connection?
1518
1519If you just want to read your data into a perl scalar, the easiest way
1520to achieve this is by setting an C<on_read> callback that does nothing
1521and clearing the C<on_eof> callback. In the C<on_error> callback, the data
1522will then be in C<$_[0]{rbuf}>:
1523
1524 $handle->on_read (sub { });
1525 $handle->on_eof (undef);
1526 $handle->on_error (sub {
1527 my $data = delete $_[0]{rbuf};
1528 undef $handle;
1529 });
1530
1531The reason to use C<on_error> is that TCP connections, due to latencies
1532and packet loss, might get closed quite violently with an error, when in
1533fact, all data has been received.
1534
1535It is usually better to use acknowledgements when transferring data,
1536to make sure the other side hasn't just died and you got the data
1537intact. This is also one reason why so many internet protocols have an
1538explicit QUIT command.
1539
1540
1541=item I don't want to destroy the handle too early - how do I wait until
1542all data has been written?
1543
1544After writing your last bits of data, set the C<on_drain> callback
1545and destroy the handle in there - with the default setting of
1546C<low_water_mark> this will be called precisely when all data has been
1547written to the socket:
1548
1549 $handle->push_write (...);
1550 $handle->on_drain (sub {
1551 warn "all data submitted to the kernel\n";
1552 undef $handle;
1553 });
1554
1555=back
1556
1557
1558=head1 SUBCLASSING AnyEvent::Handle
1559
1560In many cases, you might want to subclass AnyEvent::Handle.
1561
1562To make this easier, a given version of AnyEvent::Handle uses these
1563conventions:
1564
1565=over 4
1566
1567=item * all constructor arguments become object members.
1568
1569At least initially, when you pass a C<tls>-argument to the constructor it
1570will end up in C<< $handle->{tls} >>. Those members might be changed or
1571mutated later on (for example C<tls> will hold the TLS connection object).
1572
1573=item * other object member names are prefixed with an C<_>.
1574
1575All object members not explicitly documented (internal use) are prefixed
1576with an underscore character, so the remaining non-C<_>-namespace is free
1577for use for subclasses.
1578
1579=item * all members not documented here and not prefixed with an underscore
1580are free to use in subclasses.
1581
1582Of course, new versions of AnyEvent::Handle may introduce more "public"
1583member variables, but that's just life; at least it is documented.
1584
1585=back
1586
360=head1 AUTHOR 1587=head1 AUTHOR
361 1588
362Robin Redeker, C<< <elmex at ta-sa.org> >> 1589Robin Redeker C<< <elmex at ta-sa.org> >>, Marc Lehmann <schmorp@schmorp.de>.
363 1590
364=cut 1591=cut
365 1592
3661; # End of AnyEvent::Handle 15931; # End of AnyEvent::Handle
