ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.93 by root, Wed Nov 8 01:59:58 2006 UTC vs.
Revision 1.99 by root, Sun Jan 7 21:32:20 2007 UTC

5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use IO::AIO; 7 use IO::AIO;
8 8
9 aio_open "/etc/passwd", O_RDONLY, 0, sub { 9 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10 my ($fh) = @_; 10 my $fh = shift
11 or die "/etc/passwd: $!";
11 ... 12 ...
12 }; 13 };
13 14
14 aio_unlink "/tmp/file", sub { }; 15 aio_unlink "/tmp/file", sub { };
15 16
99 poll => 'r', 100 poll => 'r',
100 cb => \&IO::AIO::poll_cb); 101 cb => \&IO::AIO::poll_cb);
101 102
102 # queue the request to open /etc/passwd 103 # queue the request to open /etc/passwd
103 aio_open "/etc/passwd", O_RDONLY, 0, sub { 104 aio_open "/etc/passwd", O_RDONLY, 0, sub {
104 my $fh = $_[0] 105 my $fh = shift
105 or die "error while opening: $!"; 106 or die "error while opening: $!";
106 107
107 # stat'ing filehandles is generally non-blocking 108 # stat'ing filehandles is generally non-blocking
108 my $size = -s $fh; 109 my $size = -s $fh;
109 110
189use strict 'vars'; 190use strict 'vars';
190 191
191use base 'Exporter'; 192use base 'Exporter';
192 193
193BEGIN { 194BEGIN {
194 our $VERSION = '2.2'; 195 our $VERSION = '2.32';
195 196
196 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 197 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
197 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 198 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
198 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link 199 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link
199 aio_move aio_copy aio_group aio_nop aio_mknod); 200 aio_move aio_copy aio_group aio_nop aio_mknod aio_load aio_rmtree);
200 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice)); 201 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block));
201 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush 202 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
202 min_parallel max_parallel max_idle 203 min_parallel max_parallel max_idle
203 nreqs nready npending nthreads 204 nreqs nready npending nthreads
204 max_poll_time max_poll_reqs); 205 max_poll_time max_poll_reqs);
205 206
443sorted, and will B<NOT> include the C<.> and C<..> entries. 444sorted, and will B<NOT> include the C<.> and C<..> entries.
444 445
445The callback a single argument which is either C<undef> or an array-ref 446The callback a single argument which is either C<undef> or an array-ref
446with the filenames. 447with the filenames.
447 448
449=item aio_load $path, $data, $callback->($status)
450
451This is a composite request that tries to fully load the given file into
452memory. Status is the same as with aio_read.
453
454=cut
455
456sub aio_load($$;$) {
457 aio_block {
458 my ($path, undef, $cb) = @_;
459 my $data = \$_[1];
460
461 my $pri = aioreq_pri;
462 my $grp = aio_group $cb;
463
464 aioreq_pri $pri;
465 add $grp aio_open $path, O_RDONLY, 0, sub {
466 my ($fh) = @_
467 or return $grp->result (-1);
468
469 aioreq_pri $pri;
470 add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub {
471 $grp->result ($_[0]);
472 };
473 };
474
475 $grp
476 }
477}
478
448=item aio_copy $srcpath, $dstpath, $callback->($status) 479=item aio_copy $srcpath, $dstpath, $callback->($status)
449 480
450Try to copy the I<file> (directories not supported as either source or 481Try to copy the I<file> (directories not supported as either source or
451destination) from C<$srcpath> to C<$dstpath> and call the callback with 482destination) from C<$srcpath> to C<$dstpath> and call the callback with
452the C<0> (error) or C<-1> ok. 483the C<0> (error) or C<-1> ok.
461errors are being ignored. 492errors are being ignored.
462 493
463=cut 494=cut
464 495
465sub aio_copy($$;$) { 496sub aio_copy($$;$) {
497 aio_block {
466 my ($src, $dst, $cb) = @_; 498 my ($src, $dst, $cb) = @_;
467 499
468 my $pri = aioreq_pri; 500 my $pri = aioreq_pri;
469 my $grp = aio_group $cb; 501 my $grp = aio_group $cb;
470 502
471 aioreq_pri $pri; 503 aioreq_pri $pri;
472 add $grp aio_open $src, O_RDONLY, 0, sub { 504 add $grp aio_open $src, O_RDONLY, 0, sub {
473 if (my $src_fh = $_[0]) { 505 if (my $src_fh = $_[0]) {
474 my @stat = stat $src_fh; 506 my @stat = stat $src_fh;
475 507
476 aioreq_pri $pri; 508 aioreq_pri $pri;
477 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { 509 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
478 if (my $dst_fh = $_[0]) { 510 if (my $dst_fh = $_[0]) {
479 aioreq_pri $pri; 511 aioreq_pri $pri;
480 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 512 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
481 if ($_[0] == $stat[7]) { 513 if ($_[0] == $stat[7]) {
482 $grp->result (0); 514 $grp->result (0);
483 close $src_fh; 515 close $src_fh;
484 516
485 # those should not normally block. should. should. 517 # those should not normally block. should. should.
486 utime $stat[8], $stat[9], $dst; 518 utime $stat[8], $stat[9], $dst;
487 chmod $stat[2] & 07777, $dst_fh; 519 chmod $stat[2] & 07777, $dst_fh;
488 chown $stat[4], $stat[5], $dst_fh; 520 chown $stat[4], $stat[5], $dst_fh;
489 close $dst_fh; 521 close $dst_fh;
490 } else { 522 } else {
491 $grp->result (-1); 523 $grp->result (-1);
492 close $src_fh; 524 close $src_fh;
493 close $dst_fh; 525 close $dst_fh;
494 526
495 aioreq $pri; 527 aioreq $pri;
496 add $grp aio_unlink $dst; 528 add $grp aio_unlink $dst;
529 }
497 } 530 };
531 } else {
532 $grp->result (-1);
498 }; 533 }
499 } else {
500 $grp->result (-1);
501 } 534 },
535
536 } else {
537 $grp->result (-1);
502 }, 538 }
503
504 } else {
505 $grp->result (-1);
506 } 539 };
540
541 $grp
507 }; 542 }
508
509 $grp
510} 543}
511 544
512=item aio_move $srcpath, $dstpath, $callback->($status) 545=item aio_move $srcpath, $dstpath, $callback->($status)
513 546
514Try to move the I<file> (directories not supported as either source or 547Try to move the I<file> (directories not supported as either source or
520that is successful, unlinking the C<$srcpath>. 553that is successful, unlinking the C<$srcpath>.
521 554
522=cut 555=cut
523 556
524sub aio_move($$;$) { 557sub aio_move($$;$) {
558 aio_block {
525 my ($src, $dst, $cb) = @_; 559 my ($src, $dst, $cb) = @_;
526 560
527 my $pri = aioreq_pri; 561 my $pri = aioreq_pri;
528 my $grp = aio_group $cb; 562 my $grp = aio_group $cb;
529 563
530 aioreq_pri $pri; 564 aioreq_pri $pri;
531 add $grp aio_rename $src, $dst, sub { 565 add $grp aio_rename $src, $dst, sub {
532 if ($_[0] && $! == EXDEV) { 566 if ($_[0] && $! == EXDEV) {
533 aioreq_pri $pri; 567 aioreq_pri $pri;
534 add $grp aio_copy $src, $dst, sub { 568 add $grp aio_copy $src, $dst, sub {
569 $grp->result ($_[0]);
570
571 if (!$_[0]) {
572 aioreq_pri $pri;
573 add $grp aio_unlink $src;
574 }
575 };
576 } else {
535 $grp->result ($_[0]); 577 $grp->result ($_[0]);
536
537 if (!$_[0]) {
538 aioreq_pri $pri;
539 add $grp aio_unlink $src;
540 }
541 }; 578 }
542 } else {
543 $grp->result ($_[0]);
544 } 579 };
580
581 $grp
545 }; 582 }
546
547 $grp
548} 583}
549 584
550=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 585=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
551 586
552Scans a directory (similar to C<aio_readdir>) but additionally tries to 587Scans a directory (similar to C<aio_readdir>) but additionally tries to
600directory counting heuristic. 635directory counting heuristic.
601 636
602=cut 637=cut
603 638
604sub aio_scandir($$$) { 639sub aio_scandir($$$) {
640 aio_block {
605 my ($path, $maxreq, $cb) = @_; 641 my ($path, $maxreq, $cb) = @_;
606 642
607 my $pri = aioreq_pri; 643 my $pri = aioreq_pri;
608 644
609 my $grp = aio_group $cb; 645 my $grp = aio_group $cb;
610 646
611 $maxreq = 4 if $maxreq <= 0; 647 $maxreq = 4 if $maxreq <= 0;
612 648
613 # stat once 649 # stat once
614 aioreq_pri $pri;
615 add $grp aio_stat $path, sub {
616 return $grp->result () if $_[0];
617 my $now = time;
618 my $hash1 = join ":", (stat _)[0,1,3,7,9];
619
620 # read the directory entries
621 aioreq_pri $pri; 650 aioreq_pri $pri;
622 add $grp aio_readdir $path, sub { 651 add $grp aio_stat $path, sub {
623 my $entries = shift
624 or return $grp->result (); 652 return $grp->result () if $_[0];
653 my $now = time;
654 my $hash1 = join ":", (stat _)[0,1,3,7,9];
625 655
626 # stat the dir another time 656 # read the directory entries
627 aioreq_pri $pri; 657 aioreq_pri $pri;
658 add $grp aio_readdir $path, sub {
659 my $entries = shift
660 or return $grp->result ();
661
662 # stat the dir another time
663 aioreq_pri $pri;
628 add $grp aio_stat $path, sub { 664 add $grp aio_stat $path, sub {
629 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 665 my $hash2 = join ":", (stat _)[0,1,3,7,9];
630 666
631 my $ndirs; 667 my $ndirs;
632 668
633 # take the slow route if anything looks fishy 669 # take the slow route if anything looks fishy
634 if ($hash1 ne $hash2 or (stat _)[9] == $now) { 670 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
635 $ndirs = -1; 671 $ndirs = -1;
636 } else { 672 } else {
637 # if nlink == 2, we are finished 673 # if nlink == 2, we are finished
638 # on non-posix-fs's, we rely on nlink < 2 674 # on non-posix-fs's, we rely on nlink < 2
639 $ndirs = (stat _)[3] - 2 675 $ndirs = (stat _)[3] - 2
640 or return $grp->result ([], $entries); 676 or return $grp->result ([], $entries);
641 } 677 }
642 678
643 # sort into likely dirs and likely nondirs 679 # sort into likely dirs and likely nondirs
644 # dirs == files without ".", short entries first 680 # dirs == files without ".", short entries first
645 $entries = [map $_->[0], 681 $entries = [map $_->[0],
646 sort { $b->[1] cmp $a->[1] } 682 sort { $b->[1] cmp $a->[1] }
647 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], 683 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
648 @$entries]; 684 @$entries];
649 685
650 my (@dirs, @nondirs); 686 my (@dirs, @nondirs);
651 687
652 my $statgrp = add $grp aio_group sub { 688 my $statgrp = add $grp aio_group sub {
653 $grp->result (\@dirs, \@nondirs); 689 $grp->result (\@dirs, \@nondirs);
654 }; 690 };
655 691
656 limit $statgrp $maxreq; 692 limit $statgrp $maxreq;
657 feed $statgrp sub { 693 feed $statgrp sub {
658 return unless @$entries; 694 return unless @$entries;
659 my $entry = pop @$entries; 695 my $entry = pop @$entries;
660 696
661 aioreq_pri $pri; 697 aioreq_pri $pri;
662 add $statgrp aio_stat "$path/$entry/.", sub { 698 add $statgrp aio_stat "$path/$entry/.", sub {
663 if ($_[0] < 0) { 699 if ($_[0] < 0) {
664 push @nondirs, $entry; 700 push @nondirs, $entry;
665 } else { 701 } else {
666 # need to check for real directory 702 # need to check for real directory
667 aioreq_pri $pri; 703 aioreq_pri $pri;
668 add $statgrp aio_lstat "$path/$entry", sub { 704 add $statgrp aio_lstat "$path/$entry", sub {
669 if (-d _) { 705 if (-d _) {
670 push @dirs, $entry; 706 push @dirs, $entry;
671 707
672 unless (--$ndirs) { 708 unless (--$ndirs) {
673 push @nondirs, @$entries; 709 push @nondirs, @$entries;
674 feed $statgrp; 710 feed $statgrp;
711 }
712 } else {
713 push @nondirs, $entry;
675 } 714 }
676 } else {
677 push @nondirs, $entry;
678 } 715 }
679 } 716 }
680 } 717 };
681 }; 718 };
682 }; 719 };
683 }; 720 };
684 }; 721 };
722
723 $grp
685 }; 724 }
725}
686 726
727=item aio_rmtree $path, $callback->($status)
728
729Delete a directory tree starting (and including) C<$path>, return the status of the final C<rmdir> only.
730This is a composite request that uses C<aio_scandir> to recurse into and rmdir directories, and
731unlink everything else.
732
733=cut
734
735sub aio_rmtree;
736sub aio_rmtree {
737 aio_block {
738 my ($path, $cb) = @_;
739
740 my $pri = aioreq_pri;
741 my $grp = aio_group $cb;
742
743 aioreq_pri $pri;
744 add $grp aio_scandir $path, 0, sub {
745 my ($dirs, $nondirs) = @_;
746
747 my $dirgrp = aio_group sub {
748 add $grp aio_rmdir $path, sub {
749 $grp->result ($_[0]);
750 };
751 };
752
753 (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs;
754 (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs;
755
756 add $grp $dirgrp;
757 };
758
687 $grp 759 $grp
760 }
688} 761}
689 762
690=item aio_fsync $fh, $callback->($status) 763=item aio_fsync $fh, $callback->($status)
691 764
692Asynchronously call fsync on the given filehandle and call the callback 765Asynchronously call fsync on the given filehandle and call the callback
1109 *$sym 1182 *$sym
1110} 1183}
1111 1184
1112min_parallel 8; 1185min_parallel 8;
1113 1186
1114END { 1187END { flush }
1115 min_parallel 1;
1116 flush;
1117};
1118 1188
11191; 11891;
1120 1190
1121=head2 FORK BEHAVIOUR 1191=head2 FORK BEHAVIOUR
1122 1192

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines