/cvs/IO-AIO/AIO.pm
Revision: 1.67
Committed: Tue Oct 24 02:25:16 2006 UTC (17 years, 7 months ago) by root
Branch: MAIN
Changes since 1.66: +7 -5 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     IO::AIO - Asynchronous Input/Output
4    
5     =head1 SYNOPSIS
6    
7     use IO::AIO;
8    
9 root 1.6 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10     my ($fh) = @_;
11     ...
12     };
13    
14     aio_unlink "/tmp/file", sub { };
15    
16     aio_read $fh, 30000, 1024, $buffer, 0, sub {
17 root 1.8 $_[0] > 0 or die "read error: $!";
18 root 1.6 };
19    
20 root 1.56 # version 2+ has request and group objects
21     use IO::AIO 2;
22 root 1.52
23     my $req = aio_unlink "/tmp/file", sub { };
24     $req->cancel; # cancel request if still in queue
25    
26 root 1.56 my $grp = aio_group sub { print "all stats done\n" };
27     add $grp aio_stat "..." for ...;
28    
29     # AnyEvent integration
30 root 1.42 open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!";
31     my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb });
32    
33 root 1.56 # Event integration
34 root 1.6 Event->io (fd => IO::AIO::poll_fileno,
35 root 1.7 poll => 'r',
36 root 1.6 cb => \&IO::AIO::poll_cb);
37    
38 root 1.56 # Glib/Gtk2 integration
39 root 1.6 add_watch Glib::IO IO::AIO::poll_fileno,
40 root 1.22 in => sub { IO::AIO::poll_cb; 1 };
41 root 1.6
42 root 1.56 # Tk integration
43 root 1.6 Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "",
44     readable => \&IO::AIO::poll_cb);
45    
46 root 1.56 # Danga::Socket integration
47 root 1.11 Danga::Socket->AddOtherFds (IO::AIO::poll_fileno =>
48     \&IO::AIO::poll_cb);
49    
50 root 1.1 =head1 DESCRIPTION
51    
52     This module implements asynchronous I/O using whatever means your
53 root 1.2 operating system supports.
54 root 1.1
55 root 1.2 Currently, a number of threads are started that execute your read/writes
56 root 1.66 and signal their completion. You don't need thread support in perl, and
57     the threads created by this module will not be visible to perl. In the
58     future, this module might make use of the native aio functions available
59     on many operating systems. However, they are often not well-supported
60     (Linux doesn't allow them on normal files currently, for example),
61     and they would only support aio_read and aio_write, so the remaining
62     functionality would have to be implemented using threads anyway.
63 root 1.1
64     Although the module will work in the presence of other threads, it is
65 root 1.22 currently not reentrant, so use appropriate locking yourself, always call
66     C<poll_cb> from within the same thread, or never call C<poll_cb> (or other
67     C<aio_> functions) recursively.
68 root 1.1
69     =cut
70    
71     package IO::AIO;
72    
73 root 1.23 no warnings;
74 root 1.51 use strict 'vars';
75 root 1.23
76 root 1.1 use base 'Exporter';
77    
78     BEGIN {
79 root 1.55 our $VERSION = '2.0';
80 root 1.1
81 root 1.67 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
82     aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
83     aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move
84     aio_group aio_nop);
85     our @EXPORT = (@AIO_REQ, qw(aioreq_pri));
86     our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
87     min_parallel max_parallel max_outstanding nreqs);
88 root 1.1
89 root 1.54 @IO::AIO::GRP::ISA = 'IO::AIO::REQ';
90    
91 root 1.1 require XSLoader;
92 root 1.51 XSLoader::load ("IO::AIO", $VERSION);
93 root 1.1 }
94    
95 root 1.5 =head1 FUNCTIONS
96 root 1.1
97 root 1.5 =head2 AIO FUNCTIONS
98 root 1.1
99 root 1.5 All the C<aio_*> calls are more or less thin wrappers around the syscall
100     with the same name (sans C<aio_>). The arguments are similar or identical,
101 root 1.14 and they all accept an additional (and optional) C<$callback> argument
102     which must be a code reference. This code reference will get called with
103     the syscall return code (e.g. most syscalls return C<-1> on error, unlike
104     perl, which usually delivers "false") as its sole argument when the given
105     syscall has been executed asynchronously.
106 root 1.1
107 root 1.23 All functions expecting a filehandle keep a copy of the filehandle
108     internally until the request has finished.
109 root 1.1
110 root 1.55 All requests return objects of type L<IO::AIO::REQ> that allow further
111     manipulation of those requests while they are in-flight.
112 root 1.52
113 root 1.28 The pathnames you pass to these routines I<must> be absolute and
114     encoded in byte form. The reason for the former is that at the time the
115     request is being executed, the current working directory could have
116     changed. Alternatively, you can make sure that you never change the
117     current working directory.
118    
119     To encode pathnames to byte form, make sure you either: a) always pass
120     in filenames you got from outside (command line, readdir etc.), b) use
121     only ASCII or ISO 8859-1 filenames, c) use the Encode module and encode
122     your pathnames to the locale (or other) encoding in effect in the user
123     environment, d) use Glib::filename_from_unicode on unicode filenames or e)
124     use something else.
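
The two pathname requirements above (absolute, byte-encoded) can be sketched
with core modules only. C<aio_path> is a hypothetical helper and not part of
IO::AIO, and UTF-8 is only an assumed default; substitute whatever encoding
your environment actually uses.

```perl
use strict;
use warnings;
use Encode ();
use File::Spec ();

# Hypothetical helper (not part of IO::AIO): make a pathname absolute
# and encode it to bytes. UTF-8 is an assumption, not a rule.
sub aio_path {
   my ($path, $encoding) = @_;
   $encoding = "UTF-8" unless defined $encoding;

   # resolve against the current working directory now, before the
   # request is queued, so a later chdir cannot change its meaning
   my $abs = File::Spec->rel2abs ($path);

   # turn a character string into a byte string; names that already
   # came from outside (command line, readdir) should not go through this
   Encode::encode ($encoding, $abs)
}

my $bytes = aio_path "/tmp/caf\x{e9}.txt";
printf "%d bytes, utf8 flag %s\n",
   length $bytes, utf8::is_utf8 ($bytes) ? "on" : "off";
```

The resulting byte string is what you would hand to C<aio_open>, C<aio_stat>
and friends.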
125 root 1.1
126 root 1.5 =over 4
127 root 1.1
128 root 1.40 =item aio_open $pathname, $flags, $mode, $callback->($fh)
129 root 1.1
130 root 1.2 Asynchronously open or create a file and call the callback with a newly
131     created filehandle for the file.
132 root 1.1
133     The pathname passed to C<aio_open> must be absolute. See the notes on
134     pathnames at the start of this section for an explanation.
135    
136 root 1.20 The C<$flags> argument is a bitmask. See the C<Fcntl> module for a
137     list. They are the same as used by C<sysopen>.
138    
139     Likewise, C<$mode> specifies the mode of the newly created file, if it
140     didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>,
141     except that it is mandatory (i.e. use C<0> if you don't create new files,
142     and C<0666> or C<0777> if you do).
143 root 1.1
144     Example:
145    
146     aio_open "/etc/passwd", O_RDONLY, 0, sub {
147 root 1.2 if ($_[0]) {
148     print "open successful, fh is $_[0]\n";
149 root 1.1 ...
150     } else {
151     die "open failed: $!\n";
152     }
153     };
154    
155 root 1.40 =item aio_close $fh, $callback->($status)
156 root 1.1
157 root 1.2 Asynchronously close a file and call the callback with the result
158     code. I<WARNING:> although accepted, you should not pass in a perl
159 root 1.20 filehandle here, as perl will likely close the file descriptor another
160     time when the filehandle is destroyed. Normally, you can safely call perl's
161     C<close> or just let filehandles go out of scope.
162    
163     This is considered a bug in the API, so that might change. It's
164     therefore best to avoid this function.
165 root 1.1
166 root 1.40 =item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
167 root 1.1
168 root 1.40 =item aio_write $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
169 root 1.1
170     Reads or writes C<$length> bytes from (or to) the specified C<$fh> at
171     C<$offset>, into (or from) the scalar C<$data> at offset C<$dataoffset>, and
172     calls the callback with the actual number of bytes read or written (or -1
173     on error, just like the syscall).
174    
175 root 1.31 The C<$data> scalar I<MUST NOT> be modified in any way while the request
176     is outstanding. Modifying it can result in segfaults or WW3 (if the
177     necessary/optional hardware is installed).
178    
179 root 1.17 Example: Read 15 bytes at offset 7 into scalar C<$buffer>, starting at
180 root 1.1 offset C<0> within the scalar:
181    
182     aio_read $fh, 7, 15, $buffer, 0, sub {
183 root 1.9 $_[0] > 0 or die "read error: $!";
184     print "read $_[0] bytes: <$buffer>\n";
185 root 1.1 };
186    
187 root 1.50 =item aio_move $srcpath, $dstpath, $callback->($status)
188    
189 root 1.58 [EXPERIMENTAL due to internal aio_group use]
190    
191 root 1.52 Try to move the I<file> (directories not supported as either source or
192     destination) from C<$srcpath> to C<$dstpath> and call the callback with
193     C<0> (ok) or C<-1> (error).
194 root 1.50
195     This is a composite request that tries to rename(2) the file first. If
196     rename fails with C<EXDEV>, it creates the destination file with mode 0200
197     and copies the contents of the source file into it using C<aio_sendfile>,
198     followed by restoring atime, mtime, access mode and uid/gid, in that
199     order, and unlinking the C<$srcpath>.
200    
201     If an error occurs, the partial destination file will be unlinked, if
202     possible, except when setting atime, mtime, access mode and uid/gid, where
203     errors are being ignored.
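
The rename-then-copy fallback described above can be illustrated with a
synchronous sketch in plain Perl. This is not how C<aio_move> is implemented:
C<move_sync> is a hypothetical name, it blocks, and it swaps in C<File::Copy>
where the real request uses C<aio_sendfile>. It only mirrors the order of the
steps and the C<0>/C<-1> status convention.

```perl
use strict;
use warnings;
use Errno qw(EXDEV);
use File::Copy ();

# Synchronous illustration only: returns 0 on success, -1 on error,
# mirroring the status passed to the aio_move callback.
sub move_sync {
   my ($src, $dst) = @_;

   # fast path: source and destination are on the same filesystem
   return 0 if rename $src, $dst;
   return -1 unless $! == EXDEV;

   my @stat = stat $src
      or return -1;

   # slow path: copy the contents first
   unless (File::Copy::copy ($src, $dst)) {
      my $errno = $!;
      unlink $dst;      # remove the partial destination, if possible
      $! = $errno;
      return -1;
   }

   # restore atime/mtime, mode and owner; errors are ignored here
   utime $stat[8], $stat[9], $dst;
   chmod $stat[2] & 07777, $dst;
   chown $stat[4], $stat[5], $dst;

   return unlink ($src) ? 0 : -1;
}
```

A call such as C<move_sync "/tmp/a", "/mnt/other/a"> would take the slow path
only when the two paths live on different filesystems.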
204    
205     =cut
206    
207     sub aio_move($$$) {
208     my ($src, $dst, $cb) = @_;
209    
210 root 1.58 my $grp = aio_group $cb;
211 root 1.55
212     add $grp aio_rename $src, $dst, sub {
213 root 1.51 if ($_[0] && $! == EXDEV) {
214 root 1.55 add $grp aio_open $src, O_RDONLY, 0, sub {
215 root 1.50 if (my $src_fh = $_[0]) {
216     my @stat = stat $src_fh;
217    
218 root 1.55 add $grp aio_open $dst, O_WRONLY, 0200, sub {
219 root 1.50 if (my $dst_fh = $_[0]) {
220 root 1.55 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
221 root 1.50 close $src_fh;
222    
223     if ($_[0] == $stat[7]) {
224     utime $stat[8], $stat[9], $dst;
225     chmod $stat[2] & 07777, $dst_fh;
226     chown $stat[4], $stat[5], $dst_fh;
227     close $dst_fh;
228    
229 root 1.55 add $grp aio_unlink $src, sub {
230 root 1.58 $grp->result ($_[0]);
231 root 1.50 };
232     } else {
233     my $errno = $!;
234 root 1.55 add $grp aio_unlink $dst, sub {
235 root 1.50 $! = $errno;
236 root 1.58 $grp->result (-1);
237 root 1.50 };
238     }
239     };
240     } else {
241 root 1.58 $grp->result (-1);
242 root 1.50 }
243     };
244    
245     } else {
246 root 1.58 $grp->result (-1);
247 root 1.50 }
248     };
249     } else {
250 root 1.58 $grp->result ($_[0]);
251 root 1.50 }
252     };
253 root 1.55
254     $grp
255 root 1.50 }
256    
257 root 1.40 =item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval)
258 root 1.35
259     Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts
260     reading at byte offset C<$in_offset>, and starts writing at the current
261     file offset of C<$out_fh>. Because of that, it is not safe to issue more
262     than one C<aio_sendfile> per C<$out_fh>, as they will interfere with each
263     other.
264    
265     This call tries to make use of a native C<sendfile> syscall to provide
266     zero-copy operation. For this to work, C<$out_fh> should refer to a
267     socket, and C<$in_fh> should refer to an mmap'able file.
268    
269     If the native sendfile call fails or is not implemented, it will be
270 root 1.36 emulated, so you can call C<aio_sendfile> on any type of filehandle
271     regardless of the limitations of the operating system.
272 root 1.35
273     Please note, however, that C<aio_sendfile> can read more bytes from
274     C<$in_fh> than are written, and there is no way to find out how many
275 root 1.36 bytes have been read from C<aio_sendfile> alone, as C<aio_sendfile> only
276     provides the number of bytes written to C<$out_fh>. Only if the result
277     value equals C<$length> can one assume that C<$length> bytes have been
278     read.
279 root 1.35
280 root 1.40 =item aio_readahead $fh,$offset,$length, $callback->($retval)
281 root 1.1
282 root 1.20 C<aio_readahead> populates the page cache with data from a file so that
283 root 1.1 subsequent reads from that file will not block on disk I/O. The C<$offset>
284     argument specifies the starting point from which data is to be read and
285     C<$length> specifies the number of bytes to be read. I/O is performed in
286     whole pages, so that offset is effectively rounded down to a page boundary
287     and bytes are read up to the next page boundary greater than or equal to
288 root 1.20 (offset+length). C<aio_readahead> does not read beyond the end of the
289 root 1.1 file. The current file offset of the file is left unchanged.
290    
291 root 1.26 If that syscall doesn't exist (likely if your OS isn't Linux) it will be
292     emulated by simply reading the data, which would have a similar effect.
293    
294 root 1.40 =item aio_stat $fh_or_path, $callback->($status)
295 root 1.1
296 root 1.40 =item aio_lstat $fh, $callback->($status)
297 root 1.1
298     Works like perl's C<stat> or C<lstat> in void context. The callback will
299     be called after the stat and the results will be available using C<stat _>
300     or C<-s _> etc...
301    
302     The pathname passed to C<aio_stat> must be absolute. See the notes on
303     pathnames at the start of this section for an explanation.
304    
305     Currently, the stats are always 64-bit-stats, i.e. instead of returning an
306     error when stat'ing a large file, the results will be silently truncated
307     unless perl itself is compiled with large file support.
308    
309     Example: Print the length of F</etc/passwd>:
310    
311     aio_stat "/etc/passwd", sub {
312     $_[0] and die "stat failed: $!";
313     print "size is ", -s _, "\n";
314     };
315    
316 root 1.40 =item aio_unlink $pathname, $callback->($status)
317 root 1.1
318     Asynchronously unlink (delete) a file and call the callback with the
319     result code.
320    
321 root 1.50 =item aio_link $srcpath, $dstpath, $callback->($status)
322    
323     Asynchronously create a new link to the existing object at C<$srcpath> at
324     the path C<$dstpath> and call the callback with the result code.
325    
326     =item aio_symlink $srcpath, $dstpath, $callback->($status)
327    
328     Asynchronously create a new symbolic link to the existing object at C<$srcpath> at
329     the path C<$dstpath> and call the callback with the result code.
330    
331     =item aio_rename $srcpath, $dstpath, $callback->($status)
332    
333     Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
334     rename(2) and call the callback with the result code.
335    
336 root 1.40 =item aio_rmdir $pathname, $callback->($status)
337 root 1.27
338     Asynchronously rmdir (delete) a directory and call the callback with the
339     result code.
340    
341 root 1.46 =item aio_readdir $pathname, $callback->($entries)
342 root 1.37
343     Unlike the POSIX call of the same name, C<aio_readdir> reads an entire
344     directory (i.e. opendir + readdir + closedir). The entries will not be
345     sorted, and will B<NOT> include the C<.> and C<..> entries.
346    
347     The callback is passed a single argument which is either C<undef> or an array-ref
348     with the filenames.
349    
350 root 1.40 =item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
351    
352 root 1.58 [EXPERIMENTAL due to internal aio_group use]
353    
354 root 1.52 Scans a directory (similar to C<aio_readdir>) but additionally tries to
355     separate the entries of directory C<$path> into two sets of names, ones
356     you can recurse into (directories or links to them), and ones you cannot
357     recurse into (everything else).
358    
359 root 1.61 C<aio_scandir> is a composite request that creates many sub requests.
360     C<$maxreq> specifies the maximum number of outstanding aio requests that
361     this function generates. If it is C<< <= 0 >>, then a suitable default
362     will be chosen (currently 6).
363 root 1.40
364     On error, the callback is called without arguments, otherwise it receives
365     two array-refs with path-relative entry names.
366    
367     Example:
368    
369     aio_scandir $dir, 0, sub {
370     my ($dirs, $nondirs) = @_;
371     print "real directories: @$dirs\n";
372     print "everything else: @$nondirs\n";
373     };
374    
375     Implementation notes.
376    
377     The C<aio_readdir> cannot be avoided, but C<stat()>'ing every entry can.
378    
379     After reading the directory, the modification time, size etc. of the
380 root 1.52 directory before and after the readdir are compared, and if they match (and
381     the modification time isn't the current time), the link count will be used
382     to decide how many entries are directories (if >= 2). Otherwise, no
383     knowledge of the number of subdirectories will be assumed.
384    
385     Then entries will be sorted into likely directories (everything without
386     a non-initial dot currently) and likely non-directories (everything
387     else). Then every entry plus an appended C</.> will be C<stat>'ed,
388     likely directories first. If that succeeds, it assumes that the entry
389     is a directory or a symlink to directory (which will be checked
390     separately). This is often faster than stat'ing the entry itself because
391     filesystems might detect the type of the entry without reading the inode
392     data (e.g. ext2fs filetype feature).
393    
394     If the known number of directories (link count - 2) has been reached, the
395     rest of the entries are assumed to be non-directories.
396    
397     This only works with certainty on POSIX (= UNIX) filesystems, which
398     fortunately are the vast majority of filesystems around.
399    
400     It will also likely work on non-POSIX filesystems with reduced efficiency
401     as those tend to return 0 or 1 as link counts, which disables the
402     directory counting heuristic.
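
The ordering step from the notes above can be tried in isolation. The sketch
below reuses the sort key from the implementation that follows (a flag for a
non-initial dot, then the zero-padded name length) on a made-up list of names;
C<scan_order> is a hypothetical wrapper for illustration, not a function of
this module.

```perl
use strict;
use warnings;

# Order entries so that popping from the end yields likely directories
# (names without a non-initial dot) first, shortest names first.
sub scan_order {
   my @entries = @_;

   map $_->[0],
      sort { $b->[1] cmp $a->[1] }
         map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
            @entries
}

my @queue = scan_order qw(lib README.txt .git a Makefile.PL);

# consume the queue the same way the scheduler does: from the end
my @order;
push @order, pop @queue while @queue;
print "@order\n";
```

Names such as C<.git> count as likely directories because their only dot is
the initial one, while C<README.txt> and C<Makefile.PL> are checked last.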
403 root 1.40
404     =cut
405    
406     sub aio_scandir($$$) {
407     my ($path, $maxreq, $cb) = @_;
408    
409 root 1.58 my $grp = aio_group $cb;
410 root 1.55
411 root 1.61 $maxreq = 6 if $maxreq <= 0;
412 root 1.40
413     # stat once
414 root 1.55 add $grp aio_stat $path, sub {
415 root 1.58 return $grp->result () if $_[0];
416 root 1.52 my $now = time;
417 root 1.40 my $hash1 = join ":", (stat _)[0,1,3,7,9];
418    
419     # read the directory entries
420 root 1.55 add $grp aio_readdir $path, sub {
421 root 1.40 my $entries = shift
422 root 1.58 or return $grp->result ();
423 root 1.40
424     # stat the dir another time
425 root 1.55 add $grp aio_stat $path, sub {
426 root 1.40 my $hash2 = join ":", (stat _)[0,1,3,7,9];
427    
428     my $ndirs;
429    
430     # take the slow route if anything looks fishy
431 root 1.52 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
432 root 1.40 $ndirs = -1;
433     } else {
434     # if nlink == 2, we are finished
435     # on non-posix-fs's, we rely on nlink < 2
436     $ndirs = (stat _)[3] - 2
437 root 1.58 or return $grp->result ([], $entries);
438 root 1.40 }
439    
440     # sort into likely dirs and likely nondirs
441     # dirs == files without ".", short entries first
442     $entries = [map $_->[0],
443     sort { $b->[1] cmp $a->[1] }
444     map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
445     @$entries];
446    
447     my (@dirs, @nondirs);
448    
449     my ($statcb, $schedcb);
450     my $nreq = 0;
451    
452 root 1.60 my $statgrp = add $grp aio_group;
453    
454 root 1.40 $schedcb = sub {
455     if (@$entries) {
456     if ($nreq < $maxreq) {
457     my $ent = pop @$entries;
458     $nreq++;
459 root 1.60 add $statgrp aio_stat "$path/$ent/.", sub { $statcb->($_[0], $ent) };
460 root 1.40 }
461     } elsif (!$nreq) {
462     # finished
463 root 1.60 $statgrp->cancel;
464 root 1.40 undef $statcb;
465     undef $schedcb;
466 root 1.60 $grp->result (\@dirs, \@nondirs);
467 root 1.40 }
468     };
469     $statcb = sub {
470     my ($status, $entry) = @_;
471    
472     if ($status < 0) {
473     $nreq--;
474     push @nondirs, $entry;
475     &$schedcb;
476     } else {
477     # need to check for real directory
478 root 1.55 add $grp aio_lstat "$path/$entry", sub {
479 root 1.40 $nreq--;
480    
481     if (-d _) {
482     push @dirs, $entry;
483    
484     if (!--$ndirs) {
485     push @nondirs, @$entries;
486     $entries = [];
487     }
488     } else {
489     push @nondirs, $entry;
490     }
491    
492     &$schedcb;
493     }
494     }
495     };
496    
497     &$schedcb while @$entries && $nreq < $maxreq;
498     };
499     };
500     };
501 root 1.55
502     $grp
503 root 1.40 }
504    
505     =item aio_fsync $fh, $callback->($status)
506 root 1.1
507     Asynchronously call fsync on the given filehandle and call the callback
508     with the fsync result code.
509    
510 root 1.40 =item aio_fdatasync $fh, $callback->($status)
511 root 1.1
512     Asynchronously call fdatasync on the given filehandle and call the
513 root 1.26 callback with the fdatasync result code.
514    
515     If this call isn't available because your OS lacks it or it couldn't be
516     detected, it will be emulated by calling C<fsync> instead.
517 root 1.1
518 root 1.58 =item aio_group $callback->(...)
519 root 1.54
520 root 1.55 [EXPERIMENTAL]
521    
522     This is a very special aio request: Instead of doing something, it is a
523     container for other aio requests, which is useful if you want to bundle
524     many requests into a single, composite, request.
525    
526     Returns an object of class L<IO::AIO::GRP>. See its documentation below
527     for more info.
528    
529     Example:
530    
531     my $grp = aio_group sub {
532     print "all stats done\n";
533     };
534    
535     add $grp
536     (aio_stat ...),
537     (aio_stat ...),
538     ...;
539    
540 root 1.63 =item aio_nop $callback->()
541    
542     This is a special request - it does nothing in itself and is only used for
543     side effects, such as when you want to add a dummy request to a group so
544     that finishing the requests in the group depends on executing the given
545     code.
546    
547 root 1.64 While this request does nothing, it still goes through the execution
548     phase and still requires a worker thread. Thus, the callback will not
549     be executed immediately but only after other requests in the queue have
550     entered their execution phase. This can be used to measure request
551     latency.
552    
553 root 1.56 =item IO::AIO::aio_sleep $fractional_seconds, $callback->() *NOT EXPORTED*
554 root 1.54
555     Mainly used for debugging and benchmarking, this aio request puts one of
556     the request workers to sleep for the given time.
557    
558 root 1.56 While it is theoretically handy to have simple I/O scheduling requests
559     like sleep and file handle readable/writable, the overhead this creates
560     is immense, so do not use this function except to put your application
561     under artificial I/O pressure.
562    
563 root 1.5 =back
564    
565 root 1.53 =head2 IO::AIO::REQ CLASS
566 root 1.52
567     All non-aggregate C<aio_*> functions return an object of this class when
568     called in non-void context.
569    
570     A request always moves through the following five states in its lifetime,
571     in order: B<ready> (request has been created, but has not been executed
572     yet), B<execute> (request is currently being executed), B<pending>
573     (request has been executed but callback has not been called yet),
574     B<result> (results are being processed synchronously, includes calling the
575     callback) and B<done> (request has reached the end of its lifetime and
576     holds no resources anymore).
577    
578     =over 4
579    
580 root 1.65 =item cancel $req
581 root 1.52
582     Cancels the request, if possible. Has the effect of skipping execution
583     when entering the B<execute> state and skipping calling the callback when
584     entering the B<result> state, but will leave the request otherwise
585     untouched. That means that requests that currently execute will not be
586     stopped and resources held by the request will not be freed prematurely.
587    
588 root 1.65 =item cb $req $callback->(...)
589    
590     Replace (or simply set) the callback registered to the request.
591    
592 root 1.52 =back
593    
594 root 1.55 =head2 IO::AIO::GRP CLASS
595    
596     This class is a subclass of L<IO::AIO::REQ>, so all its methods apply to
597     objects of this class, too.
598    
599     An IO::AIO::GRP object is a special request that can contain multiple other
600     aio requests.
601    
602     You create one by calling the C<aio_group> constructing function with a
603     callback that will be called when all contained requests have entered the
604     C<done> state:
605    
606     my $grp = aio_group sub {
607     print "all requests are done\n";
608     };
609    
610     You add requests by calling the C<add> method with one or more
611     C<IO::AIO::REQ> objects:
612    
613     $grp->add (aio_unlink "...");
614    
615 root 1.58 add $grp aio_stat "...", sub {
616     $_[0] and return $grp->result ("error");
617    
618     # add another request dynamically, if first succeeded
619     add $grp aio_open "...", sub {
620     $grp->result ("ok");
621     };
622     };
623 root 1.55
624     This makes it very easy to create composite requests (see the source of
625     C<aio_move> for an application) that work and feel like simple requests.
626    
627 root 1.62 =over 4
628    
629     =item * The IO::AIO::GRP objects will be cleaned up during calls to
630 root 1.55 C<IO::AIO::poll_cb>, just like any other request.
631    
632 root 1.62 =item * They can be canceled like any other request. Canceling will cancel not
633 root 1.59 only the request itself, but also all requests it contains.
634 root 1.55
635 root 1.62 =item * They can also be added to other IO::AIO::GRP objects.
636 root 1.55
637 root 1.62 =item * You must not add requests to a group from within the group callback (or
638 root 1.60 any later time).
639    
640 root 1.62 =item * This does not harmonise well with C<max_outstanding>, so it is best
641     not to combine C<aio_group> with it. Groups and feeders are recommended for
642     this kind of concurrency-limiting.
643    
644     =back
645    
646 root 1.55 Their lifetime, simplified, looks like this: when they are empty, they
647     will finish very quickly. If they contain only requests that are in the
648     C<done> state, they will also finish. Otherwise they will continue to
649     exist.
650    
651 root 1.57 That means after creating a group you have some time to add requests. And
652     in the callbacks of those requests, you can add further requests to the
653     group. And only when all those requests have finished will the group
654     itself finish.
655    
656 root 1.55 =over 4
657    
658 root 1.65 =item add $grp ...
659    
660 root 1.55 =item $grp->add (...)
661    
662 root 1.57 Add one or more requests to the group. Any type of L<IO::AIO::REQ> can
663     be added, including other groups, as long as you do not create circular
664     dependencies.
665    
666     Returns all its arguments.
667 root 1.55
668 root 1.58 =item $grp->result (...)
669    
670     Set the result value(s) that will be passed to the group callback when all
671     subrequests have finished. By default, no argument will be passed.
672    
673 root 1.65 =item feed $grp $callback->($grp)
674 root 1.60
675     [VERY EXPERIMENTAL]
676    
677     Sets a feeder/generator on this group: every group can have an attached
678     generator that generates requests if idle. The idea behind this is that,
679     although you could just queue as many requests as you want in a group,
680     this might starve other requests for a potentially long time. For
681     example, C<aio_scandir> might generate hundreds of thousands of C<aio_stat>
682     requests, delaying any later requests for a long time.
683    
684     To avoid this, and allow incremental generation of requests, you can
685     instead create a group and set a feeder on it that generates those requests. The
686 root 1.65 feed callback will be called whenever there are few enough (see C<feed_limit>,
687 root 1.60 below) requests active in the group itself and is expected to queue more
688     requests.
689    
690 root 1.65 The feed can queue as many requests as it likes (i.e. C<add> does not
691 root 1.60 impose any limits).
692    
693 root 1.65 If the feed does not queue more requests when called, it will be
694 root 1.60 automatically removed from the group.
695    
696 root 1.65 If the feed limit is C<0> when the feeder is set, it will be set to C<2> automatically.
697 root 1.60
698     Example:
699    
700     # stat all files in @files, but only ever use four aio requests concurrently:
701    
702     my $grp = aio_group sub { print "finished\n" };
703 root 1.65 feed_limit $grp 4;
704     feed $grp sub {
705 root 1.60 my $file = pop @files
706     or return;
707    
708     add $grp aio_stat $file, sub { ... };
709 root 1.65 };
710 root 1.60
711 root 1.65 =item feed_limit $grp $num
712 root 1.60
713     Sets the feeder limit for the group: The feeder will be called whenever
714     the group contains fewer than this many requests.
715    
716     Setting the limit to C<0> will pause the feeding process.
717    
718 root 1.55 =back
719    
720 root 1.5 =head2 SUPPORT FUNCTIONS
721    
722     =over 4
723    
724     =item $fileno = IO::AIO::poll_fileno
725    
726 root 1.20 Return the I<request result pipe file descriptor>. This filehandle must be
727     polled for reading by some mechanism outside this module (e.g. Event or
728     select, see below or the SYNOPSIS). If the pipe becomes readable you have
729     to call C<poll_cb> to check the results.
730 root 1.5
731     See C<poll_cb> for an example.
732    
733     =item IO::AIO::poll_cb
734    
735     Process all outstanding events on the result pipe. You have to call this
736     regularly. Returns the number of events processed. Returns immediately
737     when no events are outstanding.
738    
739 root 1.20 Example: Install an Event watcher that automatically calls
740     IO::AIO::poll_cb with high priority:
741 root 1.5
742     Event->io (fd => IO::AIO::poll_fileno,
743     poll => 'r', async => 1,
744     cb => \&IO::AIO::poll_cb);
745    
746     =item IO::AIO::poll_wait
747    
748     Wait till the result filehandle becomes ready for reading (simply does a
749 root 1.20 C<select> on the filehandle). This is useful if you want to synchronously wait
750 root 1.5 for some requests to finish.
751    
752     See C<nreqs> for an example.
753    
754     =item IO::AIO::nreqs
755    
756 root 1.20 Returns the number of requests currently outstanding (i.e. for which their
757     callback has not been invoked yet).
758 root 1.5
759     Example: wait till there are no outstanding requests anymore:
760    
761     IO::AIO::poll_wait, IO::AIO::poll_cb
762     while IO::AIO::nreqs;
763    
764 root 1.12 =item IO::AIO::flush
765    
766     Wait till all outstanding AIO requests have been handled.
767    
768 root 1.13 Strictly equivalent to:
769    
770     IO::AIO::poll_wait, IO::AIO::poll_cb
771     while IO::AIO::nreqs;
772    
773     =item IO::AIO::poll
774    
775     Waits until some requests have been handled.
776    
777     Strictly equivalent to:
778    
779     IO::AIO::poll_wait, IO::AIO::poll_cb
780     if IO::AIO::nreqs;
781    
782 root 1.5 =item IO::AIO::min_parallel $nthreads
783    
784 root 1.61 Set the minimum number of AIO threads to C<$nthreads>. The current
785     default is C<8>, which means eight asynchronous operations can execute
786     concurrently at any one time (the number of outstanding requests,
787     however, is unlimited).
788 root 1.5
789 root 1.34 IO::AIO starts threads only on demand, when an AIO request is queued and
790     no free thread exists.
791    
792 root 1.61 It is recommended to keep the number of threads relatively low, as some
793     Linux kernel versions scale poorly with the number of threads
794     (higher parallelism => MUCH higher latency). With current Linux 2.6
795     versions, 4-32 threads should be fine.
796 root 1.5
797 root 1.34 Under most circumstances you don't need to call this function, as the
798     module selects a default that is suitable for low to moderate load.
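
As a rough sketch of how this might be used, the following raises the
thread count before a burst of C<aio_stat> requests and then waits for
them with C<IO::AIO::flush>. The paths are placeholders chosen for
illustration, and the value C<16> is only an assumption taken from the
4-32 range mentioned above, not a tuned recommendation.

```perl
use strict;
use warnings;
use IO::AIO;

# Allow up to 16 worker threads for this burst. Threads are still
# started only on demand, so this call does not spawn anything itself.
IO::AIO::min_parallel 16;

my @paths = ("/", "/tmp", "/etc");   # hypothetical paths, for illustration
my %ok;

for my $path (@paths) {
    aio_stat $path, sub {
        # $_[0] is the stat status: 0 on success, non-zero on error
        $ok{$path} = $_[0] == 0 ? 1 : 0;
    };
}

# Block until every queued request has been handled and its callback run.
IO::AIO::flush;

printf "%d of %d stat callbacks ran\n", scalar keys %ok, scalar @paths;
```

Whether a higher thread count actually helps depends on the kernel and
the workload, so it is worth measuring before changing the default.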
799 root 1.5
800     =item IO::AIO::max_parallel $nthreads
801    
802 root 1.34 Sets the maximum number of AIO threads to C<$nthreads>. If more than the
803     specified number of threads are currently running, this function kills
804     the surplus threads. This function blocks until the limit is reached.
805    
806     While C<$nthreads> is zero, aio requests get queued but not executed
807     until the number of threads has been increased again.
808 root 1.5
809     This module automatically runs C<max_parallel 0> at program end, to ensure
810     that all threads are killed and that there are no outstanding requests.
811    
812     Under normal circumstances you don't need to call this function.
813    
814     =item $oldnreqs = IO::AIO::max_outstanding $nreqs
815    
816 root 1.62 [DEPRECATED]
817    
818 root 1.5 Sets the maximum number of outstanding requests to C<$nreqs>. If you
819     try to queue up more than this number of requests, the caller will block until
820     some requests have been handled.
821    
822     The default is very large, so normally there is no practical limit. If you
823 root 1.34 queue up many requests in a loop it often improves speed if you set
824 root 1.5 this to a relatively low number, such as C<100>.
825    
826 root 1.62 This function does not work well together with C<aio_group>s, and their
827     feeder interface is better suited to limiting concurrency, so do not use
828     this function.
829    
830 root 1.5 Under normal circumstances you don't need to call this function.
831    
832     =back
833    
834 root 1.1 =cut
835    
836 root 1.2 # support function to convert a fd into a perl filehandle
837     sub _fd2fh {
838     return undef if $_[0] < 0;
839    
840 root 1.23 # try to generate nice filehandles
841     my $sym = "IO::AIO::fd#$_[0]";
842     local *$sym;
843 root 1.25
844 root 1.27 open *$sym, "+<&=$_[0]" # usually works under any unix
845     or open *$sym, "<&=$_[0]" # cygwin needs this
846     or open *$sym, ">&=$_[0]" # or this
847 root 1.2 or return undef;
848    
849 root 1.23 *$sym
850 root 1.2 }
851    
852 root 1.61 min_parallel 8;
853 root 1.1
854     END {
855     max_parallel 0;
856     }
857    
858     1;
859    
860 root 1.27 =head2 FORK BEHAVIOUR
861    
862 root 1.52 This module should do "the right thing" when the process using it forks:
863    
864 root 1.34 Before the fork, IO::AIO enters a quiescent state where no requests
865     can be added in other threads and no results will be processed. After
866     the fork the parent simply leaves the quiescent state and continues
867     request/result processing, while the child clears the request/result
868     queue (so the requests started before the fork will only be handled in
869 root 1.52 the parent). Threads will be started on demand until the limit set in the
870 root 1.34 parent process has been reached again.
871 root 1.27
872 root 1.52 In short: the parent will, after a short pause, continue as if fork had
873     not been called, while the child will act as if IO::AIO has not been used
874     yet.
875    
876 root 1.60 =head2 MEMORY USAGE
877    
878     Each aio request uses - depending on your architecture - around 128 bytes
879     of memory. In addition, stat requests need a stat buffer (possibly a few
880     hundred bytes). Perl scalars and other data passed into aio requests will
881     also be locked.
882    
883     This is not awfully much, so queuing lots of requests is not usually a
884     problem.
885    
886     Each thread needs a stack area which is usually around 16k, sometimes much
887     larger, depending on the OS.
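
To put these figures together, here is a back-of-the-envelope estimate
in plain Perl. The helper C<estimate_aio_memory> is hypothetical and not
part of IO::AIO. Its constants are the approximate numbers quoted above,
with 256 bytes assumed as a stand-in for "a few hundred bytes" of stat
buffer. Real usage varies with architecture and OS.

```perl
use strict;
use warnings;

# Rough estimate only; all three constants are approximations.
sub estimate_aio_memory {
    my (%arg) = @_;

    my $per_request = 128;        # bytes per queued request (approximate)
    my $per_stat    = 256;        # assumed size of one stat buffer
    my $per_stack   = 16 * 1024;  # typical thread stack, can be much larger

    # stat_requests is the subset of requests that also need a stat buffer
    return $arg{requests}      * $per_request
         + $arg{stat_requests} * $per_stat
         + $arg{threads}       * $per_stack;
}

my $bytes = estimate_aio_memory (
    requests      => 10_000,
    stat_requests => 1_000,
    threads       => 8,
);

printf "approx. %.1f KiB\n", $bytes / 1024;
```

The estimate ignores the Perl scalars and buffers that are kept alive
while a request is pending, which can easily dominate for large reads
and writes.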
888    
889 root 1.1 =head1 SEE ALSO
890    
891 root 1.52 L<Coro>, L<Linux::AIO> (obsolete).
892 root 1.1
893     =head1 AUTHOR
894    
895     Marc Lehmann <schmorp@schmorp.de>
896     http://home.schmorp.de/
897    
898     =cut
899