/cvs/IO-AIO/AIO.pm
Revision: 1.87
Committed: Sun Oct 29 00:52:02 2006 UTC (17 years, 7 months ago) by root
Branch: MAIN
CVS Tags: rel-2_1
Changes since 1.86: +14 -10 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     IO::AIO - Asynchronous Input/Output
4    
5     =head1 SYNOPSIS
6    
7     use IO::AIO;
8    
9 root 1.6 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10     my ($fh) = @_;
11     ...
12     };
13    
14     aio_unlink "/tmp/file", sub { };
15    
16     aio_read $fh, 30000, 1024, $buffer, 0, sub {
17 root 1.8 $_[0] > 0 or die "read error: $!";
18 root 1.6 };
19    
20 root 1.56 # version 2+ has request and group objects
21     use IO::AIO 2;
22 root 1.52
23 root 1.68 aioreq_pri 4; # give next request a very high priority
24 root 1.52 my $req = aio_unlink "/tmp/file", sub { };
25     $req->cancel; # cancel request if still in queue
26    
27 root 1.56 my $grp = aio_group sub { print "all stats done\n" };
28     add $grp aio_stat "..." for ...;
29    
30     # AnyEvent integration
31 root 1.42 open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!";
32     my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb });
33    
34 root 1.56 # Event integration
35 root 1.6 Event->io (fd => IO::AIO::poll_fileno,
36 root 1.7 poll => 'r',
37 root 1.6 cb => \&IO::AIO::poll_cb);
38    
39 root 1.56 # Glib/Gtk2 integration
40 root 1.6 add_watch Glib::IO IO::AIO::poll_fileno,
41 root 1.22 in => sub { IO::AIO::poll_cb; 1 };
42 root 1.6
43 root 1.56 # Tk integration
44 root 1.6 Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "",
45     readable => \&IO::AIO::poll_cb);
46    
47 root 1.56 # Danga::Socket integration
48 root 1.11 Danga::Socket->AddOtherFds (IO::AIO::poll_fileno =>
49     \&IO::AIO::poll_cb);
50    
51 root 1.1 =head1 DESCRIPTION
52    
53     This module implements asynchronous I/O using whatever means your
54 root 1.2 operating system supports.
55 root 1.1
56 root 1.85 Asynchronous means that operations that can normally block your program
57     (e.g. reading from disk) will be done asynchronously: the operation
58     will still block, but you can do something else in the meantime. This
59     is extremely useful for programs that need to stay interactive even
60     when doing heavy I/O (GUI programs, high performance network servers
61     etc.), but can also be used to easily do operations in parallel that are
62     normally done sequentially, e.g. stat'ing many files, which is much faster
63     on a RAID volume or over NFS when you do a number of stat operations
64     concurrently.
65    
66     While this works on all types of file descriptors (for example sockets),
67     using these functions on file descriptors that support nonblocking
68     operation (again, sockets, pipes etc.) is very inefficient. Use an event
69     loop for that (such as the L<Event|Event> module): IO::AIO will naturally
70     fit into such an event loop itself.
71    
72 root 1.72 In this version, a number of threads are started that execute your
73     requests and signal their completion. You don't need thread support
74     in perl, and the threads created by this module will not be visible
75     to perl. In the future, this module might make use of the native aio
76     functions available on many operating systems. However, they are often
77 root 1.85 not well-supported or restricted (GNU/Linux doesn't allow them on normal
78 root 1.72 files currently, for example), and they would only support aio_read and
79     aio_write, so the remaining functionality would have to be implemented
80     using threads anyway.
81    
82     Although the module will work in the presence of other (Perl-)
83     threads, it is currently not reentrant in any way, so use appropriate
84     locking yourself, always call C<poll_cb> from within the same thread, or
85     never call C<poll_cb> (or other C<aio_> functions) recursively.
86    
87 root 1.86 =head2 EXAMPLE
88    
89     This is a simple example that uses the Event module and loads
90     F</etc/passwd> asynchronously:
91    
92     use Fcntl;
93     use Event;
94     use IO::AIO;
95    
96     # register the IO::AIO callback with Event
97     Event->io (fd => IO::AIO::poll_fileno,
98     poll => 'r',
99     cb => \&IO::AIO::poll_cb);
100    
101     # queue the request to open /etc/passwd
102     aio_open "/etc/passwd", O_RDONLY, 0, sub {
103     my $fh = $_[0]
104     or die "error while opening: $!";
105    
106     # stat'ing filehandles is generally non-blocking
107     my $size = -s $fh;
108    
109     # queue a request to read the file
110     my $contents;
111     aio_read $fh, 0, $size, $contents, 0, sub {
112     $_[0] == $size
113     or die "short read: $!";
114    
115     close $fh;
116    
117     # file contents now in $contents
118     print $contents;
119    
120     # exit event loop and program
121     Event::unloop;
122     };
123     };
124    
125     # possibly queue up other requests, or open GUI windows,
126     # check for sockets etc. etc.
127    
128     # process events as long as there are some:
129     Event::loop;
130    
131 root 1.72 =head1 REQUEST ANATOMY AND LIFETIME
132    
133     Every C<aio_*> function creates a request, which is a C data structure not
134     directly visible to Perl.
135    
136     If called in non-void context, every request function returns a Perl
137     object representing the request. In void context, nothing is returned,
138     which saves a bit of memory.
139    
140     The perl object is a fairly standard ref-to-hash object. The hash contents
141     are not used by IO::AIO so you are free to store anything you like in it.
142    
143     During their existence, aio requests travel through the following states,
144     in order:
145    
146     =over 4
147    
148     =item ready
149    
150     Immediately after a request is created it is put into the ready state,
151     waiting for a thread to execute it.
152    
153     =item execute
154    
155     A thread has accepted the request for processing and is currently
156     executing it (e.g. blocking in read).
157    
158     =item pending
159    
160     The request has been executed and is waiting for result processing.
161    
162     While request submission and execution is fully asynchronous, result
163     processing is not and relies on the perl interpreter calling C<poll_cb>
164     (or another function with the same effect).
165    
166     =item result
167    
168     The request results are processed synchronously by C<poll_cb>.
169    
170     The C<poll_cb> function will process all outstanding aio requests by
171     calling their callbacks, freeing memory associated with them and managing
172     any groups they are contained in.
173    
174     =item done
175    
176     Request has reached the end of its lifetime and holds no resources anymore
177     (except possibly for the Perl object, but its connection to the actual
178     aio request is severed and calling its methods will either do nothing or
179     result in a runtime error).
180 root 1.1
181     =cut
182    
183     package IO::AIO;
184    
185 root 1.23 no warnings;
186 root 1.51 use strict 'vars';
187 root 1.23
188 root 1.1 use base 'Exporter';
189    
190     BEGIN {
191 root 1.86 our $VERSION = '2.1';
192 root 1.1
193 root 1.67 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
194     aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
195     aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move
196 root 1.82 aio_copy aio_group aio_nop aio_mknod);
197 root 1.70 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice));
198 root 1.67 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
199 root 1.86 min_parallel max_parallel max_idle
200     nreqs nready npending nthreads
201     max_poll_time max_poll_reqs);
202 root 1.1
203 root 1.54 @IO::AIO::GRP::ISA = 'IO::AIO::REQ';
204    
205 root 1.1 require XSLoader;
206 root 1.51 XSLoader::load ("IO::AIO", $VERSION);
207 root 1.1 }
208    
209 root 1.5 =head1 FUNCTIONS
210 root 1.1
211 root 1.87 =head2 AIO REQUEST FUNCTIONS
212 root 1.1
213 root 1.5 All the C<aio_*> calls are more or less thin wrappers around the syscall
214     with the same name (sans C<aio_>). The arguments are similar or identical,
215 root 1.14 and they all accept an additional (and optional) C<$callback> argument
216     which must be a code reference. This code reference will get called with
217     the syscall return code (e.g. most syscalls return C<-1> on error, unlike
218     perl, which usually delivers "false") as its sole argument when the given
219     syscall has been executed asynchronously.
220 root 1.1
221 root 1.23 All functions expecting a filehandle keep a copy of the filehandle
222     internally until the request has finished.
223 root 1.1
224 root 1.87 All functions return request objects of type L<IO::AIO::REQ> that allow
225     further manipulation of those requests while they are in-flight.
226 root 1.52
227 root 1.28 The pathnames you pass to these routines I<must> be absolute and
228 root 1.87 encoded as octets. The reason for the former is that at the time the
229 root 1.28 request is being executed, the current working directory could have
230     changed. Alternatively, you can make sure that you never change the
231 root 1.87 current working directory anywhere in the program and then use relative
232     paths.
233 root 1.28
234 root 1.87 To encode pathnames as octets, make sure you either: a) always pass
235     in filenames you got from outside (command line, readdir etc.) without
236     tinkering, b) use only ASCII or ISO 8859-1 names, c) use the Encode module and encode
237 root 1.28 your pathnames to the locale (or other) encoding in effect in the user
238     environment, d) use Glib::filename_from_unicode on unicode filenames or e)
239 root 1.87 use something else to ensure your scalar has the correct contents.
240    
241     This works, by the way, independently of the internal UTF-8 bit, which IO::AIO
242     handles correctly whether it is set or not.
243 root 1.1
244 root 1.5 =over 4
245 root 1.1
246 root 1.80 =item $prev_pri = aioreq_pri [$pri]
247 root 1.68
248 root 1.80 Returns the priority value that would be used for the next request and, if
249     C<$pri> is given, sets the priority for the next aio request.
250 root 1.68
251 root 1.80 The default priority is C<0>, the minimum and maximum priorities are C<-4>
252     and C<4>, respectively. Requests with higher priority will be serviced
253     first.
254    
255     The priority will be reset to C<0> after each call to one of the C<aio_*>
256 root 1.68 functions.
257    
258 root 1.69 Example: open a file with low priority, then read something from it with
259     higher priority so the read request is serviced before other low priority
260     open requests (potentially spamming the cache):
261    
262     aioreq_pri -3;
263     aio_open ..., sub {
264     return unless $_[0];
265    
266     aioreq_pri -2;
267     aio_read $_[0], ..., sub {
268     ...
269     };
270     };
271    
272     =item aioreq_nice $pri_adjust
273    
274     Similar to C<aioreq_pri>, but subtracts the given value from the current
275 root 1.87 priority, so the effect is cumulative.
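
The interplay of C<aioreq_pri>, C<aioreq_nice> and the automatic reset can be modelled in plain Perl. The following is a toy model, not the module's XS code: the C<model_*> names are made up for illustration, and clamping to the documented range of C<-4> to C<4> is an assumption.

```perl
use strict;
use warnings;

# Toy model of the "priority for the next request" rule described above.
# This is NOT the XS implementation; clamping to [-4, 4] is an assumption.
{
    my $next_pri = 0;

    sub clamp { my ($p) = @_; $p < -4 ? -4 : $p > 4 ? 4 : $p }

    # like aioreq_pri: return the pending priority, optionally set a new one
    sub model_aioreq_pri {
        my $prev = $next_pri;
        $next_pri = clamp ($_[0]) if @_;
        $prev
    }

    # like aioreq_nice: subtract the adjustment, so repeated calls accumulate
    sub model_aioreq_nice { $next_pri = clamp ($next_pri - $_[0]) }

    # like any aio_* call: consume the pending priority and reset it to 0
    sub model_submit {
        my $used = $next_pri;
        $next_pri = 0;
        $used
    }
}
```

In this model, calling C<model_aioreq_nice 1> twice lowers the pending priority by two, and the first C<model_submit> afterwards consumes that value and resets it to C<0>.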
276 root 1.69
277 root 1.40 =item aio_open $pathname, $flags, $mode, $callback->($fh)
278 root 1.1
279 root 1.2 Asynchronously open or create a file and call the callback with a newly
280     created filehandle for the file.
281 root 1.1
282     The pathname passed to C<aio_open> must be absolute. See the notes on
283     pathnames in the introduction to this section, above, for an explanation.
284    
285 root 1.20 The C<$flags> argument is a bitmask. See the C<Fcntl> module for a
286     list. They are the same as used by C<sysopen>.
287    
288     Likewise, C<$mode> specifies the mode of the newly created file, if it
289     didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>,
290     except that it is mandatory (i.e. use C<0> if you don't create new files,
291     and C<0666> or C<0777> if you do).
292 root 1.1
293     Example:
294    
295     aio_open "/etc/passwd", O_RDONLY, 0, sub {
296 root 1.2 if ($_[0]) {
297     print "open successful, fh is $_[0]\n";
298 root 1.1 ...
299     } else {
300     die "open failed: $!\n";
301     }
302     };
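
Because C<$flags> and C<$mode> have the same meaning as for C<sysopen>, a synchronous sketch can show which values to pass. The file name below is hypothetical, and only core modules are used. The asynchronous call appears as a comment.

```perl
use strict;
use warnings;
use Fcntl;                      # O_CREAT, O_WRONLY, O_EXCL, ...
use File::Temp qw(tempdir);

my $dir  = tempdir (CLEANUP => 1);
my $path = "$dir/example.txt";  # hypothetical file name

# flags are OR'ed together; the mode only matters because O_CREAT is given
my $flags = O_CREAT | O_WRONLY | O_EXCL;
my $mode  = 0666;               # further restricted by the process umask

sysopen my $fh, $path, $flags, $mode
    or die "open failed: $!";
print $fh "hello\n";
close $fh or die "close failed: $!";

# the asynchronous call takes the same values:
#   aio_open $path, $flags, $mode, sub { my ($fh) = @_; ... };
```

When no file is created, the mode is still required by C<aio_open>, so pass C<0> as in the example above.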
303    
304 root 1.40 =item aio_close $fh, $callback->($status)
305 root 1.1
306 root 1.2 Asynchronously close a file and call the callback with the result
307     code. I<WARNING:> although accepted, you should not pass in a perl
308 root 1.20 filehandle here, as perl will likely close the file descriptor another
309     time when the filehandle is destroyed. Normally, you can safely call perl's
310     C<close> or just let filehandles go out of scope.
311    
312     This is considered a bug in the API, so that might change. It's
313     therefore best to avoid this function.
314 root 1.1
315 root 1.40 =item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
316 root 1.1
317 root 1.40 =item aio_write $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
318 root 1.1
319     Reads or writes C<$length> bytes from the specified C<$fh> and C<$offset>
320     into (or from) the scalar given by C<$data> at offset C<$dataoffset> and calls the
321     callback with the actual number of bytes read or written (or -1 on error, just
322     like the syscall).
323    
324 root 1.31 The C<$data> scalar I<MUST NOT> be modified in any way while the request
325     is outstanding. Modifying it can result in segfaults or WW3 (if the
326     necessary/optional hardware is installed).
327    
328 root 1.17 Example: Read 15 bytes at offset 7 into scalar C<$buffer>, starting at
329 root 1.1 offset C<0> within the scalar:
330    
331     aio_read $fh, 7, 15, $buffer, 0, sub {
332 root 1.9 $_[0] > 0 or die "read error: $!";
333     print "read $_[0] bytes: <$buffer>\n";
334 root 1.1 };
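
The difference between C<$offset> (a position in the file) and C<$dataoffset> (a position in the scalar) is easy to mix up. The sketch below uses perl's synchronous C<sysseek> and C<sysread> on a scratch file to show the two offsets, on the assumption that C<sysread>'s OFFSET argument behaves like C<$dataoffset>.

```perl
use strict;
use warnings;
use File::Temp qw(tempfile);

# scratch file containing the 26 lowercase letters
my ($tmp, $name) = tempfile (UNLINK => 1);
print $tmp join "", "a" .. "z";
close $tmp or die "close failed: $!";

open my $fh, "<", $name or die "open failed: $!";

my $buffer = "HEADER:";

# file offset 7, length 15, stored after the 7 bytes already in $buffer
sysseek $fh, 7, 0 or die "seek failed: $!";
my $got = sysread $fh, $buffer, 15, length $buffer;
defined $got or die "read error: $!";

print "read $got bytes: <$buffer>\n";

# asynchronous form with the same offsets:
#   aio_read $fh, 7, 15, $buffer, 7, sub { ... };
```

Unlike C<sysread>, C<aio_read> takes the file position as an argument, so no separate seek is needed.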
335    
336 root 1.40 =item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval)
337 root 1.35
338     Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts
339     reading at byte offset C<$in_offset>, and starts writing at the current
340     file offset of C<$out_fh>. Because of that, it is not safe to issue more
341     than one C<aio_sendfile> per C<$out_fh>, as they will interfere with each
342     other.
343    
344     This call tries to make use of a native C<sendfile> syscall to provide
345     zero-copy operation. For this to work, C<$out_fh> should refer to a
346     socket, and C<$in_fh> should refer to an mmap'able file.
347    
348     If the native sendfile call fails or is not implemented, it will be
349 root 1.36 emulated, so you can call C<aio_sendfile> on any type of filehandle
350     regardless of the limitations of the operating system.
351 root 1.35
352     Please note, however, that C<aio_sendfile> can read more bytes from
353     C<$in_fh> than are written, and there is no way to find out how many
354 root 1.36 bytes have been read from C<aio_sendfile> alone, as C<aio_sendfile> only
355     provides the number of bytes written to C<$out_fh>. Only if the result
356     value equals C<$length> one can assume that C<$length> bytes have been
357     read.
358 root 1.35
359 root 1.40 =item aio_readahead $fh,$offset,$length, $callback->($retval)
360 root 1.1
361 root 1.20 C<aio_readahead> populates the page cache with data from a file so that
362 root 1.1 subsequent reads from that file will not block on disk I/O. The C<$offset>
363     argument specifies the starting point from which data is to be read and
364     C<$length> specifies the number of bytes to be read. I/O is performed in
365     whole pages, so that offset is effectively rounded down to a page boundary
366     and bytes are read up to the next page boundary greater than or equal to
367 root 1.20 (offset+length). C<aio_readahead> does not read beyond the end of the
368 root 1.1 file. The current file offset of the file is left unchanged.
369    
370 root 1.26 If that syscall doesn't exist (likely if your OS isn't Linux) it will be
371     emulated by simply reading the data, which would have a similar effect.
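
The page rounding described above can be worked out with a little arithmetic. This sketch computes the byte range a page-granular readahead would touch. The function name is made up, and the default page size of 4096 bytes is an assumption that may not match your system.

```perl
use strict;
use warnings;

# Compute the byte range [start, end) that a page-granular readahead would
# touch. $page_size defaults to 4096, which is an assumption; query your
# platform for the real value.
sub readahead_range {
    my ($offset, $length, $page_size) = @_;
    $page_size ||= 4096;

    # round the start down to a page boundary
    my $start = $offset - $offset % $page_size;

    # round the end up to a page boundary, unless it is already aligned
    my $end = $offset + $length;
    $end += $page_size - $end % $page_size
        if $end % $page_size;

    ($start, $end)
}

my ($start, $end) = readahead_range (5000, 100);
print "pages cover bytes $start..", $end - 1, "\n";
```

A request for 100 bytes at offset 5000 therefore touches the whole second page, assuming 4096-byte pages.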
372    
373 root 1.40 =item aio_stat $fh_or_path, $callback->($status)
374 root 1.1
375 root 1.40 =item aio_lstat $fh, $callback->($status)
376 root 1.1
377     Works like perl's C<stat> or C<lstat> in void context. The callback will
378     be called after the stat and the results will be available using C<stat _>
379     or C<-s _> etc...
380    
381     The pathname passed to C<aio_stat> must be absolute. See the notes on
382     pathnames in the introduction to this section, above, for an explanation.
383    
384     Currently, the stats are always 64-bit-stats, i.e. instead of returning an
385     error when stat'ing a large file, the results will be silently truncated
386     unless perl itself is compiled with large file support.
387    
388     Example: Print the length of F</etc/passwd>:
389    
390     aio_stat "/etc/passwd", sub {
391     $_[0] and die "stat failed: $!";
392     print "size is ", -s _, "\n";
393     };
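
The special C<_> filehandle refers to the result of the most recent stat, which is how the callback gets at the data. The following synchronous sketch shows the same idiom with perl's built-in C<stat> on a scratch file. Note that the built-in returns true on success, whereas the C<aio_stat> callback receives C<0> on success.

```perl
use strict;
use warnings;
use File::Temp qw(tempfile);

# scratch file with five bytes of content
my ($fh, $name) = tempfile (UNLINK => 1);
print $fh "12345";
close $fh or die "close failed: $!";

# one stat call fills the cache behind the special "_" filehandle
stat $name or die "stat failed: $!";

# further tests reuse the cached result instead of stat'ing again
my $size    = -s _;
my $mtime   = (stat _)[9];
my $is_file = -f _ ? 1 : 0;

print "size is $size, plain file: $is_file\n";
```

Inside an C<aio_stat> callback, the same C<-s _> and C<(stat _)[...]> expressions apply to the file that was just stat'ed asynchronously.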
394    
395 root 1.40 =item aio_unlink $pathname, $callback->($status)
396 root 1.1
397     Asynchronously unlink (delete) a file and call the callback with the
398     result code.
399    
400 root 1.82 =item aio_mknod $path, $mode, $dev, $callback->($status)
401    
402 root 1.86 [EXPERIMENTAL]
403    
404 root 1.83 Asynchronously create a device node (or fifo). See mknod(2).
405    
406 root 1.86 The only (POSIX-) portable way of calling this function is:
407 root 1.83
408     aio_mknod $path, IO::AIO::S_IFIFO | $mode, 0, sub { ...
409 root 1.82
410 root 1.50 =item aio_link $srcpath, $dstpath, $callback->($status)
411    
412     Asynchronously create a new link to the existing object at C<$srcpath> at
413     the path C<$dstpath> and call the callback with the result code.
414    
415     =item aio_symlink $srcpath, $dstpath, $callback->($status)
416    
417     Asynchronously create a new symbolic link to the existing object at C<$srcpath> at
418     the path C<$dstpath> and call the callback with the result code.
419    
420     =item aio_rename $srcpath, $dstpath, $callback->($status)
421    
422     Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just like
423     rename(2), and call the callback with the result code.
424    
425 root 1.40 =item aio_rmdir $pathname, $callback->($status)
426 root 1.27
427     Asynchronously rmdir (delete) a directory and call the callback with the
428     result code.
429    
430 root 1.46 =item aio_readdir $pathname, $callback->($entries)
431 root 1.37
432     Unlike the POSIX call of the same name, C<aio_readdir> reads an entire
433     directory (i.e. opendir + readdir + closedir). The entries will not be
434     sorted, and will B<NOT> include the C<.> and C<..> entries.
435    
436     The callback is passed a single argument which is either C<undef> or an array-ref
437     with the filenames.
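
For comparison, a synchronous function with the same result convention might look like the sketch below. The name C<sync_readdir> is hypothetical. It returns C<undef> on error and an array-ref without C<.> and C<..> otherwise.

```perl
use strict;
use warnings;

# Synchronous stand-in with the result convention described above:
# undef on error, otherwise a reference to an unsorted list of names.
sub sync_readdir {
    my ($path) = @_;

    opendir my $dh, $path
        or return undef;

    # drop the "." and ".." entries, keep everything else
    my @entries = grep { $_ ne "." && $_ ne ".." } readdir $dh;
    closedir $dh;

    \@entries
}
```

A caller would test the result for definedness first, exactly as an C<aio_readdir> callback has to test its argument before dereferencing it.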
438    
439 root 1.82 =item aio_copy $srcpath, $dstpath, $callback->($status)
440    
441     Try to copy the I<file> (directories not supported as either source or
442     destination) from C<$srcpath> to C<$dstpath> and call the callback with
443     the status C<0> (ok) or C<-1> (error).
444    
445     This is a composite request that creates the destination file with
446     mode 0200 and copies the contents of the source file into it using
447     C<aio_sendfile>, followed by restoring atime, mtime, access mode and
448     uid/gid, in that order.
449    
450     If an error occurs, the partial destination file will be unlinked, if
451     possible, except when setting atime, mtime, access mode and uid/gid, where
452     errors are being ignored.
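
The sequence of steps is easier to follow in synchronous form. The sketch below is a hypothetical C<sync_copy> that follows the description above using only built-ins. It is not the module's implementation and it blocks while copying.

```perl
use strict;
use warnings;
use Fcntl;

# Synchronous stand-in for the composite request; returns 0 on success
# and -1 on error, mirroring the status described above.
sub sync_copy {
    my ($src, $dst) = @_;

    sysopen my $src_fh, $src, O_RDONLY
        or return -1;
    my @stat = stat $src_fh;

    # create the destination owner-write-only while it is incomplete
    sysopen my $dst_fh, $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200
        or return -1;

    my $ok = 1;
    while (1) {
        my $len = sysread $src_fh, my $buf, 65536;
        unless (defined $len) { $ok = 0; last }   # read error
        last if $len == 0;                        # end of file

        my $written = syswrite $dst_fh, $buf;
        unless (defined $written && $written == $len) { $ok = 0; last }
    }

    close $src_fh;
    close $dst_fh;

    unless ($ok) {
        unlink $dst;                              # remove the partial copy
        return -1;
    }

    # restore metadata in the documented order; errors here are ignored
    utime $stat[8], $stat[9], $dst;
    chmod $stat[2] & 07777, $dst;
    chown $stat[4], $stat[5], $dst;

    0
}
```

The restrictive creation mode keeps other users from reading a half-written copy. The final C<chmod> then applies the source file's permissions.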
453    
454     =cut
455    
456     sub aio_copy($$;$) {
457     my ($src, $dst, $cb) = @_;
458    
459     my $pri = aioreq_pri;
460     my $grp = aio_group $cb;
461    
462     aioreq_pri $pri;
463     add $grp aio_open $src, O_RDONLY, 0, sub {
464     if (my $src_fh = $_[0]) {
465     my @stat = stat $src_fh;
466    
467     aioreq_pri $pri;
468     add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
469     if (my $dst_fh = $_[0]) {
470     aioreq_pri $pri;
471     add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
472     if ($_[0] == $stat[7]) {
473     $grp->result (0);
474     close $src_fh;
475    
476     # those should not normally block. should. should.
477     utime $stat[8], $stat[9], $dst;
478     chmod $stat[2] & 07777, $dst_fh;
479     chown $stat[4], $stat[5], $dst_fh;
480     close $dst_fh;
481     } else {
482     $grp->result (-1);
483     close $src_fh;
484     close $dst_fh;
485    
486     aioreq $pri;
487     add $grp aio_unlink $dst;
488     }
489     };
490     } else {
491     $grp->result (-1);
492     }
493     };
494    
495     } else {
496     $grp->result (-1);
497     }
498     };
499    
500     $grp
501     }
502    
503     =item aio_move $srcpath, $dstpath, $callback->($status)
504    
505     Try to move the I<file> (directories not supported as either source or
506     destination) from C<$srcpath> to C<$dstpath> and call the callback with
507     the status C<0> (ok) or C<-1> (error).
508    
509     This is a composite request that tries to rename(2) the file first. If
510     rename fails with C<EXDEV>, it copies the file with C<aio_copy> and, if
511     that is successful, unlinks C<$srcpath>.
512    
513     =cut
514    
515     sub aio_move($$;$) {
516     my ($src, $dst, $cb) = @_;
517    
518     my $pri = aioreq_pri;
519     my $grp = aio_group $cb;
520    
521     aioreq_pri $pri;
522     add $grp aio_rename $src, $dst, sub {
523     if ($_[0] && $! == EXDEV) {
524     aioreq_pri $pri;
525     add $grp aio_copy $src, $dst, sub {
526     $grp->result ($_[0]);
527    
528     if (!$_[0]) {
529     aioreq_pri $pri;
530     add $grp aio_unlink $src;
531     }
532     };
533     } else {
534     $grp->result ($_[0]);
535     }
536     };
537    
538     $grp
539     }
540    
541 root 1.40 =item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
542    
543 root 1.52 Scans a directory (similar to C<aio_readdir>) but additionally tries to
544 root 1.76 efficiently separate the entries of directory C<$path> into two sets of
545     names, directories you can recurse into (directories), and ones you cannot
546     recurse into (everything else, including symlinks to directories).
547 root 1.52
548 root 1.61 C<aio_scandir> is a composite request that creates many sub requests.
549     C<$maxreq> specifies the maximum number of outstanding aio requests that
550     this function generates. If it is C<< <= 0 >>, then a suitable default
551 root 1.81 will be chosen (currently 4).
552 root 1.40
553     On error, the callback is called without arguments, otherwise it receives
554     two array-refs with path-relative entry names.
555    
556     Example:
557    
558     aio_scandir $dir, 0, sub {
559     my ($dirs, $nondirs) = @_;
560     print "real directories: @$dirs\n";
561     print "everything else: @$nondirs\n";
562     };
563    
564     Implementation notes.
565    
566     The C<aio_readdir> cannot be avoided, but C<stat()>'ing every entry can.
567    
568     After reading the directory, the modification time, size etc. of the
569 root 1.52 directory before and after the readdir are checked, and if they match (and
570     the modification time isn't the current time), the link count will be used to decide how many
571     entries are directories (if >= 2). Otherwise, no knowledge of the number
572     of subdirectories will be assumed.
573    
574     Then entries will be sorted into likely directories (everything without
575     a non-initial dot currently) and likely non-directories (everything
576     else). Then every entry plus an appended C</.> will be C<stat>'ed,
577     likely directories first. If that succeeds, it assumes that the entry
578     is a directory or a symlink to directory (which will be checked
579     separately). This is often faster than stat'ing the entry itself because
580     filesystems might detect the type of the entry without reading the inode
581     data (e.g. ext2fs filetype feature).
582    
583     If the known number of directories (link count - 2) has been reached, the
584     rest of the entries is assumed to be non-directories.
585    
586     This only works with certainty on POSIX (= UNIX) filesystems, which
587     fortunately are the vast majority of filesystems around.
588    
589     It will also likely work on non-POSIX filesystems with reduced efficiency
590     as those tend to return 0 or 1 as link counts, which disables the
591     directory counting heuristic.
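
The two heuristics from these notes can be sketched in plain Perl. Both function names are made up for illustration, and the tie-break on the name is an addition that makes the order deterministic. The real code below stats entries asynchronously and is not reproduced here.

```perl
use strict;
use warnings;

# Order directory entries the way the notes above describe: names without a
# non-initial dot ("likely directories") first, shorter names before longer
# ones. The name tie-break is an addition to make the order deterministic.
sub probe_order {
    my @entries = @_;

    map  { $_->[0] }
    sort { $a->[1] <=> $b->[1] or $a->[2] <=> $b->[2] or $a->[0] cmp $b->[0] }
    map  { [$_, (/.\./ ? 1 : 0), length $_] }
    @entries
}

# Number of subdirectories implied by a directory's link count, or undef
# when the count cannot be trusted (non-POSIX filesystems report 0 or 1).
sub known_subdirs {
    my ($nlink) = @_;
    $nlink >= 2 ? $nlink - 2 : undef
}

print join (" ", probe_order (qw(src README.txt a.out lib .git docs))), "\n";
```

A name such as F<.git> counts as a likely directory because its only dot is the initial one. F<a.out> and F<README.txt> are probed last.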
592 root 1.40
593     =cut
594    
595     sub aio_scandir($$$) {
596     my ($path, $maxreq, $cb) = @_;
597    
598 root 1.80 my $pri = aioreq_pri;
599    
600 root 1.58 my $grp = aio_group $cb;
601 root 1.55
602 root 1.81 $maxreq = 4 if $maxreq <= 0;
603 root 1.40
604     # stat once
605 root 1.80 aioreq_pri $pri;
606 root 1.55 add $grp aio_stat $path, sub {
607 root 1.58 return $grp->result () if $_[0];
608 root 1.52 my $now = time;
609 root 1.40 my $hash1 = join ":", (stat _)[0,1,3,7,9];
610    
611     # read the directory entries
612 root 1.80 aioreq_pri $pri;
613 root 1.55 add $grp aio_readdir $path, sub {
614 root 1.40 my $entries = shift
615 root 1.58 or return $grp->result ();
616 root 1.40
617     # stat the dir another time
618 root 1.80 aioreq_pri $pri;
619 root 1.55 add $grp aio_stat $path, sub {
620 root 1.40 my $hash2 = join ":", (stat _)[0,1,3,7,9];
621    
622     my $ndirs;
623    
624     # take the slow route if anything looks fishy
625 root 1.52 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
626 root 1.40 $ndirs = -1;
627     } else {
628     # if nlink == 2, we are finished
629     # on non-posix-fs's, we rely on nlink < 2
630     $ndirs = (stat _)[3] - 2
631 root 1.58 or return $grp->result ([], $entries);
632 root 1.40 }
633    
634     # sort into likely dirs and likely nondirs
635     # dirs == files without ".", short entries first
636     $entries = [map $_->[0],
637     sort { $b->[1] cmp $a->[1] }
638     map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
639     @$entries];
640    
641     my (@dirs, @nondirs);
642    
643 root 1.74 my $statgrp = add $grp aio_group sub {
644     $grp->result (\@dirs, \@nondirs);
645 root 1.40 };
646    
647 root 1.74 limit $statgrp $maxreq;
648     feed $statgrp sub {
649     return unless @$entries;
650     my $entry = pop @$entries;
651    
652 root 1.80 aioreq_pri $pri;
653 root 1.74 add $statgrp aio_stat "$path/$entry/.", sub {
654     if ($_[0] < 0) {
655     push @nondirs, $entry;
656     } else {
657     # need to check for real directory
658 root 1.80 aioreq_pri $pri;
659 root 1.74 add $statgrp aio_lstat "$path/$entry", sub {
660     if (-d _) {
661     push @dirs, $entry;
662    
663 root 1.75 unless (--$ndirs) {
664 root 1.74 push @nondirs, @$entries;
665 root 1.75 feed $statgrp;
666 root 1.74 }
667     } else {
668     push @nondirs, $entry;
669 root 1.40 }
670     }
671     }
672 root 1.74 };
673 root 1.40 };
674     };
675     };
676     };
677 root 1.55
678     $grp
679 root 1.40 }
680    
681     =item aio_fsync $fh, $callback->($status)
682 root 1.1
683     Asynchronously call fsync on the given filehandle and call the callback
684     with the fsync result code.
685    
686 root 1.40 =item aio_fdatasync $fh, $callback->($status)
687 root 1.1
688     Asynchronously call fdatasync on the given filehandle and call the
689 root 1.26 callback with the fdatasync result code.
690    
691     If this call isn't available because your OS lacks it or it couldn't be
692     detected, it will be emulated by calling C<fsync> instead.
693 root 1.1
694 root 1.58 =item aio_group $callback->(...)
695 root 1.54
696 root 1.55 This is a very special aio request: Instead of doing something, it is a
697     container for other aio requests, which is useful if you want to bundle
698 root 1.71 many requests into a single, composite, request with a definite callback
699     and the ability to cancel the whole request with its subrequests.
700 root 1.55
701     Returns an object of class L<IO::AIO::GRP>. See its documentation below
702     for more info.
703    
704     Example:
705    
706     my $grp = aio_group sub {
707     print "all stats done\n";
708     };
709    
710     add $grp
711     (aio_stat ...),
712     (aio_stat ...),
713     ...;
714    
715 root 1.63 =item aio_nop $callback->()
716    
717     This is a special request - it does nothing in itself and is only used for
718     side effects, such as when you want to add a dummy request to a group so
719     that finishing the requests in the group depends on executing the given
720     code.
721    
722 root 1.64 While this request does nothing, it still goes through the execution
723     phase and still requires a worker thread. Thus, the callback will not
724     be executed immediately but only after other requests in the queue have
725     entered their execution phase. This can be used to measure request
726     latency.
727    
728 root 1.71 =item IO::AIO::aio_busy $fractional_seconds, $callback->() *NOT EXPORTED*
729 root 1.54
730     Mainly used for debugging and benchmarking, this aio request puts one of
731     the request workers to sleep for the given time.
732    
733 root 1.56 While it is theoretically handy to have simple I/O scheduling requests
734 root 1.71 like sleep and file handle readable/writable, the overhead this creates is
735     immense (it blocks a thread for a long time) so do not use this function
736     except to put your application under artificial I/O pressure.
737 root 1.56
738 root 1.5 =back
739    
740 root 1.53 =head2 IO::AIO::REQ CLASS
741 root 1.52
742     All non-aggregate C<aio_*> functions return an object of this class when
743     called in non-void context.
744    
745     =over 4
746    
747 root 1.65 =item cancel $req
748 root 1.52
749     Cancels the request, if possible. Has the effect of skipping execution
750     when entering the B<execute> state and skipping calling the callback when
751     entering the B<result> state, but will leave the request otherwise
752     untouched. That means that requests that currently execute will not be
753     stopped and resources held by the request will not be freed prematurely.
754    
755 root 1.65 =item cb $req $callback->(...)
756    
757     Replace (or simply set) the callback registered to the request.
758    
759 root 1.52 =back
760    
761 root 1.55 =head2 IO::AIO::GRP CLASS
762    
763     This class is a subclass of L<IO::AIO::REQ>, so all its methods apply to
764     objects of this class, too.
765    
766     An IO::AIO::GRP object is a special request that can contain multiple other
767     aio requests.
768    
769     You create one by calling the C<aio_group> constructing function with a
770     callback that will be called when all contained requests have entered the
771     C<done> state:
772    
773     my $grp = aio_group sub {
774     print "all requests are done\n";
775     };
776    
777     You add requests by calling the C<add> method with one or more
778     C<IO::AIO::REQ> objects:
779    
780     $grp->add (aio_unlink "...");
781    
782 root 1.58 add $grp aio_stat "...", sub {
783     $_[0] or return $grp->result ("error");
784    
785     # add another request dynamically, if first succeeded
786     add $grp aio_open "...", sub {
787     $grp->result ("ok");
788     };
789     };
790 root 1.55
791     This makes it very easy to create composite requests (see the source of
792     C<aio_move> for an application) that work and feel like simple requests.
793    
794 root 1.62 =over 4
795    
796     =item * The IO::AIO::GRP objects will be cleaned up during calls to
797 root 1.55 C<IO::AIO::poll_cb>, just like any other request.
798    
799 root 1.62 =item * They can be canceled like any other request. Canceling will cancel not
800 root 1.59 only the request itself, but also all requests it contains.
801 root 1.55
802 root 1.62 =item * They can also be added to other IO::AIO::GRP objects.
803 root 1.55
804 root 1.62 =item * You must not add requests to a group from within the group callback (or
805 root 1.60 any later time).
806    
807 root 1.62 =back
808    
809 root 1.55 Their lifetime, simplified, looks like this: when they are empty, they
810     will finish very quickly. If they contain only requests that are in the
811     C<done> state, they will also finish. Otherwise they will continue to
812     exist.
813    
814 root 1.57 That means after creating a group you have some time to add requests. And
815     in the callbacks of those requests, you can add further requests to the
816     group. And only when all those requests have finished will the group
817     itself finish.
818    
819 root 1.55 =over 4
820    
821 root 1.65 =item add $grp ...
822    
823 root 1.55 =item $grp->add (...)
824    
825 root 1.57 Add one or more requests to the group. Any type of L<IO::AIO::REQ> can
826     be added, including other groups, as long as you do not create circular
827     dependencies.
828    
829     Returns all its arguments.
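
A sketch of nesting groups, assuming a Unix-like system where F</> exists
(the names C<$outer>, C<$inner> and C<@order> are illustrative only). The
inner group is added to the outer one like any other request, so the outer
callback cannot run before the inner one:

```perl
use strict;
use warnings;
use IO::AIO;

my @order;

my $outer = aio_group sub { push @order, "outer" };
my $inner = aio_group sub { push @order, "inner" };

# the inner group contains one plain request ...
$inner->add (aio_stat "/", sub { push @order, "stat" });

# ... and is itself a member of the outer group
$outer->add ($inner);

IO::AIO::flush (); # wait until everything, including both groups, is done

print join (", ", @order), "\n";
```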
830 root 1.55
831 root 1.74 =item $grp->cancel_subs
832    
833     Cancels all subrequests and clears any feeder, but does not cancel the
834     group request itself. Useful when you queued a lot of requests but got a result early.
835    
836 root 1.58 =item $grp->result (...)
837    
838     Set the result value(s) that will be passed to the group callback when all
839 root 1.80 subrequests have finished, and set the group's errno to the current value
840     of errno (just like calling C<errno> without an error number). By default,
841     no argument will be passed and errno is zero.
842    
843     =item $grp->errno ([$errno])
844    
845     Sets the group errno value to C<$errno>, or the current value of errno
846     when the argument is missing.
847    
848     Every aio request has an associated errno value that is restored when
849     the callback is invoked. This method lets you change this value from its
850     default (0).
851    
852     Calling C<result> will also set errno, so make sure you either set C<$!>
853     before the call to C<result>, or call C<errno> after it.
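
The following sketch shows how a failure inside a group can be reported
through both the result value and the group errno. It assumes that the
(hypothetical) path F</nonexistent/example-file> does not exist, so the
C<aio_stat> fails and C<$!> holds its error when C<result> is called:

```perl
use strict;
use warnings;
use IO::AIO;
use Errno qw(ENOENT);

# hypothetical path that is assumed not to exist
my $path = "/nonexistent/example-file";

my ($status, $errno);

my $grp = aio_group sub {
    # @_ holds the values passed to result, $! is the group errno
    ($status, $errno) = ($_[0], $! + 0);
};

$grp->add (aio_stat $path, sub {
    if ($_[0]) {
        # stat failed: $! is the stat error, result records it for the group
        $grp->result ("error");
    } else {
        $grp->result ("ok");
    }
});

IO::AIO::flush ();

print "status=$status errno=$errno\n";
```

Because C<result> captures errno at the time of the call, it is called
here directly inside the failing request's callback, before anything else
can change C<$!>.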
854 root 1.58
855 root 1.65 =item feed $grp $callback->($grp)
856 root 1.60
857     Sets a feeder/generator on this group: every group can have an attached
858     generator that generates requests if idle. The idea behind this is that,
859     although you could just queue as many requests as you want in a group,
860     this might starve other requests for a potentially long time. For
861     example, C<aio_scandir> might generate hundreds of thousands of C<aio_stat>
862     requests, delaying any later requests for a long time.
863    
864     To avoid this, and allow incremental generation of requests, you can
865     instead create a group and set a feeder on it that generates those requests. The
866 root 1.68 feed callback will be called whenever there are few enough (see C<limit>,
867 root 1.60 below) requests active in the group itself and is expected to queue more
868     requests.
869    
870 root 1.68 The feed callback can queue as many requests as it likes (i.e. C<add> does
871     not impose any limits).
872 root 1.60
873 root 1.65 If the feed does not queue more requests when called, it will be
874 root 1.60 automatically removed from the group.
875    
876 root 1.65 If the feed limit is C<0>, it will be set to C<2> automatically.
877 root 1.60
878     Example:
879    
880     # stat all files in @files, but only ever use four aio requests concurrently:
881    
882     my $grp = aio_group sub { print "finished\n" };
883 root 1.68 limit $grp 4;
884 root 1.65 feed $grp sub {
885 root 1.60 my $file = pop @files
886     or return;
887    
888     add $grp aio_stat $file, sub { ... };
889 root 1.65 };
890 root 1.60
891 root 1.68 =item limit $grp $num
892 root 1.60
893     Sets the feeder limit for the group: The feeder will be called whenever
894     the group contains fewer than this many requests.
895    
896     Setting the limit to C<0> will pause the feeding process.
897    
898 root 1.55 =back
899    
900 root 1.5 =head2 SUPPORT FUNCTIONS
901    
902 root 1.86 =head3 EVENT PROCESSING AND EVENT LOOP INTEGRATION
903    
904 root 1.5 =over 4
905    
906     =item $fileno = IO::AIO::poll_fileno
907    
908 root 1.20 Return the I<request result pipe file descriptor>. This filehandle must be
909     polled for reading by some mechanism outside this module (e.g. Event or
910     select, see below or the SYNOPSIS). If the pipe becomes readable you have
911     to call C<poll_cb> to check the results.
912 root 1.5
913     See C<poll_cb> for an example.
914    
915     =item IO::AIO::poll_cb
916    
917 root 1.86 Process some outstanding events on the result pipe. You have to call this
918 root 1.5 regularly. Returns the number of events processed. Returns immediately
919 root 1.86 when no events are outstanding. The number of events processed depends on
920     the settings of C<IO::AIO::max_poll_reqs> and C<IO::AIO::max_poll_time>.
921 root 1.5
922 root 1.78 If not all requests were processed for whatever reason, the filehandle
923     will still be ready when C<poll_cb> returns.
924    
925 root 1.20 Example: Install an Event watcher that automatically calls
926     IO::AIO::poll_cb with high priority:
927 root 1.5
928     Event->io (fd => IO::AIO::poll_fileno,
929     poll => 'r', async => 1,
930     cb => \&IO::AIO::poll_cb);
931    
932 root 1.86 =item IO::AIO::max_poll_reqs $nreqs
933    
934     =item IO::AIO::max_poll_time $seconds
935    
936     These set the maximum number of requests (default C<0>, meaning infinity)
937     that are processed by C<IO::AIO::poll_cb> in one call, and, respectively,
938     the maximum amount of time (default C<0>, meaning infinity) spent in
939     C<IO::AIO::poll_cb> to process requests (more correctly the minimum amount
940     of time C<poll_cb> is allowed to use).
941 root 1.78
942 root 1.86 Setting these is useful if you want to ensure some level of
943     interactiveness when perl is not fast enough to process all requests in
944     time.
945 root 1.78
946 root 1.86 For interactive programs, C<max_poll_time> values such as C<0.01> to C<0.1> seconds should be fine.
947 root 1.78
948     Example: Install an Event watcher that automatically calls
949     IO::AIO::poll_cb with low priority, to ensure that other parts of the
950     program get the CPU sometimes even under high AIO load.
951    
952 root 1.86 # try not to spend much more than 0.1s in poll_cb
953     IO::AIO::max_poll_time 0.1;
954    
955     # use a low priority so other tasks have priority
956 root 1.78 Event->io (fd => IO::AIO::poll_fileno,
957     poll => 'r', nice => 1,
958 root 1.86 cb => \&IO::AIO::poll_cb);
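
Without an event loop, the request limit can be used in a manual loop.
This is only a sketch, assuming a Unix-like system where F</> exists; the
counters C<$done> and C<$rounds> are illustrative. With a limit of two,
six requests need at least three calls to C<poll_cb>:

```perl
use strict;
use warnings;
use IO::AIO;

IO::AIO::max_poll_reqs (2); # handle at most two requests per poll_cb call

my ($done, $rounds) = (0, 0);

aio_stat "/", sub { $done++ } for 1 .. 6;

while (IO::AIO::nreqs ()) {
    IO::AIO::poll_wait (); # block until at least one result is available
    IO::AIO::poll_cb ();   # invokes at most two callbacks per call
    $rounds++;

    # ... other work could be done here between batches ...
}

print "$done requests handled in $rounds rounds\n";
```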
959 root 1.78
960 root 1.5 =item IO::AIO::poll_wait
961    
962     Wait till the result filehandle becomes ready for reading (simply does a
963 root 1.86 C<select> on the filehandle). This is useful if you want to synchronously
964     wait for some requests to finish.
965 root 1.5
966     See C<nreqs> for an example.
967    
968 root 1.86 =item IO::AIO::poll
969 root 1.5
970 root 1.86 Waits until some requests have been handled.
971 root 1.5
972 root 1.86 Strictly equivalent to:
973 root 1.5
974     IO::AIO::poll_wait, IO::AIO::poll_cb
975 root 1.86 if IO::AIO::nreqs;
976 root 1.80
977 root 1.12 =item IO::AIO::flush
978    
979     Wait till all outstanding AIO requests have been handled.
980    
981 root 1.13 Strictly equivalent to:
982    
983     IO::AIO::poll_wait, IO::AIO::poll_cb
984     while IO::AIO::nreqs;
985    
986 root 1.86 =head3 CONTROLLING THE NUMBER OF THREADS
987 root 1.13
988 root 1.5 =item IO::AIO::min_parallel $nthreads
989    
990 root 1.61 Set the minimum number of AIO threads to C<$nthreads>. The current
991     default is C<8>, which means eight asynchronous operations can execute
992     concurrently at any one time (the number of outstanding requests,
993     however, is unlimited).
994 root 1.5
995 root 1.34 IO::AIO starts threads only on demand, when an AIO request is queued and
996 root 1.86 no free thread exists. Please note that queueing up a hundred requests can
997     create demand for a hundred threads, even if it turns out that everything
998     is in the cache and could have been processed faster by a single thread.
999 root 1.34
1000 root 1.61 It is recommended to keep the number of threads relatively low, as some
1001     Linux kernel versions will scale negatively with the number of threads
1002     (higher parallelism => MUCH higher latency). With current Linux 2.6
1003     versions, 4-32 threads should be fine.
1004 root 1.5
1005 root 1.34 Under most circumstances you don't need to call this function, as the
1006     module selects a default that is suitable for low to moderate load.
1007 root 1.5
1008     =item IO::AIO::max_parallel $nthreads
1009    
1010 root 1.34 Sets the maximum number of AIO threads to C<$nthreads>. If more than the
1011     specified number of threads are currently running, this function kills
1012     them. This function blocks until the limit is reached.
1013    
1014     While C<$nthreads> is zero, aio requests get queued but not executed
1015     until the number of threads has been increased again.
1016 root 1.5
1017     This module automatically runs C<max_parallel 0> at program end, to ensure
1018     that all threads are killed and that there are no outstanding requests.
1019    
1020     Under normal circumstances you don't need to call this function.
1021    
1022 root 1.86 =item IO::AIO::max_idle $nthreads
1023    
1024     Limit the number of threads (default: 4) that are allowed to idle (i.e.,
1025     threads that did not get a request to process within 10 seconds). That
1026     means if a thread becomes idle while C<$nthreads> other threads are also
1027     idle, it will free its resources and exit.
1028    
1029     This is useful when you allow a large number of threads (e.g. 100 or 1000)
1030     to allow for extremely high load situations, but want to free resources
1031     under normal circumstances (1000 threads can easily consume 30MB of RAM).
1032    
1033     The default is probably ok in most situations, especially if thread
1034     creation is fast. If thread creation is very slow on your system you might
1035     want to use larger values.
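
A sketch of how these settings might be combined for a bursty workload.
The numbers are hypothetical and need tuning for your system, and F</> is
assumed to exist:

```perl
use strict;
use warnings;
use IO::AIO;

# hypothetical values, not recommendations
IO::AIO::min_parallel (16); # raise the thread limit, threads still start on demand
IO::AIO::max_idle (2);      # let all but two idle threads exit again

my $done = 0;

aio_stat "/", sub { $done++ } for 1 .. 100;

IO::AIO::flush (); # wait for the burst to complete

print "$done requests completed\n";
```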
1036    
1037 root 1.79 =item $oldmaxreqs = IO::AIO::max_outstanding $maxreqs
1038 root 1.5
1039 root 1.79 This is a very bad function to use in interactive programs because it
1040     blocks, and a bad way to reduce concurrency because it is inexact: Better
1041     use an C<aio_group> together with a feed callback.
1042    
1043     Sets the maximum number of outstanding requests to C<$maxreqs>. If you
1044     queue up more than this number of requests, the next call to
1045     C<poll_cb> (or to other functions that call C<poll_cb>, such as C<poll> or C<flush>)
1046     will block until the limit is no longer exceeded.
1047    
1048     The default value is very large, so there is no practical limit on the
1049     number of outstanding requests.
1050    
1051     You can still queue as many requests as you want. Therefore,
1052     C<max_outstanding> is mainly useful in simple scripts (with low values) or
1053     as a stop gap to shield against fatal memory overflow (with large values).
1054 root 1.5
1055 root 1.86 =head3 STATISTICAL INFORMATION
1056    
1057     =item IO::AIO::nreqs
1058    
1059     Returns the number of requests currently in the ready, execute or pending
1060     states (i.e. those whose callback has not been invoked yet).
1061    
1062     Example: wait till there are no outstanding requests anymore:
1063    
1064     IO::AIO::poll_wait, IO::AIO::poll_cb
1065     while IO::AIO::nreqs;
1066    
1067     =item IO::AIO::nready
1068    
1069     Returns the number of requests currently in the ready state (not yet
1070     executed).
1071    
1072     =item IO::AIO::npending
1073    
1074     Returns the number of requests currently in the pending state (executed,
1075     but not yet processed by poll_cb).
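
A small sketch of these counters, assuming a Unix-like system where F</>
exists. Callbacks are only invoked from C<poll_cb>, so all queued requests
still count towards C<nreqs> until they have been polled:

```perl
use strict;
use warnings;
use IO::AIO;

aio_stat "/", sub { } for 1 .. 5;

# no callback has run yet, so all five requests are still counted
my $before = IO::AIO::nreqs ();

printf "nreqs=%d nready=%d npending=%d\n",
       $before, IO::AIO::nready (), IO::AIO::npending ();

IO::AIO::flush ();

my $after = IO::AIO::nreqs ();

print "after flush: nreqs=$after\n";
```

How the five requests are split between the ready, execute and pending
states at any moment depends on thread scheduling, so only the total is
predictable.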
1076    
1077 root 1.5 =back
1078    
1079 root 1.1 =cut
1080    
1081 root 1.2 # support function to convert a fd into a perl filehandle
1082     sub _fd2fh {
1083     return undef if $_[0] < 0;
1084    
1085 root 1.23 # try to generate nice filehandles
1086     my $sym = "IO::AIO::fd#$_[0]";
1087     local *$sym;
1088 root 1.25
1089 root 1.27 open *$sym, "+<&=$_[0]" # usually works under any unix
1090     or open *$sym, "<&=$_[0]" # cygwin needs this
1091     or open *$sym, ">&=$_[0]" # or this
1092 root 1.2 or return undef;
1093    
1094 root 1.23 *$sym
1095 root 1.2 }
1096    
1097 root 1.61 min_parallel 8;
1098 root 1.1
1099 root 1.82 END {
1100 root 1.84 min_parallel 1;
1101 root 1.82 flush;
1102     };
1103    
1104 root 1.1 1;
1105    
1106 root 1.27 =head2 FORK BEHAVIOUR
1107    
1108 root 1.52 This module should do "the right thing" when the process using it forks:
1109    
1110 root 1.34 Before the fork, IO::AIO enters a quiescent state where no requests
1111     can be added in other threads and no results will be processed. After
1112     the fork the parent simply leaves the quiescent state and continues
1113 root 1.72 request/result processing, while the child frees the request/result queue
1114     (so that the requests started before the fork will only be handled in the
1115     parent). Threads will be started on demand until the limit set in the
1116 root 1.34 parent process has been reached again.
1117 root 1.27
1118 root 1.52 In short: the parent will, after a short pause, continue as if fork had
1119     not been called, while the child will act as if IO::AIO has not been used
1120     yet.
1121    
1122 root 1.60 =head2 MEMORY USAGE
1123    
1124 root 1.72 Per-request usage:
1125    
1126     Each aio request uses - depending on your architecture - around 100-200
1127     bytes of memory. In addition, stat requests need a stat buffer (possibly
1128     a few hundred bytes), readdir requires a result buffer and so on. Perl
1129     scalars and other data passed into aio requests will also be locked and
1130     will consume memory till the request has entered the done state.
1131 root 1.60
1132     This is not awfully much, so queuing lots of requests is not usually a
1133     problem.
1134    
1135 root 1.72 Per-thread usage:
1136    
1137     In the execution phase, some aio requests require more memory for
1138     temporary buffers, and each thread requires a stack and other data
1139     structures (usually around 16k-128k, depending on the OS).
1140    
1141     =head1 KNOWN BUGS
1142    
1143 root 1.73 Known bugs will be fixed in the next release.
1144 root 1.60
1145 root 1.1 =head1 SEE ALSO
1146    
1147 root 1.68 L<Coro::AIO>.
1148 root 1.1
1149     =head1 AUTHOR
1150    
1151     Marc Lehmann <schmorp@schmorp.de>
1152     http://home.schmorp.de/
1153    
1154     =cut
1155