ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.162 by root, Sun Jul 26 00:17:25 2009 UTC vs.
Revision 1.184 by root, Thu Sep 3 13:14:38 2009 UTC

1package AnyEvent::Handle;
2
3use Scalar::Util ();
4use Carp ();
5use Errno qw(EAGAIN EINTR);
6
7use AnyEvent (); BEGIN { AnyEvent::common_sense }
8use AnyEvent::Util qw(WSAEWOULDBLOCK);
9
10=head1 NAME 1=head1 NAME
11 2
12AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent 3AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
13
14=cut
15
16our $VERSION = 4.87;
17 4
18=head1 SYNOPSIS 5=head1 SYNOPSIS
19 6
20 use AnyEvent; 7 use AnyEvent;
21 use AnyEvent::Handle; 8 use AnyEvent::Handle;
59C<on_error> callback. 46C<on_error> callback.
60 47
61All callbacks will be invoked with the handle object as their first 48All callbacks will be invoked with the handle object as their first
62argument. 49argument.
63 50
51=cut
52
53package AnyEvent::Handle;
54
55use Scalar::Util ();
56use List::Util ();
57use Carp ();
58use Errno qw(EAGAIN EINTR);
59
60use AnyEvent (); BEGIN { AnyEvent::common_sense }
61use AnyEvent::Util qw(WSAEWOULDBLOCK);
62
63our $VERSION = $AnyEvent::VERSION;
64
64=head1 METHODS 65=head1 METHODS
65 66
66=over 4 67=over 4
67 68
68=item $handle = B<new> AnyEvent::TLS fh => $filehandle, key => value... 69=item $handle = B<new> AnyEvent::TLS fh => $filehandle, key => value...
216memory and push it into the queue, but instead only read more data from 217memory and push it into the queue, but instead only read more data from
217the file when the write queue becomes empty. 218the file when the write queue becomes empty.
218 219
219=item timeout => $fractional_seconds 220=item timeout => $fractional_seconds
220 221
222=item rtimeout => $fractional_seconds
223
224=item wtimeout => $fractional_seconds
225
221If non-zero, then this enables an "inactivity" timeout: whenever this many 226If non-zero, then these enables an "inactivity" timeout: whenever this
222seconds pass without a successful read or write on the underlying file 227many seconds pass without a successful read or write on the underlying
223handle, the C<on_timeout> callback will be invoked (and if that one is 228file handle (or a call to C<timeout_reset>), the C<on_timeout> callback
224missing, a non-fatal C<ETIMEDOUT> error will be raised). 229will be invoked (and if that one is missing, a non-fatal C<ETIMEDOUT>
230error will be raised).
231
232There are three variants of the timeouts that work fully independent
233of each other, for both read and write, just read, and just write:
234C<timeout>, C<rtimeout> and C<wtimeout>, with corresponding callbacks
235C<on_timeout>, C<on_rtimeout> and C<on_wtimeout>, and reset functions
236C<timeout_reset>, C<rtimeout_reset>, and C<wtimeout_reset>.
225 237
226Note that timeout processing is also active when you currently do not have 238Note that timeout processing is also active when you currently do not have
227any outstanding read or write requests: If you plan to keep the connection 239any outstanding read or write requests: If you plan to keep the connection
228idle then you should disable the timout temporarily or ignore the timeout 240idle then you should disable the timout temporarily or ignore the timeout
229in the C<on_timeout> callback, in which case AnyEvent::Handle will simply 241in the C<on_timeout> callback, in which case AnyEvent::Handle will simply
273accomplishd by setting this option to a true value. 285accomplishd by setting this option to a true value.
274 286
275The default is your opertaing system's default behaviour (most likely 287The default is your opertaing system's default behaviour (most likely
276enabled), this option explicitly enables or disables it, if possible. 288enabled), this option explicitly enables or disables it, if possible.
277 289
290=item keepalive => <boolean>
291
292Enables (default disable) the SO_KEEPALIVE option on the stream socket:
293normally, TCP connections have no time-out once established, so TCP
294conenctions, once established, can stay alive forever even when the other
295side has long gone. TCP keepalives are a cheap way to take down long-lived
296TCP connections whent he other side becomes unreachable. While the default
297is OS-dependent, TCP keepalives usually kick in after around two hours,
298and, if the other side doesn't reply, take down the TCP connection some 10
299to 15 minutes later.
300
301It is harmless to specify this option for file handles that do not support
302keepalives, and enabling it on connections that are potentially long-lived
303is usually a good idea.
304
305=item oobinline => <boolean>
306
307BSD majorly fucked up the implementation of TCP urgent data. The result
308is that almost no OS implements TCP according to the specs, and every OS
309implements it slightly differently.
310
311If you want to handle TCP urgent data, then setting this flag (the default
312is enabled) gives you the most portable way of getting urgent data, by
313putting it into the stream.
314
315Since BSD emulation of OOB data on top of TCP's urgent data can have
316security implications, AnyEvent::Handle sets this flag automatically
317unless explicitly specified. Note that setting this flag after
318establishing a connection I<may> be a bit too late (data loss could
319already have occured on BSD systems), but at least it will protect you
320from most attacks.
321
278=item read_size => <bytes> 322=item read_size => <bytes>
279 323
280The default read block size (the amount of bytes this module will 324The default read block size (the amount of bytes this module will
281try to read during each loop iteration, which affects memory 325try to read during each loop iteration, which affects memory
282requirements). Default: C<8192>. 326requirements). Default: C<8192>.
438 delete $self->{_skip_drain_rbuf}; 482 delete $self->{_skip_drain_rbuf};
439 $self->_start; 483 $self->_start;
440 484
441 $self->{on_connect} 485 $self->{on_connect}
442 and $self->{on_connect}($self, $host, $port, sub { 486 and $self->{on_connect}($self, $host, $port, sub {
443 delete @$self{qw(fh _tw _ww _rw _eof _queue rbuf _wbuf tls _tls_rbuf _tls_wbuf)}; 487 delete @$self{qw(fh _tw _rtw _wtw _ww _rw _eof _queue rbuf _wbuf tls _tls_rbuf _tls_wbuf)};
444 $self->{_skip_drain_rbuf} = 1; 488 $self->{_skip_drain_rbuf} = 1;
445 &$retry; 489 &$retry;
446 }); 490 });
447 491
448 } else { 492 } else {
474sub _start { 518sub _start {
475 my ($self) = @_; 519 my ($self) = @_;
476 520
477 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 521 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
478 522
523 $self->{_activity} =
524 $self->{_ractivity} =
479 $self->{_activity} = AnyEvent->now; 525 $self->{_wactivity} = AE::now;
480 $self->_timeout;
481 526
527 $self->timeout (delete $self->{timeout} ) if $self->{timeout};
528 $self->rtimeout (delete $self->{rtimeout} ) if $self->{rtimeout};
529 $self->wtimeout (delete $self->{wtimeout} ) if $self->{wtimeout};
530
482 $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay}; 531 $self->no_delay (delete $self->{no_delay} ) if exists $self->{no_delay} && $self->{no_delay};
532 $self->keepalive (delete $self->{keepalive}) if exists $self->{keepalive} && $self->{keepalive};
483 533
534 $self->oobinline (exists $self->{oobinline} ? delete $self->{oobinline} : 1);
535
484 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}) 536 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx})
485 if $self->{tls}; 537 if $self->{tls};
486 538
487 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 539 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
488 540
489 $self->start_read 541 $self->start_read
490 if $self->{on_read} || @{ $self->{_queue} }; 542 if $self->{on_read} || @{ $self->{_queue} };
491 543
492 $self->_drain_wbuf; 544 $self->_drain_wbuf;
493} 545}
494
495#sub _shutdown {
496# my ($self) = @_;
497#
498# delete @$self{qw(_tw _rw _ww fh wbuf on_read _queue)};
499# $self->{_eof} = 1; # tell starttls et. al to stop trying
500#
501# &_freetls;
502#}
503 546
504sub _error { 547sub _error {
505 my ($self, $errno, $fatal, $message) = @_; 548 my ($self, $errno, $fatal, $message) = @_;
506 549
507 $! = $errno; 550 $! = $errno;
544 $_[0]{on_eof} = $_[1]; 587 $_[0]{on_eof} = $_[1];
545} 588}
546 589
547=item $handle->on_timeout ($cb) 590=item $handle->on_timeout ($cb)
548 591
549Replace the current C<on_timeout> callback, or disables the callback (but 592=item $handle->on_rtimeout ($cb)
550not the timeout) if C<$cb> = C<undef>. See the C<timeout> constructor
551argument and method.
552 593
553=cut 594=item $handle->on_wtimeout ($cb)
554 595
555sub on_timeout { 596Replace the current C<on_timeout>, C<on_rtimeout> or C<on_wtimeout>
556 $_[0]{on_timeout} = $_[1]; 597callback, or disables the callback (but not the timeout) if C<$cb> =
557} 598C<undef>. See the C<timeout> constructor argument and method.
599
600=cut
601
602# see below
558 603
559=item $handle->autocork ($boolean) 604=item $handle->autocork ($boolean)
560 605
561Enables or disables the current autocork behaviour (see C<autocork> 606Enables or disables the current autocork behaviour (see C<autocork>
562constructor argument). Changes will only take effect on the next write. 607constructor argument). Changes will only take effect on the next write.
577sub no_delay { 622sub no_delay {
578 $_[0]{no_delay} = $_[1]; 623 $_[0]{no_delay} = $_[1];
579 624
580 eval { 625 eval {
581 local $SIG{__DIE__}; 626 local $SIG{__DIE__};
582 setsockopt $_[0]{fh}, &Socket::IPPROTO_TCP, &Socket::TCP_NODELAY, int $_[1] 627 setsockopt $_[0]{fh}, Socket::IPPROTO_TCP (), Socket::TCP_NODELAY (), int $_[1]
583 if $_[0]{fh}; 628 if $_[0]{fh};
584 }; 629 };
585} 630}
586 631
632=item $handle->keepalive ($boolean)
633
634Enables or disables the C<keepalive> setting (see constructor argument of
635the same name for details).
636
637=cut
638
639sub keepalive {
640 $_[0]{keepalive} = $_[1];
641
642 eval {
643 local $SIG{__DIE__};
644 setsockopt $_[0]{fh}, Socket::SOL_SOCKET (), Socket::SO_KEEPALIVE (), int $_[1]
645 if $_[0]{fh};
646 };
647}
648
649=item $handle->oobinline ($boolean)
650
651Enables or disables the C<oobinline> setting (see constructor argument of
652the same name for details).
653
654=cut
655
656sub oobinline {
657 $_[0]{oobinline} = $_[1];
658
659 eval {
660 local $SIG{__DIE__};
661 setsockopt $_[0]{fh}, Socket::SOL_SOCKET (), Socket::SO_OOBINLINE (), int $_[1]
662 if $_[0]{fh};
663 };
664}
665
666=item $handle->keepalive ($boolean)
667
668Enables or disables the C<keepalive> setting (see constructor argument of
669the same name for details).
670
671=cut
672
673sub keepalive {
674 $_[0]{keepalive} = $_[1];
675
676 eval {
677 local $SIG{__DIE__};
678 setsockopt $_[0]{fh}, Socket::SOL_SOCKET (), Socket::SO_KEEPALIVE (), int $_[1]
679 if $_[0]{fh};
680 };
681}
682
587=item $handle->on_starttls ($cb) 683=item $handle->on_starttls ($cb)
588 684
589Replace the current C<on_starttls> callback (see the C<on_starttls> constructor argument). 685Replace the current C<on_starttls> callback (see the C<on_starttls> constructor argument).
590 686
591=cut 687=cut
602 698
603sub on_starttls { 699sub on_starttls {
604 $_[0]{on_stoptls} = $_[1]; 700 $_[0]{on_stoptls} = $_[1];
605} 701}
606 702
703=item $handle->rbuf_max ($max_octets)
704
705Configures the C<rbuf_max> setting (C<undef> disables it).
706
707=cut
708
709sub rbuf_max {
710 $_[0]{rbuf_max} = $_[1];
711}
712
607############################################################################# 713#############################################################################
608 714
609=item $handle->timeout ($seconds) 715=item $handle->timeout ($seconds)
610 716
717=item $handle->rtimeout ($seconds)
718
719=item $handle->wtimeout ($seconds)
720
611Configures (or disables) the inactivity timeout. 721Configures (or disables) the inactivity timeout.
612 722
613=cut 723=item $handle->timeout_reset
614 724
615sub timeout { 725=item $handle->rtimeout_reset
726
727=item $handle->wtimeout_reset
728
729Reset the activity timeout, as if data was received or sent.
730
731These methods are cheap to call.
732
733=cut
734
735for my $dir ("", "r", "w") {
736 my $timeout = "${dir}timeout";
737 my $tw = "_${dir}tw";
738 my $on_timeout = "on_${dir}timeout";
739 my $activity = "_${dir}activity";
740 my $cb;
741
742 *$on_timeout = sub {
743 $_[0]{$on_timeout} = $_[1];
744 };
745
746 *$timeout = sub {
616 my ($self, $timeout) = @_; 747 my ($self, $new_value) = @_;
617 748
618 $self->{timeout} = $timeout; 749 $self->{$timeout} = $new_value;
619 $self->_timeout; 750 delete $self->{$tw}; &$cb;
620} 751 };
621 752
753 *{"${dir}timeout_reset"} = sub {
754 $_[0]{$activity} = AE::now;
755 };
756
757 # main workhorse:
622# reset the timeout watcher, as neccessary 758 # reset the timeout watcher, as neccessary
623# also check for time-outs 759 # also check for time-outs
624sub _timeout { 760 $cb = sub {
625 my ($self) = @_; 761 my ($self) = @_;
626 762
627 if ($self->{timeout} && $self->{fh}) { 763 if ($self->{$timeout} && $self->{fh}) {
628 my $NOW = AnyEvent->now; 764 my $NOW = AE::now;
629 765
630 # when would the timeout trigger? 766 # when would the timeout trigger?
631 my $after = $self->{_activity} + $self->{timeout} - $NOW; 767 my $after = $self->{$activity} + $self->{$timeout} - $NOW;
632 768
633 # now or in the past already? 769 # now or in the past already?
634 if ($after <= 0) { 770 if ($after <= 0) {
635 $self->{_activity} = $NOW; 771 $self->{$activity} = $NOW;
636 772
637 if ($self->{on_timeout}) { 773 if ($self->{$on_timeout}) {
638 $self->{on_timeout}($self); 774 $self->{$on_timeout}($self);
639 } else { 775 } else {
640 $self->_error (Errno::ETIMEDOUT); 776 $self->_error (Errno::ETIMEDOUT);
777 }
778
779 # callback could have changed timeout value, optimise
780 return unless $self->{$timeout};
781
782 # calculate new after
783 $after = $self->{$timeout};
641 } 784 }
642 785
643 # callback could have changed timeout value, optimise 786 Scalar::Util::weaken $self;
644 return unless $self->{timeout}; 787 return unless $self; # ->error could have destroyed $self
645 788
646 # calculate new after 789 $self->{$tw} ||= AE::timer $after, 0, sub {
647 $after = $self->{timeout}; 790 delete $self->{$tw};
791 $cb->($self);
792 };
793 } else {
794 delete $self->{$tw};
648 } 795 }
649
650 Scalar::Util::weaken $self;
651 return unless $self; # ->error could have destroyed $self
652
653 $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub {
654 delete $self->{_tw};
655 $self->_timeout;
656 });
657 } else {
658 delete $self->{_tw};
659 } 796 }
660} 797}
661 798
662############################################################################# 799#############################################################################
663 800
711 my $len = syswrite $self->{fh}, $self->{wbuf}; 848 my $len = syswrite $self->{fh}, $self->{wbuf};
712 849
713 if (defined $len) { 850 if (defined $len) {
714 substr $self->{wbuf}, 0, $len, ""; 851 substr $self->{wbuf}, 0, $len, "";
715 852
716 $self->{_activity} = AnyEvent->now; 853 $self->{_activity} = $self->{_wactivity} = AE::now;
717 854
718 $self->{on_drain}($self) 855 $self->{on_drain}($self)
719 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf}) 856 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf})
720 && $self->{on_drain}; 857 && $self->{on_drain};
721 858
727 864
728 # try to write data immediately 865 # try to write data immediately
729 $cb->() unless $self->{autocork}; 866 $cb->() unless $self->{autocork};
730 867
731 # if still data left in wbuf, we need to poll 868 # if still data left in wbuf, we need to poll
732 $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) 869 $self->{_ww} = AE::io $self->{fh}, 1, $cb
733 if length $self->{wbuf}; 870 if length $self->{wbuf};
734 }; 871 };
735} 872}
736 873
737our %WH; 874our %WH;
827Other languages could read single lines terminated by a newline and pass 964Other languages could read single lines terminated by a newline and pass
828this line into their JSON decoder of choice. 965this line into their JSON decoder of choice.
829 966
830=cut 967=cut
831 968
969sub json_coder() {
970 eval { require JSON::XS; JSON::XS->new->utf8 }
971 || do { require JSON; JSON->new->utf8 }
972}
973
832register_write_type json => sub { 974register_write_type json => sub {
833 my ($self, $ref) = @_; 975 my ($self, $ref) = @_;
834 976
835 require JSON; 977 my $json = $self->{json} ||= json_coder;
836 978
837 $self->{json} ? $self->{json}->encode ($ref) 979 $json->encode ($ref)
838 : JSON::encode_json ($ref)
839}; 980};
840 981
841=item storable => $reference 982=item storable => $reference
842 983
843Freezes the given reference using L<Storable> and writes it to the 984Freezes the given reference using L<Storable> and writes it to the
981 1122
982sub _drain_rbuf { 1123sub _drain_rbuf {
983 my ($self) = @_; 1124 my ($self) = @_;
984 1125
985 # avoid recursion 1126 # avoid recursion
986 return if exists $self->{_skip_drain_rbuf}; 1127 return if $self->{_skip_drain_rbuf};
987 local $self->{_skip_drain_rbuf} = 1; 1128 local $self->{_skip_drain_rbuf} = 1;
988
989 if (
990 defined $self->{rbuf_max}
991 && $self->{rbuf_max} < length $self->{rbuf}
992 ) {
993 $self->_error (Errno::ENOSPC, 1), return;
994 }
995 1129
996 while () { 1130 while () {
997 # we need to use a separate tls read buffer, as we must not receive data while 1131 # we need to use a separate tls read buffer, as we must not receive data while
998 # we are draining the buffer, and this can only happen with TLS. 1132 # we are draining the buffer, and this can only happen with TLS.
999 $self->{rbuf} .= delete $self->{_tls_rbuf} if exists $self->{_tls_rbuf}; 1133 $self->{rbuf} .= delete $self->{_tls_rbuf}
1134 if exists $self->{_tls_rbuf};
1000 1135
1001 my $len = length $self->{rbuf}; 1136 my $len = length $self->{rbuf};
1002 1137
1003 if (my $cb = shift @{ $self->{_queue} }) { 1138 if (my $cb = shift @{ $self->{_queue} }) {
1004 unless ($cb->($self)) { 1139 unless ($cb->($self)) {
1005 if ($self->{_eof}) { 1140 # no progress can be made
1006 # no progress can be made (not enough data and no data forthcoming) 1141 # (not enough data and no data forthcoming)
1007 $self->_error (Errno::EPIPE, 1), return; 1142 $self->_error (Errno::EPIPE, 1), return
1008 } 1143 if $self->{_eof};
1009 1144
1010 unshift @{ $self->{_queue} }, $cb; 1145 unshift @{ $self->{_queue} }, $cb;
1011 last; 1146 last;
1012 } 1147 }
1013 } elsif ($self->{on_read}) { 1148 } elsif ($self->{on_read}) {
1033 last; 1168 last;
1034 } 1169 }
1035 } 1170 }
1036 1171
1037 if ($self->{_eof}) { 1172 if ($self->{_eof}) {
1038 if ($self->{on_eof}) { 1173 $self->{on_eof}
1039 $self->{on_eof}($self) 1174 ? $self->{on_eof}($self)
1040 } else {
1041 $self->_error (0, 1, "Unexpected end-of-file"); 1175 : $self->_error (0, 1, "Unexpected end-of-file");
1042 } 1176
1177 return;
1178 }
1179
1180 if (
1181 defined $self->{rbuf_max}
1182 && $self->{rbuf_max} < length $self->{rbuf}
1183 ) {
1184 $self->_error (Errno::ENOSPC, 1), return;
1043 } 1185 }
1044 1186
1045 # may need to restart read watcher 1187 # may need to restart read watcher
1046 unless ($self->{_rw}) { 1188 unless ($self->{_rw}) {
1047 $self->start_read 1189 $self->start_read
1134 my $type = shift; 1276 my $type = shift;
1135 1277
1136 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read") 1278 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read")
1137 ->($self, $cb, @_); 1279 ->($self, $cb, @_);
1138 } 1280 }
1139
1140 1281
1141 unshift @{ $self->{_queue} }, $cb; 1282 unshift @{ $self->{_queue} }, $cb;
1142 $self->_drain_rbuf; 1283 $self->_drain_rbuf;
1143} 1284}
1144 1285
1396=cut 1537=cut
1397 1538
1398register_read_type json => sub { 1539register_read_type json => sub {
1399 my ($self, $cb) = @_; 1540 my ($self, $cb) = @_;
1400 1541
1401 my $json = $self->{json} ||= 1542 my $json = $self->{json} ||= json_coder;
1402 eval { require JSON::XS; JSON::XS->new->utf8 }
1403 || do { require JSON; JSON->new->utf8 };
1404 1543
1405 my $data; 1544 my $data;
1406 my $rbuf = \$self->{rbuf}; 1545 my $rbuf = \$self->{rbuf};
1407 1546
1408 sub { 1547 sub {
1528 my ($self) = @_; 1667 my ($self) = @_;
1529 1668
1530 unless ($self->{_rw} || $self->{_eof}) { 1669 unless ($self->{_rw} || $self->{_eof}) {
1531 Scalar::Util::weaken $self; 1670 Scalar::Util::weaken $self;
1532 1671
1533 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { 1672 $self->{_rw} = AE::io $self->{fh}, 0, sub {
1534 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf}); 1673 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf});
1535 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; 1674 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
1536 1675
1537 if ($len > 0) { 1676 if ($len > 0) {
1538 $self->{_activity} = AnyEvent->now; 1677 $self->{_activity} = $self->{_ractivity} = AE::now;
1539 1678
1540 if ($self->{tls}) { 1679 if ($self->{tls}) {
1541 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf); 1680 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf);
1542 1681
1543 &_dotls ($self); 1682 &_dotls ($self);
1551 $self->_drain_rbuf; 1690 $self->_drain_rbuf;
1552 1691
1553 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { 1692 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
1554 return $self->_error ($!, 1); 1693 return $self->_error ($!, 1);
1555 } 1694 }
1556 }); 1695 };
1557 } 1696 }
1558} 1697}
1559 1698
1560our $ERROR_SYSCALL; 1699our $ERROR_SYSCALL;
1561our $ERROR_WANT_READ; 1700our $ERROR_WANT_READ;
1682 require Net::SSLeay; 1821 require Net::SSLeay;
1683 1822
1684 $ERROR_SYSCALL = Net::SSLeay::ERROR_SYSCALL (); 1823 $ERROR_SYSCALL = Net::SSLeay::ERROR_SYSCALL ();
1685 $ERROR_WANT_READ = Net::SSLeay::ERROR_WANT_READ (); 1824 $ERROR_WANT_READ = Net::SSLeay::ERROR_WANT_READ ();
1686 1825
1687 $tls = $self->{tls}; 1826 $tls = delete $self->{tls};
1688 $ctx = $self->{tls_ctx}; 1827 $ctx = $self->{tls_ctx};
1689 1828
1690 local $Carp::CarpLevel = 1; # skip ourselves when creating a new context or session 1829 local $Carp::CarpLevel = 1; # skip ourselves when creating a new context or session
1691 1830
1692 if ("HASH" eq ref $ctx) { 1831 if ("HASH" eq ref $ctx) {
1721 Net::SSLeay::CTX_set_mode ($tls, 1|2); 1860 Net::SSLeay::CTX_set_mode ($tls, 1|2);
1722 1861
1723 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 1862 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1724 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 1863 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1725 1864
1865 Net::SSLeay::BIO_write ($self->{_rbio}, delete $self->{rbuf});
1866
1726 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio}); 1867 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio});
1727 1868
1728 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) } 1869 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) }
1729 if $self->{on_starttls}; 1870 if $self->{on_starttls};
1730 1871
1759 my ($self) = @_; 1900 my ($self) = @_;
1760 1901
1761 return unless $self->{tls}; 1902 return unless $self->{tls};
1762 1903
1763 $self->{tls_ctx}->_put_session (delete $self->{tls}) 1904 $self->{tls_ctx}->_put_session (delete $self->{tls})
1764 if ref $self->{tls}; 1905 if $self->{tls} > 0;
1765 1906
1766 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)}; 1907 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)};
1767} 1908}
1768 1909
1769sub DESTROY { 1910sub DESTROY {
1777 my $fh = delete $self->{fh}; 1918 my $fh = delete $self->{fh};
1778 my $wbuf = delete $self->{wbuf}; 1919 my $wbuf = delete $self->{wbuf};
1779 1920
1780 my @linger; 1921 my @linger;
1781 1922
1782 push @linger, AnyEvent->io (fh => $fh, poll => "w", cb => sub { 1923 push @linger, AE::io $fh, 1, sub {
1783 my $len = syswrite $fh, $wbuf, length $wbuf; 1924 my $len = syswrite $fh, $wbuf, length $wbuf;
1784 1925
1785 if ($len > 0) { 1926 if ($len > 0) {
1786 substr $wbuf, 0, $len, ""; 1927 substr $wbuf, 0, $len, "";
1787 } else { 1928 } else {
1788 @linger = (); # end 1929 @linger = (); # end
1789 } 1930 }
1790 }); 1931 };
1791 push @linger, AnyEvent->timer (after => $linger, cb => sub { 1932 push @linger, AE::timer $linger, 0, sub {
1792 @linger = (); 1933 @linger = ();
1793 }); 1934 };
1794 } 1935 }
1795} 1936}
1796 1937
1797=item $handle->destroy 1938=item $handle->destroy
1798 1939
1799Shuts down the handle object as much as possible - this call ensures that 1940Shuts down the handle object as much as possible - this call ensures that
1800no further callbacks will be invoked and as many resources as possible 1941no further callbacks will be invoked and as many resources as possible
1801will be freed. You must not call any methods on the object afterwards. 1942will be freed. Any method you will call on the handle object after
1943destroying it in this way will be silently ignored (and it will return the
1944empty list).
1802 1945
1803Normally, you can just "forget" any references to an AnyEvent::Handle 1946Normally, you can just "forget" any references to an AnyEvent::Handle
1804object and it will simply shut down. This works in fatal error and EOF 1947object and it will simply shut down. This works in fatal error and EOF
1805callbacks, as well as code outside. It does I<NOT> work in a read or write 1948callbacks, as well as code outside. It does I<NOT> work in a read or write
1806callback, so when you want to destroy the AnyEvent::Handle object from 1949callback, so when you want to destroy the AnyEvent::Handle object from
1820sub destroy { 1963sub destroy {
1821 my ($self) = @_; 1964 my ($self) = @_;
1822 1965
1823 $self->DESTROY; 1966 $self->DESTROY;
1824 %$self = (); 1967 %$self = ();
1968 bless $self, "AnyEvent::Handle::destroyed";
1969}
1970
1971sub AnyEvent::Handle::destroyed::AUTOLOAD {
1972 #nop
1825} 1973}
1826 1974
1827=item AnyEvent::Handle::TLS_CTX 1975=item AnyEvent::Handle::TLS_CTX
1828 1976
1829This function creates and returns the AnyEvent::TLS object used by default 1977This function creates and returns the AnyEvent::TLS object used by default

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines