ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.17
Committed: Thu Oct 27 07:27:56 2022 UTC (18 months, 1 week ago) by root
Branch: MAIN
CVS Tags: rel-1_3, HEAD
Changes since 1.16: +1 -1 lines
Log Message:
1.3

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4
5 =head1 SYNOPSIS
6
7 use AnyEvent;
8 use AnyEvent::Fork;
9 use AnyEvent::Fork::Pool;
10
11 # all possible parameters shown, with default values
12 my $pool = AnyEvent::Fork
13 ->new
14 ->require ("MyWorker")
15 ->AnyEvent::Fork::Pool::run (
16 "MyWorker::run", # the worker function
17
18 # pool management
19 max => 4, # absolute maximum # of processes
20 idle => 0, # minimum # of idle processes
21 load => 2, # queue at most this number of jobs per process
22 start => 0.1, # wait this many seconds before starting a new process
23 stop => 10, # wait this many seconds before stopping an idle process
24 on_destroy => (my $finish = AE::cv), # called when object is destroyed
25
26 # parameters passed to AnyEvent::Fork::RPC
27 async => 0,
28 on_error => sub { die "FATAL: $_[0]\n" },
29 on_event => sub { my @ev = @_ },
30 init => "MyWorker::init",
31 serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
32 );
33
34 for (1..10) {
35 $pool->(doit => $_, sub {
36 print "MyWorker::run returned @_\n";
37 });
38 }
39
40 undef $pool;
41
42 $finish->recv;
43
44 =head1 DESCRIPTION
45
46 This module uses processes created via L<AnyEvent::Fork> (or
47 L<AnyEvent::Fork::Remote>) and the RPC protocol implement in
48 L<AnyEvent::Fork::RPC> to create a load-balanced pool of processes that
49 handles jobs.
50
51 Understanding L<AnyEvent::Fork> is helpful but not required to use this
52 module, but a thorough understanding of L<AnyEvent::Fork::RPC> is, as
53 it defines the actual API that needs to be implemented in the worker
54 processes.
55
56 =head1 PARENT USAGE
57
58 To create a pool, you first have to create a L<AnyEvent::Fork> object -
59 this object becomes your template process. Whenever a new worker process
60 is needed, it is forked from this template process. Then you need to
61 "hand off" this template process to the C<AnyEvent::Fork::Pool> module by
62 calling its run method on it:
63
64 my $template = AnyEvent::Fork
65 ->new
66 ->require ("SomeModule", "MyWorkerModule");
67
68 my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");
69
70 The pool "object" is not a regular Perl object, but a code reference that
71 you can call and that works roughly like calling the worker function
72 directly, except that it returns nothing but instead you need to specify a
73 callback to be invoked once results are in:
74
75 $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });
76
77 =over 4
78
79 =cut
80
81 package AnyEvent::Fork::Pool;
82
83 use common::sense;
84
85 use Scalar::Util ();
86
87 use Guard ();
88 use Array::Heap ();
89
90 use AnyEvent;
91 use AnyEvent::Fork::RPC;
92
93 # these are used for the first and last argument of events
94 # in the hope of not colliding. yes, I don't like it either,
95 # but didn't come up with an obviously better alternative.
96 my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
97 my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
98
99 our $VERSION = 1.3;
100
101 =item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
102
103 The traditional way to call the pool creation function. But it is way
104 cooler to call it in the following way:
105
106 =item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
107
108 Creates a new pool object with the specified C<$function> as function
109 (name) to call for each request. The pool uses the C<$fork> object as the
110 template when creating worker processes.
111
112 You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
113 to create one.
114
115 A relatively large number of key/value pairs can be specified to influence
116 the behaviour. They are grouped into the categories "pool management",
117 "template process" and "rpc parameters".
118
119 =over 4
120
121 =item Pool Management
122
123 The pool consists of a certain number of worker processes. These options
124 decide how many of these processes exist and when they are started and
125 stopped.
126
127 The worker pool is dynamically resized, according to (perceived :)
128 load. The minimum size is given by the C<idle> parameter and the maximum
129 size is given by the C<max> parameter. A new worker is started every
130 C<start> seconds at most, and an idle worker is stopped at most every
131 C<stop> second.
132
133 You can specify the amount of jobs sent to a worker concurrently using the
134 C<load> parameter.
135
136 =over 4
137
138 =item idle => $count (default: 0)
139
140 The minimum amount of idle processes in the pool - when there are fewer
141 than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
142 ones, subject to the limits set by C<max> and C<start>.
143
144 This is also the initial amount of workers in the pool. The default of
145 zero means that the pool starts empty and can shrink back to zero workers
146 over time.
147
148 =item max => $count (default: 4)
149
150 The maximum number of processes in the pool, in addition to the template
151 process. C<AnyEvent::Fork::Pool> will never have more than this number of
152 worker processes, although there can be more temporarily when a worker is
153 shut down and hasn't exited yet.
154
155 =item load => $count (default: 2)
156
157 The maximum number of concurrent jobs sent to a single worker process.
158
159 Jobs that cannot be sent to a worker immediately (because all workers are
160 busy) will be queued until a worker is available.
161
162 Setting this low improves latency. For example, at C<1>, every job that
163 is sent to a worker is sent to a completely idle worker that doesn't run
164 any other jobs. The downside is that throughput is reduced - a worker that
165 finishes a job needs to wait for a new job from the parent.
166
167 The default of C<2> is usually a good compromise.
168
169 =item start => $seconds (default: 0.1)
170
171 When there are fewer than C<idle> workers (or all workers are completely
172 busy), then a timer is started. If the timer elapses and there are still
173 jobs that cannot be queued to a worker, a new worker is started.
174
175 This sets the minimum time that all workers must be busy before a new
176 worker is started. Or, put differently, the minimum delay between starting
177 new workers.
178
179 The delay is small by default, which means new workers will be started
180 relatively quickly. A delay of C<0> is possible, and ensures that the pool
181 will grow as quickly as possible under load.
182
183 Non-zero values are useful to avoid "exploding" a pool because a lot of
184 jobs are queued in an instant.
185
186 Higher values are often useful to improve efficiency at the cost of
187 latency - when fewer processes can do the job over time, starting more and
188 more is not necessarily going to help.
189
190 =item stop => $seconds (default: 10)
191
192 When a worker has no jobs to execute it becomes idle. An idle worker that
193 hasn't executed a job within this amount of time will be stopped, unless
194 the other parameters say otherwise.
195
196 Setting this to a very high value means that workers stay around longer,
197 even when they have nothing to do, which can be good as they don't have to
198 be started on the netx load spike again.
199
200 Setting this to a lower value can be useful to avoid memory or simply
201 process table wastage.
202
203 Usually, setting this to a time longer than the time between load spikes
204 is best - if you expect a lot of requests every minute and little work
205 in between, setting this to longer than a minute avoids having to stop
206 and start workers. On the other hand, you have to ask yourself if letting
207 workers run idle is a good use of your resources. Try to find a good
208 balance between resource usage of your workers and the time to start new
209 workers - the processes created by L<AnyEvent::Fork> itself is fats at
210 creating workers while not using much memory for them, so most of the
211 overhead is likely from your own code.
212
213 =item on_destroy => $callback->() (default: none)
214
215 When a pool object goes out of scope, the outstanding requests are still
216 handled till completion. Only after handling all jobs will the workers
217 be destroyed (and also the template process if it isn't referenced
218 otherwise).
219
220 To find out when a pool I<really> has finished its work, you can set this
221 callback, which will be called when the pool has been destroyed.
222
223 =back
224
225 =item AnyEvent::Fork::RPC Parameters
226
227 These parameters are all passed more or less directly to
228 L<AnyEvent::Fork::RPC>. They are only briefly mentioned here, for
229 their full documentation please refer to the L<AnyEvent::Fork::RPC>
230 documentation. Also, the default values mentioned here are only documented
231 as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.
232
233 =over 4
234
235 =item async => $boolean (default: 0)
236
237 Whether to use the synchronous or asynchronous RPC backend.
238
239 =item on_error => $callback->($message) (default: die with message)
240
241 The callback to call on any (fatal) errors.
242
243 =item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
244
245 The callback to invoke on events.
246
247 =item init => $initfunction (default: none)
248
249 The function to call in the child, once before handling requests.
250
251 =item serialiser => $serialiser (defailt: $AnyEvent::Fork::RPC::STRING_SERIALISER)
252
253 The serialiser to use.
254
255 =back
256
257 =back
258
259 =cut
260
261 sub run {
262 my ($template, $function, %arg) = @_;
263
264 my $max = $arg{max} || 4;
265 my $idle = $arg{idle} || 0,
266 my $load = $arg{load} || 2,
267 my $start = $arg{start} || 0.1,
268 my $stop = $arg{stop} || 10,
269 my $on_event = $arg{on_event} || sub { },
270 my $on_destroy = $arg{on_destroy};
271
272 my @rpc = (
273 async => $arg{async},
274 init => $arg{init},
275 serialiser => delete $arg{serialiser},
276 on_error => $arg{on_error},
277 );
278
279 my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
280 my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);
281
282 my $destroy_guard = Guard::guard {
283 $on_destroy->()
284 if $on_destroy;
285 };
286
287 $template
288 ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
289 ->eval ('
290 my ($magic0, $magic1) = @_;
291 sub AnyEvent::Fork::Pool::retire() {
292 AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
293 }
294 ', $magic0, $magic1)
295 ;
296
297 $start_worker = sub {
298 my $proc = [0, 0, undef]; # load, index, rpc
299
300 $proc->[2] = $template
301 ->fork
302 ->AnyEvent::Fork::RPC::run ($function,
303 @rpc,
304 on_event => sub {
305 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
306 $destroy_guard if 0; # keep it alive
307
308 $_[1] eq "quit" and $stop_worker->($proc);
309 return;
310 }
311
312 &$on_event;
313 },
314 )
315 ;
316
317 ++$nidle;
318 Array::Heap::push_heap_idx @pool, $proc;
319
320 Scalar::Util::weaken $proc;
321 };
322
323 $stop_worker = sub {
324 my $proc = shift;
325
326 $proc->[0]
327 or --$nidle;
328
329 Array::Heap::splice_heap_idx @pool, $proc->[1]
330 if defined $proc->[1];
331
332 @$proc = 0; # tell others to leave it be
333 };
334
335 $want_start = sub {
336 undef $stop_w;
337
338 $start_w ||= AE::timer $start, $start, sub {
339 if (($nidle < $idle || @queue) && @pool < $max) {
340 $start_worker->();
341 $scheduler->();
342 } else {
343 undef $start_w;
344 }
345 };
346 };
347
348 $want_stop = sub {
349 $stop_w ||= AE::timer $stop, $stop, sub {
350 $stop_worker->($pool[0])
351 if $nidle;
352
353 undef $stop_w
354 if $nidle <= $idle;
355 };
356 };
357
358 $scheduler = sub {
359 if (@queue) {
360 while (@queue) {
361 @pool or $start_worker->();
362
363 my $proc = $pool[0];
364
365 if ($proc->[0] < $load) {
366 # found free worker, increase load
367 unless ($proc->[0]++) {
368 # worker became busy
369 --$nidle
370 or undef $stop_w;
371
372 $want_start->()
373 if $nidle < $idle && @pool < $max;
374 }
375
376 Array::Heap::adjust_heap_idx @pool, 0;
377
378 my $job = shift @queue;
379 my $ocb = pop @$job;
380
381 $proc->[2]->(@$job, sub {
382 # reduce load
383 --$proc->[0] # worker still busy?
384 or ++$nidle > $idle # not too many idle processes?
385 or $want_stop->();
386
387 Array::Heap::adjust_heap_idx @pool, $proc->[1]
388 if defined $proc->[1];
389
390 &$ocb;
391
392 $scheduler->();
393 });
394 } else {
395 $want_start->()
396 unless @pool >= $max;
397
398 last;
399 }
400 }
401 } elsif ($shutdown) {
402 undef $_->[2]
403 for @pool;
404
405 undef $start_w;
406 undef $start_worker; # frees $destroy_guard reference
407
408 $stop_worker->($pool[0])
409 while $nidle;
410 }
411 };
412
413 my $shutdown_guard = Guard::guard {
414 $shutdown = 1;
415 $scheduler->();
416 };
417
418 $start_worker->()
419 while @pool < $idle;
420
421 sub {
422 $shutdown_guard if 0; # keep it alive
423
424 $start_worker->()
425 unless @pool;
426
427 push @queue, [@_];
428 $scheduler->();
429 }
430 }
431
432 =item $pool->(..., $cb->(...))
433
434 Call the RPC function of a worker with the given arguments, and when the
435 worker is done, call the C<$cb> with the results, just like calling the
436 RPC object durectly - see the L<AnyEvent::Fork::RPC> documentation for
437 details on the RPC API.
438
439 If there is no free worker, the call will be queued until a worker becomes
440 available.
441
442 Note that there can be considerable time between calling this method and
443 the call actually being executed. During this time, the parameters passed
444 to this function are effectively read-only - modifying them after the call
445 and before the callback is invoked causes undefined behaviour.
446
447 =cut
448
449 =item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]
450
451 =item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]
452
453 Tries to detect the number of CPUs (C<$cpus> often called CPU cores
454 nowadays) and execution units (C<$eus>) which include e.g. extra
455 hyperthreaded units). When C<$cpus> cannot be determined reliably,
456 C<$default_cpus> is returned for both values, or C<1> if it is missing.
457
458 For normal CPU bound uses, it is wise to have as many worker processes
459 as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
460 hyperthreading is usually detrimental to performance, but in those rare
461 cases where that really helps it might be beneficial to use more workers
462 (C<$eus>).
463
464 Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
465 C<$cpus> and C<$eus>, and on {Free,Net,Open}BSD, F<sysctl -n hw.ncpu> is
466 used for C<$cpus>.
467
468 Example: create a worker pool with as many workers as CPU cores, or C<2>,
469 if the actual number could not be determined.
470
471 $fork->AnyEvent::Fork::Pool::run ("myworker::function",
472 max => (scalar AnyEvent::Fork::Pool::ncpu 2),
473 );
474
475 =cut
476
477 BEGIN {
478 if ($^O eq "linux") {
479 *ncpu = sub(;$) {
480 my ($cpus, $eus);
481
482 if (open my $fh, "<", "/proc/cpuinfo") {
483 my %id;
484
485 while (<$fh>) {
486 if (/^core id\s*:\s*(\d+)/) {
487 ++$eus;
488 undef $id{$1};
489 }
490 }
491
492 $cpus = scalar keys %id;
493 } else {
494 $cpus = $eus = @_ ? shift : 1;
495 }
496 wantarray ? ($cpus, $eus) : $cpus
497 };
498 } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
499 *ncpu = sub(;$) {
500 my $cpus = qx<sysctl -n hw.ncpu> * 1
501 || (@_ ? shift : 1);
502 wantarray ? ($cpus, $cpus) : $cpus
503 };
504 } else {
505 *ncpu = sub(;$) {
506 my $cpus = @_ ? shift : 1;
507 wantarray ? ($cpus, $cpus) : $cpus
508 };
509 }
510 }
511
512 =back
513
514 =head1 CHILD USAGE
515
516 In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
517 more child-side function:
518
519 =over 4
520
521 =item AnyEvent::Fork::Pool::retire ()
522
523 This function sends an event to the parent process to request retirement:
524 the worker is removed from the pool and no new jobs will be sent to it,
525 but it still has to handle the jobs that are already queued.
526
527 The parentheses are part of the syntax: the function usually isn't defined
528 when you compile your code (because that happens I<before> handing the
529 template process over to C<AnyEvent::Fork::Pool::run>, so you need the
530 empty parentheses to tell Perl that the function is indeed a function.
531
532 Retiring a worker can be useful to gracefully shut it down when the worker
533 deems this useful. For example, after executing a job, it could check the
534 process size or the number of jobs handled so far, and if either is too
535 high, the worker could request to be retired, to avoid memory leaks to
536 accumulate.
537
538 Example: retire a worker after it has handled roughly 100 requests. It
539 doesn't matter whether you retire at the beginning or end of your request,
540 as the worker will continue to handle some outstanding requests. Likewise,
541 it's ok to call retire multiple times.
542
543 my $count = 0;
544
545 sub my::worker {
546
547 ++$count == 100
548 and AnyEvent::Fork::Pool::retire ();
549
550 ... normal code goes here
551 }
552
553 =back
554
555 =head1 POOL PARAMETERS RECIPES
556
557 This section describes some recipes for pool parameters. These are mostly
558 meant for the synchronous RPC backend, as the asynchronous RPC backend
559 changes the rules considerably, making workers themselves responsible for
560 their scheduling.
561
562 =over 4
563
564 =item low latency - set load = 1
565
566 If you need a deterministic low latency, you should set the C<load>
567 parameter to C<1>. This ensures that never more than one job is sent to
568 each worker. This avoids having to wait for a previous job to finish.
569
570 This makes most sense with the synchronous (default) backend, as the
571 asynchronous backend can handle multiple requests concurrently.
572
573 =item lowest latency - set load = 1 and idle = max
574
575 To achieve the lowest latency, you additionally should disable any dynamic
576 resizing of the pool by setting C<idle> to the same value as C<max>.
577
578 =item high throughput, cpu bound jobs - set load >= 2, max = #cpus
579
580 To get high throughput with cpu-bound jobs, you should set the maximum
581 pool size to the number of cpus in your system, and C<load> to at least
582 C<2>, to make sure there can be another job waiting for the worker when it
583 has finished one.
584
585 The value of C<2> for C<load> is the minimum value that I<can> achieve
586 100% throughput, but if your parent process itself is sometimes busy, you
587 might need higher values. Also there is a limit on the amount of data that
588 can be "in flight" to the worker, so if you send big blobs of data to your
589 worker, C<load> might have much less of an effect.
590
591 =item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high
592
593 When your jobs are I/O bound, using more workers usually boils down to
594 higher throughput, depending very much on your actual workload - sometimes
595 having only one worker is best, for example, when you read or write big
596 files at maximum speed, as a second worker will increase seek times.
597
598 =back
599
600 =head1 EXCEPTIONS
601
602 The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions
603 will not be caught, and exceptions in both worker and in callbacks causes
604 undesirable or undefined behaviour.
605
606 =head1 SEE ALSO
607
608 L<AnyEvent::Fork>, to create the processes in the first place.
609
610 L<AnyEvent::Fork::Remote>, likewise, but helpful for remote processes.
611
612 L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.
613
614 =head1 AUTHOR AND CONTACT INFORMATION
615
616 Marc Lehmann <schmorp@schmorp.de>
617 http://software.schmorp.de/pkg/AnyEvent-Fork-Pool
618
619 =cut
620
621 1
622