/cvs/IO-AIO/AIO.pm
Revision: 1.85
Committed: Sat Oct 28 01:40:30 2006 UTC (17 years, 7 months ago) by root
Branch: MAIN
Changes since 1.84: +17 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     IO::AIO - Asynchronous Input/Output
4    
5     =head1 SYNOPSIS
6    
7     use IO::AIO;
8    
9 root 1.6 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10     my ($fh) = @_;
11     ...
12     };
13    
14     aio_unlink "/tmp/file", sub { };
15    
16     aio_read $fh, 30000, 1024, $buffer, 0, sub {
17 root 1.8 $_[0] > 0 or die "read error: $!";
18 root 1.6 };
19    
20 root 1.56 # version 2+ has request and group objects
21     use IO::AIO 2;
22 root 1.52
23 root 1.68 aioreq_pri 4; # give next request a very high priority
24 root 1.52 my $req = aio_unlink "/tmp/file", sub { };
25     $req->cancel; # cancel request if still in queue
26    
27 root 1.56 my $grp = aio_group sub { print "all stats done\n" };
28     add $grp aio_stat "..." for ...;
29    
30     # AnyEvent integration
31 root 1.42 open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!";
32     my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb });
33    
34 root 1.56 # Event integration
35 root 1.6 Event->io (fd => IO::AIO::poll_fileno,
36 root 1.7 poll => 'r',
37 root 1.6 cb => \&IO::AIO::poll_cb);
38    
39 root 1.56 # Glib/Gtk2 integration
40 root 1.6 add_watch Glib::IO IO::AIO::poll_fileno,
41 root 1.22 in => sub { IO::AIO::poll_cb; 1 };
42 root 1.6
43 root 1.56 # Tk integration
44 root 1.6 Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "",
45     readable => \&IO::AIO::poll_cb);
46    
47 root 1.56 # Danga::Socket integration
48 root 1.11 Danga::Socket->AddOtherFds (IO::AIO::poll_fileno =>
49     \&IO::AIO::poll_cb);
50    
51 root 1.1 =head1 DESCRIPTION
52    
53     This module implements asynchronous I/O using whatever means your
54 root 1.2 operating system supports.
55 root 1.1
56 root 1.85 Asynchronous means that operations that can normally block your program
57     (e.g. reading from disk) will be done asynchronously: the operation
58     will still block, but you can do something else in the meantime. This
59     is extremely useful for programs that need to stay interactive even
60     when doing heavy I/O (GUI programs, high performance network servers
61     etc.), but can also be used to easily do operations in parallel that are
62     normally done sequentially, e.g. stat'ing many files, which is much faster
63     on a RAID volume or over NFS when you do a number of stat operations
64     concurrently.
65    
66     While this works on all types of file descriptors (for example sockets),
67     using these functions on file descriptors that support nonblocking
68     operation (again, sockets, pipes etc.) is very inefficient. Use an event
69     loop for that (such as the L<Event|Event> module): IO::AIO will naturally
70     fit into such an event loop itself.
71    
72 root 1.72 In this version, a number of threads are started that execute your
73     requests and signal their completion. You don't need thread support
74     in perl, and the threads created by this module will not be visible
75     to perl. In the future, this module might make use of the native aio
76     functions available on many operating systems. However, they are often
77 root 1.85 not well-supported or restricted (GNU/Linux doesn't allow them on normal
78 root 1.72 files currently, for example), and they would only support aio_read and
79     aio_write, so the remaining functionality would have to be implemented
80     using threads anyway.
81    
82     Although the module will work in the presence of other (Perl-)
83     threads, it is currently not reentrant in any way, so use appropriate
84     locking yourself, always call C<poll_cb> from within the same thread, or
85     never call C<poll_cb> (or other C<aio_> functions) recursively.
86    
87     =head1 REQUEST ANATOMY AND LIFETIME
88    
89     Every C<aio_*> function creates a request, which is a C data structure not
90     directly visible to Perl.
91    
92     If called in non-void context, every request function returns a Perl
93     object representing the request. In void context, nothing is returned,
94     which saves a bit of memory.
95    
96     The perl object is a fairly standard ref-to-hash object. The hash contents
97     are not used by IO::AIO so you are free to store anything you like in it.
98    
99     During their existence, aio requests travel through the following states,
100     in order:
101    
102     =over 4
103    
104     =item ready
105    
106     Immediately after a request is created it is put into the ready state,
107     waiting for a thread to execute it.
108    
109     =item execute
110    
111     A thread has accepted the request for processing and is currently
112     executing it (e.g. blocking in read).
113    
114     =item pending
115    
116     The request has been executed and is waiting for result processing.
117    
118     While request submission and execution is fully asynchronous, result
119     processing is not and relies on the perl interpreter calling C<poll_cb>
120     (or another function with the same effect).
121    
122     =item result
123    
124     The request results are processed synchronously by C<poll_cb>.
125    
126     The C<poll_cb> function will process all outstanding aio requests by
127     calling their callbacks, freeing memory associated with them and managing
128     any groups they are contained in.
129    
130     =item done
131    
132     Request has reached the end of its lifetime and holds no resources anymore
133     (except possibly for the Perl object, but its connection to the actual
134     aio request is severed and calling its methods will either do nothing or
135     result in a runtime error).
136 root 1.1
137     =cut
138    
139     package IO::AIO;
140    
141 root 1.23 no warnings;
142 root 1.51 use strict 'vars';
143 root 1.23
144 root 1.1 use base 'Exporter';
145    
146     BEGIN {
147 root 1.55 our $VERSION = '2.0';
148 root 1.1
149 root 1.67 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
150     aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
151     aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move
152 root 1.82 aio_copy aio_group aio_nop aio_mknod);
153 root 1.70 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice));
154 root 1.67 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
155 root 1.80 min_parallel max_parallel nreqs nready npending);
156 root 1.1
157 root 1.54 @IO::AIO::GRP::ISA = 'IO::AIO::REQ';
158    
159 root 1.1 require XSLoader;
160 root 1.51 XSLoader::load ("IO::AIO", $VERSION);
161 root 1.1 }
162    
163 root 1.5 =head1 FUNCTIONS
164 root 1.1
165 root 1.5 =head2 AIO FUNCTIONS
166 root 1.1
167 root 1.5 All the C<aio_*> calls are more or less thin wrappers around the syscall
168     with the same name (sans C<aio_>). The arguments are similar or identical,
169 root 1.14 and they all accept an additional (and optional) C<$callback> argument
170     which must be a code reference. This code reference will get called with
171     the syscall return code (e.g. most syscalls return C<-1> on error, unlike
172     perl, which usually delivers "false") as its sole argument when the given
173     syscall has been executed asynchronously.
174 root 1.1
175 root 1.23 All functions expecting a filehandle keep a copy of the filehandle
176     internally until the request has finished.
177 root 1.1
178 root 1.55 All requests return objects of type L<IO::AIO::REQ> that allow further
179     manipulation of those requests while they are in-flight.
180 root 1.52
181 root 1.28 The pathnames you pass to these routines I<must> be absolute and
182     encoded in byte form. The reason for the former is that at the time the
183     request is being executed, the current working directory could have
184     changed. Alternatively, you can make sure that you never change the
185     current working directory.
186    
187     To encode pathnames to byte form, make sure you either: a) always pass
188     in filenames you got from outside (command line, readdir etc.), b) use
189     only pathnames that are ASCII or ISO 8859-1, c) use the Encode module and
190     encode your pathnames to the locale (or other) encoding in effect in the
191     user environment, d) use Glib::filename_from_unicode on unicode filenames
192     or e) use something else.
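
As an illustration of option c) above, here is a minimal sketch using the core Encode module. The pathname and the choice of UTF-8 are assumptions made for the example only; use whatever encoding is actually in effect in your environment.

```perl
use strict;
use warnings;
use Encode qw(encode);

# Hypothetical pathname held as a Perl character string (contains U+00E9).
my $path = "/tmp/caf\x{e9}";

# Assumption: the filesystem expects UTF-8 encoded names.
my $bytes = encode("UTF-8", $path);

# $bytes is now a byte string suitable for passing to the aio_* functions.
printf "%d characters became %d bytes\n", length($path), length($bytes);
```

The resulting byte string, not the original character string, is what would be handed to a request such as C<aio_open>.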
193 root 1.1
194 root 1.5 =over 4
195 root 1.1
196 root 1.80 =item $prev_pri = aioreq_pri [$pri]
197 root 1.68
198 root 1.80 Returns the priority value that would be used for the next request and, if
199     C<$pri> is given, sets the priority for the next aio request.
200 root 1.68
201 root 1.80 The default priority is C<0>, the minimum and maximum priorities are C<-4>
202     and C<4>, respectively. Requests with higher priority will be serviced
203     first.
204    
205     The priority will be reset to C<0> after each call to one of the C<aio_*>
206 root 1.68 functions.
207    
208 root 1.69 Example: open a file with low priority, then read something from it with
209     higher priority so the read request is serviced before other low priority
210     open requests (potentially spamming the cache):
211    
212     aioreq_pri -3;
213     aio_open ..., sub {
214     return unless $_[0];
215    
216     aioreq_pri -2;
217     aio_read $_[0], ..., sub {
218     ...
219     };
220     };
221    
222     =item aioreq_nice $pri_adjust
223    
224     Similar to C<aioreq_pri>, but subtracts the given value from the current
225     priority, so effects are cumulative.
226    
227 root 1.40 =item aio_open $pathname, $flags, $mode, $callback->($fh)
228 root 1.1
229 root 1.2 Asynchronously open or create a file and call the callback with a newly
230     created filehandle for the file.
231 root 1.1
232     The pathname passed to C<aio_open> must be absolute. See API NOTES, above,
233     for an explanation.
234    
235 root 1.20 The C<$flags> argument is a bitmask. See the C<Fcntl> module for a
236     list. They are the same as used by C<sysopen>.
237    
238     Likewise, C<$mode> specifies the mode of the newly created file, if it
239     didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>,
240     except that it is mandatory (i.e. use C<0> if you don't create new files,
241     and C<0666> or C<0777> if you do).
242 root 1.1
243     Example:
244    
245     aio_open "/etc/passwd", O_RDONLY, 0, sub {
246 root 1.2 if ($_[0]) {
247     print "open successful, fh is $_[0]\n";
248 root 1.1 ...
249     } else {
250     die "open failed: $!\n";
251     }
252     };
253    
254 root 1.40 =item aio_close $fh, $callback->($status)
255 root 1.1
256 root 1.2 Asynchronously close a file and call the callback with the result
257     code. I<WARNING:> although accepted, you should not pass in a perl
258 root 1.20 filehandle here, as perl will likely close the file descriptor another
259     time when the filehandle is destroyed. Normally, you can safely call perl's
260     C<close> or just let filehandles go out of scope.
261    
262     This is supposed to be a bug in the API, so that might change. It's
263     therefore best to avoid this function.
264 root 1.1
265 root 1.40 =item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
266 root 1.1
267 root 1.40 =item aio_write $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
268 root 1.1
269     Reads or writes C<length> bytes from the specified C<fh> and C<offset>
270     into the scalar given by C<data> and offset C<dataoffset> and calls the
271     callback with the actual number of bytes read (or -1 on error, just
272     like the syscall).
273    
274 root 1.31 The C<$data> scalar I<MUST NOT> be modified in any way while the request
275     is outstanding. Modifying it can result in segfaults or WW3 (if the
276     necessary/optional hardware is installed).
277    
278 root 1.17 Example: Read 15 bytes at offset 7 into scalar C<$buffer>, starting at
279 root 1.1 offset C<0> within the scalar:
280    
281     aio_read $fh, 7, 15, $buffer, 0, sub {
282 root 1.9 $_[0] > 0 or die "read error: $!";
283     print "read $_[0] bytes: <$buffer>\n";
284 root 1.1 };
285    
286 root 1.40 =item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval)
287 root 1.35
288     Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts
289     reading at byte offset C<$in_offset>, and starts writing at the current
290     file offset of C<$out_fh>. Because of that, it is not safe to issue more
291     than one C<aio_sendfile> per C<$out_fh>, as they will interfere with each
292     other.
293    
294     This call tries to make use of a native C<sendfile> syscall to provide
295     zero-copy operation. For this to work, C<$out_fh> should refer to a
296     socket, and C<$in_fh> should refer to an mmap'able file.
297    
298     If the native sendfile call fails or is not implemented, it will be
299 root 1.36 emulated, so you can call C<aio_sendfile> on any type of filehandle
300     regardless of the limitations of the operating system.
301 root 1.35
302     Please note, however, that C<aio_sendfile> can read more bytes from
303     C<$in_fh> than are written, and there is no way to find out how many
304 root 1.36 bytes have been read from C<aio_sendfile> alone, as C<aio_sendfile> only
305     provides the number of bytes written to C<$out_fh>. Only if the result
306     value equals C<$length> one can assume that C<$length> bytes have been
307     read.
308 root 1.35
309 root 1.40 =item aio_readahead $fh,$offset,$length, $callback->($retval)
310 root 1.1
311 root 1.20 C<aio_readahead> populates the page cache with data from a file so that
312 root 1.1 subsequent reads from that file will not block on disk I/O. The C<$offset>
313     argument specifies the starting point from which data is to be read and
314     C<$length> specifies the number of bytes to be read. I/O is performed in
315     whole pages, so that offset is effectively rounded down to a page boundary
316     and bytes are read up to the next page boundary greater than or equal to
317 root 1.20 (offset+length). C<aio_readahead> does not read beyond the end of the
318 root 1.1 file. The current file offset of the file is left unchanged.
319    
320 root 1.26 If that syscall doesn't exist (likely if your OS isn't Linux) it will be
321     emulated by simply reading the data, which would have a similar effect.
322    
323 root 1.40 =item aio_stat $fh_or_path, $callback->($status)
324 root 1.1
325 root 1.40 =item aio_lstat $fh, $callback->($status)
326 root 1.1
327     Works like perl's C<stat> or C<lstat> in void context. The callback will
328     be called after the stat and the results will be available using C<stat _>
329     or C<-s _> etc...
330    
331     The pathname passed to C<aio_stat> must be absolute. See API NOTES, above,
332     for an explanation.
333    
334     Currently, the stats are always 64-bit-stats, i.e. instead of returning an
335     error when stat'ing a large file, the results will be silently truncated
336     unless perl itself is compiled with large file support.
337    
338     Example: Print the length of F</etc/passwd>:
339    
340     aio_stat "/etc/passwd", sub {
341     $_[0] and die "stat failed: $!";
342     print "size is ", -s _, "\n";
343     };
344    
345 root 1.40 =item aio_unlink $pathname, $callback->($status)
346 root 1.1
347     Asynchronously unlink (delete) a file and call the callback with the
348     result code.
349    
350 root 1.82 =item aio_mknod $path, $mode, $dev, $callback->($status)
351    
352 root 1.83 Asynchronously create a device node (or fifo). See mknod(2).
353    
354     The only portable (POSIX) way of calling this function is:
355    
356     aio_mknod $path, IO::AIO::S_IFIFO | $mode, 0, sub { ...
357 root 1.82
358 root 1.50 =item aio_link $srcpath, $dstpath, $callback->($status)
359    
360     Asynchronously create a new link to the existing object at C<$srcpath> at
361     the path C<$dstpath> and call the callback with the result code.
362    
363     =item aio_symlink $srcpath, $dstpath, $callback->($status)
364    
365     Asynchronously create a new symbolic link to the existing object at C<$srcpath> at
366     the path C<$dstpath> and call the callback with the result code.
367    
368     =item aio_rename $srcpath, $dstpath, $callback->($status)
369    
370     Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
371     rename(2) and call the callback with the result code.
372    
373 root 1.40 =item aio_rmdir $pathname, $callback->($status)
374 root 1.27
375     Asynchronously rmdir (delete) a directory and call the callback with the
376     result code.
377    
378 root 1.46 =item aio_readdir $pathname, $callback->($entries)
379 root 1.37
380     Unlike the POSIX call of the same name, C<aio_readdir> reads an entire
381     directory (i.e. opendir + readdir + closedir). The entries will not be
382     sorted, and will B<NOT> include the C<.> and C<..> entries.
383    
384     The callback is passed a single argument which is either C<undef> or an array-ref
385     with the filenames.
386    
387 root 1.82 =item aio_copy $srcpath, $dstpath, $callback->($status)
388    
389     Try to copy the I<file> (directories not supported as either source or
390     destination) from C<$srcpath> to C<$dstpath> and call the callback with
391     either C<0> (ok) or C<-1> (error).
392    
393     This is a composite request that creates the destination file with
394     mode 0200 and copies the contents of the source file into it using
395     C<aio_sendfile>, followed by restoring atime, mtime, access mode and
396     uid/gid, in that order.
397    
398     If an error occurs, the partial destination file will be unlinked, if
399     possible, except when setting atime, mtime, access mode and uid/gid, where
400     errors are being ignored.
401    
402     =cut
403    
404     sub aio_copy($$;$) {
405     my ($src, $dst, $cb) = @_;
406    
407     my $pri = aioreq_pri;
408     my $grp = aio_group $cb;
409    
410     aioreq_pri $pri;
411     add $grp aio_open $src, O_RDONLY, 0, sub {
412     if (my $src_fh = $_[0]) {
413     my @stat = stat $src_fh;
414    
415     aioreq_pri $pri;
416     add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
417     if (my $dst_fh = $_[0]) {
418     aioreq_pri $pri;
419     add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
420     if ($_[0] == $stat[7]) {
421     $grp->result (0);
422     close $src_fh;
423    
424     # those should not normally block. should. should.
425     utime $stat[8], $stat[9], $dst;
426     chmod $stat[2] & 07777, $dst_fh;
427     chown $stat[4], $stat[5], $dst_fh;
428     close $dst_fh;
429     } else {
430     $grp->result (-1);
431     close $src_fh;
432     close $dst_fh;
433    
434     aioreq_pri $pri;
435     add $grp aio_unlink $dst;
436     }
437     };
438     } else {
439     $grp->result (-1);
440     }
441     };
442    
443     } else {
444     $grp->result (-1);
445     }
446     };
447    
448     $grp
449     }
450    
451     =item aio_move $srcpath, $dstpath, $callback->($status)
452    
453     Try to move the I<file> (directories not supported as either source or
454     destination) from C<$srcpath> to C<$dstpath> and call the callback with
455     either C<0> (ok) or C<-1> (error).
456    
457     This is a composite request that tries to rename(2) the file first. If
458     rename fails with C<EXDEV>, it copies the file with C<aio_copy> and, if
459     that is successful, unlinks C<$srcpath>.
460    
461     =cut
462    
463     sub aio_move($$;$) {
464     my ($src, $dst, $cb) = @_;
465    
466     my $pri = aioreq_pri;
467     my $grp = aio_group $cb;
468    
469     aioreq_pri $pri;
470     add $grp aio_rename $src, $dst, sub {
471     if ($_[0] && $! == EXDEV) {
472     aioreq_pri $pri;
473     add $grp aio_copy $src, $dst, sub {
474     $grp->result ($_[0]);
475    
476     if (!$_[0]) {
477     aioreq_pri $pri;
478     add $grp aio_unlink $src;
479     }
480     };
481     } else {
482     $grp->result ($_[0]);
483     }
484     };
485    
486     $grp
487     }
488    
489 root 1.40 =item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
490    
491 root 1.52 Scans a directory (similar to C<aio_readdir>) but additionally tries to
492 root 1.76 efficiently separate the entries of directory C<$path> into two sets of
493     names, directories you can recurse into (directories), and ones you cannot
494     recurse into (everything else, including symlinks to directories).
495 root 1.52
496 root 1.61 C<aio_scandir> is a composite request that creates many sub requests.
497     C<$maxreq> specifies the maximum number of outstanding aio requests that
498     this function generates. If it is C<< <= 0 >>, then a suitable default
499 root 1.81 will be chosen (currently 4).
500 root 1.40
501     On error, the callback is called without arguments, otherwise it receives
502     two array-refs with path-relative entry names.
503    
504     Example:
505    
506     aio_scandir $dir, 0, sub {
507     my ($dirs, $nondirs) = @_;
508     print "real directories: @$dirs\n";
509     print "everything else: @$nondirs\n";
510     };
511    
512     Implementation notes.
513    
514     The C<aio_readdir> cannot be avoided, but C<stat()>'ing every entry can.
515    
516     After reading the directory, the modification time, size etc. of the
517 root 1.52 directory before and after the readdir are compared, and if they match (and
518     the modification time isn't the current time), the link count will be used to decide how many
519     entries are directories (if >= 2). Otherwise, no knowledge of the number
520     of subdirectories will be assumed.
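
The before/after comparison described above can be sketched in plain Perl. This is only an illustration under assumptions: the helper name C<dir_fingerprint> is hypothetical, the synchronous C<stat> stands in for C<aio_stat>, and the additional check that the modification time differs from the current time is left out.

```perl
use strict;
use warnings;

# Hypothetical helper: join the stat fields dev, ino, nlink, size and mtime
# (indices 0, 1, 3, 7 and 9) into one comparable string.
sub dir_fingerprint {
    my ($path) = @_;
    my @st = stat $path
        or return;    # undef in scalar context if the stat fails
    return join ":", @st[0, 1, 3, 7, 9];
}

my $dir    = ".";    # any existing directory; "." is only an example
my $before = dir_fingerprint($dir);
# ... the directory entries would be read here ...
my $after  = dir_fingerprint($dir);

my $stable = defined($before) && defined($after) && $before eq $after;
print $stable
    ? "unchanged, the link count can be trusted\n"
    : "changed or unreadable, take the slow route\n";
```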
521    
522     Then entries will be sorted into likely directories (everything without
523     a non-initial dot currently) and likely non-directories (everything
524     else). Then every entry plus an appended C</.> will be C<stat>'ed,
525     likely directories first. If that succeeds, it assumes that the entry
526     is a directory or a symlink to directory (which will be checked
527     separately). This is often faster than stat'ing the entry itself because
528     filesystems might detect the type of the entry without reading the inode
529     data (e.g. ext2fs filetype feature).
530    
531     If the known number of directories (link count - 2) has been reached, the
532     rest of the entries is assumed to be non-directories.
533    
534     This only works with certainty on POSIX (= UNIX) filesystems, which
535     fortunately are the vast majority of filesystems around.
536    
537     It will also likely work on non-POSIX filesystems with reduced efficiency
538     as those tend to return 0 or 1 as link counts, which disables the
539     directory counting heuristic.
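
The "likely directories first" ordering from the notes above can be tried out in isolation. The following is a simplified sketch, not the module's own code: the entry names are made up, and it sorts in ascending order and reads from the front, whereas the implementation below sorts in descending order and pops entries from the end.

```perl
use strict;
use warnings;

# Hypothetical directory listing, for illustration only.
my @entries = ("README.txt", "src", "a.out", "build", "Makefile");

# Sort key: "0" for names without a non-initial dot (likely directories),
# "1" otherwise, followed by the zero-padded length of the name.
my @ordered = map { $_->[0] }
              sort { $a->[1] cmp $b->[1] }
              map { [$_, sprintf("%s%04d", (/.\./ ? "1" : "0"), length($_))] }
              @entries;

# Likely directories come first, shortest names first within each class.
print "@ordered\n";
```

With these names the dot-free entries (C<src>, C<build>, C<Makefile>) end up ahead of C<a.out> and C<README.txt>.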
540 root 1.40
541     =cut
542    
543     sub aio_scandir($$$) {
544     my ($path, $maxreq, $cb) = @_;
545    
546 root 1.80 my $pri = aioreq_pri;
547    
548 root 1.58 my $grp = aio_group $cb;
549 root 1.55
550 root 1.81 $maxreq = 4 if $maxreq <= 0;
551 root 1.40
552     # stat once
553 root 1.80 aioreq_pri $pri;
554 root 1.55 add $grp aio_stat $path, sub {
555 root 1.58 return $grp->result () if $_[0];
556 root 1.52 my $now = time;
557 root 1.40 my $hash1 = join ":", (stat _)[0,1,3,7,9];
558    
559     # read the directory entries
560 root 1.80 aioreq_pri $pri;
561 root 1.55 add $grp aio_readdir $path, sub {
562 root 1.40 my $entries = shift
563 root 1.58 or return $grp->result ();
564 root 1.40
565     # stat the dir another time
566 root 1.80 aioreq_pri $pri;
567 root 1.55 add $grp aio_stat $path, sub {
568 root 1.40 my $hash2 = join ":", (stat _)[0,1,3,7,9];
569    
570     my $ndirs;
571    
572     # take the slow route if anything looks fishy
573 root 1.52 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
574 root 1.40 $ndirs = -1;
575     } else {
576     # if nlink == 2, we are finished
577     # on non-posix-fs's, we rely on nlink < 2
578     $ndirs = (stat _)[3] - 2
579 root 1.58 or return $grp->result ([], $entries);
580 root 1.40 }
581    
582     # sort into likely dirs and likely nondirs
583     # dirs == files without ".", short entries first
584     $entries = [map $_->[0],
585     sort { $b->[1] cmp $a->[1] }
586     map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
587     @$entries];
588    
589     my (@dirs, @nondirs);
590    
591 root 1.74 my $statgrp = add $grp aio_group sub {
592     $grp->result (\@dirs, \@nondirs);
593 root 1.40 };
594    
595 root 1.74 limit $statgrp $maxreq;
596     feed $statgrp sub {
597     return unless @$entries;
598     my $entry = pop @$entries;
599    
600 root 1.80 aioreq_pri $pri;
601 root 1.74 add $statgrp aio_stat "$path/$entry/.", sub {
602     if ($_[0] < 0) {
603     push @nondirs, $entry;
604     } else {
605     # need to check for real directory
606 root 1.80 aioreq_pri $pri;
607 root 1.74 add $statgrp aio_lstat "$path/$entry", sub {
608     if (-d _) {
609     push @dirs, $entry;
610    
611 root 1.75 unless (--$ndirs) {
612 root 1.74 push @nondirs, @$entries;
613 root 1.75 feed $statgrp;
614 root 1.74 }
615     } else {
616     push @nondirs, $entry;
617 root 1.40 }
618     }
619     }
620 root 1.74 };
621 root 1.40 };
622     };
623     };
624     };
625 root 1.55
626     $grp
627 root 1.40 }
628    
629     =item aio_fsync $fh, $callback->($status)
630 root 1.1
631     Asynchronously call fsync on the given filehandle and call the callback
632     with the fsync result code.
633    
634 root 1.40 =item aio_fdatasync $fh, $callback->($status)
635 root 1.1
636     Asynchronously call fdatasync on the given filehandle and call the
637 root 1.26 callback with the fdatasync result code.
638    
639     If this call isn't available because your OS lacks it or it couldn't be
640     detected, it will be emulated by calling C<fsync> instead.
641 root 1.1
642 root 1.58 =item aio_group $callback->(...)
643 root 1.54
644 root 1.55 This is a very special aio request: Instead of doing something, it is a
645     container for other aio requests, which is useful if you want to bundle
646 root 1.71 many requests into a single, composite, request with a definite callback
647     and the ability to cancel the whole request with its subrequests.
648 root 1.55
649     Returns an object of class L<IO::AIO::GRP>. See its documentation below
650     for more info.
651    
652     Example:
653    
654     my $grp = aio_group sub {
655     print "all stats done\n";
656     };
657    
658     add $grp
659     (aio_stat ...),
660     (aio_stat ...),
661     ...;
662    
663 root 1.63 =item aio_nop $callback->()
664    
665     This is a special request - it does nothing in itself and is only used for
666     side effects, such as when you want to add a dummy request to a group so
667     that finishing the requests in the group depends on executing the given
668     code.
669    
670 root 1.64 While this request does nothing, it still goes through the execution
671     phase and still requires a worker thread. Thus, the callback will not
672     be executed immediately but only after other requests in the queue have
673     entered their execution phase. This can be used to measure request
674     latency.
675    
676 root 1.71 =item IO::AIO::aio_busy $fractional_seconds, $callback->() *NOT EXPORTED*
677 root 1.54
678     Mainly used for debugging and benchmarking, this aio request puts one of
679     the request workers to sleep for the given time.
680    
681 root 1.56 While it is theoretically handy to have simple I/O scheduling requests
682 root 1.71 like sleep and file handle readable/writable, the overhead this creates is
683     immense (it blocks a thread for a long time) so do not use this function
684     except to put your application under artificial I/O pressure.
685 root 1.56
686 root 1.5 =back
687    
688 root 1.53 =head2 IO::AIO::REQ CLASS
689 root 1.52
690     All non-aggregate C<aio_*> functions return an object of this class when
691     called in non-void context.
692    
693     =over 4
694    
695 root 1.65 =item cancel $req
696 root 1.52
697     Cancels the request, if possible. Has the effect of skipping execution
698     when entering the B<execute> state and skipping calling the callback when
699     entering the B<result> state, but will leave the request otherwise
700     untouched. That means that requests that currently execute will not be
701     stopped and resources held by the request will not be freed prematurely.
702    
703 root 1.65 =item cb $req $callback->(...)
704    
705     Replace (or simply set) the callback registered to the request.
706    
707 root 1.52 =back
708    
709 root 1.55 =head2 IO::AIO::GRP CLASS
710    
711     This class is a subclass of L<IO::AIO::REQ>, so all its methods apply to
712     objects of this class, too.
713    
714     An IO::AIO::GRP object is a special request that can contain multiple other
715     aio requests.
716    
717     You create one by calling the C<aio_group> constructing function with a
718     callback that will be called when all contained requests have entered the
719     C<done> state:
720    
721     my $grp = aio_group sub {
722     print "all requests are done\n";
723     };
724    
725     You add requests by calling the C<add> method with one or more
726     C<IO::AIO::REQ> objects:
727    
728     $grp->add (aio_unlink "...");
729    
730 root 1.58 add $grp aio_stat "...", sub {
731     $_[0] and return $grp->result ("error");
732    
733     # add another request dynamically, if first succeeded
734     add $grp aio_open "...", sub {
735     $grp->result ("ok");
736     };
737     };
738 root 1.55
739     This makes it very easy to create composite requests (see the source of
740     C<aio_move> for an application) that work and feel like simple requests.
741    
742 root 1.62 =over 4
743    
744     =item * The IO::AIO::GRP objects will be cleaned up during calls to
745 root 1.55 C<IO::AIO::poll_cb>, just like any other request.
746    
747 root 1.62 =item * They can be canceled like any other request. Canceling will cancel not
748 root 1.59 only the request itself, but also all requests it contains.
749 root 1.55
750 root 1.62 =item * They can also be added to other IO::AIO::GRP objects.
751 root 1.55
752 root 1.62 =item * You must not add requests to a group from within the group callback (or
753 root 1.60 any later time).
754    
755 root 1.62 =back
756    
757 root 1.55 Their lifetime, simplified, looks like this: when they are empty, they
758     will finish very quickly. If they contain only requests that are in the
759     C<done> state, they will also finish. Otherwise they will continue to
760     exist.
761    
762 root 1.57 That means after creating a group you have some time to add requests. And
763     in the callbacks of those requests, you can add further requests to the
764     group. And only when all those requests have finished will the group
765     itself finish.
766    
767 root 1.55 =over 4
768    
769 root 1.65 =item add $grp ...
770    
771 root 1.55 =item $grp->add (...)
772    
773 root 1.57 Add one or more requests to the group. Any type of L<IO::AIO::REQ> can
774     be added, including other groups, as long as you do not create circular
775     dependencies.
776    
777     Returns all its arguments.
778 root 1.55
779 root 1.74 =item $grp->cancel_subs
780    
781     Cancels all subrequests and clears any feeder, but not the group request
782     itself. Useful when you queued a lot of events but got a result early.
783    
784 root 1.58 =item $grp->result (...)
785    
786     Set the result value(s) that will be passed to the group callback when all
787 root 1.80 subrequests have finished and set the group's errno to the current value
788     of errno (just like calling C<errno> without an error number). By default,
789     no argument will be passed and errno is zero.
790    
791     =item $grp->errno ([$errno])
792    
793     Sets the group errno value to C<$errno>, or the current value of errno
794     when the argument is missing.
795    
796     Every aio request has an associated errno value that is restored when
797     the callback is invoked. This method lets you change this value from its
798     default (0).
799    
800     Calling C<result> will also set errno, so make sure you either set C<$!>
801     before the call to C<result>, or call C<errno> after it.
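
Example: a minimal sketch, assuming a single C<aio_stat> subrequest, that
passes a success flag to the group callback and makes the failure reason
available there via C<$!>. The path and the use of C<1>/C<0> as result
values are illustrative choices, not something the module prescribes.

   use IO::AIO 2;

   my $grp = aio_group sub {
      my ($ok) = @_;
      print $ok ? "stat succeeded\n" : "stat failed: $!\n";
   };

   $grp->add (aio_stat "/etc/passwd", sub {
      # $_[0] is the stat return value (0 on success); calling result
      # here also records the current errno for the group callback
      $grp->result ($_[0] ? 0 : 1);
   });

   IO::AIO::flush;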
802 root 1.58
803 root 1.65 =item feed $grp $callback->($grp)
804 root 1.60
805     Sets a feeder/generator on this group: every group can have an attached
806     generator that generates requests if idle. The idea behind this is that,
807     although you could just queue as many requests as you want in a group,
808     this might starve other requests for a potentially long time. For
809     example, C<aio_scandir> might generate hundreds of thousands of C<aio_stat>
810     requests, delaying any later requests for a long time.
811    
812     To avoid this, and allow incremental generation of requests, you can
813     instead create a group and set a feeder on it that generates those requests. The
814 root 1.68 feed callback will be called whenever there are few enough (see C<limit>,
815 root 1.60 below) requests active in the group itself and is expected to queue more
816     requests.
817    
818 root 1.68 The feed callback can queue as many requests as it likes (i.e. C<add> does
819     not impose any limits).
820 root 1.60
821 root 1.65 If the feed does not queue more requests when called, it will be
822 root 1.60 automatically removed from the group.
823    
824 root 1.65 If the feed limit is C<0> when the feeder is set, it will be set to C<2> automatically.
825 root 1.60
826     Example:
827    
828     # stat all files in @files, but only ever use four aio requests concurrently:
829    
830     my $grp = aio_group sub { print "finished\n" };
831 root 1.68 limit $grp 4;
832 root 1.65 feed $grp sub {
833 root 1.60 my $file = pop @files
834     or return;
835    
836     add $grp aio_stat $file, sub { ... };
837 root 1.65 };
838 root 1.60
839 root 1.68 =item limit $grp $num
840 root 1.60
841     Sets the feeder limit for the group: The feeder will be called whenever
842     the group contains fewer than this many requests.
843    
844     Setting the limit to C<0> will pause the feeding process.
845    
846 root 1.55 =back
847    
848 root 1.5 =head2 SUPPORT FUNCTIONS
849    
850     =over 4
851    
852     =item $fileno = IO::AIO::poll_fileno
853    
854 root 1.20 Return the I<request result pipe file descriptor>. This filehandle must be
855     polled for reading by some mechanism outside this module (e.g. Event or
856     select, see below or the SYNOPSIS). If the pipe becomes readable you have
857     to call C<poll_cb> to check the results.
858 root 1.5
859     See C<poll_cb> for an example.
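
Example: a minimal sketch of polling the descriptor with Perl's built-in
four-argument C<select> instead of an event library. The C<aio_stat> request
and its path are only there to give the loop something to wait for.

   use IO::AIO;

   my $fd = IO::AIO::poll_fileno;

   aio_stat "/etc/passwd", sub { print "stat done\n" };

   while (IO::AIO::nreqs) {
      my $rin = "";
      vec ($rin, $fd, 1) = 1;
      select $rin, undef, undef, undef;   # block until results are readable
      IO::AIO::poll_cb;                   # run the callbacks
   }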
860    
861     =item IO::AIO::poll_cb
862    
863     Process all outstanding events on the result pipe. You have to call this
864     regularly. Returns the number of events processed. Returns immediately
865     when no events are outstanding.
866    
867 root 1.78 If not all requests were processed for whatever reason, the filehandle
868     will still be ready when C<poll_cb> returns.
869    
870 root 1.20 Example: Install an Event watcher that automatically calls
871     IO::AIO::poll_cb with high priority:
872 root 1.5
873     Event->io (fd => IO::AIO::poll_fileno,
874     poll => 'r', async => 1,
875     cb => \&IO::AIO::poll_cb);
876    
877 root 1.78 =item IO::AIO::poll_some $max_requests
878    
879     Similar to C<poll_cb>, but only processes up to C<$max_requests> requests
880     at a time.
881    
882     Useful if you want to ensure some level of interactiveness when perl is
883     not fast enough to process all requests in time.
884    
885     Example: Install an Event watcher that automatically calls
886     IO::AIO::poll_some with low priority, to ensure that other parts of the
887     program get the CPU sometimes even under high AIO load.
888    
889     Event->io (fd => IO::AIO::poll_fileno,
890     poll => 'r', nice => 1,
891     cb => sub { IO::AIO::poll_some 256 });
892    
893 root 1.5 =item IO::AIO::poll_wait
894    
895     Wait till the result filehandle becomes ready for reading (simply does a
896 root 1.20 C<select> on the filehandle). This is useful if you want to synchronously wait
897 root 1.5 for some requests to finish.
898    
899     See C<nreqs> for an example.
900    
901     =item IO::AIO::nreqs
902    
903 root 1.80 Returns the number of requests currently in the ready, execute or pending
904     states (i.e. those whose callback has not been invoked yet).
905 root 1.5
906     Example: wait till there are no outstanding requests anymore:
907    
908     IO::AIO::poll_wait, IO::AIO::poll_cb
909     while IO::AIO::nreqs;
910    
911 root 1.80 =item IO::AIO::nready
912    
913     Returns the number of requests currently in the ready state (not yet
914     executed).
915    
916     =item IO::AIO::npending
917    
918     Returns the number of requests currently in the pending state (executed,
919     but not yet processed by poll_cb).
920    
921 root 1.12 =item IO::AIO::flush
922    
923     Wait till all outstanding AIO requests have been handled.
924    
925 root 1.13 Strictly equivalent to:
926    
927     IO::AIO::poll_wait, IO::AIO::poll_cb
928     while IO::AIO::nreqs;
929    
930     =item IO::AIO::poll
931    
932     Waits until some requests have been handled.
933    
934     Strictly equivalent to:
935    
936     IO::AIO::poll_wait, IO::AIO::poll_cb
937     if IO::AIO::nreqs;
938    
939 root 1.5 =item IO::AIO::min_parallel $nthreads
940    
941 root 1.61 Set the minimum number of AIO threads to C<$nthreads>. The current
942     default is C<8>, which means eight asynchronous operations can execute
943     concurrently at any one time (the number of outstanding requests,
944     however, is unlimited).
945 root 1.5
946 root 1.34 IO::AIO starts threads only on demand, when an AIO request is queued and
947     no free thread exists.
948    
949 root 1.61 It is recommended to keep the number of threads relatively low, as some
950     Linux kernel versions will scale negatively with the number of threads
951     (higher parallelism => MUCH higher latency). With current Linux 2.6
952     versions, 4-32 threads should be fine.
953 root 1.5
954 root 1.34 Under most circumstances you don't need to call this function, as the
955     module selects a default that is suitable for low to moderate load.
956 root 1.5
957     =item IO::AIO::max_parallel $nthreads
958    
959 root 1.34 Sets the maximum number of AIO threads to C<$nthreads>. If more than the
960     specified number of threads are currently running, this function kills
961     them. This function blocks until the limit is reached.
962    
963     While C<$nthreads> is zero, aio requests get queued but not executed
964     until the number of threads has been increased again.
965 root 1.5
966     This module automatically runs C<max_parallel 0> at program end, to ensure
967     that all threads are killed and that there are no outstanding requests.
968    
969     Under normal circumstances you don't need to call this function.
970    
971 root 1.79 =item $oldmaxreqs = IO::AIO::max_outstanding $maxreqs
972 root 1.5
973 root 1.79 This is a very bad function to use in interactive programs because it
974     blocks, and a bad way to reduce concurrency because it is inexact: Better
975     use an C<aio_group> together with a feed callback.
976    
977     Sets the maximum number of outstanding requests to C<$maxreqs>. If you
978     queue up more than this number of requests, the next call to the
979     C<poll_cb> (and C<poll_some> and other functions calling C<poll_cb>)
980     function will block until the limit is no longer exceeded.
981    
982     The default value is very large, so there is no practical limit on the
983     number of outstanding requests.
984    
985     You can still queue as many requests as you want. Therefore,
986     C<max_outstanding> is mainly useful in simple scripts (with low values) or
987     as a stop gap to shield against fatal memory overflow (with large values).
988 root 1.5
989     =back
990    
991 root 1.1 =cut
992    
993 root 1.2 # support function to convert a fd into a perl filehandle
994     sub _fd2fh {
995     return undef if $_[0] < 0;
996    
997 root 1.23 # try to generate nice filehandles
998     my $sym = "IO::AIO::fd#$_[0]";
999     local *$sym;
1000 root 1.25
1001 root 1.27 open *$sym, "+<&=$_[0]" # usually works under any unix
1002     or open *$sym, "<&=$_[0]" # cygwin needs this
1003     or open *$sym, ">&=$_[0]" # or this
1004 root 1.2 or return undef;
1005    
1006 root 1.23 *$sym
1007 root 1.2 }
1008    
1009 root 1.61 min_parallel 8;
1010 root 1.1
1011 root 1.82 END {
1012 root 1.84 min_parallel 1;
1013 root 1.82 flush;
1014     };
1015    
1016 root 1.1 1;
1017    
1018 root 1.27 =head2 FORK BEHAVIOUR
1019    
1020 root 1.52 This module should do "the right thing" when the process using it forks:
1021    
1022 root 1.34 Before the fork, IO::AIO enters a quiescent state where no requests
1023     can be added in other threads and no results will be processed. After
1024     the fork the parent simply leaves the quiescent state and continues
1025 root 1.72 request/result processing, while the child frees the request/result queue
1026     (so that the requests started before the fork will only be handled in the
1027     parent). Threads will be started on demand until the limit set in the
1028 root 1.34 parent process has been reached again.
1029 root 1.27
1030 root 1.52 In short: the parent will, after a short pause, continue as if fork had
1031     not been called, while the child will act as if IO::AIO has not been used
1032     yet.
1033    
1034 root 1.60 =head2 MEMORY USAGE
1035    
1036 root 1.72 Per-request usage:
1037    
1038     Each aio request uses - depending on your architecture - around 100-200
1039     bytes of memory. In addition, stat requests need a stat buffer (possibly
1040     a few hundred bytes), readdir requires a result buffer and so on. Perl
1041     scalars and other data passed into aio requests will also be locked and
1042     will consume memory till the request has entered the done state.
1043 root 1.60
1044     This is not awfully much, so queuing lots of requests is not usually a
1045     problem.
1046    
1047 root 1.72 Per-thread usage:
1048    
1049     In the execution phase, some aio requests require more memory for
1050     temporary buffers, and each thread requires a stack and other data
1051     structures (usually around 16k-128k, depending on the OS).
1052    
1053     =head1 KNOWN BUGS
1054    
1055 root 1.73 Known bugs will be fixed in the next release.
1056 root 1.60
1057 root 1.1 =head1 SEE ALSO
1058    
1059 root 1.68 L<Coro::AIO>.
1060 root 1.1
1061     =head1 AUTHOR
1062    
1063     Marc Lehmann <schmorp@schmorp.de>
1064     http://home.schmorp.de/
1065    
1066     =cut
1067