ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.53 by root, Sat Oct 21 23:20:29 2006 UTC vs.
Revision 1.58 by root, Sun Oct 22 10:33:19 2006 UTC

15 15
16 aio_read $fh, 30000, 1024, $buffer, 0, sub { 16 aio_read $fh, 30000, 1024, $buffer, 0, sub {
17 $_[0] > 0 or die "read error: $!"; 17 $_[0] > 0 or die "read error: $!";
18 }; 18 };
19 19
20 use IO::AIO 2; # version has aio objects 20 # version 2+ has request and group objects
21 use IO::AIO 2;
21 22
22 my $req = aio_unlink "/tmp/file", sub { }; 23 my $req = aio_unlink "/tmp/file", sub { };
23 $req->cancel; # cancel request if still in queue 24 $req->cancel; # cancel request if still in queue
24 25
25 # AnyEvent 26 my $grp = aio_group sub { print "all stats done\n" };
27 add $grp aio_stat "..." for ...;
28
29 # AnyEvent integration
26 open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!"; 30 open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!";
27 my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb }); 31 my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb });
28 32
29 # Event 33 # Event integration
30 Event->io (fd => IO::AIO::poll_fileno, 34 Event->io (fd => IO::AIO::poll_fileno,
31 poll => 'r', 35 poll => 'r',
32 cb => \&IO::AIO::poll_cb); 36 cb => \&IO::AIO::poll_cb);
33 37
34 # Glib/Gtk2 38 # Glib/Gtk2 integration
35 add_watch Glib::IO IO::AIO::poll_fileno, 39 add_watch Glib::IO IO::AIO::poll_fileno,
36 in => sub { IO::AIO::poll_cb; 1 }; 40 in => sub { IO::AIO::poll_cb; 1 };
37 41
38 # Tk 42 # Tk integration
39 Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "", 43 Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "",
40 readable => \&IO::AIO::poll_cb); 44 readable => \&IO::AIO::poll_cb);
41 45
42 # Danga::Socket 46 # Danga::Socket integration
43 Danga::Socket->AddOtherFds (IO::AIO::poll_fileno => 47 Danga::Socket->AddOtherFds (IO::AIO::poll_fileno =>
44 \&IO::AIO::poll_cb); 48 \&IO::AIO::poll_cb);
45
46 49
47=head1 DESCRIPTION 50=head1 DESCRIPTION
48 51
49This module implements asynchronous I/O using whatever means your 52This module implements asynchronous I/O using whatever means your
50operating system supports. 53operating system supports.
71use strict 'vars'; 74use strict 'vars';
72 75
73use base 'Exporter'; 76use base 'Exporter';
74 77
75BEGIN { 78BEGIN {
76 our $VERSION = '1.99'; 79 our $VERSION = '2.0';
77 80
78 our @EXPORT = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 81 our @EXPORT = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
79 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 82 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
80 aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move); 83 aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move
84 aio_group);
81 our @EXPORT_OK = qw(poll_fileno poll_cb min_parallel max_parallel max_outstanding nreqs); 85 our @EXPORT_OK = qw(poll_fileno poll_cb min_parallel max_parallel max_outstanding nreqs);
86
87 @IO::AIO::GRP::ISA = 'IO::AIO::REQ';
82 88
83 require XSLoader; 89 require XSLoader;
84 XSLoader::load ("IO::AIO", $VERSION); 90 XSLoader::load ("IO::AIO", $VERSION);
85} 91}
86 92
97syscall has been executed asynchronously. 103syscall has been executed asynchronously.
98 104
99All functions expecting a filehandle keep a copy of the filehandle 105All functions expecting a filehandle keep a copy of the filehandle
100internally until the request has finished. 106internally until the request has finished.
101 107
102All non-composite requests (requests that are not broken down into
103multiple requests) return objects of type L<IO::AIO::REQ> that allow 108All requests return objects of type L<IO::AIO::REQ> that allow further
104further manipulation of running requests. 109manipulation of those requests while they are in-flight.
105 110
106The pathnames you pass to these routines I<must> be absolute and 111The pathnames you pass to these routines I<must> be absolute and
107encoded in byte form. The reason for the former is that at the time the 112encoded in byte form. The reason for the former is that at the time the
108request is being executed, the current working directory could have 113request is being executed, the current working directory could have
109changed. Alternatively, you can make sure that you never change the 114changed. Alternatively, you can make sure that you never change the
177 print "read $_[0] bytes: <$buffer>\n"; 182 print "read $_[0] bytes: <$buffer>\n";
178 }; 183 };
179 184
180=item aio_move $srcpath, $dstpath, $callback->($status) 185=item aio_move $srcpath, $dstpath, $callback->($status)
181 186
187[EXPERIMENTAL due to internal aio_group use]
188
182Try to move the I<file> (directories not supported as either source or 189Try to move the I<file> (directories not supported as either source or
183destination) from C<$srcpath> to C<$dstpath> and call the callback with 190destination) from C<$srcpath> to C<$dstpath> and call the callback with
184the C<0> (error) or C<-1> ok. 191the C<0> (error) or C<-1> ok.
185 192
186This is a composite request that tries to rename(2) the file first. If 193This is a composite request that tries to rename(2) the file first. If
196=cut 203=cut
197 204
198sub aio_move($$$) { 205sub aio_move($$$) {
199 my ($src, $dst, $cb) = @_; 206 my ($src, $dst, $cb) = @_;
200 207
208 my $grp = aio_group $cb;
209
201 aio_rename $src, $dst, sub { 210 add $grp aio_rename $src, $dst, sub {
202 if ($_[0] && $! == EXDEV) { 211 if ($_[0] && $! == EXDEV) {
203 aio_open $src, O_RDONLY, 0, sub { 212 add $grp aio_open $src, O_RDONLY, 0, sub {
204 if (my $src_fh = $_[0]) { 213 if (my $src_fh = $_[0]) {
205 my @stat = stat $src_fh; 214 my @stat = stat $src_fh;
206 215
207 aio_open $dst, O_WRONLY, 0200, sub { 216 add $grp aio_open $dst, O_WRONLY, 0200, sub {
208 if (my $dst_fh = $_[0]) { 217 if (my $dst_fh = $_[0]) {
209 aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 218 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
210 close $src_fh; 219 close $src_fh;
211 220
212 if ($_[0] == $stat[7]) { 221 if ($_[0] == $stat[7]) {
213 utime $stat[8], $stat[9], $dst; 222 utime $stat[8], $stat[9], $dst;
214 chmod $stat[2] & 07777, $dst_fh; 223 chmod $stat[2] & 07777, $dst_fh;
215 chown $stat[4], $stat[5], $dst_fh; 224 chown $stat[4], $stat[5], $dst_fh;
216 close $dst_fh; 225 close $dst_fh;
217 226
218 aio_unlink $src, sub { 227 add $grp aio_unlink $src, sub {
219 $cb->($_[0]); 228 $grp->result ($_[0]);
220 }; 229 };
221 } else { 230 } else {
222 my $errno = $!; 231 my $errno = $!;
223 aio_unlink $dst, sub { 232 add $grp aio_unlink $dst, sub {
224 $! = $errno; 233 $! = $errno;
225 $cb->(-1); 234 $grp->result (-1);
226 }; 235 };
227 } 236 }
228 }; 237 };
229 } else { 238 } else {
230 $cb->(-1); 239 $grp->result (-1);
231 } 240 }
232 }, 241 },
233 242
234 } else { 243 } else {
235 $cb->(-1); 244 $grp->result (-1);
236 } 245 }
237 }; 246 };
238 } else { 247 } else {
239 $cb->($_[0]); 248 $grp->result ($_[0]);
240 } 249 }
241 }; 250 };
251
252 $grp
242} 253}
243 254
244=item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval) 255=item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval)
245 256
246Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts 257Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts
334The callback a single argument which is either C<undef> or an array-ref 345The callback a single argument which is either C<undef> or an array-ref
335with the filenames. 346with the filenames.
336 347
337=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 348=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
338 349
350[EXPERIMENTAL due to internal aio_group use]
351
339Scans a directory (similar to C<aio_readdir>) but additionally tries to 352Scans a directory (similar to C<aio_readdir>) but additionally tries to
340separate the entries of directory C<$path> into two sets of names, ones 353separate the entries of directory C<$path> into two sets of names, ones
341you can recurse into (directories or links to them), and ones you cannot 354you can recurse into (directories or links to them), and ones you cannot
342recurse into (everything else). 355recurse into (everything else).
343 356
389=cut 402=cut
390 403
391sub aio_scandir($$$) { 404sub aio_scandir($$$) {
392 my ($path, $maxreq, $cb) = @_; 405 my ($path, $maxreq, $cb) = @_;
393 406
407 my $grp = aio_group $cb;
408
394 $maxreq = 8 if $maxreq <= 0; 409 $maxreq = 8 if $maxreq <= 0;
395 410
396 # stat once 411 # stat once
397 aio_stat $path, sub { 412 add $grp aio_stat $path, sub {
398 return $cb->() if $_[0]; 413 return $grp->result () if $_[0];
399 my $now = time; 414 my $now = time;
400 my $hash1 = join ":", (stat _)[0,1,3,7,9]; 415 my $hash1 = join ":", (stat _)[0,1,3,7,9];
401 416
402 # read the directory entries 417 # read the directory entries
403 aio_readdir $path, sub { 418 add $grp aio_readdir $path, sub {
404 my $entries = shift 419 my $entries = shift
405 or return $cb->(); 420 or return $grp->result ();
406 421
407 # stat the dir another time 422 # stat the dir another time
408 aio_stat $path, sub { 423 add $grp aio_stat $path, sub {
409 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 424 my $hash2 = join ":", (stat _)[0,1,3,7,9];
410 425
411 my $ndirs; 426 my $ndirs;
412 427
413 # take the slow route if anything looks fishy 428 # take the slow route if anything looks fishy
415 $ndirs = -1; 430 $ndirs = -1;
416 } else { 431 } else {
417 # if nlink == 2, we are finished 432 # if nlink == 2, we are finished
418 # on non-posix-fs's, we rely on nlink < 2 433 # on non-posix-fs's, we rely on nlink < 2
419 $ndirs = (stat _)[3] - 2 434 $ndirs = (stat _)[3] - 2
420 or return $cb->([], $entries); 435 or return $grp->result ([], $entries);
421 } 436 }
422 437
423 # sort into likely dirs and likely nondirs 438 # sort into likely dirs and likely nondirs
424 # dirs == files without ".", short entries first 439 # dirs == files without ".", short entries first
425 $entries = [map $_->[0], 440 $entries = [map $_->[0],
435 $schedcb = sub { 450 $schedcb = sub {
436 if (@$entries) { 451 if (@$entries) {
437 if ($nreq < $maxreq) { 452 if ($nreq < $maxreq) {
438 my $ent = pop @$entries; 453 my $ent = pop @$entries;
439 $nreq++; 454 $nreq++;
440 aio_stat "$path/$ent/.", sub { $statcb->($_[0], $ent) }; 455 add $grp aio_stat "$path/$ent/.", sub { $statcb->($_[0], $ent) };
441 } 456 }
442 } elsif (!$nreq) { 457 } elsif (!$nreq) {
443 # finished 458 # finished
444 undef $statcb; 459 undef $statcb;
445 undef $schedcb; 460 undef $schedcb;
446 $cb->(\@dirs, \@nondirs) if $cb; 461 $grp->result (\@dirs, \@nondirs) if $cb;
447 undef $cb; 462 undef $cb;
448 } 463 }
449 }; 464 };
450 $statcb = sub { 465 $statcb = sub {
451 my ($status, $entry) = @_; 466 my ($status, $entry) = @_;
454 $nreq--; 469 $nreq--;
455 push @nondirs, $entry; 470 push @nondirs, $entry;
456 &$schedcb; 471 &$schedcb;
457 } else { 472 } else {
458 # need to check for real directory 473 # need to check for real directory
459 aio_lstat "$path/$entry", sub { 474 add $grp aio_lstat "$path/$entry", sub {
460 $nreq--; 475 $nreq--;
461 476
462 if (-d _) { 477 if (-d _) {
463 push @dirs, $entry; 478 push @dirs, $entry;
464 479
477 492
478 &$schedcb while @$entries && $nreq < $maxreq; 493 &$schedcb while @$entries && $nreq < $maxreq;
479 }; 494 };
480 }; 495 };
481 }; 496 };
497
498 $grp
482} 499}
483 500
484=item aio_fsync $fh, $callback->($status) 501=item aio_fsync $fh, $callback->($status)
485 502
486Asynchronously call fsync on the given filehandle and call the callback 503Asynchronously call fsync on the given filehandle and call the callback
491Asynchronously call fdatasync on the given filehandle and call the 508Asynchronously call fdatasync on the given filehandle and call the
492callback with the fdatasync result code. 509callback with the fdatasync result code.
493 510
494If this call isn't available because your OS lacks it or it couldn't be 511If this call isn't available because your OS lacks it or it couldn't be
495detected, it will be emulated by calling C<fsync> instead. 512detected, it will be emulated by calling C<fsync> instead.
513
514=item aio_group $callback->(...)
515
516[EXPERIMENTAL]
517
518This is a very special aio request: Instead of doing something, it is a
519container for other aio requests, which is useful if you want to bundle
520many requests into a single, composite, request.
521
522Returns an object of class L<IO::AIO::GRP>. See its documentation below
523for more info.
524
525Example:
526
527 my $grp = aio_group sub {
528 print "all stats done\n";
529 };
530
531 add $grp
532 (aio_stat ...),
533 (aio_stat ...),
534 ...;
535
536=item IO::AIO::aio_sleep $fractional_seconds, $callback->() *NOT EXPORTED*
537
538Mainly used for debugging and benchmarking, this aio request puts one of
539the request workers to sleep for the given time.
540
541While it is theoretically handy to have simple I/O scheduling requests
542like sleep and file handle readable/writable, the overhead this creates
543is immense, so do not use this function except to put your application
544under artificial I/O pressure.
496 545
497=back 546=back
498 547
499=head2 IO::AIO::REQ CLASS 548=head2 IO::AIO::REQ CLASS
500 549
519untouched. That means that requests that currently execute will not be 568untouched. That means that requests that currently execute will not be
520stopped and resources held by the request will not be freed prematurely. 569stopped and resources held by the request will not be freed prematurely.
521 570
522=back 571=back
523 572
573=head2 IO::AIO::GRP CLASS
574
575This class is a subclass of L<IO::AIO::REQ>, so all its methods apply to
576objects of this class, too.
577
578A IO::AIO::GRP object is a special request that can contain multiple other
579aio requests.
580
581You create one by calling the C<aio_group> constructing function with a
582callback that will be called when all contained requests have entered the
583C<done> state:
584
585 my $grp = aio_group sub {
586 print "all requests are done\n";
587 };
588
589You add requests by calling the C<add> method with one or more
590C<IO::AIO::REQ> objects:
591
592 $grp->add (aio_unlink "...");
593
594 add $grp aio_stat "...", sub {
595 $_[0] or return $grp->result ("error");
596
597 # add another request dynamically, if first succeeded
598 add $grp aio_open "...", sub {
599 $grp->result ("ok");
600 };
601 };
602
603This makes it very easy to create composite requests (see the source of
604C<aio_move> for an application) that work and feel like simple requests.
605
606The IO::AIO::GRP objects will be cleaned up during calls to
607C<IO::AIO::poll_cb>, just like any other request.
608
609They can be canceled like any other request. Canceling will cancel not
610just the request itself, but also all requests it contains.
611
612They can also can also be added to other IO::AIO::GRP objects.
613
614Their lifetime, simplified, looks like this: when they are empty, they
615will finish very quickly. If they contain only requests that are in the
616C<done> state, they will also finish. Otherwise they will continue to
617exist.
618
619That means after creating a group you have some time to add requests. And
620in the callbacks of those requests, you can add further requests to the
621group. And only when all those requests have finished will the the group
622itself finish.
623
624=over 4
625
626=item $grp->add (...)
627
628=item add $grp ...
629
630Add one or more requests to the group. Any type of L<IO::AIO::REQ> can
631be added, including other groups, as long as you do not create circular
632dependencies.
633
634Returns all its arguments.
635
636=item $grp->result (...)
637
638Set the result value(s) that will be passed to the group callback when all
639subrequests have finished. By default, no argument will be passed.
640
641=back
642
524=head2 SUPPORT FUNCTIONS 643=head2 SUPPORT FUNCTIONS
525 644
526=over 4 645=over 4
527 646
528=item $fileno = IO::AIO::poll_fileno 647=item $fileno = IO::AIO::poll_fileno

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines