ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.40 by root, Tue May 27 05:36:27 2008 UTC vs.
Revision 1.59 by root, Thu Jun 5 16:53:11 2008 UTC

2 2
3no warnings; 3no warnings;
4use strict; 4use strict;
5 5
6use AnyEvent (); 6use AnyEvent ();
7use AnyEvent::Util qw(WSAWOULDBLOCK); 7use AnyEvent::Util qw(WSAEWOULDBLOCK);
8use Scalar::Util (); 8use Scalar::Util ();
9use Carp (); 9use Carp ();
10use Fcntl (); 10use Fcntl ();
11use Errno qw/EAGAIN EINTR/; 11use Errno qw(EAGAIN EINTR);
12 12
13=head1 NAME 13=head1 NAME
14 14
15AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent 15AnyEvent::Handle - non-blocking I/O on file handles via AnyEvent
16 16
17=cut 17=cut
18 18
19our $VERSION = '0.04'; 19our $VERSION = 4.13;
20 20
21=head1 SYNOPSIS 21=head1 SYNOPSIS
22 22
23 use AnyEvent; 23 use AnyEvent;
24 use AnyEvent::Handle; 24 use AnyEvent::Handle;
75NOTE: The filehandle will be set to non-blocking (using 75NOTE: The filehandle will be set to non-blocking (using
76AnyEvent::Util::fh_nonblocking). 76AnyEvent::Util::fh_nonblocking).
77 77
78=item on_eof => $cb->($handle) 78=item on_eof => $cb->($handle)
79 79
80Set the callback to be called on EOF. 80Set the callback to be called when an end-of-file condition is detcted,
81i.e. in the case of a socket, when the other side has closed the
82connection cleanly.
81 83
82While not mandatory, it is highly recommended to set an eof callback, 84While not mandatory, it is highly recommended to set an eof callback,
83otherwise you might end up with a closed socket while you are still 85otherwise you might end up with a closed socket while you are still
84waiting for data. 86waiting for data.
85 87
86=item on_error => $cb->($handle) 88=item on_error => $cb->($handle, $fatal)
87 89
88This is the fatal error callback, that is called when, well, a fatal error 90This is the error callback, which is called when, well, some error
89occurs, such as not being able to resolve the hostname, failure to connect 91occured, such as not being able to resolve the hostname, failure to
90or a read error. 92connect or a read error.
91 93
92The object will not be in a usable state when this callback has been 94Some errors are fatal (which is indicated by C<$fatal> being true). On
93called. 95fatal errors the handle object will be shut down and will not be
96usable. Non-fatal errors can be retried by simply returning, but it is
97recommended to simply ignore this parameter and instead abondon the handle
98object when this callback is invoked.
94 99
95On callback entrance, the value of C<$!> contains the operating system 100On callback entrance, the value of C<$!> contains the operating system
96error (or C<ENOSPC>, C<EPIPE> or C<EBADMSG>). 101error (or C<ENOSPC>, C<EPIPE>, C<ETIMEDOUT> or C<EBADMSG>).
97
98The callback should throw an exception. If it returns, then
99AnyEvent::Handle will C<croak> for you.
100 102
101While not mandatory, it is I<highly> recommended to set this callback, as 103While not mandatory, it is I<highly> recommended to set this callback, as
102you will not be notified of errors otherwise. The default simply calls 104you will not be notified of errors otherwise. The default simply calls
103die. 105C<croak>.
104 106
105=item on_read => $cb->($handle) 107=item on_read => $cb->($handle)
106 108
107This sets the default read callback, which is called when data arrives 109This sets the default read callback, which is called when data arrives
108and no read request is in the queue. 110and no read request is in the queue.
119 121
120This sets the callback that is called when the write buffer becomes empty 122This sets the callback that is called when the write buffer becomes empty
121(or when the callback is set and the buffer is empty already). 123(or when the callback is set and the buffer is empty already).
122 124
123To append to the write buffer, use the C<< ->push_write >> method. 125To append to the write buffer, use the C<< ->push_write >> method.
126
127=item timeout => $fractional_seconds
128
129If non-zero, then this enables an "inactivity" timeout: whenever this many
130seconds pass without a successful read or write on the underlying file
131handle, the C<on_timeout> callback will be invoked (and if that one is
132missing, an C<ETIMEDOUT> error will be raised).
133
134Note that timeout processing is also active when you currently do not have
135any outstanding read or write requests: If you plan to keep the connection
136idle then you should disable the timout temporarily or ignore the timeout
137in the C<on_timeout> callback.
138
139Zero (the default) disables this timeout.
140
141=item on_timeout => $cb->($handle)
142
143Called whenever the inactivity timeout passes. If you return from this
144callback, then the timeout will be reset as if some activity had happened,
145so this condition is not fatal in any way.
124 146
125=item rbuf_max => <bytes> 147=item rbuf_max => <bytes>
126 148
127If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>) 149If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>)
128when the read buffer ever (strictly) exceeds this size. This is useful to 150when the read buffer ever (strictly) exceeds this size. This is useful to
135isn't finished). 157isn't finished).
136 158
137=item read_size => <bytes> 159=item read_size => <bytes>
138 160
139The default read block size (the amount of bytes this module will try to read 161The default read block size (the amount of bytes this module will try to read
140on each [loop iteration). Default: C<4096>. 162during each (loop iteration). Default: C<8192>.
141 163
142=item low_water_mark => <bytes> 164=item low_water_mark => <bytes>
143 165
144Sets the amount of bytes (default: C<0>) that make up an "empty" write 166Sets the amount of bytes (default: C<0>) that make up an "empty" write
145buffer: If the write reaches this size or gets even samller it is 167buffer: If the write reaches this size or gets even samller it is
172 194
173=item json => JSON or JSON::XS object 195=item json => JSON or JSON::XS object
174 196
175This is the json coder object used by the C<json> read and write types. 197This is the json coder object used by the C<json> read and write types.
176 198
177If you don't supply it, then AnyEvent::Handle will use C<encode_json> and 199If you don't supply it, then AnyEvent::Handle will create and use a
178C<decode_json>. 200suitable one, which will write and expect UTF-8 encoded JSON texts.
179 201
180Note that you are responsible to depend on the JSON module if you want to 202Note that you are responsible to depend on the JSON module if you want to
181use this functionality, as AnyEvent does not have a dependency itself. 203use this functionality, as AnyEvent does not have a dependency itself.
182 204
183=item filter_r => $cb 205=item filter_r => $cb
202 if ($self->{tls}) { 224 if ($self->{tls}) {
203 require Net::SSLeay; 225 require Net::SSLeay;
204 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx}); 226 $self->starttls (delete $self->{tls}, delete $self->{tls_ctx});
205 } 227 }
206 228
207 $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof}; 229 $self->{_activity} = AnyEvent->now;
208 $self->on_error (delete $self->{on_error}) if $self->{on_error}; 230 $self->_timeout;
231
209 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 232 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
210 $self->on_read (delete $self->{on_read} ) if $self->{on_read}; 233 $self->on_read (delete $self->{on_read} ) if $self->{on_read};
211 234
212 $self->start_read;
213
214 $self 235 $self
215} 236}
216 237
217sub _shutdown { 238sub _shutdown {
218 my ($self) = @_; 239 my ($self) = @_;
219 240
241 delete $self->{_tw};
220 delete $self->{_rw}; 242 delete $self->{_rw};
221 delete $self->{_ww}; 243 delete $self->{_ww};
222 delete $self->{fh}; 244 delete $self->{fh};
223}
224 245
246 $self->stoptls;
247}
248
225sub error { 249sub _error {
226 my ($self) = @_; 250 my ($self, $errno, $fatal) = @_;
227 251
228 {
229 local $!;
230 $self->_shutdown; 252 $self->_shutdown
231 } 253 if $fatal;
232 254
233 $self->{on_error}($self) 255 $! = $errno;
256
234 if $self->{on_error}; 257 if ($self->{on_error}) {
235 258 $self->{on_error}($self, $fatal);
259 } else {
236 Carp::croak "AnyEvent::Handle uncaught fatal error: $!"; 260 Carp::croak "AnyEvent::Handle uncaught error: $!";
261 }
237} 262}
238 263
239=item $fh = $handle->fh 264=item $fh = $handle->fh
240 265
241This method returns the file handle of the L<AnyEvent::Handle> object. 266This method returns the file handle of the L<AnyEvent::Handle> object.
260 285
261=cut 286=cut
262 287
263sub on_eof { 288sub on_eof {
264 $_[0]{on_eof} = $_[1]; 289 $_[0]{on_eof} = $_[1];
290}
291
292=item $handle->on_timeout ($cb)
293
294Replace the current C<on_timeout> callback, or disables the callback
295(but not the timeout) if C<$cb> = C<undef>. See C<timeout> constructor
296argument.
297
298=cut
299
300sub on_timeout {
301 $_[0]{on_timeout} = $_[1];
302}
303
304#############################################################################
305
306=item $handle->timeout ($seconds)
307
308Configures (or disables) the inactivity timeout.
309
310=cut
311
312sub timeout {
313 my ($self, $timeout) = @_;
314
315 $self->{timeout} = $timeout;
316 $self->_timeout;
317}
318
319# reset the timeout watcher, as neccessary
320# also check for time-outs
321sub _timeout {
322 my ($self) = @_;
323
324 if ($self->{timeout}) {
325 my $NOW = AnyEvent->now;
326
327 # when would the timeout trigger?
328 my $after = $self->{_activity} + $self->{timeout} - $NOW;
329
330 # now or in the past already?
331 if ($after <= 0) {
332 $self->{_activity} = $NOW;
333
334 if ($self->{on_timeout}) {
335 $self->{on_timeout}($self);
336 } else {
337 $self->_error (&Errno::ETIMEDOUT);
338 }
339
340 # callback could have changed timeout value, optimise
341 return unless $self->{timeout};
342
343 # calculate new after
344 $after = $self->{timeout};
345 }
346
347 Scalar::Util::weaken $self;
348 return unless $self; # ->error could have destroyed $self
349
350 $self->{_tw} ||= AnyEvent->timer (after => $after, cb => sub {
351 delete $self->{_tw};
352 $self->_timeout;
353 });
354 } else {
355 delete $self->{_tw};
356 }
265} 357}
266 358
267############################################################################# 359#############################################################################
268 360
269=back 361=back
316 my $len = syswrite $self->{fh}, $self->{wbuf}; 408 my $len = syswrite $self->{fh}, $self->{wbuf};
317 409
318 if ($len >= 0) { 410 if ($len >= 0) {
319 substr $self->{wbuf}, 0, $len, ""; 411 substr $self->{wbuf}, 0, $len, "";
320 412
413 $self->{_activity} = AnyEvent->now;
414
321 $self->{on_drain}($self) 415 $self->{on_drain}($self)
322 if $self->{low_water_mark} >= length $self->{wbuf} 416 if $self->{low_water_mark} >= length $self->{wbuf}
323 && $self->{on_drain}; 417 && $self->{on_drain};
324 418
325 delete $self->{_ww} unless length $self->{wbuf}; 419 delete $self->{_ww} unless length $self->{wbuf};
326 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAWOULDBLOCK) { 420 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
327 $self->error; 421 $self->_error ($!, 1);
328 } 422 }
329 }; 423 };
330 424
331 # try to write data immediately 425 # try to write data immediately
332 $cb->(); 426 $cb->();
352 @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write") 446 @_ = ($WH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_write")
353 ->($self, @_); 447 ->($self, @_);
354 } 448 }
355 449
356 if ($self->{filter_w}) { 450 if ($self->{filter_w}) {
357 $self->{filter_w}->($self, \$_[0]); 451 $self->{filter_w}($self, \$_[0]);
358 } else { 452 } else {
359 $self->{wbuf} .= $_[0]; 453 $self->{wbuf} .= $_[0];
360 $self->_drain_wbuf; 454 $self->_drain_wbuf;
361 } 455 }
362} 456}
363 457
364=item $handle->push_write (type => @args) 458=item $handle->push_write (type => @args)
365 459
366=item $handle->unshift_write (type => @args)
367
368Instead of formatting your data yourself, you can also let this module do 460Instead of formatting your data yourself, you can also let this module do
369the job by specifying a type and type-specific arguments. 461the job by specifying a type and type-specific arguments.
370 462
371Predefined types are (if you have ideas for additional types, feel free to 463Predefined types are (if you have ideas for additional types, feel free to
372drop by and tell us): 464drop by and tell us):
375 467
376=item netstring => $string 468=item netstring => $string
377 469
378Formats the given value as netstring 470Formats the given value as netstring
379(http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them). 471(http://cr.yp.to/proto/netstrings.txt, this is not a recommendation to use them).
380
381=back
382 472
383=cut 473=cut
384 474
385register_write_type netstring => sub { 475register_write_type netstring => sub {
386 my ($self, $string) = @_; 476 my ($self, $string) = @_;
396 486
397JSON objects (and arrays) are self-delimiting, so you can write JSON at 487JSON objects (and arrays) are self-delimiting, so you can write JSON at
398one end of a handle and read them at the other end without using any 488one end of a handle and read them at the other end without using any
399additional framing. 489additional framing.
400 490
491The generated JSON text is guaranteed not to contain any newlines: While
492this module doesn't need delimiters after or between JSON texts to be
493able to read them, many other languages depend on that.
494
495A simple RPC protocol that interoperates easily with others is to send
496JSON arrays (or objects, although arrays are usually the better choice as
497they mimic how function argument passing works) and a newline after each
498JSON text:
499
500 $handle->push_write (json => ["method", "arg1", "arg2"]); # whatever
501 $handle->push_write ("\012");
502
503An AnyEvent::Handle receiver would simply use the C<json> read type and
504rely on the fact that the newline will be skipped as leading whitespace:
505
506 $handle->push_read (json => sub { my $array = $_[1]; ... });
507
508Other languages could read single lines terminated by a newline and pass
509this line into their JSON decoder of choice.
510
401=cut 511=cut
402 512
403register_write_type json => sub { 513register_write_type json => sub {
404 my ($self, $ref) = @_; 514 my ($self, $ref) = @_;
405 515
406 require JSON; 516 require JSON;
407 517
408 $self->{json} ? $self->{json}->encode ($ref) 518 $self->{json} ? $self->{json}->encode ($ref)
409 : JSON::encode_json ($ref) 519 : JSON::encode_json ($ref)
410}; 520};
521
522=back
411 523
412=item AnyEvent::Handle::register_write_type type => $coderef->($handle, @args) 524=item AnyEvent::Handle::register_write_type type => $coderef->($handle, @args)
413 525
414This function (not method) lets you add your own types to C<push_write>. 526This function (not method) lets you add your own types to C<push_write>.
415Whenever the given C<type> is used, C<push_write> will invoke the code 527Whenever the given C<type> is used, C<push_write> will invoke the code
453the specified number of bytes which give an XML datagram. 565the specified number of bytes which give an XML datagram.
454 566
455 # in the default state, expect some header bytes 567 # in the default state, expect some header bytes
456 $handle->on_read (sub { 568 $handle->on_read (sub {
457 # some data is here, now queue the length-header-read (4 octets) 569 # some data is here, now queue the length-header-read (4 octets)
458 shift->unshift_read_chunk (4, sub { 570 shift->unshift_read (chunk => 4, sub {
459 # header arrived, decode 571 # header arrived, decode
460 my $len = unpack "N", $_[1]; 572 my $len = unpack "N", $_[1];
461 573
462 # now read the payload 574 # now read the payload
463 shift->unshift_read_chunk ($len, sub { 575 shift->unshift_read (chunk => $len, sub {
464 my $xml = $_[1]; 576 my $xml = $_[1];
465 # handle xml 577 # handle xml
466 }); 578 });
467 }); 579 });
468 }); 580 });
475 587
476 # request one 588 # request one
477 $handle->push_write ("request 1\015\012"); 589 $handle->push_write ("request 1\015\012");
478 590
479 # we expect "ERROR" or "OK" as response, so push a line read 591 # we expect "ERROR" or "OK" as response, so push a line read
480 $handle->push_read_line (sub { 592 $handle->push_read (line => sub {
481 # if we got an "OK", we have to _prepend_ another line, 593 # if we got an "OK", we have to _prepend_ another line,
482 # so it will be read before the second request reads its 64 bytes 594 # so it will be read before the second request reads its 64 bytes
483 # which are already in the queue when this callback is called 595 # which are already in the queue when this callback is called
484 # we don't do this in case we got an error 596 # we don't do this in case we got an error
485 if ($_[1] eq "OK") { 597 if ($_[1] eq "OK") {
486 $_[0]->unshift_read_line (sub { 598 $_[0]->unshift_read (line => sub {
487 my $response = $_[1]; 599 my $response = $_[1];
488 ... 600 ...
489 }); 601 });
490 } 602 }
491 }); 603 });
492 604
493 # request two 605 # request two
494 $handle->push_write ("request 2\015\012"); 606 $handle->push_write ("request 2\015\012");
495 607
496 # simply read 64 bytes, always 608 # simply read 64 bytes, always
497 $handle->push_read_chunk (64, sub { 609 $handle->push_read (chunk => 64, sub {
498 my $response = $_[1]; 610 my $response = $_[1];
499 ... 611 ...
500 }); 612 });
501 613
502=over 4 614=over 4
503 615
504=cut 616=cut
505 617
506sub _drain_rbuf { 618sub _drain_rbuf {
507 my ($self) = @_; 619 my ($self) = @_;
620
621 local $self->{_in_drain} = 1;
508 622
509 if ( 623 if (
510 defined $self->{rbuf_max} 624 defined $self->{rbuf_max}
511 && $self->{rbuf_max} < length $self->{rbuf} 625 && $self->{rbuf_max} < length $self->{rbuf}
512 ) { 626 ) {
513 $! = &Errno::ENOSPC; 627 return $self->_error (&Errno::ENOSPC, 1);
514 $self->error;
515 } 628 }
516 629
517 return if $self->{in_drain}; 630 while () {
518 local $self->{in_drain} = 1;
519
520 while (my $len = length $self->{rbuf}) {
521 no strict 'refs'; 631 no strict 'refs';
632
633 my $len = length $self->{rbuf};
634
522 if (my $cb = shift @{ $self->{_queue} }) { 635 if (my $cb = shift @{ $self->{_queue} }) {
523 unless ($cb->($self)) { 636 unless ($cb->($self)) {
524 if ($self->{_eof}) { 637 if ($self->{_eof}) {
525 # no progress can be made (not enough data and no data forthcoming) 638 # no progress can be made (not enough data and no data forthcoming)
526 $! = &Errno::EPIPE; 639 return $self->_error (&Errno::EPIPE, 1);
527 $self->error;
528 } 640 }
529 641
530 unshift @{ $self->{_queue} }, $cb; 642 unshift @{ $self->{_queue} }, $cb;
531 return; 643 last;
532 } 644 }
533 } elsif ($self->{on_read}) { 645 } elsif ($self->{on_read}) {
534 $self->{on_read}($self); 646 $self->{on_read}($self);
535 647
536 if ( 648 if (
537 $self->{_eof} # if no further data will arrive
538 && $len == length $self->{rbuf} # and no data has been consumed 649 $len == length $self->{rbuf} # if no data has been consumed
539 && !@{ $self->{_queue} } # and the queue is still empty 650 && !@{ $self->{_queue} } # and the queue is still empty
540 && $self->{on_read} # and we still want to read data 651 && $self->{on_read} # but we still have on_read
541 ) { 652 ) {
653 # no further data will arrive
542 # then no progress can be made 654 # so no progress can be made
543 $! = &Errno::EPIPE; 655 return $self->_error (&Errno::EPIPE, 1)
544 $self->error; 656 if $self->{_eof};
657
658 last; # more data might arrive
545 } 659 }
546 } else { 660 } else {
547 # read side becomes idle 661 # read side becomes idle
548 delete $self->{_rw}; 662 delete $self->{_rw};
549 return; 663 last;
550 } 664 }
551 } 665 }
552 666
553 if ($self->{_eof}) {
554 $self->_shutdown;
555 $self->{on_eof}($self) 667 $self->{on_eof}($self)
556 if $self->{on_eof}; 668 if $self->{_eof} && $self->{on_eof};
669
670 # may need to restart read watcher
671 unless ($self->{_rw}) {
672 $self->start_read
673 if $self->{on_read} || @{ $self->{_queue} };
557 } 674 }
558} 675}
559 676
560=item $handle->on_read ($cb) 677=item $handle->on_read ($cb)
561 678
567 684
568sub on_read { 685sub on_read {
569 my ($self, $cb) = @_; 686 my ($self, $cb) = @_;
570 687
571 $self->{on_read} = $cb; 688 $self->{on_read} = $cb;
689 $self->_drain_rbuf if $cb && !$self->{_in_drain};
572} 690}
573 691
574=item $handle->rbuf 692=item $handle->rbuf
575 693
576Returns the read buffer (as a modifiable lvalue). 694Returns the read buffer (as a modifiable lvalue).
625 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read") 743 $cb = ($RH{$type} or Carp::croak "unsupported type passed to AnyEvent::Handle::push_read")
626 ->($self, $cb, @_); 744 ->($self, $cb, @_);
627 } 745 }
628 746
629 push @{ $self->{_queue} }, $cb; 747 push @{ $self->{_queue} }, $cb;
630 $self->_drain_rbuf; 748 $self->_drain_rbuf unless $self->{_in_drain};
631} 749}
632 750
633sub unshift_read { 751sub unshift_read {
634 my $self = shift; 752 my $self = shift;
635 my $cb = pop; 753 my $cb = pop;
641 ->($self, $cb, @_); 759 ->($self, $cb, @_);
642 } 760 }
643 761
644 762
645 unshift @{ $self->{_queue} }, $cb; 763 unshift @{ $self->{_queue} }, $cb;
646 $self->_drain_rbuf; 764 $self->_drain_rbuf unless $self->{_in_drain};
647} 765}
648 766
649=item $handle->push_read (type => @args, $cb) 767=item $handle->push_read (type => @args, $cb)
650 768
651=item $handle->unshift_read (type => @args, $cb) 769=item $handle->unshift_read (type => @args, $cb)
751 my ($self, $cb) = @_; 869 my ($self, $cb) = @_;
752 870
753 sub { 871 sub {
754 unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) { 872 unless ($_[0]{rbuf} =~ s/^(0|[1-9][0-9]*)://) {
755 if ($_[0]{rbuf} =~ /[^0-9]/) { 873 if ($_[0]{rbuf} =~ /[^0-9]/) {
756 $! = &Errno::EBADMSG; 874 $self->_error (&Errno::EBADMSG);
757 $self->error;
758 } 875 }
759 return; 876 return;
760 } 877 }
761 878
762 my $len = $1; 879 my $len = $1;
765 my $string = $_[1]; 882 my $string = $_[1];
766 $_[0]->unshift_read (chunk => 1, sub { 883 $_[0]->unshift_read (chunk => 1, sub {
767 if ($_[1] eq ",") { 884 if ($_[1] eq ",") {
768 $cb->($_[0], $string); 885 $cb->($_[0], $string);
769 } else { 886 } else {
770 $! = &Errno::EBADMSG; 887 $self->_error (&Errno::EBADMSG);
771 $self->error;
772 } 888 }
773 }); 889 });
774 }); 890 });
775 891
776 1 892 1
833 return 1; 949 return 1;
834 } 950 }
835 951
836 # reject 952 # reject
837 if ($reject && $$rbuf =~ $reject) { 953 if ($reject && $$rbuf =~ $reject) {
838 $! = &Errno::EBADMSG; 954 $self->_error (&Errno::EBADMSG);
839 $self->error;
840 } 955 }
841 956
842 # skip 957 # skip
843 if ($skip && $$rbuf =~ $skip) { 958 if ($skip && $$rbuf =~ $skip) {
844 $data .= substr $$rbuf, 0, $+[0], ""; 959 $data .= substr $$rbuf, 0, $+[0], "";
8592.09 (and JSON::XS version 2.2) and above. You have to provide a 9742.09 (and JSON::XS version 2.2) and above. You have to provide a
860dependency on your own: this module will load the JSON module, but 975dependency on your own: this module will load the JSON module, but
861AnyEvent does not depend on it itself. 976AnyEvent does not depend on it itself.
862 977
863Since JSON texts are fully self-delimiting, the C<json> read and write 978Since JSON texts are fully self-delimiting, the C<json> read and write
864types are an ideal simple RPC protocol: just exchange JSON datagrams. 979types are an ideal simple RPC protocol: just exchange JSON datagrams. See
980the C<json> write type description, above, for an actual example.
865 981
866=cut 982=cut
867 983
868register_read_type json => sub { 984register_read_type json => sub {
869 my ($self, $cb, $accept, $reject, $skip) = @_; 985 my ($self, $cb, $accept, $reject, $skip) = @_;
871 require JSON; 987 require JSON;
872 988
873 my $data; 989 my $data;
874 my $rbuf = \$self->{rbuf}; 990 my $rbuf = \$self->{rbuf};
875 991
876 my $json = $self->{json} ||= JSON::XS->new->utf8; 992 my $json = $self->{json} ||= JSON->new->utf8;
877 993
878 sub { 994 sub {
879 my $ref = $json->incr_parse ($self->{rbuf}); 995 my $ref = $json->incr_parse ($self->{rbuf});
880 996
881 if ($ref) { 997 if ($ref) {
916=item $handle->stop_read 1032=item $handle->stop_read
917 1033
918=item $handle->start_read 1034=item $handle->start_read
919 1035
920In rare cases you actually do not want to read anything from the 1036In rare cases you actually do not want to read anything from the
921socket. In this case you can call C<stop_read>. Neither C<on_read> no 1037socket. In this case you can call C<stop_read>. Neither C<on_read> nor
922any queued callbacks will be executed then. To start reading again, call 1038any queued callbacks will be executed then. To start reading again, call
923C<start_read>. 1039C<start_read>.
1040
1041Note that AnyEvent::Handle will automatically C<start_read> for you when
1042you change the C<on_read> callback or push/unshift a read callback, and it
1043will automatically C<stop_read> for you when neither C<on_read> is set nor
1044there are any read requests in the queue.
924 1045
925=cut 1046=cut
926 1047
927sub stop_read { 1048sub stop_read {
928 my ($self) = @_; 1049 my ($self) = @_;
939 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub { 1060 $self->{_rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
940 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf}; 1061 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf};
941 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf; 1062 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
942 1063
943 if ($len > 0) { 1064 if ($len > 0) {
1065 $self->{_activity} = AnyEvent->now;
1066
944 $self->{filter_r} 1067 $self->{filter_r}
945 ? $self->{filter_r}->($self, $rbuf) 1068 ? $self->{filter_r}($self, $rbuf)
946 : $self->_drain_rbuf; 1069 : $self->{_in_drain} || $self->_drain_rbuf;
947 1070
948 } elsif (defined $len) { 1071 } elsif (defined $len) {
949 delete $self->{_rw}; 1072 delete $self->{_rw};
950 $self->{_eof} = 1; 1073 $self->{_eof} = 1;
951 $self->_drain_rbuf; 1074 $self->_drain_rbuf unless $self->{_in_drain};
952 1075
953 } elsif ($! != EAGAIN && $! != EINTR && $! != &AnyEvent::Util::WSAWOULDBLOCK) { 1076 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
954 return $self->error; 1077 return $self->_error ($!, 1);
955 } 1078 }
956 }); 1079 });
957 } 1080 }
958} 1081}
959 1082
960sub _dotls { 1083sub _dotls {
961 my ($self) = @_; 1084 my ($self) = @_;
1085
1086 my $buf;
962 1087
963 if (length $self->{_tls_wbuf}) { 1088 if (length $self->{_tls_wbuf}) {
964 while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) { 1089 while ((my $len = Net::SSLeay::write ($self->{tls}, $self->{_tls_wbuf})) > 0) {
965 substr $self->{_tls_wbuf}, 0, $len, ""; 1090 substr $self->{_tls_wbuf}, 0, $len, "";
966 } 1091 }
967 } 1092 }
968 1093
969 if (defined (my $buf = Net::SSLeay::BIO_read ($self->{_wbio}))) { 1094 if (length ($buf = Net::SSLeay::BIO_read ($self->{_wbio}))) {
970 $self->{wbuf} .= $buf; 1095 $self->{wbuf} .= $buf;
971 $self->_drain_wbuf; 1096 $self->_drain_wbuf;
972 } 1097 }
973 1098
974 while (defined (my $buf = Net::SSLeay::read ($self->{tls}))) { 1099 while (defined ($buf = Net::SSLeay::read ($self->{tls}))) {
1100 if (length $buf) {
975 $self->{rbuf} .= $buf; 1101 $self->{rbuf} .= $buf;
976 $self->_drain_rbuf; 1102 $self->_drain_rbuf unless $self->{_in_drain};
1103 } else {
1104 # let's treat SSL-eof as we treat normal EOF
1105 $self->{_eof} = 1;
1106 $self->_shutdown;
1107 return;
1108 }
977 } 1109 }
978 1110
979 my $err = Net::SSLeay::get_error ($self->{tls}, -1); 1111 my $err = Net::SSLeay::get_error ($self->{tls}, -1);
980 1112
981 if ($err!= Net::SSLeay::ERROR_WANT_READ ()) { 1113 if ($err!= Net::SSLeay::ERROR_WANT_READ ()) {
982 if ($err == Net::SSLeay::ERROR_SYSCALL ()) { 1114 if ($err == Net::SSLeay::ERROR_SYSCALL ()) {
983 $self->error; 1115 return $self->_error ($!, 1);
984 } elsif ($err == Net::SSLeay::ERROR_SSL ()) { 1116 } elsif ($err == Net::SSLeay::ERROR_SSL ()) {
985 $! = &Errno::EIO; 1117 return $self->_error (&Errno::EIO, 1);
986 $self->error;
987 } 1118 }
988 1119
989 # all others are fine for our purposes 1120 # all others are fine for our purposes
990 } 1121 }
991} 1122}
1006call and can be used or changed to your liking. Note that the handshake 1137call and can be used or changed to your liking. Note that the handshake
1007might have already started when this function returns. 1138might have already started when this function returns.
1008 1139
1009=cut 1140=cut
1010 1141
1011# TODO: maybe document...
1012sub starttls { 1142sub starttls {
1013 my ($self, $ssl, $ctx) = @_; 1143 my ($self, $ssl, $ctx) = @_;
1014 1144
1015 $self->stoptls; 1145 $self->stoptls;
1016 1146

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines