--- IO-AIO/AIO.pm 2007/09/11 08:25:16 1.112 +++ IO-AIO/AIO.pm 2008/06/19 21:25:30 1.130 @@ -28,9 +28,11 @@ my $grp = aio_group sub { print "all stats done\n" }; add $grp aio_stat "..." for ...; - # AnyEvent integration - open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!"; - my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb }); + # AnyEvent integration (EV, Event, Glib, Tk, POE, urxvt, pureperl...) + use AnyEvent::AIO; + + # EV integration + my $w = EV::io IO::AIO::poll_fileno, EV::READ, \&IO::AIO::poll_cb; # Event integration Event->io (fd => IO::AIO::poll_fileno, @@ -185,20 +187,25 @@ package IO::AIO; +use Carp (); + no warnings; use strict 'vars'; use base 'Exporter'; BEGIN { - our $VERSION = '2.4'; + our $VERSION = '3.05'; + + our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close + aio_stat aio_lstat aio_unlink aio_rmdir aio_readdir + aio_scandir aio_symlink aio_readlink aio_sync aio_fsync + aio_fdatasync aio_pathsync aio_readahead + aio_rename aio_link aio_move aio_copy aio_group + aio_nop aio_mknod aio_load aio_rmtree aio_mkdir aio_chown + aio_chmod aio_utime aio_truncate); - our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat - aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink - aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link - aio_move aio_copy aio_group aio_nop aio_mknod aio_load aio_rmtree aio_mkdir - aio_chown aio_chmod aio_utime aio_truncate); - our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block)); + our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice)); our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush min_parallel max_parallel max_idle nreqs nready npending nthreads @@ -313,14 +320,19 @@ =item aio_close $fh, $callback->($status) Asynchronously close a file and call the callback with the result -code. I although accepted, you should not pass in a perl -filehandle here, as perl will likely close the file descriptor another -time when the filehandle is destroyed. Normally, you can safely call perls -C or just let filehandles go out of scope. +code. + +Unfortunately, you can't do this to perl. Perl I very strongly on +closing the file descriptor associated with the filehandle itself. -This is supposed to be a bug in the API, so that might change. It's -therefore best to avoid this function. +Therefore, C will not close the filehandle - instead it will +use dup2 to overwrite the file descriptor with the write-end of a pipe +(the pipe fd will be created on demand and will be cached). +Or in other words: the file descriptor will be closed, but it will not be +free for reuse until the perl filehandle is closed. + +=cut =item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval) @@ -335,7 +347,7 @@ be used (and updated), otherwise the file descriptor offset will not be changed by these calls. -If C<$length> is undefined in C, use the remaining length of C<$data>. +If C<$length> is undefined in C, use the remaining length of C<$data>. If C<$dataoffset> is less than zero, it will be counted from the end of C<$data>. @@ -529,26 +541,24 @@ =cut sub aio_load($$;$) { - aio_block { - my ($path, undef, $cb) = @_; - my $data = \$_[1]; + my ($path, undef, $cb) = @_; + my $data = \$_[1]; - my $pri = aioreq_pri; - my $grp = aio_group $cb; + my $pri = aioreq_pri; + my $grp = aio_group $cb; - aioreq_pri $pri; - add $grp aio_open $path, O_RDONLY, 0, sub { - my $fh = shift - or return $grp->result (-1); + aioreq_pri $pri; + add $grp aio_open $path, O_RDONLY, 0, sub { + my $fh = shift + or return $grp->result (-1); - aioreq_pri $pri; - add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub { - $grp->result ($_[0]); - }; + aioreq_pri $pri; + add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub { + $grp->result ($_[0]); }; + }; - $grp - } + $grp } =item aio_copy $srcpath, $dstpath, $callback->($status) @@ -569,52 +579,52 @@ =cut sub aio_copy($$;$) { - aio_block { - my ($src, $dst, $cb) = @_; + my ($src, $dst, $cb) = @_; - my $pri = aioreq_pri; - my $grp = aio_group $cb; + my $pri = aioreq_pri; + my $grp = aio_group $cb; - aioreq_pri $pri; - add $grp aio_open $src, O_RDONLY, 0, sub { - if (my $src_fh = $_[0]) { - my @stat = stat $src_fh; + aioreq_pri $pri; + add $grp aio_open $src, O_RDONLY, 0, sub { + if (my $src_fh = $_[0]) { + my @stat = stat $src_fh; - aioreq_pri $pri; - add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { - if (my $dst_fh = $_[0]) { - aioreq_pri $pri; - add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { - if ($_[0] == $stat[7]) { - $grp->result (0); - close $src_fh; - - # those should not normally block. should. should. - utime $stat[8], $stat[9], $dst; - chmod $stat[2] & 07777, $dst_fh; - chown $stat[4], $stat[5], $dst_fh; - close $dst_fh; - } else { - $grp->result (-1); - close $src_fh; - close $dst_fh; + aioreq_pri $pri; + add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub { + if (my $dst_fh = $_[0]) { + aioreq_pri $pri; + add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { + if ($_[0] == $stat[7]) { + $grp->result (0); + close $src_fh; + + # those should not normally block. should. should. + utime $stat[8], $stat[9], $dst; + chmod $stat[2] & 07777, $dst_fh; + chown $stat[4], $stat[5], $dst_fh; + + aioreq_pri $pri; + add $grp aio_close $dst_fh; + } else { + $grp->result (-1); + close $src_fh; + close $dst_fh; + + aioreq $pri; + add $grp aio_unlink $dst; + } + }; + } else { + $grp->result (-1); + } + }, - aioreq $pri; - add $grp aio_unlink $dst; - } - }; - } else { - $grp->result (-1); - } - }, - - } else { - $grp->result (-1); - } - }; + } else { + $grp->result (-1); + } + }; - $grp - } + $grp } =item aio_move $srcpath, $dstpath, $callback->($status) @@ -630,31 +640,29 @@ =cut sub aio_move($$;$) { - aio_block { - my ($src, $dst, $cb) = @_; + my ($src, $dst, $cb) = @_; - my $pri = aioreq_pri; - my $grp = aio_group $cb; + my $pri = aioreq_pri; + my $grp = aio_group $cb; - aioreq_pri $pri; - add $grp aio_rename $src, $dst, sub { - if ($_[0] && $! == EXDEV) { - aioreq_pri $pri; - add $grp aio_copy $src, $dst, sub { - $grp->result ($_[0]); - - if (!$_[0]) { - aioreq_pri $pri; - add $grp aio_unlink $src; - } - }; - } else { + aioreq_pri $pri; + add $grp aio_rename $src, $dst, sub { + if ($_[0] && $! == EXDEV) { + aioreq_pri $pri; + add $grp aio_copy $src, $dst, sub { $grp->result ($_[0]); - } - }; - $grp - } + if (!$_[0]) { + aioreq_pri $pri; + add $grp aio_unlink $src; + } + }; + } else { + $grp->result ($_[0]); + } + }; + + $grp } =item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) @@ -712,91 +720,89 @@ =cut sub aio_scandir($$;$) { - aio_block { - my ($path, $maxreq, $cb) = @_; + my ($path, $maxreq, $cb) = @_; + + my $pri = aioreq_pri; - my $pri = aioreq_pri; + my $grp = aio_group $cb; - my $grp = aio_group $cb; + $maxreq = 4 if $maxreq <= 0; - $maxreq = 4 if $maxreq <= 0; + # stat once + aioreq_pri $pri; + add $grp aio_stat $path, sub { + return $grp->result () if $_[0]; + my $now = time; + my $hash1 = join ":", (stat _)[0,1,3,7,9]; - # stat once + # read the directory entries aioreq_pri $pri; - add $grp aio_stat $path, sub { - return $grp->result () if $_[0]; - my $now = time; - my $hash1 = join ":", (stat _)[0,1,3,7,9]; + add $grp aio_readdir $path, sub { + my $entries = shift + or return $grp->result (); - # read the directory entries + # stat the dir another time aioreq_pri $pri; - add $grp aio_readdir $path, sub { - my $entries = shift - or return $grp->result (); + add $grp aio_stat $path, sub { + my $hash2 = join ":", (stat _)[0,1,3,7,9]; - # stat the dir another time - aioreq_pri $pri; - add $grp aio_stat $path, sub { - my $hash2 = join ":", (stat _)[0,1,3,7,9]; - - my $ndirs; + my $ndirs; - # take the slow route if anything looks fishy - if ($hash1 ne $hash2 or (stat _)[9] == $now) { - $ndirs = -1; - } else { - # if nlink == 2, we are finished - # on non-posix-fs's, we rely on nlink < 2 - $ndirs = (stat _)[3] - 2 - or return $grp->result ([], $entries); - } - - # sort into likely dirs and likely nondirs - # dirs == files without ".", short entries first - $entries = [map $_->[0], - sort { $b->[1] cmp $a->[1] } - map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], - @$entries]; + # take the slow route if anything looks fishy + if ($hash1 ne $hash2 or (stat _)[9] == $now) { + $ndirs = -1; + } else { + # if nlink == 2, we are finished + # on non-posix-fs's, we rely on nlink < 2 + $ndirs = (stat _)[3] - 2 + or return $grp->result ([], $entries); + } + + # sort into likely dirs and likely nondirs + # dirs == files without ".", short entries first + $entries = [map $_->[0], + sort { $b->[1] cmp $a->[1] } + map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length], + @$entries]; - my (@dirs, @nondirs); + my (@dirs, @nondirs); - my $statgrp = add $grp aio_group sub { - $grp->result (\@dirs, \@nondirs); - }; + my $statgrp = add $grp aio_group sub { + $grp->result (\@dirs, \@nondirs); + }; - limit $statgrp $maxreq; - feed $statgrp sub { - return unless @$entries; - my $entry = pop @$entries; - - aioreq_pri $pri; - add $statgrp aio_stat "$path/$entry/.", sub { - if ($_[0] < 0) { - push @nondirs, $entry; - } else { - # need to check for real directory - aioreq_pri $pri; - add $statgrp aio_lstat "$path/$entry", sub { - if (-d _) { - push @dirs, $entry; - - unless (--$ndirs) { - push @nondirs, @$entries; - feed $statgrp; - } - } else { - push @nondirs, $entry; + limit $statgrp $maxreq; + feed $statgrp sub { + return unless @$entries; + my $entry = pop @$entries; + + aioreq_pri $pri; + add $statgrp aio_stat "$path/$entry/.", sub { + if ($_[0] < 0) { + push @nondirs, $entry; + } else { + # need to check for real directory + aioreq_pri $pri; + add $statgrp aio_lstat "$path/$entry", sub { + if (-d _) { + push @dirs, $entry; + + unless (--$ndirs) { + push @nondirs, @$entries; + feed $statgrp; } + } else { + push @nondirs, $entry; } } - }; + } }; }; }; }; + }; - $grp - } + $grp } =item aio_rmtree $path, $callback->($status) @@ -810,32 +816,34 @@ sub aio_rmtree; sub aio_rmtree($;$) { - aio_block { - my ($path, $cb) = @_; + my ($path, $cb) = @_; - my $pri = aioreq_pri; - my $grp = aio_group $cb; + my $pri = aioreq_pri; + my $grp = aio_group $cb; - aioreq_pri $pri; - add $grp aio_scandir $path, 0, sub { - my ($dirs, $nondirs) = @_; + aioreq_pri $pri; + add $grp aio_scandir $path, 0, sub { + my ($dirs, $nondirs) = @_; - my $dirgrp = aio_group sub { - add $grp aio_rmdir $path, sub { - $grp->result ($_[0]); - }; + my $dirgrp = aio_group sub { + add $grp aio_rmdir $path, sub { + $grp->result ($_[0]); }; + }; - (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs; - (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs; + (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs; + (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs; - add $grp $dirgrp; - }; + add $grp $dirgrp; + }; - $grp - } + $grp } +=item aio_sync $callback->($status) + +Asynchronously call sync and call the callback when finished. + =item aio_fsync $fh, $callback->($status) Asynchronously call fsync on the given filehandle and call the callback @@ -849,6 +857,44 @@ If this call isn't available because your OS lacks it or it couldn't be detected, it will be emulated by calling C instead. +=item aio_pathsync $path, $callback->($status) + +This request tries to open, fsync and close the given path. This is a +composite request intended tosync directories after directory operations +(E.g. rename). This might not work on all operating systems or have any +specific effect, but usually it makes sure that directory changes get +written to disc. It works for anything that can be opened for read-only, +not just directories. + +Passes C<0> when everything went ok, and C<-1> on error. + +=cut + +sub aio_pathsync($;$) { + my ($path, $cb) = @_; + + my $pri = aioreq_pri; + my $grp = aio_group $cb; + + aioreq_pri $pri; + add $grp aio_open $path, O_RDONLY, 0, sub { + my ($fh) = @_; + if ($fh) { + aioreq_pri $pri; + add $grp aio_fsync $fh, sub { + $grp->result ($_[0]); + + aioreq_pri $pri; + add $grp aio_close $fh; + }; + } else { + $grp->result (-1); + } + }; + + $grp +} + =item aio_group $callback->(...) This is a very special aio request: Instead of doing something, it is a @@ -994,7 +1040,7 @@ =item $grp->result (...) Set the result value(s) that will be passed to the group callback when all -subrequests have finished and set thre groups errno to the current value +subrequests have finished and set the groups errno to the current value of errno (just like calling C without an error number). By default, no argument will be passed and errno is zero. @@ -1073,12 +1119,14 @@ =item IO::AIO::poll_cb Process some outstanding events on the result pipe. You have to call this -regularly. Returns the number of events processed. Returns immediately -when no events are outstanding. The amount of events processed depends on -the settings of C and C. +regularly. Returns C<0> if all events could be processed, or C<-1> if it +returned earlier for whatever reason. Returns immediately when no events +are outstanding. The amount of events processed depends on the settings of +C and C. If not all requests were processed for whatever reason, the filehandle -will still be ready when C returns. +will still be ready when C returns, so normally you don't have to +do anything special to have it called later. Example: Install an Event watcher that automatically calls IO::AIO::poll_cb with high priority: @@ -1202,14 +1250,14 @@ creation is fast. If thread creation is very slow on your system you might want to use larger values. -=item $oldmaxreqs = IO::AIO::max_outstanding $maxreqs +=item IO::AIO::max_outstanding $maxreqs This is a very bad function to use in interactive programs because it blocks, and a bad way to reduce concurrency because it is inexact: Better use an C together with a feed callback. Sets the maximum number of outstanding requests to C<$nreqs>. If you -to queue up more than this number of requests, the next call to the +do queue up more than this number of requests, the next call to the C (and C and other functions calling C) function will block until the limit is no longer exceeded. @@ -1217,7 +1265,7 @@ number of outstanding requests. You can still queue as many requests as you want. Therefore, -C is mainly useful in simple scripts (with low values) or +C is mainly useful in simple scripts (with low values) or as a stop gap to shield against fatal memory overflow (with large values). =back @@ -1250,22 +1298,6 @@ =cut -# support function to convert a fd into a perl filehandle -sub _fd2fh { - return undef if $_[0] < 0; - - # try to generate nice filehandles - my $sym = "IO::AIO::fd#$_[0]"; - local *$sym; - - open *$sym, "+<&=$_[0]" # usually works under any unix - or open *$sym, "<&=$_[0]" # cygwin needs this - or open *$sym, ">&=$_[0]" # or this - or return undef; - - *$sym -} - min_parallel 8; END { flush } @@ -1313,7 +1345,8 @@ =head1 SEE ALSO -L. +L for easy integration into event loops, L for a +more natural syntax. =head1 AUTHOR