ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.92 by root, Wed Nov 8 01:57:42 2006 UTC vs.
Revision 1.96 by root, Fri Dec 22 04:05:50 2006 UTC

5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use IO::AIO; 7 use IO::AIO;
8 8
9 aio_open "/etc/passwd", O_RDONLY, 0, sub { 9 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10 my ($fh) = @_; 10 my $fh = shift
11 or die "/etc/passwd: $!";
11 ... 12 ...
12 }; 13 };
13 14
14 aio_unlink "/tmp/file", sub { }; 15 aio_unlink "/tmp/file", sub { };
15 16
99 poll => 'r', 100 poll => 'r',
100 cb => \&IO::AIO::poll_cb); 101 cb => \&IO::AIO::poll_cb);
101 102
102 # queue the request to open /etc/passwd 103 # queue the request to open /etc/passwd
103 aio_open "/etc/passwd", O_RDONLY, 0, sub { 104 aio_open "/etc/passwd", O_RDONLY, 0, sub {
104 my $fh = $_[0] 105 my $fh = shift
105 or die "error while opening: $!"; 106 or die "error while opening: $!";
106 107
107 # stat'ing filehandles is generally non-blocking 108 # stat'ing filehandles is generally non-blocking
108 my $size = -s $fh; 109 my $size = -s $fh;
109 110
189use strict 'vars'; 190use strict 'vars';
190 191
191use base 'Exporter'; 192use base 'Exporter';
192 193
193BEGIN { 194BEGIN {
194 our $VERSION = '2.2'; 195 our $VERSION = '2.21';
195 196
196 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 197 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
197 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 198 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
198 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link 199 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link
199 aio_move aio_copy aio_group aio_nop aio_mknod); 200 aio_move aio_copy aio_group aio_nop aio_mknod);
200 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice)); 201 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block));
201 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush 202 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
202 min_parallel max_parallel max_idle 203 min_parallel max_parallel max_idle
203 nreqs nready npending nthreads 204 nreqs nready npending nthreads
204 max_poll_time max_poll_reqs); 205 max_poll_time max_poll_reqs);
205 206
461errors are being ignored. 462errors are being ignored.
462 463
463=cut 464=cut
464 465
465sub aio_copy($$;$) { 466sub aio_copy($$;$) {
467 aio_block {
466 my ($src, $dst, $cb) = @_; 468 my ($src, $dst, $cb) = @_;
467 469
468 my $pri = aioreq_pri; 470 my $pri = aioreq_pri;
469 my $grp = aio_group $cb; 471 my $grp = aio_group $cb;
470 472
471 aioreq_pri $pri; 473 aioreq_pri $pri;
472 add $grp aio_open $src, O_RDONLY, 0, sub { 474 add $grp aio_open $src, O_RDONLY, 0, sub {
473 if (my $src_fh = $_[0]) { 475 if (my $src_fh = $_[0]) {
474 my @stat = stat $src_fh; 476 my @stat = stat $src_fh;
475 477
476 aioreq_pri $pri; 478 aioreq_pri $pri;
477 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { 479 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
478 if (my $dst_fh = $_[0]) { 480 if (my $dst_fh = $_[0]) {
479 aioreq_pri $pri; 481 aioreq_pri $pri;
480 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 482 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
481 if ($_[0] == $stat[7]) { 483 if ($_[0] == $stat[7]) {
482 $grp->result (0); 484 $grp->result (0);
483 close $src_fh; 485 close $src_fh;
484 486
485 # those should not normally block. should. should. 487 # those should not normally block. should. should.
486 utime $stat[8], $stat[9], $dst; 488 utime $stat[8], $stat[9], $dst;
487 chmod $stat[2] & 07777, $dst_fh; 489 chmod $stat[2] & 07777, $dst_fh;
488 chown $stat[4], $stat[5], $dst_fh; 490 chown $stat[4], $stat[5], $dst_fh;
489 close $dst_fh; 491 close $dst_fh;
490 } else { 492 } else {
491 $grp->result (-1); 493 $grp->result (-1);
492 close $src_fh; 494 close $src_fh;
493 close $dst_fh; 495 close $dst_fh;
494 496
495 aioreq $pri; 497 aioreq $pri;
496 add $grp aio_unlink $dst; 498 add $grp aio_unlink $dst;
499 }
497 } 500 };
501 } else {
502 $grp->result (-1);
498 }; 503 }
499 } else {
500 $grp->result (-1);
501 } 504 },
505
506 } else {
507 $grp->result (-1);
502 }, 508 }
503
504 } else {
505 $grp->result (-1);
506 } 509 };
510
511 $grp
507 }; 512 }
508
509 $grp
510} 513}
511 514
512=item aio_move $srcpath, $dstpath, $callback->($status) 515=item aio_move $srcpath, $dstpath, $callback->($status)
513 516
514Try to move the I<file> (directories not supported as either source or 517Try to move the I<file> (directories not supported as either source or
520that is successful, unlinking the C<$srcpath>. 523that is successful, unlinking the C<$srcpath>.
521 524
522=cut 525=cut
523 526
524sub aio_move($$;$) { 527sub aio_move($$;$) {
528 aio_block {
525 my ($src, $dst, $cb) = @_; 529 my ($src, $dst, $cb) = @_;
526 530
527 my $pri = aioreq_pri; 531 my $pri = aioreq_pri;
528 my $grp = aio_group $cb; 532 my $grp = aio_group $cb;
529 533
530 aioreq_pri $pri; 534 aioreq_pri $pri;
531 add $grp aio_rename $src, $dst, sub { 535 add $grp aio_rename $src, $dst, sub {
532 if ($_[0] && $! == EXDEV) { 536 if ($_[0] && $! == EXDEV) {
533 aioreq_pri $pri; 537 aioreq_pri $pri;
534 add $grp aio_copy $src, $dst, sub { 538 add $grp aio_copy $src, $dst, sub {
539 $grp->result ($_[0]);
540
541 if (!$_[0]) {
542 aioreq_pri $pri;
543 add $grp aio_unlink $src;
544 }
545 };
546 } else {
535 $grp->result ($_[0]); 547 $grp->result ($_[0]);
536
537 if (!$_[0]) {
538 aioreq_pri $pri;
539 add $grp aio_unlink $src;
540 }
541 }; 548 }
542 } else {
543 $grp->result ($_[0]);
544 } 549 };
550
551 $grp
545 }; 552 }
546
547 $grp
548} 553}
549 554
550=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 555=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
551 556
552Scans a directory (similar to C<aio_readdir>) but additionally tries to 557Scans a directory (similar to C<aio_readdir>) but additionally tries to
600directory counting heuristic. 605directory counting heuristic.
601 606
602=cut 607=cut
603 608
604sub aio_scandir($$$) { 609sub aio_scandir($$$) {
610 aio_block {
605 my ($path, $maxreq, $cb) = @_; 611 my ($path, $maxreq, $cb) = @_;
606 612
607 my $pri = aioreq_pri; 613 my $pri = aioreq_pri;
608 614
609 my $grp = aio_group $cb; 615 my $grp = aio_group $cb;
610 616
611 $maxreq = 4 if $maxreq <= 0; 617 $maxreq = 4 if $maxreq <= 0;
612 618
613 # stat once 619 # stat once
614 aioreq_pri $pri;
615 add $grp aio_stat $path, sub {
616 return $grp->result () if $_[0];
617 my $now = time;
618 my $hash1 = join ":", (stat _)[0,1,3,7,9];
619
620 # read the directory entries
621 aioreq_pri $pri; 620 aioreq_pri $pri;
622 add $grp aio_readdir $path, sub { 621 add $grp aio_stat $path, sub {
623 my $entries = shift
624 or return $grp->result (); 622 return $grp->result () if $_[0];
623 my $now = time;
624 my $hash1 = join ":", (stat _)[0,1,3,7,9];
625 625
626 # stat the dir another time 626 # read the directory entries
627 aioreq_pri $pri; 627 aioreq_pri $pri;
628 add $grp aio_readdir $path, sub {
629 my $entries = shift
630 or return $grp->result ();
631
632 # stat the dir another time
633 aioreq_pri $pri;
628 add $grp aio_stat $path, sub { 634 add $grp aio_stat $path, sub {
629 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 635 my $hash2 = join ":", (stat _)[0,1,3,7,9];
630 636
631 my $ndirs; 637 my $ndirs;
632 638
633 # take the slow route if anything looks fishy 639 # take the slow route if anything looks fishy
634 if ($hash1 ne $hash2 or (stat _)[9] == $now) { 640 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
635 $ndirs = -1; 641 $ndirs = -1;
636 } else { 642 } else {
637 # if nlink == 2, we are finished 643 # if nlink == 2, we are finished
638 # on non-posix-fs's, we rely on nlink < 2 644 # on non-posix-fs's, we rely on nlink < 2
639 $ndirs = (stat _)[3] - 2 645 $ndirs = (stat _)[3] - 2
640 or return $grp->result ([], $entries); 646 or return $grp->result ([], $entries);
641 } 647 }
642 648
643 # sort into likely dirs and likely nondirs 649 # sort into likely dirs and likely nondirs
644 # dirs == files without ".", short entries first 650 # dirs == files without ".", short entries first
645 $entries = [map $_->[0], 651 $entries = [map $_->[0],
646 sort { $b->[1] cmp $a->[1] } 652 sort { $b->[1] cmp $a->[1] }
647 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], 653 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
648 @$entries]; 654 @$entries];
649 655
650 my (@dirs, @nondirs); 656 my (@dirs, @nondirs);
651 657
652 my $statgrp = add $grp aio_group sub { 658 my $statgrp = add $grp aio_group sub {
653 $grp->result (\@dirs, \@nondirs); 659 $grp->result (\@dirs, \@nondirs);
654 }; 660 };
655 661
656 limit $statgrp $maxreq; 662 limit $statgrp $maxreq;
657 feed $statgrp sub { 663 feed $statgrp sub {
658 return unless @$entries; 664 return unless @$entries;
659 my $entry = pop @$entries; 665 my $entry = pop @$entries;
660 666
661 aioreq_pri $pri; 667 aioreq_pri $pri;
662 add $statgrp aio_stat "$path/$entry/.", sub { 668 add $statgrp aio_stat "$path/$entry/.", sub {
663 if ($_[0] < 0) { 669 if ($_[0] < 0) {
664 push @nondirs, $entry; 670 push @nondirs, $entry;
665 } else { 671 } else {
666 # need to check for real directory 672 # need to check for real directory
667 aioreq_pri $pri; 673 aioreq_pri $pri;
668 add $statgrp aio_lstat "$path/$entry", sub { 674 add $statgrp aio_lstat "$path/$entry", sub {
669 if (-d _) { 675 if (-d _) {
670 push @dirs, $entry; 676 push @dirs, $entry;
671 677
672 unless (--$ndirs) { 678 unless (--$ndirs) {
673 push @nondirs, @$entries; 679 push @nondirs, @$entries;
674 feed $statgrp; 680 feed $statgrp;
681 }
682 } else {
683 push @nondirs, $entry;
675 } 684 }
676 } else {
677 push @nondirs, $entry;
678 } 685 }
679 } 686 }
680 } 687 };
681 }; 688 };
682 }; 689 };
683 }; 690 };
684 }; 691 };
692
693 $grp
685 }; 694 }
686
687 $grp
688} 695}
689 696
690=item aio_fsync $fh, $callback->($status) 697=item aio_fsync $fh, $callback->($status)
691 698
692Asynchronously call fsync on the given filehandle and call the callback 699Asynchronously call fsync on the given filehandle and call the callback
971 poll => 'r', nice => 1, 978 poll => 'r', nice => 1,
972 cb => &IO::AIO::poll_cb); 979 cb => &IO::AIO::poll_cb);
973 980
974=item IO::AIO::poll_wait 981=item IO::AIO::poll_wait
975 982
976If there are any outstanding requests, wait till the result filehandle 983If there are any outstanding requests and none of them in the result
977becomes ready for reading (simply does a C<select> on the filehandle. This 984phase, wait till the result filehandle becomes ready for reading (simply
985does a C<select> on the filehandle. This is useful if you want to
978is useful if you want to synchronously wait for some requests to finish). 986synchronously wait for some requests to finish).
979 987
980See C<nreqs> for an example. 988See C<nreqs> for an example.
981 989
982=item IO::AIO::poll 990=item IO::AIO::poll
983 991
985 993
986Returns the number of requests processed, but is otherwise strictly 994Returns the number of requests processed, but is otherwise strictly
987equivalent to: 995equivalent to:
988 996
989 IO::AIO::poll_wait, IO::AIO::poll_cb 997 IO::AIO::poll_wait, IO::AIO::poll_cb
990 if IO::AIO::nreqs;
991 998
992=item IO::AIO::flush 999=item IO::AIO::flush
993 1000
994Wait till all outstanding AIO requests have been handled. 1001Wait till all outstanding AIO requests have been handled.
995 1002
1109 *$sym 1116 *$sym
1110} 1117}
1111 1118
1112min_parallel 8; 1119min_parallel 8;
1113 1120
1114END { 1121END { flush }
1115 min_parallel 1;
1116 flush;
1117};
1118 1122
11191; 11231;
1120 1124
1121=head2 FORK BEHAVIOUR 1125=head2 FORK BEHAVIOUR
1122 1126

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines