ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.160 by root, Fri Jul 24 22:47:04 2009 UTC vs.
Revision 1.182 by root, Thu Sep 3 12:35:01 2009 UTC

1package AnyEvent::Handle;
2
3use Scalar::Util ();
4use Carp ();
5use Errno qw(EAGAIN EINTR);
6
7use AnyEvent (); BEGIN { AnyEvent::common_sense }
8use AnyEvent::Util qw(WSAEWOULDBLOCK);
9
10=head1 NAME 1=head1 NAME
11 2
12AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent 3AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
13
14=cut
15
16our $VERSION = 4.86;
17 4
18=head1 SYNOPSIS 5=head1 SYNOPSIS
19 6
20 use AnyEvent; 7 use AnyEvent;
21 use AnyEvent::Handle; 8 use AnyEvent::Handle;
59C<on_error> callback. 46C<on_error> callback.
60 47
61All callbacks will be invoked with the handle object as their first 48All callbacks will be invoked with the handle object as their first
62argument. 49argument.
63 50
51=cut
52
53package AnyEvent::Handle;
54
55use Scalar::Util ();
56use List::Util ();
57use Carp ();
58use Errno qw(EAGAIN EINTR);
59
60use AnyEvent (); BEGIN { AnyEvent::common_sense }
61use AnyEvent::Util qw(WSAEWOULDBLOCK);
62
63our $VERSION = $AnyEvent::VERSION;
64
64=head1 METHODS 65=head1 METHODS
65 66
66=over 4 67=over 4
67 68
68=item $handle = B<new> AnyEvent::TLS fh => $filehandle, key => value... 69=item $handle = B<new> AnyEvent::TLS fh => $filehandle, key => value...
100This (rarely used) callback is called before a new connection is 101This (rarely used) callback is called before a new connection is
101attempted, but after the file handle has been created. It could be used to 102attempted, but after the file handle has been created. It could be used to
102prepare the file handle with parameters required for the actual connect 103prepare the file handle with parameters required for the actual connect
103(as opposed to settings that can be changed when the connection is already 104(as opposed to settings that can be changed when the connection is already
104established). 105established).
106
107The return value of this callback should be the connect timeout value in
108seconds (or C<0>, or C<undef>, or the empty list, to indicate the default
109timeout is to be used).
105 110
106=item on_connect => $cb->($handle, $host, $port, $retry->()) 111=item on_connect => $cb->($handle, $host, $port, $retry->())
107 112
108This callback is called when a connection has been successfully established. 113This callback is called when a connection has been successfully established.
109 114
212memory and push it into the queue, but instead only read more data from 217memory and push it into the queue, but instead only read more data from
213the file when the write queue becomes empty. 218the file when the write queue becomes empty.
214 219
215=item timeout => $fractional_seconds 220=item timeout => $fractional_seconds
216 221
222=item rtimeout => $fractional_seconds
223
224=item wtimeout => $fractional_seconds
225
217If non-zero, then this enables an "inactivity" timeout: whenever this many 226If non-zero, then these enables an "inactivity" timeout: whenever this
218seconds pass without a successful read or write on the underlying file 227many seconds pass without a successful read or write on the underlying
219handle, the C<on_timeout> callback will be invoked (and if that one is 228file handle (or a call to C<timeout_reset>), the C<on_timeout> callback
220missing, a non-fatal C<ETIMEDOUT> error will be raised). 229will be invoked (and if that one is missing, a non-fatal C<ETIMEDOUT>
230error will be raised).
231
232There are three variants of the timeouts that work fully independent
233of each other, for both read and write, just read, and just write:
234C<timeout>, C<rtimeout> and C<wtimeout>, with corresponding callbacks
235C<on_timeout>, C<on_rtimeout> and C<on_wtimeout>, and reset functions
236C<timeout_reset>, C<rtimeout_reset>, and C<wtimeout_reset>.
221 237
222Note that timeout processing is also active when you currently do not have 238Note that timeout processing is also active when you currently do not have
223any outstanding read or write requests: If you plan to keep the connection 239any outstanding read or write requests: If you plan to keep the connection
224idle then you should disable the timout temporarily or ignore the timeout 240idle then you should disable the timout temporarily or ignore the timeout
225in the C<on_timeout> callback, in which case AnyEvent::Handle will simply 241in the C<on_timeout> callback, in which case AnyEvent::Handle will simply
269accomplishd by setting this option to a true value. 285accomplishd by setting this option to a true value.
270 286
271The default is your opertaing system's default behaviour (most likely 287The default is your opertaing system's default behaviour (most likely
272enabled), this option explicitly enables or disables it, if possible. 288enabled), this option explicitly enables or disables it, if possible.
273 289
290=item keepalive => <boolean>
291
292Enables (default disable) the SO_KEEPALIVE option on the stream socket:
293normally, TCP connections have no time-out once established, so TCP
294conenctions, once established, can stay alive forever even when the other
295side has long gone. TCP keepalives are a cheap way to take down long-lived
296TCP connections whent he other side becomes unreachable. While the default
297is OS-dependent, TCP keepalives usually kick in after around two hours,
298and, if the other side doesn't reply, take down the TCP connection some 10
299to 15 minutes later.
300
301It is harmless to specify this option for file handles that do not support
302keepalives, and enabling it on connections that are potentially long-lived
303is usually a good idea.
304
305=item oobinline => <boolean>
306
307BSD majorly fucked up the implementation of TCP urgent data. The result
308is that almost no OS implements TCP according to the specs, and every OS
309implements it slightly differently.
310
311If you want to handle TCP urgent data, then setting this flag gives you
312the most portable way of getting urgent data, by putting it into the
313stream.
314
274=item read_size => <bytes> 315=item read_size => <bytes>
275 316
276The default read block size (the amount of bytes this module will 317The default read block size (the amount of bytes this module will
277try to read during each loop iteration, which affects memory 318try to read during each loop iteration, which affects memory
278requirements). Default: C<8192>. 319requirements). Default: C<8192>.
434 delete $self->{_skip_drain_rbuf}; 475 delete $self->{_skip_drain_rbuf};
435 $self->_start; 476 $self->_start;
436 477
437 $self->{on_connect} 478 $self->{on_connect}
438 and $self->{on_connect}($self, $host, $port, sub { 479 and $self->{on_connect}($self, $host, $port, sub {
439 delete @$self{qw(fh _tw _ww _rw _eof _queue rbuf _wbuf tls _tls_rbuf _tls_wbuf)}; 480 delete @$self{qw(fh _tw _rtw _wtw _ww _rw _eof _queue rbuf _wbuf tls _tls_rbuf _tls_wbuf)};
440 $self->{_skip_drain_rbuf} = 1; 481 $self->{_skip_drain_rbuf} = 1;
441 &$retry; 482 &$retry;
442 }); 483 });
443 484
444 } else { 485 } else {
445 if ($self->{on_connect_error}) { 486 if ($self->{on_connect_error}) {
446 $self->{on_connect_error}($self, "$!"); 487 $self->{on_connect_error}($self, "$!");
447 $self->destroy; 488 $self->destroy;
448 } else { 489 } else {
449 $self->fatal ($!, 1); 490 $self->_error ($!, 1);
450 } 491 }
451 } 492 }
452 }, 493 },
453 sub { 494 sub {
454 local $self->{fh} = $_[0]; 495 local $self->{fh} = $_[0];
455 496
497 $self->{on_prepare}
456 $self->{on_prepare}->($self) 498 ? $self->{on_prepare}->($self)
457 if $self->{on_prepare}; 499 : ()
458 } 500 }
459 ); 501 );
460 } 502 }
461 503
462 } else { 504 } else {
469sub _start { 511sub _start {
470 my ($self) = @_; 512 my ($self) = @_;
471 513
472 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 514 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
473 515
516 $self->{_activity} =
517 $self->{_ractivity} =
474 $self->{_activity} = AnyEvent->now; 518 $self->{_wactivity} = AE::now;
475 $self->_timeout;
476 519
520 $self->timeout (delete $self->{timeout} ) if $self->{timeout};
521 $self->rtimeout (delete $self->{rtimeout} ) if $self->{rtimeout};
522 $self->wtimeout (delete $self->{wtimeout} ) if $self->{wtimeout};
523
477 $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay}; 524 $self->no_delay (delete $self->{no_delay} ) if exists $self->{no_delay};
525 $self->keepalive (delete $self->{keepalive}) if exists $self->{keepalive};
526 $self->oobinline (delete $self->{oobinline}) if exists $self->{oobinline};
478 527
479 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}) 528 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx})
480 if $self->{tls}; 529 if $self->{tls};
481 530
482 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 531 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
483 532
484 $self->start_read 533 $self->start_read
485 if $self->{on_read} || @{ $self->{_queue} }; 534 if $self->{on_read} || @{ $self->{_queue} };
486 535
487 $self->_drain_wbuf; 536 $self->_drain_wbuf;
488} 537}
489
490#sub _shutdown {
491# my ($self) = @_;
492#
493# delete @$self{qw(_tw _rw _ww fh wbuf on_read _queue)};
494# $self->{_eof} = 1; # tell starttls et. al to stop trying
495#
496# &_freetls;
497#}
498 538
499sub _error { 539sub _error {
500 my ($self, $errno, $fatal, $message) = @_; 540 my ($self, $errno, $fatal, $message) = @_;
501 541
502 $! = $errno; 542 $! = $errno;
539 $_[0]{on_eof} = $_[1]; 579 $_[0]{on_eof} = $_[1];
540} 580}
541 581
542=item $handle->on_timeout ($cb) 582=item $handle->on_timeout ($cb)
543 583
544Replace the current C<on_timeout> callback, or disables the callback (but 584=item $handle->on_rtimeout ($cb)
545not the timeout) if C<$cb> = C<undef>. See the C<timeout> constructor
546argument and method.
547 585
548=cut 586=item $handle->on_wtimeout ($cb)
549 587
550sub on_timeout { 588Replace the current C<on_timeout>, C<on_rtimeout> or C<on_wtimeout>
551 $_[0]{on_timeout} = $_[1]; 589callback, or disables the callback (but not the timeout) if C<$cb> =
552} 590C<undef>. See the C<timeout> constructor argument and method.
591
592=cut
593
594# see below
553 595
554=item $handle->autocork ($boolean) 596=item $handle->autocork ($boolean)
555 597
556Enables or disables the current autocork behaviour (see C<autocork> 598Enables or disables the current autocork behaviour (see C<autocork>
557constructor argument). Changes will only take effect on the next write. 599constructor argument). Changes will only take effect on the next write.
572sub no_delay { 614sub no_delay {
573 $_[0]{no_delay} = $_[1]; 615 $_[0]{no_delay} = $_[1];
574 616
575 eval { 617 eval {
576 local $SIG{__DIE__}; 618 local $SIG{__DIE__};
577 setsockopt $_[0]{fh}, &Socket::IPPROTO_TCP, &Socket::TCP_NODELAY, int $_[1] 619 setsockopt $_[0]{fh}, Socket::IPPROTO_TCP (), Socket::TCP_NODELAY (), int $_[1]
578 if $_[0]{fh}; 620 if $_[0]{fh};
579 }; 621 };
580} 622}
581 623
624=item $handle->keepalive ($boolean)
625
626Enables or disables the C<keepalive> setting (see constructor argument of
627the same name for details).
628
629=cut
630
631sub keepalive {
632 $_[0]{keepalive} = $_[1];
633
634 eval {
635 local $SIG{__DIE__};
636 setsockopt $_[0]{fh}, Socket::SOL_SOCKET (), Socket::SO_KEEPALIVE (), int $_[1]
637 if $_[0]{fh};
638 };
639}
640
641=item $handle->oobinline ($boolean)
642
643Enables or disables the C<oobinline> setting (see constructor argument of
644the same name for details).
645
646=cut
647
648sub oobinline {
649 $_[0]{oobinline} = $_[1];
650
651 eval {
652 local $SIG{__DIE__};
653 setsockopt $_[0]{fh}, Socket::SOL_SOCKET (), Socket::SO_OOBINLINE (), int $_[1]
654 if $_[0]{fh};
655 };
656}
657
658=item $handle->keepalive ($boolean)
659
660Enables or disables the C<keepalive> setting (see constructor argument of
661the same name for details).
662
663=cut
664
665sub keepalive {
666 $_[0]{keepalive} = $_[1];
667
668 eval {
669 local $SIG{__DIE__};
670 setsockopt $_[0]{fh}, Socket::SOL_SOCKET (), Socket::SO_KEEPALIVE (), int $_[1]
671 if $_[0]{fh};
672 };
673}
674
582=item $handle->on_starttls ($cb) 675=item $handle->on_starttls ($cb)
583 676
584Replace the current C<on_starttls> callback (see the C<on_starttls> constructor argument). 677Replace the current C<on_starttls> callback (see the C<on_starttls> constructor argument).
585 678
586=cut 679=cut
597 690
598sub on_starttls { 691sub on_starttls {
599 $_[0]{on_stoptls} = $_[1]; 692 $_[0]{on_stoptls} = $_[1];
600} 693}
601 694
695=item $handle->rbuf_max ($max_octets)
696
697Configures the C<rbuf_max> setting (C<undef> disables it).
698
699=cut
700
701sub rbuf_max {
702 $_[0]{rbuf_max} = $_[1];
703}
704
602############################################################################# 705#############################################################################
603 706
604=item $handle->timeout ($seconds) 707=item $handle->timeout ($seconds)
605 708
709=item $handle->rtimeout ($seconds)
710
711=item $handle->wtimeout ($seconds)
712
606Configures (or disables) the inactivity timeout. 713Configures (or disables) the inactivity timeout.
607 714
608=cut 715=item $handle->timeout_reset
609 716
610sub timeout { 717=item $handle->rtimeout_reset
718
719=item $handle->wtimeout_reset
720
721Reset the activity timeout, as if data was received or sent.
722
723These methods are cheap to call.
724
725=cut
726
727for my $dir ("", "r", "w") {
728 my $timeout = "${dir}timeout";
729 my $tw = "_${dir}tw";
730 my $on_timeout = "on_${dir}timeout";
731 my $activity = "_${dir}activity";
732 my $cb;
733
734 *$on_timeout = sub {
735 $_[0]{$on_timeout} = $_[1];
736 };
737
738 *$timeout = sub {
611 my ($self, $timeout) = @_; 739 my ($self, $new_value) = @_;
612 740
613 $self->{timeout} = $timeout; 741 $self->{$timeout} = $new_value;
614 $self->_timeout; 742 delete $self->{$tw}; &$cb;
615} 743 };
616 744
745 *{"${dir}timeout_reset"} = sub {
746 $_[0]{$activity} = AE::now;
747 };
748
749 # main workhorse:
617# reset the timeout watcher, as neccessary 750 # reset the timeout watcher, as neccessary
618# also check for time-outs 751 # also check for time-outs
619sub _timeout { 752 $cb = sub {
620 my ($self) = @_; 753 my ($self) = @_;
621 754
622 if ($self->{timeout} && $self->{fh}) { 755 if ($self->{$timeout} && $self->{fh}) {
623 my $NOW = AnyEvent->now; 756 my $NOW = AE::now;
624 757
625 # when would the timeout trigger? 758 # when would the timeout trigger?
626 my $after = $self->{_activity} + $self->{timeout} - $NOW; 759 my $after = $self->{$activity} + $self->{$timeout} - $NOW;
627 760
628 # now or in the past already? 761 # now or in the past already?
629 if ($after <= 0) { 762 if ($after <= 0) {
630 $self->{_activity} = $NOW; 763 $self->{$activity} = $NOW;
631 764
632 if ($self->{on_timeout}) { 765 if ($self->{$on_timeout}) {
633 $self->{on_timeout}($self); 766 $self->{$on_timeout}($self);
634 } else { 767 } else {
635 $self->_error (Errno::ETIMEDOUT); 768 $self->_error (Errno::ETIMEDOUT);
769 }
770
771 # callback could have changed timeout value, optimise
772 return unless $self->{$timeout};
773
774 # calculate new after
775 $after = $self->{$timeout};
636 } 776 }
637 777
638 # callback could have changed timeout value, optimise 778 Scalar::Util::weaken $self;
639 return unless $self->{timeout}; 779 return unless $self; # ->error could have destroyed $self
640 780
641 # calculate new after 781 $self->{$tw} ||= AE::timer $after, 0, sub {
642 $after = $self->{timeout}; 782 delete $self->{$tw};
783 $cb->($self);
784 };
785 } else {
786 delete $self->{$tw};
643 } 787 }
644
645 Scalar::Util::weaken $self;
646 return unless $self; # ->error could have destroyed $self
647
648 $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub {
649 delete $self->{_tw};
650 $self->_timeout;
651 });
652 } else {
653 delete $self->{_tw};
654 } 788 }
655} 789}
656 790
657############################################################################# 791#############################################################################
658 792
706 my $len = syswrite $self->{fh}, $self->{wbuf}; 840 my $len = syswrite $self->{fh}, $self->{wbuf};
707 841
708 if (defined $len) { 842 if (defined $len) {
709 substr $self->{wbuf}, 0, $len, ""; 843 substr $self->{wbuf}, 0, $len, "";
710 844
711 $self->{_activity} = AnyEvent->now; 845 $self->{_activity} = $self->{_wactivity} = AE::now;
712 846
713 $self->{on_drain}($self) 847 $self->{on_drain}($self)
714 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf}) 848 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf})
715 && $self->{on_drain}; 849 && $self->{on_drain};
716 850
722 856
723 # try to write data immediately 857 # try to write data immediately
724 $cb->() unless $self->{autocork}; 858 $cb->() unless $self->{autocork};
725 859
726 # if still data left in wbuf, we need to poll 860 # if still data left in wbuf, we need to poll
727 $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) 861 $self->{_ww} = AE::io $self->{fh}, 1, $cb
728 if length $self->{wbuf}; 862 if length $self->{wbuf};
729 }; 863 };
730} 864}
731 865
732our %WH; 866our %WH;
822Other languages could read single lines terminated by a newline and pass 956Other languages could read single lines terminated by a newline and pass
823this line into their JSON decoder of choice. 957this line into their JSON decoder of choice.
824 958
825=cut 959=cut
826 960
961sub json_coder() {
962 eval { require JSON::XS; JSON::XS->new->utf8 }
963 || do { require JSON; JSON->new->utf8 }
964}
965
827register_write_type json => sub { 966register_write_type json => sub {
828 my ($self, $ref) = @_; 967 my ($self, $ref) = @_;
829 968
830 require JSON; 969 my $json = $self->{json} ||= json_coder;
831 970
832 $self->{json} ? $self->{json}->encode ($ref) 971 $json->encode ($ref)
833 : JSON::encode_json ($ref)
834}; 972};
835 973
836=item storable => $reference 974=item storable => $reference
837 975
838Freezes the given reference using L<Storable> and writes it to the 976Freezes the given reference using L<Storable> and writes it to the
976 1114
977sub _drain_rbuf { 1115sub _drain_rbuf {
978 my ($self) = @_; 1116 my ($self) = @_;
979 1117
980 # avoid recursion 1118 # avoid recursion
981 return if exists $self->{_skip_drain_rbuf}; 1119 return if $self->{_skip_drain_rbuf};
982 local $self->{_skip_drain_rbuf} = 1; 1120 local $self->{_skip_drain_rbuf} = 1;
983
984 if (
985 defined $self->{rbuf_max}
986 && $self->{rbuf_max} < length $self->{rbuf}
987 ) {
988 $self->_error (Errno::ENOSPC, 1), return;
989 }
990 1121
991 while () { 1122 while () {
992 # we need to use a separate tls read buffer, as we must not receive data while 1123 # we need to use a separate tls read buffer, as we must not receive data while
993 # we are draining the buffer, and this can only happen with TLS. 1124 # we are draining the buffer, and this can only happen with TLS.
994 $self->{rbuf} .= delete $self->{_tls_rbuf} if exists $self->{_tls_rbuf}; 1125 $self->{rbuf} .= delete $self->{_tls_rbuf}
1126 if exists $self->{_tls_rbuf};
995 1127
996 my $len = length $self->{rbuf}; 1128 my $len = length $self->{rbuf};
997 1129
998 if (my $cb = shift @{ $self->{_queue} }) { 1130 if (my $cb = shift @{ $self->{_queue} }) {
999 unless ($cb->($self)) { 1131 unless ($cb->($self)) {
1000 if ($self->{_eof}) { 1132 # no progress can be made
1001 # no progress can be made (not enough data and no data forthcoming) 1133 # (not enough data and no data forthcoming)
1002 $self->_error (Errno::EPIPE, 1), return; 1134 $self->_error (Errno::EPIPE, 1), return
1003 } 1135 if $self->{_eof};
1004 1136
1005 unshift @{ $self->{_queue} }, $cb; 1137 unshift @{ $self->{_queue} }, $cb;
1006 last; 1138 last;
1007 } 1139 }
1008 } elsif ($self->{on_read}) { 1140 } elsif ($self->{on_read}) {
1028 last; 1160 last;
1029 } 1161 }
1030 } 1162 }
1031 1163
1032 if ($self->{_eof}) { 1164 if ($self->{_eof}) {
1033 if ($self->{on_eof}) { 1165 $self->{on_eof}
1034 $self->{on_eof}($self) 1166 ? $self->{on_eof}($self)
1035 } else {
1036 $self->_error (0, 1, "Unexpected end-of-file"); 1167 : $self->_error (0, 1, "Unexpected end-of-file");
1037 } 1168
1169 return;
1170 }
1171
1172 if (
1173 defined $self->{rbuf_max}
1174 && $self->{rbuf_max} < length $self->{rbuf}
1175 ) {
1176 $self->_error (Errno::ENOSPC, 1), return;
1038 } 1177 }
1039 1178
1040 # may need to restart read watcher 1179 # may need to restart read watcher
1041 unless ($self->{_rw}) { 1180 unless ($self->{_rw}) {
1042 $self->start_read 1181 $self->start_read
1129 my $type = shift; 1268 my $type = shift;
1130 1269
1131 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read") 1270 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read")
1132 ->($self, $cb, @_); 1271 ->($self, $cb, @_);
1133 } 1272 }
1134
1135 1273
1136 unshift @{ $self->{_queue} }, $cb; 1274 unshift @{ $self->{_queue} }, $cb;
1137 $self->_drain_rbuf; 1275 $self->_drain_rbuf;
1138} 1276}
1139 1277
1391=cut 1529=cut
1392 1530
1393register_read_type json => sub { 1531register_read_type json => sub {
1394 my ($self, $cb) = @_; 1532 my ($self, $cb) = @_;
1395 1533
1396 my $json = $self->{json} ||= 1534 my $json = $self->{json} ||= json_coder;
1397 eval { require JSON::XS; JSON::XS->new->utf8 }
1398 || do { require JSON; JSON->new->utf8 };
1399 1535
1400 my $data; 1536 my $data;
1401 my $rbuf = \$self->{rbuf}; 1537 my $rbuf = \$self->{rbuf};
1402 1538
1403 sub { 1539 sub {
1523 my ($self) = @_; 1659 my ($self) = @_;
1524 1660
1525 unless ($self->{_rw} || $self->{_eof}) { 1661 unless ($self->{_rw} || $self->{_eof}) {
1526 Scalar::Util::weaken $self; 1662 Scalar::Util::weaken $self;
1527 1663
1528 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { 1664 $self->{_rw} = AE::io $self->{fh}, 0, sub {
1529 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf}); 1665 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf});
1530 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; 1666 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
1531 1667
1532 if ($len > 0) { 1668 if ($len > 0) {
1533 $self->{_activity} = AnyEvent->now; 1669 $self->{_activity} = $self->{_ractivity} = AE::now;
1534 1670
1535 if ($self->{tls}) { 1671 if ($self->{tls}) {
1536 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf); 1672 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf);
1537 1673
1538 &_dotls ($self); 1674 &_dotls ($self);
1546 $self->_drain_rbuf; 1682 $self->_drain_rbuf;
1547 1683
1548 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { 1684 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
1549 return $self->_error ($!, 1); 1685 return $self->_error ($!, 1);
1550 } 1686 }
1551 }); 1687 };
1552 } 1688 }
1553} 1689}
1554 1690
1555our $ERROR_SYSCALL; 1691our $ERROR_SYSCALL;
1556our $ERROR_WANT_READ; 1692our $ERROR_WANT_READ;
1677 require Net::SSLeay; 1813 require Net::SSLeay;
1678 1814
1679 $ERROR_SYSCALL = Net::SSLeay::ERROR_SYSCALL (); 1815 $ERROR_SYSCALL = Net::SSLeay::ERROR_SYSCALL ();
1680 $ERROR_WANT_READ = Net::SSLeay::ERROR_WANT_READ (); 1816 $ERROR_WANT_READ = Net::SSLeay::ERROR_WANT_READ ();
1681 1817
1682 $tls = $self->{tls}; 1818 $tls = delete $self->{tls};
1683 $ctx = $self->{tls_ctx}; 1819 $ctx = $self->{tls_ctx};
1684 1820
1685 local $Carp::CarpLevel = 1; # skip ourselves when creating a new context or session 1821 local $Carp::CarpLevel = 1; # skip ourselves when creating a new context or session
1686 1822
1687 if ("HASH" eq ref $ctx) { 1823 if ("HASH" eq ref $ctx) {
1716 Net::SSLeay::CTX_set_mode ($tls, 1|2); 1852 Net::SSLeay::CTX_set_mode ($tls, 1|2);
1717 1853
1718 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 1854 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1719 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 1855 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1720 1856
1857 Net::SSLeay::BIO_write ($self->{_rbio}, delete $self->{rbuf});
1858
1721 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio}); 1859 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio});
1722 1860
1723 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) } 1861 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) }
1724 if $self->{on_starttls}; 1862 if $self->{on_starttls};
1725 1863
1754 my ($self) = @_; 1892 my ($self) = @_;
1755 1893
1756 return unless $self->{tls}; 1894 return unless $self->{tls};
1757 1895
1758 $self->{tls_ctx}->_put_session (delete $self->{tls}) 1896 $self->{tls_ctx}->_put_session (delete $self->{tls})
1759 if ref $self->{tls}; 1897 if $self->{tls} > 0;
1760 1898
1761 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)}; 1899 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)};
1762} 1900}
1763 1901
1764sub DESTROY { 1902sub DESTROY {
1772 my $fh = delete $self->{fh}; 1910 my $fh = delete $self->{fh};
1773 my $wbuf = delete $self->{wbuf}; 1911 my $wbuf = delete $self->{wbuf};
1774 1912
1775 my @linger; 1913 my @linger;
1776 1914
1777 push @linger, AnyEvent->io (fh => $fh, poll => "w", cb => sub { 1915 push @linger, AE::io $fh, 1, sub {
1778 my $len = syswrite $fh, $wbuf, length $wbuf; 1916 my $len = syswrite $fh, $wbuf, length $wbuf;
1779 1917
1780 if ($len > 0) { 1918 if ($len > 0) {
1781 substr $wbuf, 0, $len, ""; 1919 substr $wbuf, 0, $len, "";
1782 } else { 1920 } else {
1783 @linger = (); # end 1921 @linger = (); # end
1784 } 1922 }
1785 }); 1923 };
1786 push @linger, AnyEvent->timer (after => $linger, cb => sub { 1924 push @linger, AE::timer $linger, 0, sub {
1787 @linger = (); 1925 @linger = ();
1788 }); 1926 };
1789 } 1927 }
1790} 1928}
1791 1929
1792=item $handle->destroy 1930=item $handle->destroy
1793 1931
1794Shuts down the handle object as much as possible - this call ensures that 1932Shuts down the handle object as much as possible - this call ensures that
1795no further callbacks will be invoked and as many resources as possible 1933no further callbacks will be invoked and as many resources as possible
1796will be freed. You must not call any methods on the object afterwards. 1934will be freed. Any method you will call on the handle object after
1935destroying it in this way will be silently ignored (and it will return the
1936empty list).
1797 1937
1798Normally, you can just "forget" any references to an AnyEvent::Handle 1938Normally, you can just "forget" any references to an AnyEvent::Handle
1799object and it will simply shut down. This works in fatal error and EOF 1939object and it will simply shut down. This works in fatal error and EOF
1800callbacks, as well as code outside. It does I<NOT> work in a read or write 1940callbacks, as well as code outside. It does I<NOT> work in a read or write
1801callback, so when you want to destroy the AnyEvent::Handle object from 1941callback, so when you want to destroy the AnyEvent::Handle object from
1815sub destroy { 1955sub destroy {
1816 my ($self) = @_; 1956 my ($self) = @_;
1817 1957
1818 $self->DESTROY; 1958 $self->DESTROY;
1819 %$self = (); 1959 %$self = ();
1960 bless $self, "AnyEvent::Handle::destroyed";
1961}
1962
1963sub AnyEvent::Handle::destroyed::AUTOLOAD {
1964 #nop
1820} 1965}
1821 1966
1822=item AnyEvent::Handle::TLS_CTX 1967=item AnyEvent::Handle::TLS_CTX
1823 1968
1824This function creates and returns the AnyEvent::TLS object used by default 1969This function creates and returns the AnyEvent::TLS object used by default

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines