/cvs/AnyEvent-Fork-Pool/Pool.pm

Comparing AnyEvent-Fork-Pool/Pool.pm (file contents):
Revision 1.5 by root, Sat Apr 20 19:33:23 2013 UTC vs.
Revision 1.9 by root, Thu Apr 25 00:27:22 2013 UTC

1=head1 NAME 1=head1 NAME
2 2
3AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork 3AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4
5THE API IS NOT FINISHED, CONSIDER THIS AN ALPHA RELEASE
4 6
5=head1 SYNOPSIS 7=head1 SYNOPSIS
6 8
7 use AnyEvent; 9 use AnyEvent;
8 use AnyEvent::Fork::Pool; 10 use AnyEvent::Fork::Pool;
9 # use AnyEvent::Fork is not needed 11 # use AnyEvent::Fork is not needed
10 12
11 # all parameters with default values 13 # all possible parameters shown, with default values
12 my $pool = AnyEvent::Fork 14 my $pool = AnyEvent::Fork
13 ->new 15 ->new
14 ->require ("MyWorker") 16 ->require ("MyWorker")
15 ->AnyEvent::Fork::Pool::run ( 17 ->AnyEvent::Fork::Pool::run (
16 "MyWorker::run", # the worker function 18 "MyWorker::run", # the worker function
17 19
18 # pool management 20 # pool management
19 max => 4, # absolute maximum # of processes 21 max => 4, # absolute maximum # of processes
20 idle => 2, # minimum # of idle processes 22 idle => 0, # minimum # of idle processes
21 load => 2, # queue at most this number of jobs per process 23 load => 2, # queue at most this number of jobs per process
22 start => 0.1, # wait this many seconds before starting a new process 24 start => 0.1, # wait this many seconds before starting a new process
23 stop => 1, # wait this many seconds before stopping an idle process 25 stop => 10, # wait this many seconds before stopping an idle process
24 on_destroy => (my $finish = AE::cv), # called when object is destroyed 26 on_destroy => (my $finish = AE::cv), # called when object is destroyed
25 27
26 # parameters passed to AnyEvent::Fork::RPC 28 # parameters passed to AnyEvent::Fork::RPC
27 async => 0, 29 async => 0,
28 on_error => sub { die "FATAL: $_[0]\n" }, 30 on_error => sub { die "FATAL: $_[0]\n" },
48pool of processes that handles jobs. 50pool of processes that handles jobs.
49 51
50Understanding of L<AnyEvent::Fork> is helpful but not critical to be able 52Understanding of L<AnyEvent::Fork> is helpful but not critical to be able
51to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC> 53to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC>
52is, as it defines the actual API that needs to be implemented in the 54is, as it defines the actual API that needs to be implemented in the
53children. 55worker processes.
54 56
55=head1 EXAMPLES 57=head1 EXAMPLES
56 58
57=head1 PARENT USAGE 59=head1 PARENT USAGE
60
61To create a pool, you first have to create a L<AnyEvent::Fork> object -
62this object becomes your template process. Whenever a new worker process
63is needed, it is forked from this template process. Then you need to
64"hand off" this template process to the C<AnyEvent::Fork::Pool> module by
65calling its run method on it:
66
67 my $template = AnyEvent::Fork
68 ->new
69 ->require ("SomeModule", "MyWorkerModule");
70
71 my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");
72
73The pool "object" is not a regular Perl object, but a code reference that
74you can call and that works roughly like calling the worker function
75directly, except that it returns nothing but instead you need to specify a
76callback to be invoked once results are in:
77
78 $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });
58 79
59=over 4 80=over 4
60 81
61=cut 82=cut
62 83
68 89
69use Guard (); 90use Guard ();
70use Array::Heap (); 91use Array::Heap ();
71 92
72use AnyEvent; 93use AnyEvent;
94# explicit version on next line, as some cpan-testers test with the 0.1 version,
95# ignoring dependencies, and this line will at least give a clear indication of that.
73use AnyEvent::Fork; # we don't actually depend on it, this is for convenience 96use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience
74use AnyEvent::Fork::RPC; 97use AnyEvent::Fork::RPC;
75 98
76# these are used for the first and last argument of events 99# these are used for the first and last argument of events
77# in the hope of not colliding. yes, I don't like it either, 100# in the hope of not colliding. yes, I don't like it either,
78# but didn't come up with an obviously better alternative. 101# but didn't come up with an obviously better alternative.
81 104
82our $VERSION = 0.1; 105our $VERSION = 0.1;
83 106
84=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...] 107=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
85 108
86The traditional way to call it. But it is way cooler to call it in the 109The traditional way to call the pool creation function. But it is way
87following way: 110cooler to call it in the following way:
88 111
89=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...]) 112=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
90 113
91Creates a new pool object with the specified C<$function> as function 114Creates a new pool object with the specified C<$function> as function
92(name) to call for each request. The pool uses the C<$fork> object as the 115(name) to call for each request. The pool uses the C<$fork> object as the
105 128
106The pool consists of a certain number of worker processes. These options 129The pool consists of a certain number of worker processes. These options
107decide how many of these processes exist and when they are started and 130decide how many of these processes exist and when they are started and
108stopped. 131stopped.
109 132
133The worker pool is dynamically resized, according to (perceived :)
134load. The minimum size is given by the C<idle> parameter and the maximum
135size is given by the C<max> parameter. A new worker is started every
136C<start> seconds at most, and an idle worker is stopped at most every
137C<stop> seconds.
138
139You can specify the number of jobs sent to a worker concurrently using the
140C<load> parameter.
141
110=over 4 142=over 4
111 143
112=item idle => $count (default: 0) 144=item idle => $count (default: 0)
113 145
114The minimum number of idle processes in the pool - when there are fewer 146The minimum number of idle processes in the pool - when there are fewer
115than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new 147than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
116ones, subject to C<max> and C<start>. 148ones, subject to the limits set by C<max> and C<start>.
117 149
118This is also the initial/minimum number of workers in the pool. The 150This is also the initial number of workers in the pool. The default of
119default of zero means that the pool starts empty and can shrink back to 151zero means that the pool starts empty and can shrink back to zero workers
120zero workers over time. 152over time.
121 153
122=item max => $count (default: 4) 154=item max => $count (default: 4)
123 155
124The maximum number of processes in the pool, in addition to the template 156The maximum number of processes in the pool, in addition to the template
125process. C<AnyEvent::Fork::Pool> will never create more than this number 157process. C<AnyEvent::Fork::Pool> will never have more than this number of
126of worker processes, although there can be more temporarily when a worker 158worker processes, although there can be more temporarily when a worker is
127is shut down and hasn't exited yet. 159shut down and hasn't exited yet.
128 160
129=item load => $count (default: 2) 161=item load => $count (default: 2)
130 162
131The maximum number of concurrent jobs sent to a single worker 163The maximum number of concurrent jobs sent to a single worker process.
132process. Worker processes that handle this number of jobs already are
133called "busy".
134 164
135Jobs that cannot be sent to a worker immediately (because all workers are 165Jobs that cannot be sent to a worker immediately (because all workers are
136busy) will be queued until a worker is available. 166busy) will be queued until a worker is available.
137 167
168Setting this low improves latency. For example, at C<1>, every job that
169is sent to a worker is sent to a completely idle worker that doesn't run
170any other jobs. The downside is that throughput is reduced - a worker that
171finishes a job needs to wait for a new job from the parent.
172
173The default of C<2> is usually a good compromise.
174
138=item start => $seconds (default: 0.1) 175=item start => $seconds (default: 0.1)
139 176
140When a job is queued and all workers are busy, a timer is started. If the 177When there are fewer than C<idle> workers (or all workers are completely
141timer elapses and there are still jobs that cannot be queued to a worker, 178busy), then a timer is started. If the timer elapses and there are still
142a new worker is started. 179jobs that cannot be queued to a worker, a new worker is started.
143 180
144This configurs the time that all workers must be busy before a new worker 181This sets the minimum time that all workers must be busy before a new
145is started. Or, put differently, the minimum delay betwene starting new 182worker is started. Or, put differently, the minimum delay between starting
146workers. 183new workers.
147 184
148The delay is zero by default, which means new workers will be started 185The delay is small by default, which means new workers will be started
149without delay. 186relatively quickly. A delay of C<0> is possible, and ensures that the pool
187will grow as quickly as possible under load.
150 188
189Non-zero values are useful to avoid "exploding" a pool because a lot of
190jobs are queued in an instant.
191
192Higher values are often useful to improve efficiency at the cost of
193latency - when fewer processes can do the job over time, starting more and
194more is not necessarily going to help.
195
151=item stop => $seconds (default: 1) 196=item stop => $seconds (default: 10)
152 197
153When a worker has no jobs to execute it becomes idle. An idle worker that 198When a worker has no jobs to execute it becomes idle. An idle worker that
154hasn't executed a job within this amount of time will be stopped, unless 199hasn't executed a job within this amount of time will be stopped, unless
155the other parameters say otherwise. 200the other parameters say otherwise.
156 201
202Setting this to a very high value means that workers stay around longer,
203even when they have nothing to do, which can be good as they don't have to
204be started again on the next load spike.
205
206Setting this to a lower value can be useful to avoid memory or simply
207process table wastage.
208
209Usually, setting this to a time longer than the time between load spikes
210is best - if you expect a lot of requests every minute and little work
211in between, setting this to longer than a minute avoids having to stop
212and start workers. On the other hand, you have to ask yourself if letting
213workers run idle is a good use of your resources. Try to find a good
214balance between resource usage of your workers and the time to start new
215workers - L<AnyEvent::Fork> itself is fast at creating workers while
216not using much memory for them, so most of the overhead is likely from
217your own code.
218
157=item on_destroy => $callback->() (default: none) 219=item on_destroy => $callback->() (default: none)
158 220
159When a pool object goes out of scope, it will still handle all outstanding 221When a pool object goes out of scope, the outstanding requests are still
160jobs. After that, it will destroy all workers (and also the template 222handled until completion. Only after handling all jobs will the workers
161process if it isn't referenced otherwise). 223be destroyed (and also the template process if it isn't referenced
224otherwise).
225
226To find out when a pool I<really> has finished its work, you can set this
227callback, which will be called when the pool has been destroyed.
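For example, a sketch of waiting until a pool has finished all submitted
jobs (C<MyWorker::run> stands in for your own worker function):

   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
         "MyWorker::run",
         on_destroy => (my $finish = AE::cv),
      );

   $pool->($_, sub { }) for 1..10;

   undef $pool;    # drop the only reference to the pool
   $finish->recv;  # returns only after all ten jobs were handled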
162 228
163=back 229=back
164 230
165=item Template Process 231=item AnyEvent::Fork::RPC Parameters
166 232
167The worker processes are all forked from a single template 233These parameters are all passed more or less directly to
168process. Ideally, all modules and all code used by the worker, as well as 234L<AnyEvent::Fork::RPC>. They are only briefly mentioned here, for
169any shared data structures should be loaded into the template process, to 235their full documentation please refer to the L<AnyEvent::Fork::RPC>
170take advantage of data sharing via fork. 236documentation. Also, the default values mentioned here are only documented
171 237as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.
172You can create your own template process by creating a L<AnyEvent::Fork>
173object yourself and passing it as the C<template> parameter, but
174C<AnyEvent::Fork::Pool> can create one for you, including some standard
175options.
176 238
177=over 4 239=over 4
178 240
179=item template => $fork (default: C<< AnyEvent::Fork->new >>)
180
181The template process to use, if you want to create your own.
182
183=item require => \@modules (default: C<[]>)
184
185The modules in this list will be loaded into the template process.
186
187=item eval => "# perl code to execute in template" (default: none)
188
189This is a perl string that is evaluated after creating the template
190process and after requiring the modules. It can do whatever it wants to
191configure the process, but it must not do anything that would keep a later
192fork from working (so must not create event handlers or (real) threads for
193example).
194
195=back
196
197=item AnyEvent::Fork::RPC Parameters
198
199These parameters are all passed directly to L<AnyEvent::Fork::RPC>. They
200are only briefly mentioned here, for their full documentation
201please refer to the L<AnyEvent::Fork::RPC> documentation. Also, the
202default values mentioned here are only documented as a best effort -
203L<AnyEvent::Fork::RPC> documentation is binding.
204
205=over 4
206
207=item async => $boolean (default: 0) 241=item async => $boolean (default: 0)
208 242
209Whether to sue the synchronous or asynchronous RPC backend. 243Whether to use the synchronous or asynchronous RPC backend.
210 244
211=item on_error => $callback->($message) (default: die with message) 245=item on_error => $callback->($message) (default: die with message)
212 246
213The callback to call on any (fatal) errors. 247The callback to call on any (fatal) errors.
214 248
235 269
236 my $max = $arg{max} || 4; 270 my $max = $arg{max} || 4;
237 my $idle = $arg{idle} || 0, 271 my $idle = $arg{idle} || 0,
238 my $load = $arg{load} || 2, 272 my $load = $arg{load} || 2,
239 my $start = $arg{start} || 0.1, 273 my $start = $arg{start} || 0.1,
240 my $stop = $arg{stop} || 1, 274 my $stop = $arg{stop} || 10,
241 my $on_event = $arg{on_event} || sub { }, 275 my $on_event = $arg{on_event} || sub { },
242 my $on_destroy = $arg{on_destroy}; 276 my $on_destroy = $arg{on_destroy};
243 277
244 my @rpc = ( 278 my @rpc = (
245 async => $arg{async}, 279 async => $arg{async},
258 292
259 $template 293 $template
260 ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync")) 294 ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
261 ->eval (' 295 ->eval ('
262 my ($magic0, $magic1) = @_; 296 my ($magic0, $magic1) = @_;
263 sub AnyEvent::Fork::Pool::quit() { 297 sub AnyEvent::Fork::Pool::retire() {
264 AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic1; 298 AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
265 } 299 }
266 ', $magic0, $magic1) 300 ', $magic0, $magic1)
267 ->eval ($arg{eval}); 301 ;
268 302
269 $start_worker = sub { 303 $start_worker = sub {
270 my $proc = [0, 0, undef]; # load, index, rpc 304 my $proc = [0, 0, undef]; # load, index, rpc
271 305
272 $proc->[2] = $template 306 $proc->[2] = $template
298 $proc->[0] 332 $proc->[0]
299 or --$nidle; 333 or --$nidle;
300 334
301 Array::Heap::splice_heap_idx @pool, $proc->[1] 335 Array::Heap::splice_heap_idx @pool, $proc->[1]
302 if defined $proc->[1]; 336 if defined $proc->[1];
337
338 @$proc = 0; # tell others to leave it be
303 }; 339 };
304 340
305 $want_start = sub { 341 $want_start = sub {
306 undef $stop_w; 342 undef $stop_w;
307 343
326 }; 362 };
327 363
328 $scheduler = sub { 364 $scheduler = sub {
329 if (@queue) { 365 if (@queue) {
330 while (@queue) { 366 while (@queue) {
367 @pool or $start_worker->();
368
331 my $proc = $pool[0]; 369 my $proc = $pool[0];
332 370
333 if ($proc->[0] < $load) { 371 if ($proc->[0] < $load) {
334 # found free worker, increase load 372 # found free worker, increase load
335 unless ($proc->[0]++) { 373 unless ($proc->[0]++) {
353 or $want_stop->(); 391 or $want_stop->();
354 392
355 Array::Heap::adjust_heap_idx @pool, $proc->[1] 393 Array::Heap::adjust_heap_idx @pool, $proc->[1]
356 if defined $proc->[1]; 394 if defined $proc->[1];
357 395
396 &$ocb;
397
358 $scheduler->(); 398 $scheduler->();
359
360 &$ocb;
361 }); 399 });
362 } else { 400 } else {
363 $want_start->() 401 $want_start->()
364 unless @pool >= $max; 402 unless @pool >= $max;
365 403
397 435
398=item $pool->(..., $cb->(...)) 436=item $pool->(..., $cb->(...))
399 437
400Call the RPC function of a worker with the given arguments, and when the 438Call the RPC function of a worker with the given arguments, and when the
401worker is done, call the C<$cb> with the results, just like calling the 439worker is done, call the C<$cb> with the results, just like calling the
402L<AnyEvent::Fork::RPC> object directly. 440RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
441details on the RPC API.
403 442
404If there is no free worker, the call will be queued. 443If there is no free worker, the call will be queued until a worker becomes
444available.
405 445
406Note that there can be considerable time between calling this method and 446Note that there can be considerable time between calling this method and
407the call actually being executed. During this time, the parameters passed 447the call actually being executed. During this time, the parameters passed
408to this function are effectively read-only - modifying them after the call 448to this function are effectively read-only - modifying them after the call
409and before the callback is invoked causes undefined behaviour. 449and before the callback is invoked causes undefined behaviour.
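To wait for a number of such calls at once, the counting interface of a
condvar can be used - a sketch, with C<MyWorker::run> again standing in
for the actual worker function:

   my $done = AE::cv;

   for my $id (1..3) {
      $done->begin;
      $pool->($id, sub {
         warn "job $id returned @_\n";
         $done->end;
      });
   }

   $done->recv; # all three callbacks have been invoked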
410 450
411=cut 451=cut
412 452
453=item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]
454
455=item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]
456
457Tries to detect the number of CPUs (C<$cpus>, often called cpu cores
458nowadays) and execution units (C<$eus>, which include e.g. extra
459hyperthreaded units). When C<$cpus> cannot be determined reliably,
460C<$default_cpus> is returned for both values, or C<1> if it is missing.
461
462For normal CPU bound uses, it is wise to have as many worker processes
463as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
464hyperthreading is usually detrimental to performance, but in those rare
465cases where that really helps it might be beneficial to use more workers
466(C<$eus>).
467
468Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
469C<$cpus> and C<$eus>, and on {Free,Net,Open}BSD, F<sysctl -n hw.ncpu> is
470used for C<$cpus>.
471
472Example: create a worker pool with as many workers as cpu cores, or C<2>,
473if the actual number could not be determined.
474
475 $fork->AnyEvent::Fork::Pool::run ("myworker::function",
476 max => (scalar AnyEvent::Fork::Pool::ncpu 2),
477 );
478
479=cut
480
481BEGIN {
482 if ($^O eq "linux") {
483 *ncpu = sub(;$) {
484 my ($cpus, $eus);
485
486 if (open my $fh, "<", "/proc/cpuinfo") {
487 my %id;
488
489 while (<$fh>) {
490 if (/^core id\s*:\s*(\d+)/) {
491 ++$eus;
492 undef $id{$1};
493 }
494 }
495
496 $cpus = scalar keys %id;
497 } else {
498 $cpus = $eus = @_ ? shift : 1;
499 }
500 wantarray ? ($cpus, $eus) : $cpus
501 };
502 } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
503 *ncpu = sub(;$) {
504 my $cpus = qx<sysctl -n hw.ncpu> * 1
505 || (@_ ? shift : 1);
506 wantarray ? ($cpus, $cpus) : $cpus
507 };
508 } else {
509 *ncpu = sub(;$) {
510 my $cpus = @_ ? shift : 1;
511 wantarray ? ($cpus, $cpus) : $cpus
512 };
513 }
514}
515
413=back 516=back
517
518=head1 CHILD USAGE
519
520In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
521more child-side function:
522
523=over 4
524
525=item AnyEvent::Fork::Pool::retire ()
526
527This function sends an event to the parent process to request retirement:
528the worker is removed from the pool and no new jobs will be sent to it,
529but it has to handle the jobs that are already queued.
530
531The parentheses are part of the syntax: the function usually isn't defined
532when you compile your code (because that happens I<before> handing the
533template process over to C<AnyEvent::Fork::Pool::run>), so you need the
534empty parentheses to tell Perl that the function is indeed a function.
535
536Retiring a worker can be useful to gracefully shut it down when the worker
537deems this useful. For example, after executing a job, one could check
538the process size or the number of jobs handled so far, and if either is
539too high, the worker could ask to get retired, to avoid letting memory
540leaks accumulate.
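As a sketch, a worker that retires itself after a fixed number of jobs
(the counter and the limit of C<100> are made up for illustration):

   package MyWorker;

   my $jobs_handled;

   sub run {
      # ... do the actual work here ...

      AnyEvent::Fork::Pool::retire ()
         if ++$jobs_handled >= 100; # no new jobs will be sent to us

      "result"
   }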
541
542=back
543
544=head1 POOL PARAMETERS RECIPES
545
546This section describes some recipes for pool parameters. These are mostly
547meant for the synchronous RPC backend, as the asynchronous RPC backend
548changes the rules considerably, making workers themselves responsible for
549their scheduling.
550
551=over 4
552
553=item low latency - set load = 1
554
555If you need a deterministic low latency, you should set the C<load>
556parameter to C<1>. This ensures that never more than one job is sent to
557each worker. This avoids having to wait for a previous job to finish.
558
559This makes most sense with the synchronous (default) backend, as the
560asynchronous backend can handle multiple requests concurrently.
561
562=item lowest latency - set load = 1 and idle = max
563
564To achieve the lowest latency, you additionally should disable any dynamic
565resizing of the pool by setting C<idle> to the same value as C<max>.
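Both latency recipes combined might look like this sketch (the pool size
and worker function name are made up):

   my $pool = $template->AnyEvent::Fork::Pool::run (
      "MyWorker::respond",
      max  => 8,
      idle => 8, # fixed-size pool, no dynamic resizing
      load => 1, # at most one job per worker at a time
   );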
566
567=item high throughput, cpu bound jobs - set load >= 2, max = #cpus
568
569To get high throughput with cpu-bound jobs, you should set the maximum
570pool size to the number of cpus in your system, and C<load> to at least
571C<2>, to make sure there can be another job waiting for the worker when it
572has finished one.
573
574The value of C<2> for C<load> is the minimum value that I<can> achieve
575100% throughput, but if your parent process itself is sometimes busy, you
576might need higher values. Also there is a limit on the amount of data that
577can be "in flight" to the worker, so if you send big blobs of data to your
578worker, C<load> might have much less of an effect.
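Combining this recipe with C<ncpu> could look like the following sketch
(C<MyWorker::crunch> is a made-up function name):

   my ($cpus) = AnyEvent::Fork::Pool::ncpu 1;

   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
         "MyWorker::crunch",
         max  => $cpus, # one worker per cpu
         idle => $cpus, # keep the pool at full size
         load => 2,     # keep one job queued behind the running one
      );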
579
580=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high
581
582When your jobs are I/O bound, using more workers usually results in
583higher throughput, depending very much on your actual workload - sometimes
584having only one worker is best, for example, when you read or write big
585files at maximum speed, as a second worker will increase seek times.
586
587=back
588
589=head1 EXCEPTIONS
590
591The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions will
592not be caught, and exceptions in both workers and in callbacks cause
593undesirable or undefined behaviour.
414 594
415=head1 SEE ALSO 595=head1 SEE ALSO
416 596
417L<AnyEvent::Fork>, to create the processes in the first place. 597L<AnyEvent::Fork>, to create the processes in the first place.
418 598
