ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.pm
(Generate patch)

Comparing IO-AIO/AIO.pm (file contents):
Revision 1.77 by root, Wed Oct 25 17:57:30 2006 UTC vs.
Revision 1.82 by root, Fri Oct 27 20:10:06 2006 UTC

131 our $VERSION = '2.0'; 131 our $VERSION = '2.0';
132 132
133 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat 133 our @AIO_REQ = qw(aio_sendfile aio_read aio_write aio_open aio_close aio_stat
134 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink 134 aio_lstat aio_unlink aio_rmdir aio_readdir aio_scandir aio_symlink
135 aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move 135 aio_fsync aio_fdatasync aio_readahead aio_rename aio_link aio_move
136 aio_group aio_nop); 136 aio_copy aio_group aio_nop aio_mknod);
137 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice)); 137 our @EXPORT = (@AIO_REQ, qw(aioreq_pri aioreq_nice));
138 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush 138 our @EXPORT_OK = qw(poll_fileno poll_cb poll_wait flush
139 min_parallel max_parallel nreqs); 139 min_parallel max_parallel nreqs nready npending);
140 140
141 @IO::AIO::GRP::ISA = 'IO::AIO::REQ'; 141 @IO::AIO::GRP::ISA = 'IO::AIO::REQ';
142 142
143 require XSLoader; 143 require XSLoader;
144 XSLoader::load ("IO::AIO", $VERSION); 144 XSLoader::load ("IO::AIO", $VERSION);
175environment, d) use Glib::filename_from_unicode on unicode filenames or e) 175environment, d) use Glib::filename_from_unicode on unicode filenames or e)
176use something else. 176use something else.
177 177
178=over 4 178=over 4
179 179
180=item aioreq_pri $pri 180=item $prev_pri = aioreq_pri [$pri]
181 181
182Sets the priority for the next aio request. The default priority 182Returns the priority value that would be used for the next request and, if
183C<$pri> is given, sets the priority for the next aio request.
184
183is C<0>, the minimum and maximum priorities are C<-4> and C<4>, 185The default priority is C<0>, the minimum and maximum priorities are C<-4>
184respectively. Requests with higher priority will be serviced first. 186and C<4>, respectively. Requests with higher priority will be serviced
187first.
185 188
186The priority will be reset to C<0> after each call to one of the C<aio_> 189The priority will be reset to C<0> after each call to one of the C<aio_*>
187functions. 190functions.
188 191
189Example: open a file with low priority, then read something from it with 192Example: open a file with low priority, then read something from it with
190higher priority so the read request is serviced before other low priority 193higher priority so the read request is serviced before other low priority
191open requests (potentially spamming the cache): 194open requests (potentially spamming the cache):
261 264
262 aio_read $fh, 7, 15, $buffer, 0, sub { 265 aio_read $fh, 7, 15, $buffer, 0, sub {
263 $_[0] > 0 or die "read error: $!"; 266 $_[0] > 0 or die "read error: $!";
264 print "read $_[0] bytes: <$buffer>\n"; 267 print "read $_[0] bytes: <$buffer>\n";
265 }; 268 };
266
267=item aio_move $srcpath, $dstpath, $callback->($status)
268
269Try to move the I<file> (directories not supported as either source or
270destination) from C<$srcpath> to C<$dstpath> and call the callback with
271the C<0> (error) or C<-1> ok.
272
273This is a composite request that tries to rename(2) the file first. If
274rename files with C<EXDEV>, it creates the destination file with mode 0200
275and copies the contents of the source file into it using C<aio_sendfile>,
276followed by restoring atime, mtime, access mode and uid/gid, in that
277order, and unlinking the C<$srcpath>.
278
279If an error occurs, the partial destination file will be unlinked, if
280possible, except when setting atime, mtime, access mode and uid/gid, where
281errors are being ignored.
282
283=cut
284
285sub aio_move($$$) {
286 my ($src, $dst, $cb) = @_;
287
288 my $grp = aio_group $cb;
289
290 add $grp aio_rename $src, $dst, sub {
291 if ($_[0] && $! == EXDEV) {
292 add $grp aio_open $src, O_RDONLY, 0, sub {
293 if (my $src_fh = $_[0]) {
294 my @stat = stat $src_fh;
295
296 add $grp aio_open $dst, O_WRONLY, 0200, sub {
297 if (my $dst_fh = $_[0]) {
298 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
299 close $src_fh;
300
301 if ($_[0] == $stat[7]) {
302 utime $stat[8], $stat[9], $dst;
303 chmod $stat[2] & 07777, $dst_fh;
304 chown $stat[4], $stat[5], $dst_fh;
305 close $dst_fh;
306
307 add $grp aio_unlink $src, sub {
308 $grp->result ($_[0]);
309 };
310 } else {
311 my $errno = $!;
312 add $grp aio_unlink $dst, sub {
313 $! = $errno;
314 $grp->result (-1);
315 };
316 }
317 };
318 } else {
319 $grp->result (-1);
320 }
321 },
322
323 } else {
324 $grp->result (-1);
325 }
326 };
327 } else {
328 $grp->result ($_[0]);
329 }
330 };
331
332 $grp
333}
334 269
335=item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval) 270=item aio_sendfile $out_fh, $in_fh, $in_offset, $length, $callback->($retval)
336 271
337Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts 272Tries to copy C<$length> bytes from C<$in_fh> to C<$out_fh>. It starts
338reading at byte offset C<$in_offset>, and starts writing at the current 273reading at byte offset C<$in_offset>, and starts writing at the current
394=item aio_unlink $pathname, $callback->($status) 329=item aio_unlink $pathname, $callback->($status)
395 330
396Asynchronously unlink (delete) a file and call the callback with the 331Asynchronously unlink (delete) a file and call the callback with the
397result code. 332result code.
398 333
334=item aio_mknod $path, $mode, $dev, $callback->($status)
335
336Asynchronously create a device node (or fifo). See mknod(2): the only
337portable value for C<$mode> is C<S_IFIFO> ored with permissions, and C<0>
338for C<$dev>.
339
399=item aio_link $srcpath, $dstpath, $callback->($status) 340=item aio_link $srcpath, $dstpath, $callback->($status)
400 341
401Asynchronously create a new link to the existing object at C<$srcpath> at 342Asynchronously create a new link to the existing object at C<$srcpath> at
402the path C<$dstpath> and call the callback with the result code. 343the path C<$dstpath> and call the callback with the result code.
403 344
422directory (i.e. opendir + readdir + closedir). The entries will not be 363directory (i.e. opendir + readdir + closedir). The entries will not be
423sorted, and will B<NOT> include the C<.> and C<..> entries. 364sorted, and will B<NOT> include the C<.> and C<..> entries.
424 365
425The callback a single argument which is either C<undef> or an array-ref 366The callback a single argument which is either C<undef> or an array-ref
426with the filenames. 367with the filenames.
368
369=item aio_copy $srcpath, $dstpath, $callback->($status)
370
371Try to copy the I<file> (directories not supported as either source or
372destination) from C<$srcpath> to C<$dstpath> and call the callback with
373the C<0> (error) or C<-1> ok.
374
375This is a composite request that it creates the destination file with
376mode 0200 and copies the contents of the source file into it using
377C<aio_sendfile>, followed by restoring atime, mtime, access mode and
378uid/gid, in that order.
379
380If an error occurs, the partial destination file will be unlinked, if
381possible, except when setting atime, mtime, access mode and uid/gid, where
382errors are being ignored.
383
384=cut
385
386sub aio_copy($$;$) {
387 my ($src, $dst, $cb) = @_;
388
389 my $pri = aioreq_pri;
390 my $grp = aio_group $cb;
391
392 aioreq_pri $pri;
393 add $grp aio_open $src, O_RDONLY, 0, sub {
394 if (my $src_fh = $_[0]) {
395 my @stat = stat $src_fh;
396
397 aioreq_pri $pri;
398 add $grp aio_open $dst, O_CREAT | O_WRONLY | O_TRUNC, 0200, sub {
399 if (my $dst_fh = $_[0]) {
400 aioreq_pri $pri;
401 add $grp aio_sendfile $dst_fh, $src_fh, 0, $stat[7], sub {
402 if ($_[0] == $stat[7]) {
403 $grp->result (0);
404 close $src_fh;
405
406 # those should not normally block. should. should.
407 utime $stat[8], $stat[9], $dst;
408 chmod $stat[2] & 07777, $dst_fh;
409 chown $stat[4], $stat[5], $dst_fh;
410 close $dst_fh;
411 } else {
412 $grp->result (-1);
413 close $src_fh;
414 close $dst_fh;
415
416 aioreq $pri;
417 add $grp aio_unlink $dst;
418 }
419 };
420 } else {
421 $grp->result (-1);
422 }
423 },
424
425 } else {
426 $grp->result (-1);
427 }
428 };
429
430 $grp
431}
432
433=item aio_move $srcpath, $dstpath, $callback->($status)
434
435Try to move the I<file> (directories not supported as either source or
436destination) from C<$srcpath> to C<$dstpath> and call the callback with
437the C<0> (error) or C<-1> ok.
438
439This is a composite request that tries to rename(2) the file first. If
440rename files with C<EXDEV>, it copies the file with C<aio_copy> and, if
441that is successful, unlinking the C<$srcpath>.
442
443=cut
444
445sub aio_move($$;$) {
446 my ($src, $dst, $cb) = @_;
447
448 my $pri = aioreq_pri;
449 my $grp = aio_group $cb;
450
451 aioreq_pri $pri;
452 add $grp aio_rename $src, $dst, sub {
453 if ($_[0] && $! == EXDEV) {
454 aioreq_pri $pri;
455 add $grp aio_copy $src, $dst, sub {
456 $grp->result ($_[0]);
457
458 if (!$_[0]) {
459 aioreq_pri $pri;
460 add $grp aio_unlink $src;
461 }
462 };
463 } else {
464 $grp->result ($_[0]);
465 }
466 };
467
468 $grp
469}
427 470
428=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs) 471=item aio_scandir $path, $maxreq, $callback->($dirs, $nondirs)
429 472
430Scans a directory (similar to C<aio_readdir>) but additionally tries to 473Scans a directory (similar to C<aio_readdir>) but additionally tries to
431efficiently separate the entries of directory C<$path> into two sets of 474efficiently separate the entries of directory C<$path> into two sets of
433recurse into (everything else, including symlinks to directories). 476recurse into (everything else, including symlinks to directories).
434 477
435C<aio_scandir> is a composite request that creates of many sub requests_ 478C<aio_scandir> is a composite request that creates of many sub requests_
436C<$maxreq> specifies the maximum number of outstanding aio requests that 479C<$maxreq> specifies the maximum number of outstanding aio requests that
437this function generates. If it is C<< <= 0 >>, then a suitable default 480this function generates. If it is C<< <= 0 >>, then a suitable default
438will be chosen (currently 6). 481will be chosen (currently 4).
439 482
440On error, the callback is called without arguments, otherwise it receives 483On error, the callback is called without arguments, otherwise it receives
441two array-refs with path-relative entry names. 484two array-refs with path-relative entry names.
442 485
443Example: 486Example:
480=cut 523=cut
481 524
482sub aio_scandir($$$) { 525sub aio_scandir($$$) {
483 my ($path, $maxreq, $cb) = @_; 526 my ($path, $maxreq, $cb) = @_;
484 527
528 my $pri = aioreq_pri;
529
485 my $grp = aio_group $cb; 530 my $grp = aio_group $cb;
486 531
487 $maxreq = 6 if $maxreq <= 0; 532 $maxreq = 4 if $maxreq <= 0;
488 533
489 # stat once 534 # stat once
535 aioreq_pri $pri;
490 add $grp aio_stat $path, sub { 536 add $grp aio_stat $path, sub {
491 return $grp->result () if $_[0]; 537 return $grp->result () if $_[0];
492 my $now = time; 538 my $now = time;
493 my $hash1 = join ":", (stat _)[0,1,3,7,9]; 539 my $hash1 = join ":", (stat _)[0,1,3,7,9];
494 540
495 # read the directory entries 541 # read the directory entries
542 aioreq_pri $pri;
496 add $grp aio_readdir $path, sub { 543 add $grp aio_readdir $path, sub {
497 my $entries = shift 544 my $entries = shift
498 or return $grp->result (); 545 or return $grp->result ();
499 546
500 # stat the dir another time 547 # stat the dir another time
548 aioreq_pri $pri;
501 add $grp aio_stat $path, sub { 549 add $grp aio_stat $path, sub {
502 my $hash2 = join ":", (stat _)[0,1,3,7,9]; 550 my $hash2 = join ":", (stat _)[0,1,3,7,9];
503 551
504 my $ndirs; 552 my $ndirs;
505 553
529 limit $statgrp $maxreq; 577 limit $statgrp $maxreq;
530 feed $statgrp sub { 578 feed $statgrp sub {
531 return unless @$entries; 579 return unless @$entries;
532 my $entry = pop @$entries; 580 my $entry = pop @$entries;
533 581
582 aioreq_pri $pri;
534 add $statgrp aio_stat "$path/$entry/.", sub { 583 add $statgrp aio_stat "$path/$entry/.", sub {
535 if ($_[0] < 0) { 584 if ($_[0] < 0) {
536 push @nondirs, $entry; 585 push @nondirs, $entry;
537 } else { 586 } else {
538 # need to check for real directory 587 # need to check for real directory
588 aioreq_pri $pri;
539 add $statgrp aio_lstat "$path/$entry", sub { 589 add $statgrp aio_lstat "$path/$entry", sub {
540 if (-d _) { 590 if (-d _) {
541 push @dirs, $entry; 591 push @dirs, $entry;
542 592
543 unless (--$ndirs) { 593 unless (--$ndirs) {
714itself. Useful when you queued a lot of events but got a result early. 764itself. Useful when you queued a lot of events but got a result early.
715 765
716=item $grp->result (...) 766=item $grp->result (...)
717 767
718Set the result value(s) that will be passed to the group callback when all 768Set the result value(s) that will be passed to the group callback when all
719subrequests have finished. By default, no argument will be passed. 769subrequests have finished and set thre groups errno to the current value
770of errno (just like calling C<errno> without an error number). By default,
771no argument will be passed and errno is zero.
772
773=item $grp->errno ([$errno])
774
775Sets the group errno value to C<$errno>, or the current value of errno
776when the argument is missing.
777
778Every aio request has an associated errno value that is restored when
779the callback is invoked. This method lets you change this value from its
780default (0).
781
782Calling C<result> will also set errno, so make sure you either set C<$!>
783before the call to C<result>, or call c<errno> after it.
720 784
721=item feed $grp $callback->($grp) 785=item feed $grp $callback->($grp)
722 786
723Sets a feeder/generator on this group: every group can have an attached 787Sets a feeder/generator on this group: every group can have an attached
724generator that generates requests if idle. The idea behind this is that, 788generator that generates requests if idle. The idea behind this is that,
780 844
781Process all outstanding events on the result pipe. You have to call this 845Process all outstanding events on the result pipe. You have to call this
782regularly. Returns the number of events processed. Returns immediately 846regularly. Returns the number of events processed. Returns immediately
783when no events are outstanding. 847when no events are outstanding.
784 848
849If not all requests were processed for whatever reason, the filehandle
850will still be ready when C<poll_cb> returns.
851
785Example: Install an Event watcher that automatically calls 852Example: Install an Event watcher that automatically calls
786IO::AIO::poll_cb with high priority: 853IO::AIO::poll_cb with high priority:
787 854
788 Event->io (fd => IO::AIO::poll_fileno, 855 Event->io (fd => IO::AIO::poll_fileno,
789 poll => 'r', async => 1, 856 poll => 'r', async => 1,
790 cb => \&IO::AIO::poll_cb); 857 cb => \&IO::AIO::poll_cb);
791 858
859=item IO::AIO::poll_some $max_requests
860
861Similar to C<poll_cb>, but only processes up to C<$max_requests> requests
862at a time.
863
864Useful if you want to ensure some level of interactiveness when perl is
865not fast enough to process all requests in time.
866
867Example: Install an Event watcher that automatically calls
868IO::AIO::poll_some with low priority, to ensure that other parts of the
869program get the CPU sometimes even under high AIO load.
870
871 Event->io (fd => IO::AIO::poll_fileno,
872 poll => 'r', nice => 1,
873 cb => sub { IO::AIO::poll_some 256 });
874
792=item IO::AIO::poll_wait 875=item IO::AIO::poll_wait
793 876
794Wait till the result filehandle becomes ready for reading (simply does a 877Wait till the result filehandle becomes ready for reading (simply does a
795C<select> on the filehandle. This is useful if you want to synchronously wait 878C<select> on the filehandle. This is useful if you want to synchronously wait
796for some requests to finish). 879for some requests to finish).
797 880
798See C<nreqs> for an example. 881See C<nreqs> for an example.
799 882
800=item IO::AIO::nreqs 883=item IO::AIO::nreqs
801 884
802Returns the number of requests currently outstanding (i.e. for which their 885Returns the number of requests currently in the ready, execute or pending
803callback has not been invoked yet). 886states (i.e. for which their callback has not been invoked yet).
804 887
805Example: wait till there are no outstanding requests anymore: 888Example: wait till there are no outstanding requests anymore:
806 889
807 IO::AIO::poll_wait, IO::AIO::poll_cb 890 IO::AIO::poll_wait, IO::AIO::poll_cb
808 while IO::AIO::nreqs; 891 while IO::AIO::nreqs;
892
893=item IO::AIO::nready
894
895Returns the number of requests currently in the ready state (not yet
896executed).
897
898=item IO::AIO::npending
899
900Returns the number of requests currently in the pending state (executed,
901but not yet processed by poll_cb).
809 902
810=item IO::AIO::flush 903=item IO::AIO::flush
811 904
812Wait till all outstanding AIO requests have been handled. 905Wait till all outstanding AIO requests have been handled.
813 906
855This module automatically runs C<max_parallel 0> at program end, to ensure 948This module automatically runs C<max_parallel 0> at program end, to ensure
856that all threads are killed and that there are no outstanding requests. 949that all threads are killed and that there are no outstanding requests.
857 950
858Under normal circumstances you don't need to call this function. 951Under normal circumstances you don't need to call this function.
859 952
860=item $oldnreqs = IO::AIO::max_outstanding $nreqs 953=item $oldmaxreqs = IO::AIO::max_outstanding $maxreqs
861 954
862[REMOVED] 955This is a very bad function to use in interactive programs because it
956blocks, and a bad way to reduce concurrency because it is inexact: Better
957use an C<aio_group> together with a feed callback.
863 958
864Pre-2.x versions used max_outstanding for a crude request queue length limit.
865
866In 2.x+ you are advised to use a group and a feeder to limit
867concurrency. The max_outstanding feature ran very unstable (endless
868recursions causing segfaults, bad interaction with groups etc.) and was
869removed.
870
871I am deeply sorry, but I am still on the hunt for a good limiting interface.
872
873Original description was as follows:
874
875Sets the maximum number of outstanding requests to C<$nreqs>. If you try 959Sets the maximum number of outstanding requests to C<$nreqs>. If you
876to queue up more than this number of requests, the caller will block until 960to queue up more than this number of requests, the next call to the
877some requests have been handled. 961C<poll_cb> (and C<poll_some> and other functions calling C<poll_cb>)
962function will block until the limit is no longer exceeded.
963
964The default value is very large, so there is no practical limit on the
965number of outstanding requests.
966
967You can still queue as many requests as you want. Therefore,
968C<max_oustsanding> is mainly useful in simple scripts (with low values) or
969as a stop gap to shield against fatal memory overflow (with large values).
878 970
879=back 971=back
880 972
881=cut 973=cut
882 974
897} 989}
898 990
899min_parallel 8; 991min_parallel 8;
900 992
901END { 993END {
902 max_parallel 0; 994 flush;
903} 995};
904 996
9051; 9971;
906 998
907=head2 FORK BEHAVIOUR 999=head2 FORK BEHAVIOUR
908 1000

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines