/cvs/IO-AIO/AIO.pm
Revision: 1.94
Committed: Wed Nov 8 02:01:02 2006 UTC (17 years, 6 months ago) by root
Branch: MAIN
Changes since 1.93: +3 -2 lines
Log Message:
*** empty log message ***

File Contents

=head1 NAME

IO::AIO - Asynchronous Input/Output

=head1 SYNOPSIS

   use IO::AIO;

   aio_open "/etc/passwd", O_RDONLY, 0, sub {
      my $fh = shift
         or die "/etc/passwd: $!";
      ...
   };

   aio_unlink "/tmp/file", sub { };

   aio_read $fh, 30000, 1024, $buffer, 0, sub {
      $_[0] > 0 or die "read error: $!";
   };

   # version 2+ has request and group objects
   use IO::AIO 2;

   aioreq_pri 4; # give next request a very high priority
   my $req = aio_unlink "/tmp/file", sub { };
   $req->cancel; # cancel request if still in queue

   my $grp = aio_group sub { print "all stats done\n" };
   add $grp aio_stat "..." for ...;

   # AnyEvent integration
   open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!";
   my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb });

   # Event integration
   Event->io (fd => IO::AIO::poll_fileno,
              poll => 'r',
              cb => \&IO::AIO::poll_cb);

   # Glib/Gtk2 integration
   add_watch Glib::IO IO::AIO::poll_fileno,
             in => sub { IO::AIO::poll_cb; 1 };

   # Tk integration
   Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "",
                             readable => \&IO::AIO::poll_cb);

   # Danga::Socket integration
   Danga::Socket->AddOtherFds (IO::AIO::poll_fileno =>
                               \&IO::AIO::poll_cb);

=head1 DESCRIPTION

This module implements asynchronous I/O using whatever means your
operating system supports.

Asynchronous means that operations that can normally block your program
(e.g. reading from disk) will be done asynchronously: the operation
will still block, but you can do something else in the meantime. This
is extremely useful for programs that need to stay interactive even
when doing heavy I/O (GUI programs, high performance network servers
etc.), but can also be used to easily do operations in parallel that are
normally done sequentially, e.g. stat'ing many files, which is much faster
on a RAID volume or over NFS when you do a number of stat operations
concurrently.
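
As a minimal sketch of stat'ing many files concurrently, the following queues one C<aio_stat> per path inside a group and then blocks in C<IO::AIO::flush> (listed in the module's export list) instead of running an event loop. The path list is an assumption for illustration; failed stats are simply recorded as C<undef>.

```perl
use strict;
use warnings;
use IO::AIO 2;

# Illustrative paths (assumption): any absolute paths will do.
my @paths = ("/", "/etc", "/tmp");
my %size;

# The group callback runs once all contained stat requests are done.
my $grp = aio_group sub { print "all stats done\n" };

for my $path (@paths) {
   # All stats are queued at once and executed by the worker threads.
   $grp->add (aio_stat $path, sub {
      # aio_stat passes 0 on success; the results are then available via "_"
      $size{$path} = $_[0] ? undef : -s _;
   });
}

# No event loop here: block until every outstanding request is handled.
IO::AIO::flush;
```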

While most of this works on all types of file descriptors (for example
sockets), using these functions on file descriptors that support
nonblocking operation (again, sockets, pipes etc.) is very inefficient or
might not work (aio_read fails on sockets/pipes/fifos). Use an event loop
for that (such as the L<Event|Event> module): IO::AIO will naturally fit
into such an event loop itself.

In this version, a number of threads are started that execute your
requests and signal their completion. You don't need thread support
in perl, and the threads created by this module will not be visible
to perl. In the future, this module might make use of the native aio
functions available on many operating systems. However, they are often
not well-supported or restricted (GNU/Linux doesn't allow them on normal
files currently, for example), and they would only support aio_read and
aio_write, so the remaining functionality would have to be implemented
using threads anyway.

Although the module will work in the presence of other (Perl) threads,
it is currently not reentrant in any way, so use appropriate locking
yourself, always call C<poll_cb> from within the same thread, or never
call C<poll_cb> (or other C<aio_> functions) recursively.

=head2 EXAMPLE

This is a simple example that uses the Event module and loads
F</etc/passwd> asynchronously:

   use Fcntl;
   use Event;
   use IO::AIO;

   # register the IO::AIO callback with Event
   Event->io (fd => IO::AIO::poll_fileno,
              poll => 'r',
              cb => \&IO::AIO::poll_cb);

   # queue the request to open /etc/passwd
   aio_open "/etc/passwd", O_RDONLY, 0, sub {
      my $fh = shift
         or die "error while opening: $!";

      # stat'ing filehandles is generally non-blocking
      my $size = -s $fh;

      # queue a request to read the file
      my $contents;
      aio_read $fh, 0, $size, $contents, 0, sub {
         $_[0] == $size
            or die "short read: $!";

         close $fh;

         # file contents now in $contents
         print $contents;

         # exit event loop and program
         Event::unloop;
      };
   };

   # possibly queue up other requests, or open GUI windows,
   # check for sockets etc. etc.

   # process events as long as there are some:
   Event::loop;

=head1 REQUEST ANATOMY AND LIFETIME

Every C<aio_*> function creates a request, which is a C data structure not
directly visible to Perl.

If called in non-void context, every request function returns a Perl
object representing the request. In void context, nothing is returned,
which saves a bit of memory.

The perl object is a fairly standard ref-to-hash object. The hash contents
are not used by IO::AIO so you are free to store anything you like in it.

During their existence, aio requests travel through the following states,
in order:

=over 4

=item ready

Immediately after a request is created it is put into the ready state,
waiting for a thread to execute it.

=item execute

A thread has accepted the request for processing and is currently
executing it (e.g. blocking in read).

=item pending

The request has been executed and is waiting for result processing.

While request submission and execution are fully asynchronous, result
processing is not and relies on the perl interpreter calling C<poll_cb>
(or another function with the same effect).

=item result

The request results are processed synchronously by C<poll_cb>.

The C<poll_cb> function will process all outstanding aio requests by
calling their callbacks, freeing memory associated with them and managing
any groups they are contained in.

=item done

The request has reached the end of its lifetime and holds no resources
anymore (except possibly for the Perl object, but its connection to the
actual aio request is severed and calling its methods will either do
nothing or result in a runtime error).

=back
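
The following sketch, which assumes that blocking in C<IO::AIO::flush> is acceptable in place of an event loop, keeps the request object returned in non-void context, stores a hypothetical C<note> key in its hash and cancels a second request, whose callback is then skipped when it reaches the B<result> state.

```perl
use strict;
use warnings;
use IO::AIO 2;

my @log;

# Non-void context: aio_stat returns an IO::AIO::REQ object (a hash ref).
my $req = aio_stat "/", sub { push @log, "stat done" };

# IO::AIO does not use the hash contents; "note" is a hypothetical key.
$req->{note} = "root directory";

# Cancelled right away: whether it is still ready or already executing,
# its callback is skipped when it reaches the result state.
my $cancelled = aio_stat "/", sub { push @log, "not reached" };
$cancelled->cancel;

# Wait for pending requests and process results until none are left.
IO::AIO::flush;
```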

=cut

package IO::AIO;

no warnings;
use strict 'vars';

use base 'Exporter';

BEGIN {
   our $VERSION = '2.2';

   our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
                     aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
                     aio_readlink aio_fsync aio_fdatasync aio_readahead aio_rename aio_link
                     aio_move aio_copy aio_group aio_nop aio_mknod);
   our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice));
   our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
                       min_parallel max_parallel max_idle
                       nreqs nready npending nthreads
                       max_poll_time max_poll_reqs);

   @IO::AIO::GRP::ISA = 'IO::AIO::REQ';

   require XSLoader;
   XSLoader::load ("IO::AIO", $VERSION);
}

=head1 FUNCTIONS

=head2 AIO REQUEST FUNCTIONS

All the C<aio_*> calls are more or less thin wrappers around the syscall
with the same name (sans C<aio_>). The arguments are similar or identical,
and they all accept an additional (and optional) C<$callback> argument
which must be a code reference. This code reference will get called with
the syscall return code (e.g. most syscalls return C<-1> on error, unlike
perl, which usually delivers "false") as its sole argument when the given
syscall has been executed asynchronously.

All functions expecting a filehandle keep a copy of the filehandle
internally until the request has finished.

All functions return request objects of type L<IO::AIO::REQ> (when called
in non-void context) that allow further manipulation of those requests
while they are in-flight.

The pathnames you pass to these routines I<must> be absolute and
encoded as octets. The reason for the former is that at the time the
request is being executed, the current working directory could have
changed. Alternatively, you can make sure that you never change the
current working directory anywhere in the program and then use relative
paths.

To encode pathnames as octets, make sure you either: a) always pass in
filenames you got from outside (command line, readdir etc.) without
tinkering, b) only use pathnames that are ASCII or ISO 8859-1, c) use the
Encode module and encode your pathnames to the locale (or other) encoding
in effect in the user environment, d) use Glib::filename_from_unicode on
unicode filenames or e) use something else to ensure your scalar has the
correct contents.

This works independently of the internal UTF-8 bit, which IO::AIO
handles correctly whether it is set or not.

=over 4

=item $prev_pri = aioreq_pri [$pri]

Returns the priority value that would be used for the next request and, if
C<$pri> is given, sets the priority for the next aio request.

The default priority is C<0>, the minimum and maximum priorities are C<-4>
and C<4>, respectively. Requests with higher priority will be serviced
first.

The priority will be reset to C<0> after each call to one of the C<aio_*>
functions.

Example: open a file with low priority, then read something from it with
higher priority so the read request is serviced before other low priority
open requests (potentially spamming the cache):

   aioreq_pri -3;
   aio_open ..., sub {
      return unless $_[0];

      aioreq_pri -2;
      aio_read $_[0], ..., sub {
         ...
      };
   };
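
The reset behaviour can be checked with a short sketch, assuming nothing else submits requests in between. C<aio_nop> serves only as a cheap request that consumes the priority, and C<IO::AIO::flush> waits for it to finish.

```perl
use strict;
use warnings;
use IO::AIO 2;

my $default = aioreq_pri;    # priority the next request would get
aioreq_pri 3;                # affects only the next aio_* call
my $pending = aioreq_pri;    # querying alone does not reset it
aio_nop sub { };             # this request is queued with priority 3
my $reset = aioreq_pri;      # back to the default after the aio_* call

IO::AIO::flush;
print "default=$default pending=$pending reset=$reset\n";
```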

=item aioreq_nice $pri_adjust

Similar to C<aioreq_pri>, but subtracts the given value from the current
priority, so the effect is cumulative.

=item aio_open $pathname, $flags, $mode, $callback->($fh)

Asynchronously open or create a file and call the callback with a newly
created filehandle for the file.

The pathname passed to C<aio_open> must be absolute. See the pathname notes
at the beginning of L</AIO REQUEST FUNCTIONS>, above, for an explanation.

The C<$flags> argument is a bitmask. See the C<Fcntl> module for a
list. They are the same as used by C<sysopen>.

Likewise, C<$mode> specifies the mode of the newly created file, if it
didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>,
except that it is mandatory (i.e. use C<0> if you don't create new files,
and C<0666> or C<0777> if you do).

Example:

   aio_open "/etc/passwd", O_RDONLY, 0, sub {
      if ($_[0]) {
         print "open successful, fh is $_[0]\n";
         ...
      } else {
         die "open failed: $!\n";
      }
   };

=item aio_close $fh, $callback->($status)

Asynchronously close a file and call the callback with the result
code. I<WARNING:> although accepted, you should not pass in a perl
filehandle here, as perl will likely close the file descriptor another
time when the filehandle is destroyed. Normally, you can safely call perl's
C<close> or just let filehandles go out of scope.

This is considered a bug in the API, so that might change. It's
therefore best to avoid this function.

=item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval)

=item aio_write $fh,$offset,$length, $data,$dataoffset, $callback->($retval)

Reads or writes C<$length> bytes from or to the specified C<$fh> at file
offset C<$offset>, into or out of the scalar C<$data> starting at offset
C<$dataoffset> within it, and calls the callback with the actual number of
bytes read or written (or -1 on error, just like the syscall).

The C<$data> scalar I<MUST NOT> be modified in any way while the request
is outstanding. Modifying it can result in segfaults or WW3 (if the
necessary/optional hardware is installed).

Example: Read 15 bytes at offset 7 into scalar C<$buffer>, starting at
offset C<0> within the scalar:

   aio_read $fh, 7, 15, $buffer, 0, sub {
      $_[0] > 0 or die "read error: $!";
      print "read $_[0] bytes: <$buffer>\n";
   };
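
A self-contained write/read round trip, sketched under the assumption that a scratch file from L<File::Temp> is acceptable (it is created in the system temp directory, normally an absolute path) and that blocking in C<IO::AIO::flush> may replace an event loop. Neither C<$data> nor C<$buffer> is touched while its request is outstanding.

```perl
use strict;
use warnings;
use Fcntl;
use File::Temp qw(tempfile);
use IO::AIO;

# Scratch file for this sketch, removed again at program exit.
my (undef, $path) = tempfile (UNLINK => 1);

my $data   = "hello, aio";
my $buffer = "";
my ($written, $read);

aio_open $path, O_RDWR, 0, sub {
   my $fh = shift
      or die "$path: $!";

   # write length($data) bytes from offset 0 of $data at file offset 0
   aio_write $fh, 0, length $data, $data, 0, sub {
      $written = $_[0];

      # read the same bytes back into $buffer, starting at its offset 0
      aio_read $fh, 0, $written, $buffer, 0, sub {
         $read = $_[0];
         close $fh;
      };
   };
};

IO::AIO::flush;
print "wrote $written, read $read: <$buffer>\n";
```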

=item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval)

Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts
reading at byte offset C<$in_offset>, and starts writing at the current
file offset of C<$out_fh>. Because of that, it is not safe to issue more
than one C<aio_sendfile> per C<$out_fh>, as they will interfere with each
other.

This call tries to make use of a native C<sendfile> syscall to provide
zero-copy operation. For this to work, C<$out_fh> should refer to a
socket, and C<$in_fh> should refer to an mmap'able file.

If the native sendfile call fails or is not implemented, it will be
emulated, so you can call C<aio_sendfile> on any type of filehandle
regardless of the limitations of the operating system.

Please note, however, that C<aio_sendfile> can read more bytes from
C<$in_fh> than are written, and there is no way to find out how many
bytes have been read from C<aio_sendfile> alone, as C<aio_sendfile> only
provides the number of bytes written to C<$out_fh>. Only if the result
value equals C<$length> can one assume that C<$length> bytes have been
read.
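
The following sketch copies a scratch file with C<aio_sendfile> and compares the result with C<$length>, since only a full-length result shows that all bytes were read. Both files come from L<File::Temp> (an assumption for illustration); because neither is a socket, the native call may fail and the emulation described above may be used instead.

```perl
use strict;
use warnings;
use Fcntl;
use File::Temp qw(tempfile);
use IO::AIO;

# Scratch source (4096 bytes) and empty destination for this sketch.
my ($tmp_fh, $src) = tempfile (UNLINK => 1);
print $tmp_fh "x" x 4096;
close $tmp_fh;
my (undef, $dst) = tempfile (UNLINK => 1);

my $length = -s $src;
my $complete;

aio_open $src, O_RDONLY, 0, sub {
   my $in = shift or die "$src: $!";

   aio_open $dst, O_WRONLY | O_TRUNC, 0, sub {
      my $out = shift or die "$dst: $!";

      # writing starts at the current offset of $out (0 after opening)
      aio_sendfile $out, $in, 0, $length, sub {
         # only a full-length result proves $length bytes were read
         $complete = $_[0] == $length;
         close $in;
         close $out;
      };
   };
};

IO::AIO::flush;
```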

=item aio_readahead $fh,$offset,$length, $callback->($retval)

C<aio_readahead> populates the page cache with data from a file so that
subsequent reads from that file will not block on disk I/O. The C<$offset>
argument specifies the starting point from which data is to be read and
C<$length> specifies the number of bytes to be read. I/O is performed in
whole pages, so that offset is effectively rounded down to a page boundary
and bytes are read up to the next page boundary greater than or equal to
(offset+length). C<aio_readahead> does not read beyond the end of the
file. The current file offset of the file is left unchanged.

If that syscall doesn't exist (likely if your OS isn't Linux) it will be
emulated by simply reading the data, which would have a similar effect.

=item aio_stat $fh_or_path, $callback->($status)

=item aio_lstat $fh, $callback->($status)

Works like perl's C<stat> or C<lstat> in void context. The callback will
be called after the stat and the results will be available using C<stat _>
or C<-s _> etc...

The pathname passed to C<aio_stat> must be absolute. See the pathname notes
at the beginning of L</AIO REQUEST FUNCTIONS>, above, for an explanation.

Currently, the stats are always 64-bit-stats, i.e. instead of returning an
error when stat'ing a large file, the results will be silently truncated
unless perl itself is compiled with large file support.

Example: Print the length of F</etc/passwd>:

   aio_stat "/etc/passwd", sub {
      $_[0] and die "stat failed: $!";
      print "size is ", -s _, "\n";
   };

=item aio_unlink $pathname, $callback->($status)

Asynchronously unlink (delete) a file and call the callback with the
result code.

=item aio_mknod $path, $mode, $dev, $callback->($status)

[EXPERIMENTAL]

Asynchronously create a device node (or fifo). See mknod(2).

The only (POSIX-) portable way of calling this function is:

   aio_mknod $path, IO::AIO::S_IFIFO | $mode, 0, sub { ...

=item aio_link $srcpath, $dstpath, $callback->($status)

Asynchronously create a new link to the existing object at C<$srcpath> at
the path C<$dstpath> and call the callback with the result code.

=item aio_symlink $srcpath, $dstpath, $callback->($status)

Asynchronously create a new symbolic link to the existing object at
C<$srcpath> at the path C<$dstpath> and call the callback with the result
code.

=item aio_readlink $path, $callback->($link)

Asynchronously read the symlink specified by C<$path> and pass it to
the callback. If an error occurs, nothing or undef gets passed to the
callback.

=item aio_rename $srcpath, $dstpath, $callback->($status)

Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
rename(2), and call the callback with the result code.

=item aio_rmdir $pathname, $callback->($status)

Asynchronously rmdir (delete) a directory and call the callback with the
result code.

=item aio_readdir $pathname, $callback->($entries)

Unlike the POSIX call of the same name, C<aio_readdir> reads an entire
directory (i.e. opendir + readdir + closedir). The entries will not be
sorted, and will B<NOT> include the C<.> and C<..> entries.

The callback receives a single argument which is either C<undef> or an
array-ref with the filenames.
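
A minimal sketch, assuming a scratch directory from L<File::Temp> holding three files with illustrative names; since the entries arrive unsorted, the callback sorts them itself.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);
use IO::AIO;

# Scratch directory with three empty files (names are illustrative).
my $dir = tempdir (CLEANUP => 1);
for my $name (qw(a b c)) {
   open my $fh, ">", "$dir/$name" or die "$dir/$name: $!";
   close $fh;
}

my @names;
aio_readdir $dir, sub {
   my $entries = shift
      or die "readdir $dir: $!";
   @names = sort @$entries;   # unsorted input, and . and .. are omitted
};

IO::AIO::flush;
print "@names\n";
```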

=item aio_copy $srcpath, $dstpath, $callback->($status)

Try to copy the I<file> (directories not supported as either source or
destination) from C<$srcpath> to C<$dstpath> and call the callback with
C<0> on success or C<-1> on error.

This is a composite request that creates the destination file with
mode 0200 and copies the contents of the source file into it using
C<aio_sendfile>, followed by restoring atime, mtime, access mode and
uid/gid, in that order.

If an error occurs, the partial destination file will be unlinked, if
possible, except when setting atime, mtime, access mode and uid/gid, where
errors are being ignored.
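
A usage sketch, assuming a scratch directory from L<File::Temp> with illustrative file names. The status check follows the C<aio_copy> source below, where the group result is C<0> on success and C<-1> on error.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);
use IO::AIO 2;

# Scratch directory and a 10-byte source file (illustrative names).
my $dir = tempdir (CLEANUP => 1);
open my $fh, ">", "$dir/src" or die "$dir/src: $!";
print $fh "some data\n";
close $fh;

my $status;
aio_copy "$dir/src", "$dir/dst", sub {
   $status = shift;   # 0 on success, -1 on error
};

IO::AIO::flush;
print "copy status: $status\n";
```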

=cut

sub aio_copy($$;$) {
   my ($src, $dst, $cb) = @_;

   my $pri = aioreq_pri;
   my $grp = aio_group $cb;

   aioreq_pri $pri;
   add $grp aio_open $src, O_RDONLY, 0, sub {
      if (my $src_fh = $_[0]) {
         my @stat = stat $src_fh;

         aioreq_pri $pri;
         add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
            if (my $dst_fh = $_[0]) {
               aioreq_pri $pri;
               add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
                  if ($_[0] == $stat[7]) {
                     $grp->result (0);
                     close $src_fh;

                     # those should not normally block. should. should.
                     utime $stat[8], $stat[9], $dst;
                     chmod $stat[2] & 07777, $dst_fh;
                     chown $stat[4], $stat[5], $dst_fh;
                     close $dst_fh;
                  } else {
                     $grp->result (-1);
                     close $src_fh;
                     close $dst_fh;

                     # remove the partial copy, at the caller's priority
                     aioreq_pri $pri;
                     add $grp aio_unlink $dst;
                  }
               };
            } else {
               $grp->result (-1);
            }
         };
      } else {
         $grp->result (-1);
      }
   };

   $grp
}

=item aio_move $srcpath, $dstpath, $callback->($status)

Try to move the I<file> (directories not supported as either source or
destination) from C<$srcpath> to C<$dstpath> and call the callback with
C<0> on success or C<-1> on error.

This is a composite request that tries to rename(2) the file first. If
rename fails with C<EXDEV>, it copies the file with C<aio_copy> and, if
that is successful, unlinks C<$srcpath>.
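
A usage sketch, again assuming scratch files from L<File::Temp> with illustrative names. Both paths are in the same directory, so the initial rename(2) is expected to succeed and the C<aio_copy> fallback is not exercised here.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);
use IO::AIO 2;

# Scratch directory with one empty file to move (illustrative names).
my $dir = tempdir (CLEANUP => 1);
open my $fh, ">", "$dir/old" or die "$dir/old: $!";
close $fh;

my $status;
aio_move "$dir/old", "$dir/new", sub {
   $status = shift;   # 0 on success, -1 on error
};

IO::AIO::flush;
print "move status: $status\n";
```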

=cut

sub aio_move($$;$) {
   my ($src, $dst, $cb) = @_;

   my $pri = aioreq_pri;
   my $grp = aio_group $cb;

   aioreq_pri $pri;
   add $grp aio_rename $src, $dst, sub {
      if ($_[0] && $! == EXDEV) {
         aioreq_pri $pri;
         add $grp aio_copy $src, $dst, sub {
            $grp->result ($_[0]);

            if (!$_[0]) {
               aioreq_pri $pri;
               add $grp aio_unlink $src;
            }
         };
      } else {
         $grp->result ($_[0]);
      }
   };

   $grp
}

=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)

Scans a directory (similar to C<aio_readdir>) but additionally tries to
efficiently separate the entries of directory C<$path> into two sets of
names, directories you can recurse into (directories), and ones you cannot
recurse into (everything else, including symlinks to directories).

C<aio_scandir> is a composite request that creates many sub requests.
C<$maxreq> specifies the maximum number of outstanding aio requests that
this function generates. If it is C<< <= 0 >>, then a suitable default
will be chosen (currently 4).

On error, the callback is called without arguments, otherwise it receives
two array-refs with path-relative entry names.

Example:

   aio_scandir $dir, 0, sub {
      my ($dirs, $nondirs) = @_;
      print "real directories: @$dirs\n";
      print "everything else: @$nondirs\n";
   };

Implementation notes.

The C<aio_readdir> cannot be avoided, but C<stat()>'ing every entry can.

After reading the directory, the modification time, size etc. of the
directory before and after the readdir is checked, and if they match (and
the modification time isn't the current time), the link count will be used
to decide how many entries are directories (if >= 2). Otherwise, no
knowledge of the number of subdirectories will be assumed.

Then entries will be sorted into likely directories (everything without
a non-initial dot currently) and likely non-directories (everything
else). Then every entry plus an appended C</.> will be C<stat>'ed,
likely directories first. If that succeeds, it assumes that the entry
is a directory or a symlink to directory (which will be checked
separately). This is often faster than stat'ing the entry itself because
filesystems might detect the type of the entry without reading the inode
data (e.g. ext2fs filetype feature).
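
The sort order can be reproduced in plain Perl. This sketch applies the key expression from the C<aio_scandir> source below to a hypothetical directory listing; because the source pops entries off the end of the sorted list, the stat order is the reverse of that list, so names without a non-initial dot come first, shortest first.

```perl
use strict;
use warnings;

# Hypothetical directory listing; all keys are distinct, so the order is fixed.
my @entries = ("README.txt", "lib", ".git", "t", "Makefile.PL", "script");

# Key: "0" for names without a non-initial dot (likely directories),
# "1" otherwise, followed by the zero-padded name length.
my @sorted = map $_->[0],
                sort { $b->[1] cmp $a->[1] }
                   map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
                      @entries;

# aio_scandir pops from the end, so entries are stat'ed in reverse order.
my @stat_order = reverse @sorted;
print "@stat_order\n";
```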

If the known number of directories (link count - 2) has been reached, the
rest of the entries are assumed to be non-directories.

This only works with certainty on POSIX (= UNIX) filesystems, which
fortunately are the vast majority of filesystems around.

It will also likely work on non-POSIX filesystems with reduced efficiency
as those tend to return 0 or 1 as link counts, which disables the
directory counting heuristic.

=cut

sub aio_scandir($$$) {
   my ($path, $maxreq, $cb) = @_;

   my $pri = aioreq_pri;

   my $grp = aio_group $cb;

   $maxreq = 4 if $maxreq <= 0;

   # stat once
   aioreq_pri $pri;
   add $grp aio_stat $path, sub {
      return $grp->result () if $_[0];
      my $now = time;
      my $hash1 = join ":", (stat _)[0,1,3,7,9];

      # read the directory entries
      aioreq_pri $pri;
      add $grp aio_readdir $path, sub {
         my $entries = shift
            or return $grp->result ();

         # stat the dir another time
         aioreq_pri $pri;
         add $grp aio_stat $path, sub {
            my $hash2 = join ":", (stat _)[0,1,3,7,9];

            my $ndirs;

            # take the slow route if anything looks fishy
            if ($hash1 ne $hash2 or (stat _)[9] == $now) {
               $ndirs = -1;
            } else {
               # if nlink == 2, we are finished
               # on non-posix-fs's, we rely on nlink < 2
               $ndirs = (stat _)[3] - 2
                  or return $grp->result ([], $entries);
            }

            # sort into likely dirs and likely nondirs
            # dirs == files without ".", short entries first
            $entries = [map $_->[0],
                           sort { $b->[1] cmp $a->[1] }
                              map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
                                 @$entries];

            my (@dirs, @nondirs);

            my $statgrp = add $grp aio_group sub {
               $grp->result (\@dirs, \@nondirs);
            };

            limit $statgrp $maxreq;
            feed $statgrp sub {
               return unless @$entries;
               my $entry = pop @$entries;

               aioreq_pri $pri;
               add $statgrp aio_stat "$path/$entry/.", sub {
                  if ($_[0] < 0) {
                     push @nondirs, $entry;
                  } else {
                     # need to check for real directory
                     aioreq_pri $pri;
                     add $statgrp aio_lstat "$path/$entry", sub {
                        if (-d _) {
                           push @dirs, $entry;

                           unless (--$ndirs) {
                              push @nondirs, @$entries;
                              feed $statgrp;
                           }
                        } else {
                           push @nondirs, $entry;
                        }
                     };
                  }
               };
            };
         };
      };
   };

   $grp
}

=item aio_fsync $fh, $callback->($status)

Asynchronously call fsync on the given filehandle and call the callback
with the fsync result code.

=item aio_fdatasync $fh, $callback->($status)

Asynchronously call fdatasync on the given filehandle and call the
callback with the fdatasync result code.

If this call isn't available because your OS lacks it or it couldn't be
detected, it will be emulated by calling C<fsync> instead.

=item aio_group $callback->(...)

This is a very special aio request: Instead of doing something, it is a
container for other aio requests, which is useful if you want to bundle
many requests into a single, composite, request with a definite callback
and the ability to cancel the whole request with its subrequests.

Returns an object of class L<IO::AIO::GRP>. See its documentation below
for more info.

Example:

   my $grp = aio_group sub {
      print "all stats done\n";
   };

   add $grp
      (aio_stat ...),
      (aio_stat ...),
      ...;

=item aio_nop $callback->()

This is a special request - it does nothing in itself and is only used for
side effects, such as when you want to add a dummy request to a group so
that finishing the requests in the group depends on executing the given
code.

While this request does nothing, it still goes through the execution
phase and still requires a worker thread. Thus, the callback will not
be executed immediately but only after other requests in the queue have
entered their execution phase. This can be used to measure request
latency.
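
As a sketch of such a latency measurement, assuming L<Time::HiRes> for sub-second timing and C<IO::AIO::flush> in place of an event loop. The measured value covers queueing, a worker thread picking the request up, and result processing.

```perl
use strict;
use warnings;
use Time::HiRes qw(time);
use IO::AIO 2;

my $start = time;
my $latency;

# The nop still passes through a worker thread before its callback runs.
aio_nop sub { $latency = time() - $start };

IO::AIO::flush;
printf "request latency: %.6f s\n", $latency;
```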

=item IO::AIO::aio_busy $fractional_seconds, $callback->() *NOT EXPORTED*

Mainly used for debugging and benchmarking, this aio request puts one of
the request workers to sleep for the given time.

While it is theoretically handy to have simple I/O scheduling requests
like sleep and file handle readable/writable, the overhead this creates is
immense (it blocks a thread for a long time) so do not use this function
except to put your application under artificial I/O pressure.

=back

=head2 IO::AIO::REQ CLASS

All non-aggregate C<aio_*> functions return an object of this class when
called in non-void context.

=over 4

=item cancel $req

Cancels the request, if possible. Has the effect of skipping execution
when entering the B<execute> state and skipping calling the callback when
entering the B<result> state, but will leave the request otherwise
untouched. That means that requests that currently execute will not be
stopped and resources held by the request will not be freed prematurely.

=item cb $req $callback->(...)

Replace (or simply set) the callback registered to the request.

=back
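
A short sketch of C<cb>, assuming the request has not yet reached the B<result> state when its callback is replaced. That holds here because callbacks only run from within C<poll_cb>, which C<IO::AIO::flush> invokes only after the replacement.

```perl
use strict;
use warnings;
use IO::AIO 2;

my $called = "";
my $req = aio_nop sub { $called = "original" };

# Callbacks only run inside poll_cb, so this replacement takes effect.
$req->cb (sub { $called = "replacement" });

IO::AIO::flush;
print "$called\n";
```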
770    
771 root 1.55 =head2 IO::AIO::GRP CLASS
772    
773     This class is a subclass of L<IO::AIO::REQ>, so all its methods apply to
774     objects of this class, too.
775    
776     A IO::AIO::GRP object is a special request that can contain multiple other
777     aio requests.
778    
779     You create one by calling the C<aio_group> constructing function with a
780     callback that will be called when all contained requests have entered the
781     C<done> state:
782    
783     my $grp = aio_group sub {
784     print "all requests are done\n";
785     };
786    
787     You add requests by calling the C<add> method with one or more
788     C<IO::AIO::REQ> objects:
789    
790     $grp->add (aio_unlink "...");

   add $grp aio_stat "...", sub {
      $_[0] and return $grp->result ("error");

      # add another request dynamically, if first succeeded
      add $grp aio_open "...", sub {
         $grp->result ("ok");
      };
   };

This makes it very easy to create composite requests (see the source of
C<aio_move> for an application) that work and feel like simple requests.
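
The following sketch shows the idea. C<aio_both_exist> is a hypothetical
helper, not part of IO::AIO: it stats two absolute paths inside one group
and reports a single true/false value to its own callback, while the group
it returns can be cancelled like any other request.

   use IO::AIO 2;

   # hypothetical composite request: true if both paths can be stat'ed
   sub aio_both_exist {
      my ($path1, $path2, $cb) = @_;

      my $ok  = 1;
      my $grp = aio_group sub { $cb->($ok) };

      for my $path ($path1, $path2) {
         add $grp aio_stat $path, sub {
            $ok = 0 if $_[0];   # non-zero status: stat failed
         };
      }

      $grp   # the caller can keep or cancel this like any request
   }

   my $result;
   my $req = aio_both_exist ("/etc/passwd", "/nonexistent/file",
                             sub { $result = shift });
   IO::AIO::flush;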

=over 4

=item * The IO::AIO::GRP objects will be cleaned up during calls to
C<IO::AIO::poll_cb>, just like any other request.

=item * They can be canceled like any other request. Canceling will cancel not
only the request itself, but also all requests it contains.

=item * They can also be added to other IO::AIO::GRP objects.

=item * You must not add requests to a group from within the group callback (or
at any later time).

=back

Their lifetime, simplified, looks like this: when they are empty, they
will finish very quickly. If they contain only requests that are in the
C<done> state, they will also finish. Otherwise they will continue to
exist.

That means that after creating a group you have some time to add requests,
and in the callbacks of those requests you can add further requests to the
group. Only when all those requests have finished will the group itself
finish.

=over 4

=item add $grp ...

=item $grp->add (...)

Add one or more requests to the group. Any type of L<IO::AIO::REQ> can
be added, including other groups, as long as you do not create circular
dependencies.

Returns all its arguments.
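
Because the arguments are returned, the individual requests remain
accessible after adding them. A small sketch, with F</etc/passwd> and
F</etc/group> as example paths:

   use IO::AIO 2;

   my $grp = aio_group sub { print "group done\n" };

   # add returns its arguments, i.e. the two request objects
   my ($req1, $req2) = $grp->add (aio_stat ("/etc/passwd"),
                                  aio_stat ("/etc/group"));

   # e.g. drop the second stat again if it turns out to be unnecessary
   $req2->cancel;

   IO::AIO::flush;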

=item $grp->cancel_subs

Cancels all subrequests and clears any feeder, but not the group request
itself. Useful when you queued a lot of events but got a result early.
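
As an illustration, the sketch below stats several candidate paths (the
list is made up for the example) and stops as soon as any one of them is
found. Because the stats run concurrently, the hit is not necessarily the
first path in the list.

   use IO::AIO 2;

   my @candidates = ("/nonexistent/a", "/etc/passwd", "/etc/group");

   my $found;
   my $grp = aio_group sub { $found = shift };

   for my $path (@candidates) {
      add $grp aio_stat $path, sub {
         return if $_[0];        # stat failed: not this one

         $grp->result ($path);   # report the hit to the group callback
         $grp->cancel_subs;      # and skip the remaining stats
      };
   }

   IO::AIO::flush;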

=item $grp->result (...)

Sets the result value(s) that will be passed to the group callback when all
subrequests have finished and sets the group's errno to the current value
of errno (just like calling C<errno> without an error number). By default,
no argument will be passed and errno is zero.

=item $grp->errno ([$errno])

Sets the group errno value to C<$errno>, or the current value of errno
when the argument is missing.

Every aio request has an associated errno value that is restored when
the callback is invoked. This method lets you change this value from its
default (0).

Calling C<result> will also set errno, so make sure you either set C<$!>
before the call to C<result>, or call C<errno> after it.
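
A short sketch of that ordering, using a path assumed not to exist: the
group errno is set explicitly after C<result>, and the group callback then
sees it in C<$!>.

   use IO::AIO 2;
   use Errno ();

   my ($value, $err);
   my $grp = aio_group sub {
      # the group's errno has been restored into $! at this point
      ($value, $err) = ($_[0], $! + 0);
   };

   add $grp aio_stat "/nonexistent/file", sub {
      return unless $_[0];   # only handle the failure case

      $grp->result ("not found");       # also copies the current $!
      $grp->errno (Errno::ENOENT ());   # so set errno explicitly afterwards
   };

   IO::AIO::flush;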

=item feed $grp $callback->($grp)

Sets a feeder/generator on this group: every group can have an attached
generator that generates requests if idle. The idea behind this is that,
although you could just queue as many requests as you want in a group,
this might starve other requests for a potentially long time. For
example, C<aio_scandir> might generate hundreds of thousands of C<aio_stat>
requests, delaying any later requests for a long time.

To avoid this, and allow incremental generation of requests, you can
instead create a group and set a feeder on it that generates those
requests. The feed callback will be called whenever there are few enough
(see C<limit>, below) requests active in the group itself and is expected
to queue more requests.

The feed callback can queue as many requests as it likes (i.e. C<add> does
not impose any limits).

If the feed does not queue more requests when called, it will be
automatically removed from the group.

If the feed limit is C<0>, it will be set to C<2> automatically.

Example:

   # stat all files in @files, but only ever use four aio requests concurrently:

   my $grp = aio_group sub { print "finished\n" };
   limit $grp 4;
   feed $grp sub {
      my $file = pop @files
         or return;

      add $grp aio_stat $file, sub { ... };
   };

=item limit $grp $num

Sets the feeder limit for the group: The feeder will be called whenever
the group contains fewer than this many requests.

Setting the limit to C<0> will pause the feeding process.

=back

=head2 SUPPORT FUNCTIONS

=head3 EVENT PROCESSING AND EVENT LOOP INTEGRATION

=over 4

=item $fileno = IO::AIO::poll_fileno

Returns the I<request result pipe file descriptor>. This file descriptor
must be polled for reading by some mechanism outside this module (e.g.
Event or select, see below or the SYNOPSIS). If the pipe becomes readable
you have to call C<poll_cb> to check the results.

See C<poll_cb> for an example.

=item IO::AIO::poll_cb

Processes some outstanding events on the result pipe. You have to call
this regularly. Returns the number of events processed. Returns
immediately when no events are outstanding. The number of events
processed depends on the settings of C<IO::AIO::max_poll_reqs> and
C<IO::AIO::max_poll_time>.

If not all requests were processed for whatever reason, the filehandle
will still be ready when C<poll_cb> returns.

Example: Install an Event watcher that automatically calls
IO::AIO::poll_cb with high priority:

   Event->io (fd => IO::AIO::poll_fileno,
              poll => 'r', async => 1,
              cb => \&IO::AIO::poll_cb);
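
Without an event loop, the same can be done with Perl's four-argument
C<select>. This is only a sketch: the one-second timeout and the example
path are arbitrary choices.

   use IO::AIO;

   my $done = 0;
   aio_stat "/etc/passwd", sub { $done = 1 };

   # bit vector containing just the result pipe
   my $rin = '';
   vec ($rin, IO::AIO::poll_fileno (), 1) = 1;

   until ($done) {
      my $rout = $rin;

      # wait up to one second for results, then process them
      IO::AIO::poll_cb ()
         if select ($rout, undef, undef, 1) > 0;
   }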

=item IO::AIO::max_poll_reqs $nreqs

=item IO::AIO::max_poll_time $seconds

These set the maximum number of requests (default C<0>, meaning infinity)
that are being processed by C<IO::AIO::poll_cb> in one call, respectively
the maximum amount of time (default C<0>, meaning infinity) spent in
C<IO::AIO::poll_cb> to process requests (more correctly the minimum amount
of time C<poll_cb> is allowed to use).

Setting C<max_poll_time> to a non-zero value creates an overhead of one
syscall per request processed, which is not normally a problem unless your
callbacks are really really fast or your OS is really really slow (I am
not mentioning Solaris here). Using C<max_poll_reqs> incurs no overhead.

Setting these is useful if you want to ensure some level of
interactiveness when perl is not fast enough to process all requests in
time.

For interactive programs, values such as C<0.01> to C<0.1> should be fine.

Example: Install an Event watcher that automatically calls
IO::AIO::poll_cb with low priority, to ensure that other parts of the
program get the CPU sometimes even under high AIO load.

   # try not to spend much more than 0.1s in poll_cb
   IO::AIO::max_poll_time 0.1;

   # use a low priority so other tasks have priority
   Event->io (fd => IO::AIO::poll_fileno,
              poll => 'r', nice => 1,
              cb => \&IO::AIO::poll_cb);

=item IO::AIO::poll_wait

If there are any outstanding requests and none of them in the result
phase, wait till the result filehandle becomes ready for reading (it simply
does a C<select> on the filehandle). This is useful if you want to
synchronously wait for some requests to finish.

See C<nreqs> for an example.

=item IO::AIO::poll

Waits until some requests have been handled.

Returns the number of requests processed, but is otherwise strictly
equivalent to:

   IO::AIO::poll_wait, IO::AIO::poll_cb
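
For instance, a script can use it to wait for one particular callback
instead of draining the whole queue. A sketch, with an arbitrary example
path:

   use IO::AIO;

   my $size;
   aio_stat "/etc/passwd", sub { $size = $_[0] ? -1 : -s _ };

   # handle results batch by batch until this callback has run
   IO::AIO::poll until defined $size;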

=item IO::AIO::flush

Waits till all outstanding AIO requests have been handled.

Strictly equivalent to:

   IO::AIO::poll_wait, IO::AIO::poll_cb
      while IO::AIO::nreqs;
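
This makes C<flush> convenient in simple, non-interactive scripts. The
sketch below creates a few temporary files (via the core module
File::Temp, used here only to have something to delete), removes them
asynchronously and then blocks until every request has finished.

   use IO::AIO;
   use File::Temp ();

   # three throw-away files that are kept after their objects go away
   my @files = map { File::Temp->new (UNLINK => 0)->filename } 1 .. 3;

   my $errors = 0;
   aio_unlink $_, sub { $errors++ if $_[0] } for @files;

   # block until all unlink requests have been handled
   IO::AIO::flush;

   print "removed ", @files - $errors, " files\n";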

=back

=head3 CONTROLLING THE NUMBER OF THREADS

=over 4

=item IO::AIO::min_parallel $nthreads

Set the minimum number of AIO threads to C<$nthreads>. The current
default is C<8>, which means eight asynchronous operations can execute
concurrently at any one time (the number of outstanding requests,
however, is unlimited).

IO::AIO starts threads only on demand, when an AIO request is queued and
no free thread exists. Please note that queueing up a hundred requests can
create demand for a hundred threads, even if it turns out that everything
is in the cache and could have been processed faster by a single thread.

It is recommended to keep the number of threads relatively low, as some
Linux kernel versions will scale negatively with the number of threads
(higher parallelity => MUCH higher latency). With current Linux 2.6
versions, 4-32 threads should be fine.

Under most circumstances you don't need to call this function, as the
module selects a default that is suitable for low to moderate load.

=item IO::AIO::max_parallel $nthreads

Sets the maximum number of AIO threads to C<$nthreads>. If more than the
specified number of threads are currently running, this function kills
the surplus threads. This function blocks until the limit is reached.

While C<$nthreads> is zero, aio requests get queued but not executed
until the number of threads has been increased again.

This module automatically runs C<max_parallel 0> at program end, to ensure
that all threads are killed and that there are no outstanding requests.

Under normal circumstances you don't need to call this function.

=item IO::AIO::max_idle $nthreads

Limit the number of threads (default: 4) that are allowed to idle (i.e.,
threads that did not get a request to process within 10 seconds). That
means if a thread becomes idle while C<$nthreads> other threads are also
idle, it will free its resources and exit.

This is useful when you allow a large number of threads (e.g. 100 or 1000)
to allow for extremely high load situations, but want to free resources
under normal circumstances (1000 threads can easily consume 30MB of RAM).

The default is probably ok in most situations, especially if thread
creation is fast. If thread creation is very slow on your system you might
want to use larger values.
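
A sketch of such a configuration. The numbers are illustrative, not
recommendations, and it assumes (as described under C<min_parallel>) that
raising the minimum only permits more threads, which are still started on
demand:

   use IO::AIO;

   # permit up to 100 concurrent requests during load spikes
   IO::AIO::min_parallel 100;

   # but let no more than 4 of those threads sit idle afterwards
   IO::AIO::max_idle 4;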

=item $oldmaxreqs = IO::AIO::max_outstanding $maxreqs

This is a very bad function to use in interactive programs because it
blocks, and a bad way to reduce concurrency because it is inexact: it is
better to use an C<aio_group> together with a feed callback.

Sets the maximum number of outstanding requests to C<$maxreqs>. If you
queue up more than this number of requests, the next call to C<poll_cb>
(or C<poll> or any other function calling C<poll_cb>) will block until
the limit is no longer exceeded.

The default value is very large, so there is no practical limit on the
number of outstanding requests.

You can still queue as many requests as you want. Therefore,
C<max_outstanding> is mainly useful in simple scripts (with low values) or
as a stop gap to shield against fatal memory overflow (with large values).
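
For example, a simple batch script might cap the queue like this. The
limit of 64 and the repeated example path are arbitrary, and the sketch
relies on the old value being returned, as the signature above states:

   use IO::AIO;

   my $old = IO::AIO::max_outstanding 64;

   my $ok = 0;
   aio_stat $_, sub { $ok++ unless $_[0] } for ("/etc/passwd") x 200;

   # poll_cb (called by flush) blocks while more than 64 are outstanding
   IO::AIO::flush;

   # restore the previous limit
   IO::AIO::max_outstanding $old;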

=back

=head3 STATISTICAL INFORMATION

=over 4

=item IO::AIO::nreqs

Returns the number of requests currently in the ready, execute or pending
states (i.e. whose callback has not been invoked yet).

Example: wait till there are no outstanding requests anymore:

   IO::AIO::poll_wait, IO::AIO::poll_cb
      while IO::AIO::nreqs;

=item IO::AIO::nready

Returns the number of requests currently in the ready state (not yet
executed).

=item IO::AIO::npending

Returns the number of requests currently in the pending state (executed,
but not yet processed by C<poll_cb>).
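
The three counters can be sampled at any time, for example for a simple
progress display. The numbers printed by this sketch depend on how far
the threads have got, so no particular output is implied:

   use IO::AIO;

   aio_stat "/etc/passwd", sub { } for 1 .. 10;

   printf "outstanding %d, ready %d, pending %d\n",
      IO::AIO::nreqs (), IO::AIO::nready (), IO::AIO::npending ();

   IO::AIO::flush;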

=back

=cut

# support function to convert a fd into a perl filehandle
sub _fd2fh {
   return undef if $_[0] < 0;

   # try to generate nice filehandles
   my $sym = "IO::AIO::fd#$_[0]";
   local *$sym;

   open *$sym, "+<&=$_[0]"                  # usually works under any unix
      or open *$sym, "<&=$_[0]"             # cygwin needs this
      or open *$sym, ">&=$_[0]"             # or this
      or return undef;

   *$sym
}

min_parallel 8;

END {
   min_parallel 1;
   flush;
};

1;

=head2 FORK BEHAVIOUR

This module should do "the right thing" when the process using it forks:

Before the fork, IO::AIO enters a quiescent state where no requests
can be added in other threads and no results will be processed. After
the fork the parent simply leaves the quiescent state and continues
request/result processing, while the child frees the request/result queue
(so that the requests started before the fork will only be handled in the
parent). Threads will be started on demand until the limit set in the
parent process has been reached again.

In short: the parent will, after a short pause, continue as if fork had
not been called, while the child will act as if IO::AIO has not been used
yet.

=head2 MEMORY USAGE

Per-request usage:

Each aio request uses, depending on your architecture, around 100-200
bytes of memory. In addition, stat requests need a stat buffer (possibly
a few hundred bytes), readdir requires a result buffer and so on. Perl
scalars and other data passed into aio requests will also be locked and
will consume memory till the request has entered the done state.

This is not awfully much, so queuing lots of requests is not usually a
problem.

Per-thread usage:

In the execution phase, some aio requests require more memory for
temporary buffers, and each thread requires a stack and other data
structures (usually around 16k-128k, depending on the OS).

=head1 KNOWN BUGS

Known bugs will be fixed in the next release.

=head1 SEE ALSO

L<Coro::AIO>.

=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut