/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.8
Committed: Sun Apr 21 12:28:34 2013 UTC (11 years ago) by root
Branch: MAIN
CVS Tags: rel-0_1
Changes since 1.7: +2 -0 lines
Log Message:
0.1

=head1 NAME

AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork

THE API IS NOT FINISHED, CONSIDER THIS AN ALPHA RELEASE

=head1 SYNOPSIS

   use AnyEvent;
   use AnyEvent::Fork::Pool;
   # use AnyEvent::Fork is not needed

   # all possible parameters shown, with default values
   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
           "MyWorker::run", # the worker function

           # pool management
           max        => 4,   # absolute maximum # of processes
           idle       => 0,   # minimum # of idle processes
           load       => 2,   # queue at most this number of jobs per process
           start      => 0.1, # wait this many seconds before starting a new process
           stop       => 10,  # wait this many seconds before stopping an idle process
           on_destroy => (my $finish = AE::cv), # called when object is destroyed

           # parameters passed to AnyEvent::Fork::RPC
           async      => 0,
           on_error   => sub { die "FATAL: $_[0]\n" },
           on_event   => sub { my @ev = @_ },
           init       => "MyWorker::init",
           serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
        );

   for (1..10) {
      $pool->(doit => $_, sub {
         print "MyWorker::run returned @_\n";
      });
   }

   undef $pool;

   $finish->recv;

=head1 DESCRIPTION

This module uses processes created via L<AnyEvent::Fork> and the RPC
protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced
pool of processes that handles jobs.

Understanding L<AnyEvent::Fork> is helpful but not critical to using this
module. A thorough understanding of L<AnyEvent::Fork::RPC>, however, is
critical, as it defines the actual API that needs to be implemented in the
worker processes.
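
As a rough sketch, assuming the default synchronous backend and string
serialiser, a worker module for the L</SYNOPSIS> could look like the
following. The function bodies are placeholders; the assumption is that the
synchronous backend calls C<MyWorker::run> with the request arguments and
passes its return values to the callback in the parent, as described in
L<AnyEvent::Fork::RPC>.

   package MyWorker;

   use strict;
   use warnings;

   # optional one-time setup, named via the init parameter
   sub init {
      # e.g. open files or database connections here
   }

   # called once per job: $pool->(doit => 5, $cb) calls run ("doit", 5)
   sub run {
      my ($command, $value) = @_;

      # placeholder work; the return values are sent back to the parent,
      # where they become the arguments of the callback
      ("$command: " . $value * 2)
   }

   1;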

=head1 EXAMPLES

=head1 PARENT USAGE

To create a pool, you first have to create an L<AnyEvent::Fork> object -
this object becomes your template process. Whenever a new worker process
is needed, it is forked from this template process. Then you need to
"hand off" this template process to the C<AnyEvent::Fork::Pool> module by
calling its run method on it:

   my $template = AnyEvent::Fork
                     ->new
                     ->require ("SomeModule", "MyWorkerModule");

   my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");

The pool "object" is not a regular Perl object, but a code reference that
you can call and that works roughly like calling the worker function
directly, except that it returns nothing; instead, you need to specify a
callback to be invoked once the results are in:

   $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });

=over 4

=cut

package AnyEvent::Fork::Pool;

use common::sense;

use Scalar::Util ();

use Guard ();
use Array::Heap ();

use AnyEvent;
use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
use AnyEvent::Fork::RPC;

# these are used for the first and last argument of events
# in the hope of not colliding. yes, I don't like it either,
# but didn't come up with an obviously better alternative.
my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';

our $VERSION = 0.1;

=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]

The traditional way to call the pool creation function. But it is way
cooler to call it in the following way:

=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])

Creates a new pool object with the specified C<$function> as function
(name) to call for each request. The pool uses the C<$fork> object as the
template when creating worker processes.

You have to supply this template process yourself, as an L<AnyEvent::Fork>
object; C<AnyEvent::Fork::Pool> does not create one for you.

A relatively large number of key/value pairs can be specified to influence
the behaviour. They are grouped into the categories "pool management" and
"rpc parameters".

=over 4

=item Pool Management

The pool consists of a certain number of worker processes. These options
decide how many of these processes exist and when they are started and
stopped.

The worker pool is dynamically resized, according to (perceived :)
load. The minimum size is given by the C<idle> parameter and the maximum
size is given by the C<max> parameter. A new worker is started every
C<start> seconds at most, and an idle worker is stopped at most every
C<stop> seconds.

You can specify the number of jobs sent to a worker concurrently using the
C<load> parameter.

=over 4

=item idle => $count (default: 0)

The minimum number of idle processes in the pool - when there are fewer
than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
ones, subject to the limits set by C<max> and C<start>.

This is also the initial number of workers in the pool. The default of
zero means that the pool starts empty and can shrink back to zero workers
over time.

=item max => $count (default: 4)

The maximum number of processes in the pool, in addition to the template
process. C<AnyEvent::Fork::Pool> will never have more than this number of
worker processes, although there can be more temporarily when a worker is
shut down and hasn't exited yet.

=item load => $count (default: 2)

The maximum number of concurrent jobs sent to a single worker process.

Jobs that cannot be sent to a worker immediately (because all workers are
busy) will be queued until a worker is available.

Setting this low improves latency. For example, at C<1>, every job that
is sent to a worker is sent to a completely idle worker that doesn't run
any other jobs. The downside is that throughput is reduced - a worker that
finishes a job needs to wait for a new job from the parent.

The default of C<2> is usually a good compromise.

=item start => $seconds (default: 0.1)

When there are fewer than C<idle> workers (or all workers are completely
busy), then a timer is started. If the timer elapses and there are still
jobs that cannot be queued to a worker, a new worker is started.

This sets the minimum time that all workers must be busy before a new
worker is started. Or, put differently, the minimum delay between starting
new workers.

The delay is small by default, which means new workers will be started
relatively quickly. A delay of C<0> is possible, and ensures that the pool
will grow as quickly as possible under load.

Non-zero values are useful to avoid "exploding" a pool because a lot of
jobs are queued in an instant.

Higher values are often useful to improve efficiency at the cost of
latency - when fewer processes can do the job over time, starting more and
more is not necessarily going to help.

=item stop => $seconds (default: 10)

When a worker has no jobs to execute it becomes idle. An idle worker that
hasn't executed a job within this amount of time will be stopped, unless
the other parameters say otherwise.

Setting this to a very high value means that workers stay around longer,
even when they have nothing to do, which can be good as they don't have to
be started again on the next load spike.

Setting this to a lower value can be useful to avoid wasting memory or
process table entries.

Usually, setting this to a time longer than the time between load spikes
is best - if you expect a lot of requests every minute and little work
in between, setting this to longer than a minute avoids having to stop
and start workers. On the other hand, you have to ask yourself if letting
workers run idle is a good use of your resources. Try to find a good
balance between resource usage of your workers and the time to start new
workers - L<AnyEvent::Fork> itself is fast at creating workers while not
using much memory for them, so most of the overhead is likely from your
own code.

=item on_destroy => $callback->() (default: none)

When a pool object goes out of scope, the outstanding requests are still
handled till completion. Only after handling all jobs will the workers
be destroyed (and also the template process if it isn't referenced
otherwise).

To find out when a pool I<really> has finished its work, you can set this
callback, which will be called when the pool has been destroyed.

=back

=item AnyEvent::Fork::RPC Parameters

These parameters are all passed more or less directly to
L<AnyEvent::Fork::RPC>. They are only briefly mentioned here; for
their full documentation, please refer to the L<AnyEvent::Fork::RPC>
documentation. Also, the default values mentioned here are only documented
as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.

=over 4

=item async => $boolean (default: 0)

Whether to use the synchronous or asynchronous RPC backend.

=item on_error => $callback->($message) (default: die with message)

The callback to call on any (fatal) errors.

=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)

The callback to invoke on events.

=item init => $initfunction (default: none)

The function to call in the child, once before handling requests.

=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)

The serialiser to use.

=back

=back

=cut

sub run {
   my ($template, $function, %arg) = @_;

   my $max        = $arg{max}      || 4;
   my $idle       = $arg{idle}     || 0;
   my $load       = $arg{load}     || 2;
   my $start      = $arg{start}    || 0.1;
   my $stop       = $arg{stop}     || 10;
   my $on_event   = $arg{on_event} || sub { };
   my $on_destroy = $arg{on_destroy};

   my @rpc = (
      async      =>        $arg{async},
      init       =>        $arg{init},
      serialiser => delete $arg{serialiser},
      on_error   =>        $arg{on_error},
   );

   my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
   my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);

   my $destroy_guard = Guard::guard {
      $on_destroy->()
         if $on_destroy;
   };

   $template
      ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
      ->eval ('
           my ($magic0, $magic1) = @_;
           sub AnyEvent::Fork::Pool::retire() {
              AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
           }
        ', $magic0, $magic1)
   ;

   $start_worker = sub {
      my $proc = [0, 0, undef]; # load, index, rpc

      $proc->[2] = $template
         ->fork
         ->AnyEvent::Fork::RPC::run ($function,
              @rpc,
              on_event => sub {
                 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
                    $destroy_guard if 0; # keep it alive

                    $_[1] eq "quit" and $stop_worker->($proc);
                    return;
                 }

                 &$on_event;
              },
           )
      ;

      ++$nidle;
      Array::Heap::push_heap_idx @pool, $proc;

      Scalar::Util::weaken $proc;
   };

   $stop_worker = sub {
      my $proc = shift;

      $proc->[0]
         or --$nidle;

      Array::Heap::splice_heap_idx @pool, $proc->[1]
         if defined $proc->[1];

      @$proc = 0; # tell others to leave it be
   };

   $want_start = sub {
      undef $stop_w;

      $start_w ||= AE::timer $start, $start, sub {
         if (($nidle < $idle || @queue) && @pool < $max) {
            $start_worker->();
            $scheduler->();
         } else {
            undef $start_w;
         }
      };
   };

   $want_stop = sub {
      $stop_w ||= AE::timer $stop, $stop, sub {
         $stop_worker->($pool[0])
            if $nidle;

         undef $stop_w
            if $nidle <= $idle;
      };
   };

   $scheduler = sub {
      if (@queue) {
         while (@queue) {
            @pool or $start_worker->();

            my $proc = $pool[0];

            if ($proc->[0] < $load) {
               # found free worker, increase load
               unless ($proc->[0]++) {
                  # worker became busy
                  --$nidle
                     or undef $stop_w;

                  $want_start->()
                     if $nidle < $idle && @pool < $max;
               }

               Array::Heap::adjust_heap_idx @pool, 0;

               my $job = shift @queue;
               my $ocb = pop @$job;

               $proc->[2]->(@$job, sub {
                  # reduce load
                  --$proc->[0]          # worker still busy?
                     or ++$nidle > $idle # not too many idle processes?
                     or $want_stop->();

                  Array::Heap::adjust_heap_idx @pool, $proc->[1]
                     if defined $proc->[1];

                  &$ocb;

                  $scheduler->();
               });
            } else {
               $want_start->()
                  unless @pool >= $max;

               last;
            }
         }
      } elsif ($shutdown) {
         @pool = ();
         undef $start_w;
         undef $start_worker; # frees $destroy_guard reference

         $stop_worker->($pool[0])
            while $nidle;
      }
   };

   my $shutdown_guard = Guard::guard {
      $shutdown = 1;
      $scheduler->();
   };

   $start_worker->()
      while @pool < $idle;

   sub {
      $shutdown_guard if 0; # keep it alive

      $start_worker->()
         unless @pool;

      push @queue, [@_];
      $scheduler->();
   }
}

=item $pool->(..., $cb->(...))

Call the RPC function of a worker with the given arguments, and when the
worker is done, call the C<$cb> with the results, just like calling the
RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
details on the RPC API.

If there is no free worker, the call will be queued until a worker becomes
available.

Note that there can be considerable time between calling this method and
the call actually being executed. During this time, the parameters passed
to this function are effectively read-only - modifying them after the call
and before the callback is invoked causes undefined behaviour.

=cut

=back
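
As a sketch of how to wait for a whole batch of jobs, assuming C<$pool>
was created as in the L</SYNOPSIS> and that the worker function accepts
the arguments shown, an L<AnyEvent> condition variable can count the
outstanding jobs with its C<begin> and C<end> methods:

   my $done = AE::cv;

   for my $n (1 .. 10) {
      $done->begin; # one more outstanding job

      $pool->(doit => $n, sub {
         print "job $n returned @_\n";
         $done->end; # this job is finished
      });
   }

   # returns once every begin has been matched by an end
   $done->recv;

Unlike waiting for C<on_destroy>, this keeps the pool alive for further
jobs after the batch has completed.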

=head1 CHILD USAGE

In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
more child-side function:

=over 4

=item AnyEvent::Fork::Pool::retire ()

This function sends an event to the parent process to request retirement:
the worker is removed from the pool and no new jobs will be sent to it,
but it has to handle the jobs that are already queued.

The parentheses are part of the syntax: the function usually isn't defined
when you compile your code (because that happens I<before> handing the
template process over to C<AnyEvent::Fork::Pool::run>), so you need the
empty parentheses to tell Perl that the function is indeed a function.

Retiring is a way for a worker to shut itself down gracefully when it
decides to. For example, after executing a job, one could check the
process size or the number of jobs handled so far, and if either is too
high, the worker could ask to get retired, to keep memory leaks from
accumulating.

=back
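
A minimal sketch of a worker that asks to be retired after a fixed number
of jobs. The limit of 1000 jobs and the job counter are hypothetical choices
for illustration, not something the module provides. Using C<==> rather
than C<< >= >> means the retirement request is sent only once, even though
the worker still handles the jobs that were already queued for it.

   package MyWorker;

   use strict;
   use warnings;

   my $jobs = 0; # number of jobs handled by this worker process so far

   sub run {
      my (@args) = @_;

      my $result = join ",", @args; # placeholder for real work

      # ask the parent to retire this worker exactly once, after 1000 jobs;
      # the empty parentheses are needed because retire is usually not yet
      # defined when this module is compiled
      AnyEvent::Fork::Pool::retire ()
         if ++$jobs == 1000;

      $result
   }

   1;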

=head1 POOL PARAMETERS RECIPES

This section describes some recipes for pool parameters. These are mostly
meant for the synchronous RPC backend, as the asynchronous RPC backend
changes the rules considerably, making workers themselves responsible for
their scheduling.

=over 4

=item low latency - set load = 1

If you need a deterministic low latency, you should set the C<load>
parameter to C<1>. This ensures that no worker is ever sent more than one
job at a time, so a job never has to wait for a previous job on the same
worker to finish.

This makes most sense with the synchronous (default) backend, as the
asynchronous backend can handle multiple requests concurrently.

=item lowest latency - set load = 1 and idle = max

To achieve the lowest latency, you should additionally disable any dynamic
resizing of the pool by setting C<idle> to the same value as C<max>.

=item high throughput, cpu bound jobs - set load >= 2, max = #cpus

To get high throughput with cpu-bound jobs, you should set the maximum
pool size to the number of cpus in your system, and C<load> to at least
C<2>, to make sure there can be another job waiting for the worker when it
has finished one.

The value of C<2> for C<load> is the minimum value that I<can> achieve
100% throughput, but if your parent process itself is sometimes busy, you
might need higher values. Also, there is a limit on the amount of data that
can be "in flight" to the worker, so if you send big blobs of data to your
worker, C<load> might have much less of an effect.

=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high

When your jobs are I/O bound, using more workers usually leads to higher
throughput, depending very much on your actual workload - sometimes having
only one worker is best, for example, when you read or write big files at
maximum speed, as a second worker will increase seek times.

=back
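
Expressed as parameter lists, the first three recipes might look like the
sketch below. It assumes a machine with four CPUs: C<$ncpu> is a
hypothetical variable that you would have to set appropriately for your
system, and using it as C<max> in the lowest-latency case is just one
possible choice. The worker module and function names are placeholders.

   my $ncpu = 4; # assumption, adjust for your system

   # low latency: at most one job per worker
   my @low_latency    = (load => 1);

   # lowest latency: additionally no dynamic resizing (idle = max)
   my @lowest_latency = (load => 1, idle => $ncpu, max => $ncpu);

   # high throughput, cpu bound jobs
   my @cpu_bound      = (load => 2, max => $ncpu);

   my $pool = AnyEvent::Fork
                 ->new
                 ->require ("MyWorker")
                 ->AnyEvent::Fork::Pool::run ("MyWorker::run", @cpu_bound);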

=head1 EXCEPTIONS

The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions will
not be caught, and exceptions in both the worker and in callbacks cause
undesirable or undefined behaviour.
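
Since exceptions are not caught for you, one possible approach, sketched
here under the assumption of the synchronous backend and the string
serialiser, is to catch exceptions in the worker function itself and report
them as ordinary results. The C<ok>/C<error> status convention is
hypothetical; the callback in the parent has to check the first value it
receives.

   package MyWorker;

   use strict;
   use warnings;

   sub run {
      my (@args) = @_;

      my $result;
      my $ok = eval {
         die "no arguments given\n" unless @args;
         $result = join ",", @args; # placeholder for real work
         1
      };

      # report the failure as a normal result instead of letting the
      # exception escape from the worker
      $ok ? ("ok", $result) : ("error", $@ || "unknown error")
   }

   1;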

=head1 SEE ALSO

L<AnyEvent::Fork>, to create the processes in the first place.

L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.

=head1 AUTHOR AND CONTACT INFORMATION

 Marc Lehmann <schmorp@schmorp.de>
 http://software.schmorp.de/pkg/AnyEvent-Fork-Pool

=cut

1