/cvs/IO-AIO/AIO.pm
Revision: 1.83
Committed: Fri Oct 27 20:11:58 2006 UTC (17 years, 7 months ago) by root
Branch: MAIN
Changes since 1.82: +5 -3 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 IO::AIO - Asynchronous Input/Output
4
5 =head1 SYNOPSIS
6
7 use IO::AIO;
8
9 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10 my ($fh) = @_;
11 ...
12 };
13
14 aio_unlink "/tmp/file", sub { };
15
16 aio_read $fh, 30000, 1024, $buffer, 0, sub {
17 $_[0] > 0 or die "read error: $!";
18 };
19
20 # version 2+ has request and group objects
21 use IO::AIO 2;
22
23 aioreq_pri 4; # give next request a very high priority
24 my $req = aio_unlink "/tmp/file", sub { };
25 $req->cancel; # cancel request if still in queue
26
27 my $grp = aio_group sub { print "all stats done\n" };
28 add $grp aio_stat "..." for ...;
29
30 # AnyEvent integration
31 open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!";
32 my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb });
33
34 # Event integration
35 Event->io (fd => IO::AIO::poll_fileno,
36 poll => 'r',
37 cb => \&IO::AIO::poll_cb);
38
39 # Glib/Gtk2 integration
40 add_watch Glib::IO IO::AIO::poll_fileno,
41 in => sub { IO::AIO::poll_cb; 1 };
42
43 # Tk integration
44 Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "",
45 readable => \&IO::AIO::poll_cb);
46
47 # Danga::Socket integration
48 Danga::Socket->AddOtherFds (IO::AIO::poll_fileno =>
49 \&IO::AIO::poll_cb);
50
51 =head1 DESCRIPTION
52
53 This module implements asynchronous I/O using whatever means your
54 operating system supports.
55
56 In this version, a number of threads are started that execute your
57 requests and signal their completion. You don't need thread support
58 in perl, and the threads created by this module will not be visible
59 to perl. In the future, this module might make use of the native aio
60 functions available on many operating systems. However, they are often
61 not well-supported or restricted (Linux doesn't allow them on normal
62 files currently, for example), and they would only support aio_read and
63 aio_write, so the remaining functionality would have to be implemented
64 using threads anyway.
65
66 Although the module will work in the presence of other (Perl-)
67 threads, it is currently not reentrant in any way, so use appropriate
68 locking yourself, always call C<poll_cb> from within the same thread, or
69 never call C<poll_cb> (or other C<aio_> functions) recursively.
70
71 =head1 REQUEST ANATOMY AND LIFETIME
72
73 Every C<aio_*> function creates a request, which is a C data structure not
74 directly visible to Perl.
75
76 If called in non-void context, every request function returns a Perl
77 object representing the request. In void context, nothing is returned,
78 which saves a bit of memory.
79
80 The perl object is a fairly standard ref-to-hash object. The hash contents
81 are not used by IO::AIO so you are free to store anything you like in it.
82
83 During their existence, aio requests travel through the following states,
84 in order:
85
86 =over 4
87
88 =item ready
89
90 Immediately after a request is created it is put into the ready state,
91 waiting for a thread to execute it.
92
93 =item execute
94
95 A thread has accepted the request for processing and is currently
96 executing it (e.g. blocking in read).
97
98 =item pending
99
100 The request has been executed and is waiting for result processing.
101
102 While request submission and execution are fully asynchronous, result
103 processing is not and relies on the perl interpreter calling C<poll_cb>
104 (or another function with the same effect).
105
106 =item result
107
108 The request results are processed synchronously by C<poll_cb>.
109
110 The C<poll_cb> function will process all outstanding aio requests by
111 calling their callbacks, freeing memory associated with them and managing
112 any groups they are contained in.
113
114 =item done
115
116 Request has reached the end of its lifetime and holds no resources anymore
117 (except possibly for the Perl object, but its connection to the actual
118 aio request is severed and calling its methods will either do nothing or
119 result in a runtime error).
120
121 =cut
122
123 package IO::AIO;
124
125 no warnings;
126 use strict 'vars';
127
128 use base 'Exporter';
129
130 BEGIN {
131 our $VERSION = '2.0';
132
133 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
134 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
135 aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move
136 aio_copy aio_group aio_nop aio_mknod);
137 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice));
138 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
139 min_parallel max_parallel nreqs nready npending);
140
141 @IO::AIO::GRP::ISA = 'IO::AIO::REQ';
142
143 require XSLoader;
144 XSLoader::load ("IO::AIO", $VERSION);
145 }
146
147 =head1 FUNCTIONS
148
149 =head2 AIO FUNCTIONS
150
151 All the C<aio_*> calls are more or less thin wrappers around the syscall
152 with the same name (sans C<aio_>). The arguments are similar or identical,
153 and they all accept an additional (and optional) C<$callback> argument
154 which must be a code reference. This code reference will get called with
155 the syscall return code (e.g. most syscalls return C<-1> on error, unlike
156 perl, which usually delivers "false") as its sole argument when the given
157 syscall has been executed asynchronously.
158
159 All functions expecting a filehandle keep a copy of the filehandle
160 internally until the request has finished.
161
162 All requests return objects of type L<IO::AIO::REQ> that allow further
163 manipulation of those requests while they are in-flight.
164
165 The pathnames you pass to these routines I<must> be absolute and
166 encoded in byte form. The reason for the former is that at the time the
167 request is being executed, the current working directory could have
168 changed. Alternatively, you can make sure that you never change the
169 current working directory.
170
171 To encode pathnames to byte form, either: a) always pass in filenames
172 you got from outside (command line, readdir etc.), b) make sure they
173 are ASCII or ISO 8859-1, c) use the Encode module and encode
174 your pathnames to the locale (or other) encoding in effect in the user
175 environment, d) use Glib::filename_from_unicode on unicode filenames or e)
176 use something else.
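
Option c) could look roughly like the following sketch. It assumes that the encoding in effect is UTF-8, which may not hold in your environment, and the helper name C<path_to_bytes> is hypothetical and not part of IO::AIO:

```perl
use strict;
use warnings;
use Encode ();

# Hypothetical helper (not part of IO::AIO): turn a Perl character string
# into the byte string that the aio_* functions expect.
# Assumption: the locale/filesystem encoding is UTF-8 unless told otherwise.
sub path_to_bytes {
   my ($path, $encoding) = @_;
   $encoding ||= "UTF-8";

   # FB_CROAK dies on characters the encoding cannot represent,
   # LEAVE_SRC keeps the input string untouched
   return Encode::encode ($encoding, $path,
                          Encode::FB_CROAK () | Encode::LEAVE_SRC ());
}

my $bytes = path_to_bytes ("/tmp/caf\x{e9}");
printf "%d bytes, utf8 flag %s\n",
   length ($bytes), utf8::is_utf8 ($bytes) ? "on" : "off";
```

The resulting byte string can then be passed as the pathname argument of any C<aio_*> function.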
177
178 =over 4
179
180 =item $prev_pri = aioreq_pri [$pri]
181
182 Returns the priority value that would be used for the next request and, if
183 C<$pri> is given, sets the priority for the next aio request.
184
185 The default priority is C<0>, the minimum and maximum priorities are C<-4>
186 and C<4>, respectively. Requests with higher priority will be serviced
187 first.
188
189 The priority will be reset to C<0> after each call to one of the C<aio_*>
190 functions.
191
192 Example: open a file with low priority, then read something from it with
193 higher priority so the read request is serviced before other low priority
194 open requests (potentially spamming the cache):
195
196 aioreq_pri -3;
197 aio_open ..., sub {
198 return unless $_[0];
199
200 aioreq_pri -2;
201 aio_read $_[0], ..., sub {
202 ...
203 };
204 };
205
206 =item aioreq_nice $pri_adjust
207
208 Similar to C<aioreq_pri>, but subtracts the given value from the current
209 priority, so effects are cumulative.
210
211 =item aio_open $pathname, $flags, $mode, $callback->($fh)
212
213 Asynchronously open or create a file and call the callback with a newly
214 created filehandle for the file.
215
216 The pathname passed to C<aio_open> must be absolute. See the notes on
217 pathnames under AIO FUNCTIONS, above, for an explanation.
218
219 The C<$flags> argument is a bitmask. See the C<Fcntl> module for a
220 list. They are the same as used by C<sysopen>.
221
222 Likewise, C<$mode> specifies the mode of the newly created file, if it
223 didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>,
224 except that it is mandatory (i.e. use C<0> if you don't create new files,
225 and C<0666> or C<0777> if you do).
226
227 Example:
228
229 aio_open "/etc/passwd", O_RDONLY, 0, sub {
230 if ($_[0]) {
231 print "open successful, fh is $_[0]\n";
232 ...
233 } else {
234 die "open failed: $!\n";
235 }
236 };
237
238 =item aio_close $fh, $callback->($status)
239
240 Asynchronously close a file and call the callback with the result
241 code. I<WARNING:> although accepted, you should not pass in a perl
242 filehandle here, as perl will likely close the file descriptor another
243 time when the filehandle is destroyed. Normally, you can safely call perl's
244 C<close> or just let filehandles go out of scope.
245
246 This is supposed to be a bug in the API, so that might change. It's
247 therefore best to avoid this function.
248
249 =item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
250
251 =item aio_write $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
252
253 Reads or writes C<length> bytes from the specified C<fh> and C<offset>
254 into the scalar given by C<data> and offset C<dataoffset> and calls the
255 callback with the actual number of bytes read or written (or -1 on error, just
256 like the syscall).
257
258 The C<$data> scalar I<MUST NOT> be modified in any way while the request
259 is outstanding. Modifying it can result in segfaults or WW3 (if the
260 necessary/optional hardware is installed).
261
262 Example: Read 15 bytes at offset 7 into scalar C<$buffer>, starting at
263 offset C<0> within the scalar:
264
265 aio_read $fh, 7, 15, $buffer, 0, sub {
266 $_[0] > 0 or die "read error: $!";
267 print "read $_[0] bytes: <$buffer>\n";
268 };
269
270 =item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval)
271
272 Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts
273 reading at byte offset C<$in_offset>, and starts writing at the current
274 file offset of C<$out_fh>. Because of that, it is not safe to issue more
275 than one C<aio_sendfile> per C<$out_fh>, as they will interfere with each
276 other.
277
278 This call tries to make use of a native C<sendfile> syscall to provide
279 zero-copy operation. For this to work, C<$out_fh> should refer to a
280 socket, and C<$in_fh> should refer to an mmap'able file.
281
282 If the native sendfile call fails or is not implemented, it will be
283 emulated, so you can call C<aio_sendfile> on any type of filehandle
284 regardless of the limitations of the operating system.
285
286 Please note, however, that C<aio_sendfile> can read more bytes from
287 C<$in_fh> than are written, and there is no way to find out how many
288 bytes have been read from C<aio_sendfile> alone, as C<aio_sendfile> only
289 provides the number of bytes written to C<$out_fh>. Only if the result
290 value equals C<$length> one can assume that C<$length> bytes have been
291 read.
292
293 =item aio_readahead $fh,$offset,$length, $callback->($retval)
294
295 C<aio_readahead> populates the page cache with data from a file so that
296 subsequent reads from that file will not block on disk I/O. The C<$offset>
297 argument specifies the starting point from which data is to be read and
298 C<$length> specifies the number of bytes to be read. I/O is performed in
299 whole pages, so that offset is effectively rounded down to a page boundary
300 and bytes are read up to the next page boundary greater than or equal to
301 (offset+length). C<aio_readahead> does not read beyond the end of the
302 file. The current file offset of the file is left unchanged.
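
The page rounding described above can be worked through with a small sketch. The helper C<readahead_range> is hypothetical (it is not part of IO::AIO), the page size of 4096 bytes is an assumption, and the end-of-file limit is ignored:

```perl
use strict;
use warnings;

# Hypothetical helper (not part of IO::AIO): compute the page-aligned byte
# range [$start, $end) that a readahead of ($offset, $length) would cover.
sub readahead_range {
   my ($offset, $length, $pagesize) = @_;

   # round the start down to a page boundary
   my $start = $offset - $offset % $pagesize;

   # round the end up to the next page boundary >= offset+length
   my $end = int (($offset + $length + $pagesize - 1) / $pagesize) * $pagesize;

   return ($start, $end);
}

# assumed page size: 4096 bytes
my ($start, $end) = readahead_range (5000, 10000, 4096);
print "pages cover bytes $start .. ", $end - 1, "\n";
```

So a request for 10000 bytes at offset 5000 would touch four whole pages under this assumption, slightly more than the range that was asked for.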
303
304 If that syscall doesn't exist (likely if your OS isn't Linux) it will be
305 emulated by simply reading the data, which would have a similar effect.
306
307 =item aio_stat $fh_or_path, $callback->($status)
308
309 =item aio_lstat $fh, $callback->($status)
310
311 Works like perl's C<stat> or C<lstat> in void context. The callback will
312 be called after the stat and the results will be available using C<stat _>
313 or C<-s _> etc...
314
315 The pathname passed to C<aio_stat> must be absolute. See the notes on
316 pathnames under AIO FUNCTIONS, above, for an explanation.
317
318 Currently, the stats are always 64-bit-stats, i.e. instead of returning an
319 error when stat'ing a large file, the results will be silently truncated
320 unless perl itself is compiled with large file support.
321
322 Example: Print the length of F</etc/passwd>:
323
324 aio_stat "/etc/passwd", sub {
325 $_[0] and die "stat failed: $!";
326 print "size is ", -s _, "\n";
327 };
328
329 =item aio_unlink $pathname, $callback->($status)
330
331 Asynchronously unlink (delete) a file and call the callback with the
332 result code.
333
334 =item aio_mknod $path, $mode, $dev, $callback->($status)
335
336 Asynchronously create a device node (or fifo). See mknod(2).
337
338 The only portable (POSIX) way of calling this function is:
339
340 aio_mknod $path, IO::AIO::S_IFIFO | $mode, 0, sub { ...
341
342 =item aio_link $srcpath, $dstpath, $callback->($status)
343
344 Asynchronously create a new link to the existing object at C<$srcpath> at
345 the path C<$dstpath> and call the callback with the result code.
346
347 =item aio_symlink $srcpath, $dstpath, $callback->($status)
348
349 Asynchronously create a new symbolic link to the existing object at C<$srcpath> at
350 the path C<$dstpath> and call the callback with the result code.
351
352 =item aio_rename $srcpath, $dstpath, $callback->($status)
353
354 Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
355 rename(2) does, and call the callback with the result code.
356
357 =item aio_rmdir $pathname, $callback->($status)
358
359 Asynchronously rmdir (delete) a directory and call the callback with the
360 result code.
361
362 =item aio_readdir $pathname, $callback->($entries)
363
364 Unlike the POSIX call of the same name, C<aio_readdir> reads an entire
365 directory (i.e. opendir + readdir + closedir). The entries will not be
366 sorted, and will B<NOT> include the C<.> and C<..> entries.
367
368 The callback is passed a single argument which is either C<undef> or an array-ref
369 with the filenames.
370
371 =item aio_copy $srcpath, $dstpath, $callback->($status)
372
373 Try to copy the I<file> (directories not supported as either source or
374 destination) from C<$srcpath> to C<$dstpath> and call the callback with
375 a status of C<0> (ok) or C<-1> (error).
376
377 This is a composite request that creates the destination file with
378 mode 0200 and copies the contents of the source file into it using
379 C<aio_sendfile>, followed by restoring atime, mtime, access mode and
380 uid/gid, in that order.
381
382 If an error occurs, the partial destination file will be unlinked, if
383 possible, except when setting atime, mtime, access mode and uid/gid, where
384 errors are being ignored.
385
386 =cut
387
388 sub aio_copy($$;$) {
389    my ($src, $dst, $cb) = @_;
390
391    my $pri = aioreq_pri;
392    my $grp = aio_group $cb;
393
394    aioreq_pri $pri;
395    add $grp aio_open $src, O_RDONLY, 0, sub {
396       if (my $src_fh = $_[0]) {
397          my @stat = stat $src_fh;
398
399          aioreq_pri $pri;
400          add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
401             if (my $dst_fh = $_[0]) {
402                aioreq_pri $pri;
403                add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
404                   if ($_[0] == $stat[7]) {
405                      $grp->result (0);
406                      close $src_fh;
407
408                      # those should not normally block. should. should.
409                      utime $stat[8], $stat[9], $dst;
410                      chmod $stat[2] & 07777, $dst_fh;
411                      chown $stat[4], $stat[5], $dst_fh;
412                      close $dst_fh;
413                   } else {
414                      $grp->result (-1);
415                      close $src_fh;
416                      close $dst_fh;
417
418                      aioreq_pri $pri;
419                      add $grp aio_unlink $dst;
420                   }
421                };
422             } else {
423                $grp->result (-1);
424             }
425          };
426
427       } else {
428          $grp->result (-1);
429       }
430    };
431
432    $grp
433 }
434
435 =item aio_move $srcpath, $dstpath, $callback->($status)
436
437 Try to move the I<file> (directories not supported as either source or
438 destination) from C<$srcpath> to C<$dstpath> and call the callback with
439 a status of C<0> (ok) or C<-1> (error).
440
441 This is a composite request that tries to rename(2) the file first. If
442 rename fails with C<EXDEV>, it copies the file with C<aio_copy> and, if
443 that is successful, unlinks the C<$srcpath>.
444
445 =cut
446
447 sub aio_move($$;$) {
448    my ($src, $dst, $cb) = @_;
449
450    my $pri = aioreq_pri;
451    my $grp = aio_group $cb;
452
453    aioreq_pri $pri;
454    add $grp aio_rename $src, $dst, sub {
455       if ($_[0] && $! == EXDEV) {
456          aioreq_pri $pri;
457          add $grp aio_copy $src, $dst, sub {
458             $grp->result ($_[0]);
459
460             if (!$_[0]) {
461                aioreq_pri $pri;
462                add $grp aio_unlink $src;
463             }
464          };
465       } else {
466          $grp->result ($_[0]);
467       }
468    };
469
470    $grp
471 }
472
473 =item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
474
475 Scans a directory (similar to C<aio_readdir>) but additionally tries to
476 efficiently separate the entries of directory C<$path> into two sets of
477 names, directories you can recurse into (directories), and ones you cannot
478 recurse into (everything else, including symlinks to directories).
479
480 C<aio_scandir> is a composite request that creates many sub requests.
481 C<$maxreq> specifies the maximum number of outstanding aio requests that
482 this function generates. If it is C<< <= 0 >>, then a suitable default
483 will be chosen (currently 4).
484
485 On error, the callback is called without arguments, otherwise it receives
486 two array-refs with path-relative entry names.
487
488 Example:
489
490 aio_scandir $dir, 0, sub {
491 my ($dirs, $nondirs) = @_;
492 print "real directories: @$dirs\n";
493 print "everything else: @$nondirs\n";
494 };
495
496 Implementation notes.
497
498 The C<aio_readdir> cannot be avoided, but C<stat()>'ing every entry can.
499
500 After reading the directory, the modification time, size etc. of the
501 directory before and after the readdir are checked, and if they match
502 (and the modification time isn't the current time), the link count will
503 be used to decide how many entries are directories (if >= 2). Otherwise,
504 no knowledge of the number of subdirectories will be assumed.
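
The check described above can be sketched in isolation. This is a simplified illustration, not the module's code path: the helper C<nlink_trustworthy> is hypothetical and works on plain C<stat> lists, using the same fields that C<aio_scandir> compares (device, inode, link count, size and mtime):

```perl
use strict;
use warnings;

# Hypothetical helper (not part of IO::AIO): decide whether the link count
# of a directory can be trusted, given the stat lists taken before and after
# reading it and the time at which the scan started.
sub nlink_trustworthy {
   my ($before, $after, $now) = @_;

   # fields compared: dev, ino, nlink, size, mtime
   my $hash1 = join ":", @{$before}[0, 1, 3, 7, 9];
   my $hash2 = join ":", @{$after}[0, 1, 3, 7, 9];

   # the directory changed while it was read, or it was modified within the
   # current second: do not rely on the link count
   return 0 if $hash1 ne $hash2 or $after->[9] == $now;

   return 1;
}

my @st = stat "." or die "stat failed: $!";
my $ok = nlink_trustworthy (\@st, \@st, time);
printf "%s\n", $ok ? "link count usable" : "take the slow route";
```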
505
506 Then entries will be sorted into likely directories (everything without
507 a non-initial dot currently) and likely non-directories (everything
508 else). Then every entry plus an appended C</.> will be C<stat>'ed,
509 likely directories first. If that succeeds, it assumes that the entry
510 is a directory or a symlink to directory (which will be checked
511 separately). This is often faster than stat'ing the entry itself because
512 filesystems might detect the type of the entry without reading the inode
513 data (e.g. ext2fs filetype feature).
514
515 If the known number of directories (link count - 2) has been reached, the
516 rest of the entries is assumed to be non-directories.
517
518 This only works with certainty on POSIX (= UNIX) filesystems, which
519 fortunately are the vast majority of filesystems around.
520
521 It will also likely work on non-POSIX filesystems with reduced efficiency
522 as those tend to return 0 or 1 as link counts, which disables the
523 directory counting heuristic.
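
The ordering of "likely directories" described above can be tried out on its own. The following sketch reuses the sort key from C<aio_scandir>; the wrapper C<scan_order> is hypothetical and only there for illustration:

```perl
use strict;
use warnings;

# Hypothetical wrapper (not part of IO::AIO): return the names in the order
# in which aio_scandir would examine them, i.e. likely directories first and
# shorter names first within each class.
sub scan_order {
   my @entries = @_;

   # key: "1" if the name contains a non-initial dot (likely not a
   # directory), else "0", followed by the zero-padded name length
   my @sorted = map $_->[0],
                sort { $b->[1] cmp $a->[1] }
                map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
                @entries;

   # aio_scandir pops entries off the end of the sorted list,
   # so the examination order is the reverse of the sort order
   return reverse @sorted;
}

print join (" ", scan_order (qw(README.txt src .git Makefile.PL))), "\n";
```

Names such as C<src> and C<.git> come first here because they contain no non-initial dot, so the link count heuristic has a chance to stop early.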
524
525 =cut
526
527 sub aio_scandir($$$) {
528    my ($path, $maxreq, $cb) = @_;
529
530    my $pri = aioreq_pri;
531
532    my $grp = aio_group $cb;
533
534    $maxreq = 4 if $maxreq <= 0;
535
536    # stat once
537    aioreq_pri $pri;
538    add $grp aio_stat $path, sub {
539       return $grp->result () if $_[0];
540       my $now = time;
541       my $hash1 = join ":", (stat _)[0,1,3,7,9];
542
543       # read the directory entries
544       aioreq_pri $pri;
545       add $grp aio_readdir $path, sub {
546          my $entries = shift
547             or return $grp->result ();
548
549          # stat the dir another time
550          aioreq_pri $pri;
551          add $grp aio_stat $path, sub {
552             my $hash2 = join ":", (stat _)[0,1,3,7,9];
553
554             my $ndirs;
555
556             # take the slow route if anything looks fishy
557             if ($hash1 ne $hash2 or (stat _)[9] == $now) {
558                $ndirs = -1;
559             } else {
560                # if nlink == 2, we are finished
561                # on non-posix-fs's, we rely on nlink < 2
562                $ndirs = (stat _)[3] - 2
563                   or return $grp->result ([], $entries);
564             }
565
566             # sort into likely dirs and likely nondirs
567             # dirs == files without ".", short entries first
568             $entries = [map $_->[0],
569                            sort { $b->[1] cmp $a->[1] }
570                               map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
571                                  @$entries];
572
573             my (@dirs, @nondirs);
574
575             my $statgrp = add $grp aio_group sub {
576                $grp->result (\@dirs, \@nondirs);
577             };
578
579             limit $statgrp $maxreq;
580             feed $statgrp sub {
581                return unless @$entries;
582                my $entry = pop @$entries;
583
584                aioreq_pri $pri;
585                add $statgrp aio_stat "$path/$entry/.", sub {
586                   if ($_[0] < 0) {
587                      push @nondirs, $entry;
588                   } else {
589                      # need to check for real directory
590                      aioreq_pri $pri;
591                      add $statgrp aio_lstat "$path/$entry", sub {
592                         if (-d _) {
593                            push @dirs, $entry;
594
595                            unless (--$ndirs) {
596                               push @nondirs, @$entries;
597                               feed $statgrp;
598                            }
599                         } else {
600                            push @nondirs, $entry;
601                         }
602                      }
603                   }
604                };
605             };
606          };
607       };
608    };
609
610    $grp
611 }
612
613 =item aio_fsync $fh, $callback->($status)
614
615 Asynchronously call fsync on the given filehandle and call the callback
616 with the fsync result code.
617
618 =item aio_fdatasync $fh, $callback->($status)
619
620 Asynchronously call fdatasync on the given filehandle and call the
621 callback with the fdatasync result code.
622
623 If this call isn't available because your OS lacks it or it couldn't be
624 detected, it will be emulated by calling C<fsync> instead.
625
626 =item aio_group $callback->(...)
627
628 This is a very special aio request: Instead of doing something, it is a
629 container for other aio requests, which is useful if you want to bundle
630 many requests into a single, composite, request with a definite callback
631 and the ability to cancel the whole request with its subrequests.
632
633 Returns an object of class L<IO::AIO::GRP>. See its documentation below
634 for more info.
635
636 Example:
637
638 my $grp = aio_group sub {
639 print "all stats done\n";
640 };
641
642 add $grp
643 (aio_stat ...),
644 (aio_stat ...),
645 ...;
646
647 =item aio_nop $callback->()
648
649 This is a special request - it does nothing in itself and is only used for
650 side effects, such as when you want to add a dummy request to a group so
651 that finishing the requests in the group depends on executing the given
652 code.
653
654 While this request does nothing, it still goes through the execution
655 phase and still requires a worker thread. Thus, the callback will not
656 be executed immediately but only after other requests in the queue have
657 entered their execution phase. This can be used to measure request
658 latency.
659
660 =item IO::AIO::aio_busy $fractional_seconds, $callback->() *NOT EXPORTED*
661
662 Mainly used for debugging and benchmarking, this aio request puts one of
663 the request workers to sleep for the given time.
664
665 While it is theoretically handy to have simple I/O scheduling requests
666 like sleep and file handle readable/writable, the overhead this creates is
667 immense (it blocks a thread for a long time) so do not use this function
668 except to put your application under artificial I/O pressure.
669
670 =back
671
672 =head2 IO::AIO::REQ CLASS
673
674 All non-aggregate C<aio_*> functions return an object of this class when
675 called in non-void context.
676
677 =over 4
678
679 =item cancel $req
680
681 Cancels the request, if possible. Has the effect of skipping execution
682 when entering the B<execute> state and skipping calling the callback when
683 entering the B<result> state, but will leave the request otherwise
684 untouched. That means that requests that currently execute will not be
685 stopped and resources held by the request will not be freed prematurely.
686
687 =item cb $req $callback->(...)
688
689 Replace (or simply set) the callback registered to the request.
690
691 =back
692
693 =head2 IO::AIO::GRP CLASS
694
695 This class is a subclass of L<IO::AIO::REQ>, so all its methods apply to
696 objects of this class, too.
697
698 An IO::AIO::GRP object is a special request that can contain multiple other
699 aio requests.
700
701 You create one by calling the C<aio_group> constructing function with a
702 callback that will be called when all contained requests have entered the
703 C<done> state:
704
705 my $grp = aio_group sub {
706 print "all requests are done\n";
707 };
708
709 You add requests by calling the C<add> method with one or more
710 C<IO::AIO::REQ> objects:
711
712 $grp->add (aio_unlink "...");
713
714 add $grp aio_stat "...", sub {
715 $_[0] or return $grp->result ("error");
716
717 # add another request dynamically, if first succeeded
718 add $grp aio_open "...", sub {
719 $grp->result ("ok");
720 };
721 };
722
723 This makes it very easy to create composite requests (see the source of
724 C<aio_move> for an application) that work and feel like simple requests.
725
726 =over 4
727
728 =item * The IO::AIO::GRP objects will be cleaned up during calls to
729 C<IO::AIO::poll_cb>, just like any other request.
730
731 =item * They can be canceled like any other request. Canceling will cancel not
732 only the request itself, but also all requests it contains.
733
734 =item * They can also be added to other IO::AIO::GRP objects.
735
736 =item * You must not add requests to a group from within the group callback (or
737 any later time).
738
739 =back
740
741 Their lifetime, simplified, looks like this: when they are empty, they
742 will finish very quickly. If they contain only requests that are in the
743 C<done> state, they will also finish. Otherwise they will continue to
744 exist.
745
746 That means after creating a group you have some time to add requests. And
747 in the callbacks of those requests, you can add further requests to the
748 group. And only when all those requests have finished will the group
749 itself finish.
750
751 =over 4
752
753 =item add $grp ...
754
755 =item $grp->add (...)
756
757 Add one or more requests to the group. Any type of L<IO::AIO::REQ> can
758 be added, including other groups, as long as you do not create circular
759 dependencies.
760
761 Returns all its arguments.
762
763 =item $grp->cancel_subs
764
765 Cancel all subrequests and clears any feeder, but not the group request
766 itself. Useful when you queued a lot of events but got a result early.
767
768 =item $grp->result (...)
769
770 Set the result value(s) that will be passed to the group callback when all
771 subrequests have finished and set the group's errno to the current value
772 of errno (just like calling C<errno> without an error number). By default,
773 no argument will be passed and errno is zero.
774
775 =item $grp->errno ([$errno])
776
777 Sets the group errno value to C<$errno>, or the current value of errno
778 when the argument is missing.
779
780 Every aio request has an associated errno value that is restored when
781 the callback is invoked. This method lets you change this value from its
782 default (0).
783
784 Calling C<result> will also set errno, so make sure you either set C<$!>
785 before the call to C<result>, or call C<errno> after it.
786
787 =item feed $grp $callback->($grp)
788
789 Sets a feeder/generator on this group: every group can have an attached
790 generator that generates requests if idle. The idea behind this is that,
791 although you could just queue as many requests as you want in a group,
792 this might starve other requests for a potentially long time. For
793 example, C<aio_scandir> might generate hundreds of thousands of C<aio_stat>
794 requests, delaying any later requests for a long time.
795
796 To avoid this, and allow incremental generation of requests, you can
797 instead create a group and set a feeder on it that generates those requests. The
798 feed callback will be called whenever there are few enough (see C<limit>,
799 below) requests active in the group itself and is expected to queue more
800 requests.
801
802 The feed callback can queue as many requests as it likes (i.e. C<add> does
803 not impose any limits).
804
805 If the feed does not queue more requests when called, it will be
806 automatically removed from the group.
807
808 If the feed limit is C<0>, it will be set to C<2> automatically.
809
810 Example:
811
812 # stat all files in @files, but only ever use four aio requests concurrently:
813
814 my $grp = aio_group sub { print "finished\n" };
815 limit $grp 4;
816 feed $grp sub {
817 my $file = pop @files
818 or return;
819
820 add $grp aio_stat $file, sub { ... };
821 };
822
823 =item limit $grp $num
824
825 Sets the feeder limit for the group: The feeder will be called whenever
826 the group contains less than this many requests.
827
828 Setting the limit to C<0> will pause the feeding process.
829
830 =back
831
832 =head2 SUPPORT FUNCTIONS
833
834 =over 4
835
836 =item $fileno = IO::AIO::poll_fileno
837
838 Return the I<request result pipe file descriptor>. This filehandle must be
839 polled for reading by some mechanism outside this module (e.g. Event or
840 select, see below or the SYNOPSIS). If the pipe becomes readable you have
841 to call C<poll_cb> to check the results.
842
843 See C<poll_cb> for an example.
844
845 =item IO::AIO::poll_cb
846
847 Process all outstanding events on the result pipe. You have to call this
848 regularly. Returns the number of events processed. Returns immediately
849 when no events are outstanding.
850
851 If not all requests were processed for whatever reason, the filehandle
852 will still be ready when C<poll_cb> returns.
853
854 Example: Install an Event watcher that automatically calls
855 IO::AIO::poll_cb with high priority:
856
857 Event->io (fd => IO::AIO::poll_fileno,
858 poll => 'r', async => 1,
859 cb => \&IO::AIO::poll_cb);
860
861 =item IO::AIO::poll_some $max_requests
862
863 Similar to C<poll_cb>, but only processes up to C<$max_requests> requests
864 at a time.
865
866 Useful if you want to ensure some level of interactiveness when perl is
867 not fast enough to process all requests in time.
868
869 Example: Install an Event watcher that automatically calls
870 IO::AIO::poll_some with low priority, to ensure that other parts of the
871 program get the CPU sometimes even under high AIO load.
872
873 Event->io (fd => IO::AIO::poll_fileno,
874 poll => 'r', nice => 1,
875 cb => sub { IO::AIO::poll_some 256 });
876
877 =item IO::AIO::poll_wait
878
879 Wait till the result filehandle becomes ready for reading (simply does a
880 C<select> on the filehandle). This is useful if you want to synchronously wait
881 for some requests to finish.
882
883 See C<nreqs> for an example.
884
885 =item IO::AIO::nreqs
886
887 Returns the number of requests currently in the ready, execute or pending
888 states (i.e. for which their callback has not been invoked yet).
889
890 Example: wait till there are no outstanding requests anymore:
891
892 IO::AIO::poll_wait, IO::AIO::poll_cb
893 while IO::AIO::nreqs;
894
895 =item IO::AIO::nready
896
897 Returns the number of requests currently in the ready state (not yet
898 executed).
899
900 =item IO::AIO::npending
901
902 Returns the number of requests currently in the pending state (executed,
903 but not yet processed by poll_cb).
904
905 =item IO::AIO::flush
906
907 Wait till all outstanding AIO requests have been handled.
908
909 Strictly equivalent to:
910
911 IO::AIO::poll_wait, IO::AIO::poll_cb
912 while IO::AIO::nreqs;
913
914 =item IO::AIO::poll
915
916 Waits until some requests have been handled.
917
918 Strictly equivalent to:
919
920 IO::AIO::poll_wait, IO::AIO::poll_cb
921 if IO::AIO::nreqs;
922
923 =item IO::AIO::min_parallel $nthreads
924
925 Set the minimum number of AIO threads to C<$nthreads>. The current
926 default is C<8>, which means eight asynchronous operations can execute
927 concurrently at any one time (the number of outstanding requests,
928 however, is unlimited).
929
930 IO::AIO starts threads only on demand, when an AIO request is queued and
931 no free thread exists.
932
933 It is recommended to keep the number of threads relatively low, as some
934 Linux kernel versions will scale negatively with the number of threads
935 (higher parallelism => MUCH higher latency). With current Linux 2.6
936 versions, 4-32 threads should be fine.
937
938 Under most circumstances you don't need to call this function, as the
939 module selects a default that is suitable for low to moderate load.
940
941 =item IO::AIO::max_parallel $nthreads
942
943 Sets the maximum number of AIO threads to C<$nthreads>. If more than the
944 specified number of threads are currently running, this function kills
945 the excess threads and blocks until the limit is reached.
946
947 While C<$nthreads> is zero, aio requests get queued but not executed
948 until the number of threads has been increased again.
949
950 This module automatically runs C<max_parallel 0> at program end, to ensure
951 that all threads are killed and that there are no outstanding requests.
952
953 Under normal circumstances you don't need to call this function.
954
955 =item $oldmaxreqs = IO::AIO::max_outstanding $maxreqs
956
957 This is a very bad function to use in interactive programs because it
958 blocks, and a bad way to reduce concurrency because it is inexact: it is
959 better to use an C<aio_group> together with a feed callback.
960
961 Sets the maximum number of outstanding requests to C<$maxreqs>. If you
962 try to queue up more than this number of requests, the next call to
963 C<poll_cb> (and to C<poll_some> and other functions calling C<poll_cb>)
964 will block until the limit is no longer exceeded.
965
966 The default value is very large, so there is no practical limit on the
967 number of outstanding requests.
968
969 You can still queue as many requests as you want. Therefore,
970 C<max_outstanding> is mainly useful in simple scripts (with low values) or
971 as a stopgap to shield against fatal memory overflow (with large values).
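
The C<aio_group> alternative recommended above might look roughly like this. It is only a sketch: it assumes the group methods C<limit>, C<feed> and C<add> as described for C<aio_group> in IO::AIO 2.x, and the file list and the C<$seen> counter are hypothetical.

```perl
use strict;
use warnings;
use IO::AIO 2;

# Hypothetical work list; any list of paths would do.
my @files = ("/", "/tmp", "/etc", "/usr", "/var");
my $seen  = 0;

my $grp = aio_group sub { print "all stats done ($seen files)\n" };

# Keep at most two stat requests of this group in flight at a time.
$grp->limit (2);

# The feeder is called whenever the group holds fewer requests than
# the limit; queueing nothing tells the group that the work is done.
$grp->feed (sub {
   my $file = shift @files
      or return;

   $grp->add (aio_stat $file, sub { $seen++ });
});

# A simple script can just block until everything has been handled.
IO::AIO::flush;
```

Unlike C<max_outstanding>, the limit applies only to this group and never blocks C<poll_cb>; an interactive program would let its event loop call C<poll_cb> instead of using C<flush>.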
972
973 =back
974
975 =cut
976
977 # support function to convert a fd into a perl filehandle
978 sub _fd2fh {
979 return undef if $_[0] < 0;
980
981 # try to generate nice filehandles
982 my $sym = "IO::AIO::fd#$_[0]";
983 local *$sym;
984
985 open *$sym, "+<&=$_[0]" # usually works under any unix
986 or open *$sym, "<&=$_[0]" # cygwin needs this
987 or open *$sym, ">&=$_[0]" # or this
988 or return undef;
989
990 *$sym
991 }
992
993 min_parallel 8;
994
995 END {
996 flush;
997 };
998
999 1;
1000
1001 =head2 FORK BEHAVIOUR
1002
1003 This module should do "the right thing" when the process using it forks:
1004
1005 Before the fork, IO::AIO enters a quiescent state where no requests
1006 can be added in other threads and no results will be processed. After
1007 the fork the parent simply leaves the quiescent state and continues
1008 request/result processing, while the child frees the request/result queue
1009 (so that the requests started before the fork will only be handled in the
1010 parent). Threads will be started on demand until the limit set in the
1011 parent process has been reached again.
1012
1013 In short: the parent will, after a short pause, continue as if fork had
1014 not been called, while the child will act as if IO::AIO has not been used
1015 yet.
1016
1017 =head2 MEMORY USAGE
1018
1019 Per-request usage:
1020
1021 Each aio request uses - depending on your architecture - around 100-200
1022 bytes of memory. In addition, stat requests need a stat buffer (possibly
1023 a few hundred bytes), readdir requires a result buffer and so on. Perl
1024 scalars and other data passed into aio requests will also be locked and
1025 will consume memory till the request has entered the done state.
1026
1027 This is not awfully much, so queuing lots of requests is not usually a
1028 problem.
1029
1030 Per-thread usage:
1031
1032 In the execution phase, some aio requests require more memory for
1033 temporary buffers, and each thread requires a stack and other data
1034 structures (usually around 16k-128k, depending on the OS).
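
As a rough worked example of the figures above, the following sketch estimates an upper bound for a busy process. The request count is a made-up input, and stat buffers, readdir results and data held by callbacks are deliberately left out.

```perl
use strict;
use warnings;

# Upper-end figures quoted in the text; the counts are assumptions.
my $requests      = 100_000;     # queued aio requests (hypothetical)
my $bytes_per_req = 200;         # upper end of "100-200 bytes"
my $threads       = 8;           # thread count set by "min_parallel 8"
my $bytes_per_thr = 128 * 1024;  # upper end of "16k-128k"

my $total = $requests * $bytes_per_req + $threads * $bytes_per_thr;

printf "about %.1f MB\n", $total / (1024 * 1024);   # prints "about 20.1 MB"
```

Even with a hundred thousand queued requests, the per-request overhead dominates and stays in the low tens of megabytes.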
1035
1036 =head1 KNOWN BUGS
1037
1038 Known bugs will be fixed in the next release.
1039
1040 =head1 SEE ALSO
1041
1042 L<Coro::AIO>.
1043
1044 =head1 AUTHOR
1045
1046 Marc Lehmann <schmorp@schmorp.de>
1047 http://home.schmorp.de/
1048
1049 =cut
1050