ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.120 by root, Sun Dec 2 21:51:36 2007 UTC vs.
Revision 1.124 by root, Sat May 10 19:25:33 2008 UTC

194use strict 'vars'; 194use strict 'vars';
195 195
196use base 'Exporter'; 196use base 'Exporter';
197 197
198BEGIN { 198BEGIN {
199 our $VERSION = '2.6'; 199 our $VERSION = '3.0';
200 200
201 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close 201 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close
202 aio_stat aio_lstat aio_unlink aio_rmdir aio_readdir 202 aio_stat aio_lstat aio_unlink aio_rmdir aio_readdir
203 aio_scandir aio_symlink aio_readlink aio_sync aio_fsync 203 aio_scandir aio_symlink aio_readlink aio_sync aio_fsync
204 aio_fdatasync aio_pathsync aio_readahead 204 aio_fdatasync aio_pathsync aio_readahead
205 aio_rename aio_link aio_move aio_copy aio_group 205 aio_rename aio_link aio_move aio_copy aio_group
206 aio_nop aio_mknod aio_load aio_rmtree aio_mkdir aio_chown 206 aio_nop aio_mknod aio_load aio_rmtree aio_mkdir aio_chown
207 aio_chmod aio_utime aio_truncate); 207 aio_chmod aio_utime aio_truncate);
208 208
209 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block)); 209 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice));
210 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush 210 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
211 min_parallel max_parallel max_idle 211 min_parallel max_parallel max_idle
212 nreqs nready npending nthreads 212 nreqs nready npending nthreads
213 max_poll_time max_poll_reqs); 213 max_poll_time max_poll_reqs);
214 214
322 322
323Asynchronously close a file and call the callback with the result 323Asynchronously close a file and call the callback with the result
324code. 324code.
325 325
326Unfortunately, you can't do this to perl. Perl I<insists> very strongly on 326Unfortunately, you can't do this to perl. Perl I<insists> very strongly on
327closing the file descriptor associated with the filehandle itself. Here is 327closing the file descriptor associated with the filehandle itself.
328what aio_close will try:
329 328
330 1. dup()licate the fd 329Therefore, C<aio_close> will not close the filehandle - instead it will
331 2. asynchronously close() the duplicated fd 330use dup2 to overwrite the file descriptor with the write-end of a pipe
332 3. dup()licate the fd once more 331(the pipe fd will be created on demand and will be cached).
333 4. let perl close() the filehandle
334 5. asynchronously close the duplicated fd
335 332
336The idea is that the first close() flushes stuff to disk that closing an 333Or in other words: the file descriptor will be closed, but it will not be
337fd will flush, so when perl closes the fd, nothing much will need to be 334free for reuse until the perl filehandle is closed.
338flushed. The second async. close() will then flush stuff to disk that
339closing the last fd to the file will flush.
340
341Just FYI, SuSv3 has this to say on close:
342
343 All outstanding record locks owned by the process on the file
344 associated with the file descriptor shall be removed.
345
346 If fildes refers to a socket, close() shall cause the socket to be
347 destroyed. ... close() shall block for up to the current linger
348 interval until all data is transmitted.
349 [this actually sounds like a specification bug, but who knows]
350
351And at least Linux additionally actually flushes stuff on every close,
352even when the file itself is still open.
353
354Sounds enourmously inefficient and complicated? Yes... please show me how
355to nuke perl's fd out of existence...
356 335
357=cut 336=cut
358
359sub aio_close($;$) {
360 aio_block {
361 my ($fh, $cb) = @_;
362
363 my $pri = aioreq_pri;
364 my $grp = aio_group $cb;
365
366 my $fd = fileno $fh;
367
368 defined $fd or Carp::croak "aio_close called with fd-less filehandle";
369
370 # if the dups fail we will simply get EBADF
371 my $fd2 = _dup $fd;
372 aioreq_pri $pri;
373 add $grp _aio_close $fd2, sub {
374 my $fd2 = _dup $fd;
375 close $fh;
376 aioreq_pri $pri;
377 add $grp _aio_close $fd2, sub {
378 $grp->result ($_[0]);
379 };
380 };
381
382 $grp
383 }
384}
385
386 337
387=item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval) 338=item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
388 339
389=item aio_write $fh,$offset,$length, $data,$dataoffset, $callback->($retval) 340=item aio_write $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
390 341
589memory. Status is the same as with aio_read. 540memory. Status is the same as with aio_read.
590 541
591=cut 542=cut
592 543
593sub aio_load($$;$) { 544sub aio_load($$;$) {
594 aio_block {
595 my ($path, undef, $cb) = @_; 545 my ($path, undef, $cb) = @_;
596 my $data = \$_[1]; 546 my $data = \$_[1];
597 547
598 my $pri = aioreq_pri; 548 my $pri = aioreq_pri;
599 my $grp = aio_group $cb; 549 my $grp = aio_group $cb;
550
551 aioreq_pri $pri;
552 add $grp aio_open $path, O_RDONLY, 0, sub {
553 my $fh = shift
554 or return $grp->result (-1);
600 555
601 aioreq_pri $pri; 556 aioreq_pri $pri;
602 add $grp aio_open $path, O_RDONLY, 0, sub {
603 my $fh = shift
604 or return $grp->result (-1);
605
606 aioreq_pri $pri;
607 add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub { 557 add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub {
608 $grp->result ($_[0]); 558 $grp->result ($_[0]);
609 };
610 }; 559 };
611
612 $grp
613 } 560 };
561
562 $grp
614} 563}
615 564
616=item aio_copy $srcpath, $dstpath, $callback->($status) 565=item aio_copy $srcpath, $dstpath, $callback->($status)
617 566
618Try to copy the I<file> (directories not supported as either source or 567Try to copy the I<file> (directories not supported as either source or
629errors are being ignored. 578errors are being ignored.
630 579
631=cut 580=cut
632 581
633sub aio_copy($$;$) { 582sub aio_copy($$;$) {
634 aio_block {
635 my ($src, $dst, $cb) = @_; 583 my ($src, $dst, $cb) = @_;
636 584
637 my $pri = aioreq_pri; 585 my $pri = aioreq_pri;
638 my $grp = aio_group $cb; 586 my $grp = aio_group $cb;
639 587
640 aioreq_pri $pri; 588 aioreq_pri $pri;
641 add $grp aio_open $src, O_RDONLY, 0, sub { 589 add $grp aio_open $src, O_RDONLY, 0, sub {
642 if (my $src_fh = $_[0]) { 590 if (my $src_fh = $_[0]) {
643 my @stat = stat $src_fh; 591 my @stat = stat $src_fh;
644 592
645 aioreq_pri $pri; 593 aioreq_pri $pri;
646 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { 594 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
647 if (my $dst_fh = $_[0]) { 595 if (my $dst_fh = $_[0]) {
648 aioreq_pri $pri; 596 aioreq_pri $pri;
649 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 597 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
650 if ($_[0] == $stat[7]) { 598 if ($_[0] == $stat[7]) {
651 $grp->result (0); 599 $grp->result (0);
652 close $src_fh; 600 close $src_fh;
653 601
654 # those should not normally block. should. should. 602 # those should not normally block. should. should.
655 utime $stat[8], $stat[9], $dst; 603 utime $stat[8], $stat[9], $dst;
656 chmod $stat[2] & 07777, $dst_fh; 604 chmod $stat[2] & 07777, $dst_fh;
657 chown $stat[4], $stat[5], $dst_fh; 605 chown $stat[4], $stat[5], $dst_fh;
658 606
659 aioreq_pri $pri; 607 aioreq_pri $pri;
660 add $grp aio_close $dst_fh; 608 add $grp aio_close $dst_fh;
661 } else { 609 } else {
662 $grp->result (-1); 610 $grp->result (-1);
663 close $src_fh; 611 close $src_fh;
664 close $dst_fh; 612 close $dst_fh;
665 613
666 aioreq $pri; 614 aioreq $pri;
667 add $grp aio_unlink $dst; 615 add $grp aio_unlink $dst;
668 }
669 }; 616 }
670 } else {
671 $grp->result (-1);
672 } 617 };
618 } else {
619 $grp->result (-1);
673 }, 620 }
674
675 } else {
676 $grp->result (-1);
677 } 621 },
622
623 } else {
624 $grp->result (-1);
678 }; 625 }
679
680 $grp
681 } 626 };
627
628 $grp
682} 629}
683 630
684=item aio_move $srcpath, $dstpath, $callback->($status) 631=item aio_move $srcpath, $dstpath, $callback->($status)
685 632
686Try to move the I<file> (directories not supported as either source or 633Try to move the I<file> (directories not supported as either source or
692that is successful, unlinking the C<$srcpath>. 639that is successful, unlinking the C<$srcpath>.
693 640
694=cut 641=cut
695 642
696sub aio_move($$;$) { 643sub aio_move($$;$) {
697 aio_block {
698 my ($src, $dst, $cb) = @_; 644 my ($src, $dst, $cb) = @_;
699 645
700 my $pri = aioreq_pri; 646 my $pri = aioreq_pri;
701 my $grp = aio_group $cb; 647 my $grp = aio_group $cb;
702 648
703 aioreq_pri $pri; 649 aioreq_pri $pri;
704 add $grp aio_rename $src, $dst, sub { 650 add $grp aio_rename $src, $dst, sub {
705 if ($_[0] && $! == EXDEV) { 651 if ($_[0] && $! == EXDEV) {
706 aioreq_pri $pri; 652 aioreq_pri $pri;
707 add $grp aio_copy $src, $dst, sub { 653 add $grp aio_copy $src, $dst, sub {
708 $grp->result ($_[0]);
709
710 if (!$_[0]) {
711 aioreq_pri $pri;
712 add $grp aio_unlink $src;
713 }
714 };
715 } else {
716 $grp->result ($_[0]); 654 $grp->result ($_[0]);
655
656 if (!$_[0]) {
657 aioreq_pri $pri;
658 add $grp aio_unlink $src;
659 }
717 } 660 };
661 } else {
662 $grp->result ($_[0]);
718 }; 663 }
719
720 $grp
721 } 664 };
665
666 $grp
722} 667}
723 668
724=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 669=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
725 670
726Scans a directory (similar to C<aio_readdir>) but additionally tries to 671Scans a directory (similar to C<aio_readdir>) but additionally tries to
774directory counting heuristic. 719directory counting heuristic.
775 720
776=cut 721=cut
777 722
778sub aio_scandir($$;$) { 723sub aio_scandir($$;$) {
779 aio_block {
780 my ($path, $maxreq, $cb) = @_; 724 my ($path, $maxreq, $cb) = @_;
781 725
782 my $pri = aioreq_pri; 726 my $pri = aioreq_pri;
783 727
784 my $grp = aio_group $cb; 728 my $grp = aio_group $cb;
785 729
786 $maxreq = 4 if $maxreq <= 0; 730 $maxreq = 4 if $maxreq <= 0;
787 731
788 # stat once 732 # stat once
733 aioreq_pri $pri;
734 add $grp aio_stat $path, sub {
735 return $grp->result () if $_[0];
736 my $now = time;
737 my $hash1 = join ":", (stat _)[0,1,3,7,9];
738
739 # read the directory entries
789 aioreq_pri $pri; 740 aioreq_pri $pri;
790 add $grp aio_stat $path, sub { 741 add $grp aio_readdir $path, sub {
742 my $entries = shift
791 return $grp->result () if $_[0]; 743 or return $grp->result ();
792 my $now = time;
793 my $hash1 = join ":", (stat _)[0,1,3,7,9];
794 744
795 # read the directory entries 745 # stat the dir another time
796 aioreq_pri $pri; 746 aioreq_pri $pri;
797 add $grp aio_readdir $path, sub {
798 my $entries = shift
799 or return $grp->result ();
800
801 # stat the dir another time
802 aioreq_pri $pri;
803 add $grp aio_stat $path, sub { 747 add $grp aio_stat $path, sub {
804 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 748 my $hash2 = join ":", (stat _)[0,1,3,7,9];
805 749
806 my $ndirs; 750 my $ndirs;
807 751
808 # take the slow route if anything looks fishy 752 # take the slow route if anything looks fishy
809 if ($hash1 ne $hash2 or (stat _)[9] == $now) { 753 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
810 $ndirs = -1; 754 $ndirs = -1;
811 } else { 755 } else {
812 # if nlink == 2, we are finished 756 # if nlink == 2, we are finished
813 # on non-posix-fs's, we rely on nlink < 2 757 # on non-posix-fs's, we rely on nlink < 2
814 $ndirs = (stat _)[3] - 2 758 $ndirs = (stat _)[3] - 2
815 or return $grp->result ([], $entries); 759 or return $grp->result ([], $entries);
816 } 760 }
817 761
818 # sort into likely dirs and likely nondirs 762 # sort into likely dirs and likely nondirs
819 # dirs == files without ".", short entries first 763 # dirs == files without ".", short entries first
820 $entries = [map $_->[0], 764 $entries = [map $_->[0],
821 sort { $b->[1] cmp $a->[1] } 765 sort { $b->[1] cmp $a->[1] }
822 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], 766 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
823 @$entries]; 767 @$entries];
824 768
825 my (@dirs, @nondirs); 769 my (@dirs, @nondirs);
826 770
827 my $statgrp = add $grp aio_group sub { 771 my $statgrp = add $grp aio_group sub {
828 $grp->result (\@dirs, \@nondirs); 772 $grp->result (\@dirs, \@nondirs);
829 }; 773 };
830 774
831 limit $statgrp $maxreq; 775 limit $statgrp $maxreq;
832 feed $statgrp sub { 776 feed $statgrp sub {
833 return unless @$entries; 777 return unless @$entries;
834 my $entry = pop @$entries; 778 my $entry = pop @$entries;
835 779
836 aioreq_pri $pri; 780 aioreq_pri $pri;
837 add $statgrp aio_stat "$path/$entry/.", sub { 781 add $statgrp aio_stat "$path/$entry/.", sub {
838 if ($_[0] < 0) { 782 if ($_[0] < 0) {
839 push @nondirs, $entry; 783 push @nondirs, $entry;
840 } else { 784 } else {
841 # need to check for real directory 785 # need to check for real directory
842 aioreq_pri $pri; 786 aioreq_pri $pri;
843 add $statgrp aio_lstat "$path/$entry", sub { 787 add $statgrp aio_lstat "$path/$entry", sub {
844 if (-d _) { 788 if (-d _) {
845 push @dirs, $entry; 789 push @dirs, $entry;
846 790
847 unless (--$ndirs) { 791 unless (--$ndirs) {
848 push @nondirs, @$entries; 792 push @nondirs, @$entries;
849 feed $statgrp; 793 feed $statgrp;
850 }
851 } else {
852 push @nondirs, $entry;
853 } 794 }
795 } else {
796 push @nondirs, $entry;
854 } 797 }
855 } 798 }
856 }; 799 }
857 }; 800 };
858 }; 801 };
859 }; 802 };
860 }; 803 };
861
862 $grp
863 } 804 };
805
806 $grp
864} 807}
865 808
866=item aio_rmtree $path, $callback->($status) 809=item aio_rmtree $path, $callback->($status)
867 810
868Delete a directory tree starting (and including) C<$path>, return the 811Delete a directory tree starting (and including) C<$path>, return the
872 815
873=cut 816=cut
874 817
875sub aio_rmtree; 818sub aio_rmtree;
876sub aio_rmtree($;$) { 819sub aio_rmtree($;$) {
877 aio_block {
878 my ($path, $cb) = @_; 820 my ($path, $cb) = @_;
879 821
880 my $pri = aioreq_pri; 822 my $pri = aioreq_pri;
881 my $grp = aio_group $cb; 823 my $grp = aio_group $cb;
882 824
883 aioreq_pri $pri; 825 aioreq_pri $pri;
884 add $grp aio_scandir $path, 0, sub { 826 add $grp aio_scandir $path, 0, sub {
885 my ($dirs, $nondirs) = @_; 827 my ($dirs, $nondirs) = @_;
886 828
887 my $dirgrp = aio_group sub { 829 my $dirgrp = aio_group sub {
888 add $grp aio_rmdir $path, sub { 830 add $grp aio_rmdir $path, sub {
889 $grp->result ($_[0]); 831 $grp->result ($_[0]);
890 };
891 }; 832 };
892
893 (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs;
894 (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs;
895
896 add $grp $dirgrp;
897 }; 833 };
898 834
899 $grp 835 (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs;
836 (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs;
837
838 add $grp $dirgrp;
900 } 839 };
840
841 $grp
901} 842}
902 843
903=item aio_sync $callback->($status) 844=item aio_sync $callback->($status)
904 845
905Asynchronously call sync and call the callback when finished. 846Asynchronously call sync and call the callback when finished.
929Passes C<0> when everything went ok, and C<-1> on error. 870Passes C<0> when everything went ok, and C<-1> on error.
930 871
931=cut 872=cut
932 873
933sub aio_pathsync($;$) { 874sub aio_pathsync($;$) {
934 aio_block {
935 my ($path, $cb) = @_; 875 my ($path, $cb) = @_;
936 876
937 my $pri = aioreq_pri; 877 my $pri = aioreq_pri;
938 my $grp = aio_group $cb; 878 my $grp = aio_group $cb;
939 879
940 aioreq_pri $pri; 880 aioreq_pri $pri;
941 add $grp aio_open $path, O_RDONLY, 0, sub { 881 add $grp aio_open $path, O_RDONLY, 0, sub {
942 my ($fh) = @_; 882 my ($fh) = @_;
943 if ($fh) { 883 if ($fh) {
884 aioreq_pri $pri;
885 add $grp aio_fsync $fh, sub {
886 $grp->result ($_[0]);
887
944 aioreq_pri $pri; 888 aioreq_pri $pri;
945 add $grp aio_fsync $fh, sub {
946 $grp->result ($_[0]);
947
948 aioreq_pri $pri;
949 add $grp aio_close $fh; 889 add $grp aio_close $fh;
950 };
951 } else {
952 $grp->result (-1);
953 } 890 };
891 } else {
892 $grp->result (-1);
954 }; 893 }
955
956 $grp
957 } 894 };
895
896 $grp
958} 897}
959 898
960=item aio_group $callback->(...) 899=item aio_group $callback->(...)
961 900
962This is a very special aio request: Instead of doing something, it is a 901This is a very special aio request: Instead of doing something, it is a
1308 1247
1309The default is probably ok in most situations, especially if thread 1248The default is probably ok in most situations, especially if thread
1310creation is fast. If thread creation is very slow on your system you might 1249creation is fast. If thread creation is very slow on your system you might
1311want to use larger values. 1250want to use larger values.
1312 1251
1313=item $oldmaxreqs = IO::AIO::max_outstanding $maxreqs 1252=item IO::AIO::max_outstanding $maxreqs
1314 1253
1315This is a very bad function to use in interactive programs because it 1254This is a very bad function to use in interactive programs because it
1316blocks, and a bad way to reduce concurrency because it is inexact: Better 1255blocks, and a bad way to reduce concurrency because it is inexact: Better
1317use an C<aio_group> together with a feed callback. 1256use an C<aio_group> together with a feed callback.
1318 1257
1323 1262
1324The default value is very large, so there is no practical limit on the 1263The default value is very large, so there is no practical limit on the
1325number of outstanding requests. 1264number of outstanding requests.
1326 1265
1327You can still queue as many requests as you want. Therefore, 1266You can still queue as many requests as you want. Therefore,
1328C<max_oustsanding> is mainly useful in simple scripts (with low values) or 1267C<max_outstanding> is mainly useful in simple scripts (with low values) or
1329as a stop gap to shield against fatal memory overflow (with large values). 1268as a stop gap to shield against fatal memory overflow (with large values).
1330 1269
1331=back 1270=back
1332 1271
1333=head3 STATISTICAL INFORMATION 1272=head3 STATISTICAL INFORMATION

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines