=head1 NAME

AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork

=head1 SYNOPSIS

   use AnyEvent;
   use AnyEvent::Fork::Pool;
   # use AnyEvent::Fork is not needed

   # all possible parameters shown, with default values
   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
           "MyWorker::run", # the worker function

           # pool management
           max        => 4,   # absolute maximum # of processes
           idle       => 0,   # minimum # of idle processes
           load       => 2,   # queue at most this number of jobs per process
           start      => 0.1, # wait this many seconds before starting a new process
           stop       => 10,  # wait this many seconds before stopping an idle process
           on_destroy => (my $finish = AE::cv), # called when object is destroyed

           # parameters passed to AnyEvent::Fork::RPC
           async      => 0,
           on_error   => sub { die "FATAL: $_[0]\n" },
           on_event   => sub { my @ev = @_ },
           init       => "MyWorker::init",
           serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
        );

   for (1..10) {
      $pool->(doit => $_, sub {
         print "MyWorker::run returned @_\n";
      });
   }

   undef $pool;

   $finish->recv;

=head1 DESCRIPTION

This module uses processes created via L<AnyEvent::Fork> and the RPC
protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced
pool of processes that handle jobs.

An understanding of L<AnyEvent::Fork> is helpful but not critical to be
able to use this module; a thorough understanding of
L<AnyEvent::Fork::RPC> is, however, as it defines the actual API that
needs to be implemented in the worker processes.

=head1 EXAMPLES

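The following sketch shows what a worker module matching the SYNOPSIS
might look like. The module and function names (C<MyWorker>, C<init>,
C<run>) are the ones used in the SYNOPSIS above; the bodies are made up:

   # MyWorker.pm - a minimal worker module, as assumed by the SYNOPSIS

   package MyWorker;

   sub init {
      # called once per worker process, before the first request
   }

   sub run {
      my ($cmd, $arg) = @_; # the arguments passed to $pool->(...)

      # ... do the actual work here ...

      "done($cmd => $arg)" # return values are passed to the parent callback
   }

   1
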
=head1 PARENT USAGE

To create a pool, you first have to create an L<AnyEvent::Fork> object -
this object becomes your template process. Whenever a new worker process
is needed, it is forked from this template process. Then you need to
"hand off" this template process to the C<AnyEvent::Fork::Pool> module by
calling its run method on it:

   my $template = AnyEvent::Fork
      ->new
      ->require ("SomeModule", "MyWorkerModule");

   my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");

The pool "object" is not a regular Perl object, but a code reference that
you can call and that works roughly like calling the worker function
directly, except that it returns nothing; instead, you need to specify a
callback to be invoked once results are in:

   $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });

=over 4

=cut

package AnyEvent::Fork::Pool;

use common::sense;

use Scalar::Util ();

use Guard ();
use Array::Heap ();

use AnyEvent;
use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
use AnyEvent::Fork::RPC;

# these are used for the first and last argument of events
# in the hope of not colliding. yes, I don't like it either,
# but didn't come up with an obviously better alternative.
my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';

our $VERSION = 0.1;

=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]

The traditional way to call the pool creation function. But it is way
cooler to call it in the following way:

=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])

Creates a new pool object with the specified C<$function> as function
(name) to call for each request. The pool uses the C<$fork> object as the
template when creating worker processes.

You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
to create one.

A relatively large number of key/value pairs can be specified to influence
the behaviour. They are grouped into the categories "pool management",
"template process" and "rpc parameters".

=over 4

=item Pool Management

The pool consists of a certain number of worker processes. These options
decide how many of these processes exist and when they are started and
stopped.

The worker pool is dynamically resized, according to (perceived :)
load. The minimum size is given by the C<idle> parameter and the maximum
size is given by the C<max> parameter. A new worker is started every
C<start> seconds at most, and an idle worker is stopped at most every
C<stop> seconds.

You can specify the number of jobs sent to a worker concurrently using the
C<load> parameter.

=over 4

=item idle => $count (default: 0)

The minimum number of idle processes in the pool - when there are fewer
than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
ones, subject to the limits set by C<max> and C<start>.

This is also the initial number of workers in the pool. The default of
zero means that the pool starts empty and can shrink back to zero workers
over time.

=item max => $count (default: 4)

The maximum number of processes in the pool, in addition to the template
process. C<AnyEvent::Fork::Pool> will never have more than this number of
worker processes, although there can be more temporarily when a worker is
shut down and hasn't exited yet.

=item load => $count (default: 2)

The maximum number of concurrent jobs sent to a single worker process.

Jobs that cannot be sent to a worker immediately (because all workers are
busy) will be queued until a worker is available.

Setting this low improves latency. For example, at C<1>, every job that
is sent to a worker is sent to a completely idle worker that doesn't run
any other jobs. The downside is that throughput is reduced - a worker that
finishes a job needs to wait for a new job from the parent.

The default of C<2> is usually a good compromise.

=item start => $seconds (default: 0.1)

When there are fewer than C<idle> workers (or all workers are completely
busy), a timer is started. If the timer elapses and there are still
jobs that cannot be queued to a worker, a new worker is started.

This sets the minimum time that all workers must be busy before a new
worker is started. Or, put differently, the minimum delay between starting
new workers.

The delay is small by default, which means new workers will be started
relatively quickly. A delay of C<0> is possible, and ensures that the pool
will grow as quickly as possible under load.

Non-zero values are useful to avoid "exploding" a pool because a lot of
jobs are queued in an instant.

Higher values are often useful to improve efficiency at the cost of
latency - when fewer processes can do the job over time, starting more and
more is not necessarily going to help.

=item stop => $seconds (default: 10)

When a worker has no jobs to execute it becomes idle. An idle worker that
hasn't executed a job within this amount of time will be stopped, unless
the other parameters say otherwise.

Setting this to a very high value means that workers stay around longer,
even when they have nothing to do, which can be good as they don't have to
be started on the next load spike again.

Setting this to a lower value can be useful to avoid memory or simply
process table wastage.

Usually, setting this to a time longer than the time between load spikes
is best - if you expect a lot of requests every minute and little work
in between, setting this to longer than a minute avoids having to stop
and start workers. On the other hand, you have to ask yourself if letting
workers run idle is a good use of your resources. Try to find a good
balance between the resource usage of your workers and the time needed to
start new workers - L<AnyEvent::Fork> itself is fast at creating workers
and doesn't use much memory for them, so most of the overhead is likely
from your own code.

=item on_destroy => $callback->() (default: none)

When a pool object goes out of scope, the outstanding requests are still
handled to completion. Only after handling all jobs will the workers
be destroyed (and also the template process if it isn't referenced
otherwise).

To find out when a pool I<really> has finished its work, you can set this
callback, which will be called when the pool has been destroyed.
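
For example, this sketch (using the names from the SYNOPSIS) waits until
all queued jobs have been handled before exiting - an AnyEvent condvar can
be passed directly, as invoking it sends it:

   my $finish = AE::cv;

   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
           "MyWorker::run",
           on_destroy => $finish,
        );

   $pool->(doit => 1, sub { });

   undef $pool;   # queued jobs are still handled...
   $finish->recv; # ...and this returns once they are done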

=back

=item AnyEvent::Fork::RPC Parameters

These parameters are all passed more or less directly to
L<AnyEvent::Fork::RPC>. They are only briefly mentioned here; for
their full documentation please refer to the L<AnyEvent::Fork::RPC>
documentation. Also, the default values mentioned here are only documented
as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.

=over 4

=item async => $boolean (default: 0)

Whether to use the synchronous or asynchronous RPC backend.

=item on_error => $callback->($message) (default: die with message)

The callback to call on any (fatal) errors.

=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)

The callback to invoke on events.

=item init => $initfunction (default: none)

The function to call in the child, once before handling requests.

=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)

The serialiser to use.
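
If you need to pass structured data rather than plain strings, you can
select another serialiser - for example the JSON serialiser documented in
L<AnyEvent::Fork::RPC> (assuming it is available in your installed
version):

   serialiser => $AnyEvent::Fork::RPC::JSON_SERIALISER,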

=back

=back

=cut

sub run {
   my ($template, $function, %arg) = @_;

   my $max        = $arg{max}      || 4;
   my $idle       = $arg{idle}     || 0;
   my $load       = $arg{load}     || 2;
   my $start      = $arg{start}    || 0.1;
   my $stop       = $arg{stop}     || 10;
   my $on_event   = $arg{on_event} || sub { };
   my $on_destroy = $arg{on_destroy};

   # these are passed through to AnyEvent::Fork::RPC::run
   my @rpc = (
      async      => $arg{async},
      init       => $arg{init},
      serialiser => delete $arg{serialiser},
      on_error   => $arg{on_error},
   );

   my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
   my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);

   # invokes on_destroy when the last reference to it goes away
   my $destroy_guard = Guard::guard {
      $on_destroy->()
         if $on_destroy;
   };

   $template
      ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
      ->eval ('
           my ($magic0, $magic1) = @_;
           sub AnyEvent::Fork::Pool::retire() {
              AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
           }
        ', $magic0, $magic1)
   ;

   $start_worker = sub {
      my $proc = [0, 0, undef]; # load, index, rpc

      $proc->[2] = $template
         ->fork
         ->AnyEvent::Fork::RPC::run ($function,
              @rpc,
              on_event => sub {
                 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
                    $destroy_guard if 0; # keep it alive

                    $_[1] eq "quit" and $stop_worker->($proc);
                    return;
                 }

                 &$on_event;
              },
           )
      ;

      ++$nidle;
      Array::Heap::push_heap_idx @pool, $proc;

      Scalar::Util::weaken $proc;
   };

   $stop_worker = sub {
      my $proc = shift;

      $proc->[0]
         or --$nidle;

      Array::Heap::splice_heap_idx @pool, $proc->[1]
         if defined $proc->[1];

      @$proc = 0; # tell others to leave it be
   };

   # start a new worker after $start seconds, unless the pool is big enough
   $want_start = sub {
      undef $stop_w;

      $start_w ||= AE::timer $start, $start, sub {
         if (($nidle < $idle || @queue) && @pool < $max) {
            $start_worker->();
            $scheduler->();
         } else {
            undef $start_w;
         }
      };
   };

   # stop an idle worker after $stop seconds, unless the pool is small enough
   $want_stop = sub {
      $stop_w ||= AE::timer $stop, $stop, sub {
         $stop_worker->($pool[0])
            if $nidle;

         undef $stop_w
            if $nidle <= $idle;
      };
   };

   $scheduler = sub {
      if (@queue) {
         while (@queue) {
            @pool or $start_worker->();

            # @pool is a heap ordered by load, so the least-loaded
            # worker is always at index 0
            my $proc = $pool[0];

            if ($proc->[0] < $load) {
               # found free worker, increase load
               unless ($proc->[0]++) {
                  # worker became busy
                  --$nidle
                     or undef $stop_w;

                  $want_start->()
                     if $nidle < $idle && @pool < $max;
               }

               Array::Heap::adjust_heap_idx @pool, 0;

               my $job = shift @queue;
               my $ocb = pop @$job;

               $proc->[2]->(@$job, sub {
                  # reduce load
                  --$proc->[0] # worker still busy?
                     or ++$nidle > $idle # not too many idle processes?
                     or $want_stop->();

                  Array::Heap::adjust_heap_idx @pool, $proc->[1]
                     if defined $proc->[1];

                  &$ocb;

                  $scheduler->();
               });
            } else {
               $want_start->()
                  unless @pool >= $max;

               last;
            }
         }
      } elsif ($shutdown) {
         # no jobs left and the pool has been dropped - tear down workers
         @pool = ();
         undef $start_w;
         undef $start_worker; # frees $destroy_guard reference

         $stop_worker->($pool[0])
            while $nidle;
      }
   };

   # triggers a shutdown when the returned pool object is dropped
   my $shutdown_guard = Guard::guard {
      $shutdown = 1;
      $scheduler->();
   };

   $start_worker->()
      while @pool < $idle;

   # this closure is the "pool object" returned to the caller
   sub {
      $shutdown_guard if 0; # keep it alive

      $start_worker->()
         unless @pool;

      push @queue, [@_];
      $scheduler->();
   }
}

=item $pool->(..., $cb->(...))

Call the RPC function of a worker with the given arguments, and when the
worker is done, call the C<$cb> with the results, just like calling the
RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
details on the RPC API.

If there is no free worker, the call will be queued until a worker becomes
available.

Note that there can be considerable time between calling this method and
the call actually being executed. During this time, the parameters passed
to this function are effectively read-only - modifying them after the call
and before the callback is invoked causes undefined behaviour.
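
For example, to submit a batch of jobs and wait until all of them have
been answered, the pool can be combined with a condvar (a sketch; the
C<job> command name and arguments are made up):

   my $cv = AE::cv;

   for my $id (1..10) {
      $cv->begin;
      $pool->(job => $id, sub {
         print "job $id returned @_\n";
         $cv->end;
      });
   }

   $cv->recv; # returns once all ten callbacks have run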

=cut

=back

=head1 CHILD USAGE

In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
more child-side function:

=over 4

=item AnyEvent::Fork::Pool::retire ()

This function sends an event to the parent process to request retirement:
the worker is removed from the pool and no new jobs will be sent to it,
but it has to handle the jobs that are already queued.

The parentheses are part of the syntax: the function usually isn't defined
when you compile your code (because that happens I<before> handing the
template process over to C<AnyEvent::Fork::Pool::run>), so you need the
empty parentheses to tell Perl that the function is indeed a function.

Retiring a worker can be useful to gracefully shut it down when the worker
deems this useful. For example, after executing a job, one could check
the process size or the number of jobs handled so far, and if either is
too high, the worker could ask to be retired, to avoid memory leaks from
accumulating.
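
As an illustration, a worker function might retire itself after a fixed
number of jobs (a sketch; the counter and the limit of C<1000> are made
up):

   my $jobs_handled;

   sub run {
      my ($cmd, @args) = @_;

      my @result = (); # ... handle the job here ...

      AnyEvent::Fork::Pool::retire ()
         if ++$jobs_handled >= 1000; # ask to be replaced after 1000 jobs

      @result
   }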

=back

=head1 POOL PARAMETERS RECIPES

This section describes some recipes for pool parameters. These are mostly
meant for the synchronous RPC backend, as the asynchronous RPC backend
changes the rules considerably, making workers themselves responsible for
their scheduling.

=over 4

=item low latency - set load = 1

If you need deterministic low latency, you should set the C<load>
parameter to C<1>. This ensures that never more than one job is sent to
each worker. This avoids having to wait for a previous job to finish.

This makes most sense with the synchronous (default) backend, as the
asynchronous backend can handle multiple requests concurrently.

=item lowest latency - set load = 1 and idle = max

To achieve the lowest latency, you should additionally disable any dynamic
resizing of the pool by setting C<idle> to the same value as C<max>.
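
Such a latency-optimised pool might be created like this (a sketch; the
worker function name and the pool size of C<4> are made up):

   my $pool = $template->AnyEvent::Fork::Pool::run (
      "MyWorkerModule::myfunction",
      load => 1, # at most one job per worker
      idle => 4, # keep all workers around permanently...
      max  => 4, # ...by matching idle to max
   );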

=item high throughput, cpu bound jobs - set load >= 2, max = #cpus

To get high throughput with cpu-bound jobs, you should set the maximum
pool size to the number of cpus in your system, and C<load> to at least
C<2>, to make sure there can be another job waiting for the worker when it
has finished one.

The value of C<2> for C<load> is the minimum value that I<can> achieve
100% throughput, but if your parent process itself is sometimes busy, you
might need higher values. Also there is a limit on the amount of data that
can be "in flight" to the worker, so if you send big blobs of data to your
worker, C<load> might have much less of an effect.
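
For example, on a machine with four cpus, such a pool might be configured
like this (a sketch; the worker function name and the hardcoded cpu count
of four are assumptions):

   my $pool = $template->AnyEvent::Fork::Pool::run (
      "MyWorkerModule::myfunction",
      max  => 4, # one worker per cpu
      load => 2, # keep one extra job queued per worker
   );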

=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high

When your jobs are I/O bound, using more workers usually results in higher
throughput, depending very much on your actual workload - sometimes
having only one worker is best, for example, when you read or write big
files at maximum speed, as a second worker will increase seek times.

=back

=head1 EXCEPTIONS

The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions will
not be caught, and exceptions in both workers and callbacks cause
undesirable or undefined behaviour.

=head1 SEE ALSO

L<AnyEvent::Fork>, to create the processes in the first place.

L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.

=head1 AUTHOR AND CONTACT INFORMATION

   Marc Lehmann <schmorp@schmorp.de>
   http://software.schmorp.de/pkg/AnyEvent-Fork-Pool

=cut

1
