/cvs/AnyEvent-Fork-Pool/Pool.pm

Comparing AnyEvent-Fork-Pool/Pool.pm (file contents):
Revision 1.6 by root, Sun Apr 21 11:17:02 2013 UTC vs.
Revision 1.13 by root, Sun Apr 28 14:27:31 2013 UTC

41 41
42 $finish->recv; 42 $finish->recv;
43 43
44=head1 DESCRIPTION 44=head1 DESCRIPTION
45 45
46This module uses processes created via L<AnyEvent::Fork> and the RPC 46This module uses processes created via L<AnyEvent::Fork> (or
47protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced 47L<AnyEvent::Fork::Remote>) and the RPC protocol implemented in
48pool of processes that handles jobs. 48L<AnyEvent::Fork::RPC> to create a load-balanced pool of processes that
49handles jobs.
49 50
50Understanding of L<AnyEvent::Fork> is helpful but not critical to be able 51Understanding of L<AnyEvent::Fork> is helpful but not critical to be able
51to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC> 52to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC>
52is, as it defines the actual API that needs to be implemented in the 53is, as it defines the actual API that needs to be implemented in the
53worker processes. 54worker processes.
54
55=head1 EXAMPLES
56 55
57=head1 PARENT USAGE 56=head1 PARENT USAGE
58 57
59To create a pool, you first have to create a L<AnyEvent::Fork> object - 58To create a pool, you first have to create a L<AnyEvent::Fork> object -
60this object becomes your template process. Whenever a new worker process 59this object becomes your template process. Whenever a new worker process
87 86
88use Guard (); 87use Guard ();
89use Array::Heap (); 88use Array::Heap ();
90 89
91use AnyEvent; 90use AnyEvent;
91# explicit version on next line, as some cpan-testers test with the 0.1 version,
92# ignoring dependencies, and this line will at least give a clear indication of that.
92use AnyEvent::Fork; # we don't actually depend on it, this is for convenience 93use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience
93use AnyEvent::Fork::RPC; 94use AnyEvent::Fork::RPC;
94 95
95# these are used for the first and last argument of events 96# these are used for the first and last argument of events
96# in the hope of not colliding. yes, I don't like it either, 97# in the hope of not colliding. yes, I don't like it either,
97# but didn't come up with an obviously better alternative. 98# but didn't come up with an obviously better alternative.
98my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P'; 99my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
99my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*'; 100my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
100 101
101our $VERSION = 0.1; 102our $VERSION = 1.1;
102 103
103=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...] 104=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
104 105
105The traditional way to call the pool creation function. But it is way 106The traditional way to call the pool creation function. But it is way
106cooler to call it in the following way: 107cooler to call it in the following way:
328 $proc->[0] 329 $proc->[0]
329 or --$nidle; 330 or --$nidle;
330 331
331 Array::Heap::splice_heap_idx @pool, $proc->[1] 332 Array::Heap::splice_heap_idx @pool, $proc->[1]
332 if defined $proc->[1]; 333 if defined $proc->[1];
334
335 @$proc = 0; # tell others to leave it be
333 }; 336 };
334 337
335 $want_start = sub { 338 $want_start = sub {
336 undef $stop_w; 339 undef $stop_w;
337 340
356 }; 359 };
357 360
358 $scheduler = sub { 361 $scheduler = sub {
359 if (@queue) { 362 if (@queue) {
360 while (@queue) { 363 while (@queue) {
364 @pool or $start_worker->();
365
361 my $proc = $pool[0]; 366 my $proc = $pool[0];
362 367
363 if ($proc->[0] < $load) { 368 if ($proc->[0] < $load) {
364 # found free worker, increase load 369 # found free worker, increase load
365 unless ($proc->[0]++) { 370 unless ($proc->[0]++) {
383 or $want_stop->(); 388 or $want_stop->();
384 389
385 Array::Heap::adjust_heap_idx @pool, $proc->[1] 390 Array::Heap::adjust_heap_idx @pool, $proc->[1]
386 if defined $proc->[1]; 391 if defined $proc->[1];
387 392
393 &$ocb;
394
388 $scheduler->(); 395 $scheduler->();
389
390 &$ocb;
391 }); 396 });
392 } else { 397 } else {
393 $want_start->() 398 $want_start->()
394 unless @pool >= $max; 399 unless @pool >= $max;
395 400
440to this function are effectively read-only - modifying them after the call 445to this function are effectively read-only - modifying them after the call
441and before the callback is invoked causes undefined behaviour. 446and before the callback is invoked causes undefined behaviour.
442 447
443=cut 448=cut
444 449
450=item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]
451
452=item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]
453
454Tries to detect the number of CPUs (C<$cpus>, often called CPU cores
455nowadays) and execution units (C<$eus>, which include e.g. extra
456hyperthreaded units). When C<$cpus> cannot be determined reliably,
457C<$default_cpus> is returned for both values, or C<1> if it is missing.
458
459For normal CPU bound uses, it is wise to have as many worker processes
460as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
461hyperthreading is usually detrimental to performance, but in those rare
462cases where that really helps it might be beneficial to use more workers
463(C<$eus>).
464
465Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
466C<$cpus> and C<$eus>, and on {Free,Net,Open}BSD, C<sysctl -n hw.ncpu> is
467used for C<$cpus>.
468
469Example: create a worker pool with as many workers as cpu cores, or C<2>,
470if the actual number could not be determined.
471
472 $fork->AnyEvent::Fork::Pool::run ("myworker::function",
473 max => (scalar AnyEvent::Fork::Pool::ncpu 2),
474 );
475
476=cut
477
478BEGIN {
479 if ($^O eq "linux") {
480 *ncpu = sub(;$) {
481 my ($cpus, $eus);
482
483 if (open my $fh, "<", "/proc/cpuinfo") {
484 my %id;
485
486 while (<$fh>) {
487 if (/^core id\s*:\s*(\d+)/) {
488 ++$eus;
489 undef $id{$1};
490 }
491 }
492
493 $cpus = scalar keys %id;
494 } else {
495 $cpus = $eus = @_ ? shift : 1;
496 }
497 wantarray ? ($cpus, $eus) : $cpus
498 };
499 } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
500 *ncpu = sub(;$) {
501 my $cpus = qx<sysctl -n hw.ncpu> * 1
502 || (@_ ? shift : 1);
503 wantarray ? ($cpus, $cpus) : $cpus
504 };
505 } else {
506 *ncpu = sub(;$) {
507 my $cpus = @_ ? shift : 1;
508 wantarray ? ($cpus, $cpus) : $cpus
509 };
510 }
511}
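
The Linux branch above counts every C<core id> line as an execution unit and every distinct core id value as a CPU. The following is a minimal, self-contained sketch of that counting logic applied to a string instead of F</proc/cpuinfo>, so it can be tried on any system. The helper name C<count_cpus> and the sample text are assumptions made for illustration and are not part of AnyEvent::Fork::Pool. Unlike the code in the diff, the sketch also falls back to the default when the input contains no C<core id> lines at all, which is how the documentation describes the undetectable case.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of AnyEvent::Fork::Pool: count distinct
# "core id" values ($cpus) and the total number of "core id" lines ($eus)
# in /proc/cpuinfo-style text.
sub count_cpus {
   my ($text, $default) = @_;
   $default = 1 unless defined $default;

   my (%id, $eus);

   for my $line (split /\n/, $text) {
      if ($line =~ /^core id\s*:\s*(\d+)/) {
         ++$eus;          # every logical processor is one execution unit
         undef $id{$1};   # remember each distinct core id once
      }
   }

   my $cpus = keys %id;

   # assumption: treat "no core id lines found" as "cannot be determined"
   ($cpus, $eus) = ($default, $default) unless $cpus;

   return wantarray ? ($cpus, $eus) : $cpus;
}

# made-up sample: four logical processors on two cores
my $sample = join "\n",
   "processor\t: 0", "core id\t\t: 0",
   "processor\t: 1", "core id\t\t: 1",
   "processor\t: 2", "core id\t\t: 0",
   "processor\t: 3", "core id\t\t: 1";

my ($cpus, $eus) = count_cpus ($sample, 2);
print "cpus=$cpus eus=$eus\n";   # prints "cpus=2 eus=4"
```

In scalar context the helper returns only the CPU count, which mirrors the C<scalar AnyEvent::Fork::Pool::ncpu 2> call in the documented example.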
512
445=back 513=back
446 514
447=head1 CHILD USAGE 515=head1 CHILD USAGE
448 516
449In addition to the L<AnyEvent::Fork::RPC> API, this module implements one 517In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
466deems this useful. For example, after executing a job, one could check 534deems this useful. For example, after executing a job, one could check
467the process size or the number of jobs handled so far, and if either is 535the process size or the number of jobs handled so far, and if either is
468too high, the worker could ask to get retired, to keep memory leaks from 536too high, the worker could ask to get retired, to keep memory leaks from
469accumulating. 537accumulating.
470 538
539Example: retire a worker after it has handled roughly 100 requests.
540
541 my $count = 0;
542
543 sub my::worker {
544
545 ++$count == 100
546 and AnyEvent::Fork::Pool::retire ();
547
548 ... normal code goes here
549 }
550
471=back 551=back
472 552
473=head1 POOL PARAMETERS RECIPES 553=head1 POOL PARAMETERS RECIPES
474 554
475This section describes some recipes for pool parameters. These are mostly 555This section describes some recipes for pool parameters. These are mostly
513having only one worker is best, for example, when you read or write big 593having only one worker is best, for example, when you read or write big
514files at maximum speed, as a second worker will increase seek times. 594files at maximum speed, as a second worker will increase seek times.
515 595
516=back 596=back
517 597
598=head1 EXCEPTIONS
599
600The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions will
601not be caught, and exceptions in both workers and callbacks cause
602undesirable or undefined behaviour.
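
Since exceptions are not caught, one way to keep a worker function from dying is to trap errors inside the function and return them as ordinary values. The following is a minimal sketch of that idea in plain Perl, without any pool involved. The wrapper name C<guarded> and the C<(status, value)> return convention are assumptions made for illustration; neither is part of AnyEvent::Fork::Pool or AnyEvent::Fork::RPC.

```perl
use strict;
use warnings;

# Hypothetical wrapper: turns a job body into a function that never dies.
# It returns (1, @results) on success and (0, $error_message) on failure.
sub guarded {
   my ($body) = @_;

   sub {
      my @args = @_;
      my @res;

      # eval traps the exception; the trailing 1 marks successful completion
      my $ok = eval { @res = $body->(@args); 1 };

      return $ok ? (1, @res) : (0, "$@");
   }
}

my $divide = guarded (sub { $_[0] / $_[1] });

my ($ok, $value) = $divide->(10, 4);
print $ok ? "result: $value\n" : "error: $value\n";

# division by zero dies inside the body, but the wrapper reports it instead
($ok, $value) = $divide->(1, 0);
print $ok ? "result: $value\n" : "error: $value\n";
```

With this convention the caller checks the first returned value before using the rest, so a failing job shows up as data rather than as an uncaught exception.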
603
518=head1 SEE ALSO 604=head1 SEE ALSO
519 605
520L<AnyEvent::Fork>, to create the processes in the first place. 606L<AnyEvent::Fork>, to create the processes in the first place.
607
608L<AnyEvent::Fork::Remote>, likewise, but helpful for remote processes.
521 609
522L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API. 610L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.
523 611
524=head1 AUTHOR AND CONTACT INFORMATION 612=head1 AUTHOR AND CONTACT INFORMATION
525 613
