--- IO-AIO/AIO.pm 2006/10/28 23:32:29 1.86 +++ IO-AIO/AIO.pm 2007/03/24 19:19:11 1.104 @@ -7,7 +7,8 @@ use IO::AIO; aio_open "/etc/passwd", O_RDONLY, 0, sub { - my ($fh) = @_; + my $fh = shift + or die "/etc/passwd: $!"; ... }; @@ -63,11 +64,12 @@ on a RAID volume or over NFS when you do a number of stat operations concurrently. -While this works on all types of file descriptors (for example sockets), -using these functions on file descriptors that support nonblocking -operation (again, sockets, pipes etc.) is very inefficient. Use an event -loop for that (such as the L module): IO::AIO will naturally -fit into such an event loop itself. +While most of this works on all types of file descriptors (for example +sockets), using these functions on file descriptors that support +nonblocking operation (again, sockets, pipes etc.) is very inefficient or +might not work (aio_read fails on sockets/pipes/fifos). Use an event loop +for that (such as the L module): IO::AIO will naturally fit +into such an event loop itself. In this version, a number of threads are started that execute your requests and signal their completion. You don't need thread support @@ -100,7 +102,7 @@ # queue the request to open /etc/passwd aio_open "/etc/passwd", O_RDONLY, 0, sub { - my $fh = $_[0] + my $fh = shift or die "error while opening: $!"; # stat'ing filehandles is generally non-blocking @@ -178,6 +180,8 @@ aio request is severed and calling its methods will either do nothing or result in a runtime error). +=back + =cut package IO::AIO; @@ -188,13 +192,13 @@ use base 'Exporter'; BEGIN { - our $VERSION = '2.1'; + our $VERSION = '2.33'; our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink - aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move - aio_copy aio_group aio_nop aio_mknod); - our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice)); + aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link + aio_move aio_copy aio_group aio_nop aio_mknod aio_load aio_rmtree aio_mkdir); + our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block)); our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush min_parallel max_parallel max_idle nreqs nready npending nthreads @@ -208,7 +212,7 @@ =head1 FUNCTIONS -=head2 AIO FUNCTIONS +=head2 AIO REQUEST FUNCTIONS All the C calls are more or less thin wrappers around the syscall with the same name (sans C). The arguments are similar or identical, @@ -221,21 +225,25 @@ All functions expecting a filehandle keep a copy of the filehandle internally until the request has finished. -All requests return objects of type L that allow further -manipulation of those requests while they are in-flight. +All functions return request objects of type L that allow +further manipulation of those requests while they are in-flight. The pathnames you pass to these routines I be absolute and -encoded in byte form. The reason for the former is that at the time the +encoded as octets. The reason for the former is that at the time the request is being executed, the current working directory could have changed. Alternatively, you can make sure that you never change the -current working directory. +current working directory anywhere in the program and then use relative +paths. -To encode pathnames to byte form, either make sure you either: a) -always pass in filenames you got from outside (command line, readdir -etc.), b) are ASCII or ISO 8859-1, c) use the Encode module and encode +To encode pathnames as octets, either make sure you either: a) always pass +in filenames you got from outside (command line, readdir etc.) without +tinkering, b) are ASCII or ISO 8859-1, c) use the Encode module and encode your pathnames to the locale (or other) encoding in effect in the user environment, d) use Glib::filename_from_unicode on unicode filenames or e) -use something else. +use something else to ensure your scalar has the correct contents. + +This works, btw. independent of the internal UTF-8 bit, which IO::AIO +handles correctly wether it is set or not. =over 4 @@ -268,7 +276,7 @@ =item aioreq_nice $pri_adjust Similar to C, but subtracts the given value from the current -priority, so effects are cumulative. +priority, so the effect is cumulative. =item aio_open $pathname, $flags, $mode, $callback->($fh) @@ -284,7 +292,9 @@ Likewise, C<$mode> specifies the mode of the newly created file, if it didn't exist and C has been given, just like perl's C, except that it is mandatory (i.e. use C<0> if you don't create new files, -and C<0666> or C<0777> if you do). +and C<0666> or C<0777> if you do). Note that the C<$mode> will be modified +by the umask in effect then the request is being executed, so better never +change the umask. Example: @@ -413,11 +423,23 @@ Asynchronously create a new symbolic link to the existing object at C<$srcpath> at the path C<$dstpath> and call the callback with the result code. +=item aio_readlink $path, $callback->($link) + +Asynchronously read the symlink specified by C<$path> and pass it to +the callback. If an error occurs, nothing or undef gets passed to the +callback. + =item aio_rename $srcpath, $dstpath, $callback->($status) Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as rename(2) and call the callback with the result code. +=item aio_mkdir $pathname, $mode, $callback->($status) + +Asynchronously mkdir (create) a directory and call the callback with +the result code. C<$mode> will be modified by the umask at the time the +request is executed, so do not change your umask. + =item aio_rmdir $pathname, $callback->($status) Asynchronously rmdir (delete) a directory and call the callback with the @@ -432,6 +454,36 @@ The callback a single argument which is either C or an array-ref with the filenames. +=item aio_load $path, $data, $callback->($status) + +This is a composite request that tries to fully load the given file into +memory. Status is the same as with aio_read. + +=cut + +sub aio_load($$;$) { + aio_block { + my ($path, undef, $cb) = @_; + my $data = \$_[1]; + + my $pri = aioreq_pri; + my $grp = aio_group $cb; + + aioreq_pri $pri; + add $grp aio_open $path, O_RDONLY, 0, sub { + my $fh = shift + or return $grp->result (-1); + + aioreq_pri $pri; + add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub { + $grp->result ($_[0]); + }; + }; + + $grp + } +} + =item aio_copy $srcpath, $dstpath, $callback->($status) Try to copy the I (directories not supported as either source or @@ -450,50 +502,52 @@ =cut sub aio_copy($$;$) { - my ($src, $dst, $cb) = @_; - - my $pri = aioreq_pri; - my $grp = aio_group $cb; + aio_block { + my ($src, $dst, $cb) = @_; - aioreq_pri $pri; - add $grp aio_open $src, O_RDONLY, 0, sub { - if (my $src_fh = $_[0]) { - my @stat = stat $src_fh; + my $pri = aioreq_pri; + my $grp = aio_group $cb; - aioreq_pri $pri; - add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { - if (my $dst_fh = $_[0]) { - aioreq_pri $pri; - add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { - if ($_[0] == $stat[7]) { - $grp->result (0); - close $src_fh; - - # those should not normally block. should. should. - utime $stat[8], $stat[9], $dst; - chmod $stat[2] & 07777, $dst_fh; - chown $stat[4], $stat[5], $dst_fh; - close $dst_fh; - } else { - $grp->result (-1); - close $src_fh; - close $dst_fh; - - aioreq $pri; - add $grp aio_unlink $dst; - } - }; - } else { - $grp->result (-1); - } - }, + aioreq_pri $pri; + add $grp aio_open $src, O_RDONLY, 0, sub { + if (my $src_fh = $_[0]) { + my @stat = stat $src_fh; + + aioreq_pri $pri; + add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { + if (my $dst_fh = $_[0]) { + aioreq_pri $pri; + add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { + if ($_[0] == $stat[7]) { + $grp->result (0); + close $src_fh; + + # those should not normally block. should. should. + utime $stat[8], $stat[9], $dst; + chmod $stat[2] & 07777, $dst_fh; + chown $stat[4], $stat[5], $dst_fh; + close $dst_fh; + } else { + $grp->result (-1); + close $src_fh; + close $dst_fh; - } else { - $grp->result (-1); - } - }; + aioreq $pri; + add $grp aio_unlink $dst; + } + }; + } else { + $grp->result (-1); + } + }, + + } else { + $grp->result (-1); + } + }; - $grp + $grp + } } =item aio_move $srcpath, $dstpath, $callback->($status) @@ -509,29 +563,31 @@ =cut sub aio_move($$;$) { - my ($src, $dst, $cb) = @_; + aio_block { + my ($src, $dst, $cb) = @_; - my $pri = aioreq_pri; - my $grp = aio_group $cb; + my $pri = aioreq_pri; + my $grp = aio_group $cb; - aioreq_pri $pri; - add $grp aio_rename $src, $dst, sub { - if ($_[0] && $! == EXDEV) { - aioreq_pri $pri; - add $grp aio_copy $src, $dst, sub { + aioreq_pri $pri; + add $grp aio_rename $src, $dst, sub { + if ($_[0] && $! == EXDEV) { + aioreq_pri $pri; + add $grp aio_copy $src, $dst, sub { + $grp->result ($_[0]); + + if (!$_[0]) { + aioreq_pri $pri; + add $grp aio_unlink $src; + } + }; + } else { $grp->result ($_[0]); + } + }; - if (!$_[0]) { - aioreq_pri $pri; - add $grp aio_unlink $src; - } - }; - } else { - $grp->result ($_[0]); - } - }; - - $grp + $grp + } } =item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) @@ -588,90 +644,129 @@ =cut -sub aio_scandir($$$) { - my ($path, $maxreq, $cb) = @_; +sub aio_scandir($$;$) { + aio_block { + my ($path, $maxreq, $cb) = @_; - my $pri = aioreq_pri; + my $pri = aioreq_pri; - my $grp = aio_group $cb; + my $grp = aio_group $cb; - $maxreq = 4 if $maxreq <= 0; + $maxreq = 4 if $maxreq <= 0; - # stat once - aioreq_pri $pri; - add $grp aio_stat $path, sub { - return $grp->result () if $_[0]; - my $now = time; - my $hash1 = join ":", (stat _)[0,1,3,7,9]; - - # read the directory entries + # stat once aioreq_pri $pri; - add $grp aio_readdir $path, sub { - my $entries = shift - or return $grp->result (); + add $grp aio_stat $path, sub { + return $grp->result () if $_[0]; + my $now = time; + my $hash1 = join ":", (stat _)[0,1,3,7,9]; - # stat the dir another time + # read the directory entries aioreq_pri $pri; - add $grp aio_stat $path, sub { - my $hash2 = join ":", (stat _)[0,1,3,7,9]; - - my $ndirs; + add $grp aio_readdir $path, sub { + my $entries = shift + or return $grp->result (); + + # stat the dir another time + aioreq_pri $pri; + add $grp aio_stat $path, sub { + my $hash2 = join ":", (stat _)[0,1,3,7,9]; + + my $ndirs; + + # take the slow route if anything looks fishy + if ($hash1 ne $hash2 or (stat _)[9] == $now) { + $ndirs = -1; + } else { + # if nlink == 2, we are finished + # on non-posix-fs's, we rely on nlink < 2 + $ndirs = (stat _)[3] - 2 + or return $grp->result ([], $entries); + } + + # sort into likely dirs and likely nondirs + # dirs == files without ".", short entries first + $entries = [map $_->[0], + sort { $b->[1] cmp $a->[1] } + map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], + @$entries]; - # take the slow route if anything looks fishy - if ($hash1 ne $hash2 or (stat _)[9] == $now) { - $ndirs = -1; - } else { - # if nlink == 2, we are finished - # on non-posix-fs's, we rely on nlink < 2 - $ndirs = (stat _)[3] - 2 - or return $grp->result ([], $entries); - } - - # sort into likely dirs and likely nondirs - # dirs == files without ".", short entries first - $entries = [map $_->[0], - sort { $b->[1] cmp $a->[1] } - map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], - @$entries]; + my (@dirs, @nondirs); - my (@dirs, @nondirs); - - my $statgrp = add $grp aio_group sub { - $grp->result (\@dirs, \@nondirs); - }; + my $statgrp = add $grp aio_group sub { + $grp->result (\@dirs, \@nondirs); + }; - limit $statgrp $maxreq; - feed $statgrp sub { - return unless @$entries; - my $entry = pop @$entries; - - aioreq_pri $pri; - add $statgrp aio_stat "$path/$entry/.", sub { - if ($_[0] < 0) { - push @nondirs, $entry; - } else { - # need to check for real directory - aioreq_pri $pri; - add $statgrp aio_lstat "$path/$entry", sub { - if (-d _) { - push @dirs, $entry; - - unless (--$ndirs) { - push @nondirs, @$entries; - feed $statgrp; + limit $statgrp $maxreq; + feed $statgrp sub { + return unless @$entries; + my $entry = pop @$entries; + + aioreq_pri $pri; + add $statgrp aio_stat "$path/$entry/.", sub { + if ($_[0] < 0) { + push @nondirs, $entry; + } else { + # need to check for real directory + aioreq_pri $pri; + add $statgrp aio_lstat "$path/$entry", sub { + if (-d _) { + push @dirs, $entry; + + unless (--$ndirs) { + push @nondirs, @$entries; + feed $statgrp; + } + } else { + push @nondirs, $entry; } - } else { - push @nondirs, $entry; } } - } + }; }; }; }; }; - }; - $grp + $grp + } +} + +=item aio_rmtree $path, $callback->($status) + +Delete a directory tree starting (and including) C<$path>, return the +status of the final C only. This is a composite request that +uses C to recurse into and rmdir directories, and unlink +everything else. + +=cut + +sub aio_rmtree; +sub aio_rmtree($;$) { + aio_block { + my ($path, $cb) = @_; + + my $pri = aioreq_pri; + my $grp = aio_group $cb; + + aioreq_pri $pri; + add $grp aio_scandir $path, 0, sub { + my ($dirs, $nondirs) = @_; + + my $dirgrp = aio_group sub { + add $grp aio_rmdir $path, sub { + $grp->result ($_[0]); + }; + }; + + (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs; + (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs; + + add $grp $dirgrp; + }; + + $grp + } } =item aio_fsync $fh, $callback->($status) @@ -935,6 +1030,11 @@ C to process requests (more correctly the mininum amount of time C is allowed to use). +Setting C to a non-zero value creates an overhead of one +syscall per request processed, which is not normally a problem unless your +callbacks are really really fast or your OS is really really slow (I am +not mentioning Solaris here). Using C incurs no overhead. + Setting these is useful if you want to ensure some level of interactiveness when perl is not fast enough to process all requests in time. @@ -942,7 +1042,7 @@ For interactive programs, values such as C<0.01> to C<0.1> should be fine. Example: Install an Event watcher that automatically calls -IO::AIO::poll_some with low priority, to ensure that other parts of the +IO::AIO::poll_cb with low priority, to ensure that other parts of the program get the CPU sometimes even under high AIO load. # try not to spend much more than 0.1s in poll_cb @@ -955,9 +1055,10 @@ =item IO::AIO::poll_wait -Wait till the result filehandle becomes ready for reading (simply does a -C on the filehandle. This is useful if you want to +synchronously wait for some requests to finish). See C for an example. @@ -965,10 +1066,10 @@ Waits until some requests have been handled. -Strictly equivalent to: +Returns the number of requests processed, but is otherwise strictly +equivalent to: IO::AIO::poll_wait, IO::AIO::poll_cb - if IO::AIO::nreqs; =item IO::AIO::flush @@ -979,6 +1080,8 @@ IO::AIO::poll_wait, IO::AIO::poll_cb while IO::AIO::nreqs; +=back + =head3 CONTROLLING THE NUMBER OF THREADS =item IO::AIO::min_parallel $nthreads @@ -1048,8 +1151,12 @@ C is mainly useful in simple scripts (with low values) or as a stop gap to shield against fatal memory overflow (with large values). +=back + =head3 STATISTICAL INFORMATION +=over + =item IO::AIO::nreqs Returns the number of requests currently in the ready, execute or pending @@ -1092,10 +1199,7 @@ min_parallel 8; -END { - min_parallel 1; - flush; -}; +END { flush } 1;