/cvs/IO-AIO/AIO.pm
Revision: 1.71
Committed: Tue Oct 24 11:57:30 2006 UTC (17 years, 7 months ago) by root
Branch: MAIN
Changes since 1.70: +6 -9 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 IO::AIO - Asynchronous Input/Output
4
5 =head1 SYNOPSIS
6
7 use IO::AIO;
8
9 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10 my ($fh) = @_;
11 ...
12 };
13
14 aio_unlink "/tmp/file", sub { };
15
16 aio_read $fh, 30000, 1024, $buffer, 0, sub {
17 $_[0] > 0 or die "read error: $!";
18 };
19
20 # version 2+ has request and group objects
21 use IO::AIO 2;
22
23 aioreq_pri 4; # give next request a very high priority
24 my $req = aio_unlink "/tmp/file", sub { };
25 $req->cancel; # cancel request if still in queue
26
27 my $grp = aio_group sub { print "all stats done\n" };
28 add $grp aio_stat "..." for ...;
29
30 # AnyEvent integration
31 open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!";
32 my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb });
33
34 # Event integration
35 Event->io (fd => IO::AIO::poll_fileno,
36 poll => 'r',
37 cb => \&IO::AIO::poll_cb);
38
39 # Glib/Gtk2 integration
40 add_watch Glib::IO IO::AIO::poll_fileno,
41 in => sub { IO::AIO::poll_cb; 1 };
42
43 # Tk integration
44 Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "",
45 readable => \&IO::AIO::poll_cb);
46
47 # Danga::Socket integration
48 Danga::Socket->AddOtherFds (IO::AIO::poll_fileno =>
49 \&IO::AIO::poll_cb);
50
51 =head1 DESCRIPTION
52
53 This module implements asynchronous I/O using whatever means your
54 operating system supports.
55
56 Currently, a number of threads are started that execute your read/writes
57 and signal their completion. You don't need thread support in perl, and
58 the threads created by this module will not be visible to perl. In the
59 future, this module might make use of the native aio functions available
60 on many operating systems. However, they are often not well-supported
61 (Linux doesn't allow them on normal files currently, for example),
62 and they would only support aio_read and aio_write, so the remaining
63 functionality would have to be implemented using threads anyway.
64
65 Although the module will work in the presence of other threads,
66 it is currently not reentrant in any way, so use appropriate locking
67 yourself, always call C<poll_cb> from within the same thread, or never
68 call C<poll_cb> (or other C<aio_> functions) recursively.
69
70 =cut
71
72 package IO::AIO;
73
74 no warnings;
75 use strict 'vars';
76
77 use base 'Exporter';
78
79 BEGIN {
80 our $VERSION = '2.0';
81
82 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
83 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
84 aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move
85 aio_group aio_nop);
86 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice));
87 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
88 min_parallel max_parallel max_outstanding nreqs);
89
90 @IO::AIO::GRP::ISA = 'IO::AIO::REQ';
91
92 require XSLoader;
93 XSLoader::load ("IO::AIO", $VERSION);
94 }
95
96 =head1 FUNCTIONS
97
98 =head2 AIO FUNCTIONS
99
100 All the C<aio_*> calls are more or less thin wrappers around the syscall
101 with the same name (sans C<aio_>). The arguments are similar or identical,
102 and they all accept an additional (and optional) C<$callback> argument
103 which must be a code reference. This code reference will get called with
104 the syscall return code (e.g. most syscalls return C<-1> on error, unlike
105 perl, which usually delivers "false") as its sole argument when the given
106 syscall has been executed asynchronously.
107
108 All functions expecting a filehandle keep a copy of the filehandle
109 internally until the request has finished.
110
111 All requests return objects of type L<IO::AIO::REQ> that allow further
112 manipulation of those requests while they are in-flight.
113
114 The pathnames you pass to these routines I<must> be absolute and
115 encoded in byte form. The reason for the former is that at the time the
116 request is being executed, the current working directory could have
117 changed. Alternatively, you can make sure that you never change the
118 current working directory.
119
120 To encode pathnames to byte form, make sure you either: a) always pass
121 in filenames you got from outside (command line, readdir etc.), b) only
122 use filenames that are ASCII or ISO 8859-1, c) use the Encode module and
123 encode your pathnames to the locale (or other) encoding in effect in the
124 user environment, d) use Glib::filename_from_unicode on unicode filenames
125 or e) use something else.
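
Option c) could look like the following minimal sketch. It assumes the user environment uses UTF-8; the helper name C<path_to_bytes> and the sample path are hypothetical and not part of IO::AIO.

```perl
use strict;
use warnings;
use Encode ();

# Hypothetical helper: turn a (possibly wide-character) pathname into the
# byte string that the aio_* functions expect. The default encoding is an
# assumption; use whatever encoding your environment really uses.
sub path_to_bytes {
    my ($path, $encoding) = @_;
    $encoding ||= "UTF-8";
    return Encode::encode($encoding, $path);
}

my $bytes = path_to_bytes("/tmp/caf\x{e9}");
printf "%d bytes, utf8 flag %s\n",
    length($bytes), utf8::is_utf8($bytes) ? "on" : "off";
```

The resulting scalar can then be passed to any C<aio_*> function that takes a pathname.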
126
127 =over 4
128
129 =item aioreq_pri $pri
130
131 Sets the priority for the next aio request. The default priority
132 is C<0>, the minimum and maximum priorities are C<-4> and C<4>,
133 respectively. Requests with higher priority will be serviced first.
134
135 The priority will be reset to C<0> after each call to one of the C<aio_>
136 functions.
137
138 Example: open a file with low priority, then read something from it with
139 higher priority so the read request is serviced before other low priority
140 open requests (potentially spamming the cache):
141
142 aioreq_pri -3;
143 aio_open ..., sub {
144 return unless $_[0];
145
146 aioreq_pri -2;
147 aio_read $_[0], ..., sub {
148 ...
149 };
150 };
151
152 =item aioreq_nice $pri_adjust
153
154 Similar to C<aioreq_pri>, but subtracts the given value from the current
155 priority, so effects are cumulative.
156
157 =item aio_open $pathname, $flags, $mode, $callback->($fh)
158
159 Asynchronously open or create a file and call the callback with a newly
160 created filehandle for the file.
161
162 The pathname passed to C<aio_open> must be absolute. See the notes on
163 pathnames at the start of this section for an explanation.
164
165 The C<$flags> argument is a bitmask. See the C<Fcntl> module for a
166 list. They are the same as used by C<sysopen>.
167
168 Likewise, C<$mode> specifies the mode of the newly created file, if it
169 didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>,
170 except that it is mandatory (i.e. use C<0> if you don't create new files,
171 and C<0666> or C<0777> if you do).
172
173 Example:
174
175 aio_open "/etc/passwd", O_RDONLY, 0, sub {
176 if ($_[0]) {
177 print "open successful, fh is $_[0]\n";
178 ...
179 } else {
180 die "open failed: $!\n";
181 }
182 };
183
184 =item aio_close $fh, $callback->($status)
185
186 Asynchronously close a file and call the callback with the result
187 code. I<WARNING:> although accepted, you should not pass in a perl
188 filehandle here, as perl will likely close the file descriptor another
189 time when the filehandle is destroyed. Normally, you can safely call perl's
190 C<close> or just let filehandles go out of scope.
191
192 This is supposed to be a bug in the API, so that might change. It's
193 therefore best to avoid this function.
194
195 =item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
196
197 =item aio_write $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
198
199 Reads or writes C<$length> bytes from (or to) the specified C<$fh> at
200 C<$offset>, using the scalar given by C<$data> starting at offset
201 C<$dataoffset>, and calls the callback with the actual number of bytes
202 read or written (or C<-1> on error, just like the syscall).
203
204 The C<$data> scalar I<MUST NOT> be modified in any way while the request
205 is outstanding. Modifying it can result in segfaults or WW3 (if the
206 necessary/optional hardware is installed).
207
208 Example: Read 15 bytes at offset 7 into scalar C<$buffer>, starting at
209 offset C<0> within the scalar:
210
211 aio_read $fh, 7, 15, $buffer, 0, sub {
212 $_[0] > 0 or die "read error: $!";
213 print "read $_[0] bytes: <$buffer>\n";
214 };
215
216 =item aio_move $srcpath, $dstpath, $callback->($status)
217
218 Try to move the I<file> (directories not supported as either source or
219 destination) from C<$srcpath> to C<$dstpath> and call the callback with
220 C<0> (ok) or C<-1> (error).
221
222 This is a composite request that tries to rename(2) the file first. If
223 rename fails with C<EXDEV>, it creates the destination file with mode 0200
224 and copies the contents of the source file into it using C<aio_sendfile>,
225 followed by restoring atime, mtime, access mode and uid/gid, in that
226 order, and unlinking the C<$srcpath>.
227
228 If an error occurs, the partial destination file will be unlinked, if
229 possible, except when setting atime, mtime, access mode and uid/gid, where
230 errors are being ignored.
231
232 =cut
233
234 sub aio_move($$$) {
235 my ($src, $dst, $cb) = @_;
236
237 my $grp = aio_group $cb;
238
239 add $grp aio_rename $src, $dst, sub {
240 if ($_[0] && $! == EXDEV) {
241 add $grp aio_open $src, O_RDONLY, 0, sub {
242 if (my $src_fh = $_[0]) {
243 my @stat = stat $src_fh;
244
245 add $grp aio_open $dst, O_WRONLY, 0200, sub {
246 if (my $dst_fh = $_[0]) {
247 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
248 close $src_fh;
249
250 if ($_[0] == $stat[7]) {
251 utime $stat[8], $stat[9], $dst;
252 chmod $stat[2] & 07777, $dst_fh;
253 chown $stat[4], $stat[5], $dst_fh;
254 close $dst_fh;
255
256 add $grp aio_unlink $src, sub {
257 $grp->result ($_[0]);
258 };
259 } else {
260 my $errno = $!;
261 add $grp aio_unlink $dst, sub {
262 $! = $errno;
263 $grp->result (-1);
264 };
265 }
266 };
267 } else {
268 $grp->result (-1);
269 }
270 };
271
272 } else {
273 $grp->result (-1);
274 }
275 };
276 } else {
277 $grp->result ($_[0]);
278 }
279 };
280
281 $grp
282 }
283
284 =item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval)
285
286 Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts
287 reading at byte offset C<$in_offset>, and starts writing at the current
288 file offset of C<$out_fh>. Because of that, it is not safe to issue more
289 than one C<aio_sendfile> per C<$out_fh>, as they will interfere with each
290 other.
291
292 This call tries to make use of a native C<sendfile> syscall to provide
293 zero-copy operation. For this to work, C<$out_fh> should refer to a
294 socket, and C<$in_fh> should refer to an mmap'able file.
295
296 If the native sendfile call fails or is not implemented, it will be
297 emulated, so you can call C<aio_sendfile> on any type of filehandle
298 regardless of the limitations of the operating system.
299
300 Please note, however, that C<aio_sendfile> can read more bytes from
301 C<$in_fh> than are written, and there is no way to find out how many
302 bytes have been read from C<aio_sendfile> alone, as C<aio_sendfile> only
303 provides the number of bytes written to C<$out_fh>. Only if the result
304 value equals C<$length> one can assume that C<$length> bytes have been
305 read.
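
The following is a minimal sketch of that check, not a definitive implementation. It assumes IO::AIO is installed and that F</etc/passwd> is readable; both paths are hypothetical choices made for illustration, and the C<O_*> constants are taken from the core C<Fcntl> module.

```perl
use strict;
use warnings;
use Fcntl qw(O_RDONLY O_WRONLY O_CREAT O_TRUNC);
use IO::AIO;

# Hypothetical absolute paths, used only for illustration.
my ($src, $dst) = ("/etc/passwd", "/tmp/passwd.copy.$$");

my $copied;   # becomes 1 only if the whole file was written

aio_open $src, O_RDONLY, 0, sub {
    my $in = shift or die "open $src: $!";
    my $length = -s $in;

    aio_open $dst, O_WRONLY | O_CREAT | O_TRUNC, 0600, sub {
        my $out = shift or die "open $dst: $!";

        aio_sendfile $out, $in, 0, $length, sub {
            # Only a result equal to $length means everything was copied.
            $copied = $_[0] == $length ? 1 : 0;
        };
    };
};

IO::AIO::flush;   # wait until all requests, including nested ones, are done
print $copied ? "copied completely\n" : "short or failed copy\n";
unlink $dst;
```

Any result other than C<$length> is treated as a failure here, because the number of bytes consumed from C<$in_fh> is unknown in that case.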
306
307 =item aio_readahead $fh,$offset,$length, $callback->($retval)
308
309 C<aio_readahead> populates the page cache with data from a file so that
310 subsequent reads from that file will not block on disk I/O. The C<$offset>
311 argument specifies the starting point from which data is to be read and
312 C<$length> specifies the number of bytes to be read. I/O is performed in
313 whole pages, so that offset is effectively rounded down to a page boundary
314 and bytes are read up to the next page boundary greater than or equal to
315 (offset+length). C<aio_readahead> does not read beyond the end of the
316 file. The current file offset of the file is left unchanged.
317
318 If that syscall doesn't exist (likely if your OS isn't Linux) it will be
319 emulated by simply reading the data, which would have a similar effect.
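
The page rounding described above can be worked through with a small pure-Perl sketch. The page size of 4096 bytes is an assumption (the real value depends on the system), and C<readahead_range> is a hypothetical helper, not part of IO::AIO.

```perl
use strict;
use warnings;

# Assumed page size; the real value is system dependent (4096 is common).
my $PAGE_SIZE = 4096;

# Hypothetical helper: return the byte range [start, end) that is
# effectively read for a given offset and length.
sub readahead_range {
    my ($offset, $length) = @_;

    # Round the start down to a page boundary.
    my $start = $offset - $offset % $PAGE_SIZE;

    # Round the end up to the next page boundary >= offset + length.
    my $end = $offset + $length;
    $end += $PAGE_SIZE - $end % $PAGE_SIZE if $end % $PAGE_SIZE;

    return ($start, $end);
}

my ($start, $end) = readahead_range(5000, 100);
print "bytes $start to $end\n";   # prints "bytes 4096 to 8192"
```

So a request for 100 bytes at offset 5000 touches exactly one page under this assumption.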
320
321 =item aio_stat $fh_or_path, $callback->($status)
322
323 =item aio_lstat $fh, $callback->($status)
324
325 Works like perl's C<stat> or C<lstat> in void context. The callback will
326 be called after the stat and the results will be available using C<stat _>
327 or C<-s _> etc...
328
329 The pathname passed to C<aio_stat> must be absolute. See the notes on
330 pathnames at the start of this section for an explanation.
331
332 Currently, the stats are always 64-bit-stats, i.e. instead of returning an
333 error when stat'ing a large file, the results will be silently truncated
334 unless perl itself is compiled with large file support.
335
336 Example: Print the length of F</etc/passwd>:
337
338 aio_stat "/etc/passwd", sub {
339 $_[0] and die "stat failed: $!";
340 print "size is ", -s _, "\n";
341 };
342
343 =item aio_unlink $pathname, $callback->($status)
344
345 Asynchronously unlink (delete) a file and call the callback with the
346 result code.
347
348 =item aio_link $srcpath, $dstpath, $callback->($status)
349
350 Asynchronously create a new link to the existing object at C<$srcpath> at
351 the path C<$dstpath> and call the callback with the result code.
352
353 =item aio_symlink $srcpath, $dstpath, $callback->($status)
354
355 Asynchronously create a new symbolic link to the existing object at C<$srcpath> at
356 the path C<$dstpath> and call the callback with the result code.
357
358 =item aio_rename $srcpath, $dstpath, $callback->($status)
359
360 Asynchronously rename the object at C<$srcpath> to C<$dstpath>, just as
361 rename(2) does, and call the callback with the result code.
362
363 =item aio_rmdir $pathname, $callback->($status)
364
365 Asynchronously rmdir (delete) a directory and call the callback with the
366 result code.
367
368 =item aio_readdir $pathname, $callback->($entries)
369
370 Unlike the POSIX call of the same name, C<aio_readdir> reads an entire
371 directory (i.e. opendir + readdir + closedir). The entries will not be
372 sorted, and will B<NOT> include the C<.> and C<..> entries.
373
374 The callback is passed a single argument which is either C<undef> or an array-ref
375 with the filenames.
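
A minimal usage sketch, assuming IO::AIO is installed; the directory F</tmp> is a hypothetical choice, and C<IO::AIO::flush> is used only to keep the example synchronous.

```perl
use strict;
use warnings;
use IO::AIO;

my $dir = "/tmp";   # hypothetical absolute path
my @names;

aio_readdir $dir, sub {
    my $entries = shift
        or die "unable to read $dir: $!";

    # Entries arrive unsorted and never contain "." or "..".
    @names = sort @$entries;
};

IO::AIO::flush;   # block until the request has been handled
print scalar(@names), " entries in $dir\n";
```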
376
377 =item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
378
379 Scans a directory (similar to C<aio_readdir>) but additionally tries to
380 separate the entries of directory C<$path> into two sets of names, ones
381 you can recurse into (directories or links to them), and ones you cannot
382 recurse into (everything else).
383
384 C<aio_scandir> is a composite request that creates many sub requests.
385 C<$maxreq> specifies the maximum number of outstanding aio requests that
386 this function generates. If it is C<< <= 0 >>, then a suitable default
387 will be chosen (currently 6).
388
389 On error, the callback is called without arguments, otherwise it receives
390 two array-refs with path-relative entry names.
391
392 Example:
393
394 aio_scandir $dir, 0, sub {
395 my ($dirs, $nondirs) = @_;
396 print "real directories: @$dirs\n";
397 print "everything else: @$nondirs\n";
398 };
399
400 Implementation notes.
401
402 The C<aio_readdir> cannot be avoided, but C<stat()>'ing every entry can.
403
404 After reading the directory, the modification time, size etc. of the
405 directory before and after the readdir are compared, and if they match (and
406 the modification time isn't the current time), the link count will be used
407 to decide how many entries are directories (if >= 2). Otherwise, no
408 knowledge of the number of subdirectories will be assumed.
409
410 Then entries will be sorted into likely directories (everything without
411 a non-initial dot currently) and likely non-directories (everything
412 else). Then every entry plus an appended C</.> will be C<stat>'ed,
413 likely directories first. If that succeeds, it assumes that the entry
414 is a directory or a symlink to directory (which will be checked
415 separately). This is often faster than stat'ing the entry itself because
416 filesystems might detect the type of the entry without reading the inode
417 data (e.g. ext2fs filetype feature).
418
419 If the known number of directories (link count - 2) has been reached, the
420 rest of the entries is assumed to be non-directories.
421
422 This only works with certainty on POSIX (= UNIX) filesystems, which
423 fortunately are the vast majority of filesystems around.
424
425 It will also likely work on non-POSIX filesystems with reduced efficiency
426 as those tend to return 0 or 1 as link counts, which disables the
427 directory counting heuristic.
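
The ordering step from the implementation notes can be illustrated in isolation. This is a sketch under stated assumptions rather than the module's code: C<stat_order> is a hypothetical helper that uses the same sort key as C<aio_scandir> but sorts ascending, so the list reads in the order in which entries would be stat'ed (the module itself sorts descending and pops from the end).

```perl
use strict;
use warnings;

# Hypothetical helper: return the entries in the order in which they would
# be stat'ed. Names with a dot after the first character count as likely
# non-directories and come last; within each class shorter names come first.
sub stat_order {
    my @entries = @_;

    return map  { $_->[0] }
           sort { $a->[1] cmp $b->[1] }
           map  { [ $_, sprintf("%s%04d", (/.\./ ? "1" : "0"), length $_) ] }
           @entries;
}

my @order = stat_order("README.txt", "src", ".git", "a.out", "include");
print "@order\n";   # prints "src .git include a.out README.txt"
```

Note that F<.git> is treated as a likely directory, because only a non-initial dot marks an entry as a likely non-directory.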
428
429 =cut
430
431 sub aio_scandir($$$) {
432 my ($path, $maxreq, $cb) = @_;
433
434 my $grp = aio_group $cb;
435
436 $maxreq = 6 if $maxreq <= 0;
437
438 # stat once
439 add $grp aio_stat $path, sub {
440 return $grp->result () if $_[0];
441 my $now = time;
442 my $hash1 = join ":", (stat _)[0,1,3,7,9];
443
444 # read the directory entries
445 add $grp aio_readdir $path, sub {
446 my $entries = shift
447 or return $grp->result ();
448
449 # stat the dir another time
450 add $grp aio_stat $path, sub {
451 my $hash2 = join ":", (stat _)[0,1,3,7,9];
452
453 my $ndirs;
454
455 # take the slow route if anything looks fishy
456 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
457 $ndirs = -1;
458 } else {
459 # if nlink == 2, we are finished
460 # on non-posix-fs's, we rely on nlink < 2
461 $ndirs = (stat _)[3] - 2
462 or return $grp->result ([], $entries);
463 }
464
465 # sort into likely dirs and likely nondirs
466 # dirs == files without ".", short entries first
467 $entries = [map $_->[0],
468 sort { $b->[1] cmp $a->[1] }
469 map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
470 @$entries];
471
472 my (@dirs, @nondirs);
473
474 my ($statcb, $schedcb);
475 my $nreq = 0;
476
477 my $statgrp = add $grp aio_group;
478
479 $schedcb = sub {
480 if (@$entries) {
481 if ($nreq < $maxreq) {
482 my $ent = pop @$entries;
483 $nreq++;
484 add $statgrp aio_stat "$path/$ent/.", sub { $statcb->($_[0], $ent) };
485 }
486 } elsif (!$nreq) {
487 # finished
488 $statgrp->cancel;
489 undef $statcb;
490 undef $schedcb;
491 $grp->result (\@dirs, \@nondirs);
492 }
493 };
494 $statcb = sub {
495 my ($status, $entry) = @_;
496
497 if ($status < 0) {
498 $nreq--;
499 push @nondirs, $entry;
500 &$schedcb;
501 } else {
502 # need to check for real directory
503 add $grp aio_lstat "$path/$entry", sub {
504 $nreq--;
505
506 if (-d _) {
507 push @dirs, $entry;
508
509 if (!--$ndirs) {
510 push @nondirs, @$entries;
511 $entries = [];
512 }
513 } else {
514 push @nondirs, $entry;
515 }
516
517 &$schedcb;
518 }
519 }
520 };
521
522 &$schedcb while @$entries && $nreq < $maxreq;
523 };
524 };
525 };
526
527 $grp
528 }
529
530 =item aio_fsync $fh, $callback->($status)
531
532 Asynchronously call fsync on the given filehandle and call the callback
533 with the fsync result code.
534
535 =item aio_fdatasync $fh, $callback->($status)
536
537 Asynchronously call fdatasync on the given filehandle and call the
538 callback with the fdatasync result code.
539
540 If this call isn't available because your OS lacks it or it couldn't be
541 detected, it will be emulated by calling C<fsync> instead.
542
543 =item aio_group $callback->(...)
544
545 [EXPERIMENTAL]
546
547 This is a very special aio request: Instead of doing something, it is a
548 container for other aio requests, which is useful if you want to bundle
549 many requests into a single, composite, request with a definite callback
550 and the ability to cancel the whole request with its subrequests.
551
552 Returns an object of class L<IO::AIO::GRP>. See its documentation below
553 for more info.
554
555 Example:
556
557 my $grp = aio_group sub {
558 print "all stats done\n";
559 };
560
561 add $grp
562 (aio_stat ...),
563 (aio_stat ...),
564 ...;
565
566 =item aio_nop $callback->()
567
568 This is a special request - it does nothing in itself and is only used for
569 side effects, such as when you want to add a dummy request to a group so
570 that finishing the requests in the group depends on executing the given
571 code.
572
573 While this request does nothing, it still goes through the execution
574 phase and still requires a worker thread. Thus, the callback will not
575 be executed immediately but only after other requests in the queue have
576 entered their execution phase. This can be used to measure request
577 latency.
578
579 =item IO::AIO::aio_busy $fractional_seconds, $callback->() *NOT EXPORTED*
580
581 Mainly used for debugging and benchmarking, this aio request puts one of
582 the request workers to sleep for the given time.
583
584 While it is theoretically handy to have simple I/O scheduling requests
585 like sleep and file handle readable/writable, the overhead this creates is
586 immense (it blocks a thread for a long time) so do not use this function
587 except to put your application under artificial I/O pressure.
588
589 =back
590
591 =head2 IO::AIO::REQ CLASS
592
593 All non-aggregate C<aio_*> functions return an object of this class when
594 called in non-void context.
595
596 A request always moves through the following five states in its lifetime,
597 in order: B<ready> (request has been created, but has not been executed
598 yet), B<execute> (request is currently being executed), B<pending>
599 (request has been executed but callback has not been called yet),
600 B<result> (results are being processed synchronously, includes calling the
601 callback) and B<done> (request has reached the end of its lifetime and
602 holds no resources anymore).
603
604 =over 4
605
606 =item cancel $req
607
608 Cancels the request, if possible. Has the effect of skipping execution
609 when entering the B<execute> state and skipping calling the callback when
610 entering the B<result> state, but will leave the request otherwise
611 untouched. That means that requests that currently execute will not be
612 stopped and resources held by the request will not be freed prematurely.
613
614 =item cb $req $callback->(...)
615
616 Replace (or simply set) the callback registered to the request.
617
618 =back
619
620 =head2 IO::AIO::GRP CLASS
621
622 This class is a subclass of L<IO::AIO::REQ>, so all its methods apply to
623 objects of this class, too.
624
625 An IO::AIO::GRP object is a special request that can contain multiple other
626 aio requests.
627
628 You create one by calling the C<aio_group> constructing function with a
629 callback that will be called when all contained requests have entered the
630 C<done> state:
631
632 my $grp = aio_group sub {
633 print "all requests are done\n";
634 };
635
636 You add requests by calling the C<add> method with one or more
637 C<IO::AIO::REQ> objects:
638
639 $grp->add (aio_unlink "...");
640
641 add $grp aio_stat "...", sub {
642 $_[0] and return $grp->result ("error"); # non-zero status: stat failed
643
644 # add another request dynamically, if first succeeded
645 add $grp aio_open "...", sub {
646 $grp->result ("ok");
647 };
648 };
649
650 This makes it very easy to create composite requests (see the source of
651 C<aio_move> for an application) that work and feel like simple requests.
652
653 =over 4
654
655 =item * The IO::AIO::GRP objects will be cleaned up during calls to
656 C<IO::AIO::poll_cb>, just like any other request.
657
658 =item * They can be canceled like any other request. Canceling will cancel not
659 only the request itself, but also all requests it contains.
660
661 =item * They can also be added to other IO::AIO::GRP objects.
662
663 =item * You must not add requests to a group from within the group callback (or
664 any later time).
665
666 =item * Groups do not harmonise well with C<max_outstanding>, so it is best
667 not to combine C<aio_group> with it. Groups and feeders are recommended for
668 this kind of concurrency-limiting.
669
670 =back
671
672 Their lifetime, simplified, looks like this: when they are empty, they
673 will finish very quickly. If they contain only requests that are in the
674 C<done> state, they will also finish. Otherwise they will continue to
675 exist.
676
677 That means after creating a group you have some time to add requests. And
678 in the callbacks of those requests, you can add further requests to the
679 group. And only when all those requests have finished will the group
680 itself finish.
681
682 =over 4
683
684 =item add $grp ...
685
686 =item $grp->add (...)
687
688 Add one or more requests to the group. Any type of L<IO::AIO::REQ> can
689 be added, including other groups, as long as you do not create circular
690 dependencies.
691
692 Returns all its arguments.
693
694 =item $grp->result (...)
695
696 Set the result value(s) that will be passed to the group callback when all
697 subrequests have finished. By default, no argument will be passed.
698
699 =item feed $grp $callback->($grp)
700
701 [VERY EXPERIMENTAL]
702
703 Sets a feeder/generator on this group: every group can have an attached
704 generator that generates requests if idle. The idea behind this is that,
705 although you could just queue as many requests as you want in a group,
706 this might starve other requests for a potentially long time. For
707 example, C<aio_scandir> might generate hundreds of thousands C<aio_stat>
708 requests, delaying any later requests for a long time.
709
710 To avoid this, and allow incremental generation of requests, you can
711 instead create a group and set a feeder on it that generates those requests. The
712 feed callback will be called whenever there are few enough (see C<limit>,
713 below) requests active in the group itself and is expected to queue more
714 requests.
715
716 The feed callback can queue as many requests as it likes (i.e. C<add> does
717 not impose any limits).
718
719 If the feed does not queue more requests when called, it will be
720 automatically removed from the group.
721
722 If the feed limit is C<0>, it will be set to C<2> automatically.
723
724 Example:
725
726 # stat all files in @files, but only ever use four aio requests concurrently:
727
728 my $grp = aio_group sub { print "finished\n" };
729 limit $grp 4;
730 feed $grp sub {
731 my $file = pop @files
732 or return;
733
734 add $grp aio_stat $file, sub { ... };
735 };
736
737 =item limit $grp $num
738
739 Sets the feeder limit for the group: The feeder will be called whenever
740 the group contains fewer than this many requests.
741
742 Setting the limit to C<0> will pause the feeding process.
743
744 =back
745
746 =head2 SUPPORT FUNCTIONS
747
748 =over 4
749
750 =item $fileno = IO::AIO::poll_fileno
751
752 Return the I<request result pipe file descriptor>. This filehandle must be
753 polled for reading by some mechanism outside this module (e.g. Event or
754 select, see below or the SYNOPSIS). If the pipe becomes readable you have
755 to call C<poll_cb> to check the results.
756
757 See C<poll_cb> for an example.
758
759 =item IO::AIO::poll_cb
760
761 Process all outstanding events on the result pipe. You have to call this
762 regularly. Returns the number of events processed. Returns immediately
763 when no events are outstanding.
764
765 Example: Install an Event watcher that automatically calls
766 IO::AIO::poll_cb with high priority:
767
768 Event->io (fd => IO::AIO::poll_fileno,
769 poll => 'r', async => 1,
770 cb => \&IO::AIO::poll_cb);
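
Without an event framework, the same polling can be done with C<select>. The following is a minimal sketch that assumes IO::AIO is installed and that F</etc/passwd> exists; it uses the core C<IO::Select> module, and the variable names are arbitrary.

```perl
use strict;
use warnings;
use IO::AIO;
use IO::Select;

# Wrap the descriptor in a handle. Keep the handle alive for the whole
# program, since closing it would also close the descriptor IO::AIO uses.
open my $aio_fh, "<&=", IO::AIO::poll_fileno()
    or die "cannot wrap poll_fileno: $!";
my $select = IO::Select->new($aio_fh);

my $size;
aio_stat "/etc/passwd", sub {
    $_[0] and die "stat failed: $!";
    $size = -s _;
};

# Minimal event loop: wait for readability, then process the results.
while (IO::AIO::nreqs()) {
    IO::AIO::poll_cb() if $select->can_read(1);
}

print "size is $size\n";
```

In a real program the C<can_read> call would watch other handles as well, which is the reason to poll the descriptor instead of calling C<IO::AIO::poll_wait>.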
771
772 =item IO::AIO::poll_wait
773
774 Wait till the result filehandle becomes ready for reading (simply does a
775 C<select> on the filehandle). This is useful if you want to synchronously wait
776 for some requests to finish.
777
778 See C<nreqs> for an example.
779
780 =item IO::AIO::nreqs
781
782 Returns the number of requests currently outstanding (i.e. for which their
783 callback has not been invoked yet).
784
785 Example: wait till there are no outstanding requests anymore:
786
787 IO::AIO::poll_wait, IO::AIO::poll_cb
788 while IO::AIO::nreqs;
789
790 =item IO::AIO::flush
791
792 Wait till all outstanding AIO requests have been handled.
793
794 Strictly equivalent to:
795
796 IO::AIO::poll_wait, IO::AIO::poll_cb
797 while IO::AIO::nreqs;
798
799 =item IO::AIO::poll
800
801 Waits until some requests have been handled.
802
803 Strictly equivalent to:
804
805 IO::AIO::poll_wait, IO::AIO::poll_cb
806 if IO::AIO::nreqs;
807
808 =item IO::AIO::min_parallel $nthreads
809
810 Set the minimum number of AIO threads to C<$nthreads>. The current
811 default is C<8>, which means eight asynchronous operations can execute
812 concurrently at any one time (the number of outstanding requests,
813 however, is unlimited).
814
815 IO::AIO starts threads only on demand, when an AIO request is queued and
816 no free thread exists.
817
818 It is recommended to keep the number of threads relatively low, as some
819 Linux kernel versions will scale negatively with the number of threads
820 (higher parallelism => MUCH higher latency). With current Linux 2.6
821 versions, 4-32 threads should be fine.
822
823 Under most circumstances you don't need to call this function, as the
824 module selects a default that is suitable for low to moderate load.
825
826 =item IO::AIO::max_parallel $nthreads
827
828 Sets the maximum number of AIO threads to C<$nthreads>. If more than the
829 specified number of threads are currently running, this function kills
830 them. This function blocks until the limit is reached.
831
832 While C<$nthreads> is zero, aio requests get queued but not executed
833 until the number of threads has been increased again.
834
835 This module automatically runs C<max_parallel 0> at program end, to ensure
836 that all threads are killed and that there are no outstanding requests.
837
838 Under normal circumstances you don't need to call this function.
839
840 =item $oldnreqs = IO::AIO::max_outstanding $nreqs
841
842 [DEPRECATED]
843
844 Sets the maximum number of outstanding requests to C<$nreqs>. If you
845 try to queue up more than this number of requests, the caller will block until
846 some requests have been handled.
847
848 The default is very large, so normally there is no practical limit. If you
849 queue up many requests in a loop it often improves speed if you set
850 this to a relatively low number, such as C<100>.
851
852 This function does not work well together with C<aio_group>'s, and their
853 feeder interface is better suited to limiting concurrency, so do not use
854 this function.
855
856 Under normal circumstances you don't need to call this function.
857
858 =back
859
860 =cut
861
862 # support function to convert a fd into a perl filehandle
863 sub _fd2fh {
864 return undef if $_[0] < 0;
865
866 # try to generate nice filehandles
867 my $sym = "IO::AIO::fd#$_[0]";
868 local *$sym;
869
870 open *$sym, "+<&=$_[0]" # usually works under any unix
871 or open *$sym, "<&=$_[0]" # cygwin needs this
872 or open *$sym, ">&=$_[0]" # or this
873 or return undef;
874
875 *$sym
876 }
877
878 min_parallel 8;
879
880 END {
881 max_parallel 0;
882 }
883
884 1;
885
886 =head2 FORK BEHAVIOUR
887
888 This module should do "the right thing" when the process using it forks:
889
890 Before the fork, IO::AIO enters a quiescent state where no requests
891 can be added in other threads and no results will be processed. After
892 the fork the parent simply leaves the quiescent state and continues
893 request/result processing, while the child clears the request/result
894 queue (so the requests started before the fork will only be handled in
895 the parent). Threads will be started on demand until the limit set in the
896 parent process has been reached again.
897
898 In short: the parent will, after a short pause, continue as if fork had
899 not been called, while the child will act as if IO::AIO has not been used
900 yet.
901
902 =head2 MEMORY USAGE
903
904 Each aio request uses - depending on your architecture - around 128 bytes
905 of memory. In addition, stat requests need a stat buffer (possibly a few
906 hundred bytes). Perl scalars and other data passed into aio requests will
907 also be locked.
908
909 This is not awfully much, so queuing lots of requests is not usually a
910 problem.
911
912 Each thread needs a stack area which is usually around 16k, sometimes much
913 larger, depending on the OS.
914
915 =head1 SEE ALSO
916
917 L<Coro::AIO>.
918
919 =head1 AUTHOR
920
921 Marc Lehmann <schmorp@schmorp.de>
922 http://home.schmorp.de/
923
924 =cut
925