/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.12
Committed: Sun Apr 28 14:26:11 2013 UTC (11 years ago) by root
Branch: MAIN
Changes since 1.11: +4 -7 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent;
8     use AnyEvent::Fork::Pool;
9     # use AnyEvent::Fork is not needed
10    
11 root 1.6 # all possible parameters shown, with default values
12 root 1.3 my $pool = AnyEvent::Fork
13     ->new
14     ->require ("MyWorker")
15     ->AnyEvent::Fork::Pool::run (
16     "MyWorker::run", # the worker function
17    
18     # pool management
19     max => 4, # absolute maximum # of processes
20 root 1.6 idle => 0, # minimum # of idle processes
21 root 1.3 load => 2, # queue at most this number of jobs per process
22     start => 0.1, # wait this many seconds before starting a new process
23 root 1.6 stop => 10, # wait this many seconds before stopping an idle process
24 root 1.3 on_destroy => (my $finish = AE::cv), # called when object is destroyed
25    
26     # parameters passed to AnyEvent::Fork::RPC
27     async => 0,
28     on_error => sub { die "FATAL: $_[0]\n" },
29     on_event => sub { my @ev = @_ },
30     init => "MyWorker::init",
31     serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
32     );
33 root 1.1
34     for (1..10) {
35 root 1.3 $pool->(doit => $_, sub {
36 root 1.1 print "MyWorker::run returned @_\n";
37     });
38     }
39    
40     undef $pool;
41    
42     $finish->recv;
43    
44     =head1 DESCRIPTION
45    
46 root 1.12 This module uses processes created via L<AnyEvent::Fork> (or
47     L<AnyEvent::Fork::Remote>) and the RPC protocol implemented in
48     L<AnyEvent::Fork::RPC> to create a load-balanced pool of processes that
49     handles jobs.
50 root 1.1
51     Understanding L<AnyEvent::Fork> is helpful but not critical for using
52     this module; a thorough understanding of L<AnyEvent::Fork::RPC> is,
53     however, as it defines the actual API that needs to be implemented in the
54 root 1.6 worker processes.
55 root 1.1
56 root 1.2 =head1 PARENT USAGE
57 root 1.1
58 root 1.6 To create a pool, you first have to create an L<AnyEvent::Fork> object -
59     this object becomes your template process. Whenever a new worker process
60     is needed, it is forked from this template process. Then you need to
61     "hand off" this template process to the C<AnyEvent::Fork::Pool> module by
62     calling its run method on it:
63    
64     my $template = AnyEvent::Fork
65     ->new
66     ->require ("SomeModule", "MyWorkerModule");
67    
68     my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");
69    
70     The pool "object" is not a regular Perl object, but a code reference that
71     you can call and that works roughly like calling the worker function
72     directly, except that it returns nothing - instead, you specify a
73     callback to be invoked once results are in:
74    
75     $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });
76    
77 root 1.1 =over 4
78    
79     =cut
80    
81     package AnyEvent::Fork::Pool;
82    
83     use common::sense;
84    
85 root 1.2 use Scalar::Util ();
86    
87 root 1.3 use Guard ();
88 root 1.2 use Array::Heap ();
89 root 1.1
90     use AnyEvent;
91 root 1.9 # explicit version on next line, as some cpan-testers test with the 0.1 version,
92     # ignoring dependencies, and this line will at least give a clear indication of that.
93     use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience
94 root 1.1 use AnyEvent::Fork::RPC;
95    
96 root 1.3 # these are used for the first and last argument of events
97     # in the hope of not colliding. yes, I don't like it either,
98     # but didn't come up with an obviously better alternative.
99 root 1.2 my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
100 root 1.3 my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
101 root 1.2
102 root 1.11 our $VERSION = 1.1;
103 root 1.1
104 root 1.5 =item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
105 root 1.3
106 root 1.6 The traditional way to call the pool creation function. But it is way
107     cooler to call it in the following way:
108 root 1.3
109 root 1.5 =item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
110 root 1.2
111     Creates a new pool object with the specified C<$function> as function
112 root 1.3 (name) to call for each request. The pool uses the C<$fork> object as the
113     template when creating worker processes.
114 root 1.2
115     You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
116     to create one.
117    
118     A relatively large number of key/value pairs can be specified to influence
119     the behaviour. They are grouped into the categories "pool management",
120     "template process" and "rpc parameters".
121 root 1.1
122     =over 4
123    
124 root 1.2 =item Pool Management
125    
126     The pool consists of a certain number of worker processes. These options
127     decide how many of these processes exist and when they are started and
128 root 1.5 stopped.
129 root 1.2
130 root 1.6 The worker pool is dynamically resized, according to (perceived :)
131     load. The minimum size is given by the C<idle> parameter and the maximum
132     size is given by the C<max> parameter. A new worker is started every
133     C<start> seconds at most, and an idle worker is stopped at most every
134     C<stop> seconds.
135    
136     You can specify the amount of jobs sent to a worker concurrently using the
137     C<load> parameter.
138    
139 root 1.2 =over 4
140    
141 root 1.3 =item idle => $count (default: 0)
142 root 1.2
143 root 1.3 The minimum number of idle processes in the pool - when there are fewer
144     than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
145 root 1.6 ones, subject to the limits set by C<max> and C<start>.
146 root 1.3
147 root 1.6 This is also the initial number of workers in the pool. The default of
148     zero means that the pool starts empty and can shrink back to zero workers
149     over time.
150 root 1.2
151     =item max => $count (default: 4)
152    
153     The maximum number of processes in the pool, in addition to the template
154 root 1.6 process. C<AnyEvent::Fork::Pool> will never have more than this number of
155     worker processes, although there can be more temporarily when a worker is
156     shut down and hasn't exited yet.
157 root 1.2
158 root 1.3 =item load => $count (default: 2)
159 root 1.2
160 root 1.6 The maximum number of concurrent jobs sent to a single worker process.
161 root 1.1
162 root 1.2 Jobs that cannot be sent to a worker immediately (because all workers are
163     busy) will be queued until a worker is available.
164 root 1.1
165 root 1.6 Setting this low improves latency. For example, at C<1>, every job that
166     is sent to a worker is sent to a completely idle worker that doesn't run
167     any other jobs. The downside is that throughput is reduced - a worker that
168     finishes a job needs to wait for a new job from the parent.
169    
170     The default of C<2> is usually a good compromise.
171    
172 root 1.3 =item start => $seconds (default: 0.1)
173 root 1.1
174 root 1.6 When there are fewer than C<idle> workers (or all workers are completely
175     busy), then a timer is started. If the timer elapses and there are still
176     jobs that cannot be queued to a worker, a new worker is started.
177    
178     This sets the minimum time that all workers must be busy before a new
179     worker is started. Or, put differently, the minimum delay between starting
180     new workers.
181    
182     The delay is small by default, which means new workers will be started
183     relatively quickly. A delay of C<0> is possible, and ensures that the pool
184     will grow as quickly as possible under load.
185    
186     Non-zero values are useful to avoid "exploding" a pool when a lot of
187     jobs are queued in an instant.
188    
189     Higher values are often useful to improve efficiency at the cost of
190     latency - when fewer processes can do the job over time, starting more and
191     more is not necessarily going to help.
192 root 1.1
193 root 1.6 =item stop => $seconds (default: 10)
194 root 1.1
195 root 1.2 When a worker has no jobs to execute it becomes idle. An idle worker that
196     hasn't executed a job within this amount of time will be stopped, unless
197     the other parameters say otherwise.
198    
199 root 1.6 Setting this to a very high value means that workers stay around longer,
200     even when they have nothing to do, which can be good as they don't have to
201     be started again on the next load spike.
202    
203     Setting this to a lower value can be useful to avoid memory or simply
204     process table wastage.
205    
206     Usually, setting this to a time longer than the time between load spikes
207     is best - if you expect a lot of requests every minute and little work
208     in between, setting this to longer than a minute avoids having to stop
209     and start workers. On the other hand, you have to ask yourself if letting
210     workers run idle is a good use of your resources. Try to find a good
211     balance between resource usage of your workers and the time to start new
212     workers - L<AnyEvent::Fork> itself is fast at
213     creating workers while not using much memory for them, so most of the
214     overhead is likely from your own code.
215    
216 root 1.2 =item on_destroy => $callback->() (default: none)
217    
218 root 1.6 When a pool object goes out of scope, the outstanding requests are still
219     handled till completion. Only after handling all jobs will the workers
220     be destroyed (and also the template process if it isn't referenced
221     otherwise).
222 root 1.2
223 root 1.6 To find out when a pool I<really> has finished its work, you can set this
224     callback, which will be called when the pool has been destroyed.
225 root 1.2
226     =back
227    
228     =item AnyEvent::Fork::RPC Parameters
229    
230 root 1.6 These parameters are all passed more or less directly to
231     L<AnyEvent::Fork::RPC>. They are only briefly mentioned here, for
232     their full documentation please refer to the L<AnyEvent::Fork::RPC>
233     documentation. Also, the default values mentioned here are only documented
234     as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.
235 root 1.2
236     =over 4
237 root 1.1
238     =item async => $boolean (default: 0)
239    
240 root 1.6 Whether to use the synchronous or asynchronous RPC backend.
241 root 1.1
242 root 1.2 =item on_error => $callback->($message) (default: die with message)
243 root 1.1
244 root 1.2 The callback to call on any (fatal) errors.
245 root 1.1
246 root 1.2 =item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
247 root 1.1
248 root 1.2 The callback to invoke on events.
249 root 1.1
250 root 1.2 =item init => $initfunction (default: none)
251 root 1.1
252 root 1.2 The function to call in the child, once before handling requests.
253 root 1.1
254 root 1.2 =item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
255 root 1.1
256 root 1.2 The serialiser to use.
257 root 1.1
258 root 1.2 =back
259 root 1.1
260     =back
261    
262     =cut
263    
264 root 1.3 sub run {
265     my ($template, $function, %arg) = @_;
266    
267     my $max = $arg{max} || 4;
268     my $idle = $arg{idle} || 0,
269     my $load = $arg{load} || 2,
270     my $start = $arg{start} || 0.1,
271 root 1.6 my $stop = $arg{stop} || 10,
272 root 1.3 my $on_event = $arg{on_event} || sub { },
273     my $on_destroy = $arg{on_destroy};
274    
275     my @rpc = (
276 root 1.5 async => $arg{async},
277     init => $arg{init},
278     serialiser => delete $arg{serialiser},
279     on_error => $arg{on_error},
280 root 1.3 );
281    
282     my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
283 root 1.5 my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);
284 root 1.3
285     my $destroy_guard = Guard::guard {
286     $on_destroy->()
287     if $on_destroy;
288     };
289 root 1.2
290 root 1.3 $template
291     ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
292 root 1.2 ->eval ('
293 root 1.3 my ($magic0, $magic1) = @_;
294 root 1.6 sub AnyEvent::Fork::Pool::retire() {
295     AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
296 root 1.2 }
297 root 1.3 ', $magic0, $magic1)
298 root 1.6 ;
299 root 1.3
300 root 1.5 $start_worker = sub {
301 root 1.3 my $proc = [0, 0, undef]; # load, index, rpc
302 root 1.2
303 root 1.3 $proc->[2] = $template
304     ->fork
305     ->AnyEvent::Fork::RPC::run ($function,
306     @rpc,
307     on_event => sub {
308     if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
309     $destroy_guard if 0; # keep it alive
310 root 1.2
311 root 1.5 $_[1] eq "quit" and $stop_worker->($proc);
312 root 1.3 return;
313 root 1.2 }
314    
315 root 1.3 &$on_event;
316     },
317     )
318     ;
319    
320     ++$nidle;
321 root 1.5 Array::Heap::push_heap_idx @pool, $proc;
322 root 1.3
323     Scalar::Util::weaken $proc;
324     };
325    
326 root 1.5 $stop_worker = sub {
327 root 1.3 my $proc = shift;
328    
329     $proc->[0]
330     or --$nidle;
331    
332     Array::Heap::splice_heap_idx @pool, $proc->[1]
333     if defined $proc->[1];
334 root 1.7
335     @$proc = 0; # tell others to leave it be
336 root 1.3 };
337    
338     $want_start = sub {
339     undef $stop_w;
340    
341 root 1.5 $start_w ||= AE::timer $start, $start, sub {
342     if (($nidle < $idle || @queue) && @pool < $max) {
343     $start_worker->();
344 root 1.3 $scheduler->();
345 root 1.5 } else {
346     undef $start_w;
347 root 1.3 }
348     };
349     };
350    
351     $want_stop = sub {
352 root 1.5 $stop_w ||= AE::timer $stop, $stop, sub {
353     $stop_worker->($pool[0])
354     if $nidle;
355 root 1.3
356 root 1.5 undef $stop_w
357     if $nidle <= $idle;
358 root 1.3 };
359     };
360    
361     $scheduler = sub {
362     if (@queue) {
363     while (@queue) {
364 root 1.7 @pool or $start_worker->();
365    
366 root 1.3 my $proc = $pool[0];
367    
368     if ($proc->[0] < $load) {
369 root 1.5 # found free worker, increase load
370     unless ($proc->[0]++) {
371     # worker became busy
372     --$nidle
373     or undef $stop_w;
374    
375     $want_start->()
376     if $nidle < $idle && @pool < $max;
377     }
378 root 1.3
379 root 1.5 Array::Heap::adjust_heap_idx @pool, 0;
380 root 1.3
381     my $job = shift @queue;
382     my $ocb = pop @$job;
383    
384     $proc->[2]->(@$job, sub {
385 root 1.5 # reduce load
386     --$proc->[0] # worker still busy?
387     or ++$nidle > $idle # not too many idle processes?
388 root 1.3 or $want_stop->();
389    
390 root 1.5 Array::Heap::adjust_heap_idx @pool, $proc->[1]
391     if defined $proc->[1];
392 root 1.3
393 root 1.7 &$ocb;
394    
395 root 1.3 $scheduler->();
396     });
397     } else {
398 root 1.5 $want_start->()
399     unless @pool >= $max;
400 root 1.3
401     last;
402     }
403     }
404     } elsif ($shutdown) {
405     @pool = ();
406     undef $start_w;
407 root 1.5 undef $start_worker; # frees $destroy_guard reference
408 root 1.3
409 root 1.5 $stop_worker->($pool[0])
410 root 1.3 while $nidle;
411     }
412     };
413    
414     my $shutdown_guard = Guard::guard {
415     $shutdown = 1;
416     $scheduler->();
417     };
418    
419 root 1.5 $start_worker->()
420 root 1.3 while @pool < $idle;
421    
422     sub {
423     $shutdown_guard if 0; # keep it alive
424 root 1.2
425 root 1.5 $start_worker->()
426 root 1.3 unless @pool;
427    
428     push @queue, [@_];
429     $scheduler->();
430     }
431 root 1.2 }
432    
433 root 1.5 =item $pool->(..., $cb->(...))
434 root 1.2
435     Call the RPC function of a worker with the given arguments, and when the
436 root 1.5 worker is done, call the C<$cb> with the results, just like calling the
437 root 1.6 RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
438     details on the RPC API.
439 root 1.2
440 root 1.6 If there is no free worker, the call will be queued until a worker becomes
441     available.
442 root 1.2
443     Note that there can be considerable time between calling this method and
444     the call actually being executed. During this time, the parameters passed
445     to this function are effectively read-only - modifying them after the call
446     and before the callback is invoked causes undefined behaviour.
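One way to avoid accidental modification - sketched here with the core Storable module and a made-up argument structure, nothing this module requires - is to deep-copy mutable data before submitting it:

```perl
use strict;
use warnings;
use Storable qw(dclone); # core module, deep-copies nested data

# hypothetical job arguments that the caller keeps mutating
my $args = { path => "/tmp/input", options => [qw(fast verbose)] };

# take a private deep copy before queueing the job, so later changes
# by the caller cannot affect the (possibly still queued) call
my $frozen = dclone $args;

# the caller mutates the original afterwards...
push @{ $args->{options} }, "debug";

# ...but the queued copy is unaffected
print scalar @{ $frozen->{options} }, "\n"; # prints 2
```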
447    
448     =cut
449 root 1.1
450 root 1.9 =item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]
451    
452     =item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]
453    
454     Tries to detect the number of CPUs (C<$cpus>, often called cpu cores
455     nowadays) and execution units (C<$eus>, which include e.g. extra
456     hyperthreaded units). When C<$cpus> cannot be determined reliably,
457     C<$default_cpus> is returned for both values, or C<1> if it is missing.
458    
459     For normal CPU bound uses, it is wise to have as many worker processes
460     as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
461     hyperthreading is usually detrimental to performance, but in those rare
462     cases where that really helps it might be beneficial to use more workers
463     (C<$eus>).
464    
465     Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
466     C<$cpus> and C<$eus>, and on {Free,Net,Open}BSD, F<sysctl -n hw.ncpu> is
467     used for C<$cpus>.
468    
469     Example: create a worker pool with as many workers as cpu cores, or C<2>,
470     if the actual number could not be determined.
471    
472     $fork->AnyEvent::Fork::Pool::run ("myworker::function",
473     max => (scalar AnyEvent::Fork::Pool::ncpu 2),
474     );
475    
476     =cut
477    
478     BEGIN {
479     if ($^O eq "linux") {
480     *ncpu = sub(;$) {
481     my ($cpus, $eus);
482    
483     if (open my $fh, "<", "/proc/cpuinfo") {
484     my %id;
485    
486     while (<$fh>) {
487     if (/^core id\s*:\s*(\d+)/) {
488     ++$eus;
489     undef $id{$1};
490     }
491     }
492    
493     $cpus = scalar keys %id;
494     } else {
495     $cpus = $eus = @_ ? shift : 1;
496     }
497     wantarray ? ($cpus, $eus) : $cpus
498     };
499     } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
500     *ncpu = sub(;$) {
501     my $cpus = qx<sysctl -n hw.ncpu> * 1
502     || (@_ ? shift : 1);
503     wantarray ? ($cpus, $cpus) : $cpus
504     };
505     } else {
506     *ncpu = sub(;$) {
507     my $cpus = @_ ? shift : 1;
508     wantarray ? ($cpus, $cpus) : $cpus
509     };
510     }
511     }
512    
513 root 1.1 =back
514    
515 root 1.6 =head1 CHILD USAGE
516    
517     In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
518     more child-side function:
519    
520     =over 4
521    
522     =item AnyEvent::Fork::Pool::retire ()
523    
524     This function sends an event to the parent process to request retirement:
525     the worker is removed from the pool and no new jobs will be sent to it,
526     but it has to handle the jobs that are already queued.
527    
528     The parentheses are part of the syntax: the function usually isn't defined
529     when you compile your code (because that happens I<before> handing the
530     template process over to C<AnyEvent::Fork::Pool::run>), so you need the
531     empty parentheses to tell Perl that the function is indeed a function.
532    
533     Retiring a worker can be useful to gracefully shut it down when the worker
534     deems this useful. For example, after executing a job, one could check
535     the process size or the number of jobs handled so far, and if either is
536     too high, the worker could ask to get retired, to avoid letting memory
537     leaks accumulate.
538    
539 root 1.10 Example: retire a worker after it has handled roughly 100 requests.
540    
541     my $count = 0;
542    
543     sub my::worker {
544    
545     ++$count == 100
546     and AnyEvent::Fork::Pool::retire ();
547    
548     ... normal code goes here
549     }
550    
551 root 1.6 =back
552    
553     =head1 POOL PARAMETERS RECIPES
554    
555     This section describes some recipes for pool parameters. These are mostly
556     meant for the synchronous RPC backend, as the asynchronous RPC backend
557     changes the rules considerably, making workers themselves responsible for
558     their scheduling.
559    
560     =over 4
561    
562     =item low latency - set load = 1
563    
564     If you need a deterministic low latency, you should set the C<load>
565     parameter to C<1>. This ensures that never more than one job is sent to
566     each worker. This avoids having to wait for a previous job to finish.
567    
568     This makes most sense with the synchronous (default) backend, as the
569     asynchronous backend can handle multiple requests concurrently.
570    
571     =item lowest latency - set load = 1 and idle = max
572    
573     To achieve the lowest latency, you should additionally disable any dynamic
574     resizing of the pool by setting C<idle> to the same value as C<max>.
575    
576     =item high throughput, cpu bound jobs - set load >= 2, max = #cpus
577    
578     To get high throughput with cpu-bound jobs, you should set the maximum
579     pool size to the number of cpus in your system, and C<load> to at least
580     C<2>, to make sure there can be another job waiting for the worker when it
581     has finished one.
582    
583     The value of C<2> for C<load> is the minimum value that I<can> achieve
584     100% throughput, but if your parent process itself is sometimes busy, you
585     might need higher values. Also there is a limit on the amount of data that
586     can be "in flight" to the worker, so if you send big blobs of data to your
587     worker, C<load> might have much less of an effect.
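As a sketch, with the worker function name made up and an existing C<$fork> template assumed, this recipe could look like:

```perl
my $pool = $fork->AnyEvent::Fork::Pool::run (
   "MyWorker::crunch", # hypothetical cpu-bound worker function
   max  => (scalar AnyEvent::Fork::Pool::ncpu 4), # one worker per cpu, 4 if unknown
   load => 2, # keep one job queued behind the running one
);
```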
588    
589     =item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high
590    
591     When your jobs are I/O bound, using more workers usually results in
592     higher throughput, depending very much on your actual workload - sometimes
593     having only one worker is best, for example, when you read or write big
594     files at maximum speed, as a second worker will increase seek times.
595    
596     =back
597    
598 root 1.7 =head1 EXCEPTIONS
599    
600     The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions will
601     not be caught, and exceptions in both the worker and in callbacks cause
602     undesirable or undefined behaviour.
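Since nothing catches exceptions for you, one defensive pattern is to trap them inside the worker function and hand them back as ordinary result values. A sketch with made-up names (C<my_worker_safe> is not part of this module's API):

```perl
use strict;
use warnings;

# trap exceptions with eval and report errors as normal results,
# instead of letting them escape the worker
sub my_worker_safe {
   my (@args) = @_;

   my @result = eval {
      die "no arguments given\n" unless @args;
      map { $_ * 2 } @args; # the real job would go here
   };

   return $@ ? (0, "error: $@") : (1, @result);
}

my ($ok, @r) = my_worker_safe (1, 2, 3);
print "$ok @r\n"; # prints "1 2 4 6"
```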
603    
604 root 1.1 =head1 SEE ALSO
605    
606     L<AnyEvent::Fork>, to create the processes in the first place.
607    
608     L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.
609    
610     =head1 AUTHOR AND CONTACT INFORMATION
611    
612     Marc Lehmann <schmorp@schmorp.de>
613     http://software.schmorp.de/pkg/AnyEvent-Fork-Pool
614    
615     =cut
616    
617     1
618