ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.91 by root, Mon Oct 30 23:30:29 2006 UTC vs.
Revision 1.100 by root, Sun Jan 7 21:36:58 2007 UTC

5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use IO::AIO; 7 use IO::AIO;
8 8
9 aio_open "/etc/passwd", O_RDONLY, 0, sub { 9 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10 my ($fh) = @_; 10 my $fh = shift
11 or die "/etc/passwd: $!";
11 ... 12 ...
12 }; 13 };
13 14
14 aio_unlink "/tmp/file", sub { }; 15 aio_unlink "/tmp/file", sub { };
15 16
99 poll => 'r', 100 poll => 'r',
100 cb => \&IO::AIO::poll_cb); 101 cb => \&IO::AIO::poll_cb);
101 102
102 # queue the request to open /etc/passwd 103 # queue the request to open /etc/passwd
103 aio_open "/etc/passwd", O_RDONLY, 0, sub { 104 aio_open "/etc/passwd", O_RDONLY, 0, sub {
104 my $fh = $_[0] 105 my $fh = shift
105 or die "error while opening: $!"; 106 or die "error while opening: $!";
106 107
107 # stat'ing filehandles is generally non-blocking 108 # stat'ing filehandles is generally non-blocking
108 my $size = -s $fh; 109 my $size = -s $fh;
109 110
189use strict 'vars'; 190use strict 'vars';
190 191
191use base 'Exporter'; 192use base 'Exporter';
192 193
193BEGIN { 194BEGIN {
194 our $VERSION = '2.2'; 195 our $VERSION = '2.32';
195 196
196 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 197 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
197 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 198 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
198 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link 199 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link
199 aio_move aio_copy aio_group aio_nop aio_mknod); 200 aio_move aio_copy aio_group aio_nop aio_mknod aio_load aio_rmtree);
200 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice)); 201 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block));
201 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush 202 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
202 min_parallel max_parallel max_idle 203 min_parallel max_parallel max_idle
203 nreqs nready npending nthreads 204 nreqs nready npending nthreads
204 max_poll_time max_poll_reqs); 205 max_poll_time max_poll_reqs);
205 206
443sorted, and will B<NOT> include the C<.> and C<..> entries. 444sorted, and will B<NOT> include the C<.> and C<..> entries.
444 445
445The callback a single argument which is either C<undef> or an array-ref 446The callback a single argument which is either C<undef> or an array-ref
446with the filenames. 447with the filenames.
447 448
449=item aio_load $path, $data, $callback->($status)
450
451This is a composite request that tries to fully load the given file into
452memory. Status is the same as with aio_read.
453
454=cut
455
456sub aio_load($$;$) {
457 aio_block {
458 my ($path, undef, $cb) = @_;
459 my $data = \$_[1];
460
461 my $pri = aioreq_pri;
462 my $grp = aio_group $cb;
463
464 aioreq_pri $pri;
465 add $grp aio_open $path, O_RDONLY, 0, sub {
466 my ($fh) = @_
467 or return $grp->result (-1);
468
469 aioreq_pri $pri;
470 add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub {
471 $grp->result ($_[0]);
472 };
473 };
474
475 $grp
476 }
477}
478
448=item aio_copy $srcpath, $dstpath, $callback->($status) 479=item aio_copy $srcpath, $dstpath, $callback->($status)
449 480
450Try to copy the I<file> (directories not supported as either source or 481Try to copy the I<file> (directories not supported as either source or
451destination) from C<$srcpath> to C<$dstpath> and call the callback with 482destination) from C<$srcpath> to C<$dstpath> and call the callback with
452the C<0> (error) or C<-1> ok. 483the C<0> (error) or C<-1> ok.
461errors are being ignored. 492errors are being ignored.
462 493
463=cut 494=cut
464 495
465sub aio_copy($$;$) { 496sub aio_copy($$;$) {
497 aio_block {
466 my ($src, $dst, $cb) = @_; 498 my ($src, $dst, $cb) = @_;
467 499
468 my $pri = aioreq_pri; 500 my $pri = aioreq_pri;
469 my $grp = aio_group $cb; 501 my $grp = aio_group $cb;
470 502
471 aioreq_pri $pri; 503 aioreq_pri $pri;
472 add $grp aio_open $src, O_RDONLY, 0, sub { 504 add $grp aio_open $src, O_RDONLY, 0, sub {
473 if (my $src_fh = $_[0]) { 505 if (my $src_fh = $_[0]) {
474 my @stat = stat $src_fh; 506 my @stat = stat $src_fh;
475 507
476 aioreq_pri $pri; 508 aioreq_pri $pri;
477 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { 509 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
478 if (my $dst_fh = $_[0]) { 510 if (my $dst_fh = $_[0]) {
479 aioreq_pri $pri; 511 aioreq_pri $pri;
480 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 512 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
481 if ($_[0] == $stat[7]) { 513 if ($_[0] == $stat[7]) {
482 $grp->result (0); 514 $grp->result (0);
483 close $src_fh; 515 close $src_fh;
484 516
485 # those should not normally block. should. should. 517 # those should not normally block. should. should.
486 utime $stat[8], $stat[9], $dst; 518 utime $stat[8], $stat[9], $dst;
487 chmod $stat[2] & 07777, $dst_fh; 519 chmod $stat[2] & 07777, $dst_fh;
488 chown $stat[4], $stat[5], $dst_fh; 520 chown $stat[4], $stat[5], $dst_fh;
489 close $dst_fh; 521 close $dst_fh;
490 } else { 522 } else {
491 $grp->result (-1); 523 $grp->result (-1);
492 close $src_fh; 524 close $src_fh;
493 close $dst_fh; 525 close $dst_fh;
494 526
495 aioreq $pri; 527 aioreq $pri;
496 add $grp aio_unlink $dst; 528 add $grp aio_unlink $dst;
529 }
497 } 530 };
531 } else {
532 $grp->result (-1);
498 }; 533 }
499 } else {
500 $grp->result (-1);
501 } 534 },
535
536 } else {
537 $grp->result (-1);
502 }, 538 }
503
504 } else {
505 $grp->result (-1);
506 } 539 };
540
541 $grp
507 }; 542 }
508
509 $grp
510} 543}
511 544
512=item aio_move $srcpath, $dstpath, $callback->($status) 545=item aio_move $srcpath, $dstpath, $callback->($status)
513 546
514Try to move the I<file> (directories not supported as either source or 547Try to move the I<file> (directories not supported as either source or
520that is successful, unlinking the C<$srcpath>. 553that is successful, unlinking the C<$srcpath>.
521 554
522=cut 555=cut
523 556
524sub aio_move($$;$) { 557sub aio_move($$;$) {
558 aio_block {
525 my ($src, $dst, $cb) = @_; 559 my ($src, $dst, $cb) = @_;
526 560
527 my $pri = aioreq_pri; 561 my $pri = aioreq_pri;
528 my $grp = aio_group $cb; 562 my $grp = aio_group $cb;
529 563
530 aioreq_pri $pri; 564 aioreq_pri $pri;
531 add $grp aio_rename $src, $dst, sub { 565 add $grp aio_rename $src, $dst, sub {
532 if ($_[0] && $! == EXDEV) { 566 if ($_[0] && $! == EXDEV) {
533 aioreq_pri $pri; 567 aioreq_pri $pri;
534 add $grp aio_copy $src, $dst, sub { 568 add $grp aio_copy $src, $dst, sub {
569 $grp->result ($_[0]);
570
571 if (!$_[0]) {
572 aioreq_pri $pri;
573 add $grp aio_unlink $src;
574 }
575 };
576 } else {
535 $grp->result ($_[0]); 577 $grp->result ($_[0]);
536
537 if (!$_[0]) {
538 aioreq_pri $pri;
539 add $grp aio_unlink $src;
540 }
541 }; 578 }
542 } else {
543 $grp->result ($_[0]);
544 } 579 };
580
581 $grp
545 }; 582 }
546
547 $grp
548} 583}
549 584
550=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 585=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
551 586
552Scans a directory (similar to C<aio_readdir>) but additionally tries to 587Scans a directory (similar to C<aio_readdir>) but additionally tries to
599as those tend to return 0 or 1 as link counts, which disables the 634as those tend to return 0 or 1 as link counts, which disables the
600directory counting heuristic. 635directory counting heuristic.
601 636
602=cut 637=cut
603 638
604sub aio_scandir($$$) { 639sub aio_scandir($$;$) {
640 aio_block {
605 my ($path, $maxreq, $cb) = @_; 641 my ($path, $maxreq, $cb) = @_;
606 642
607 my $pri = aioreq_pri; 643 my $pri = aioreq_pri;
608 644
609 my $grp = aio_group $cb; 645 my $grp = aio_group $cb;
610 646
611 $maxreq = 4 if $maxreq <= 0; 647 $maxreq = 4 if $maxreq <= 0;
612 648
613 # stat once 649 # stat once
614 aioreq_pri $pri;
615 add $grp aio_stat $path, sub {
616 return $grp->result () if $_[0];
617 my $now = time;
618 my $hash1 = join ":", (stat _)[0,1,3,7,9];
619
620 # read the directory entries
621 aioreq_pri $pri; 650 aioreq_pri $pri;
622 add $grp aio_readdir $path, sub { 651 add $grp aio_stat $path, sub {
623 my $entries = shift
624 or return $grp->result (); 652 return $grp->result () if $_[0];
653 my $now = time;
654 my $hash1 = join ":", (stat _)[0,1,3,7,9];
625 655
626 # stat the dir another time 656 # read the directory entries
627 aioreq_pri $pri; 657 aioreq_pri $pri;
658 add $grp aio_readdir $path, sub {
659 my $entries = shift
660 or return $grp->result ();
661
662 # stat the dir another time
663 aioreq_pri $pri;
628 add $grp aio_stat $path, sub { 664 add $grp aio_stat $path, sub {
629 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 665 my $hash2 = join ":", (stat _)[0,1,3,7,9];
630 666
631 my $ndirs; 667 my $ndirs;
632 668
633 # take the slow route if anything looks fishy 669 # take the slow route if anything looks fishy
634 if ($hash1 ne $hash2 or (stat _)[9] == $now) { 670 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
635 $ndirs = -1; 671 $ndirs = -1;
636 } else { 672 } else {
637 # if nlink == 2, we are finished 673 # if nlink == 2, we are finished
638 # on non-posix-fs's, we rely on nlink < 2 674 # on non-posix-fs's, we rely on nlink < 2
639 $ndirs = (stat _)[3] - 2 675 $ndirs = (stat _)[3] - 2
640 or return $grp->result ([], $entries); 676 or return $grp->result ([], $entries);
641 } 677 }
642 678
643 # sort into likely dirs and likely nondirs 679 # sort into likely dirs and likely nondirs
644 # dirs == files without ".", short entries first 680 # dirs == files without ".", short entries first
645 $entries = [map $_->[0], 681 $entries = [map $_->[0],
646 sort { $b->[1] cmp $a->[1] } 682 sort { $b->[1] cmp $a->[1] }
647 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], 683 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
648 @$entries]; 684 @$entries];
649 685
650 my (@dirs, @nondirs); 686 my (@dirs, @nondirs);
651 687
652 my $statgrp = add $grp aio_group sub { 688 my $statgrp = add $grp aio_group sub {
653 $grp->result (\@dirs, \@nondirs); 689 $grp->result (\@dirs, \@nondirs);
654 }; 690 };
655 691
656 limit $statgrp $maxreq; 692 limit $statgrp $maxreq;
657 feed $statgrp sub { 693 feed $statgrp sub {
658 return unless @$entries; 694 return unless @$entries;
659 my $entry = pop @$entries; 695 my $entry = pop @$entries;
660 696
661 aioreq_pri $pri; 697 aioreq_pri $pri;
662 add $statgrp aio_stat "$path/$entry/.", sub { 698 add $statgrp aio_stat "$path/$entry/.", sub {
663 if ($_[0] < 0) { 699 if ($_[0] < 0) {
664 push @nondirs, $entry; 700 push @nondirs, $entry;
665 } else { 701 } else {
666 # need to check for real directory 702 # need to check for real directory
667 aioreq_pri $pri; 703 aioreq_pri $pri;
668 add $statgrp aio_lstat "$path/$entry", sub { 704 add $statgrp aio_lstat "$path/$entry", sub {
669 if (-d _) { 705 if (-d _) {
670 push @dirs, $entry; 706 push @dirs, $entry;
671 707
672 unless (--$ndirs) { 708 unless (--$ndirs) {
673 push @nondirs, @$entries; 709 push @nondirs, @$entries;
674 feed $statgrp; 710 feed $statgrp;
711 }
712 } else {
713 push @nondirs, $entry;
675 } 714 }
676 } else {
677 push @nondirs, $entry;
678 } 715 }
679 } 716 }
680 } 717 };
681 }; 718 };
682 }; 719 };
683 }; 720 };
684 }; 721 };
722
723 $grp
685 }; 724 }
725}
686 726
727=item aio_rmtree $path, $callback->($status)
728
729Delete a directory tree starting (and including) C<$path>, return the
730status of the final C<rmdir> only. This is a composite request that
731uses C<aio_scandir> to recurse into and rmdir directories, and unlink
732everything else.
733
734=cut
735
736sub aio_rmtree;
737sub aio_rmtree($;$) {
738 aio_block {
739 my ($path, $cb) = @_;
740
741 my $pri = aioreq_pri;
742 my $grp = aio_group $cb;
743
744 aioreq_pri $pri;
745 add $grp aio_scandir $path, 0, sub {
746 my ($dirs, $nondirs) = @_;
747
748 my $dirgrp = aio_group sub {
749 add $grp aio_rmdir $path, sub {
750 $grp->result ($_[0]);
751 };
752 };
753
754 (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs;
755 (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs;
756
757 add $grp $dirgrp;
758 };
759
687 $grp 760 $grp
761 }
688} 762}
689 763
690=item aio_fsync $fh, $callback->($status) 764=item aio_fsync $fh, $callback->($status)
691 765
692Asynchronously call fsync on the given filehandle and call the callback 766Asynchronously call fsync on the given filehandle and call the callback
971 poll => 'r', nice => 1, 1045 poll => 'r', nice => 1,
972 cb => &IO::AIO::poll_cb); 1046 cb => &IO::AIO::poll_cb);
973 1047
974=item IO::AIO::poll_wait 1048=item IO::AIO::poll_wait
975 1049
1050If there are any outstanding requests and none of them in the result
976Wait till the result filehandle becomes ready for reading (simply does a 1051phase, wait till the result filehandle becomes ready for reading (simply
977C<select> on the filehandle. This is useful if you want to synchronously 1052does a C<select> on the filehandle. This is useful if you want to
978wait for some requests to finish). 1053synchronously wait for some requests to finish).
979 1054
980See C<nreqs> for an example. 1055See C<nreqs> for an example.
981 1056
982=item IO::AIO::poll 1057=item IO::AIO::poll
983 1058
984Waits until some requests have been handled. 1059Waits until some requests have been handled.
985 1060
1061Returns the number of requests processed, but is otherwise strictly
986Strictly equivalent to: 1062equivalent to:
987 1063
988 IO::AIO::poll_wait, IO::AIO::poll_cb 1064 IO::AIO::poll_wait, IO::AIO::poll_cb
989 if IO::AIO::nreqs;
990 1065
991=item IO::AIO::flush 1066=item IO::AIO::flush
992 1067
993Wait till all outstanding AIO requests have been handled. 1068Wait till all outstanding AIO requests have been handled.
994 1069
1108 *$sym 1183 *$sym
1109} 1184}
1110 1185
1111min_parallel 8; 1186min_parallel 8;
1112 1187
1113END { 1188END { flush }
1114 min_parallel 1;
1115 flush;
1116};
1117 1189
11181; 11901;
1119 1191
1120=head2 FORK BEHAVIOUR 1192=head2 FORK BEHAVIOUR
1121 1193

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines