ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.87 by root, Sun Oct 29 00:52:02 2006 UTC vs.
Revision 1.96 by root, Fri Dec 22 04:05:50 2006 UTC

5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use IO::AIO; 7 use IO::AIO;
8 8
9 aio_open "/etc/passwd", O_RDONLY, 0, sub { 9 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10 my ($fh) = @_; 10 my $fh = shift
11 or die "/etc/passwd: $!";
11 ... 12 ...
12 }; 13 };
13 14
14 aio_unlink "/tmp/file", sub { }; 15 aio_unlink "/tmp/file", sub { };
15 16
61etc.), but can also be used to easily do operations in parallel that are 62etc.), but can also be used to easily do operations in parallel that are
62normally done sequentially, e.g. stat'ing many files, which is much faster 63normally done sequentially, e.g. stat'ing many files, which is much faster
63on a RAID volume or over NFS when you do a number of stat operations 64on a RAID volume or over NFS when you do a number of stat operations
64concurrently. 65concurrently.
65 66
66While this works on all types of file descriptors (for example sockets), 67While most of this works on all types of file descriptors (for example
67using these functions on file descriptors that support nonblocking 68sockets), using these functions on file descriptors that support
68operation (again, sockets, pipes etc.) is very inefficient. Use an event 69nonblocking operation (again, sockets, pipes etc.) is very inefficient or
70might not work (aio_read fails on sockets/pipes/fifos). Use an event loop
69loop for that (such as the L<Event|Event> module): IO::AIO will naturally 71for that (such as the L<Event|Event> module): IO::AIO will naturally fit
70fit into such an event loop itself. 72into such an event loop itself.
71 73
72In this version, a number of threads are started that execute your 74In this version, a number of threads are started that execute your
73requests and signal their completion. You don't need thread support 75requests and signal their completion. You don't need thread support
74in perl, and the threads created by this module will not be visible 76in perl, and the threads created by this module will not be visible
75to perl. In the future, this module might make use of the native aio 77to perl. In the future, this module might make use of the native aio
98 poll => 'r', 100 poll => 'r',
99 cb => \&IO::AIO::poll_cb); 101 cb => \&IO::AIO::poll_cb);
100 102
101 # queue the request to open /etc/passwd 103 # queue the request to open /etc/passwd
102 aio_open "/etc/passwd", O_RDONLY, 0, sub { 104 aio_open "/etc/passwd", O_RDONLY, 0, sub {
103 my $fh = $_[0] 105 my $fh = shift
104 or die "error while opening: $!"; 106 or die "error while opening: $!";
105 107
106 # stat'ing filehandles is generally non-blocking 108 # stat'ing filehandles is generally non-blocking
107 my $size = -s $fh; 109 my $size = -s $fh;
108 110
176Request has reached the end of its lifetime and holds no resources anymore 178Request has reached the end of its lifetime and holds no resources anymore
177(except possibly for the Perl object, but its connection to the actual 179(except possibly for the Perl object, but its connection to the actual
178aio request is severed and calling its methods will either do nothing or 180aio request is severed and calling its methods will either do nothing or
179result in a runtime error). 181result in a runtime error).
180 182
183=back
184
181=cut 185=cut
182 186
183package IO::AIO; 187package IO::AIO;
184 188
185no warnings; 189no warnings;
186use strict 'vars'; 190use strict 'vars';
187 191
188use base 'Exporter'; 192use base 'Exporter';
189 193
190BEGIN { 194BEGIN {
191 our $VERSION = '2.1'; 195 our $VERSION = '2.21';
192 196
193 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 197 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
194 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 198 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
195 aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move 199 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link
196 aio_copy aio_group aio_nop aio_mknod); 200 aio_move aio_copy aio_group aio_nop aio_mknod);
197 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice)); 201 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block));
198 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush 202 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
199 min_parallel max_parallel max_idle 203 min_parallel max_parallel max_idle
200 nreqs nready npending nthreads 204 nreqs nready npending nthreads
201 max_poll_time max_poll_reqs); 205 max_poll_time max_poll_reqs);
202 206
415=item aio_symlink $srcpath, $dstpath, $callback->($status) 419=item aio_symlink $srcpath, $dstpath, $callback->($status)
416 420
417Asynchronously create a new symbolic link to the existing object at C<$srcpath> at 421Asynchronously create a new symbolic link to the existing object at C<$srcpath> at
418the path C<$dstpath> and call the callback with the result code. 422the path C<$dstpath> and call the callback with the result code.
419 423
424=item aio_readlink $path, $callback->($link)
425
426Asynchronously read the symlink specified by C<$path> and pass it to
427the callback. If an error occurs, nothing or undef gets passed to the
428callback.
429
420=item aio_rename $srcpath, $dstpath, $callback->($status) 430=item aio_rename $srcpath, $dstpath, $callback->($status)
421 431
422Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as 432Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
423rename(2) and call the callback with the result code. 433rename(2) and call the callback with the result code.
424 434
452errors are being ignored. 462errors are being ignored.
453 463
454=cut 464=cut
455 465
456sub aio_copy($$;$) { 466sub aio_copy($$;$) {
467 aio_block {
457 my ($src, $dst, $cb) = @_; 468 my ($src, $dst, $cb) = @_;
458 469
459 my $pri = aioreq_pri; 470 my $pri = aioreq_pri;
460 my $grp = aio_group $cb; 471 my $grp = aio_group $cb;
461 472
462 aioreq_pri $pri; 473 aioreq_pri $pri;
463 add $grp aio_open $src, O_RDONLY, 0, sub { 474 add $grp aio_open $src, O_RDONLY, 0, sub {
464 if (my $src_fh = $_[0]) { 475 if (my $src_fh = $_[0]) {
465 my @stat = stat $src_fh; 476 my @stat = stat $src_fh;
466 477
467 aioreq_pri $pri; 478 aioreq_pri $pri;
468 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { 479 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
469 if (my $dst_fh = $_[0]) { 480 if (my $dst_fh = $_[0]) {
470 aioreq_pri $pri; 481 aioreq_pri $pri;
471 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 482 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
472 if ($_[0] == $stat[7]) { 483 if ($_[0] == $stat[7]) {
473 $grp->result (0); 484 $grp->result (0);
474 close $src_fh; 485 close $src_fh;
475 486
476 # those should not normally block. should. should. 487 # those should not normally block. should. should.
477 utime $stat[8], $stat[9], $dst; 488 utime $stat[8], $stat[9], $dst;
478 chmod $stat[2] & 07777, $dst_fh; 489 chmod $stat[2] & 07777, $dst_fh;
479 chown $stat[4], $stat[5], $dst_fh; 490 chown $stat[4], $stat[5], $dst_fh;
480 close $dst_fh; 491 close $dst_fh;
481 } else { 492 } else {
482 $grp->result (-1); 493 $grp->result (-1);
483 close $src_fh; 494 close $src_fh;
484 close $dst_fh; 495 close $dst_fh;
485 496
486 aioreq $pri; 497 aioreq $pri;
487 add $grp aio_unlink $dst; 498 add $grp aio_unlink $dst;
499 }
488 } 500 };
501 } else {
502 $grp->result (-1);
489 }; 503 }
490 } else {
491 $grp->result (-1);
492 } 504 },
505
506 } else {
507 $grp->result (-1);
493 }, 508 }
494
495 } else {
496 $grp->result (-1);
497 } 509 };
510
511 $grp
498 }; 512 }
499
500 $grp
501} 513}
502 514
503=item aio_move $srcpath, $dstpath, $callback->($status) 515=item aio_move $srcpath, $dstpath, $callback->($status)
504 516
505Try to move the I<file> (directories not supported as either source or 517Try to move the I<file> (directories not supported as either source or
511that is successful, unlinking the C<$srcpath>. 523that is successful, unlinking the C<$srcpath>.
512 524
513=cut 525=cut
514 526
515sub aio_move($$;$) { 527sub aio_move($$;$) {
528 aio_block {
516 my ($src, $dst, $cb) = @_; 529 my ($src, $dst, $cb) = @_;
517 530
518 my $pri = aioreq_pri; 531 my $pri = aioreq_pri;
519 my $grp = aio_group $cb; 532 my $grp = aio_group $cb;
520 533
521 aioreq_pri $pri; 534 aioreq_pri $pri;
522 add $grp aio_rename $src, $dst, sub { 535 add $grp aio_rename $src, $dst, sub {
523 if ($_[0] && $! == EXDEV) { 536 if ($_[0] && $! == EXDEV) {
524 aioreq_pri $pri; 537 aioreq_pri $pri;
525 add $grp aio_copy $src, $dst, sub { 538 add $grp aio_copy $src, $dst, sub {
539 $grp->result ($_[0]);
540
541 if (!$_[0]) {
542 aioreq_pri $pri;
543 add $grp aio_unlink $src;
544 }
545 };
546 } else {
526 $grp->result ($_[0]); 547 $grp->result ($_[0]);
527
528 if (!$_[0]) {
529 aioreq_pri $pri;
530 add $grp aio_unlink $src;
531 }
532 }; 548 }
533 } else {
534 $grp->result ($_[0]);
535 } 549 };
550
551 $grp
536 }; 552 }
537
538 $grp
539} 553}
540 554
541=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 555=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
542 556
543Scans a directory (similar to C<aio_readdir>) but additionally tries to 557Scans a directory (similar to C<aio_readdir>) but additionally tries to
591directory counting heuristic. 605directory counting heuristic.
592 606
593=cut 607=cut
594 608
595sub aio_scandir($$$) { 609sub aio_scandir($$$) {
610 aio_block {
596 my ($path, $maxreq, $cb) = @_; 611 my ($path, $maxreq, $cb) = @_;
597 612
598 my $pri = aioreq_pri; 613 my $pri = aioreq_pri;
599 614
600 my $grp = aio_group $cb; 615 my $grp = aio_group $cb;
601 616
602 $maxreq = 4 if $maxreq <= 0; 617 $maxreq = 4 if $maxreq <= 0;
603 618
604 # stat once 619 # stat once
605 aioreq_pri $pri;
606 add $grp aio_stat $path, sub {
607 return $grp->result () if $_[0];
608 my $now = time;
609 my $hash1 = join ":", (stat _)[0,1,3,7,9];
610
611 # read the directory entries
612 aioreq_pri $pri; 620 aioreq_pri $pri;
613 add $grp aio_readdir $path, sub { 621 add $grp aio_stat $path, sub {
614 my $entries = shift
615 or return $grp->result (); 622 return $grp->result () if $_[0];
623 my $now = time;
624 my $hash1 = join ":", (stat _)[0,1,3,7,9];
616 625
617 # stat the dir another time 626 # read the directory entries
618 aioreq_pri $pri; 627 aioreq_pri $pri;
628 add $grp aio_readdir $path, sub {
629 my $entries = shift
630 or return $grp->result ();
631
632 # stat the dir another time
633 aioreq_pri $pri;
619 add $grp aio_stat $path, sub { 634 add $grp aio_stat $path, sub {
620 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 635 my $hash2 = join ":", (stat _)[0,1,3,7,9];
621 636
622 my $ndirs; 637 my $ndirs;
623 638
624 # take the slow route if anything looks fishy 639 # take the slow route if anything looks fishy
625 if ($hash1 ne $hash2 or (stat _)[9] == $now) { 640 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
626 $ndirs = -1; 641 $ndirs = -1;
627 } else { 642 } else {
628 # if nlink == 2, we are finished 643 # if nlink == 2, we are finished
629 # on non-posix-fs's, we rely on nlink < 2 644 # on non-posix-fs's, we rely on nlink < 2
630 $ndirs = (stat _)[3] - 2 645 $ndirs = (stat _)[3] - 2
631 or return $grp->result ([], $entries); 646 or return $grp->result ([], $entries);
632 } 647 }
633 648
634 # sort into likely dirs and likely nondirs 649 # sort into likely dirs and likely nondirs
635 # dirs == files without ".", short entries first 650 # dirs == files without ".", short entries first
636 $entries = [map $_->[0], 651 $entries = [map $_->[0],
637 sort { $b->[1] cmp $a->[1] } 652 sort { $b->[1] cmp $a->[1] }
638 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], 653 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
639 @$entries]; 654 @$entries];
640 655
641 my (@dirs, @nondirs); 656 my (@dirs, @nondirs);
642 657
643 my $statgrp = add $grp aio_group sub { 658 my $statgrp = add $grp aio_group sub {
644 $grp->result (\@dirs, \@nondirs); 659 $grp->result (\@dirs, \@nondirs);
645 }; 660 };
646 661
647 limit $statgrp $maxreq; 662 limit $statgrp $maxreq;
648 feed $statgrp sub { 663 feed $statgrp sub {
649 return unless @$entries; 664 return unless @$entries;
650 my $entry = pop @$entries; 665 my $entry = pop @$entries;
651 666
652 aioreq_pri $pri; 667 aioreq_pri $pri;
653 add $statgrp aio_stat "$path/$entry/.", sub { 668 add $statgrp aio_stat "$path/$entry/.", sub {
654 if ($_[0] < 0) { 669 if ($_[0] < 0) {
655 push @nondirs, $entry; 670 push @nondirs, $entry;
656 } else { 671 } else {
657 # need to check for real directory 672 # need to check for real directory
658 aioreq_pri $pri; 673 aioreq_pri $pri;
659 add $statgrp aio_lstat "$path/$entry", sub { 674 add $statgrp aio_lstat "$path/$entry", sub {
660 if (-d _) { 675 if (-d _) {
661 push @dirs, $entry; 676 push @dirs, $entry;
662 677
663 unless (--$ndirs) { 678 unless (--$ndirs) {
664 push @nondirs, @$entries; 679 push @nondirs, @$entries;
665 feed $statgrp; 680 feed $statgrp;
681 }
682 } else {
683 push @nondirs, $entry;
666 } 684 }
667 } else {
668 push @nondirs, $entry;
669 } 685 }
670 } 686 }
671 } 687 };
672 }; 688 };
673 }; 689 };
674 }; 690 };
675 }; 691 };
692
693 $grp
676 }; 694 }
677
678 $grp
679} 695}
680 696
681=item aio_fsync $fh, $callback->($status) 697=item aio_fsync $fh, $callback->($status)
682 698
683Asynchronously call fsync on the given filehandle and call the callback 699Asynchronously call fsync on the given filehandle and call the callback
937that are being processed by C<IO::AIO::poll_cb> in one call, respectively 953that are being processed by C<IO::AIO::poll_cb> in one call, respectively
938the maximum amount of time (default C<0>, meaning infinity) spent in 954the maximum amount of time (default C<0>, meaning infinity) spent in
939C<IO::AIO::poll_cb> to process requests (more correctly the mininum amount 955C<IO::AIO::poll_cb> to process requests (more correctly the mininum amount
940of time C<poll_cb> is allowed to use). 956of time C<poll_cb> is allowed to use).
941 957
958Setting C<max_poll_time> to a non-zero value creates an overhead of one
959syscall per request processed, which is not normally a problem unless your
960callbacks are really really fast or your OS is really really slow (I am
961not mentioning Solaris here). Using C<max_poll_reqs> incurs no overhead.
962
942Setting these is useful if you want to ensure some level of 963Setting these is useful if you want to ensure some level of
943interactiveness when perl is not fast enough to process all requests in 964interactiveness when perl is not fast enough to process all requests in
944time. 965time.
945 966
946For interactive programs, values such as C<0.01> to C<0.1> should be fine. 967For interactive programs, values such as C<0.01> to C<0.1> should be fine.
947 968
948Example: Install an Event watcher that automatically calls 969Example: Install an Event watcher that automatically calls
949IO::AIO::poll_some with low priority, to ensure that other parts of the 970IO::AIO::poll_cb with low priority, to ensure that other parts of the
950program get the CPU sometimes even under high AIO load. 971program get the CPU sometimes even under high AIO load.
951 972
952 # try not to spend much more than 0.1s in poll_cb 973 # try not to spend much more than 0.1s in poll_cb
953 IO::AIO::max_poll_time 0.1; 974 IO::AIO::max_poll_time 0.1;
954 975
957 poll => 'r', nice => 1, 978 poll => 'r', nice => 1,
958 cb => &IO::AIO::poll_cb); 979 cb => &IO::AIO::poll_cb);
959 980
960=item IO::AIO::poll_wait 981=item IO::AIO::poll_wait
961 982
983If there are any outstanding requests and none of them in the result
962Wait till the result filehandle becomes ready for reading (simply does a 984phase, wait till the result filehandle becomes ready for reading (simply
963C<select> on the filehandle. This is useful if you want to synchronously 985does a C<select> on the filehandle. This is useful if you want to
964wait for some requests to finish). 986synchronously wait for some requests to finish).
965 987
966See C<nreqs> for an example. 988See C<nreqs> for an example.
967 989
968=item IO::AIO::poll 990=item IO::AIO::poll
969 991
970Waits until some requests have been handled. 992Waits until some requests have been handled.
971 993
994Returns the number of requests processed, but is otherwise strictly
972Strictly equivalent to: 995equivalent to:
973 996
974 IO::AIO::poll_wait, IO::AIO::poll_cb 997 IO::AIO::poll_wait, IO::AIO::poll_cb
975 if IO::AIO::nreqs;
976 998
977=item IO::AIO::flush 999=item IO::AIO::flush
978 1000
979Wait till all outstanding AIO requests have been handled. 1001Wait till all outstanding AIO requests have been handled.
980 1002
1094 *$sym 1116 *$sym
1095} 1117}
1096 1118
1097min_parallel 8; 1119min_parallel 8;
1098 1120
1099END { 1121END { flush }
1100 min_parallel 1;
1101 flush;
1102};
1103 1122
11041; 11231;
1105 1124
1106=head2 FORK BEHAVIOUR 1125=head2 FORK BEHAVIOUR
1107 1126

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines