ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.51 by root, Sat Jun 24 19:14:04 2006 UTC vs.
Revision 1.56 by root, Sun Oct 22 00:53:47 2006 UTC

15 15
16 aio_read $fh, 30000, 1024, $buffer, 0, sub { 16 aio_read $fh, 30000, 1024, $buffer, 0, sub {
17 $_[0] > 0 or die "read error: $!"; 17 $_[0] > 0 or die "read error: $!";
18 }; 18 };
19 19
20 # AnyEvent 20 # version 2+ has request and group objects
21 use IO::AIO 2;
22
23 my $req = aio_unlink "/tmp/file", sub { };
24 $req->cancel; # cancel request if still in queue
25
26 my $grp = aio_group sub { print "all stats done\n" };
27 add $grp aio_stat "..." for ...;
28
29 # AnyEvent integration
21 open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!"; 30 open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!";
22 my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb }); 31 my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb });
23 32
24 # Event 33 # Event integration
25 Event->io (fd => IO::AIO::poll_fileno, 34 Event->io (fd => IO::AIO::poll_fileno,
26 poll => 'r', 35 poll => 'r',
27 cb => \&IO::AIO::poll_cb); 36 cb => \&IO::AIO::poll_cb);
28 37
29 # Glib/Gtk2 38 # Glib/Gtk2 integration
30 add_watch Glib::IO IO::AIO::poll_fileno, 39 add_watch Glib::IO IO::AIO::poll_fileno,
31 in => sub { IO::AIO::poll_cb; 1 }; 40 in => sub { IO::AIO::poll_cb; 1 };
32 41
33 # Tk 42 # Tk integration
34 Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "", 43 Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "",
35 readable => \&IO::AIO::poll_cb); 44 readable => \&IO::AIO::poll_cb);
36 45
37 # Danga::Socket 46 # Danga::Socket integration
38 Danga::Socket->AddOtherFds (IO::AIO::poll_fileno => 47 Danga::Socket->AddOtherFds (IO::AIO::poll_fileno =>
39 \&IO::AIO::poll_cb); 48 \&IO::AIO::poll_cb);
40
41 49
42=head1 DESCRIPTION 50=head1 DESCRIPTION
43 51
44This module implements asynchronous I/O using whatever means your 52This module implements asynchronous I/O using whatever means your
45operating system supports. 53operating system supports.
66use strict 'vars'; 74use strict 'vars';
67 75
68use base 'Exporter'; 76use base 'Exporter';
69 77
70BEGIN { 78BEGIN {
71 our $VERSION = '1.8'; 79 our $VERSION = '2.0';
72 80
73 our @EXPORT = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 81 our @EXPORT = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
74 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 82 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
75 aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move); 83 aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move
84 aio_group);
76 our @EXPORT_OK = qw(poll_fileno poll_cb min_parallel max_parallel max_outstanding nreqs); 85 our @EXPORT_OK = qw(poll_fileno poll_cb min_parallel max_parallel max_outstanding nreqs);
86
87 @IO::AIO::GRP::ISA = 'IO::AIO::REQ';
77 88
78 require XSLoader; 89 require XSLoader;
79 XSLoader::load ("IO::AIO", $VERSION); 90 XSLoader::load ("IO::AIO", $VERSION);
80} 91}
81 92
91perl, which usually delivers "false") as it's sole argument when the given 102perl, which usually delivers "false") as it's sole argument when the given
92syscall has been executed asynchronously. 103syscall has been executed asynchronously.
93 104
94All functions expecting a filehandle keep a copy of the filehandle 105All functions expecting a filehandle keep a copy of the filehandle
95internally until the request has finished. 106internally until the request has finished.
107
108All requests return objects of type L<IO::AIO::REQ> that allow further
109manipulation of those requests while they are in-flight.
96 110
97The pathnames you pass to these routines I<must> be absolute and 111The pathnames you pass to these routines I<must> be absolute and
98encoded in byte form. The reason for the former is that at the time the 112encoded in byte form. The reason for the former is that at the time the
99request is being executed, the current working directory could have 113request is being executed, the current working directory could have
100changed. Alternatively, you can make sure that you never change the 114changed. Alternatively, you can make sure that you never change the
168 print "read $_[0] bytes: <$buffer>\n"; 182 print "read $_[0] bytes: <$buffer>\n";
169 }; 183 };
170 184
171=item aio_move $srcpath, $dstpath, $callback->($status) 185=item aio_move $srcpath, $dstpath, $callback->($status)
172 186
173[EXPERIMENTAL]
174
175Try to move the I<file> (directories not supported as either source or destination) 187Try to move the I<file> (directories not supported as either source or
176from C<$srcpath> to C<$dstpath> and call the callback with the C<0> (error) or C<-1> ok. 188destination) from C<$srcpath> to C<$dstpath> and call the callback with
189the C<0> (error) or C<-1> ok.
177 190
178This is a composite request that tries to rename(2) the file first. If 191This is a composite request that tries to rename(2) the file first. If
179rename files with C<EXDEV>, it creates the destination file with mode 0200 192rename files with C<EXDEV>, it creates the destination file with mode 0200
180and copies the contents of the source file into it using C<aio_sendfile>, 193and copies the contents of the source file into it using C<aio_sendfile>,
181followed by restoring atime, mtime, access mode and uid/gid, in that 194followed by restoring atime, mtime, access mode and uid/gid, in that
188=cut 201=cut
189 202
190sub aio_move($$$) { 203sub aio_move($$$) {
191 my ($src, $dst, $cb) = @_; 204 my ($src, $dst, $cb) = @_;
192 205
206 my $grp = aio_group;
207
193 aio_rename $src, $dst, sub { 208 add $grp aio_rename $src, $dst, sub {
194 if ($_[0] && $! == EXDEV) { 209 if ($_[0] && $! == EXDEV) {
195 aio_open $src, O_RDONLY, 0, sub { 210 add $grp aio_open $src, O_RDONLY, 0, sub {
196 if (my $src_fh = $_[0]) { 211 if (my $src_fh = $_[0]) {
197 my @stat = stat $src_fh; 212 my @stat = stat $src_fh;
198 213
199 aio_open $dst, O_WRONLY, 0200, sub { 214 add $grp aio_open $dst, O_WRONLY, 0200, sub {
200 if (my $dst_fh = $_[0]) { 215 if (my $dst_fh = $_[0]) {
201 aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 216 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
202 close $src_fh; 217 close $src_fh;
203 218
204 if ($_[0] == $stat[7]) { 219 if ($_[0] == $stat[7]) {
205 utime $stat[8], $stat[9], $dst; 220 utime $stat[8], $stat[9], $dst;
206 chmod $stat[2] & 07777, $dst_fh; 221 chmod $stat[2] & 07777, $dst_fh;
207 chown $stat[4], $stat[5], $dst_fh; 222 chown $stat[4], $stat[5], $dst_fh;
208 close $dst_fh; 223 close $dst_fh;
209 224
210 aio_unlink $src, sub { 225 add $grp aio_unlink $src, sub {
211 $cb->($_[0]); 226 $cb->($_[0]);
212 }; 227 };
213 } else { 228 } else {
214 my $errno = $!; 229 my $errno = $!;
215 aio_unlink $dst, sub { 230 add $grp aio_unlink $dst, sub {
216 $! = $errno; 231 $! = $errno;
217 $cb->(-1); 232 $cb->(-1);
218 }; 233 };
219 } 234 }
220 }; 235 };
229 }; 244 };
230 } else { 245 } else {
231 $cb->($_[0]); 246 $cb->($_[0]);
232 } 247 }
233 }; 248 };
249
250 $grp
234} 251}
235 252
236=item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval) 253=item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval)
237 254
238Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts 255Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts
326The callback a single argument which is either C<undef> or an array-ref 343The callback a single argument which is either C<undef> or an array-ref
327with the filenames. 344with the filenames.
328 345
329=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 346=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
330 347
331Scans a directory (similar to C<aio_readdir>) and tries to separate the 348Scans a directory (similar to C<aio_readdir>) but additionally tries to
332entries of directory C<$path> into two sets of names, ones you can recurse 349separate the entries of directory C<$path> into two sets of names, ones
333into (directories), and ones you cannot recurse into (everything else). 350you can recurse into (directories or links to them), and ones you cannot
351recurse into (everything else).
334 352
335C<aio_scandir> is a composite request that consists of many 353C<aio_scandir> is a composite request that consists of many sub
336aio-primitives. C<$maxreq> specifies the maximum number of outstanding 354requests. C<$maxreq> specifies the maximum number of outstanding aio
337aio requests that this function generates. If it is C<< <= 0 >>, then a 355requests that this function generates. If it is C<< <= 0 >>, then a
338suitable default will be chosen (currently 8). 356suitable default will be chosen (currently 8).
339 357
340On error, the callback is called without arguments, otherwise it receives 358On error, the callback is called without arguments, otherwise it receives
341two array-refs with path-relative entry names. 359two array-refs with path-relative entry names.
342 360
351Implementation notes. 369Implementation notes.
352 370
353The C<aio_readdir> cannot be avoided, but C<stat()>'ing every entry can. 371The C<aio_readdir> cannot be avoided, but C<stat()>'ing every entry can.
354 372
355After reading the directory, the modification time, size etc. of the 373After reading the directory, the modification time, size etc. of the
356directory before and after the readdir is checked, and if they match, the 374directory before and after the readdir is checked, and if they match (and
357link count will be used to decide how many entries are directories (if 375isn't the current time), the link count will be used to decide how many
358>= 2). Otherwise, no knowledge of the number of subdirectories will be 376entries are directories (if >= 2). Otherwise, no knowledge of the number
359assumed. 377of subdirectories will be assumed.
360 378
361Then entires will be sorted into likely directories (everything without a 379Then entries will be sorted into likely directories (everything without
362non-initial dot) and likely non-directories (everything else). Then every 380a non-initial dot currently) and likely non-directories (everything
363entry + C</.> will be C<stat>'ed, likely directories first. This is often 381else). Then every entry plus an appended C</.> will be C<stat>'ed,
382likely directories first. If that succeeds, it assumes that the entry
383is a directory or a symlink to directory (which will be checked
384seperately). This is often faster than stat'ing the entry itself because
364faster because filesystems might detect the type of the entry without 385filesystems might detect the type of the entry without reading the inode
365reading the inode data (e.g. ext2fs filetype feature). If that succeeds, 386data (e.g. ext2fs filetype feature).
366it assumes that the entry is a directory or a symlink to directory (which
367will be checked seperately).
368 387
369If the known number of directories has been reached, the rest of the 388If the known number of directories (link count - 2) has been reached, the
370entries is assumed to be non-directories. 389rest of the entries is assumed to be non-directories.
390
391This only works with certainty on POSIX (= UNIX) filesystems, which
392fortunately are the vast majority of filesystems around.
393
394It will also likely work on non-POSIX filesystems with reduced efficiency
395as those tend to return 0 or 1 as link counts, which disables the
396directory counting heuristic.
371 397
372=cut 398=cut
373 399
374sub aio_scandir($$$) { 400sub aio_scandir($$$) {
375 my ($path, $maxreq, $cb) = @_; 401 my ($path, $maxreq, $cb) = @_;
376 402
403 my $grp = aio_group;
404
377 $maxreq = 8 if $maxreq <= 0; 405 $maxreq = 8 if $maxreq <= 0;
378 406
379 # stat once 407 # stat once
380 aio_stat $path, sub { 408 add $grp aio_stat $path, sub {
381 return $cb->() if $_[0]; 409 return $cb->() if $_[0];
410 my $now = time;
382 my $hash1 = join ":", (stat _)[0,1,3,7,9]; 411 my $hash1 = join ":", (stat _)[0,1,3,7,9];
383 412
384 # read the directory entries 413 # read the directory entries
385 aio_readdir $path, sub { 414 add $grp aio_readdir $path, sub {
386 my $entries = shift 415 my $entries = shift
387 or return $cb->(); 416 or return $cb->();
388 417
389 # stat the dir another time 418 # stat the dir another time
390 aio_stat $path, sub { 419 add $grp aio_stat $path, sub {
391 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 420 my $hash2 = join ":", (stat _)[0,1,3,7,9];
392 421
393 my $ndirs; 422 my $ndirs;
394 423
395 # take the slow route if anything looks fishy 424 # take the slow route if anything looks fishy
396 if ($hash1 ne $hash2) { 425 if ($hash1 ne $hash2 or (stat _)[9] == $now) {
397 $ndirs = -1; 426 $ndirs = -1;
398 } else { 427 } else {
399 # if nlink == 2, we are finished 428 # if nlink == 2, we are finished
400 # on non-posix-fs's, we rely on nlink < 2 429 # on non-posix-fs's, we rely on nlink < 2
401 $ndirs = (stat _)[3] - 2 430 $ndirs = (stat _)[3] - 2
417 $schedcb = sub { 446 $schedcb = sub {
418 if (@$entries) { 447 if (@$entries) {
419 if ($nreq < $maxreq) { 448 if ($nreq < $maxreq) {
420 my $ent = pop @$entries; 449 my $ent = pop @$entries;
421 $nreq++; 450 $nreq++;
422 aio_stat "$path/$ent/.", sub { $statcb->($_[0], $ent) }; 451 add $grp aio_stat "$path/$ent/.", sub { $statcb->($_[0], $ent) };
423 } 452 }
424 } elsif (!$nreq) { 453 } elsif (!$nreq) {
425 # finished 454 # finished
426 undef $statcb; 455 undef $statcb;
427 undef $schedcb; 456 undef $schedcb;
436 $nreq--; 465 $nreq--;
437 push @nondirs, $entry; 466 push @nondirs, $entry;
438 &$schedcb; 467 &$schedcb;
439 } else { 468 } else {
440 # need to check for real directory 469 # need to check for real directory
441 aio_lstat "$path/$entry", sub { 470 add $grp aio_lstat "$path/$entry", sub {
442 $nreq--; 471 $nreq--;
443 472
444 if (-d _) { 473 if (-d _) {
445 push @dirs, $entry; 474 push @dirs, $entry;
446 475
459 488
460 &$schedcb while @$entries && $nreq < $maxreq; 489 &$schedcb while @$entries && $nreq < $maxreq;
461 }; 490 };
462 }; 491 };
463 }; 492 };
493
494 $grp
464} 495}
465 496
466=item aio_fsync $fh, $callback->($status) 497=item aio_fsync $fh, $callback->($status)
467 498
468Asynchronously call fsync on the given filehandle and call the callback 499Asynchronously call fsync on the given filehandle and call the callback
473Asynchronously call fdatasync on the given filehandle and call the 504Asynchronously call fdatasync on the given filehandle and call the
474callback with the fdatasync result code. 505callback with the fdatasync result code.
475 506
476If this call isn't available because your OS lacks it or it couldn't be 507If this call isn't available because your OS lacks it or it couldn't be
477detected, it will be emulated by calling C<fsync> instead. 508detected, it will be emulated by calling C<fsync> instead.
509
510=item aio_group $callback->()
511
512[EXPERIMENTAL]
513
514This is a very special aio request: Instead of doing something, it is a
515container for other aio requests, which is useful if you want to bundle
516many requests into a single, composite, request.
517
518Returns an object of class L<IO::AIO::GRP>. See its documentation below
519for more info.
520
521Example:
522
523 my $grp = aio_group sub {
524 print "all stats done\n";
525 };
526
527 add $grp
528 (aio_stat ...),
529 (aio_stat ...),
530 ...;
531
532=item IO::AIO::aio_sleep $fractional_seconds, $callback->() *NOT EXPORTED*
533
534Mainly used for debugging and benchmarking, this aio request puts one of
535the request workers to sleep for the given time.
536
537While it is theoretically handy to have simple I/O scheduling requests
538like sleep and file handle readable/writable, the overhead this creates
539is immense, so do not use this function except to put your application
540under artificial I/O pressure.
541
542=back
543
544=head2 IO::AIO::REQ CLASS
545
546All non-aggregate C<aio_*> functions return an object of this class when
547called in non-void context.
548
549A request always moves through the following five states in its lifetime,
550in order: B<ready> (request has been created, but has not been executed
551yet), B<execute> (request is currently being executed), B<pending>
552(request has been executed but callback has not been called yet),
553B<result> (results are being processed synchronously, includes calling the
554callback) and B<done> (request has reached the end of its lifetime and
555holds no resources anymore).
556
557=over 4
558
559=item $req->cancel
560
561Cancels the request, if possible. Has the effect of skipping execution
562when entering the B<execute> state and skipping calling the callback when
563entering the the B<result> state, but will leave the request otherwise
564untouched. That means that requests that currently execute will not be
565stopped and resources held by the request will not be freed prematurely.
566
567=back
568
569=head2 IO::AIO::GRP CLASS
570
571This class is a subclass of L<IO::AIO::REQ>, so all its methods apply to
572objects of this class, too.
573
574A IO::AIO::GRP object is a special request that can contain multiple other
575aio requests.
576
577You create one by calling the C<aio_group> constructing function with a
578callback that will be called when all contained requests have entered the
579C<done> state:
580
581 my $grp = aio_group sub {
582 print "all requests are done\n";
583 };
584
585You add requests by calling the C<add> method with one or more
586C<IO::AIO::REQ> objects:
587
588 $grp->add (aio_unlink "...");
589
590 add $grp aio_stat "...", sub { ... };
591
592This makes it very easy to create composite requests (see the source of
593C<aio_move> for an application) that work and feel like simple requests.
594
595The IO::AIO::GRP objects will be cleaned up during calls to
596C<IO::AIO::poll_cb>, just like any other request.
597
598They can be canceled like any other request. Canceling will cancel not
599just the request itself, but also all requests it contains.
600
601They can also can also be added to other IO::AIO::GRP objects.
602
603Their lifetime, simplified, looks like this: when they are empty, they
604will finish very quickly. If they contain only requests that are in the
605C<done> state, they will also finish. Otherwise they will continue to
606exist.
607
608=over 4
609
610=item $grp->add (...)
611
612=item add $grp ...
613
614Add one or more
615Cancels the request, if possible. Has the effect of skipping execution
616when entering the B<execute> state and skipping calling the callback when
617entering the the B<result> state, but will leave the request otherwise
618untouched. That means that requests that currently execute will not be
619stopped and resources held by the request will not be freed prematurely.
478 620
479=back 621=back
480 622
481=head2 SUPPORT FUNCTIONS 623=head2 SUPPORT FUNCTIONS
482 624
610} 752}
611 753
6121; 7541;
613 755
614=head2 FORK BEHAVIOUR 756=head2 FORK BEHAVIOUR
757
758This module should do "the right thing" when the process using it forks:
615 759
616Before the fork, IO::AIO enters a quiescent state where no requests 760Before the fork, IO::AIO enters a quiescent state where no requests
617can be added in other threads and no results will be processed. After 761can be added in other threads and no results will be processed. After
618the fork the parent simply leaves the quiescent state and continues 762the fork the parent simply leaves the quiescent state and continues
619request/result processing, while the child clears the request/result 763request/result processing, while the child clears the request/result
620queue (so the requests started before the fork will only be handled in 764queue (so the requests started before the fork will only be handled in
621the parent). Threats will be started on demand until the limit ste in the 765the parent). Threads will be started on demand until the limit ste in the
622parent process has been reached again. 766parent process has been reached again.
623 767
768In short: the parent will, after a short pause, continue as if fork had
769not been called, while the child will act as if IO::AIO has not been used
770yet.
771
624=head1 SEE ALSO 772=head1 SEE ALSO
625 773
626L<Coro>, L<Linux::AIO>. 774L<Coro>, L<Linux::AIO> (obsolete).
627 775
628=head1 AUTHOR 776=head1 AUTHOR
629 777
630 Marc Lehmann <schmorp@schmorp.de> 778 Marc Lehmann <schmorp@schmorp.de>
631 http://home.schmorp.de/ 779 http://home.schmorp.de/

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines