ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.94 by root, Wed Nov 8 02:01:02 2006 UTC vs.
Revision 1.100 by root, Sun Jan 7 21:36:58 2007 UTC

190use strict 'vars'; 190use strict 'vars';
191 191
192use base 'Exporter'; 192use base 'Exporter';
193 193
194BEGIN { 194BEGIN {
195 our $VERSION = '2.2'; 195 our $VERSION = '2.32';
196 196
197 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 197 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
198 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 198 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
199 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link 199 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link
200 aio_move aio_copy aio_group aio_nop aio_mknod); 200 aio_move aio_copy aio_group aio_nop aio_mknod aio_load aio_rmtree);
201 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice)); 201 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block));
202 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush 202 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
203 min_parallel max_parallel max_idle 203 min_parallel max_parallel max_idle
204 nreqs nready npending nthreads 204 nreqs nready npending nthreads
205 max_poll_time max_poll_reqs); 205 max_poll_time max_poll_reqs);
206 206
444sorted, and will B<NOT> include the C<.> and C<..> entries. 444sorted, and will B<NOT> include the C<.> and C<..> entries.
445 445
446The callback a single argument which is either C<undef> or an array-ref 446The callback a single argument which is either C<undef> or an array-ref
447with the filenames. 447with the filenames.
448 448
449=item aio_load $path, $data, $callback->($status)
450
451This is a composite request that tries to fully load the given file into
452memory. Status is the same as with aio_read.
453
454=cut
455
456sub aio_load($$;$) {
457 aio_block {
458 my ($path, undef, $cb) = @_;
459 my $data = \$_[1];
460
461 my $pri = aioreq_pri;
462 my $grp = aio_group $cb;
463
464 aioreq_pri $pri;
465 add $grp aio_open $path, O_RDONLY, 0, sub {
466 my ($fh) = @_
467 or return $grp->result (-1);
468
469 aioreq_pri $pri;
470 add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub {
471 $grp->result ($_[0]);
472 };
473 };
474
475 $grp
476 }
477}
478
449=item aio_copy $srcpath, $dstpath, $callback->($status) 479=item aio_copy $srcpath, $dstpath, $callback->($status)
450 480
451Try to copy the I<file> (directories not supported as either source or 481Try to copy the I<file> (directories not supported as either source or
452destination) from C<$srcpath> to C<$dstpath> and call the callback with 482destination) from C<$srcpath> to C<$dstpath> and call the callback with
453the C<0> (error) or C<-1> ok. 483the C<0> (error) or C<-1> ok.
462errors are being ignored. 492errors are being ignored.
463 493
464=cut 494=cut
465 495
466sub aio_copy($$;$) { 496sub aio_copy($$;$) {
497 aio_block {
467 my ($src, $dst, $cb) = @_; 498 my ($src, $dst, $cb) = @_;
468 499
469 my $pri = aioreq_pri; 500 my $pri = aioreq_pri;
470 my $grp = aio_group $cb; 501 my $grp = aio_group $cb;
471 502
472 aioreq_pri $pri; 503 aioreq_pri $pri;
473 add $grp aio_open $src, O_RDONLY, 0, sub { 504 add $grp aio_open $src, O_RDONLY, 0, sub {
474 if (my $src_fh = $_[0]) { 505 if (my $src_fh = $_[0]) {
475 my @stat = stat $src_fh; 506 my @stat = stat $src_fh;
476 507
477 aioreq_pri $pri; 508 aioreq_pri $pri;
478 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { 509 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
479 if (my $dst_fh = $_[0]) { 510 if (my $dst_fh = $_[0]) {
480 aioreq_pri $pri; 511 aioreq_pri $pri;
481 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 512 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
482 if ($_[0] == $stat[7]) { 513 if ($_[0] == $stat[7]) {
483 $grp->result (0); 514 $grp->result (0);
484 close $src_fh; 515 close $src_fh;
485 516
486 # those should not normally block. should. should. 517 # those should not normally block. should. should.
487 utime $stat[8], $stat[9], $dst; 518 utime $stat[8], $stat[9], $dst;
488 chmod $stat[2] & 07777, $dst_fh; 519 chmod $stat[2] & 07777, $dst_fh;
489 chown $stat[4], $stat[5], $dst_fh; 520 chown $stat[4], $stat[5], $dst_fh;
490 close $dst_fh; 521 close $dst_fh;
491 } else { 522 } else {
492 $grp->result (-1); 523 $grp->result (-1);
493 close $src_fh; 524 close $src_fh;
494 close $dst_fh; 525 close $dst_fh;
495 526
496 aioreq $pri; 527 aioreq $pri;
497 add $grp aio_unlink $dst; 528 add $grp aio_unlink $dst;
529 }
498 } 530 };
531 } else {
532 $grp->result (-1);
499 }; 533 }
500 } else {
501 $grp->result (-1);
502 } 534 },
535
536 } else {
537 $grp->result (-1);
503 }, 538 }
504
505 } else {
506 $grp->result (-1);
507 } 539 };
540
541 $grp
508 }; 542 }
509
510 $grp
511} 543}
512 544
513=item aio_move $srcpath, $dstpath, $callback->($status) 545=item aio_move $srcpath, $dstpath, $callback->($status)
514 546
515Try to move the I<file> (directories not supported as either source or 547Try to move the I<file> (directories not supported as either source or
521that is successful, unlinking the C<$srcpath>. 553that is successful, unlinking the C<$srcpath>.
522 554
523=cut 555=cut
524 556
525sub aio_move($$;$) { 557sub aio_move($$;$) {
558 aio_block {
526 my ($src, $dst, $cb) = @_; 559 my ($src, $dst, $cb) = @_;
527 560
528 my $pri = aioreq_pri; 561 my $pri = aioreq_pri;
529 my $grp = aio_group $cb; 562 my $grp = aio_group $cb;
530 563
531 aioreq_pri $pri; 564 aioreq_pri $pri;
532 add $grp aio_rename $src, $dst, sub { 565 add $grp aio_rename $src, $dst, sub {
533 if ($_[0] && $! == EXDEV) { 566 if ($_[0] && $! == EXDEV) {
534 aioreq_pri $pri; 567 aioreq_pri $pri;
535 add $grp aio_copy $src, $dst, sub { 568 add $grp aio_copy $src, $dst, sub {
569 $grp->result ($_[0]);
570
571 if (!$_[0]) {
572 aioreq_pri $pri;
573 add $grp aio_unlink $src;
574 }
575 };
576 } else {
536 $grp->result ($_[0]); 577 $grp->result ($_[0]);
537
538 if (!$_[0]) {
539 aioreq_pri $pri;
540 add $grp aio_unlink $src;
541 }
542 }; 578 }
543 } else {
544 $grp->result ($_[0]);
545 } 579 };
580
581 $grp
546 }; 582 }
547
548 $grp
549} 583}
550 584
551=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 585=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
552 586
553Scans a directory (similar to C<aio_readdir>) but additionally tries to 587Scans a directory (similar to C<aio_readdir>) but additionally tries to
600as those tend to return 0 or 1 as link counts, which disables the 634as those tend to return 0 or 1 as link counts, which disables the
601directory counting heuristic. 635directory counting heuristic.
602 636
603=cut 637=cut
604 638
605sub aio_scandir($$$) { 639sub aio_scandir($$;$) {
640 aio_block {
606 my ($path, $maxreq, $cb) = @_; 641 my ($path, $maxreq, $cb) = @_;
607 642
608 my $pri = aioreq_pri; 643 my $pri = aioreq_pri;
609 644
610 my $grp = aio_group $cb; 645 my $grp = aio_group $cb;
611 646
612 $maxreq = 4 if $maxreq <= 0; 647 $maxreq = 4 if $maxreq <= 0;
613 648
614 # stat once 649 # stat once
615 aioreq_pri $pri;
616 add $grp aio_stat $path, sub {
617 return $grp->result () if $_[0];
618 my $now = time;
619 my $hash1 = join ":", (stat _)[0,1,3,7,9];
620
621 # read the directory entries
622 aioreq_pri $pri; 650 aioreq_pri $pri;
623 add $grp aio_readdir $path, sub { 651 add $grp aio_stat $path, sub {
624 my $entries = shift
625 or return $grp->result (); 652 return $grp->result () if $_[0];
653 my $now = time;
654 my $hash1 = join ":", (stat _)[0,1,3,7,9];
626 655
627 # stat the dir another time 656 # read the directory entries
628 aioreq_pri $pri; 657 aioreq_pri $pri;
658 add $grp aio_readdir $path, sub {
659 my $entries = shift
660 or return $grp->result ();
661
662 # stat the dir another time
663 aioreq_pri $pri;
629 add $grp aio_stat $path, sub { 664 add $grp aio_stat $path, sub {
630 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 665 my $hash2 = join ":", (stat _)[0,1,3,7,9];
631 666
632 my $ndirs; 667 my $ndirs;
633 668
634 # take the slow route if anything looks fishy 669 # take the slow route if anything looks fishy
635 if ($hash1 ne $hash2 or (stat _)[9] == $now) { 670 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
636 $ndirs = -1; 671 $ndirs = -1;
637 } else { 672 } else {
638 # if nlink == 2, we are finished 673 # if nlink == 2, we are finished
639 # on non-posix-fs's, we rely on nlink < 2 674 # on non-posix-fs's, we rely on nlink < 2
640 $ndirs = (stat _)[3] - 2 675 $ndirs = (stat _)[3] - 2
641 or return $grp->result ([], $entries); 676 or return $grp->result ([], $entries);
642 } 677 }
643 678
644 # sort into likely dirs and likely nondirs 679 # sort into likely dirs and likely nondirs
645 # dirs == files without ".", short entries first 680 # dirs == files without ".", short entries first
646 $entries = [map $_->[0], 681 $entries = [map $_->[0],
647 sort { $b->[1] cmp $a->[1] } 682 sort { $b->[1] cmp $a->[1] }
648 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], 683 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
649 @$entries]; 684 @$entries];
650 685
651 my (@dirs, @nondirs); 686 my (@dirs, @nondirs);
652 687
653 my $statgrp = add $grp aio_group sub { 688 my $statgrp = add $grp aio_group sub {
654 $grp->result (\@dirs, \@nondirs); 689 $grp->result (\@dirs, \@nondirs);
655 }; 690 };
656 691
657 limit $statgrp $maxreq; 692 limit $statgrp $maxreq;
658 feed $statgrp sub { 693 feed $statgrp sub {
659 return unless @$entries; 694 return unless @$entries;
660 my $entry = pop @$entries; 695 my $entry = pop @$entries;
661 696
662 aioreq_pri $pri; 697 aioreq_pri $pri;
663 add $statgrp aio_stat "$path/$entry/.", sub { 698 add $statgrp aio_stat "$path/$entry/.", sub {
664 if ($_[0] < 0) { 699 if ($_[0] < 0) {
665 push @nondirs, $entry; 700 push @nondirs, $entry;
666 } else { 701 } else {
667 # need to check for real directory 702 # need to check for real directory
668 aioreq_pri $pri; 703 aioreq_pri $pri;
669 add $statgrp aio_lstat "$path/$entry", sub { 704 add $statgrp aio_lstat "$path/$entry", sub {
670 if (-d _) { 705 if (-d _) {
671 push @dirs, $entry; 706 push @dirs, $entry;
672 707
673 unless (--$ndirs) { 708 unless (--$ndirs) {
674 push @nondirs, @$entries; 709 push @nondirs, @$entries;
675 feed $statgrp; 710 feed $statgrp;
711 }
712 } else {
713 push @nondirs, $entry;
676 } 714 }
677 } else {
678 push @nondirs, $entry;
679 } 715 }
680 } 716 }
681 } 717 };
682 }; 718 };
683 }; 719 };
684 }; 720 };
685 }; 721 };
722
723 $grp
686 }; 724 }
725}
687 726
727=item aio_rmtree $path, $callback->($status)
728
729Delete a directory tree starting (and including) C<$path>, return the
730status of the final C<rmdir> only. This is a composite request that
731uses C<aio_scandir> to recurse into and rmdir directories, and unlink
732everything else.
733
734=cut
735
736sub aio_rmtree;
737sub aio_rmtree($;$) {
738 aio_block {
739 my ($path, $cb) = @_;
740
741 my $pri = aioreq_pri;
742 my $grp = aio_group $cb;
743
744 aioreq_pri $pri;
745 add $grp aio_scandir $path, 0, sub {
746 my ($dirs, $nondirs) = @_;
747
748 my $dirgrp = aio_group sub {
749 add $grp aio_rmdir $path, sub {
750 $grp->result ($_[0]);
751 };
752 };
753
754 (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs;
755 (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs;
756
757 add $grp $dirgrp;
758 };
759
688 $grp 760 $grp
761 }
689} 762}
690 763
691=item aio_fsync $fh, $callback->($status) 764=item aio_fsync $fh, $callback->($status)
692 765
693Asynchronously call fsync on the given filehandle and call the callback 766Asynchronously call fsync on the given filehandle and call the callback
1110 *$sym 1183 *$sym
1111} 1184}
1112 1185
1113min_parallel 8; 1186min_parallel 8;
1114 1187
1115END { 1188END { flush }
1116 min_parallel 1;
1117 flush;
1118};
1119 1189
11201; 11901;
1121 1191
1122=head2 FORK BEHAVIOUR 1192=head2 FORK BEHAVIOUR
1123 1193

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines