/cvs/IO-AIO/AIO.pm
Revision: 1.70
Committed: Tue Oct 24 03:40:38 2006 UTC by root
Branch: MAIN
Changes since 1.69: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 IO::AIO - Asynchronous Input/Output
4
5 =head1 SYNOPSIS
6
7    use IO::AIO;
8
9    aio_open "/etc/passwd", O_RDONLY, 0, sub {
10       my ($fh) = @_;
11       ...
12    };
13
14    aio_unlink "/tmp/file", sub { };
15
16    aio_read $fh, 30000, 1024, $buffer, 0, sub {
17       $_[0] > 0 or die "read error: $!";
18    };
19
20    # version 2+ has request and group objects
21    use IO::AIO 2;
22
23    aioreq_pri 4; # give next request a very high priority
24    my $req = aio_unlink "/tmp/file", sub { };
25    $req->cancel; # cancel request if still in queue
26
27    my $grp = aio_group sub { print "all stats done\n" };
28    add $grp aio_stat "..." for ...;
29
30    # AnyEvent integration
31    open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!";
32    my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb });
33
34    # Event integration
35    Event->io (fd => IO::AIO::poll_fileno,
36               poll => 'r',
37               cb => \&IO::AIO::poll_cb);
38
39    # Glib/Gtk2 integration
40    add_watch Glib::IO IO::AIO::poll_fileno,
41       in => sub { IO::AIO::poll_cb; 1 };
42
43    # Tk integration
44    Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "",
45                              readable => \&IO::AIO::poll_cb);
46
47    # Danga::Socket integration
48    Danga::Socket->AddOtherFds (IO::AIO::poll_fileno =>
49                                \&IO::AIO::poll_cb);
50
51 =head1 DESCRIPTION
52
53 This module implements asynchronous I/O using whatever means your
54 operating system supports.
55
56 Currently, a number of threads are started that execute your read/writes
57 and signal their completion. You don't need thread support in perl, and
58 the threads created by this module will not be visible to perl. In the
59 future, this module might make use of the native aio functions available
60 on many operating systems. However, they are often not well-supported
61 (Linux doesn't allow them on normal files currently, for example),
62 and they would only support aio_read and aio_write, so the remaining
63 functionality would have to be implemented using threads anyway.
64
65 Although the module will work in the presence of other threads,
66 it is currently not reentrant in any way, so use appropriate locking
67 yourself, always call C<poll_cb> from within the same thread, or never
68 call C<poll_cb> (or other C<aio_> functions) recursively.
69
70 =cut
71
72 package IO::AIO;
73
74 no warnings;
75 use strict 'vars';
76
77 use base 'Exporter';
78
79 BEGIN {
80 our $VERSION = '2.0';
81
82 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
83 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
84 aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move
85 aio_group aio_nop);
86 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice));
87 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
88 min_parallel max_parallel max_outstanding nreqs);
89
90 @IO::AIO::GRP::ISA = 'IO::AIO::REQ';
91
92 require XSLoader;
93 XSLoader::load ("IO::AIO", $VERSION);
94 }
95
96 =head1 FUNCTIONS
97
98 =head2 AIO FUNCTIONS
99
100 All the C<aio_*> calls are more or less thin wrappers around the syscall
101 with the same name (sans C<aio_>). The arguments are similar or identical,
102 and they all accept an additional (and optional) C<$callback> argument
103 which must be a code reference. This code reference will get called with
104 the syscall return code (e.g. most syscalls return C<-1> on error, unlike
105 perl, which usually delivers "false") as its sole argument when the given
106 syscall has been executed asynchronously.
107
108 All functions expecting a filehandle keep a copy of the filehandle
109 internally until the request has finished.
110
111 All requests return objects of type L<IO::AIO::REQ> that allow further
112 manipulation of those requests while they are in-flight.
113
114 The pathnames you pass to these routines I<must> be absolute and
115 encoded in byte form. The reason for the former is that at the time the
116 request is being executed, the current working directory could have
117 changed. Alternatively, you can make sure that you never change the
118 current working directory.
119
120 To encode pathnames to byte form, make sure you either: a)
121 always pass in filenames you got from outside (command line, readdir
122 etc.), b) use only ASCII or ISO 8859-1 filenames, c) use the Encode module
123 and encode your pathnames to the locale (or other) encoding in effect in the
124 user environment, d) use Glib::filename_from_unicode on unicode filenames or e)
125 use something else.
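
The two pathname requirements above (absolute, byte-encoded) can be met before a request is queued. The following is a minimal sketch using only core modules; C<aio_safe_path> is a hypothetical helper name, not part of IO::AIO, and UTF-8 is merely an assumed locale encoding.

```perl
use strict;
use warnings;
use Encode ();
use File::Spec ();

# Hypothetical helper (not part of IO::AIO): turn a possibly relative,
# possibly character-string filename into an absolute byte string.
sub aio_safe_path {
    my ($path, $encoding) = @_;
    $encoding ||= 'UTF-8';    # assumption: the user's locale encoding

    # Encode first, so the current directory (already bytes) is not
    # encoded a second time when it is prepended.
    my $bytes = Encode::encode($encoding, $path);

    # rel2abs leaves absolute paths alone and prefixes the cwd otherwise.
    return File::Spec->rel2abs($bytes);
}

my $path = aio_safe_path("caf\x{e9}.txt");
print "byte path: $path\n";
```

The result is computed at the time the helper is called, so a later change of the working directory no longer affects the request.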
126
127 =over 4
128
129 =item aioreq_pri $pri
130
131 Sets the priority for the next aio request. The default priority
132 is C<0>, the minimum and maximum priorities are C<-4> and C<4>,
133 respectively. Requests with higher priority will be serviced first.
134
135 The priority will be reset to C<0> after each call to one of the C<aio_>
136 functions.
137
138 Example: open a file with low priority, then read something from it with
139 higher priority so the read request is serviced before other low priority
140 open requests (potentially spamming the cache):
141
142    aioreq_pri -3;
143    aio_open ..., sub {
144       return unless $_[0];
145
146       aioreq_pri -2;
147       aio_read $_[0], ..., sub {
148          ...
149       };
150    };
151
152 =item aioreq_nice $pri_adjust
153
154 Similar to C<aioreq_pri>, but subtracts the given value from the current
155 priority, so effects are cumulative.
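
The priority rules above (default C<0>, range C<-4> to C<4>, reset after every request, cumulative nice adjustments) can be traced with a small pure-Perl model. This sketch does not call IO::AIO; the C<model_*> names are hypothetical, and clamping out-of-range values is an assumption rather than documented behaviour.

```perl
use strict;
use warnings;

# Pure-Perl model of the documented priority rules; it does not call IO::AIO.
my $next_pri = 0;    # documented default priority

# Assumption: out-of-range values are clamped to the documented -4..4 range.
sub clamp { my $p = shift; $p < -4 ? -4 : $p > 4 ? 4 : $p }

sub model_pri  { $next_pri = clamp($_[0]) }                # like aioreq_pri
sub model_nice { $next_pri = clamp($next_pri - $_[0]) }    # like aioreq_nice

# Every request consumes the pending priority and resets it to 0.
sub model_request {
    my ($name) = @_;
    my $req = { name => $name, pri => $next_pri };
    $next_pri = 0;
    return $req;
}

model_pri(-3);
my $open = model_request("open");    # queued with priority -3

model_nice(1);
model_nice(1);                       # cumulative: 0 - 1 - 1
my $scan = model_request("scan");    # queued with priority -2

my $read = model_request("read");    # no adjustment, so priority 0

# Requests with higher priority are serviced first.
my @order = map { $_->{name} }
            sort { $b->{pri} <=> $a->{pri} } $open, $scan, $read;
print "service order: @order\n";
```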
156
157 =item aio_open $pathname, $flags, $mode, $callback->($fh)
158
159 Asynchronously open or create a file and call the callback with a newly
160 created filehandle for the file.
161
162 The pathname passed to C<aio_open> must be absolute. See the notes on
163 pathnames at the start of this section for an explanation.
164
165 The C<$flags> argument is a bitmask. See the C<Fcntl> module for a
166 list. They are the same as used by C<sysopen>.
167
168 Likewise, C<$mode> specifies the mode of the newly created file, if it
169 didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>,
170 except that it is mandatory (i.e. use C<0> if you don't create new files,
171 and C<0666> or C<0777> if you do).
172
173 Example:
174
175    aio_open "/etc/passwd", O_RDONLY, 0, sub {
176       if ($_[0]) {
177          print "open successful, fh is $_[0]\n";
178          ...
179       } else {
180          die "open failed: $!\n";
181       }
182    };
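
Because C<$flags> and C<$mode> mean the same as for C<sysopen>, a flag combination can be tried out synchronously first. The sketch below uses only core modules; the temporary directory and file name are illustrative.

```perl
use strict;
use warnings;
use Fcntl qw(O_CREAT O_EXCL O_RDONLY O_WRONLY);
use File::Temp qw(tempdir);

my $dir  = tempdir(CLEANUP => 1);
my $path = "$dir/example.txt";    # hypothetical file name

# Creating a file: O_CREAT needs a real mode, here 0666 (minus umask).
sysopen my $out, $path, O_CREAT | O_EXCL | O_WRONLY, 0666
    or die "create failed: $!";
close $out;

# Opening an existing file: nothing is created, so the mode is simply 0.
sysopen my $in, $path, O_RDONLY, 0
    or die "open failed: $!";
close $in;

# O_EXCL makes a second create attempt fail instead of reusing the file.
my $second_create = sysopen my $dup, $path, O_CREAT | O_EXCL | O_WRONLY, 0666;
print "second create ", ($second_create ? "succeeded" : "failed: $!"), "\n";
```

The same flag and mode values can then be passed to C<aio_open> unchanged.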
183
184 =item aio_close $fh, $callback->($status)
185
186 Asynchronously close a file and call the callback with the result
187 code. I<WARNING:> although accepted, you should not pass in a perl
188 filehandle here, as perl will likely close the file descriptor another
189 time when the filehandle is destroyed. Normally, you can safely call perl's
190 C<close> or just let filehandles go out of scope.
191
192 This is considered a bug in the API, so that might change. It's
193 therefore best to avoid this function.
194
195 =item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
196
197 =item aio_write $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
198
199 Reads or writes C<length> bytes from the specified C<fh> and C<offset>
200 into the scalar given by C<data> and offset C<dataoffset> and calls the
201 callback with the actual number of bytes read (or -1 on error, just
202 like the syscall).
203
204 The C<$data> scalar I<MUST NOT> be modified in any way while the request
205 is outstanding. Modifying it can result in segfaults or WW3 (if the
206 necessary/optional hardware is installed).
207
208 Example: Read 15 bytes at offset 7 into scalar C<$buffer>, starting at
209 offset C<0> within the scalar:
210
211 aio_read $fh, 7, 15, $buffer, 0, sub {
212 $_[0] > 0 or die "read error: $!";
213 print "read $_[0] bytes: <$buffer>\n";
214 };
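
The difference between C<$offset> (position in the file) and C<$dataoffset> (position in the scalar) is easy to confuse. The following blocking sketch mimics the argument order with core C<sysseek> and C<sysread>; C<read_at> is a hypothetical stand-in and not how the module performs the read.

```perl
use strict;
use warnings;
use File::Temp qw(tempfile);

my ($tmp, $name) = tempfile(UNLINK => 1);
binmode $tmp;
syswrite $tmp, "0123456789abcdefghijklmnopqrstuvwxyz";

# Hypothetical synchronous stand-in using aio_read's argument order.
# The buffer is modified in place through the @_ alias ($_[3]).
sub read_at {
    my ($fh, $offset, $length, undef, $dataoffset) = @_;
    sysseek $fh, $offset, 0 or return -1;    # 0 == SEEK_SET
    my $got = sysread $fh, $_[3], $length, $dataoffset;
    return defined $got ? $got : -1;
}

# 15 bytes from file offset 7, stored at the start of the scalar.
my $buffer = "";
my $n = read_at($tmp, 7, 15, $buffer, 0);
print "read $n bytes: <$buffer>\n";

# 3 bytes from file offset 0, stored after the first two bytes of the scalar.
my $prefixed = "XX";
read_at($tmp, 0, 3, $prefixed, 2);
print "with data offset 2: <$prefixed>\n";
```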
215
216 =item aio_move $srcpath, $dstpath, $callback->($status)
217
218 [EXPERIMENTAL due to internal aio_group use]
219
220 Try to move the I<file> (directories not supported as either source or
221 destination) from C<$srcpath> to C<$dstpath> and call the callback with
222 C<0> (ok) or C<-1> (error).
223
224 This is a composite request that tries to rename(2) the file first. If
225 rename fails with C<EXDEV>, it creates the destination file with mode 0200
226 and copies the contents of the source file into it using C<aio_sendfile>,
227 followed by restoring atime, mtime, access mode and uid/gid, in that
228 order, and unlinking the C<$srcpath>.
229
230 If an error occurs, the partial destination file will be unlinked, if
231 possible, except when setting atime, mtime, access mode and uid/gid, where
232 errors are being ignored.
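
The control flow described above can be followed more easily in a blocking form. This is only a sketch under stated assumptions: C<move_file_sync> is a hypothetical name, and C<File::Copy::copy> stands in for the C<aio_sendfile> step, so it does not create the destination with mode 0200 first.

```perl
use strict;
use warnings;
use Errno qw(EXDEV);
use File::Copy ();
use File::Temp qw(tempdir);

# Hypothetical blocking counterpart of the composite request described above.
# Returns 0 on success and -1 on error, with $! set.
sub move_file_sync {
    my ($src, $dst) = @_;

    return 0 if rename $src, $dst;    # the cheap case
    return -1 unless $! == EXDEV;     # only fall back across devices

    my @stat = stat $src;
    return -1 unless @stat;

    unless (File::Copy::copy($src, $dst)) {
        my $errno = $! + 0;
        unlink $dst;                  # drop the partial destination
        $! = $errno;
        return -1;
    }

    # Errors while restoring metadata are ignored, as in the description.
    utime $stat[8], $stat[9], $dst;
    chmod $stat[2] & 07777, $dst;
    chown $stat[4], $stat[5], $dst;

    return unlink($src) ? 0 : -1;
}

my $dir = tempdir(CLEANUP => 1);
my ($src, $dst) = ("$dir/src.txt", "$dir/dst.txt");
open my $fh, '>', $src or die "create failed: $!";
print {$fh} "payload\n";
close $fh;

my $status = move_file_sync($src, $dst);
print "move status: $status\n";
```

Within one temporary directory the rename succeeds directly, so the demo exercises only the first branch; the copy fallback runs when source and destination live on different filesystems.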
233
234 =cut
235
236 sub aio_move($$$) {
237    my ($src, $dst, $cb) = @_;
238    require Fcntl; # supplies O_CREAT and O_TRUNC for the destination open below
239    my $grp = aio_group $cb;
240
241    add $grp aio_rename $src, $dst, sub {
242       if ($_[0] && $! == EXDEV) {
243          add $grp aio_open $src, O_RDONLY, 0, sub {
244             if (my $src_fh = $_[0]) {
245                my @stat = stat $src_fh;
246
247                add $grp aio_open $dst, Fcntl::O_CREAT () | Fcntl::O_TRUNC () | O_WRONLY, 0200, sub {
248                   if (my $dst_fh = $_[0]) {
249                      add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
250                         close $src_fh;
251
252                         if ($_[0] == $stat[7]) {
253                            utime $stat[8], $stat[9], $dst;
254                            chmod $stat[2] & 07777, $dst_fh;
255                            chown $stat[4], $stat[5], $dst_fh;
256                            close $dst_fh;
257
258                            add $grp aio_unlink $src, sub {
259                               $grp->result ($_[0]);
260                            };
261                         } else {
262                            my $errno = $!;
263                            add $grp aio_unlink $dst, sub {
264                               $! = $errno;
265                               $grp->result (-1);
266                            };
267                         }
268                      };
269                   } else {
270                      $grp->result (-1);
271                   }
272                };
273
274             } else {
275                $grp->result (-1);
276             }
277          };
278       } else {
279          $grp->result ($_[0]);
280       }
281    };
282
283    $grp
284 }
285
286 =item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval)
287
288 Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts
289 reading at byte offset C<$in_offset>, and starts writing at the current
290 file offset of C<$out_fh>. Because of that, it is not safe to issue more
291 than one C<aio_sendfile> per C<$out_fh>, as they will interfere with each
292 other.
293
294 This call tries to make use of a native C<sendfile> syscall to provide
295 zero-copy operation. For this to work, C<$out_fh> should refer to a
296 socket, and C<$in_fh> should refer to an mmap'able file.
297
298 If the native sendfile call fails or is not implemented, it will be
299 emulated, so you can call C<aio_sendfile> on any type of filehandle
300 regardless of the limitations of the operating system.
301
302 Please note, however, that C<aio_sendfile> can read more bytes from
303 C<$in_fh> than are written, and there is no way to find out how many
304 bytes have been read from C<aio_sendfile> alone, as C<aio_sendfile> only
305 provides the number of bytes written to C<$out_fh>. Only if the result
306 value equals C<$length> can one assume that C<$length> bytes have been
307 read.
308
309 =item aio_readahead $fh,$offset,$length, $callback->($retval)
310
311 C<aio_readahead> populates the page cache with data from a file so that
312 subsequent reads from that file will not block on disk I/O. The C<$offset>
313 argument specifies the starting point from which data is to be read and
314 C<$length> specifies the number of bytes to be read. I/O is performed in
315 whole pages, so that offset is effectively rounded down to a page boundary
316 and bytes are read up to the next page boundary greater than or equal to
317 (offset+length). C<aio_readahead> does not read beyond the end of the
318 file. The current file offset of the file is left unchanged.
319
320 If that syscall doesn't exist (likely if your OS isn't Linux) it will be
321 emulated by simply reading the data, which would have a similar effect.
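
The page rounding described above is plain arithmetic. The sketch below computes the byte range that is effectively read for a given offset and length; C<readahead_range> is a hypothetical helper and the 4096-byte page size is an assumption (the real value depends on the system).

```perl
use strict;
use warnings;

# Hypothetical helper: which byte range is effectively read for a given
# offset and length once both ends are rounded to page boundaries.
sub readahead_range {
    my ($offset, $length, $pagesize) = @_;
    $pagesize ||= 4096;    # assumption: 4 KiB pages

    # Round the start down to a page boundary.
    my $start = $offset - $offset % $pagesize;

    # Round the end up to the next page boundary >= offset + length.
    my $end = $offset + $length;
    $end += $pagesize - $end % $pagesize if $end % $pagesize;

    return ($start, $end);
}

my ($start, $end) = readahead_range(5000, 100);
print "bytes 5000..5099 are covered by reading $start..", $end - 1, "\n";
```

The end of file is not modelled here; the real call never reads beyond it.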
322
323 =item aio_stat $fh_or_path, $callback->($status)
324
325 =item aio_lstat $fh, $callback->($status)
326
327 Works like perl's C<stat> or C<lstat> in void context. The callback will
328 be called after the stat and the results will be available using C<stat _>
329 or C<-s _> etc...
330
331 The pathname passed to C<aio_stat> must be absolute. See the notes on
332 pathnames at the start of this section for an explanation.
333
334 Currently, the stats are always 64-bit-stats, i.e. instead of returning an
335 error when stat'ing a large file, the results will be silently truncated
336 unless perl itself is compiled with large file support.
337
338 Example: Print the length of F</etc/passwd>:
339
340 aio_stat "/etc/passwd", sub {
341 $_[0] and die "stat failed: $!";
342 print "size is ", -s _, "\n";
343 };
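
The special C<_> filehandle used in the callback is ordinary Perl: it refers to the result of the most recent stat. A blocking sketch with core functions shows the same access pattern (the temporary file is illustrative):

```perl
use strict;
use warnings;
use File::Temp qw(tempfile);

my ($fh, $name) = tempfile(UNLINK => 1);
print {$fh} "hello";
close $fh;

# One stat call fills perl's stat cache ...
stat $name or die "stat failed: $!";

# ... and every later test on "_" reuses it without another syscall.
my $size    = -s _;
my $mtime   = (stat _)[9];
my $is_file = (-f _) ? 1 : 0;

print "size is $size, regular file: $is_file\n";
```

Inside an C<aio_stat> callback the cache has been filled by the request, so only the C<_> accesses are needed.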
344
345 =item aio_unlink $pathname, $callback->($status)
346
347 Asynchronously unlink (delete) a file and call the callback with the
348 result code.
349
350 =item aio_link $srcpath, $dstpath, $callback->($status)
351
352 Asynchronously create a new link to the existing object at C<$srcpath> at
353 the path C<$dstpath> and call the callback with the result code.
354
355 =item aio_symlink $srcpath, $dstpath, $callback->($status)
356
357 Asynchronously create a new symbolic link to the existing object at C<$srcpath> at
358 the path C<$dstpath> and call the callback with the result code.
359
360 =item aio_rename $srcpath, $dstpath, $callback->($status)
361
362 Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
363 rename(2) does, and call the callback with the result code.
364
365 =item aio_rmdir $pathname, $callback->($status)
366
367 Asynchronously rmdir (delete) a directory and call the callback with the
368 result code.
369
370 =item aio_readdir $pathname, $callback->($entries)
371
372 Unlike the POSIX call of the same name, C<aio_readdir> reads an entire
373 directory (i.e. opendir + readdir + closedir). The entries will not be
374 sorted, and will B<NOT> include the C<.> and C<..> entries.
375
376 The callback is passed a single argument which is either C<undef> or an array-ref
377 with the filenames.
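
The result convention (C<undef> on error, otherwise an array-ref without C<.> and C<..>) corresponds to the following blocking sketch. C<read_entries> is a hypothetical helper built from core functions, not the module's implementation.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# Hypothetical blocking equivalent: opendir + readdir + closedir in one go.
sub read_entries {
    my ($path) = @_;
    opendir my $dh, $path or return undef;
    my @names = grep { $_ ne '.' && $_ ne '..' } readdir $dh;
    closedir $dh;
    return \@names;    # unsorted, like the asynchronous variant
}

my $dir = tempdir(CLEANUP => 1);
for my $file (qw(a b)) {
    open my $fh, '>', "$dir/$file" or die "create failed: $!";
    close $fh;
}

my $entries = read_entries($dir);
print "entries: @{[ sort @$entries ]}\n";

my $missing = read_entries("$dir/does-not-exist");
print "missing directory gives ", defined $missing ? "a list" : "undef", "\n";
```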
378
379 =item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
380
381 [EXPERIMENTAL due to internal aio_group use]
382
383 Scans a directory (similar to C<aio_readdir>) but additionally tries to
384 separate the entries of directory C<$path> into two sets of names, ones
385 you can recurse into (directories or links to them), and ones you cannot
386 recurse into (everything else).
387
388 C<aio_scandir> is a composite request that creates many sub requests.
389 C<$maxreq> specifies the maximum number of outstanding aio requests that
390 this function generates. If it is C<< <= 0 >>, then a suitable default
391 will be chosen (currently 6).
392
393 On error, the callback is called without arguments, otherwise it receives
394 two array-refs with path-relative entry names.
395
396 Example:
397
398 aio_scandir $dir, 0, sub {
399 my ($dirs, $nondirs) = @_;
400 print "real directories: @$dirs\n";
401 print "everything else: @$nondirs\n";
402 };
403
404 Implementation notes.
405
406 The C<aio_readdir> cannot be avoided, but C<stat()>'ing every entry can.
407
408 After reading the directory, the modification time, size etc. of the
409 directory before and after the readdir are compared, and if they match (and
410 the modification time isn't the current time), the link count will be used
411 to decide how many entries are directories (if >= 2). Otherwise, no knowledge
412 of the number of subdirectories will be assumed.
413
414 Then the entries will be sorted into likely directories (everything without
415 a non-initial dot currently) and likely non-directories (everything
416 else). Then every entry plus an appended C</.> will be C<stat>'ed,
417 likely directories first. If that succeeds, it assumes that the entry
418 is a directory or a symlink to a directory (which will be checked
419 separately). This is often faster than stat'ing the entry itself because
420 filesystems might detect the type of the entry without reading the inode
421 data (e.g. ext2fs filetype feature).
422
423 If the known number of directories (link count - 2) has been reached, the
424 rest of the entries are assumed to be non-directories.
425
426 This only works with certainty on POSIX (= UNIX) filesystems, which
427 fortunately are the vast majority of filesystems around.
428
429 It will also likely work on non-POSIX filesystems with reduced efficiency
430 as those tend to return 0 or 1 as link counts, which disables the
431 directory counting heuristic.
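
The "likely directories first" ordering can be tried in isolation. The sketch below reuses the sort key from the implementation (a flag for a non-initial dot, followed by the zero-padded name length) in a hypothetical standalone function, C<stat_order>, and returns the names in the order in which they would be stat'ed.

```perl
use strict;
use warnings;

# Hypothetical standalone version of the ordering heuristic: names without
# a non-initial dot come first, shorter names before longer ones.
sub stat_order {
    my @entries = @_;

    my $sorted = [
        map  { $_->[0] }
        sort { $b->[1] cmp $a->[1] }
        map  { [$_, sprintf("%s%04d", (/.\./ ? "1" : "0"), length $_)] }
        @entries
    ];

    # Entries are taken from the end of the list, so the smallest key
    # (no dot, shortest name) is stat'ed first.
    my @order;
    push @order, pop @$sorted while @$sorted;
    return @order;
}

my @order = stat_order(qw(lib README.txt a src.old include .git));
print "stat order: @order\n";
```

A leading dot alone does not count as a dot for this purpose, which is why C<.git> is still treated as a likely directory.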
432
433 =cut
434
435 sub aio_scandir($$$) {
436    my ($path, $maxreq, $cb) = @_;
437
438    my $grp = aio_group $cb;
439
440    $maxreq = 6 if $maxreq <= 0;
441
442    # stat once
443    add $grp aio_stat $path, sub {
444       return $grp->result () if $_[0];
445       my $now = time;
446       my $hash1 = join ":", (stat _)[0,1,3,7,9];
447
448       # read the directory entries
449       add $grp aio_readdir $path, sub {
450          my $entries = shift
451             or return $grp->result ();
452
453          # stat the dir another time
454          add $grp aio_stat $path, sub {
455             my $hash2 = join ":", (stat _)[0,1,3,7,9];
456
457             my $ndirs;
458
459             # take the slow route if anything looks fishy
460             if ($hash1 ne $hash2 or (stat _)[9] == $now) {
461                $ndirs = -1;
462             } else {
463                # if nlink == 2, we are finished
464                # on non-posix-fs's, we rely on nlink < 2
465                $ndirs = (stat _)[3] - 2
466                   or return $grp->result ([], $entries);
467             }
468
469             # sort into likely dirs and likely nondirs
470             # dirs == files without ".", short entries first
471             $entries = [map $_->[0],
472                            sort { $b->[1] cmp $a->[1] }
473                               map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
474                                  @$entries];
475
476             my (@dirs, @nondirs);
477
478             my ($statcb, $schedcb);
479             my $nreq = 0;
480
481             my $statgrp = add $grp aio_group;
482
483             $schedcb = sub {
484                if (@$entries) {
485                   if ($nreq < $maxreq) {
486                      my $ent = pop @$entries;
487                      $nreq++;
488                      add $statgrp aio_stat "$path/$ent/.", sub { $statcb->($_[0], $ent) };
489                   }
490                } elsif (!$nreq) {
491                   # finished
492                   $statgrp->cancel;
493                   undef $statcb;
494                   undef $schedcb;
495                   $grp->result (\@dirs, \@nondirs);
496                }
497             };
498             $statcb = sub {
499                my ($status, $entry) = @_;
500
501                if ($status < 0) {
502                   $nreq--;
503                   push @nondirs, $entry;
504                   &$schedcb;
505                } else {
506                   # need to check for real directory
507                   add $grp aio_lstat "$path/$entry", sub {
508                      $nreq--;
509
510                      if (-d _) {
511                         push @dirs, $entry;
512
513                         if (!--$ndirs) {
514                            push @nondirs, @$entries;
515                            $entries = [];
516                         }
517                      } else {
518                         push @nondirs, $entry;
519                      }
520
521                      &$schedcb;
522                   }
523                }
524             };
525
526             &$schedcb while @$entries && $nreq < $maxreq;
527          };
528       };
529    };
530
531    $grp
532 }
533
534 =item aio_fsync $fh, $callback->($status)
535
536 Asynchronously call fsync on the given filehandle and call the callback
537 with the fsync result code.
538
539 =item aio_fdatasync $fh, $callback->($status)
540
541 Asynchronously call fdatasync on the given filehandle and call the
542 callback with the fdatasync result code.
543
544 If this call isn't available because your OS lacks it or it couldn't be
545 detected, it will be emulated by calling C<fsync> instead.
546
547 =item aio_group $callback->(...)
548
549 [EXPERIMENTAL]
550
551 This is a very special aio request: Instead of doing something, it is a
552 container for other aio requests, which is useful if you want to bundle
553 many requests into a single, composite, request.
554
555 Returns an object of class L<IO::AIO::GRP>. See its documentation below
556 for more info.
557
558 Example:
559
560 my $grp = aio_group sub {
561 print "all stats done\n";
562 };
563
564 add $grp
565 (aio_stat ...),
566 (aio_stat ...),
567 ...;
568
569 =item aio_nop $callback->()
570
571 This is a special request - it does nothing in itself and is only used for
572 side effects, such as when you want to add a dummy request to a group so
573 that finishing the requests in the group depends on executing the given
574 code.
575
576 While this request does nothing, it still goes through the execution
577 phase and still requires a worker thread. Thus, the callback will not
578 be executed immediately but only after other requests in the queue have
579 entered their execution phase. This can be used to measure request
580 latency.
581
582 =item IO::AIO::aio_sleep $fractional_seconds, $callback->() *NOT EXPORTED*
583
584 Mainly used for debugging and benchmarking, this aio request puts one of
585 the request workers to sleep for the given time.
586
587 While it is theoretically handy to have simple I/O scheduling requests
588 like sleep and file handle readable/writable, the overhead this creates
589 is immense, so do not use this function except to put your application
590 under artificial I/O pressure.
591
592 =back
593
594 =head2 IO::AIO::REQ CLASS
595
596 All non-aggregate C<aio_*> functions return an object of this class when
597 called in non-void context.
598
599 A request always moves through the following five states in its lifetime,
600 in order: B<ready> (request has been created, but has not been executed
601 yet), B<execute> (request is currently being executed), B<pending>
602 (request has been executed but callback has not been called yet),
603 B<result> (results are being processed synchronously, includes calling the
604 callback) and B<done> (request has reached the end of its lifetime and
605 holds no resources anymore).
606
607 =over 4
608
609 =item cancel $req
610
611 Cancels the request, if possible. Has the effect of skipping execution
612 when entering the B<execute> state and skipping calling the callback when
613 entering the B<result> state, but will leave the request otherwise
614 untouched. That means that requests that currently execute will not be
615 stopped and resources held by the request will not be freed prematurely.
616
617 =item cb $req $callback->(...)
618
619 Replace (or simply set) the callback registered to the request.
620
621 =back
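
The five request states and the effect of C<cancel> can be traced with a small pure-Perl model. It does not use IO::AIO; C<run_request> and its C<cancel_in> option are hypothetical names that say in which state the request is when C<cancel> is called.

```perl
use strict;
use warnings;

my @STATES = qw(ready execute pending result done);

# Walk one modelled request through its lifetime. cancel_in names the
# state the request is in when cancel is called (undef: never cancelled).
sub run_request {
    my (%opt) = @_;
    my %seen = (states => [], executed => 0, callback => 0);
    my $cancelled = 0;

    for my $state (@STATES) {
        push @{ $seen{states} }, $state;

        # Effects on entering a state: a cancelled request skips both.
        $seen{executed} = 1 if $state eq 'execute' && !$cancelled;
        $seen{callback} = 1 if $state eq 'result'  && !$cancelled;

        $cancelled = 1
            if defined $opt{cancel_in} && $opt{cancel_in} eq $state;
    }

    return \%seen;
}

for my $when (undef, qw(ready execute pending)) {
    my $r = run_request(cancel_in => $when);
    printf "cancel in %-8s executed=%d callback=%d\n",
        (defined $when ? $when : 'never'), $r->{executed}, $r->{callback};
}
```

A request cancelled while it is already executing still runs to completion in this model; only its callback is skipped, which matches the description of C<cancel> above.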
622
623 =head2 IO::AIO::GRP CLASS
624
625 This class is a subclass of L<IO::AIO::REQ>, so all its methods apply to
626 objects of this class, too.
627
628 An IO::AIO::GRP object is a special request that can contain multiple other
629 aio requests.
630
631 You create one by calling the C<aio_group> constructing function with a
632 callback that will be called when all contained requests have entered the
633 C<done> state:
634
635 my $grp = aio_group sub {
636 print "all requests are done\n";
637 };
638
639 You add requests by calling the C<add> method with one or more
640 C<IO::AIO::REQ> objects:
641
642    $grp->add (aio_unlink "...");
643
644    add $grp aio_stat "...", sub {
645       $_[0] or return $grp->result ("error");
646
647       # add another request dynamically, if first succeeded
648       add $grp aio_open "...", sub {
649          $grp->result ("ok");
650       };
651    };
652
653 This makes it very easy to create composite requests (see the source of
654 C<aio_move> for an application) that work and feel like simple requests.
655
656 =over 4
657
658 =item * The IO::AIO::GRP objects will be cleaned up during calls to
659 C<IO::AIO::poll_cb>, just like any other request.
660
661 =item * They can be canceled like any other request. Canceling will cancel not
662 only the request itself, but also all requests it contains.
663
664 =item * They can also be added to other IO::AIO::GRP objects.
665
666 =item * You must not add requests to a group from within the group callback (or
667 any later time).
668
669 =item * This does not harmonise well with C<max_outstanding>, so it is best
670 not to combine C<aio_group> with it. Groups and feeders are recommended for
671 this kind of concurrency-limiting.
672
673 =back
674
675 Their lifetime, simplified, looks like this: when they are empty, they
676 will finish very quickly. If they contain only requests that are in the
677 C<done> state, they will also finish. Otherwise they will continue to
678 exist.
679
680 That means after creating a group you have some time to add requests. And
681 in the callbacks of those requests, you can add further requests to the
682 group. And only when all those requests have finished will the group
683 itself finish.
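
The group lifetime described above (stay alive while members are outstanding, allow members to be added from member callbacks, finish once nothing is pending) can be modelled in a few lines of pure Perl. C<ModelGroup> is a hypothetical stand-in for illustration only; its C<poll> method plays the role of the check performed during C<IO::AIO::poll_cb>.

```perl
use strict;
use warnings;

package ModelGroup;    # hypothetical stand-in, not IO::AIO::GRP itself

sub new {
    my ($class, $cb) = @_;
    return bless { cb => $cb, pending => 0, result => [], done => 0 }, $class;
}

# Register one member request; returns a closure that marks it finished.
sub add {
    my ($self) = @_;
    die "cannot add to a finished group" if $self->{done};
    $self->{pending}++;
    my $finished = 0;
    return sub { $self->{pending}-- unless $finished++ };
}

sub result { my $self = shift; $self->{result} = [@_] }

# Fire the group callback once, as soon as no member is pending any more.
sub poll {
    my ($self) = @_;
    return 0 if $self->{done} || $self->{pending};
    $self->{done} = 1;
    $self->{cb}->(@{ $self->{result} });
    return 1;
}

package main;

my @log;
my $grp = ModelGroup->new(sub { push @log, "group done: @_" });

my $stat_done = $grp->add;
$grp->poll;               # a member is outstanding: nothing happens

# As if inside the first member's callback: add a second member, then finish.
my $open_done = $grp->add;
$stat_done->();
$grp->poll;               # still waiting for the second member

$grp->result("ok");
$open_done->();
$grp->poll;               # now the group callback runs, exactly once

print "$_\n" for @log;
```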
684
685 =over 4
686
687 =item add $grp ...
688
689 =item $grp->add (...)
690
691 Add one or more requests to the group. Any type of L<IO::AIO::REQ> can
692 be added, including other groups, as long as you do not create circular
693 dependencies.
694
695 Returns all its arguments.
696
697 =item $grp->result (...)
698
699 Set the result value(s) that will be passed to the group callback when all
700 subrequests have finished. By default, no argument will be passed.
701
702 =item feed $grp $callback->($grp)
703
704 [VERY EXPERIMENTAL]
705
706 Sets a feeder/generator on this group: every group can have an attached
707 generator that generates requests if idle. The idea behind this is that,
708 although you could just queue as many requests as you want in a group,
709 this might starve other requests for a potentially long time. For
710 example, C<aio_scandir> might generate hundreds of thousands of C<aio_stat>
711 requests, delaying any later requests for a long time.
712
713 To avoid this, and allow incremental generation of requests, you can
714 instead create a group and set a feeder on it that generates those requests. The
715 feed callback will be called whenever there are few enough (see C<limit>,
716 below) requests active in the group itself and is expected to queue more
717 requests.
718
719 The feed callback can queue as many requests as it likes (i.e. C<add> does
720 not impose any limits).
721
722 If the feed does not queue more requests when called, it will be
723 automatically removed from the group.
724
725 If the feed limit is C<0>, it will be set to C<2> automatically.
726
727 Example:
728
729    # stat all files in @files, but only ever use four aio requests concurrently:
730
731    my $grp = aio_group sub { print "finished\n" };
732    limit $grp 4;
733    feed $grp sub {
734       my $file = pop @files
735          or return;
736
737       add $grp aio_stat $file, sub { ... };
738    };
739
740 =item limit $grp $num
741
742 Sets the feeder limit for the group: The feeder will be called whenever
743 the group contains fewer than this many requests.
744
745 Setting the limit to C<0> will pause the feeding process.
746
747 =back
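
How C<feed> and C<limit> interact can be simulated without any I/O. In this pure-Perl sketch a plain array stands in for the group's active requests, and "completing" a request just removes it; all names are hypothetical, and the loop only models the rules stated above (call the feeder while fewer than C<limit> requests are active, drop it once it queues nothing).

```perl
use strict;
use warnings;

my @files = map { "file$_" } 1 .. 10;
my $limit = 4;
my (@active, @finished);
my $max_active = 0;

# The feeder queues at most one request per call.
my $feed = sub {
    my $file = pop @files
        or return;
    push @active, $file;
};

while (1) {
    # Call the feeder while the group holds fewer than $limit requests.
    while ($feed && @active < $limit) {
        my $before = @active;
        $feed->();
        undef $feed if @active == $before;    # queued nothing: feeder removed
    }
    $max_active = @active if @active > $max_active;

    last unless @active;
    push @finished, shift @active;            # one request completes
}

print "finished ", scalar @finished, " requests, at most $max_active at once\n";
```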
748
749 =head2 SUPPORT FUNCTIONS
750
751 =over 4
752
753 =item $fileno = IO::AIO::poll_fileno
754
755 Return the I<request result pipe file descriptor>. This filehandle must be
756 polled for reading by some mechanism outside this module (e.g. Event or
757 select, see below or the SYNOPSIS). If the pipe becomes readable you have
758 to call C<poll_cb> to check the results.
759
760 See C<poll_cb> for an example.
761
762 =item IO::AIO::poll_cb
763
764 Process all outstanding events on the result pipe. You have to call this
765 regularly. Returns the number of events processed. Returns immediately
766 when no events are outstanding.
767
768 Example: Install an Event watcher that automatically calls
769 IO::AIO::poll_cb with high priority:
770
771 Event->io (fd => IO::AIO::poll_fileno,
772 poll => 'r', async => 1,
773 cb => \&IO::AIO::poll_cb);
774
775 =item IO::AIO::poll_wait
776
777 Wait till the result filehandle becomes ready for reading (simply does a
778 C<select> on the filehandle). This is useful if you want to synchronously wait
779 for some requests to finish.
780
781 See C<nreqs> for an example.
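
Waiting with C<select> on a single descriptor looks as follows in plain Perl. In this sketch an ordinary pipe stands in for the result pipe and the write simulates a worker signalling completion; the five-second timeout is only a safety net for the demo.

```perl
use strict;
use warnings;

pipe(my $reader, my $writer) or die "pipe failed: $!";

# Stand-in for a worker thread signalling a finished request.
syswrite $writer, "x";

# Build the bit vector select() expects and block until it is readable.
my $rin = '';
vec($rin, fileno($reader), 1) = 1;
my $nready = select(my $rout = $rin, undef, undef, 5);

print "descriptors ready: $nready\n";
```

With the real module, the descriptor to watch would be the one returned by C<IO::AIO::poll_fileno>, and C<IO::AIO::poll_cb> would be called once it is readable.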
782
783 =item IO::AIO::nreqs
784
785 Returns the number of requests currently outstanding (i.e. requests whose
786 callback has not been invoked yet).
787
788 Example: wait till there are no outstanding requests anymore:
789
790 IO::AIO::poll_wait, IO::AIO::poll_cb
791 while IO::AIO::nreqs;
792
793 =item IO::AIO::flush
794
795 Wait till all outstanding AIO requests have been handled.
796
797 Strictly equivalent to:
798
799 IO::AIO::poll_wait, IO::AIO::poll_cb
800 while IO::AIO::nreqs;
801
802 =item IO::AIO::poll
803
804 Waits until some requests have been handled.
805
806 Strictly equivalent to:
807
808 IO::AIO::poll_wait, IO::AIO::poll_cb
809 if IO::AIO::nreqs;
810
811 =item IO::AIO::min_parallel $nthreads
812
813 Set the minimum number of AIO threads to C<$nthreads>. The current
814 default is C<8>, which means eight asynchronous operations can execute
815 concurrently at any one time (the number of outstanding requests,
816 however, is unlimited).
817
818 IO::AIO starts threads only on demand, when an AIO request is queued and
819 no free thread exists.
820
821 It is recommended to keep the number of threads relatively low, as some
822 Linux kernel versions will scale negatively with the number of threads
823 (higher parallelism => MUCH higher latency). With current Linux 2.6
824 versions, 4-32 threads should be fine.
825
826 Under most circumstances you don't need to call this function, as the
827 module selects a default that is suitable for low to moderate load.
828
829 =item IO::AIO::max_parallel $nthreads
830
831 Sets the maximum number of AIO threads to C<$nthreads>. If more than the
832 specified number of threads are currently running, this function kills
833 them. This function blocks until the limit is reached.
834
835 While C<$nthreads> is zero, aio requests get queued but not executed
836 until the number of threads has been increased again.
837
838 This module automatically runs C<max_parallel 0> at program end, to ensure
839 that all threads are killed and that there are no outstanding requests.
840
841 Under normal circumstances you don't need to call this function.
842
843 =item $oldnreqs = IO::AIO::max_outstanding $nreqs
844
845 [DEPRECATED]
846
847 Sets the maximum number of outstanding requests to C<$nreqs>. If you
848 try to queue up more than this number of requests, the caller will block until
849 some requests have been handled.
850
851 The default is very large, so normally there is no practical limit. If you
852 queue up many requests in a loop it often improves speed if you set
853 this to a relatively low number, such as C<100>.
854
855 This function does not work well together with C<aio_group>'s, and their
856 feeder interface is better suited to limiting concurrency, so do not use
857 this function.
858
859 Under normal circumstances you don't need to call this function.
860
861 =back
862
863 =cut
864
865 # support function to convert a fd into a perl filehandle
866 sub _fd2fh {
867 return undef if $_[0] < 0;
868
869 # try to generate nice filehandles
870 my $sym = "IO::AIO::fd#$_[0]";
871 local *$sym;
872
873 open *$sym, "+<&=$_[0]" # usually works under any unix
874 or open *$sym, "<&=$_[0]" # cygwin needs this
875 or open *$sym, ">&=$_[0]" # or this
876 or return undef;
877
878 *$sym
879 }
880
881 min_parallel 8;
882
883 END {
884 max_parallel 0;
885 }
886
887 1;
888
889 =head2 FORK BEHAVIOUR
890
891 This module should do "the right thing" when the process using it forks:
892
893 Before the fork, IO::AIO enters a quiescent state where no requests
894 can be added in other threads and no results will be processed. After
895 the fork the parent simply leaves the quiescent state and continues
896 request/result processing, while the child clears the request/result
897 queue (so the requests started before the fork will only be handled in
898 the parent). Threads will be started on demand until the limit set in the
899 parent process has been reached again.
900
901 In short: the parent will, after a short pause, continue as if fork had
902 not been called, while the child will act as if IO::AIO has not been used
903 yet.
904
905 =head2 MEMORY USAGE
906
907 Each aio request uses - depending on your architecture - around 128 bytes
908 of memory. In addition, stat requests need a stat buffer (possibly a few
909 hundred bytes). Perl scalars and other data passed into aio requests will
910 also be locked.
911
912 This is not awfully much, so queuing lots of requests is not usually a
913 problem.
914
915 Each thread needs a stack area which is usually around 16k, sometimes much
916 larger, depending on the OS.
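
As a rough worked example of the figures above: the sketch below multiplies them out for 100000 queued requests and eight threads. The request count is arbitrary, and the per-request and per-thread sizes are the approximate values from the text, not measurements.

```perl
use strict;
use warnings;

my $bytes_per_request = 128;          # "around 128 bytes" per request
my $stack_per_thread  = 16 * 1024;    # "usually around 16k" per thread
my $threads           = 8;            # the default set via min_parallel

sub queue_bytes {
    my ($nreqs) = @_;
    return $nreqs * $bytes_per_request;
}

my $queue  = queue_bytes(100_000);
my $stacks = $threads * $stack_per_thread;

printf "100000 queued requests: about %.1f MB\n", $queue / 1e6;
printf "%d thread stacks: about %d KiB\n", $threads, $stacks / 1024;
```

Stat buffers and the scalars passed into requests come on top of these numbers.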
917
918 =head1 SEE ALSO
919
920 L<Coro::AIO>.
921
922 =head1 AUTHOR
923
924 Marc Lehmann <schmorp@schmorp.de>
925 http://home.schmorp.de/
926
927 =cut
928