/cvs/IO-AIO/AIO.pm
Revision: 1.100
Committed: Sun Jan 7 21:36:58 2007 UTC (17 years, 4 months ago) by root
Branch: MAIN
Changes since 1.99: +6 -5 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     IO::AIO - Asynchronous Input/Output
4    
5     =head1 SYNOPSIS
6    
7     use IO::AIO;
8    
9 root 1.6 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10 root 1.94 my $fh = shift
11     or die "/etc/passwd: $!";
12 root 1.6 ...
13     };
14    
15     aio_unlink "/tmp/file", sub { };
16    
17     aio_read $fh, 30000, 1024, $buffer, 0, sub {
18 root 1.8 $_[0] > 0 or die "read error: $!";
19 root 1.6 };
20    
21 root 1.56 # version 2+ has request and group objects
22     use IO::AIO 2;
23 root 1.52
24 root 1.68 aioreq_pri 4; # give next request a very high priority
25 root 1.52 my $req = aio_unlink "/tmp/file", sub { };
26     $req->cancel; # cancel request if still in queue
27    
28 root 1.56 my $grp = aio_group sub { print "all stats done\n" };
29     add $grp aio_stat "..." for ...;
30    
31     # AnyEvent integration
32 root 1.42 open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!";
33     my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb });
34    
35 root 1.56 # Event integration
36 root 1.6 Event->io (fd => IO::AIO::poll_fileno,
37 root 1.7 poll => 'r',
38 root 1.6 cb => \&IO::AIO::poll_cb);
39    
40 root 1.56 # Glib/Gtk2 integration
41 root 1.6 add_watch Glib::IO IO::AIO::poll_fileno,
42 root 1.22 in => sub { IO::AIO::poll_cb; 1 };
43 root 1.6
44 root 1.56 # Tk integration
45 root 1.6 Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "",
46     readable => \&IO::AIO::poll_cb);
47    
48 root 1.56 # Danga::Socket integration
49 root 1.11 Danga::Socket->AddOtherFds (IO::AIO::poll_fileno =>
50     \&IO::AIO::poll_cb);
51    
52 root 1.1 =head1 DESCRIPTION
53    
54     This module implements asynchronous I/O using whatever means your
55 root 1.2 operating system supports.
56 root 1.1
57 root 1.85 Asynchronous means that operations that can normally block your program
58     (e.g. reading from disk) will be done asynchronously: the operation
59     will still block, but you can do something else in the meantime. This
60     is extremely useful for programs that need to stay interactive even
61     when doing heavy I/O (GUI programs, high performance network servers
62     etc.), but can also be used to easily do operations in parallel that are
63     normally done sequentially, e.g. stat'ing many files, which is much faster
64     on a RAID volume or over NFS when you do a number of stat operations
65     concurrently.
66    
67 root 1.90 While most of this works on all types of file descriptors (for example
68     sockets), using these functions on file descriptors that support
69     nonblocking operation (again, sockets, pipes etc.) is very inefficient or
70     might not work (aio_read fails on sockets/pipes/fifos). Use an event loop
71     for that (such as the L<Event|Event> module): IO::AIO will naturally fit
72     into such an event loop itself.
73 root 1.85
74 root 1.72 In this version, a number of threads are started that execute your
75     requests and signal their completion. You don't need thread support
76     in perl, and the threads created by this module will not be visible
77     to perl. In the future, this module might make use of the native aio
78     functions available on many operating systems. However, they are often
79 root 1.85 not well-supported or restricted (GNU/Linux doesn't allow them on normal
80 root 1.72 files currently, for example), and they would only support aio_read and
81     aio_write, so the remaining functionality would have to be implemented
82     using threads anyway.
83    
84     Although the module will work in the presence of other (Perl-)
85     threads, it is currently not reentrant in any way, so use appropriate
86     locking yourself, always call C<poll_cb> from within the same thread, or
87     never call C<poll_cb> (or other C<aio_> functions) recursively.
88    
89 root 1.86 =head2 EXAMPLE
90    
91     This is a simple example that uses the Event module and loads
92     F</etc/passwd> asynchronously:
93    
94     use Fcntl;
95     use Event;
96     use IO::AIO;
97    
98     # register the IO::AIO callback with Event
99     Event->io (fd => IO::AIO::poll_fileno,
100     poll => 'r',
101     cb => \&IO::AIO::poll_cb);
102    
103     # queue the request to open /etc/passwd
104     aio_open "/etc/passwd", O_RDONLY, 0, sub {
105 root 1.94 my $fh = shift
106 root 1.86 or die "error while opening: $!";
107    
108     # stat'ing filehandles is generally non-blocking
109     my $size = -s $fh;
110    
111     # queue a request to read the file
112     my $contents;
113     aio_read $fh, 0, $size, $contents, 0, sub {
114     $_[0] == $size
115     or die "short read: $!";
116    
117     close $fh;
118    
119     # file contents now in $contents
120     print $contents;
121    
122     # exit event loop and program
123     Event::unloop;
124     };
125     };
126    
127     # possibly queue up other requests, or open GUI windows,
128     # check for sockets etc. etc.
129    
130     # process events as long as there are some:
131     Event::loop;
132    
133 root 1.72 =head1 REQUEST ANATOMY AND LIFETIME
134    
135     Every C<aio_*> function creates a request, which is a C data structure not
136     directly visible to Perl.
137    
138     If called in non-void context, every request function returns a Perl
139     object representing the request. In void context, nothing is returned,
140     which saves a bit of memory.
141    
142     The perl object is a fairly standard ref-to-hash object. The hash contents
143     are not used by IO::AIO so you are free to store anything you like in it.
144    
145     During their existence, aio requests travel through the following states,
146     in order:
147    
148     =over 4
149    
150     =item ready
151    
152     Immediately after a request is created it is put into the ready state,
153     waiting for a thread to execute it.
154    
155     =item execute
156    
157     A thread has accepted the request for processing and is currently
158     executing it (e.g. blocking in read).
159    
160     =item pending
161    
162     The request has been executed and is waiting for result processing.
163    
164     While request submission and execution are fully asynchronous, result
165     processing is not and relies on the perl interpreter calling C<poll_cb>
166     (or another function with the same effect).
167    
168     =item result
169    
170     The request results are processed synchronously by C<poll_cb>.
171    
172     The C<poll_cb> function will process all outstanding aio requests by
173     calling their callbacks, freeing memory associated with them and managing
174     any groups they are contained in.
175    
176     =item done
177    
178     Request has reached the end of its lifetime and holds no resources anymore
179     (except possibly for the Perl object, but its connection to the actual
180     aio request is severed and calling its methods will either do nothing or
181     result in a runtime error).
182 root 1.1
183 root 1.88 =back
184    
185 root 1.1 =cut
186    
187     package IO::AIO;
188    
189 root 1.23 no warnings;
190 root 1.51 use strict 'vars';
191 root 1.23
192 root 1.1 use base 'Exporter';
193    
194     BEGIN {
195 root 1.99 our $VERSION = '2.32';
196 root 1.1
197 root 1.67 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
198     aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
199 root 1.90 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link
200 root 1.99 aio_move aio_copy aio_group aio_nop aio_mknod aio_load aio_rmtree);
201 root 1.95 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block));
202 root 1.67 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
203 root 1.86 min_parallel max_parallel max_idle
204     nreqs nready npending nthreads
205     max_poll_time max_poll_reqs);
206 root 1.1
207 root 1.54 @IO::AIO::GRP::ISA = 'IO::AIO::REQ';
208    
209 root 1.1 require XSLoader;
210 root 1.51 XSLoader::load ("IO::AIO", $VERSION);
211 root 1.1 }
212    
213 root 1.5 =head1 FUNCTIONS
214 root 1.1
215 root 1.87 =head2 AIO REQUEST FUNCTIONS
216 root 1.1
217 root 1.5 All the C<aio_*> calls are more or less thin wrappers around the syscall
218     with the same name (sans C<aio_>). The arguments are similar or identical,
219 root 1.14 and they all accept an additional (and optional) C<$callback> argument
220     which must be a code reference. This code reference will get called with
221     the syscall return code (e.g. most syscalls return C<-1> on error, unlike
222     perl, which usually delivers "false") as its sole argument when the given
223     syscall has been executed asynchronously.
224 root 1.1
225 root 1.23 All functions expecting a filehandle keep a copy of the filehandle
226     internally until the request has finished.
227 root 1.1
228 root 1.87 In non-void context, all functions return request objects of type L<IO::AIO::REQ> that allow
229     further manipulation of those requests while they are in-flight.
230 root 1.52
231 root 1.28 The pathnames you pass to these routines I<must> be absolute and
232 root 1.87 encoded as octets. The reason for the former is that at the time the
233 root 1.28 request is being executed, the current working directory could have
234     changed. Alternatively, you can make sure that you never change the
235 root 1.87 current working directory anywhere in the program and then use relative
236     paths.
237 root 1.28
238 root 1.87 To encode pathnames as octets, make sure that you either: a) always pass
239     in filenames you got from outside (command line, readdir etc.) without
240     tinkering, b) use only ASCII or ISO 8859-1 filenames, c) use the Encode module and encode
241 root 1.28 your pathnames to the locale (or other) encoding in effect in the user
242     environment, d) use Glib::filename_from_unicode on unicode filenames or e)
243 root 1.87 use something else to ensure your scalar has the correct contents.
244    
245     This works independently of the internal UTF-8 bit, which IO::AIO
246     handles correctly whether it is set or not.
247 root 1.1
248 root 1.5 =over 4
249 root 1.1
250 root 1.80 =item $prev_pri = aioreq_pri [$pri]
251 root 1.68
252 root 1.80 Returns the priority value that would be used for the next request and, if
253     C<$pri> is given, sets the priority for the next aio request.
254 root 1.68
255 root 1.80 The default priority is C<0>; the minimum and maximum priorities are C<-4>
256     and C<4>, respectively. Requests with higher priority will be serviced
257     first.
258    
259     The priority will be reset to C<0> after each call to one of the C<aio_*>
260 root 1.68 functions.
261    
262 root 1.69 Example: open a file with low priority, then read something from it with
263     higher priority so the read request is serviced before other low priority
264     open requests (potentially spamming the cache):
265    
266     aioreq_pri -3;
267     aio_open ..., sub {
268     return unless $_[0];
269    
270     aioreq_pri -2;
271     aio_read $_[0], ..., sub {
272     ...
273     };
274     };
275    
276     =item aioreq_nice $pri_adjust
277    
278     Similar to C<aioreq_pri>, but subtracts the given value from the current
279 root 1.87 priority, so the effect is cumulative.
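To make the cumulative effect concrete, here is a toy model of the "next request priority" state in plain Perl. C<PriModel> and its methods are hypothetical and do not touch IO::AIO; clamping to the documented C<-4>..C<4> range is an assumption of this model, not something stated above.

```perl
use strict;
use warnings;

# Hypothetical toy model of the "next request priority" that aioreq_pri and
# aioreq_nice manipulate. It does not use IO::AIO; clamping to the documented
# -4..4 range is an assumption of this model.
package PriModel;

use constant { PRI_MIN => -4, PRI_MAX => 4, PRI_DEFAULT => 0 };

sub new { my ($class) = @_; return bless { next_pri => PRI_DEFAULT }, $class }

sub _clamp {
    my ($p) = @_;
    return $p < PRI_MIN ? PRI_MIN : $p > PRI_MAX ? PRI_MAX : $p;
}

# Like aioreq_pri: return the current value and optionally set a new one.
sub pri {
    my ($self, @new) = @_;
    my $prev = $self->{next_pri};
    $self->{next_pri} = _clamp($new[0]) if @new;
    return $prev;
}

# Like aioreq_nice: subtract from the current value, so calls accumulate.
sub nice {
    my ($self, $adjust) = @_;
    $self->{next_pri} = _clamp($self->{next_pri} - $adjust);
    return;
}

# Issuing a request uses up the priority; it then resets to the default.
sub submit {
    my ($self) = @_;
    my $used = $self->{next_pri};
    $self->{next_pri} = PRI_DEFAULT;
    return $used;
}

package main;

my $m = PriModel->new;
$m->nice(1);
$m->nice(1);                  # cumulative: 0 - 1 - 1
my $first  = $m->submit;      # the request gets -2
my $second = $m->submit;      # reset to 0 for the next one
print "first=$first second=$second\n";
```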
280 root 1.69
281 root 1.40 =item aio_open $pathname, $flags, $mode, $callback->($fh)
282 root 1.1
283 root 1.2 Asynchronously open or create a file and call the callback with a newly
284     created filehandle for the file.
285 root 1.1
286     The pathname passed to C<aio_open> must be absolute. See the notes on
287     pathnames at the start of this section for an explanation.
288    
289 root 1.20 The C<$flags> argument is a bitmask. See the C<Fcntl> module for a
290     list. They are the same as used by C<sysopen>.
291    
292     Likewise, C<$mode> specifies the mode of the newly created file, if it
293     didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>,
294     except that it is mandatory (i.e. use C<0> if you don't create new files,
295     and C<0666> or C<0777> if you do).
296 root 1.1
297     Example:
298    
299     aio_open "/etc/passwd", O_RDONLY, 0, sub {
300 root 1.2 if ($_[0]) {
301     print "open successful, fh is $_[0]\n";
302 root 1.1 ...
303     } else {
304     die "open failed: $!\n";
305     }
306     };
307    
308 root 1.40 =item aio_close $fh, $callback->($status)
309 root 1.1
310 root 1.2 Asynchronously close a file and call the callback with the result
311     code. I<WARNING:> although accepted, you should not pass in a perl
312 root 1.20 filehandle here, as perl will likely close the file descriptor another
313     time when the filehandle is destroyed. Normally, you can safely call perl's
314     C<close> or just let filehandles go out of scope.
315    
316     This is considered a bug in the API, so this behaviour might change. It's
317     therefore best to avoid this function.
318 root 1.1
319 root 1.40 =item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
320 root 1.1
321 root 1.40 =item aio_write $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
322 root 1.1
323     Reads or writes C<$length> bytes from or to the specified C<$fh> at file offset
324     C<$offset>, using the scalar C<$data> at offset C<$dataoffset> as buffer, and calls the
325     callback with the actual number of bytes read or written (or -1 on error, just
326     like the syscall).
327    
328 root 1.31 The C<$data> scalar I<MUST NOT> be modified in any way while the request
329     is outstanding. Modifying it can result in segfaults or WW3 (if the
330     necessary/optional hardware is installed).
331    
332 root 1.17 Example: Read 15 bytes at offset 7 into scalar C<$buffer>, starting at
333 root 1.1 offset C<0> within the scalar:
334    
335     aio_read $fh, 7, 15, $buffer, 0, sub {
336 root 1.9 $_[0] > 0 or die "read error: $!";
337     print "read $_[0] bytes: <$buffer>\n";
338 root 1.1 };
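For comparison, a blocking sketch of what these arguments mean, using core Perl only. C<blocking_read> is a hypothetical helper, not part of IO::AIO; it moves the handle's file position with C<sysseek>, which an asynchronous implementation need not do.

```perl
use strict;
use warnings;
use Fcntl qw(SEEK_SET);
use File::Temp qw(tempfile);

# Blocking illustration of aio_read's arguments (not IO::AIO's code): read
# $length bytes at file offset $offset into $$bufref, placing them at
# $dataoffset within the scalar. Returns the number of bytes read, or -1.
# Unlike pread(2), sysseek also moves the handle's file position.
sub blocking_read {
    my ($fh, $offset, $length, $bufref, $dataoffset) = @_;
    defined sysseek($fh, $offset, SEEK_SET) or return -1;
    my $got = sysread($fh, $$bufref, $length, $dataoffset);
    return defined $got ? $got : -1;
}

my ($tmp, $name) = tempfile(UNLINK => 1);
print {$tmp} "0123456789abcdefghijklmnopqrstuvwxyz";
close $tmp;
open my $fh, "<:raw", $name or die "$name: $!";

# Read 15 bytes at offset 7 into $buffer, starting at offset 0 in the scalar.
my $buffer = "";
my $n1 = blocking_read($fh, 7, 15, \$buffer, 0);

# Then place the first 3 bytes of the file at offset 15 within the scalar.
my $n2 = blocking_read($fh, 0, 3, \$buffer, 15);
print "read $n1 + $n2 bytes: <$buffer>\n";
```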
339    
340 root 1.40 =item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval)
341 root 1.35
342     Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts
343     reading at byte offset C<$in_offset>, and starts writing at the current
344     file offset of C<$out_fh>. Because of that, it is not safe to issue more
345     than one C<aio_sendfile> per C<$out_fh>, as they will interfere with each
346     other.
347    
348     This call tries to make use of a native C<sendfile> syscall to provide
349     zero-copy operation. For this to work, C<$out_fh> should refer to a
350     socket, and C<$in_fh> should refer to an mmap'able file.
351    
352     If the native sendfile call fails or is not implemented, it will be
353 root 1.36 emulated, so you can call C<aio_sendfile> on any type of filehandle
354     regardless of the limitations of the operating system.
355 root 1.35
356     Please note, however, that C<aio_sendfile> can read more bytes from
357     C<$in_fh> than are written, and there is no way to find out how many
358 root 1.36 bytes have been read from C<aio_sendfile> alone, as C<aio_sendfile> only
359     provides the number of bytes written to C<$out_fh>. Only if the result
360     value equals C<$length> can one assume that C<$length> bytes have been
361     read.
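The emulation fallback can be pictured as a plain read/write loop. The following is a hypothetical blocking sketch, not IO::AIO's implementation, and the 64 KiB chunk size is arbitrary. It also shows why the return value only counts bytes written: if a write fails after a successful read, the extra bytes read are not reflected in the result.

```perl
use strict;
use warnings;
use Fcntl qw(SEEK_SET);
use File::Temp qw(tempfile);

# Hypothetical blocking emulation of a sendfile fallback: read from $in_fh
# starting at $in_offset and write at $out_fh's current position. Returns
# the number of bytes written, or -1 if nothing could be transferred.
sub emulated_sendfile {
    my ($out_fh, $in_fh, $in_offset, $length) = @_;
    defined sysseek($in_fh, $in_offset, SEEK_SET) or return -1;

    my $written = 0;
    while ($written < $length) {
        my $want = $length - $written;
        $want = 65536 if $want > 65536;              # arbitrary chunk size
        my $got = sysread($in_fh, my $buf, $want);
        return $written || -1 unless defined $got;   # read error
        last if $got == 0;                           # end of file

        for (my $off = 0; $off < $got; ) {
            my $w = syswrite($out_fh, $buf, $got - $off, $off);
            return $written || -1 unless defined $w; # write error
            $off     += $w;
            $written += $w;
        }
    }
    return $written;
}

my ($src, $src_name) = tempfile(UNLINK => 1);
print {$src} "x" x 100, "payload", "y" x 10;
close $src;
open my $in, "<:raw", $src_name or die "$src_name: $!";

my ($out, $out_name) = tempfile(UNLINK => 1);
my $ret = emulated_sendfile($out, $in, 100, 7);
close $out;
print "copied $ret bytes\n";
```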
362 root 1.35
363 root 1.40 =item aio_readahead $fh,$offset,$length, $callback->($retval)
364 root 1.1
365 root 1.20 C<aio_readahead> populates the page cache with data from a file so that
366 root 1.1 subsequent reads from that file will not block on disk I/O. The C<$offset>
367     argument specifies the starting point from which data is to be read and
368     C<$length> specifies the number of bytes to be read. I/O is performed in
369     whole pages, so that offset is effectively rounded down to a page boundary
370     and bytes are read up to the next page boundary greater than or equal to
371 root 1.20 (offset+length). C<aio_readahead> does not read beyond the end of the
372 root 1.1 file. The current file offset of the file is left unchanged.
373    
374 root 1.26 If that syscall doesn't exist (likely if your OS isn't Linux) it will be
375     emulated by simply reading the data, which would have a similar effect.
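The page rounding described above is simple arithmetic. This sketch assumes the caller supplies the page size (4096 bytes is common, but not universal); C<readahead_range> is a hypothetical helper, and the optional C<$filesize> argument models the rule that no data beyond the end of the file is read.

```perl
use strict;
use warnings;

# Page-rounding arithmetic described for aio_readahead. The page size is a
# parameter; $filesize, if given, caps the range at the end of the file.
sub readahead_range {
    my ($offset, $length, $pagesize, $filesize) = @_;
    my $start = $offset - $offset % $pagesize;                  # round down
    my $end   = $offset + $length;
    $end += $pagesize - $end % $pagesize if $end % $pagesize;   # round up
    $end = $filesize if defined $filesize && $end > $filesize;
    return ($start, $end);
}

my ($start, $end) = readahead_range(5000, 100, 4096);
print "pages cover bytes $start .. ", $end - 1, "\n";   # 4096 .. 8191
```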
376    
377 root 1.40 =item aio_stat $fh_or_path, $callback->($status)
378 root 1.1
379 root 1.40 =item aio_lstat $fh, $callback->($status)
380 root 1.1
381     Works like perl's C<stat> or C<lstat> in void context. The callback will
382     be called after the stat and the results will be available using C<stat _>
383     or C<-s _> etc...
384    
385     The pathname passed to C<aio_stat> must be absolute. See the notes on
386     pathnames at the start of this section for an explanation.
387    
388     Currently, the stats are always 64-bit-stats, i.e. instead of returning an
389     error when stat'ing a large file, the results will be silently truncated
390     unless perl itself is compiled with large file support.
391    
392     Example: Print the length of F</etc/passwd>:
393    
394     aio_stat "/etc/passwd", sub {
395     $_[0] and die "stat failed: $!";
396     print "size is ", -s _, "\n";
397     };
398    
399 root 1.40 =item aio_unlink $pathname, $callback->($status)
400 root 1.1
401     Asynchronously unlink (delete) a file and call the callback with the
402     result code.
403    
404 root 1.82 =item aio_mknod $path, $mode, $dev, $callback->($status)
405    
406 root 1.86 [EXPERIMENTAL]
407    
408 root 1.83 Asynchronously create a device node (or fifo). See mknod(2).
409    
410 root 1.86 The only (POSIX-) portable way of calling this function is:
411 root 1.83
412     aio_mknod $path, IO::AIO::S_IFIFO | $mode, 0, sub { ...
413 root 1.82
414 root 1.50 =item aio_link $srcpath, $dstpath, $callback->($status)
415    
416     Asynchronously create a new link to the existing object at C<$srcpath> at
417     the path C<$dstpath> and call the callback with the result code.
418    
419     =item aio_symlink $srcpath, $dstpath, $callback->($status)
420    
421     Asynchronously create a new symbolic link to the existing object at C<$srcpath> at
422     the path C<$dstpath> and call the callback with the result code.
423    
424 root 1.90 =item aio_readlink $path, $callback->($link)
425    
426     Asynchronously read the symlink specified by C<$path> and pass it to
427     the callback. If an error occurs, nothing or undef gets passed to the
428     callback.
429    
430 root 1.50 =item aio_rename $srcpath, $dstpath, $callback->($status)
431    
432     Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
433     rename(2) does, and call the callback with the result code.
434    
435 root 1.40 =item aio_rmdir $pathname, $callback->($status)
436 root 1.27
437     Asynchronously rmdir (delete) a directory and call the callback with the
438     result code.
439    
440 root 1.46 =item aio_readdir $pathname, $callback->($entries)
441 root 1.37
442     Unlike the POSIX call of the same name, C<aio_readdir> reads an entire
443     directory (i.e. opendir + readdir + closedir). The entries will not be
444     sorted, and will B<NOT> include the C<.> and C<..> entries.
445    
446     The callback is passed a single argument which is either C<undef> or an array-ref
447     with the filenames.
448    
449 root 1.98 =item aio_load $path, $data, $callback->($status)
450    
451     This is a composite request that tries to fully load the given file into
452     memory. Status is the same as with aio_read.
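Ignoring asynchrony, C<aio_load> boils down to the following blocking steps. This is a sketch only; C<blocking_load> is a hypothetical name. Like the composite request, it issues a single read of C<-s> bytes, so a file that changes size in the meantime can yield a short read.

```perl
use strict;
use warnings;
use File::Temp qw(tempfile);

# Blocking counterpart of the aio_load composite request: open read-only,
# size the file with -s, then read that many bytes into $$dataref.
# Returns the byte count (like aio_read's status) or -1 on failure.
sub blocking_load {
    my ($path, $dataref) = @_;
    open my $fh, "<:raw", $path or return -1;
    my $size = -s $fh || 0;                  # -s yields "" for empty files
    my $got  = sysread($fh, $$dataref, $size, 0);
    close $fh;
    return defined $got ? $got : -1;
}

my ($tmp, $name) = tempfile(UNLINK => 1);
print {$tmp} "hello world\n";
close $tmp;

my $status = blocking_load($name, \my $data);
print "status $status: $data";
```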
453    
454     =cut
455    
456     sub aio_load($$;$) {
457     aio_block {
458     my ($path, undef, $cb) = @_;
459     my $data = \$_[1];
460    
461     my $pri = aioreq_pri;
462     my $grp = aio_group $cb;
463    
464     aioreq_pri $pri;
465     add $grp aio_open $path, O_RDONLY, 0, sub {
466     my ($fh) = @_
467     or return $grp->result (-1);
468    
469     aioreq_pri $pri;
470     add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub {
471     $grp->result ($_[0]);
472     };
473     };
474    
475     $grp
476     }
477     }
478    
479 root 1.82 =item aio_copy $srcpath, $dstpath, $callback->($status)
480    
481     Try to copy the I<file> (directories not supported as either source or
482     destination) from C<$srcpath> to C<$dstpath> and call the callback with
483     C<0> (ok) or C<-1> (error).
484    
485     This is a composite request that creates the destination file with
486     mode 0200 and copies the contents of the source file into it using
487     C<aio_sendfile>, followed by restoring atime, mtime, access mode and
488     uid/gid, in that order.
489    
490     If an error occurs, the partial destination file will be unlinked, if
491     possible, except when setting atime, mtime, access mode and uid/gid, where
492     errors are being ignored.
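The same sequence, written as a blocking sketch with core modules. C<File::Copy> stands in for C<aio_sendfile>, and C<blocking_copy> is a hypothetical name. One deliberate difference: it closes the destination before restoring metadata, so that no buffered write can update the modification time afterwards.

```perl
use strict;
use warnings;
use Fcntl qw(O_CREAT O_WRONLY O_TRUNC);
use File::Copy qw(copy);
use File::Temp qw(tempdir);

# Blocking sketch of the aio_copy sequence (not IO::AIO's code): create the
# destination with mode 0200, copy the data, then restore atime/mtime,
# permission bits and uid/gid, ignoring errors in that last phase.
# Returns 0 on success and -1 on error, unlinking a partial destination.
sub blocking_copy {
    my ($src, $dst) = @_;
    open my $in, "<:raw", $src or return -1;
    my @stat = stat $in;

    sysopen my $out, $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200 or return -1;
    binmode $out;
    my $ok = copy($in, $out);
    $ok = close($out) && $ok;           # flush before touching timestamps
    close $in;
    unless ($ok) { unlink $dst; return -1 }

    utime $stat[8], $stat[9], $dst;     # atime, mtime
    chmod $stat[2] & 07777, $dst;       # permission bits only
    chown $stat[4], $stat[5], $dst;     # uid, gid
    return 0;
}

my $dir = tempdir(CLEANUP => 1);
open my $fh, ">", "$dir/src" or die "$dir/src: $!";
print {$fh} "some data\n";
close $fh;
chmod 0640, "$dir/src";
utime 1_000_000_000, 1_000_000_000, "$dir/src";

my $status = blocking_copy("$dir/src", "$dir/dst");
printf "status %d, mode %04o\n", $status, (stat "$dir/dst")[2] & 07777;
```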
493    
494     =cut
495    
496     sub aio_copy($$;$) {
497 root 1.95 aio_block {
498     my ($src, $dst, $cb) = @_;
499 root 1.82
500 root 1.95 my $pri = aioreq_pri;
501     my $grp = aio_group $cb;
502 root 1.82
503 root 1.95 aioreq_pri $pri;
504     add $grp aio_open $src, O_RDONLY, 0, sub {
505     if (my $src_fh = $_[0]) {
506     my @stat = stat $src_fh;
507    
508     aioreq_pri $pri;
509     add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
510     if (my $dst_fh = $_[0]) {
511     aioreq_pri $pri;
512     add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
513     if ($_[0] == $stat[7]) {
514     $grp->result (0);
515     close $src_fh;
516    
517     # those should not normally block. should. should.
518     utime $stat[8], $stat[9], $dst;
519     chmod $stat[2] & 07777, $dst_fh;
520     chown $stat[4], $stat[5], $dst_fh;
521     close $dst_fh;
522     } else {
523     $grp->result (-1);
524     close $src_fh;
525     close $dst_fh;
526 root 1.82
527 root 1.95 aioreq_pri $pri;
528     add $grp aio_unlink $dst;
529     }
530     };
531     } else {
532     $grp->result (-1);
533     }
534     };
535    
536     } else {
537     $grp->result (-1);
538     }
539     };
540 root 1.82
541 root 1.95 $grp
542     }
543 root 1.82 }
544    
545     =item aio_move $srcpath, $dstpath, $callback->($status)
546    
547     Try to move the I<file> (directories not supported as either source or
548     destination) from C<$srcpath> to C<$dstpath> and call the callback with
549     C<0> (ok) or C<-1> (error).
550    
551     This is a composite request that tries to rename(2) the file first. If
552     rename fails with C<EXDEV>, it copies the file with C<aio_copy> and, if
553     that is successful, unlinks the C<$srcpath>.
554    
555     =cut
556    
557     sub aio_move($$;$) {
558 root 1.95 aio_block {
559     my ($src, $dst, $cb) = @_;
560 root 1.82
561 root 1.95 my $pri = aioreq_pri;
562     my $grp = aio_group $cb;
563 root 1.82
564 root 1.95 aioreq_pri $pri;
565     add $grp aio_rename $src, $dst, sub {
566     if ($_[0] && $! == EXDEV) {
567     aioreq_pri $pri;
568     add $grp aio_copy $src, $dst, sub {
569     $grp->result ($_[0]);
570    
571     if (!$_[0]) {
572     aioreq_pri $pri;
573     add $grp aio_unlink $src;
574     }
575     };
576     } else {
577 root 1.82 $grp->result ($_[0]);
578 root 1.95 }
579     };
580 root 1.82
581 root 1.95 $grp
582     }
583 root 1.82 }
584    
585 root 1.40 =item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
586    
587 root 1.52 Scans a directory (similar to C<aio_readdir>) but additionally tries to
588 root 1.76 efficiently separate the entries of directory C<$path> into two sets of
589     names, directories you can recurse into (directories), and ones you cannot
590     recurse into (everything else, including symlinks to directories).
591 root 1.52
592 root 1.61 C<aio_scandir> is a composite request that creates many sub requests.
593     C<$maxreq> specifies the maximum number of outstanding aio requests that
594     this function generates. If it is C<< <= 0 >>, then a suitable default
595 root 1.81 will be chosen (currently 4).
596 root 1.40
597     On error, the callback is called without arguments, otherwise it receives
598     two array-refs with path-relative entry names.
599    
600     Example:
601    
602     aio_scandir $dir, 0, sub {
603     my ($dirs, $nondirs) = @_;
604     print "real directories: @$dirs\n";
605     print "everything else: @$nondirs\n";
606     };
607    
608     Implementation notes.
609    
610     The C<aio_readdir> cannot be avoided, but C<stat()>'ing every entry can.
611    
612     After reading the directory, the modification time, size etc. of the
613 root 1.52 directory before and after the readdir are checked, and if they match (and
614     the modification time isn't the current time), the link count will be used to decide how many
615     entries are directories (if >= 2). Otherwise, no knowledge of the number
616     of subdirectories will be assumed.
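This decision can be written as a pure function of two C<stat> results. The sketch mirrors the checks described above (fields 0, 1, 3, 7 and 9 are dev, ino, nlink, size and mtime); C<expected_subdirs> is a hypothetical name, and a negative result means the count is unknown.

```perl
use strict;
use warnings;

# The link-count heuristic from the aio_scandir notes, as a pure function.
# $before/$after are array refs of Perl stat() results taken before and
# after reading the directory. Returns nlink - 2, the expected number of
# subdirectories; a negative result means "unknown, check every entry".
sub expected_subdirs {
    my ($before, $after, $now) = @_;
    my $h1 = join ":", @$before[0, 1, 3, 7, 9];
    my $h2 = join ":", @$after[0, 1, 3, 7, 9];

    # Changed during readdir, or modified in the current second: distrust nlink.
    return -1 if $h1 ne $h2 || $after->[9] == $now;

    # POSIX filesystems: nlink = 2 + number of subdirectories. Non-POSIX ones
    # often report 0 or 1, which yields a negative ("unknown") result.
    return $after->[3] - 2;
}

# dev, ino, mode, nlink, uid, gid, rdev, size, atime, mtime, ctime, blksize, blocks
my @st = (1, 2, 040755, 5, 0, 0, 0, 4096, 0, 100, 0, 0, 0);
my $n  = expected_subdirs(\@st, \@st, 200);
print "$n\n";   # 3
```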
617    
618     Then entries will be sorted into likely directories (everything without
619     a non-initial dot currently) and likely non-directories (everything
620     else). Then every entry plus an appended C</.> will be C<stat>'ed,
621     likely directories first. If that succeeds, it assumes that the entry
622     is a directory or a symlink to directory (which will be checked
623     separately). This is often faster than stat'ing the entry itself because
624     filesystems might detect the type of the entry without reading the inode
625     data (e.g. ext2fs filetype feature).
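The ordering is easiest to see in isolation. The sketch below reuses the Schwartzian transform from the C<aio_scandir> source; C<scandir_order> is a hypothetical wrapper. Sorting the keys in descending order and then popping from the end yields likely directories first, shortest names first.

```perl
use strict;
use warnings;

# Key: "0" for names without a non-initial dot (likely directories), "1"
# otherwise, followed by the zero-padded name length, as in aio_scandir.
sub scandir_order {
    my @entries = map $_->[0],
                  sort { $b->[1] cmp $a->[1] }
                  map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
                  @_;

    # aio_scandir pops entries off the end of the sorted list.
    my @order;
    push @order, pop @entries while @entries;
    return @order;
}

my @order = scandir_order(qw(README.txt lib .git t Makefile.PL));
print "@order\n";   # t lib .git README.txt Makefile.PL
```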
626    
627     If the known number of directories (link count - 2) has been reached, the
628     rest of the entries are assumed to be non-directories.
629    
630     This only works with certainty on POSIX (= UNIX) filesystems, which
631     fortunately are the vast majority of filesystems around.
632    
633     It will also likely work on non-POSIX filesystems with reduced efficiency
634     as those tend to return 0 or 1 as link counts, which disables the
635     directory counting heuristic.
636 root 1.40
637     =cut
638    
639 root 1.100 sub aio_scandir($$;$) {
640 root 1.95 aio_block {
641     my ($path, $maxreq, $cb) = @_;
642 root 1.40
643 root 1.95 my $pri = aioreq_pri;
644 root 1.80
645 root 1.95 my $grp = aio_group $cb;
646 root 1.55
647 root 1.95 $maxreq = 4 if $maxreq <= 0;
648 root 1.40
649 root 1.95 # stat once
650 root 1.80 aioreq_pri $pri;
651 root 1.95 add $grp aio_stat $path, sub {
652     return $grp->result () if $_[0];
653     my $now = time;
654     my $hash1 = join ":", (stat _)[0,1,3,7,9];
655 root 1.40
656 root 1.95 # read the directory entries
657 root 1.80 aioreq_pri $pri;
658 root 1.95 add $grp aio_readdir $path, sub {
659     my $entries = shift
660     or return $grp->result ();
661    
662     # stat the dir another time
663     aioreq_pri $pri;
664     add $grp aio_stat $path, sub {
665     my $hash2 = join ":", (stat _)[0,1,3,7,9];
666    
667     my $ndirs;
668    
669     # take the slow route if anything looks fishy
670     if ($hash1 ne $hash2 or (stat _)[9] == $now) {
671     $ndirs = -1;
672     } else {
673     # if nlink == 2, we are finished
674     # on non-posix-fs's, we rely on nlink < 2
675     $ndirs = (stat _)[3] - 2
676     or return $grp->result ([], $entries);
677     }
678    
679     # sort into likely dirs and likely nondirs
680     # dirs == files without ".", short entries first
681     $entries = [map $_->[0],
682     sort { $b->[1] cmp $a->[1] }
683     map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
684     @$entries];
685 root 1.40
686 root 1.95 my (@dirs, @nondirs);
687 root 1.40
688 root 1.95 my $statgrp = add $grp aio_group sub {
689     $grp->result (\@dirs, \@nondirs);
690     };
691 root 1.40
692 root 1.95 limit $statgrp $maxreq;
693     feed $statgrp sub {
694     return unless @$entries;
695     my $entry = pop @$entries;
696    
697     aioreq_pri $pri;
698     add $statgrp aio_stat "$path/$entry/.", sub {
699     if ($_[0] < 0) {
700     push @nondirs, $entry;
701     } else {
702     # need to check for real directory
703     aioreq_pri $pri;
704     add $statgrp aio_lstat "$path/$entry", sub {
705     if (-d _) {
706     push @dirs, $entry;
707    
708     unless (--$ndirs) {
709     push @nondirs, @$entries;
710     feed $statgrp;
711     }
712     } else {
713     push @nondirs, $entry;
714 root 1.74 }
715 root 1.40 }
716     }
717 root 1.95 };
718 root 1.74 };
719 root 1.40 };
720     };
721     };
722 root 1.55
723 root 1.95 $grp
724     }
725 root 1.40 }
726    
727 root 1.99 =item aio_rmtree $path, $callback->($status)
728    
729 root 1.100 Delete a directory tree starting at (and including) C<$path>, returning the
730     status of the final C<rmdir> only. This is a composite request that
731     uses C<aio_scandir> to recurse into and rmdir directories, and unlink
732     everything else.
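A blocking sketch of the same strategy, with no IO::AIO involved (C<blocking_rmtree> is a hypothetical name). Like the composite request, it unlinks symlinks to directories instead of following them, and reports only the status of the final C<rmdir>.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# Blocking sketch of the aio_rmtree strategy: recurse into real directories,
# unlink everything else (including symlinks to directories), then rmdir
# $path. Only the final rmdir status is returned: 0 on success, -1 on failure.
sub blocking_rmtree {
    my ($path) = @_;
    if (opendir my $dh, $path) {
        my @entries = grep { $_ ne "." && $_ ne ".." } readdir $dh;
        closedir $dh;
        for my $entry (@entries) {
            my $p = "$path/$entry";
            if (-d $p && !-l $p) { blocking_rmtree($p) }   # real directory
            else                 { unlink $p }             # anything else
        }
    }
    return rmdir($path) ? 0 : -1;
}

my $root = tempdir(CLEANUP => 1);
mkdir $_ or die "$_: $!" for "$root/tree", "$root/tree/a", "$root/tree/a/b";
open my $fh, ">", "$root/tree/a/b/file" or die "file: $!";
close $fh;

my $status = blocking_rmtree("$root/tree");
print "status: $status\n";
```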
733 root 1.99
734     =cut
735    
736     sub aio_rmtree;
737 root 1.100 sub aio_rmtree($;$) {
738 root 1.99 aio_block {
739     my ($path, $cb) = @_;
740    
741     my $pri = aioreq_pri;
742     my $grp = aio_group $cb;
743    
744     aioreq_pri $pri;
745     add $grp aio_scandir $path, 0, sub {
746     my ($dirs, $nondirs) = @_;
747    
748     my $dirgrp = aio_group sub {
749     add $grp aio_rmdir $path, sub {
750     $grp->result ($_[0]);
751     };
752     };
753    
754     (aioreq_pri $pri), add $dirgrp aio_rmtree "$path/$_" for @$dirs;
755     (aioreq_pri $pri), add $dirgrp aio_unlink "$path/$_" for @$nondirs;
756    
757     add $grp $dirgrp;
758     };
759    
760     $grp
761     }
762     }
763    
764 root 1.40 =item aio_fsync $fh, $callback->($status)
765 root 1.1
766     Asynchronously call fsync on the given filehandle and call the callback
767     with the fsync result code.
768    
769 root 1.40 =item aio_fdatasync $fh, $callback->($status)
770 root 1.1
771     Asynchronously call fdatasync on the given filehandle and call the
772 root 1.26 callback with the fdatasync result code.
773    
774     If this call isn't available because your OS lacks it or it couldn't be
775     detected, it will be emulated by calling C<fsync> instead.
776 root 1.1
777 root 1.58 =item aio_group $callback->(...)
778 root 1.54
779 root 1.55 This is a very special aio request: Instead of doing something, it is a
780     container for other aio requests, which is useful if you want to bundle
781 root 1.71 many requests into a single, composite, request with a definite callback
782     and the ability to cancel the whole request with its subrequests.
783 root 1.55
784     Returns an object of class L<IO::AIO::GRP>. See its documentation below
785     for more info.
786    
787     Example:
788    
789     my $grp = aio_group sub {
790     print "all stats done\n";
791     };
792    
793     add $grp
794     (aio_stat ...),
795     (aio_stat ...),
796     ...;
797    
798 root 1.63 =item aio_nop $callback->()
799    
800     This is a special request - it does nothing in itself and is only used for
801     side effects, such as when you want to add a dummy request to a group so
802     that finishing the requests in the group depends on executing the given
803     code.
804    
805 root 1.64 While this request does nothing, it still goes through the execution
806     phase and still requires a worker thread. Thus, the callback will not
807     be executed immediately but only after other requests in the queue have
808     entered their execution phase. This can be used to measure request
809     latency.
810    
=item IO::AIO::aio_busy $fractional_seconds, $callback->() *NOT EXPORTED*

Mainly used for debugging and benchmarking, this aio request puts one of
the request workers to sleep for the given time.

While it is theoretically handy to have simple I/O scheduling requests
like sleep and file handle readable/writable, the overhead this creates is
immense (it blocks a thread for a long time), so do not use this function
except to put your application under artificial I/O pressure.

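A sketch of creating artificial pressure, assuming IO::AIO and L<Time::HiRes> are available. Limiting the pool to two threads is an assumption made only for this demonstration: two C<aio_busy> requests occupy both workers, so an C<aio_nop> queued behind them should wait roughly 0.2 seconds.

```perl
   use strict;
   use warnings;
   use Time::HiRes qw(time);
   use IO::AIO;

   # assumption for the demo: allow only two worker threads
   IO::AIO::max_parallel (2);

   # occupy both workers for 0.2 seconds each
   IO::AIO::aio_busy (0.2, sub { }) for 1 .. 2;

   # a nop queued behind them has to wait for a free worker
   my $queued = time;
   my $waited;
   aio_nop sub { $waited = time () - $queued };

   IO::AIO::flush;
   printf "nop waited %.3f s behind the busy requests\n", $waited;
```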
=back

=head2 IO::AIO::REQ CLASS

All non-aggregate C<aio_*> functions return an object of this class when
called in non-void context.

=over 4

=item cancel $req

Cancels the request, if possible. This has the effect of skipping execution
when entering the B<execute> state and skipping calling the callback when
entering the B<result> state, but will leave the request otherwise
untouched. That means that requests that are currently executing will not
be stopped and resources held by the request will not be freed prematurely.

=item cb $req $callback->(...)

Replaces (or simply sets) the callback registered to the request.

=back

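A sketch of both methods, assuming IO::AIO is installed. C<aio_nop> is used only because it has no side effects, and the flag variables are illustrative. Since callbacks only run inside C<poll_cb>, replacing or cancelling before calling C<IO::AIO::flush> determines which callbacks run.

```perl
   use strict;
   use warnings;
   use IO::AIO;

   my ($old_cb_ran, $new_cb_ran, $cancelled_cb_ran) = (0, 0, 0);

   # cb: replace the callback of a request that is still outstanding
   my $req = aio_nop sub { $old_cb_ran = 1 };
   $req->cb (sub { $new_cb_ran = 1 });

   # cancel: the callback of a cancelled request is skipped
   my $doomed = aio_nop sub { $cancelled_cb_ran = 1 };
   $doomed->cancel;

   IO::AIO::flush;
   print "new callback ran: $new_cb_ran, cancelled callback ran: $cancelled_cb_ran\n";
```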
=head2 IO::AIO::GRP CLASS

This class is a subclass of L<IO::AIO::REQ>, so all its methods apply to
objects of this class, too.

An IO::AIO::GRP object is a special request that can contain multiple other
aio requests.

You create one by calling the C<aio_group> constructing function with a
callback that will be called when all contained requests have entered the
C<done> state:

   my $grp = aio_group sub {
      print "all requests are done\n";
   };

You add requests by calling the C<add> method with one or more
C<IO::AIO::REQ> objects:

   $grp->add (aio_unlink "...");

   add $grp aio_stat "...", sub {
      # aio_stat passes 0 on success and -1 on error
      $_[0] and return $grp->result ("error");

      # add another request dynamically, if first succeeded
      add $grp aio_open "...", sub {
         $grp->result ("ok");
      };
   };

This makes it very easy to create composite requests (see the source of
C<aio_move> for an application) that work and feel like simple requests.

=over 4

=item * The IO::AIO::GRP objects will be cleaned up during calls to
C<IO::AIO::poll_cb>, just like any other request.

=item * They can be canceled like any other request. Canceling will cancel not
only the request itself, but also all requests it contains.

=item * They can also be added to other IO::AIO::GRP objects.

=item * You must not add requests to a group from within the group callback (or
at any later time).

=back

Their lifetime, simplified, looks like this: when they are empty, they
will finish very quickly. If they contain only requests that are in the
C<done> state, they will also finish. Otherwise they will continue to
exist.

That means that after creating a group you have some time to add requests,
and in the callbacks of those requests you can add further requests to the
group. Only when all those requests have finished will the group itself
finish.

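A sketch of the lifetime rules above, assuming IO::AIO is installed: the second request is added from inside the first request's callback, and the group callback only fires after both have finished. Stat'ing the current directory is an arbitrary choice, and C<$kind> is an illustrative variable.

```perl
   use strict;
   use warnings;
   use IO::AIO;

   my @result;
   my $grp = aio_group sub { @result = @_ };   # receives the $grp->result values

   # stage one: stat the current directory
   $grp->add (aio_stat ".", sub {
      $_[0] and return $grp->result ("stat failed");
      my $kind = -d _ ? "directory" : "other";   # inspect the stat buffer now

      # stage two, added from a subrequest callback: the group stays
      # alive until this request has finished as well
      $grp->add (aio_nop sub { $grp->result ("ok", $kind) });
   });

   IO::AIO::flush;
   print "group result: @result\n";
```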
=over 4

=item add $grp ...

=item $grp->add (...)

Adds one or more requests to the group. Any type of L<IO::AIO::REQ> can
be added, including other groups, as long as you do not create circular
dependencies.

Returns all its arguments.

=item $grp->cancel_subs

Cancels all subrequests and clears any feeder, but not the group request
itself. This is useful when you have queued a lot of requests but got a
result early.

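A sketch of the early-result pattern, assuming IO::AIO is installed and that cancelled subrequests still let the group finish, as the description of cancelling suggests. The candidate list and the C<$found> variable are hypothetical; the guard on C<$found> keeps the code correct even if a later callback were still delivered.

```perl
   use strict;
   use warnings;
   use IO::AIO;

   my ($found, $group_done) = (undef, 0);
   my $grp = aio_group sub { $group_done = 1 };

   # hypothetical search: several candidates, but only the first hit matters
   for my $path (".", "/", ".", "/") {
      $grp->add (aio_stat $path, sub {
         return if $_[0] or defined $found;   # failed, or already have a hit
         $found = $path;
         $grp->cancel_subs;                   # skip the remaining stats
      });
   }

   IO::AIO::flush;
   print "first hit: $found\n";
```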
=item $grp->result (...)

Sets the result value(s) that will be passed to the group callback when all
subrequests have finished, and sets the group's errno to the current value
of errno (just like calling C<errno> without an error number). By default,
no argument will be passed and errno is zero.

=item $grp->errno ([$errno])

Sets the group errno value to C<$errno>, or the current value of errno
when the argument is missing.

Every aio request has an associated errno value that is restored when
the callback is invoked. This method lets you change this value from its
default (0).

Calling C<result> will also set errno, so make sure you either set C<$!>
before the call to C<result>, or call C<errno> after it.

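A sketch of how C<result> captures errno, assuming IO::AIO is installed. The missing path is hypothetical and only serves to make C<aio_stat> fail with C<ENOENT>, which C<result> copies into the group so that C<$!> is restored when the group callback runs.

```perl
   use strict;
   use warnings;
   use Errno qw(ENOENT);
   use IO::AIO;

   my ($res, $err);
   my $grp = aio_group sub {
      $res = $_[0];
      $err = $! + 0;   # the group's errno is restored before this runs
   };

   # hypothetical path that should not exist, used to provoke ENOENT
   $grp->add (aio_stat "/io-aio-demo-no-such-dir/file", sub {
      $_[0] or return $grp->result ("found");
      # $! still holds the stat error here; result() copies it to the group
      $grp->result ("missing");
   });

   IO::AIO::flush;
   print "result: $res, errno: $err\n";
```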
=item feed $grp $callback->($grp)

Sets a feeder/generator on this group: every group can have an attached
generator that generates requests if idle. The idea behind this is that,
although you could just queue as many requests as you want in a group,
this might starve other requests for a potentially long time. For
example, C<aio_scandir> might generate hundreds of thousands of C<aio_stat>
requests, delaying any later requests for a long time.

To avoid this, and to allow incremental generation of requests, you can
instead create a group and set a feeder on it that generates those
requests. The feed callback will be called whenever there are few enough
(see C<limit>, below) requests active in the group itself, and is expected
to queue more requests.

The feed callback can queue as many requests as it likes (i.e. C<add> does
not impose any limits).

If the feed does not queue more requests when called, it will be
automatically removed from the group.

If the feed limit is C<0> when a feeder is set, it will be set to C<2>
automatically.

Example:

   # stat all files in @files, but only ever use four aio requests concurrently:

   my $grp = aio_group sub { print "finished\n" };
   limit $grp 4;
   feed $grp sub {
      my $file = pop @files
         or return;

      add $grp aio_stat $file, sub { ... };
   };

=item limit $grp $num

Sets the feeder limit for the group: the feeder will be called whenever
the group contains fewer than this many requests.

Setting the limit to C<0> will pause the feeding process.

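A runnable sketch of C<feed> and C<limit> together, assuming IO::AIO is installed. The work list is hypothetical (the same two paths repeated), and C<$active> and C<$max_active> are illustrative counters tracking how many subrequests the feeder has in flight, which should never exceed the limit of two.

```perl
   use strict;
   use warnings;
   use IO::AIO;

   # hypothetical work list: the same two paths, five times each
   my @files = (".", "/") x 5;
   my ($stated, $active, $max_active, $done) = (0, 0, 0, 0);

   my $grp = aio_group sub { $done = 1 };
   $grp->limit (2);   # feed whenever fewer than two requests are active
   $grp->feed (sub {
      my $file = pop @files
         or return;      # nothing queued: the feeder is removed

      $active++;
      $max_active = $active if $active > $max_active;

      $grp->add (aio_stat $file, sub {
         $active--;
         $_[0] or $stated++;   # 0 means the stat succeeded
      });
   });

   IO::AIO::flush;
   print "$stated stats, at most $max_active in flight\n";
```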
=back

=head2 SUPPORT FUNCTIONS

=head3 EVENT PROCESSING AND EVENT LOOP INTEGRATION

=over 4

=item $fileno = IO::AIO::poll_fileno

Returns the I<request result pipe file descriptor>. This file descriptor
must be polled for reading by some mechanism outside this module (e.g.
Event or select, see below or the SYNOPSIS). If the pipe becomes readable
you have to call C<poll_cb> to check the results.

See C<poll_cb> for an example.

=item IO::AIO::poll_cb

Processes some outstanding events on the result pipe. You have to call this
regularly. Returns the number of events processed. Returns immediately
when no events are outstanding. The number of events processed depends on
the settings of C<IO::AIO::max_poll_reqs> and C<IO::AIO::max_poll_time>.

If not all requests were processed for whatever reason, the filehandle
will still be ready when C<poll_cb> returns.

Example: Install an Event watcher that automatically calls
IO::AIO::poll_cb with high priority:

   Event->io (fd => IO::AIO::poll_fileno,
              poll => 'r', async => 1,
              cb => \&IO::AIO::poll_cb);

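A sketch of driving IO::AIO from a plain four-argument C<select> loop instead of an event library, assuming IO::AIO is installed. The five-second timeout and the C<$done> flag are illustrative choices, not part of the module.

```perl
   use strict;
   use warnings;
   use IO::AIO;

   my $done = 0;
   aio_nop sub { $done = 1 };

   # build a select(2) bit vector containing just the result pipe
   my $rin = '';
   vec ($rin, IO::AIO::poll_fileno (), 1) = 1;

   until ($done) {
      # wait (up to 5 seconds) for the pipe to become readable ...
      select (my $rout = $rin, undef, undef, 5) > 0
         or die "no aio results within 5 seconds\n";
      # ... then let poll_cb invoke the callbacks of finished requests
      IO::AIO::poll_cb;
   }
   print "nop callback ran\n";
```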
=item IO::AIO::max_poll_reqs $nreqs

=item IO::AIO::max_poll_time $seconds

These set, respectively, the maximum number of requests (default C<0>,
meaning infinity) that are processed by C<IO::AIO::poll_cb> in one call,
and the maximum amount of time (default C<0>, meaning infinity) spent in
C<IO::AIO::poll_cb> to process requests (more precisely, the minimum amount
of time C<poll_cb> is allowed to use).

Setting C<max_poll_time> to a non-zero value creates an overhead of one
syscall per request processed, which is not normally a problem unless your
callbacks are really really fast or your OS is really really slow (I am
not mentioning Solaris here). Using C<max_poll_reqs> incurs no overhead.

Setting these is useful if you want to ensure some level of
interactivity when perl is not fast enough to process all requests in
time.

For interactive programs, values such as C<0.01> to C<0.1> should be fine.

Example: Install an Event watcher that automatically calls
IO::AIO::poll_cb with low priority, to ensure that other parts of the
program get the CPU sometimes even under high AIO load.

   # try not to spend much more than 0.1s in poll_cb
   IO::AIO::max_poll_time 0.1;

   # use a low priority so other tasks have priority
   Event->io (fd => IO::AIO::poll_fileno,
              poll => 'r', nice => 1,
              cb => \&IO::AIO::poll_cb);

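A sketch showing C<max_poll_reqs> bounding the work done per C<poll_cb> call, assuming IO::AIO is installed. It counts callbacks rather than relying on the return value of C<poll_cb>; the batch size of 10 and the counters are arbitrary.

```perl
   use strict;
   use warnings;
   use IO::AIO;

   # handle at most 10 results per poll_cb call (0 would mean no limit)
   IO::AIO::max_poll_reqs (10);

   my $count = 0;
   aio_nop sub { $count++ } for 1 .. 100;

   # drive the queue by hand and record the largest batch seen per call
   my $largest_batch = 0;
   while (IO::AIO::nreqs ()) {
      IO::AIO::poll_wait;
      my $before = $count;
      IO::AIO::poll_cb;
      my $batch = $count - $before;
      $largest_batch = $batch if $batch > $largest_batch;
   }

   print "$count callbacks, at most $largest_batch per poll_cb call\n";
```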
=item IO::AIO::poll_wait

If there are any outstanding requests and none of them are in the result
phase, wait till the result filehandle becomes ready for reading (this
simply does a C<select> on the filehandle). This is useful if you want to
synchronously wait for some requests to finish.

See C<nreqs> for an example.

=item IO::AIO::poll

Waits until some requests have been handled.

Returns the number of requests processed, but is otherwise strictly
equivalent to:

   IO::AIO::poll_wait, IO::AIO::poll_cb

=item IO::AIO::flush

Waits till all outstanding AIO requests have been handled.

Strictly equivalent to:

   IO::AIO::poll_wait, IO::AIO::poll_cb
      while IO::AIO::nreqs;

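A sketch of using C<IO::AIO::poll> in a loop, assuming IO::AIO is installed and Perl 5.10 or later (for C<//>). The paths and the C<%size> hash are illustrative; the loop has the same effect as C<IO::AIO::flush>.

```perl
   use strict;
   use warnings;
   use IO::AIO;

   my %size;
   for my $path (".", "/") {
      aio_stat $path, sub {
         # -s _ reads the stat buffer filled in by aio_stat; undef marks failure
         $size{$path} = $_[0] ? undef : -s _;
      };
   }

   # handle results as they arrive; same effect as IO::AIO::flush
   IO::AIO::poll () while IO::AIO::nreqs ();

   print "$_: ", $size{$_} // "stat failed", "\n" for sort keys %size;
```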
=back

=head3 CONTROLLING THE NUMBER OF THREADS

=over 4

=item IO::AIO::min_parallel $nthreads

Sets the minimum number of AIO threads to C<$nthreads>. The current
default is C<8>, which means eight asynchronous operations can execute
concurrently at any one time (the number of outstanding requests,
however, is unlimited).

IO::AIO starts threads only on demand, when an AIO request is queued and
no free thread exists. Please note that queueing up a hundred requests can
create demand for a hundred threads, even if it turns out that everything
is in the cache and could have been processed faster by a single thread.

It is recommended to keep the number of threads relatively low, as some
Linux kernel versions will scale negatively with the number of threads
(higher parallelism => MUCH higher latency). With current Linux 2.6
versions, 4-32 threads should be fine.

Under most circumstances you don't need to call this function, as the
module selects a default that is suitable for low to moderate load.

=item IO::AIO::max_parallel $nthreads

Sets the maximum number of AIO threads to C<$nthreads>. If more than the
specified number of threads are currently running, this function kills
the excess threads. This function blocks until the limit is reached.

While C<$nthreads> is zero, aio requests get queued but not executed
until the number of threads has been increased again.

This module automatically runs C<max_parallel 0> at program end, to ensure
that all threads are killed and that there are no outstanding requests.

Under normal circumstances you don't need to call this function.

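A rough sketch of the effect of the thread limits, assuming IO::AIO and L<Time::HiRes> are available. C<timed_batch> is a hypothetical helper: with a single worker thread the three 0.1 second C<aio_busy> requests run one after another, while with three threads they can overlap. Exact timings depend on the system.

```perl
   use strict;
   use warnings;
   use Time::HiRes qw(time);
   use IO::AIO;

   # hypothetical helper: run three 0.1s busy requests with the thread
   # limit set to $threads and return the elapsed wall-clock time
   sub timed_batch {
      my ($threads) = @_;
      IO::AIO::min_parallel ($threads);
      IO::AIO::max_parallel ($threads);
      my $t0 = time;
      IO::AIO::aio_busy (0.1, sub { }) for 1 .. 3;
      IO::AIO::flush;
      return time () - $t0;
   }

   my $serial   = timed_batch (1);   # one worker: requests run one after another
   my $parallel = timed_batch (3);   # three workers: requests can overlap
   printf "1 thread: %.2fs, 3 threads: %.2fs\n", $serial, $parallel;
```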
=item IO::AIO::max_idle $nthreads

Limits the number of threads (default: 4) that are allowed to idle (i.e.,
threads that did not get a request to process within 10 seconds). That
means if a thread becomes idle while C<$nthreads> other threads are also
idle, it will free its resources and exit.

This is useful when you allow a large number of threads (e.g. 100 or 1000)
to allow for extremely high load situations, but want to free resources
under normal circumstances (1000 threads can easily consume 30MB of RAM).

The default is probably ok in most situations, especially if thread
creation is fast. If thread creation is very slow on your system you might
want to use larger values.

=item $oldmaxreqs = IO::AIO::max_outstanding $maxreqs

This is a very bad function to use in interactive programs because it
blocks, and a bad way to reduce concurrency because it is inexact: it is
better to use an C<aio_group> together with a feed callback.

Sets the maximum number of outstanding requests to C<$maxreqs>. If you
queue up more than this number of requests, the next call to
C<poll_cb> (and C<poll> and other functions calling C<poll_cb>)
will block until the limit is no longer exceeded.

The default value is very large, so there is no practical limit on the
number of outstanding requests.

You can still queue as many requests as you want. Therefore,
C<max_outstanding> is mainly useful in simple scripts (with low values) or
as a stop gap to shield against fatal memory overflow (with large values).

=back

=head3 STATISTICAL INFORMATION

=over 4

=item IO::AIO::nreqs

Returns the number of requests currently in the ready, execute or pending
states (i.e. requests whose callback has not been invoked yet).

Example: wait till there are no outstanding requests anymore:

   IO::AIO::poll_wait, IO::AIO::poll_cb
      while IO::AIO::nreqs;

=item IO::AIO::nready

Returns the number of requests currently in the ready state (not yet
executed).

=item IO::AIO::npending

Returns the number of requests currently in the pending state (executed,
but not yet processed by poll_cb).

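A sketch of the counters, assuming IO::AIO is installed. Because no callback can run before C<poll_cb> is called, all three queued requests are still counted by C<nreqs> right after queueing; how they are split between C<nready> and C<npending> depends on how far the worker threads have progressed.

```perl
   use strict;
   use warnings;
   use IO::AIO;

   # queue three short requests; their callbacks cannot run before poll_cb
   IO::AIO::aio_busy (0.05, sub { }) for 1 .. 3;

   my $outstanding = IO::AIO::nreqs ();
   printf "nreqs=%d nready=%d npending=%d\n",
      $outstanding, IO::AIO::nready (), IO::AIO::npending ();

   IO::AIO::flush;
   my $left = IO::AIO::nreqs ();   # 0 once every callback has been invoked
   print "after flush: nreqs=$left\n";
```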
=back

=cut

# support function to convert a file descriptor into a perl filehandle
sub _fd2fh {
   return undef if $_[0] < 0;

   # try to generate nice filehandles
   my $sym = "IO::AIO::fd#$_[0]";
   local *$sym;

   open *$sym, "+<&=$_[0]" # usually works under any unix
      or open *$sym, "<&=$_[0]" # cygwin needs this
      or open *$sym, ">&=$_[0]" # or this
      or return undef;

   *$sym
}

min_parallel 8;

END { flush }

1;

=head2 FORK BEHAVIOUR

This module should do "the right thing" when the process using it forks:

Before the fork, IO::AIO enters a quiescent state where no requests
can be added in other threads and no results will be processed. After
the fork the parent simply leaves the quiescent state and continues
request/result processing, while the child frees the request/result queue
(so that the requests started before the fork will only be handled in the
parent). Threads will be started on demand until the limit set in the
parent process has been reached again.

In short: the parent will, after a short pause, continue as if fork had
not been called, while the child will act as if IO::AIO has not been used
yet.

=head2 MEMORY USAGE

Per-request usage:

Each aio request uses, depending on your architecture, around 100-200
bytes of memory. In addition, stat requests need a stat buffer (possibly
a few hundred bytes), readdir requires a result buffer and so on. Perl
scalars and other data passed into aio requests will also be locked and
will consume memory till the request has entered the done state.

This is not awfully much, so queuing lots of requests is not usually a
problem.

Per-thread usage:

In the execution phase, some aio requests require more memory for
temporary buffers, and each thread requires a stack and other data
structures (usually around 16k-128k, depending on the OS).

=head1 KNOWN BUGS

Known bugs will be fixed in the next release.

=head1 SEE ALSO

L<Coro::AIO>.

=head1 AUTHOR

Marc Lehmann <schmorp@schmorp.de>
http://home.schmorp.de/

=cut
