--- IO-AIO/AIO.pm 2006/10/29 01:50:29 1.88 +++ IO-AIO/AIO.pm 2007/09/24 18:14:00 1.115 @@ -7,7 +7,8 @@ use IO::AIO; aio_open "/etc/passwd", O_RDONLY, 0, sub { - my ($fh) = @_; + my $fh = shift + or die "/etc/passwd: $!"; ... }; @@ -63,11 +64,11 @@ on a RAID volume or over NFS when you do a number of stat operations concurrently. -While this works on all types of file descriptors (for example sockets), -using these functions on file descriptors that support nonblocking -operation (again, sockets, pipes etc.) is very inefficient. Use an event -loop for that (such as the L module): IO::AIO will naturally -fit into such an event loop itself. +While most of this works on all types of file descriptors (for +example sockets), using these functions on file descriptors that +support nonblocking operation (again, sockets, pipes etc.) is very +inefficient. Use an event loop for that (such as the L +module): IO::AIO will naturally fit into such an event loop itself. In this version, a number of threads are started that execute your requests and signal their completion. You don't need thread support @@ -79,10 +80,10 @@ aio_write, so the remaining functionality would have to be implemented using threads anyway. -Although the module will work with in the presence of other (Perl-) -threads, it is currently not reentrant in any way, so use appropriate -locking yourself, always call C from within the same thread, or -never call C (or other C functions) recursively. +Although the module will work in the presence of other (Perl-) threads, +it is currently not reentrant in any way, so use appropriate locking +yourself, always call C from within the same thread, or never +call C (or other C functions) recursively. =head2 EXAMPLE @@ -100,7 +101,7 @@ # queue the request to open /etc/passwd aio_open "/etc/passwd", O_RDONLY, 0, sub { - my $fh = $_[0] + my $fh = shift or die "error while opening: $!"; # stat'ing filehandles is generally non-blocking @@ -190,13 +191,14 @@ use base 'Exporter'; BEGIN { - our $VERSION = '2.1'; + our $VERSION = '2.41'; our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink - aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move - aio_copy aio_group aio_nop aio_mknod); - our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice)); + aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link + aio_move aio_copy aio_group aio_nop aio_mknod aio_load aio_rmtree aio_mkdir + aio_chown aio_chmod aio_utime aio_truncate); + our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block)); our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush min_parallel max_parallel max_idle nreqs nready npending nthreads @@ -271,11 +273,13 @@ }; }; + =item aioreq_nice $pri_adjust Similar to C, but subtracts the given value from the current priority, so the effect is cumulative. + =item aio_open $pathname, $flags, $mode, $callback->($fh) Asynchronously open or create a file and call the callback with a newly @@ -290,7 +294,9 @@ Likewise, C<$mode> specifies the mode of the newly created file, if it didn't exist and C has been given, just like perl's C, except that it is mandatory (i.e. use C<0> if you don't create new files, -and C<0666> or C<0777> if you do). +and C<0666> or C<0777> if you do). Note that the C<$mode> will be modified +by the umask in effect then the request is being executed, so better never +change the umask. Example: @@ -303,6 +309,7 @@ } }; + =item aio_close $fh, $callback->($status) Asynchronously close a file and call the callback with the result @@ -314,18 +321,28 @@ This is supposed to be a bug in the API, so that might change. It's therefore best to avoid this function. + =item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval) =item aio_write $fh,$offset,$length, $data,$dataoffset, $callback->($retval) -Reads or writes C bytes from the specified C and C -into the scalar given by C and offset C and calls the +Reads or writes C<$length> bytes from the specified C<$fh> and C<$offset> +into the scalar given by C<$data> and offset C<$dataoffset> and calls the callback without the actual number of bytes read (or -1 on error, just like the syscall). +If C<$offset> is undefined, then the current file descriptor offset will +be used (and updated), otherwise the file descriptor offset will not be +changed by these calls. + +If C<$length> is undefined in C, use the remaining length of C<$data>. + +If C<$dataoffset> is less than zero, it will be counted from the end of +C<$data>. + The C<$data> scalar I be modified in any way while the request -is outstanding. Modifying it can result in segfaults or WW3 (if the -necessary/optional hardware is installed). +is outstanding. Modifying it can result in segfaults or World War III (if +the necessary/optional hardware is installed). Example: Read 15 bytes at offset 7 into scalar C<$buffer>, starting at offset C<0> within the scalar: @@ -335,6 +352,7 @@ print "read $_[0] bytes: <$buffer>\n"; }; + =item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval) Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts @@ -358,6 +376,7 @@ value equals C<$length> one can assume that C<$length> bytes have been read. + =item aio_readahead $fh,$offset,$length, $callback->($retval) C populates the page cache with data from a file so that @@ -372,6 +391,7 @@ If that syscall doesn't exist (likely if your OS isn't Linux) it will be emulated by simply reading the data, which would have a similar effect. + =item aio_stat $fh_or_path, $callback->($status) =item aio_lstat $fh, $callback->($status) @@ -394,11 +414,54 @@ print "size is ", -s _, "\n"; }; + +=item aio_utime $fh_or_path, $atime, $mtime, $callback->($status) + +Works like perl's C function (including the special case of $atime +and $mtime being undef). Fractional times are supported if the underlying +syscalls support them. + +When called with a pathname, uses utimes(2) if available, otherwise +utime(2). If called on a file descriptor, uses futimes(2) if available, +otherwise returns ENOSYS, so this is not portable. + +Examples: + + # set atime and mtime to current time (basically touch(1)): + aio_utime "path", undef, undef; + # set atime to current time and mtime to beginning of the epoch: + aio_utime "path", time, undef; # undef==0 + + +=item aio_chown $fh_or_path, $uid, $gid, $callback->($status) + +Works like perl's C function, except that C for either $uid +or $gid is being interpreted as "do not change" (but -1 can also be used). + +Examples: + + # same as "chown root path" in the shell: + aio_chown "path", 0, -1; + # same as above: + aio_chown "path", 0, undef; + + +=item aio_truncate $fh_or_path, $offset, $callback->($status) + +Works like truncate(2) or ftruncate(2). + + +=item aio_chmod $fh_or_path, $mode, $callback->($status) + +Works like perl's C function. + + =item aio_unlink $pathname, $callback->($status) Asynchronously unlink (delete) a file and call the callback with the result code. + =item aio_mknod $path, $mode, $dev, $callback->($status) [EXPERIMENTAL] @@ -409,26 +472,45 @@ aio_mknod $path, IO::AIO::S_IFIFO | $mode, 0, sub { ... + =item aio_link $srcpath, $dstpath, $callback->($status) Asynchronously create a new link to the existing object at C<$srcpath> at the path C<$dstpath> and call the callback with the result code. + =item aio_symlink $srcpath, $dstpath, $callback->($status) Asynchronously create a new symbolic link to the existing object at C<$srcpath> at the path C<$dstpath> and call the callback with the result code. + +=item aio_readlink $path, $callback->($link) + +Asynchronously read the symlink specified by C<$path> and pass it to +the callback. If an error occurs, nothing or undef gets passed to the +callback. + + =item aio_rename $srcpath, $dstpath, $callback->($status) Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as rename(2) and call the callback with the result code. + +=item aio_mkdir $pathname, $mode, $callback->($status) + +Asynchronously mkdir (create) a directory and call the callback with +the result code. C<$mode> will be modified by the umask at the time the +request is executed, so do not change your umask. + + =item aio_rmdir $pathname, $callback->($status) Asynchronously rmdir (delete) a directory and call the callback with the result code. + =item aio_readdir $pathname, $callback->($entries) Unlike the POSIX call of the same name, C reads an entire @@ -438,6 +520,37 @@ The callback a single argument which is either C or an array-ref with the filenames. + +=item aio_load $path, $data, $callback->($status) + +This is a composite request that tries to fully load the given file into +memory. Status is the same as with aio_read. + +=cut + +sub aio_load($$;$) { + aio_block { + my ($path, undef, $cb) = @_; + my $data = \$_[1]; + + my $pri = aioreq_pri; + my $grp = aio_group $cb; + + aioreq_pri $pri; + add $grp aio_open $path, O_RDONLY, 0, sub { + my $fh = shift + or return $grp->result (-1); + + aioreq_pri $pri; + add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub { + $grp->result ($_[0]); + }; + }; + + $grp + } +} + =item aio_copy $srcpath, $dstpath, $callback->($status) Try to copy the I (directories not supported as either source or @@ -456,50 +569,52 @@ =cut sub aio_copy($$;$) { - my ($src, $dst, $cb) = @_; + aio_block { + my ($src, $dst, $cb) = @_; - my $pri = aioreq_pri; - my $grp = aio_group $cb; + my $pri = aioreq_pri; + my $grp = aio_group $cb; - aioreq_pri $pri; - add $grp aio_open $src, O_RDONLY, 0, sub { - if (my $src_fh = $_[0]) { - my @stat = stat $src_fh; - - aioreq_pri $pri; - add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { - if (my $dst_fh = $_[0]) { - aioreq_pri $pri; - add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { - if ($_[0] == $stat[7]) { - $grp->result (0); - close $src_fh; - - # those should not normally block. should. should. - utime $stat[8], $stat[9], $dst; - chmod $stat[2] & 07777, $dst_fh; - chown $stat[4], $stat[5], $dst_fh; - close $dst_fh; - } else { - $grp->result (-1); - close $src_fh; - close $dst_fh; - - aioreq $pri; - add $grp aio_unlink $dst; - } - }; - } else { - $grp->result (-1); - } - }, + aioreq_pri $pri; + add $grp aio_open $src, O_RDONLY, 0, sub { + if (my $src_fh = $_[0]) { + my @stat = stat $src_fh; + + aioreq_pri $pri; + add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { + if (my $dst_fh = $_[0]) { + aioreq_pri $pri; + add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { + if ($_[0] == $stat[7]) { + $grp->result (0); + close $src_fh; + + # those should not normally block. should. should. + utime $stat[8], $stat[9], $dst; + chmod $stat[2] & 07777, $dst_fh; + chown $stat[4], $stat[5], $dst_fh; + close $dst_fh; + } else { + $grp->result (-1); + close $src_fh; + close $dst_fh; - } else { - $grp->result (-1); - } - }; + aioreq $pri; + add $grp aio_unlink $dst; + } + }; + } else { + $grp->result (-1); + } + }, + + } else { + $grp->result (-1); + } + }; - $grp + $grp + } } =item aio_move $srcpath, $dstpath, $callback->($status) @@ -515,29 +630,31 @@ =cut sub aio_move($$;$) { - my ($src, $dst, $cb) = @_; + aio_block { + my ($src, $dst, $cb) = @_; - my $pri = aioreq_pri; - my $grp = aio_group $cb; + my $pri = aioreq_pri; + my $grp = aio_group $cb; - aioreq_pri $pri; - add $grp aio_rename $src, $dst, sub { - if ($_[0] && $! == EXDEV) { - aioreq_pri $pri; - add $grp aio_copy $src, $dst, sub { + aioreq_pri $pri; + add $grp aio_rename $src, $dst, sub { + if ($_[0] && $! == EXDEV) { + aioreq_pri $pri; + add $grp aio_copy $src, $dst, sub { + $grp->result ($_[0]); + + if (!$_[0]) { + aioreq_pri $pri; + add $grp aio_unlink $src; + } + }; + } else { $grp->result ($_[0]); + } + }; - if (!$_[0]) { - aioreq_pri $pri; - add $grp aio_unlink $src; - } - }; - } else { - $grp->result ($_[0]); - } - }; - - $grp + $grp + } } =item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) @@ -594,90 +711,129 @@ =cut -sub aio_scandir($$$) { - my ($path, $maxreq, $cb) = @_; - - my $pri = aioreq_pri; +sub aio_scandir($$;$) { + aio_block { + my ($path, $maxreq, $cb) = @_; - my $grp = aio_group $cb; + my $pri = aioreq_pri; - $maxreq = 4 if $maxreq <= 0; + my $grp = aio_group $cb; - # stat once - aioreq_pri $pri; - add $grp aio_stat $path, sub { - return $grp->result () if $_[0]; - my $now = time; - my $hash1 = join ":", (stat _)[0,1,3,7,9]; + $maxreq = 4 if $maxreq <= 0; - # read the directory entries + # stat once aioreq_pri $pri; - add $grp aio_readdir $path, sub { - my $entries = shift - or return $grp->result (); + add $grp aio_stat $path, sub { + return $grp->result () if $_[0]; + my $now = time; + my $hash1 = join ":", (stat _)[0,1,3,7,9]; - # stat the dir another time + # read the directory entries aioreq_pri $pri; - add $grp aio_stat $path, sub { - my $hash2 = join ":", (stat _)[0,1,3,7,9]; + add $grp aio_readdir $path, sub { + my $entries = shift + or return $grp->result (); + + # stat the dir another time + aioreq_pri $pri; + add $grp aio_stat $path, sub { + my $hash2 = join ":", (stat _)[0,1,3,7,9]; + + my $ndirs; + + # take the slow route if anything looks fishy + if ($hash1 ne $hash2 or (stat _)[9] == $now) { + $ndirs = -1; + } else { + # if nlink == 2, we are finished + # on non-posix-fs's, we rely on nlink < 2 + $ndirs = (stat _)[3] - 2 + or return $grp->result ([], $entries); + } + + # sort into likely dirs and likely nondirs + # dirs == files without ".", short entries first + $entries = [map $_->[0], + sort { $b->[1] cmp $a->[1] } + map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], + @$entries]; - my $ndirs; + my (@dirs, @nondirs); - # take the slow route if anything looks fishy - if ($hash1 ne $hash2 or (stat _)[9] == $now) { - $ndirs = -1; - } else { - # if nlink == 2, we are finished - # on non-posix-fs's, we rely on nlink < 2 - $ndirs = (stat _)[3] - 2 - or return $grp->result ([], $entries); - } - - # sort into likely dirs and likely nondirs - # dirs == files without ".", short entries first - $entries = [map $_->[0], - sort { $b->[1] cmp $a->[1] } - map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], - @$entries]; - - my (@dirs, @nondirs); - - my $statgrp = add $grp aio_group sub { - $grp->result (\@dirs, \@nondirs); - }; + my $statgrp = add $grp aio_group sub { + $grp->result (\@dirs, \@nondirs); + }; - limit $statgrp $maxreq; - feed $statgrp sub { - return unless @$entries; - my $entry = pop @$entries; - - aioreq_pri $pri; - add $statgrp aio_stat "$path/$entry/.", sub { - if ($_[0] < 0) { - push @nondirs, $entry; - } else { - # need to check for real directory - aioreq_pri $pri; - add $statgrp aio_lstat "$path/$entry", sub { - if (-d _) { - push @dirs, $entry; - - unless (--$ndirs) { - push @nondirs, @$entries; - feed $statgrp; + limit $statgrp $maxreq; + feed $statgrp sub { + return unless @$entries; + my $entry = pop @$entries; + + aioreq_pri $pri; + add $statgrp aio_stat "$path/$entry/.", sub { + if ($_[0] < 0) { + push @nondirs, $entry; + } else { + # need to check for real directory + aioreq_pri $pri; + add $statgrp aio_lstat "$path/$entry", sub { + if (-d _) { + push @dirs, $entry; + + unless (--$ndirs) { + push @nondirs, @$entries; + feed $statgrp; + } + } else { + push @nondirs, $entry; } - } else { - push @nondirs, $entry; } } - } + }; }; }; }; }; - }; - $grp + $grp + } +} + +=item aio_rmtree $path, $callback->($status) + +Delete a directory tree starting (and including) C<$path>, return the +status of the final C only. This is a composite request that +uses C to recurse into and rmdir directories, and unlink +everything else. + +=cut + +sub aio_rmtree; +sub aio_rmtree($;$) { + aio_block { + my ($path, $cb) = @_; + + my $pri = aioreq_pri; + my $grp = aio_group $cb; + + aioreq_pri $pri; + add $grp aio_scandir $path, 0, sub { + my ($dirs, $nondirs) = @_; + + my $dirgrp = aio_group sub { + add $grp aio_rmdir $path, sub { + $grp->result ($_[0]); + }; + }; + + (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs; + (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs; + + add $grp $dirgrp; + }; + + $grp + } } =item aio_fsync $fh, $callback->($status) @@ -941,6 +1097,11 @@ C to process requests (more correctly the mininum amount of time C is allowed to use). +Setting C to a non-zero value creates an overhead of one +syscall per request processed, which is not normally a problem unless your +callbacks are really really fast or your OS is really really slow (I am +not mentioning Solaris here). Using C incurs no overhead. + Setting these is useful if you want to ensure some level of interactiveness when perl is not fast enough to process all requests in time. @@ -948,7 +1109,7 @@ For interactive programs, values such as C<0.01> to C<0.1> should be fine. Example: Install an Event watcher that automatically calls -IO::AIO::poll_some with low priority, to ensure that other parts of the +IO::AIO::poll_cb with low priority, to ensure that other parts of the program get the CPU sometimes even under high AIO load. # try not to spend much more than 0.1s in poll_cb @@ -961,9 +1122,10 @@ =item IO::AIO::poll_wait -Wait till the result filehandle becomes ready for reading (simply does a -C on the filehandle. This is useful if you want to +synchronously wait for some requests to finish). See C for an example. @@ -971,10 +1133,10 @@ Waits until some requests have been handled. -Strictly equivalent to: +Returns the number of requests processed, but is otherwise strictly +equivalent to: IO::AIO::poll_wait, IO::AIO::poll_cb - if IO::AIO::nreqs; =item IO::AIO::flush @@ -985,8 +1147,12 @@ IO::AIO::poll_wait, IO::AIO::poll_cb while IO::AIO::nreqs; +=back + =head3 CONTROLLING THE NUMBER OF THREADS +=over + =item IO::AIO::min_parallel $nthreads Set the minimum number of AIO threads to C<$nthreads>. The current @@ -1043,7 +1209,7 @@ use an C together with a feed callback. Sets the maximum number of outstanding requests to C<$nreqs>. If you -to queue up more than this number of requests, the next call to the +do queue up more than this number of requests, the next call to the C (and C and other functions calling C) function will block until the limit is no longer exceeded. @@ -1054,8 +1220,12 @@ C is mainly useful in simple scripts (with low values) or as a stop gap to shield against fatal memory overflow (with large values). +=back + =head3 STATISTICAL INFORMATION +=over + =item IO::AIO::nreqs Returns the number of requests currently in the ready, execute or pending @@ -1098,10 +1268,7 @@ min_parallel 8; -END { - min_parallel 1; - flush; -}; +END { flush } 1; @@ -1131,7 +1298,7 @@ scalars and other data passed into aio requests will also be locked and will consume memory till the request has entered the done state. -This is now awfully much, so queuing lots of requests is not usually a +This is not awfully much, so queuing lots of requests is not usually a problem. Per-thread usage: