ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.159 by root, Fri Jul 24 12:35:58 2009 UTC vs.
Revision 1.197 by root, Tue Aug 31 00:59:55 2010 UTC

1package AnyEvent::Handle;
2
3use Scalar::Util ();
4use Carp ();
5use Errno qw(EAGAIN EINTR);
6
7use AnyEvent (); BEGIN { AnyEvent::common_sense }
8use AnyEvent::Util qw(WSAEWOULDBLOCK);
9
10=head1 NAME 1=head1 NAME
11 2
12AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent 3AnyEvent::Handle - non-blocking I/O on streaming handles via AnyEvent
13
14=cut
15
16our $VERSION = 4.86;
17 4
18=head1 SYNOPSIS 5=head1 SYNOPSIS
19 6
20 use AnyEvent; 7 use AnyEvent;
21 use AnyEvent::Handle; 8 use AnyEvent::Handle;
27 on_error => sub { 14 on_error => sub {
28 my ($hdl, $fatal, $msg) = @_; 15 my ($hdl, $fatal, $msg) = @_;
29 warn "got error $msg\n"; 16 warn "got error $msg\n";
30 $hdl->destroy; 17 $hdl->destroy;
31 $cv->send; 18 $cv->send;
32 ); 19 };
33 20
34 # send some request line 21 # send some request line
35 $hdl->push_write ("getinfo\015\012"); 22 $hdl->push_write ("getinfo\015\012");
36 23
37 # read the response line 24 # read the response line
44 $cv->recv; 31 $cv->recv;
45 32
46=head1 DESCRIPTION 33=head1 DESCRIPTION
47 34
48This module is a helper module to make it easier to do event-based I/O on 35This module is a helper module to make it easier to do event-based I/O on
49filehandles. 36stream-based filehandles (sockets, pipes or other stream things).
50 37
51The L<AnyEvent::Intro> tutorial contains some well-documented 38The L<AnyEvent::Intro> tutorial contains some well-documented
52AnyEvent::Handle examples. 39AnyEvent::Handle examples.
53 40
54In the following, when the documentation refers to of "bytes" then this 41In the following, when the documentation refers to of "bytes" then this
59C<on_error> callback. 46C<on_error> callback.
60 47
61All callbacks will be invoked with the handle object as their first 48All callbacks will be invoked with the handle object as their first
62argument. 49argument.
63 50
51=cut
52
53package AnyEvent::Handle;
54
55use Scalar::Util ();
56use List::Util ();
57use Carp ();
58use Errno qw(EAGAIN EINTR);
59
60use AnyEvent (); BEGIN { AnyEvent::common_sense }
61use AnyEvent::Util qw(WSAEWOULDBLOCK);
62
63our $VERSION = $AnyEvent::VERSION;
64
65sub _load_func($) {
66 my $func = $_[0];
67
68 unless (defined &$func) {
69 my $pkg = $func;
70 do {
71 $pkg =~ s/::[^:]+$//
72 or return;
73 eval "require $pkg";
74 } until defined &$func;
75 }
76
77 \&$func
78}
79
64=head1 METHODS 80=head1 METHODS
65 81
66=over 4 82=over 4
67 83
68=item $handle = B<new> AnyEvent::TLS fh => $filehandle, key => value... 84=item $handle = B<new> AnyEvent::Handle fh => $filehandle, key => value...
69 85
70The constructor supports these arguments (all as C<< key => value >> pairs). 86The constructor supports these arguments (all as C<< key => value >> pairs).
71 87
72=over 4 88=over 4
73 89
83Try to connect to the specified host and service (port), using 99Try to connect to the specified host and service (port), using
84C<AnyEvent::Socket::tcp_connect>. The C<$host> additionally becomes the 100C<AnyEvent::Socket::tcp_connect>. The C<$host> additionally becomes the
85default C<peername>. 101default C<peername>.
86 102
87You have to specify either this parameter, or C<fh>, above. 103You have to specify either this parameter, or C<fh>, above.
104
105It is possible to push requests on the read and write queues, and modify
106properties of the stream, even while AnyEvent::Handle is connecting.
88 107
89When this parameter is specified, then the C<on_prepare>, 108When this parameter is specified, then the C<on_prepare>,
90C<on_connect_error> and C<on_connect> callbacks will be called under the 109C<on_connect_error> and C<on_connect> callbacks will be called under the
91appropriate circumstances: 110appropriate circumstances:
92 111
98attempted, but after the file handle has been created. It could be used to 117attempted, but after the file handle has been created. It could be used to
99prepare the file handle with parameters required for the actual connect 118prepare the file handle with parameters required for the actual connect
100(as opposed to settings that can be changed when the connection is already 119(as opposed to settings that can be changed when the connection is already
101established). 120established).
102 121
122The return value of this callback should be the connect timeout value in
123seconds (or C<0>, or C<undef>, or the empty list, to indicate the default
124timeout is to be used).
125
103=item on_connect => $cb->($handle, $host, $port, $retry->()) 126=item on_connect => $cb->($handle, $host, $port, $retry->())
104 127
105This callback is called when a connection has been successfully established. 128This callback is called when a connection has been successfully established.
106 129
107The actual numeric host and port (the socket peername) are passed as 130The actual numeric host and port (the socket peername) are passed as
108parameters, together with a retry callback. 131parameters, together with a retry callback.
109 132
110When, for some reason, the handle is not acceptable, then calling 133When, for some reason, the handle is not acceptable, then calling
111C<$retry> will continue with the next conenction target (in case of 134C<$retry> will continue with the next connection target (in case of
112multi-homed hosts or SRV records there can be multiple connection 135multi-homed hosts or SRV records there can be multiple connection
113endpoints). When it is called then the read and write queues, eof status, 136endpoints). At the time it is called the read and write queues, eof
114tls status and similar properties of the handle are being reset. 137status, tls status and similar properties of the handle will have been
138reset.
115 139
116In most cases, ignoring the C<$retry> parameter is the way to go. 140In most cases, ignoring the C<$retry> parameter is the way to go.
117 141
118=item on_connect_error => $cb->($handle, $message) 142=item on_connect_error => $cb->($handle, $message)
119 143
120This callback is called when the conenction could not be 144This callback is called when the connection could not be
121established. C<$!> will contain the relevant error code, and C<$message> a 145established. C<$!> will contain the relevant error code, and C<$message> a
122message describing it (usually the same as C<"$!">). 146message describing it (usually the same as C<"$!">).
123 147
124If this callback isn't specified, then C<on_error> will be called with a 148If this callback isn't specified, then C<on_error> will be called with a
125fatal error instead. 149fatal error instead.
168To access (and remove data from) the read buffer, use the C<< ->rbuf >> 192To access (and remove data from) the read buffer, use the C<< ->rbuf >>
169method or access the C<< $handle->{rbuf} >> member directly. Note that you 193method or access the C<< $handle->{rbuf} >> member directly. Note that you
170must not enlarge or modify the read buffer, you can only remove data at 194must not enlarge or modify the read buffer, you can only remove data at
171the beginning from it. 195the beginning from it.
172 196
197You can also call C<< ->push_read (...) >> or any other function that
198modifies the read queue. Or do both. Or ...
199
173When an EOF condition is detected then AnyEvent::Handle will first try to 200When an EOF condition is detected then AnyEvent::Handle will first try to
174feed all the remaining data to the queued callbacks and C<on_read> before 201feed all the remaining data to the queued callbacks and C<on_read> before
175calling the C<on_eof> callback. If no progress can be made, then a fatal 202calling the C<on_eof> callback. If no progress can be made, then a fatal
176error will be raised (with C<$!> set to C<EPIPE>). 203error will be raised (with C<$!> set to C<EPIPE>).
177 204
209memory and push it into the queue, but instead only read more data from 236memory and push it into the queue, but instead only read more data from
210the file when the write queue becomes empty. 237the file when the write queue becomes empty.
211 238
212=item timeout => $fractional_seconds 239=item timeout => $fractional_seconds
213 240
241=item rtimeout => $fractional_seconds
242
243=item wtimeout => $fractional_seconds
244
214If non-zero, then this enables an "inactivity" timeout: whenever this many 245If non-zero, then these enables an "inactivity" timeout: whenever this
215seconds pass without a successful read or write on the underlying file 246many seconds pass without a successful read or write on the underlying
216handle, the C<on_timeout> callback will be invoked (and if that one is 247file handle (or a call to C<timeout_reset>), the C<on_timeout> callback
217missing, a non-fatal C<ETIMEDOUT> error will be raised). 248will be invoked (and if that one is missing, a non-fatal C<ETIMEDOUT>
249error will be raised).
250
251There are three variants of the timeouts that work fully independent
252of each other, for both read and write, just read, and just write:
253C<timeout>, C<rtimeout> and C<wtimeout>, with corresponding callbacks
254C<on_timeout>, C<on_rtimeout> and C<on_wtimeout>, and reset functions
255C<timeout_reset>, C<rtimeout_reset>, and C<wtimeout_reset>.
218 256
219Note that timeout processing is also active when you currently do not have 257Note that timeout processing is also active when you currently do not have
220any outstanding read or write requests: If you plan to keep the connection 258any outstanding read or write requests: If you plan to keep the connection
221idle then you should disable the timout temporarily or ignore the timeout 259idle then you should disable the timout temporarily or ignore the timeout
222in the C<on_timeout> callback, in which case AnyEvent::Handle will simply 260in the C<on_timeout> callback, in which case AnyEvent::Handle will simply
266accomplishd by setting this option to a true value. 304accomplishd by setting this option to a true value.
267 305
268The default is your opertaing system's default behaviour (most likely 306The default is your opertaing system's default behaviour (most likely
269enabled), this option explicitly enables or disables it, if possible. 307enabled), this option explicitly enables or disables it, if possible.
270 308
309=item keepalive => <boolean>
310
311Enables (default disable) the SO_KEEPALIVE option on the stream socket:
312normally, TCP connections have no time-out once established, so TCP
313connections, once established, can stay alive forever even when the other
314side has long gone. TCP keepalives are a cheap way to take down long-lived
315TCP connections whent he other side becomes unreachable. While the default
316is OS-dependent, TCP keepalives usually kick in after around two hours,
317and, if the other side doesn't reply, take down the TCP connection some 10
318to 15 minutes later.
319
320It is harmless to specify this option for file handles that do not support
321keepalives, and enabling it on connections that are potentially long-lived
322is usually a good idea.
323
324=item oobinline => <boolean>
325
326BSD majorly fucked up the implementation of TCP urgent data. The result
327is that almost no OS implements TCP according to the specs, and every OS
328implements it slightly differently.
329
330If you want to handle TCP urgent data, then setting this flag (the default
331is enabled) gives you the most portable way of getting urgent data, by
332putting it into the stream.
333
334Since BSD emulation of OOB data on top of TCP's urgent data can have
335security implications, AnyEvent::Handle sets this flag automatically
336unless explicitly specified. Note that setting this flag after
337establishing a connection I<may> be a bit too late (data loss could
338already have occured on BSD systems), but at least it will protect you
339from most attacks.
340
271=item read_size => <bytes> 341=item read_size => <bytes>
272 342
273The default read block size (the amount of bytes this module will 343The default read block size (the amount of bytes this module will
274try to read during each loop iteration, which affects memory 344try to read during each loop iteration, which affects memory
275requirements). Default: C<8192>. 345requirements). Default: C<8192>.
308C<undef>. 378C<undef>.
309 379
310=item tls => "accept" | "connect" | Net::SSLeay::SSL object 380=item tls => "accept" | "connect" | Net::SSLeay::SSL object
311 381
312When this parameter is given, it enables TLS (SSL) mode, that means 382When this parameter is given, it enables TLS (SSL) mode, that means
313AnyEvent will start a TLS handshake as soon as the conenction has been 383AnyEvent will start a TLS handshake as soon as the connection has been
314established and will transparently encrypt/decrypt data afterwards. 384established and will transparently encrypt/decrypt data afterwards.
315 385
316All TLS protocol errors will be signalled as C<EPROTO>, with an 386All TLS protocol errors will be signalled as C<EPROTO>, with an
317appropriate error message. 387appropriate error message.
318 388
431 delete $self->{_skip_drain_rbuf}; 501 delete $self->{_skip_drain_rbuf};
432 $self->_start; 502 $self->_start;
433 503
434 $self->{on_connect} 504 $self->{on_connect}
435 and $self->{on_connect}($self, $host, $port, sub { 505 and $self->{on_connect}($self, $host, $port, sub {
436 delete @$self{qw(fh _tw _ww _rw _eof _queue rbuf _wbuf tls _tls_rbuf _tls_wbuf)}; 506 delete @$self{qw(fh _tw _rtw _wtw _ww _rw _eof _queue rbuf _wbuf tls _tls_rbuf _tls_wbuf)};
437 $self->{_skip_drain_rbuf} = 1; 507 $self->{_skip_drain_rbuf} = 1;
438 &$retry; 508 &$retry;
439 }); 509 });
440 510
441 } else { 511 } else {
442 if ($self->{on_connect_error}) { 512 if ($self->{on_connect_error}) {
443 $self->{on_connect_error}($self, "$!"); 513 $self->{on_connect_error}($self, "$!");
444 $self->destroy; 514 $self->destroy;
445 } else { 515 } else {
446 $self->fatal ($!, 1); 516 $self->_error ($!, 1);
447 } 517 }
448 } 518 }
449 }, 519 },
450 sub { 520 sub {
451 local $self->{fh} = $_[0]; 521 local $self->{fh} = $_[0];
452 522
523 $self->{on_prepare}
453 $self->{on_prepare}->($self) 524 ? $self->{on_prepare}->($self)
454 if $self->{on_prepare}; 525 : ()
455 } 526 }
456 ); 527 );
457 } 528 }
458 529
459 } else { 530 } else {
464} 535}
465 536
466sub _start { 537sub _start {
467 my ($self) = @_; 538 my ($self) = @_;
468 539
540 # too many clueless people try to use udp and similar sockets
541 # with AnyEvent::Handle, do them a favour.
542 my $type = getsockopt $self->{fh}, Socket::SOL_SOCKET (), Socket::SO_TYPE ();
543 Carp::croak "AnyEvent::Handle: only stream sockets supported, anything else will NOT work!"
544 if Socket::SOCK_STREAM () != (unpack "I", $type) && defined $type;
545
469 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 546 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
470 547
548 $self->{_activity} =
549 $self->{_ractivity} =
471 $self->{_activity} = AnyEvent->now; 550 $self->{_wactivity} = AE::now;
472 $self->_timeout;
473 551
552 $self->timeout (delete $self->{timeout} ) if $self->{timeout};
553 $self->rtimeout (delete $self->{rtimeout} ) if $self->{rtimeout};
554 $self->wtimeout (delete $self->{wtimeout} ) if $self->{wtimeout};
555
474 $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay}; 556 $self->no_delay (delete $self->{no_delay} ) if exists $self->{no_delay} && $self->{no_delay};
557 $self->keepalive (delete $self->{keepalive}) if exists $self->{keepalive} && $self->{keepalive};
475 558
559 $self->oobinline (exists $self->{oobinline} ? delete $self->{oobinline} : 1);
560
476 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}) 561 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx})
477 if $self->{tls}; 562 if $self->{tls};
478 563
479 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 564 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
480 565
481 $self->start_read 566 $self->start_read
482 if $self->{on_read} || @{ $self->{_queue} }; 567 if $self->{on_read} || @{ $self->{_queue} };
483}
484 568
485#sub _shutdown { 569 $self->_drain_wbuf;
486# my ($self) = @_; 570}
487#
488# delete @$self{qw(_tw _rw _ww fh wbuf on_read _queue)};
489# $self->{_eof} = 1; # tell starttls et. al to stop trying
490#
491# &_freetls;
492#}
493 571
494sub _error { 572sub _error {
495 my ($self, $errno, $fatal, $message) = @_; 573 my ($self, $errno, $fatal, $message) = @_;
496 574
497 $! = $errno; 575 $! = $errno;
498 $message ||= "$!"; 576 $message ||= "$!";
499 577
500 if ($self->{on_error}) { 578 if ($self->{on_error}) {
501 $self->{on_error}($self, $fatal, $message); 579 $self->{on_error}($self, $fatal, $message);
502 $self->destroy if $fatal; 580 $self->destroy if $fatal;
503 } elsif ($self->{fh}) { 581 } elsif ($self->{fh} || $self->{connect}) {
504 $self->destroy; 582 $self->destroy;
505 Carp::croak "AnyEvent::Handle uncaught error: $message"; 583 Carp::croak "AnyEvent::Handle uncaught error: $message";
506 } 584 }
507} 585}
508 586
534 $_[0]{on_eof} = $_[1]; 612 $_[0]{on_eof} = $_[1];
535} 613}
536 614
537=item $handle->on_timeout ($cb) 615=item $handle->on_timeout ($cb)
538 616
539Replace the current C<on_timeout> callback, or disables the callback (but 617=item $handle->on_rtimeout ($cb)
540not the timeout) if C<$cb> = C<undef>. See the C<timeout> constructor
541argument and method.
542 618
543=cut 619=item $handle->on_wtimeout ($cb)
544 620
545sub on_timeout { 621Replace the current C<on_timeout>, C<on_rtimeout> or C<on_wtimeout>
546 $_[0]{on_timeout} = $_[1]; 622callback, or disables the callback (but not the timeout) if C<$cb> =
547} 623C<undef>. See the C<timeout> constructor argument and method.
624
625=cut
626
627# see below
548 628
549=item $handle->autocork ($boolean) 629=item $handle->autocork ($boolean)
550 630
551Enables or disables the current autocork behaviour (see C<autocork> 631Enables or disables the current autocork behaviour (see C<autocork>
552constructor argument). Changes will only take effect on the next write. 632constructor argument). Changes will only take effect on the next write.
567sub no_delay { 647sub no_delay {
568 $_[0]{no_delay} = $_[1]; 648 $_[0]{no_delay} = $_[1];
569 649
570 eval { 650 eval {
571 local $SIG{__DIE__}; 651 local $SIG{__DIE__};
572 setsockopt $_[0]{fh}, &Socket::IPPROTO_TCP, &Socket::TCP_NODELAY, int $_[1] 652 setsockopt $_[0]{fh}, Socket::IPPROTO_TCP (), Socket::TCP_NODELAY (), int $_[1]
573 if $_[0]{fh}; 653 if $_[0]{fh};
574 }; 654 };
575} 655}
576 656
657=item $handle->keepalive ($boolean)
658
659Enables or disables the C<keepalive> setting (see constructor argument of
660the same name for details).
661
662=cut
663
664sub keepalive {
665 $_[0]{keepalive} = $_[1];
666
667 eval {
668 local $SIG{__DIE__};
669 setsockopt $_[0]{fh}, Socket::SOL_SOCKET (), Socket::SO_KEEPALIVE (), int $_[1]
670 if $_[0]{fh};
671 };
672}
673
674=item $handle->oobinline ($boolean)
675
676Enables or disables the C<oobinline> setting (see constructor argument of
677the same name for details).
678
679=cut
680
681sub oobinline {
682 $_[0]{oobinline} = $_[1];
683
684 eval {
685 local $SIG{__DIE__};
686 setsockopt $_[0]{fh}, Socket::SOL_SOCKET (), Socket::SO_OOBINLINE (), int $_[1]
687 if $_[0]{fh};
688 };
689}
690
691=item $handle->keepalive ($boolean)
692
693Enables or disables the C<keepalive> setting (see constructor argument of
694the same name for details).
695
696=cut
697
698sub keepalive {
699 $_[0]{keepalive} = $_[1];
700
701 eval {
702 local $SIG{__DIE__};
703 setsockopt $_[0]{fh}, Socket::SOL_SOCKET (), Socket::SO_KEEPALIVE (), int $_[1]
704 if $_[0]{fh};
705 };
706}
707
577=item $handle->on_starttls ($cb) 708=item $handle->on_starttls ($cb)
578 709
579Replace the current C<on_starttls> callback (see the C<on_starttls> constructor argument). 710Replace the current C<on_starttls> callback (see the C<on_starttls> constructor argument).
580 711
581=cut 712=cut
588 719
589Replace the current C<on_stoptls> callback (see the C<on_stoptls> constructor argument). 720Replace the current C<on_stoptls> callback (see the C<on_stoptls> constructor argument).
590 721
591=cut 722=cut
592 723
593sub on_starttls { 724sub on_stoptls {
594 $_[0]{on_stoptls} = $_[1]; 725 $_[0]{on_stoptls} = $_[1];
595} 726}
596 727
728=item $handle->rbuf_max ($max_octets)
729
730Configures the C<rbuf_max> setting (C<undef> disables it).
731
732=cut
733
734sub rbuf_max {
735 $_[0]{rbuf_max} = $_[1];
736}
737
597############################################################################# 738#############################################################################
598 739
599=item $handle->timeout ($seconds) 740=item $handle->timeout ($seconds)
600 741
742=item $handle->rtimeout ($seconds)
743
744=item $handle->wtimeout ($seconds)
745
601Configures (or disables) the inactivity timeout. 746Configures (or disables) the inactivity timeout.
602 747
603=cut 748=item $handle->timeout_reset
604 749
605sub timeout { 750=item $handle->rtimeout_reset
751
752=item $handle->wtimeout_reset
753
754Reset the activity timeout, as if data was received or sent.
755
756These methods are cheap to call.
757
758=cut
759
760for my $dir ("", "r", "w") {
761 my $timeout = "${dir}timeout";
762 my $tw = "_${dir}tw";
763 my $on_timeout = "on_${dir}timeout";
764 my $activity = "_${dir}activity";
765 my $cb;
766
767 *$on_timeout = sub {
768 $_[0]{$on_timeout} = $_[1];
769 };
770
771 *$timeout = sub {
606 my ($self, $timeout) = @_; 772 my ($self, $new_value) = @_;
607 773
608 $self->{timeout} = $timeout; 774 $self->{$timeout} = $new_value;
609 $self->_timeout; 775 delete $self->{$tw}; &$cb;
610} 776 };
611 777
778 *{"${dir}timeout_reset"} = sub {
779 $_[0]{$activity} = AE::now;
780 };
781
782 # main workhorse:
612# reset the timeout watcher, as neccessary 783 # reset the timeout watcher, as neccessary
613# also check for time-outs 784 # also check for time-outs
614sub _timeout { 785 $cb = sub {
615 my ($self) = @_; 786 my ($self) = @_;
616 787
617 if ($self->{timeout} && $self->{fh}) { 788 if ($self->{$timeout} && $self->{fh}) {
618 my $NOW = AnyEvent->now; 789 my $NOW = AE::now;
619 790
620 # when would the timeout trigger? 791 # when would the timeout trigger?
621 my $after = $self->{_activity} + $self->{timeout} - $NOW; 792 my $after = $self->{$activity} + $self->{$timeout} - $NOW;
622 793
623 # now or in the past already? 794 # now or in the past already?
624 if ($after <= 0) { 795 if ($after <= 0) {
625 $self->{_activity} = $NOW; 796 $self->{$activity} = $NOW;
626 797
627 if ($self->{on_timeout}) { 798 if ($self->{$on_timeout}) {
628 $self->{on_timeout}($self); 799 $self->{$on_timeout}($self);
629 } else { 800 } else {
630 $self->_error (Errno::ETIMEDOUT); 801 $self->_error (Errno::ETIMEDOUT);
802 }
803
804 # callback could have changed timeout value, optimise
805 return unless $self->{$timeout};
806
807 # calculate new after
808 $after = $self->{$timeout};
631 } 809 }
632 810
633 # callback could have changed timeout value, optimise 811 Scalar::Util::weaken $self;
634 return unless $self->{timeout}; 812 return unless $self; # ->error could have destroyed $self
635 813
636 # calculate new after 814 $self->{$tw} ||= AE::timer $after, 0, sub {
637 $after = $self->{timeout}; 815 delete $self->{$tw};
816 $cb->($self);
817 };
818 } else {
819 delete $self->{$tw};
638 } 820 }
639
640 Scalar::Util::weaken $self;
641 return unless $self; # ->error could have destroyed $self
642
643 $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub {
644 delete $self->{_tw};
645 $self->_timeout;
646 });
647 } else {
648 delete $self->{_tw};
649 } 821 }
650} 822}
651 823
652############################################################################# 824#############################################################################
653 825
668 840
669=item $handle->on_drain ($cb) 841=item $handle->on_drain ($cb)
670 842
671Sets the C<on_drain> callback or clears it (see the description of 843Sets the C<on_drain> callback or clears it (see the description of
672C<on_drain> in the constructor). 844C<on_drain> in the constructor).
845
846This method may invoke callbacks (and therefore the handle might be
847destroyed after it returns).
673 848
674=cut 849=cut
675 850
676sub on_drain { 851sub on_drain {
677 my ($self, $cb) = @_; 852 my ($self, $cb) = @_;
686 861
687Queues the given scalar to be written. You can push as much data as you 862Queues the given scalar to be written. You can push as much data as you
688want (only limited by the available memory), as C<AnyEvent::Handle> 863want (only limited by the available memory), as C<AnyEvent::Handle>
689buffers it independently of the kernel. 864buffers it independently of the kernel.
690 865
866This method may invoke callbacks (and therefore the handle might be
867destroyed after it returns).
868
691=cut 869=cut
692 870
693sub _drain_wbuf { 871sub _drain_wbuf {
694 my ($self) = @_; 872 my ($self) = @_;
695 873
701 my $len = syswrite $self->{fh}, $self->{wbuf}; 879 my $len = syswrite $self->{fh}, $self->{wbuf};
702 880
703 if (defined $len) { 881 if (defined $len) {
704 substr $self->{wbuf}, 0, $len, ""; 882 substr $self->{wbuf}, 0, $len, "";
705 883
706 $self->{_activity} = AnyEvent->now; 884 $self->{_activity} = $self->{_wactivity} = AE::now;
707 885
708 $self->{on_drain}($self) 886 $self->{on_drain}($self)
709 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf}) 887 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf})
710 && $self->{on_drain}; 888 && $self->{on_drain};
711 889
717 895
718 # try to write data immediately 896 # try to write data immediately
719 $cb->() unless $self->{autocork}; 897 $cb->() unless $self->{autocork};
720 898
721 # if still data left in wbuf, we need to poll 899 # if still data left in wbuf, we need to poll
722 $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) 900 $self->{_ww} = AE::io $self->{fh}, 1, $cb
723 if length $self->{wbuf}; 901 if length $self->{wbuf};
724 }; 902 };
725} 903}
726 904
727our %WH; 905our %WH;
728 906
907# deprecated
729sub register_write_type($$) { 908sub register_write_type($$) {
730 $WH{$_[0]} = $_[1]; 909 $WH{$_[0]} = $_[1];
731} 910}
732 911
733sub push_write { 912sub push_write {
734 my $self = shift; 913 my $self = shift;
735 914
736 if (@_ > 1) { 915 if (@_ > 1) {
737 my $type = shift; 916 my $type = shift;
738 917
918 @_ = ($WH{$type} ||= _load_func "$type\::anyevent_write_type"
739 @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write") 919 or Carp::croak "unsupported/unloadable type '$type' passed to AnyEvent::Handle::push_write")
740 ->($self, @_); 920 ->($self, @_);
741 } 921 }
742 922
923 # we downgrade here to avoid hard-to-track-down bugs,
924 # and diagnose the problem earlier and better.
925
743 if ($self->{tls}) { 926 if ($self->{tls}) {
744 $self->{_tls_wbuf} .= $_[0]; 927 utf8::downgrade $self->{_tls_wbuf} .= $_[0];
745 928 &_dotls ($self) if $self->{fh};
746 &_dotls ($self);
747 } else { 929 } else {
748 $self->{wbuf} .= $_[0]; 930 utf8::downgrade $self->{wbuf} .= $_[0];
749 $self->_drain_wbuf if $self->{fh}; 931 $self->_drain_wbuf if $self->{fh};
750 } 932 }
751} 933}
752 934
753=item $handle->push_write (type => @args) 935=item $handle->push_write (type => @args)
754 936
755Instead of formatting your data yourself, you can also let this module do 937Instead of formatting your data yourself, you can also let this module
756the job by specifying a type and type-specific arguments. 938do the job by specifying a type and type-specific arguments. You
939can also specify the (fully qualified) name of a package, in which
940case AnyEvent tries to load the package and then expects to find the
941C<anyevent_write_type> function inside (see "custom write types", below).
757 942
758Predefined types are (if you have ideas for additional types, feel free to 943Predefined types are (if you have ideas for additional types, feel free to
759drop by and tell us): 944drop by and tell us):
760 945
761=over 4 946=over 4
818Other languages could read single lines terminated by a newline and pass 1003Other languages could read single lines terminated by a newline and pass
819this line into their JSON decoder of choice. 1004this line into their JSON decoder of choice.
820 1005
821=cut 1006=cut
822 1007
1008sub json_coder() {
1009 eval { require JSON::XS; JSON::XS->new->utf8 }
1010 || do { require JSON; JSON->new->utf8 }
1011}
1012
823register_write_type json => sub { 1013register_write_type json => sub {
824 my ($self, $ref) = @_; 1014 my ($self, $ref) = @_;
825 1015
826 require JSON; 1016 my $json = $self->{json} ||= json_coder;
827 1017
828 $self->{json} ? $self->{json}->encode ($ref) 1018 $json->encode ($ref)
829 : JSON::encode_json ($ref)
830}; 1019};
831 1020
832=item storable => $reference 1021=item storable => $reference
833 1022
834Freezes the given reference using L<Storable> and writes it to the 1023Freezes the given reference using L<Storable> and writes it to the
860the peer. 1049the peer.
861 1050
862You can rely on the normal read queue and C<on_eof> handling 1051You can rely on the normal read queue and C<on_eof> handling
863afterwards. This is the cleanest way to close a connection. 1052afterwards. This is the cleanest way to close a connection.
864 1053
1054This method may invoke callbacks (and therefore the handle might be
1055destroyed after it returns).
1056
865=cut 1057=cut
866 1058
867sub push_shutdown { 1059sub push_shutdown {
868 my ($self) = @_; 1060 my ($self) = @_;
869 1061
870 delete $self->{low_water_mark}; 1062 delete $self->{low_water_mark};
871 $self->on_drain (sub { shutdown $_[0]{fh}, 1 }); 1063 $self->on_drain (sub { shutdown $_[0]{fh}, 1 });
872} 1064}
873 1065
874=item AnyEvent::Handle::register_write_type type => $coderef->($handle, @args) 1066=item custom write types - Package::anyevent_write_type $handle, @args
875 1067
876This function (not method) lets you add your own types to C<push_write>. 1068Instead of one of the predefined types, you can also specify the name of
1069a package. AnyEvent will try to load the package and then expects to find
1070a function named C<anyevent_write_type> inside. If it isn't found, it
1071progressively tries to load the parent package until it either finds the
1072function (good) or runs out of packages (bad).
1073
877Whenever the given C<type> is used, C<push_write> will invoke the code 1074Whenever the given C<type> is used, C<push_write> will the function with
878reference with the handle object and the remaining arguments. 1075the handle object and the remaining arguments.
879 1076
880The code reference is supposed to return a single octet string that will 1077The function is supposed to return a single octet string that will be
881be appended to the write buffer. 1078appended to the write buffer, so you cna mentally treat this function as a
1079"arguments to on-the-wire-format" converter.
882 1080
883Note that this is a function, and all types registered this way will be 1081Example: implement a custom write type C<join> that joins the remaining
884global, so try to use unique names. 1082arguments using the first one.
1083
1084 $handle->push_write (My::Type => " ", 1,2,3);
1085
1086 # uses the following package, which can be defined in the "My::Type" or in
1087 # the "My" modules to be auto-loaded, or just about anywhere when the
1088 # My::Type::anyevent_write_type is defined before invoking it.
1089
1090 package My::Type;
1091
1092 sub anyevent_write_type {
1093 my ($handle, $delim, @args) = @_;
1094
1095 join $delim, @args
1096 }
885 1097
886=cut 1098=cut
887 1099
888############################################################################# 1100#############################################################################
889 1101
898ways, the "simple" way, using only C<on_read> and the "complex" way, using 1110ways, the "simple" way, using only C<on_read> and the "complex" way, using
899a queue. 1111a queue.
900 1112
901In the simple case, you just install an C<on_read> callback and whenever 1113In the simple case, you just install an C<on_read> callback and whenever
902new data arrives, it will be called. You can then remove some data (if 1114new data arrives, it will be called. You can then remove some data (if
903enough is there) from the read buffer (C<< $handle->rbuf >>). Or you cna 1115enough is there) from the read buffer (C<< $handle->rbuf >>). Or you can
904leave the data there if you want to accumulate more (e.g. when only a 1116leave the data there if you want to accumulate more (e.g. when only a
905partial message has been received so far). 1117partial message has been received so far), or change the read queue with
1118e.g. C<push_read>.
906 1119
907In the more complex case, you want to queue multiple callbacks. In this 1120In the more complex case, you want to queue multiple callbacks. In this
908case, AnyEvent::Handle will call the first queued callback each time new 1121case, AnyEvent::Handle will call the first queued callback each time new
909data arrives (also the first time it is queued) and removes it when it has 1122data arrives (also the first time it is queued) and removes it when it has
910done its job (see C<push_read>, below). 1123done its job (see C<push_read>, below).
972 1185
973sub _drain_rbuf { 1186sub _drain_rbuf {
974 my ($self) = @_; 1187 my ($self) = @_;
975 1188
976 # avoid recursion 1189 # avoid recursion
977 return if exists $self->{_skip_drain_rbuf}; 1190 return if $self->{_skip_drain_rbuf};
978 local $self->{_skip_drain_rbuf} = 1; 1191 local $self->{_skip_drain_rbuf} = 1;
979
980 if (
981 defined $self->{rbuf_max}
982 && $self->{rbuf_max} < length $self->{rbuf}
983 ) {
984 $self->_error (Errno::ENOSPC, 1), return;
985 }
986 1192
987 while () { 1193 while () {
988 # we need to use a separate tls read buffer, as we must not receive data while 1194 # we need to use a separate tls read buffer, as we must not receive data while
989 # we are draining the buffer, and this can only happen with TLS. 1195 # we are draining the buffer, and this can only happen with TLS.
990 $self->{rbuf} .= delete $self->{_tls_rbuf} if exists $self->{_tls_rbuf}; 1196 $self->{rbuf} .= delete $self->{_tls_rbuf}
1197 if exists $self->{_tls_rbuf};
991 1198
992 my $len = length $self->{rbuf}; 1199 my $len = length $self->{rbuf};
993 1200
994 if (my $cb = shift @{ $self->{_queue} }) { 1201 if (my $cb = shift @{ $self->{_queue} }) {
995 unless ($cb->($self)) { 1202 unless ($cb->($self)) {
996 if ($self->{_eof}) { 1203 # no progress can be made
997 # no progress can be made (not enough data and no data forthcoming) 1204 # (not enough data and no data forthcoming)
998 $self->_error (Errno::EPIPE, 1), return; 1205 $self->_error (Errno::EPIPE, 1), return
999 } 1206 if $self->{_eof};
1000 1207
1001 unshift @{ $self->{_queue} }, $cb; 1208 unshift @{ $self->{_queue} }, $cb;
1002 last; 1209 last;
1003 } 1210 }
1004 } elsif ($self->{on_read}) { 1211 } elsif ($self->{on_read}) {
1024 last; 1231 last;
1025 } 1232 }
1026 } 1233 }
1027 1234
1028 if ($self->{_eof}) { 1235 if ($self->{_eof}) {
1029 if ($self->{on_eof}) { 1236 $self->{on_eof}
1030 $self->{on_eof}($self) 1237 ? $self->{on_eof}($self)
1031 } else {
1032 $self->_error (0, 1, "Unexpected end-of-file"); 1238 : $self->_error (0, 1, "Unexpected end-of-file");
1033 } 1239
1240 return;
1241 }
1242
1243 if (
1244 defined $self->{rbuf_max}
1245 && $self->{rbuf_max} < length $self->{rbuf}
1246 ) {
1247 $self->_error (Errno::ENOSPC, 1), return;
1034 } 1248 }
1035 1249
1036 # may need to restart read watcher 1250 # may need to restart read watcher
1037 unless ($self->{_rw}) { 1251 unless ($self->{_rw}) {
1038 $self->start_read 1252 $self->start_read
1043=item $handle->on_read ($cb) 1257=item $handle->on_read ($cb)
1044 1258
1045This replaces the currently set C<on_read> callback, or clears it (when 1259This replaces the currently set C<on_read> callback, or clears it (when
1046the new callback is C<undef>). See the description of C<on_read> in the 1260the new callback is C<undef>). See the description of C<on_read> in the
1047constructor. 1261constructor.
1262
1263This method may invoke callbacks (and therefore the handle might be
1264destroyed after it returns).
1048 1265
1049=cut 1266=cut
1050 1267
1051sub on_read { 1268sub on_read {
1052 my ($self, $cb) = @_; 1269 my ($self, $cb) = @_;
1092 1309
1093If enough data was available, then the callback must remove all data it is 1310If enough data was available, then the callback must remove all data it is
1094interested in (which can be none at all) and return a true value. After returning 1311interested in (which can be none at all) and return a true value. After returning
1095true, it will be removed from the queue. 1312true, it will be removed from the queue.
1096 1313
1314These methods may invoke callbacks (and therefore the handle might be
1315destroyed after it returns).
1316
1097=cut 1317=cut
1098 1318
1099our %RH; 1319our %RH;
1100 1320
1101sub register_read_type($$) { 1321sub register_read_type($$) {
1107 my $cb = pop; 1327 my $cb = pop;
1108 1328
1109 if (@_) { 1329 if (@_) {
1110 my $type = shift; 1330 my $type = shift;
1111 1331
1332 $cb = ($RH{$type} ||= _load_func "$type\::anyevent_read_type"
1112 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read") 1333 or Carp::croak "unsupported/unloadable type '$type' passed to AnyEvent::Handle::push_read")
1113 ->($self, $cb, @_); 1334 ->($self, $cb, @_);
1114 } 1335 }
1115 1336
1116 push @{ $self->{_queue} }, $cb; 1337 push @{ $self->{_queue} }, $cb;
1117 $self->_drain_rbuf; 1338 $self->_drain_rbuf;
1126 1347
1127 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read") 1348 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read")
1128 ->($self, $cb, @_); 1349 ->($self, $cb, @_);
1129 } 1350 }
1130 1351
1131
1132 unshift @{ $self->{_queue} }, $cb; 1352 unshift @{ $self->{_queue} }, $cb;
1133 $self->_drain_rbuf; 1353 $self->_drain_rbuf;
1134} 1354}
1135 1355
1136=item $handle->push_read (type => @args, $cb) 1356=item $handle->push_read (type => @args, $cb)
1137 1357
1138=item $handle->unshift_read (type => @args, $cb) 1358=item $handle->unshift_read (type => @args, $cb)
1139 1359
1140Instead of providing a callback that parses the data itself you can chose 1360Instead of providing a callback that parses the data itself you can chose
1141between a number of predefined parsing formats, for chunks of data, lines 1361between a number of predefined parsing formats, for chunks of data, lines
1142etc. 1362etc. You can also specify the (fully qualified) name of a package, in
1363which case AnyEvent tries to load the package and then expects to find the
1364C<anyevent_read_type> function inside (see "custom read types", below).
1143 1365
1144Predefined types are (if you have ideas for additional types, feel free to 1366Predefined types are (if you have ideas for additional types, feel free to
1145drop by and tell us): 1367drop by and tell us):
1146 1368
1147=over 4 1369=over 4
1387=cut 1609=cut
1388 1610
1389register_read_type json => sub { 1611register_read_type json => sub {
1390 my ($self, $cb) = @_; 1612 my ($self, $cb) = @_;
1391 1613
1392 my $json = $self->{json} ||= 1614 my $json = $self->{json} ||= json_coder;
1393 eval { require JSON::XS; JSON::XS->new->utf8 }
1394 || do { require JSON; JSON->new->utf8 };
1395 1615
1396 my $data; 1616 my $data;
1397 my $rbuf = \$self->{rbuf}; 1617 my $rbuf = \$self->{rbuf};
1398 1618
1399 sub { 1619 sub {
1468 } 1688 }
1469}; 1689};
1470 1690
1471=back 1691=back
1472 1692
1473=item AnyEvent::Handle::register_read_type type => $coderef->($handle, $cb, @args) 1693=item custom read types - Package::anyevent_read_type $handle, $cb, @args
1474 1694
1475This function (not method) lets you add your own types to C<push_read>. 1695Instead of one of the predefined types, you can also specify the name
1696of a package. AnyEvent will try to load the package and then expects to
1697find a function named C<anyevent_read_type> inside. If it isn't found, it
1698progressively tries to load the parent package until it either finds the
1699function (good) or runs out of packages (bad).
1476 1700
1477Whenever the given C<type> is used, C<push_read> will invoke the code 1701Whenever this type is used, C<push_read> will invoke the function with the
1478reference with the handle object, the callback and the remaining 1702handle object, the original callback and the remaining arguments.
1479arguments.
1480 1703
1481The code reference is supposed to return a callback (usually a closure) 1704The function is supposed to return a callback (usually a closure) that
1482that works as a plain read callback (see C<< ->push_read ($cb) >>). 1705works as a plain read callback (see C<< ->push_read ($cb) >>), so you can
1706mentally treat the function as a "configurable read type to read callback"
1707converter.
1483 1708
1484It should invoke the passed callback when it is done reading (remember to 1709It should invoke the original callback when it is done reading (remember
1485pass C<$handle> as first argument as all other callbacks do that). 1710to pass C<$handle> as first argument as all other callbacks do that,
1711although there is no strict requirement on this).
1486 1712
1487Note that this is a function, and all types registered this way will be
1488global, so try to use unique names.
1489
1490For examples, see the source of this module (F<perldoc -m AnyEvent::Handle>, 1713For examples, see the source of this module (F<perldoc -m
1491search for C<register_read_type>)). 1714AnyEvent::Handle>, search for C<register_read_type>)).
1492 1715
1493=item $handle->stop_read 1716=item $handle->stop_read
1494 1717
1495=item $handle->start_read 1718=item $handle->start_read
1496 1719
1516} 1739}
1517 1740
1518sub start_read { 1741sub start_read {
1519 my ($self) = @_; 1742 my ($self) = @_;
1520 1743
1521 unless ($self->{_rw} || $self->{_eof}) { 1744 unless ($self->{_rw} || $self->{_eof} || !$self->{fh}) {
1522 Scalar::Util::weaken $self; 1745 Scalar::Util::weaken $self;
1523 1746
1524 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { 1747 $self->{_rw} = AE::io $self->{fh}, 0, sub {
1525 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf}); 1748 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf});
1526 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; 1749 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
1527 1750
1528 if ($len > 0) { 1751 if ($len > 0) {
1529 $self->{_activity} = AnyEvent->now; 1752 $self->{_activity} = $self->{_ractivity} = AE::now;
1530 1753
1531 if ($self->{tls}) { 1754 if ($self->{tls}) {
1532 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf); 1755 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf);
1533 1756
1534 &_dotls ($self); 1757 &_dotls ($self);
1542 $self->_drain_rbuf; 1765 $self->_drain_rbuf;
1543 1766
1544 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { 1767 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
1545 return $self->_error ($!, 1); 1768 return $self->_error ($!, 1);
1546 } 1769 }
1547 }); 1770 };
1548 } 1771 }
1549} 1772}
1550 1773
1551our $ERROR_SYSCALL; 1774our $ERROR_SYSCALL;
1552our $ERROR_WANT_READ; 1775our $ERROR_WANT_READ;
1619 && ($tmp != $ERROR_SYSCALL || $!); 1842 && ($tmp != $ERROR_SYSCALL || $!);
1620 1843
1621 while (length ($tmp = Net::SSLeay::BIO_read ($self->{_wbio}))) { 1844 while (length ($tmp = Net::SSLeay::BIO_read ($self->{_wbio}))) {
1622 $self->{wbuf} .= $tmp; 1845 $self->{wbuf} .= $tmp;
1623 $self->_drain_wbuf; 1846 $self->_drain_wbuf;
1847 $self->{tls} or return; # tls session might have gone away in callback
1624 } 1848 }
1625 1849
1626 $self->{_on_starttls} 1850 $self->{_on_starttls}
1627 and Net::SSLeay::state ($self->{tls}) == Net::SSLeay::ST_OK () 1851 and Net::SSLeay::state ($self->{tls}) == Net::SSLeay::ST_OK ()
1628 and (delete $self->{_on_starttls})->($self, 1, "TLS/SSL connection established"); 1852 and (delete $self->{_on_starttls})->($self, 1, "TLS/SSL connection established");
1649The TLS connection object will end up in C<< $handle->{tls} >>, the TLS 1873The TLS connection object will end up in C<< $handle->{tls} >>, the TLS
1650context in C<< $handle->{tls_ctx} >> after this call and can be used or 1874context in C<< $handle->{tls_ctx} >> after this call and can be used or
1651changed to your liking. Note that the handshake might have already started 1875changed to your liking. Note that the handshake might have already started
1652when this function returns. 1876when this function returns.
1653 1877
1654If it an error to start a TLS handshake more than once per 1878Due to bugs in OpenSSL, it might or might not be possible to do multiple
1655AnyEvent::Handle object (this is due to bugs in OpenSSL). 1879handshakes on the same stream. Best do not attempt to use the stream after
1880stopping TLS.
1881
1882This method may invoke callbacks (and therefore the handle might be
1883destroyed after it returns).
1656 1884
1657=cut 1885=cut
1658 1886
1659our %TLS_CACHE; #TODO not yet documented, should we? 1887our %TLS_CACHE; #TODO not yet documented, should we?
1660 1888
1661sub starttls { 1889sub starttls {
1662 my ($self, $ssl, $ctx) = @_; 1890 my ($self, $tls, $ctx) = @_;
1891
1892 Carp::croak "It is an error to call starttls on an AnyEvent::Handle object while TLS is already active, caught"
1893 if $self->{tls};
1894
1895 $self->{tls} = $tls;
1896 $self->{tls_ctx} = $ctx if @_ > 2;
1897
1898 return unless $self->{fh};
1663 1899
1664 require Net::SSLeay; 1900 require Net::SSLeay;
1665
1666 Carp::croak "it is an error to call starttls more than once on an AnyEvent::Handle object"
1667 if $self->{tls};
1668 1901
1669 $ERROR_SYSCALL = Net::SSLeay::ERROR_SYSCALL (); 1902 $ERROR_SYSCALL = Net::SSLeay::ERROR_SYSCALL ();
1670 $ERROR_WANT_READ = Net::SSLeay::ERROR_WANT_READ (); 1903 $ERROR_WANT_READ = Net::SSLeay::ERROR_WANT_READ ();
1671 1904
1905 $tls = delete $self->{tls};
1672 $ctx ||= $self->{tls_ctx}; 1906 $ctx = $self->{tls_ctx};
1673 1907
1674 local $Carp::CarpLevel = 1; # skip ourselves when creating a new context or session 1908 local $Carp::CarpLevel = 1; # skip ourselves when creating a new context or session
1675 1909
1676 if ("HASH" eq ref $ctx) { 1910 if ("HASH" eq ref $ctx) {
1677 require AnyEvent::TLS; 1911 require AnyEvent::TLS;
1683 $ctx = new AnyEvent::TLS %$ctx; 1917 $ctx = new AnyEvent::TLS %$ctx;
1684 } 1918 }
1685 } 1919 }
1686 1920
1687 $self->{tls_ctx} = $ctx || TLS_CTX (); 1921 $self->{tls_ctx} = $ctx || TLS_CTX ();
1688 $self->{tls} = $ssl = $self->{tls_ctx}->_get_session ($ssl, $self, $self->{peername}); 1922 $self->{tls} = $tls = $self->{tls_ctx}->_get_session ($tls, $self, $self->{peername});
1689 1923
1690 # basically, this is deep magic (because SSL_read should have the same issues) 1924 # basically, this is deep magic (because SSL_read should have the same issues)
1691 # but the openssl maintainers basically said: "trust us, it just works". 1925 # but the openssl maintainers basically said: "trust us, it just works".
1692 # (unfortunately, we have to hardcode constants because the abysmally misdesigned 1926 # (unfortunately, we have to hardcode constants because the abysmally misdesigned
1693 # and mismaintained ssleay-module doesn't even offer them). 1927 # and mismaintained ssleay-module doesn't even offer them).
1700 # and we drive openssl fully in blocking mode here. Or maybe we don't - openssl seems to 1934 # and we drive openssl fully in blocking mode here. Or maybe we don't - openssl seems to
1701 # have identity issues in that area. 1935 # have identity issues in that area.
1702# Net::SSLeay::CTX_set_mode ($ssl, 1936# Net::SSLeay::CTX_set_mode ($ssl,
1703# (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1) 1937# (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1)
1704# | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2)); 1938# | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2));
1705 Net::SSLeay::CTX_set_mode ($ssl, 1|2); 1939 Net::SSLeay::CTX_set_mode ($tls, 1|2);
1706 1940
1707 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 1941 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1708 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 1942 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1709 1943
1944 Net::SSLeay::BIO_write ($self->{_rbio}, delete $self->{rbuf});
1945
1710 Net::SSLeay::set_bio ($ssl, $self->{_rbio}, $self->{_wbio}); 1946 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio});
1711 1947
1712 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) } 1948 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) }
1713 if $self->{on_starttls}; 1949 if $self->{on_starttls};
1714 1950
1715 &_dotls; # need to trigger the initial handshake 1951 &_dotls; # need to trigger the initial handshake
1718 1954
1719=item $handle->stoptls 1955=item $handle->stoptls
1720 1956
1721Shuts down the SSL connection - this makes a proper EOF handshake by 1957Shuts down the SSL connection - this makes a proper EOF handshake by
1722sending a close notify to the other side, but since OpenSSL doesn't 1958sending a close notify to the other side, but since OpenSSL doesn't
1723support non-blocking shut downs, it is not possible to re-use the stream 1959support non-blocking shut downs, it is not guaranteed that you can re-use
1724afterwards. 1960the stream afterwards.
1961
1962This method may invoke callbacks (and therefore the handle might be
1963destroyed after it returns).
1725 1964
1726=cut 1965=cut
1727 1966
1728sub stoptls { 1967sub stoptls {
1729 my ($self) = @_; 1968 my ($self) = @_;
1730 1969
1731 if ($self->{tls}) { 1970 if ($self->{tls} && $self->{fh}) {
1732 Net::SSLeay::shutdown ($self->{tls}); 1971 Net::SSLeay::shutdown ($self->{tls});
1733 1972
1734 &_dotls; 1973 &_dotls;
1735 1974
1736# # we don't give a shit. no, we do, but we can't. no...#d# 1975# # we don't give a shit. no, we do, but we can't. no...#d#
1742sub _freetls { 1981sub _freetls {
1743 my ($self) = @_; 1982 my ($self) = @_;
1744 1983
1745 return unless $self->{tls}; 1984 return unless $self->{tls};
1746 1985
1747 $self->{tls_ctx}->_put_session (delete $self->{tls}); 1986 $self->{tls_ctx}->_put_session (delete $self->{tls})
1987 if $self->{tls} > 0;
1748 1988
1749 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)}; 1989 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)};
1750} 1990}
1751 1991
1752sub DESTROY { 1992sub DESTROY {
1760 my $fh = delete $self->{fh}; 2000 my $fh = delete $self->{fh};
1761 my $wbuf = delete $self->{wbuf}; 2001 my $wbuf = delete $self->{wbuf};
1762 2002
1763 my @linger; 2003 my @linger;
1764 2004
1765 push @linger, AnyEvent->io (fh => $fh, poll => "w", cb => sub { 2005 push @linger, AE::io $fh, 1, sub {
1766 my $len = syswrite $fh, $wbuf, length $wbuf; 2006 my $len = syswrite $fh, $wbuf, length $wbuf;
1767 2007
1768 if ($len > 0) { 2008 if ($len > 0) {
1769 substr $wbuf, 0, $len, ""; 2009 substr $wbuf, 0, $len, "";
1770 } else { 2010 } else {
1771 @linger = (); # end 2011 @linger = (); # end
1772 } 2012 }
1773 }); 2013 };
1774 push @linger, AnyEvent->timer (after => $linger, cb => sub { 2014 push @linger, AE::timer $linger, 0, sub {
1775 @linger = (); 2015 @linger = ();
1776 }); 2016 };
1777 } 2017 }
1778} 2018}
1779 2019
1780=item $handle->destroy 2020=item $handle->destroy
1781 2021
1782Shuts down the handle object as much as possible - this call ensures that 2022Shuts down the handle object as much as possible - this call ensures that
1783no further callbacks will be invoked and as many resources as possible 2023no further callbacks will be invoked and as many resources as possible
1784will be freed. You must not call any methods on the object afterwards. 2024will be freed. Any method you will call on the handle object after
2025destroying it in this way will be silently ignored (and it will return the
2026empty list).
1785 2027
1786Normally, you can just "forget" any references to an AnyEvent::Handle 2028Normally, you can just "forget" any references to an AnyEvent::Handle
1787object and it will simply shut down. This works in fatal error and EOF 2029object and it will simply shut down. This works in fatal error and EOF
1788callbacks, as well as code outside. It does I<NOT> work in a read or write 2030callbacks, as well as code outside. It does I<NOT> work in a read or write
1789callback, so when you want to destroy the AnyEvent::Handle object from 2031callback, so when you want to destroy the AnyEvent::Handle object from
1803sub destroy { 2045sub destroy {
1804 my ($self) = @_; 2046 my ($self) = @_;
1805 2047
1806 $self->DESTROY; 2048 $self->DESTROY;
1807 %$self = (); 2049 %$self = ();
2050 bless $self, "AnyEvent::Handle::destroyed";
1808} 2051}
2052
2053sub AnyEvent::Handle::destroyed::AUTOLOAD {
2054 #nop
2055}
2056
2057=item $handle->destroyed
2058
2059Returns false as long as the handle hasn't been destroyed by a call to C<<
2060->destroy >>, true otherwise.
2061
2062Can be useful to decide whether the handle is still valid after some
2063callback possibly destroyed the handle. For example, C<< ->push_write >>,
2064C<< ->starttls >> and other methods can call user callbacks, which in turn
2065can destroy the handle, so work can be avoided by checking sometimes:
2066
2067 $hdl->starttls ("accept");
2068 return if $hdl->destroyed;
2069 $hdl->push_write (...
2070
2071Note that the call to C<push_write> will silently be ignored if the handle
2072has been destroyed, so often you can just ignore the possibility of the
2073handle being destroyed.
2074
2075=cut
2076
2077sub destroyed { 0 }
2078sub AnyEvent::Handle::destroyed::destroyed { 1 }
1809 2079
1810=item AnyEvent::Handle::TLS_CTX 2080=item AnyEvent::Handle::TLS_CTX
1811 2081
1812This function creates and returns the AnyEvent::TLS object used by default 2082This function creates and returns the AnyEvent::TLS object used by default
1813for TLS mode. 2083for TLS mode.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines