ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.9 by root, Fri May 2 16:07:46 2008 UTC vs.
Revision 1.18 by root, Sat May 24 05:01:16 2008 UTC

12 12
13=head1 NAME 13=head1 NAME
14 14
15AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent 15AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent
16 16
17=cut 17This module is experimental.
18 18
19=cut
20
19our $VERSION = '0.02'; 21our $VERSION = '0.04';
20 22
21=head1 SYNOPSIS 23=head1 SYNOPSIS
22 24
23 use AnyEvent; 25 use AnyEvent;
24 use AnyEvent::Handle; 26 use AnyEvent::Handle;
43 $cv->wait; 45 $cv->wait;
44 46
45=head1 DESCRIPTION 47=head1 DESCRIPTION
46 48
47This module is a helper module to make it easier to do event-based I/O on 49This module is a helper module to make it easier to do event-based I/O on
48filehandles (and sockets, see L<AnyEvent::Socket> for an easy way to make 50filehandles. For utility functions for doing non-blocking connects and accepts
49non-blocking resolves and connects). 51on sockets see L<AnyEvent::Util>.
50 52
51In the following, when the documentation refers to of "bytes" then this 53In the following, when the documentation refers to of "bytes" then this
52means characters. As sysread and syswrite are used for all I/O, their 54means characters. As sysread and syswrite are used for all I/O, their
53treatment of characters applies to this module as well. 55treatment of characters applies to this module as well.
54 56
70The filehandle this L<AnyEvent::Handle> object will operate on. 72The filehandle this L<AnyEvent::Handle> object will operate on.
71 73
72NOTE: The filehandle will be set to non-blocking (using 74NOTE: The filehandle will be set to non-blocking (using
73AnyEvent::Util::fh_nonblocking). 75AnyEvent::Util::fh_nonblocking).
74 76
77=item on_eof => $cb->($self)
78
79Set the callback to be called on EOF.
80
81While not mandatory, it is highly recommended to set an eof callback,
82otherwise you might end up with a closed socket while you are still
83waiting for data.
84
75=item on_error => $cb->($self) [MANDATORY] 85=item on_error => $cb->($self)
76 86
77This is the fatal error callback, that is called when a fatal error ocurs, 87This is the fatal error callback, that is called when, well, a fatal error
78such as not being able to resolve the hostname, failure to connect or a 88ocurs, such as not being able to resolve the hostname, failure to connect
79read error. 89or a read error.
80 90
81The object will not be in a usable state when this callback has been 91The object will not be in a usable state when this callback has been
82called. 92called.
83 93
84On callback entrance, the value of C<$!> contains the opertaing system 94On callback entrance, the value of C<$!> contains the operating system
85error (or C<ENOSPC> or C<EPIPE>). 95error (or C<ENOSPC> or C<EPIPE>).
86 96
87=item on_eof => $cb->($self) [MANDATORY] 97While not mandatory, it is I<highly> recommended to set this callback, as
88 98you will not be notified of errors otherwise. The default simply calls
89Set the callback to be called on EOF. 99die.
90 100
91=item on_read => $cb->($self) 101=item on_read => $cb->($self)
92 102
93This sets the default read callback, which is called when data arrives 103This sets the default read callback, which is called when data arrives
94and no read request is in the queue. If the read callback is C<undef> 104and no read request is in the queue.
95or has never been set, than AnyEvent::Handle will cease reading from the
96filehandle.
97 105
98To access (and remove data from) the read buffer, use the C<< ->rbuf >> 106To access (and remove data from) the read buffer, use the C<< ->rbuf >>
99method or acces sthe C<$self->{rbuf}> member directly. 107method or acces sthe C<$self->{rbuf}> member directly.
100 108
101When an EOF condition is detected then AnyEvent::Handle will first try to 109When an EOF condition is detected then AnyEvent::Handle will first try to
144 152
145 $self->{fh} or Carp::croak "mandatory argument fh is missing"; 153 $self->{fh} or Carp::croak "mandatory argument fh is missing";
146 154
147 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 155 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
148 156
149 $self->on_error ((delete $self->{on_error}) or Carp::croak "mandatory argument on_error is missing"); 157 $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof};
150 $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing"); 158 $self->on_error (delete $self->{on_error}) if $self->{on_error};
151
152 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 159 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
153 $self->on_read (delete $self->{on_read} ) if $self->{on_read}; 160 $self->on_read (delete $self->{on_read} ) if $self->{on_read};
161
162 $self->start_read;
154 163
155 $self 164 $self
156} 165}
157 166
158sub _shutdown { 167sub _shutdown {
169 { 178 {
170 local $!; 179 local $!;
171 $self->_shutdown; 180 $self->_shutdown;
172 } 181 }
173 182
183 if ($self->{on_error}) {
174 $self->{on_error}($self); 184 $self->{on_error}($self);
185 } else {
186 die "AnyEvent::Handle uncaught fatal error: $!";
187 }
175} 188}
176 189
177=item $fh = $handle->fh 190=item $fh = $handle->fh
178 191
179This method returns the filehandle of the L<AnyEvent::Handle> object. 192This method returns the filehandle of the L<AnyEvent::Handle> object.
241want (only limited by the available memory), as C<AnyEvent::Handle> 254want (only limited by the available memory), as C<AnyEvent::Handle>
242buffers it independently of the kernel. 255buffers it independently of the kernel.
243 256
244=cut 257=cut
245 258
246sub push_write { 259sub _drain_wbuf {
247 my ($self, $data) = @_; 260 my ($self) = @_;
248
249 $self->{wbuf} .= $data;
250 261
251 unless ($self->{ww}) { 262 unless ($self->{ww}) {
252 Scalar::Util::weaken $self; 263 Scalar::Util::weaken $self;
253 my $cb = sub { 264 my $cb = sub {
254 my $len = syswrite $self->{fh}, $self->{wbuf}; 265 my $len = syswrite $self->{fh}, $self->{wbuf};
255 266
256 if ($len > 0) { 267 if ($len > 0) {
257 substr $self->{wbuf}, 0, $len, ""; 268 substr $self->{wbuf}, 0, $len, "";
258
259 269
260 $self->{on_drain}($self) 270 $self->{on_drain}($self)
261 if $self->{low_water_mark} >= length $self->{wbuf} 271 if $self->{low_water_mark} >= length $self->{wbuf}
262 && $self->{on_drain}; 272 && $self->{on_drain};
263 273
269 279
270 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb); 280 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb);
271 281
272 $cb->($self); 282 $cb->($self);
273 }; 283 };
284}
285
286sub push_write {
287 my $self = shift;
288
289 if ($self->{filter_w}) {
290 $self->{filter_w}->($self, \$_[0]);
291 } else {
292 $self->{wbuf} .= $_[0];
293 $self->_drain_wbuf;
294 }
274} 295}
275 296
276############################################################################# 297#############################################################################
277 298
278=back 299=back
349 ... 370 ...
350 }); 371 });
351 372
352=over 4 373=over 4
353 374
375=cut
376
354sub _drain_rbuf { 377sub _drain_rbuf {
355 my ($self) = @_; 378 my ($self) = @_;
356 379
380 if (
381 defined $self->{rbuf_max}
382 && $self->{rbuf_max} < length $self->{rbuf}
383 ) {
384 $! = &Errno::ENOSPC; return $self->error;
385 }
386
357 return if exists $self->{in_drain}; 387 return if $self->{in_drain};
358 local $self->{in_drain} = 1; 388 local $self->{in_drain} = 1;
359 389
360 while (my $len = length $self->{rbuf}) { 390 while (my $len = length $self->{rbuf}) {
361 no strict 'refs'; 391 no strict 'refs';
362 if (@{ $self->{queue} }) { 392 if (my $cb = shift @{ $self->{queue} }) {
363 if ($self->{queue}[0]($self)) { 393 if (!$cb->($self)) {
364 shift @{ $self->{queue} };
365 } elsif ($self->{eof}) { 394 if ($self->{eof}) {
366 # no progress can be made (not enough data and no data forthcoming) 395 # no progress can be made (not enough data and no data forthcoming)
367 $! = &Errno::EPIPE; return $self->error; 396 $! = &Errno::EPIPE; return $self->error;
368 } else { 397 }
398
399 unshift @{ $self->{queue} }, $cb;
369 return; 400 return;
370 } 401 }
371 } elsif ($self->{on_read}) { 402 } elsif ($self->{on_read}) {
372 $self->{on_read}($self); 403 $self->{on_read}($self);
373 404
387 } 418 }
388 } 419 }
389 420
390 if ($self->{eof}) { 421 if ($self->{eof}) {
391 $self->_shutdown; 422 $self->_shutdown;
392 $self->{on_eof}($self); 423 $self->{on_eof}($self)
424 if $self->{on_eof};
393 } 425 }
394} 426}
395 427
396=item $handle->on_read ($cb) 428=item $handle->on_read ($cb)
397 429
403 435
404sub on_read { 436sub on_read {
405 my ($self, $cb) = @_; 437 my ($self, $cb) = @_;
406 438
407 $self->{on_read} = $cb; 439 $self->{on_read} = $cb;
408
409 unless ($self->{rw} || $self->{eof}) {
410 Scalar::Util::weaken $self;
411
412 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
413 my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf};
414
415 if ($len > 0) {
416 if (exists $self->{rbuf_max}) {
417 if ($self->{rbuf_max} < length $self->{rbuf}) {
418 $! = &Errno::ENOSPC; return $self->error;
419 }
420 }
421
422 } elsif (defined $len) {
423 $self->{eof} = 1;
424 delete $self->{rw};
425
426 } elsif ($! != EAGAIN && $! != EINTR) {
427 return $self->error;
428 }
429
430 $self->_drain_rbuf;
431 });
432 }
433} 440}
434 441
435=item $handle->rbuf 442=item $handle->rbuf
436 443
437Returns the read buffer (as a modifiable lvalue). 444Returns the read buffer (as a modifiable lvalue).
495these C<$len> bytes will be passed to the callback. 502these C<$len> bytes will be passed to the callback.
496 503
497=cut 504=cut
498 505
499sub _read_chunk($$) { 506sub _read_chunk($$) {
500 my ($len, $cb) = @_; 507 my ($self, $len, $cb) = @_;
501 508
502 sub { 509 sub {
503 $len <= length $_[0]{rbuf} or return; 510 $len <= length $_[0]{rbuf} or return;
504 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, ""); 511 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
505 1 512 1
506 } 513 }
507} 514}
508 515
509sub push_read_chunk { 516sub push_read_chunk {
510 my ($self, $len, $cb) = @_;
511
512 $self->push_read (_read_chunk $len, $cb); 517 $_[0]->push_read (&_read_chunk);
513} 518}
514 519
515 520
516sub unshift_read_chunk { 521sub unshift_read_chunk {
517 my ($self, $len, $cb) = @_;
518
519 $self->unshift_read (_read_chunk $len, $cb); 522 $_[0]->unshift_read (&_read_chunk);
520} 523}
521 524
522=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol)) 525=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol))
523 526
524=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol)) 527=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol))
544not marked by the end of line marker. 547not marked by the end of line marker.
545 548
546=cut 549=cut
547 550
548sub _read_line($$) { 551sub _read_line($$) {
552 my $self = shift;
549 my $cb = pop; 553 my $cb = pop;
550 my $eol = @_ ? shift : qr|(\015?\012)|; 554 my $eol = @_ ? shift : qr|(\015?\012)|;
551 my $pos; 555 my $pos;
552 556
553 $eol = qr|(\Q$eol\E)| unless ref $eol; 557 $eol = quotemeta $eol unless ref $eol;
554 $eol = qr|^(.*?)($eol)|; 558 $eol = qr|^(.*?)($eol)|s;
555 559
556 sub { 560 sub {
557 $_[0]{rbuf} =~ s/$eol// or return; 561 $_[0]{rbuf} =~ s/$eol// or return;
558 562
559 $cb->($1, $2); 563 $cb->($_[0], $1, $2);
560 1 564 1
561 } 565 }
562} 566}
563 567
564sub push_read_line { 568sub push_read_line {
565 my $self = shift;
566
567 $self->push_read (&_read_line); 569 $_[0]->push_read (&_read_line);
568} 570}
569 571
570sub unshift_read_line { 572sub unshift_read_line {
571 my $self = shift;
572
573 $self->unshift_read (&_read_line); 573 $_[0]->unshift_read (&_read_line);
574}
575
576=item $handle->stop_read
577
578=item $handle->start_read
579
580In rare cases you actually do not want to read anything from the
581socket. In this case you can call C<stop_read>. Neither C<on_read> no
582any queued callbacks will be executed then. To start readign again, call
583C<start_read>.
584
585=cut
586
587sub stop_read {
588 my ($self) = @_;
589
590 delete $self->{rw};
591}
592
593sub start_read {
594 my ($self) = @_;
595
596 unless ($self->{rw} || $self->{eof}) {
597 Scalar::Util::weaken $self;
598
599 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
600 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf};
601 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
602
603 if ($len > 0) {
604 $self->{filter_r}
605 ? $self->{filter_r}->($self, $rbuf)
606 : $self->_drain_rbuf;
607
608 } elsif (defined $len) {
609 delete $self->{rw};
610 $self->{eof} = 1;
611 $self->_drain_rbuf;
612
613 } elsif ($! != EAGAIN && $! != EINTR) {
614 return $self->error;
615 }
616 });
617 }
574} 618}
575 619
576=back 620=back
577 621
578=head1 AUTHOR 622=head1 AUTHOR

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines