/cvs/IO-AIO/AIO.pm
Revision: 1.47
Committed: Thu Dec 29 15:44:13 2005 UTC (18 years, 4 months ago) by root
Branch: MAIN
CVS Tags: rel-1_72
Changes since 1.46: +2 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     IO::AIO - Asynchronous Input/Output
4    
5     =head1 SYNOPSIS
6    
7     use IO::AIO;
8    
9 root 1.6 aio_open "/etc/passwd", O_RDONLY, 0, sub {
10     my ($fh) = @_;
11     ...
12     };
13    
14     aio_unlink "/tmp/file", sub { };
15    
16     aio_read $fh, 30000, 1024, $buffer, 0, sub {
17 root 1.8 $_[0] > 0 or die "read error: $!";
18 root 1.6 };
19    
20 root 1.42 # AnyEvent
21     open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!";
22     my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb });
23    
24 root 1.6 # Event
25     Event->io (fd => IO::AIO::poll_fileno,
26 root 1.7 poll => 'r',
27 root 1.6 cb => \&IO::AIO::poll_cb);
28    
29     # Glib/Gtk2
30     add_watch Glib::IO IO::AIO::poll_fileno,
31 root 1.22 in => sub { IO::AIO::poll_cb; 1 };
32 root 1.6
33     # Tk
34     Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "",
35     readable => \&IO::AIO::poll_cb);
36    
37 root 1.11 # Danga::Socket
38     Danga::Socket->AddOtherFds (IO::AIO::poll_fileno =>
39     \&IO::AIO::poll_cb);
40    
41    
42 root 1.1 =head1 DESCRIPTION
43    
44     This module implements asynchronous I/O using whatever means your
45 root 1.2 operating system supports.
46 root 1.1
47 root 1.2 Currently, a number of threads are started that execute your read/writes
48     and signal their completion. You don't need thread support in your libc or
49     perl, and the threads created by this module will not be visible to the
50     pthreads library. In the future, this module might make use of the native
51     aio functions available on many operating systems. However, they are often
52     not well-supported (Linux doesn't allow them on normal files currently,
53     for example), and they would only support aio_read and aio_write, so the
54     remaining functionality would have to be implemented using threads anyway.
55 root 1.1
56     Although the module will work in the presence of other threads, it is
57 root 1.22 currently not reentrant, so use appropriate locking yourself, always call
58     C<poll_cb> from within the same thread, or never call C<poll_cb> (or other
59     C<aio_> functions) recursively.
60 root 1.1
61     =cut
62    
63     package IO::AIO;
64    
65 root 1.23 no warnings;
66    
67 root 1.1 use base 'Exporter';
68    
69 root 1.2 use Fcntl ();
70    
71 root 1.1 BEGIN {
72 root 1.47 $VERSION = '1.72';
73 root 1.1
74 root 1.39 @EXPORT = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
75 root 1.40 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
76 root 1.38 aio_fsync aio_fdatasync aio_readahead);
77     @EXPORT_OK = qw(poll_fileno poll_cb min_parallel max_parallel
78     max_outstanding nreqs);
79 root 1.1
80     require XSLoader;
81     XSLoader::load IO::AIO, $VERSION;
82     }
83    
84 root 1.5 =head1 FUNCTIONS
85 root 1.1
86 root 1.5 =head2 AIO FUNCTIONS
87 root 1.1
88 root 1.5 All the C<aio_*> calls are more or less thin wrappers around the syscall
89     with the same name (sans C<aio_>). The arguments are similar or identical,
90 root 1.14 and they all accept an additional (and optional) C<$callback> argument
91     which must be a code reference. This code reference will get called with
92     the syscall return code (e.g. most syscalls return C<-1> on error, unlike
93     perl, which usually delivers "false") as its sole argument when the given
94     syscall has been executed asynchronously.
95 root 1.1
96 root 1.23 All functions expecting a filehandle keep a copy of the filehandle
97     internally until the request has finished.
98 root 1.1
99 root 1.28 The pathnames you pass to these routines I<must> be absolute and
100     encoded in byte form. The reason for the former is that at the time the
101     request is being executed, the current working directory could have
102     changed. Alternatively, you can make sure that you never change the
103     current working directory.
104    
105     To encode pathnames to byte form, make sure you either: a) always pass
106     in filenames you got from outside (command line, readdir etc.), b) only
107     use filenames that are ASCII or ISO 8859-1, c) use the Encode module and
108     encode your pathnames to the locale (or other) encoding in effect in the
109     user environment, d) use Glib::filename_from_unicode on unicode filenames
110     or e) use something else.
111 root 1.1
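As a rough illustration of option c), the following sketch encodes a Unicode filename to bytes with the core C<Encode> module. The pathname and the choice of UTF-8 as the target encoding are assumptions made for the example; a real program would use whatever encoding is in effect in the user's environment.

```perl
use strict;
use warnings;
use Encode qw(encode);

# Hypothetical absolute pathname containing a non-ASCII character
# (U+00FC, LATIN SMALL LETTER U WITH DIAERESIS).
my $pathname = "/tmp/m\x{fc}sli.txt";

# Assumption: the user's environment uses UTF-8 for filenames.
my $bytes = encode("UTF-8", $pathname);

# $bytes is now a byte string, which is the form the aio_* calls expect.
printf "%d characters, %d bytes\n", length $pathname, length $bytes;
```

The character count and the byte count differ because the non-ASCII character occupies two bytes in UTF-8.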
112 root 1.5 =over 4
113 root 1.1
114 root 1.40 =item aio_open $pathname, $flags, $mode, $callback->($fh)
115 root 1.1
116 root 1.2 Asynchronously open or create a file and call the callback with a newly
117     created filehandle for the file.
118 root 1.1
119     The pathname passed to C<aio_open> must be absolute. See API NOTES, above,
120     for an explanation.
121    
122 root 1.20 The C<$flags> argument is a bitmask. See the C<Fcntl> module for a
123     list. They are the same as used by C<sysopen>.
124    
125     Likewise, C<$mode> specifies the mode of the newly created file, if it
126     didn't exist and C<O_CREAT> has been given, just like perl's C<sysopen>,
127     except that it is mandatory (i.e. use C<0> if you don't create new files,
128     and C<0666> or C<0777> if you do).
129 root 1.1
130     Example:
131    
132     aio_open "/etc/passwd", O_RDONLY, 0, sub {
133 root 1.2 if ($_[0]) {
134     print "open successful, fh is $_[0]\n";
135 root 1.1 ...
136     } else {
137     die "open failed: $!\n";
138     }
139     };
140    
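The flag and mode handling can be tried out synchronously with C<sysopen>, which takes the same C<$flags> and C<$mode> values. This is only a sketch: the temporary directory comes from the core C<File::Temp> module, and the file name and flag combination are examples, not recommendations.

```perl
use strict;
use warnings;
use Fcntl qw(O_WRONLY O_CREAT O_TRUNC);
use File::Temp qw(tempdir);

# Hypothetical path inside a throw-away directory.
my $dir  = tempdir(CLEANUP => 1);
my $path = "$dir/example.txt";

# Flags are combined with bitwise or, exactly as for sysopen.
my $flags = O_WRONLY | O_CREAT | O_TRUNC;

# The mode only matters because O_CREAT is given; it is subject to the umask.
sysopen my $fh, $path, $flags, 0666
   or die "open failed: $!";

print "created $path\n" if -e $path;
close $fh;
```

The same C<$flags> and C<$mode> values could then be passed to C<aio_open>, together with an absolute pathname and a callback.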
141 root 1.40 =item aio_close $fh, $callback->($status)
142 root 1.1
143 root 1.2 Asynchronously close a file and call the callback with the result
144     code. I<WARNING:> although accepted, you should not pass in a perl
145 root 1.20 filehandle here, as perl will likely close the file descriptor another
146     time when the filehandle is destroyed. Normally, you can safely call perl's
147     C<close> or just let filehandles go out of scope.
148    
149     This is supposed to be a bug in the API, so that might change. It's
150     therefore best to avoid this function.
151 root 1.1
152 root 1.40 =item aio_read $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
153 root 1.1
154 root 1.40 =item aio_write $fh,$offset,$length, $data,$dataoffset, $callback->($retval)
155 root 1.1
156     Reads or writes C<length> bytes from the specified C<fh> and C<offset>
157     into the scalar given by C<data> and offset C<dataoffset> and calls the
158     callback with the actual number of bytes read (or -1 on error, just
159     like the syscall).
160    
161 root 1.31 The C<$data> scalar I<MUST NOT> be modified in any way while the request
162     is outstanding. Modifying it can result in segfaults or WW3 (if the
163     necessary/optional hardware is installed).
164    
165 root 1.17 Example: Read 15 bytes at offset 7 into scalar C<$buffer>, starting at
166 root 1.1 offset C<0> within the scalar:
167    
168     aio_read $fh, 7, 15, $buffer, 0, sub {
169 root 1.9 $_[0] > 0 or die "read error: $!";
170     print "read $_[0] bytes: <$buffer>\n";
171 root 1.1 };
172    
173 root 1.40 =item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval)
174 root 1.35
175     Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts
176     reading at byte offset C<$in_offset>, and starts writing at the current
177     file offset of C<$out_fh>. Because of that, it is not safe to issue more
178     than one C<aio_sendfile> per C<$out_fh>, as they will interfere with each
179     other.
180    
181     This call tries to make use of a native C<sendfile> syscall to provide
182     zero-copy operation. For this to work, C<$out_fh> should refer to a
183     socket, and C<$in_fh> should refer to an mmap'able file.
184    
185     If the native sendfile call fails or is not implemented, it will be
186 root 1.36 emulated, so you can call C<aio_sendfile> on any type of filehandle
187     regardless of the limitations of the operating system.
188 root 1.35
189     Please note, however, that C<aio_sendfile> can read more bytes from
190     C<$in_fh> than are written, and there is no way to find out how many
191 root 1.36 bytes have been read from C<aio_sendfile> alone, as C<aio_sendfile> only
192     provides the number of bytes written to C<$out_fh>. Only if the result
193     value equals C<$length> can one assume that C<$length> bytes have been
194     read.
195 root 1.35
196 root 1.40 =item aio_readahead $fh,$offset,$length, $callback->($retval)
197 root 1.1
198 root 1.20 C<aio_readahead> populates the page cache with data from a file so that
199 root 1.1 subsequent reads from that file will not block on disk I/O. The C<$offset>
200     argument specifies the starting point from which data is to be read and
201     C<$length> specifies the number of bytes to be read. I/O is performed in
202     whole pages, so that offset is effectively rounded down to a page boundary
203     and bytes are read up to the next page boundary greater than or equal to
204 root 1.20 (offset+length). C<aio_readahead> does not read beyond the end of the
205 root 1.1 file. The current file offset of the file is left unchanged.
206    
207 root 1.26 If that syscall doesn't exist (likely if your OS isn't Linux) it will be
208     emulated by simply reading the data, which would have a similar effect.
209    
210 root 1.40 =item aio_stat $fh_or_path, $callback->($status)
211 root 1.1
212 root 1.40 =item aio_lstat $fh, $callback->($status)
213 root 1.1
214     Works like perl's C<stat> or C<lstat> in void context. The callback will
215     be called after the stat and the results will be available using C<stat _>
216     or C<-s _> etc...
217    
218     The pathname passed to C<aio_stat> must be absolute. See API NOTES, above,
219     for an explanation.
220    
221     Currently, the stats are always 64-bit-stats, i.e. instead of returning an
222     error when stat'ing a large file, the results will be silently truncated
223     unless perl itself is compiled with large file support.
224    
225     Example: Print the length of F</etc/passwd>:
226    
227     aio_stat "/etc/passwd", sub {
228     $_[0] and die "stat failed: $!";
229     print "size is ", -s _, "\n";
230     };
231    
232 root 1.40 =item aio_unlink $pathname, $callback->($status)
233 root 1.1
234     Asynchronously unlink (delete) a file and call the callback with the
235     result code.
236    
237 root 1.40 =item aio_rmdir $pathname, $callback->($status)
238 root 1.27
239     Asynchronously rmdir (delete) a directory and call the callback with the
240     result code.
241    
242 root 1.46 =item aio_readdir $pathname, $callback->($entries)
243 root 1.37
244     Unlike the POSIX call of the same name, C<aio_readdir> reads an entire
245     directory (i.e. opendir + readdir + closedir). The entries will not be
246     sorted, and will B<NOT> include the C<.> and C<..> entries.
247    
248     The callback is passed a single argument, which is either C<undef> or an array-ref
249     with the filenames.
250    
251 root 1.40 =item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
252    
253     Scans a directory (similar to C<aio_readdir>) and tries to separate the
254     entries of directory C<$path> into two sets of names, ones you can recurse
255     into (directories), and ones you cannot recurse into (everything else).
256    
257     C<aio_scandir> is a composite request that consists of many
258     aio-primitives. C<$maxreq> specifies the maximum number of outstanding
259     aio requests that this function generates. If it is C<< <= 0 >>, then a
260     suitable default will be chosen (currently 8).
261    
262     On error, the callback is called without arguments, otherwise it receives
263     two array-refs with path-relative entry names.
264    
265     Example:
266    
267     aio_scandir $dir, 0, sub {
268     my ($dirs, $nondirs) = @_;
269     print "real directories: @$dirs\n";
270     print "everything else: @$nondirs\n";
271     };
272    
273     Implementation notes.
274    
275     The C<aio_readdir> cannot be avoided, but C<stat()>'ing every entry can.
276    
277     After reading the directory, the modification time, size etc. of the
278     directory before and after the readdir is checked, and if they match, the
279     link count will be used to decide how many entries are directories (if
280     >= 2). Otherwise, no knowledge of the number of subdirectories will be
281     assumed.
282    
283     Then the entries will be sorted into likely directories (everything without a
284     non-initial dot) and likely non-directories (everything else). Then every
285     entry + C</.> will be C<stat>'ed, likely directories first. This is often
286     faster because filesystems might detect the type of the entry without
287     reading the inode data (e.g. ext2's filetype feature). If that succeeds,
288     it assumes that the entry is a directory or a symlink to a directory (which
289     will be checked separately).
290    
291     If the known number of directories has been reached, the rest of the
292     entries are assumed to be non-directories.
293    
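The ordering heuristic described in these notes can be tried in isolation. The sketch below is plain Perl without any aio calls; the entry names are made up, and it sorts in ascending order for readability, whereas the module itself sorts in descending order and then takes entries from the end of the list.

```perl
use strict;
use warnings;

# Hypothetical entry names, as a directory listing might deliver them.
my @entries = qw(README.txt src .git Makefile.PL t);

# Build a sort key per entry: "0" for names without a non-initial dot
# (likely directories), "1" otherwise, followed by the zero-padded length
# of the name, so that short names come first within each group.
my @ordered =
   map  { $_->[0] }
   sort { $a->[1] cmp $b->[1] }
   map  { [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length] }
   @entries;

print "$_\n" for @ordered;
```

Names with a leading dot only, such as C<.git>, count as likely directories, because the pattern requires some character in front of the dot.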
294     =cut
295    
296     sub aio_scandir($$$) {
297     my ($path, $maxreq, $cb) = @_;
298    
299     $maxreq = 8 if $maxreq <= 0;
300    
301     # stat once
302     aio_stat $path, sub {
303 root 1.47 return $cb->() if $_[0];
304 root 1.40 my $hash1 = join ":", (stat _)[0,1,3,7,9];
305    
306     # read the directory entries
307     aio_readdir $path, sub {
308     my $entries = shift
309     or return $cb->();
310    
311     # stat the dir another time
312     aio_stat $path, sub {
313     my $hash2 = join ":", (stat _)[0,1,3,7,9];
314    
315     my $ndirs;
316    
317     # take the slow route if anything looks fishy
318     if ($hash1 ne $hash2) {
319     $ndirs = -1;
320     } else {
321     # if nlink == 2, we are finished
322     # on non-posix-fs's, we rely on nlink < 2
323     $ndirs = (stat _)[3] - 2
324 root 1.43 or return $cb->([], $entries);
325 root 1.40 }
326    
327     # sort into likely dirs and likely nondirs
328     # dirs == files without ".", short entries first
329     $entries = [map $_->[0],
330     sort { $b->[1] cmp $a->[1] }
331     map [$_, sprintf "%s%04d", (/.\./ ? "1" : "0"), length],
332     @$entries];
333    
334     my (@dirs, @nondirs);
335    
336     my ($statcb, $schedcb);
337     my $nreq = 0;
338    
339     $schedcb = sub {
340     if (@$entries) {
341     if ($nreq < $maxreq) {
342     my $ent = pop @$entries;
343     $nreq++;
344     aio_stat "$path/$ent/.", sub { $statcb->($_[0], $ent) };
345     }
346     } elsif (!$nreq) {
347     # finished
348     undef $statcb;
349     undef $schedcb;
350 root 1.45 $cb->(\@dirs, \@nondirs) if $cb;
351 root 1.40 undef $cb;
352     }
353     };
354     $statcb = sub {
355     my ($status, $entry) = @_;
356    
357     if ($status < 0) {
358     $nreq--;
359     push @nondirs, $entry;
360     &$schedcb;
361     } else {
362     # need to check for real directory
363     aio_lstat "$path/$entry", sub {
364     $nreq--;
365    
366     if (-d _) {
367     push @dirs, $entry;
368    
369     if (!--$ndirs) {
370     push @nondirs, @$entries;
371     $entries = [];
372     }
373     } else {
374     push @nondirs, $entry;
375     }
376    
377     &$schedcb;
378     }
379     }
380     };
381    
382     &$schedcb while @$entries && $nreq < $maxreq;
383     };
384     };
385     };
386     }
387    
388     =item aio_fsync $fh, $callback->($status)
389 root 1.1
390     Asynchronously call fsync on the given filehandle and call the callback
391     with the fsync result code.
392    
393 root 1.40 =item aio_fdatasync $fh, $callback->($status)
394 root 1.1
395     Asynchronously call fdatasync on the given filehandle and call the
396 root 1.26 callback with the fdatasync result code.
397    
398     If this call isn't available because your OS lacks it or it couldn't be
399     detected, it will be emulated by calling C<fsync> instead.
400 root 1.1
401 root 1.5 =back
402    
403     =head2 SUPPORT FUNCTIONS
404    
405     =over 4
406    
407     =item $fileno = IO::AIO::poll_fileno
408    
409 root 1.20 Return the I<request result pipe file descriptor>. This filehandle must be
410     polled for reading by some mechanism outside this module (e.g. Event or
411     select, see below or the SYNOPSIS). If the pipe becomes readable you have
412     to call C<poll_cb> to check the results.
413 root 1.5
414     See C<poll_cb> for an example.
415    
416     =item IO::AIO::poll_cb
417    
418     Process all outstanding events on the result pipe. You have to call this
419     regularly. Returns the number of events processed. Returns immediately
420     when no events are outstanding.
421    
422 root 1.20 Example: Install an Event watcher that automatically calls
423     IO::AIO::poll_cb with high priority:
424 root 1.5
425     Event->io (fd => IO::AIO::poll_fileno,
426     poll => 'r', async => 1,
427     cb => \&IO::AIO::poll_cb);
428    
429     =item IO::AIO::poll_wait
430    
431     Wait till the result filehandle becomes ready for reading (simply does a
432 root 1.20 C<select> on the filehandle). This is useful if you want to synchronously wait
433 root 1.5 for some requests to finish.
434    
435     See C<nreqs> for an example.
436    
437     =item IO::AIO::nreqs
438    
439 root 1.20 Returns the number of requests currently outstanding (i.e. those whose
440     callback has not been invoked yet).
441 root 1.5
442     Example: wait till there are no outstanding requests anymore:
443    
444     IO::AIO::poll_wait, IO::AIO::poll_cb
445     while IO::AIO::nreqs;
446    
447 root 1.12 =item IO::AIO::flush
448    
449     Wait till all outstanding AIO requests have been handled.
450    
451 root 1.13 Strictly equivalent to:
452    
453     IO::AIO::poll_wait, IO::AIO::poll_cb
454     while IO::AIO::nreqs;
455    
456     =item IO::AIO::poll
457    
458     Waits until some requests have been handled.
459    
460     Strictly equivalent to:
461    
462     IO::AIO::poll_wait, IO::AIO::poll_cb
463     if IO::AIO::nreqs;
464    
465 root 1.5 =item IO::AIO::min_parallel $nthreads
466    
467 root 1.34 Set the minimum number of AIO threads to C<$nthreads>. The current default
468     is C<4>, which means four asynchronous operations can be done at one time
469 root 1.5 (the number of outstanding operations, however, is unlimited).
470    
471 root 1.34 IO::AIO starts threads only on demand, when an AIO request is queued and
472     no free thread exists.
473    
474 root 1.5 It is recommended to keep the number of threads low, as some Linux
475     kernel versions will scale negatively with the number of threads (higher
476     parallelism => MUCH higher latency). With current Linux 2.6 versions, 4-32
477     threads should be fine.
478    
479 root 1.34 Under most circumstances you don't need to call this function, as the
480     module selects a default that is suitable for low to moderate load.
481 root 1.5
482     =item IO::AIO::max_parallel $nthreads
483    
484 root 1.34 Sets the maximum number of AIO threads to C<$nthreads>. If more than the
485     specified number of threads are currently running, this function kills
486     the surplus threads. This function blocks until the limit is reached.
487    
488     While C<$nthreads> is zero, aio requests get queued but not executed
489     until the number of threads has been increased again.
490 root 1.5
491     This module automatically runs C<max_parallel 0> at program end, to ensure
492     that all threads are killed and that there are no outstanding requests.
493    
494     Under normal circumstances you don't need to call this function.
495    
496     =item $oldnreqs = IO::AIO::max_outstanding $nreqs
497    
498     Sets the maximum number of outstanding requests to C<$nreqs>. If you
499     try to queue up more than this number of requests, the caller will block until
500     some requests have been handled.
501    
502     The default is very large, so normally there is no practical limit. If you
503 root 1.34 queue up many requests in a loop it often improves speed if you set
504 root 1.5 this to a relatively low number, such as C<100>.
505    
506     Under normal circumstances you don't need to call this function.
507    
508     =back
509    
510 root 1.1 =cut
511    
512 root 1.2 # support function to convert a fd into a perl filehandle
513     sub _fd2fh {
514     return undef if $_[0] < 0;
515    
516 root 1.23 # try to generate nice filehandles
517     my $sym = "IO::AIO::fd#$_[0]";
518     local *$sym;
519 root 1.25
520 root 1.27 open *$sym, "+<&=$_[0]" # usually works under any unix
521     or open *$sym, "<&=$_[0]" # cygwin needs this
522     or open *$sym, ">&=$_[0]" # or this
523 root 1.2 or return undef;
524    
525 root 1.23 *$sym
526 root 1.2 }
527    
528 root 1.1 min_parallel 4;
529    
530     END {
531     max_parallel 0;
532     }
533    
534     1;
535    
536 root 1.27 =head2 FORK BEHAVIOUR
537    
538 root 1.34 Before the fork, IO::AIO enters a quiescent state where no requests
539     can be added in other threads and no results will be processed. After
540     the fork the parent simply leaves the quiescent state and continues
541     request/result processing, while the child clears the request/result
542     queue (so the requests started before the fork will only be handled in
543     the parent). Threads will be started on demand until the limit set in the
544     parent process has been reached again.
545 root 1.27
546 root 1.1 =head1 SEE ALSO
547    
548     L<Coro>, L<Linux::AIO>.
549    
550     =head1 AUTHOR
551    
552     Marc Lehmann <schmorp@schmorp.de>
553     http://home.schmorp.de/
554    
555     =cut
556