ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.89 by root, Sun Oct 29 11:03:18 2006 UTC vs.
Revision 1.100 by root, Sun Jan 7 21:36:58 2007 UTC

5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use IO::AIO; 7 use IO::AIO;
8 8
9 aio_open "/etc/passwd", O_RDONLY, 0, sub { 9 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10 my ($fh) = @_; 10 my $fh = shift
11 or die "/etc/passwd: $!";
11 ... 12 ...
12 }; 13 };
13 14
14 aio_unlink "/tmp/file", sub { }; 15 aio_unlink "/tmp/file", sub { };
15 16
61etc.), but can also be used to easily do operations in parallel that are 62etc.), but can also be used to easily do operations in parallel that are
62normally done sequentially, e.g. stat'ing many files, which is much faster 63normally done sequentially, e.g. stat'ing many files, which is much faster
63on a RAID volume or over NFS when you do a number of stat operations 64on a RAID volume or over NFS when you do a number of stat operations
64concurrently. 65concurrently.
65 66
66While this works on all types of file descriptors (for example sockets), 67While most of this works on all types of file descriptors (for example
67using these functions on file descriptors that support nonblocking 68sockets), using these functions on file descriptors that support
68operation (again, sockets, pipes etc.) is very inefficient. Use an event 69nonblocking operation (again, sockets, pipes etc.) is very inefficient or
70might not work (aio_read fails on sockets/pipes/fifos). Use an event loop
69loop for that (such as the L<Event|Event> module): IO::AIO will naturally 71for that (such as the L<Event|Event> module): IO::AIO will naturally fit
70fit into such an event loop itself. 72into such an event loop itself.
71 73
72In this version, a number of threads are started that execute your 74In this version, a number of threads are started that execute your
73requests and signal their completion. You don't need thread support 75requests and signal their completion. You don't need thread support
74in perl, and the threads created by this module will not be visible 76in perl, and the threads created by this module will not be visible
75to perl. In the future, this module might make use of the native aio 77to perl. In the future, this module might make use of the native aio
98 poll => 'r', 100 poll => 'r',
99 cb => \&IO::AIO::poll_cb); 101 cb => \&IO::AIO::poll_cb);
100 102
101 # queue the request to open /etc/passwd 103 # queue the request to open /etc/passwd
102 aio_open "/etc/passwd", O_RDONLY, 0, sub { 104 aio_open "/etc/passwd", O_RDONLY, 0, sub {
103 my $fh = $_[0] 105 my $fh = shift
104 or die "error while opening: $!"; 106 or die "error while opening: $!";
105 107
106 # stat'ing filehandles is generally non-blocking 108 # stat'ing filehandles is generally non-blocking
107 my $size = -s $fh; 109 my $size = -s $fh;
108 110
188use strict 'vars'; 190use strict 'vars';
189 191
190use base 'Exporter'; 192use base 'Exporter';
191 193
192BEGIN { 194BEGIN {
193 our $VERSION = '2.1'; 195 our $VERSION = '2.32';
194 196
195 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 197 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
196 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 198 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
197 aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move 199 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link
198 aio_copy aio_group aio_nop aio_mknod); 200 aio_move aio_copy aio_group aio_nop aio_mknod aio_load aio_rmtree);
199 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice)); 201 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block));
200 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush 202 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
201 min_parallel max_parallel max_idle 203 min_parallel max_parallel max_idle
202 nreqs nready npending nthreads 204 nreqs nready npending nthreads
203 max_poll_time max_poll_reqs); 205 max_poll_time max_poll_reqs);
204 206
417=item aio_symlink $srcpath, $dstpath, $callback->($status) 419=item aio_symlink $srcpath, $dstpath, $callback->($status)
418 420
419Asynchronously create a new symbolic link to the existing object at C<$srcpath> at 421Asynchronously create a new symbolic link to the existing object at C<$srcpath> at
420the path C<$dstpath> and call the callback with the result code. 422the path C<$dstpath> and call the callback with the result code.
421 423
424=item aio_readlink $path, $callback->($link)
425
426Asynchronously read the symlink specified by C<$path> and pass it to
427the callback. If an error occurs, nothing or undef gets passed to the
428callback.
429
422=item aio_rename $srcpath, $dstpath, $callback->($status) 430=item aio_rename $srcpath, $dstpath, $callback->($status)
423 431
424Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as 432Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
425rename(2) and call the callback with the result code. 433rename(2) and call the callback with the result code.
426 434
435directory (i.e. opendir + readdir + closedir). The entries will not be 443directory (i.e. opendir + readdir + closedir). The entries will not be
436sorted, and will B<NOT> include the C<.> and C<..> entries. 444sorted, and will B<NOT> include the C<.> and C<..> entries.
437 445
438The callback a single argument which is either C<undef> or an array-ref 446The callback a single argument which is either C<undef> or an array-ref
439with the filenames. 447with the filenames.
448
449=item aio_load $path, $data, $callback->($status)
450
451This is a composite request that tries to fully load the given file into
452memory. Status is the same as with aio_read.
453
454=cut
455
456sub aio_load($$;$) {
457 aio_block {
458 my ($path, undef, $cb) = @_;
459 my $data = \$_[1];
460
461 my $pri = aioreq_pri;
462 my $grp = aio_group $cb;
463
464 aioreq_pri $pri;
465 add $grp aio_open $path, O_RDONLY, 0, sub {
466 my ($fh) = @_
467 or return $grp->result (-1);
468
469 aioreq_pri $pri;
470 add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub {
471 $grp->result ($_[0]);
472 };
473 };
474
475 $grp
476 }
477}
440 478
441=item aio_copy $srcpath, $dstpath, $callback->($status) 479=item aio_copy $srcpath, $dstpath, $callback->($status)
442 480
443Try to copy the I<file> (directories not supported as either source or 481Try to copy the I<file> (directories not supported as either source or
444destination) from C<$srcpath> to C<$dstpath> and call the callback with 482destination) from C<$srcpath> to C<$dstpath> and call the callback with
454errors are being ignored. 492errors are being ignored.
455 493
456=cut 494=cut
457 495
458sub aio_copy($$;$) { 496sub aio_copy($$;$) {
497 aio_block {
459 my ($src, $dst, $cb) = @_; 498 my ($src, $dst, $cb) = @_;
460 499
461 my $pri = aioreq_pri; 500 my $pri = aioreq_pri;
462 my $grp = aio_group $cb; 501 my $grp = aio_group $cb;
463 502
464 aioreq_pri $pri; 503 aioreq_pri $pri;
465 add $grp aio_open $src, O_RDONLY, 0, sub { 504 add $grp aio_open $src, O_RDONLY, 0, sub {
466 if (my $src_fh = $_[0]) { 505 if (my $src_fh = $_[0]) {
467 my @stat = stat $src_fh; 506 my @stat = stat $src_fh;
468 507
469 aioreq_pri $pri; 508 aioreq_pri $pri;
470 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { 509 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
471 if (my $dst_fh = $_[0]) { 510 if (my $dst_fh = $_[0]) {
472 aioreq_pri $pri; 511 aioreq_pri $pri;
473 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 512 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
474 if ($_[0] == $stat[7]) { 513 if ($_[0] == $stat[7]) {
475 $grp->result (0); 514 $grp->result (0);
476 close $src_fh; 515 close $src_fh;
477 516
478 # those should not normally block. should. should. 517 # those should not normally block. should. should.
479 utime $stat[8], $stat[9], $dst; 518 utime $stat[8], $stat[9], $dst;
480 chmod $stat[2] & 07777, $dst_fh; 519 chmod $stat[2] & 07777, $dst_fh;
481 chown $stat[4], $stat[5], $dst_fh; 520 chown $stat[4], $stat[5], $dst_fh;
482 close $dst_fh; 521 close $dst_fh;
483 } else { 522 } else {
484 $grp->result (-1); 523 $grp->result (-1);
485 close $src_fh; 524 close $src_fh;
486 close $dst_fh; 525 close $dst_fh;
487 526
488 aioreq $pri; 527 aioreq $pri;
489 add $grp aio_unlink $dst; 528 add $grp aio_unlink $dst;
529 }
490 } 530 };
531 } else {
532 $grp->result (-1);
491 }; 533 }
492 } else {
493 $grp->result (-1);
494 } 534 },
535
536 } else {
537 $grp->result (-1);
495 }, 538 }
496
497 } else {
498 $grp->result (-1);
499 } 539 };
540
541 $grp
500 }; 542 }
501
502 $grp
503} 543}
504 544
505=item aio_move $srcpath, $dstpath, $callback->($status) 545=item aio_move $srcpath, $dstpath, $callback->($status)
506 546
507Try to move the I<file> (directories not supported as either source or 547Try to move the I<file> (directories not supported as either source or
513that is successful, unlinking the C<$srcpath>. 553that is successful, unlinking the C<$srcpath>.
514 554
515=cut 555=cut
516 556
517sub aio_move($$;$) { 557sub aio_move($$;$) {
558 aio_block {
518 my ($src, $dst, $cb) = @_; 559 my ($src, $dst, $cb) = @_;
519 560
520 my $pri = aioreq_pri; 561 my $pri = aioreq_pri;
521 my $grp = aio_group $cb; 562 my $grp = aio_group $cb;
522 563
523 aioreq_pri $pri; 564 aioreq_pri $pri;
524 add $grp aio_rename $src, $dst, sub { 565 add $grp aio_rename $src, $dst, sub {
525 if ($_[0] && $! == EXDEV) { 566 if ($_[0] && $! == EXDEV) {
526 aioreq_pri $pri; 567 aioreq_pri $pri;
527 add $grp aio_copy $src, $dst, sub { 568 add $grp aio_copy $src, $dst, sub {
569 $grp->result ($_[0]);
570
571 if (!$_[0]) {
572 aioreq_pri $pri;
573 add $grp aio_unlink $src;
574 }
575 };
576 } else {
528 $grp->result ($_[0]); 577 $grp->result ($_[0]);
529
530 if (!$_[0]) {
531 aioreq_pri $pri;
532 add $grp aio_unlink $src;
533 }
534 }; 578 }
535 } else {
536 $grp->result ($_[0]);
537 } 579 };
580
581 $grp
538 }; 582 }
539
540 $grp
541} 583}
542 584
543=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 585=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
544 586
545Scans a directory (similar to C<aio_readdir>) but additionally tries to 587Scans a directory (similar to C<aio_readdir>) but additionally tries to
592as those tend to return 0 or 1 as link counts, which disables the 634as those tend to return 0 or 1 as link counts, which disables the
593directory counting heuristic. 635directory counting heuristic.
594 636
595=cut 637=cut
596 638
597sub aio_scandir($$$) { 639sub aio_scandir($$;$) {
640 aio_block {
598 my ($path, $maxreq, $cb) = @_; 641 my ($path, $maxreq, $cb) = @_;
599 642
600 my $pri = aioreq_pri; 643 my $pri = aioreq_pri;
601 644
602 my $grp = aio_group $cb; 645 my $grp = aio_group $cb;
603 646
604 $maxreq = 4 if $maxreq <= 0; 647 $maxreq = 4 if $maxreq <= 0;
605 648
606 # stat once 649 # stat once
607 aioreq_pri $pri;
608 add $grp aio_stat $path, sub {
609 return $grp->result () if $_[0];
610 my $now = time;
611 my $hash1 = join ":", (stat _)[0,1,3,7,9];
612
613 # read the directory entries
614 aioreq_pri $pri; 650 aioreq_pri $pri;
615 add $grp aio_readdir $path, sub { 651 add $grp aio_stat $path, sub {
616 my $entries = shift
617 or return $grp->result (); 652 return $grp->result () if $_[0];
653 my $now = time;
654 my $hash1 = join ":", (stat _)[0,1,3,7,9];
618 655
619 # stat the dir another time 656 # read the directory entries
620 aioreq_pri $pri; 657 aioreq_pri $pri;
658 add $grp aio_readdir $path, sub {
659 my $entries = shift
660 or return $grp->result ();
661
662 # stat the dir another time
663 aioreq_pri $pri;
621 add $grp aio_stat $path, sub { 664 add $grp aio_stat $path, sub {
622 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 665 my $hash2 = join ":", (stat _)[0,1,3,7,9];
623 666
624 my $ndirs; 667 my $ndirs;
625 668
626 # take the slow route if anything looks fishy 669 # take the slow route if anything looks fishy
627 if ($hash1 ne $hash2 or (stat _)[9] == $now) { 670 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
628 $ndirs = -1; 671 $ndirs = -1;
629 } else { 672 } else {
630 # if nlink == 2, we are finished 673 # if nlink == 2, we are finished
631 # on non-posix-fs's, we rely on nlink < 2 674 # on non-posix-fs's, we rely on nlink < 2
632 $ndirs = (stat _)[3] - 2 675 $ndirs = (stat _)[3] - 2
633 or return $grp->result ([], $entries); 676 or return $grp->result ([], $entries);
634 } 677 }
635 678
636 # sort into likely dirs and likely nondirs 679 # sort into likely dirs and likely nondirs
637 # dirs == files without ".", short entries first 680 # dirs == files without ".", short entries first
638 $entries = [map $_->[0], 681 $entries = [map $_->[0],
639 sort { $b->[1] cmp $a->[1] } 682 sort { $b->[1] cmp $a->[1] }
640 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], 683 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
641 @$entries]; 684 @$entries];
642 685
643 my (@dirs, @nondirs); 686 my (@dirs, @nondirs);
644 687
645 my $statgrp = add $grp aio_group sub { 688 my $statgrp = add $grp aio_group sub {
646 $grp->result (\@dirs, \@nondirs); 689 $grp->result (\@dirs, \@nondirs);
647 }; 690 };
648 691
649 limit $statgrp $maxreq; 692 limit $statgrp $maxreq;
650 feed $statgrp sub { 693 feed $statgrp sub {
651 return unless @$entries; 694 return unless @$entries;
652 my $entry = pop @$entries; 695 my $entry = pop @$entries;
653 696
654 aioreq_pri $pri; 697 aioreq_pri $pri;
655 add $statgrp aio_stat "$path/$entry/.", sub { 698 add $statgrp aio_stat "$path/$entry/.", sub {
656 if ($_[0] < 0) { 699 if ($_[0] < 0) {
657 push @nondirs, $entry; 700 push @nondirs, $entry;
658 } else { 701 } else {
659 # need to check for real directory 702 # need to check for real directory
660 aioreq_pri $pri; 703 aioreq_pri $pri;
661 add $statgrp aio_lstat "$path/$entry", sub { 704 add $statgrp aio_lstat "$path/$entry", sub {
662 if (-d _) { 705 if (-d _) {
663 push @dirs, $entry; 706 push @dirs, $entry;
664 707
665 unless (--$ndirs) { 708 unless (--$ndirs) {
666 push @nondirs, @$entries; 709 push @nondirs, @$entries;
667 feed $statgrp; 710 feed $statgrp;
711 }
712 } else {
713 push @nondirs, $entry;
668 } 714 }
669 } else {
670 push @nondirs, $entry;
671 } 715 }
672 } 716 }
673 } 717 };
674 }; 718 };
675 }; 719 };
676 }; 720 };
677 }; 721 };
722
723 $grp
678 }; 724 }
725}
679 726
727=item aio_rmtree $path, $callback->($status)
728
729Delete a directory tree starting (and including) C<$path>, return the
730status of the final C<rmdir> only. This is a composite request that
731uses C<aio_scandir> to recurse into and rmdir directories, and unlink
732everything else.
733
734=cut
735
736sub aio_rmtree;
737sub aio_rmtree($;$) {
738 aio_block {
739 my ($path, $cb) = @_;
740
741 my $pri = aioreq_pri;
742 my $grp = aio_group $cb;
743
744 aioreq_pri $pri;
745 add $grp aio_scandir $path, 0, sub {
746 my ($dirs, $nondirs) = @_;
747
748 my $dirgrp = aio_group sub {
749 add $grp aio_rmdir $path, sub {
750 $grp->result ($_[0]);
751 };
752 };
753
754 (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs;
755 (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs;
756
757 add $grp $dirgrp;
758 };
759
680 $grp 760 $grp
761 }
681} 762}
682 763
683=item aio_fsync $fh, $callback->($status) 764=item aio_fsync $fh, $callback->($status)
684 765
685Asynchronously call fsync on the given filehandle and call the callback 766Asynchronously call fsync on the given filehandle and call the callback
964 poll => 'r', nice => 1, 1045 poll => 'r', nice => 1,
965 cb => &IO::AIO::poll_cb); 1046 cb => &IO::AIO::poll_cb);
966 1047
967=item IO::AIO::poll_wait 1048=item IO::AIO::poll_wait
968 1049
1050If there are any outstanding requests and none of them in the result
969Wait till the result filehandle becomes ready for reading (simply does a 1051phase, wait till the result filehandle becomes ready for reading (simply
970C<select> on the filehandle. This is useful if you want to synchronously 1052does a C<select> on the filehandle. This is useful if you want to
971wait for some requests to finish). 1053synchronously wait for some requests to finish).
972 1054
973See C<nreqs> for an example. 1055See C<nreqs> for an example.
974 1056
975=item IO::AIO::poll 1057=item IO::AIO::poll
976 1058
977Waits until some requests have been handled. 1059Waits until some requests have been handled.
978 1060
1061Returns the number of requests processed, but is otherwise strictly
979Strictly equivalent to: 1062equivalent to:
980 1063
981 IO::AIO::poll_wait, IO::AIO::poll_cb 1064 IO::AIO::poll_wait, IO::AIO::poll_cb
982 if IO::AIO::nreqs;
983 1065
984=item IO::AIO::flush 1066=item IO::AIO::flush
985 1067
986Wait till all outstanding AIO requests have been handled. 1068Wait till all outstanding AIO requests have been handled.
987 1069
1101 *$sym 1183 *$sym
1102} 1184}
1103 1185
1104min_parallel 8; 1186min_parallel 8;
1105 1187
1106END { 1188END { flush }
1107 min_parallel 1;
1108 flush;
1109};
1110 1189
11111; 11901;
1112 1191
1113=head2 FORK BEHAVIOUR 1192=head2 FORK BEHAVIOUR
1114 1193

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines