/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.9
Committed: Thu Apr 25 00:27:22 2013 UTC (11 years ago) by root
Branch: MAIN
Changes since 1.8: +66 -1 lines
Log Message:
*** empty log message ***

=head1 NAME

AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork

THE API IS NOT FINISHED, CONSIDER THIS AN ALPHA RELEASE

=head1 SYNOPSIS

   use AnyEvent;
   use AnyEvent::Fork::Pool;
   # use AnyEvent::Fork is not needed

   # all possible parameters shown, with default values
   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
           "MyWorker::run", # the worker function

           # pool management
           max        => 4,   # absolute maximum # of processes
           idle       => 0,   # minimum # of idle processes
           load       => 2,   # queue at most this number of jobs per process
           start      => 0.1, # wait this many seconds before starting a new process
           stop       => 10,  # wait this many seconds before stopping an idle process
           on_destroy => (my $finish = AE::cv), # called when object is destroyed

           # parameters passed to AnyEvent::Fork::RPC
           async      => 0,
           on_error   => sub { die "FATAL: $_[0]\n" },
           on_event   => sub { my @ev = @_ },
           init       => "MyWorker::init",
           serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
      );

   for (1..10) {
      $pool->(doit => $_, sub {
         print "MyWorker::run returned @_\n";
      });
   }

   undef $pool;

   $finish->recv;

=head1 DESCRIPTION

This module uses processes created via L<AnyEvent::Fork> and the RPC
protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced
pool of processes that handles jobs.

Understanding L<AnyEvent::Fork> is helpful but not critical to be able
to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC>
is, as it defines the actual API that needs to be implemented in the
worker processes.

=head1 EXAMPLES

=head1 PARENT USAGE

To create a pool, you first have to create an L<AnyEvent::Fork> object -
this object becomes your template process. Whenever a new worker process
is needed, it is forked from this template process. Then you need to
"hand off" this template process to the C<AnyEvent::Fork::Pool> module by
calling its run method on it:

   my $template = AnyEvent::Fork
      ->new
      ->require ("SomeModule", "MyWorkerModule");

   my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");

The pool "object" is not a regular Perl object, but a code reference that
you can call and that works roughly like calling the worker function
directly, except that it returns nothing; instead, you specify a callback
to be invoked once results are in:

   $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });

=over 4

=cut

package AnyEvent::Fork::Pool;

use common::sense;

use Scalar::Util ();

use Guard ();
use Array::Heap ();

use AnyEvent;
# explicit version on the next line, as some cpan-testers test with the 0.1 version,
# ignoring dependencies, and this line will at least give a clear indication of that.
use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience
use AnyEvent::Fork::RPC;

# these are used for the first and last argument of events
# in the hope of not colliding. yes, I don't like it either,
# but didn't come up with an obviously better alternative.
my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';

our $VERSION = 0.1;

=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]

The traditional way to call the pool creation function. But it is way
cooler to call it in the following way:

=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])

Creates a new pool object with the specified C<$function> as function
(name) to call for each request. The pool uses the C<$fork> object as the
template when creating worker processes.

You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
to create one.

A relatively large number of key/value pairs can be specified to influence
the behaviour. They are grouped into the categories "pool management",
"template process" and "rpc parameters".

=over 4

=item Pool Management

The pool consists of a certain number of worker processes. These options
decide how many of these processes exist and when they are started and
stopped.

The worker pool is dynamically resized, according to (perceived :)
load. The minimum size is given by the C<idle> parameter and the maximum
size is given by the C<max> parameter. A new worker is started every
C<start> seconds at most, and an idle worker is stopped at most every
C<stop> seconds.

You can specify the number of jobs sent to a worker concurrently using the
C<load> parameter.

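For illustration, the growth rule can be sketched in plain Perl
(C<want_new_worker> is a hypothetical helper, not part of this module): a
new worker is wanted while there are fewer idle workers than C<idle>, or
jobs are queued, and the pool is still below C<max>.

   # hypothetical helper, for illustration only - not part of the module
   sub want_new_worker {
      my ($nidle, $idle, $queued, $nworkers, $max) = @_;

      ($nidle < $idle || $queued) && $nworkers < $max
   }
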
=over 4

=item idle => $count (default: 0)

The minimum number of idle processes in the pool - when there are fewer
than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
ones, subject to the limits set by C<max> and C<start>.

This is also the initial number of workers in the pool. The default of
zero means that the pool starts empty and can shrink back to zero workers
over time.

=item max => $count (default: 4)

The maximum number of processes in the pool, in addition to the template
process. C<AnyEvent::Fork::Pool> will never have more than this number of
worker processes, although there can be more temporarily when a worker is
shut down and hasn't exited yet.

=item load => $count (default: 2)

The maximum number of concurrent jobs sent to a single worker process.

Jobs that cannot be sent to a worker immediately (because all workers are
busy) will be queued until a worker is available.

Setting this low improves latency. For example, at C<1>, every job that
is sent to a worker is sent to a completely idle worker that doesn't run
any other jobs. The downside is that throughput is reduced - a worker that
finishes a job needs to wait for a new job from the parent.

The default of C<2> is usually a good compromise.

=item start => $seconds (default: 0.1)

When there are fewer than C<idle> workers (or all workers are completely
busy), then a timer is started. If the timer elapses and there are still
jobs that cannot be queued to a worker, a new worker is started.

This sets the minimum time that all workers must be busy before a new
worker is started. Or, put differently, the minimum delay between starting
new workers.

The delay is small by default, which means new workers will be started
relatively quickly. A delay of C<0> is possible, and ensures that the pool
will grow as quickly as possible under load.

Non-zero values are useful to avoid "exploding" a pool because a lot of
jobs are queued in an instant.

Higher values are often useful to improve efficiency at the cost of
latency - when fewer processes can do the job over time, starting more and
more is not necessarily going to help.

=item stop => $seconds (default: 10)

When a worker has no jobs to execute it becomes idle. An idle worker that
hasn't executed a job within this amount of time will be stopped, unless
the other parameters say otherwise.

Setting this to a very high value means that workers stay around longer,
even when they have nothing to do, which can be good as they don't have to
be started on the next load spike again.

Setting this to a lower value can be useful to avoid memory or simply
process table wastage.

Usually, setting this to a time longer than the time between load spikes
is best - if you expect a lot of requests every minute and little work
in between, setting this to longer than a minute avoids having to stop
and start workers. On the other hand, you have to ask yourself if letting
workers run idle is a good use of your resources. Try to find a good
balance between resource usage of your workers and the time to start new
workers - L<AnyEvent::Fork> itself is fast at creating workers while not
using much memory for them, so most of the overhead is likely from your
own code.

=item on_destroy => $callback->() (default: none)

When a pool object goes out of scope, the outstanding requests are still
handled till completion. Only after handling all jobs will the workers
be destroyed (and also the template process if it isn't referenced
otherwise).

To find out when a pool I<really> has finished its work, you can set this
callback, which will be called when the pool has been destroyed.

=back

=item AnyEvent::Fork::RPC Parameters

These parameters are all passed more or less directly to
L<AnyEvent::Fork::RPC>. They are only briefly mentioned here; for
their full documentation please refer to the L<AnyEvent::Fork::RPC>
documentation. Also, the default values mentioned here are only documented
as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.

=over 4

=item async => $boolean (default: 0)

Whether to use the synchronous or asynchronous RPC backend.

=item on_error => $callback->($message) (default: die with message)

The callback to call on any (fatal) errors.

=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)

The callback to invoke on events.

=item init => $initfunction (default: none)

The function to call in the child, once before handling requests.

=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)

The serialiser to use.

=back

=back

=cut

sub run {
   my ($template, $function, %arg) = @_;

   my $max        = $arg{max}      || 4;
   my $idle       = $arg{idle}     || 0;
   my $load       = $arg{load}     || 2;
   my $start      = $arg{start}    || 0.1;
   my $stop       = $arg{stop}     || 10;
   my $on_event   = $arg{on_event} || sub { };
   my $on_destroy = $arg{on_destroy};

   my @rpc = (
      async      => $arg{async},
      init       => $arg{init},
      serialiser => delete $arg{serialiser},
      on_error   => $arg{on_error},
   );

   my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
   my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);

   my $destroy_guard = Guard::guard {
      $on_destroy->()
         if $on_destroy;
   };

   $template
      ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
      ->eval ('
           my ($magic0, $magic1) = @_;
           sub AnyEvent::Fork::Pool::retire() {
              AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
           }
        ', $magic0, $magic1)
   ;

   $start_worker = sub {
      my $proc = [0, 0, undef]; # load, index, rpc

      $proc->[2] = $template
         ->fork
         ->AnyEvent::Fork::RPC::run ($function,
              @rpc,
              on_event => sub {
                 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
                    $destroy_guard if 0; # keep it alive

                    $_[1] eq "quit" and $stop_worker->($proc);
                    return;
                 }

                 &$on_event;
              },
           )
      ;

      ++$nidle;
      Array::Heap::push_heap_idx @pool, $proc;

      Scalar::Util::weaken $proc;
   };

   $stop_worker = sub {
      my $proc = shift;

      $proc->[0]
         or --$nidle;

      Array::Heap::splice_heap_idx @pool, $proc->[1]
         if defined $proc->[1];

      @$proc = 0; # tell others to leave it be
   };

   $want_start = sub {
      undef $stop_w;

      $start_w ||= AE::timer $start, $start, sub {
         if (($nidle < $idle || @queue) && @pool < $max) {
            $start_worker->();
            $scheduler->();
         } else {
            undef $start_w;
         }
      };
   };

   $want_stop = sub {
      $stop_w ||= AE::timer $stop, $stop, sub {
         $stop_worker->($pool[0])
            if $nidle;

         undef $stop_w
            if $nidle <= $idle;
      };
   };

   $scheduler = sub {
      if (@queue) {
         while (@queue) {
            @pool or $start_worker->();

            my $proc = $pool[0];

            if ($proc->[0] < $load) {
               # found free worker, increase load
               unless ($proc->[0]++) {
                  # worker became busy
                  --$nidle
                     or undef $stop_w;

                  $want_start->()
                     if $nidle < $idle && @pool < $max;
               }

               Array::Heap::adjust_heap_idx @pool, 0;

               my $job = shift @queue;
               my $ocb = pop @$job;

               $proc->[2]->(@$job, sub {
                  # reduce load
                  --$proc->[0] # worker still busy?
                     or ++$nidle > $idle # not too many idle processes?
                     or $want_stop->();

                  Array::Heap::adjust_heap_idx @pool, $proc->[1]
                     if defined $proc->[1];

                  &$ocb;

                  $scheduler->();
               });
            } else {
               $want_start->()
                  unless @pool >= $max;

               last;
            }
         }
      } elsif ($shutdown) {
         @pool = ();
         undef $start_w;
         undef $start_worker; # frees $destroy_guard reference

         $stop_worker->($pool[0])
            while $nidle;
      }
   };

   my $shutdown_guard = Guard::guard {
      $shutdown = 1;
      $scheduler->();
   };

   $start_worker->()
      while @pool < $idle;

   sub {
      $shutdown_guard if 0; # keep it alive

      $start_worker->()
         unless @pool;

      push @queue, [@_];
      $scheduler->();
   }
}

=item $pool->(..., $cb->(...))

Call the RPC function of a worker with the given arguments, and when the
worker is done, call the C<$cb> with the results, just like calling the
RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
details on the RPC API.

If there is no free worker, the call will be queued until a worker becomes
available.

Note that there can be considerable time between calling this method and
the call actually being executed. During this time, the parameters passed
to this function are effectively read-only - modifying them after the call
and before the callback is invoked causes undefined behaviour.
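
For example, if you want to keep modifying a data structure after
submitting it as part of a job, submit a copy instead (a sketch; C<doit>
is a placeholder job name):

   my @data = (1, 2, 3);

   # pass a shallow copy, as the job might sit in the queue for a while
   $pool->(doit => [@data], sub { ... });

   push @data, 4; # safe - the pool holds its own copy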

=cut

=item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]

=item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]

Tries to detect the number of CPUs (C<$cpus>, often called cpu cores
nowadays) and execution units (C<$eus>), which include e.g. extra
hyperthreaded units. When C<$cpus> cannot be determined reliably,
C<$default_cpus> is returned for both values, or C<1> if it is missing.

For normal CPU-bound uses, it is wise to have as many worker processes
as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
hyperthreading is usually detrimental to performance, but in those rare
cases where that really helps it might be beneficial to use more workers
(C<$eus>).

Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
C<$cpus> and C<$eus>, and on {Free,Net,Open}BSD, C<sysctl -n hw.ncpu> is
used for C<$cpus>.

Example: create a worker pool with as many workers as cpu cores, or C<2>,
if the actual number could not be determined.

   $fork->AnyEvent::Fork::Pool::run ("myworker::function",
      max => (scalar AnyEvent::Fork::Pool::ncpu 2),
   );

=cut

BEGIN {
   if ($^O eq "linux") {
      *ncpu = sub(;$) {
         my ($cpus, $eus);

         if (open my $fh, "<", "/proc/cpuinfo") {
            my %id;

            while (<$fh>) {
               if (/^core id\s*:\s*(\d+)/) {
                  ++$eus;
                  undef $id{$1};
               }
            }

            $cpus = scalar keys %id;
         } else {
            $cpus = $eus = @_ ? shift : 1;
         }

         wantarray ? ($cpus, $eus) : $cpus
      };
   } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
      *ncpu = sub(;$) {
         my $cpus = qx<sysctl -n hw.ncpu> * 1
                    || (@_ ? shift : 1);

         wantarray ? ($cpus, $cpus) : $cpus
      };
   } else {
      *ncpu = sub(;$) {
         my $cpus = @_ ? shift : 1;

         wantarray ? ($cpus, $cpus) : $cpus
      };
   }
}

=back

=head1 CHILD USAGE

In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
more child-side function:

=over 4

=item AnyEvent::Fork::Pool::retire ()

This function sends an event to the parent process to request retirement:
the worker is removed from the pool and no new jobs will be sent to it,
but it still has to handle the jobs that are already queued.

The parentheses are part of the syntax: the function usually isn't defined
when you compile your code (because that happens I<before> handing the
template process over to C<AnyEvent::Fork::Pool::run>), so you need the
empty parentheses to tell Perl that the function is indeed a function.

Retiring a worker can be useful to gracefully shut it down when the worker
deems this useful. For example, after executing a job, one could check
the process size or the number of jobs handled so far, and if either is
too high, the worker could ask to get retired, to avoid letting memory
leaks accumulate.

=back

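A worker module might use C<retire> like this (a minimal sketch -
C<MyWorker> and the job limit are made up for illustration):

   package MyWorker;

   my $jobs_handled = 0;

   sub run {
      my (@args) = @_;

      my $result = ...; # do the actual work here

      # ask the parent to replace this worker after 1000 jobs,
      # e.g. to keep slow memory leaks in check
      AnyEvent::Fork::Pool::retire ()
         if ++$jobs_handled >= 1000;

      $result
   }
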
=head1 POOL PARAMETERS RECIPES

This section describes some recipes for pool parameters. These are mostly
meant for the synchronous RPC backend, as the asynchronous RPC backend
changes the rules considerably, making workers themselves responsible for
their scheduling.

=over 4

=item low latency - set load = 1

If you need a deterministic low latency, you should set the C<load>
parameter to C<1>. This ensures that never more than one job is sent to
each worker. This avoids having to wait for a previous job to finish.

This makes most sense with the synchronous (default) backend, as the
asynchronous backend can handle multiple requests concurrently.

=item lowest latency - set load = 1 and idle = max

To achieve the lowest latency, you additionally should disable any dynamic
resizing of the pool by setting C<idle> to the same value as C<max>.

=item high throughput, cpu bound jobs - set load >= 2, max = #cpus

To get high throughput with cpu-bound jobs, you should set the maximum
pool size to the number of cpus in your system, and C<load> to at least
C<2>, to make sure there can be another job waiting for the worker when it
has finished one.

The value of C<2> for C<load> is the minimum value that I<can> achieve
100% throughput, but if your parent process itself is sometimes busy, you
might need higher values. Also there is a limit on the amount of data that
can be "in flight" to the worker, so if you send big blobs of data to your
worker, C<load> might have much less of an effect.

=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high

When your jobs are I/O bound, using more workers usually translates into
higher throughput, depending very much on your actual workload - sometimes
having only one worker is best, for example, when you read or write big
files at maximum speed, as a second worker will increase seek times.

=back

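As an example, the cpu-bound recipe above might look like this (a sketch,
assuming a hypothetical C<MyWorker> module):

   my ($cpus) = AnyEvent::Fork::Pool::ncpu 2; # fall back to 2 if detection fails

   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
           "MyWorker::run",
           max  => $cpus, # one worker per cpu
           load => 2,     # keep one extra job queued per worker
      );
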
=head1 EXCEPTIONS

The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions will
not be caught, and exceptions both in the worker and in callbacks cause
undesirable or undefined behaviour.

=head1 SEE ALSO

L<AnyEvent::Fork>, to create the processes in the first place.

L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.

=head1 AUTHOR AND CONTACT INFORMATION

   Marc Lehmann <schmorp@schmorp.de>
   http://software.schmorp.de/pkg/AnyEvent-Fork-Pool

=cut

1
