/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.8
Committed: Sun Apr 21 12:28:34 2013 UTC by root
Branch: MAIN
CVS Tags: rel-0_1
Changes since 1.7: +2 -0 lines
Log Message:
0.1

=head1 NAME

AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork

THE API IS NOT FINISHED, CONSIDER THIS AN ALPHA RELEASE

=head1 SYNOPSIS

   use AnyEvent;
   use AnyEvent::Fork::Pool;
   # use AnyEvent::Fork is not needed

   # all possible parameters shown, with default values
   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
           "MyWorker::run", # the worker function

           # pool management
           max        => 4,   # absolute maximum # of processes
           idle       => 0,   # minimum # of idle processes
           load       => 2,   # queue at most this number of jobs per process
           start      => 0.1, # wait this many seconds before starting a new process
           stop       => 10,  # wait this many seconds before stopping an idle process
           on_destroy => (my $finish = AE::cv), # called when object is destroyed

           # parameters passed to AnyEvent::Fork::RPC
           async      => 0,
           on_error   => sub { die "FATAL: $_[0]\n" },
           on_event   => sub { my @ev = @_ },
           init       => "MyWorker::init",
           serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
        );

   for (1..10) {
      $pool->(doit => $_, sub {
         print "MyWorker::run returned @_\n";
      });
   }

   undef $pool;

   $finish->recv;

=head1 DESCRIPTION

This module uses processes created via L<AnyEvent::Fork> and the RPC
protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced
pool of processes that handles jobs.

An understanding of L<AnyEvent::Fork> is helpful but not critical to be
able to use this module; a thorough understanding of
L<AnyEvent::Fork::RPC> is, however, as it defines the actual API that
needs to be implemented in the worker processes.

=head1 EXAMPLES

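As a starting point, here is a minimal sketch of a worker module that
would fit the SYNOPSIS above. The C<MyWorker> module and its function
bodies are hypothetical - the only hard requirement, set by
L<AnyEvent::Fork::RPC>, is that the function named in the C<run> call
exists in the worker process:

   package MyWorker;

   # optional one-time initialisation, named by the init parameter
   sub init {
      # open database connections, load data files and so on
   }

   # the worker function named in the run call - the arguments come from
   # the $pool->(...) call in the parent, the return values are passed
   # to the callback in the parent
   sub run {
      my ($cmd, $arg) = @_;

      "$cmd($arg)" # return a single string as the result
   }

   1
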
=head1 PARENT USAGE

To create a pool, you first have to create an L<AnyEvent::Fork> object -
this object becomes your template process. Whenever a new worker process
is needed, it is forked from this template process. Then you need to
"hand off" this template process to the C<AnyEvent::Fork::Pool> module by
calling its run method on it:

   my $template = AnyEvent::Fork
      ->new
      ->require ("SomeModule", "MyWorkerModule");

   my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");

The pool "object" is not a regular Perl object, but a code reference that
you can call and that works roughly like calling the worker function
directly, except that it returns nothing - instead you need to specify a
callback to be invoked once results are in:

   $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });

=over 4

=cut

package AnyEvent::Fork::Pool;

use common::sense;

use Scalar::Util ();

use Guard ();
use Array::Heap ();

use AnyEvent;
use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
use AnyEvent::Fork::RPC;

# these are used for the first and last argument of events
# in the hope of not colliding. yes, I don't like it either,
# but didn't come up with an obviously better alternative.
my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';

our $VERSION = 0.1;

=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]

The traditional way to call the pool creation function. But it is way
cooler to call it in the following way:

=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])

Creates a new pool object with the specified C<$function> as function
(name) to call for each request. The pool uses the C<$fork> object as the
template when creating worker processes.

You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
to create one.

A relatively large number of key/value pairs can be specified to influence
the behaviour. They are grouped into the categories "pool management",
"template process" and "rpc parameters".

=over 4

=item Pool Management

The pool consists of a certain number of worker processes. These options
decide how many of these processes exist and when they are started and
stopped.

The worker pool is dynamically resized, according to (perceived :)
load. The minimum size is given by the C<idle> parameter and the maximum
size is given by the C<max> parameter. A new worker is started every
C<start> seconds at most, and an idle worker is stopped at most every
C<stop> seconds.

You can specify the maximum number of jobs sent to a worker concurrently
using the C<load> parameter.

=over 4

=item idle => $count (default: 0)

The minimum number of idle processes in the pool - when there are fewer
than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
ones, subject to the limits set by C<max> and C<start>.

This is also the initial number of workers in the pool. The default of
zero means that the pool starts empty and can shrink back to zero workers
over time.

=item max => $count (default: 4)

The maximum number of processes in the pool, in addition to the template
process. C<AnyEvent::Fork::Pool> will never have more than this number of
worker processes, although there can be more temporarily when a worker is
shut down and hasn't exited yet.

=item load => $count (default: 2)

The maximum number of concurrent jobs sent to a single worker process.

Jobs that cannot be sent to a worker immediately (because all workers are
busy) will be queued until a worker is available.

Setting this low improves latency. For example, at C<1>, every job that
is sent to a worker is sent to a completely idle worker that doesn't run
any other jobs. The downside is that throughput is reduced - a worker that
finishes a job needs to wait for a new job from the parent.

The default of C<2> is usually a good compromise.

=item start => $seconds (default: 0.1)

When there are fewer than C<idle> workers (or all workers are completely
busy), then a timer is started. If the timer elapses and there are still
jobs that cannot be queued to a worker, a new worker is started.

This sets the minimum time that all workers must be busy before a new
worker is started. Or, put differently, the minimum delay between starting
new workers.

The delay is small by default, which means new workers will be started
relatively quickly. A delay of C<0> is possible, and ensures that the pool
will grow as quickly as possible under load.

Non-zero values are useful to avoid "exploding" a pool because a lot of
jobs are queued in an instant.

Higher values are often useful to improve efficiency at the cost of
latency - when fewer processes can do the job over time, starting more and
more is not necessarily going to help.

=item stop => $seconds (default: 10)

When a worker has no jobs to execute it becomes idle. An idle worker that
hasn't executed a job within this amount of time will be stopped, unless
the other parameters say otherwise.

Setting this to a very high value means that workers stay around longer,
even when they have nothing to do, which can be good as they don't have to
be started on the next load spike again.

Setting this to a lower value can be useful to avoid memory or simply
process table wastage.

Usually, setting this to a time longer than the time between load spikes
is best - if you expect a lot of requests every minute and little work
in between, setting this to longer than a minute avoids having to stop
and start workers. On the other hand, you have to ask yourself if letting
workers run idle is a good use of your resources. Try to find a good
balance between resource usage of your workers and the time to start new
workers - L<AnyEvent::Fork> itself is fast at creating workers while not
using much memory for them, so most of the overhead is likely from your
own code.

=item on_destroy => $callback->() (default: none)

When a pool object goes out of scope, the outstanding requests are still
handled till completion. Only after handling all jobs will the workers
be destroyed (and also the template process if it isn't referenced
otherwise).

To find out when a pool I<really> has finished its work, you can set this
callback, which will be called when the pool has been destroyed.

=back

=item AnyEvent::Fork::RPC Parameters

These parameters are all passed more or less directly to
L<AnyEvent::Fork::RPC>. They are only briefly mentioned here; for
their full documentation please refer to the L<AnyEvent::Fork::RPC>
documentation. Also, the default values mentioned here are only documented
as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.

=over 4

=item async => $boolean (default: 0)

Whether to use the synchronous or asynchronous RPC backend.

=item on_error => $callback->($message) (default: die with message)

The callback to call on any (fatal) errors.

=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)

The callback to invoke on events.

=item init => $initfunction (default: none)

The function to call in the child, once before handling requests.

=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)

The serialiser to use.

=back

=back

=cut

sub run {
   my ($template, $function, %arg) = @_;

   my $max        = $arg{max}        || 4;
   my $idle       = $arg{idle}       || 0;
   my $load       = $arg{load}       || 2;
   my $start      = $arg{start}      || 0.1;
   my $stop       = $arg{stop}       || 10;
   my $on_event   = $arg{on_event}   || sub { };
   my $on_destroy = $arg{on_destroy};

   my @rpc = (
      async      => $arg{async},
      init       => $arg{init},
      serialiser => delete $arg{serialiser},
      on_error   => $arg{on_error},
   );

   my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
   my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);
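   # @pool is kept as a binary min-heap ordered by load (via Array::Heap),
   # so $pool[0] is always a least-loaded worker. each element has the
   # form [load, heap index, rpc object].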

   my $destroy_guard = Guard::guard {
      $on_destroy->()
         if $on_destroy;
   };

   $template
      ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
      ->eval ('
           my ($magic0, $magic1) = @_;
           sub AnyEvent::Fork::Pool::retire() {
              AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
           }
        ', $magic0, $magic1)
   ;

   $start_worker = sub {
      my $proc = [0, 0, undef]; # load, index, rpc

      $proc->[2] = $template
         ->fork
         ->AnyEvent::Fork::RPC::run ($function,
              @rpc,
              on_event => sub {
                 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
                    $destroy_guard if 0; # keep it alive

                    $_[1] eq "quit" and $stop_worker->($proc);
                    return;
                 }

                 &$on_event;
              },
           )
      ;

      ++$nidle;
      Array::Heap::push_heap_idx @pool, $proc;

      Scalar::Util::weaken $proc;
   };

   $stop_worker = sub {
      my $proc = shift;

      $proc->[0]
         or --$nidle;

      Array::Heap::splice_heap_idx @pool, $proc->[1]
         if defined $proc->[1];

      @$proc = 0; # tell others to leave it be
   };

   $want_start = sub {
      undef $stop_w;

      $start_w ||= AE::timer $start, $start, sub {
         if (($nidle < $idle || @queue) && @pool < $max) {
            $start_worker->();
            $scheduler->();
         } else {
            undef $start_w;
         }
      };
   };

   $want_stop = sub {
      $stop_w ||= AE::timer $stop, $stop, sub {
         $stop_worker->($pool[0])
            if $nidle;

         undef $stop_w
            if $nidle <= $idle;
      };
   };

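   # the scheduler moves jobs from @queue to the least-loaded workers
   # and triggers growing/shrinking of the pool as a side effect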
   $scheduler = sub {
      if (@queue) {
         while (@queue) {
            @pool or $start_worker->();

            my $proc = $pool[0];

            if ($proc->[0] < $load) {
               # found free worker, increase load
               unless ($proc->[0]++) {
                  # worker became busy
                  --$nidle
                     or undef $stop_w;

                  $want_start->()
                     if $nidle < $idle && @pool < $max;
               }

               Array::Heap::adjust_heap_idx @pool, 0;

               my $job = shift @queue;
               my $ocb = pop @$job;

               $proc->[2]->(@$job, sub {
                  # reduce load
                  --$proc->[0] # worker still busy?
                     or ++$nidle > $idle # not too many idle processes?
                     or $want_stop->();

                  Array::Heap::adjust_heap_idx @pool, $proc->[1]
                     if defined $proc->[1];

                  &$ocb;

                  $scheduler->();
               });
            } else {
               $want_start->()
                  unless @pool >= $max;

               last;
            }
         }
      } elsif ($shutdown) {
         @pool = ();
         undef $start_w;
         undef $start_worker; # frees $destroy_guard reference

         $stop_worker->($pool[0])
            while $nidle;
      }
   };

   my $shutdown_guard = Guard::guard {
      $shutdown = 1;
      $scheduler->();
   };

   $start_worker->()
      while @pool < $idle;

   sub {
      $shutdown_guard if 0; # keep it alive

      $start_worker->()
         unless @pool;

      push @queue, [@_];
      $scheduler->();
   }
}

=item $pool->(..., $cb->(...))

Call the RPC function of a worker with the given arguments, and when the
worker is done, call the C<$cb> with the results, just like calling the
RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
details on the RPC API.

If there is no free worker, the call will be queued until a worker becomes
available.

Note that there can be considerable time between calling this method and
the call actually being executed. During this time, the parameters passed
to this function are effectively read-only - modifying them after the call
and before the callback is invoked causes undefined behaviour.

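For example, if you pass a reference to data you keep modifying, take a
snapshot at call time instead of passing the live structure (the C<%job>
hash here is purely illustrative):

   # queue a copy of the current job data, not the live hash
   $pool->({ %job }, sub {
      print "done: @_\n";
   });

   # now it is safe to keep modifying %job
   $job{state} = "queued";
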
=cut

=back

=head1 CHILD USAGE

In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
more child-side function:

=over 4

=item AnyEvent::Fork::Pool::retire ()

This function sends an event to the parent process to request retirement:
the worker is removed from the pool and no new jobs will be sent to it,
but it has to handle the jobs that are already queued.

The parentheses are part of the syntax: the function usually isn't defined
when you compile your code (because that happens I<before> handing the
template process over to C<AnyEvent::Fork::Pool::run>), so you need the
empty parentheses to tell Perl that the function is indeed a function.

Retiring a worker can be useful to gracefully shut it down when the worker
deems this useful. For example, after executing a job, one could check
the process size or the number of jobs handled so far, and if either is
too high, the worker could ask to get retired, to keep memory leaks from
accumulating.

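As an illustration, here is a sketch of a worker function whose process
retires after handling a fixed number of jobs - the job counter, the
limit of C<100> and C<do_the_work> are made up for this example:

   my $jobs_handled = 0;

   sub run {
      my $result = do_the_work (@_); # hypothetical worker logic

      # ask the parent to stop sending new jobs after 100 requests,
      # e.g. to bound the damage done by slow memory leaks
      AnyEvent::Fork::Pool::retire ()
         if ++$jobs_handled >= 100;

      $result
   }
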
=back

=head1 POOL PARAMETERS RECIPES

This section describes some recipes for pool parameters. These are mostly
meant for the synchronous RPC backend, as the asynchronous RPC backend
changes the rules considerably, making workers themselves responsible for
their scheduling.

=over 4

=item low latency - set load = 1

If you need a deterministic low latency, you should set the C<load>
parameter to C<1>. This ensures that never more than one job is sent to
each worker. This avoids having to wait for a previous job to finish.

This makes most sense with the synchronous (default) backend, as the
asynchronous backend can handle multiple requests concurrently.

=item lowest latency - set load = 1 and idle = max

To achieve the lowest latency, you additionally should disable any dynamic
resizing of the pool by setting C<idle> to the same value as C<max>.

=item high throughput, cpu bound jobs - set load >= 2, max = #cpus

To get high throughput with cpu-bound jobs, you should set the maximum
pool size to the number of cpus in your system, and C<load> to at least
C<2>, to make sure there can be another job waiting for the worker when it
has finished one (see the sketch after this list).

The value of C<2> for C<load> is the minimum value that I<can> achieve
100% throughput, but if your parent process itself is sometimes busy, you
might need higher values. Also there is a limit on the amount of data that
can be "in flight" to the worker, so if you send big blobs of data to your
worker, C<load> might have much less of an effect.

=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high

When your jobs are I/O bound, using more workers usually means higher
throughput, depending very much on your actual workload - sometimes
having only one worker is best, for example, when you read or write big
files at maximum speed, as a second worker will increase seek times.

=back
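
For the cpu-bound recipe, pool creation might look like the following
sketch. Determining the number of cpus is left to a module of your
choice - C<Sys::CPU> is used here purely as an example and is not a
dependency of this module:

   use Sys::CPU; # one of several modules that can count cpus

   my $ncpu = Sys::CPU::cpu_count;

   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
           "MyWorker::run",
           max  => $ncpu, # one worker per cpu
           load => 2,     # keep one extra job queued per worker
        );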

=head1 EXCEPTIONS

The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions will
not be caught, and exceptions in both the worker and in callbacks cause
undesirable or undefined behaviour.

=head1 SEE ALSO

L<AnyEvent::Fork>, to create the processes in the first place.

L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.

=head1 AUTHOR AND CONTACT INFORMATION

   Marc Lehmann <schmorp@schmorp.de>
   http://software.schmorp.de/pkg/AnyEvent-Fork-Pool

=cut

1
