ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.9 by root, Fri May 2 16:07:46 2008 UTC vs.
Revision 1.12 by elmex, Thu May 15 09:03:43 2008 UTC

70The filehandle this L<AnyEvent::Handle> object will operate on. 70The filehandle this L<AnyEvent::Handle> object will operate on.
71 71
72NOTE: The filehandle will be set to non-blocking (using 72NOTE: The filehandle will be set to non-blocking (using
73AnyEvent::Util::fh_nonblocking). 73AnyEvent::Util::fh_nonblocking).
74 74
75=item on_error => $cb->($self) [MANDATORY] 75=item on_eof => $cb->($self) [MANDATORY]
76 76
77Set the callback to be called on EOF.
78
79=item on_error => $cb->($self)
80
77This is the fatal error callback, that is called when a fatal error ocurs, 81This is the fatal error callback, that is called when, well, a fatal error
78such as not being able to resolve the hostname, failure to connect or a 82ocurs, such as not being able to resolve the hostname, failure to connect
79read error. 83or a read error.
80 84
81The object will not be in a usable state when this callback has been 85The object will not be in a usable state when this callback has been
82called. 86called.
83 87
84On callback entrance, the value of C<$!> contains the opertaing system 88On callback entrance, the value of C<$!> contains the operating system
85error (or C<ENOSPC> or C<EPIPE>). 89error (or C<ENOSPC> or C<EPIPE>).
86 90
87=item on_eof => $cb->($self) [MANDATORY] 91While not mandatory, it is I<highly> recommended to set this callback, as
88 92you will not be notified of errors otherwise. The default simply calls
89Set the callback to be called on EOF. 93die.
90 94
91=item on_read => $cb->($self) 95=item on_read => $cb->($self)
92 96
93This sets the default read callback, which is called when data arrives 97This sets the default read callback, which is called when data arrives
94and no read request is in the queue. If the read callback is C<undef> 98and no read request is in the queue.
95or has never been set, than AnyEvent::Handle will cease reading from the
96filehandle.
97 99
98To access (and remove data from) the read buffer, use the C<< ->rbuf >> 100To access (and remove data from) the read buffer, use the C<< ->rbuf >>
99method or acces sthe C<$self->{rbuf}> member directly. 101method or acces sthe C<$self->{rbuf}> member directly.
100 102
101When an EOF condition is detected then AnyEvent::Handle will first try to 103When an EOF condition is detected then AnyEvent::Handle will first try to
144 146
145 $self->{fh} or Carp::croak "mandatory argument fh is missing"; 147 $self->{fh} or Carp::croak "mandatory argument fh is missing";
146 148
147 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 149 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
148 150
149 $self->on_error ((delete $self->{on_error}) or Carp::croak "mandatory argument on_error is missing");
150 $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing"); 151 $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing");
151 152
153 $self->on_error (delete $self->{on_error}) if $self->{on_error};
152 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 154 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
153 $self->on_read (delete $self->{on_read} ) if $self->{on_read}; 155 $self->on_read (delete $self->{on_read} ) if $self->{on_read};
156
157 $self->start_read;
154 158
155 $self 159 $self
156} 160}
157 161
158sub _shutdown { 162sub _shutdown {
169 { 173 {
170 local $!; 174 local $!;
171 $self->_shutdown; 175 $self->_shutdown;
172 } 176 }
173 177
178 if ($self->{on_error}) {
174 $self->{on_error}($self); 179 $self->{on_error}($self);
180 } else {
181 die "AnyEvent::Handle uncaught fatal error: $!";
182 }
175} 183}
176 184
177=item $fh = $handle->fh 185=item $fh = $handle->fh
178 186
179This method returns the filehandle of the L<AnyEvent::Handle> object. 187This method returns the filehandle of the L<AnyEvent::Handle> object.
349 ... 357 ...
350 }); 358 });
351 359
352=over 4 360=over 4
353 361
362=cut
363
354sub _drain_rbuf { 364sub _drain_rbuf {
355 my ($self) = @_; 365 my ($self) = @_;
356 366
357 return if exists $self->{in_drain}; 367 return if $self->{in_drain};
358 local $self->{in_drain} = 1; 368 local $self->{in_drain} = 1;
359 369
360 while (my $len = length $self->{rbuf}) { 370 while (my $len = length $self->{rbuf}) {
361 no strict 'refs'; 371 no strict 'refs';
362 if (@{ $self->{queue} }) { 372 if (my $cb = shift @{ $self->{queue} }) {
363 if ($self->{queue}[0]($self)) { 373 if (!$cb->($self)) {
364 shift @{ $self->{queue} };
365 } elsif ($self->{eof}) { 374 if ($self->{eof}) {
366 # no progress can be made (not enough data and no data forthcoming) 375 # no progress can be made (not enough data and no data forthcoming)
367 $! = &Errno::EPIPE; return $self->error; 376 $! = &Errno::EPIPE; return $self->error;
368 } else { 377 }
378
379 unshift @{ $self->{queue} }, $cb;
369 return; 380 return;
370 } 381 }
371 } elsif ($self->{on_read}) { 382 } elsif ($self->{on_read}) {
372 $self->{on_read}($self); 383 $self->{on_read}($self);
373 384
403 414
404sub on_read { 415sub on_read {
405 my ($self, $cb) = @_; 416 my ($self, $cb) = @_;
406 417
407 $self->{on_read} = $cb; 418 $self->{on_read} = $cb;
408
409 unless ($self->{rw} || $self->{eof}) {
410 Scalar::Util::weaken $self;
411
412 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
413 my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf};
414
415 if ($len > 0) {
416 if (exists $self->{rbuf_max}) {
417 if ($self->{rbuf_max} < length $self->{rbuf}) {
418 $! = &Errno::ENOSPC; return $self->error;
419 }
420 }
421
422 } elsif (defined $len) {
423 $self->{eof} = 1;
424 delete $self->{rw};
425
426 } elsif ($! != EAGAIN && $! != EINTR) {
427 return $self->error;
428 }
429
430 $self->_drain_rbuf;
431 });
432 }
433} 419}
434 420
435=item $handle->rbuf 421=item $handle->rbuf
436 422
437Returns the read buffer (as a modifiable lvalue). 423Returns the read buffer (as a modifiable lvalue).
495these C<$len> bytes will be passed to the callback. 481these C<$len> bytes will be passed to the callback.
496 482
497=cut 483=cut
498 484
499sub _read_chunk($$) { 485sub _read_chunk($$) {
500 my ($len, $cb) = @_; 486 my ($self, $len, $cb) = @_;
501 487
502 sub { 488 sub {
503 $len <= length $_[0]{rbuf} or return; 489 $len <= length $_[0]{rbuf} or return;
504 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, ""); 490 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
505 1 491 1
506 } 492 }
507} 493}
508 494
509sub push_read_chunk { 495sub push_read_chunk {
510 my ($self, $len, $cb) = @_;
511
512 $self->push_read (_read_chunk $len, $cb); 496 $_[0]->push_read (&_read_chunk);
513} 497}
514 498
515 499
516sub unshift_read_chunk { 500sub unshift_read_chunk {
517 my ($self, $len, $cb) = @_;
518
519 $self->unshift_read (_read_chunk $len, $cb); 501 $_[0]->unshift_read (&_read_chunk);
520} 502}
521 503
522=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol)) 504=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol))
523 505
524=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol)) 506=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol))
544not marked by the end of line marker. 526not marked by the end of line marker.
545 527
546=cut 528=cut
547 529
548sub _read_line($$) { 530sub _read_line($$) {
531 my $self = shift;
549 my $cb = pop; 532 my $cb = pop;
550 my $eol = @_ ? shift : qr|(\015?\012)|; 533 my $eol = @_ ? shift : qr|(\015?\012)|;
551 my $pos; 534 my $pos;
552 535
553 $eol = qr|(\Q$eol\E)| unless ref $eol; 536 $eol = qr|(\Q$eol\E)| unless ref $eol;
554 $eol = qr|^(.*?)($eol)|; 537 $eol = qr|^(.*?)($eol)|;
555 538
556 sub { 539 sub {
557 $_[0]{rbuf} =~ s/$eol// or return; 540 $_[0]{rbuf} =~ s/$eol// or return;
558 541
559 $cb->($1, $2); 542 $cb->($_[0], $1, $2);
560 1 543 1
561 } 544 }
562} 545}
563 546
564sub push_read_line { 547sub push_read_line {
565 my $self = shift;
566
567 $self->push_read (&_read_line); 548 $_[0]->push_read (&_read_line);
568} 549}
569 550
570sub unshift_read_line { 551sub unshift_read_line {
571 my $self = shift;
572
573 $self->unshift_read (&_read_line); 552 $_[0]->unshift_read (&_read_line);
553}
554
555=item $handle->stop_read
556
557=item $handle->start_read
558
559In rare cases you actually do not want to read anything form the
560socket. In this case you can call C<stop_read>. Neither C<on_read> no
561any queued callbacks will be executed then. To start readign again, call
562C<start_read>.
563
564=cut
565
566sub stop_read {
567 my ($self) = @_;
568
569 delete $self->{rw};
570}
571
572sub start_read {
573 my ($self) = @_;
574
575 unless ($self->{rw} || $self->{eof}) {
576 Scalar::Util::weaken $self;
577
578 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
579 my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf};
580
581 if ($len > 0) {
582 if (defined $self->{rbuf_max}) {
583 if ($self->{rbuf_max} < length $self->{rbuf}) {
584 $! = &Errno::ENOSPC; return $self->error;
585 }
586 }
587
588 } elsif (defined $len) {
589 $self->{eof} = 1;
590 delete $self->{rw};
591
592 } elsif ($! != EAGAIN && $! != EINTR) {
593 return $self->error;
594 }
595
596 $self->_drain_rbuf;
597 });
598 }
574} 599}
575 600
576=back 601=back
577 602
578=head1 AUTHOR 603=head1 AUTHOR

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines