/cvs/AnyEvent-Fork-Pool/Pool.pm

Comparing AnyEvent-Fork-Pool/Pool.pm (file contents):
Revision 1.6 by root, Sun Apr 21 11:17:02 2013 UTC vs.
Revision 1.15 by root, Mon Oct 27 22:24:52 2014 UTC

3AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork 3AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent; 7 use AnyEvent;
8 use AnyEvent::Fork;
8 use AnyEvent::Fork::Pool; 9 use AnyEvent::Fork::Pool;
9 # use AnyEvent::Fork is not needed
10 10
11 # all possible parameters shown, with default values 11 # all possible parameters shown, with default values
12 my $pool = AnyEvent::Fork 12 my $pool = AnyEvent::Fork
13 ->new 13 ->new
14 ->require ("MyWorker") 14 ->require ("MyWorker")
41 41
42 $finish->recv; 42 $finish->recv;
43 43
44=head1 DESCRIPTION 44=head1 DESCRIPTION
45 45
46This module uses processes created via L<AnyEvent::Fork> and the RPC 46This module uses processes created via L<AnyEvent::Fork> (or
47protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced 47L<AnyEvent::Fork::Remote>) and the RPC protocol implemented in
48pool of processes that handles jobs. 48L<AnyEvent::Fork::RPC> to create a load-balanced pool of processes that
49handles jobs.
49 50
50Understanding of L<AnyEvent::Fork> is helpful but not critical to be able 51Understanding of L<AnyEvent::Fork> is helpful but not critical to be able
51to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC> 52to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC>
52is, as it defines the actual API that needs to be implemented in the 53is, as it defines the actual API that needs to be implemented in the
53worker processes. 54worker processes.
54
55=head1 EXAMPLES
56 55
57=head1 PARENT USAGE 56=head1 PARENT USAGE
58 57
59To create a pool, you first have to create a L<AnyEvent::Fork> object - 58To create a pool, you first have to create a L<AnyEvent::Fork> object -
60this object becomes your template process. Whenever a new worker process 59this object becomes your template process. Whenever a new worker process
87 86
88use Guard (); 87use Guard ();
89use Array::Heap (); 88use Array::Heap ();
90 89
91use AnyEvent; 90use AnyEvent;
92use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
93use AnyEvent::Fork::RPC; 91use AnyEvent::Fork::RPC;
94 92
95# these are used for the first and last argument of events 93# these are used for the first and last argument of events
96# in the hope of not colliding. yes, I don't like it either, 94# in the hope of not colliding. yes, I don't like it either,
97# but didn't come up with an obviously better alternative. 95# but didn't come up with an obviously better alternative.
98my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P'; 96my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
99my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*'; 97my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
100 98
101our $VERSION = 0.1; 99our $VERSION = 1.2;
102 100
103=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...] 101=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
104 102
105The traditional way to call the pool creation function. But it is way 103The traditional way to call the pool creation function. But it is way
106cooler to call it in the following way: 104cooler to call it in the following way:
328 $proc->[0] 326 $proc->[0]
329 or --$nidle; 327 or --$nidle;
330 328
331 Array::Heap::splice_heap_idx @pool, $proc->[1] 329 Array::Heap::splice_heap_idx @pool, $proc->[1]
332 if defined $proc->[1]; 330 if defined $proc->[1];
331
332 @$proc = 0; # tell others to leave it be
333 }; 333 };
334 334
335 $want_start = sub { 335 $want_start = sub {
336 undef $stop_w; 336 undef $stop_w;
337 337
356 }; 356 };
357 357
358 $scheduler = sub { 358 $scheduler = sub {
359 if (@queue) { 359 if (@queue) {
360 while (@queue) { 360 while (@queue) {
361 @pool or $start_worker->();
362
361 my $proc = $pool[0]; 363 my $proc = $pool[0];
362 364
363 if ($proc->[0] < $load) { 365 if ($proc->[0] < $load) {
364 # found free worker, increase load 366 # found free worker, increase load
365 unless ($proc->[0]++) { 367 unless ($proc->[0]++) {
383 or $want_stop->(); 385 or $want_stop->();
384 386
385 Array::Heap::adjust_heap_idx @pool, $proc->[1] 387 Array::Heap::adjust_heap_idx @pool, $proc->[1]
386 if defined $proc->[1]; 388 if defined $proc->[1];
387 389
390 &$ocb;
391
388 $scheduler->(); 392 $scheduler->();
389
390 &$ocb;
391 }); 393 });
392 } else { 394 } else {
393 $want_start->() 395 $want_start->()
394 unless @pool >= $max; 396 unless @pool >= $max;
395 397
396 last; 398 last;
397 } 399 }
398 } 400 }
399 } elsif ($shutdown) { 401 } elsif ($shutdown) {
400 @pool = (); 402 undef $_->[2]
403 for @pool;
404
401 undef $start_w; 405 undef $start_w;
402 undef $start_worker; # frees $destroy_guard reference 406 undef $start_worker; # frees $destroy_guard reference
403 407
404 $stop_worker->($pool[0]) 408 $stop_worker->($pool[0])
405 while $nidle; 409 while $nidle;
440to this function are effectively read-only - modifying them after the call 444to this function are effectively read-only - modifying them after the call
441and before the callback is invoked causes undefined behaviour. 445and before the callback is invoked causes undefined behaviour.
442 446
443=cut 447=cut
444 448
449=item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]
450
451=item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]
452
453Tries to detect the number of CPUs (C<$cpus>, often called CPU cores
454nowadays) and execution units (C<$eus>, which include e.g. extra
455hyperthreaded units). When C<$cpus> cannot be determined reliably,
456C<$default_cpus> is returned for both values, or C<1> if it is missing.
457
458For normal CPU bound uses, it is wise to have as many worker processes
459as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
460hyperthreading is usually detrimental to performance, but in those rare
461cases where that really helps it might be beneficial to use more workers
462(C<$eus>).
463
464Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
465C<$cpus> and C<$eus>, and on {Free,Net,Open}BSD, F<sysctl -n hw.ncpu> is
466used for C<$cpus>.
467
468Example: create a worker pool with as many workers as CPU cores, or C<2>,
469if the actual number could not be determined.
470
471 $fork->AnyEvent::Fork::Pool::run ("myworker::function",
472 max => (scalar AnyEvent::Fork::Pool::ncpu 2),
473 );
474
475=cut
476
477BEGIN {
478 if ($^O eq "linux") {
479 *ncpu = sub(;$) {
480 my ($cpus, $eus);
481
482 if (open my $fh, "<", "/proc/cpuinfo") {
483 my %id;
484
485 while (<$fh>) {
486 if (/^core id\s*:\s*(\d+)/) {
487 ++$eus;
488 undef $id{$1};
489 }
490 }
491
492 $cpus = scalar keys %id;
493 } else {
494 $cpus = $eus = @_ ? shift : 1;
495 }
496 wantarray ? ($cpus, $eus) : $cpus
497 };
498 } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
499 *ncpu = sub(;$) {
500 my $cpus = qx<sysctl -n hw.ncpu> * 1
501 || (@_ ? shift : 1);
502 wantarray ? ($cpus, $cpus) : $cpus
503 };
504 } else {
505 *ncpu = sub(;$) {
506 my $cpus = @_ ? shift : 1;
507 wantarray ? ($cpus, $cpus) : $cpus
508 };
509 }
510}
511
445=back 512=back
446 513
447=head1 CHILD USAGE 514=head1 CHILD USAGE
448 515
449In addition to the L<AnyEvent::Fork::RPC> API, this module implements one 516In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
453 520
454=item AnyEvent::Fork::Pool::retire () 521=item AnyEvent::Fork::Pool::retire ()
455 522
456This function sends an event to the parent process to request retirement: 523This function sends an event to the parent process to request retirement:
457the worker is removed from the pool and no new jobs will be sent to it, 524the worker is removed from the pool and no new jobs will be sent to it,
458but it has to handle the jobs that are already queued. 525but it still has to handle the jobs that are already queued.
459 526
460The parentheses are part of the syntax: the function usually isn't defined 527The parentheses are part of the syntax: the function usually isn't defined
461when you compile your code (because that happens I<before> handing the 528when you compile your code (because that happens I<before> handing the
462template process over to C<AnyEvent::Fork::Pool::run>), so you need the 529template process over to C<AnyEvent::Fork::Pool::run>), so you need the
463empty parentheses to tell Perl that the function is indeed a function. 530empty parentheses to tell Perl that the function is indeed a function.
464 531
465Retiring a worker can be useful to gracefully shut it down when the worker 532Retiring a worker can be useful to gracefully shut it down when the worker
466deems this useful. For example, after executing a job, one could check 533deems this useful. For example, after executing a job, it could check the
467the process size or the number of jobs handled so far, and if either is 534process size or the number of jobs handled so far, and if either is too
468too high, the worker could ask to get retired, to avoid memory leaks to 535high, the worker could request to be retired, to avoid memory leaks to
469accumulate. 536accumulate.
470 537
538Example: retire a worker after it has handled roughly 100 requests. It
539doesn't matter whether you retire at the beginning or end of your request,
540as the worker will continue to handle some outstanding requests. Likewise,
541it's ok to call retire multiple times.
542
543 my $count = 0;
544
545 sub my::worker {
546
547 ++$count == 100
548 and AnyEvent::Fork::Pool::retire ();
549
550 ... normal code goes here
551 }
552
471=back 553=back
472 554
473=head1 POOL PARAMETERS RECIPES 555=head1 POOL PARAMETERS RECIPES
474 556
475This section describes some recipes for pool paramaters. These are mostly 557This section describes some recipes for pool parameters. These are mostly
476meant for the synchronous RPC backend, as the asynchronous RPC backend 558meant for the synchronous RPC backend, as the asynchronous RPC backend
477changes the rules considerably, making workers themselves responsible for 559changes the rules considerably, making workers themselves responsible for
478their scheduling. 560their scheduling.
479 561
480=over 4 562=over 4
509=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high 591=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high
510 592
511When your jobs are I/O bound, using more workers usually boils down to 593When your jobs are I/O bound, using more workers usually boils down to
512higher throughput, depending very much on your actual workload - sometimes 594higher throughput, depending very much on your actual workload - sometimes
513having only one worker is best, for example, when you read or write big 595having only one worker is best, for example, when you read or write big
514files at maixmum speed, as a second worker will increase seek times. 596files at maximum speed, as a second worker will increase seek times.
515 597
516=back 598=back
517 599
600=head1 EXCEPTIONS
601
602The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions
603will not be caught, and exceptions in both workers and callbacks cause
604undesirable or undefined behaviour.
605
518=head1 SEE ALSO 606=head1 SEE ALSO
519 607
520L<AnyEvent::Fork>, to create the processes in the first place. 608L<AnyEvent::Fork>, to create the processes in the first place.
609
610L<AnyEvent::Fork::Remote>, likewise, but helpful for remote processes.
521 611
522L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API. 612L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.
523 613
524=head1 AUTHOR AND CONTACT INFORMATION 614=head1 AUTHOR AND CONTACT INFORMATION
525 615
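
Note on the C<ncpu> hunk: the counting logic in the Linux branch can be tried in isolation. The following is a minimal sketch, not the module's code. C<count_cpus> is a hypothetical helper that takes F</proc/cpuinfo>-style text as a string instead of opening the file. It also falls back to the default when no C<core id> lines are found; that fallback is an assumption about the desired behaviour and is not something the diff above shows.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of AnyEvent::Fork::Pool: counts distinct
# "core id" values (CPUs) and "core id" lines (execution units) in
# /proc/cpuinfo-style text. Falls back to $default (or 1) when no
# "core id" line is present - this fallback is an assumption.
sub count_cpus {
   my ($cpuinfo, $default) = @_;
   $default = 1 unless defined $default;

   my (%id, $eus);

   for (split /\n/, $cpuinfo) {
      if (/^core id\s*:\s*(\d+)/) {
         ++$eus;          # every matching line is one execution unit
         undef $id{$1};   # distinct core ids are counted as CPUs
      }
   }

   my $cpus = keys %id;
   ($cpus, $eus) = ($default, $default) unless $cpus;

   wantarray ? ($cpus, $eus) : $cpus
}

# made-up sample: two cores, each with two hyperthreads
my $sample = join "\n",
   "processor : 0", "core id : 0",
   "processor : 1", "core id : 1",
   "processor : 2", "core id : 0",
   "processor : 3", "core id : 1";

my ($cpus, $eus) = count_cpus ($sample, 2);
print "cpus=$cpus eus=$eus\n";
```

In scalar context the helper returns only the CPU count, mirroring the C<wantarray> handling in the diffed code, so it could stand in for the C<max> value in the documented example.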
