ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.93 by root, Wed Nov 8 01:59:58 2006 UTC vs.
Revision 1.95 by root, Sun Nov 26 18:28:37 2006 UTC

5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use IO::AIO; 7 use IO::AIO;
8 8
9 aio_open "/etc/passwd", O_RDONLY, 0, sub { 9 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10 my ($fh) = @_; 10 my $fh = shift
11 or die "/etc/passwd: $!";
11 ... 12 ...
12 }; 13 };
13 14
14 aio_unlink "/tmp/file", sub { }; 15 aio_unlink "/tmp/file", sub { };
15 16
99 poll => 'r', 100 poll => 'r',
100 cb => \&IO::AIO::poll_cb); 101 cb => \&IO::AIO::poll_cb);
101 102
102 # queue the request to open /etc/passwd 103 # queue the request to open /etc/passwd
103 aio_open "/etc/passwd", O_RDONLY, 0, sub { 104 aio_open "/etc/passwd", O_RDONLY, 0, sub {
104 my $fh = $_[0] 105 my $fh = shift
105 or die "error while opening: $!"; 106 or die "error while opening: $!";
106 107
107 # stat'ing filehandles is generally non-blocking 108 # stat'ing filehandles is generally non-blocking
108 my $size = -s $fh; 109 my $size = -s $fh;
109 110
195 196
196 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 197 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
197 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 198 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
198 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link 199 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link
199 aio_move aio_copy aio_group aio_nop aio_mknod); 200 aio_move aio_copy aio_group aio_nop aio_mknod);
200 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice)); 201 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block));
201 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush 202 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
202 min_parallel max_parallel max_idle 203 min_parallel max_parallel max_idle
203 nreqs nready npending nthreads 204 nreqs nready npending nthreads
204 max_poll_time max_poll_reqs); 205 max_poll_time max_poll_reqs);
205 206
461errors are being ignored. 462errors are being ignored.
462 463
463=cut 464=cut
464 465
465sub aio_copy($$;$) { 466sub aio_copy($$;$) {
467 aio_block {
466 my ($src, $dst, $cb) = @_; 468 my ($src, $dst, $cb) = @_;
467 469
468 my $pri = aioreq_pri; 470 my $pri = aioreq_pri;
469 my $grp = aio_group $cb; 471 my $grp = aio_group $cb;
470 472
471 aioreq_pri $pri; 473 aioreq_pri $pri;
472 add $grp aio_open $src, O_RDONLY, 0, sub { 474 add $grp aio_open $src, O_RDONLY, 0, sub {
473 if (my $src_fh = $_[0]) { 475 if (my $src_fh = $_[0]) {
474 my @stat = stat $src_fh; 476 my @stat = stat $src_fh;
475 477
476 aioreq_pri $pri; 478 aioreq_pri $pri;
477 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { 479 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
478 if (my $dst_fh = $_[0]) { 480 if (my $dst_fh = $_[0]) {
479 aioreq_pri $pri; 481 aioreq_pri $pri;
480 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 482 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
481 if ($_[0] == $stat[7]) { 483 if ($_[0] == $stat[7]) {
482 $grp->result (0); 484 $grp->result (0);
483 close $src_fh; 485 close $src_fh;
484 486
485 # those should not normally block. should. should. 487 # those should not normally block. should. should.
486 utime $stat[8], $stat[9], $dst; 488 utime $stat[8], $stat[9], $dst;
487 chmod $stat[2] & 07777, $dst_fh; 489 chmod $stat[2] & 07777, $dst_fh;
488 chown $stat[4], $stat[5], $dst_fh; 490 chown $stat[4], $stat[5], $dst_fh;
489 close $dst_fh; 491 close $dst_fh;
490 } else { 492 } else {
491 $grp->result (-1); 493 $grp->result (-1);
492 close $src_fh; 494 close $src_fh;
493 close $dst_fh; 495 close $dst_fh;
494 496
495 aioreq $pri; 497 aioreq $pri;
496 add $grp aio_unlink $dst; 498 add $grp aio_unlink $dst;
499 }
497 } 500 };
501 } else {
502 $grp->result (-1);
498 }; 503 }
499 } else {
500 $grp->result (-1);
501 } 504 },
505
506 } else {
507 $grp->result (-1);
502 }, 508 }
503
504 } else {
505 $grp->result (-1);
506 } 509 };
510
511 $grp
507 }; 512 }
508
509 $grp
510} 513}
511 514
512=item aio_move $srcpath, $dstpath, $callback->($status) 515=item aio_move $srcpath, $dstpath, $callback->($status)
513 516
514Try to move the I<file> (directories not supported as either source or 517Try to move the I<file> (directories not supported as either source or
520that is successful, unlinking the C<$srcpath>. 523that is successful, unlinking the C<$srcpath>.
521 524
522=cut 525=cut
523 526
524sub aio_move($$;$) { 527sub aio_move($$;$) {
528 aio_block {
525 my ($src, $dst, $cb) = @_; 529 my ($src, $dst, $cb) = @_;
526 530
527 my $pri = aioreq_pri; 531 my $pri = aioreq_pri;
528 my $grp = aio_group $cb; 532 my $grp = aio_group $cb;
529 533
530 aioreq_pri $pri; 534 aioreq_pri $pri;
531 add $grp aio_rename $src, $dst, sub { 535 add $grp aio_rename $src, $dst, sub {
532 if ($_[0] && $! == EXDEV) { 536 if ($_[0] && $! == EXDEV) {
533 aioreq_pri $pri; 537 aioreq_pri $pri;
534 add $grp aio_copy $src, $dst, sub { 538 add $grp aio_copy $src, $dst, sub {
539 $grp->result ($_[0]);
540
541 if (!$_[0]) {
542 aioreq_pri $pri;
543 add $grp aio_unlink $src;
544 }
545 };
546 } else {
535 $grp->result ($_[0]); 547 $grp->result ($_[0]);
536
537 if (!$_[0]) {
538 aioreq_pri $pri;
539 add $grp aio_unlink $src;
540 }
541 }; 548 }
542 } else {
543 $grp->result ($_[0]);
544 } 549 };
550
551 $grp
545 }; 552 }
546
547 $grp
548} 553}
549 554
550=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 555=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
551 556
552Scans a directory (similar to C<aio_readdir>) but additionally tries to 557Scans a directory (similar to C<aio_readdir>) but additionally tries to
600directory counting heuristic. 605directory counting heuristic.
601 606
602=cut 607=cut
603 608
604sub aio_scandir($$$) { 609sub aio_scandir($$$) {
610 aio_block {
605 my ($path, $maxreq, $cb) = @_; 611 my ($path, $maxreq, $cb) = @_;
606 612
607 my $pri = aioreq_pri; 613 my $pri = aioreq_pri;
608 614
609 my $grp = aio_group $cb; 615 my $grp = aio_group $cb;
610 616
611 $maxreq = 4 if $maxreq <= 0; 617 $maxreq = 4 if $maxreq <= 0;
612 618
613 # stat once 619 # stat once
614 aioreq_pri $pri;
615 add $grp aio_stat $path, sub {
616 return $grp->result () if $_[0];
617 my $now = time;
618 my $hash1 = join ":", (stat _)[0,1,3,7,9];
619
620 # read the directory entries
621 aioreq_pri $pri; 620 aioreq_pri $pri;
622 add $grp aio_readdir $path, sub { 621 add $grp aio_stat $path, sub {
623 my $entries = shift
624 or return $grp->result (); 622 return $grp->result () if $_[0];
623 my $now = time;
624 my $hash1 = join ":", (stat _)[0,1,3,7,9];
625 625
626 # stat the dir another time 626 # read the directory entries
627 aioreq_pri $pri; 627 aioreq_pri $pri;
628 add $grp aio_readdir $path, sub {
629 my $entries = shift
630 or return $grp->result ();
631
632 # stat the dir another time
633 aioreq_pri $pri;
628 add $grp aio_stat $path, sub { 634 add $grp aio_stat $path, sub {
629 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 635 my $hash2 = join ":", (stat _)[0,1,3,7,9];
630 636
631 my $ndirs; 637 my $ndirs;
632 638
633 # take the slow route if anything looks fishy 639 # take the slow route if anything looks fishy
634 if ($hash1 ne $hash2 or (stat _)[9] == $now) { 640 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
635 $ndirs = -1; 641 $ndirs = -1;
636 } else { 642 } else {
637 # if nlink == 2, we are finished 643 # if nlink == 2, we are finished
638 # on non-posix-fs's, we rely on nlink < 2 644 # on non-posix-fs's, we rely on nlink < 2
639 $ndirs = (stat _)[3] - 2 645 $ndirs = (stat _)[3] - 2
640 or return $grp->result ([], $entries); 646 or return $grp->result ([], $entries);
641 } 647 }
642 648
643 # sort into likely dirs and likely nondirs 649 # sort into likely dirs and likely nondirs
644 # dirs == files without ".", short entries first 650 # dirs == files without ".", short entries first
645 $entries = [map $_->[0], 651 $entries = [map $_->[0],
646 sort { $b->[1] cmp $a->[1] } 652 sort { $b->[1] cmp $a->[1] }
647 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], 653 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
648 @$entries]; 654 @$entries];
649 655
650 my (@dirs, @nondirs); 656 my (@dirs, @nondirs);
651 657
652 my $statgrp = add $grp aio_group sub { 658 my $statgrp = add $grp aio_group sub {
653 $grp->result (\@dirs, \@nondirs); 659 $grp->result (\@dirs, \@nondirs);
654 }; 660 };
655 661
656 limit $statgrp $maxreq; 662 limit $statgrp $maxreq;
657 feed $statgrp sub { 663 feed $statgrp sub {
658 return unless @$entries; 664 return unless @$entries;
659 my $entry = pop @$entries; 665 my $entry = pop @$entries;
660 666
661 aioreq_pri $pri; 667 aioreq_pri $pri;
662 add $statgrp aio_stat "$path/$entry/.", sub { 668 add $statgrp aio_stat "$path/$entry/.", sub {
663 if ($_[0] < 0) { 669 if ($_[0] < 0) {
664 push @nondirs, $entry; 670 push @nondirs, $entry;
665 } else { 671 } else {
666 # need to check for real directory 672 # need to check for real directory
667 aioreq_pri $pri; 673 aioreq_pri $pri;
668 add $statgrp aio_lstat "$path/$entry", sub { 674 add $statgrp aio_lstat "$path/$entry", sub {
669 if (-d _) { 675 if (-d _) {
670 push @dirs, $entry; 676 push @dirs, $entry;
671 677
672 unless (--$ndirs) { 678 unless (--$ndirs) {
673 push @nondirs, @$entries; 679 push @nondirs, @$entries;
674 feed $statgrp; 680 feed $statgrp;
681 }
682 } else {
683 push @nondirs, $entry;
675 } 684 }
676 } else {
677 push @nondirs, $entry;
678 } 685 }
679 } 686 }
680 } 687 };
681 }; 688 };
682 }; 689 };
683 }; 690 };
684 }; 691 };
692
693 $grp
685 }; 694 }
686
687 $grp
688} 695}
689 696
690=item aio_fsync $fh, $callback->($status) 697=item aio_fsync $fh, $callback->($status)
691 698
692Asynchronously call fsync on the given filehandle and call the callback 699Asynchronously call fsync on the given filehandle and call the callback
1109 *$sym 1116 *$sym
1110} 1117}
1111 1118
1112min_parallel 8; 1119min_parallel 8;
1113 1120
1114END { 1121END { flush }
1115 min_parallel 1;
1116 flush;
1117};
1118 1122
11191; 11231;
1120 1124
1121=head2 FORK BEHAVIOUR 1125=head2 FORK BEHAVIOUR
1122 1126

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines