ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.119 by root, Sun Dec 2 20:54:33 2007 UTC vs.
Revision 1.123 by root, Sat May 10 18:06:41 2008 UTC

194use strict 'vars'; 194use strict 'vars';
195 195
196use base 'Exporter'; 196use base 'Exporter';
197 197
198BEGIN { 198BEGIN {
199 our $VERSION = '2.6'; 199 our $VERSION = '2.62';
200 200
201 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 201 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close
202 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 202 aio_stat aio_lstat aio_unlink aio_rmdir aio_readdir
203 aio_readlink aio_sync aio_fsync aio_fdatasync aio_readahead aio_rename aio_link 203 aio_scandir aio_symlink aio_readlink aio_sync aio_fsync
204 aio_fdatasync aio_pathsync aio_readahead
205 aio_rename aio_link aio_move aio_copy aio_group
204 aio_move aio_copy aio_group aio_nop aio_mknod aio_load aio_rmtree aio_mkdir 206 aio_nop aio_mknod aio_load aio_rmtree aio_mkdir aio_chown
205 aio_chown aio_chmod aio_utime aio_truncate); 207 aio_chmod aio_utime aio_truncate);
208
206 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block)); 209 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice));
207 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush 210 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
208 min_parallel max_parallel max_idle 211 min_parallel max_parallel max_idle
209 nreqs nready npending nthreads 212 nreqs nready npending nthreads
210 max_poll_time max_poll_reqs); 213 max_poll_time max_poll_reqs);
211 214
319 322
320Asynchronously close a file and call the callback with the result 323Asynchronously close a file and call the callback with the result
321code. 324code.
322 325
323Unfortunately, you can't do this to perl. Perl I<insists> very strongly on 326Unfortunately, you can't do this to perl. Perl I<insists> very strongly on
324closing the file descriptor associated with the filehandle itself. Here is 327closing the file descriptor associated with the filehandle itself.
325what aio_close will try:
326 328
327 1. dup()licate the fd 329Therefore, C<aio_close> will not close the filehandle - instead it will
328 2. asynchronously close() the duplicated fd 330use dup2 to overwrite the file descriptor with the write-end of a pipe
329 3. dup()licate the fd once more 331(the pipe fd will be created on demand and will be cached).
330 4. let perl close() the filehandle
331 5. asynchronously close the duplicated fd
332 332
333The idea is that the first close() flushes stuff to disk that closing an 333Or in other words: the file descriptor will be closed, but it will not be
334fd will flush, so when perl closes the fd, nothing much will need to be 334free for reuse until the perl filehandle is closed.
335flushed. The second async. close() will then flush stuff to disk that
336closing the last fd to the file will flush.
337
338Just FYI, SuSv3 has this to say on close:
339
340 All outstanding record locks owned by the process on the file
341 associated with the file descriptor shall be removed.
342
343 If fildes refers to a socket, close() shall cause the socket to be
344 destroyed. ... close() shall block for up to the current linger
345 interval until all data is transmitted.
346 [this actually sounds like a specification bug, but who knows]
347
348And at least Linux additionally actually flushes stuff on every close,
349even when the file itself is still open.
350
351Sounds enourmously inefficient and complicated? Yes... please show me how
352to nuke perl's fd out of existence...
353 335
354=cut 336=cut
355
356sub aio_close($;$) {
357 aio_block {
358 my ($fh, $cb) = @_;
359
360 my $pri = aioreq_pri;
361 my $grp = aio_group $cb;
362
363 my $fd = fileno $fh;
364
365 defined $fd or Carp::croak "aio_close called with fd-less filehandle";
366
367 # if the dups fail we will simply get EBADF
368 my $fd2 = _dup $fd;
369 aioreq_pri $pri;
370 add $grp _aio_close $fd2, sub {
371 my $fd2 = _dup $fd;
372 close $fh;
373 aioreq_pri $pri;
374 add $grp _aio_close $fd2, sub {
375 $grp->result ($_[0]);
376 };
377 };
378
379 $grp
380 }
381}
382
383 337
384=item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval) 338=item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
385 339
386=item aio_write $fh,$offset,$length, $data,$dataoffset, $callback->($retval) 340=item aio_write $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
387 341
586memory. Status is the same as with aio_read. 540memory. Status is the same as with aio_read.
587 541
588=cut 542=cut
589 543
590sub aio_load($$;$) { 544sub aio_load($$;$) {
591 aio_block {
592 my ($path, undef, $cb) = @_; 545 my ($path, undef, $cb) = @_;
593 my $data = \$_[1]; 546 my $data = \$_[1];
594 547
595 my $pri = aioreq_pri; 548 my $pri = aioreq_pri;
596 my $grp = aio_group $cb; 549 my $grp = aio_group $cb;
550
551 aioreq_pri $pri;
552 add $grp aio_open $path, O_RDONLY, 0, sub {
553 my $fh = shift
554 or return $grp->result (-1);
597 555
598 aioreq_pri $pri; 556 aioreq_pri $pri;
599 add $grp aio_open $path, O_RDONLY, 0, sub {
600 my $fh = shift
601 or return $grp->result (-1);
602
603 aioreq_pri $pri;
604 add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub { 557 add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub {
605 $grp->result ($_[0]); 558 $grp->result ($_[0]);
606 };
607 }; 559 };
608
609 $grp
610 } 560 };
561
562 $grp
611} 563}
612 564
613=item aio_copy $srcpath, $dstpath, $callback->($status) 565=item aio_copy $srcpath, $dstpath, $callback->($status)
614 566
615Try to copy the I<file> (directories not supported as either source or 567Try to copy the I<file> (directories not supported as either source or
626errors are being ignored. 578errors are being ignored.
627 579
628=cut 580=cut
629 581
630sub aio_copy($$;$) { 582sub aio_copy($$;$) {
631 aio_block {
632 my ($src, $dst, $cb) = @_; 583 my ($src, $dst, $cb) = @_;
633 584
634 my $pri = aioreq_pri; 585 my $pri = aioreq_pri;
635 my $grp = aio_group $cb; 586 my $grp = aio_group $cb;
636 587
637 aioreq_pri $pri; 588 aioreq_pri $pri;
638 add $grp aio_open $src, O_RDONLY, 0, sub { 589 add $grp aio_open $src, O_RDONLY, 0, sub {
639 if (my $src_fh = $_[0]) { 590 if (my $src_fh = $_[0]) {
640 my @stat = stat $src_fh; 591 my @stat = stat $src_fh;
641 592
642 aioreq_pri $pri; 593 aioreq_pri $pri;
643 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { 594 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
644 if (my $dst_fh = $_[0]) { 595 if (my $dst_fh = $_[0]) {
645 aioreq_pri $pri; 596 aioreq_pri $pri;
646 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 597 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
647 if ($_[0] == $stat[7]) { 598 if ($_[0] == $stat[7]) {
648 $grp->result (0); 599 $grp->result (0);
649 close $src_fh; 600 close $src_fh;
650 601
651 # those should not normally block. should. should. 602 # those should not normally block. should. should.
652 utime $stat[8], $stat[9], $dst; 603 utime $stat[8], $stat[9], $dst;
653 chmod $stat[2] & 07777, $dst_fh; 604 chmod $stat[2] & 07777, $dst_fh;
654 chown $stat[4], $stat[5], $dst_fh; 605 chown $stat[4], $stat[5], $dst_fh;
606
607 aioreq_pri $pri;
655 close $dst_fh; 608 add $grp aio_close $dst_fh;
656 } else { 609 } else {
657 $grp->result (-1); 610 $grp->result (-1);
658 close $src_fh; 611 close $src_fh;
659 close $dst_fh; 612 close $dst_fh;
660 613
661 aioreq $pri; 614 aioreq $pri;
662 add $grp aio_unlink $dst; 615 add $grp aio_unlink $dst;
663 }
664 }; 616 }
665 } else {
666 $grp->result (-1);
667 } 617 };
618 } else {
619 $grp->result (-1);
668 }, 620 }
669
670 } else {
671 $grp->result (-1);
672 } 621 },
622
623 } else {
624 $grp->result (-1);
673 }; 625 }
674
675 $grp
676 } 626 };
627
628 $grp
677} 629}
678 630
679=item aio_move $srcpath, $dstpath, $callback->($status) 631=item aio_move $srcpath, $dstpath, $callback->($status)
680 632
681Try to move the I<file> (directories not supported as either source or 633Try to move the I<file> (directories not supported as either source or
687that is successful, unlinking the C<$srcpath>. 639that is successful, unlinking the C<$srcpath>.
688 640
689=cut 641=cut
690 642
691sub aio_move($$;$) { 643sub aio_move($$;$) {
692 aio_block {
693 my ($src, $dst, $cb) = @_; 644 my ($src, $dst, $cb) = @_;
694 645
695 my $pri = aioreq_pri; 646 my $pri = aioreq_pri;
696 my $grp = aio_group $cb; 647 my $grp = aio_group $cb;
697 648
698 aioreq_pri $pri; 649 aioreq_pri $pri;
699 add $grp aio_rename $src, $dst, sub { 650 add $grp aio_rename $src, $dst, sub {
700 if ($_[0] && $! == EXDEV) { 651 if ($_[0] && $! == EXDEV) {
701 aioreq_pri $pri; 652 aioreq_pri $pri;
702 add $grp aio_copy $src, $dst, sub { 653 add $grp aio_copy $src, $dst, sub {
703 $grp->result ($_[0]);
704
705 if (!$_[0]) {
706 aioreq_pri $pri;
707 add $grp aio_unlink $src;
708 }
709 };
710 } else {
711 $grp->result ($_[0]); 654 $grp->result ($_[0]);
655
656 if (!$_[0]) {
657 aioreq_pri $pri;
658 add $grp aio_unlink $src;
659 }
712 } 660 };
661 } else {
662 $grp->result ($_[0]);
713 }; 663 }
714
715 $grp
716 } 664 };
665
666 $grp
717} 667}
718 668
719=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 669=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
720 670
721Scans a directory (similar to C<aio_readdir>) but additionally tries to 671Scans a directory (similar to C<aio_readdir>) but additionally tries to
769directory counting heuristic. 719directory counting heuristic.
770 720
771=cut 721=cut
772 722
773sub aio_scandir($$;$) { 723sub aio_scandir($$;$) {
774 aio_block {
775 my ($path, $maxreq, $cb) = @_; 724 my ($path, $maxreq, $cb) = @_;
776 725
777 my $pri = aioreq_pri; 726 my $pri = aioreq_pri;
778 727
779 my $grp = aio_group $cb; 728 my $grp = aio_group $cb;
780 729
781 $maxreq = 4 if $maxreq <= 0; 730 $maxreq = 4 if $maxreq <= 0;
782 731
783 # stat once 732 # stat once
733 aioreq_pri $pri;
734 add $grp aio_stat $path, sub {
735 return $grp->result () if $_[0];
736 my $now = time;
737 my $hash1 = join ":", (stat _)[0,1,3,7,9];
738
739 # read the directory entries
784 aioreq_pri $pri; 740 aioreq_pri $pri;
785 add $grp aio_stat $path, sub { 741 add $grp aio_readdir $path, sub {
742 my $entries = shift
786 return $grp->result () if $_[0]; 743 or return $grp->result ();
787 my $now = time;
788 my $hash1 = join ":", (stat _)[0,1,3,7,9];
789 744
790 # read the directory entries 745 # stat the dir another time
791 aioreq_pri $pri; 746 aioreq_pri $pri;
792 add $grp aio_readdir $path, sub {
793 my $entries = shift
794 or return $grp->result ();
795
796 # stat the dir another time
797 aioreq_pri $pri;
798 add $grp aio_stat $path, sub { 747 add $grp aio_stat $path, sub {
799 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 748 my $hash2 = join ":", (stat _)[0,1,3,7,9];
800 749
801 my $ndirs; 750 my $ndirs;
802 751
803 # take the slow route if anything looks fishy 752 # take the slow route if anything looks fishy
804 if ($hash1 ne $hash2 or (stat _)[9] == $now) { 753 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
805 $ndirs = -1; 754 $ndirs = -1;
806 } else { 755 } else {
807 # if nlink == 2, we are finished 756 # if nlink == 2, we are finished
808 # on non-posix-fs's, we rely on nlink < 2 757 # on non-posix-fs's, we rely on nlink < 2
809 $ndirs = (stat _)[3] - 2 758 $ndirs = (stat _)[3] - 2
810 or return $grp->result ([], $entries); 759 or return $grp->result ([], $entries);
811 } 760 }
812 761
813 # sort into likely dirs and likely nondirs 762 # sort into likely dirs and likely nondirs
814 # dirs == files without ".", short entries first 763 # dirs == files without ".", short entries first
815 $entries = [map $_->[0], 764 $entries = [map $_->[0],
816 sort { $b->[1] cmp $a->[1] } 765 sort { $b->[1] cmp $a->[1] }
817 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], 766 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
818 @$entries]; 767 @$entries];
819 768
820 my (@dirs, @nondirs); 769 my (@dirs, @nondirs);
821 770
822 my $statgrp = add $grp aio_group sub { 771 my $statgrp = add $grp aio_group sub {
823 $grp->result (\@dirs, \@nondirs); 772 $grp->result (\@dirs, \@nondirs);
824 }; 773 };
825 774
826 limit $statgrp $maxreq; 775 limit $statgrp $maxreq;
827 feed $statgrp sub { 776 feed $statgrp sub {
828 return unless @$entries; 777 return unless @$entries;
829 my $entry = pop @$entries; 778 my $entry = pop @$entries;
830 779
831 aioreq_pri $pri; 780 aioreq_pri $pri;
832 add $statgrp aio_stat "$path/$entry/.", sub { 781 add $statgrp aio_stat "$path/$entry/.", sub {
833 if ($_[0] < 0) { 782 if ($_[0] < 0) {
834 push @nondirs, $entry; 783 push @nondirs, $entry;
835 } else { 784 } else {
836 # need to check for real directory 785 # need to check for real directory
837 aioreq_pri $pri; 786 aioreq_pri $pri;
838 add $statgrp aio_lstat "$path/$entry", sub { 787 add $statgrp aio_lstat "$path/$entry", sub {
839 if (-d _) { 788 if (-d _) {
840 push @dirs, $entry; 789 push @dirs, $entry;
841 790
842 unless (--$ndirs) { 791 unless (--$ndirs) {
843 push @nondirs, @$entries; 792 push @nondirs, @$entries;
844 feed $statgrp; 793 feed $statgrp;
845 }
846 } else {
847 push @nondirs, $entry;
848 } 794 }
795 } else {
796 push @nondirs, $entry;
849 } 797 }
850 } 798 }
851 }; 799 }
852 }; 800 };
853 }; 801 };
854 }; 802 };
855 }; 803 };
856
857 $grp
858 } 804 };
805
806 $grp
859} 807}
860 808
861=item aio_rmtree $path, $callback->($status) 809=item aio_rmtree $path, $callback->($status)
862 810
863Delete a directory tree starting (and including) C<$path>, return the 811Delete a directory tree starting (and including) C<$path>, return the
867 815
868=cut 816=cut
869 817
870sub aio_rmtree; 818sub aio_rmtree;
871sub aio_rmtree($;$) { 819sub aio_rmtree($;$) {
872 aio_block {
873 my ($path, $cb) = @_; 820 my ($path, $cb) = @_;
874 821
875 my $pri = aioreq_pri; 822 my $pri = aioreq_pri;
876 my $grp = aio_group $cb; 823 my $grp = aio_group $cb;
877 824
878 aioreq_pri $pri; 825 aioreq_pri $pri;
879 add $grp aio_scandir $path, 0, sub { 826 add $grp aio_scandir $path, 0, sub {
880 my ($dirs, $nondirs) = @_; 827 my ($dirs, $nondirs) = @_;
881 828
882 my $dirgrp = aio_group sub { 829 my $dirgrp = aio_group sub {
883 add $grp aio_rmdir $path, sub { 830 add $grp aio_rmdir $path, sub {
884 $grp->result ($_[0]); 831 $grp->result ($_[0]);
885 };
886 }; 832 };
887
888 (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs;
889 (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs;
890
891 add $grp $dirgrp;
892 }; 833 };
893 834
894 $grp 835 (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs;
836 (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs;
837
838 add $grp $dirgrp;
895 } 839 };
840
841 $grp
896} 842}
897 843
898=item aio_sync $callback->($status) 844=item aio_sync $callback->($status)
899 845
900Asynchronously call sync and call the callback when finished. 846Asynchronously call sync and call the callback when finished.
909Asynchronously call fdatasync on the given filehandle and call the 855Asynchronously call fdatasync on the given filehandle and call the
910callback with the fdatasync result code. 856callback with the fdatasync result code.
911 857
912If this call isn't available because your OS lacks it or it couldn't be 858If this call isn't available because your OS lacks it or it couldn't be
913detected, it will be emulated by calling C<fsync> instead. 859detected, it will be emulated by calling C<fsync> instead.
860
861=item aio_pathsync $path, $callback->($status)
862
863This request tries to open, fsync and close the given path. This is a
864composite request intended tosync directories after directory operations
865(E.g. rename). This might not work on all operating systems or have any
866specific effect, but usually it makes sure that directory changes get
867written to disc. It works for anything that can be opened for read-only,
868not just directories.
869
870Passes C<0> when everything went ok, and C<-1> on error.
871
872=cut
873
874sub aio_pathsync($;$) {
875 my ($path, $cb) = @_;
876
877 my $pri = aioreq_pri;
878 my $grp = aio_group $cb;
879
880 aioreq_pri $pri;
881 add $grp aio_open $path, O_RDONLY, 0, sub {
882 my ($fh) = @_;
883 if ($fh) {
884 aioreq_pri $pri;
885 add $grp aio_fsync $fh, sub {
886 $grp->result ($_[0]);
887
888 aioreq_pri $pri;
889 add $grp aio_close $fh;
890 };
891 } else {
892 $grp->result (-1);
893 }
894 };
895
896 $grp
897}
914 898
915=item aio_group $callback->(...) 899=item aio_group $callback->(...)
916 900
917This is a very special aio request: Instead of doing something, it is a 901This is a very special aio request: Instead of doing something, it is a
918container for other aio requests, which is useful if you want to bundle 902container for other aio requests, which is useful if you want to bundle
1055itself. Useful when you queued a lot of events but got a result early. 1039itself. Useful when you queued a lot of events but got a result early.
1056 1040
1057=item $grp->result (...) 1041=item $grp->result (...)
1058 1042
1059Set the result value(s) that will be passed to the group callback when all 1043Set the result value(s) that will be passed to the group callback when all
1060subrequests have finished and set thre groups errno to the current value 1044subrequests have finished and set the groups errno to the current value
1061of errno (just like calling C<errno> without an error number). By default, 1045of errno (just like calling C<errno> without an error number). By default,
1062no argument will be passed and errno is zero. 1046no argument will be passed and errno is zero.
1063 1047
1064=item $grp->errno ([$errno]) 1048=item $grp->errno ([$errno])
1065 1049
1263 1247
1264The default is probably ok in most situations, especially if thread 1248The default is probably ok in most situations, especially if thread
1265creation is fast. If thread creation is very slow on your system you might 1249creation is fast. If thread creation is very slow on your system you might
1266want to use larger values. 1250want to use larger values.
1267 1251
1268=item $oldmaxreqs = IO::AIO::max_outstanding $maxreqs 1252=item IO::AIO::max_outstanding $maxreqs
1269 1253
1270This is a very bad function to use in interactive programs because it 1254This is a very bad function to use in interactive programs because it
1271blocks, and a bad way to reduce concurrency because it is inexact: Better 1255blocks, and a bad way to reduce concurrency because it is inexact: Better
1272use an C<aio_group> together with a feed callback. 1256use an C<aio_group> together with a feed callback.
1273 1257
1278 1262
1279The default value is very large, so there is no practical limit on the 1263The default value is very large, so there is no practical limit on the
1280number of outstanding requests. 1264number of outstanding requests.
1281 1265
1282You can still queue as many requests as you want. Therefore, 1266You can still queue as many requests as you want. Therefore,
1283C<max_oustsanding> is mainly useful in simple scripts (with low values) or 1267C<max_outstanding> is mainly useful in simple scripts (with low values) or
1284as a stop gap to shield against fatal memory overflow (with large values). 1268as a stop gap to shield against fatal memory overflow (with large values).
1285 1269
1286=back 1270=back
1287 1271
1288=head3 STATISTICAL INFORMATION 1272=head3 STATISTICAL INFORMATION

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines