ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.52 by root, Sat Oct 21 23:06:04 2006 UTC vs.
Revision 1.56 by root, Sun Oct 22 00:53:47 2006 UTC

15 15
16 aio_read $fh, 30000, 1024, $buffer, 0, sub { 16 aio_read $fh, 30000, 1024, $buffer, 0, sub {
17 $_[0] > 0 or die "read error: $!"; 17 $_[0] > 0 or die "read error: $!";
18 }; 18 };
19 19
20 use IO::AIO 2; # version has aio objects 20 # version 2+ has request and group objects
21 use IO::AIO 2;
21 22
22 my $req = aio_unlink "/tmp/file", sub { }; 23 my $req = aio_unlink "/tmp/file", sub { };
23 $req->cancel; # cancel request if still in queue 24 $req->cancel; # cancel request if still in queue
24 25
25 # AnyEvent 26 my $grp = aio_group sub { print "all stats done\n" };
27 add $grp aio_stat "..." for ...;
28
29 # AnyEvent integration
26 open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!"; 30 open my $fh, "<&=" . IO::AIO::poll_fileno or die "$!";
27 my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb }); 31 my $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { IO::AIO::poll_cb });
28 32
29 # Event 33 # Event integration
30 Event->io (fd => IO::AIO::poll_fileno, 34 Event->io (fd => IO::AIO::poll_fileno,
31 poll => 'r', 35 poll => 'r',
32 cb => \&IO::AIO::poll_cb); 36 cb => \&IO::AIO::poll_cb);
33 37
34 # Glib/Gtk2 38 # Glib/Gtk2 integration
35 add_watch Glib::IO IO::AIO::poll_fileno, 39 add_watch Glib::IO IO::AIO::poll_fileno,
36 in => sub { IO::AIO::poll_cb; 1 }; 40 in => sub { IO::AIO::poll_cb; 1 };
37 41
38 # Tk 42 # Tk integration
39 Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "", 43 Tk::Event::IO->fileevent (IO::AIO::poll_fileno, "",
40 readable => \&IO::AIO::poll_cb); 44 readable => \&IO::AIO::poll_cb);
41 45
42 # Danga::Socket 46 # Danga::Socket integration
43 Danga::Socket->AddOtherFds (IO::AIO::poll_fileno => 47 Danga::Socket->AddOtherFds (IO::AIO::poll_fileno =>
44 \&IO::AIO::poll_cb); 48 \&IO::AIO::poll_cb);
45
46 49
47=head1 DESCRIPTION 50=head1 DESCRIPTION
48 51
49This module implements asynchronous I/O using whatever means your 52This module implements asynchronous I/O using whatever means your
50operating system supports. 53operating system supports.
71use strict 'vars'; 74use strict 'vars';
72 75
73use base 'Exporter'; 76use base 'Exporter';
74 77
75BEGIN { 78BEGIN {
76 our $VERSION = '1.8'; 79 our $VERSION = '2.0';
77 80
78 our @EXPORT = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 81 our @EXPORT = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
79 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 82 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
80 aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move); 83 aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move
84 aio_group);
81 our @EXPORT_OK = qw(poll_fileno poll_cb min_parallel max_parallel max_outstanding nreqs); 85 our @EXPORT_OK = qw(poll_fileno poll_cb min_parallel max_parallel max_outstanding nreqs);
86
87 @IO::AIO::GRP::ISA = 'IO::AIO::REQ';
82 88
83 require XSLoader; 89 require XSLoader;
84 XSLoader::load ("IO::AIO", $VERSION); 90 XSLoader::load ("IO::AIO", $VERSION);
85} 91}
86 92
97syscall has been executed asynchronously. 103syscall has been executed asynchronously.
98 104
99All functions expecting a filehandle keep a copy of the filehandle 105All functions expecting a filehandle keep a copy of the filehandle
100internally until the request has finished. 106internally until the request has finished.
101 107
102All non-composite requests (requests that are not broken down into
103multiple requests) return objects of type L<IO::AIO::REQ> that allow 108All requests return objects of type L<IO::AIO::REQ> that allow further
104further manipulation of running requests. 109manipulation of those requests while they are in-flight.
105 110
106The pathnames you pass to these routines I<must> be absolute and 111The pathnames you pass to these routines I<must> be absolute and
107encoded in byte form. The reason for the former is that at the time the 112encoded in byte form. The reason for the former is that at the time the
108request is being executed, the current working directory could have 113request is being executed, the current working directory could have
109changed. Alternatively, you can make sure that you never change the 114changed. Alternatively, you can make sure that you never change the
196=cut 201=cut
197 202
198sub aio_move($$$) { 203sub aio_move($$$) {
199 my ($src, $dst, $cb) = @_; 204 my ($src, $dst, $cb) = @_;
200 205
206 my $grp = aio_group;
207
201 aio_rename $src, $dst, sub { 208 add $grp aio_rename $src, $dst, sub {
202 if ($_[0] && $! == EXDEV) { 209 if ($_[0] && $! == EXDEV) {
203 aio_open $src, O_RDONLY, 0, sub { 210 add $grp aio_open $src, O_RDONLY, 0, sub {
204 if (my $src_fh = $_[0]) { 211 if (my $src_fh = $_[0]) {
205 my @stat = stat $src_fh; 212 my @stat = stat $src_fh;
206 213
207 aio_open $dst, O_WRONLY, 0200, sub { 214 add $grp aio_open $dst, O_WRONLY, 0200, sub {
208 if (my $dst_fh = $_[0]) { 215 if (my $dst_fh = $_[0]) {
209 aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub { 216 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
210 close $src_fh; 217 close $src_fh;
211 218
212 if ($_[0] == $stat[7]) { 219 if ($_[0] == $stat[7]) {
213 utime $stat[8], $stat[9], $dst; 220 utime $stat[8], $stat[9], $dst;
214 chmod $stat[2] & 07777, $dst_fh; 221 chmod $stat[2] & 07777, $dst_fh;
215 chown $stat[4], $stat[5], $dst_fh; 222 chown $stat[4], $stat[5], $dst_fh;
216 close $dst_fh; 223 close $dst_fh;
217 224
218 aio_unlink $src, sub { 225 add $grp aio_unlink $src, sub {
219 $cb->($_[0]); 226 $cb->($_[0]);
220 }; 227 };
221 } else { 228 } else {
222 my $errno = $!; 229 my $errno = $!;
223 aio_unlink $dst, sub { 230 add $grp aio_unlink $dst, sub {
224 $! = $errno; 231 $! = $errno;
225 $cb->(-1); 232 $cb->(-1);
226 }; 233 };
227 } 234 }
228 }; 235 };
237 }; 244 };
238 } else { 245 } else {
239 $cb->($_[0]); 246 $cb->($_[0]);
240 } 247 }
241 }; 248 };
249
250 $grp
242} 251}
243 252
244=item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval) 253=item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval)
245 254
246Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts 255Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts
389=cut 398=cut
390 399
391sub aio_scandir($$$) { 400sub aio_scandir($$$) {
392 my ($path, $maxreq, $cb) = @_; 401 my ($path, $maxreq, $cb) = @_;
393 402
403 my $grp = aio_group;
404
394 $maxreq = 8 if $maxreq <= 0; 405 $maxreq = 8 if $maxreq <= 0;
395 406
396 # stat once 407 # stat once
397 aio_stat $path, sub { 408 add $grp aio_stat $path, sub {
398 return $cb->() if $_[0]; 409 return $cb->() if $_[0];
399 my $now = time; 410 my $now = time;
400 my $hash1 = join ":", (stat _)[0,1,3,7,9]; 411 my $hash1 = join ":", (stat _)[0,1,3,7,9];
401 412
402 # read the directory entries 413 # read the directory entries
403 aio_readdir $path, sub { 414 add $grp aio_readdir $path, sub {
404 my $entries = shift 415 my $entries = shift
405 or return $cb->(); 416 or return $cb->();
406 417
407 # stat the dir another time 418 # stat the dir another time
408 aio_stat $path, sub { 419 add $grp aio_stat $path, sub {
409 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 420 my $hash2 = join ":", (stat _)[0,1,3,7,9];
410 421
411 my $ndirs; 422 my $ndirs;
412 423
413 # take the slow route if anything looks fishy 424 # take the slow route if anything looks fishy
435 $schedcb = sub { 446 $schedcb = sub {
436 if (@$entries) { 447 if (@$entries) {
437 if ($nreq < $maxreq) { 448 if ($nreq < $maxreq) {
438 my $ent = pop @$entries; 449 my $ent = pop @$entries;
439 $nreq++; 450 $nreq++;
440 aio_stat "$path/$ent/.", sub { $statcb->($_[0], $ent) }; 451 add $grp aio_stat "$path/$ent/.", sub { $statcb->($_[0], $ent) };
441 } 452 }
442 } elsif (!$nreq) { 453 } elsif (!$nreq) {
443 # finished 454 # finished
444 undef $statcb; 455 undef $statcb;
445 undef $schedcb; 456 undef $schedcb;
454 $nreq--; 465 $nreq--;
455 push @nondirs, $entry; 466 push @nondirs, $entry;
456 &$schedcb; 467 &$schedcb;
457 } else { 468 } else {
458 # need to check for real directory 469 # need to check for real directory
459 aio_lstat "$path/$entry", sub { 470 add $grp aio_lstat "$path/$entry", sub {
460 $nreq--; 471 $nreq--;
461 472
462 if (-d _) { 473 if (-d _) {
463 push @dirs, $entry; 474 push @dirs, $entry;
464 475
477 488
478 &$schedcb while @$entries && $nreq < $maxreq; 489 &$schedcb while @$entries && $nreq < $maxreq;
479 }; 490 };
480 }; 491 };
481 }; 492 };
493
494 $grp
482} 495}
483 496
484=item aio_fsync $fh, $callback->($status) 497=item aio_fsync $fh, $callback->($status)
485 498
486Asynchronously call fsync on the given filehandle and call the callback 499Asynchronously call fsync on the given filehandle and call the callback
492callback with the fdatasync result code. 505callback with the fdatasync result code.
493 506
494If this call isn't available because your OS lacks it or it couldn't be 507If this call isn't available because your OS lacks it or it couldn't be
495detected, it will be emulated by calling C<fsync> instead. 508detected, it will be emulated by calling C<fsync> instead.
496 509
510=item aio_group $callback->()
511
512[EXPERIMENTAL]
513
514This is a very special aio request: Instead of doing something, it is a
515container for other aio requests, which is useful if you want to bundle
516many requests into a single, composite, request.
517
518Returns an object of class L<IO::AIO::GRP>. See its documentation below
519for more info.
520
521Example:
522
523 my $grp = aio_group sub {
524 print "all stats done\n";
525 };
526
527 add $grp
528 (aio_stat ...),
529 (aio_stat ...),
530 ...;
531
532=item IO::AIO::aio_sleep $fractional_seconds, $callback->() *NOT EXPORTED*
533
534Mainly used for debugging and benchmarking, this aio request puts one of
535the request workers to sleep for the given time.
536
537While it is theoretically handy to have simple I/O scheduling requests
538like sleep and file handle readable/writable, the overhead this creates
539is immense, so do not use this function except to put your application
540under artificial I/O pressure.
541
497=back 542=back
498 543
499=head2 IO::AIO::CB CLASS 544=head2 IO::AIO::REQ CLASS
500 545
501All non-aggregate C<aio_*> functions return an object of this class when 546All non-aggregate C<aio_*> functions return an object of this class when
502called in non-void context. 547called in non-void context.
503 548
504A request always moves through the following five states in its lifetime, 549A request always moves through the following five states in its lifetime,
519untouched. That means that requests that currently execute will not be 564untouched. That means that requests that currently execute will not be
520stopped and resources held by the request will not be freed prematurely. 565stopped and resources held by the request will not be freed prematurely.
521 566
522=back 567=back
523 568
569=head2 IO::AIO::GRP CLASS
570
571This class is a subclass of L<IO::AIO::REQ>, so all its methods apply to
572objects of this class, too.
573
574A IO::AIO::GRP object is a special request that can contain multiple other
575aio requests.
576
577You create one by calling the C<aio_group> constructing function with a
578callback that will be called when all contained requests have entered the
579C<done> state:
580
581 my $grp = aio_group sub {
582 print "all requests are done\n";
583 };
584
585You add requests by calling the C<add> method with one or more
586C<IO::AIO::REQ> objects:
587
588 $grp->add (aio_unlink "...");
589
590 add $grp aio_stat "...", sub { ... };
591
592This makes it very easy to create composite requests (see the source of
593C<aio_move> for an application) that work and feel like simple requests.
594
595The IO::AIO::GRP objects will be cleaned up during calls to
596C<IO::AIO::poll_cb>, just like any other request.
597
598They can be canceled like any other request. Canceling will cancel not
599just the request itself, but also all requests it contains.
600
601They can also can also be added to other IO::AIO::GRP objects.
602
603Their lifetime, simplified, looks like this: when they are empty, they
604will finish very quickly. If they contain only requests that are in the
605C<done> state, they will also finish. Otherwise they will continue to
606exist.
607
608=over 4
609
610=item $grp->add (...)
611
612=item add $grp ...
613
614Add one or more
615Cancels the request, if possible. Has the effect of skipping execution
616when entering the B<execute> state and skipping calling the callback when
617entering the the B<result> state, but will leave the request otherwise
618untouched. That means that requests that currently execute will not be
619stopped and resources held by the request will not be freed prematurely.
620
621=back
622
524=head2 SUPPORT FUNCTIONS 623=head2 SUPPORT FUNCTIONS
525 624
526=over 4 625=over 4
527 626
528=item $fileno = IO::AIO::poll_fileno 627=item $fileno = IO::AIO::poll_fileno

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines