/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.13
Committed: Sun Apr 28 14:27:31 2013 UTC (11 years ago) by root
Branch: MAIN
Changes since 1.12: +2 -0 lines

1 =head1 NAME
2
3 AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4
5 =head1 SYNOPSIS
6
7 use AnyEvent;
8 use AnyEvent::Fork::Pool;
9 # explicitly using AnyEvent::Fork is not needed
10
11 # all possible parameters shown, with default values
12 my $pool = AnyEvent::Fork
13 ->new
14 ->require ("MyWorker")
15 ->AnyEvent::Fork::Pool::run (
16 "MyWorker::run", # the worker function
17
18 # pool management
19 max => 4, # absolute maximum # of processes
20 idle => 0, # minimum # of idle processes
21 load => 2, # queue at most this number of jobs per process
22 start => 0.1, # wait this many seconds before starting a new process
23 stop => 10, # wait this many seconds before stopping an idle process
24 on_destroy => (my $finish = AE::cv), # called when object is destroyed
25
26 # parameters passed to AnyEvent::Fork::RPC
27 async => 0,
28 on_error => sub { die "FATAL: $_[0]\n" },
29 on_event => sub { my @ev = @_ },
30 init => "MyWorker::init",
31 serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
32 );
33
34 for (1..10) {
35 $pool->(doit => $_, sub {
36 print "MyWorker::run returned @_\n";
37 });
38 }
39
40 undef $pool;
41
42 $finish->recv;
43
44 =head1 DESCRIPTION
45
46 This module uses processes created via L<AnyEvent::Fork> (or
47 L<AnyEvent::Fork::Remote>) and the RPC protocol implemented in
48 L<AnyEvent::Fork::RPC> to create a load-balanced pool of processes that
49 handles jobs.
50
51 An understanding of L<AnyEvent::Fork> is helpful but not critical for
52 using this module; a thorough understanding of L<AnyEvent::Fork::RPC>
53 is, however, as it defines the actual API that needs to be implemented
54 in the worker processes.
55
56 =head1 PARENT USAGE
57
58 To create a pool, you first have to create an L<AnyEvent::Fork> object -
59 this object becomes your template process. Whenever a new worker process
60 is needed, it is forked from this template process. Then you need to
61 "hand off" this template process to the C<AnyEvent::Fork::Pool> module by
62 calling C<AnyEvent::Fork::Pool::run> on it:
63
64 my $template = AnyEvent::Fork
65 ->new
66 ->require ("SomeModule", "MyWorkerModule");
67
68 my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");
69
70 The pool "object" is not a regular Perl object, but a code reference that
71 you can call and that works roughly like calling the worker function
72 directly, except that it returns nothing but instead you need to specify a
73 callback to be invoked once results are in:
74
75 $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });
76
77 =over 4
78
79 =cut
80
81 package AnyEvent::Fork::Pool;
82
83 use common::sense;
84
85 use Scalar::Util ();
86
87 use Guard ();
88 use Array::Heap ();
89
90 use AnyEvent;
91 # explicit version on next line, as some cpan-testers test with the 0.1 version,
92 # ignoring dependencies, and this line will at least give a clear indication of that.
93 use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience
94 use AnyEvent::Fork::RPC;
95
96 # these are used for the first and last argument of events
97 # in the hope of not colliding. yes, I don't like it either,
98 # but didn't come up with an obviously better alternative.
99 my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
100 my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
101
102 our $VERSION = 1.1;
103
104 =item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
105
106 The traditional way to call the pool creation function. But it is way
107 cooler to call it in the following way:
108
109 =item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
110
111 Creates a new pool object with the specified C<$function> as function
112 (name) to call for each request. The pool uses the C<$fork> object as the
113 template when creating worker processes.
114
115 You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
116 to create one.
117
118 A relatively large number of key/value pairs can be specified to influence
119 the behaviour. They are grouped into the categories "pool management",
120 "template process" and "rpc parameters".
121
122 =over 4
123
124 =item Pool Management
125
126 The pool consists of a certain number of worker processes. These options
127 decide how many of these processes exist and when they are started and
128 stopped.
129
130 The worker pool is dynamically resized, according to (perceived :)
131 load. The minimum size is given by the C<idle> parameter and the maximum
132 size is given by the C<max> parameter. A new worker is started every
133 C<start> seconds at most, and an idle worker is stopped at most every
134 C<stop> seconds.
135
136 You can specify the maximum number of jobs sent to a worker concurrently
137 using the C<load> parameter.
138
139 =over 4
140
141 =item idle => $count (default: 0)
142
143 The minimum number of idle processes in the pool - when there are fewer
144 than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
145 ones, subject to the limits set by C<max> and C<start>.
146
147 This is also the initial number of workers in the pool. The default of
148 zero means that the pool starts empty and can shrink back to zero workers
149 over time.
150
151 =item max => $count (default: 4)
152
153 The maximum number of processes in the pool, in addition to the template
154 process. C<AnyEvent::Fork::Pool> will never have more than this number of
155 worker processes, although there can be more temporarily when a worker is
156 shut down and hasn't exited yet.
157
158 =item load => $count (default: 2)
159
160 The maximum number of concurrent jobs sent to a single worker process.
161
162 Jobs that cannot be sent to a worker immediately (because all workers are
163 busy) will be queued until a worker is available.
164
165 Setting this low improves latency. For example, at C<1>, every job is
166 sent to a completely idle worker that isn't running any other
167 jobs. The downside is that throughput is reduced - a worker that
168 finishes a job needs to wait for a new job from the parent.
169
170 The default of C<2> is usually a good compromise.
171
172 =item start => $seconds (default: 0.1)
173
174 When there are fewer than C<idle> workers (or all workers are completely
175 busy), a timer is started. If the timer elapses and there are still
176 jobs that cannot be queued to a worker, a new worker is started.
177
178 This sets the minimum time that all workers must be busy before a new
179 worker is started. Or, put differently, the minimum delay between starting
180 new workers.
181
182 The delay is small by default, which means new workers will be started
183 relatively quickly. A delay of C<0> is possible, and ensures that the pool
184 will grow as quickly as possible under load.
185
186 Non-zero values are useful to avoid "exploding" a pool because a lot of
187 jobs are queued in an instant.
188
189 Higher values are often useful to improve efficiency at the cost of
190 latency - when fewer processes can do the job over time, starting more and
191 more is not necessarily going to help.
192
193 =item stop => $seconds (default: 10)
194
195 When a worker has no jobs to execute it becomes idle. An idle worker that
196 hasn't executed a job within this amount of time will be stopped, unless
197 the other parameters say otherwise.
198
199 Setting this to a very high value means that workers stay around longer,
200 even when they have nothing to do, which can be good as they don't have to
201 be started on the next load spike again.
202
203 Setting this to a lower value can be useful to avoid memory or simply
204 process table wastage.
205
206 Usually, setting this to a time longer than the time between load spikes
207 is best - if you expect a lot of requests every minute and little work
208 in between, setting this to longer than a minute avoids having to stop
209 and start workers. On the other hand, you have to ask yourself if letting
210 workers run idle is a good use of your resources. Try to find a good
211 balance between resource usage of your workers and the time to start new
212 workers - L<AnyEvent::Fork> itself is fast at creating workers while
213 not using much memory for them, so most of the
214 overhead is likely from your own code.
215
216 =item on_destroy => $callback->() (default: none)
217
218 When a pool object goes out of scope, the outstanding requests are still
219 handled till completion. Only after handling all jobs will the workers
220 be destroyed (and also the template process if it isn't referenced
221 otherwise).
222
223 To find out when a pool I<really> has finished its work, you can set this
224 callback, which will be called when the pool has been destroyed.
225
226 =back
227
228 =item AnyEvent::Fork::RPC Parameters
229
230 These parameters are all passed more or less directly to
231 L<AnyEvent::Fork::RPC>. They are only briefly mentioned here, for
232 their full documentation please refer to the L<AnyEvent::Fork::RPC>
233 documentation. Also, the default values mentioned here are only documented
234 as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.
235
236 =over 4
237
238 =item async => $boolean (default: 0)
239
240 Whether to use the synchronous or asynchronous RPC backend.
241
242 =item on_error => $callback->($message) (default: die with message)
243
244 The callback to call on any (fatal) errors.
245
246 =item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
247
248 The callback to invoke on events.
249
250 =item init => $initfunction (default: none)
251
252 The function to call in the child, once before handling requests.
253
254 =item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
255
256 The serialiser to use.
257
258 =back
259
260 =back
261
262 =cut
263
264 sub run {
265 my ($template, $function, %arg) = @_;
266
267 my $max = $arg{max} || 4;
268 my $idle = $arg{idle} || 0;
269 my $load = $arg{load} || 2;
270 my $start = $arg{start} || 0.1;
271 my $stop = $arg{stop} || 10;
272 my $on_event = $arg{on_event} || sub { };
273 my $on_destroy = $arg{on_destroy};
274
275 my @rpc = (
276 async => $arg{async},
277 init => $arg{init},
278 serialiser => delete $arg{serialiser},
279 on_error => $arg{on_error},
280 );
281
282 my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
283 my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);
284
285 my $destroy_guard = Guard::guard {
286 $on_destroy->()
287 if $on_destroy;
288 };
289
290 $template
291 ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
292 ->eval ('
293 my ($magic0, $magic1) = @_;
294 sub AnyEvent::Fork::Pool::retire() {
295 AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
296 }
297 ', $magic0, $magic1)
298 ;
299
300 $start_worker = sub {
301 my $proc = [0, 0, undef]; # load, index, rpc
302
303 $proc->[2] = $template
304 ->fork
305 ->AnyEvent::Fork::RPC::run ($function,
306 @rpc,
307 on_event => sub {
308 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
309 $destroy_guard if 0; # keep it alive
310
311 $_[1] eq "quit" and $stop_worker->($proc);
312 return;
313 }
314
315 &$on_event;
316 },
317 )
318 ;
319
320 ++$nidle;
321 Array::Heap::push_heap_idx @pool, $proc;
322
323 Scalar::Util::weaken $proc;
324 };
325
326 $stop_worker = sub {
327 my $proc = shift;
328
329 $proc->[0]
330 or --$nidle;
331
332 Array::Heap::splice_heap_idx @pool, $proc->[1]
333 if defined $proc->[1];
334
335 @$proc = 0; # tell others to leave it be
336 };
337
338 $want_start = sub {
339 undef $stop_w;
340
341 $start_w ||= AE::timer $start, $start, sub {
342 if (($nidle < $idle || @queue) && @pool < $max) {
343 $start_worker->();
344 $scheduler->();
345 } else {
346 undef $start_w;
347 }
348 };
349 };
350
351 $want_stop = sub {
352 $stop_w ||= AE::timer $stop, $stop, sub {
353 $stop_worker->($pool[0])
354 if $nidle;
355
356 undef $stop_w
357 if $nidle <= $idle;
358 };
359 };
360
361 $scheduler = sub {
362 if (@queue) {
363 while (@queue) {
364 @pool or $start_worker->();
365
366 my $proc = $pool[0];
367
368 if ($proc->[0] < $load) {
369 # found free worker, increase load
370 unless ($proc->[0]++) {
371 # worker became busy
372 --$nidle
373 or undef $stop_w;
374
375 $want_start->()
376 if $nidle < $idle && @pool < $max;
377 }
378
379 Array::Heap::adjust_heap_idx @pool, 0;
380
381 my $job = shift @queue;
382 my $ocb = pop @$job;
383
384 $proc->[2]->(@$job, sub {
385 # reduce load
386 --$proc->[0] # worker still busy?
387 or ++$nidle > $idle # not too many idle processes?
388 or $want_stop->();
389
390 Array::Heap::adjust_heap_idx @pool, $proc->[1]
391 if defined $proc->[1];
392
393 &$ocb;
394
395 $scheduler->();
396 });
397 } else {
398 $want_start->()
399 unless @pool >= $max;
400
401 last;
402 }
403 }
404 } elsif ($shutdown) {
405 @pool = ();
406 undef $start_w;
407 undef $start_worker; # frees $destroy_guard reference
408
409 $stop_worker->($pool[0])
410 while $nidle;
411 }
412 };
413
414 my $shutdown_guard = Guard::guard {
415 $shutdown = 1;
416 $scheduler->();
417 };
418
419 $start_worker->()
420 while @pool < $idle;
421
422 sub {
423 $shutdown_guard if 0; # keep it alive
424
425 $start_worker->()
426 unless @pool;
427
428 push @queue, [@_];
429 $scheduler->();
430 }
431 }
432
433 =item $pool->(..., $cb->(...))
434
435 Call the RPC function of a worker with the given arguments, and when the
436 worker is done, call the C<$cb> with the results, just like calling the
437 RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
438 details on the RPC API.
439
440 If there is no free worker, the call will be queued until a worker becomes
441 available.
442
443 Note that there can be considerable time between calling this method and
444 the call actually being executed. During this time, the parameters passed
445 to this function are effectively read-only - modifying them after the call
446 and before the callback is invoked causes undefined behaviour.
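
For example, data referenced by the arguments must not be modified until
the callback has run (a sketch; C<process> is a hypothetical job name and
C<$pool> a pool created as above):

    my $task = { id => 42, data => "some payload" };

    $pool->(process => $task, sub {
       my ($result) = @_;
       # from here on it is safe to reuse or modify $task again
    });

    # UNSAFE until the callback has been invoked:
    # $task->{data} = "something else";

If you need to change the data right away, pass a copy to the pool instead
of the original structure.
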
447
448 =cut
449
450 =item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]
451
452 =item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]
453
454 Tries to detect the number of CPUs (C<$cpus>, often called CPU cores
455 nowadays) and execution units (C<$eus>, which include e.g. extra
456 hyperthreaded units). When C<$cpus> cannot be determined reliably,
457 C<$default_cpus> is returned for both values, or C<1> if it is missing.
458
459 For normal CPU bound uses, it is wise to have as many worker processes
460 as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
461 hyperthreading is usually detrimental to performance, but in those rare
462 cases where that really helps it might be beneficial to use more workers
463 (C<$eus>).
464
465 Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
466 C<$cpus> and C<$eus>, and on {Free,Net,Open}BSD, C<sysctl -n hw.ncpu> is
467 used for C<$cpus>.
468
469 Example: create a worker pool with as many workers as cpu cores, or C<2>,
470 if the actual number could not be determined.
471
472 $fork->AnyEvent::Fork::Pool::run ("myworker::function",
473 max => (scalar AnyEvent::Fork::Pool::ncpu 2),
474 );
475
476 =cut
477
478 BEGIN {
479 if ($^O eq "linux") {
480 *ncpu = sub(;$) {
481 my ($cpus, $eus);
482
483 if (open my $fh, "<", "/proc/cpuinfo") {
484 my %id;
485
486 while (<$fh>) {
487 if (/^core id\s*:\s*(\d+)/) {
488 ++$eus;
489 undef $id{$1};
490 }
491 }
492
493 $cpus = scalar keys %id;
494 } else {
495 $cpus = $eus = @_ ? shift : 1;
496 }
497 wantarray ? ($cpus, $eus) : $cpus
498 };
499 } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
500 *ncpu = sub(;$) {
501 my $cpus = qx<sysctl -n hw.ncpu> * 1
502 || (@_ ? shift : 1);
503 wantarray ? ($cpus, $cpus) : $cpus
504 };
505 } else {
506 *ncpu = sub(;$) {
507 my $cpus = @_ ? shift : 1;
508 wantarray ? ($cpus, $cpus) : $cpus
509 };
510 }
511 }
512
513 =back
514
515 =head1 CHILD USAGE
516
517 In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
518 more child-side function:
519
520 =over 4
521
522 =item AnyEvent::Fork::Pool::retire ()
523
524 This function sends an event to the parent process to request retirement:
525 the worker is removed from the pool and no new jobs will be sent to it,
526 but it has to handle the jobs that are already queued.
527
528 The parentheses are part of the syntax: the function usually isn't defined
529 when you compile your code (because that happens I<before> handing the
530 template process over to C<AnyEvent::Fork::Pool::run>), so you need the
531 empty parentheses to tell Perl that the function is indeed a function.
532
533 Retiring a worker can be useful to gracefully shut it down when the worker
534 deems this useful. For example, after executing a job, one could check
535 the process size or the number of jobs handled so far, and if either is
536 too high, the worker could ask to get retired, to avoid letting memory
537 leaks accumulate.
538
539 Example: retire a worker after it has handled roughly 100 requests.
540
541 my $count = 0;
542
543 sub my::worker {
544
545 ++$count == 100
546 and AnyEvent::Fork::Pool::retire ();
547
548 ... normal code goes here
549 }
550
551 =back
552
553 =head1 POOL PARAMETERS RECIPES
554
555 This section describes some recipes for pool parameters. These are mostly
556 meant for the synchronous RPC backend, as the asynchronous RPC backend
557 changes the rules considerably, making workers themselves responsible for
558 their scheduling.
559
560 =over 4
561
562 =item low latency - set load = 1
563
564 If you need deterministic low latency, you should set the C<load>
565 parameter to C<1>. This ensures that no more than one job is sent to
566 each worker at a time. This avoids having to wait for a previous job to finish.
567
568 This makes most sense with the synchronous (default) backend, as the
569 asynchronous backend can handle multiple requests concurrently.
570
571 =item lowest latency - set load = 1 and idle = max
572
573 To achieve the lowest latency, you additionally should disable any dynamic
574 resizing of the pool by setting C<idle> to the same value as C<max>.
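
A pool configured along these lines might look like this (a sketch; C<8>
is an arbitrary pool size and C<MyWorker::run> a stand-in for your own
worker function):

    my $pool = $fork->AnyEvent::Fork::Pool::run ("MyWorker::run",
       load => 1, # at most one job per worker
       idle => 8, # keep all workers started...
       max  => 8, # ...by making the minimum equal to the maximum
    );
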
575
576 =item high throughput, cpu bound jobs - set load >= 2, max = #cpus
577
578 To get high throughput with cpu-bound jobs, you should set the maximum
579 pool size to the number of cpus in your system, and C<load> to at least
580 C<2>, to make sure there can be another job waiting for the worker when it
581 has finished one.
582
583 The value of C<2> for C<load> is the minimum value that I<can> achieve
584 100% throughput, but if your parent process itself is sometimes busy, you
585 might need higher values. Also there is a limit on the amount of data that
586 can be "in flight" to the worker, so if you send big blobs of data to your
587 worker, C<load> might have much less of an effect.
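
Combined with C<ncpu> (documented above), such a pool could be set up like
this (a sketch; C<MyWorker::run> is a stand-in for your own worker
function):

    # one worker per detected cpu, assume 2 if detection fails
    my $cpus = AnyEvent::Fork::Pool::ncpu 2;

    my $pool = $fork->AnyEvent::Fork::Pool::run ("MyWorker::run",
       max  => $cpus, # as many workers as cpus
       load => 2,     # keep one extra job queued per worker
    );
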
588
589 =item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high
590
591 When your jobs are I/O bound, using more workers usually results in
592 higher throughput, depending very much on your actual workload - sometimes
593 having only one worker is best, for example, when you read or write big
594 files at maximum speed, as a second worker will increase seek times.
595
596 =back
597
598 =head1 EXCEPTIONS
599
600 The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions will
601 not be caught, and exceptions both in workers and in callbacks cause
602 undesirable or undefined behaviour.
603
604 =head1 SEE ALSO
605
606 L<AnyEvent::Fork>, to create the processes in the first place.
607
608 L<AnyEvent::Fork::Remote>, likewise, but helpful for remote processes.
609
610 L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.
611
612 =head1 AUTHOR AND CONTACT INFORMATION
613
614 Marc Lehmann <schmorp@schmorp.de>
615 http://software.schmorp.de/pkg/AnyEvent-Fork-Pool
616
617 =cut
618
619 1
620