ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Pool/Pool.pm
(Generate patch)

Comparing AnyEvent-Fork-Pool/Pool.pm (file contents):
Revision 1.6 by root, Sun Apr 21 11:17:02 2013 UTC vs.
Revision 1.10 by root, Thu Apr 25 01:21:18 2013 UTC

1=head1 NAME 1=head1 NAME
2 2
3AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork 3AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4
5THE API IS NOT FINISHED, CONSIDER THIS AN ALPHA RELEASE
4 6
5=head1 SYNOPSIS 7=head1 SYNOPSIS
6 8
7 use AnyEvent; 9 use AnyEvent;
8 use AnyEvent::Fork::Pool; 10 use AnyEvent::Fork::Pool;
87 89
88use Guard (); 90use Guard ();
89use Array::Heap (); 91use Array::Heap ();
90 92
91use AnyEvent; 93use AnyEvent;
94# explicit version on next line, as some cpan-testers test with the 0.1 version,
95# ignoring dependencies, and this line will at least give a clear indication of that.
92use AnyEvent::Fork; # we don't actually depend on it, this is for convenience 96use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience
93use AnyEvent::Fork::RPC; 97use AnyEvent::Fork::RPC;
94 98
95# these are used for the first and last argument of events 99# these are used for the first and last argument of events
96# in the hope of not colliding. yes, I don't like it either, 100# in the hope of not colliding. yes, I don't like it either,
97# but didn't come up with an obviously better alternative. 101# but didn't come up with an obviously better alternative.
328 $proc->[0] 332 $proc->[0]
329 or --$nidle; 333 or --$nidle;
330 334
331 Array::Heap::splice_heap_idx @pool, $proc->[1] 335 Array::Heap::splice_heap_idx @pool, $proc->[1]
332 if defined $proc->[1]; 336 if defined $proc->[1];
337
338 @$proc = 0; # tell others to leave it be
333 }; 339 };
334 340
335 $want_start = sub { 341 $want_start = sub {
336 undef $stop_w; 342 undef $stop_w;
337 343
356 }; 362 };
357 363
358 $scheduler = sub { 364 $scheduler = sub {
359 if (@queue) { 365 if (@queue) {
360 while (@queue) { 366 while (@queue) {
367 @pool or $start_worker->();
368
361 my $proc = $pool[0]; 369 my $proc = $pool[0];
362 370
363 if ($proc->[0] < $load) { 371 if ($proc->[0] < $load) {
364 # found free worker, increase load 372 # found free worker, increase load
365 unless ($proc->[0]++) { 373 unless ($proc->[0]++) {
383 or $want_stop->(); 391 or $want_stop->();
384 392
385 Array::Heap::adjust_heap_idx @pool, $proc->[1] 393 Array::Heap::adjust_heap_idx @pool, $proc->[1]
386 if defined $proc->[1]; 394 if defined $proc->[1];
387 395
396 &$ocb;
397
388 $scheduler->(); 398 $scheduler->();
389
390 &$ocb;
391 }); 399 });
392 } else { 400 } else {
393 $want_start->() 401 $want_start->()
394 unless @pool >= $max; 402 unless @pool >= $max;
395 403
440to this function are effectively read-only - modifying them after the call 448to this function are effectively read-only - modifying them after the call
441and before the callback is invoked causes undefined behaviour. 449and before the callback is invoked causes undefined behaviour.
442 450
443=cut 451=cut
444 452
453=item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]
454
455=item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]
456
457Tries to detect the number of CPUs (C<$cpus> often called cpu cores
458nowadays) and execution units (C<$eus>) which include e.g. extra
459hyperthreaded units). When C<$cpus> cannot be determined reliably,
460C<$default_cpus> is returned for both values, or C<1> if it is missing.
461
462For normal CPU bound uses, it is wise to have as many worker processes
463as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
464hyperthreading is usually detrimental to performance, but in those rare
465cases where that really helps it might be beneficial to use more workers
466(C<$eus>).
467
468Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
469C<$cpus> and C<$eu>, and on {Free,Net,Open}BSD, F<sysctl -n hw.ncpu> is
470used for C<$cpus>.
471
472Example: create a worker pool with as many workers as cpu cores, or C<2>,
473if the actual number could not be determined.
474
475 $fork->AnyEvent::Fork::Pool::run ("myworker::function",
476 max => (scalar AnyEvent::Fork::Pool::ncpu 2),
477 );
478
479=cut
480
481BEGIN {
482 if ($^O eq "linux") {
483 *ncpu = sub(;$) {
484 my ($cpus, $eus);
485
486 if (open my $fh, "<", "/proc/cpuinfo") {
487 my %id;
488
489 while (<$fh>) {
490 if (/^core id\s*:\s*(\d+)/) {
491 ++$eus;
492 undef $id{$1};
493 }
494 }
495
496 $cpus = scalar keys %id;
497 } else {
498 $cpus = $eus = @_ ? shift : 1;
499 }
500 wantarray ? ($cpus, $eus) : $cpus
501 };
502 } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
503 *ncpu = sub(;$) {
504 my $cpus = qx<sysctl -n hw.ncpu> * 1
505 || (@_ ? shift : 1);
506 wantarray ? ($cpus, $cpus) : $cpus
507 };
508 } else {
509 *ncpu = sub(;$) {
510 my $cpus = @_ ? shift : 1;
511 wantarray ? ($cpus, $cpus) : $cpus
512 };
513 }
514}
515
445=back 516=back
446 517
447=head1 CHILD USAGE 518=head1 CHILD USAGE
448 519
449In addition to the L<AnyEvent::Fork::RPC> API, this module implements one 520In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
466deems this useful. For example, after executing a job, one could check 537deems this useful. For example, after executing a job, one could check
467the process size or the number of jobs handled so far, and if either is 538the process size or the number of jobs handled so far, and if either is
468too high, the worker could ask to get retired, to avoid memory leaks to 539too high, the worker could ask to get retired, to avoid memory leaks to
469accumulate. 540accumulate.
470 541
542Example: retire a worker after it has handled roughly 100 requests.
543
544 my $count = 0;
545
546 sub my::worker {
547
548 ++$count == 100
549 and AnyEvent::Fork::Pool::retire ();
550
551 ... normal code goes here
552 }
553
471=back 554=back
472 555
473=head1 POOL PARAMETERS RECIPES 556=head1 POOL PARAMETERS RECIPES
474 557
475This section describes some recipes for pool paramaters. These are mostly 558This section describes some recipes for pool paramaters. These are mostly
513having only one worker is best, for example, when you read or write big 596having only one worker is best, for example, when you read or write big
514files at maixmum speed, as a second worker will increase seek times. 597files at maixmum speed, as a second worker will increase seek times.
515 598
516=back 599=back
517 600
601=head1 EXCEPTIONS
602
603The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptins will
604not be caught, and exceptions in both worker and in callbacks causes
605undesirable or undefined behaviour.
606
518=head1 SEE ALSO 607=head1 SEE ALSO
519 608
520L<AnyEvent::Fork>, to create the processes in the first place. 609L<AnyEvent::Fork>, to create the processes in the first place.
521 610
522L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API. 611L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines