ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.94 by root, Wed Nov 8 02:01:02 2006 UTC vs.
Revision 1.104 by root, Sat Mar 24 19:19:11 2007 UTC

190use strict 'vars'; 190use strict 'vars';
191 191
192use base 'Exporter'; 192use base 'Exporter';
193 193
194BEGIN { 194BEGIN {
195 our $VERSION = '2.2'; 195 our $VERSION = '2.33';
196 196
197 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 197 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
198 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 198 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
199 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link 199 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link
200 aio_move aio_copy aio_group aio_nop aio_mknod); 200 aio_move aio_copy aio_group aio_nop aio_mknod aio_load aio_rmtree aio_mkdir);
201 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice)); 201 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block));
202 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush 202 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
203 min_parallel max_parallel max_idle 203 min_parallel max_parallel max_idle
204 nreqs nready npending nthreads 204 nreqs nready npending nthreads
205 max_poll_time max_poll_reqs); 205 max_poll_time max_poll_reqs);
206 206
290list. They are the same as used by C<sysopen>. 290list. They are the same as used by C<sysopen>.
291 291
292Likewise, C<$mode> specifies the mode of the newly created file, if it 292Likewise, C<$mode> specifies the mode of the newly created file, if it
293didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>, 293didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>,
294except that it is mandatory (i.e. use C<0> if you don't create new files, 294except that it is mandatory (i.e. use C<0> if you don't create new files,
295and C<0666> or C<0777> if you do). 295and C<0666> or C<0777> if you do). Note that the C<$mode> will be modified
296by the umask in effect then the request is being executed, so better never
297change the umask.
296 298
297Example: 299Example:
298 300
299 aio_open "/etc/passwd", O_RDONLY, 0, sub { 301 aio_open "/etc/passwd", O_RDONLY, 0, sub {
300 if ($_[0]) { 302 if ($_[0]) {
430=item aio_rename $srcpath, $dstpath, $callback->($status) 432=item aio_rename $srcpath, $dstpath, $callback->($status)
431 433
432Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as 434Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
433rename(2) and call the callback with the result code. 435rename(2) and call the callback with the result code.
434 436
437=item aio_mkdir $pathname, $mode, $callback->($status)
438
439Asynchronously mkdir (create) a directory and call the callback with
440the result code. C<$mode> will be modified by the umask at the time the
441request is executed, so do not change your umask.
442
435=item aio_rmdir $pathname, $callback->($status) 443=item aio_rmdir $pathname, $callback->($status)
436 444
437Asynchronously rmdir (delete) a directory and call the callback with the 445Asynchronously rmdir (delete) a directory and call the callback with the
438result code. 446result code.
439 447
443directory (i.e. opendir + readdir + closedir). The entries will not be 451directory (i.e. opendir + readdir + closedir). The entries will not be
444sorted, and will B<NOT> include the C<.> and C<..> entries. 452sorted, and will B<NOT> include the C<.> and C<..> entries.
445 453
446The callback a single argument which is either C<undef> or an array-ref 454The callback a single argument which is either C<undef> or an array-ref
447with the filenames. 455with the filenames.
456
457=item aio_load $path, $data, $callback->($status)
458
459This is a composite request that tries to fully load the given file into
460memory. Status is the same as with aio_read.
461
462=cut
463
464sub aio_load($$;$) {
465 aio_block {
466 my ($path, undef, $cb) = @_;
467 my $data = \$_[1];
468
469 my $pri = aioreq_pri;
470 my $grp = aio_group $cb;
471
472 aioreq_pri $pri;
473 add $grp aio_open $path, O_RDONLY, 0, sub {
474 my $fh = shift
475 or return $grp->result (-1);
476
477 aioreq_pri $pri;
478 add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub {
479 $grp->result ($_[0]);
480 };
481 };
482
483 $grp
484 }
485}
448 486
449=item aio_copy $srcpath, $dstpath, $callback->($status) 487=item aio_copy $srcpath, $dstpath, $callback->($status)
450 488
451Try to copy the I<file> (directories not supported as either source or 489Try to copy the I<file> (directories not supported as either source or
452destination) from C<$srcpath> to C<$dstpath> and call the callback with 490destination) from C<$srcpath> to C<$dstpath> and call the callback with
462errors are being ignored. 500errors are being ignored.
463 501
464=cut 502=cut
465 503
466sub aio_copy($$;$) { 504sub aio_copy($$;$) {
505 aio_block {
467 my ($src, $dst, $cb) = @_; 506 my ($src, $dst, $cb) = @_;
468 507
469 my $pri = aioreq_pri; 508 my $pri = aioreq_pri;
470 my $grp = aio_group $cb; 509 my $grp = aio_group $cb;
471 510
472 aioreq_pri $pri; 511 aioreq_pri $pri;
473 add $grp aio_open $src, O_RDONLY, 0, sub { 512 add $grp aio_open $src, O_RDONLY, 0, sub {
474 if (my $src_fh = $_[0]) { 513 if (my $src_fh = $_[0]) {
475 my @stat = stat $src_fh; 514 my @stat = stat $src_fh;
476 515
477 aioreq_pri $pri; 516 aioreq_pri $pri;
478 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { 517 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
479 if (my $dst_fh = $_[0]) { 518 if (my $dst_fh = $_[0]) {
480 aioreq_pri $pri; 519 aioreq_pri $pri;
481 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 520 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
482 if ($_[0] == $stat[7]) { 521 if ($_[0] == $stat[7]) {
483 $grp->result (0); 522 $grp->result (0);
484 close $src_fh; 523 close $src_fh;
485 524
486 # those should not normally block. should. should. 525 # those should not normally block. should. should.
487 utime $stat[8], $stat[9], $dst; 526 utime $stat[8], $stat[9], $dst;
488 chmod $stat[2] & 07777, $dst_fh; 527 chmod $stat[2] & 07777, $dst_fh;
489 chown $stat[4], $stat[5], $dst_fh; 528 chown $stat[4], $stat[5], $dst_fh;
490 close $dst_fh; 529 close $dst_fh;
491 } else { 530 } else {
492 $grp->result (-1); 531 $grp->result (-1);
493 close $src_fh; 532 close $src_fh;
494 close $dst_fh; 533 close $dst_fh;
495 534
496 aioreq $pri; 535 aioreq $pri;
497 add $grp aio_unlink $dst; 536 add $grp aio_unlink $dst;
537 }
498 } 538 };
539 } else {
540 $grp->result (-1);
499 }; 541 }
500 } else {
501 $grp->result (-1);
502 } 542 },
543
544 } else {
545 $grp->result (-1);
503 }, 546 }
504
505 } else {
506 $grp->result (-1);
507 } 547 };
548
549 $grp
508 }; 550 }
509
510 $grp
511} 551}
512 552
513=item aio_move $srcpath, $dstpath, $callback->($status) 553=item aio_move $srcpath, $dstpath, $callback->($status)
514 554
515Try to move the I<file> (directories not supported as either source or 555Try to move the I<file> (directories not supported as either source or
521that is successful, unlinking the C<$srcpath>. 561that is successful, unlinking the C<$srcpath>.
522 562
523=cut 563=cut
524 564
525sub aio_move($$;$) { 565sub aio_move($$;$) {
566 aio_block {
526 my ($src, $dst, $cb) = @_; 567 my ($src, $dst, $cb) = @_;
527 568
528 my $pri = aioreq_pri; 569 my $pri = aioreq_pri;
529 my $grp = aio_group $cb; 570 my $grp = aio_group $cb;
530 571
531 aioreq_pri $pri; 572 aioreq_pri $pri;
532 add $grp aio_rename $src, $dst, sub { 573 add $grp aio_rename $src, $dst, sub {
533 if ($_[0] && $! == EXDEV) { 574 if ($_[0] && $! == EXDEV) {
534 aioreq_pri $pri; 575 aioreq_pri $pri;
535 add $grp aio_copy $src, $dst, sub { 576 add $grp aio_copy $src, $dst, sub {
577 $grp->result ($_[0]);
578
579 if (!$_[0]) {
580 aioreq_pri $pri;
581 add $grp aio_unlink $src;
582 }
583 };
584 } else {
536 $grp->result ($_[0]); 585 $grp->result ($_[0]);
537
538 if (!$_[0]) {
539 aioreq_pri $pri;
540 add $grp aio_unlink $src;
541 }
542 }; 586 }
543 } else {
544 $grp->result ($_[0]);
545 } 587 };
588
589 $grp
546 }; 590 }
547
548 $grp
549} 591}
550 592
551=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 593=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
552 594
553Scans a directory (similar to C<aio_readdir>) but additionally tries to 595Scans a directory (similar to C<aio_readdir>) but additionally tries to
600as those tend to return 0 or 1 as link counts, which disables the 642as those tend to return 0 or 1 as link counts, which disables the
601directory counting heuristic. 643directory counting heuristic.
602 644
603=cut 645=cut
604 646
605sub aio_scandir($$$) { 647sub aio_scandir($$;$) {
648 aio_block {
606 my ($path, $maxreq, $cb) = @_; 649 my ($path, $maxreq, $cb) = @_;
607 650
608 my $pri = aioreq_pri; 651 my $pri = aioreq_pri;
609 652
610 my $grp = aio_group $cb; 653 my $grp = aio_group $cb;
611 654
612 $maxreq = 4 if $maxreq <= 0; 655 $maxreq = 4 if $maxreq <= 0;
613 656
614 # stat once 657 # stat once
615 aioreq_pri $pri;
616 add $grp aio_stat $path, sub {
617 return $grp->result () if $_[0];
618 my $now = time;
619 my $hash1 = join ":", (stat _)[0,1,3,7,9];
620
621 # read the directory entries
622 aioreq_pri $pri; 658 aioreq_pri $pri;
623 add $grp aio_readdir $path, sub { 659 add $grp aio_stat $path, sub {
624 my $entries = shift
625 or return $grp->result (); 660 return $grp->result () if $_[0];
661 my $now = time;
662 my $hash1 = join ":", (stat _)[0,1,3,7,9];
626 663
627 # stat the dir another time 664 # read the directory entries
628 aioreq_pri $pri; 665 aioreq_pri $pri;
666 add $grp aio_readdir $path, sub {
667 my $entries = shift
668 or return $grp->result ();
669
670 # stat the dir another time
671 aioreq_pri $pri;
629 add $grp aio_stat $path, sub { 672 add $grp aio_stat $path, sub {
630 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 673 my $hash2 = join ":", (stat _)[0,1,3,7,9];
631 674
632 my $ndirs; 675 my $ndirs;
633 676
634 # take the slow route if anything looks fishy 677 # take the slow route if anything looks fishy
635 if ($hash1 ne $hash2 or (stat _)[9] == $now) { 678 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
636 $ndirs = -1; 679 $ndirs = -1;
637 } else { 680 } else {
638 # if nlink == 2, we are finished 681 # if nlink == 2, we are finished
639 # on non-posix-fs's, we rely on nlink < 2 682 # on non-posix-fs's, we rely on nlink < 2
640 $ndirs = (stat _)[3] - 2 683 $ndirs = (stat _)[3] - 2
641 or return $grp->result ([], $entries); 684 or return $grp->result ([], $entries);
642 } 685 }
643 686
644 # sort into likely dirs and likely nondirs 687 # sort into likely dirs and likely nondirs
645 # dirs == files without ".", short entries first 688 # dirs == files without ".", short entries first
646 $entries = [map $_->[0], 689 $entries = [map $_->[0],
647 sort { $b->[1] cmp $a->[1] } 690 sort { $b->[1] cmp $a->[1] }
648 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], 691 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
649 @$entries]; 692 @$entries];
650 693
651 my (@dirs, @nondirs); 694 my (@dirs, @nondirs);
652 695
653 my $statgrp = add $grp aio_group sub { 696 my $statgrp = add $grp aio_group sub {
654 $grp->result (\@dirs, \@nondirs); 697 $grp->result (\@dirs, \@nondirs);
655 }; 698 };
656 699
657 limit $statgrp $maxreq; 700 limit $statgrp $maxreq;
658 feed $statgrp sub { 701 feed $statgrp sub {
659 return unless @$entries; 702 return unless @$entries;
660 my $entry = pop @$entries; 703 my $entry = pop @$entries;
661 704
662 aioreq_pri $pri; 705 aioreq_pri $pri;
663 add $statgrp aio_stat "$path/$entry/.", sub { 706 add $statgrp aio_stat "$path/$entry/.", sub {
664 if ($_[0] < 0) { 707 if ($_[0] < 0) {
665 push @nondirs, $entry; 708 push @nondirs, $entry;
666 } else { 709 } else {
667 # need to check for real directory 710 # need to check for real directory
668 aioreq_pri $pri; 711 aioreq_pri $pri;
669 add $statgrp aio_lstat "$path/$entry", sub { 712 add $statgrp aio_lstat "$path/$entry", sub {
670 if (-d _) { 713 if (-d _) {
671 push @dirs, $entry; 714 push @dirs, $entry;
672 715
673 unless (--$ndirs) { 716 unless (--$ndirs) {
674 push @nondirs, @$entries; 717 push @nondirs, @$entries;
675 feed $statgrp; 718 feed $statgrp;
719 }
720 } else {
721 push @nondirs, $entry;
676 } 722 }
677 } else {
678 push @nondirs, $entry;
679 } 723 }
680 } 724 }
681 } 725 };
682 }; 726 };
683 }; 727 };
684 }; 728 };
685 }; 729 };
730
731 $grp
686 }; 732 }
733}
687 734
735=item aio_rmtree $path, $callback->($status)
736
737Delete a directory tree starting (and including) C<$path>, return the
738status of the final C<rmdir> only. This is a composite request that
739uses C<aio_scandir> to recurse into and rmdir directories, and unlink
740everything else.
741
742=cut
743
744sub aio_rmtree;
745sub aio_rmtree($;$) {
746 aio_block {
747 my ($path, $cb) = @_;
748
749 my $pri = aioreq_pri;
750 my $grp = aio_group $cb;
751
752 aioreq_pri $pri;
753 add $grp aio_scandir $path, 0, sub {
754 my ($dirs, $nondirs) = @_;
755
756 my $dirgrp = aio_group sub {
757 add $grp aio_rmdir $path, sub {
758 $grp->result ($_[0]);
759 };
760 };
761
762 (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs;
763 (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs;
764
765 add $grp $dirgrp;
766 };
767
688 $grp 768 $grp
769 }
689} 770}
690 771
691=item aio_fsync $fh, $callback->($status) 772=item aio_fsync $fh, $callback->($status)
692 773
693Asynchronously call fsync on the given filehandle and call the callback 774Asynchronously call fsync on the given filehandle and call the callback
997Strictly equivalent to: 1078Strictly equivalent to:
998 1079
999 IO::AIO::poll_wait, IO::AIO::poll_cb 1080 IO::AIO::poll_wait, IO::AIO::poll_cb
1000 while IO::AIO::nreqs; 1081 while IO::AIO::nreqs;
1001 1082
1083=back
1084
1002=head3 CONTROLLING THE NUMBER OF THREADS 1085=head3 CONTROLLING THE NUMBER OF THREADS
1003 1086
1004=item IO::AIO::min_parallel $nthreads 1087=item IO::AIO::min_parallel $nthreads
1005 1088
1006Set the minimum number of AIO threads to C<$nthreads>. The current 1089Set the minimum number of AIO threads to C<$nthreads>. The current
1066 1149
1067You can still queue as many requests as you want. Therefore, 1150You can still queue as many requests as you want. Therefore,
1068C<max_oustsanding> is mainly useful in simple scripts (with low values) or 1151C<max_oustsanding> is mainly useful in simple scripts (with low values) or
1069as a stop gap to shield against fatal memory overflow (with large values). 1152as a stop gap to shield against fatal memory overflow (with large values).
1070 1153
1154=back
1155
1071=head3 STATISTICAL INFORMATION 1156=head3 STATISTICAL INFORMATION
1157
1158=over
1072 1159
1073=item IO::AIO::nreqs 1160=item IO::AIO::nreqs
1074 1161
1075Returns the number of requests currently in the ready, execute or pending 1162Returns the number of requests currently in the ready, execute or pending
1076states (i.e. for which their callback has not been invoked yet). 1163states (i.e. for which their callback has not been invoked yet).
1110 *$sym 1197 *$sym
1111} 1198}
1112 1199
1113min_parallel 8; 1200min_parallel 8;
1114 1201
1115END { 1202END { flush }
1116 min_parallel 1;
1117 flush;
1118};
1119 1203
11201; 12041;
1121 1205
1122=head2 FORK BEHAVIOUR 1206=head2 FORK BEHAVIOUR
1123 1207

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines