--- IO-AIO/AIO.pm 2006/10/28 23:32:29 1.86 +++ IO-AIO/AIO.pm 2006/12/23 04:49:37 1.97 @@ -7,7 +7,8 @@ use IO::AIO; aio_open "/etc/passwd", O_RDONLY, 0, sub { - my ($fh) = @_; + my $fh = shift + or die "/etc/passwd: $!"; ... }; @@ -63,11 +64,12 @@ on a RAID volume or over NFS when you do a number of stat operations concurrently. -While this works on all types of file descriptors (for example sockets), -using these functions on file descriptors that support nonblocking -operation (again, sockets, pipes etc.) is very inefficient. Use an event -loop for that (such as the L module): IO::AIO will naturally -fit into such an event loop itself. +While most of this works on all types of file descriptors (for example +sockets), using these functions on file descriptors that support +nonblocking operation (again, sockets, pipes etc.) is very inefficient or +might not work (aio_read fails on sockets/pipes/fifos). Use an event loop +for that (such as the L module): IO::AIO will naturally fit +into such an event loop itself. In this version, a number of threads are started that execute your requests and signal their completion. You don't need thread support @@ -100,7 +102,7 @@ # queue the request to open /etc/passwd aio_open "/etc/passwd", O_RDONLY, 0, sub { - my $fh = $_[0] + my $fh = shift or die "error while opening: $!"; # stat'ing filehandles is generally non-blocking @@ -178,6 +180,8 @@ aio request is severed and calling its methods will either do nothing or result in a runtime error). +=back + =cut package IO::AIO; @@ -188,13 +192,13 @@ use base 'Exporter'; BEGIN { - our $VERSION = '2.1'; + our $VERSION = '2.3'; our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink - aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move - aio_copy aio_group aio_nop aio_mknod); - our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice)); + aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link + aio_move aio_copy aio_group aio_nop aio_mknod); + our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block)); our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush min_parallel max_parallel max_idle nreqs nready npending nthreads @@ -208,7 +212,7 @@ =head1 FUNCTIONS -=head2 AIO FUNCTIONS +=head2 AIO REQUEST FUNCTIONS All the C calls are more or less thin wrappers around the syscall with the same name (sans C). The arguments are similar or identical, @@ -221,21 +225,25 @@ All functions expecting a filehandle keep a copy of the filehandle internally until the request has finished. -All requests return objects of type L that allow further -manipulation of those requests while they are in-flight. +All functions return request objects of type L that allow +further manipulation of those requests while they are in-flight. The pathnames you pass to these routines I be absolute and -encoded in byte form. The reason for the former is that at the time the +encoded as octets. The reason for the former is that at the time the request is being executed, the current working directory could have changed. Alternatively, you can make sure that you never change the -current working directory. +current working directory anywhere in the program and then use relative +paths. -To encode pathnames to byte form, either make sure you either: a) -always pass in filenames you got from outside (command line, readdir -etc.), b) are ASCII or ISO 8859-1, c) use the Encode module and encode +To encode pathnames as octets, either make sure you either: a) always pass +in filenames you got from outside (command line, readdir etc.) without +tinkering, b) are ASCII or ISO 8859-1, c) use the Encode module and encode your pathnames to the locale (or other) encoding in effect in the user environment, d) use Glib::filename_from_unicode on unicode filenames or e) -use something else. +use something else to ensure your scalar has the correct contents. + +This works, btw. independent of the internal UTF-8 bit, which IO::AIO +handles correctly wether it is set or not. =over 4 @@ -268,7 +276,7 @@ =item aioreq_nice $pri_adjust Similar to C, but subtracts the given value from the current -priority, so effects are cumulative. +priority, so the effect is cumulative. =item aio_open $pathname, $flags, $mode, $callback->($fh) @@ -413,6 +421,12 @@ Asynchronously create a new symbolic link to the existing object at C<$srcpath> at the path C<$dstpath> and call the callback with the result code. +=item aio_readlink $path, $callback->($link) + +Asynchronously read the symlink specified by C<$path> and pass it to +the callback. If an error occurs, nothing or undef gets passed to the +callback. + =item aio_rename $srcpath, $dstpath, $callback->($status) Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as @@ -450,50 +464,52 @@ =cut sub aio_copy($$;$) { - my ($src, $dst, $cb) = @_; - - my $pri = aioreq_pri; - my $grp = aio_group $cb; + aio_block { + my ($src, $dst, $cb) = @_; - aioreq_pri $pri; - add $grp aio_open $src, O_RDONLY, 0, sub { - if (my $src_fh = $_[0]) { - my @stat = stat $src_fh; + my $pri = aioreq_pri; + my $grp = aio_group $cb; - aioreq_pri $pri; - add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { - if (my $dst_fh = $_[0]) { - aioreq_pri $pri; - add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { - if ($_[0] == $stat[7]) { - $grp->result (0); - close $src_fh; - - # those should not normally block. should. should. - utime $stat[8], $stat[9], $dst; - chmod $stat[2] & 07777, $dst_fh; - chown $stat[4], $stat[5], $dst_fh; - close $dst_fh; - } else { - $grp->result (-1); - close $src_fh; - close $dst_fh; - - aioreq $pri; - add $grp aio_unlink $dst; - } - }; - } else { - $grp->result (-1); - } - }, + aioreq_pri $pri; + add $grp aio_open $src, O_RDONLY, 0, sub { + if (my $src_fh = $_[0]) { + my @stat = stat $src_fh; + + aioreq_pri $pri; + add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { + if (my $dst_fh = $_[0]) { + aioreq_pri $pri; + add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { + if ($_[0] == $stat[7]) { + $grp->result (0); + close $src_fh; + + # those should not normally block. should. should. + utime $stat[8], $stat[9], $dst; + chmod $stat[2] & 07777, $dst_fh; + chown $stat[4], $stat[5], $dst_fh; + close $dst_fh; + } else { + $grp->result (-1); + close $src_fh; + close $dst_fh; - } else { - $grp->result (-1); - } - }; + aioreq $pri; + add $grp aio_unlink $dst; + } + }; + } else { + $grp->result (-1); + } + }, + + } else { + $grp->result (-1); + } + }; - $grp + $grp + } } =item aio_move $srcpath, $dstpath, $callback->($status) @@ -509,29 +525,31 @@ =cut sub aio_move($$;$) { - my ($src, $dst, $cb) = @_; + aio_block { + my ($src, $dst, $cb) = @_; - my $pri = aioreq_pri; - my $grp = aio_group $cb; + my $pri = aioreq_pri; + my $grp = aio_group $cb; - aioreq_pri $pri; - add $grp aio_rename $src, $dst, sub { - if ($_[0] && $! == EXDEV) { - aioreq_pri $pri; - add $grp aio_copy $src, $dst, sub { + aioreq_pri $pri; + add $grp aio_rename $src, $dst, sub { + if ($_[0] && $! == EXDEV) { + aioreq_pri $pri; + add $grp aio_copy $src, $dst, sub { + $grp->result ($_[0]); + + if (!$_[0]) { + aioreq_pri $pri; + add $grp aio_unlink $src; + } + }; + } else { $grp->result ($_[0]); + } + }; - if (!$_[0]) { - aioreq_pri $pri; - add $grp aio_unlink $src; - } - }; - } else { - $grp->result ($_[0]); - } - }; - - $grp + $grp + } } =item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) @@ -589,89 +607,91 @@ =cut sub aio_scandir($$$) { - my ($path, $maxreq, $cb) = @_; - - my $pri = aioreq_pri; + aio_block { + my ($path, $maxreq, $cb) = @_; - my $grp = aio_group $cb; + my $pri = aioreq_pri; - $maxreq = 4 if $maxreq <= 0; + my $grp = aio_group $cb; - # stat once - aioreq_pri $pri; - add $grp aio_stat $path, sub { - return $grp->result () if $_[0]; - my $now = time; - my $hash1 = join ":", (stat _)[0,1,3,7,9]; + $maxreq = 4 if $maxreq <= 0; - # read the directory entries + # stat once aioreq_pri $pri; - add $grp aio_readdir $path, sub { - my $entries = shift - or return $grp->result (); + add $grp aio_stat $path, sub { + return $grp->result () if $_[0]; + my $now = time; + my $hash1 = join ":", (stat _)[0,1,3,7,9]; - # stat the dir another time + # read the directory entries aioreq_pri $pri; - add $grp aio_stat $path, sub { - my $hash2 = join ":", (stat _)[0,1,3,7,9]; + add $grp aio_readdir $path, sub { + my $entries = shift + or return $grp->result (); + + # stat the dir another time + aioreq_pri $pri; + add $grp aio_stat $path, sub { + my $hash2 = join ":", (stat _)[0,1,3,7,9]; + + my $ndirs; + + # take the slow route if anything looks fishy + if ($hash1 ne $hash2 or (stat _)[9] == $now) { + $ndirs = -1; + } else { + # if nlink == 2, we are finished + # on non-posix-fs's, we rely on nlink < 2 + $ndirs = (stat _)[3] - 2 + or return $grp->result ([], $entries); + } + + # sort into likely dirs and likely nondirs + # dirs == files without ".", short entries first + $entries = [map $_->[0], + sort { $b->[1] cmp $a->[1] } + map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], + @$entries]; - my $ndirs; + my (@dirs, @nondirs); - # take the slow route if anything looks fishy - if ($hash1 ne $hash2 or (stat _)[9] == $now) { - $ndirs = -1; - } else { - # if nlink == 2, we are finished - # on non-posix-fs's, we rely on nlink < 2 - $ndirs = (stat _)[3] - 2 - or return $grp->result ([], $entries); - } - - # sort into likely dirs and likely nondirs - # dirs == files without ".", short entries first - $entries = [map $_->[0], - sort { $b->[1] cmp $a->[1] } - map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], - @$entries]; - - my (@dirs, @nondirs); - - my $statgrp = add $grp aio_group sub { - $grp->result (\@dirs, \@nondirs); - }; + my $statgrp = add $grp aio_group sub { + $grp->result (\@dirs, \@nondirs); + }; - limit $statgrp $maxreq; - feed $statgrp sub { - return unless @$entries; - my $entry = pop @$entries; - - aioreq_pri $pri; - add $statgrp aio_stat "$path/$entry/.", sub { - if ($_[0] < 0) { - push @nondirs, $entry; - } else { - # need to check for real directory - aioreq_pri $pri; - add $statgrp aio_lstat "$path/$entry", sub { - if (-d _) { - push @dirs, $entry; - - unless (--$ndirs) { - push @nondirs, @$entries; - feed $statgrp; + limit $statgrp $maxreq; + feed $statgrp sub { + return unless @$entries; + my $entry = pop @$entries; + + aioreq_pri $pri; + add $statgrp aio_stat "$path/$entry/.", sub { + if ($_[0] < 0) { + push @nondirs, $entry; + } else { + # need to check for real directory + aioreq_pri $pri; + add $statgrp aio_lstat "$path/$entry", sub { + if (-d _) { + push @dirs, $entry; + + unless (--$ndirs) { + push @nondirs, @$entries; + feed $statgrp; + } + } else { + push @nondirs, $entry; } - } else { - push @nondirs, $entry; } } - } + }; }; }; }; }; - }; - $grp + $grp + } } =item aio_fsync $fh, $callback->($status) @@ -935,6 +955,11 @@ C to process requests (more correctly the mininum amount of time C is allowed to use). +Setting C to a non-zero value creates an overhead of one +syscall per request processed, which is not normally a problem unless your +callbacks are really really fast or your OS is really really slow (I am +not mentioning Solaris here). Using C incurs no overhead. + Setting these is useful if you want to ensure some level of interactiveness when perl is not fast enough to process all requests in time. @@ -942,7 +967,7 @@ For interactive programs, values such as C<0.01> to C<0.1> should be fine. Example: Install an Event watcher that automatically calls -IO::AIO::poll_some with low priority, to ensure that other parts of the +IO::AIO::poll_cb with low priority, to ensure that other parts of the program get the CPU sometimes even under high AIO load. # try not to spend much more than 0.1s in poll_cb @@ -955,9 +980,10 @@ =item IO::AIO::poll_wait -Wait till the result filehandle becomes ready for reading (simply does a -C on the filehandle. This is useful if you want to +synchronously wait for some requests to finish). See C for an example. @@ -965,10 +991,10 @@ Waits until some requests have been handled. -Strictly equivalent to: +Returns the number of requests processed, but is otherwise strictly +equivalent to: IO::AIO::poll_wait, IO::AIO::poll_cb - if IO::AIO::nreqs; =item IO::AIO::flush @@ -1092,10 +1118,7 @@ min_parallel 8; -END { - min_parallel 1; - flush; -}; +END { flush } 1;