/cvs/IO-AIO/AIO.pm
Revision: 1.99
Committed: Sun Jan 7 21:32:20 2007 UTC (17 years, 4 months ago) by root
Branch: MAIN
Changes since 1.98: +38 -2 lines
Log Message:
*** empty log message ***

File Contents

=head1 NAME

IO::AIO - Asynchronous Input/Output

=head1 SYNOPSIS

 use IO::AIO;

 aio_open "/etc/passwd", O_RDONLY, 0, sub {
    my $fh = shift
       or die "/etc/passwd: $!";
    ...
 };

 aio_unlink "/tmp/file", sub { };

 aio_read $fh, 30000, 1024, $buffer, 0, sub {
    $_[0] > 0 or die "read error: $!";
 };

 # version 2+ has request and group objects
 use IO::AIO 2;

 aioreq_pri 4; # give next request a very high priority
 my $req = aio_unlink "/tmp/file", sub { };
 $req->cancel; # cancel request if still in queue

 my $grp = aio_group sub { print "all stats done\n" };
 add $grp aio_stat "..." for ...;

 # AnyEvent integration
 open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!";
 my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb });

 # Event integration
 Event->io (fd => IO::AIO::poll_fileno,
            poll => 'r',
            cb => \&IO::AIO::poll_cb);

 # Glib/Gtk2 integration
 add_watch Glib::IO IO::AIO::poll_fileno,
           in => sub { IO::AIO::poll_cb; 1 };

 # Tk integration
 Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "",
                           readable => \&IO::AIO::poll_cb);

 # Danga::Socket integration
 Danga::Socket->AddOtherFds (IO::AIO::poll_fileno =>
                             \&IO::AIO::poll_cb);

=head1 DESCRIPTION

This module implements asynchronous I/O using whatever means your
operating system supports.

Asynchronous means that operations that can normally block your program
(e.g. reading from disk) will be done asynchronously: the operation
will still block, but you can do something else in the meantime. This
is extremely useful for programs that need to stay interactive even
when doing heavy I/O (GUI programs, high performance network servers
etc.), but can also be used to easily do operations in parallel that are
normally done sequentially, e.g. stat'ing many files, which is much faster
on a RAID volume or over NFS when you do a number of stat operations
concurrently.

While most of this works on all types of file descriptors (for example
sockets), using these functions on file descriptors that support
nonblocking operation (again, sockets, pipes etc.) is very inefficient or
might not work (C<aio_read> fails on sockets/pipes/fifos). Use an event
loop for that (such as the L<Event|Event> module): IO::AIO will naturally
fit into such an event loop itself.

In this version, a number of threads are started that execute your
requests and signal their completion. You don't need thread support
in perl, and the threads created by this module will not be visible
to perl. In the future, this module might make use of the native aio
functions available on many operating systems. However, they are often
not well-supported or restricted (GNU/Linux doesn't allow them on normal
files currently, for example), and they would only support aio_read and
aio_write, so the remaining functionality would have to be implemented
using threads anyway.

Although the module will work in the presence of other (Perl-) threads,
it is currently not reentrant in any way, so use appropriate locking
yourself, always call C<poll_cb> from within the same thread, or never
call C<poll_cb> (or other C<aio_> functions) recursively.

=head2 EXAMPLE

This is a simple example that uses the Event module and loads
F</etc/passwd> asynchronously:

   use Fcntl;
   use Event;
   use IO::AIO;

   # register the IO::AIO callback with Event
   Event->io (fd => IO::AIO::poll_fileno,
              poll => 'r',
              cb => \&IO::AIO::poll_cb);

   # queue the request to open /etc/passwd
   aio_open "/etc/passwd", O_RDONLY, 0, sub {
      my $fh = shift
         or die "error while opening: $!";

      # stat'ing filehandles is generally non-blocking
      my $size = -s $fh;

      # queue a request to read the file
      my $contents;
      aio_read $fh, 0, $size, $contents, 0, sub {
         $_[0] == $size
            or die "short read: $!";

         close $fh;

         # file contents now in $contents
         print $contents;

         # exit event loop and program
         Event::unloop;
      };
   };

   # possibly queue up other requests, or open GUI windows,
   # check for sockets etc. etc.

   # process events as long as there are some:
   Event::loop;

=head1 REQUEST ANATOMY AND LIFETIME

Every C<aio_*> function creates a request, which is a C data structure not
directly visible to Perl.

If called in non-void context, every request function returns a Perl
object representing the request. In void context, nothing is returned,
which saves a bit of memory.

The perl object is a fairly standard ref-to-hash object. The hash contents
are not used by IO::AIO so you are free to store anything you like in it.

During their existence, aio requests travel through the following states,
in order:

=over 4

=item ready

Immediately after a request is created it is put into the ready state,
waiting for a thread to execute it.

=item execute

A thread has accepted the request for processing and is currently
executing it (e.g. blocking in read).

=item pending

The request has been executed and is waiting for result processing.

While request submission and execution are fully asynchronous, result
processing is not and relies on the perl interpreter calling C<poll_cb>
(or another function with the same effect).

=item result

The request results are processed synchronously by C<poll_cb>.

The C<poll_cb> function will process all outstanding aio requests by
calling their callbacks, freeing memory associated with them and managing
any groups they are contained in.

=item done

The request has reached the end of its lifetime and holds no resources
anymore (except possibly for the Perl object, but its connection to the
actual aio request is severed and calling its methods will either do
nothing or result in a runtime error).

=back

=cut

package IO::AIO;

no warnings;
use strict 'vars';

use base 'Exporter';

BEGIN {
   our $VERSION = '2.32';

   our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
                     aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
                     aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link
                     aio_move aio_copy aio_group aio_nop aio_mknod aio_load aio_rmtree);
   our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block));
   our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
                       min_parallel max_parallel max_idle
                       nreqs nready npending nthreads
                       max_poll_time max_poll_reqs);

   @IO::AIO::GRP::ISA = 'IO::AIO::REQ';

   require XSLoader;
   XSLoader::load ("IO::AIO", $VERSION);
}

=head1 FUNCTIONS

=head2 AIO REQUEST FUNCTIONS

All the C<aio_*> calls are more or less thin wrappers around the syscall
with the same name (sans C<aio_>). The arguments are similar or identical,
and they all accept an additional (and optional) C<$callback> argument
which must be a code reference. This code reference will get called with
the syscall return code (e.g. most syscalls return C<-1> on error, unlike
perl, which usually delivers "false") as its sole argument when the given
syscall has been executed asynchronously.

All functions expecting a filehandle keep a copy of the filehandle
internally until the request has finished.

All functions return request objects of type L<IO::AIO::REQ> (when called
in non-void context) that allow further manipulation of those requests
while they are in-flight.

The pathnames you pass to these routines I<must> be absolute and
encoded as octets. The reason for the former is that at the time the
request is being executed, the current working directory could have
changed. Alternatively, you can make sure that you never change the
current working directory anywhere in the program and then use relative
paths.

To encode pathnames as octets, make sure you either: a) always pass in
filenames you got from outside (command line, readdir etc.) without
tinkering, b) only use filenames that are ASCII or ISO 8859-1, c) use the
Encode module and encode your pathnames to the locale (or other) encoding
in effect in the user environment, d) use Glib::filename_from_unicode on
unicode filenames or e) use something else to ensure your scalar has the
correct contents.

This works, by the way, independently of the internal UTF-8 bit, which
IO::AIO handles correctly whether it is set or not.
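
As a sketch of option c), a character string can be encoded with the core
Encode module before it is handed to an C<aio_*> function. This assumes the
user's locale (and filesystem) encoding is UTF-8, and the pathname is
hypothetical; real code should determine the encoding actually in effect.
C<IO::AIO::flush> is used only so the sketch runs without an event loop:

   use Encode ();
   use IO::AIO;

   # hypothetical pathname containing non-ASCII characters
   my $name = "/tmp/gr\x{fc}\x{df}e.txt";

   # assumption: the locale/filesystem encoding is UTF-8
   my $octets = Encode::encode ("UTF-8", $name);

   aio_stat $octets, sub {
      $_[0] and return warn "stat failed: $!\n";
      print "size is ", -s _, "\n";
   };

   # block until all outstanding requests have been handled
   IO::AIO::flush;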

=over 4

=item $prev_pri = aioreq_pri [$pri]

Returns the priority value that would be used for the next request and, if
C<$pri> is given, sets the priority for the next aio request.

The default priority is C<0>, the minimum and maximum priorities are C<-4>
and C<4>, respectively. Requests with higher priority will be serviced
first.

The priority will be reset to C<0> after each call to one of the C<aio_*>
functions.

Example: open a file with low priority, then read something from it with
higher priority so the read request is serviced before other low priority
open requests (potentially spamming the cache):

   aioreq_pri -3;
   aio_open ..., sub {
      return unless $_[0];

      aioreq_pri -2;
      aio_read $_[0], ..., sub {
         ...
      };
   };

=item aioreq_nice $pri_adjust

Similar to C<aioreq_pri>, but subtracts the given value from the current
priority, so the effect is cumulative.
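
A short sketch of the cumulative effect, based only on the semantics
described above: C<aioreq_nice> adjusts whatever priority is currently set
for the next request, and C<aioreq_pri> without an argument is used to
inspect it. The request itself is just an illustrative C<aio_open>:

   use Fcntl;
   use IO::AIO;

   aioreq_pri 2;    # next request would get priority 2
   aioreq_nice 3;   # per the description above: 2 - 3 = -1

   printf "next request priority: %d\n", aioreq_pri;

   # this request is submitted with the adjusted priority,
   # after which the priority is reset to 0
   aio_open "/etc/passwd", O_RDONLY, 0, sub {
      print $_[0] ? "opened\n" : "open failed: $!\n";
   };

   IO::AIO::flush;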

=item aio_open $pathname, $flags, $mode, $callback->($fh)

Asynchronously open or create a file and call the callback with a newly
created filehandle for the file.

The pathname passed to C<aio_open> must be absolute. See the notes on
pathnames at the beginning of this section for an explanation.

The C<$flags> argument is a bitmask. See the C<Fcntl> module for a
list. They are the same as used by C<sysopen>.

Likewise, C<$mode> specifies the mode of the newly created file, if it
didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>,
except that it is mandatory (i.e. use C<0> if you don't create new files,
and C<0666> or C<0777> if you do).

Example:

   aio_open "/etc/passwd", O_RDONLY, 0, sub {
      if ($_[0]) {
         print "open successful, fh is $_[0]\n";
         ...
      } else {
         die "open failed: $!\n";
      }
   };

=item aio_close $fh, $callback->($status)

Asynchronously close a file and call the callback with the result
code. I<WARNING:> although accepted, you should not pass in a perl
filehandle here, as perl will likely close the file descriptor another
time when the filehandle is destroyed. Normally, you can safely call perl's
C<close> or just let filehandles go out of scope.

This is considered a bug in the API, so this might change. It's
therefore best to avoid this function.

=item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval)

=item aio_write $fh,$offset,$length, $data,$dataoffset, $callback->($retval)

Reads or writes C<$length> bytes from or to the specified C<$fh> at file
offset C<$offset>, using the scalar C<$data> (starting at byte offset
C<$dataoffset> within it) as the buffer, and calls the callback with the
actual number of bytes read or written (or -1 on error, just like the
syscall).

The C<$data> scalar I<MUST NOT> be modified in any way while the request
is outstanding. Modifying it can result in segfaults or WW3 (if the
necessary/optional hardware is installed).

Example: Read 15 bytes at offset 7 into scalar C<$buffer>, starting at
offset C<0> within the scalar:

   aio_read $fh, 7, 15, $buffer, 0, sub {
      $_[0] > 0 or die "read error: $!";
      print "read $_[0] bytes: <$buffer>\n";
   };
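
A write-side counterpart, sketched under the assumption that the
hypothetical file F</tmp/aio-example.txt> may be created or overwritten:
the file is opened with C<O_CREAT>, so a C<$mode> is supplied, and the
buffer is left untouched until the write callback runs. C<IO::AIO::flush>
is used only to keep the sketch self-contained without an event loop:

   use Fcntl;
   use IO::AIO;

   my $path = "/tmp/aio-example.txt";   # hypothetical path
   my $data = "hello, world\n";         # must not change while the write is pending

   aio_open $path, O_WRONLY | O_CREAT | O_TRUNC, 0666, sub {
      my $fh = shift
         or die "$path: $!";

      # write all of $data at file offset 0, starting at offset 0 within $data
      aio_write $fh, 0, length $data, $data, 0, sub {
         $_[0] == length $data
            or die "short write: $!";
         close $fh;
      };
   };

   # no event loop here: block until all requests have been handled
   IO::AIO::flush;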

=item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval)

Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts
reading at byte offset C<$in_offset>, and starts writing at the current
file offset of C<$out_fh>. Because of that, it is not safe to issue more
than one C<aio_sendfile> per C<$out_fh>, as they will interfere with each
other.

This call tries to make use of a native C<sendfile> syscall to provide
zero-copy operation. For this to work, C<$out_fh> should refer to a
socket, and C<$in_fh> should refer to an mmap'able file.

If the native sendfile call fails or is not implemented, it will be
emulated, so you can call C<aio_sendfile> on any type of filehandle
regardless of the limitations of the operating system.

Please note, however, that C<aio_sendfile> can read more bytes from
C<$in_fh> than are written, and there is no way to find out how many
bytes have been read from C<aio_sendfile> alone, as C<aio_sendfile> only
provides the number of bytes written to C<$out_fh>. Only if the result
value equals C<$length> can one assume that C<$length> bytes have been
read.
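
The following sketch copies a file to standard output with
C<aio_sendfile> and, following the caveat above, treats anything other
than exactly C<$length> bytes written as a short transfer. Whether the
native syscall or the emulation is used depends on the operating system
and on what C<STDOUT> is connected to:

   use Fcntl;
   use IO::AIO;

   my $path = "/etc/passwd";

   aio_open $path, O_RDONLY, 0, sub {
      my $in_fh = shift
         or die "$path: $!";
      my $length = -s $in_fh;

      # writing starts at the current file offset of STDOUT
      aio_sendfile \*STDOUT, $in_fh, 0, $length, sub {
         $_[0] == $length
            or warn "short transfer: $_[0] of $length bytes written\n";
         close $in_fh;
      };
   };

   IO::AIO::flush;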

=item aio_readahead $fh,$offset,$length, $callback->($retval)

C<aio_readahead> populates the page cache with data from a file so that
subsequent reads from that file will not block on disk I/O. The C<$offset>
argument specifies the starting point from which data is to be read and
C<$length> specifies the number of bytes to be read. I/O is performed in
whole pages, so that offset is effectively rounded down to a page boundary
and bytes are read up to the next page boundary greater than or equal to
(offset+length). C<aio_readahead> does not read beyond the end of the
file. The current file offset of the file is left unchanged.

If that syscall doesn't exist (likely if your OS isn't Linux) it will be
emulated by simply reading the data, which would have a similar effect.
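
A sketch of the intended usage pattern: prefetch a whole file with
C<aio_readahead>, then read it, so the read should be served from the page
cache. The file is just an example, and the benefit depends on the
operating system's caching behaviour:

   use Fcntl;
   use IO::AIO;

   my $path = "/etc/passwd";

   aio_open $path, O_RDONLY, 0, sub {
      my $fh = shift
         or die "$path: $!";
      my $size = -s $fh;

      # ask the kernel to pull the whole file into the page cache
      aio_readahead $fh, 0, $size, sub {
         # this read should now not have to wait for the disk
         my $buf;
         aio_read $fh, 0, $size, $buf, 0, sub {
            $_[0] == $size or die "short read: $!";
            print "read ", length $buf, " bytes\n";
            close $fh;
         };
      };
   };

   IO::AIO::flush;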

=item aio_stat $fh_or_path, $callback->($status)

=item aio_lstat $fh, $callback->($status)

Works like perl's C<stat> or C<lstat> in void context. The callback will
be called after the stat and the results will be available using C<stat _>
or C<-s _> etc...

The pathname passed to C<aio_stat> must be absolute. See the notes on
pathnames at the beginning of this section for an explanation.

Currently, the stats are always 64-bit-stats, i.e. instead of returning an
error when stat'ing a large file, the results will be silently truncated
unless perl itself is compiled with large file support.

Example: Print the length of F</etc/passwd>:

   aio_stat "/etc/passwd", sub {
      $_[0] and die "stat failed: $!";
      print "size is ", -s _, "\n";
   };

=item aio_unlink $pathname, $callback->($status)

Asynchronously unlink (delete) a file and call the callback with the
result code.

=item aio_mknod $path, $mode, $dev, $callback->($status)

[EXPERIMENTAL]

Asynchronously create a device node (or fifo). See mknod(2).

The only (POSIX-) portable way of calling this function is:

   aio_mknod $path, IO::AIO::S_IFIFO | $mode, 0, sub { ...

=item aio_link $srcpath, $dstpath, $callback->($status)

Asynchronously create a new link to the existing object at C<$srcpath> at
the path C<$dstpath> and call the callback with the result code.

=item aio_symlink $srcpath, $dstpath, $callback->($status)

Asynchronously create a new symbolic link to the existing object at
C<$srcpath> at the path C<$dstpath> and call the callback with the result
code.

=item aio_readlink $path, $callback->($link)

Asynchronously read the symlink specified by C<$path> and pass it to
the callback. If an error occurs, nothing or undef gets passed to the
callback.
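
A small sketch combining C<aio_symlink> and C<aio_readlink>; the link
path is hypothetical, and a leftover link from an earlier run is removed
synchronously first, for brevity:

   use IO::AIO;

   my $target = "/etc/passwd";
   my $link   = "/tmp/aio-example-link";   # hypothetical path

   unlink $link;   # remove a leftover link, synchronously

   aio_symlink $target, $link, sub {
      $_[0] and die "symlink failed: $!";

      aio_readlink $link, sub {
         my ($dest) = @_;
         defined $dest
            or die "readlink failed: $!";
         print "$link -> $dest\n";
      };
   };

   IO::AIO::flush;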

=item aio_rename $srcpath, $dstpath, $callback->($status)

Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
rename(2) does, and call the callback with the result code.
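
One common use of C<aio_rename> is replacing a file atomically: write the
new contents to a temporary file in the same directory, sync it, then
rename it over the original, so readers see either the old or the new
file, never a partial one (this relies on rename(2) semantics on POSIX
filesystems). The paths and contents are hypothetical:

   use Fcntl;
   use IO::AIO;

   my $file = "/tmp/aio-example.conf";   # hypothetical target
   my $tmp  = "$file.tmp.$$";            # temporary file in the same directory
   my $data = "setting = 1\n";

   aio_open $tmp, O_WRONLY | O_CREAT | O_TRUNC, 0644, sub {
      my $fh = shift
         or die "$tmp: $!";

      aio_write $fh, 0, length $data, $data, 0, sub {
         $_[0] == length $data
            or die "short write: $!";

         # make the data durable before it becomes visible under the final name
         aio_fsync $fh, sub {
            $_[0] and die "fsync failed: $!";
            close $fh;

            aio_rename $tmp, $file, sub {
               $_[0] and die "rename failed: $!";
            };
         };
      };
   };

   IO::AIO::flush;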

=item aio_rmdir $pathname, $callback->($status)

Asynchronously rmdir (delete) a directory and call the callback with the
result code.

=item aio_readdir $pathname, $callback->($entries)

Unlike the POSIX call of the same name, C<aio_readdir> reads an entire
directory (i.e. opendir + readdir + closedir). The entries will not be
sorted, and will B<NOT> include the C<.> and C<..> entries.

The callback is passed a single argument, which is either C<undef> or an
array-ref with the filenames.
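
A minimal sketch listing a directory; since the entries arrive unsorted,
they are sorted before printing. The directory is just an example:

   use IO::AIO;

   aio_readdir "/etc", sub {
      my $entries = shift
         or die "readdir failed: $!";

      # entries are unsorted and exclude "." and ".."
      print "$_\n" for sort @$entries;
   };

   IO::AIO::flush;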

=item aio_load $path, $data, $callback->($status)

This is a composite request that tries to fully load the given file into
the scalar C<$data>. The status passed to the callback is the same as with
C<aio_read> (the number of bytes read, or C<-1> on error).
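
A minimal sketch; as with C<aio_read>, the target scalar must not be
touched until the callback has run. The file is just an example:

   use IO::AIO;

   my $contents;
   aio_load "/etc/passwd", $contents, sub {
      $_[0] >= 0
         or die "load failed: $!";
      print "loaded $_[0] bytes\n";
   };

   IO::AIO::flush;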

=cut

sub aio_load($$;$) {
   aio_block {
      my ($path, undef, $cb) = @_;
      my $data = \$_[1];

      my $pri = aioreq_pri;
      my $grp = aio_group $cb;

      aioreq_pri $pri;
      add $grp aio_open $path, O_RDONLY, 0, sub {
         my ($fh) = @_
            or return $grp->result (-1);

         aioreq_pri $pri;
         add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub {
            $grp->result ($_[0]);
         };
      };

      $grp
   }
}

=item aio_copy $srcpath, $dstpath, $callback->($status)

Try to copy the I<file> (directories not supported as either source or
destination) from C<$srcpath> to C<$dstpath> and call the callback with
C<0> (ok) or C<-1> (error).

This is a composite request that creates the destination file with
mode 0200 and copies the contents of the source file into it using
C<aio_sendfile>, followed by restoring atime, mtime, access mode and
uid/gid, in that order.

If an error occurs, the partial destination file will be unlinked, if
possible, except when setting atime, mtime, access mode and uid/gid, where
errors are ignored.
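
A minimal sketch; note that the status is C<0> on success. The
destination path is hypothetical, and when not running as root the
ownership step may fail silently, as described above:

   use IO::AIO;

   aio_copy "/etc/passwd", "/tmp/passwd.copy", sub {
      if ($_[0] == 0) {
         print "copy succeeded\n";
      } else {
         warn "copy failed\n";
      }
   };

   IO::AIO::flush;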

=cut

sub aio_copy($$;$) {
   aio_block {
      my ($src, $dst, $cb) = @_;

      my $pri = aioreq_pri;
      my $grp = aio_group $cb;

      aioreq_pri $pri;
      add $grp aio_open $src, O_RDONLY, 0, sub {
         if (my $src_fh = $_[0]) {
            my @stat = stat $src_fh;

            aioreq_pri $pri;
            add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
               if (my $dst_fh = $_[0]) {
                  aioreq_pri $pri;
                  add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
                     if ($_[0] == $stat[7]) {
                        $grp->result (0);
                        close $src_fh;

                        # those should not normally block. should. should.
                        utime $stat[8], $stat[9], $dst;
                        chmod $stat[2] & 07777, $dst_fh;
                        chown $stat[4], $stat[5], $dst_fh;
                        close $dst_fh;
                     } else {
                        $grp->result (-1);
                        close $src_fh;
                        close $dst_fh;

                        # remove the partial destination file
                        aioreq_pri $pri;
                        add $grp aio_unlink $dst;
                     }
                  };
               } else {
                  $grp->result (-1);
               }
            };

         } else {
            $grp->result (-1);
         }
      };

      $grp
   }
}

=item aio_move $srcpath, $dstpath, $callback->($status)

Try to move the I<file> (directories not supported as either source or
destination) from C<$srcpath> to C<$dstpath> and call the callback with
C<0> (ok) or C<-1> (error).

This is a composite request that tries to rename(2) the file first. If
rename fails with C<EXDEV>, it copies the file with C<aio_copy> and, if
that is successful, unlinks the C<$srcpath>.

=cut

sub aio_move($$;$) {
   aio_block {
      my ($src, $dst, $cb) = @_;

      my $pri = aioreq_pri;
      my $grp = aio_group $cb;

      aioreq_pri $pri;
      add $grp aio_rename $src, $dst, sub {
         if ($_[0] && $! == EXDEV) {
            aioreq_pri $pri;
            add $grp aio_copy $src, $dst, sub {
               $grp->result ($_[0]);

               if (!$_[0]) {
                  aioreq_pri $pri;
                  add $grp aio_unlink $src;
               }
            };
         } else {
            $grp->result ($_[0]);
         }
      };

      $grp
   }
}

=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)

Scans a directory (similar to C<aio_readdir>) but additionally tries to
efficiently separate the entries of directory C<$path> into two sets of
names, directories you can recurse into (directories), and ones you cannot
recurse into (everything else, including symlinks to directories).

C<aio_scandir> is a composite request that creates many sub requests.
C<$maxreq> specifies the maximum number of outstanding aio requests that
this function generates. If it is C<< <= 0 >>, then a suitable default
will be chosen (currently 4).

On error, the callback is called without arguments, otherwise it receives
two array-refs with path-relative entry names.

Example:

   aio_scandir $dir, 0, sub {
      my ($dirs, $nondirs) = @_;
      print "real directories: @$dirs\n";
      print "everything else: @$nondirs\n";
   };

Implementation notes.

The C<aio_readdir> cannot be avoided, but C<stat()>'ing every entry can.

After reading the directory, the modification time, size etc. of the
directory before and after the readdir is checked, and if they match (and
the modification time isn't the current time), the link count will be
used to decide how many entries are directories (if >= 2). Otherwise, no
knowledge of the number of subdirectories will be assumed.

Then entries will be sorted into likely directories (everything without
a non-initial dot currently) and likely non-directories (everything
else). Then every entry plus an appended C</.> will be C<stat>'ed,
likely directories first. If that succeeds, it assumes that the entry
is a directory or a symlink to directory (which will be checked
separately). This is often faster than stat'ing the entry itself because
filesystems might detect the type of the entry without reading the inode
data (e.g. ext2fs filetype feature).

If the known number of directories (link count - 2) has been reached, the
rest of the entries are assumed to be non-directories.

This only works with certainty on POSIX (= UNIX) filesystems, which
fortunately are the vast majority of filesystems around.

It will also likely work on non-POSIX filesystems with reduced efficiency
as those tend to return 0 or 1 as link counts, which disables the
directory counting heuristic.

=cut

sub aio_scandir($$$) {
   aio_block {
      my ($path, $maxreq, $cb) = @_;

      my $pri = aioreq_pri;

      my $grp = aio_group $cb;

      $maxreq = 4 if $maxreq <= 0;

      # stat once
      aioreq_pri $pri;
      add $grp aio_stat $path, sub {
         return $grp->result () if $_[0];
         my $now = time;
         my $hash1 = join ":", (stat _)[0,1,3,7,9];

         # read the directory entries
         aioreq_pri $pri;
         add $grp aio_readdir $path, sub {
            my $entries = shift
               or return $grp->result ();

            # stat the dir another time
            aioreq_pri $pri;
            add $grp aio_stat $path, sub {
               my $hash2 = join ":", (stat _)[0,1,3,7,9];

               my $ndirs;

               # take the slow route if anything looks fishy
               if ($hash1 ne $hash2 or (stat _)[9] == $now) {
                  $ndirs = -1;
               } else {
                  # if nlink == 2, we are finished
                  # on non-posix-fs's, we rely on nlink < 2
                  $ndirs = (stat _)[3] - 2
                     or return $grp->result ([], $entries);
               }

               # sort into likely dirs and likely nondirs
               # dirs == files without ".", short entries first
               $entries = [map $_->[0],
                              sort { $b->[1] cmp $a->[1] }
                                 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
                                    @$entries];

               my (@dirs, @nondirs);

               my $statgrp = add $grp aio_group sub {
                  $grp->result (\@dirs, \@nondirs);
               };

               limit $statgrp $maxreq;
               feed $statgrp sub {
                  return unless @$entries;
                  my $entry = pop @$entries;

                  aioreq_pri $pri;
                  add $statgrp aio_stat "$path/$entry/.", sub {
                     if ($_[0] < 0) {
                        push @nondirs, $entry;
                     } else {
                        # need to check for real directory
                        aioreq_pri $pri;
                        add $statgrp aio_lstat "$path/$entry", sub {
                           if (-d _) {
                              push @dirs, $entry;

                              unless (--$ndirs) {
                                 push @nondirs, @$entries;
                                 feed $statgrp;
                              }
                           } else {
                              push @nondirs, $entry;
                           }
                        }
                     }
                  };
               };
            };
         };
      };

      $grp
   }
}

=item aio_rmtree $path, $callback->($status)

Deletes the directory tree starting at (and including) C<$path> and passes
only the status of the final C<rmdir> to the callback. This is a composite
request that uses C<aio_scandir> to recurse into and rmdir directories, and
unlinks everything else.
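
A sketch that builds a small scratch tree synchronously (for brevity) and
then removes it with C<aio_rmtree>. The directory name is hypothetical;
point this only at trees you really want to delete:

   use IO::AIO;

   my $top = "/tmp/aio-rmtree-example";   # hypothetical scratch directory

   # build a small tree synchronously, just for the demonstration
   mkdir $top;
   mkdir "$top/sub";
   open my $fh, ">", "$top/sub/file" or die "$top/sub/file: $!";
   close $fh;

   aio_rmtree $top, sub {
      # the status is that of the final rmdir of $top
      $_[0] and die "rmtree failed: $!";
      print "removed $top\n";
   };

   IO::AIO::flush;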

=cut

sub aio_rmtree;
sub aio_rmtree {
   aio_block {
      my ($path, $cb) = @_;

      my $pri = aioreq_pri;
      my $grp = aio_group $cb;

      aioreq_pri $pri;
      add $grp aio_scandir $path, 0, sub {
         my ($dirs, $nondirs) = @_;

         my $dirgrp = aio_group sub {
            add $grp aio_rmdir $path, sub {
               $grp->result ($_[0]);
            };
         };

         (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs;
         (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs;

         add $grp $dirgrp;
      };

      $grp
   }
}

=item aio_fsync $fh, $callback->($status)

Asynchronously call fsync on the given filehandle and call the callback
with the fsync result code.

=item aio_fdatasync $fh, $callback->($status)

Asynchronously call fdatasync on the given filehandle and call the
callback with the fdatasync result code.

If this call isn't available because your OS lacks it or it couldn't be
detected, it will be emulated by calling C<fsync> instead.

=item aio_group $callback->(...)

This is a very special aio request: instead of doing something, it is a
container for other aio requests, which is useful if you want to bundle
many requests into a single, composite, request with a definite callback
and the ability to cancel the whole request with its subrequests.

Returns an object of class L<IO::AIO::GRP>. See its documentation below
for more info.

Example:

   my $grp = aio_group sub {
      print "all stats done\n";
   };

   add $grp
      (aio_stat ...),
      (aio_stat ...),
      ...;

=item aio_nop $callback->()

This is a special request: it does nothing in itself and is only used for
side effects, such as when you want to add a dummy request to a group so
that finishing the requests in the group depends on executing the given
code.

While this request does nothing, it still goes through the execution
phase and still requires a worker thread. Thus, the callback will not
be executed immediately but only after other requests in the queue have
entered their execution phase. This can be used to measure request
latency.
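
A sketch of such a latency measurement, using the core Time::HiRes module.
The measured time covers submission, passage through a worker thread and
result processing by C<poll_cb> (here driven by C<IO::AIO::flush>):

   use Time::HiRes ();
   use IO::AIO;

   my $submitted = Time::HiRes::time ();

   aio_nop sub {
      # elapsed time from submission until this callback ran
      printf "request latency: %.6f seconds\n",
             Time::HiRes::time () - $submitted;
   };

   IO::AIO::flush;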
809    
=item IO::AIO::aio_busy $fractional_seconds, $callback->() *NOT EXPORTED*

Mainly used for debugging and benchmarking, this aio request puts one of
the request workers to sleep for the given time.

While it is theoretically handy to have simple I/O scheduling requests
like sleep and file handle readable/writable, the overhead this creates is
immense (it blocks a thread for a long time), so do not use this function
except to put your application under artificial I/O pressure.

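
A small sketch of generating artificial load with C<aio_busy>. The 0.1 second duration and the request count are arbitrary. Since the function is not exported, it is called with its full package name.

```perl
   use strict;
   use warnings;
   use IO::AIO 2;

   my $done = 0;

   # each request keeps one worker thread sleeping for 0.1 seconds
   IO::AIO::aio_busy (0.1, sub { ++$done }) for 1 .. 4;

   IO::AIO::flush;
   print "$done busy requests finished\n";
```
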
=back

=head2 IO::AIO::REQ CLASS

All non-aggregate C<aio_*> functions return an object of this class when
called in non-void context.

=over 4

=item cancel $req

Cancels the request, if possible. Has the effect of skipping execution
when entering the B<execute> state and skipping calling the callback when
entering the B<result> state, but will leave the request otherwise
untouched. That means that requests that currently execute will not be
stopped and resources held by the request will not be freed prematurely.

=item cb $req $callback->(...)

Replace (or simply set) the callback registered to the request.

=back

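
The following sketch shows both methods. It assumes IO::AIO 2 or later and uses C<aio_nop> only because it has no side effects. The first request is canceled before any results are processed, so its callback never runs. The second request gets a new callback before it finishes.

```perl
   use strict;
   use warnings;
   use IO::AIO 2;

   my @log;

   # called in non-void context, so both calls return IO::AIO::REQ objects
   my $first  = aio_nop sub { push @log, "first" };
   my $second = aio_nop sub { push @log, "second" };

   $first->cancel;                               # its callback will be skipped
   $second->cb (sub { push @log, "replaced" });  # swap in a different callback

   IO::AIO::flush;
   print "@log\n";                               # prints "replaced"
```
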
=head2 IO::AIO::GRP CLASS

This class is a subclass of L<IO::AIO::REQ>, so all its methods apply to
objects of this class, too.

An IO::AIO::GRP object is a special request that can contain multiple other
aio requests.

You create one by calling the C<aio_group> constructing function with a
callback that will be called when all contained requests have entered the
C<done> state:

   my $grp = aio_group sub {
      print "all requests are done\n";
   };

You add requests by calling the C<add> method with one or more
C<IO::AIO::REQ> objects:

   $grp->add (aio_unlink "...");

   add $grp aio_stat "...", sub {
      # a non-zero status means the stat failed
      $_[0] and return $grp->result ("error");

      # add another request dynamically, if first succeeded
      add $grp aio_open "...", O_RDONLY, 0, sub {
         $grp->result ("ok");
      };
   };

This makes it very easy to create composite requests (see the source of
C<aio_move> for an application) that work and feel like simple requests.

=over 4

=item * The IO::AIO::GRP objects will be cleaned up during calls to
C<IO::AIO::poll_cb>, just like any other request.

=item * They can be canceled like any other request. Canceling will cancel not
only the request itself, but also all requests it contains.

=item * They can also be added to other IO::AIO::GRP objects.

=item * You must not add requests to a group from within the group callback (or
at any later time).

=back

Their lifetime, simplified, looks like this: when they are empty, they
will finish very quickly. If they contain only requests that are in the
C<done> state, they will also finish. Otherwise they will continue to
exist.

That means after creating a group you have some time to add requests. And
in the callbacks of those requests, you can add further requests to the
group. And only when all those requests have finished will the group
itself finish.

=over 4

=item add $grp ...

=item $grp->add (...)

Add one or more requests to the group. Any type of L<IO::AIO::REQ> can
be added, including other groups, as long as you do not create circular
dependencies.

Returns all its arguments.

=item $grp->cancel_subs

Cancels all subrequests and clears any feeder, but not the group request
itself. Useful when you queued a lot of events but got a result early.

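
Here is a sketch of the early-result case described above. Several candidate paths are stat'ed in one group, and the first successful callback cancels the remaining subrequests. The candidate list is made up for illustration, and its last entry is assumed not to exist.

```perl
   use strict;
   use warnings;
   use Cwd ();
   use IO::AIO 2;

   # hypothetical candidates; the last one is assumed not to exist
   my @candidates = ("/", Cwd::getcwd (), "/hypothetical/missing/path");
   my $found;

   my $grp = aio_group sub { print "search finished\n" };

   for my $path (@candidates) {
      $grp->add (aio_stat $path, sub {
         return if $_[0] or defined $found;   # failed stat, or already have a hit
         $found = $path;
         $grp->cancel_subs;                   # skip the callbacks of the other stats
      });
   }

   IO::AIO::flush;
   print "first existing path: $found\n";
```

Which existing path wins depends on the order in which the stats complete. This pattern therefore suits searches where any hit is acceptable.
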
=item $grp->result (...)

Set the result value(s) that will be passed to the group callback when all
subrequests have finished and set the group's errno to the current value
of errno (just like calling C<errno> without an error number). By default,
no argument will be passed and errno is zero.

=item $grp->errno ([$errno])

Sets the group errno value to C<$errno>, or the current value of errno
when the argument is missing.

Every aio request has an associated errno value that is restored when
the callback is invoked. This method lets you change this value from its
default (0).

Calling C<result> will also set errno, so make sure you either set C<$!>
before the call to C<result>, or call C<errno> after it.

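
A minimal sketch of the ordering issue described above. An C<aio_nop> subrequest stands in for real work, and L<Errno> is assumed for the C<ENOENT> constant. Because C<result> is called first and C<errno> afterwards, the explicit error code is the one the group callback sees in C<$!>.

```perl
   use strict;
   use warnings;
   use Errno ();
   use IO::AIO 2;

   my ($result, $errno);

   my $grp = aio_group sub {
      $result = $_[0];      # the value passed to $grp->result
      $errno  = $! + 0;     # the group errno, restored before this callback runs
   };

   $grp->add (aio_nop sub {
      $grp->result ("error");            # also copies the current errno...
      $grp->errno (Errno::ENOENT ());    # ...so set the wanted errno afterwards
   });

   IO::AIO::flush;
   printf "result=%s errno=%d\n", $result, $errno;
```
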
=item feed $grp $callback->($grp)

Sets a feeder/generator on this group: every group can have an attached
generator that generates requests if idle. The idea behind this is that,
although you could just queue as many requests as you want in a group,
this might starve other requests for a potentially long time. For
example, C<aio_scandir> might generate hundreds of thousands of C<aio_stat>
requests, delaying any later requests for a long time.

To avoid this, and allow incremental generation of requests, you can
instead create a group and set a feeder on it that generates those
requests. The feed callback will be called whenever there are few enough
(see C<limit>, below) requests active in the group itself and is expected
to queue more requests.

The feed callback can queue as many requests as it likes (i.e. C<add> does
not impose any limits).

If the feed does not queue more requests when called, it will be
automatically removed from the group.

If the feed limit is C<0>, it will be set to C<2> automatically.

Example:

   # stat all files in @files, but only ever use four aio requests concurrently:

   my $grp = aio_group sub { print "finished\n" };
   limit $grp 4;
   feed $grp sub {
      my $file = pop @files
         or return;

      add $grp aio_stat $file, sub { ... };
   };

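
The example above leaves out the per-file callback. Below is a complete, runnable version. It is a sketch that assumes the root and current directories can be stat'ed, and the list of twenty paths is hypothetical input. The code also tracks how many stats are in flight, a number that should never exceed the limit of four.

```perl
   use strict;
   use warnings;
   use Cwd ();
   use IO::AIO 2;

   # hypothetical input: twenty paths (root and current directory, ten times each)
   my @files = ("/", Cwd::getcwd ()) x 10;
   my ($active, $peak, $done) = (0, 0, 0);

   my $grp = aio_group sub {
      print "finished $done stats, at most $peak at a time\n";
   };

   $grp->limit (4);        # call the feeder whenever fewer than 4 requests are active
   $grp->feed (sub {
      my $file = pop @files
         or return;        # queueing nothing removes the feeder

      ++$active;
      $peak = $active if $active > $peak;

      $grp->add (aio_stat $file, sub {
         --$active;
         ++$done;
      });
   });

   IO::AIO::flush;
```
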
=item limit $grp $num

Sets the feeder limit for the group: The feeder will be called whenever
the group contains fewer than this many requests.

Setting the limit to C<0> will pause the feeding process.

=back

=head2 SUPPORT FUNCTIONS

=head3 EVENT PROCESSING AND EVENT LOOP INTEGRATION

=over 4

=item $fileno = IO::AIO::poll_fileno

Return the I<request result pipe file descriptor>. This filehandle must be
polled for reading by some mechanism outside this module (e.g. Event or
select, see below or the SYNOPSIS). If the pipe becomes readable you have
to call C<poll_cb> to check the results.

See C<poll_cb> for an example.

=item IO::AIO::poll_cb

Process some outstanding events on the result pipe. You have to call this
regularly. Returns the number of events processed. Returns immediately
when no events are outstanding. The number of events processed depends on
the settings of C<IO::AIO::max_poll_reqs> and C<IO::AIO::max_poll_time>.

If not all requests were processed for whatever reason, the filehandle
will still be ready when C<poll_cb> returns.

Example: Install an Event watcher that automatically calls
IO::AIO::poll_cb with high priority:

   Event->io (fd => IO::AIO::poll_fileno,
              poll => 'r', async => 1,
              cb => \&IO::AIO::poll_cb);

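
A program without an event loop module can use a plain four-argument C<select> instead. This minimal sketch is not an official integration. It waits for C<IO::AIO::poll_fileno> to become readable, then calls C<IO::AIO::poll_cb>, and repeats until the single queued request has run its callback.

```perl
   use strict;
   use warnings;
   use IO::AIO 2;

   my $pending = 1;
   aio_stat "/", sub { $pending = 0 };

   # a select() bit vector containing only the result pipe
   my $fd  = IO::AIO::poll_fileno;
   my $rin = '';
   vec ($rin, $fd, 1) = 1;

   while ($pending) {
      # block until the result pipe is readable (or a signal interrupts select)
      my $nfound = select (my $rout = $rin, undef, undef, undef);
      IO::AIO::poll_cb if $nfound > 0;
   }
```

A real program would add its own filehandles to the same C<select> call instead of blocking only on the result pipe.
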
=item IO::AIO::max_poll_reqs $nreqs

=item IO::AIO::max_poll_time $seconds

These set, respectively, the maximum number of requests (default C<0>,
meaning infinity) that are processed by C<IO::AIO::poll_cb> in one call,
and the maximum amount of time (default C<0>, meaning infinity) spent in
C<IO::AIO::poll_cb> to process requests (more precisely, the minimum
amount of time C<poll_cb> is allowed to use).

Setting C<max_poll_time> to a non-zero value creates an overhead of one
syscall per request processed, which is not normally a problem unless your
callbacks are really really fast or your OS is really really slow (I am
not mentioning Solaris here). Using C<max_poll_reqs> incurs no overhead.

Setting these is useful if you want to ensure some level of
interactivity when perl is not fast enough to process all requests in
time.

For interactive programs, values such as C<0.01> to C<0.1> should be fine.

Example: Install an Event watcher that automatically calls
IO::AIO::poll_cb with low priority, to ensure that other parts of the
program get the CPU sometimes even under high AIO load.

   # try not to spend much more than 0.1s in poll_cb
   IO::AIO::max_poll_time 0.1;

   # use a low priority so other tasks have priority
   Event->io (fd => IO::AIO::poll_fileno,
              poll => 'r', nice => 1,
              cb => \&IO::AIO::poll_cb);

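
Here is a sketch showing the effect of C<max_poll_reqs>. The limit of two and the ten C<aio_nop> requests are arbitrary. Because each C<poll_cb> call handles at most two results, the loop needs at least five iterations to run all ten callbacks.

```perl
   use strict;
   use warnings;
   use IO::AIO 2;

   IO::AIO::max_poll_reqs (2);     # handle at most two results per poll_cb call

   my $done = 0;
   aio_nop sub { ++$done } for 1 .. 10;

   my $calls = 0;
   while (IO::AIO::nreqs) {
      IO::AIO::poll_wait;          # blocks only if no results are ready yet
      IO::AIO::poll_cb;
      ++$calls;
   }

   IO::AIO::max_poll_reqs (0);     # back to the default: no limit
   print "$done callbacks in $calls poll_cb calls\n";
```
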
=item IO::AIO::poll_wait

If there are any outstanding requests and none of them in the result
phase, wait till the result filehandle becomes ready for reading (this
simply does a C<select> on the filehandle). This is useful if you want to
synchronously wait for some requests to finish.

See C<nreqs> for an example.

=item IO::AIO::poll

Waits until some requests have been handled.

Returns the number of requests processed, but is otherwise strictly
equivalent to:

   IO::AIO::poll_wait, IO::AIO::poll_cb

=item IO::AIO::flush

Wait till all outstanding AIO requests have been handled.

Strictly equivalent to:

   IO::AIO::poll_wait, IO::AIO::poll_cb
      while IO::AIO::nreqs;

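
A sketch of the difference between C<poll> and C<flush>. It assumes the default number of threads, so that the two requests below run concurrently. C<poll> is called only until one specific callback has run. C<flush> then waits for everything else, which here is an artificial 0.2 second C<aio_busy> request.

```perl
   use strict;
   use warnings;
   use IO::AIO 2;

   my ($stat_done, $busy_done) = (0, 0);

   # an unrelated, deliberately slow request
   IO::AIO::aio_busy (0.2, sub { $busy_done = 1 });

   aio_stat "/", sub { $stat_done = 1 };

   # handle results only until the stat callback has run
   IO::AIO::poll () until $stat_done;
   print "stat done\n";

   # then wait for everything that is still outstanding
   IO::AIO::flush;
```
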
=back

=head3 CONTROLLING THE NUMBER OF THREADS

=over 4

=item IO::AIO::min_parallel $nthreads

Set the minimum number of AIO threads to C<$nthreads>. The current
default is C<8>, which means eight asynchronous operations can execute
concurrently at any one time (the number of outstanding requests,
however, is unlimited).

IO::AIO starts threads only on demand, when an AIO request is queued and
no free thread exists. Please note that queueing up a hundred requests can
create demand for a hundred threads, even if it turns out that everything
is in the cache and could have been processed faster by a single thread.

It is recommended to keep the number of threads relatively low, as some
Linux kernel versions will scale negatively with the number of threads
(higher parallelism => MUCH higher latency). With current Linux 2.6
versions, 4-32 threads should be fine.

Under most circumstances you don't need to call this function, as the
module selects a default that is suitable for low to moderate load.

=item IO::AIO::max_parallel $nthreads

Sets the maximum number of AIO threads to C<$nthreads>. If more than the
specified number of threads are currently running, this function kills
the excess threads. This function blocks until the limit is reached.

While C<$nthreads> is zero, aio requests get queued but not executed
until the number of threads has been increased again.

This module automatically runs C<max_parallel 0> at program end, to ensure
that all threads are killed and that there are no outstanding requests.

Under normal circumstances you don't need to call this function.

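
A rough demonstration of the thread limit, assuming that C<aio_busy> occupies a worker for the full requested time. With C<max_parallel> set to 2, four 0.2 second requests can only run two at a time, so the batch should take about 0.4 seconds rather than about 0.2. The final C<min_parallel> call is assumed to restore the default of eight threads, mirroring the module's own startup call.

```perl
   use strict;
   use warnings;
   use Time::HiRes ();
   use IO::AIO 2;

   IO::AIO::max_parallel (2);      # from now on, at most two worker threads

   my $done  = 0;
   my $start = Time::HiRes::time ();

   IO::AIO::aio_busy (0.2, sub { ++$done }) for 1 .. 4;
   IO::AIO::flush;

   my $elapsed = Time::HiRes::time () - $start;
   printf "%d requests took %.2f seconds\n", $done, $elapsed;

   IO::AIO::min_parallel (8);      # assumed to restore the default thread count
```
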
=item IO::AIO::max_idle $nthreads

Limit the number of threads (default: 4) that are allowed to idle (i.e.,
threads that did not get a request to process within 10 seconds). That
means if a thread becomes idle while C<$nthreads> other threads are also
idle, it will free its resources and exit.

This is useful when you allow a large number of threads (e.g. 100 or 1000)
to allow for extremely high load situations, but want to free resources
under normal circumstances (1000 threads can easily consume 30MB of RAM).

The default is probably ok in most situations, especially if thread
creation is fast. If thread creation is very slow on your system you might
want to use larger values.

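
Here is a configuration sketch for the scenario described above. Following the C<min_parallel> description, it assumes that threads are only started on demand. It allows bursts of up to 100 threads while letting the surplus exit once the load drops. The numbers are illustrative only.

```perl
   use strict;
   use warnings;
   use IO::AIO 2;

   # illustrative numbers: let up to 100 threads be started on demand under load...
   IO::AIO::min_parallel (100);

   # ...but keep no more than four idle threads around once the load drops
   IO::AIO::max_idle (4);
```
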
=item $oldmaxreqs = IO::AIO::max_outstanding $maxreqs

This is a very bad function to use in interactive programs because it
blocks, and a bad way to reduce concurrency because it is inexact: it is
better to use an C<aio_group> together with a feed callback.

Sets the maximum number of outstanding requests to C<$maxreqs>. If you
queue up more than this number of requests, the next call to the
C<poll_cb> (and C<poll_some> and other functions calling C<poll_cb>)
function will block until the limit is no longer exceeded.

The default value is very large, so there is no practical limit on the
number of outstanding requests.

You can still queue as many requests as you want. Therefore,
C<max_outstanding> is mainly useful in simple scripts (with low values) or
as a stop gap to shield against fatal memory overflow (with large values).

=back

=head3 STATISTICAL INFORMATION

=over 4

=item IO::AIO::nreqs

Returns the number of requests currently in the ready, execute or pending
states (i.e. whose callback has not been invoked yet).

Example: wait till there are no outstanding requests anymore:

   IO::AIO::poll_wait, IO::AIO::poll_cb
      while IO::AIO::nreqs;

=item IO::AIO::nready

Returns the number of requests currently in the ready state (not yet
executed).

=item IO::AIO::npending

Returns the number of requests currently in the pending state (executed,
but not yet processed by C<poll_cb>).

=back

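
A small sketch that prints these counters while waiting. Three artificial C<aio_busy> requests serve as the workload, and the count and duration are arbitrary. The explicit C<()> after each function name prevents perl from parsing the remaining list as arguments to the first call.

```perl
   use strict;
   use warnings;
   use IO::AIO 2;

   # artificial workload: three requests that each occupy a worker for 0.1 s
   IO::AIO::aio_busy (0.1, sub { }) for 1 .. 3;

   while (IO::AIO::nreqs ()) {
      printf "outstanding %d (ready %d, pending %d)\n",
         IO::AIO::nreqs (), IO::AIO::nready (), IO::AIO::npending ();

      IO::AIO::poll_wait;
      IO::AIO::poll_cb;
   }
```
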
=cut

# support function to convert a fd into a perl filehandle
sub _fd2fh {
   return undef if $_[0] < 0;

   # try to generate nice filehandles
   my $sym = "IO::AIO::fd#$_[0]";
   local *$sym;

   open *$sym, "+<&=$_[0]"      # usually works under any unix
      or open *$sym, "<&=$_[0]" # cygwin needs this
      or open *$sym, ">&=$_[0]" # or this
      or return undef;

   *$sym
}

min_parallel 8;

END { flush }

1;

=head2 FORK BEHAVIOUR

This module should do "the right thing" when the process using it forks:

Before the fork, IO::AIO enters a quiescent state where no requests
can be added in other threads and no results will be processed. After
the fork the parent simply leaves the quiescent state and continues
request/result processing, while the child frees the request/result queue
(so that the requests started before the fork will only be handled in the
parent). Threads will be started on demand until the limit set in the
parent process has been reached again.

In short: the parent will, after a short pause, continue as if fork had
not been called, while the child will act as if IO::AIO has not been used
yet.

=head2 MEMORY USAGE

Per-request usage:

Each aio request uses, depending on your architecture, around 100-200
bytes of memory. In addition, stat requests need a stat buffer (possibly
a few hundred bytes), readdir requires a result buffer and so on. Perl
scalars and other data passed into aio requests will also be locked and
will consume memory till the request has entered the done state.

This is not awfully much, so queuing lots of requests is not usually a
problem.

Per-thread usage:

In the execution phase, some aio requests require more memory for
temporary buffers, and each thread requires a stack and other data
structures (usually around 16k-128k, depending on the OS).

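
A back-of-the-envelope check using the upper ends of the figures above. The workload is hypothetical, and "128k" is read as 128 * 1024 bytes. On these assumptions, 100,000 queued requests served by eight threads come to roughly 20 MiB. That total excludes stat or readdir buffers and perl data.

```perl
   use strict;
   use warnings;

   # upper-end figures from the text above
   my $bytes_per_request = 200;
   my $bytes_per_thread  = 128 * 1024;

   # hypothetical workload
   my $requests = 100_000;
   my $threads  = 8;

   my $bytes = $requests * $bytes_per_request + $threads * $bytes_per_thread;
   printf "roughly %.1f MiB\n", $bytes / (1024 * 1024);
```
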
=head1 KNOWN BUGS

Known bugs will be fixed in the next release.

=head1 SEE ALSO

L<Coro::AIO>.

=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut