ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.86 by root, Sat Oct 28 23:32:29 2006 UTC vs.
Revision 1.96 by root, Fri Dec 22 04:05:50 2006 UTC

5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use IO::AIO; 7 use IO::AIO;
8 8
9 aio_open "/etc/passwd", O_RDONLY, 0, sub { 9 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10 my ($fh) = @_; 10 my $fh = shift
11 or die "/etc/passwd: $!";
11 ... 12 ...
12 }; 13 };
13 14
14 aio_unlink "/tmp/file", sub { }; 15 aio_unlink "/tmp/file", sub { };
15 16
61etc.), but can also be used to easily do operations in parallel that are 62etc.), but can also be used to easily do operations in parallel that are
62normally done sequentially, e.g. stat'ing many files, which is much faster 63normally done sequentially, e.g. stat'ing many files, which is much faster
63on a RAID volume or over NFS when you do a number of stat operations 64on a RAID volume or over NFS when you do a number of stat operations
64concurrently. 65concurrently.
65 66
66While this works on all types of file descriptors (for example sockets), 67While most of this works on all types of file descriptors (for example
67using these functions on file descriptors that support nonblocking 68sockets), using these functions on file descriptors that support
68operation (again, sockets, pipes etc.) is very inefficient. Use an event 69nonblocking operation (again, sockets, pipes etc.) is very inefficient or
70might not work (aio_read fails on sockets/pipes/fifos). Use an event loop
69loop for that (such as the L<Event|Event> module): IO::AIO will naturally 71for that (such as the L<Event|Event> module): IO::AIO will naturally fit
70fit into such an event loop itself. 72into such an event loop itself.
71 73
72In this version, a number of threads are started that execute your 74In this version, a number of threads are started that execute your
73requests and signal their completion. You don't need thread support 75requests and signal their completion. You don't need thread support
74in perl, and the threads created by this module will not be visible 76in perl, and the threads created by this module will not be visible
75to perl. In the future, this module might make use of the native aio 77to perl. In the future, this module might make use of the native aio
98 poll => 'r', 100 poll => 'r',
99 cb => \&IO::AIO::poll_cb); 101 cb => \&IO::AIO::poll_cb);
100 102
101 # queue the request to open /etc/passwd 103 # queue the request to open /etc/passwd
102 aio_open "/etc/passwd", O_RDONLY, 0, sub { 104 aio_open "/etc/passwd", O_RDONLY, 0, sub {
103 my $fh = $_[0] 105 my $fh = shift
104 or die "error while opening: $!"; 106 or die "error while opening: $!";
105 107
106 # stat'ing filehandles is generally non-blocking 108 # stat'ing filehandles is generally non-blocking
107 my $size = -s $fh; 109 my $size = -s $fh;
108 110
176Request has reached the end of its lifetime and holds no resources anymore 178Request has reached the end of its lifetime and holds no resources anymore
177(except possibly for the Perl object, but its connection to the actual 179(except possibly for the Perl object, but its connection to the actual
178aio request is severed and calling its methods will either do nothing or 180aio request is severed and calling its methods will either do nothing or
179result in a runtime error). 181result in a runtime error).
180 182
183=back
184
181=cut 185=cut
182 186
183package IO::AIO; 187package IO::AIO;
184 188
185no warnings; 189no warnings;
186use strict 'vars'; 190use strict 'vars';
187 191
188use base 'Exporter'; 192use base 'Exporter';
189 193
190BEGIN { 194BEGIN {
191 our $VERSION = '2.1'; 195 our $VERSION = '2.21';
192 196
193 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 197 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
194 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 198 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
195 aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move 199 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link
196 aio_copy aio_group aio_nop aio_mknod); 200 aio_move aio_copy aio_group aio_nop aio_mknod);
197 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice)); 201 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block));
198 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush 202 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
199 min_parallel max_parallel max_idle 203 min_parallel max_parallel max_idle
200 nreqs nready npending nthreads 204 nreqs nready npending nthreads
201 max_poll_time max_poll_reqs); 205 max_poll_time max_poll_reqs);
202 206
206 XSLoader::load ("IO::AIO", $VERSION); 210 XSLoader::load ("IO::AIO", $VERSION);
207} 211}
208 212
209=head1 FUNCTIONS 213=head1 FUNCTIONS
210 214
211=head2 AIO FUNCTIONS 215=head2 AIO REQUEST FUNCTIONS
212 216
213All the C<aio_*> calls are more or less thin wrappers around the syscall 217All the C<aio_*> calls are more or less thin wrappers around the syscall
214with the same name (sans C<aio_>). The arguments are similar or identical, 218with the same name (sans C<aio_>). The arguments are similar or identical,
215and they all accept an additional (and optional) C<$callback> argument 219and they all accept an additional (and optional) C<$callback> argument
216which must be a code reference. This code reference will get called with 220which must be a code reference. This code reference will get called with
219syscall has been executed asynchronously. 223syscall has been executed asynchronously.
220 224
221All functions expecting a filehandle keep a copy of the filehandle 225All functions expecting a filehandle keep a copy of the filehandle
222internally until the request has finished. 226internally until the request has finished.
223 227
224All requests return objects of type L<IO::AIO::REQ> that allow further 228All functions return request objects of type L<IO::AIO::REQ> that allow
225manipulation of those requests while they are in-flight. 229further manipulation of those requests while they are in-flight.
226 230
227The pathnames you pass to these routines I<must> be absolute and 231The pathnames you pass to these routines I<must> be absolute and
228encoded in byte form. The reason for the former is that at the time the 232encoded as octets. The reason for the former is that at the time the
229request is being executed, the current working directory could have 233request is being executed, the current working directory could have
230changed. Alternatively, you can make sure that you never change the 234changed. Alternatively, you can make sure that you never change the
231current working directory. 235current working directory anywhere in the program and then use relative
236paths.
232 237
233To encode pathnames to byte form, either make sure you either: a) 238To encode pathnames as octets, either make sure you either: a) always pass
234always pass in filenames you got from outside (command line, readdir 239in filenames you got from outside (command line, readdir etc.) without
235etc.), b) are ASCII or ISO 8859-1, c) use the Encode module and encode 240tinkering, b) are ASCII or ISO 8859-1, c) use the Encode module and encode
236your pathnames to the locale (or other) encoding in effect in the user 241your pathnames to the locale (or other) encoding in effect in the user
237environment, d) use Glib::filename_from_unicode on unicode filenames or e) 242environment, d) use Glib::filename_from_unicode on unicode filenames or e)
238use something else. 243use something else to ensure your scalar has the correct contents.
244
245This works, btw. independent of the internal UTF-8 bit, which IO::AIO
246handles correctly wether it is set or not.
239 247
240=over 4 248=over 4
241 249
242=item $prev_pri = aioreq_pri [$pri] 250=item $prev_pri = aioreq_pri [$pri]
243 251
266 }; 274 };
267 275
268=item aioreq_nice $pri_adjust 276=item aioreq_nice $pri_adjust
269 277
270Similar to C<aioreq_pri>, but subtracts the given value from the current 278Similar to C<aioreq_pri>, but subtracts the given value from the current
271priority, so effects are cumulative. 279priority, so the effect is cumulative.
272 280
273=item aio_open $pathname, $flags, $mode, $callback->($fh) 281=item aio_open $pathname, $flags, $mode, $callback->($fh)
274 282
275Asynchronously open or create a file and call the callback with a newly 283Asynchronously open or create a file and call the callback with a newly
276created filehandle for the file. 284created filehandle for the file.
411=item aio_symlink $srcpath, $dstpath, $callback->($status) 419=item aio_symlink $srcpath, $dstpath, $callback->($status)
412 420
413Asynchronously create a new symbolic link to the existing object at C<$srcpath> at 421Asynchronously create a new symbolic link to the existing object at C<$srcpath> at
414the path C<$dstpath> and call the callback with the result code. 422the path C<$dstpath> and call the callback with the result code.
415 423
424=item aio_readlink $path, $callback->($link)
425
426Asynchronously read the symlink specified by C<$path> and pass it to
427the callback. If an error occurs, nothing or undef gets passed to the
428callback.
429
416=item aio_rename $srcpath, $dstpath, $callback->($status) 430=item aio_rename $srcpath, $dstpath, $callback->($status)
417 431
418Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as 432Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
419rename(2) and call the callback with the result code. 433rename(2) and call the callback with the result code.
420 434
448errors are being ignored. 462errors are being ignored.
449 463
450=cut 464=cut
451 465
452sub aio_copy($$;$) { 466sub aio_copy($$;$) {
467 aio_block {
453 my ($src, $dst, $cb) = @_; 468 my ($src, $dst, $cb) = @_;
454 469
455 my $pri = aioreq_pri; 470 my $pri = aioreq_pri;
456 my $grp = aio_group $cb; 471 my $grp = aio_group $cb;
457 472
458 aioreq_pri $pri; 473 aioreq_pri $pri;
459 add $grp aio_open $src, O_RDONLY, 0, sub { 474 add $grp aio_open $src, O_RDONLY, 0, sub {
460 if (my $src_fh = $_[0]) { 475 if (my $src_fh = $_[0]) {
461 my @stat = stat $src_fh; 476 my @stat = stat $src_fh;
462 477
463 aioreq_pri $pri; 478 aioreq_pri $pri;
464 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { 479 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
465 if (my $dst_fh = $_[0]) { 480 if (my $dst_fh = $_[0]) {
466 aioreq_pri $pri; 481 aioreq_pri $pri;
467 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 482 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
468 if ($_[0] == $stat[7]) { 483 if ($_[0] == $stat[7]) {
469 $grp->result (0); 484 $grp->result (0);
470 close $src_fh; 485 close $src_fh;
471 486
472 # those should not normally block. should. should. 487 # those should not normally block. should. should.
473 utime $stat[8], $stat[9], $dst; 488 utime $stat[8], $stat[9], $dst;
474 chmod $stat[2] & 07777, $dst_fh; 489 chmod $stat[2] & 07777, $dst_fh;
475 chown $stat[4], $stat[5], $dst_fh; 490 chown $stat[4], $stat[5], $dst_fh;
476 close $dst_fh; 491 close $dst_fh;
477 } else { 492 } else {
478 $grp->result (-1); 493 $grp->result (-1);
479 close $src_fh; 494 close $src_fh;
480 close $dst_fh; 495 close $dst_fh;
481 496
482 aioreq $pri; 497 aioreq $pri;
483 add $grp aio_unlink $dst; 498 add $grp aio_unlink $dst;
499 }
484 } 500 };
501 } else {
502 $grp->result (-1);
485 }; 503 }
486 } else {
487 $grp->result (-1);
488 } 504 },
505
506 } else {
507 $grp->result (-1);
489 }, 508 }
490
491 } else {
492 $grp->result (-1);
493 } 509 };
510
511 $grp
494 }; 512 }
495
496 $grp
497} 513}
498 514
499=item aio_move $srcpath, $dstpath, $callback->($status) 515=item aio_move $srcpath, $dstpath, $callback->($status)
500 516
501Try to move the I<file> (directories not supported as either source or 517Try to move the I<file> (directories not supported as either source or
507that is successful, unlinking the C<$srcpath>. 523that is successful, unlinking the C<$srcpath>.
508 524
509=cut 525=cut
510 526
511sub aio_move($$;$) { 527sub aio_move($$;$) {
528 aio_block {
512 my ($src, $dst, $cb) = @_; 529 my ($src, $dst, $cb) = @_;
513 530
514 my $pri = aioreq_pri; 531 my $pri = aioreq_pri;
515 my $grp = aio_group $cb; 532 my $grp = aio_group $cb;
516 533
517 aioreq_pri $pri; 534 aioreq_pri $pri;
518 add $grp aio_rename $src, $dst, sub { 535 add $grp aio_rename $src, $dst, sub {
519 if ($_[0] && $! == EXDEV) { 536 if ($_[0] && $! == EXDEV) {
520 aioreq_pri $pri; 537 aioreq_pri $pri;
521 add $grp aio_copy $src, $dst, sub { 538 add $grp aio_copy $src, $dst, sub {
539 $grp->result ($_[0]);
540
541 if (!$_[0]) {
542 aioreq_pri $pri;
543 add $grp aio_unlink $src;
544 }
545 };
546 } else {
522 $grp->result ($_[0]); 547 $grp->result ($_[0]);
523
524 if (!$_[0]) {
525 aioreq_pri $pri;
526 add $grp aio_unlink $src;
527 }
528 }; 548 }
529 } else {
530 $grp->result ($_[0]);
531 } 549 };
550
551 $grp
532 }; 552 }
533
534 $grp
535} 553}
536 554
537=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 555=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
538 556
539Scans a directory (similar to C<aio_readdir>) but additionally tries to 557Scans a directory (similar to C<aio_readdir>) but additionally tries to
587directory counting heuristic. 605directory counting heuristic.
588 606
589=cut 607=cut
590 608
591sub aio_scandir($$$) { 609sub aio_scandir($$$) {
610 aio_block {
592 my ($path, $maxreq, $cb) = @_; 611 my ($path, $maxreq, $cb) = @_;
593 612
594 my $pri = aioreq_pri; 613 my $pri = aioreq_pri;
595 614
596 my $grp = aio_group $cb; 615 my $grp = aio_group $cb;
597 616
598 $maxreq = 4 if $maxreq <= 0; 617 $maxreq = 4 if $maxreq <= 0;
599 618
600 # stat once 619 # stat once
601 aioreq_pri $pri;
602 add $grp aio_stat $path, sub {
603 return $grp->result () if $_[0];
604 my $now = time;
605 my $hash1 = join ":", (stat _)[0,1,3,7,9];
606
607 # read the directory entries
608 aioreq_pri $pri; 620 aioreq_pri $pri;
609 add $grp aio_readdir $path, sub { 621 add $grp aio_stat $path, sub {
610 my $entries = shift
611 or return $grp->result (); 622 return $grp->result () if $_[0];
623 my $now = time;
624 my $hash1 = join ":", (stat _)[0,1,3,7,9];
612 625
613 # stat the dir another time 626 # read the directory entries
614 aioreq_pri $pri; 627 aioreq_pri $pri;
628 add $grp aio_readdir $path, sub {
629 my $entries = shift
630 or return $grp->result ();
631
632 # stat the dir another time
633 aioreq_pri $pri;
615 add $grp aio_stat $path, sub { 634 add $grp aio_stat $path, sub {
616 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 635 my $hash2 = join ":", (stat _)[0,1,3,7,9];
617 636
618 my $ndirs; 637 my $ndirs;
619 638
620 # take the slow route if anything looks fishy 639 # take the slow route if anything looks fishy
621 if ($hash1 ne $hash2 or (stat _)[9] == $now) { 640 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
622 $ndirs = -1; 641 $ndirs = -1;
623 } else { 642 } else {
624 # if nlink == 2, we are finished 643 # if nlink == 2, we are finished
625 # on non-posix-fs's, we rely on nlink < 2 644 # on non-posix-fs's, we rely on nlink < 2
626 $ndirs = (stat _)[3] - 2 645 $ndirs = (stat _)[3] - 2
627 or return $grp->result ([], $entries); 646 or return $grp->result ([], $entries);
628 } 647 }
629 648
630 # sort into likely dirs and likely nondirs 649 # sort into likely dirs and likely nondirs
631 # dirs == files without ".", short entries first 650 # dirs == files without ".", short entries first
632 $entries = [map $_->[0], 651 $entries = [map $_->[0],
633 sort { $b->[1] cmp $a->[1] } 652 sort { $b->[1] cmp $a->[1] }
634 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], 653 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
635 @$entries]; 654 @$entries];
636 655
637 my (@dirs, @nondirs); 656 my (@dirs, @nondirs);
638 657
639 my $statgrp = add $grp aio_group sub { 658 my $statgrp = add $grp aio_group sub {
640 $grp->result (\@dirs, \@nondirs); 659 $grp->result (\@dirs, \@nondirs);
641 }; 660 };
642 661
643 limit $statgrp $maxreq; 662 limit $statgrp $maxreq;
644 feed $statgrp sub { 663 feed $statgrp sub {
645 return unless @$entries; 664 return unless @$entries;
646 my $entry = pop @$entries; 665 my $entry = pop @$entries;
647 666
648 aioreq_pri $pri; 667 aioreq_pri $pri;
649 add $statgrp aio_stat "$path/$entry/.", sub { 668 add $statgrp aio_stat "$path/$entry/.", sub {
650 if ($_[0] < 0) { 669 if ($_[0] < 0) {
651 push @nondirs, $entry; 670 push @nondirs, $entry;
652 } else { 671 } else {
653 # need to check for real directory 672 # need to check for real directory
654 aioreq_pri $pri; 673 aioreq_pri $pri;
655 add $statgrp aio_lstat "$path/$entry", sub { 674 add $statgrp aio_lstat "$path/$entry", sub {
656 if (-d _) { 675 if (-d _) {
657 push @dirs, $entry; 676 push @dirs, $entry;
658 677
659 unless (--$ndirs) { 678 unless (--$ndirs) {
660 push @nondirs, @$entries; 679 push @nondirs, @$entries;
661 feed $statgrp; 680 feed $statgrp;
681 }
682 } else {
683 push @nondirs, $entry;
662 } 684 }
663 } else {
664 push @nondirs, $entry;
665 } 685 }
666 } 686 }
667 } 687 };
668 }; 688 };
669 }; 689 };
670 }; 690 };
671 }; 691 };
692
693 $grp
672 }; 694 }
673
674 $grp
675} 695}
676 696
677=item aio_fsync $fh, $callback->($status) 697=item aio_fsync $fh, $callback->($status)
678 698
679Asynchronously call fsync on the given filehandle and call the callback 699Asynchronously call fsync on the given filehandle and call the callback
933that are being processed by C<IO::AIO::poll_cb> in one call, respectively 953that are being processed by C<IO::AIO::poll_cb> in one call, respectively
934the maximum amount of time (default C<0>, meaning infinity) spent in 954the maximum amount of time (default C<0>, meaning infinity) spent in
935C<IO::AIO::poll_cb> to process requests (more correctly the mininum amount 955C<IO::AIO::poll_cb> to process requests (more correctly the mininum amount
936of time C<poll_cb> is allowed to use). 956of time C<poll_cb> is allowed to use).
937 957
958Setting C<max_poll_time> to a non-zero value creates an overhead of one
959syscall per request processed, which is not normally a problem unless your
960callbacks are really really fast or your OS is really really slow (I am
961not mentioning Solaris here). Using C<max_poll_reqs> incurs no overhead.
962
938Setting these is useful if you want to ensure some level of 963Setting these is useful if you want to ensure some level of
939interactiveness when perl is not fast enough to process all requests in 964interactiveness when perl is not fast enough to process all requests in
940time. 965time.
941 966
942For interactive programs, values such as C<0.01> to C<0.1> should be fine. 967For interactive programs, values such as C<0.01> to C<0.1> should be fine.
943 968
944Example: Install an Event watcher that automatically calls 969Example: Install an Event watcher that automatically calls
945IO::AIO::poll_some with low priority, to ensure that other parts of the 970IO::AIO::poll_cb with low priority, to ensure that other parts of the
946program get the CPU sometimes even under high AIO load. 971program get the CPU sometimes even under high AIO load.
947 972
948 # try not to spend much more than 0.1s in poll_cb 973 # try not to spend much more than 0.1s in poll_cb
949 IO::AIO::max_poll_time 0.1; 974 IO::AIO::max_poll_time 0.1;
950 975
953 poll => 'r', nice => 1, 978 poll => 'r', nice => 1,
954 cb => &IO::AIO::poll_cb); 979 cb => &IO::AIO::poll_cb);
955 980
956=item IO::AIO::poll_wait 981=item IO::AIO::poll_wait
957 982
983If there are any outstanding requests and none of them in the result
958Wait till the result filehandle becomes ready for reading (simply does a 984phase, wait till the result filehandle becomes ready for reading (simply
959C<select> on the filehandle. This is useful if you want to synchronously 985does a C<select> on the filehandle. This is useful if you want to
960wait for some requests to finish). 986synchronously wait for some requests to finish).
961 987
962See C<nreqs> for an example. 988See C<nreqs> for an example.
963 989
964=item IO::AIO::poll 990=item IO::AIO::poll
965 991
966Waits until some requests have been handled. 992Waits until some requests have been handled.
967 993
994Returns the number of requests processed, but is otherwise strictly
968Strictly equivalent to: 995equivalent to:
969 996
970 IO::AIO::poll_wait, IO::AIO::poll_cb 997 IO::AIO::poll_wait, IO::AIO::poll_cb
971 if IO::AIO::nreqs;
972 998
973=item IO::AIO::flush 999=item IO::AIO::flush
974 1000
975Wait till all outstanding AIO requests have been handled. 1001Wait till all outstanding AIO requests have been handled.
976 1002
1090 *$sym 1116 *$sym
1091} 1117}
1092 1118
1093min_parallel 8; 1119min_parallel 8;
1094 1120
1095END { 1121END { flush }
1096 min_parallel 1;
1097 flush;
1098};
1099 1122
11001; 11231;
1101 1124
1102=head2 FORK BEHAVIOUR 1125=head2 FORK BEHAVIOUR
1103 1126

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines