/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.6
Committed: Sun Apr 21 11:17:02 2013 UTC (11 years ago) by root
Branch: MAIN
Changes since 1.5: +176 -73 lines

=head1 NAME

AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork

=head1 SYNOPSIS

   use AnyEvent;
   use AnyEvent::Fork::Pool;
   # use AnyEvent::Fork is not needed

   # all possible parameters shown, with default values
   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
           "MyWorker::run", # the worker function

           # pool management
           max        => 4,   # absolute maximum # of processes
           idle       => 0,   # minimum # of idle processes
           load       => 2,   # queue at most this number of jobs per process
           start      => 0.1, # wait this many seconds before starting a new process
           stop       => 10,  # wait this many seconds before stopping an idle process
           on_destroy => (my $finish = AE::cv), # called when object is destroyed

           # parameters passed to AnyEvent::Fork::RPC
           async      => 0,
           on_error   => sub { die "FATAL: $_[0]\n" },
           on_event   => sub { my @ev = @_ },
           init       => "MyWorker::init",
           serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
        );

   for (1..10) {
      $pool->(doit => $_, sub {
         print "MyWorker::run returned @_\n";
      });
   }

   undef $pool;

   $finish->recv;

=head1 DESCRIPTION

This module uses processes created via L<AnyEvent::Fork> and the RPC
protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced
pool of processes that handles jobs.

Understanding of L<AnyEvent::Fork> is helpful but not critical to be able
to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC>
is, as it defines the actual API that needs to be implemented in the
worker processes.

=head1 EXAMPLES

=head1 PARENT USAGE

To create a pool, you first have to create an L<AnyEvent::Fork> object -
this object becomes your template process. Whenever a new worker process
is needed, it is forked from this template process. Then you need to
"hand off" this template process to the C<AnyEvent::Fork::Pool> module by
calling its C<run> method on it:

   my $template = AnyEvent::Fork
      ->new
      ->require ("SomeModule", "MyWorkerModule");

   my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");

The pool "object" is not a regular Perl object, but a code reference that
you can call and that works roughly like calling the worker function
directly, except that it returns nothing but instead you need to specify a
callback to be invoked once results are in:

   $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });

=over 4

=cut

package AnyEvent::Fork::Pool;

use common::sense;

use Scalar::Util ();

use Guard ();
use Array::Heap ();

use AnyEvent;
use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
use AnyEvent::Fork::RPC;

# these are used for the first and last argument of events
# in the hope of not colliding. yes, I don't like it either,
# but didn't come up with an obviously better alternative.
my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';

our $VERSION = 0.1;

=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]

The traditional way to call the pool creation function. But it is way
cooler to call it in the following way:

=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])

Creates a new pool object with the specified C<$function> as the function
(name) to call for each request. The pool uses the C<$fork> object as the
template when creating worker processes.

You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
to create one.

A relatively large number of key/value pairs can be specified to influence
the behaviour. They are grouped into the categories "pool management",
"template process" and "rpc parameters".

=over 4

=item Pool Management

The pool consists of a certain number of worker processes. These options
decide how many of these processes exist and when they are started and
stopped.

The worker pool is dynamically resized according to (perceived :)
load. The minimum size is given by the C<idle> parameter and the maximum
size is given by the C<max> parameter. A new worker is started every
C<start> seconds at most, and an idle worker is stopped at most every
C<stop> seconds.

You can specify the number of jobs sent to a worker concurrently using the
C<load> parameter.

=over 4

=item idle => $count (default: 0)

The minimum number of idle processes in the pool - when there are fewer
than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
ones, subject to the limits set by C<max> and C<start>.

This is also the initial number of workers in the pool. The default of
zero means that the pool starts empty and can shrink back to zero workers
over time.

=item max => $count (default: 4)

The maximum number of processes in the pool, in addition to the template
process. C<AnyEvent::Fork::Pool> will never have more than this number of
worker processes, although there can be more temporarily when a worker is
shut down and hasn't exited yet.

=item load => $count (default: 2)

The maximum number of concurrent jobs sent to a single worker process.

Jobs that cannot be sent to a worker immediately (because all workers are
busy) will be queued until a worker is available.

Setting this low improves latency. For example, at C<1>, every job that
is sent to a worker is sent to a completely idle worker that doesn't run
any other jobs. The downside is that throughput is reduced - a worker that
finishes a job needs to wait for a new job from the parent.

The default of C<2> is usually a good compromise.

=item start => $seconds (default: 0.1)

When there are fewer than C<idle> workers (or all workers are completely
busy), then a timer is started. If the timer elapses and there are still
jobs that cannot be queued to a worker, a new worker is started.

This sets the minimum time that all workers must be busy before a new
worker is started. Or, put differently, the minimum delay between starting
new workers.

The delay is small by default, which means new workers will be started
relatively quickly. A delay of C<0> is possible, and ensures that the pool
will grow as quickly as possible under load.

Non-zero values are useful to avoid "exploding" a pool because a lot of
jobs are queued in an instant.

Higher values are often useful to improve efficiency at the cost of
latency - when fewer processes can do the job over time, starting more and
more is not necessarily going to help.

=item stop => $seconds (default: 10)

When a worker has no jobs to execute it becomes idle. An idle worker that
hasn't executed a job within this amount of time will be stopped, unless
the other parameters say otherwise.

Setting this to a very high value means that workers stay around longer,
even when they have nothing to do, which can be good as they don't have to
be started on the next load spike again.

Setting this to a lower value can be useful to avoid memory or simply
process table wastage.

Usually, setting this to a time longer than the time between load spikes
is best - if you expect a lot of requests every minute and little work
in between, setting this to longer than a minute avoids having to stop
and start workers. On the other hand, you have to ask yourself if letting
workers run idle is a good use of your resources. Try to find a good
balance between resource usage of your workers and the time to start new
workers - L<AnyEvent::Fork> itself is fast at creating workers while not
using much memory for them, so most of the overhead is likely from your
own code.

=item on_destroy => $callback->() (default: none)

When a pool object goes out of scope, the outstanding requests are still
handled till completion. Only after handling all jobs will the workers
be destroyed (and also the template process if it isn't referenced
otherwise).

To find out when a pool I<really> has finished its work, you can set this
callback, which will be called when the pool has been destroyed.

=back

=item AnyEvent::Fork::RPC Parameters

These parameters are all passed more or less directly to
L<AnyEvent::Fork::RPC>. They are only briefly mentioned here, for
their full documentation please refer to the L<AnyEvent::Fork::RPC>
documentation. Also, the default values mentioned here are only documented
as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.

=over 4

=item async => $boolean (default: 0)

Whether to use the synchronous or asynchronous RPC backend.

=item on_error => $callback->($message) (default: die with message)

The callback to call on any (fatal) errors.

=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)

The callback to invoke on events.

=item init => $initfunction (default: none)

The function to call in the child, once before handling requests.

=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)

The serialiser to use.

=back

=back

=cut

sub run {
   my ($template, $function, %arg) = @_;

   my $max        = $arg{max}   || 4;
   my $idle       = $arg{idle}  || 0;
   my $load       = $arg{load}  || 2;
   my $start      = $arg{start} || 0.1;
   my $stop       = $arg{stop}  || 10;
   my $on_event   = $arg{on_event} || sub { };
   my $on_destroy = $arg{on_destroy};

   my @rpc = (
      async      => $arg{async},
      init       => $arg{init},
      serialiser => delete $arg{serialiser},
      on_error   => $arg{on_error},
   );

   my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
   my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);

   my $destroy_guard = Guard::guard {
      $on_destroy->()
         if $on_destroy;
   };

   $template
      ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
      ->eval ('
           my ($magic0, $magic1) = @_;
           sub AnyEvent::Fork::Pool::retire() {
              AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
           }
        ', $magic0, $magic1)
   ;

   $start_worker = sub {
      my $proc = [0, 0, undef]; # load, index, rpc

      $proc->[2] = $template
         ->fork
         ->AnyEvent::Fork::RPC::run ($function,
              @rpc,
              on_event => sub {
                 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
                    $destroy_guard if 0; # keep it alive

                    $_[1] eq "quit" and $stop_worker->($proc);
                    return;
                 }

                 &$on_event;
              },
           )
      ;

      ++$nidle;
      Array::Heap::push_heap_idx @pool, $proc;

      Scalar::Util::weaken $proc;
   };

   $stop_worker = sub {
      my $proc = shift;

      $proc->[0]
         or --$nidle;

      Array::Heap::splice_heap_idx @pool, $proc->[1]
         if defined $proc->[1];
   };

   $want_start = sub {
      undef $stop_w;

      $start_w ||= AE::timer $start, $start, sub {
         if (($nidle < $idle || @queue) && @pool < $max) {
            $start_worker->();
            $scheduler->();
         } else {
            undef $start_w;
         }
      };
   };

   $want_stop = sub {
      $stop_w ||= AE::timer $stop, $stop, sub {
         $stop_worker->($pool[0])
            if $nidle;

         undef $stop_w
            if $nidle <= $idle;
      };
   };

   $scheduler = sub {
      if (@queue) {
         while (@queue) {
            my $proc = $pool[0];

            if ($proc->[0] < $load) {
               # found free worker, increase load
               unless ($proc->[0]++) {
                  # worker became busy
                  --$nidle
                     or undef $stop_w;

                  $want_start->()
                     if $nidle < $idle && @pool < $max;
               }

               Array::Heap::adjust_heap_idx @pool, 0;

               my $job = shift @queue;
               my $ocb = pop @$job;

               $proc->[2]->(@$job, sub {
                  # reduce load
                  --$proc->[0] # worker still busy?
                     or ++$nidle > $idle # not too many idle processes?
                     or $want_stop->();

                  Array::Heap::adjust_heap_idx @pool, $proc->[1]
                     if defined $proc->[1];

                  $scheduler->();

                  &$ocb;
               });
            } else {
               $want_start->()
                  unless @pool >= $max;

               last;
            }
         }
      } elsif ($shutdown) {
         @pool = ();
         undef $start_w;
         undef $start_worker; # frees $destroy_guard reference

         $stop_worker->($pool[0])
            while $nidle;
      }
   };

   my $shutdown_guard = Guard::guard {
      $shutdown = 1;
      $scheduler->();
   };

   $start_worker->()
      while @pool < $idle;

   sub {
      $shutdown_guard if 0; # keep it alive

      $start_worker->()
         unless @pool;

      push @queue, [@_];
      $scheduler->();
   }
}

=item $pool->(..., $cb->(...))

Call the RPC function of a worker with the given arguments, and when the
worker is done, call the C<$cb> with the results, just like calling the
RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
details on the RPC API.

If there is no free worker, the call will be queued until a worker becomes
available.

Note that there can be considerable time between calling this method and
the call actually being executed. During this time, the parameters passed
to this function are effectively read-only - modifying them after the call
and before the callback is invoked causes undefined behaviour.
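
For example, in this sketch (C<$pool> and the argument layout are made up
for illustration), data referenced by submitted arguments must not be
modified until the callback has been invoked - submit a copy if you need
to reuse a buffer:

   my $buf = [1, 2, 3];

   # unsafe: the worker might see either the old or the new contents
   $pool->($buf, sub { });
   push @$buf, 4;

   # safe: submit a copy, then modify the original freely
   $pool->([@$buf], sub { });
   push @$buf, 4;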

=cut

=back

=head1 CHILD USAGE

In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
more child-side function:

=over 4

=item AnyEvent::Fork::Pool::retire ()

This function sends an event to the parent process to request retirement:
the worker is removed from the pool and no new jobs will be sent to it,
but it has to handle the jobs that are already queued.

The parentheses are part of the syntax: the function usually isn't defined
when you compile your code (because that happens I<before> handing the
template process over to C<AnyEvent::Fork::Pool::run>), so you need the
empty parentheses to tell Perl that the function is indeed a function.

Retiring a worker can be useful to gracefully shut it down when the worker
deems this useful. For example, after executing a job, one could check
the process size or the number of jobs handled so far, and if either is
too high, the worker could ask to get retired, to avoid accumulating
memory leaks.
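
For example, a worker module could retire itself after a fixed number of
jobs (a sketch - the C<MyWorker> package, its C<handle_job> function and
the limit of C<100> are made up for illustration):

   package MyWorker;

   my $jobs_handled;

   sub run {
      my @args = @_;

      my $result = handle_job @args; # hypothetical job function

      # cap per-process lifetime to limit leak accumulation
      AnyEvent::Fork::Pool::retire ()
         if ++$jobs_handled >= 100;

      $result
   }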

=back

=head1 POOL PARAMETERS RECIPES

This section describes some recipes for pool parameters. These are mostly
meant for the synchronous RPC backend, as the asynchronous RPC backend
changes the rules considerably, making workers themselves responsible for
their scheduling.

=over 4

=item low latency - set load = 1

If you need a deterministic low latency, you should set the C<load>
parameter to C<1>. This ensures that never more than one job is sent to
each worker. This avoids having to wait for a previous job to finish.

This makes most sense with the synchronous (default) backend, as the
asynchronous backend can handle multiple requests concurrently.

=item lowest latency - set load = 1 and idle = max

To achieve the lowest latency, you additionally should disable any dynamic
resizing of the pool by setting C<idle> to the same value as C<max>.
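
Such a pool might be created like this (a sketch - the C<MyWorker> module
and the size of C<8> are made up for illustration):

   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
           "MyWorker::run",
           load => 1, # at most one job per worker
           idle => 8, # keep all workers running...
           max  => 8, # ...by making idle equal to max
        );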

=item high throughput, cpu bound jobs - set load >= 2, max = #cpus

To get high throughput with cpu-bound jobs, you should set the maximum
pool size to the number of cpus in your system, and C<load> to at least
C<2>, to make sure there can be another job waiting for the worker when it
has finished one.

The value of C<2> for C<load> is the minimum value that I<can> achieve
100% throughput, but if your parent process itself is sometimes busy, you
might need higher values. Also there is a limit on the amount of data that
can be "in flight" to the worker, so if you send big blobs of data to your
worker, C<load> might have much less of an effect.
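
For example, on a machine known to have eight cpus (a sketch - the
C<MyWorker> module is made up, and this module does not detect the cpu
count for you):

   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
           "MyWorker::run",
           max  => 8, # number of cpus in this machine
           load => 2, # one extra job queued behind each running job
        );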

=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high

When your jobs are I/O bound, using more workers usually translates into
higher throughput, depending very much on your actual workload - sometimes
having only one worker is best, for example, when you read or write big
files at maximum speed, as a second worker will increase seek times.

=back

=head1 SEE ALSO

L<AnyEvent::Fork>, to create the processes in the first place.

L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.

=head1 AUTHOR AND CONTACT INFORMATION

   Marc Lehmann <schmorp@schmorp.de>
   http://software.schmorp.de/pkg/AnyEvent-Fork-Pool

=cut

1
