/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.17
Committed: Thu Oct 27 07:27:56 2022 UTC (18 months, 3 weeks ago) by root
Branch: MAIN
CVS Tags: rel-1_3, HEAD
Changes since 1.16: +1 -1 lines
Log Message:
1.3

=head1 NAME

AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork

=head1 SYNOPSIS

   use AnyEvent;
   use AnyEvent::Fork;
   use AnyEvent::Fork::Pool;

   # all possible parameters shown, with default values
   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
         "MyWorker::run", # the worker function

         # pool management
         max        => 4,   # absolute maximum # of processes
         idle       => 0,   # minimum # of idle processes
         load       => 2,   # queue at most this number of jobs per process
         start      => 0.1, # wait this many seconds before starting a new process
         stop       => 10,  # wait this many seconds before stopping an idle process
         on_destroy => (my $finish = AE::cv), # called when object is destroyed

         # parameters passed to AnyEvent::Fork::RPC
         async      => 0,
         on_error   => sub { die "FATAL: $_[0]\n" },
         on_event   => sub { my @ev = @_ },
         init       => "MyWorker::init",
         serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
      );

   for (1..10) {
      $pool->(doit => $_, sub {
         print "MyWorker::run returned @_\n";
      });
   }

   undef $pool;

   $finish->recv;

=head1 DESCRIPTION

This module uses processes created via L<AnyEvent::Fork> (or
L<AnyEvent::Fork::Remote>) and the RPC protocol implemented in
L<AnyEvent::Fork::RPC> to create a load-balanced pool of processes that
handles jobs.

Understanding L<AnyEvent::Fork> is helpful but not strictly required to
use this module. A thorough understanding of L<AnyEvent::Fork::RPC> is
required, however, as it defines the actual API that needs to be
implemented in the worker processes.

=head1 PARENT USAGE

To create a pool, you first have to create an L<AnyEvent::Fork> object -
this object becomes your template process. Whenever a new worker process
is needed, it is forked from this template process. Then you need to
"hand off" this template process to the C<AnyEvent::Fork::Pool> module by
calling its run method on it:

   my $template = AnyEvent::Fork
      ->new
      ->require ("SomeModule", "MyWorkerModule");

   my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");

The pool "object" is not a regular Perl object, but a code reference that
you can call and that works roughly like calling the worker function
directly, except that it returns nothing; instead, you need to specify a
callback to be invoked once results are in:

   $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });

=over 4

=cut

package AnyEvent::Fork::Pool;

use common::sense;

use Scalar::Util ();

use Guard ();
use Array::Heap ();

use AnyEvent;
use AnyEvent::Fork::RPC;

# these are used for the first and last argument of events
# in the hope of not colliding. yes, I don't like it either,
# but didn't come up with an obviously better alternative.
my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';

our $VERSION = 1.3;

=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]

The traditional way to call the pool creation function. But it is way
cooler to call it in the following way:

=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])

Creates a new pool object with the specified C<$function> as function
(name) to call for each request. The pool uses the C<$fork> object as the
template when creating worker processes.

You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
to create one.

A relatively large number of key/value pairs can be specified to influence
the behaviour. They are grouped into the categories "pool management",
"template process" and "rpc parameters".

=over 4

=item Pool Management

The pool consists of a certain number of worker processes. These options
decide how many of these processes exist and when they are started and
stopped.

The worker pool is dynamically resized, according to (perceived :)
load. The minimum size is given by the C<idle> parameter and the maximum
size is given by the C<max> parameter. A new worker is started every
C<start> seconds at most, and an idle worker is stopped at most every
C<stop> seconds.

You can specify the maximum number of jobs sent to a worker concurrently
using the C<load> parameter.

=over 4

=item idle => $count (default: 0)

The minimum number of idle processes in the pool - when there are fewer
than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
ones, subject to the limits set by C<max> and C<start>.

This is also the initial number of workers in the pool. The default of
zero means that the pool starts empty and can shrink back to zero workers
over time.

=item max => $count (default: 4)

The maximum number of processes in the pool, in addition to the template
process. C<AnyEvent::Fork::Pool> will never have more than this number of
worker processes, although there can be more temporarily when a worker is
shut down and hasn't exited yet.

=item load => $count (default: 2)

The maximum number of concurrent jobs sent to a single worker process.

Jobs that cannot be sent to a worker immediately (because all workers are
busy) will be queued until a worker is available.

Setting this low improves latency. For example, at C<1>, every job that
is sent to a worker is sent to a completely idle worker that doesn't run
any other jobs. The downside is that throughput is reduced - a worker that
finishes a job needs to wait for a new job from the parent.

The default of C<2> is usually a good compromise.

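The dispatch decision that C<load> controls can be sketched in plain Perl. This is a simplified stand-in for the pool's real heap-based scheduler, not the actual implementation - worker loads are modelled as plain integers:

```perl
# Pick the least-loaded worker; dispatch only if its load is below
# the configured limit, otherwise the job stays queued.
sub pick_worker {
   my ($load_limit, @worker_loads) = @_;

   # index of the least-loaded worker (the real pool uses Array::Heap)
   my $best = 0;
   $worker_loads[$_] < $worker_loads[$best] and $best = $_
      for 1 .. $#worker_loads;

   # only dispatch when that worker is below the load limit
   $worker_loads[$best] < $load_limit ? $best : undef
}
```

With C<load =E<gt> 2> and worker loads C<(2, 1, 2)>, the middle worker is picked; with C<load =E<gt> 1> and all workers busy, C<undef> is returned and the job remains queued.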
=item start => $seconds (default: 0.1)

When there are fewer than C<idle> workers (or all workers are completely
busy), then a timer is started. If the timer elapses and there are still
jobs that cannot be queued to a worker, a new worker is started.

This sets the minimum time that all workers must be busy before a new
worker is started. Or, put differently, the minimum delay between starting
new workers.

The delay is small by default, which means new workers will be started
relatively quickly. A delay of C<0> is possible, and ensures that the pool
will grow as quickly as possible under load.

Non-zero values are useful to avoid "exploding" a pool because a lot of
jobs are queued in an instant.

Higher values are often useful to improve efficiency at the cost of
latency - when fewer processes can do the job over time, starting more and
more is not necessarily going to help.

=item stop => $seconds (default: 10)

When a worker has no jobs to execute it becomes idle. An idle worker that
hasn't executed a job within this amount of time will be stopped, unless
the other parameters say otherwise.

Setting this to a very high value means that workers stay around longer,
even when they have nothing to do, which can be good as they don't have to
be started on the next load spike again.

Setting this to a lower value can be useful to avoid memory or simply
process table wastage.

Usually, setting this to a time longer than the time between load spikes
is best - if you expect a lot of requests every minute and little work
in between, setting this to longer than a minute avoids having to stop
and start workers. On the other hand, you have to ask yourself if letting
workers run idle is a good use of your resources. Try to find a good
balance between the resource usage of your workers and the time needed to
start new workers - L<AnyEvent::Fork> itself is fast at creating workers
while not using much memory for them, so most of the overhead is likely
from your own code.

=item on_destroy => $callback->() (default: none)

When a pool object goes out of scope, the outstanding requests are still
handled till completion. Only after handling all jobs will the workers
be destroyed (and also the template process if it isn't referenced
otherwise).

To find out when a pool I<really> has finished its work, you can set this
callback, which will be called when the pool has been destroyed.

=back

=item AnyEvent::Fork::RPC Parameters

These parameters are all passed more or less directly to
L<AnyEvent::Fork::RPC>. They are only briefly mentioned here; for
their full documentation please refer to the L<AnyEvent::Fork::RPC>
documentation. Also, the default values mentioned here are only documented
as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.

=over 4

=item async => $boolean (default: 0)

Whether to use the synchronous or asynchronous RPC backend.

=item on_error => $callback->($message) (default: die with message)

The callback to call on any (fatal) errors.

=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)

The callback to invoke on events.

=item init => $initfunction (default: none)

The function to call in the child, once before handling requests.

=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)

The serialiser to use.

=back

=back

=cut

sub run {
   my ($template, $function, %arg) = @_;

   my $max        = $arg{max}        || 4;
   my $idle       = $arg{idle}       || 0;
   my $load       = $arg{load}       || 2;
   my $start      = $arg{start}      || 0.1;
   my $stop       = $arg{stop}       || 10;
   my $on_event   = $arg{on_event}   || sub { };
   my $on_destroy = $arg{on_destroy};

   my @rpc = (
      async      => $arg{async},
      init       => $arg{init},
      serialiser => delete $arg{serialiser},
      on_error   => $arg{on_error},
   );

   my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
   my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);

   my $destroy_guard = Guard::guard {
      $on_destroy->()
         if $on_destroy;
   };

   $template
      ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
      ->eval ('
            my ($magic0, $magic1) = @_;
            sub AnyEvent::Fork::Pool::retire() {
               AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
            }
         ', $magic0, $magic1)
   ;

   $start_worker = sub {
      my $proc = [0, 0, undef]; # load, index, rpc

      $proc->[2] = $template
         ->fork
         ->AnyEvent::Fork::RPC::run ($function,
              @rpc,
              on_event => sub {
                 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
                    $destroy_guard if 0; # keep it alive

                    $_[1] eq "quit" and $stop_worker->($proc);
                    return;
                 }

                 &$on_event;
              },
           )
      ;

      ++$nidle;
      Array::Heap::push_heap_idx @pool, $proc;

      Scalar::Util::weaken $proc;
   };

   $stop_worker = sub {
      my $proc = shift;

      $proc->[0]
         or --$nidle;

      Array::Heap::splice_heap_idx @pool, $proc->[1]
         if defined $proc->[1];

      @$proc = 0; # tell others to leave it be
   };

   $want_start = sub {
      undef $stop_w;

      $start_w ||= AE::timer $start, $start, sub {
         if (($nidle < $idle || @queue) && @pool < $max) {
            $start_worker->();
            $scheduler->();
         } else {
            undef $start_w;
         }
      };
   };

   $want_stop = sub {
      $stop_w ||= AE::timer $stop, $stop, sub {
         $stop_worker->($pool[0])
            if $nidle;

         undef $stop_w
            if $nidle <= $idle;
      };
   };

   $scheduler = sub {
      if (@queue) {
         while (@queue) {
            @pool or $start_worker->();

            my $proc = $pool[0];

            if ($proc->[0] < $load) {
               # found free worker, increase load
               unless ($proc->[0]++) {
                  # worker became busy
                  --$nidle
                     or undef $stop_w;

                  $want_start->()
                     if $nidle < $idle && @pool < $max;
               }

               Array::Heap::adjust_heap_idx @pool, 0;

               my $job = shift @queue;
               my $ocb = pop @$job;

               $proc->[2]->(@$job, sub {
                  # reduce load
                  --$proc->[0] # worker still busy?
                     or ++$nidle > $idle # not too many idle processes?
                     or $want_stop->();

                  Array::Heap::adjust_heap_idx @pool, $proc->[1]
                     if defined $proc->[1];

                  &$ocb;

                  $scheduler->();
               });
            } else {
               $want_start->()
                  unless @pool >= $max;

               last;
            }
         }
      } elsif ($shutdown) {
         undef $_->[2]
            for @pool;

         undef $start_w;
         undef $start_worker; # frees $destroy_guard reference

         $stop_worker->($pool[0])
            while $nidle;
      }
   };

   my $shutdown_guard = Guard::guard {
      $shutdown = 1;
      $scheduler->();
   };

   $start_worker->()
      while @pool < $idle;

   sub {
      $shutdown_guard if 0; # keep it alive

      $start_worker->()
         unless @pool;

      push @queue, [@_];
      $scheduler->();
   }
}

=item $pool->(..., $cb->(...))

Call the RPC function of a worker with the given arguments, and when the
worker is done, call the C<$cb> with the results, just like calling the
RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
details on the RPC API.

If there is no free worker, the call will be queued until a worker becomes
available.

Note that there can be considerable time between calling this method and
the call actually being executed. During this time, the parameters passed
to this function are effectively read-only - modifying them after the call
and before the callback is invoked causes undefined behaviour.

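The reason is that a queued job keeps references to your data. The hazard can be demonstrated without the pool itself - the plain array below is a hypothetical stand-in for the pool's internal job queue:

```perl
my @queue; # stands in for the pool's internal job queue

# the pool stores a shallow copy of the argument list, so
# replacing top-level scalars afterwards is harmless
my @args = (1, 2, 3);
push @queue, [@args];
$args[0] = 99;              # the queued copy still holds 1

# but data behind references is shared with the queued job
my $data = { n => 1 };
push @queue, [$data];       # queues the reference, not a copy
push @queue, [{ %$data }];  # safe: queues a shallow clone instead
$data->{n} = 99;            # the reference-sharing job now sees 99
```

After this runs, the first reference-based job sees C<n == 99>, while the cloned one still sees C<n == 1> - so copy any referenced data you intend to keep mutating before handing it to the pool.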
=cut

=item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]

=item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]

Tries to detect the number of CPUs (C<$cpus>, often called CPU cores
nowadays) and execution units (C<$eus>, which include e.g. extra
hyperthreaded units). When C<$cpus> cannot be determined reliably,
C<$default_cpus> is returned for both values, or C<1> if it is missing.

For normal CPU-bound uses, it is wise to have as many worker processes
as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
hyperthreading is usually detrimental to performance, but in those rare
cases where that really helps it might be beneficial to use more workers
(C<$eus>).

Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
C<$cpus> and C<$eus>, and on {Free,Net,Open}BSD, F<sysctl -n hw.ncpu> is
used for C<$cpus>.

Example: create a worker pool with as many workers as CPU cores, or C<2>,
if the actual number could not be determined.

   $fork->AnyEvent::Fork::Pool::run ("myworker::function",
      max => (scalar AnyEvent::Fork::Pool::ncpu 2),
   );

=cut

BEGIN {
   if ($^O eq "linux") {
      *ncpu = sub(;$) {
         my ($cpus, $eus);

         if (open my $fh, "<", "/proc/cpuinfo") {
            my %id;

            while (<$fh>) {
               if (/^core id\s*:\s*(\d+)/) {
                  ++$eus;
                  undef $id{$1};
               }
            }

            $cpus = scalar keys %id;
         } else {
            $cpus = $eus = @_ ? shift : 1;
         }
         wantarray ? ($cpus, $eus) : $cpus
      };
   } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
      *ncpu = sub(;$) {
         my $cpus = qx<sysctl -n hw.ncpu> * 1
                    || (@_ ? shift : 1);
         wantarray ? ($cpus, $cpus) : $cpus
      };
   } else {
      *ncpu = sub(;$) {
         my $cpus = @_ ? shift : 1;
         wantarray ? ($cpus, $cpus) : $cpus
      };
   }
}

=back

=head1 CHILD USAGE

In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
more child-side function:

=over 4

=item AnyEvent::Fork::Pool::retire ()

This function sends an event to the parent process to request retirement:
the worker is removed from the pool and no new jobs will be sent to it,
but it still has to handle the jobs that are already queued.

The parentheses are part of the syntax: the function usually isn't defined
when you compile your code (because that happens I<before> handing the
template process over to C<AnyEvent::Fork::Pool::run>), so you need the
empty parentheses to tell Perl that the function is indeed a function.

Retiring a worker can be useful to gracefully shut it down when the worker
deems this useful. For example, after executing a job, it could check the
process size or the number of jobs handled so far, and if either is too
high, the worker could request to be retired, to avoid letting memory
leaks accumulate.

Example: retire a worker after it has handled roughly 100 requests. It
doesn't matter whether you retire at the beginning or end of your request,
as the worker will continue to handle some outstanding requests. Likewise,
it's ok to call retire multiple times.

   my $count = 0;

   sub my::worker {

      ++$count == 100
         and AnyEvent::Fork::Pool::retire ();

      ... # normal code goes here
   }

=back

=head1 POOL PARAMETERS RECIPES

This section describes some recipes for pool parameters. These are mostly
meant for the synchronous RPC backend, as the asynchronous RPC backend
changes the rules considerably, making workers themselves responsible for
their scheduling.

=over 4

=item low latency - set load = 1

If you need a deterministic low latency, you should set the C<load>
parameter to C<1>. This ensures that never more than one job is sent to
each worker. This avoids having to wait for a previous job to finish.

This makes most sense with the synchronous (default) backend, as the
asynchronous backend can handle multiple requests concurrently.

=item lowest latency - set load = 1 and idle = max

To achieve the lowest latency, you additionally should disable any dynamic
resizing of the pool by setting C<idle> to the same value as C<max>.

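Put together, a lowest-latency pool configuration might look like this. This is a sketch only - C<MyWorker::run> and the pool size of C<8> are placeholders you would replace with your own worker function and a size suited to your machine:

```perl
my $pool = $template->AnyEvent::Fork::Pool::run (
   "MyWorker::run",
   load => 1, # at most one job per worker
   idle => 8, # keep all workers around...
   max  => 8, # ...and never grow or shrink the pool
);
```

Since C<idle == max>, the pool is created at full size up front and no start/stop timers ever fire during operation.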
=item high throughput, cpu bound jobs - set load >= 2, max = #cpus

To get high throughput with cpu-bound jobs, you should set the maximum
pool size to the number of cpus in your system, and C<load> to at least
C<2>, to make sure there can be another job waiting for the worker when it
has finished one.

The value of C<2> for C<load> is the minimum value that I<can> achieve
100% throughput, but if your parent process itself is sometimes busy, you
might need higher values. Also there is a limit on the amount of data that
can be "in flight" to the worker, so if you send big blobs of data to your
worker, C<load> might have much less of an effect.

=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high

When your jobs are I/O bound, using more workers usually translates into
higher throughput, depending very much on your actual workload - sometimes
having only one worker is best, for example, when you read or write big
files at maximum speed, as a second worker will increase seek times.

=back

=head1 EXCEPTIONS

The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions
will not be caught, and exceptions in both workers and callbacks cause
undesirable or undefined behaviour.

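A defensive pattern is therefore to guard callback bodies with C<eval> so an exception cannot escape into the event loop. This is a sketch of one possible wrapper, not part of this module's API:

```perl
# Wrap a result callback so exceptions are warned about
# instead of propagating into the event loop.
sub safe_cb {
   my ($cb) = @_;

   sub {
      eval { $cb->(@_); 1 }
         or warn "callback died: $@";
   }
}

# hypothetical usage with a pool:
#    $pool->(@args, safe_cb sub { ... });
```

The C<eval { ...; 1 }> idiom returns true only when the wrapped callback completed without dying, so the C<warn> branch runs exactly on failure.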
=head1 SEE ALSO

L<AnyEvent::Fork>, to create the processes in the first place.

L<AnyEvent::Fork::Remote>, likewise, but helpful for remote processes.

L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.

=head1 AUTHOR AND CONTACT INFORMATION

   Marc Lehmann <schmorp@schmorp.de>
   http://software.schmorp.de/pkg/AnyEvent-Fork-Pool

=cut

1