/cvs/IO-AIO/AIO.pm
Revision: 1.97
Committed: Sat Dec 23 04:49:37 2006 UTC (17 years, 5 months ago) by root
Branch: MAIN
CVS Tags: rel-2_3
Changes since 1.96: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     IO::AIO - Asynchronous Input/Output
4    
5     =head1 SYNOPSIS
6    
7     use IO::AIO;
8    
9 root 1.6 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10 root 1.94 my $fh = shift
11     or die "/etc/passwd: $!";
12 root 1.6 ...
13     };
14    
15     aio_unlink "/tmp/file", sub { };
16    
17     aio_read $fh, 30000, 1024, $buffer, 0, sub {
18 root 1.8 $_[0] > 0 or die "read error: $!";
19 root 1.6 };
20    
21 root 1.56 # version 2+ has request and group objects
22     use IO::AIO 2;
23 root 1.52
24 root 1.68 aioreq_pri 4; # give next request a very high priority
25 root 1.52 my $req = aio_unlink "/tmp/file", sub { };
26     $req->cancel; # cancel request if still in queue
27    
28 root 1.56 my $grp = aio_group sub { print "all stats done\n" };
29     add $grp aio_stat "..." for ...;
30    
31     # AnyEvent integration
32 root 1.42 open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!";
33     my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb });
34    
35 root 1.56 # Event integration
36 root 1.6 Event->io (fd => IO::AIO::poll_fileno,
37 root 1.7 poll => 'r',
38 root 1.6 cb => \&IO::AIO::poll_cb);
39    
40 root 1.56 # Glib/Gtk2 integration
41 root 1.6 add_watch Glib::IO IO::AIO::poll_fileno,
42 root 1.22 in => sub { IO::AIO::poll_cb; 1 };
43 root 1.6
44 root 1.56 # Tk integration
45 root 1.6 Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "",
46     readable => \&IO::AIO::poll_cb);
47    
48 root 1.56 # Danga::Socket integration
49 root 1.11 Danga::Socket->AddOtherFds (IO::AIO::poll_fileno =>
50     \&IO::AIO::poll_cb);
51    
52 root 1.1 =head1 DESCRIPTION
53    
54     This module implements asynchronous I/O using whatever means your
55 root 1.2 operating system supports.
56 root 1.1
57 root 1.85 Asynchronous means that operations that can normally block your program
58     (e.g. reading from disk) will be done asynchronously: the operation
59     will still block, but you can do something else in the meantime. This
60     is extremely useful for programs that need to stay interactive even
61     when doing heavy I/O (GUI programs, high performance network servers
62     etc.), but can also be used to easily do operations in parallel that are
63     normally done sequentially, e.g. stat'ing many files, which is much faster
64     on a RAID volume or over NFS when you do a number of stat operations
65     concurrently.
66    
67 root 1.90 While most of this works on all types of file descriptors (for example
68     sockets), using these functions on file descriptors that support
69     nonblocking operation (again, sockets, pipes etc.) is very inefficient or
70     might not work (aio_read fails on sockets/pipes/fifos). Use an event loop
71     for that (such as the L<Event|Event> module): IO::AIO will naturally fit
72     into such an event loop itself.
73 root 1.85
74 root 1.72 In this version, a number of threads are started that execute your
75     requests and signal their completion. You don't need thread support
76     in perl, and the threads created by this module will not be visible
77     to perl. In the future, this module might make use of the native aio
78     functions available on many operating systems. However, they are often
79 root 1.85 not well-supported or restricted (GNU/Linux doesn't allow them on normal
80 root 1.72 files currently, for example), and they would only support aio_read and
81     aio_write, so the remaining functionality would have to be implemented
82     using threads anyway.
83    
84     Although the module will work in the presence of other (Perl-)
85     threads, it is currently not reentrant in any way, so use appropriate
86     locking yourself, always call C<poll_cb> from within the same thread, or
87     never call C<poll_cb> (or other C<aio_> functions) recursively.
88    
89 root 1.86 =head2 EXAMPLE
90    
91     This is a simple example that uses the Event module and loads
92     F</etc/passwd> asynchronously:
93    
94     use Fcntl;
95     use Event;
96     use IO::AIO;
97    
98     # register the IO::AIO callback with Event
99     Event->io (fd => IO::AIO::poll_fileno,
100     poll => 'r',
101     cb => \&IO::AIO::poll_cb);
102    
103     # queue the request to open /etc/passwd
104     aio_open "/etc/passwd", O_RDONLY, 0, sub {
105 root 1.94 my $fh = shift
106 root 1.86 or die "error while opening: $!";
107    
108     # stat'ing filehandles is generally non-blocking
109     my $size = -s $fh;
110    
111     # queue a request to read the file
112     my $contents;
113     aio_read $fh, 0, $size, $contents, 0, sub {
114     $_[0] == $size
115     or die "short read: $!";
116    
117     close $fh;
118    
119     # file contents now in $contents
120     print $contents;
121    
122     # exit event loop and program
123     Event::unloop;
124     };
125     };
126    
127     # possibly queue up other requests, or open GUI windows,
128     # check for sockets etc. etc.
129    
130     # process events as long as there are some:
131     Event::loop;
132    
133 root 1.72 =head1 REQUEST ANATOMY AND LIFETIME
134    
135     Every C<aio_*> function creates a request, which is a C data structure not
136     directly visible to Perl.
137    
138     If called in non-void context, every request function returns a Perl
139     object representing the request. In void context, nothing is returned,
140     which saves a bit of memory.
141    
142     The perl object is a fairly standard ref-to-hash object. The hash contents
143     are not used by IO::AIO so you are free to store anything you like in it.
144    
145     During their existence, aio requests travel through the following states,
146     in order:
147    
148     =over 4
149    
150     =item ready
151    
152     Immediately after a request is created it is put into the ready state,
153     waiting for a thread to execute it.
154    
155     =item execute
156    
157     A thread has accepted the request for processing and is currently
158     executing it (e.g. blocking in read).
159    
160     =item pending
161    
162     The request has been executed and is waiting for result processing.
163    
164     While request submission and execution is fully asynchronous, result
165     processing is not and relies on the perl interpreter calling C<poll_cb>
166     (or another function with the same effect).
167    
168     =item result
169    
170     The request results are processed synchronously by C<poll_cb>.
171    
172     The C<poll_cb> function will process all outstanding aio requests by
173     calling their callbacks, freeing memory associated with them and managing
174     any groups they are contained in.
175    
176     =item done
177    
178     Request has reached the end of its lifetime and holds no resources anymore
179     (except possibly for the Perl object, but its connection to the actual
180     aio request is severed and calling its methods will either do nothing or
181     result in a runtime error).
182 root 1.1
183 root 1.88 =back
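
The states above can be observed with a short script. This is only a sketch: it assumes that C<IO::AIO::nreqs>, C<IO::AIO::poll_wait> and C<IO::AIO::poll_cb> (all listed in C<@EXPORT_OK> below) behave as their names suggest, and it uses C<aio_nop> as a harmless request.

```perl
use strict;
use warnings;
use IO::AIO;

# Queue a no-op request; it enters the "ready" state immediately.
my $ran = 0;
aio_nop sub { $ran = 1 };

# nreqs counts requests whose callback has not been invoked yet
# (ready, execute or pending).
printf "outstanding before polling: %d\n", IO::AIO::nreqs;

# Wait until a result is available, then let poll_cb run the callbacks
# (the "result" state). Repeat until nothing is outstanding.
while (IO::AIO::nreqs) {
    IO::AIO::poll_wait;
    IO::AIO::poll_cb;
}

print "callback ran: $ran\n";
```

Nothing reaches the callback until C<poll_cb> is called, which is why result processing is described as synchronous.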
184    
185 root 1.1 =cut
186    
187     package IO::AIO;
188    
189 root 1.23 no warnings;
190 root 1.51 use strict 'vars';
191 root 1.23
192 root 1.1 use base 'Exporter';
193    
194     BEGIN {
195 root 1.97 our $VERSION = '2.3';
196 root 1.1
197 root 1.67 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
198     aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
199 root 1.90 aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link
200     aio_move aio_copy aio_group aio_nop aio_mknod);
201 root 1.95 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice aio_block));
202 root 1.67 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
203 root 1.86 min_parallel max_parallel max_idle
204     nreqs nready npending nthreads
205     max_poll_time max_poll_reqs);
206 root 1.1
207 root 1.54 @IO::AIO::GRP::ISA = 'IO::AIO::REQ';
208    
209 root 1.1 require XSLoader;
210 root 1.51 XSLoader::load ("IO::AIO", $VERSION);
211 root 1.1 }
212    
213 root 1.5 =head1 FUNCTIONS
214 root 1.1
215 root 1.87 =head2 AIO REQUEST FUNCTIONS
216 root 1.1
217 root 1.5 All the C<aio_*> calls are more or less thin wrappers around the syscall
218     with the same name (sans C<aio_>). The arguments are similar or identical,
219 root 1.14 and they all accept an additional (and optional) C<$callback> argument
220     which must be a code reference. This code reference will get called with
221     the syscall return code (e.g. most syscalls return C<-1> on error, unlike
222     perl, which usually delivers "false") as its sole argument when the given
223     syscall has been executed asynchronously.
224 root 1.1
225 root 1.23 All functions expecting a filehandle keep a copy of the filehandle
226     internally until the request has finished.
227 root 1.1
228 root 1.87 All functions return request objects of type L<IO::AIO::REQ> that allow
229     further manipulation of those requests while they are in-flight.
230 root 1.52
231 root 1.28 The pathnames you pass to these routines I<must> be absolute and
232 root 1.87 encoded as octets. The reason for the former is that at the time the
233 root 1.28 request is being executed, the current working directory could have
234     changed. Alternatively, you can make sure that you never change the
235 root 1.87 current working directory anywhere in the program and then use relative
236     paths.
237 root 1.28
238 root 1.87 To encode pathnames as octets, make sure you either: a) always pass
239     in filenames you got from outside (command line, readdir etc.) without
240     tinkering, b) are ASCII or ISO 8859-1, c) use the Encode module and encode
241 root 1.28 your pathnames to the locale (or other) encoding in effect in the user
242     environment, d) use Glib::filename_from_unicode on unicode filenames or e)
243 root 1.87 use something else to ensure your scalar has the correct contents.
244    
245     This works, btw. independent of the internal UTF-8 bit, which IO::AIO
246     handles correctly whether it is set or not.
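
Option c) might look like the following sketch. The helper name C<path_octets> is hypothetical, and the UTF-8 default is an assumption about the user's environment, not something IO::AIO decides for you.

```perl
use strict;
use warnings;
use Encode ();

# Hypothetical helper: turn a Perl character string into the octets
# that would be handed to the aio_* functions. Defaulting to UTF-8
# is an assumption about the filesystem encoding.
sub path_octets {
    my ($path, $encoding) = @_;
    $encoding = "UTF-8" unless defined $encoding;

    # FB_CROAK turns unencodable characters into a hard error,
    # LEAVE_SRC keeps Encode from modifying $path in place.
    return Encode::encode($encoding, $path,
                          Encode::FB_CROAK | Encode::LEAVE_SRC);
}

my $octets = path_octets("/tmp/caf\x{e9}.txt");
printf "path is now %d octets long\n", length $octets;
```

The result is a plain byte string, so it can be passed on unchanged regardless of the internal UTF-8 bit.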
247 root 1.1
248 root 1.5 =over 4
249 root 1.1
250 root 1.80 =item $prev_pri = aioreq_pri [$pri]
251 root 1.68
252 root 1.80 Returns the priority value that would be used for the next request and, if
253     C<$pri> is given, sets the priority for the next aio request.
254 root 1.68
255 root 1.80 The default priority is C<0>, the minimum and maximum priorities are C<-4>
256     and C<4>, respectively. Requests with higher priority will be serviced
257     first.
258    
259     The priority will be reset to C<0> after each call to one of the C<aio_*>
260 root 1.68 functions.
261    
262 root 1.69 Example: open a file with low priority, then read something from it with
263     higher priority so the read request is serviced before other low priority
264     open requests (potentially spamming the cache):
265    
266     aioreq_pri -3;
267     aio_open ..., sub {
268     return unless $_[0];
269    
270     aioreq_pri -2;
271     aio_read $_[0], ..., sub {
272     ...
273     };
274     };
275    
276     =item aioreq_nice $pri_adjust
277    
278     Similar to C<aioreq_pri>, but subtracts the given value from the current
279 root 1.87 priority, so the effect is cumulative.
280 root 1.69
281 root 1.40 =item aio_open $pathname, $flags, $mode, $callback->($fh)
282 root 1.1
283 root 1.2 Asynchronously open or create a file and call the callback with a newly
284     created filehandle for the file.
285 root 1.1
286     The pathname passed to C<aio_open> must be absolute. See API NOTES, above,
287     for an explanation.
288    
289 root 1.20 The C<$flags> argument is a bitmask. See the C<Fcntl> module for a
290     list. They are the same as used by C<sysopen>.
291    
292     Likewise, C<$mode> specifies the mode of the newly created file, if it
293     didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>,
294     except that it is mandatory (i.e. use C<0> if you don't create new files,
295     and C<0666> or C<0777> if you do).
296 root 1.1
297     Example:
298    
299     aio_open "/etc/passwd", O_RDONLY, 0, sub {
300 root 1.2 if ($_[0]) {
301     print "open successful, fh is $_[0]\n";
302 root 1.1 ...
303     } else {
304     die "open failed: $!\n";
305     }
306     };
307    
308 root 1.40 =item aio_close $fh, $callback->($status)
309 root 1.1
310 root 1.2 Asynchronously close a file and call the callback with the result
311     code. I<WARNING:> although accepted, you should not pass in a perl
312 root 1.20 filehandle here, as perl will likely close the file descriptor another
313     time when the filehandle is destroyed. Normally, you can safely call perl's
314     C<close> or just let filehandles go out of scope.
315    
316     This is supposed to be a bug in the API, so that might change. It's
317     therefore best to avoid this function.
318 root 1.1
319 root 1.40 =item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
320 root 1.1
321 root 1.40 =item aio_write $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
322 root 1.1
323     Reads or writes C<length> bytes from the specified C<fh> and C<offset>
324     into the scalar given by C<data> and offset C<dataoffset> and calls the
325     callback with the actual number of bytes read or written (or -1 on error,
326     just like the syscall).
327    
328 root 1.31 The C<$data> scalar I<MUST NOT> be modified in any way while the request
329     is outstanding. Modifying it can result in segfaults or WW3 (if the
330     necessary/optional hardware is installed).
331    
332 root 1.17 Example: Read 15 bytes at offset 7 into scalar C<$buffer>, starting at
333 root 1.1 offset C<0> within the scalar:
334    
335     aio_read $fh, 7, 15, $buffer, 0, sub {
336 root 1.9 $_[0] > 0 or die "read error: $!";
337     print "read $_[0] bytes: <$buffer>\n";
338 root 1.1 };
339    
340 root 1.40 =item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval)
341 root 1.35
342     Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts
343     reading at byte offset C<$in_offset>, and starts writing at the current
344     file offset of C<$out_fh>. Because of that, it is not safe to issue more
345     than one C<aio_sendfile> per C<$out_fh>, as they will interfere with each
346     other.
347    
348     This call tries to make use of a native C<sendfile> syscall to provide
349     zero-copy operation. For this to work, C<$out_fh> should refer to a
350     socket, and C<$in_fh> should refer to an mmap'able file.
351    
352     If the native sendfile call fails or is not implemented, it will be
353 root 1.36 emulated, so you can call C<aio_sendfile> on any type of filehandle
354     regardless of the limitations of the operating system.
355 root 1.35
356     Please note, however, that C<aio_sendfile> can read more bytes from
357     C<$in_fh> than are written, and there is no way to find out how many
358 root 1.36 bytes have been read from C<aio_sendfile> alone, as C<aio_sendfile> only
359     provides the number of bytes written to C<$out_fh>. Only if the result
360     value equals C<$length> one can assume that C<$length> bytes have been
361     read.
362 root 1.35
363 root 1.40 =item aio_readahead $fh,$offset,$length, $callback->($retval)
364 root 1.1
365 root 1.20 C<aio_readahead> populates the page cache with data from a file so that
366 root 1.1 subsequent reads from that file will not block on disk I/O. The C<$offset>
367     argument specifies the starting point from which data is to be read and
368     C<$length> specifies the number of bytes to be read. I/O is performed in
369     whole pages, so that offset is effectively rounded down to a page boundary
370     and bytes are read up to the next page boundary greater than or equal to
371 root 1.20 (offset+length). C<aio_readahead> does not read beyond the end of the
372 root 1.1 file. The current file offset of the file is left unchanged.
373    
374 root 1.26 If that syscall doesn't exist (likely if your OS isn't Linux) it will be
375     emulated by simply reading the data, which would have a similar effect.
376    
377 root 1.40 =item aio_stat $fh_or_path, $callback->($status)
378 root 1.1
379 root 1.40 =item aio_lstat $fh, $callback->($status)
380 root 1.1
381     Works like perl's C<stat> or C<lstat> in void context. The callback will
382     be called after the stat and the results will be available using C<stat _>
383     or C<-s _> etc...
384    
385     The pathname passed to C<aio_stat> must be absolute. See API NOTES, above,
386     for an explanation.
387    
388     Currently, the stats are always 64-bit-stats, i.e. instead of returning an
389     error when stat'ing a large file, the results will be silently truncated
390     unless perl itself is compiled with large file support.
391    
392     Example: Print the length of F</etc/passwd>:
393    
394     aio_stat "/etc/passwd", sub {
395     $_[0] and die "stat failed: $!";
396     print "size is ", -s _, "\n";
397     };
398    
399 root 1.40 =item aio_unlink $pathname, $callback->($status)
400 root 1.1
401     Asynchronously unlink (delete) a file and call the callback with the
402     result code.
403    
404 root 1.82 =item aio_mknod $path, $mode, $dev, $callback->($status)
405    
406 root 1.86 [EXPERIMENTAL]
407    
408 root 1.83 Asynchronously create a device node (or fifo). See mknod(2).
409    
410 root 1.86 The only (POSIX-) portable way of calling this function is:
411 root 1.83
412     aio_mknod $path, IO::AIO::S_IFIFO | $mode, 0, sub { ...
413 root 1.82
414 root 1.50 =item aio_link $srcpath, $dstpath, $callback->($status)
415    
416     Asynchronously create a new link to the existing object at C<$srcpath> at
417     the path C<$dstpath> and call the callback with the result code.
418    
419     =item aio_symlink $srcpath, $dstpath, $callback->($status)
420    
421     Asynchronously create a new symbolic link to the existing object at C<$srcpath> at
422     the path C<$dstpath> and call the callback with the result code.
423    
424 root 1.90 =item aio_readlink $path, $callback->($link)
425    
426     Asynchronously read the symlink specified by C<$path> and pass it to
427     the callback. If an error occurs, nothing or undef gets passed to the
428     callback.
429    
430 root 1.50 =item aio_rename $srcpath, $dstpath, $callback->($status)
431    
432     Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
433     rename(2) and call the callback with the result code.
434    
435 root 1.40 =item aio_rmdir $pathname, $callback->($status)
436 root 1.27
437     Asynchronously rmdir (delete) a directory and call the callback with the
438     result code.
439    
440 root 1.46 =item aio_readdir $pathname, $callback->($entries)
441 root 1.37
442     Unlike the POSIX call of the same name, C<aio_readdir> reads an entire
443     directory (i.e. opendir + readdir + closedir). The entries will not be
444     sorted, and will B<NOT> include the C<.> and C<..> entries.
445    
446     The callback is passed a single argument which is either C<undef> or an array-ref
447     with the filenames.
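
Example: a minimal sketch that lists a directory. The path F</tmp> is only a placeholder, and C<IO::AIO::flush> (listed in C<@EXPORT_OK>) is assumed to wait until all outstanding requests have been handled.

```perl
use strict;
use warnings;
use IO::AIO;

my $dir = "/tmp";    # placeholder directory, chosen for illustration
my $count;

aio_readdir $dir, sub {
    # undef signals an error, otherwise we get an array-ref of names
    my $entries = shift
        or die "readdir $dir: $!";

    $count = @$entries;

    # entries arrive unsorted and without "." and ".."
    print "$_\n" for sort @$entries;
};

# wait for the request and run its callback
IO::AIO::flush;
```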
448    
449 root 1.82 =item aio_copy $srcpath, $dstpath, $callback->($status)
450    
451     Try to copy the I<file> (directories not supported as either source or
452     destination) from C<$srcpath> to C<$dstpath> and call the callback with
453     C<0> (ok) or C<-1> (error).
454    
455     This is a composite request that creates the destination file with
456     mode 0200 and copies the contents of the source file into it using
457     C<aio_sendfile>, followed by restoring atime, mtime, access mode and
458     uid/gid, in that order.
459    
460     If an error occurs, the partial destination file will be unlinked, if
461     possible, except when setting atime, mtime, access mode and uid/gid, where
462     errors are being ignored.
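
Example: a usage sketch with placeholder paths. Judging by the implementation below, the callback receives C<0> when the copy succeeded and C<-1> when it failed; C<IO::AIO::flush> is assumed to wait for all outstanding requests.

```perl
use strict;
use warnings;
use IO::AIO;

# Placeholder paths, chosen only for illustration.
my ($src, $dst) = ("/etc/passwd", "/tmp/passwd.copy");
my $status;

aio_copy $src, $dst, sub {
    $status = $_[0];

    # the implementation reports 0 on success and -1 on failure
    $status and die "copying $src to $dst failed\n";
    print "copied $src to $dst\n";
};

# wait until the composite request and its sub requests are done
IO::AIO::flush;
```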
463    
464     =cut
465    
466     sub aio_copy($$;$) {
467 root 1.95 aio_block {
468     my ($src, $dst, $cb) = @_;
469 root 1.82
470 root 1.95 my $pri = aioreq_pri;
471     my $grp = aio_group $cb;
472 root 1.82
473 root 1.95 aioreq_pri $pri;
474     add $grp aio_open $src, O_RDONLY, 0, sub {
475     if (my $src_fh = $_[0]) {
476     my @stat = stat $src_fh;
477    
478     aioreq_pri $pri;
479     add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
480     if (my $dst_fh = $_[0]) {
481     aioreq_pri $pri;
482     add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
483     if ($_[0] == $stat[7]) {
484     $grp->result (0);
485     close $src_fh;
486    
487     # those should not normally block. should. should.
488     utime $stat[8], $stat[9], $dst;
489     chmod $stat[2] & 07777, $dst_fh;
490     chown $stat[4], $stat[5], $dst_fh;
491     close $dst_fh;
492     } else {
493     $grp->result (-1);
494     close $src_fh;
495     close $dst_fh;
496 root 1.82
497 root 1.95 aioreq_pri $pri;
498     add $grp aio_unlink $dst;
499     }
500     };
501     } else {
502     $grp->result (-1);
503     }
504     };
505    
506     } else {
507     $grp->result (-1);
508     }
509     };
510 root 1.82
511 root 1.95 $grp
512     }
513 root 1.82 }
514    
515     =item aio_move $srcpath, $dstpath, $callback->($status)
516    
517     Try to move the I<file> (directories not supported as either source or
518     destination) from C<$srcpath> to C<$dstpath> and call the callback with
519     C<0> (ok) or C<-1> (error).
520    
521     This is a composite request that tries to rename(2) the file first. If
522     rename fails with C<EXDEV>, it copies the file with C<aio_copy> and, if
523     that is successful, unlinks C<$srcpath>.
524    
525     =cut
526    
527     sub aio_move($$;$) {
528 root 1.95 aio_block {
529     my ($src, $dst, $cb) = @_;
530 root 1.82
531 root 1.95 my $pri = aioreq_pri;
532     my $grp = aio_group $cb;
533 root 1.82
534 root 1.95 aioreq_pri $pri;
535     add $grp aio_rename $src, $dst, sub {
536     if ($_[0] && $! == EXDEV) {
537     aioreq_pri $pri;
538     add $grp aio_copy $src, $dst, sub {
539     $grp->result ($_[0]);
540    
541     if (!$_[0]) {
542     aioreq_pri $pri;
543     add $grp aio_unlink $src;
544     }
545     };
546     } else {
547 root 1.82 $grp->result ($_[0]);
548 root 1.95 }
549     };
550 root 1.82
551 root 1.95 $grp
552     }
553 root 1.82 }
554    
555 root 1.40 =item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
556    
557 root 1.52 Scans a directory (similar to C<aio_readdir>) but additionally tries to
558 root 1.76 efficiently separate the entries of directory C<$path> into two sets of
559     names, directories you can recurse into (directories), and ones you cannot
560     recurse into (everything else, including symlinks to directories).
561 root 1.52
562 root 1.61 C<aio_scandir> is a composite request that creates many sub requests.
563     C<$maxreq> specifies the maximum number of outstanding aio requests that
564     this function generates. If it is C<< <= 0 >>, then a suitable default
565 root 1.81 will be chosen (currently 4).
566 root 1.40
567     On error, the callback is called without arguments, otherwise it receives
568     two array-refs with path-relative entry names.
569    
570     Example:
571    
572     aio_scandir $dir, 0, sub {
573     my ($dirs, $nondirs) = @_;
574     print "real directories: @$dirs\n";
575     print "everything else: @$nondirs\n";
576     };
577    
578     Implementation notes.
579    
580     The C<aio_readdir> cannot be avoided, but C<stat()>'ing every entry can.
581    
582     After reading the directory, the modification time, size etc. of the
583 root 1.52 directory before and after the readdir are checked, and if they match (and
584     the modification time isn't the current time), the link count will be used to decide how many
585     entries are directories (if >= 2). Otherwise, no knowledge of the number
586     of subdirectories will be assumed.
587    
588     Then entries will be sorted into likely directories (everything without
589     a non-initial dot currently) and likely non-directories (everything
590     else). Then every entry plus an appended C</.> will be C<stat>'ed,
591     likely directories first. If that succeeds, it assumes that the entry
592     is a directory or a symlink to directory (which will be checked
593     separately). This is often faster than stat'ing the entry itself because
594     filesystems might detect the type of the entry without reading the inode
595     data (e.g. ext2fs filetype feature).
596    
597     If the known number of directories (link count - 2) has been reached, the
598     rest of the entries is assumed to be non-directories.
599    
600     This only works with certainty on POSIX (= UNIX) filesystems, which
601     fortunately are the vast majority of filesystems around.
602    
603     It will also likely work on non-POSIX filesystems with reduced efficiency
604     as those tend to return 0 or 1 as link counts, which disables the
605     directory counting heuristic.
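
The ordering step can be tried out in plain Perl, without IO::AIO. The sketch below mirrors the sort key used in the implementation; the helper name C<stat_order> and the sample names are made up for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper: return the entries in the order in which
# aio_scandir would stat them. Names without a non-initial dot are
# treated as likely directories and come first, shortest name first.
sub stat_order {
    my @entries = @_;

    # key: "0" (likely dir) or "1" (likely non-dir), then the length
    my @sorted = map  { $_->[0] }
                 sort { $b->[1] cmp $a->[1] }
                 map  { [$_, sprintf("%s%04d", (/.\./ ? "1" : "0"), length $_)] }
                 @entries;

    # the implementation pops entries off the end of the sorted list,
    # so the stat order is the reverse of the sorted order
    return reverse @sorted;
}

print join(", ", stat_order(qw(src README.txt .git Makefile.PL t))), "\n";
# prints "t, src, .git, README.txt, Makefile.PL"
```

Note that F<.git> counts as a likely directory, because its only dot is the initial one.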
606 root 1.40
607     =cut
608    
609     sub aio_scandir($$$) {
610 root 1.95 aio_block {
611     my ($path, $maxreq, $cb) = @_;
612 root 1.40
613 root 1.95 my $pri = aioreq_pri;
614 root 1.80
615 root 1.95 my $grp = aio_group $cb;
616 root 1.55
617 root 1.95 $maxreq = 4 if $maxreq <= 0;
618 root 1.40
619 root 1.95 # stat once
620 root 1.80 aioreq_pri $pri;
621 root 1.95 add $grp aio_stat $path, sub {
622     return $grp->result () if $_[0];
623     my $now = time;
624     my $hash1 = join ":", (stat _)[0,1,3,7,9];
625 root 1.40
626 root 1.95 # read the directory entries
627 root 1.80 aioreq_pri $pri;
628 root 1.95 add $grp aio_readdir $path, sub {
629     my $entries = shift
630     or return $grp->result ();
631    
632     # stat the dir another time
633     aioreq_pri $pri;
634     add $grp aio_stat $path, sub {
635     my $hash2 = join ":", (stat _)[0,1,3,7,9];
636    
637     my $ndirs;
638    
639     # take the slow route if anything looks fishy
640     if ($hash1 ne $hash2 or (stat _)[9] == $now) {
641     $ndirs = -1;
642     } else {
643     # if nlink == 2, we are finished
644     # on non-posix-fs's, we rely on nlink < 2
645     $ndirs = (stat _)[3] - 2
646     or return $grp->result ([], $entries);
647     }
648    
649     # sort into likely dirs and likely nondirs
650     # dirs == files without ".", short entries first
651     $entries = [map $_->[0],
652     sort { $b->[1] cmp $a->[1] }
653     map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
654     @$entries];
655 root 1.40
656 root 1.95 my (@dirs, @nondirs);
657 root 1.40
658 root 1.95 my $statgrp = add $grp aio_group sub {
659     $grp->result (\@dirs, \@nondirs);
660     };
661 root 1.40
662 root 1.95 limit $statgrp $maxreq;
663     feed $statgrp sub {
664     return unless @$entries;
665     my $entry = pop @$entries;
666    
667     aioreq_pri $pri;
668     add $statgrp aio_stat "$path/$entry/.", sub {
669     if ($_[0] < 0) {
670     push @nondirs, $entry;
671     } else {
672     # need to check for real directory
673     aioreq_pri $pri;
674     add $statgrp aio_lstat "$path/$entry", sub {
675     if (-d _) {
676     push @dirs, $entry;
677    
678     unless (--$ndirs) {
679     push @nondirs, @$entries;
680     feed $statgrp;
681     }
682     } else {
683     push @nondirs, $entry;
684 root 1.74 }
685 root 1.40 }
686     }
687 root 1.95 };
688 root 1.74 };
689 root 1.40 };
690     };
691     };
692 root 1.55
693 root 1.95 $grp
694     }
695 root 1.40 }
696    
697     =item aio_fsync $fh, $callback->($status)
698 root 1.1
699     Asynchronously call fsync on the given filehandle and call the callback
700     with the fsync result code.
701    
702 root 1.40 =item aio_fdatasync $fh, $callback->($status)
703 root 1.1
704     Asynchronously call fdatasync on the given filehandle and call the
705 root 1.26 callback with the fdatasync result code.
706    
707     If this call isn't available because your OS lacks it or it couldn't be
708     detected, it will be emulated by calling C<fsync> instead.
709 root 1.1
710 root 1.58 =item aio_group $callback->(...)
711 root 1.54
712 root 1.55 This is a very special aio request: Instead of doing something, it is a
713     container for other aio requests, which is useful if you want to bundle
714 root 1.71 many requests into a single, composite, request with a definite callback
715     and the ability to cancel the whole request with its subrequests.
716 root 1.55
717     Returns an object of class L<IO::AIO::GRP>. See its documentation below
718     for more info.
719    
720     Example:
721    
722     my $grp = aio_group sub {
723     print "all stats done\n";
724     };
725    
726     add $grp
727     (aio_stat ...),
728     (aio_stat ...),
729     ...;
730    
731 root 1.63 =item aio_nop $callback->()
732    
733     This is a special request - it does nothing in itself and is only used for
734     side effects, such as when you want to add a dummy request to a group so
735     that finishing the requests in the group depends on executing the given
736     code.
737    
738 root 1.64 While this request does nothing, it still goes through the execution
739     phase and still requires a worker thread. Thus, the callback will not
740     be executed immediately but only after other requests in the queue have
741     entered their execution phase. This can be used to measure request
742     latency.
743    
744 root 1.71 =item IO::AIO::aio_busy $fractional_seconds, $callback->() *NOT EXPORTED*
745 root 1.54
746     Mainly used for debugging and benchmarking, this aio request puts one of
747     the request workers to sleep for the given time.
748    
749 root 1.56 While it is theoretically handy to have simple I/O scheduling requests
750 root 1.71 like sleep and file handle readable/writable, the overhead this creates is
751     immense (it blocks a thread for a long time) so do not use this function
752     except to put your application under artificial I/O pressure.
753 root 1.56
754 root 1.5 =back
755    
756 root 1.53 =head2 IO::AIO::REQ CLASS
757 root 1.52
758     All non-aggregate C<aio_*> functions return an object of this class when
759     called in non-void context.
760    
761     =over 4
762    
763 root 1.65 =item cancel $req
764 root 1.52
765     Cancels the request, if possible. Has the effect of skipping execution
766     when entering the B<execute> state and skipping calling the callback when
767     entering the B<result> state, but will leave the request otherwise
768     untouched. That means that requests that currently execute will not be
769     stopped and resources held by the request will not be freed prematurely.
770    
771 root 1.65 =item cb $req $callback->(...)
772    
773     Replace (or simply set) the callback registered to the request.
774    
775 root 1.52 =back
776    
777 root 1.55 =head2 IO::AIO::GRP CLASS
778    
779     This class is a subclass of L<IO::AIO::REQ>, so all its methods apply to
780     objects of this class, too.
781    
782     An IO::AIO::GRP object is a special request that can contain multiple other
783     aio requests.
784    
785     You create one by calling the C<aio_group> constructing function with a
786     callback that will be called when all contained requests have entered the
787     C<done> state:
788    
789     my $grp = aio_group sub {
790     print "all requests are done\n";
791     };
792    
793     You add requests by calling the C<add> method with one or more
794     C<IO::AIO::REQ> objects:
795    
796     $grp->add (aio_unlink "...");
797    
798 root 1.58 add $grp aio_stat "...", sub {
799     $_[0] or return $grp->result ("error");
800    
801     # add another request dynamically, if first succeeded
802     add $grp aio_open "...", sub {
803     $grp->result ("ok");
804     };
805     };
806 root 1.55
807     This makes it very easy to create composite requests (see the source of
808     C<aio_move> for an application) that work and feel like simple requests.
809    
810 root 1.62 =over 4
811    
812     =item * The IO::AIO::GRP objects will be cleaned up during calls to
813 root 1.55 C<IO::AIO::poll_cb>, just like any other request.
814    
815 root 1.62 =item * They can be canceled like any other request. Canceling will cancel not
816 root 1.59 only the request itself, but also all requests it contains.
817 root 1.55
818 root 1.62 =item * They can also be added to other IO::AIO::GRP objects.
819 root 1.55
820 root 1.62 =item * You must not add requests to a group from within the group callback (or
821 root 1.60 any later time).
822    
823 root 1.62 =back
824    
825 root 1.55 Their lifetime, simplified, looks like this: when they are empty, they
826     will finish very quickly. If they contain only requests that are in the
827     C<done> state, they will also finish. Otherwise they will continue to
828     exist.
829    
830 root 1.57 That means after creating a group you have some time to add requests. And
831     in the callbacks of those requests, you can add further requests to the
832     group. And only when all those requests have finished will the group
833     itself finish.
834    
835 root 1.55 =over 4
836    
837 root 1.65 =item add $grp ...
838    
839 root 1.55 =item $grp->add (...)
840    
841 root 1.57 Add one or more requests to the group. Any type of L<IO::AIO::REQ> can
842     be added, including other groups, as long as you do not create circular
843     dependencies.
844    
845     Returns all its arguments.
846 root 1.55
847 root 1.74 =item $grp->cancel_subs
848    
849     Cancels all subrequests and clears any feeder, but not the group request
850     itself. Useful when you queued a lot of events but got a result early.
851    
852 root 1.58 =item $grp->result (...)
853    
854     Set the result value(s) that will be passed to the group callback when all
855 root 1.80 subrequests have finished and set the group's errno to the current value
856     of errno (just like calling C<errno> without an error number). By default,
857     no argument will be passed and errno is zero.
858    
859     =item $grp->errno ([$errno])
860    
861     Sets the group errno value to C<$errno>, or the current value of errno
862     when the argument is missing.
863    
864     Every aio request has an associated errno value that is restored when
865     the callback is invoked. This method lets you change this value from its
866     default (0).
867    
868     Calling C<result> will also set errno, so make sure you either set C<$!>
869     before the call to C<result>, or call C<errno> after it.
870 root 1.58
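The interplay of C<result> and C<errno> described above can be sketched
as follows. The path C</nonexistent-example-path> and the variable names are
made up for illustration; the sketch assumes that this path does not exist,
so the stat fails with C<ENOENT>:

```perl
use IO::AIO;
use Errno ();

my ($verdict, $group_errno);

my $grp = aio_group sub {
   ($verdict) = @_;           # whatever was passed to $grp->result
   $group_errno = $! + 0;     # the group's errno is restored for this callback
};

$grp->add (aio_stat "/nonexistent-example-path", sub {
   if ($_[0]) {
      # $! holds the errno of the failed stat here, and result captures it
      $grp->result ("missing");
   } else {
      $grp->result ("present");
      $grp->errno (0);        # override the errno captured by result
   }
});

IO::AIO::flush;
print "verdict: $verdict, errno: $group_errno\n";
```

Calling C<errno> after C<result>, as in the second branch, is the safer order
because C<result> always overwrites the group's errno with the current one.
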
871 root 1.65 =item feed $grp $callback->($grp)
872 root 1.60
873     Sets a feeder/generator on this group: every group can have an attached
874     generator that generates requests if idle. The idea behind this is that,
875     although you could just queue as many requests as you want in a group,
876     this might starve other requests for a potentially long time. For
877     example, C<aio_scandir> might generate hundreds of thousands of C<aio_stat>
878     requests, delaying any later requests for a long time.
879    
880     To avoid this, and allow incremental generation of requests, you can
881     instead create a group and set a feeder on it that generates those requests. The
882 root 1.68 feed callback will be called whenever there are few enough (see C<limit>,
883 root 1.60 below) requests active in the group itself and is expected to queue more
884     requests.
885    
886 root 1.68 The feed callback can queue as many requests as it likes (i.e. C<add> does
887     not impose any limits).
888 root 1.60
889 root 1.65 If the feed does not queue more requests when called, it will be
890 root 1.60 automatically removed from the group.
891    
892 root 1.65 If the feed limit is C<0>, it will be set to C<2> automatically.
893 root 1.60
894     Example:
895    
896     # stat all files in @files, but only ever use four aio requests concurrently:
897    
898     my $grp = aio_group sub { print "finished\n" };
899 root 1.68 limit $grp 4;
900 root 1.65 feed $grp sub {
901 root 1.60 my $file = pop @files
902     or return;
903    
904     add $grp aio_stat $file, sub { ... };
905 root 1.65 };
906 root 1.60
907 root 1.68 =item limit $grp $num
908 root 1.60
909     Sets the feeder limit for the group: The feeder will be called whenever
910     the group contains fewer than this many requests.
911    
912     Setting the limit to C<0> will pause the feeding process.
913    
914 root 1.55 =back
915    
916 root 1.5 =head2 SUPPORT FUNCTIONS
917    
918 root 1.86 =head3 EVENT PROCESSING AND EVENT LOOP INTEGRATION
919    
920 root 1.5 =over 4
921    
922     =item $fileno = IO::AIO::poll_fileno
923    
924 root 1.20 Return the I<request result pipe file descriptor>. This filehandle must be
925     polled for reading by some mechanism outside this module (e.g. Event or
926     select, see below or the SYNOPSIS). If the pipe becomes readable you have
927     to call C<poll_cb> to check the results.
928 root 1.5
929     See C<poll_cb> for an example.
930    
931     =item IO::AIO::poll_cb
932    
933 root 1.86 Process some outstanding events on the result pipe. You have to call this
934 root 1.5 regularly. Returns the number of events processed. Returns immediately
935 root 1.86 when no events are outstanding. The number of events processed depends on
936     the settings of C<IO::AIO::max_poll_reqs> and C<IO::AIO::max_poll_time>.
937 root 1.5
938 root 1.78 If not all requests were processed for whatever reason, the filehandle
939     will still be ready when C<poll_cb> returns.
940    
941 root 1.20 Example: Install an Event watcher that automatically calls
942     IO::AIO::poll_cb with high priority:
943 root 1.5
944     Event->io (fd => IO::AIO::poll_fileno,
945     poll => 'r', async => 1,
946     cb => \&IO::AIO::poll_cb);
947    
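If no event loop module is in use, the same polling can be sketched with
Perl's built-in four-argument C<select>. This is only an illustration: the
C<$done> flag and the stat of C</> are arbitrary choices made for the
example.

```perl
use IO::AIO;

my $done = 0;
aio_stat "/", sub { $done = 1 };

my $fd = IO::AIO::poll_fileno;

until ($done) {
   # build a read set containing only the result pipe
   my $rin = "";
   vec ($rin, $fd, 1) = 1;

   # block until the result pipe becomes readable, then process results
   select ($rin, undef, undef, undef);
   IO::AIO::poll_cb;
}

print "stat finished\n";
```

A real program would normally add its own file descriptors to the same
C<select> call instead of blocking on the result pipe alone.
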
948 root 1.86 =item IO::AIO::max_poll_reqs $nreqs
949    
950     =item IO::AIO::max_poll_time $seconds
951    
952     These set the maximum number of requests (default C<0>, meaning infinity)
953     that are being processed by C<IO::AIO::poll_cb> in one call, respectively
954     the maximum amount of time (default C<0>, meaning infinity) spent in
955     C<IO::AIO::poll_cb> to process requests (more correctly the minimum amount
956     of time C<poll_cb> is allowed to use).
957 root 1.78
958 root 1.89 Setting C<max_poll_time> to a non-zero value creates an overhead of one
959     syscall per request processed, which is not normally a problem unless your
960     callbacks are really really fast or your OS is really really slow (I am
961     not mentioning Solaris here). Using C<max_poll_reqs> incurs no overhead.
962    
963 root 1.86 Setting these is useful if you want to ensure some level of
964     interactiveness when perl is not fast enough to process all requests in
965     time.
966 root 1.78
967 root 1.86 For interactive programs, values such as C<0.01> to C<0.1> should be fine.
968 root 1.78
969     Example: Install an Event watcher that automatically calls
970 root 1.89 IO::AIO::poll_cb with low priority, to ensure that other parts of the
971 root 1.78 program get the CPU sometimes even under high AIO load.
972    
973 root 1.86 # try not to spend much more than 0.1s in poll_cb
974     IO::AIO::max_poll_time 0.1;
975    
976     # use a low priority so other tasks have priority
977 root 1.78 Event->io (fd => IO::AIO::poll_fileno,
978     poll => 'r', nice => 1,
979 root 1.86 cb => \&IO::AIO::poll_cb);
980 root 1.78
981 root 1.5 =item IO::AIO::poll_wait
982    
983 root 1.93 If there are any outstanding requests and none of them are in the result
984     phase, wait till the result filehandle becomes ready for reading (simply
985     does a C<select> on the filehandle). This is useful if you want to
986     synchronously wait for some requests to finish.
987 root 1.5
988     See C<nreqs> for an example.
989    
990 root 1.86 =item IO::AIO::poll
991 root 1.5
992 root 1.86 Waits until some requests have been handled.
993 root 1.5
994 root 1.92 Returns the number of requests processed, but is otherwise strictly
995     equivalent to:
996 root 1.5
997     IO::AIO::poll_wait, IO::AIO::poll_cb
998 root 1.80
999 root 1.12 =item IO::AIO::flush
1000    
1001     Wait till all outstanding AIO requests have been handled.
1002    
1003 root 1.13 Strictly equivalent to:
1004    
1005     IO::AIO::poll_wait, IO::AIO::poll_cb
1006     while IO::AIO::nreqs;
1007    
1008 root 1.86 =head3 CONTROLLING THE NUMBER OF THREADS
1009 root 1.13
1010 root 1.5 =item IO::AIO::min_parallel $nthreads
1011    
1012 root 1.61 Set the minimum number of AIO threads to C<$nthreads>. The current
1013     default is C<8>, which means eight asynchronous operations can execute
1014     concurrently at any one time (the number of outstanding requests,
1015     however, is unlimited).
1016 root 1.5
1017 root 1.34 IO::AIO starts threads only on demand, when an AIO request is queued and
1018 root 1.86 no free thread exists. Please note that queueing up a hundred requests can
1019     create demand for a hundred threads, even if it turns out that everything
1020     is in the cache and could have been processed faster by a single thread.
1021 root 1.34
1022 root 1.61 It is recommended to keep the number of threads relatively low, as some
1023     Linux kernel versions will scale negatively with the number of threads
1024     (higher parallelism => MUCH higher latency). With current Linux 2.6
1025     versions, 4-32 threads should be fine.
1026 root 1.5
1027 root 1.34 Under most circumstances you don't need to call this function, as the
1028     module selects a default that is suitable for low to moderate load.
1029 root 1.5
1030     =item IO::AIO::max_parallel $nthreads
1031    
1032 root 1.34 Sets the maximum number of AIO threads to C<$nthreads>. If more than the
1033     specified number of threads are currently running, this function kills
1034     them. This function blocks until the limit is reached.
1035    
1036     While C<$nthreads> is zero, aio requests get queued but not executed
1037     until the number of threads has been increased again.
1038 root 1.5
1039     This module automatically runs C<max_parallel 0> at program end, to ensure
1040     that all threads are killed and that there are no outstanding requests.
1041    
1042     Under normal circumstances you don't need to call this function.
1043    
1044 root 1.86 =item IO::AIO::max_idle $nthreads
1045    
1046     Limit the number of threads (default: 4) that are allowed to idle (i.e.,
1047     threads that did not get a request to process within 10 seconds). That
1048     means if a thread becomes idle while C<$nthreads> other threads are also
1049     idle, it will free its resources and exit.
1050    
1051     This is useful when you allow a large number of threads (e.g. 100 or 1000)
1052     to allow for extremely high load situations, but want to free resources
1053     under normal circumstances (1000 threads can easily consume 30MB of RAM).
1054    
1055     The default is probably ok in most situations, especially if thread
1056     creation is fast. If thread creation is very slow on your system you might
1057     want to use larger values.
1058    
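A short sketch of tuning these settings for a burst of requests follows. The
values C<16> and C<2> and the request count are arbitrary illustrations, not
recommendations:

```perl
use IO::AIO;

IO::AIO::min_parallel 16;   # allow up to 16 requests to execute concurrently
IO::AIO::max_idle 2;        # let all but two idle threads exit again

my $completed = 0;

# queue a burst of requests; threads are only started on demand
for my $n (1 .. 100) {
   aio_stat "/", sub { $completed++ };
}

IO::AIO::flush;
print "$completed requests completed\n";
```
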
1059 root 1.79 =item $oldmaxreqs = IO::AIO::max_outstanding $maxreqs
1060 root 1.5
1061 root 1.79 This is a very bad function to use in interactive programs because it
1062     blocks, and a bad way to reduce concurrency because it is inexact: Better
1063     use an C<aio_group> together with a feed callback.
1064    
1065     Sets the maximum number of outstanding requests to C<$maxreqs>. If you
1066     queue up more than this number of requests, the next call to the
1067     C<poll_cb> (and C<poll_some> and other functions calling C<poll_cb>)
1068     function will block until the limit is no longer exceeded.
1069    
1070     The default value is very large, so there is no practical limit on the
1071     number of outstanding requests.
1072    
1073     You can still queue as many requests as you want. Therefore,
1074     C<max_outstanding> is mainly useful in simple scripts (with low values) or
1075     as a stop gap to shield against fatal memory overflow (with large values).
1076 root 1.5
1077 root 1.86 =head3 STATISTICAL INFORMATION
1078    
1079     =item IO::AIO::nreqs
1080    
1081     Returns the number of requests currently in the ready, execute or pending
1082     states (i.e. for which their callback has not been invoked yet).
1083    
1084     Example: wait till there are no outstanding requests anymore:
1085    
1086     IO::AIO::poll_wait, IO::AIO::poll_cb
1087     while IO::AIO::nreqs;
1088    
1089     =item IO::AIO::nready
1090    
1091     Returns the number of requests currently in the ready state (not yet
1092     executed).
1093    
1094     =item IO::AIO::npending
1095    
1096     Returns the number of requests currently in the pending state (executed,
1097     but not yet processed by poll_cb).
1098    
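The three counters can be watched together while a batch of requests drains.
This is a sketch only: the paths are arbitrary, and the numbers printed depend
on timing, so no particular output should be expected.

```perl
use IO::AIO;

my $completed = 0;

for my $path ("/", "/etc", "/tmp") {
   aio_stat $path, sub { $completed++ };
}

while (IO::AIO::nreqs) {
   printf "outstanding %d (ready %d, pending %d)\n",
      IO::AIO::nreqs (), IO::AIO::nready (), IO::AIO::npending ();

   IO::AIO::poll_wait;
   IO::AIO::poll_cb;
}

print "$completed callbacks invoked\n";
```
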
1099 root 1.5 =back
1100    
1101 root 1.1 =cut
1102    
1103 root 1.2 # support function to convert a fd into a perl filehandle
1104     sub _fd2fh {
1105     return undef if $_[0] < 0;
1106    
1107 root 1.23 # try to generate nice filehandles
1108     my $sym = "IO::AIO::fd#$_[0]";
1109     local *$sym;
1110 root 1.25
1111 root 1.27 open *$sym, "+<&=$_[0]" # usually works under any unix
1112     or open *$sym, "<&=$_[0]" # cygwin needs this
1113     or open *$sym, ">&=$_[0]" # or this
1114 root 1.2 or return undef;
1115    
1116 root 1.23 *$sym
1117 root 1.2 }
1118    
1119 root 1.61 min_parallel 8;
1120 root 1.1
1121 root 1.95 END { flush }
1122 root 1.82
1123 root 1.1 1;
1124    
1125 root 1.27 =head2 FORK BEHAVIOUR
1126    
1127 root 1.52 This module should do "the right thing" when the process using it forks:
1128    
1129 root 1.34 Before the fork, IO::AIO enters a quiescent state where no requests
1130     can be added in other threads and no results will be processed. After
1131     the fork the parent simply leaves the quiescent state and continues
1132 root 1.72 request/result processing, while the child frees the request/result queue
1133     (so that the requests started before the fork will only be handled in the
1134     parent). Threads will be started on demand until the limit set in the
1135 root 1.34 parent process has been reached again.
1136 root 1.27
1137 root 1.52 In short: the parent will, after a short pause, continue as if fork had
1138     not been called, while the child will act as if IO::AIO has not been used
1139     yet.
1140    
1141 root 1.60 =head2 MEMORY USAGE
1142    
1143 root 1.72 Per-request usage:
1144    
1145     Each aio request uses - depending on your architecture - around 100-200
1146     bytes of memory. In addition, stat requests need a stat buffer (possibly
1147     a few hundred bytes), readdir requires a result buffer and so on. Perl
1148     scalars and other data passed into aio requests will also be locked and
1149     will consume memory till the request has entered the done state.
1150 root 1.60
1151     This is not awfully much, so queuing lots of requests is not usually a
1152     problem.
1153    
1154 root 1.72 Per-thread usage:
1155    
1156     In the execution phase, some aio requests require more memory for
1157     temporary buffers, and each thread requires a stack and other data
1158     structures (usually around 16k-128k, depending on the OS).
1159    
1160     =head1 KNOWN BUGS
1161    
1162 root 1.73 Known bugs will be fixed in the next release.
1163 root 1.60
1164 root 1.1 =head1 SEE ALSO
1165    
1166 root 1.68 L<Coro::AIO>.
1167 root 1.1
1168     =head1 AUTHOR
1169    
1170     Marc Lehmann <schmorp@schmorp.de>
1171     http://home.schmorp.de/
1172    
1173     =cut
1174