ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.87 by root, Sun Oct 29 00:52:02 2006 UTC vs.
Revision 1.99 by root, Sun Jan 7 21:32:20 2007 UTC

5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use IO::AIO; 7 use IO::AIO;
8 8
9 aio_open "/etc/passwd", O_RDONLY, 0, sub { 9 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10 my ($fh) = @_; 10 my $fh = shift
11 or die "/etc/passwd: $!";
11 ... 12 ...
12 }; 13 };
13 14
14 aio_unlink "/tmp/file", sub { }; 15 aio_unlink "/tmp/file", sub { };
15 16
61etc.), but can also be used to easily do operations in parallel that are 62etc.), but can also be used to easily do operations in parallel that are
62normally done sequentially, e.g. stat'ing many files, which is much faster 63normally done sequentially, e.g. stat'ing many files, which is much faster
63on a RAID volume or over NFS when you do a number of stat operations 64on a RAID volume or over NFS when you do a number of stat operations
64concurrently. 65concurrently.
65 66
66While this works on all types of file descriptors (for example sockets), 67While most of this works on all types of file descriptors (for example
67using these functions on file descriptors that support nonblocking 68sockets), using these functions on file descriptors that support
68operation (again, sockets, pipes etc.) is very inefficient. Use an event 69nonblocking operation (again, sockets, pipes etc.) is very inefficient or
70might not work (aio_read fails on sockets/pipes/fifos). Use an event loop
69loop for that (such as the L<Event|Event> module): IO::AIO will naturally 71for that (such as the L<Event|Event> module): IO::AIO will naturally fit
70fit into such an event loop itself. 72into such an event loop itself.
71 73
72In this version, a number of threads are started that execute your 74In this version, a number of threads are started that execute your
73requests and signal their completion. You don't need thread support 75requests and signal their completion. You don't need thread support
74in perl, and the threads created by this module will not be visible 76in perl, and the threads created by this module will not be visible
75to perl. In the future, this module might make use of the native aio 77to perl. In the future, this module might make use of the native aio
98 poll => 'r', 100 poll => 'r',
99 cb => \&IO::AIO::poll_cb); 101 cb => \&IO::AIO::poll_cb);
100 102
101 # queue the request to open /etc/passwd 103 # queue the request to open /etc/passwd
102 aio_open "/etc/passwd", O_RDONLY, 0, sub { 104 aio_open "/etc/passwd", O_RDONLY, 0, sub {
103 my $fh = $_[0] 105 my $fh = shift
104 or die "error while opening: $!"; 106 or die "error while opening: $!";
105 107
106 # stat'ing filehandles is generally non-blocking 108 # stat'ing filehandles is generally non-blocking
107 my $size = -s $fh; 109 my $size = -s $fh;
108 110
176Request has reached the end of its lifetime and holds no resources anymore 178Request has reached the end of its lifetime and holds no resources anymore
177(except possibly for the Perl object, but its connection to the actual 179(except possibly for the Perl object, but its connection to the actual
178aio request is severed and calling its methods will either do nothing or 180aio request is severed and calling its methods will either do nothing or
179result in a runtime error). 181result in a runtime error).
180 182
183=back
184
181=cut 185=cut
182 186
183package IO::AIO; 187package IO::AIO;
184 188
185no warnings; 189no warnings;
186use strict 'vars'; 190use strict 'vars';
187 191
188use base 'Exporter'; 192use base 'Exporter';
189 193
190BEGIN { 194BEGIN {
191 our $VERSION = '2.1'; 195 our $VERSION = '2.32';
192 196
193 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 197 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
194 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 198 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
195 aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move 199 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link
196 aio_copy aio_group aio_nop aio_mknod); 200 aio_move aio_copy aio_group aio_nop aio_mknod aio_load aio_rmtree);
197 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice)); 201 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block));
198 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush 202 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
199 min_parallel max_parallel max_idle 203 min_parallel max_parallel max_idle
200 nreqs nready npending nthreads 204 nreqs nready npending nthreads
201 max_poll_time max_poll_reqs); 205 max_poll_time max_poll_reqs);
202 206
415=item aio_symlink $srcpath, $dstpath, $callback->($status) 419=item aio_symlink $srcpath, $dstpath, $callback->($status)
416 420
417Asynchronously create a new symbolic link to the existing object at C<$srcpath> at 421Asynchronously create a new symbolic link to the existing object at C<$srcpath> at
418the path C<$dstpath> and call the callback with the result code. 422the path C<$dstpath> and call the callback with the result code.
419 423
424=item aio_readlink $path, $callback->($link)
425
426Asynchronously read the symlink specified by C<$path> and pass it to
427the callback. If an error occurs, nothing or undef gets passed to the
428callback.
429
420=item aio_rename $srcpath, $dstpath, $callback->($status) 430=item aio_rename $srcpath, $dstpath, $callback->($status)
421 431
422Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as 432Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
423rename(2) and call the callback with the result code. 433rename(2) and call the callback with the result code.
424 434
433directory (i.e. opendir + readdir + closedir). The entries will not be 443directory (i.e. opendir + readdir + closedir). The entries will not be
434sorted, and will B<NOT> include the C<.> and C<..> entries. 444sorted, and will B<NOT> include the C<.> and C<..> entries.
435 445
436The callback a single argument which is either C<undef> or an array-ref 446The callback a single argument which is either C<undef> or an array-ref
437with the filenames. 447with the filenames.
448
449=item aio_load $path, $data, $callback->($status)
450
451This is a composite request that tries to fully load the given file into
452memory. Status is the same as with aio_read.
453
454=cut
455
456sub aio_load($$;$) {
457 aio_block {
458 my ($path, undef, $cb) = @_;
459 my $data = \$_[1];
460
461 my $pri = aioreq_pri;
462 my $grp = aio_group $cb;
463
464 aioreq_pri $pri;
465 add $grp aio_open $path, O_RDONLY, 0, sub {
466 my ($fh) = @_
467 or return $grp->result (-1);
468
469 aioreq_pri $pri;
470 add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub {
471 $grp->result ($_[0]);
472 };
473 };
474
475 $grp
476 }
477}
438 478
439=item aio_copy $srcpath, $dstpath, $callback->($status) 479=item aio_copy $srcpath, $dstpath, $callback->($status)
440 480
441Try to copy the I<file> (directories not supported as either source or 481Try to copy the I<file> (directories not supported as either source or
442destination) from C<$srcpath> to C<$dstpath> and call the callback with 482destination) from C<$srcpath> to C<$dstpath> and call the callback with
452errors are being ignored. 492errors are being ignored.
453 493
454=cut 494=cut
455 495
456sub aio_copy($$;$) { 496sub aio_copy($$;$) {
497 aio_block {
457 my ($src, $dst, $cb) = @_; 498 my ($src, $dst, $cb) = @_;
458 499
459 my $pri = aioreq_pri; 500 my $pri = aioreq_pri;
460 my $grp = aio_group $cb; 501 my $grp = aio_group $cb;
461 502
462 aioreq_pri $pri; 503 aioreq_pri $pri;
463 add $grp aio_open $src, O_RDONLY, 0, sub { 504 add $grp aio_open $src, O_RDONLY, 0, sub {
464 if (my $src_fh = $_[0]) { 505 if (my $src_fh = $_[0]) {
465 my @stat = stat $src_fh; 506 my @stat = stat $src_fh;
466 507
467 aioreq_pri $pri; 508 aioreq_pri $pri;
468 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { 509 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
469 if (my $dst_fh = $_[0]) { 510 if (my $dst_fh = $_[0]) {
470 aioreq_pri $pri; 511 aioreq_pri $pri;
471 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 512 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
472 if ($_[0] == $stat[7]) { 513 if ($_[0] == $stat[7]) {
473 $grp->result (0); 514 $grp->result (0);
474 close $src_fh; 515 close $src_fh;
475 516
476 # those should not normally block. should. should. 517 # those should not normally block. should. should.
477 utime $stat[8], $stat[9], $dst; 518 utime $stat[8], $stat[9], $dst;
478 chmod $stat[2] & 07777, $dst_fh; 519 chmod $stat[2] & 07777, $dst_fh;
479 chown $stat[4], $stat[5], $dst_fh; 520 chown $stat[4], $stat[5], $dst_fh;
480 close $dst_fh; 521 close $dst_fh;
481 } else { 522 } else {
482 $grp->result (-1); 523 $grp->result (-1);
483 close $src_fh; 524 close $src_fh;
484 close $dst_fh; 525 close $dst_fh;
485 526
486 aioreq $pri; 527 aioreq $pri;
487 add $grp aio_unlink $dst; 528 add $grp aio_unlink $dst;
529 }
488 } 530 };
531 } else {
532 $grp->result (-1);
489 }; 533 }
490 } else {
491 $grp->result (-1);
492 } 534 },
535
536 } else {
537 $grp->result (-1);
493 }, 538 }
494
495 } else {
496 $grp->result (-1);
497 } 539 };
540
541 $grp
498 }; 542 }
499
500 $grp
501} 543}
502 544
503=item aio_move $srcpath, $dstpath, $callback->($status) 545=item aio_move $srcpath, $dstpath, $callback->($status)
504 546
505Try to move the I<file> (directories not supported as either source or 547Try to move the I<file> (directories not supported as either source or
511that is successful, unlinking the C<$srcpath>. 553that is successful, unlinking the C<$srcpath>.
512 554
513=cut 555=cut
514 556
515sub aio_move($$;$) { 557sub aio_move($$;$) {
558 aio_block {
516 my ($src, $dst, $cb) = @_; 559 my ($src, $dst, $cb) = @_;
517 560
518 my $pri = aioreq_pri; 561 my $pri = aioreq_pri;
519 my $grp = aio_group $cb; 562 my $grp = aio_group $cb;
520 563
521 aioreq_pri $pri; 564 aioreq_pri $pri;
522 add $grp aio_rename $src, $dst, sub { 565 add $grp aio_rename $src, $dst, sub {
523 if ($_[0] && $! == EXDEV) { 566 if ($_[0] && $! == EXDEV) {
524 aioreq_pri $pri; 567 aioreq_pri $pri;
525 add $grp aio_copy $src, $dst, sub { 568 add $grp aio_copy $src, $dst, sub {
569 $grp->result ($_[0]);
570
571 if (!$_[0]) {
572 aioreq_pri $pri;
573 add $grp aio_unlink $src;
574 }
575 };
576 } else {
526 $grp->result ($_[0]); 577 $grp->result ($_[0]);
527
528 if (!$_[0]) {
529 aioreq_pri $pri;
530 add $grp aio_unlink $src;
531 }
532 }; 578 }
533 } else {
534 $grp->result ($_[0]);
535 } 579 };
580
581 $grp
536 }; 582 }
537
538 $grp
539} 583}
540 584
541=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 585=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
542 586
543Scans a directory (similar to C<aio_readdir>) but additionally tries to 587Scans a directory (similar to C<aio_readdir>) but additionally tries to
591directory counting heuristic. 635directory counting heuristic.
592 636
593=cut 637=cut
594 638
595sub aio_scandir($$$) { 639sub aio_scandir($$$) {
640 aio_block {
596 my ($path, $maxreq, $cb) = @_; 641 my ($path, $maxreq, $cb) = @_;
597 642
598 my $pri = aioreq_pri; 643 my $pri = aioreq_pri;
599 644
600 my $grp = aio_group $cb; 645 my $grp = aio_group $cb;
601 646
602 $maxreq = 4 if $maxreq <= 0; 647 $maxreq = 4 if $maxreq <= 0;
603 648
604 # stat once 649 # stat once
605 aioreq_pri $pri;
606 add $grp aio_stat $path, sub {
607 return $grp->result () if $_[0];
608 my $now = time;
609 my $hash1 = join ":", (stat _)[0,1,3,7,9];
610
611 # read the directory entries
612 aioreq_pri $pri; 650 aioreq_pri $pri;
613 add $grp aio_readdir $path, sub { 651 add $grp aio_stat $path, sub {
614 my $entries = shift
615 or return $grp->result (); 652 return $grp->result () if $_[0];
653 my $now = time;
654 my $hash1 = join ":", (stat _)[0,1,3,7,9];
616 655
617 # stat the dir another time 656 # read the directory entries
618 aioreq_pri $pri; 657 aioreq_pri $pri;
658 add $grp aio_readdir $path, sub {
659 my $entries = shift
660 or return $grp->result ();
661
662 # stat the dir another time
663 aioreq_pri $pri;
619 add $grp aio_stat $path, sub { 664 add $grp aio_stat $path, sub {
620 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 665 my $hash2 = join ":", (stat _)[0,1,3,7,9];
621 666
622 my $ndirs; 667 my $ndirs;
623 668
624 # take the slow route if anything looks fishy 669 # take the slow route if anything looks fishy
625 if ($hash1 ne $hash2 or (stat _)[9] == $now) { 670 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
626 $ndirs = -1; 671 $ndirs = -1;
627 } else { 672 } else {
628 # if nlink == 2, we are finished 673 # if nlink == 2, we are finished
629 # on non-posix-fs's, we rely on nlink < 2 674 # on non-posix-fs's, we rely on nlink < 2
630 $ndirs = (stat _)[3] - 2 675 $ndirs = (stat _)[3] - 2
631 or return $grp->result ([], $entries); 676 or return $grp->result ([], $entries);
632 } 677 }
633 678
634 # sort into likely dirs and likely nondirs 679 # sort into likely dirs and likely nondirs
635 # dirs == files without ".", short entries first 680 # dirs == files without ".", short entries first
636 $entries = [map $_->[0], 681 $entries = [map $_->[0],
637 sort { $b->[1] cmp $a->[1] } 682 sort { $b->[1] cmp $a->[1] }
638 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], 683 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
639 @$entries]; 684 @$entries];
640 685
641 my (@dirs, @nondirs); 686 my (@dirs, @nondirs);
642 687
643 my $statgrp = add $grp aio_group sub { 688 my $statgrp = add $grp aio_group sub {
644 $grp->result (\@dirs, \@nondirs); 689 $grp->result (\@dirs, \@nondirs);
645 }; 690 };
646 691
647 limit $statgrp $maxreq; 692 limit $statgrp $maxreq;
648 feed $statgrp sub { 693 feed $statgrp sub {
649 return unless @$entries; 694 return unless @$entries;
650 my $entry = pop @$entries; 695 my $entry = pop @$entries;
651 696
652 aioreq_pri $pri; 697 aioreq_pri $pri;
653 add $statgrp aio_stat "$path/$entry/.", sub { 698 add $statgrp aio_stat "$path/$entry/.", sub {
654 if ($_[0] < 0) { 699 if ($_[0] < 0) {
655 push @nondirs, $entry; 700 push @nondirs, $entry;
656 } else { 701 } else {
657 # need to check for real directory 702 # need to check for real directory
658 aioreq_pri $pri; 703 aioreq_pri $pri;
659 add $statgrp aio_lstat "$path/$entry", sub { 704 add $statgrp aio_lstat "$path/$entry", sub {
660 if (-d _) { 705 if (-d _) {
661 push @dirs, $entry; 706 push @dirs, $entry;
662 707
663 unless (--$ndirs) { 708 unless (--$ndirs) {
664 push @nondirs, @$entries; 709 push @nondirs, @$entries;
665 feed $statgrp; 710 feed $statgrp;
711 }
712 } else {
713 push @nondirs, $entry;
666 } 714 }
667 } else {
668 push @nondirs, $entry;
669 } 715 }
670 } 716 }
671 } 717 };
672 }; 718 };
673 }; 719 };
674 }; 720 };
675 }; 721 };
722
723 $grp
676 }; 724 }
725}
677 726
727=item aio_rmtree $path, $callback->($status)
728
729Delete a directory tree starting (and including) C<$path>, return the status of the final C<rmdir> only.
730This is a composite request that uses C<aio_scandir> to recurse into and rmdir directories, and
731unlink everything else.
732
733=cut
734
735sub aio_rmtree;
736sub aio_rmtree {
737 aio_block {
738 my ($path, $cb) = @_;
739
740 my $pri = aioreq_pri;
741 my $grp = aio_group $cb;
742
743 aioreq_pri $pri;
744 add $grp aio_scandir $path, 0, sub {
745 my ($dirs, $nondirs) = @_;
746
747 my $dirgrp = aio_group sub {
748 add $grp aio_rmdir $path, sub {
749 $grp->result ($_[0]);
750 };
751 };
752
753 (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs;
754 (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs;
755
756 add $grp $dirgrp;
757 };
758
678 $grp 759 $grp
760 }
679} 761}
680 762
681=item aio_fsync $fh, $callback->($status) 763=item aio_fsync $fh, $callback->($status)
682 764
683Asynchronously call fsync on the given filehandle and call the callback 765Asynchronously call fsync on the given filehandle and call the callback
937that are being processed by C<IO::AIO::poll_cb> in one call, respectively 1019that are being processed by C<IO::AIO::poll_cb> in one call, respectively
938the maximum amount of time (default C<0>, meaning infinity) spent in 1020the maximum amount of time (default C<0>, meaning infinity) spent in
939C<IO::AIO::poll_cb> to process requests (more correctly the mininum amount 1021C<IO::AIO::poll_cb> to process requests (more correctly the mininum amount
940of time C<poll_cb> is allowed to use). 1022of time C<poll_cb> is allowed to use).
941 1023
1024Setting C<max_poll_time> to a non-zero value creates an overhead of one
1025syscall per request processed, which is not normally a problem unless your
1026callbacks are really really fast or your OS is really really slow (I am
1027not mentioning Solaris here). Using C<max_poll_reqs> incurs no overhead.
1028
942Setting these is useful if you want to ensure some level of 1029Setting these is useful if you want to ensure some level of
943interactiveness when perl is not fast enough to process all requests in 1030interactiveness when perl is not fast enough to process all requests in
944time. 1031time.
945 1032
946For interactive programs, values such as C<0.01> to C<0.1> should be fine. 1033For interactive programs, values such as C<0.01> to C<0.1> should be fine.
947 1034
948Example: Install an Event watcher that automatically calls 1035Example: Install an Event watcher that automatically calls
949IO::AIO::poll_some with low priority, to ensure that other parts of the 1036IO::AIO::poll_cb with low priority, to ensure that other parts of the
950program get the CPU sometimes even under high AIO load. 1037program get the CPU sometimes even under high AIO load.
951 1038
952 # try not to spend much more than 0.1s in poll_cb 1039 # try not to spend much more than 0.1s in poll_cb
953 IO::AIO::max_poll_time 0.1; 1040 IO::AIO::max_poll_time 0.1;
954 1041
957 poll => 'r', nice => 1, 1044 poll => 'r', nice => 1,
958 cb => &IO::AIO::poll_cb); 1045 cb => &IO::AIO::poll_cb);
959 1046
960=item IO::AIO::poll_wait 1047=item IO::AIO::poll_wait
961 1048
1049If there are any outstanding requests and none of them in the result
962Wait till the result filehandle becomes ready for reading (simply does a 1050phase, wait till the result filehandle becomes ready for reading (simply
963C<select> on the filehandle. This is useful if you want to synchronously 1051does a C<select> on the filehandle. This is useful if you want to
964wait for some requests to finish). 1052synchronously wait for some requests to finish).
965 1053
966See C<nreqs> for an example. 1054See C<nreqs> for an example.
967 1055
968=item IO::AIO::poll 1056=item IO::AIO::poll
969 1057
970Waits until some requests have been handled. 1058Waits until some requests have been handled.
971 1059
1060Returns the number of requests processed, but is otherwise strictly
972Strictly equivalent to: 1061equivalent to:
973 1062
974 IO::AIO::poll_wait, IO::AIO::poll_cb 1063 IO::AIO::poll_wait, IO::AIO::poll_cb
975 if IO::AIO::nreqs;
976 1064
977=item IO::AIO::flush 1065=item IO::AIO::flush
978 1066
979Wait till all outstanding AIO requests have been handled. 1067Wait till all outstanding AIO requests have been handled.
980 1068
1094 *$sym 1182 *$sym
1095} 1183}
1096 1184
1097min_parallel 8; 1185min_parallel 8;
1098 1186
1099END { 1187END { flush }
1100 min_parallel 1;
1101 flush;
1102};
1103 1188
11041; 11891;
1105 1190
1106=head2 FORK BEHAVIOUR 1191=head2 FORK BEHAVIOUR
1107 1192

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines