ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.54 by root, Sun Oct 22 00:19:05 2006 UTC vs.
Revision 1.58 by root, Sun Oct 22 10:33:19 2006 UTC

15 15
16 aio_read $fh, 30000, 1024, $buffer, 0, sub { 16 aio_read $fh, 30000, 1024, $buffer, 0, sub {
17 $_[0] > 0 or die "read error: $!"; 17 $_[0] > 0 or die "read error: $!";
18 }; 18 };
19 19
20 use IO::AIO 2; # version has aio objects 20 # version 2+ has request and group objects
21 use IO::AIO 2;
21 22
22 my $req = aio_unlink "/tmp/file", sub { }; 23 my $req = aio_unlink "/tmp/file", sub { };
23 $req->cancel; # cancel request if still in queue 24 $req->cancel; # cancel request if still in queue
24 25
25 # AnyEvent 26 my $grp = aio_group sub { print "all stats done\n" };
27 add $grp aio_stat "..." for ...;
28
29 # AnyEvent integration
26 open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!"; 30 open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!";
27 my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb }); 31 my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb });
28 32
29 # Event 33 # Event integration
30 Event->io (fd => IO::AIO::poll_fileno, 34 Event->io (fd => IO::AIO::poll_fileno,
31 poll => 'r', 35 poll => 'r',
32 cb => \&IO::AIO::poll_cb); 36 cb => \&IO::AIO::poll_cb);
33 37
34 # Glib/Gtk2 38 # Glib/Gtk2 integration
35 add_watch Glib::IO IO::AIO::poll_fileno, 39 add_watch Glib::IO IO::AIO::poll_fileno,
36 in => sub { IO::AIO::poll_cb; 1 }; 40 in => sub { IO::AIO::poll_cb; 1 };
37 41
38 # Tk 42 # Tk integration
39 Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "", 43 Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "",
40 readable => \&IO::AIO::poll_cb); 44 readable => \&IO::AIO::poll_cb);
41 45
42 # Danga::Socket 46 # Danga::Socket integration
43 Danga::Socket->AddOtherFds (IO::AIO::poll_fileno => 47 Danga::Socket->AddOtherFds (IO::AIO::poll_fileno =>
44 \&IO::AIO::poll_cb); 48 \&IO::AIO::poll_cb);
45
46 49
47=head1 DESCRIPTION 50=head1 DESCRIPTION
48 51
49This module implements asynchronous I/O using whatever means your 52This module implements asynchronous I/O using whatever means your
50operating system supports. 53operating system supports.
71use strict 'vars'; 74use strict 'vars';
72 75
73use base 'Exporter'; 76use base 'Exporter';
74 77
75BEGIN { 78BEGIN {
76 our $VERSION = '1.99'; 79 our $VERSION = '2.0';
77 80
78 our @EXPORT = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 81 our @EXPORT = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
79 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 82 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
80 aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move 83 aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move
81 aio_group); 84 aio_group);
100syscall has been executed asynchronously. 103syscall has been executed asynchronously.
101 104
102All functions expecting a filehandle keep a copy of the filehandle 105All functions expecting a filehandle keep a copy of the filehandle
103internally until the request has finished. 106internally until the request has finished.
104 107
105All non-composite requests (requests that are not broken down into
106multiple requests) return objects of type L<IO::AIO::REQ> that allow 108All requests return objects of type L<IO::AIO::REQ> that allow further
107further manipulation of running requests. 109manipulation of those requests while they are in-flight.
108 110
109The pathnames you pass to these routines I<must> be absolute and 111The pathnames you pass to these routines I<must> be absolute and
110encoded in byte form. The reason for the former is that at the time the 112encoded in byte form. The reason for the former is that at the time the
111request is being executed, the current working directory could have 113request is being executed, the current working directory could have
112changed. Alternatively, you can make sure that you never change the 114changed. Alternatively, you can make sure that you never change the
180 print "read $_[0] bytes: <$buffer>\n"; 182 print "read $_[0] bytes: <$buffer>\n";
181 }; 183 };
182 184
183=item aio_move $srcpath, $dstpath, $callback->($status) 185=item aio_move $srcpath, $dstpath, $callback->($status)
184 186
187[EXPERIMENTAL due to internal aio_group use]
188
185Try to move the I<file> (directories not supported as either source or 189Try to move the I<file> (directories not supported as either source or
186destination) from C<$srcpath> to C<$dstpath> and call the callback with 190destination) from C<$srcpath> to C<$dstpath> and call the callback with
187the C<0> (error) or C<-1> ok. 191the C<0> (error) or C<-1> ok.
188 192
189This is a composite request that tries to rename(2) the file first. If 193This is a composite request that tries to rename(2) the file first. If
199=cut 203=cut
200 204
201sub aio_move($$$) { 205sub aio_move($$$) {
202 my ($src, $dst, $cb) = @_; 206 my ($src, $dst, $cb) = @_;
203 207
208 my $grp = aio_group $cb;
209
204 aio_rename $src, $dst, sub { 210 add $grp aio_rename $src, $dst, sub {
205 if ($_[0] && $! == EXDEV) { 211 if ($_[0] && $! == EXDEV) {
206 aio_open $src, O_RDONLY, 0, sub { 212 add $grp aio_open $src, O_RDONLY, 0, sub {
207 if (my $src_fh = $_[0]) { 213 if (my $src_fh = $_[0]) {
208 my @stat = stat $src_fh; 214 my @stat = stat $src_fh;
209 215
210 aio_open $dst, O_WRONLY, 0200, sub { 216 add $grp aio_open $dst, O_WRONLY, 0200, sub {
211 if (my $dst_fh = $_[0]) { 217 if (my $dst_fh = $_[0]) {
212 aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 218 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
213 close $src_fh; 219 close $src_fh;
214 220
215 if ($_[0] == $stat[7]) { 221 if ($_[0] == $stat[7]) {
216 utime $stat[8], $stat[9], $dst; 222 utime $stat[8], $stat[9], $dst;
217 chmod $stat[2] & 07777, $dst_fh; 223 chmod $stat[2] & 07777, $dst_fh;
218 chown $stat[4], $stat[5], $dst_fh; 224 chown $stat[4], $stat[5], $dst_fh;
219 close $dst_fh; 225 close $dst_fh;
220 226
221 aio_unlink $src, sub { 227 add $grp aio_unlink $src, sub {
222 $cb->($_[0]); 228 $grp->result ($_[0]);
223 }; 229 };
224 } else { 230 } else {
225 my $errno = $!; 231 my $errno = $!;
226 aio_unlink $dst, sub { 232 add $grp aio_unlink $dst, sub {
227 $! = $errno; 233 $! = $errno;
228 $cb->(-1); 234 $grp->result (-1);
229 }; 235 };
230 } 236 }
231 }; 237 };
232 } else { 238 } else {
233 $cb->(-1); 239 $grp->result (-1);
234 } 240 }
235 }, 241 },
236 242
237 } else { 243 } else {
238 $cb->(-1); 244 $grp->result (-1);
239 } 245 }
240 }; 246 };
241 } else { 247 } else {
242 $cb->($_[0]); 248 $grp->result ($_[0]);
243 } 249 }
244 }; 250 };
251
252 $grp
245} 253}
246 254
247=item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval) 255=item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval)
248 256
249Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts 257Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts
337The callback a single argument which is either C<undef> or an array-ref 345The callback a single argument which is either C<undef> or an array-ref
338with the filenames. 346with the filenames.
339 347
340=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 348=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
341 349
350[EXPERIMENTAL due to internal aio_group use]
351
342Scans a directory (similar to C<aio_readdir>) but additionally tries to 352Scans a directory (similar to C<aio_readdir>) but additionally tries to
343separate the entries of directory C<$path> into two sets of names, ones 353separate the entries of directory C<$path> into two sets of names, ones
344you can recurse into (directories or links to them), and ones you cannot 354you can recurse into (directories or links to them), and ones you cannot
345recurse into (everything else). 355recurse into (everything else).
346 356
392=cut 402=cut
393 403
394sub aio_scandir($$$) { 404sub aio_scandir($$$) {
395 my ($path, $maxreq, $cb) = @_; 405 my ($path, $maxreq, $cb) = @_;
396 406
407 my $grp = aio_group $cb;
408
397 $maxreq = 8 if $maxreq <= 0; 409 $maxreq = 8 if $maxreq <= 0;
398 410
399 # stat once 411 # stat once
400 aio_stat $path, sub { 412 add $grp aio_stat $path, sub {
401 return $cb->() if $_[0]; 413 return $grp->result () if $_[0];
402 my $now = time; 414 my $now = time;
403 my $hash1 = join ":", (stat _)[0,1,3,7,9]; 415 my $hash1 = join ":", (stat _)[0,1,3,7,9];
404 416
405 # read the directory entries 417 # read the directory entries
406 aio_readdir $path, sub { 418 add $grp aio_readdir $path, sub {
407 my $entries = shift 419 my $entries = shift
408 or return $cb->(); 420 or return $grp->result ();
409 421
410 # stat the dir another time 422 # stat the dir another time
411 aio_stat $path, sub { 423 add $grp aio_stat $path, sub {
412 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 424 my $hash2 = join ":", (stat _)[0,1,3,7,9];
413 425
414 my $ndirs; 426 my $ndirs;
415 427
416 # take the slow route if anything looks fishy 428 # take the slow route if anything looks fishy
418 $ndirs = -1; 430 $ndirs = -1;
419 } else { 431 } else {
420 # if nlink == 2, we are finished 432 # if nlink == 2, we are finished
421 # on non-posix-fs's, we rely on nlink < 2 433 # on non-posix-fs's, we rely on nlink < 2
422 $ndirs = (stat _)[3] - 2 434 $ndirs = (stat _)[3] - 2
423 or return $cb->([], $entries); 435 or return $grp->result ([], $entries);
424 } 436 }
425 437
426 # sort into likely dirs and likely nondirs 438 # sort into likely dirs and likely nondirs
427 # dirs == files without ".", short entries first 439 # dirs == files without ".", short entries first
428 $entries = [map $_->[0], 440 $entries = [map $_->[0],
438 $schedcb = sub { 450 $schedcb = sub {
439 if (@$entries) { 451 if (@$entries) {
440 if ($nreq < $maxreq) { 452 if ($nreq < $maxreq) {
441 my $ent = pop @$entries; 453 my $ent = pop @$entries;
442 $nreq++; 454 $nreq++;
443 aio_stat "$path/$ent/.", sub { $statcb->($_[0], $ent) }; 455 add $grp aio_stat "$path/$ent/.", sub { $statcb->($_[0], $ent) };
444 } 456 }
445 } elsif (!$nreq) { 457 } elsif (!$nreq) {
446 # finished 458 # finished
447 undef $statcb; 459 undef $statcb;
448 undef $schedcb; 460 undef $schedcb;
449 $cb->(\@dirs, \@nondirs) if $cb; 461 $grp->result (\@dirs, \@nondirs) if $cb;
450 undef $cb; 462 undef $cb;
451 } 463 }
452 }; 464 };
453 $statcb = sub { 465 $statcb = sub {
454 my ($status, $entry) = @_; 466 my ($status, $entry) = @_;
457 $nreq--; 469 $nreq--;
458 push @nondirs, $entry; 470 push @nondirs, $entry;
459 &$schedcb; 471 &$schedcb;
460 } else { 472 } else {
461 # need to check for real directory 473 # need to check for real directory
462 aio_lstat "$path/$entry", sub { 474 add $grp aio_lstat "$path/$entry", sub {
463 $nreq--; 475 $nreq--;
464 476
465 if (-d _) { 477 if (-d _) {
466 push @dirs, $entry; 478 push @dirs, $entry;
467 479
480 492
481 &$schedcb while @$entries && $nreq < $maxreq; 493 &$schedcb while @$entries && $nreq < $maxreq;
482 }; 494 };
483 }; 495 };
484 }; 496 };
497
498 $grp
485} 499}
486 500
487=item aio_fsync $fh, $callback->($status) 501=item aio_fsync $fh, $callback->($status)
488 502
489Asynchronously call fsync on the given filehandle and call the callback 503Asynchronously call fsync on the given filehandle and call the callback
495callback with the fdatasync result code. 509callback with the fdatasync result code.
496 510
497If this call isn't available because your OS lacks it or it couldn't be 511If this call isn't available because your OS lacks it or it couldn't be
498detected, it will be emulated by calling C<fsync> instead. 512detected, it will be emulated by calling C<fsync> instead.
499 513
500=item aio_group $callback->() 514=item aio_group $callback->(...)
501 515
516[EXPERIMENTAL]
517
518This is a very special aio request: Instead of doing something, it is a
519container for other aio requests, which is useful if you want to bundle
520many requests into a single, composite, request.
521
522Returns an object of class L<IO::AIO::GRP>. See its documentation below
523for more info.
524
525Example:
526
527 my $grp = aio_group sub {
528 print "all stats done\n";
529 };
530
531 add $grp
532 (aio_stat ...),
533 (aio_stat ...),
534 ...;
535
502=item aio_sleep $fractional_seconds, $callback->() *NOT EXPORTED* 536=item IO::AIO::aio_sleep $fractional_seconds, $callback->() *NOT EXPORTED*
503 537
504Mainly used for debugging and benchmarking, this aio request puts one of 538Mainly used for debugging and benchmarking, this aio request puts one of
505the request workers to sleep for the given time. 539the request workers to sleep for the given time.
540
541While it is theoretically handy to have simple I/O scheduling requests
542like sleep and file handle readable/writable, the overhead this creates
543is immense, so do not use this function except to put your application
544under artificial I/O pressure.
506 545
507=back 546=back
508 547
509=head2 IO::AIO::REQ CLASS 548=head2 IO::AIO::REQ CLASS
510 549
529untouched. That means that requests that currently execute will not be 568untouched. That means that requests that currently execute will not be
530stopped and resources held by the request will not be freed prematurely. 569stopped and resources held by the request will not be freed prematurely.
531 570
532=back 571=back
533 572
573=head2 IO::AIO::GRP CLASS
574
575This class is a subclass of L<IO::AIO::REQ>, so all its methods apply to
576objects of this class, too.
577
578A IO::AIO::GRP object is a special request that can contain multiple other
579aio requests.
580
581You create one by calling the C<aio_group> constructing function with a
582callback that will be called when all contained requests have entered the
583C<done> state:
584
585 my $grp = aio_group sub {
586 print "all requests are done\n";
587 };
588
589You add requests by calling the C<add> method with one or more
590C<IO::AIO::REQ> objects:
591
592 $grp->add (aio_unlink "...");
593
594 add $grp aio_stat "...", sub {
595 $_[0] or return $grp->result ("error");
596
597 # add another request dynamically, if first succeeded
598 add $grp aio_open "...", sub {
599 $grp->result ("ok");
600 };
601 };
602
603This makes it very easy to create composite requests (see the source of
604C<aio_move> for an application) that work and feel like simple requests.
605
606The IO::AIO::GRP objects will be cleaned up during calls to
607C<IO::AIO::poll_cb>, just like any other request.
608
609They can be canceled like any other request. Canceling will cancel not
610just the request itself, but also all requests it contains.
611
612They can also can also be added to other IO::AIO::GRP objects.
613
614Their lifetime, simplified, looks like this: when they are empty, they
615will finish very quickly. If they contain only requests that are in the
616C<done> state, they will also finish. Otherwise they will continue to
617exist.
618
619That means after creating a group you have some time to add requests. And
620in the callbacks of those requests, you can add further requests to the
621group. And only when all those requests have finished will the the group
622itself finish.
623
624=over 4
625
626=item $grp->add (...)
627
628=item add $grp ...
629
630Add one or more requests to the group. Any type of L<IO::AIO::REQ> can
631be added, including other groups, as long as you do not create circular
632dependencies.
633
634Returns all its arguments.
635
636=item $grp->result (...)
637
638Set the result value(s) that will be passed to the group callback when all
639subrequests have finished. By default, no argument will be passed.
640
641=back
642
534=head2 SUPPORT FUNCTIONS 643=head2 SUPPORT FUNCTIONS
535 644
536=over 4 645=over 4
537 646
538=item $fileno = IO::AIO::poll_fileno 647=item $fileno = IO::AIO::poll_fileno

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines