ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.91 by root, Mon Oct 30 23:30:29 2006 UTC vs.
Revision 1.104 by root, Sat Mar 24 19:19:11 2007 UTC

5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use IO::AIO; 7 use IO::AIO;
8 8
9 aio_open "/etc/passwd", O_RDONLY, 0, sub { 9 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10 my ($fh) = @_; 10 my $fh = shift
11 or die "/etc/passwd: $!";
11 ... 12 ...
12 }; 13 };
13 14
14 aio_unlink "/tmp/file", sub { }; 15 aio_unlink "/tmp/file", sub { };
15 16
99 poll => 'r', 100 poll => 'r',
100 cb => \&IO::AIO::poll_cb); 101 cb => \&IO::AIO::poll_cb);
101 102
102 # queue the request to open /etc/passwd 103 # queue the request to open /etc/passwd
103 aio_open "/etc/passwd", O_RDONLY, 0, sub { 104 aio_open "/etc/passwd", O_RDONLY, 0, sub {
104 my $fh = $_[0] 105 my $fh = shift
105 or die "error while opening: $!"; 106 or die "error while opening: $!";
106 107
107 # stat'ing filehandles is generally non-blocking 108 # stat'ing filehandles is generally non-blocking
108 my $size = -s $fh; 109 my $size = -s $fh;
109 110
189use strict 'vars'; 190use strict 'vars';
190 191
191use base 'Exporter'; 192use base 'Exporter';
192 193
193BEGIN { 194BEGIN {
194 our $VERSION = '2.2'; 195 our $VERSION = '2.33';
195 196
196 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 197 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
197 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 198 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
198 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link 199 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link
199 aio_move aio_copy aio_group aio_nop aio_mknod); 200 aio_move aio_copy aio_group aio_nop aio_mknod aio_load aio_rmtree aio_mkdir);
200 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice)); 201 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block));
201 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush 202 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
202 min_parallel max_parallel max_idle 203 min_parallel max_parallel max_idle
203 nreqs nready npending nthreads 204 nreqs nready npending nthreads
204 max_poll_time max_poll_reqs); 205 max_poll_time max_poll_reqs);
205 206
289list. They are the same as used by C<sysopen>. 290list. They are the same as used by C<sysopen>.
290 291
291Likewise, C<$mode> specifies the mode of the newly created file, if it 292Likewise, C<$mode> specifies the mode of the newly created file, if it
292didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>, 293didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>,
293except that it is mandatory (i.e. use C<0> if you don't create new files, 294except that it is mandatory (i.e. use C<0> if you don't create new files,
294and C<0666> or C<0777> if you do). 295and C<0666> or C<0777> if you do). Note that the C<$mode> will be modified
296by the umask in effect then the request is being executed, so better never
297change the umask.
295 298
296Example: 299Example:
297 300
298 aio_open "/etc/passwd", O_RDONLY, 0, sub { 301 aio_open "/etc/passwd", O_RDONLY, 0, sub {
299 if ($_[0]) { 302 if ($_[0]) {
429=item aio_rename $srcpath, $dstpath, $callback->($status) 432=item aio_rename $srcpath, $dstpath, $callback->($status)
430 433
431Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as 434Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
432rename(2) and call the callback with the result code. 435rename(2) and call the callback with the result code.
433 436
437=item aio_mkdir $pathname, $mode, $callback->($status)
438
439Asynchronously mkdir (create) a directory and call the callback with
440the result code. C<$mode> will be modified by the umask at the time the
441request is executed, so do not change your umask.
442
434=item aio_rmdir $pathname, $callback->($status) 443=item aio_rmdir $pathname, $callback->($status)
435 444
436Asynchronously rmdir (delete) a directory and call the callback with the 445Asynchronously rmdir (delete) a directory and call the callback with the
437result code. 446result code.
438 447
442directory (i.e. opendir + readdir + closedir). The entries will not be 451directory (i.e. opendir + readdir + closedir). The entries will not be
443sorted, and will B<NOT> include the C<.> and C<..> entries. 452sorted, and will B<NOT> include the C<.> and C<..> entries.
444 453
445The callback a single argument which is either C<undef> or an array-ref 454The callback a single argument which is either C<undef> or an array-ref
446with the filenames. 455with the filenames.
456
457=item aio_load $path, $data, $callback->($status)
458
459This is a composite request that tries to fully load the given file into
460memory. Status is the same as with aio_read.
461
462=cut
463
464sub aio_load($$;$) {
465 aio_block {
466 my ($path, undef, $cb) = @_;
467 my $data = \$_[1];
468
469 my $pri = aioreq_pri;
470 my $grp = aio_group $cb;
471
472 aioreq_pri $pri;
473 add $grp aio_open $path, O_RDONLY, 0, sub {
474 my $fh = shift
475 or return $grp->result (-1);
476
477 aioreq_pri $pri;
478 add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub {
479 $grp->result ($_[0]);
480 };
481 };
482
483 $grp
484 }
485}
447 486
448=item aio_copy $srcpath, $dstpath, $callback->($status) 487=item aio_copy $srcpath, $dstpath, $callback->($status)
449 488
450Try to copy the I<file> (directories not supported as either source or 489Try to copy the I<file> (directories not supported as either source or
451destination) from C<$srcpath> to C<$dstpath> and call the callback with 490destination) from C<$srcpath> to C<$dstpath> and call the callback with
461errors are being ignored. 500errors are being ignored.
462 501
463=cut 502=cut
464 503
465sub aio_copy($$;$) { 504sub aio_copy($$;$) {
505 aio_block {
466 my ($src, $dst, $cb) = @_; 506 my ($src, $dst, $cb) = @_;
467 507
468 my $pri = aioreq_pri; 508 my $pri = aioreq_pri;
469 my $grp = aio_group $cb; 509 my $grp = aio_group $cb;
470 510
471 aioreq_pri $pri; 511 aioreq_pri $pri;
472 add $grp aio_open $src, O_RDONLY, 0, sub { 512 add $grp aio_open $src, O_RDONLY, 0, sub {
473 if (my $src_fh = $_[0]) { 513 if (my $src_fh = $_[0]) {
474 my @stat = stat $src_fh; 514 my @stat = stat $src_fh;
475 515
476 aioreq_pri $pri; 516 aioreq_pri $pri;
477 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { 517 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
478 if (my $dst_fh = $_[0]) { 518 if (my $dst_fh = $_[0]) {
479 aioreq_pri $pri; 519 aioreq_pri $pri;
480 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 520 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
481 if ($_[0] == $stat[7]) { 521 if ($_[0] == $stat[7]) {
482 $grp->result (0); 522 $grp->result (0);
483 close $src_fh; 523 close $src_fh;
484 524
485 # those should not normally block. should. should. 525 # those should not normally block. should. should.
486 utime $stat[8], $stat[9], $dst; 526 utime $stat[8], $stat[9], $dst;
487 chmod $stat[2] & 07777, $dst_fh; 527 chmod $stat[2] & 07777, $dst_fh;
488 chown $stat[4], $stat[5], $dst_fh; 528 chown $stat[4], $stat[5], $dst_fh;
489 close $dst_fh; 529 close $dst_fh;
490 } else { 530 } else {
491 $grp->result (-1); 531 $grp->result (-1);
492 close $src_fh; 532 close $src_fh;
493 close $dst_fh; 533 close $dst_fh;
494 534
495 aioreq $pri; 535 aioreq $pri;
496 add $grp aio_unlink $dst; 536 add $grp aio_unlink $dst;
537 }
497 } 538 };
539 } else {
540 $grp->result (-1);
498 }; 541 }
499 } else {
500 $grp->result (-1);
501 } 542 },
543
544 } else {
545 $grp->result (-1);
502 }, 546 }
503
504 } else {
505 $grp->result (-1);
506 } 547 };
548
549 $grp
507 }; 550 }
508
509 $grp
510} 551}
511 552
512=item aio_move $srcpath, $dstpath, $callback->($status) 553=item aio_move $srcpath, $dstpath, $callback->($status)
513 554
514Try to move the I<file> (directories not supported as either source or 555Try to move the I<file> (directories not supported as either source or
520that is successful, unlinking the C<$srcpath>. 561that is successful, unlinking the C<$srcpath>.
521 562
522=cut 563=cut
523 564
524sub aio_move($$;$) { 565sub aio_move($$;$) {
566 aio_block {
525 my ($src, $dst, $cb) = @_; 567 my ($src, $dst, $cb) = @_;
526 568
527 my $pri = aioreq_pri; 569 my $pri = aioreq_pri;
528 my $grp = aio_group $cb; 570 my $grp = aio_group $cb;
529 571
530 aioreq_pri $pri; 572 aioreq_pri $pri;
531 add $grp aio_rename $src, $dst, sub { 573 add $grp aio_rename $src, $dst, sub {
532 if ($_[0] && $! == EXDEV) { 574 if ($_[0] && $! == EXDEV) {
533 aioreq_pri $pri; 575 aioreq_pri $pri;
534 add $grp aio_copy $src, $dst, sub { 576 add $grp aio_copy $src, $dst, sub {
577 $grp->result ($_[0]);
578
579 if (!$_[0]) {
580 aioreq_pri $pri;
581 add $grp aio_unlink $src;
582 }
583 };
584 } else {
535 $grp->result ($_[0]); 585 $grp->result ($_[0]);
536
537 if (!$_[0]) {
538 aioreq_pri $pri;
539 add $grp aio_unlink $src;
540 }
541 }; 586 }
542 } else {
543 $grp->result ($_[0]);
544 } 587 };
588
589 $grp
545 }; 590 }
546
547 $grp
548} 591}
549 592
550=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 593=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
551 594
552Scans a directory (similar to C<aio_readdir>) but additionally tries to 595Scans a directory (similar to C<aio_readdir>) but additionally tries to
599as those tend to return 0 or 1 as link counts, which disables the 642as those tend to return 0 or 1 as link counts, which disables the
600directory counting heuristic. 643directory counting heuristic.
601 644
602=cut 645=cut
603 646
604sub aio_scandir($$$) { 647sub aio_scandir($$;$) {
648 aio_block {
605 my ($path, $maxreq, $cb) = @_; 649 my ($path, $maxreq, $cb) = @_;
606 650
607 my $pri = aioreq_pri; 651 my $pri = aioreq_pri;
608 652
609 my $grp = aio_group $cb; 653 my $grp = aio_group $cb;
610 654
611 $maxreq = 4 if $maxreq <= 0; 655 $maxreq = 4 if $maxreq <= 0;
612 656
613 # stat once 657 # stat once
614 aioreq_pri $pri;
615 add $grp aio_stat $path, sub {
616 return $grp->result () if $_[0];
617 my $now = time;
618 my $hash1 = join ":", (stat _)[0,1,3,7,9];
619
620 # read the directory entries
621 aioreq_pri $pri; 658 aioreq_pri $pri;
622 add $grp aio_readdir $path, sub { 659 add $grp aio_stat $path, sub {
623 my $entries = shift
624 or return $grp->result (); 660 return $grp->result () if $_[0];
661 my $now = time;
662 my $hash1 = join ":", (stat _)[0,1,3,7,9];
625 663
626 # stat the dir another time 664 # read the directory entries
627 aioreq_pri $pri; 665 aioreq_pri $pri;
666 add $grp aio_readdir $path, sub {
667 my $entries = shift
668 or return $grp->result ();
669
670 # stat the dir another time
671 aioreq_pri $pri;
628 add $grp aio_stat $path, sub { 672 add $grp aio_stat $path, sub {
629 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 673 my $hash2 = join ":", (stat _)[0,1,3,7,9];
630 674
631 my $ndirs; 675 my $ndirs;
632 676
633 # take the slow route if anything looks fishy 677 # take the slow route if anything looks fishy
634 if ($hash1 ne $hash2 or (stat _)[9] == $now) { 678 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
635 $ndirs = -1; 679 $ndirs = -1;
636 } else { 680 } else {
637 # if nlink == 2, we are finished 681 # if nlink == 2, we are finished
638 # on non-posix-fs's, we rely on nlink < 2 682 # on non-posix-fs's, we rely on nlink < 2
639 $ndirs = (stat _)[3] - 2 683 $ndirs = (stat _)[3] - 2
640 or return $grp->result ([], $entries); 684 or return $grp->result ([], $entries);
641 } 685 }
642 686
643 # sort into likely dirs and likely nondirs 687 # sort into likely dirs and likely nondirs
644 # dirs == files without ".", short entries first 688 # dirs == files without ".", short entries first
645 $entries = [map $_->[0], 689 $entries = [map $_->[0],
646 sort { $b->[1] cmp $a->[1] } 690 sort { $b->[1] cmp $a->[1] }
647 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], 691 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
648 @$entries]; 692 @$entries];
649 693
650 my (@dirs, @nondirs); 694 my (@dirs, @nondirs);
651 695
652 my $statgrp = add $grp aio_group sub { 696 my $statgrp = add $grp aio_group sub {
653 $grp->result (\@dirs, \@nondirs); 697 $grp->result (\@dirs, \@nondirs);
654 }; 698 };
655 699
656 limit $statgrp $maxreq; 700 limit $statgrp $maxreq;
657 feed $statgrp sub { 701 feed $statgrp sub {
658 return unless @$entries; 702 return unless @$entries;
659 my $entry = pop @$entries; 703 my $entry = pop @$entries;
660 704
661 aioreq_pri $pri; 705 aioreq_pri $pri;
662 add $statgrp aio_stat "$path/$entry/.", sub { 706 add $statgrp aio_stat "$path/$entry/.", sub {
663 if ($_[0] < 0) { 707 if ($_[0] < 0) {
664 push @nondirs, $entry; 708 push @nondirs, $entry;
665 } else { 709 } else {
666 # need to check for real directory 710 # need to check for real directory
667 aioreq_pri $pri; 711 aioreq_pri $pri;
668 add $statgrp aio_lstat "$path/$entry", sub { 712 add $statgrp aio_lstat "$path/$entry", sub {
669 if (-d _) { 713 if (-d _) {
670 push @dirs, $entry; 714 push @dirs, $entry;
671 715
672 unless (--$ndirs) { 716 unless (--$ndirs) {
673 push @nondirs, @$entries; 717 push @nondirs, @$entries;
674 feed $statgrp; 718 feed $statgrp;
719 }
720 } else {
721 push @nondirs, $entry;
675 } 722 }
676 } else {
677 push @nondirs, $entry;
678 } 723 }
679 } 724 }
680 } 725 };
681 }; 726 };
682 }; 727 };
683 }; 728 };
684 }; 729 };
730
731 $grp
685 }; 732 }
733}
686 734
735=item aio_rmtree $path, $callback->($status)
736
737Delete a directory tree starting (and including) C<$path>, return the
738status of the final C<rmdir> only. This is a composite request that
739uses C<aio_scandir> to recurse into and rmdir directories, and unlink
740everything else.
741
742=cut
743
744sub aio_rmtree;
745sub aio_rmtree($;$) {
746 aio_block {
747 my ($path, $cb) = @_;
748
749 my $pri = aioreq_pri;
750 my $grp = aio_group $cb;
751
752 aioreq_pri $pri;
753 add $grp aio_scandir $path, 0, sub {
754 my ($dirs, $nondirs) = @_;
755
756 my $dirgrp = aio_group sub {
757 add $grp aio_rmdir $path, sub {
758 $grp->result ($_[0]);
759 };
760 };
761
762 (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs;
763 (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs;
764
765 add $grp $dirgrp;
766 };
767
687 $grp 768 $grp
769 }
688} 770}
689 771
690=item aio_fsync $fh, $callback->($status) 772=item aio_fsync $fh, $callback->($status)
691 773
692Asynchronously call fsync on the given filehandle and call the callback 774Asynchronously call fsync on the given filehandle and call the callback
971 poll => 'r', nice => 1, 1053 poll => 'r', nice => 1,
972 cb => &IO::AIO::poll_cb); 1054 cb => &IO::AIO::poll_cb);
973 1055
974=item IO::AIO::poll_wait 1056=item IO::AIO::poll_wait
975 1057
1058If there are any outstanding requests and none of them in the result
976Wait till the result filehandle becomes ready for reading (simply does a 1059phase, wait till the result filehandle becomes ready for reading (simply
977C<select> on the filehandle. This is useful if you want to synchronously 1060does a C<select> on the filehandle. This is useful if you want to
978wait for some requests to finish). 1061synchronously wait for some requests to finish).
979 1062
980See C<nreqs> for an example. 1063See C<nreqs> for an example.
981 1064
982=item IO::AIO::poll 1065=item IO::AIO::poll
983 1066
984Waits until some requests have been handled. 1067Waits until some requests have been handled.
985 1068
1069Returns the number of requests processed, but is otherwise strictly
986Strictly equivalent to: 1070equivalent to:
987 1071
988 IO::AIO::poll_wait, IO::AIO::poll_cb 1072 IO::AIO::poll_wait, IO::AIO::poll_cb
989 if IO::AIO::nreqs;
990 1073
991=item IO::AIO::flush 1074=item IO::AIO::flush
992 1075
993Wait till all outstanding AIO requests have been handled. 1076Wait till all outstanding AIO requests have been handled.
994 1077
995Strictly equivalent to: 1078Strictly equivalent to:
996 1079
997 IO::AIO::poll_wait, IO::AIO::poll_cb 1080 IO::AIO::poll_wait, IO::AIO::poll_cb
998 while IO::AIO::nreqs; 1081 while IO::AIO::nreqs;
1082
1083=back
999 1084
1000=head3 CONTROLLING THE NUMBER OF THREADS 1085=head3 CONTROLLING THE NUMBER OF THREADS
1001 1086
1002=item IO::AIO::min_parallel $nthreads 1087=item IO::AIO::min_parallel $nthreads
1003 1088
1064 1149
1065You can still queue as many requests as you want. Therefore, 1150You can still queue as many requests as you want. Therefore,
1066C<max_oustsanding> is mainly useful in simple scripts (with low values) or 1151C<max_oustsanding> is mainly useful in simple scripts (with low values) or
1067as a stop gap to shield against fatal memory overflow (with large values). 1152as a stop gap to shield against fatal memory overflow (with large values).
1068 1153
1154=back
1155
1069=head3 STATISTICAL INFORMATION 1156=head3 STATISTICAL INFORMATION
1157
1158=over
1070 1159
1071=item IO::AIO::nreqs 1160=item IO::AIO::nreqs
1072 1161
1073Returns the number of requests currently in the ready, execute or pending 1162Returns the number of requests currently in the ready, execute or pending
1074states (i.e. for which their callback has not been invoked yet). 1163states (i.e. for which their callback has not been invoked yet).
1108 *$sym 1197 *$sym
1109} 1198}
1110 1199
1111min_parallel 8; 1200min_parallel 8;
1112 1201
1113END { 1202END { flush }
1114 min_parallel 1;
1115 flush;
1116};
1117 1203
11181; 12041;
1119 1205
1120=head2 FORK BEHAVIOUR 1206=head2 FORK BEHAVIOUR
1121 1207

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines