ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.8 by root, Fri May 2 15:36:10 2008 UTC vs.
Revision 1.13 by elmex, Thu May 15 13:32:19 2008 UTC

43 $cv->wait; 43 $cv->wait;
44 44
45=head1 DESCRIPTION 45=head1 DESCRIPTION
46 46
47This module is a helper module to make it easier to do event-based I/O on 47This module is a helper module to make it easier to do event-based I/O on
48filehandles (and sockets, see L<AnyEvent::Socket> for an easy way to make 48filehandles. For utility functions for doing non-blocking connects and accepts
49non-blocking resolves and connects). 49on sockets see L<AnyEvent::Util>.
50 50
51In the following, when the documentation refers to of "bytes" then this 51In the following, when the documentation refers to of "bytes" then this
52means characters. As sysread and syswrite are used for all I/O, their 52means characters. As sysread and syswrite are used for all I/O, their
53treatment of characters applies to this module as well. 53treatment of characters applies to this module as well.
54 54
70The filehandle this L<AnyEvent::Handle> object will operate on. 70The filehandle this L<AnyEvent::Handle> object will operate on.
71 71
72NOTE: The filehandle will be set to non-blocking (using 72NOTE: The filehandle will be set to non-blocking (using
73AnyEvent::Util::fh_nonblocking). 73AnyEvent::Util::fh_nonblocking).
74 74
75=item on_error => $cb->($self) [MANDATORY] 75=item on_eof => $cb->($self) [MANDATORY]
76 76
77Set the callback to be called on EOF.
78
79=item on_error => $cb->($self)
80
77This is the fatal error callback, that is called when a fatal error ocurs, 81This is the fatal error callback, that is called when, well, a fatal error
78such as not being able to resolve the hostname, failure to connect or a 82ocurs, such as not being able to resolve the hostname, failure to connect
79read error. 83or a read error.
80 84
81The object will not be in a usable state when this callback has been 85The object will not be in a usable state when this callback has been
82called. 86called.
83 87
84On callback entrance, the value of C<$!> contains the opertaing system 88On callback entrance, the value of C<$!> contains the operating system
85error (or C<ENOSPC> or C<EPIPE>). 89error (or C<ENOSPC> or C<EPIPE>).
86 90
87=item on_eof => $cb->($self) [MANDATORY] 91While not mandatory, it is I<highly> recommended to set this callback, as
88 92you will not be notified of errors otherwise. The default simply calls
89Set the callback to be called on EOF. 93die.
90 94
91=item on_read => $cb->($self) 95=item on_read => $cb->($self)
92 96
93This sets the default read callback, which is called when data arrives 97This sets the default read callback, which is called when data arrives
94and no read request is in the queue. If the read callback is C<undef> 98and no read request is in the queue.
95or has never been set, than AnyEvent::Handle will cease reading from the
96filehandle.
97 99
98To access (and remove data from) the read buffer, use the C<< ->rbuf >> 100To access (and remove data from) the read buffer, use the C<< ->rbuf >>
99method or acces sthe C<$self->{rbuf}> member directly. 101method or acces sthe C<$self->{rbuf}> member directly.
100 102
101When an EOF condition is detected then AnyEvent::Handle will first try to 103When an EOF condition is detected then AnyEvent::Handle will first try to
144 146
145 $self->{fh} or Carp::croak "mandatory argument fh is missing"; 147 $self->{fh} or Carp::croak "mandatory argument fh is missing";
146 148
147 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 149 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
148 150
149 $self->on_error ((delete $self->{on_error}) or Carp::croak "mandatory argument on_error is missing");
150 $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing"); 151 $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing");
151 152
153 $self->on_error (delete $self->{on_error}) if $self->{on_error};
152 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 154 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
153 $self->on_read (delete $self->{on_read} ) if $self->{on_read}; 155 $self->on_read (delete $self->{on_read} ) if $self->{on_read};
156
157 $self->start_read;
154 158
155 $self 159 $self
156} 160}
157 161
158sub _shutdown { 162sub _shutdown {
169 { 173 {
170 local $!; 174 local $!;
171 $self->_shutdown; 175 $self->_shutdown;
172 } 176 }
173 177
178 if ($self->{on_error}) {
174 $self->{on_error}($self); 179 $self->{on_error}($self);
180 } else {
181 die "AnyEvent::Handle uncaught fatal error: $!";
182 }
175} 183}
176 184
177=item $fh = $handle->fh 185=item $fh = $handle->fh
178 186
179This method returns the filehandle of the L<AnyEvent::Handle> object. 187This method returns the filehandle of the L<AnyEvent::Handle> object.
196 204
197Replace the current C<on_eof> callback (see the C<on_eof> constructor argument). 205Replace the current C<on_eof> callback (see the C<on_eof> constructor argument).
198 206
199=cut 207=cut
200 208
201#############################################################################
202
203sub on_eof { 209sub on_eof {
204 $_[0]{on_eof} = $_[1]; 210 $_[0]{on_eof} = $_[1];
205} 211}
212
213#############################################################################
214
215=back
216
217=head2 WRITE QUEUE
218
219AnyEvent::Handle manages two queues per handle, one for writing and one
220for reading.
221
222The write queue is very simple: you can add data to its end, and
223AnyEvent::Handle will automatically try to get rid of it for you.
224
225When data could be writtena nd the write buffer is shorter then the low
226water mark, the C<on_drain> callback will be invoked.
227
228=over 4
206 229
207=item $handle->on_drain ($cb) 230=item $handle->on_drain ($cb)
208 231
209Sets the C<on_drain> callback or clears it (see the description of 232Sets the C<on_drain> callback or clears it (see the description of
210C<on_drain> in the constructor). 233C<on_drain> in the constructor).
258 }; 281 };
259} 282}
260 283
261############################################################################# 284#############################################################################
262 285
286=back
287
288=head2 READ QUEUE
289
290AnyEvent::Handle manages two queues per handle, one for writing and one
291for reading.
292
293The read queue is more complex than the write queue. It can be used in two
294ways, the "simple" way, using only C<on_read> and the "complex" way, using
295a queue.
296
297In the simple case, you just install an C<on_read> callback and whenever
298new data arrives, it will be called. You can then remove some data (if
299enough is there) from the read buffer (C<< $handle->rbuf >>) if you want
300or not.
301
302In the more complex case, you want to queue multiple callbacks. In this
303case, AnyEvent::Handle will call the first queued callback each time new
304data arrives and removes it when it has done its job (see C<push_read>,
305below).
306
307This way you can, for example, push three line-reads, followed by reading
308a chunk of data, and AnyEvent::Handle will execute them in order.
309
310Example 1: EPP protocol parser. EPP sends 4 byte length info, followed by
311the specified number of bytes which give an XML datagram.
312
313 # in the default state, expect some header bytes
314 $handle->on_read (sub {
315 # some data is here, now queue the length-header-read (4 octets)
316 shift->unshift_read_chunk (4, sub {
317 # header arrived, decode
318 my $len = unpack "N", $_[1];
319
320 # now read the payload
321 shift->unshift_read_chunk ($len, sub {
322 my $xml = $_[1];
323 # handle xml
324 });
325 });
326 });
327
328Example 2: Implement a client for a protocol that replies either with
329"OK" and another line or "ERROR" for one request, and 64 bytes for the
330second request. Due tot he availability of a full queue, we can just
331pipeline sending both requests and manipulate the queue as necessary in
332the callbacks:
333
334 # request one
335 $handle->push_write ("request 1\015\012");
336
337 # we expect "ERROR" or "OK" as response, so push a line read
338 $handle->push_read_line (sub {
339 # if we got an "OK", we have to _prepend_ another line,
340 # so it will be read before the second request reads its 64 bytes
341 # which are already in the queue when this callback is called
342 # we don't do this in case we got an error
343 if ($_[1] eq "OK") {
344 $_[0]->unshift_read_line (sub {
345 my $response = $_[1];
346 ...
347 });
348 }
349 });
350
351 # request two
352 $handle->push_write ("request 2\015\012");
353
354 # simply read 64 bytes, always
355 $handle->push_read_chunk (64, sub {
356 my $response = $_[1];
357 ...
358 });
359
360=over 4
361
362=cut
363
263sub _drain_rbuf { 364sub _drain_rbuf {
264 my ($self) = @_; 365 my ($self) = @_;
265 366
266 return if exists $self->{in_drain}; 367 return if $self->{in_drain};
267 local $self->{in_drain} = 1; 368 local $self->{in_drain} = 1;
268 369
269 while (my $len = length $self->{rbuf}) { 370 while (my $len = length $self->{rbuf}) {
270 no strict 'refs'; 371 no strict 'refs';
271 if (@{ $self->{queue} }) { 372 if (my $cb = shift @{ $self->{queue} }) {
272 if ($self->{queue}[0]($self)) { 373 if (!$cb->($self)) {
273 shift @{ $self->{queue} };
274 } elsif ($self->{eof}) { 374 if ($self->{eof}) {
275 # no progress can be made (not enough data and no data forthcoming) 375 # no progress can be made (not enough data and no data forthcoming)
276 $! = &Errno::EPIPE; return $self->error; 376 $! = &Errno::EPIPE; return $self->error;
277 } else { 377 }
378
379 unshift @{ $self->{queue} }, $cb;
278 return; 380 return;
279 } 381 }
280 } elsif ($self->{on_read}) { 382 } elsif ($self->{on_read}) {
281 $self->{on_read}($self); 383 $self->{on_read}($self);
282 384
312 414
313sub on_read { 415sub on_read {
314 my ($self, $cb) = @_; 416 my ($self, $cb) = @_;
315 417
316 $self->{on_read} = $cb; 418 $self->{on_read} = $cb;
317
318 unless ($self->{rw} || $self->{eof}) {
319 Scalar::Util::weaken $self;
320
321 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
322 my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf};
323
324 if ($len > 0) {
325 if (exists $self->{rbuf_max}) {
326 if ($self->{rbuf_max} < length $self->{rbuf}) {
327 $! = &Errno::ENOSPC; return $self->error;
328 }
329 }
330
331 } elsif (defined $len) {
332 $self->{eof} = 1;
333 delete $self->{rw};
334
335 } elsif ($! != EAGAIN && $! != EINTR) {
336 return $self->error;
337 }
338
339 $self->_drain_rbuf;
340 });
341 }
342} 419}
343 420
344=item $handle->rbuf 421=item $handle->rbuf
345 422
346Returns the read buffer (as a modifiable lvalue). 423Returns the read buffer (as a modifiable lvalue).
404these C<$len> bytes will be passed to the callback. 481these C<$len> bytes will be passed to the callback.
405 482
406=cut 483=cut
407 484
408sub _read_chunk($$) { 485sub _read_chunk($$) {
409 my ($len, $cb) = @_; 486 my ($self, $len, $cb) = @_;
410 487
411 sub { 488 sub {
412 $len <= length $_[0]{rbuf} or return; 489 $len <= length $_[0]{rbuf} or return;
413 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, ""); 490 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
414 1 491 1
415 } 492 }
416} 493}
417 494
418sub push_read_chunk { 495sub push_read_chunk {
419 my ($self, $len, $cb) = @_;
420
421 $self->push_read (_read_chunk $len, $cb); 496 $_[0]->push_read (&_read_chunk);
422} 497}
423 498
424 499
425sub unshift_read_chunk { 500sub unshift_read_chunk {
426 my ($self, $len, $cb) = @_;
427
428 $self->unshift_read (_read_chunk $len, $cb); 501 $_[0]->unshift_read (&_read_chunk);
429} 502}
430 503
431=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol)) 504=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol))
432 505
433=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol)) 506=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol))
453not marked by the end of line marker. 526not marked by the end of line marker.
454 527
455=cut 528=cut
456 529
457sub _read_line($$) { 530sub _read_line($$) {
531 my $self = shift;
458 my $cb = pop; 532 my $cb = pop;
459 my $eol = @_ ? shift : qr|(\015?\012)|; 533 my $eol = @_ ? shift : qr|(\015?\012)|;
460 my $pos; 534 my $pos;
461 535
462 $eol = qr|(\Q$eol\E)| unless ref $eol; 536 $eol = qr|(\Q$eol\E)| unless ref $eol;
463 $eol = qr|^(.*?)($eol)|; 537 $eol = qr|^(.*?)($eol)|;
464 538
465 sub { 539 sub {
466 $_[0]{rbuf} =~ s/$eol// or return; 540 $_[0]{rbuf} =~ s/$eol// or return;
467 541
468 $cb->($1, $2); 542 $cb->($_[0], $1, $2);
469 1 543 1
470 } 544 }
471} 545}
472 546
473sub push_read_line { 547sub push_read_line {
474 my $self = shift;
475
476 $self->push_read (&_read_line); 548 $_[0]->push_read (&_read_line);
477} 549}
478 550
479sub unshift_read_line { 551sub unshift_read_line {
480 my $self = shift;
481
482 $self->unshift_read (&_read_line); 552 $_[0]->unshift_read (&_read_line);
553}
554
555=item $handle->stop_read
556
557=item $handle->start_read
558
559In rare cases you actually do not want to read anything form the
560socket. In this case you can call C<stop_read>. Neither C<on_read> no
561any queued callbacks will be executed then. To start readign again, call
562C<start_read>.
563
564=cut
565
566sub stop_read {
567 my ($self) = @_;
568
569 delete $self->{rw};
570}
571
572sub start_read {
573 my ($self) = @_;
574
575 unless ($self->{rw} || $self->{eof}) {
576 Scalar::Util::weaken $self;
577
578 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
579 my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf};
580
581 if ($len > 0) {
582 if (defined $self->{rbuf_max}) {
583 if ($self->{rbuf_max} < length $self->{rbuf}) {
584 $! = &Errno::ENOSPC; return $self->error;
585 }
586 }
587
588 } elsif (defined $len) {
589 $self->{eof} = 1;
590 delete $self->{rw};
591
592 } elsif ($! != EAGAIN && $! != EINTR) {
593 return $self->error;
594 }
595
596 $self->_drain_rbuf;
597 });
598 }
483} 599}
484 600
485=back 601=back
486 602
487=head1 AUTHOR 603=head1 AUTHOR

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines