ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.89 by root, Sun Oct 29 11:03:18 2006 UTC vs.
Revision 1.105 by root, Sun Mar 25 00:20:27 2007 UTC

5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use IO::AIO; 7 use IO::AIO;
8 8
9 aio_open "/etc/passwd", O_RDONLY, 0, sub { 9 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10 my ($fh) = @_; 10 my $fh = shift
11 or die "/etc/passwd: $!";
11 ... 12 ...
12 }; 13 };
13 14
14 aio_unlink "/tmp/file", sub { }; 15 aio_unlink "/tmp/file", sub { };
15 16
61etc.), but can also be used to easily do operations in parallel that are 62etc.), but can also be used to easily do operations in parallel that are
62normally done sequentially, e.g. stat'ing many files, which is much faster 63normally done sequentially, e.g. stat'ing many files, which is much faster
63on a RAID volume or over NFS when you do a number of stat operations 64on a RAID volume or over NFS when you do a number of stat operations
64concurrently. 65concurrently.
65 66
66While this works on all types of file descriptors (for example sockets), 67While most of this works on all types of file descriptors (for example
67using these functions on file descriptors that support nonblocking 68sockets), using these functions on file descriptors that support
68operation (again, sockets, pipes etc.) is very inefficient. Use an event 69nonblocking operation (again, sockets, pipes etc.) is very inefficient or
70might not work (aio_read fails on sockets/pipes/fifos). Use an event loop
69loop for that (such as the L<Event|Event> module): IO::AIO will naturally 71for that (such as the L<Event|Event> module): IO::AIO will naturally fit
70fit into such an event loop itself. 72into such an event loop itself.
71 73
72In this version, a number of threads are started that execute your 74In this version, a number of threads are started that execute your
73requests and signal their completion. You don't need thread support 75requests and signal their completion. You don't need thread support
74in perl, and the threads created by this module will not be visible 76in perl, and the threads created by this module will not be visible
75to perl. In the future, this module might make use of the native aio 77to perl. In the future, this module might make use of the native aio
98 poll => 'r', 100 poll => 'r',
99 cb => \&IO::AIO::poll_cb); 101 cb => \&IO::AIO::poll_cb);
100 102
101 # queue the request to open /etc/passwd 103 # queue the request to open /etc/passwd
102 aio_open "/etc/passwd", O_RDONLY, 0, sub { 104 aio_open "/etc/passwd", O_RDONLY, 0, sub {
103 my $fh = $_[0] 105 my $fh = shift
104 or die "error while opening: $!"; 106 or die "error while opening: $!";
105 107
106 # stat'ing filehandles is generally non-blocking 108 # stat'ing filehandles is generally non-blocking
107 my $size = -s $fh; 109 my $size = -s $fh;
108 110
188use strict 'vars'; 190use strict 'vars';
189 191
190use base 'Exporter'; 192use base 'Exporter';
191 193
192BEGIN { 194BEGIN {
193 our $VERSION = '2.1'; 195 our $VERSION = '2.33';
194 196
195 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 197 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
196 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 198 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
197 aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move 199 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link
198 aio_copy aio_group aio_nop aio_mknod); 200 aio_move aio_copy aio_group aio_nop aio_mknod aio_load aio_rmtree aio_mkdir);
199 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice)); 201 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block));
200 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush 202 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
201 min_parallel max_parallel max_idle 203 min_parallel max_parallel max_idle
202 nreqs nready npending nthreads 204 nreqs nready npending nthreads
203 max_poll_time max_poll_reqs); 205 max_poll_time max_poll_reqs);
204 206
288list. They are the same as used by C<sysopen>. 290list. They are the same as used by C<sysopen>.
289 291
290Likewise, C<$mode> specifies the mode of the newly created file, if it 292Likewise, C<$mode> specifies the mode of the newly created file, if it
291didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>, 293didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>,
292except that it is mandatory (i.e. use C<0> if you don't create new files, 294except that it is mandatory (i.e. use C<0> if you don't create new files,
293and C<0666> or C<0777> if you do). 295and C<0666> or C<0777> if you do). Note that the C<$mode> will be modified
296by the umask in effect then the request is being executed, so better never
297change the umask.
294 298
295Example: 299Example:
296 300
297 aio_open "/etc/passwd", O_RDONLY, 0, sub { 301 aio_open "/etc/passwd", O_RDONLY, 0, sub {
298 if ($_[0]) { 302 if ($_[0]) {
417=item aio_symlink $srcpath, $dstpath, $callback->($status) 421=item aio_symlink $srcpath, $dstpath, $callback->($status)
418 422
419Asynchronously create a new symbolic link to the existing object at C<$srcpath> at 423Asynchronously create a new symbolic link to the existing object at C<$srcpath> at
420the path C<$dstpath> and call the callback with the result code. 424the path C<$dstpath> and call the callback with the result code.
421 425
426=item aio_readlink $path, $callback->($link)
427
428Asynchronously read the symlink specified by C<$path> and pass it to
429the callback. If an error occurs, nothing or undef gets passed to the
430callback.
431
422=item aio_rename $srcpath, $dstpath, $callback->($status) 432=item aio_rename $srcpath, $dstpath, $callback->($status)
423 433
424Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as 434Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
425rename(2) and call the callback with the result code. 435rename(2) and call the callback with the result code.
436
437=item aio_mkdir $pathname, $mode, $callback->($status)
438
439Asynchronously mkdir (create) a directory and call the callback with
440the result code. C<$mode> will be modified by the umask at the time the
441request is executed, so do not change your umask.
426 442
427=item aio_rmdir $pathname, $callback->($status) 443=item aio_rmdir $pathname, $callback->($status)
428 444
429Asynchronously rmdir (delete) a directory and call the callback with the 445Asynchronously rmdir (delete) a directory and call the callback with the
430result code. 446result code.
435directory (i.e. opendir + readdir + closedir). The entries will not be 451directory (i.e. opendir + readdir + closedir). The entries will not be
436sorted, and will B<NOT> include the C<.> and C<..> entries. 452sorted, and will B<NOT> include the C<.> and C<..> entries.
437 453
438The callback a single argument which is either C<undef> or an array-ref 454The callback a single argument which is either C<undef> or an array-ref
439with the filenames. 455with the filenames.
456
457=item aio_load $path, $data, $callback->($status)
458
459This is a composite request that tries to fully load the given file into
460memory. Status is the same as with aio_read.
461
462=cut
463
464sub aio_load($$;$) {
465 aio_block {
466 my ($path, undef, $cb) = @_;
467 my $data = \$_[1];
468
469 my $pri = aioreq_pri;
470 my $grp = aio_group $cb;
471
472 aioreq_pri $pri;
473 add $grp aio_open $path, O_RDONLY, 0, sub {
474 my $fh = shift
475 or return $grp->result (-1);
476
477 aioreq_pri $pri;
478 add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub {
479 $grp->result ($_[0]);
480 };
481 };
482
483 $grp
484 }
485}
440 486
441=item aio_copy $srcpath, $dstpath, $callback->($status) 487=item aio_copy $srcpath, $dstpath, $callback->($status)
442 488
443Try to copy the I<file> (directories not supported as either source or 489Try to copy the I<file> (directories not supported as either source or
444destination) from C<$srcpath> to C<$dstpath> and call the callback with 490destination) from C<$srcpath> to C<$dstpath> and call the callback with
454errors are being ignored. 500errors are being ignored.
455 501
456=cut 502=cut
457 503
458sub aio_copy($$;$) { 504sub aio_copy($$;$) {
505 aio_block {
459 my ($src, $dst, $cb) = @_; 506 my ($src, $dst, $cb) = @_;
460 507
461 my $pri = aioreq_pri; 508 my $pri = aioreq_pri;
462 my $grp = aio_group $cb; 509 my $grp = aio_group $cb;
463 510
464 aioreq_pri $pri; 511 aioreq_pri $pri;
465 add $grp aio_open $src, O_RDONLY, 0, sub { 512 add $grp aio_open $src, O_RDONLY, 0, sub {
466 if (my $src_fh = $_[0]) { 513 if (my $src_fh = $_[0]) {
467 my @stat = stat $src_fh; 514 my @stat = stat $src_fh;
468 515
469 aioreq_pri $pri; 516 aioreq_pri $pri;
470 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { 517 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
471 if (my $dst_fh = $_[0]) { 518 if (my $dst_fh = $_[0]) {
472 aioreq_pri $pri; 519 aioreq_pri $pri;
473 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 520 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
474 if ($_[0] == $stat[7]) { 521 if ($_[0] == $stat[7]) {
475 $grp->result (0); 522 $grp->result (0);
476 close $src_fh; 523 close $src_fh;
477 524
478 # those should not normally block. should. should. 525 # those should not normally block. should. should.
479 utime $stat[8], $stat[9], $dst; 526 utime $stat[8], $stat[9], $dst;
480 chmod $stat[2] & 07777, $dst_fh; 527 chmod $stat[2] & 07777, $dst_fh;
481 chown $stat[4], $stat[5], $dst_fh; 528 chown $stat[4], $stat[5], $dst_fh;
482 close $dst_fh; 529 close $dst_fh;
483 } else { 530 } else {
484 $grp->result (-1); 531 $grp->result (-1);
485 close $src_fh; 532 close $src_fh;
486 close $dst_fh; 533 close $dst_fh;
487 534
488 aioreq $pri; 535 aioreq $pri;
489 add $grp aio_unlink $dst; 536 add $grp aio_unlink $dst;
537 }
490 } 538 };
539 } else {
540 $grp->result (-1);
491 }; 541 }
492 } else {
493 $grp->result (-1);
494 } 542 },
543
544 } else {
545 $grp->result (-1);
495 }, 546 }
496
497 } else {
498 $grp->result (-1);
499 } 547 };
548
549 $grp
500 }; 550 }
501
502 $grp
503} 551}
504 552
505=item aio_move $srcpath, $dstpath, $callback->($status) 553=item aio_move $srcpath, $dstpath, $callback->($status)
506 554
507Try to move the I<file> (directories not supported as either source or 555Try to move the I<file> (directories not supported as either source or
513that is successful, unlinking the C<$srcpath>. 561that is successful, unlinking the C<$srcpath>.
514 562
515=cut 563=cut
516 564
517sub aio_move($$;$) { 565sub aio_move($$;$) {
566 aio_block {
518 my ($src, $dst, $cb) = @_; 567 my ($src, $dst, $cb) = @_;
519 568
520 my $pri = aioreq_pri; 569 my $pri = aioreq_pri;
521 my $grp = aio_group $cb; 570 my $grp = aio_group $cb;
522 571
523 aioreq_pri $pri; 572 aioreq_pri $pri;
524 add $grp aio_rename $src, $dst, sub { 573 add $grp aio_rename $src, $dst, sub {
525 if ($_[0] && $! == EXDEV) { 574 if ($_[0] && $! == EXDEV) {
526 aioreq_pri $pri; 575 aioreq_pri $pri;
527 add $grp aio_copy $src, $dst, sub { 576 add $grp aio_copy $src, $dst, sub {
577 $grp->result ($_[0]);
578
579 if (!$_[0]) {
580 aioreq_pri $pri;
581 add $grp aio_unlink $src;
582 }
583 };
584 } else {
528 $grp->result ($_[0]); 585 $grp->result ($_[0]);
529
530 if (!$_[0]) {
531 aioreq_pri $pri;
532 add $grp aio_unlink $src;
533 }
534 }; 586 }
535 } else {
536 $grp->result ($_[0]);
537 } 587 };
588
589 $grp
538 }; 590 }
539
540 $grp
541} 591}
542 592
543=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 593=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
544 594
545Scans a directory (similar to C<aio_readdir>) but additionally tries to 595Scans a directory (similar to C<aio_readdir>) but additionally tries to
592as those tend to return 0 or 1 as link counts, which disables the 642as those tend to return 0 or 1 as link counts, which disables the
593directory counting heuristic. 643directory counting heuristic.
594 644
595=cut 645=cut
596 646
597sub aio_scandir($$$) { 647sub aio_scandir($$;$) {
648 aio_block {
598 my ($path, $maxreq, $cb) = @_; 649 my ($path, $maxreq, $cb) = @_;
599 650
600 my $pri = aioreq_pri; 651 my $pri = aioreq_pri;
601 652
602 my $grp = aio_group $cb; 653 my $grp = aio_group $cb;
603 654
604 $maxreq = 4 if $maxreq <= 0; 655 $maxreq = 4 if $maxreq <= 0;
605 656
606 # stat once 657 # stat once
607 aioreq_pri $pri;
608 add $grp aio_stat $path, sub {
609 return $grp->result () if $_[0];
610 my $now = time;
611 my $hash1 = join ":", (stat _)[0,1,3,7,9];
612
613 # read the directory entries
614 aioreq_pri $pri; 658 aioreq_pri $pri;
615 add $grp aio_readdir $path, sub { 659 add $grp aio_stat $path, sub {
616 my $entries = shift
617 or return $grp->result (); 660 return $grp->result () if $_[0];
661 my $now = time;
662 my $hash1 = join ":", (stat _)[0,1,3,7,9];
618 663
619 # stat the dir another time 664 # read the directory entries
620 aioreq_pri $pri; 665 aioreq_pri $pri;
666 add $grp aio_readdir $path, sub {
667 my $entries = shift
668 or return $grp->result ();
669
670 # stat the dir another time
671 aioreq_pri $pri;
621 add $grp aio_stat $path, sub { 672 add $grp aio_stat $path, sub {
622 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 673 my $hash2 = join ":", (stat _)[0,1,3,7,9];
623 674
624 my $ndirs; 675 my $ndirs;
625 676
626 # take the slow route if anything looks fishy 677 # take the slow route if anything looks fishy
627 if ($hash1 ne $hash2 or (stat _)[9] == $now) { 678 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
628 $ndirs = -1; 679 $ndirs = -1;
629 } else { 680 } else {
630 # if nlink == 2, we are finished 681 # if nlink == 2, we are finished
631 # on non-posix-fs's, we rely on nlink < 2 682 # on non-posix-fs's, we rely on nlink < 2
632 $ndirs = (stat _)[3] - 2 683 $ndirs = (stat _)[3] - 2
633 or return $grp->result ([], $entries); 684 or return $grp->result ([], $entries);
634 } 685 }
635 686
636 # sort into likely dirs and likely nondirs 687 # sort into likely dirs and likely nondirs
637 # dirs == files without ".", short entries first 688 # dirs == files without ".", short entries first
638 $entries = [map $_->[0], 689 $entries = [map $_->[0],
639 sort { $b->[1] cmp $a->[1] } 690 sort { $b->[1] cmp $a->[1] }
640 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], 691 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
641 @$entries]; 692 @$entries];
642 693
643 my (@dirs, @nondirs); 694 my (@dirs, @nondirs);
644 695
645 my $statgrp = add $grp aio_group sub { 696 my $statgrp = add $grp aio_group sub {
646 $grp->result (\@dirs, \@nondirs); 697 $grp->result (\@dirs, \@nondirs);
647 }; 698 };
648 699
649 limit $statgrp $maxreq; 700 limit $statgrp $maxreq;
650 feed $statgrp sub { 701 feed $statgrp sub {
651 return unless @$entries; 702 return unless @$entries;
652 my $entry = pop @$entries; 703 my $entry = pop @$entries;
653 704
654 aioreq_pri $pri; 705 aioreq_pri $pri;
655 add $statgrp aio_stat "$path/$entry/.", sub { 706 add $statgrp aio_stat "$path/$entry/.", sub {
656 if ($_[0] < 0) { 707 if ($_[0] < 0) {
657 push @nondirs, $entry; 708 push @nondirs, $entry;
658 } else { 709 } else {
659 # need to check for real directory 710 # need to check for real directory
660 aioreq_pri $pri; 711 aioreq_pri $pri;
661 add $statgrp aio_lstat "$path/$entry", sub { 712 add $statgrp aio_lstat "$path/$entry", sub {
662 if (-d _) { 713 if (-d _) {
663 push @dirs, $entry; 714 push @dirs, $entry;
664 715
665 unless (--$ndirs) { 716 unless (--$ndirs) {
666 push @nondirs, @$entries; 717 push @nondirs, @$entries;
667 feed $statgrp; 718 feed $statgrp;
719 }
720 } else {
721 push @nondirs, $entry;
668 } 722 }
669 } else {
670 push @nondirs, $entry;
671 } 723 }
672 } 724 }
673 } 725 };
674 }; 726 };
675 }; 727 };
676 }; 728 };
677 }; 729 };
730
731 $grp
678 }; 732 }
733}
679 734
735=item aio_rmtree $path, $callback->($status)
736
737Delete a directory tree starting (and including) C<$path>, return the
738status of the final C<rmdir> only. This is a composite request that
739uses C<aio_scandir> to recurse into and rmdir directories, and unlink
740everything else.
741
742=cut
743
744sub aio_rmtree;
745sub aio_rmtree($;$) {
746 aio_block {
747 my ($path, $cb) = @_;
748
749 my $pri = aioreq_pri;
750 my $grp = aio_group $cb;
751
752 aioreq_pri $pri;
753 add $grp aio_scandir $path, 0, sub {
754 my ($dirs, $nondirs) = @_;
755
756 my $dirgrp = aio_group sub {
757 add $grp aio_rmdir $path, sub {
758 $grp->result ($_[0]);
759 };
760 };
761
762 (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs;
763 (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs;
764
765 add $grp $dirgrp;
766 };
767
680 $grp 768 $grp
769 }
681} 770}
682 771
683=item aio_fsync $fh, $callback->($status) 772=item aio_fsync $fh, $callback->($status)
684 773
685Asynchronously call fsync on the given filehandle and call the callback 774Asynchronously call fsync on the given filehandle and call the callback
964 poll => 'r', nice => 1, 1053 poll => 'r', nice => 1,
965 cb => &IO::AIO::poll_cb); 1054 cb => &IO::AIO::poll_cb);
966 1055
967=item IO::AIO::poll_wait 1056=item IO::AIO::poll_wait
968 1057
1058If there are any outstanding requests and none of them in the result
969Wait till the result filehandle becomes ready for reading (simply does a 1059phase, wait till the result filehandle becomes ready for reading (simply
970C<select> on the filehandle. This is useful if you want to synchronously 1060does a C<select> on the filehandle. This is useful if you want to
971wait for some requests to finish). 1061synchronously wait for some requests to finish).
972 1062
973See C<nreqs> for an example. 1063See C<nreqs> for an example.
974 1064
975=item IO::AIO::poll 1065=item IO::AIO::poll
976 1066
977Waits until some requests have been handled. 1067Waits until some requests have been handled.
978 1068
1069Returns the number of requests processed, but is otherwise strictly
979Strictly equivalent to: 1070equivalent to:
980 1071
981 IO::AIO::poll_wait, IO::AIO::poll_cb 1072 IO::AIO::poll_wait, IO::AIO::poll_cb
982 if IO::AIO::nreqs;
983 1073
984=item IO::AIO::flush 1074=item IO::AIO::flush
985 1075
986Wait till all outstanding AIO requests have been handled. 1076Wait till all outstanding AIO requests have been handled.
987 1077
988Strictly equivalent to: 1078Strictly equivalent to:
989 1079
990 IO::AIO::poll_wait, IO::AIO::poll_cb 1080 IO::AIO::poll_wait, IO::AIO::poll_cb
991 while IO::AIO::nreqs; 1081 while IO::AIO::nreqs;
992 1082
1083=back
1084
993=head3 CONTROLLING THE NUMBER OF THREADS 1085=head3 CONTROLLING THE NUMBER OF THREADS
1086
1087=over
994 1088
995=item IO::AIO::min_parallel $nthreads 1089=item IO::AIO::min_parallel $nthreads
996 1090
997Set the minimum number of AIO threads to C<$nthreads>. The current 1091Set the minimum number of AIO threads to C<$nthreads>. The current
998default is C<8>, which means eight asynchronous operations can execute 1092default is C<8>, which means eight asynchronous operations can execute
1057 1151
1058You can still queue as many requests as you want. Therefore, 1152You can still queue as many requests as you want. Therefore,
1059C<max_oustsanding> is mainly useful in simple scripts (with low values) or 1153C<max_oustsanding> is mainly useful in simple scripts (with low values) or
1060as a stop gap to shield against fatal memory overflow (with large values). 1154as a stop gap to shield against fatal memory overflow (with large values).
1061 1155
1156=back
1157
1062=head3 STATISTICAL INFORMATION 1158=head3 STATISTICAL INFORMATION
1159
1160=over
1063 1161
1064=item IO::AIO::nreqs 1162=item IO::AIO::nreqs
1065 1163
1066Returns the number of requests currently in the ready, execute or pending 1164Returns the number of requests currently in the ready, execute or pending
1067states (i.e. for which their callback has not been invoked yet). 1165states (i.e. for which their callback has not been invoked yet).
1101 *$sym 1199 *$sym
1102} 1200}
1103 1201
1104min_parallel 8; 1202min_parallel 8;
1105 1203
1106END { 1204END { flush }
1107 min_parallel 1;
1108 flush;
1109};
1110 1205
11111; 12061;
1112 1207
1113=head2 FORK BEHAVIOUR 1208=head2 FORK BEHAVIOUR
1114 1209

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines