ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.94 by root, Wed Nov 8 02:01:02 2006 UTC vs.
Revision 1.101 by root, Sun Jan 7 22:59:57 2007 UTC

190use strict 'vars'; 190use strict 'vars';
191 191
192use base 'Exporter'; 192use base 'Exporter';
193 193
194BEGIN { 194BEGIN {
195 our $VERSION = '2.2'; 195 our $VERSION = '2.32';
196 196
197 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 197 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
198 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 198 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
199 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link 199 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link
200 aio_move aio_copy aio_group aio_nop aio_mknod); 200 aio_move aio_copy aio_group aio_nop aio_mknod aio_load aio_rmtree aio_mkdir);
201 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice)); 201 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block));
202 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush 202 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
203 min_parallel max_parallel max_idle 203 min_parallel max_parallel max_idle
204 nreqs nready npending nthreads 204 nreqs nready npending nthreads
205 max_poll_time max_poll_reqs); 205 max_poll_time max_poll_reqs);
206 206
290list. They are the same as used by C<sysopen>. 290list. They are the same as used by C<sysopen>.
291 291
292Likewise, C<$mode> specifies the mode of the newly created file, if it 292Likewise, C<$mode> specifies the mode of the newly created file, if it
293didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>, 293didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>,
294except that it is mandatory (i.e. use C<0> if you don't create new files, 294except that it is mandatory (i.e. use C<0> if you don't create new files,
295and C<0666> or C<0777> if you do). 295and C<0666> or C<0777> if you do). Note that the C<$mode> will be modified
296by the umask in effect then the request is being executed, so better never
297change the umask.
296 298
297Example: 299Example:
298 300
299 aio_open "/etc/passwd", O_RDONLY, 0, sub { 301 aio_open "/etc/passwd", O_RDONLY, 0, sub {
300 if ($_[0]) { 302 if ($_[0]) {
430=item aio_rename $srcpath, $dstpath, $callback->($status) 432=item aio_rename $srcpath, $dstpath, $callback->($status)
431 433
432Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as 434Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
433rename(2) and call the callback with the result code. 435rename(2) and call the callback with the result code.
434 436
437=item aio_mkdir $pathname, $mode, $callback->($status)
438
439Asynchronously mkdir (create) a directory and call the callback with
440the result code. C<$mode> will be modified by the umask at the time the
441request is executed, so do not change your umask.
442
435=item aio_rmdir $pathname, $callback->($status) 443=item aio_rmdir $pathname, $callback->($status)
436 444
437Asynchronously rmdir (delete) a directory and call the callback with the 445Asynchronously rmdir (delete) a directory and call the callback with the
438result code. 446result code.
439 447
443directory (i.e. opendir + readdir + closedir). The entries will not be 451directory (i.e. opendir + readdir + closedir). The entries will not be
444sorted, and will B<NOT> include the C<.> and C<..> entries. 452sorted, and will B<NOT> include the C<.> and C<..> entries.
445 453
446The callback a single argument which is either C<undef> or an array-ref 454The callback a single argument which is either C<undef> or an array-ref
447with the filenames. 455with the filenames.
456
457=item aio_load $path, $data, $callback->($status)
458
459This is a composite request that tries to fully load the given file into
460memory. Status is the same as with aio_read.
461
462=cut
463
464sub aio_load($$;$) {
465 aio_block {
466 my ($path, undef, $cb) = @_;
467 my $data = \$_[1];
468
469 my $pri = aioreq_pri;
470 my $grp = aio_group $cb;
471
472 aioreq_pri $pri;
473 add $grp aio_open $path, O_RDONLY, 0, sub {
474 my ($fh) = @_
475 or return $grp->result (-1);
476
477 aioreq_pri $pri;
478 add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub {
479 $grp->result ($_[0]);
480 };
481 };
482
483 $grp
484 }
485}
448 486
449=item aio_copy $srcpath, $dstpath, $callback->($status) 487=item aio_copy $srcpath, $dstpath, $callback->($status)
450 488
451Try to copy the I<file> (directories not supported as either source or 489Try to copy the I<file> (directories not supported as either source or
452destination) from C<$srcpath> to C<$dstpath> and call the callback with 490destination) from C<$srcpath> to C<$dstpath> and call the callback with
462errors are being ignored. 500errors are being ignored.
463 501
464=cut 502=cut
465 503
466sub aio_copy($$;$) { 504sub aio_copy($$;$) {
505 aio_block {
467 my ($src, $dst, $cb) = @_; 506 my ($src, $dst, $cb) = @_;
468 507
469 my $pri = aioreq_pri; 508 my $pri = aioreq_pri;
470 my $grp = aio_group $cb; 509 my $grp = aio_group $cb;
471 510
472 aioreq_pri $pri; 511 aioreq_pri $pri;
473 add $grp aio_open $src, O_RDONLY, 0, sub { 512 add $grp aio_open $src, O_RDONLY, 0, sub {
474 if (my $src_fh = $_[0]) { 513 if (my $src_fh = $_[0]) {
475 my @stat = stat $src_fh; 514 my @stat = stat $src_fh;
476 515
477 aioreq_pri $pri; 516 aioreq_pri $pri;
478 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { 517 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
479 if (my $dst_fh = $_[0]) { 518 if (my $dst_fh = $_[0]) {
480 aioreq_pri $pri; 519 aioreq_pri $pri;
481 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 520 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
482 if ($_[0] == $stat[7]) { 521 if ($_[0] == $stat[7]) {
483 $grp->result (0); 522 $grp->result (0);
484 close $src_fh; 523 close $src_fh;
485 524
486 # those should not normally block. should. should. 525 # those should not normally block. should. should.
487 utime $stat[8], $stat[9], $dst; 526 utime $stat[8], $stat[9], $dst;
488 chmod $stat[2] & 07777, $dst_fh; 527 chmod $stat[2] & 07777, $dst_fh;
489 chown $stat[4], $stat[5], $dst_fh; 528 chown $stat[4], $stat[5], $dst_fh;
490 close $dst_fh; 529 close $dst_fh;
491 } else { 530 } else {
492 $grp->result (-1); 531 $grp->result (-1);
493 close $src_fh; 532 close $src_fh;
494 close $dst_fh; 533 close $dst_fh;
495 534
496 aioreq $pri; 535 aioreq $pri;
497 add $grp aio_unlink $dst; 536 add $grp aio_unlink $dst;
537 }
498 } 538 };
539 } else {
540 $grp->result (-1);
499 }; 541 }
500 } else {
501 $grp->result (-1);
502 } 542 },
543
544 } else {
545 $grp->result (-1);
503 }, 546 }
504
505 } else {
506 $grp->result (-1);
507 } 547 };
548
549 $grp
508 }; 550 }
509
510 $grp
511} 551}
512 552
513=item aio_move $srcpath, $dstpath, $callback->($status) 553=item aio_move $srcpath, $dstpath, $callback->($status)
514 554
515Try to move the I<file> (directories not supported as either source or 555Try to move the I<file> (directories not supported as either source or
521that is successful, unlinking the C<$srcpath>. 561that is successful, unlinking the C<$srcpath>.
522 562
523=cut 563=cut
524 564
525sub aio_move($$;$) { 565sub aio_move($$;$) {
566 aio_block {
526 my ($src, $dst, $cb) = @_; 567 my ($src, $dst, $cb) = @_;
527 568
528 my $pri = aioreq_pri; 569 my $pri = aioreq_pri;
529 my $grp = aio_group $cb; 570 my $grp = aio_group $cb;
530 571
531 aioreq_pri $pri; 572 aioreq_pri $pri;
532 add $grp aio_rename $src, $dst, sub { 573 add $grp aio_rename $src, $dst, sub {
533 if ($_[0] && $! == EXDEV) { 574 if ($_[0] && $! == EXDEV) {
534 aioreq_pri $pri; 575 aioreq_pri $pri;
535 add $grp aio_copy $src, $dst, sub { 576 add $grp aio_copy $src, $dst, sub {
577 $grp->result ($_[0]);
578
579 if (!$_[0]) {
580 aioreq_pri $pri;
581 add $grp aio_unlink $src;
582 }
583 };
584 } else {
536 $grp->result ($_[0]); 585 $grp->result ($_[0]);
537
538 if (!$_[0]) {
539 aioreq_pri $pri;
540 add $grp aio_unlink $src;
541 }
542 }; 586 }
543 } else {
544 $grp->result ($_[0]);
545 } 587 };
588
589 $grp
546 }; 590 }
547
548 $grp
549} 591}
550 592
551=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 593=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
552 594
553Scans a directory (similar to C<aio_readdir>) but additionally tries to 595Scans a directory (similar to C<aio_readdir>) but additionally tries to
600as those tend to return 0 or 1 as link counts, which disables the 642as those tend to return 0 or 1 as link counts, which disables the
601directory counting heuristic. 643directory counting heuristic.
602 644
603=cut 645=cut
604 646
605sub aio_scandir($$$) { 647sub aio_scandir($$;$) {
648 aio_block {
606 my ($path, $maxreq, $cb) = @_; 649 my ($path, $maxreq, $cb) = @_;
607 650
608 my $pri = aioreq_pri; 651 my $pri = aioreq_pri;
609 652
610 my $grp = aio_group $cb; 653 my $grp = aio_group $cb;
611 654
612 $maxreq = 4 if $maxreq <= 0; 655 $maxreq = 4 if $maxreq <= 0;
613 656
614 # stat once 657 # stat once
615 aioreq_pri $pri;
616 add $grp aio_stat $path, sub {
617 return $grp->result () if $_[0];
618 my $now = time;
619 my $hash1 = join ":", (stat _)[0,1,3,7,9];
620
621 # read the directory entries
622 aioreq_pri $pri; 658 aioreq_pri $pri;
623 add $grp aio_readdir $path, sub { 659 add $grp aio_stat $path, sub {
624 my $entries = shift
625 or return $grp->result (); 660 return $grp->result () if $_[0];
661 my $now = time;
662 my $hash1 = join ":", (stat _)[0,1,3,7,9];
626 663
627 # stat the dir another time 664 # read the directory entries
628 aioreq_pri $pri; 665 aioreq_pri $pri;
666 add $grp aio_readdir $path, sub {
667 my $entries = shift
668 or return $grp->result ();
669
670 # stat the dir another time
671 aioreq_pri $pri;
629 add $grp aio_stat $path, sub { 672 add $grp aio_stat $path, sub {
630 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 673 my $hash2 = join ":", (stat _)[0,1,3,7,9];
631 674
632 my $ndirs; 675 my $ndirs;
633 676
634 # take the slow route if anything looks fishy 677 # take the slow route if anything looks fishy
635 if ($hash1 ne $hash2 or (stat _)[9] == $now) { 678 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
636 $ndirs = -1; 679 $ndirs = -1;
637 } else { 680 } else {
638 # if nlink == 2, we are finished 681 # if nlink == 2, we are finished
639 # on non-posix-fs's, we rely on nlink < 2 682 # on non-posix-fs's, we rely on nlink < 2
640 $ndirs = (stat _)[3] - 2 683 $ndirs = (stat _)[3] - 2
641 or return $grp->result ([], $entries); 684 or return $grp->result ([], $entries);
642 } 685 }
643 686
644 # sort into likely dirs and likely nondirs 687 # sort into likely dirs and likely nondirs
645 # dirs == files without ".", short entries first 688 # dirs == files without ".", short entries first
646 $entries = [map $_->[0], 689 $entries = [map $_->[0],
647 sort { $b->[1] cmp $a->[1] } 690 sort { $b->[1] cmp $a->[1] }
648 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], 691 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
649 @$entries]; 692 @$entries];
650 693
651 my (@dirs, @nondirs); 694 my (@dirs, @nondirs);
652 695
653 my $statgrp = add $grp aio_group sub { 696 my $statgrp = add $grp aio_group sub {
654 $grp->result (\@dirs, \@nondirs); 697 $grp->result (\@dirs, \@nondirs);
655 }; 698 };
656 699
657 limit $statgrp $maxreq; 700 limit $statgrp $maxreq;
658 feed $statgrp sub { 701 feed $statgrp sub {
659 return unless @$entries; 702 return unless @$entries;
660 my $entry = pop @$entries; 703 my $entry = pop @$entries;
661 704
662 aioreq_pri $pri; 705 aioreq_pri $pri;
663 add $statgrp aio_stat "$path/$entry/.", sub { 706 add $statgrp aio_stat "$path/$entry/.", sub {
664 if ($_[0] < 0) { 707 if ($_[0] < 0) {
665 push @nondirs, $entry; 708 push @nondirs, $entry;
666 } else { 709 } else {
667 # need to check for real directory 710 # need to check for real directory
668 aioreq_pri $pri; 711 aioreq_pri $pri;
669 add $statgrp aio_lstat "$path/$entry", sub { 712 add $statgrp aio_lstat "$path/$entry", sub {
670 if (-d _) { 713 if (-d _) {
671 push @dirs, $entry; 714 push @dirs, $entry;
672 715
673 unless (--$ndirs) { 716 unless (--$ndirs) {
674 push @nondirs, @$entries; 717 push @nondirs, @$entries;
675 feed $statgrp; 718 feed $statgrp;
719 }
720 } else {
721 push @nondirs, $entry;
676 } 722 }
677 } else {
678 push @nondirs, $entry;
679 } 723 }
680 } 724 }
681 } 725 };
682 }; 726 };
683 }; 727 };
684 }; 728 };
685 }; 729 };
730
731 $grp
686 }; 732 }
733}
687 734
735=item aio_rmtree $path, $callback->($status)
736
737Delete a directory tree starting (and including) C<$path>, return the
738status of the final C<rmdir> only. This is a composite request that
739uses C<aio_scandir> to recurse into and rmdir directories, and unlink
740everything else.
741
742=cut
743
744sub aio_rmtree;
745sub aio_rmtree($;$) {
746 aio_block {
747 my ($path, $cb) = @_;
748
749 my $pri = aioreq_pri;
750 my $grp = aio_group $cb;
751
752 aioreq_pri $pri;
753 add $grp aio_scandir $path, 0, sub {
754 my ($dirs, $nondirs) = @_;
755
756 my $dirgrp = aio_group sub {
757 add $grp aio_rmdir $path, sub {
758 $grp->result ($_[0]);
759 };
760 };
761
762 (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs;
763 (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs;
764
765 add $grp $dirgrp;
766 };
767
688 $grp 768 $grp
769 }
689} 770}
690 771
691=item aio_fsync $fh, $callback->($status) 772=item aio_fsync $fh, $callback->($status)
692 773
693Asynchronously call fsync on the given filehandle and call the callback 774Asynchronously call fsync on the given filehandle and call the callback
1110 *$sym 1191 *$sym
1111} 1192}
1112 1193
1113min_parallel 8; 1194min_parallel 8;
1114 1195
1115END { 1196END { flush }
1116 min_parallel 1;
1117 flush;
1118};
1119 1197
11201; 11981;
1121 1199
1122=head2 FORK BEHAVIOUR 1200=head2 FORK BEHAVIOUR
1123 1201

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines