ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Pool/Pool.pm
(Generate patch)

Comparing AnyEvent-Fork-Pool/Pool.pm (file contents):
Revision 1.6 by root, Sun Apr 21 11:17:02 2013 UTC vs.
Revision 1.9 by root, Thu Apr 25 00:27:22 2013 UTC

1=head1 NAME 1=head1 NAME
2 2
3AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork 3AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4
5THE API IS NOT FINISHED, CONSIDER THIS AN ALPHA RELEASE
4 6
5=head1 SYNOPSIS 7=head1 SYNOPSIS
6 8
7 use AnyEvent; 9 use AnyEvent;
8 use AnyEvent::Fork::Pool; 10 use AnyEvent::Fork::Pool;
87 89
88use Guard (); 90use Guard ();
89use Array::Heap (); 91use Array::Heap ();
90 92
91use AnyEvent; 93use AnyEvent;
94# explicit version on next line, as some cpan-testers test with the 0.1 version,
95# ignoring dependencies, and this line will at least give a clear indication of that.
92use AnyEvent::Fork; # we don't actually depend on it, this is for convenience 96use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience
93use AnyEvent::Fork::RPC; 97use AnyEvent::Fork::RPC;
94 98
95# these are used for the first and last argument of events 99# these are used for the first and last argument of events
96# in the hope of not colliding. yes, I don't like it either, 100# in the hope of not colliding. yes, I don't like it either,
97# but didn't come up with an obviously better alternative. 101# but didn't come up with an obviously better alternative.
328 $proc->[0] 332 $proc->[0]
329 or --$nidle; 333 or --$nidle;
330 334
331 Array::Heap::splice_heap_idx @pool, $proc->[1] 335 Array::Heap::splice_heap_idx @pool, $proc->[1]
332 if defined $proc->[1]; 336 if defined $proc->[1];
337
338 @$proc = 0; # tell others to leave it be
333 }; 339 };
334 340
335 $want_start = sub { 341 $want_start = sub {
336 undef $stop_w; 342 undef $stop_w;
337 343
356 }; 362 };
357 363
358 $scheduler = sub { 364 $scheduler = sub {
359 if (@queue) { 365 if (@queue) {
360 while (@queue) { 366 while (@queue) {
367 @pool or $start_worker->();
368
361 my $proc = $pool[0]; 369 my $proc = $pool[0];
362 370
363 if ($proc->[0] < $load) { 371 if ($proc->[0] < $load) {
364 # found free worker, increase load 372 # found free worker, increase load
365 unless ($proc->[0]++) { 373 unless ($proc->[0]++) {
383 or $want_stop->(); 391 or $want_stop->();
384 392
385 Array::Heap::adjust_heap_idx @pool, $proc->[1] 393 Array::Heap::adjust_heap_idx @pool, $proc->[1]
386 if defined $proc->[1]; 394 if defined $proc->[1];
387 395
396 &$ocb;
397
388 $scheduler->(); 398 $scheduler->();
389
390 &$ocb;
391 }); 399 });
392 } else { 400 } else {
393 $want_start->() 401 $want_start->()
394 unless @pool >= $max; 402 unless @pool >= $max;
395 403
440to this function are effectively read-only - modifying them after the call 448to this function are effectively read-only - modifying them after the call
441and before the callback is invoked causes undefined behaviour. 449and before the callback is invoked causes undefined behaviour.
442 450
443=cut 451=cut
444 452
453=item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]
454
455=item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]
456
457Tries to detect the number of CPUs (C<$cpus> often called cpu cores
458nowadays) and execution units (C<$eus>) which include e.g. extra
459hyperthreaded units). When C<$cpus> cannot be determined reliably,
460C<$default_cpus> is returned for both values, or C<1> if it is missing.
461
462For normal CPU bound uses, it is wise to have as many worker processes
463as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
464hyperthreading is usually detrimental to performance, but in those rare
465cases where that really helps it might be beneficial to use more workers
466(C<$eus>).
467
468Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
469C<$cpus> and C<$eu>, and on {Free,Net,Open}BSD, F<sysctl -n hw.ncpu> is
470used for C<$cpus>.
471
472Example: create a worker pool with as many workers as cpu cores, or C<2>,
473if the actual number could not be determined.
474
475 $fork->AnyEvent::Fork::Pool::run ("myworker::function",
476 max => (scalar AnyEvent::Fork::Pool::ncpu 2),
477 );
478
479=cut
480
481BEGIN {
482 if ($^O eq "linux") {
483 *ncpu = sub(;$) {
484 my ($cpus, $eus);
485
486 if (open my $fh, "<", "/proc/cpuinfo") {
487 my %id;
488
489 while (<$fh>) {
490 if (/^core id\s*:\s*(\d+)/) {
491 ++$eus;
492 undef $id{$1};
493 }
494 }
495
496 $cpus = scalar keys %id;
497 } else {
498 $cpus = $eus = @_ ? shift : 1;
499 }
500 wantarray ? ($cpus, $eus) : $cpus
501 };
502 } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
503 *ncpu = sub(;$) {
504 my $cpus = qx<sysctl -n hw.ncpu> * 1
505 || (@_ ? shift : 1);
506 wantarray ? ($cpus, $cpus) : $cpus
507 };
508 } else {
509 *ncpu = sub(;$) {
510 my $cpus = @_ ? shift : 1;
511 wantarray ? ($cpus, $cpus) : $cpus
512 };
513 }
514}
515
445=back 516=back
446 517
447=head1 CHILD USAGE 518=head1 CHILD USAGE
448 519
449In addition to the L<AnyEvent::Fork::RPC> API, this module implements one 520In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
513having only one worker is best, for example, when you read or write big 584having only one worker is best, for example, when you read or write big
514files at maixmum speed, as a second worker will increase seek times. 585files at maixmum speed, as a second worker will increase seek times.
515 586
516=back 587=back
517 588
589=head1 EXCEPTIONS
590
591The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptins will
592not be caught, and exceptions in both worker and in callbacks causes
593undesirable or undefined behaviour.
594
518=head1 SEE ALSO 595=head1 SEE ALSO
519 596
520L<AnyEvent::Fork>, to create the processes in the first place. 597L<AnyEvent::Fork>, to create the processes in the first place.
521 598
522L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API. 599L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines