/cvs/IO-AIO/AIO.pm
Revision: 1.86
Committed: Sat Oct 28 23:32:29 2006 UTC (17 years, 7 months ago) by root
Branch: MAIN
Changes since 1.85: +121 -37 lines

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     IO::AIO - Asynchronous Input/Output
4    
5     =head1 SYNOPSIS
6    
7     use IO::AIO;
8    
9 root 1.6 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10     my ($fh) = @_;
11     ...
12     };
13    
14     aio_unlink "/tmp/file", sub { };
15    
16     aio_read $fh, 30000, 1024, $buffer, 0, sub {
17 root 1.8 $_[0] > 0 or die "read error: $!";
18 root 1.6 };
19    
20 root 1.56 # version 2+ has request and group objects
21     use IO::AIO 2;
22 root 1.52
23 root 1.68 aioreq_pri 4; # give next request a very high priority
24 root 1.52 my $req = aio_unlink "/tmp/file", sub { };
25     $req->cancel; # cancel request if still in queue
26    
27 root 1.56 my $grp = aio_group sub { print "all stats done\n" };
28     add $grp aio_stat "..." for ...;
29    
30     # AnyEvent integration
31 root 1.42 open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!";
32     my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb });
33    
34 root 1.56 # Event integration
35 root 1.6 Event->io (fd => IO::AIO::poll_fileno,
36 root 1.7 poll => 'r',
37 root 1.6 cb => \&IO::AIO::poll_cb);
38    
39 root 1.56 # Glib/Gtk2 integration
40 root 1.6 add_watch Glib::IO IO::AIO::poll_fileno,
41 root 1.22 in => sub { IO::AIO::poll_cb; 1 };
42 root 1.6
43 root 1.56 # Tk integration
44 root 1.6 Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "",
45     readable => \&IO::AIO::poll_cb);
46    
47 root 1.56 # Danga::Socket integration
48 root 1.11 Danga::Socket->AddOtherFds (IO::AIO::poll_fileno =>
49     \&IO::AIO::poll_cb);
50    
51 root 1.1 =head1 DESCRIPTION
52    
53     This module implements asynchronous I/O using whatever means your
54 root 1.2 operating system supports.
55 root 1.1
56 root 1.85 Asynchronous means that operations that can normally block your program
57     (e.g. reading from disk) will be done asynchronously: the operation
58     will still block, but you can do something else in the meantime. This
59     is extremely useful for programs that need to stay interactive even
60     when doing heavy I/O (GUI programs, high performance network servers
61     etc.), but can also be used to easily do operations in parallel that are
62     normally done sequentially, e.g. stat'ing many files, which is much faster
63     on a RAID volume or over NFS when you do a number of stat operations
64     concurrently.
65    
66     While this works on all types of file descriptors (for example sockets),
67     using these functions on file descriptors that support nonblocking
68     operation (again, sockets, pipes etc.) is very inefficient. Use an event
69     loop for that (such as the L<Event|Event> module): IO::AIO will naturally
70     fit into such an event loop itself.
71    
72 root 1.72 In this version, a number of threads are started that execute your
73     requests and signal their completion. You don't need thread support
74     in perl, and the threads created by this module will not be visible
75     to perl. In the future, this module might make use of the native aio
76     functions available on many operating systems. However, they are often
77 root 1.85 not well-supported or restricted (GNU/Linux doesn't allow them on normal
78 root 1.72 files currently, for example), and they would only support aio_read and
79     aio_write, so the remaining functionality would have to be implemented
80     using threads anyway.
81    
82     Although the module will work in the presence of other (Perl-)
83     threads, it is currently not reentrant in any way, so use appropriate
84     locking yourself, always call C<poll_cb> from within the same thread, or
85     never call C<poll_cb> (or other C<aio_> functions) recursively.
86    
87 root 1.86 =head2 EXAMPLE
88    
89     This is a simple example that uses the Event module and loads
90     F</etc/passwd> asynchronously:
91    
92     use Fcntl;
93     use Event;
94     use IO::AIO;
95    
96     # register the IO::AIO callback with Event
97     Event->io (fd => IO::AIO::poll_fileno,
98     poll => 'r',
99     cb => \&IO::AIO::poll_cb);
100    
101     # queue the request to open /etc/passwd
102     aio_open "/etc/passwd", O_RDONLY, 0, sub {
103     my $fh = $_[0]
104     or die "error while opening: $!";
105    
106     # stat'ing filehandles is generally non-blocking
107     my $size = -s $fh;
108    
109     # queue a request to read the file
110     my $contents;
111     aio_read $fh, 0, $size, $contents, 0, sub {
112     $_[0] == $size
113     or die "short read: $!";
114    
115     close $fh;
116    
117     # file contents now in $contents
118     print $contents;
119    
120     # exit event loop and program
121     Event::unloop;
122     };
123     };
124    
125     # possibly queue up other requests, or open GUI windows,
126     # check for sockets etc. etc.
127    
128     # process events as long as there are some:
129     Event::loop;
130    
131 root 1.72 =head1 REQUEST ANATOMY AND LIFETIME
132    
133     Every C<aio_*> function creates a request, which is a C data structure not
134     directly visible to Perl.
135    
136     If called in non-void context, every request function returns a Perl
137     object representing the request. In void context, nothing is returned,
138     which saves a bit of memory.
139    
140     The perl object is a fairly standard ref-to-hash object. The hash contents
141     are not used by IO::AIO so you are free to store anything you like in it.
142    
143     During their existence, aio requests travel through the following states,
144     in order:
145    
146     =over 4
147    
148     =item ready
149    
150     Immediately after a request is created it is put into the ready state,
151     waiting for a thread to execute it.
152    
153     =item execute
154    
155     A thread has accepted the request for processing and is currently
156     executing it (e.g. blocking in read).
157    
158     =item pending
159    
160     The request has been executed and is waiting for result processing.
161    
162     While request submission and execution are fully asynchronous, result
163     processing is not and relies on the perl interpreter calling C<poll_cb>
164     (or another function with the same effect).
165    
166     =item result
167    
168     The request results are processed synchronously by C<poll_cb>.
169    
170     The C<poll_cb> function will process all outstanding aio requests by
171     calling their callbacks, freeing memory associated with them and managing
172     any groups they are contained in.
173    
174     =item done
175    
176     Request has reached the end of its lifetime and holds no resources anymore
177     (except possibly for the Perl object, but its connection to the actual
178     aio request is severed and calling its methods will either do nothing or
179     result in a runtime error).
180 root 1.1
181     =cut
182    
183     package IO::AIO;
184    
185 root 1.23 no warnings;
186 root 1.51 use strict 'vars';
187 root 1.23
188 root 1.1 use base 'Exporter';
189    
190     BEGIN {
191 root 1.86 our $VERSION = '2.1';
192 root 1.1
193 root 1.67 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
194     aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
195     aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move
196 root 1.82 aio_copy aio_group aio_nop aio_mknod);
197 root 1.70 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice));
198 root 1.67 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
199 root 1.86 min_parallel max_parallel max_idle
200     nreqs nready npending nthreads
201     max_poll_time max_poll_reqs);
202 root 1.1
203 root 1.54 @IO::AIO::GRP::ISA = 'IO::AIO::REQ';
204    
205 root 1.1 require XSLoader;
206 root 1.51 XSLoader::load ("IO::AIO", $VERSION);
207 root 1.1 }
208    
209 root 1.5 =head1 FUNCTIONS
210 root 1.1
211 root 1.5 =head2 AIO FUNCTIONS
212 root 1.1
213 root 1.5 All the C<aio_*> calls are more or less thin wrappers around the syscall
214     with the same name (sans C<aio_>). The arguments are similar or identical,
215 root 1.14 and they all accept an additional (and optional) C<$callback> argument
216     which must be a code reference. This code reference will get called with
217     the syscall return code (e.g. most syscalls return C<-1> on error, unlike
218     perl, which usually delivers "false") as its sole argument when the given
219     syscall has been executed asynchronously.
220 root 1.1
221 root 1.23 All functions expecting a filehandle keep a copy of the filehandle
222     internally until the request has finished.
223 root 1.1
224 root 1.55 All requests return objects of type L<IO::AIO::REQ> that allow further
225     manipulation of those requests while they are in-flight.
226 root 1.52
227 root 1.28 The pathnames you pass to these routines I<must> be absolute and
228     encoded in byte form. The reason for the former is that at the time the
229     request is being executed, the current working directory could have
230     changed. Alternatively, you can make sure that you never change the
231     current working directory.
232    
233     To get pathnames in byte form, make sure you either: a) always pass
234     in filenames you got from outside (command line, readdir etc.), b) only
235     use filenames that are ASCII or ISO 8859-1, c) use the Encode module and
236     encode your pathnames to the locale (or other) encoding in effect in the
237     user environment, d) use Glib::filename_from_unicode on unicode filenames
238     or e) use something else.
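
A minimal sketch of option c) combined with the absolute-path rule, using
only core modules. The helper name C<aio_safe_path> and the UTF-8 default
are assumptions made for this illustration; neither is part of IO::AIO.

```perl
use strict;
use warnings;
use Encode ();
use File::Spec ();

# Hypothetical helper, not part of IO::AIO: returns an absolute pathname
# as a byte string, suitable for passing to the aio_* functions.
sub aio_safe_path {
    my ($path, $encoding) = @_;
    $encoding = "UTF-8" unless defined $encoding; # assumption: UTF-8 filenames

    # encode first, so only bytes get joined with the working directory
    $path = Encode::encode ($encoding, $path)
        if utf8::is_utf8 ($path);

    # resolve against the working directory in effect right now
    File::Spec->rel2abs ($path)
}

my $path = aio_safe_path ("notes/r\x{e9}sum\x{e9}-\x{263a}.txt");
print utf8::is_utf8 ($path) ? "characters\n" : "bytes\n";
```

Resolving the path at submission time means a later C<chdir> no longer
affects which file the request operates on.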
239 root 1.1
240 root 1.5 =over 4
241 root 1.1
242 root 1.80 =item $prev_pri = aioreq_pri [$pri]
243 root 1.68
244 root 1.80 Returns the priority value that would be used for the next request and, if
245     C<$pri> is given, sets the priority for the next aio request.
246 root 1.68
247 root 1.80 The default priority is C<0>, the minimum and maximum priorities are C<-4>
248     and C<4>, respectively. Requests with higher priority will be serviced
249     first.
250    
251     The priority will be reset to C<0> after each call to one of the C<aio_*>
252 root 1.68 functions.
253    
254 root 1.69 Example: open a file with low priority, then read something from it with
255     higher priority so the read request is serviced before other low priority
256     open requests (potentially spamming the cache):
257    
258     aioreq_pri -3;
259     aio_open ..., sub {
260     return unless $_[0];
261    
262     aioreq_pri -2;
263     aio_read $_[0], ..., sub {
264     ...
265     };
266     };
267    
268     =item aioreq_nice $pri_adjust
269    
270     Similar to C<aioreq_pri>, but subtracts the given value from the current
271     priority, so effects are cumulative.
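
The bookkeeping behind C<aioreq_pri> and C<aioreq_nice> can be modelled in
a few lines of plain perl. This is only a sketch: the C<model_*> names are
invented here, and clamping out-of-range values to C<-4>..C<4> is an
assumption, since the text above only states the minimum and maximum.

```perl
use strict;
use warnings;

# Pure-perl model of the priority bookkeeping described above. The
# model_* names are made up for this sketch.
my $next_pri = 0;

sub clamp_pri {
    my ($pri) = @_;
    $pri < -4 ? -4 : $pri > 4 ? 4 : $pri
}

# like aioreq_pri: return the pending priority, optionally set a new one
sub model_aioreq_pri {
    my $prev = $next_pri;
    $next_pri = clamp_pri ($_[0]) if @_;
    $prev
}

# like aioreq_nice: subtract from the pending priority
sub model_aioreq_nice {
    $next_pri = clamp_pri ($next_pri - $_[0]);
}

# like any aio_* call: consume the pending priority and reset it to 0
sub model_submit {
    my $pri = $next_pri;
    $next_pri = 0;
    $pri
}

model_aioreq_pri (-3);
model_aioreq_nice (1);          # -3 - 1 = -4
print model_submit (), "\n";    # priority used for this request
print model_submit (), "\n";    # the following request uses the default
```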
272    
273 root 1.40 =item aio_open $pathname, $flags, $mode, $callback->($fh)
274 root 1.1
275 root 1.2 Asynchronously open or create a file and call the callback with a newly
276     created filehandle for the file.
277 root 1.1
278     The pathname passed to C<aio_open> must be absolute. See the notes on
279     pathnames under AIO FUNCTIONS, above, for an explanation.
280    
281 root 1.20 The C<$flags> argument is a bitmask. See the C<Fcntl> module for a
282     list. They are the same as used by C<sysopen>.
283    
284     Likewise, C<$mode> specifies the mode of the newly created file, if it
285     didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>,
286     except that it is mandatory (i.e. use C<0> if you don't create new files,
287     and C<0666> or C<0777> if you do).
288 root 1.1
289     Example:
290    
291     aio_open "/etc/passwd", O_RDONLY, 0, sub {
292 root 1.2 if ($_[0]) {
293     print "open successful, fh is $_[0]\n";
294 root 1.1 ...
295     } else {
296     die "open failed: $!\n";
297     }
298     };
299    
300 root 1.40 =item aio_close $fh, $callback->($status)
301 root 1.1
302 root 1.2 Asynchronously close a file and call the callback with the result
303     code. I<WARNING:> although accepted, you should not pass in a perl
304 root 1.20 filehandle here, as perl will likely close the file descriptor another
305     time when the filehandle is destroyed. Normally, you can safely call perl's
306     C<close> or just let filehandles go out of scope.
307    
308     This is supposed to be a bug in the API, so that might change. It's
309     therefore best to avoid this function.
310 root 1.1
311 root 1.40 =item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
312 root 1.1
313 root 1.40 =item aio_write $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
314 root 1.1
315     Reads or writes C<length> bytes from the specified C<fh> and C<offset>
316     into the scalar given by C<data> and offset C<dataoffset> and calls the
317     callback with the actual number of bytes read (or -1 on error, just
318     like the syscall).
319    
320 root 1.31 The C<$data> scalar I<MUST NOT> be modified in any way while the request
321     is outstanding. Modifying it can result in segfaults or WW3 (if the
322     necessary/optional hardware is installed).
323    
324 root 1.17 Example: Read 15 bytes at offset 7 into scalar C<$buffer>, starting at
325 root 1.1 offset C<0> within the scalar:
326    
327     aio_read $fh, 7, 15, $buffer, 0, sub {
328 root 1.9 $_[0] > 0 or die "read error: $!";
329     print "read $_[0] bytes: <$buffer>\n";
330 root 1.1 };
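
To make the argument order concrete, here is a blocking stand-in written
with core perl only. It is a sketch, not how the module works internally:
the name C<blocking_read_at> is invented, and it uses C<sysseek> plus
C<sysread> where the module issues the request from a worker thread.

```perl
use strict;
use warnings;
use Fcntl qw(SEEK_SET);
use File::Temp ();

# Hypothetical blocking stand-in for aio_read, same argument order.
# Returns the number of bytes read, or -1 on error.
sub blocking_read_at {
    my ($fh, $offset, $length, undef, $dataoffset) = @_;

    # sysseek returns "0 but true" for position 0 and undef on failure
    sysseek ($fh, $offset, SEEK_SET)
        or return -1;

    # $_[3] aliases the caller's scalar, so it is filled in place
    my $n = sysread $fh, $_[3], $length, $dataoffset;

    defined $n ? $n : -1
}

my $tmp = File::Temp->new;
syswrite $tmp, "0123456789abcdefghijklmnopqrstuvwxyz";

my $buffer = "";
my $n = blocking_read_at ($tmp, 7, 15, $buffer, 0);
print "read $n bytes: <$buffer>\n";
```

As with C<aio_read>, a non-zero data offset keeps the bytes already in
the scalar before that position and stores the new data after them.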
331    
332 root 1.40 =item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval)
333 root 1.35
334     Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts
335     reading at byte offset C<$in_offset>, and starts writing at the current
336     file offset of C<$out_fh>. Because of that, it is not safe to issue more
337     than one C<aio_sendfile> per C<$out_fh>, as they will interfere with each
338     other.
339    
340     This call tries to make use of a native C<sendfile> syscall to provide
341     zero-copy operation. For this to work, C<$out_fh> should refer to a
342     socket, and C<$in_fh> should refer to an mmap'able file.
343    
344     If the native sendfile call fails or is not implemented, it will be
345 root 1.36 emulated, so you can call C<aio_sendfile> on any type of filehandle
346     regardless of the limitations of the operating system.
347 root 1.35
348     Please note, however, that C<aio_sendfile> can read more bytes from
349     C<$in_fh> than are written, and there is no way to find out how many
350 root 1.36 bytes have been read from C<aio_sendfile> alone, as C<aio_sendfile> only
351     provides the number of bytes written to C<$out_fh>. Only if the result
352     value equals C<$length> one can assume that C<$length> bytes have been
353     read.
354 root 1.35
355 root 1.40 =item aio_readahead $fh,$offset,$length, $callback->($retval)
356 root 1.1
357 root 1.20 C<aio_readahead> populates the page cache with data from a file so that
358 root 1.1 subsequent reads from that file will not block on disk I/O. The C<$offset>
359     argument specifies the starting point from which data is to be read and
360     C<$length> specifies the number of bytes to be read. I/O is performed in
361     whole pages, so that offset is effectively rounded down to a page boundary
362     and bytes are read up to the next page boundary greater than or equal to
363 root 1.20 (offset+length). C<aio_readahead> does not read beyond the end of the
364 root 1.1 file. The current file offset of the file is left unchanged.
365    
366 root 1.26 If that syscall doesn't exist (likely if your OS isn't Linux) it will be
367     emulated by simply reading the data, which would have a similar effect.
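
The page rounding described above is plain arithmetic. The following
sketch computes the byte range that ends up being read; the helper name
C<readahead_range> and the default page size of 4096 bytes are assumptions
for illustration, as the real page size depends on the system.

```perl
use strict;
use warnings;

# Hypothetical helper: byte range actually touched by a readahead of
# $length bytes at $offset, assuming pages of $page bytes.
sub readahead_range {
    my ($offset, $length, $page) = @_;
    $page ||= 4096; # assumption: a common page size, not queried from the OS

    my $start = $offset - $offset % $page;          # round down
    my $end   = $offset + $length;
    $end += $page - $end % $page if $end % $page;   # round up

    ($start, $end)
}

my ($start, $end) = readahead_range (5000, 100);
print "pages cover bytes $start up to (not including) $end\n";
```

The sketch ignores the end of the file, which the real call never reads
beyond.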
368    
369 root 1.40 =item aio_stat $fh_or_path, $callback->($status)
370 root 1.1
371 root 1.40 =item aio_lstat $fh, $callback->($status)
372 root 1.1
373     Works like perl's C<stat> or C<lstat> in void context. The callback will
374     be called after the stat and the results will be available using C<stat _>
375     or C<-s _> etc...
376    
377     The pathname passed to C<aio_stat> must be absolute. See the notes on
378     pathnames under AIO FUNCTIONS, above, for an explanation.
379    
380     Currently, the stats are always 64-bit-stats, i.e. instead of returning an
381     error when stat'ing a large file, the results will be silently truncated
382     unless perl itself is compiled with large file support.
383    
384     Example: Print the length of F</etc/passwd>:
385    
386     aio_stat "/etc/passwd", sub {
387     $_[0] and die "stat failed: $!";
388     print "size is ", -s _, "\n";
389     };
390    
391 root 1.40 =item aio_unlink $pathname, $callback->($status)
392 root 1.1
393     Asynchronously unlink (delete) a file and call the callback with the
394     result code.
395    
396 root 1.82 =item aio_mknod $path, $mode, $dev, $callback->($status)
397    
398 root 1.86 [EXPERIMENTAL]
399    
400 root 1.83 Asynchronously create a device node (or fifo). See mknod(2).
401    
402 root 1.86 The only (POSIX-) portable way of calling this function is:
403 root 1.83
404     aio_mknod $path, IO::AIO::S_IFIFO | $mode, 0, sub { ...
405 root 1.82
406 root 1.50 =item aio_link $srcpath, $dstpath, $callback->($status)
407    
408     Asynchronously create a new link to the existing object at C<$srcpath> at
409     the path C<$dstpath> and call the callback with the result code.
410    
411     =item aio_symlink $srcpath, $dstpath, $callback->($status)
412    
413     Asynchronously create a new symbolic link to the existing object at C<$srcpath> at
414     the path C<$dstpath> and call the callback with the result code.
415    
416     =item aio_rename $srcpath, $dstpath, $callback->($status)
417    
418     Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
419     rename(2) does, and call the callback with the result code.
420    
421 root 1.40 =item aio_rmdir $pathname, $callback->($status)
422 root 1.27
423     Asynchronously rmdir (delete) a directory and call the callback with the
424     result code.
425    
426 root 1.46 =item aio_readdir $pathname, $callback->($entries)
427 root 1.37
428     Unlike the POSIX call of the same name, C<aio_readdir> reads an entire
429     directory (i.e. opendir + readdir + closedir). The entries will not be
430     sorted, and will B<NOT> include the C<.> and C<..> entries.
431    
432     The callback is passed a single argument which is either C<undef> or an array-ref
433     with the filenames.
434    
435 root 1.82 =item aio_copy $srcpath, $dstpath, $callback->($status)
436    
437     Try to copy the I<file> (directories not supported as either source or
438     destination) from C<$srcpath> to C<$dstpath> and call the callback with
439     C<0> (ok) or C<-1> (error).
440    
441     This is a composite request that creates the destination file with
442     mode 0200 and copies the contents of the source file into it using
443     C<aio_sendfile>, followed by restoring atime, mtime, access mode and
444     uid/gid, in that order.
445    
446     If an error occurs, the partial destination file will be unlinked, if
447     possible, except when setting atime, mtime, access mode and uid/gid, where
448     errors are being ignored.
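
The sequence of steps can be followed more easily in a blocking version.
The sketch below uses only core perl and mirrors the documented order
(create with mode 0200, copy, restore atime/mtime, mode, then uid/gid).
The name C<blocking_copy> is invented, and a read/write loop stands in for
C<aio_sendfile>.

```perl
use strict;
use warnings;
use Fcntl qw(O_RDONLY O_WRONLY O_CREAT O_TRUNC);

# Hypothetical blocking counterpart of aio_copy: returns 0 on success
# and -1 on error, and removes a partial destination file.
sub blocking_copy {
    my ($src, $dst) = @_;

    sysopen my $in, $src, O_RDONLY
        or return -1;
    my @stat = stat $in;

    # the destination starts out with mode 0200, as described above
    sysopen my $out, $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200
        or return -1;

    my $ok = 1;
    COPY: while (1) {
        my $n = sysread $in, my $buf, 65536;
        $ok = defined $n;
        last unless $n;                    # error or end of file

        while (length $buf) {
            my $w = syswrite $out, $buf;
            unless (defined $w) { $ok = 0; last COPY }
            substr $buf, 0, $w, "";        # drop what was written
        }
    }

    unless ($ok) {
        close $out;
        unlink $dst;
        return -1;
    }

    # restore metadata in the documented order; errors are ignored
    utime $stat[8], $stat[9], $dst;
    chmod $stat[2] & 07777, $dst;
    chown $stat[4], $stat[5], $dst;

    close $out;
    0
}
```

The return values follow the module source, where success is reported as
C<0> and failure as C<-1>.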
449    
450     =cut
451    
452     sub aio_copy($$;$) {
453     my ($src, $dst, $cb) = @_;
454    
455     my $pri = aioreq_pri;
456     my $grp = aio_group $cb;
457    
458     aioreq_pri $pri;
459     add $grp aio_open $src, O_RDONLY, 0, sub {
460     if (my $src_fh = $_[0]) {
461     my @stat = stat $src_fh;
462    
463     aioreq_pri $pri;
464     add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
465     if (my $dst_fh = $_[0]) {
466     aioreq_pri $pri;
467     add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
468     if ($_[0] == $stat[7]) {
469     $grp->result (0);
470     close $src_fh;
471    
472     # those should not normally block. should. should.
473     utime $stat[8], $stat[9], $dst;
474     chmod $stat[2] & 07777, $dst_fh;
475     chown $stat[4], $stat[5], $dst_fh;
476     close $dst_fh;
477     } else {
478     $grp->result (-1);
479     close $src_fh;
480     close $dst_fh;
481    
482     aioreq_pri $pri;
483     add $grp aio_unlink $dst;
484     }
485     };
486     } else {
487     $grp->result (-1);
488     }
489     };
490    
491     } else {
492     $grp->result (-1);
493     }
494     };
495    
496     $grp
497     }
498    
499     =item aio_move $srcpath, $dstpath, $callback->($status)
500    
501     Try to move the I<file> (directories not supported as either source or
502     destination) from C<$srcpath> to C<$dstpath> and call the callback with
503     C<0> (ok) or C<-1> (error).
504    
505     This is a composite request that tries to rename(2) the file first. If
506     rename fails with C<EXDEV>, it copies the file with C<aio_copy> and, if
507     that is successful, unlinks the C<$srcpath>.
508    
509     =cut
510    
511     sub aio_move($$;$) {
512     my ($src, $dst, $cb) = @_;
513    
514     my $pri = aioreq_pri;
515     my $grp = aio_group $cb;
516    
517     aioreq_pri $pri;
518     add $grp aio_rename $src, $dst, sub {
519     if ($_[0] && $! == EXDEV) {
520     aioreq_pri $pri;
521     add $grp aio_copy $src, $dst, sub {
522     $grp->result ($_[0]);
523    
524     if (!$_[0]) {
525     aioreq_pri $pri;
526     add $grp aio_unlink $src;
527     }
528     };
529     } else {
530     $grp->result ($_[0]);
531     }
532     };
533    
534     $grp
535     }
536    
537 root 1.40 =item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
538    
539 root 1.52 Scans a directory (similar to C<aio_readdir>) but additionally tries to
540 root 1.76 efficiently separate the entries of directory C<$path> into two sets of
541     names, directories you can recurse into (directories), and ones you cannot
542     recurse into (everything else, including symlinks to directories).
543 root 1.52
544 root 1.61 C<aio_scandir> is a composite request that creates many sub requests.
545     C<$maxreq> specifies the maximum number of outstanding aio requests that
546     this function generates. If it is C<< <= 0 >>, then a suitable default
547 root 1.81 will be chosen (currently 4).
548 root 1.40
549     On error, the callback is called without arguments, otherwise it receives
550     two array-refs with path-relative entry names.
551    
552     Example:
553    
554     aio_scandir $dir, 0, sub {
555     my ($dirs, $nondirs) = @_;
556     print "real directories: @$dirs\n";
557     print "everything else: @$nondirs\n";
558     };
559    
560     Implementation notes.
561    
562     The C<aio_readdir> cannot be avoided, but C<stat()>'ing every entry can.
563    
564     After reading the directory, the modification time, size etc. of the
565 root 1.52 directory before and after the readdir are compared, and if they match
566     (and the modification time isn't the current time), the link count will
567     be used to decide how many entries are directories (if >= 2). Otherwise,
568     no knowledge of the number of subdirectories will be assumed.
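
The before/after comparison can be tried out with blocking calls. In this
sketch the fingerprint uses the same C<stat> fields as the module source
(dev, ino, nlink, size, mtime); the function names are invented for the
example.

```perl
use strict;
use warnings;

# Same fields as the module source compares: dev, ino, nlink, size, mtime.
sub dir_fingerprint {
    my ($path) = @_;
    my @st = stat $path
        or return;
    join ":", @st[0, 1, 3, 7, 9]
}

# Hypothetical predicate: trust the link count only if the directory did
# not change while it was read and was not modified within this second.
sub link_count_usable {
    my ($before, $after, $mtime, $now) = @_;
    $before eq $after && $mtime != $now
}

my $before = dir_fingerprint (".");
my @names  = do { opendir my $dh, "." or die "opendir: $!"; readdir $dh };
my $after  = dir_fingerprint (".");

my $mtime = (split /:/, $after)[4];
print link_count_usable ($before, $after, $mtime, time)
    ? "link count can be used\n"
    : "fall back to stat'ing every entry\n";
```

Rejecting a modification time equal to the current time guards against
changes that happen within the same second and would otherwise leave the
fingerprint unchanged.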
569    
570     Then the entries will be sorted into likely directories (everything without
571     a non-initial dot currently) and likely non-directories (everything
572     else). Then every entry plus an appended C</.> will be C<stat>'ed,
573     likely directories first. If that succeeds, it assumes that the entry
574     is a directory or a symlink to a directory (which will be checked
575     separately). This is often faster than stat'ing the entry itself because
576     filesystems might detect the type of the entry without reading the inode
577     data (e.g. ext2fs filetype feature).
578    
579     If the known number of directories (link count - 2) has been reached, the
580     rest of the entries is assumed to be non-directories.
581    
582     This only works with certainty on POSIX (= UNIX) filesystems, which
583     fortunately are the vast majority of filesystems around.
584    
585     It will also likely work on non-POSIX filesystems with reduced efficiency
586     as those tend to return 0 or 1 as link counts, which disables the
587     directory counting heuristic.
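
The ordering heuristic from these notes can be run on its own. The sketch
below reuses the sort key from the module source (a flag for a non-initial
dot followed by the zero-padded name length); the wrapper name
C<stat_order> is invented, and it returns names in the order in which they
would be C<stat>'ed.

```perl
use strict;
use warnings;

# Returns the names in the order in which they would be stat'ed:
# names without a non-initial dot first, shorter names before longer ones.
sub stat_order {
    my @sorted =
        map  { $_->[0] }
        sort { $b->[1] cmp $a->[1] }
        map  { [ $_, sprintf ("%s%04d", (/.\./ ? "1" : "0"), length $_) ] }
        @_;

    # the module pops entries off the end of the sorted list
    reverse @sorted
}

print join (" ", stat_order ("lib", "README.txt", ".git", "a", "Makefile.PL")), "\n";
```

Names such as F<.git> count as likely directories because their only dot
is the initial one.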
588 root 1.40
589     =cut
590    
591     sub aio_scandir($$$) {
592     my ($path, $maxreq, $cb) = @_;
593    
594 root 1.80 my $pri = aioreq_pri;
595    
596 root 1.58 my $grp = aio_group $cb;
597 root 1.55
598 root 1.81 $maxreq = 4 if $maxreq <= 0;
599 root 1.40
600     # stat once
601 root 1.80 aioreq_pri $pri;
602 root 1.55 add $grp aio_stat $path, sub {
603 root 1.58 return $grp->result () if $_[0];
604 root 1.52 my $now = time;
605 root 1.40 my $hash1 = join ":", (stat _)[0,1,3,7,9];
606    
607     # read the directory entries
608 root 1.80 aioreq_pri $pri;
609 root 1.55 add $grp aio_readdir $path, sub {
610 root 1.40 my $entries = shift
611 root 1.58 or return $grp->result ();
612 root 1.40
613     # stat the dir another time
614 root 1.80 aioreq_pri $pri;
615 root 1.55 add $grp aio_stat $path, sub {
616 root 1.40 my $hash2 = join ":", (stat _)[0,1,3,7,9];
617    
618     my $ndirs;
619    
620     # take the slow route if anything looks fishy
621 root 1.52 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
622 root 1.40 $ndirs = -1;
623     } else {
624     # if nlink == 2, we are finished
625     # on non-posix-fs's, we rely on nlink < 2
626     $ndirs = (stat _)[3] - 2
627 root 1.58 or return $grp->result ([], $entries);
628 root 1.40 }
629    
630     # sort into likely dirs and likely nondirs
631     # dirs == files without ".", short entries first
632     $entries = [map $_->[0],
633     sort { $b->[1] cmp $a->[1] }
634     map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
635     @$entries];
636    
637     my (@dirs, @nondirs);
638    
639 root 1.74 my $statgrp = add $grp aio_group sub {
640     $grp->result (\@dirs, \@nondirs);
641 root 1.40 };
642    
643 root 1.74 limit $statgrp $maxreq;
644     feed $statgrp sub {
645     return unless @$entries;
646     my $entry = pop @$entries;
647    
648 root 1.80 aioreq_pri $pri;
649 root 1.74 add $statgrp aio_stat "$path/$entry/.", sub {
650     if ($_[0] < 0) {
651     push @nondirs, $entry;
652     } else {
653     # need to check for real directory
654 root 1.80 aioreq_pri $pri;
655 root 1.74 add $statgrp aio_lstat "$path/$entry", sub {
656     if (-d _) {
657     push @dirs, $entry;
658    
659 root 1.75 unless (--$ndirs) {
660 root 1.74 push @nondirs, @$entries;
661 root 1.75 feed $statgrp;
662 root 1.74 }
663     } else {
664     push @nondirs, $entry;
665 root 1.40 }
666     }
667     }
668 root 1.74 };
669 root 1.40 };
670     };
671     };
672     };
673 root 1.55
674     $grp
675 root 1.40 }
676    
677     =item aio_fsync $fh, $callback->($status)
678 root 1.1
679     Asynchronously call fsync on the given filehandle and call the callback
680     with the fsync result code.
681    
682 root 1.40 =item aio_fdatasync $fh, $callback->($status)
683 root 1.1
684     Asynchronously call fdatasync on the given filehandle and call the
685 root 1.26 callback with the fdatasync result code.
686    
687     If this call isn't available because your OS lacks it or it couldn't be
688     detected, it will be emulated by calling C<fsync> instead.
689 root 1.1
690 root 1.58 =item aio_group $callback->(...)
691 root 1.54
692 root 1.55 This is a very special aio request: Instead of doing something, it is a
693     container for other aio requests, which is useful if you want to bundle
694 root 1.71 many requests into a single, composite, request with a definite callback
695     and the ability to cancel the whole request with its subrequests.
696 root 1.55
697     Returns an object of class L<IO::AIO::GRP>. See its documentation below
698     for more info.
699    
700     Example:
701    
702     my $grp = aio_group sub {
703     print "all stats done\n";
704     };
705    
706     add $grp
707     (aio_stat ...),
708     (aio_stat ...),
709     ...;
710    
711 root 1.63 =item aio_nop $callback->()
712    
713     This is a special request - it does nothing in itself and is only used for
714     side effects, such as when you want to add a dummy request to a group so
715     that finishing the requests in the group depends on executing the given
716     code.
717    
718 root 1.64 While this request does nothing, it still goes through the execution
719     phase and still requires a worker thread. Thus, the callback will not
720     be executed immediately but only after other requests in the queue have
721     entered their execution phase. This can be used to measure request
722     latency.
723    
724 root 1.71 =item IO::AIO::aio_busy $fractional_seconds, $callback->() *NOT EXPORTED*
725 root 1.54
726     Mainly used for debugging and benchmarking, this aio request puts one of
727     the request workers to sleep for the given time.
728    
729 root 1.56 While it is theoretically handy to have simple I/O scheduling requests
730 root 1.71 like sleep and file handle readable/writable, the overhead this creates is
731     immense (it blocks a thread for a long time) so do not use this function
732     except to put your application under artificial I/O pressure.
733 root 1.56
734 root 1.5 =back
735    
736 root 1.53 =head2 IO::AIO::REQ CLASS
737 root 1.52
738     All non-aggregate C<aio_*> functions return an object of this class when
739     called in non-void context.
740    
741     =over 4
742    
743 root 1.65 =item cancel $req
744 root 1.52
745     Cancels the request, if possible. Has the effect of skipping execution
746     when entering the B<execute> state and skipping calling the callback when
747     entering the B<result> state, but will leave the request otherwise
748     untouched. That means that requests that currently execute will not be
749     stopped and resources held by the request will not be freed prematurely.
750    
751 root 1.65 =item cb $req $callback->(...)
752    
753     Replace (or simply set) the callback registered to the request.
754    
755 root 1.52 =back
756    
757 root 1.55 =head2 IO::AIO::GRP CLASS
758    
759     This class is a subclass of L<IO::AIO::REQ>, so all its methods apply to
760     objects of this class, too.
761    
762     An IO::AIO::GRP object is a special request that can contain multiple other
763     aio requests.
764    
765     You create one by calling the C<aio_group> constructing function with a
766     callback that will be called when all contained requests have entered the
767     C<done> state:
768    
769     my $grp = aio_group sub {
770     print "all requests are done\n";
771     };
772    
773     You add requests by calling the C<add> method with one or more
774     C<IO::AIO::REQ> objects:
775    
776     $grp->add (aio_unlink "...");
777    
778 root 1.58 add $grp aio_stat "...", sub {
779     $_[0] or return $grp->result ("error");
780    
781     # add another request dynamically, if first succeeded
782     add $grp aio_open "...", sub {
783     $grp->result ("ok");
784     };
785     };
786 root 1.55
787     This makes it very easy to create composite requests (see the source of
788     C<aio_move> for an application) that work and feel like simple requests.
789    
790 root 1.62 =over 4
791    
792     =item * The IO::AIO::GRP objects will be cleaned up during calls to
793 root 1.55 C<IO::AIO::poll_cb>, just like any other request.
794    
795 root 1.62 =item * They can be canceled like any other request. Canceling will cancel not
796 root 1.59 only the request itself, but also all requests it contains.
797 root 1.55
798 root 1.62 =item * They can also be added to other IO::AIO::GRP objects.
799 root 1.55
800 root 1.62 =item * You must not add requests to a group from within the group callback (or
801 root 1.60 any later time).
802    
803 root 1.62 =back
804    
805 root 1.55 Their lifetime, simplified, looks like this: when they are empty, they
806     will finish very quickly. If they contain only requests that are in the
807     C<done> state, they will also finish. Otherwise they will continue to
808     exist.
809    
810 root 1.57 That means after creating a group you have some time to add requests. And
811     in the callbacks of those requests, you can add further requests to the
812     group. And only when all those requests have finished will the group
813     itself finish.
814    
815 root 1.55 =over 4
816    
817 root 1.65 =item add $grp ...
818    
819 root 1.55 =item $grp->add (...)
820    
821 root 1.57 Add one or more requests to the group. Any type of L<IO::AIO::REQ> can
822     be added, including other groups, as long as you do not create circular
823     dependencies.
824    
825     Returns all its arguments.
826 root 1.55
827 root 1.74 =item $grp->cancel_subs
828    
829     Cancels all subrequests and clears any feeder, but not the group request
830     itself. Useful when you queued a lot of events but got a result early.
831    
832 root 1.58 =item $grp->result (...)
833    
834     Set the result value(s) that will be passed to the group callback when all
835 root 1.80 subrequests have finished and set the group's errno to the current value
836     of errno (just like calling C<errno> without an error number). By default,
837     no argument will be passed and errno is zero.
838    
839     =item $grp->errno ([$errno])
840    
841     Sets the group errno value to C<$errno>, or the current value of errno
842     when the argument is missing.
843    
844     Every aio request has an associated errno value that is restored when
845     the callback is invoked. This method lets you change this value from its
846     default (0).
847    
848     Calling C<result> will also set errno, so make sure you either set C<$!>
849     before the call to C<result>, or call C<errno> after it.
850 root 1.58
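Example: a minimal sketch of how C<result> and C<errno> interact, assuming
a path that does not exist. The path and the variables C<$status> and
C<$errno> are hypothetical and only serve as illustration:

   use IO::AIO;

   my ($status, $errno);

   my $grp = aio_group sub {
      # the group errno is restored into $! before this callback runs
      ($status, $errno) = ($_[0], $! + 0);
   };

   $grp->add (aio_stat "/nonexistent/path", sub {
      if ($_[0]) {
         # stat failed: $! still holds the error, so result records it
         $grp->result ("error");
      } else {
         $grp->result ("ok");
         $grp->errno (0); # override whatever $! happened to contain
      }
   });

   IO::AIO::flush;

   print "status: $status, errno: $errno\n";
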
851 root 1.65 =item feed $grp $callback->($grp)
852 root 1.60
853     Sets a feeder/generator on this group: every group can have an attached
854     generator that generates requests if idle. The idea behind this is that,
855     although you could just queue as many requests as you want in a group,
856     this might starve other requests for a potentially long time. For
857     example, C<aio_scandir> might generate hundreds of thousands of C<aio_stat>
858     requests, delaying any later requests for a long time.
859    
860     To avoid this, and allow incremental generation of requests, you can
861     instead create a group and set a feeder on it that generates those requests. The
862 root 1.68 feed callback will be called whenever there are few enough (see C<limit>,
863 root 1.60 below) requests active in the group itself and is expected to queue more
864     requests.
865    
866 root 1.68 The feed callback can queue as many requests as it likes (i.e. C<add> does
867     not impose any limits).
868 root 1.60
869 root 1.65 If the feed does not queue more requests when called, it will be
870 root 1.60 automatically removed from the group.
871    
872 root 1.65 If the feed limit is C<0>, it will be set to C<2> automatically.
873 root 1.60
874     Example:
875    
876     # stat all files in @files, but only ever use four aio requests concurrently:
877    
878     my $grp = aio_group sub { print "finished\n" };
879 root 1.68 limit $grp 4;
880 root 1.65 feed $grp sub {
881 root 1.60 my $file = pop @files
882     or return;
883    
884     add $grp aio_stat $file, sub { ... };
885 root 1.65 };
886 root 1.60
887 root 1.68 =item limit $grp $num
888 root 1.60
889     Sets the feeder limit for the group: The feeder will be called whenever
890     the group contains fewer than this many requests.
891    
892     Setting the limit to C<0> will pause the feeding process.
893    
894 root 1.55 =back
895    
896 root 1.5 =head2 SUPPORT FUNCTIONS
897    
898 root 1.86 =head3 EVENT PROCESSING AND EVENT LOOP INTEGRATION
899    
900 root 1.5 =over 4
901    
902     =item $fileno = IO::AIO::poll_fileno
903    
904 root 1.20 Return the I<request result pipe file descriptor>. This filehandle must be
905     polled for reading by some mechanism outside this module (e.g. Event or
906     select, see below or the SYNOPSIS). If the pipe becomes readable you have
907     to call C<poll_cb> to check the results.
908 root 1.5
909     See C<poll_cb> for an example.
910    
911     =item IO::AIO::poll_cb
912    
913 root 1.86 Process some outstanding events on the result pipe. You have to call this
914 root 1.5 regularly. Returns the number of events processed. Returns immediately
915 root 1.86 when no events are outstanding. The number of events processed depends on
916     the settings of C<IO::AIO::max_poll_reqs> and C<IO::AIO::max_poll_time>.
917 root 1.5
918 root 1.78 If not all requests were processed for whatever reason, the filehandle
919     will still be ready when C<poll_cb> returns.
920    
921 root 1.20 Example: Install an Event watcher that automatically calls
922     IO::AIO::poll_cb with high priority:
923 root 1.5
924     Event->io (fd => IO::AIO::poll_fileno,
925     poll => 'r', async => 1,
926     cb => \&IO::AIO::poll_cb);
927    
928 root 1.86 =item IO::AIO::max_poll_reqs $nreqs
929    
930     =item IO::AIO::max_poll_time $seconds
931    
932     These set the maximum number of requests (default C<0>, meaning infinity)
933     that are being processed by C<IO::AIO::poll_cb> in one call, respectively
934     the maximum amount of time (default C<0>, meaning infinity) spent in
935     C<IO::AIO::poll_cb> to process requests (more correctly the minimum amount
936     of time C<poll_cb> is allowed to use).
937 root 1.78
938 root 1.86 Setting these is useful if you want to ensure some level of
939     interactiveness when perl is not fast enough to process all requests in
940     time.
941 root 1.78
942 root 1.86 For interactive programs, values such as C<0.01> to C<0.1> should be fine.
943 root 1.78
944     Example: Install an Event watcher that automatically calls
945     IO::AIO::poll_cb with low priority, to ensure that other parts of the
946     program get the CPU sometimes even under high AIO load.
947    
948 root 1.86 # try not to spend much more than 0.1s in poll_cb
949     IO::AIO::max_poll_time 0.1;
950    
951     # use a low priority so other tasks have priority
952 root 1.78 Event->io (fd => IO::AIO::poll_fileno,
953     poll => 'r', nice => 1,
954 root 1.86 cb => \&IO::AIO::poll_cb);
955 root 1.78
956 root 1.5 =item IO::AIO::poll_wait
957    
958     Wait till the result filehandle becomes ready for reading (simply does a
959 root 1.86 C<select> on the filehandle). This is useful if you want to synchronously
960     wait for some requests to finish.
961 root 1.5
962     See C<nreqs> for an example.
963    
964 root 1.86 =item IO::AIO::poll
965 root 1.5
966 root 1.86 Waits until some requests have been handled.
967 root 1.5
968 root 1.86 Strictly equivalent to:
969 root 1.5
970     IO::AIO::poll_wait, IO::AIO::poll_cb
971 root 1.86 if IO::AIO::nreqs;
972 root 1.80
973 root 1.12 =item IO::AIO::flush
974    
975     Wait till all outstanding AIO requests have been handled.
976    
977 root 1.13 Strictly equivalent to:
978    
979     IO::AIO::poll_wait, IO::AIO::poll_cb
980     while IO::AIO::nreqs;
981    
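Example: a minimal sketch of a simple script that queues a few stat
requests and then waits synchronously for all of them. The file names and
the C<%size> hash are hypothetical and only serve as illustration:

   use IO::AIO;

   my @files = ("/etc/passwd", "/etc/hosts"); # hypothetical paths
   my %size;

   for my $file (@files) {
      aio_stat $file, sub {
         # $_[0] is 0 on success; the stat data is then available via "_"
         $size{$file} = $_[0] ? undef : -s _;
      };
   }

   # blocks until every callback has been invoked
   IO::AIO::flush;

   printf "%s: %s\n", $_, (defined $size{$_} ? $size{$_} : "stat failed")
      for @files;
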
982 root 1.86 =head3 CONTROLLING THE NUMBER OF THREADS
983 root 1.13
984 root 1.5 =item IO::AIO::min_parallel $nthreads
985    
986 root 1.61 Set the minimum number of AIO threads to C<$nthreads>. The current
987     default is C<8>, which means eight asynchronous operations can execute
988     concurrently at any one time (the number of outstanding requests,
989     however, is unlimited).
990 root 1.5
991 root 1.34 IO::AIO starts threads only on demand, when an AIO request is queued and
992 root 1.86 no free thread exists. Please note that queueing up a hundred requests can
993     create demand for a hundred threads, even if it turns out that everything
994     is in the cache and could have been processed faster by a single thread.
995 root 1.34
996 root 1.61 It is recommended to keep the number of threads relatively low, as some
997     Linux kernel versions will scale negatively with the number of threads
998     (higher parallelism => MUCH higher latency). With current Linux 2.6
999     versions, 4-32 threads should be fine.
1000 root 1.5
1001 root 1.34 Under most circumstances you don't need to call this function, as the
1002     module selects a default that is suitable for low to moderate load.
1003 root 1.5
1004     =item IO::AIO::max_parallel $nthreads
1005    
1006 root 1.34 Sets the maximum number of AIO threads to C<$nthreads>. If more than the
1007     specified number of threads are currently running, this function kills
1008     them. This function blocks until the limit is reached.
1009    
1010     While C<$nthreads> is zero, aio requests get queued but not executed
1011     until the number of threads has been increased again.
1012 root 1.5
1013     This module automatically runs C<max_parallel 0> at program end, to ensure
1014     that all threads are killed and that there are no outstanding requests.
1015    
1016     Under normal circumstances you don't need to call this function.
1017    
1018 root 1.86 =item IO::AIO::max_idle $nthreads
1019    
1020     Limit the number of threads (default: 4) that are allowed to idle (i.e.,
1021     threads that did not get a request to process within 10 seconds). That
1022     means if a thread becomes idle while C<$nthreads> other threads are also
1023     idle, it will free its resources and exit.
1024    
1025     This is useful when you allow a large number of threads (e.g. 100 or 1000)
1026     to allow for extremely high load situations, but want to free resources
1027     under normal circumstances (1000 threads can easily consume 30MB of RAM).
1028    
1029     The default is probably ok in most situations, especially if thread
1030     creation is fast. If thread creation is very slow on your system you might
1031     want to use larger values.
1032    
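Example: a sketch of a configuration for a program that expects occasional
bursts of requests. The numbers are illustrative assumptions, not
recommendations made by this module:

   use IO::AIO;

   IO::AIO::max_parallel 100; # allow up to 100 threads under high load
   IO::AIO::max_idle     4;   # but keep at most 4 idle threads around
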
1033 root 1.79 =item $oldmaxreqs = IO::AIO::max_outstanding $maxreqs
1034 root 1.5
1035 root 1.79 This is a very bad function to use in interactive programs because it
1036     blocks, and a bad way to reduce concurrency because it is inexact: Better
1037     use an C<aio_group> together with a feed callback.
1038    
1039     Sets the maximum number of outstanding requests to C<$maxreqs>. If you
1040     queue up more than this number of requests, the next call to the
1041     C<poll_cb> (and C<poll_some> and other functions calling C<poll_cb>)
1042     function will block until the limit is no longer exceeded.
1043    
1044     The default value is very large, so there is no practical limit on the
1045     number of outstanding requests.
1046    
1047     You can still queue as many requests as you want. Therefore,
1048     C<max_outstanding> is mainly useful in simple scripts (with low values) or
1049     as a stop gap to shield against fatal memory overflow (with large values).
1050 root 1.5
1051 root 1.86 =head3 STATISTICAL INFORMATION
1052    
1053     =item IO::AIO::nreqs
1054    
1055     Returns the number of requests currently in the ready, execute or pending
1056     states (i.e. for which their callback has not been invoked yet).
1057    
1058     Example: wait till there are no outstanding requests anymore:
1059    
1060     IO::AIO::poll_wait, IO::AIO::poll_cb
1061     while IO::AIO::nreqs;
1062    
1063     =item IO::AIO::nready
1064    
1065     Returns the number of requests currently in the ready state (not yet
1066     executed).
1067    
1068     =item IO::AIO::npending
1069    
1070     Returns the number of requests currently in the pending state (executed,
1071     but not yet processed by poll_cb).
1072    
1073 root 1.5 =back
1074    
1075 root 1.1 =cut
1076    
1077 root 1.2 # support function to convert a fd into a perl filehandle
1078     sub _fd2fh {
1079     return undef if $_[0] < 0;
1080    
1081 root 1.23 # try to generate nice filehandles
1082     my $sym = "IO::AIO::fd#$_[0]";
1083     local *$sym;
1084 root 1.25
1085 root 1.27 open *$sym, "+<&=$_[0]" # usually works under any unix
1086     or open *$sym, "<&=$_[0]" # cygwin needs this
1087     or open *$sym, ">&=$_[0]" # or this
1088 root 1.2 or return undef;
1089    
1090 root 1.23 *$sym
1091 root 1.2 }
1092    
1093 root 1.61 min_parallel 8;
1094 root 1.1
1095 root 1.82 END {
1096 root 1.84 min_parallel 1;
1097 root 1.82 flush;
1098     };
1099    
1100 root 1.1 1;
1101    
1102 root 1.27 =head2 FORK BEHAVIOUR
1103    
1104 root 1.52 This module should do "the right thing" when the process using it forks:
1105    
1106 root 1.34 Before the fork, IO::AIO enters a quiescent state where no requests
1107     can be added in other threads and no results will be processed. After
1108     the fork the parent simply leaves the quiescent state and continues
1109 root 1.72 request/result processing, while the child frees the request/result queue
1110     (so that the requests started before the fork will only be handled in the
1111     parent). Threads will be started on demand until the limit set in the
1112 root 1.34 parent process has been reached again.
1113 root 1.27
1114 root 1.52 In short: the parent will, after a short pause, continue as if fork had
1115     not been called, while the child will act as if IO::AIO has not been used
1116     yet.
1117    
1118 root 1.60 =head2 MEMORY USAGE
1119    
1120 root 1.72 Per-request usage:
1121    
1122     Each aio request uses - depending on your architecture - around 100-200
1123     bytes of memory. In addition, stat requests need a stat buffer (possibly
1124     a few hundred bytes), readdir requires a result buffer and so on. Perl
1125     scalars and other data passed into aio requests will also be locked and
1126     will consume memory till the request has entered the done state.
1127 root 1.60
1128     This is not awfully much, so queuing lots of requests is not usually a
1129     problem.
1130    
1131 root 1.72 Per-thread usage:
1132    
1133     In the execution phase, some aio requests require more memory for
1134     temporary buffers, and each thread requires a stack and other data
1135     structures (usually around 16k-128k, depending on the OS).
1136    
1137     =head1 KNOWN BUGS
1138    
1139 root 1.73 Known bugs will be fixed in the next release.
1140 root 1.60
1141 root 1.1 =head1 SEE ALSO
1142    
1143 root 1.68 L<Coro::AIO>.
1144 root 1.1
1145     =head1 AUTHOR
1146    
1147     Marc Lehmann <schmorp@schmorp.de>
1148     http://home.schmorp.de/
1149    
1150     =cut
1151