ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.8 by root, Fri May 2 15:36:10 2008 UTC vs.
Revision 1.17 by root, Sat May 24 04:17:45 2008 UTC

12 12
13=head1 NAME 13=head1 NAME
14 14
15AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent 15AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent
16 16
17=cut 17This module is experimental.
18 18
19=cut
20
19our $VERSION = '0.02'; 21our $VERSION = '0.04';
20 22
21=head1 SYNOPSIS 23=head1 SYNOPSIS
22 24
23 use AnyEvent; 25 use AnyEvent;
24 use AnyEvent::Handle; 26 use AnyEvent::Handle;
43 $cv->wait; 45 $cv->wait;
44 46
45=head1 DESCRIPTION 47=head1 DESCRIPTION
46 48
47This module is a helper module to make it easier to do event-based I/O on 49This module is a helper module to make it easier to do event-based I/O on
48filehandles (and sockets, see L<AnyEvent::Socket> for an easy way to make 50filehandles. For utility functions for doing non-blocking connects and accepts
49non-blocking resolves and connects). 51on sockets see L<AnyEvent::Util>.
50 52
51In the following, when the documentation refers to of "bytes" then this 53In the following, when the documentation refers to of "bytes" then this
52means characters. As sysread and syswrite are used for all I/O, their 54means characters. As sysread and syswrite are used for all I/O, their
53treatment of characters applies to this module as well. 55treatment of characters applies to this module as well.
54 56
70The filehandle this L<AnyEvent::Handle> object will operate on. 72The filehandle this L<AnyEvent::Handle> object will operate on.
71 73
72NOTE: The filehandle will be set to non-blocking (using 74NOTE: The filehandle will be set to non-blocking (using
73AnyEvent::Util::fh_nonblocking). 75AnyEvent::Util::fh_nonblocking).
74 76
77=item on_eof => $cb->($self)
78
79Set the callback to be called on EOF.
80
81While not mandatory, it is highly recommended to set an eof callback,
82otherwise you might end up with a closed socket while you are still
83waiting for data.
84
75=item on_error => $cb->($self) [MANDATORY] 85=item on_error => $cb->($self)
76 86
77This is the fatal error callback, that is called when a fatal error ocurs, 87This is the fatal error callback, that is called when, well, a fatal error
78such as not being able to resolve the hostname, failure to connect or a 88ocurs, such as not being able to resolve the hostname, failure to connect
79read error. 89or a read error.
80 90
81The object will not be in a usable state when this callback has been 91The object will not be in a usable state when this callback has been
82called. 92called.
83 93
84On callback entrance, the value of C<$!> contains the opertaing system 94On callback entrance, the value of C<$!> contains the operating system
85error (or C<ENOSPC> or C<EPIPE>). 95error (or C<ENOSPC> or C<EPIPE>).
86 96
87=item on_eof => $cb->($self) [MANDATORY] 97While not mandatory, it is I<highly> recommended to set this callback, as
88 98you will not be notified of errors otherwise. The default simply calls
89Set the callback to be called on EOF. 99die.
90 100
91=item on_read => $cb->($self) 101=item on_read => $cb->($self)
92 102
93This sets the default read callback, which is called when data arrives 103This sets the default read callback, which is called when data arrives
94and no read request is in the queue. If the read callback is C<undef> 104and no read request is in the queue.
95or has never been set, than AnyEvent::Handle will cease reading from the
96filehandle.
97 105
98To access (and remove data from) the read buffer, use the C<< ->rbuf >> 106To access (and remove data from) the read buffer, use the C<< ->rbuf >>
99method or acces sthe C<$self->{rbuf}> member directly. 107method or acces sthe C<$self->{rbuf}> member directly.
100 108
101When an EOF condition is detected then AnyEvent::Handle will first try to 109When an EOF condition is detected then AnyEvent::Handle will first try to
144 152
145 $self->{fh} or Carp::croak "mandatory argument fh is missing"; 153 $self->{fh} or Carp::croak "mandatory argument fh is missing";
146 154
147 AnyEvent::Util::fh_nonblocking $self->{fh}, 1; 155 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
148 156
149 $self->on_error ((delete $self->{on_error}) or Carp::croak "mandatory argument on_error is missing"); 157 $self->on_eof (delete $self->{on_eof} ) if $self->{on_eof};
150 $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing"); 158 $self->on_error (delete $self->{on_error}) if $self->{on_error};
151
152 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain}; 159 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
153 $self->on_read (delete $self->{on_read} ) if $self->{on_read}; 160 $self->on_read (delete $self->{on_read} ) if $self->{on_read};
161
162 $self->start_read;
154 163
155 $self 164 $self
156} 165}
157 166
158sub _shutdown { 167sub _shutdown {
169 { 178 {
170 local $!; 179 local $!;
171 $self->_shutdown; 180 $self->_shutdown;
172 } 181 }
173 182
183 if ($self->{on_error}) {
174 $self->{on_error}($self); 184 $self->{on_error}($self);
185 } else {
186 die "AnyEvent::Handle uncaught fatal error: $!";
187 }
175} 188}
176 189
177=item $fh = $handle->fh 190=item $fh = $handle->fh
178 191
179This method returns the filehandle of the L<AnyEvent::Handle> object. 192This method returns the filehandle of the L<AnyEvent::Handle> object.
196 209
197Replace the current C<on_eof> callback (see the C<on_eof> constructor argument). 210Replace the current C<on_eof> callback (see the C<on_eof> constructor argument).
198 211
199=cut 212=cut
200 213
201#############################################################################
202
203sub on_eof { 214sub on_eof {
204 $_[0]{on_eof} = $_[1]; 215 $_[0]{on_eof} = $_[1];
205} 216}
217
218#############################################################################
219
220=back
221
222=head2 WRITE QUEUE
223
224AnyEvent::Handle manages two queues per handle, one for writing and one
225for reading.
226
227The write queue is very simple: you can add data to its end, and
228AnyEvent::Handle will automatically try to get rid of it for you.
229
230When data could be writtena nd the write buffer is shorter then the low
231water mark, the C<on_drain> callback will be invoked.
232
233=over 4
206 234
207=item $handle->on_drain ($cb) 235=item $handle->on_drain ($cb)
208 236
209Sets the C<on_drain> callback or clears it (see the description of 237Sets the C<on_drain> callback or clears it (see the description of
210C<on_drain> in the constructor). 238C<on_drain> in the constructor).
226want (only limited by the available memory), as C<AnyEvent::Handle> 254want (only limited by the available memory), as C<AnyEvent::Handle>
227buffers it independently of the kernel. 255buffers it independently of the kernel.
228 256
229=cut 257=cut
230 258
231sub push_write { 259sub _drain_wbuf {
232 my ($self, $data) = @_; 260 my ($self) = @_;
233
234 $self->{wbuf} .= $data;
235 261
236 unless ($self->{ww}) { 262 unless ($self->{ww}) {
237 Scalar::Util::weaken $self; 263 Scalar::Util::weaken $self;
238 my $cb = sub { 264 my $cb = sub {
239 my $len = syswrite $self->{fh}, $self->{wbuf}; 265 my $len = syswrite $self->{fh}, $self->{wbuf};
240 266
241 if ($len > 0) { 267 if ($len > 0) {
242 substr $self->{wbuf}, 0, $len, ""; 268 substr $self->{wbuf}, 0, $len, "";
243
244 269
245 $self->{on_drain}($self) 270 $self->{on_drain}($self)
246 if $self->{low_water_mark} >= length $self->{wbuf} 271 if $self->{low_water_mark} >= length $self->{wbuf}
247 && $self->{on_drain}; 272 && $self->{on_drain};
248 273
256 281
257 $cb->($self); 282 $cb->($self);
258 }; 283 };
259} 284}
260 285
286sub push_write {
287 my $self = shift;
288
289 if ($self->{filter_w}) {
290 $self->{filter_w}->(\$_[0]);
291 } else {
292 $self->{wbuf} .= $_[0];
293 $self->_drain_wbuf;
294 }
295}
296
261############################################################################# 297#############################################################################
298
299=back
300
301=head2 READ QUEUE
302
303AnyEvent::Handle manages two queues per handle, one for writing and one
304for reading.
305
306The read queue is more complex than the write queue. It can be used in two
307ways, the "simple" way, using only C<on_read> and the "complex" way, using
308a queue.
309
310In the simple case, you just install an C<on_read> callback and whenever
311new data arrives, it will be called. You can then remove some data (if
312enough is there) from the read buffer (C<< $handle->rbuf >>) if you want
313or not.
314
315In the more complex case, you want to queue multiple callbacks. In this
316case, AnyEvent::Handle will call the first queued callback each time new
317data arrives and removes it when it has done its job (see C<push_read>,
318below).
319
320This way you can, for example, push three line-reads, followed by reading
321a chunk of data, and AnyEvent::Handle will execute them in order.
322
323Example 1: EPP protocol parser. EPP sends 4 byte length info, followed by
324the specified number of bytes which give an XML datagram.
325
326 # in the default state, expect some header bytes
327 $handle->on_read (sub {
328 # some data is here, now queue the length-header-read (4 octets)
329 shift->unshift_read_chunk (4, sub {
330 # header arrived, decode
331 my $len = unpack "N", $_[1];
332
333 # now read the payload
334 shift->unshift_read_chunk ($len, sub {
335 my $xml = $_[1];
336 # handle xml
337 });
338 });
339 });
340
341Example 2: Implement a client for a protocol that replies either with
342"OK" and another line or "ERROR" for one request, and 64 bytes for the
343second request. Due tot he availability of a full queue, we can just
344pipeline sending both requests and manipulate the queue as necessary in
345the callbacks:
346
347 # request one
348 $handle->push_write ("request 1\015\012");
349
350 # we expect "ERROR" or "OK" as response, so push a line read
351 $handle->push_read_line (sub {
352 # if we got an "OK", we have to _prepend_ another line,
353 # so it will be read before the second request reads its 64 bytes
354 # which are already in the queue when this callback is called
355 # we don't do this in case we got an error
356 if ($_[1] eq "OK") {
357 $_[0]->unshift_read_line (sub {
358 my $response = $_[1];
359 ...
360 });
361 }
362 });
363
364 # request two
365 $handle->push_write ("request 2\015\012");
366
367 # simply read 64 bytes, always
368 $handle->push_read_chunk (64, sub {
369 my $response = $_[1];
370 ...
371 });
372
373=over 4
374
375=cut
262 376
263sub _drain_rbuf { 377sub _drain_rbuf {
264 my ($self) = @_; 378 my ($self) = @_;
265 379
380 if (
381 defined $self->{rbuf_max}
382 && $self->{rbuf_max} < length $self->{rbuf}
383 ) {
384 $! = &Errno::ENOSPC; return $self->error;
385 }
386
266 return if exists $self->{in_drain}; 387 return if $self->{in_drain};
267 local $self->{in_drain} = 1; 388 local $self->{in_drain} = 1;
268 389
269 while (my $len = length $self->{rbuf}) { 390 while (my $len = length $self->{rbuf}) {
270 no strict 'refs'; 391 no strict 'refs';
271 if (@{ $self->{queue} }) { 392 if (my $cb = shift @{ $self->{queue} }) {
272 if ($self->{queue}[0]($self)) { 393 if (!$cb->($self)) {
273 shift @{ $self->{queue} };
274 } elsif ($self->{eof}) { 394 if ($self->{eof}) {
275 # no progress can be made (not enough data and no data forthcoming) 395 # no progress can be made (not enough data and no data forthcoming)
276 $! = &Errno::EPIPE; return $self->error; 396 $! = &Errno::EPIPE; return $self->error;
277 } else { 397 }
398
399 unshift @{ $self->{queue} }, $cb;
278 return; 400 return;
279 } 401 }
280 } elsif ($self->{on_read}) { 402 } elsif ($self->{on_read}) {
281 $self->{on_read}($self); 403 $self->{on_read}($self);
282 404
296 } 418 }
297 } 419 }
298 420
299 if ($self->{eof}) { 421 if ($self->{eof}) {
300 $self->_shutdown; 422 $self->_shutdown;
301 $self->{on_eof}($self); 423 $self->{on_eof}($self)
424 if $self->{on_eof};
302 } 425 }
303} 426}
304 427
305=item $handle->on_read ($cb) 428=item $handle->on_read ($cb)
306 429
312 435
313sub on_read { 436sub on_read {
314 my ($self, $cb) = @_; 437 my ($self, $cb) = @_;
315 438
316 $self->{on_read} = $cb; 439 $self->{on_read} = $cb;
317
318 unless ($self->{rw} || $self->{eof}) {
319 Scalar::Util::weaken $self;
320
321 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
322 my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf};
323
324 if ($len > 0) {
325 if (exists $self->{rbuf_max}) {
326 if ($self->{rbuf_max} < length $self->{rbuf}) {
327 $! = &Errno::ENOSPC; return $self->error;
328 }
329 }
330
331 } elsif (defined $len) {
332 $self->{eof} = 1;
333 delete $self->{rw};
334
335 } elsif ($! != EAGAIN && $! != EINTR) {
336 return $self->error;
337 }
338
339 $self->_drain_rbuf;
340 });
341 }
342} 440}
343 441
344=item $handle->rbuf 442=item $handle->rbuf
345 443
346Returns the read buffer (as a modifiable lvalue). 444Returns the read buffer (as a modifiable lvalue).
404these C<$len> bytes will be passed to the callback. 502these C<$len> bytes will be passed to the callback.
405 503
406=cut 504=cut
407 505
408sub _read_chunk($$) { 506sub _read_chunk($$) {
409 my ($len, $cb) = @_; 507 my ($self, $len, $cb) = @_;
410 508
411 sub { 509 sub {
412 $len <= length $_[0]{rbuf} or return; 510 $len <= length $_[0]{rbuf} or return;
413 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, ""); 511 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
414 1 512 1
415 } 513 }
416} 514}
417 515
418sub push_read_chunk { 516sub push_read_chunk {
419 my ($self, $len, $cb) = @_;
420
421 $self->push_read (_read_chunk $len, $cb); 517 $_[0]->push_read (&_read_chunk);
422} 518}
423 519
424 520
425sub unshift_read_chunk { 521sub unshift_read_chunk {
426 my ($self, $len, $cb) = @_;
427
428 $self->unshift_read (_read_chunk $len, $cb); 522 $_[0]->unshift_read (&_read_chunk);
429} 523}
430 524
431=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol)) 525=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol))
432 526
433=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol)) 527=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol))
453not marked by the end of line marker. 547not marked by the end of line marker.
454 548
455=cut 549=cut
456 550
457sub _read_line($$) { 551sub _read_line($$) {
552 my $self = shift;
458 my $cb = pop; 553 my $cb = pop;
459 my $eol = @_ ? shift : qr|(\015?\012)|; 554 my $eol = @_ ? shift : qr|(\015?\012)|;
460 my $pos; 555 my $pos;
461 556
462 $eol = qr|(\Q$eol\E)| unless ref $eol; 557 $eol = quotemeta $eol unless ref $eol;
463 $eol = qr|^(.*?)($eol)|; 558 $eol = qr|^(.*?)($eol)|s;
464 559
465 sub { 560 sub {
466 $_[0]{rbuf} =~ s/$eol// or return; 561 $_[0]{rbuf} =~ s/$eol// or return;
467 562
468 $cb->($1, $2); 563 $cb->($_[0], $1, $2);
469 1 564 1
470 } 565 }
471} 566}
472 567
473sub push_read_line { 568sub push_read_line {
474 my $self = shift;
475
476 $self->push_read (&_read_line); 569 $_[0]->push_read (&_read_line);
477} 570}
478 571
479sub unshift_read_line { 572sub unshift_read_line {
480 my $self = shift;
481
482 $self->unshift_read (&_read_line); 573 $_[0]->unshift_read (&_read_line);
574}
575
576=item $handle->stop_read
577
578=item $handle->start_read
579
580In rare cases you actually do not want to read anything form the
581socket. In this case you can call C<stop_read>. Neither C<on_read> no
582any queued callbacks will be executed then. To start readign again, call
583C<start_read>.
584
585=cut
586
587sub stop_read {
588 my ($self) = @_;
589
590 delete $self->{rw};
591}
592
593sub start_read {
594 my ($self) = @_;
595
596 unless ($self->{rw} || $self->{eof}) {
597 Scalar::Util::weaken $self;
598
599 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
600 my $rbuf = $self->{filter_r} ? \my $buf : \$self->{rbuf};
601 my $len = sysread $self->{fh}, $$rbuf, $self->{read_size} || 8192, length $$rbuf;
602
603 if ($len > 0) {
604 $self->{filter_r}
605 ? $self->{filter_r}->($rbuf)
606 : $self->_drain_rbuf;
607
608 } elsif (defined $len) {
609 delete $self->{rw};
610 $self->{eof} = 1;
611 $self->_drain_rbuf;
612
613 } elsif ($! != EAGAIN && $! != EINTR) {
614 return $self->error;
615 }
616 });
617 }
483} 618}
484 619
485=back 620=back
486 621
487=head1 AUTHOR 622=head1 AUTHOR

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines