/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.11
Committed: Sun Apr 28 14:19:22 2013 UTC (11 years ago) by root
Branch: MAIN
CVS Tags: rel-1_1
Changes since 1.10: +1 -1 lines
Log Message:
1.1

1 =head1 NAME
2
3 AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4
5 THE API IS NOT FINISHED, CONSIDER THIS AN ALPHA RELEASE
6
7 =head1 SYNOPSIS
8
9 use AnyEvent;
10 use AnyEvent::Fork::Pool;
11 # a separate "use AnyEvent::Fork" is not needed
12
13 # all possible parameters shown, with default values
14 my $pool = AnyEvent::Fork
15 ->new
16 ->require ("MyWorker")
17 ->AnyEvent::Fork::Pool::run (
18 "MyWorker::run", # the worker function
19
20 # pool management
21 max => 4, # absolute maximum # of processes
22 idle => 0, # minimum # of idle processes
23 load => 2, # queue at most this number of jobs per process
24 start => 0.1, # wait this many seconds before starting a new process
25 stop => 10, # wait this many seconds before stopping an idle process
26 on_destroy => (my $finish = AE::cv), # called when object is destroyed
27
28 # parameters passed to AnyEvent::Fork::RPC
29 async => 0,
30 on_error => sub { die "FATAL: $_[0]\n" },
31 on_event => sub { my @ev = @_ },
32 init => "MyWorker::init",
33 serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
34 );
35
36 for (1..10) {
37 $pool->(doit => $_, sub {
38 print "MyWorker::run returned @_\n";
39 });
40 }
41
42 undef $pool;
43
44 $finish->recv;
45
46 =head1 DESCRIPTION
47
48 This module uses processes created via L<AnyEvent::Fork> and the RPC
49 protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced
50 pool of processes that handles jobs.
51
52 Understanding of L<AnyEvent::Fork> is helpful, but not critical, to be
53 able to use this module; a thorough understanding of L<AnyEvent::Fork::RPC>
54 is, however, as it defines the actual API that needs to be implemented in
55 the worker processes.
56
57 =head1 EXAMPLES
58
59 =head1 PARENT USAGE
60
61 To create a pool, you first have to create an L<AnyEvent::Fork> object -
62 this object becomes your template process. Whenever a new worker process
63 is needed, it is forked from this template process. Then you need to
64 "hand off" this template process to the C<AnyEvent::Fork::Pool> module by
65 calling its run method on it:
66
67 my $template = AnyEvent::Fork
68 ->new
69 ->require ("SomeModule", "MyWorkerModule");
70
71 my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");
72
73 The pool "object" is not a regular Perl object, but a code reference that
74 you can call and that works roughly like calling the worker function
75 directly, except that it returns nothing but instead you need to specify a
76 callback to be invoked once results are in:
77
78 $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });
79
80 =over 4
81
82 =cut
83
84 package AnyEvent::Fork::Pool;
85
86 use common::sense;
87
88 use Scalar::Util ();
89
90 use Guard ();
91 use Array::Heap ();
92
93 use AnyEvent;
94 # explicit version on next line, as some cpan-testers test with the 0.1 version,
95 # ignoring dependencies, and this line will at least give a clear indication of that.
96 use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience
97 use AnyEvent::Fork::RPC;
98
99 # these are used for the first and last argument of events
100 # in the hope of not colliding. yes, I don't like it either,
101 # but didn't come up with an obviously better alternative.
102 my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
103 my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
104
105 our $VERSION = 1.1;
106
107 =item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
108
109 The traditional way to call the pool creation function. But it is way
110 cooler to call it in the following way:
111
112 =item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
113
114 Creates a new pool object with the specified C<$function> as function
115 (name) to call for each request. The pool uses the C<$fork> object as the
116 template when creating worker processes.
117
118 You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
119 to create one.
120
121 A relatively large number of key/value pairs can be specified to influence
122 the behaviour. They are grouped into the categories "pool management",
123 "template process" and "rpc parameters".
124
125 =over 4
126
127 =item Pool Management
128
129 The pool consists of a certain number of worker processes. These options
130 decide how many of these processes exist and when they are started and
131 stopped.
132
133 The worker pool is dynamically resized, according to (perceived :)
134 load. The minimum size is given by the C<idle> parameter and the maximum
135 size is given by the C<max> parameter. A new worker is started every
136 C<start> seconds at most, and an idle worker is stopped at most every
137 C<stop> seconds.
138
139 You can specify the number of jobs sent to a worker concurrently using the
140 C<load> parameter.
141
142 =over 4
143
144 =item idle => $count (default: 0)
145
146 The minimum number of idle processes in the pool - when there are fewer
147 than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
148 ones, subject to the limits set by C<max> and C<start>.
149
150 This is also the initial number of workers in the pool. The default of
151 zero means that the pool starts empty and can shrink back to zero workers
152 over time.
153
154 =item max => $count (default: 4)
155
156 The maximum number of processes in the pool, in addition to the template
157 process. C<AnyEvent::Fork::Pool> will never have more than this number of
158 worker processes, although there can be more temporarily when a worker is
159 shut down and hasn't exited yet.
160
161 =item load => $count (default: 2)
162
163 The maximum number of concurrent jobs sent to a single worker process.
164
165 Jobs that cannot be sent to a worker immediately (because all workers are
166 busy) will be queued until a worker is available.
167
168 Setting this low improves latency. For example, at C<1>, every job that
169 is sent to a worker is sent to a completely idle worker that doesn't run
170 any other jobs. The downside is that throughput is reduced - a worker that
171 finishes a job needs to wait for a new job from the parent.
172
173 The default of C<2> is usually a good compromise.
174
175 =item start => $seconds (default: 0.1)
176
177 When there are fewer than C<idle> workers (or all workers are completely
178 busy), a timer is started. If the timer elapses and there are still
179 jobs that cannot be queued to a worker, a new worker is started.
180
181 This sets the minimum time that all workers must be busy before a new
182 worker is started. Or, put differently, the minimum delay between starting
183 new workers.
184
185 The delay is small by default, which means new workers will be started
186 relatively quickly. A delay of C<0> is possible, and ensures that the pool
187 will grow as quickly as possible under load.
188
189 Non-zero values are useful to avoid "exploding" a pool because a lot of
190 jobs are queued in an instant.
191
192 Higher values are often useful to improve efficiency at the cost of
193 latency - if fewer processes can handle the load over time, starting
194 more and more of them is not necessarily going to help.
195
196 =item stop => $seconds (default: 10)
197
198 When a worker has no jobs to execute it becomes idle. An idle worker that
199 hasn't executed a job within this amount of time will be stopped, unless
200 the other parameters say otherwise.
201
202 Setting this to a very high value means that workers stay around longer,
203 even when they have nothing to do, which can be good as they don't have to
204 be started again on the next load spike.
205
206 Setting this to a lower value can be useful to avoid memory or simply
207 process table wastage.
208
209 Usually, setting this to a time longer than the time between load spikes
210 is best - if you expect a lot of requests every minute and little work
211 in between, setting this to longer than a minute avoids having to stop
212 and start workers. On the other hand, you have to ask yourself if letting
213 workers run idle is a good use of your resources. Try to find a good
214 balance between resource usage of your workers and the time to start new
215 workers - L<AnyEvent::Fork> itself is fast at creating workers and
216 does not use much memory for them, so most of the overhead is likely
217 from your own code.
218
219 =item on_destroy => $callback->() (default: none)
220
221 When a pool object goes out of scope, the outstanding requests are still
222 handled till completion. Only after handling all jobs will the workers
223 be destroyed (and also the template process if it isn't referenced
224 otherwise).
225
226 To find out when a pool I<really> has finished its work, you can set this
227 callback, which will be called when the pool has been destroyed.
228
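As a minimal sketch of this pattern (following the synopsis; C<MyWorker::run> is a hypothetical worker function, and an AnyEvent condvar doubles as the callback, since condvars are callable):

```perl
# Sketch: wait until all queued jobs are handled and the pool
# is fully destroyed. "MyWorker::run" is hypothetical.
use AnyEvent;
use AnyEvent::Fork;
use AnyEvent::Fork::Pool;

my $finish = AE::cv;   # condvars can be used as callbacks

my $pool = AnyEvent::Fork
   ->new
   ->require ("MyWorker")
   ->AnyEvent::Fork::Pool::run (
      "MyWorker::run",
      on_destroy => $finish,
   );

$pool->($_, sub { }) for 1..10;   # queue some jobs

undef $pool;      # drop the last reference - queued jobs still run
$finish->recv;    # returns only after all jobs are done and workers exited
```
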
229 =back
230
231 =item AnyEvent::Fork::RPC Parameters
232
233 These parameters are all passed more or less directly to
234 L<AnyEvent::Fork::RPC>. They are only briefly mentioned here, for
235 their full documentation please refer to the L<AnyEvent::Fork::RPC>
236 documentation. Also, the default values mentioned here are only documented
237 as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.
238
239 =over 4
240
241 =item async => $boolean (default: 0)
242
243 Whether to use the synchronous or asynchronous RPC backend.
244
245 =item on_error => $callback->($message) (default: die with message)
246
247 The callback to call on any (fatal) errors.
248
249 =item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
250
251 The callback to invoke on events.
252
253 =item init => $initfunction (default: none)
254
255 The function to call in the child, once before handling requests.
256
257 =item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
258
259 The serialiser to use.
260
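For example, to pass references instead of plain strings, a different serialiser can be selected - a sketch, assuming the JSON serialiser constant documented in L<AnyEvent::Fork::RPC> is available and a JSON module is installed (C<MyWorker> is a hypothetical worker module):

```perl
# Sketch: use the JSON serialiser so arguments and results can be
# data structures, not just strings.
my $pool = AnyEvent::Fork
   ->new
   ->require ("MyWorker")   # hypothetical worker module
   ->AnyEvent::Fork::Pool::run (
      "MyWorker::run",
      serialiser => $AnyEvent::Fork::RPC::JSON_SERIALISER,
   );

$pool->({ id => 1, data => [1, 2, 3] }, sub {
   my ($result) = @_;   # deserialised again on the way back
});
```
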
261 =back
262
263 =back
264
265 =cut
266
267 sub run {
268 my ($template, $function, %arg) = @_;
269
270 my $max = $arg{max} || 4;
271 my $idle = $arg{idle} || 0;
272 my $load = $arg{load} || 2;
273 my $start = $arg{start} || 0.1;
274 my $stop = $arg{stop} || 10;
275 my $on_event = $arg{on_event} || sub { };
276 my $on_destroy = $arg{on_destroy};
277
278 my @rpc = (
279 async => $arg{async},
280 init => $arg{init},
281 serialiser => delete $arg{serialiser},
282 on_error => $arg{on_error},
283 );
284
285 my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
286 my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);
287
288 my $destroy_guard = Guard::guard {
289 $on_destroy->()
290 if $on_destroy;
291 };
292
293 $template
294 ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
295 ->eval ('
296 my ($magic0, $magic1) = @_;
297 sub AnyEvent::Fork::Pool::retire() {
298 AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
299 }
300 ', $magic0, $magic1)
301 ;
302
303 $start_worker = sub {
304 my $proc = [0, 0, undef]; # load, index, rpc
305
306 $proc->[2] = $template
307 ->fork
308 ->AnyEvent::Fork::RPC::run ($function,
309 @rpc,
310 on_event => sub {
311 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
312 $destroy_guard if 0; # keep it alive
313
314 $_[1] eq "quit" and $stop_worker->($proc);
315 return;
316 }
317
318 &$on_event;
319 },
320 )
321 ;
322
323 ++$nidle;
324 Array::Heap::push_heap_idx @pool, $proc;
325
326 Scalar::Util::weaken $proc;
327 };
328
329 $stop_worker = sub {
330 my $proc = shift;
331
332 $proc->[0]
333 or --$nidle;
334
335 Array::Heap::splice_heap_idx @pool, $proc->[1]
336 if defined $proc->[1];
337
338 @$proc = 0; # tell others to leave it be
339 };
340
341 $want_start = sub {
342 undef $stop_w;
343
344 $start_w ||= AE::timer $start, $start, sub {
345 if (($nidle < $idle || @queue) && @pool < $max) {
346 $start_worker->();
347 $scheduler->();
348 } else {
349 undef $start_w;
350 }
351 };
352 };
353
354 $want_stop = sub {
355 $stop_w ||= AE::timer $stop, $stop, sub {
356 $stop_worker->($pool[0])
357 if $nidle;
358
359 undef $stop_w
360 if $nidle <= $idle;
361 };
362 };
363
364 $scheduler = sub {
365 if (@queue) {
366 while (@queue) {
367 @pool or $start_worker->();
368
369 my $proc = $pool[0];
370
371 if ($proc->[0] < $load) {
372 # found free worker, increase load
373 unless ($proc->[0]++) {
374 # worker became busy
375 --$nidle
376 or undef $stop_w;
377
378 $want_start->()
379 if $nidle < $idle && @pool < $max;
380 }
381
382 Array::Heap::adjust_heap_idx @pool, 0;
383
384 my $job = shift @queue;
385 my $ocb = pop @$job;
386
387 $proc->[2]->(@$job, sub {
388 # reduce load
389 --$proc->[0] # worker still busy?
390 or ++$nidle > $idle # not too many idle processes?
391 or $want_stop->();
392
393 Array::Heap::adjust_heap_idx @pool, $proc->[1]
394 if defined $proc->[1];
395
396 &$ocb;
397
398 $scheduler->();
399 });
400 } else {
401 $want_start->()
402 unless @pool >= $max;
403
404 last;
405 }
406 }
407 } elsif ($shutdown) {
408 @pool = ();
409 undef $start_w;
410 undef $start_worker; # frees $destroy_guard reference
411
412 $stop_worker->($pool[0])
413 while $nidle;
414 }
415 };
416
417 my $shutdown_guard = Guard::guard {
418 $shutdown = 1;
419 $scheduler->();
420 };
421
422 $start_worker->()
423 while @pool < $idle;
424
425 sub {
426 $shutdown_guard if 0; # keep it alive
427
428 $start_worker->()
429 unless @pool;
430
431 push @queue, [@_];
432 $scheduler->();
433 }
434 }
435
436 =item $pool->(..., $cb->(...))
437
438 Call the RPC function of a worker with the given arguments, and when the
439 worker is done, call the C<$cb> with the results, just like calling the
440 RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
441 details on the RPC API.
442
443 If there is no free worker, the call will be queued until a worker becomes
444 available.
445
446 Note that there can be considerable time between calling this method and
447 the call actually being executed. During this time, the parameters passed
448 to this function are effectively read-only - modifying them after the call
449 and before the callback is invoked causes undefined behaviour.
450
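One way to stay on the safe side is to pass copies of any data you might modify later - a small sketch (the job data here is purely illustrative):

```perl
# Sketch: copy mutable data before queueing the job, so later
# modifications cannot affect the call while it is still queued.
my %job = (path => "/tmp/data", tries => 3);

$pool->({ %job }, sub {   # shallow copy of the hash
   print "done: @_\n";
});

$job{tries} = 0;   # safe - the queued call saw the copy
```
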
451 =cut
452
453 =item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]
454
455 =item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]
456
457 Tries to detect the number of CPUs (C<$cpus>, often called cpu cores
458 nowadays) and execution units (C<$eus>, which include e.g. extra
459 hyperthreaded units). When C<$cpus> cannot be determined reliably,
460 C<$default_cpus> is returned for both values, or C<1> if it is missing.
461
462 For normal CPU bound uses, it is wise to have as many worker processes
463 as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
464 hyperthreading is usually detrimental to performance, but in those rare
465 cases where that really helps it might be beneficial to use more workers
466 (C<$eus>).
467
468 Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
469 C<$cpus> and C<$eus>, and on {Free,Net,Open}BSD, C<sysctl -n hw.ncpu> is
470 used for C<$cpus>.
471
472 Example: create a worker pool with as many workers as cpu cores, or C<2>,
473 if the actual number could not be determined.
474
475 $fork->AnyEvent::Fork::Pool::run ("myworker::function",
476 max => (scalar AnyEvent::Fork::Pool::ncpu 2),
477 );
478
479 =cut
480
481 BEGIN {
482 if ($^O eq "linux") {
483 *ncpu = sub(;$) {
484 my ($cpus, $eus);
485
486 if (open my $fh, "<", "/proc/cpuinfo") {
487 my %id;
488
489 while (<$fh>) {
490 if (/^core id\s*:\s*(\d+)/) {
491 ++$eus;
492 undef $id{$1};
493 }
494 }
495
496 $cpus = scalar keys %id;
497 } else {
498 $cpus = $eus = @_ ? shift : 1;
499 }
500 wantarray ? ($cpus, $eus) : $cpus
501 };
502 } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
503 *ncpu = sub(;$) {
504 my $cpus = qx<sysctl -n hw.ncpu> * 1
505 || (@_ ? shift : 1);
506 wantarray ? ($cpus, $cpus) : $cpus
507 };
508 } else {
509 *ncpu = sub(;$) {
510 my $cpus = @_ ? shift : 1;
511 wantarray ? ($cpus, $cpus) : $cpus
512 };
513 }
514 }
515
516 =back
517
518 =head1 CHILD USAGE
519
520 In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
521 more child-side function:
522
523 =over 4
524
525 =item AnyEvent::Fork::Pool::retire ()
526
527 This function sends an event to the parent process to request retirement:
528 the worker is removed from the pool and no new jobs will be sent to it,
529 but it has to handle the jobs that are already queued.
530
531 The parentheses are part of the syntax: the function usually isn't defined
532 when you compile your code (because that happens I<before> handing the
533 template process over to C<AnyEvent::Fork::Pool::run>), so you need the
534 empty parentheses to tell Perl that the function is indeed a function.
535
536 Retiring a worker can be useful to gracefully shut it down when the
537 worker deems this appropriate. For example, after executing a job, one
538 could check the process size or the number of jobs handled so far, and
539 if either is too high, the worker could ask to be retired, to keep
540 memory leaks from accumulating.
541
542 Example: retire a worker after it has handled roughly 100 requests.
543
544 my $count = 0;
545
546 sub my::worker {
547
548 ++$count == 100
549 and AnyEvent::Fork::Pool::retire ();
550
551 ... normal code goes here
552 }
553
554 =back
555
556 =head1 POOL PARAMETERS RECIPES
557
558 This section describes some recipes for pool parameters. These are mostly
559 meant for the synchronous RPC backend, as the asynchronous RPC backend
560 changes the rules considerably, making workers themselves responsible for
561 their scheduling.
562
563 =over 4
564
565 =item low latency - set load = 1
566
567 If you need deterministic low latency, you should set the C<load>
568 parameter to C<1>. This ensures that no more than one job is ever sent
569 to each worker, which avoids having to wait for a previous job to finish.
570
571 This makes most sense with the synchronous (default) backend, as the
572 asynchronous backend can handle multiple requests concurrently.
573
574 =item lowest latency - set load = 1 and idle = max
575
576 To achieve the lowest latency, you additionally should disable any dynamic
577 resizing of the pool by setting C<idle> to the same value as C<max>.
578
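Such a fixed-size, low-latency pool could be configured as follows - a sketch, with the worker function name and pool size purely illustrative:

```perl
# Sketch: fixed-size pool - no dynamic resizing, one job per worker.
my $pool = $template->AnyEvent::Fork::Pool::run (
   "MyWorker::run",   # hypothetical worker function
   idle => 8,         # start with eight workers...
   max  => 8,         # ...and never grow or shrink the pool
   load => 1,         # at most one job per worker at a time
);
```
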
579 =item high throughput, cpu bound jobs - set load >= 2, max = #cpus
580
581 To get high throughput with cpu-bound jobs, you should set the maximum
582 pool size to the number of cpus in your system, and C<load> to at least
583 C<2>, to make sure there can be another job waiting for the worker when it
584 has finished one.
585
586 The value of C<2> for C<load> is the minimum value that I<can> achieve
587 100% throughput, but if your parent process itself is sometimes busy, you
588 might need higher values. Also there is a limit on the amount of data that
589 can be "in flight" to the worker, so if you send big blobs of data to your
590 worker, C<load> might have much less of an effect.
591
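Combined with C<ncpu> from this module, such a configuration could look like this sketch (the worker function name and the fallback cpu count are illustrative):

```perl
# Sketch: one worker per detected CPU, two jobs in flight per worker.
my $pool = $template->AnyEvent::Fork::Pool::run (
   "MyWorker::run",   # hypothetical cpu-bound worker function
   max  => (scalar AnyEvent::Fork::Pool::ncpu 4),   # fall back to 4
   load => 2,   # keep a job queued so workers never wait between jobs
);
```
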
592 =item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high
593
594 When your jobs are I/O bound, using more workers usually results in
595 higher throughput, depending very much on your actual workload - sometimes
596 having only one worker is best, for example, when you read or write big
597 files at maximum speed, as a second worker will increase seek times.
598
599 =back
600
601 =head1 EXCEPTIONS
602
603 The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions will
604 not be caught, and exceptions in both workers and in callbacks cause
605 undesirable or undefined behaviour.
606
607 =head1 SEE ALSO
608
609 L<AnyEvent::Fork>, to create the processes in the first place.
610
611 L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.
612
613 =head1 AUTHOR AND CONTACT INFORMATION
614
615 Marc Lehmann <schmorp@schmorp.de>
616 http://software.schmorp.de/pkg/AnyEvent-Fork-Pool
617
618 =cut
619
620 1
621