/cvs/IO-AIO/AIO.pm
Revision: 1.98
Committed: Sun Dec 31 17:07:32 2006 UTC (17 years, 5 months ago) by root
Branch: MAIN
CVS Tags: rel-2_31
Changes since 1.97: +32 -2 lines
Log Message:
add aio_load

=head1 NAME

IO::AIO - Asynchronous Input/Output

=head1 SYNOPSIS

   use IO::AIO;

   aio_open "/etc/passwd", O_RDONLY, 0, sub {
      my $fh = shift
         or die "/etc/passwd: $!";
      ...
   };

   aio_unlink "/tmp/file", sub { };

   aio_read $fh, 30000, 1024, $buffer, 0, sub {
      $_[0] > 0 or die "read error: $!";
   };

   # version 2+ has request and group objects
   use IO::AIO 2;

   aioreq_pri 4; # give next request a very high priority
   my $req = aio_unlink "/tmp/file", sub { };
   $req->cancel; # cancel request if still in queue

   my $grp = aio_group sub { print "all stats done\n" };
   add $grp aio_stat "..." for ...;

   # AnyEvent integration
   open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!";
   my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb });

   # Event integration
   Event->io (fd => IO::AIO::poll_fileno,
              poll => 'r',
              cb => \&IO::AIO::poll_cb);

   # Glib/Gtk2 integration
   add_watch Glib::IO IO::AIO::poll_fileno,
             in => sub { IO::AIO::poll_cb; 1 };

   # Tk integration
   Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "",
                             readable => \&IO::AIO::poll_cb);

   # Danga::Socket integration
   Danga::Socket->AddOtherFds (IO::AIO::poll_fileno =>
                               \&IO::AIO::poll_cb);

=head1 DESCRIPTION

This module implements asynchronous I/O using whatever means your
operating system supports.

Asynchronous means that operations that can normally block your program
(e.g. reading from disk) will be done asynchronously: the operation
will still block, but you can do something else in the meantime. This
is extremely useful for programs that need to stay interactive even
when doing heavy I/O (GUI programs, high performance network servers
etc.), but can also be used to easily do operations in parallel that are
normally done sequentially, e.g. stat'ing many files, which is much faster
on a RAID volume or over NFS when you do a number of stat operations
concurrently.

While most of this works on all types of file descriptors (for example
sockets), using these functions on file descriptors that support
nonblocking operation (again, sockets, pipes etc.) is very inefficient or
might not work (aio_read fails on sockets/pipes/fifos). Use an event loop
for that (such as the L<Event|Event> module): IO::AIO will naturally fit
into such an event loop itself.

In this version, a number of threads are started that execute your
requests and signal their completion. You don't need thread support
in perl, and the threads created by this module will not be visible
to perl. In the future, this module might make use of the native aio
functions available on many operating systems. However, they are often
not well-supported or restricted (GNU/Linux doesn't allow them on normal
files currently, for example), and they would only support aio_read and
aio_write, so the remaining functionality would have to be implemented
using threads anyway.

Although the module will work in the presence of other (Perl-)
threads, it is currently not reentrant in any way, so use appropriate
locking yourself, always call C<poll_cb> from within the same thread, or
never call C<poll_cb> (or other C<aio_> functions) recursively.

=head2 EXAMPLE

This is a simple example that uses the Event module and loads
F</etc/passwd> asynchronously:

   use Fcntl;
   use Event;
   use IO::AIO;

   # register the IO::AIO callback with Event
   Event->io (fd => IO::AIO::poll_fileno,
              poll => 'r',
              cb => \&IO::AIO::poll_cb);

   # queue the request to open /etc/passwd
   aio_open "/etc/passwd", O_RDONLY, 0, sub {
      my $fh = shift
         or die "error while opening: $!";

      # stat'ing filehandles is generally non-blocking
      my $size = -s $fh;

      # queue a request to read the file
      my $contents;
      aio_read $fh, 0, $size, $contents, 0, sub {
         $_[0] == $size
            or die "short read: $!";

         close $fh;

         # file contents now in $contents
         print $contents;

         # exit event loop and program
         Event::unloop;
      };
   };

   # possibly queue up other requests, or open GUI windows,
   # check for sockets etc. etc.

   # process events as long as there are some:
   Event::loop;

=head1 REQUEST ANATOMY AND LIFETIME

Every C<aio_*> function creates a request, which is a C data structure not
directly visible to Perl.

If called in non-void context, every request function returns a Perl
object representing the request. In void context, nothing is returned,
which saves a bit of memory.

The perl object is a fairly standard ref-to-hash object. The hash contents
are not used by IO::AIO so you are free to store anything you like in it.

During their existence, aio requests travel through the following states,
in order:

=over 4

=item ready

Immediately after a request is created it is put into the ready state,
waiting for a thread to execute it.

=item execute

A thread has accepted the request for processing and is currently
executing it (e.g. blocking in read).

=item pending

The request has been executed and is waiting for result processing.

While request submission and execution are fully asynchronous, result
processing is not and relies on the perl interpreter calling C<poll_cb>
(or another function with the same effect).

=item result

The request results are processed synchronously by C<poll_cb>.

The C<poll_cb> function will process all outstanding aio requests by
calling their callbacks, freeing memory associated with them and managing
any groups they are contained in.

=item done

The request has reached the end of its lifetime and holds no resources anymore
(except possibly for the Perl object, but its connection to the actual
aio request is severed and calling its methods will either do nothing or
result in a runtime error).

=back

=cut

package IO::AIO;

no warnings;
use strict 'vars';

use base 'Exporter';

BEGIN {
   our $VERSION = '2.31';

   our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
                     aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
                     aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link
                     aio_move aio_copy aio_group aio_nop aio_mknod aio_load);
   our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block));
   our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
                       min_parallel max_parallel max_idle
                       nreqs nready npending nthreads
                       max_poll_time max_poll_reqs);

   @IO::AIO::GRP::ISA = 'IO::AIO::REQ';

   require XSLoader;
   XSLoader::load ("IO::AIO", $VERSION);
}

=head1 FUNCTIONS

=head2 AIO REQUEST FUNCTIONS

All the C<aio_*> calls are more or less thin wrappers around the syscall
with the same name (sans C<aio_>). The arguments are similar or identical,
and they all accept an additional (and optional) C<$callback> argument
which must be a code reference. This code reference will get called with
the syscall return code (e.g. most syscalls return C<-1> on error, unlike
perl, which usually delivers "false") as its sole argument when the given
syscall has been executed asynchronously.

All functions expecting a filehandle keep a copy of the filehandle
internally until the request has finished.

All functions return request objects of type L<IO::AIO::REQ> that allow
further manipulation of those requests while they are in-flight.

The pathnames you pass to these routines I<must> be absolute and
encoded as octets. The reason for the former is that at the time the
request is being executed, the current working directory could have
changed. Alternatively, you can make sure that you never change the
current working directory anywhere in the program and then use relative
paths.

To encode pathnames as octets, make sure you either: a) always pass in
filenames you got from outside (command line, readdir etc.) without
tinkering, b) only use ASCII or ISO 8859-1 filenames, c) use the Encode
module and encode your pathnames to the locale (or other) encoding in
effect in the user environment, d) use Glib::filename_from_unicode on
unicode filenames or e) use something else to ensure your scalar has the
correct contents.

This works independently of the internal UTF-8 bit, which IO::AIO
handles correctly whether it is set or not.

=over 4

=item $prev_pri = aioreq_pri [$pri]

Returns the priority value that would be used for the next request and, if
C<$pri> is given, sets the priority for the next aio request.

The default priority is C<0>, the minimum and maximum priorities are C<-4>
and C<4>, respectively. Requests with higher priority will be serviced
first.

The priority will be reset to C<0> after each call to one of the C<aio_*>
functions.

Example: open a file with low priority, then read something from it with
higher priority so the read request is serviced before other low priority
open requests (potentially spamming the cache):

   aioreq_pri -3;
   aio_open ..., sub {
      return unless $_[0];

      aioreq_pri -2;
      aio_read $_[0], ..., sub {
         ...
      };
   };

=item aioreq_nice $pri_adjust

Similar to C<aioreq_pri>, but subtracts the given value from the current
priority, so the effect is cumulative.

=item aio_open $pathname, $flags, $mode, $callback->($fh)

Asynchronously open or create a file and call the callback with a newly
created filehandle for the file.

The pathname passed to C<aio_open> must be absolute. See the notes on
pathnames at the beginning of this section for an explanation.

The C<$flags> argument is a bitmask. See the C<Fcntl> module for a
list. They are the same as used by C<sysopen>.

Likewise, C<$mode> specifies the mode of the newly created file, if it
didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>,
except that it is mandatory (i.e. use C<0> if you don't create new files,
and C<0666> or C<0777> if you do).

Example:

   aio_open "/etc/passwd", O_RDONLY, 0, sub {
      if ($_[0]) {
         print "open successful, fh is $_[0]\n";
         ...
      } else {
         die "open failed: $!\n";
      }
   };

=item aio_close $fh, $callback->($status)

Asynchronously close a file and call the callback with the result
code. I<WARNING:> although accepted, you should not pass in a perl
filehandle here, as perl will likely close the file descriptor another
time when the filehandle is destroyed. Normally, you can safely call perl's
C<close> or just let filehandles go out of scope.

This is supposed to be a bug in the API, so that might change. It's
therefore best to avoid this function.

=item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval)

=item aio_write $fh,$offset,$length, $data,$dataoffset, $callback->($retval)

Reads or writes C<$length> bytes from or to the specified C<$fh>,
starting at file offset C<$offset>, into or from the scalar given by
C<$data> (starting at offset C<$dataoffset> within it), and calls the
callback with the actual number of bytes read or written (or -1 on error,
just like the syscall).

The C<$data> scalar I<MUST NOT> be modified in any way while the request
is outstanding. Modifying it can result in segfaults or WW3 (if the
necessary/optional hardware is installed).

Example: Read 15 bytes at offset 7 into scalar C<$buffer>, starting at
offset C<0> within the scalar:

   aio_read $fh, 7, 15, $buffer, 0, sub {
      $_[0] > 0 or die "read error: $!";
      print "read $_[0] bytes: <$buffer>\n";
   };

=item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval)

Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts
reading at byte offset C<$in_offset>, and starts writing at the current
file offset of C<$out_fh>. Because of that, it is not safe to issue more
than one C<aio_sendfile> per C<$out_fh>, as they will interfere with each
other.

This call tries to make use of a native C<sendfile> syscall to provide
zero-copy operation. For this to work, C<$out_fh> should refer to a
socket, and C<$in_fh> should refer to an mmap'able file.

If the native sendfile call fails or is not implemented, it will be
emulated, so you can call C<aio_sendfile> on any type of filehandle
regardless of the limitations of the operating system.

Please note, however, that C<aio_sendfile> can read more bytes from
C<$in_fh> than are written, and there is no way to find out how many
bytes have been read from C<aio_sendfile> alone, as C<aio_sendfile> only
provides the number of bytes written to C<$out_fh>. Only if the result
value equals C<$length> can one assume that C<$length> bytes have been
read.

=item aio_readahead $fh,$offset,$length, $callback->($retval)

C<aio_readahead> populates the page cache with data from a file so that
subsequent reads from that file will not block on disk I/O. The C<$offset>
argument specifies the starting point from which data is to be read and
C<$length> specifies the number of bytes to be read. I/O is performed in
whole pages, so that offset is effectively rounded down to a page boundary
and bytes are read up to the next page boundary greater than or equal to
(offset+length). C<aio_readahead> does not read beyond the end of the
file. The current file offset of the file is left unchanged.

If that syscall doesn't exist (likely if your OS isn't Linux) it will be
emulated by simply reading the data, which would have a similar effect.

=item aio_stat $fh_or_path, $callback->($status)

=item aio_lstat $fh, $callback->($status)

Works like perl's C<stat> or C<lstat> in void context. The callback will
be called after the stat and the results will be available using C<stat _>
or C<-s _> etc...

The pathname passed to C<aio_stat> must be absolute. See the notes on
pathnames at the beginning of this section for an explanation.

Currently, the stats are always 64-bit-stats, i.e. instead of returning an
error when stat'ing a large file, the results will be silently truncated
unless perl itself is compiled with large file support.

Example: Print the length of F</etc/passwd>:

   aio_stat "/etc/passwd", sub {
      $_[0] and die "stat failed: $!";
      print "size is ", -s _, "\n";
   };

=item aio_unlink $pathname, $callback->($status)

Asynchronously unlink (delete) a file and call the callback with the
result code.

=item aio_mknod $path, $mode, $dev, $callback->($status)

[EXPERIMENTAL]

Asynchronously create a device node (or fifo). See mknod(2).

The only (POSIX-) portable way of calling this function is:

   aio_mknod $path, IO::AIO::S_IFIFO | $mode, 0, sub { ...

=item aio_link $srcpath, $dstpath, $callback->($status)

Asynchronously create a new link to the existing object at C<$srcpath> at
the path C<$dstpath> and call the callback with the result code.

=item aio_symlink $srcpath, $dstpath, $callback->($status)

Asynchronously create a new symbolic link to the existing object at
C<$srcpath> at the path C<$dstpath> and call the callback with the result
code.

=item aio_readlink $path, $callback->($link)

Asynchronously read the symlink specified by C<$path> and pass it to
the callback. If an error occurs, nothing or undef gets passed to the
callback.
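
A minimal round-trip sketch, assuming IO::AIO is installed: it creates a
symlink in a temporary directory (File::Temp's C<tempdir> returns an
absolute path, as the pathname notes above require), reads it back with
C<aio_readlink> and waits for both requests with C<IO::AIO::flush> (listed
in C<@EXPORT_OK>) instead of integrating with an event loop. The link name
is hypothetical.

```perl

   use strict;
   use warnings;
   use File::Temp qw(tempdir);
   use IO::AIO;

   my $dir  = tempdir (CLEANUP => 1);   # absolute scratch directory
   my $link = "$dir/passwd-link";       # hypothetical link name

   my $target;

   aio_symlink "/etc/passwd", $link, sub {
      $_[0] and die "symlink failed: $!";

      # queue the readlink only once the symlink exists
      aio_readlink $link, sub {
         $target = shift;   # undef (or nothing) on error
      };
   };

   # wait until all outstanding requests, including the nested one, are done
   IO::AIO::flush;

   print "link points to $target\n";

```

The readlink is queued from inside the symlink callback, because requests
may execute in any order once they are submitted.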

=item aio_rename $srcpath, $dstpath, $callback->($status)

Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
rename(2), and call the callback with the result code.

=item aio_rmdir $pathname, $callback->($status)

Asynchronously rmdir (delete) a directory and call the callback with the
result code.

=item aio_readdir $pathname, $callback->($entries)

Unlike the POSIX call of the same name, C<aio_readdir> reads an entire
directory (i.e. opendir + readdir + closedir). The entries will not be
sorted, and will B<NOT> include the C<.> and C<..> entries.

The callback is passed a single argument, which is either C<undef> or an
array-ref with the filenames.
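
A minimal sketch, assuming IO::AIO is installed: it creates a few entries
in a temporary directory (the names are made up for illustration), reads
the directory with C<aio_readdir>, sorts the result itself because the
entries arrive unsorted, and waits with C<IO::AIO::flush>.

```perl

   use strict;
   use warnings;
   use File::Temp qw(tempdir);
   use IO::AIO;

   my $dir = tempdir (CLEANUP => 1);

   # create some entries synchronously, just for the demonstration
   for my $name (qw(b.txt a.txt)) {
      open my $fh, ">", "$dir/$name" or die "$dir/$name: $!";
      close $fh;
   }
   mkdir "$dir/subdir" or die "$dir/subdir: $!";

   my @names;

   aio_readdir $dir, sub {
      my $entries = shift
         or die "readdir failed: $!";

      # unsorted, and without "." and ".."
      @names = sort @$entries;
   };

   IO::AIO::flush;

   print "@names\n";

```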

=item aio_load $path, $data, $callback->($status)

This is a composite request that tries to fully load the given file into
the scalar C<$data>. Status is the same as with aio_read.
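
A minimal usage sketch, assuming IO::AIO is installed and File::Temp (a
core module) is available: it writes a small temporary file with
hypothetical contents, loads it with C<aio_load> and waits with
C<IO::AIO::flush> instead of integrating with an event loop.

```perl

   use strict;
   use warnings;
   use File::Temp qw(tempfile);
   use IO::AIO;

   # hypothetical input: a temporary file (absolute path) with known contents
   my ($tmp_fh, $path) = tempfile (UNLINK => 1);
   print $tmp_fh "hello, aio_load\n";
   close $tmp_fh;

   my ($data, $status);

   aio_load $path, $data, sub {
      # as with aio_read: number of bytes read, or -1 on error
      $status = shift;
   };

   # block until all outstanding requests have been handled
   IO::AIO::flush;

   $status >= 0 or die "$path: $!";
   print "loaded $status bytes: $data";

```

Because C<$data> is passed by alias, the file contents end up in the
caller's variable. Like any C<aio_read> target, it must not be touched
while the request is outstanding.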

=cut

sub aio_load($$;$) {
   aio_block {
      my ($path, undef, $cb) = @_;
      my $data = \$_[1];

      my $pri = aioreq_pri;
      my $grp = aio_group $cb;

      aioreq_pri $pri;
      add $grp aio_open $path, O_RDONLY, 0, sub {
         # "shift" handles both "no argument" and "undef" on failure
         my $fh = shift
            or return $grp->result (-1);

         aioreq_pri $pri;
         add $grp aio_read $fh, 0, (-s $fh), $$data, 0, sub {
            $grp->result ($_[0]);
         };
      };

      $grp
   }
}

=item aio_copy $srcpath, $dstpath, $callback->($status)

Try to copy the I<file> (directories not supported as either source or
destination) from C<$srcpath> to C<$dstpath> and call the callback with
a status of C<0> (ok) or C<-1> (error).

This is a composite request that creates the destination file with
mode 0200 and copies the contents of the source file into it using
C<aio_sendfile>, followed by restoring atime, mtime, access mode and
uid/gid, in that order.

If an error occurs, the partial destination file will be unlinked, if
possible, except when setting atime, mtime, access mode and uid/gid, where
errors are being ignored.
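
A minimal usage sketch, assuming IO::AIO is installed and that a temporary
directory is acceptable as scratch space (the file names are hypothetical).
It copies a small file, waits with C<IO::AIO::flush> and then checks that
the size and the access mode were carried over.

```perl

   use strict;
   use warnings;
   use File::Temp qw(tempdir);
   use IO::AIO;

   my $dir = tempdir (CLEANUP => 1);                # absolute path
   my ($src, $dst) = ("$dir/source", "$dir/copy");   # hypothetical names

   open my $fh, ">", $src or die "$src: $!";
   print $fh "some data\n";
   close $fh;
   chmod 0640, $src;

   my $status;

   aio_copy $src, $dst, sub {
      $status = shift;   # 0 (ok) or -1 (error)
   };

   IO::AIO::flush;

   $status == 0 or die "copy failed: $!";
   printf "copied %d bytes, mode %04o\n", -s $dst, (stat $dst)[2] & 07777;

```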

=cut

sub aio_copy($$;$) {
   aio_block {
      my ($src, $dst, $cb) = @_;

      my $pri = aioreq_pri;
      my $grp = aio_group $cb;

      aioreq_pri $pri;
      add $grp aio_open $src, O_RDONLY, 0, sub {
         if (my $src_fh = $_[0]) {
            my @stat = stat $src_fh;

            aioreq_pri $pri;
            add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
               if (my $dst_fh = $_[0]) {
                  aioreq_pri $pri;
                  add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
                     if ($_[0] == $stat[7]) {
                        $grp->result (0);
                        close $src_fh;

                        # those should not normally block. should. should.
                        utime $stat[8], $stat[9], $dst;
                        chmod $stat[2] & 07777, $dst_fh;
                        chown $stat[4], $stat[5], $dst_fh;
                        close $dst_fh;
                     } else {
                        $grp->result (-1);
                        close $src_fh;
                        close $dst_fh;

                        aioreq_pri $pri;
                        add $grp aio_unlink $dst;
                     }
                  };
               } else {
                  $grp->result (-1);
               }
            };

         } else {
            $grp->result (-1);
         }
      };

      $grp
   }
}

=item aio_move $srcpath, $dstpath, $callback->($status)

Try to move the I<file> (directories not supported as either source or
destination) from C<$srcpath> to C<$dstpath> and call the callback with
a status of C<0> (ok) or C<-1> (error).

This is a composite request that tries to rename(2) the file first. If
rename fails with C<EXDEV>, it copies the file with C<aio_copy> and, if
that is successful, unlinks the C<$srcpath>.

=cut

sub aio_move($$;$) {
   aio_block {
      my ($src, $dst, $cb) = @_;

      my $pri = aioreq_pri;
      my $grp = aio_group $cb;

      aioreq_pri $pri;
      add $grp aio_rename $src, $dst, sub {
         if ($_[0] && $! == EXDEV) {
            aioreq_pri $pri;
            add $grp aio_copy $src, $dst, sub {
               $grp->result ($_[0]);

               if (!$_[0]) {
                  aioreq_pri $pri;
                  add $grp aio_unlink $src;
               }
            };
         } else {
            $grp->result ($_[0]);
         }
      };

      $grp
   }
}

=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)

Scans a directory (similar to C<aio_readdir>) but additionally tries to
efficiently separate the entries of directory C<$path> into two sets of
names, directories you can recurse into (directories), and ones you cannot
recurse into (everything else, including symlinks to directories).

C<aio_scandir> is a composite request that creates many sub requests.
C<$maxreq> specifies the maximum number of outstanding aio requests that
this function generates. If it is C<< <= 0 >>, then a suitable default
will be chosen (currently 4).

On error, the callback is called without arguments, otherwise it receives
two array-refs with path-relative entry names.

Example:

   aio_scandir $dir, 0, sub {
      my ($dirs, $nondirs) = @_;
      print "real directories: @$dirs\n";
      print "everything else: @$nondirs\n";
   };

Implementation notes.

The C<aio_readdir> cannot be avoided, but C<stat()>'ing every entry can.

After reading the directory, the modification time, size etc. of the
directory before and after the readdir are checked, and if they match (and
the modification time isn't the current time), the link count will be used
to decide how many entries are directories (if >= 2). Otherwise, no
knowledge of the number of subdirectories will be assumed.

Then entries will be sorted into likely directories (everything without
a non-initial dot currently) and likely non-directories (everything
else). Then every entry plus an appended C</.> will be C<stat>'ed,
likely directories first. If that succeeds, it assumes that the entry
is a directory or a symlink to directory (which will be checked
separately). This is often faster than stat'ing the entry itself because
filesystems might detect the type of the entry without reading the inode
data (e.g. ext2fs filetype feature).

If the known number of directories (link count - 2) has been reached, the
rest of the entries are assumed to be non-directories.

This only works with certainty on POSIX (= UNIX) filesystems, which
fortunately are the vast majority of filesystems around.

It will also likely work on non-POSIX filesystems with reduced efficiency
as those tend to return 0 or 1 as link counts, which disables the
directory counting heuristic.
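
The sorting step described above can be tried in isolation. The following
standalone sketch needs no IO::AIO at all. C<scandir_order> is a
hypothetical helper that reuses the sort key from the C<aio_scandir>
source below and returns the entries in the order in which C<aio_scandir>
would C<stat> them (the real code pops entries off the end of the sorted
list). The entry names are made up for illustration.

```perl

   use strict;
   use warnings;

   # hypothetical helper: same key as in aio_scandir, i.e. "1" for names
   # with a non-initial dot (likely non-directories), "0" otherwise,
   # followed by the zero-padded name length
   sub scandir_order {
      my @sorted = map $_->[0],
                      sort { $b->[1] cmp $a->[1] }
                         map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
                            @_;

      # aio_scandir pops from the end, so the reversed list is stat order
      return reverse @sorted;
   }

   my @order = scandir_order qw(README lib t Makefile.PL .git AIO.xs);
   print "@order\n";   # t lib .git README AIO.xs Makefile.PL

```

Names without a non-initial dot come first, shortest first, so likely
directories such as C<t> and C<lib> are examined before names like
C<Makefile.PL>. C<.git> counts as a likely directory because its only dot
is the initial one.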

=cut

sub aio_scandir($$$) {
   aio_block {
      my ($path, $maxreq, $cb) = @_;

      my $pri = aioreq_pri;

      my $grp = aio_group $cb;

      $maxreq = 4 if $maxreq <= 0;

      # stat once
      aioreq_pri $pri;
      add $grp aio_stat $path, sub {
         return $grp->result () if $_[0];
         my $now = time;
         my $hash1 = join ":", (stat _)[0,1,3,7,9];

         # read the directory entries
         aioreq_pri $pri;
         add $grp aio_readdir $path, sub {
            my $entries = shift
               or return $grp->result ();

            # stat the dir another time
            aioreq_pri $pri;
            add $grp aio_stat $path, sub {
               my $hash2 = join ":", (stat _)[0,1,3,7,9];

               my $ndirs;

               # take the slow route if anything looks fishy
               if ($hash1 ne $hash2 or (stat _)[9] == $now) {
                  $ndirs = -1;
               } else {
                  # if nlink == 2, we are finished
                  # on non-posix-fs's, we rely on nlink < 2
                  $ndirs = (stat _)[3] - 2
                     or return $grp->result ([], $entries);
               }

               # sort into likely dirs and likely nondirs
               # dirs == files without ".", short entries first
               $entries = [map $_->[0],
                              sort { $b->[1] cmp $a->[1] }
                                 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
                                    @$entries];

               my (@dirs, @nondirs);

               my $statgrp = add $grp aio_group sub {
                  $grp->result (\@dirs, \@nondirs);
               };

               limit $statgrp $maxreq;
               feed $statgrp sub {
                  return unless @$entries;
                  my $entry = pop @$entries;

                  aioreq_pri $pri;
                  add $statgrp aio_stat "$path/$entry/.", sub {
                     if ($_[0] < 0) {
                        push @nondirs, $entry;
                     } else {
                        # need to check for real directory
                        aioreq_pri $pri;
                        add $statgrp aio_lstat "$path/$entry", sub {
                           if (-d _) {
                              push @dirs, $entry;

                              unless (--$ndirs) {
                                 push @nondirs, @$entries;
                                 feed $statgrp;
                              }
                           } else {
                              push @nondirs, $entry;
                           }
                        }
                     }
                  };
               };
            };
         };
      };

      $grp
   }
}

=item aio_fsync $fh, $callback->($status)

Asynchronously call fsync on the given filehandle and call the callback
with the fsync result code.

=item aio_fdatasync $fh, $callback->($status)

Asynchronously call fdatasync on the given filehandle and call the
callback with the fdatasync result code.

If this call isn't available because your OS lacks it or it couldn't be
detected, it will be emulated by calling C<fsync> instead.

=item aio_group $callback->(...)

This is a very special aio request: Instead of doing something, it is a
container for other aio requests, which is useful if you want to bundle
many requests into a single, composite, request with a definite callback
and the ability to cancel the whole request with its subrequests.

Returns an object of class L<IO::AIO::GRP>. See its documentation below
for more info.

Example:

   my $grp = aio_group sub {
      print "all stats done\n";
   };

   add $grp
      (aio_stat ...),
      (aio_stat ...),
      ...;

=item aio_nop $callback->()

This is a special request - it does nothing in itself and is only used for
side effects, such as when you want to add a dummy request to a group so
that finishing the requests in the group depends on executing the given
code.

While this request does nothing, it still goes through the execution
phase and still requires a worker thread. Thus, the callback will not
be executed immediately but only after other requests in the queue have
entered their execution phase. This can be used to measure request
latency.

=item IO::AIO::aio_busy $fractional_seconds, $callback->() *NOT EXPORTED*

Mainly used for debugging and benchmarking, this aio request puts one of
the request workers to sleep for the given time.

While it is theoretically handy to have simple I/O scheduling requests
like sleep and file handle readable/writable, the overhead this creates is
immense (it blocks a thread for a long time) so do not use this function
except to put your application under artificial I/O pressure.

=back
785    
786 root 1.53 =head2 IO::AIO::REQ CLASS
787 root 1.52
788     All non-aggregate C<aio_*> functions return an object of this class when
789     called in non-void context.
790    
791     =over 4
792    
793 root 1.65 =item cancel $req
794 root 1.52
795     Cancels the request, if possible. Has the effect of skipping execution
796     when entering the B<execute> state and skipping calling the callback when
797     entering the the B<result> state, but will leave the request otherwise
798     untouched. That means that requests that currently execute will not be
799     stopped and resources held by the request will not be freed prematurely.
800    
801 root 1.65 =item cb $req $callback->(...)
802    
803     Replace (or simply set) the callback registered to the request.
804    
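Here is a minimal sketch of both methods, assuming this version of IO::AIO. Calling an C<aio_*> function in non-void context returns the request object, which can then be cancelled or given a new callback. Because the cancellation happens before C<poll_cb> gets a chance to process the result, the first callback should be skipped.

```perl
   use strict;
   use warnings;
   use IO::AIO;

   my ($cancelled_ran, $which) = (0, "none");

   # Non-void context: keep the IO::AIO::REQ object around.
   my $req1 = aio_nop sub { $cancelled_ran++ };

   # Cancel it before poll_cb has processed its result,
   # so its callback will be skipped.
   $req1->cancel;

   # Queue another request and replace its callback afterwards.
   my $req2 = aio_nop sub { $which = "original" };
   $req2->cb (sub { $which = "replacement" });

   IO::AIO::flush;

   print "cancelled callback ran $cancelled_ran time(s), second request used the $which callback\n";
```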
=back

=head2 IO::AIO::GRP CLASS

This class is a subclass of L<IO::AIO::REQ>, so all its methods apply to
objects of this class, too.

An IO::AIO::GRP object is a special request that can contain multiple
other aio requests.

You create one by calling the C<aio_group> constructing function with a
callback that will be called when all contained requests have entered the
C<done> state:

   my $grp = aio_group sub {
      print "all requests are done\n";
   };

You add requests by calling the C<add> method with one or more
C<IO::AIO::REQ> objects:

   $grp->add (aio_unlink "...");

   add $grp aio_stat "...", sub {
      # aio_stat passes a non-zero status on failure
      $_[0] and return $grp->result ("error");

      # add another request dynamically, if the first succeeded
      add $grp aio_open "...", O_RDONLY, 0, sub {
         $grp->result ("ok");
      };
   };

This makes it very easy to create composite requests (see the source of
C<aio_move> for an application) that work and feel like simple requests.

=over 4

=item * The IO::AIO::GRP objects will be cleaned up during calls to
C<IO::AIO::poll_cb>, just like any other request.

=item * They can be canceled like any other request. Canceling will cancel not
only the request itself, but also all requests it contains.

=item * They can also be added to other IO::AIO::GRP objects.

=item * You must not add requests to a group from within the group callback
(or at any later time).

=back

Their lifetime, simplified, looks like this: when they are empty, they
will finish very quickly. If they contain only requests that are in the
C<done> state, they will also finish. Otherwise they will continue to
exist.

That means that after creating a group, you have some time to add
requests, and in the callbacks of those requests you can add further
requests to the group. Only when all those requests have finished will the
group itself finish.

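The composite request shown at the beginning of this section can be turned into a self-contained script roughly like this. The sketch makes two assumptions. First, a scratch file from L<File::Temp> stands in for the C<"..."> paths. Second, as with the stand-alone request functions, C<aio_stat> passes a non-zero status on failure and C<aio_open> passes a filehandle on success.

```perl
   use strict;
   use warnings;
   use Fcntl qw(O_RDONLY);
   use File::Temp qw(tempfile);
   use IO::AIO;

   # A scratch file to operate on (the returned path is absolute).
   my (undef, $path) = tempfile (UNLINK => 1);

   my $outcome;

   # The group callback receives whatever was passed to $grp->result.
   my $grp = aio_group sub { $outcome = $_[0] };

   $grp->add (aio_stat $path, sub {
      # aio_stat passes a non-zero status on failure
      $_[0] and return $grp->result ("error");

      # the stat succeeded: add a second request to the same group
      $grp->add (aio_open $path, O_RDONLY, 0, sub {
         $grp->result ($_[0] ? "ok" : "error");
      });
   });

   # The group only finishes after both requests (and their callbacks) are done.
   IO::AIO::flush;

   print "group finished with: $outcome\n";
```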
=over 4

=item add $grp ...

=item $grp->add (...)

Add one or more requests to the group. Any type of L<IO::AIO::REQ> can
be added, including other groups, as long as you do not create circular
dependencies.

Returns all its arguments.

=item $grp->cancel_subs

Cancels all subrequests and clears any feeder, but not the group request
itself. This is useful when you have queued a lot of requests but got a
result early.

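The following sketch illustrates the early-result case, assuming this version of IO::AIO. The group stats several candidate paths and calls C<cancel_subs> as soon as one of them turns out to exist. The candidate list is hypothetical, and a scratch file from L<File::Temp> keeps the example self-contained.

```perl
   use strict;
   use warnings;
   use File::Temp qw(tempfile);
   use IO::AIO;

   my (undef, $existing) = tempfile (UNLINK => 1);

   # Hypothetical candidates: only $existing is expected to be present.
   my @candidates = ("/nonexistent/a", $existing, "/nonexistent/b", $existing);

   my $found;
   my $grp = aio_group sub { print "search finished\n" };

   for my $path (@candidates) {
      $grp->add (aio_stat $path, sub {
         return if $_[0];            # non-zero status: this stat failed
         return if defined $found;   # another request already succeeded

         $found = $path;

         # We have our answer: cancel the remaining subrequests, but leave
         # the group itself alone so that it can still finish normally.
         $grp->cancel_subs;
      });
   }

   IO::AIO::flush;

   print "found: $found\n";
```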
=item $grp->result (...)

Sets the result value(s) that will be passed to the group callback when
all subrequests have finished, and sets the group's errno to the current
value of errno (just like calling C<errno> without an error number). By
default, no argument will be passed and errno is zero.

=item $grp->errno ([$errno])

Sets the group errno value to C<$errno>, or the current value of errno
when the argument is missing.

Every aio request has an associated errno value that is restored when
the callback is invoked. This method lets you change this value from its
default (0).

Calling C<result> will also set errno, so make sure you either set C<$!>
before the call to C<result>, or call C<errno> after it.

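The interplay of C<result> and C<errno> can be sketched like this, assuming this version of IO::AIO. A failed C<aio_stat> inside a group records its error, and the group callback then sees that error in C<$!>. The nonexistent path is a hypothetical placeholder, and C<errno> is called after C<result> for the reason given above.

```perl
   use strict;
   use warnings;
   use Errno qw(ENOENT);
   use IO::AIO;

   my ($result, $errno);

   # $! is restored from the group's errno before this callback is invoked.
   my $grp = aio_group sub {
      $result = $_[0];
      $errno  = $! + 0;
   };

   $grp->add (aio_stat "/nonexistent/path/for/this/example", sub {
      if ($_[0]) {
         my $err = $! + 0;             # errno of the failed stat
         $grp->result ("stat failed"); # result() also copies the current errno ...
         $grp->errno ($err);           # ... so set it explicitly afterwards
      } else {
         $grp->result ("ok");
         $grp->errno (0);              # clear whatever errno happened to be
      }
   });

   IO::AIO::flush;

   print "result: $result, errno: $errno\n";
```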
=item feed $grp $callback->($grp)

Sets a feeder/generator on this group: every group can have an attached
generator that generates requests if idle. The idea behind this is that,
although you could just queue as many requests as you want in a group,
this might starve other requests for a potentially long time. For
example, C<aio_scandir> might generate hundreds of thousands of
C<aio_stat> requests, delaying any later requests for a long time.

To avoid this, and to allow incremental generation of requests, you can
instead create a group and set a feeder on it that generates those
requests. The feed callback will be called whenever there are few enough
(see C<limit>, below) requests active in the group itself and is expected
to queue more requests.

The feed callback can queue as many requests as it likes (i.e. C<add> does
not impose any limits).

If the feed does not queue more requests when called, it will be
automatically removed from the group.

If the feed limit is C<0> when the feeder is set, it will be set to C<2>
automatically.

Example:

   # stat all files in @files, but only ever use four aio requests concurrently:

   my $grp = aio_group sub { print "finished\n" };
   limit $grp 4;
   feed $grp sub {
      my $file = pop @files
         or return;

      add $grp aio_stat $file, sub { ... };
   };

=item limit $grp $num

Sets the feeder limit for the group: the feeder will be called whenever
the group contains fewer than this many requests.

Setting the limit to C<0> will pause the feeding process.

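A self-contained variant of the feeder example above might look like this. It is a sketch that assumes this version of IO::AIO. The scratch directory and the C<$active>/C<$max_active> counters are illustrative additions that make the effect of C<limit> visible.

```perl
   use strict;
   use warnings;
   use File::Temp qw(tempdir);
   use IO::AIO;

   # A scratch directory with ten empty files stands in for real work.
   my $dir   = tempdir (CLEANUP => 1);
   my @files = map {
      my $path = "$dir/file$_";
      open my $fh, ">", $path or die "$path: $!";
      $path
   } 1 .. 10;

   # Illustrative counters, only used to observe the effect of limit.
   my ($active, $max_active, $done) = (0, 0, 0);

   my $grp = aio_group sub { print "finished, $done stats succeeded\n" };

   # Never keep more than three requests in the group at a time.
   $grp->limit (3);

   $grp->feed (sub {
      my $file = pop @files
         or return; # queueing nothing removes the feeder

      $active++;
      $max_active = $active if $active > $max_active;

      $grp->add (aio_stat $file, sub {
         $active--;
         $done++ unless $_[0]; # zero status means the stat succeeded
      });
   });

   IO::AIO::flush;

   print "at most $max_active stat requests were active at once\n";
```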
=back

=head2 SUPPORT FUNCTIONS

=head3 EVENT PROCESSING AND EVENT LOOP INTEGRATION

=over 4

=item $fileno = IO::AIO::poll_fileno

Returns the I<request result pipe file descriptor>. This file descriptor
must be polled for reading by some mechanism outside this module (e.g.
Event or select, see below or the SYNOPSIS). If the pipe becomes readable,
you have to call C<poll_cb> to check the results.

See C<poll_cb> for an example.

=item IO::AIO::poll_cb

Processes some outstanding events on the result pipe. You have to call
this regularly. Returns the number of events processed. Returns
immediately when no events are outstanding. The number of events
processed depends on the settings of C<IO::AIO::max_poll_reqs> and
C<IO::AIO::max_poll_time>.

If not all requests were processed for whatever reason, the filehandle
will still be ready when C<poll_cb> returns.

Example: Install an Event watcher that automatically calls
IO::AIO::poll_cb with high priority:

   Event->io (fd => IO::AIO::poll_fileno,
              poll => 'r', async => 1,
              cb => \&IO::AIO::poll_cb);

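Without an event loop, C<poll_fileno> and C<poll_cb> can be combined with perl's four-argument C<select>. This is a minimal sketch, assuming this version of IO::AIO. The C<$done> flag is just an illustrative way of knowing when to stop waiting.

```perl
   use strict;
   use warnings;
   use IO::AIO;

   my $done = 0;

   # Any request will do; stat'ing the root directory is just an example.
   aio_stat "/", sub { $done = 1 };

   my $fileno = IO::AIO::poll_fileno;

   until ($done) {
      # Build the bit vector for select from the result pipe descriptor.
      my $rin = '';
      vec ($rin, $fileno, 1) = 1;

      # Block until the result pipe becomes readable ...
      select (my $rout = $rin, undef, undef, undef);

      # ... then process the requests that have finished.
      IO::AIO::poll_cb;
   }

   print "stat request completed\n";
```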
=item IO::AIO::max_poll_reqs $nreqs

=item IO::AIO::max_poll_time $seconds

These set the maximum number of requests (default C<0>, meaning infinity)
that are processed by C<IO::AIO::poll_cb> in one call, and the maximum
amount of time (default C<0>, meaning infinity) spent in
C<IO::AIO::poll_cb> to process requests (more correctly, the minimum
amount of time C<poll_cb> is allowed to use).

Setting C<max_poll_time> to a non-zero value creates an overhead of one
syscall per request processed, which is not normally a problem unless your
callbacks are really, really fast or your OS is really, really slow (I am
not mentioning Solaris here). Using C<max_poll_reqs> incurs no overhead.

Setting these is useful if you want to ensure some level of
interactivity when perl is not fast enough to process all requests in
time.

For interactive programs, C<max_poll_time> values such as C<0.01> to
C<0.1> should be fine.

Example: Install an Event watcher that automatically calls
IO::AIO::poll_cb with low priority, to ensure that other parts of the
program get the CPU sometimes even under high AIO load.

   # try not to spend much more than 0.1s in poll_cb
   IO::AIO::max_poll_time 0.1;

   # use a low priority so other tasks have priority
   Event->io (fd => IO::AIO::poll_fileno,
              poll => 'r', nice => 1,
              cb => \&IO::AIO::poll_cb);

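The effect of C<max_poll_reqs> can be observed with a sketch like the following, assuming this version of IO::AIO. Ten no-op requests are handled by a C<poll_wait>/C<poll_cb> loop, and each C<poll_cb> call should report at most two processed requests.

```perl
   use strict;
   use warnings;
   use IO::AIO;

   # Let poll_cb handle at most two results per call.
   IO::AIO::max_poll_reqs 2;

   my $handled = 0;
   for (1 .. 10) {
      aio_nop sub { $handled++ };
   }

   # Record how many results each poll_cb call processed.
   my @batches;
   while (IO::AIO::nreqs ()) {
      IO::AIO::poll_wait;
      push @batches, IO::AIO::poll_cb;
   }

   print "handled $handled requests in batches of: @batches\n";
```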
=item IO::AIO::poll_wait

If there are any outstanding requests and none of them are in the result
phase, waits until the result filehandle becomes ready for reading (it
simply does a C<select> on the filehandle). This is useful if you want to
synchronously wait for some requests to finish.

See C<nreqs> for an example.

=item IO::AIO::poll

Waits until some requests have been handled.

Returns the number of requests processed, but is otherwise strictly
equivalent to:

   IO::AIO::poll_wait, IO::AIO::poll_cb

=item IO::AIO::flush

Waits until all outstanding AIO requests have been handled.

Strictly equivalent to:

   IO::AIO::poll_wait, IO::AIO::poll_cb
      while IO::AIO::nreqs;

=head3 CONTROLLING THE NUMBER OF THREADS

=item IO::AIO::min_parallel $nthreads

Sets the minimum number of AIO threads to C<$nthreads>. The current
default is C<8>, which means eight asynchronous operations can execute
concurrently at any one time (the number of outstanding requests,
however, is unlimited).

IO::AIO starts threads only on demand, when an AIO request is queued and
no free thread exists. Please note that queueing up a hundred requests can
create demand for a hundred threads, even if it turns out that everything
is in the cache and could have been processed faster by a single thread.

It is recommended to keep the number of threads relatively low, as some
Linux kernel versions will scale negatively with the number of threads
(higher parallelism => MUCH higher latency). With current Linux 2.6
versions, 4-32 threads should be fine.

Under most circumstances you don't need to call this function, as the
module selects a default that is suitable for low to moderate load.

=item IO::AIO::max_parallel $nthreads

Sets the maximum number of AIO threads to C<$nthreads>. If more than the
specified number of threads are currently running, this function kills
the excess threads. This function blocks until the limit is reached.

While C<$nthreads> is zero, aio requests get queued but not executed
until the number of threads has been increased again.

This module automatically runs C<max_parallel 0> at program end, to ensure
that all threads are killed and that there are no outstanding requests.

Under normal circumstances you don't need to call this function.

=item IO::AIO::max_idle $nthreads

Limits the number of threads (default: 4) that are allowed to idle
(i.e., threads that did not get a request to process within 10 seconds).
That means if a thread becomes idle while C<$nthreads> other threads are
also idle, it will free its resources and exit.

This is useful when you allow a large number of threads (e.g. 100 or 1000)
to allow for extremely high load situations, but want to free resources
under normal circumstances (1000 threads can easily consume 30MB of RAM).

The default is probably ok in most situations, especially if thread
creation is fast. If thread creation is very slow on your system, you
might want to use larger values.

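For the high-load scenario described above, a configuration could look like this sketch, assuming this version of IO::AIO. The values 32 and 2 are arbitrary examples, not recommendations.

```perl
   use strict;
   use warnings;
   use IO::AIO;

   # Raise the number of AIO threads from the default of 8 to 32,
   # so that load peaks can use more parallelism ...
   IO::AIO::min_parallel 32;

   # ... but let at most two of them sit idle once the peak is over.
   IO::AIO::max_idle 2;

   # A burst of requests; threads are started on demand.
   my $completed = 0;
   for (1 .. 100) {
      aio_nop sub { $completed++ };
   }

   IO::AIO::flush;

   print "$completed requests completed\n";
```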
=item $oldmaxreqs = IO::AIO::max_outstanding $maxreqs

This is a very bad function to use in interactive programs because it
blocks, and a bad way to reduce concurrency because it is inexact. It is
better to use an C<aio_group> together with a feed callback.

Sets the maximum number of outstanding requests to C<$maxreqs>. If you
queue up more than this number of requests, the next call to C<poll_cb>
(and C<poll_some> and other functions calling C<poll_cb>) will block until
the limit is no longer exceeded.

The default value is very large, so there is no practical limit on the
number of outstanding requests.

You can still queue as many requests as you want. Therefore,
C<max_outstanding> is mainly useful in simple scripts (with low values) or
as a stopgap to shield against fatal memory overflow (with large values).

=head3 STATISTICAL INFORMATION

=item IO::AIO::nreqs

Returns the number of requests currently in the ready, execute or pending
states (i.e. whose callbacks have not been invoked yet).

Example: wait until there are no outstanding requests anymore:

   IO::AIO::poll_wait, IO::AIO::poll_cb
      while IO::AIO::nreqs;

=item IO::AIO::nready

Returns the number of requests currently in the ready state (not yet
executed).

=item IO::AIO::npending

Returns the number of requests currently in the pending state (executed,
but not yet processed by poll_cb).

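Here is a small sketch of these counters, assuming this version of IO::AIO. Because worker threads run concurrently, the C<nready> and C<npending> values printed before C<flush> are only a snapshot. C<nreqs> only drops once C<poll_cb> has invoked the callbacks, so it should be zero after C<flush>.

```perl
   use strict;
   use warnings;
   use IO::AIO;

   # Queue a few no-op requests.
   for (1 .. 5) {
      aio_nop sub { };
   }

   # A snapshot only: worker threads move requests from the ready to the
   # pending state concurrently, so nready and npending vary between runs.
   my ($nreqs, $nready, $npending) =
      (IO::AIO::nreqs (), IO::AIO::nready (), IO::AIO::npending ());

   print "nreqs=$nreqs nready=$nready npending=$npending\n";

   IO::AIO::flush;

   print "after flush: nreqs=", IO::AIO::nreqs (), "\n";
```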
=back

=cut

# support function to convert a fd into a perl filehandle
sub _fd2fh {
   return undef if $_[0] < 0;

   # try to generate nice filehandles
   my $sym = "IO::AIO::fd#$_[0]";
   local *$sym;

   open *$sym, "+<&=$_[0]" # usually works under any unix
      or open *$sym, "<&=$_[0]" # cygwin needs this
      or open *$sym, ">&=$_[0]" # or this
      or return undef;

   *$sym
}

min_parallel 8;

END { flush }

1;

=head2 FORK BEHAVIOUR

This module should do "the right thing" when the process using it forks:

Before the fork, IO::AIO enters a quiescent state where no requests
can be added in other threads and no results will be processed. After
the fork the parent simply leaves the quiescent state and continues
request/result processing, while the child frees the request/result queue
(so that the requests started before the fork will only be handled in the
parent). Threads will be started on demand until the limit set in the
parent process has been reached again.

In short: the parent will, after a short pause, continue as if fork had
not been called, while the child will act as if IO::AIO has not been used
yet.

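The fork behaviour described above can be sketched as follows, assuming this version of IO::AIO on a system with a working C<fork>. A request queued before the fork is handled only in the parent. The child starts with an empty queue and can use IO::AIO normally.

```perl
   use strict;
   use warnings;
   use IO::AIO;

   my $parent_cb = 0;

   # Queued before the fork; it should only ever be handled in the parent.
   aio_nop sub { $parent_cb++ };

   my $pid = fork;
   defined $pid or die "fork: $!";

   if ($pid == 0) {
      # Child: the pre-fork request is gone, new requests work normally.
      my $child_cb = 0;
      aio_nop sub { $child_cb++ };
      IO::AIO::flush;

      # Report success through the exit status.
      exit ($parent_cb == 0 && $child_cb == 1 ? 0 : 1);
   }

   waitpid $pid, 0;
   my $child_status = $? >> 8;

   # The parent still handles the request it queued before the fork.
   IO::AIO::flush;

   print "child exit status: $child_status, parent callback ran $parent_cb time(s)\n";
```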
=head2 MEMORY USAGE

Per-request usage:

Each aio request uses, depending on your architecture, around 100-200
bytes of memory. In addition, stat requests need a stat buffer (possibly
a few hundred bytes), readdir requires a result buffer and so on. Perl
scalars and other data passed into aio requests will also be locked and
will consume memory until the request has entered the done state.

This is not awfully much, so queuing lots of requests is not usually a
problem.

Per-thread usage:

In the execution phase, some aio requests require more memory for
temporary buffers, and each thread requires a stack and other data
structures (usually around 16k-128k, depending on the OS).

=head1 KNOWN BUGS

Known bugs will be fixed in the next release.

=head1 SEE ALSO

L<Coro::AIO>.

=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut