ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.159 by root, Fri Jul 24 12:35:58 2009 UTC vs.
Revision 1.182 by root, Thu Sep 3 12:35:01 2009 UTC

1package AnyEvent::Handle;
2
3use Scalar::Util ();
4use Carp ();
5use Errno qw(EAGAIN EINTR);
6
7use AnyEvent (); BEGIN { AnyEvent::common_sense }
8use AnyEvent::Util qw(WSAEWOULDBLOCK);
9
10=head1 NAME 1=head1 NAME
11 2
12AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent 3AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
13
14=cut
15
16our $VERSION = 4.86;
17 4
18=head1 SYNOPSIS 5=head1 SYNOPSIS
19 6
20 use AnyEvent; 7 use AnyEvent;
21 use AnyEvent::Handle; 8 use AnyEvent::Handle;
59C<on_error> callback. 46C<on_error> callback.
60 47
61All callbacks will be invoked with the handle object as their first 48All callbacks will be invoked with the handle object as their first
62argument. 49argument.
63 50
51=cut
52
53package AnyEvent::Handle;
54
55use Scalar::Util ();
56use List::Util ();
57use Carp ();
58use Errno qw(EAGAIN EINTR);
59
60use AnyEvent (); BEGIN { AnyEvent::common_sense }
61use AnyEvent::Util qw(WSAEWOULDBLOCK);
62
63our $VERSION = $AnyEvent::VERSION;
64
64=head1 METHODS 65=head1 METHODS
65 66
66=over 4 67=over 4
67 68
68=item $handle = B<new> AnyEvent::TLS fh => $filehandle, key => value... 69=item $handle = B<new> AnyEvent::TLS fh => $filehandle, key => value...
83Try to connect to the specified host and service (port), using 84Try to connect to the specified host and service (port), using
84C<AnyEvent::Socket::tcp_connect>. The C<$host> additionally becomes the 85C<AnyEvent::Socket::tcp_connect>. The C<$host> additionally becomes the
85default C<peername>. 86default C<peername>.
86 87
87You have to specify either this parameter, or C<fh>, above. 88You have to specify either this parameter, or C<fh>, above.
89
90It is possible to push requests on the read and write queues, and modify
91properties of the stream, even while AnyEvent::Handle is connecting.
88 92
89When this parameter is specified, then the C<on_prepare>, 93When this parameter is specified, then the C<on_prepare>,
90C<on_connect_error> and C<on_connect> callbacks will be called under the 94C<on_connect_error> and C<on_connect> callbacks will be called under the
91appropriate circumstances: 95appropriate circumstances:
92 96
97This (rarely used) callback is called before a new connection is 101This (rarely used) callback is called before a new connection is
98attempted, but after the file handle has been created. It could be used to 102attempted, but after the file handle has been created. It could be used to
99prepare the file handle with parameters required for the actual connect 103prepare the file handle with parameters required for the actual connect
100(as opposed to settings that can be changed when the connection is already 104(as opposed to settings that can be changed when the connection is already
101established). 105established).
106
107The return value of this callback should be the connect timeout value in
108seconds (or C<0>, or C<undef>, or the empty list, to indicate the default
109timeout is to be used).
102 110
103=item on_connect => $cb->($handle, $host, $port, $retry->()) 111=item on_connect => $cb->($handle, $host, $port, $retry->())
104 112
105This callback is called when a connection has been successfully established. 113This callback is called when a connection has been successfully established.
106 114
209memory and push it into the queue, but instead only read more data from 217memory and push it into the queue, but instead only read more data from
210the file when the write queue becomes empty. 218the file when the write queue becomes empty.
211 219
212=item timeout => $fractional_seconds 220=item timeout => $fractional_seconds
213 221
222=item rtimeout => $fractional_seconds
223
224=item wtimeout => $fractional_seconds
225
214If non-zero, then this enables an "inactivity" timeout: whenever this many 226If non-zero, then these enables an "inactivity" timeout: whenever this
215seconds pass without a successful read or write on the underlying file 227many seconds pass without a successful read or write on the underlying
216handle, the C<on_timeout> callback will be invoked (and if that one is 228file handle (or a call to C<timeout_reset>), the C<on_timeout> callback
217missing, a non-fatal C<ETIMEDOUT> error will be raised). 229will be invoked (and if that one is missing, a non-fatal C<ETIMEDOUT>
230error will be raised).
231
232There are three variants of the timeouts that work fully independent
233of each other, for both read and write, just read, and just write:
234C<timeout>, C<rtimeout> and C<wtimeout>, with corresponding callbacks
235C<on_timeout>, C<on_rtimeout> and C<on_wtimeout>, and reset functions
236C<timeout_reset>, C<rtimeout_reset>, and C<wtimeout_reset>.
218 237
219Note that timeout processing is also active when you currently do not have 238Note that timeout processing is also active when you currently do not have
220any outstanding read or write requests: If you plan to keep the connection 239any outstanding read or write requests: If you plan to keep the connection
221idle then you should disable the timout temporarily or ignore the timeout 240idle then you should disable the timout temporarily or ignore the timeout
222in the C<on_timeout> callback, in which case AnyEvent::Handle will simply 241in the C<on_timeout> callback, in which case AnyEvent::Handle will simply
266accomplishd by setting this option to a true value. 285accomplishd by setting this option to a true value.
267 286
268The default is your opertaing system's default behaviour (most likely 287The default is your opertaing system's default behaviour (most likely
269enabled), this option explicitly enables or disables it, if possible. 288enabled), this option explicitly enables or disables it, if possible.
270 289
290=item keepalive => <boolean>
291
292Enables (default disable) the SO_KEEPALIVE option on the stream socket:
293normally, TCP connections have no time-out once established, so TCP
294conenctions, once established, can stay alive forever even when the other
295side has long gone. TCP keepalives are a cheap way to take down long-lived
296TCP connections whent he other side becomes unreachable. While the default
297is OS-dependent, TCP keepalives usually kick in after around two hours,
298and, if the other side doesn't reply, take down the TCP connection some 10
299to 15 minutes later.
300
301It is harmless to specify this option for file handles that do not support
302keepalives, and enabling it on connections that are potentially long-lived
303is usually a good idea.
304
305=item oobinline => <boolean>
306
307BSD majorly fucked up the implementation of TCP urgent data. The result
308is that almost no OS implements TCP according to the specs, and every OS
309implements it slightly differently.
310
311If you want to handle TCP urgent data, then setting this flag gives you
312the most portable way of getting urgent data, by putting it into the
313stream.
314
271=item read_size => <bytes> 315=item read_size => <bytes>
272 316
273The default read block size (the amount of bytes this module will 317The default read block size (the amount of bytes this module will
274try to read during each loop iteration, which affects memory 318try to read during each loop iteration, which affects memory
275requirements). Default: C<8192>. 319requirements). Default: C<8192>.
431 delete $self->{_skip_drain_rbuf}; 475 delete $self->{_skip_drain_rbuf};
432 $self->_start; 476 $self->_start;
433 477
434 $self->{on_connect} 478 $self->{on_connect}
435 and $self->{on_connect}($self, $host, $port, sub { 479 and $self->{on_connect}($self, $host, $port, sub {
436 delete @$self{qw(fh _tw _ww _rw _eof _queue rbuf _wbuf tls _tls_rbuf _tls_wbuf)}; 480 delete @$self{qw(fh _tw _rtw _wtw _ww _rw _eof _queue rbuf _wbuf tls _tls_rbuf _tls_wbuf)};
437 $self->{_skip_drain_rbuf} = 1; 481 $self->{_skip_drain_rbuf} = 1;
438 &$retry; 482 &$retry;
439 }); 483 });
440 484
441 } else { 485 } else {
442 if ($self->{on_connect_error}) { 486 if ($self->{on_connect_error}) {
443 $self->{on_connect_error}($self, "$!"); 487 $self->{on_connect_error}($self, "$!");
444 $self->destroy; 488 $self->destroy;
445 } else { 489 } else {
446 $self->fatal ($!, 1); 490 $self->_error ($!, 1);
447 } 491 }
448 } 492 }
449 }, 493 },
450 sub { 494 sub {
451 local $self->{fh} = $_[0]; 495 local $self->{fh} = $_[0];
452 496
497 $self->{on_prepare}
453 $self->{on_prepare}->($self) 498 ? $self->{on_prepare}->($self)
454 if $self->{on_prepare}; 499 : ()
455 } 500 }
456 ); 501 );
457 } 502 }
458 503
459 } else { 504 } else {
466sub _start { 511sub _start {
467 my ($self) = @_; 512 my ($self) = @_;
468 513
469 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 514 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
470 515
516 $self->{_activity} =
517 $self->{_ractivity} =
471 $self->{_activity} = AnyEvent->now; 518 $self->{_wactivity} = AE::now;
472 $self->_timeout;
473 519
520 $self->timeout (delete $self->{timeout} ) if $self->{timeout};
521 $self->rtimeout (delete $self->{rtimeout} ) if $self->{rtimeout};
522 $self->wtimeout (delete $self->{wtimeout} ) if $self->{wtimeout};
523
474 $self->no_delay (delete $self->{no_delay}) if exists $self->{no_delay}; 524 $self->no_delay (delete $self->{no_delay} ) if exists $self->{no_delay};
525 $self->keepalive (delete $self->{keepalive}) if exists $self->{keepalive};
526 $self->oobinline (delete $self->{oobinline}) if exists $self->{oobinline};
475 527
476 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}) 528 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx})
477 if $self->{tls}; 529 if $self->{tls};
478 530
479 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 531 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
480 532
481 $self->start_read 533 $self->start_read
482 if $self->{on_read} || @{ $self->{_queue} }; 534 if $self->{on_read} || @{ $self->{_queue} };
483}
484 535
485#sub _shutdown { 536 $self->_drain_wbuf;
486# my ($self) = @_; 537}
487#
488# delete @$self{qw(_tw _rw _ww fh wbuf on_read _queue)};
489# $self->{_eof} = 1; # tell starttls et. al to stop trying
490#
491# &_freetls;
492#}
493 538
494sub _error { 539sub _error {
495 my ($self, $errno, $fatal, $message) = @_; 540 my ($self, $errno, $fatal, $message) = @_;
496 541
497 $! = $errno; 542 $! = $errno;
534 $_[0]{on_eof} = $_[1]; 579 $_[0]{on_eof} = $_[1];
535} 580}
536 581
537=item $handle->on_timeout ($cb) 582=item $handle->on_timeout ($cb)
538 583
539Replace the current C<on_timeout> callback, or disables the callback (but 584=item $handle->on_rtimeout ($cb)
540not the timeout) if C<$cb> = C<undef>. See the C<timeout> constructor
541argument and method.
542 585
543=cut 586=item $handle->on_wtimeout ($cb)
544 587
545sub on_timeout { 588Replace the current C<on_timeout>, C<on_rtimeout> or C<on_wtimeout>
546 $_[0]{on_timeout} = $_[1]; 589callback, or disables the callback (but not the timeout) if C<$cb> =
547} 590C<undef>. See the C<timeout> constructor argument and method.
591
592=cut
593
594# see below
548 595
549=item $handle->autocork ($boolean) 596=item $handle->autocork ($boolean)
550 597
551Enables or disables the current autocork behaviour (see C<autocork> 598Enables or disables the current autocork behaviour (see C<autocork>
552constructor argument). Changes will only take effect on the next write. 599constructor argument). Changes will only take effect on the next write.
567sub no_delay { 614sub no_delay {
568 $_[0]{no_delay} = $_[1]; 615 $_[0]{no_delay} = $_[1];
569 616
570 eval { 617 eval {
571 local $SIG{__DIE__}; 618 local $SIG{__DIE__};
572 setsockopt $_[0]{fh}, &Socket::IPPROTO_TCP, &Socket::TCP_NODELAY, int $_[1] 619 setsockopt $_[0]{fh}, Socket::IPPROTO_TCP (), Socket::TCP_NODELAY (), int $_[1]
573 if $_[0]{fh}; 620 if $_[0]{fh};
574 }; 621 };
575} 622}
576 623
624=item $handle->keepalive ($boolean)
625
626Enables or disables the C<keepalive> setting (see constructor argument of
627the same name for details).
628
629=cut
630
631sub keepalive {
632 $_[0]{keepalive} = $_[1];
633
634 eval {
635 local $SIG{__DIE__};
636 setsockopt $_[0]{fh}, Socket::SOL_SOCKET (), Socket::SO_KEEPALIVE (), int $_[1]
637 if $_[0]{fh};
638 };
639}
640
641=item $handle->oobinline ($boolean)
642
643Enables or disables the C<oobinline> setting (see constructor argument of
644the same name for details).
645
646=cut
647
648sub oobinline {
649 $_[0]{oobinline} = $_[1];
650
651 eval {
652 local $SIG{__DIE__};
653 setsockopt $_[0]{fh}, Socket::SOL_SOCKET (), Socket::SO_OOBINLINE (), int $_[1]
654 if $_[0]{fh};
655 };
656}
657
658=item $handle->keepalive ($boolean)
659
660Enables or disables the C<keepalive> setting (see constructor argument of
661the same name for details).
662
663=cut
664
665sub keepalive {
666 $_[0]{keepalive} = $_[1];
667
668 eval {
669 local $SIG{__DIE__};
670 setsockopt $_[0]{fh}, Socket::SOL_SOCKET (), Socket::SO_KEEPALIVE (), int $_[1]
671 if $_[0]{fh};
672 };
673}
674
577=item $handle->on_starttls ($cb) 675=item $handle->on_starttls ($cb)
578 676
579Replace the current C<on_starttls> callback (see the C<on_starttls> constructor argument). 677Replace the current C<on_starttls> callback (see the C<on_starttls> constructor argument).
580 678
581=cut 679=cut
592 690
593sub on_starttls { 691sub on_starttls {
594 $_[0]{on_stoptls} = $_[1]; 692 $_[0]{on_stoptls} = $_[1];
595} 693}
596 694
695=item $handle->rbuf_max ($max_octets)
696
697Configures the C<rbuf_max> setting (C<undef> disables it).
698
699=cut
700
701sub rbuf_max {
702 $_[0]{rbuf_max} = $_[1];
703}
704
597############################################################################# 705#############################################################################
598 706
599=item $handle->timeout ($seconds) 707=item $handle->timeout ($seconds)
600 708
709=item $handle->rtimeout ($seconds)
710
711=item $handle->wtimeout ($seconds)
712
601Configures (or disables) the inactivity timeout. 713Configures (or disables) the inactivity timeout.
602 714
603=cut 715=item $handle->timeout_reset
604 716
605sub timeout { 717=item $handle->rtimeout_reset
718
719=item $handle->wtimeout_reset
720
721Reset the activity timeout, as if data was received or sent.
722
723These methods are cheap to call.
724
725=cut
726
727for my $dir ("", "r", "w") {
728 my $timeout = "${dir}timeout";
729 my $tw = "_${dir}tw";
730 my $on_timeout = "on_${dir}timeout";
731 my $activity = "_${dir}activity";
732 my $cb;
733
734 *$on_timeout = sub {
735 $_[0]{$on_timeout} = $_[1];
736 };
737
738 *$timeout = sub {
606 my ($self, $timeout) = @_; 739 my ($self, $new_value) = @_;
607 740
608 $self->{timeout} = $timeout; 741 $self->{$timeout} = $new_value;
609 $self->_timeout; 742 delete $self->{$tw}; &$cb;
610} 743 };
611 744
745 *{"${dir}timeout_reset"} = sub {
746 $_[0]{$activity} = AE::now;
747 };
748
749 # main workhorse:
612# reset the timeout watcher, as neccessary 750 # reset the timeout watcher, as neccessary
613# also check for time-outs 751 # also check for time-outs
614sub _timeout { 752 $cb = sub {
615 my ($self) = @_; 753 my ($self) = @_;
616 754
617 if ($self->{timeout} && $self->{fh}) { 755 if ($self->{$timeout} && $self->{fh}) {
618 my $NOW = AnyEvent->now; 756 my $NOW = AE::now;
619 757
620 # when would the timeout trigger? 758 # when would the timeout trigger?
621 my $after = $self->{_activity} + $self->{timeout} - $NOW; 759 my $after = $self->{$activity} + $self->{$timeout} - $NOW;
622 760
623 # now or in the past already? 761 # now or in the past already?
624 if ($after <= 0) { 762 if ($after <= 0) {
625 $self->{_activity} = $NOW; 763 $self->{$activity} = $NOW;
626 764
627 if ($self->{on_timeout}) { 765 if ($self->{$on_timeout}) {
628 $self->{on_timeout}($self); 766 $self->{$on_timeout}($self);
629 } else { 767 } else {
630 $self->_error (Errno::ETIMEDOUT); 768 $self->_error (Errno::ETIMEDOUT);
769 }
770
771 # callback could have changed timeout value, optimise
772 return unless $self->{$timeout};
773
774 # calculate new after
775 $after = $self->{$timeout};
631 } 776 }
632 777
633 # callback could have changed timeout value, optimise 778 Scalar::Util::weaken $self;
634 return unless $self->{timeout}; 779 return unless $self; # ->error could have destroyed $self
635 780
636 # calculate new after 781 $self->{$tw} ||= AE::timer $after, 0, sub {
637 $after = $self->{timeout}; 782 delete $self->{$tw};
783 $cb->($self);
784 };
785 } else {
786 delete $self->{$tw};
638 } 787 }
639
640 Scalar::Util::weaken $self;
641 return unless $self; # ->error could have destroyed $self
642
643 $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub {
644 delete $self->{_tw};
645 $self->_timeout;
646 });
647 } else {
648 delete $self->{_tw};
649 } 788 }
650} 789}
651 790
652############################################################################# 791#############################################################################
653 792
701 my $len = syswrite $self->{fh}, $self->{wbuf}; 840 my $len = syswrite $self->{fh}, $self->{wbuf};
702 841
703 if (defined $len) { 842 if (defined $len) {
704 substr $self->{wbuf}, 0, $len, ""; 843 substr $self->{wbuf}, 0, $len, "";
705 844
706 $self->{_activity} = AnyEvent->now; 845 $self->{_activity} = $self->{_wactivity} = AE::now;
707 846
708 $self->{on_drain}($self) 847 $self->{on_drain}($self)
709 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf}) 848 if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf})
710 && $self->{on_drain}; 849 && $self->{on_drain};
711 850
717 856
718 # try to write data immediately 857 # try to write data immediately
719 $cb->() unless $self->{autocork}; 858 $cb->() unless $self->{autocork};
720 859
721 # if still data left in wbuf, we need to poll 860 # if still data left in wbuf, we need to poll
722 $self->{_ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb) 861 $self->{_ww} = AE::io $self->{fh}, 1, $cb
723 if length $self->{wbuf}; 862 if length $self->{wbuf};
724 }; 863 };
725} 864}
726 865
727our %WH; 866our %WH;
740 ->($self, @_); 879 ->($self, @_);
741 } 880 }
742 881
743 if ($self->{tls}) { 882 if ($self->{tls}) {
744 $self->{_tls_wbuf} .= $_[0]; 883 $self->{_tls_wbuf} .= $_[0];
745 884 &_dotls ($self) if $self->{fh};
746 &_dotls ($self);
747 } else { 885 } else {
748 $self->{wbuf} .= $_[0]; 886 $self->{wbuf} .= $_[0];
749 $self->_drain_wbuf if $self->{fh}; 887 $self->_drain_wbuf if $self->{fh};
750 } 888 }
751} 889}
752 890
753=item $handle->push_write (type => @args) 891=item $handle->push_write (type => @args)
818Other languages could read single lines terminated by a newline and pass 956Other languages could read single lines terminated by a newline and pass
819this line into their JSON decoder of choice. 957this line into their JSON decoder of choice.
820 958
821=cut 959=cut
822 960
961sub json_coder() {
962 eval { require JSON::XS; JSON::XS->new->utf8 }
963 || do { require JSON; JSON->new->utf8 }
964}
965
823register_write_type json => sub { 966register_write_type json => sub {
824 my ($self, $ref) = @_; 967 my ($self, $ref) = @_;
825 968
826 require JSON; 969 my $json = $self->{json} ||= json_coder;
827 970
828 $self->{json} ? $self->{json}->encode ($ref) 971 $json->encode ($ref)
829 : JSON::encode_json ($ref)
830}; 972};
831 973
832=item storable => $reference 974=item storable => $reference
833 975
834Freezes the given reference using L<Storable> and writes it to the 976Freezes the given reference using L<Storable> and writes it to the
972 1114
973sub _drain_rbuf { 1115sub _drain_rbuf {
974 my ($self) = @_; 1116 my ($self) = @_;
975 1117
976 # avoid recursion 1118 # avoid recursion
977 return if exists $self->{_skip_drain_rbuf}; 1119 return if $self->{_skip_drain_rbuf};
978 local $self->{_skip_drain_rbuf} = 1; 1120 local $self->{_skip_drain_rbuf} = 1;
979
980 if (
981 defined $self->{rbuf_max}
982 && $self->{rbuf_max} < length $self->{rbuf}
983 ) {
984 $self->_error (Errno::ENOSPC, 1), return;
985 }
986 1121
987 while () { 1122 while () {
988 # we need to use a separate tls read buffer, as we must not receive data while 1123 # we need to use a separate tls read buffer, as we must not receive data while
989 # we are draining the buffer, and this can only happen with TLS. 1124 # we are draining the buffer, and this can only happen with TLS.
990 $self->{rbuf} .= delete $self->{_tls_rbuf} if exists $self->{_tls_rbuf}; 1125 $self->{rbuf} .= delete $self->{_tls_rbuf}
1126 if exists $self->{_tls_rbuf};
991 1127
992 my $len = length $self->{rbuf}; 1128 my $len = length $self->{rbuf};
993 1129
994 if (my $cb = shift @{ $self->{_queue} }) { 1130 if (my $cb = shift @{ $self->{_queue} }) {
995 unless ($cb->($self)) { 1131 unless ($cb->($self)) {
996 if ($self->{_eof}) { 1132 # no progress can be made
997 # no progress can be made (not enough data and no data forthcoming) 1133 # (not enough data and no data forthcoming)
998 $self->_error (Errno::EPIPE, 1), return; 1134 $self->_error (Errno::EPIPE, 1), return
999 } 1135 if $self->{_eof};
1000 1136
1001 unshift @{ $self->{_queue} }, $cb; 1137 unshift @{ $self->{_queue} }, $cb;
1002 last; 1138 last;
1003 } 1139 }
1004 } elsif ($self->{on_read}) { 1140 } elsif ($self->{on_read}) {
1024 last; 1160 last;
1025 } 1161 }
1026 } 1162 }
1027 1163
1028 if ($self->{_eof}) { 1164 if ($self->{_eof}) {
1029 if ($self->{on_eof}) { 1165 $self->{on_eof}
1030 $self->{on_eof}($self) 1166 ? $self->{on_eof}($self)
1031 } else {
1032 $self->_error (0, 1, "Unexpected end-of-file"); 1167 : $self->_error (0, 1, "Unexpected end-of-file");
1033 } 1168
1169 return;
1170 }
1171
1172 if (
1173 defined $self->{rbuf_max}
1174 && $self->{rbuf_max} < length $self->{rbuf}
1175 ) {
1176 $self->_error (Errno::ENOSPC, 1), return;
1034 } 1177 }
1035 1178
1036 # may need to restart read watcher 1179 # may need to restart read watcher
1037 unless ($self->{_rw}) { 1180 unless ($self->{_rw}) {
1038 $self->start_read 1181 $self->start_read
1125 my $type = shift; 1268 my $type = shift;
1126 1269
1127 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read") 1270 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::unshift_read")
1128 ->($self, $cb, @_); 1271 ->($self, $cb, @_);
1129 } 1272 }
1130
1131 1273
1132 unshift @{ $self->{_queue} }, $cb; 1274 unshift @{ $self->{_queue} }, $cb;
1133 $self->_drain_rbuf; 1275 $self->_drain_rbuf;
1134} 1276}
1135 1277
1387=cut 1529=cut
1388 1530
1389register_read_type json => sub { 1531register_read_type json => sub {
1390 my ($self, $cb) = @_; 1532 my ($self, $cb) = @_;
1391 1533
1392 my $json = $self->{json} ||= 1534 my $json = $self->{json} ||= json_coder;
1393 eval { require JSON::XS; JSON::XS->new->utf8 }
1394 || do { require JSON; JSON->new->utf8 };
1395 1535
1396 my $data; 1536 my $data;
1397 my $rbuf = \$self->{rbuf}; 1537 my $rbuf = \$self->{rbuf};
1398 1538
1399 sub { 1539 sub {
1519 my ($self) = @_; 1659 my ($self) = @_;
1520 1660
1521 unless ($self->{_rw} || $self->{_eof}) { 1661 unless ($self->{_rw} || $self->{_eof}) {
1522 Scalar::Util::weaken $self; 1662 Scalar::Util::weaken $self;
1523 1663
1524 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { 1664 $self->{_rw} = AE::io $self->{fh}, 0, sub {
1525 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf}); 1665 my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf});
1526 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; 1666 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
1527 1667
1528 if ($len > 0) { 1668 if ($len > 0) {
1529 $self->{_activity} = AnyEvent->now; 1669 $self->{_activity} = $self->{_ractivity} = AE::now;
1530 1670
1531 if ($self->{tls}) { 1671 if ($self->{tls}) {
1532 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf); 1672 Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf);
1533 1673
1534 &_dotls ($self); 1674 &_dotls ($self);
1542 $self->_drain_rbuf; 1682 $self->_drain_rbuf;
1543 1683
1544 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) { 1684 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
1545 return $self->_error ($!, 1); 1685 return $self->_error ($!, 1);
1546 } 1686 }
1547 }); 1687 };
1548 } 1688 }
1549} 1689}
1550 1690
1551our $ERROR_SYSCALL; 1691our $ERROR_SYSCALL;
1552our $ERROR_WANT_READ; 1692our $ERROR_WANT_READ;
1649The TLS connection object will end up in C<< $handle->{tls} >>, the TLS 1789The TLS connection object will end up in C<< $handle->{tls} >>, the TLS
1650context in C<< $handle->{tls_ctx} >> after this call and can be used or 1790context in C<< $handle->{tls_ctx} >> after this call and can be used or
1651changed to your liking. Note that the handshake might have already started 1791changed to your liking. Note that the handshake might have already started
1652when this function returns. 1792when this function returns.
1653 1793
1654If it an error to start a TLS handshake more than once per 1794Due to bugs in OpenSSL, it might or might not be possible to do multiple
1655AnyEvent::Handle object (this is due to bugs in OpenSSL). 1795handshakes on the same stream. Best do not attempt to use the stream after
1796stopping TLS.
1656 1797
1657=cut 1798=cut
1658 1799
1659our %TLS_CACHE; #TODO not yet documented, should we? 1800our %TLS_CACHE; #TODO not yet documented, should we?
1660 1801
1661sub starttls { 1802sub starttls {
1662 my ($self, $ssl, $ctx) = @_; 1803 my ($self, $tls, $ctx) = @_;
1804
1805 Carp::croak "It is an error to call starttls on an AnyEvent::Handle object while TLS is already active, caught"
1806 if $self->{tls};
1807
1808 $self->{tls} = $tls;
1809 $self->{tls_ctx} = $ctx if @_ > 2;
1810
1811 return unless $self->{fh};
1663 1812
1664 require Net::SSLeay; 1813 require Net::SSLeay;
1665
1666 Carp::croak "it is an error to call starttls more than once on an AnyEvent::Handle object"
1667 if $self->{tls};
1668 1814
1669 $ERROR_SYSCALL = Net::SSLeay::ERROR_SYSCALL (); 1815 $ERROR_SYSCALL = Net::SSLeay::ERROR_SYSCALL ();
1670 $ERROR_WANT_READ = Net::SSLeay::ERROR_WANT_READ (); 1816 $ERROR_WANT_READ = Net::SSLeay::ERROR_WANT_READ ();
1671 1817
1818 $tls = delete $self->{tls};
1672 $ctx ||= $self->{tls_ctx}; 1819 $ctx = $self->{tls_ctx};
1673 1820
1674 local $Carp::CarpLevel = 1; # skip ourselves when creating a new context or session 1821 local $Carp::CarpLevel = 1; # skip ourselves when creating a new context or session
1675 1822
1676 if ("HASH" eq ref $ctx) { 1823 if ("HASH" eq ref $ctx) {
1677 require AnyEvent::TLS; 1824 require AnyEvent::TLS;
1683 $ctx = new AnyEvent::TLS %$ctx; 1830 $ctx = new AnyEvent::TLS %$ctx;
1684 } 1831 }
1685 } 1832 }
1686 1833
1687 $self->{tls_ctx} = $ctx || TLS_CTX (); 1834 $self->{tls_ctx} = $ctx || TLS_CTX ();
1688 $self->{tls} = $ssl = $self->{tls_ctx}->_get_session ($ssl, $self, $self->{peername}); 1835 $self->{tls} = $tls = $self->{tls_ctx}->_get_session ($tls, $self, $self->{peername});
1689 1836
1690 # basically, this is deep magic (because SSL_read should have the same issues) 1837 # basically, this is deep magic (because SSL_read should have the same issues)
1691 # but the openssl maintainers basically said: "trust us, it just works". 1838 # but the openssl maintainers basically said: "trust us, it just works".
1692 # (unfortunately, we have to hardcode constants because the abysmally misdesigned 1839 # (unfortunately, we have to hardcode constants because the abysmally misdesigned
1693 # and mismaintained ssleay-module doesn't even offer them). 1840 # and mismaintained ssleay-module doesn't even offer them).
1700 # and we drive openssl fully in blocking mode here. Or maybe we don't - openssl seems to 1847 # and we drive openssl fully in blocking mode here. Or maybe we don't - openssl seems to
1701 # have identity issues in that area. 1848 # have identity issues in that area.
1702# Net::SSLeay::CTX_set_mode ($ssl, 1849# Net::SSLeay::CTX_set_mode ($ssl,
1703# (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1) 1850# (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ENABLE_PARTIAL_WRITE () } || 1)
1704# | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2)); 1851# | (eval { local $SIG{__DIE__}; Net::SSLeay::MODE_ACCEPT_MOVING_WRITE_BUFFER () } || 2));
1705 Net::SSLeay::CTX_set_mode ($ssl, 1|2); 1852 Net::SSLeay::CTX_set_mode ($tls, 1|2);
1706 1853
1707 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 1854 $self->{_rbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1708 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ()); 1855 $self->{_wbio} = Net::SSLeay::BIO_new (Net::SSLeay::BIO_s_mem ());
1709 1856
1857 Net::SSLeay::BIO_write ($self->{_rbio}, delete $self->{rbuf});
1858
1710 Net::SSLeay::set_bio ($ssl, $self->{_rbio}, $self->{_wbio}); 1859 Net::SSLeay::set_bio ($tls, $self->{_rbio}, $self->{_wbio});
1711 1860
1712 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) } 1861 $self->{_on_starttls} = sub { $_[0]{on_starttls}(@_) }
1713 if $self->{on_starttls}; 1862 if $self->{on_starttls};
1714 1863
1715 &_dotls; # need to trigger the initial handshake 1864 &_dotls; # need to trigger the initial handshake
1718 1867
1719=item $handle->stoptls 1868=item $handle->stoptls
1720 1869
1721Shuts down the SSL connection - this makes a proper EOF handshake by 1870Shuts down the SSL connection - this makes a proper EOF handshake by
1722sending a close notify to the other side, but since OpenSSL doesn't 1871sending a close notify to the other side, but since OpenSSL doesn't
1723support non-blocking shut downs, it is not possible to re-use the stream 1872support non-blocking shut downs, it is not guarenteed that you can re-use
1724afterwards. 1873the stream afterwards.
1725 1874
1726=cut 1875=cut
1727 1876
1728sub stoptls { 1877sub stoptls {
1729 my ($self) = @_; 1878 my ($self) = @_;
1742sub _freetls { 1891sub _freetls {
1743 my ($self) = @_; 1892 my ($self) = @_;
1744 1893
1745 return unless $self->{tls}; 1894 return unless $self->{tls};
1746 1895
1747 $self->{tls_ctx}->_put_session (delete $self->{tls}); 1896 $self->{tls_ctx}->_put_session (delete $self->{tls})
1897 if $self->{tls} > 0;
1748 1898
1749 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)}; 1899 delete @$self{qw(_rbio _wbio _tls_wbuf _on_starttls)};
1750} 1900}
1751 1901
1752sub DESTROY { 1902sub DESTROY {
1760 my $fh = delete $self->{fh}; 1910 my $fh = delete $self->{fh};
1761 my $wbuf = delete $self->{wbuf}; 1911 my $wbuf = delete $self->{wbuf};
1762 1912
1763 my @linger; 1913 my @linger;
1764 1914
1765 push @linger, AnyEvent->io (fh => $fh, poll => "w", cb => sub { 1915 push @linger, AE::io $fh, 1, sub {
1766 my $len = syswrite $fh, $wbuf, length $wbuf; 1916 my $len = syswrite $fh, $wbuf, length $wbuf;
1767 1917
1768 if ($len > 0) { 1918 if ($len > 0) {
1769 substr $wbuf, 0, $len, ""; 1919 substr $wbuf, 0, $len, "";
1770 } else { 1920 } else {
1771 @linger = (); # end 1921 @linger = (); # end
1772 } 1922 }
1773 }); 1923 };
1774 push @linger, AnyEvent->timer (after => $linger, cb => sub { 1924 push @linger, AE::timer $linger, 0, sub {
1775 @linger = (); 1925 @linger = ();
1776 }); 1926 };
1777 } 1927 }
1778} 1928}
1779 1929
1780=item $handle->destroy 1930=item $handle->destroy
1781 1931
1782Shuts down the handle object as much as possible - this call ensures that 1932Shuts down the handle object as much as possible - this call ensures that
1783no further callbacks will be invoked and as many resources as possible 1933no further callbacks will be invoked and as many resources as possible
1784will be freed. You must not call any methods on the object afterwards. 1934will be freed. Any method you will call on the handle object after
1935destroying it in this way will be silently ignored (and it will return the
1936empty list).
1785 1937
1786Normally, you can just "forget" any references to an AnyEvent::Handle 1938Normally, you can just "forget" any references to an AnyEvent::Handle
1787object and it will simply shut down. This works in fatal error and EOF 1939object and it will simply shut down. This works in fatal error and EOF
1788callbacks, as well as code outside. It does I<NOT> work in a read or write 1940callbacks, as well as code outside. It does I<NOT> work in a read or write
1789callback, so when you want to destroy the AnyEvent::Handle object from 1941callback, so when you want to destroy the AnyEvent::Handle object from
1803sub destroy { 1955sub destroy {
1804 my ($self) = @_; 1956 my ($self) = @_;
1805 1957
1806 $self->DESTROY; 1958 $self->DESTROY;
1807 %$self = (); 1959 %$self = ();
1960 bless $self, "AnyEvent::Handle::destroyed";
1961}
1962
1963sub AnyEvent::Handle::destroyed::AUTOLOAD {
1964 #nop
1808} 1965}
1809 1966
1810=item AnyEvent::Handle::TLS_CTX 1967=item AnyEvent::Handle::TLS_CTX
1811 1968
1812This function creates and returns the AnyEvent::TLS object used by default 1969This function creates and returns the AnyEvent::TLS object used by default

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines