/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.7
Committed: Sun Apr 21 12:03:02 2013 UTC (11 years ago) by root
Branch: MAIN
Changes since 1.6: +12 -2 lines
=head1 NAME

AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork

=head1 SYNOPSIS

   use AnyEvent;
   use AnyEvent::Fork::Pool;
   # use AnyEvent::Fork is not needed

   # all possible parameters shown, with default values
   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
           "MyWorker::run", # the worker function

           # pool management
           max        => 4,   # absolute maximum # of processes
           idle       => 0,   # minimum # of idle processes
           load       => 2,   # queue at most this number of jobs per process
           start      => 0.1, # wait this many seconds before starting a new process
           stop       => 10,  # wait this many seconds before stopping an idle process
           on_destroy => (my $finish = AE::cv), # called when object is destroyed

           # parameters passed to AnyEvent::Fork::RPC
           async      => 0,
           on_error   => sub { die "FATAL: $_[0]\n" },
           on_event   => sub { my @ev = @_ },
           init       => "MyWorker::init",
           serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
        );

   for (1..10) {
      $pool->(doit => $_, sub {
         print "MyWorker::run returned @_\n";
      });
   }

   undef $pool;

   $finish->recv;

=head1 DESCRIPTION

This module uses processes created via L<AnyEvent::Fork> and the RPC
protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced
pool of processes that handles jobs.

Understanding of L<AnyEvent::Fork> is helpful but not critical to be able
to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC>
is, as it defines the actual API that needs to be implemented in the
worker processes.

=head1 EXAMPLES

=head1 PARENT USAGE

To create a pool, you first have to create an L<AnyEvent::Fork> object -
this object becomes your template process. Whenever a new worker process
is needed, it is forked from this template process. Then you need to
"hand off" this template process to the C<AnyEvent::Fork::Pool> module by
calling its run method on it:

   my $template = AnyEvent::Fork
      ->new
      ->require ("SomeModule", "MyWorkerModule");

   my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");

The pool "object" is not a regular Perl object, but a code reference that
you can call and that works roughly like calling the worker function
directly, except that it returns nothing but instead you need to specify a
callback to be invoked once results are in:

   $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });

=over 4

=cut

package AnyEvent::Fork::Pool;

use common::sense;

use Scalar::Util ();

use Guard ();
use Array::Heap ();

use AnyEvent;
use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
use AnyEvent::Fork::RPC;

# these are used for the first and last argument of events
# in the hope of not colliding. yes, I don't like it either,
# but didn't come up with an obviously better alternative.
my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';

our $VERSION = 0.1;

=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]

The traditional way to call the pool creation function. But it is way
cooler to call it in the following way:

=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])

Creates a new pool object with the specified C<$function> as function
(name) to call for each request. The pool uses the C<$fork> object as the
template when creating worker processes.

You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
to create one.

A relatively large number of key/value pairs can be specified to influence
the behaviour. They are grouped into the categories "pool management",
"template process" and "rpc parameters".

=over 4

=item Pool Management

The pool consists of a certain number of worker processes. These options
decide how many of these processes exist and when they are started and
stopped.

The worker pool is dynamically resized, according to (perceived :)
load. The minimum size is given by the C<idle> parameter and the maximum
size is given by the C<max> parameter. A new worker is started every
C<start> seconds at most, and an idle worker is stopped at most every
C<stop> seconds.

You can specify the number of jobs sent to a worker concurrently using the
C<load> parameter.

=over 4

=item idle => $count (default: 0)

The minimum number of idle processes in the pool - when there are fewer
than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
ones, subject to the limits set by C<max> and C<start>.

This is also the initial number of workers in the pool. The default of
zero means that the pool starts empty and can shrink back to zero workers
over time.

=item max => $count (default: 4)

The maximum number of processes in the pool, in addition to the template
process. C<AnyEvent::Fork::Pool> will never have more than this number of
worker processes, although there can be more temporarily when a worker is
shut down and hasn't exited yet.

=item load => $count (default: 2)

The maximum number of concurrent jobs sent to a single worker process.

Jobs that cannot be sent to a worker immediately (because all workers are
busy) will be queued until a worker is available.

Setting this low improves latency. For example, at C<1>, every job that
is sent to a worker is sent to a completely idle worker that doesn't run
any other jobs. The downside is that throughput is reduced - a worker that
finishes a job needs to wait for a new job from the parent.

The default of C<2> is usually a good compromise.

=item start => $seconds (default: 0.1)

When there are fewer than C<idle> workers (or all workers are completely
busy), then a timer is started. If the timer elapses and there are still
jobs that cannot be queued to a worker, a new worker is started.

This sets the minimum time that all workers must be busy before a new
worker is started. Or, put differently, the minimum delay between starting
new workers.

The delay is small by default, which means new workers will be started
relatively quickly. A delay of C<0> is possible, and ensures that the pool
will grow as quickly as possible under load.

Non-zero values are useful to avoid "exploding" a pool because a lot of
jobs are queued in an instant.

Higher values are often useful to improve efficiency at the cost of
latency - when fewer processes can do the job over time, starting more and
more is not necessarily going to help.

=item stop => $seconds (default: 10)

When a worker has no jobs to execute it becomes idle. An idle worker that
hasn't executed a job within this amount of time will be stopped, unless
the other parameters say otherwise.

Setting this to a very high value means that workers stay around longer,
even when they have nothing to do, which can be good, as they don't have
to be started again on the next load spike.

Setting this to a lower value can be useful to avoid memory or simply
process table wastage.

Usually, setting this to a time longer than the time between load spikes
is best - if you expect a lot of requests every minute and little work
in between, setting this to longer than a minute avoids having to stop
and start workers. On the other hand, you have to ask yourself if letting
workers run idle is a good use of your resources. Try to find a good
balance between resource usage of your workers and the time to start new
workers - L<AnyEvent::Fork> itself is fast at creating workers and does
not use much memory for them, so most of the overhead is likely from your
own code.

=item on_destroy => $callback->() (default: none)

When a pool object goes out of scope, the outstanding requests are still
handled till completion. Only after handling all jobs will the workers
be destroyed (and also the template process if it isn't referenced
otherwise).

To find out when a pool I<really> has finished its work, you can set this
callback, which will be called when the pool has been destroyed.

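For example, to block until all outstanding jobs are done and all workers
have exited, you can pass a condition variable as the callback (a sketch
based on the synopsis; C<MyWorker::run> is a placeholder worker function):

   my $finish = AE::cv;

   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
           "MyWorker::run",
           on_destroy => $finish,
        );

   # ... submit jobs ...

   undef $pool;   # accept no new jobs, finish the outstanding ones
   $finish->recv; # returns only after all workers are gone
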
=back

=item AnyEvent::Fork::RPC Parameters

These parameters are all passed more or less directly to
L<AnyEvent::Fork::RPC>. They are only briefly mentioned here; for
their full documentation please refer to the L<AnyEvent::Fork::RPC>
documentation. Also, the default values mentioned here are only documented
as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.

=over 4

=item async => $boolean (default: 0)

Whether to use the synchronous or asynchronous RPC backend.

=item on_error => $callback->($message) (default: die with message)

The callback to call on any (fatal) errors.

=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)

The callback to invoke on events.

=item init => $initfunction (default: none)

The function to call in the child, once before handling requests.

=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)

The serialiser to use.

=back

=back

=cut

sub run {
   my ($template, $function, %arg) = @_;

   my $max        = $arg{max}        || 4;
   my $idle       = $arg{idle}       || 0,
   my $load       = $arg{load}       || 2,
   my $start      = $arg{start}      || 0.1,
   my $stop       = $arg{stop}       || 10,
   my $on_event   = $arg{on_event}   || sub { },
   my $on_destroy = $arg{on_destroy};

   my @rpc = (
      async      => $arg{async},
      init       => $arg{init},
      serialiser => delete $arg{serialiser},
      on_error   => $arg{on_error},
   );

   my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
   my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);

   my $destroy_guard = Guard::guard {
      $on_destroy->()
         if $on_destroy;
   };

   $template
      ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
      ->eval ('
           my ($magic0, $magic1) = @_;
           sub AnyEvent::Fork::Pool::retire() {
              AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
           }
        ', $magic0, $magic1)
   ;

   $start_worker = sub {
      my $proc = [0, 0, undef]; # load, index, rpc

      $proc->[2] = $template
         ->fork
         ->AnyEvent::Fork::RPC::run ($function,
              @rpc,
              on_event => sub {
                 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
                    $destroy_guard if 0; # keep it alive

                    $_[1] eq "quit" and $stop_worker->($proc);
                    return;
                 }

                 &$on_event;
              },
           )
      ;

      ++$nidle;
      Array::Heap::push_heap_idx @pool, $proc;

      Scalar::Util::weaken $proc;
   };

   $stop_worker = sub {
      my $proc = shift;

      $proc->[0]
         or --$nidle;

      Array::Heap::splice_heap_idx @pool, $proc->[1]
         if defined $proc->[1];

      @$proc = 0; # tell others to leave it be
   };

   $want_start = sub {
      undef $stop_w;

      $start_w ||= AE::timer $start, $start, sub {
         if (($nidle < $idle || @queue) && @pool < $max) {
            $start_worker->();
            $scheduler->();
         } else {
            undef $start_w;
         }
      };
   };

   $want_stop = sub {
      $stop_w ||= AE::timer $stop, $stop, sub {
         $stop_worker->($pool[0])
            if $nidle;

         undef $stop_w
            if $nidle <= $idle;
      };
   };

   $scheduler = sub {
      if (@queue) {
         while (@queue) {
            @pool or $start_worker->();

            my $proc = $pool[0];

            if ($proc->[0] < $load) {
               # found free worker, increase load
               unless ($proc->[0]++) {
                  # worker became busy
                  --$nidle
                     or undef $stop_w;

                  $want_start->()
                     if $nidle < $idle && @pool < $max;
               }

               Array::Heap::adjust_heap_idx @pool, 0;

               my $job = shift @queue;
               my $ocb = pop @$job;

               $proc->[2]->(@$job, sub {
                  # reduce load
                  --$proc->[0] # worker still busy?
                     or ++$nidle > $idle # not too many idle processes?
                     or $want_stop->();

                  Array::Heap::adjust_heap_idx @pool, $proc->[1]
                     if defined $proc->[1];

                  &$ocb;

                  $scheduler->();
               });
            } else {
               $want_start->()
                  unless @pool >= $max;

               last;
            }
         }
      } elsif ($shutdown) {
         @pool = ();
         undef $start_w;
         undef $start_worker; # frees $destroy_guard reference

         $stop_worker->($pool[0])
            while $nidle;
      }
   };

   my $shutdown_guard = Guard::guard {
      $shutdown = 1;
      $scheduler->();
   };

   $start_worker->()
      while @pool < $idle;

   sub {
      $shutdown_guard if 0; # keep it alive

      $start_worker->()
         unless @pool;

      push @queue, [@_];
      $scheduler->();
   }
}

=item $pool->(..., $cb->(...))

Call the RPC function of a worker with the given arguments, and when the
worker is done, call the C<$cb> with the results, just like calling the
RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
details on the RPC API.

If there is no free worker, the call will be queued until a worker becomes
available.

Note that there can be considerable time between calling this method and
the call actually being executed. During this time, the parameters passed
to this function are effectively read-only - modifying them after the call
and before the callback is invoked causes undefined behaviour.

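Because of this, pass a copy when you intend to keep modifying a data
structure after submitting it (a sketch; C<process> and C<%job> are
hypothetical names, not part of this module's API):

   my %job = (path => "/tmp/input", tries => 3);

   # pass a copy, so later changes to %job cannot affect the queued call
   $pool->(process => { %job }, sub {
      print "done: @_\n";
   });

   $job{tries} = 0; # safe - the queued call received its own copy
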
=cut

=back

=head1 CHILD USAGE

In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
more child-side function:

=over 4

=item AnyEvent::Fork::Pool::retire ()

This function sends an event to the parent process to request retirement:
the worker is removed from the pool and no new jobs will be sent to it,
but it has to handle the jobs that are already queued.

The parentheses are part of the syntax: the function usually isn't defined
when you compile your code (because that happens I<before> handing the
template process over to C<AnyEvent::Fork::Pool::run>), so you need the
empty parentheses to tell Perl that the function is indeed a function.

Retiring a worker can be useful to gracefully shut it down when the worker
deems this useful. For example, after executing a job, one could check
the process size or the number of jobs handled so far, and if either is
too high, the worker could ask to get retired, to avoid letting memory
leaks accumulate.

=back

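A worker module that retires itself after a fixed number of jobs could
look like this (a sketch; C<MyWorker>, C<do_something> and the job limit
are assumptions for illustration, not part of the API):

   package MyWorker;

   my $jobs_done = 0;

   sub run {
      my (@args) = @_;

      my @result = do_something (@args); # hypothetical job handler

      # ask the parent to stop sending jobs after 100 requests,
      # e.g. to contain the effect of slow memory leaks
      AnyEvent::Fork::Pool::retire ()
         if ++$jobs_done >= 100;

      @result
   }

   1
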
=head1 POOL PARAMETERS RECIPES

This section describes some recipes for pool parameters. These are mostly
meant for the synchronous RPC backend, as the asynchronous RPC backend
changes the rules considerably, making workers themselves responsible for
their scheduling.

=over 4

=item low latency - set load = 1

If you need deterministic low latency, you should set the C<load>
parameter to C<1>. This ensures that never more than one job is sent to
each worker. This avoids having to wait for a previous job to finish.

This makes most sense with the synchronous (default) backend, as the
asynchronous backend can handle multiple requests concurrently.

=item lowest latency - set load = 1 and idle = max

To achieve the lowest latency, you additionally should disable any dynamic
resizing of the pool by setting C<idle> to the same value as C<max>.

=item high throughput, cpu bound jobs - set load >= 2, max = #cpus

To get high throughput with cpu-bound jobs, you should set the maximum
pool size to the number of cpus in your system, and C<load> to at least
C<2>, to make sure there can be another job waiting for the worker when it
has finished one.

The value of C<2> for C<load> is the minimum value that I<can> achieve
100% throughput, but if your parent process itself is sometimes busy, you
might need higher values. Also there is a limit on the amount of data that
can be "in flight" to the worker, so if you send big blobs of data to your
worker, C<load> might have much less of an effect.

=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high

When your jobs are I/O bound, using more workers usually results in
higher throughput, depending very much on your actual workload - sometimes
having only one worker is best, for example, when you read or write big
files at maximum speed, as a second worker will increase seek times.

=back

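As a concrete illustration, the lowest-latency recipe translates into a
pool configuration like this (a sketch; C<MyWorker::run> and the worker
count of C<8> are placeholders you would adapt to your system):

   # lowest latency: at most one job per worker, fixed-size pool
   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
           "MyWorker::run",
           load => 1, # never queue a second job on a worker
           idle => 8, # keep all workers started...
           max  => 8, # ...by setting idle equal to max
        );
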
=head1 EXCEPTIONS

The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions will
not be caught, and exceptions in both workers and in callbacks cause
undesirable or undefined behaviour.

=head1 SEE ALSO

L<AnyEvent::Fork>, to create the processes in the first place.

L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.

=head1 AUTHOR AND CONTACT INFORMATION

   Marc Lehmann <schmorp@schmorp.de>
   http://software.schmorp.de/pkg/AnyEvent-Fork-Pool

=cut

1

542