/cvs/AnyEvent-Fork-Pool/Pool.pm

Comparing AnyEvent-Fork-Pool/Pool.pm (file contents):
Revision 1.2 by root, Thu Apr 18 21:37:27 2013 UTC vs.
Revision 1.17 by root, Thu Oct 27 07:27:56 2022 UTC

AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork

=head1 SYNOPSIS

   use AnyEvent;
   use AnyEvent::Fork;
   use AnyEvent::Fork::Pool;

   # all possible parameters shown, with default values
   my $pool = AnyEvent::Fork
                 ->new
                 ->require ("MyWorker")
                 ->AnyEvent::Fork::Pool::run (
                      "MyWorker::run", # the worker function

                      # pool management
                      max        => 4,   # absolute maximum # of processes
                      idle       => 0,   # minimum # of idle processes
                      load       => 2,   # queue at most this number of jobs per process
                      start      => 0.1, # wait this many seconds before starting a new process
                      stop       => 10,  # wait this many seconds before stopping an idle process
                      on_destroy => (my $finish = AE::cv), # called when object is destroyed

                      # parameters passed to AnyEvent::Fork::RPC
                      async      => 0,
                      on_error   => sub { die "FATAL: $_[0]\n" },
                      on_event   => sub { my @ev = @_ },
                      init       => "MyWorker::init",
                      serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
                   );

   for (1..10) {
      $pool->(doit => $_, sub {
         print "MyWorker::run returned @_\n";
      });
   }

   undef $pool;

   $finish->recv;

=head1 DESCRIPTION

This module uses processes created via L<AnyEvent::Fork> (or
L<AnyEvent::Fork::Remote>) and the RPC protocol implemented in
L<AnyEvent::Fork::RPC> to create a load-balanced pool of processes that
handles jobs.

Understanding L<AnyEvent::Fork> is helpful but not required to use this
module, but a thorough understanding of L<AnyEvent::Fork::RPC> is, as
it defines the actual API that needs to be implemented in the worker
processes.

=head1 PARENT USAGE

To create a pool, you first have to create a L<AnyEvent::Fork> object -
this object becomes your template process. Whenever a new worker process
is needed, it is forked from this template process. Then you need to
"hand off" this template process to the C<AnyEvent::Fork::Pool> module by
calling its run method on it:

   my $template = AnyEvent::Fork
                     ->new
                     ->require ("SomeModule", "MyWorkerModule");

   my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");

The pool "object" is not a regular Perl object, but a code reference that
you can call and that works roughly like calling the worker function
directly, except that it returns nothing but instead you need to specify a
callback to be invoked once results are in:

   $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });

=over 4

=cut

package AnyEvent::Fork::Pool;

use common::sense;

use Scalar::Util ();

use Guard ();
use Array::Heap ();

use AnyEvent;
use AnyEvent::Fork::RPC;

# these are used for the first and last argument of events
# in the hope of not colliding. yes, I don't like it either,
# but didn't come up with an obviously better alternative.
my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';

our $VERSION = 1.3;

=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]

The traditional way to call the pool creation function. But it is way
cooler to call it in the following way:

=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])

Creates a new pool object with the specified C<$function> as function
(name) to call for each request. The pool uses the C<$fork> object as the
template when creating worker processes.

You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
to create one.

A relatively large number of key/value pairs can be specified to influence
the behaviour of the pool.

=item Pool Management

The pool consists of a certain number of worker processes. These options
decide how many of these processes exist and when they are started and
stopped.

The worker pool is dynamically resized, according to (perceived :)
load. The minimum size is given by the C<idle> parameter and the maximum
size is given by the C<max> parameter. A new worker is started every
C<start> seconds at most, and an idle worker is stopped at most every
C<stop> second.

You can specify the amount of jobs sent to a worker concurrently using the
C<load> parameter.

=over 4

=item idle => $count (default: 0)

The minimum amount of idle processes in the pool - when there are fewer
than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
ones, subject to the limits set by C<max> and C<start>.

This is also the initial amount of workers in the pool. The default of
zero means that the pool starts empty and can shrink back to zero workers
over time.

=item max => $count (default: 4)

The maximum number of processes in the pool, in addition to the template
process. C<AnyEvent::Fork::Pool> will never have more than this number of
worker processes, although there can be more temporarily when a worker is
shut down and hasn't exited yet.

=item load => $count (default: 2)

The maximum number of concurrent jobs sent to a single worker process.

Jobs that cannot be sent to a worker immediately (because all workers are
busy) will be queued until a worker is available.

Setting this low improves latency. For example, at C<1>, every job that
is sent to a worker is sent to a completely idle worker that doesn't run
any other jobs. The downside is that throughput is reduced - a worker that
finishes a job needs to wait for a new job from the parent.

The default of C<2> is usually a good compromise.
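
For example (a hypothetical sketch - C<MyWorker::run> stands in for your
own worker function), a latency-sensitive pool might combine a C<load> of
C<1> with a fixed set of preforked workers:

   my $pool = AnyEvent::Fork
                 ->new
                 ->require ("MyWorker")
                 ->AnyEvent::Fork::Pool::run ("MyWorker::run",
                      load => 1, # never queue a job on a busy worker
                      idle => 4, # keep four workers forked at all times
                      max  => 4,
                   );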

=item start => $seconds (default: 0.1)

When there are fewer than C<idle> workers (or all workers are completely
busy), then a timer is started. If the timer elapses and there are still
jobs that cannot be queued to a worker, a new worker is started.

This sets the minimum time that all workers must be busy before a new
worker is started. Or, put differently, the minimum delay between starting
new workers.

The delay is small by default, which means new workers will be started
relatively quickly. A delay of C<0> is possible, and ensures that the pool
will grow as quickly as possible under load.

Non-zero values are useful to avoid "exploding" a pool because a lot of
jobs are queued in an instant.

Higher values are often useful to improve efficiency at the cost of
latency - when fewer processes can do the job over time, starting more and
more is not necessarily going to help.

=item stop => $seconds (default: 10)

When a worker has no jobs to execute it becomes idle. An idle worker that
hasn't executed a job within this amount of time will be stopped, unless
the other parameters say otherwise.

Setting this to a very high value means that workers stay around longer,
even when they have nothing to do, which can be good as they don't have to
be started on the next load spike again.

Setting this to a lower value can be useful to avoid memory or simply
process table wastage.

Usually, setting this to a time longer than the time between load spikes
is best - if you expect a lot of requests every minute and little work
in between, setting this to longer than a minute avoids having to stop
and start workers. On the other hand, you have to ask yourself if letting
workers run idle is a good use of your resources. Try to find a good
balance between resource usage of your workers and the time to start new
workers - L<AnyEvent::Fork> itself is fast at creating workers while not
using much memory for them, so most of the overhead is likely from your
own code.

=item on_destroy => $callback->() (default: none)

When a pool object goes out of scope, the outstanding requests are still
handled till completion. Only after handling all jobs will the workers
be destroyed (and also the template process if it isn't referenced
otherwise).

To find out when a pool I<really> has finished its work, you can set this
callback, which will be called when the pool has been destroyed.
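
Example: wait for all outstanding jobs before the program ends. This
mirrors the SYNOPSIS (a sketch - C<$fork> is assumed to be your template
process): an L<AnyEvent> condition variable is passed as the C<on_destroy>
callback and waited upon after dropping the last reference to the pool.

   my $finish = AE::cv;

   my $pool = $fork->AnyEvent::Fork::Pool::run ("MyWorker::run",
      on_destroy => $finish,
   );

   # ... submit jobs via $pool->(...) here ...

   undef $pool;   # no new jobs, finish the outstanding ones
   $finish->recv; # returns after all jobs have been handled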

=back

=item AnyEvent::Fork::RPC Parameters

These parameters are all passed more or less directly to
L<AnyEvent::Fork::RPC>. They are only briefly mentioned here, for
their full documentation please refer to the L<AnyEvent::Fork::RPC>
documentation. Also, the default values mentioned here are only documented
as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.

=over 4

=item async => $boolean (default: 0)

Whether to use the synchronous or asynchronous RPC backend.

=item on_error => $callback->($message) (default: die with message)

The callback to call on any (fatal) errors.

=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)

The callback to invoke on events.

=item init => $initfunction (default: none)

The function to call in the child, once before handling requests.

=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)

The serialiser to use.

=back

=back

=cut

sub run {
   my ($template, $function, %arg) = @_;

   my $max        = $arg{max}        || 4;
   my $idle       = $arg{idle}       || 0;
   my $load       = $arg{load}       || 2;
   my $start      = $arg{start}      || 0.1;
   my $stop       = $arg{stop}       || 10;
   my $on_event   = $arg{on_event}   || sub { };
   my $on_destroy = $arg{on_destroy};

   my @rpc = (
      async      => $arg{async},
      init       => $arg{init},
      serialiser => delete $arg{serialiser},
      on_error   => $arg{on_error},
   );

   my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
   my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);

   my $destroy_guard = Guard::guard {
      $on_destroy->()
         if $on_destroy;
   };

   $template
      ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
      ->eval ('
         my ($magic0, $magic1) = @_;
         sub AnyEvent::Fork::Pool::retire() {
            AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
         }
      ', $magic0, $magic1)
   ;

   $start_worker = sub {
      my $proc = [0, 0, undef]; # load, index, rpc

      $proc->[2] = $template
         ->fork
         ->AnyEvent::Fork::RPC::run ($function,
              @rpc,
              on_event => sub {
                 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
                    $destroy_guard if 0; # keep it alive

                    $_[1] eq "quit" and $stop_worker->($proc);
                    return;
                 }

                 &$on_event;
              },
           )
      ;

      ++$nidle;
      Array::Heap::push_heap_idx @pool, $proc;

      Scalar::Util::weaken $proc;
   };

   $stop_worker = sub {
      my $proc = shift;

      $proc->[0]
         or --$nidle;

      Array::Heap::splice_heap_idx @pool, $proc->[1]
         if defined $proc->[1];

      @$proc = 0; # tell others to leave it be
   };

   $want_start = sub {
      undef $stop_w;

      $start_w ||= AE::timer $start, $start, sub {
         if (($nidle < $idle || @queue) && @pool < $max) {
            $start_worker->();
            $scheduler->();
         } else {
            undef $start_w;
         }
      };
   };

   $want_stop = sub {
      $stop_w ||= AE::timer $stop, $stop, sub {
         $stop_worker->($pool[0])
            if $nidle;

         undef $stop_w
            if $nidle <= $idle;
      };
   };

   $scheduler = sub {
      if (@queue) {
         while (@queue) {
            @pool or $start_worker->();

            my $proc = $pool[0];

            if ($proc->[0] < $load) {
               # found free worker, increase load
               unless ($proc->[0]++) {
                  # worker became busy
                  --$nidle
                     or undef $stop_w;

                  $want_start->()
                     if $nidle < $idle && @pool < $max;
               }

               Array::Heap::adjust_heap_idx @pool, 0;

               my $job = shift @queue;
               my $ocb = pop @$job;

               $proc->[2]->(@$job, sub {
                  # reduce load
                  --$proc->[0]          # worker still busy?
                     or ++$nidle > $idle # not too many idle processes?
                     or $want_stop->();

                  Array::Heap::adjust_heap_idx @pool, $proc->[1]
                     if defined $proc->[1];

                  &$ocb;

                  $scheduler->();
               });
            } else {
               $want_start->()
                  unless @pool >= $max;

               last;
            }
         }
      } elsif ($shutdown) {
         undef $_->[2]
            for @pool;

         undef $start_w;
         undef $start_worker; # frees $destroy_guard reference

         $stop_worker->($pool[0])
            while $nidle;
      }
   };

   my $shutdown_guard = Guard::guard {
      $shutdown = 1;
      $scheduler->();
   };

   $start_worker->()
      while @pool < $idle;

   sub {
      $shutdown_guard if 0; # keep it alive

      $start_worker->()
         unless @pool;

      push @queue, [@_];
      $scheduler->();
   }
}

=item $pool->(..., $cb->(...))

Call the RPC function of a worker with the given arguments, and when the
worker is done, call the C<$cb> with the results, just like calling the
RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
details on the RPC API.

If there is no free worker, the call will be queued until a worker becomes
available.

Note that there can be considerable time between calling this method and
the call actually being executed. During this time, the parameters passed
to this function are effectively read-only - modifying them after the call
and before the callback is invoked causes undefined behaviour.
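
For example, instead of submitting a data structure that is modified
later, submit a copy (a sketch - the C<doit> job name follows the
SYNOPSIS, everything else is made up):

   my @batch = (1, 2, 3);

   # pass a copy, so @batch may be changed right away
   $pool->(doit => [@batch], sub {
      print "batch processed, worker returned @_\n";
   });

   @batch = (); # safe - the pool holds its own copy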

=cut

=item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]

=item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]

Tries to detect the number of CPUs (C<$cpus>, often called CPU cores
nowadays) and execution units (C<$eus>, which include e.g. extra
hyperthreaded units). When C<$cpus> cannot be determined reliably,
C<$default_cpus> is returned for both values, or C<1> if it is missing.

For normal CPU bound uses, it is wise to have as many worker processes
as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
hyperthreading is usually detrimental to performance, but in those rare
cases where that really helps it might be beneficial to use more workers
(C<$eus>).

Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
C<$cpus> and C<$eus>, and on {Free,Net,Open}BSD, F<sysctl -n hw.ncpu> is
used for C<$cpus>.

Example: create a worker pool with as many workers as CPU cores, or C<2>,
if the actual number could not be determined.

   $fork->AnyEvent::Fork::Pool::run ("myworker::function",
      max => (scalar AnyEvent::Fork::Pool::ncpu 2),
   );

=cut

BEGIN {
   if ($^O eq "linux") {
      *ncpu = sub(;$) {
         my ($cpus, $eus);

         if (open my $fh, "<", "/proc/cpuinfo") {
            my %id;

            while (<$fh>) {
               if (/^core id\s*:\s*(\d+)/) {
                  ++$eus;
                  undef $id{$1};
               }
            }

            $cpus = scalar keys %id;
         } else {
            $cpus = $eus = @_ ? shift : 1;
         }

         wantarray ? ($cpus, $eus) : $cpus
      };
   } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
      *ncpu = sub(;$) {
         my $cpus = qx<sysctl -n hw.ncpu> * 1
                    || (@_ ? shift : 1);

         wantarray ? ($cpus, $cpus) : $cpus
      };
   } else {
      *ncpu = sub(;$) {
         my $cpus = @_ ? shift : 1;

         wantarray ? ($cpus, $cpus) : $cpus
      };
   }
}

=back

=head1 CHILD USAGE

In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
more child-side function:

=over 4

=item AnyEvent::Fork::Pool::retire ()

This function sends an event to the parent process to request retirement:
the worker is removed from the pool and no new jobs will be sent to it,
but it still has to handle the jobs that are already queued.

The parentheses are part of the syntax: the function usually isn't defined
when you compile your code (because that happens I<before> handing the
template process over to C<AnyEvent::Fork::Pool::run>), so you need the
empty parentheses to tell Perl that the function is indeed a function.

Retiring a worker can be useful to gracefully shut it down when the worker
deems this useful. For example, after executing a job, it could check the
process size or the number of jobs handled so far, and if either is too
high, the worker could request to be retired, to avoid accumulating memory
leaks.

Example: retire a worker after it has handled roughly 100 requests. It
doesn't matter whether you retire at the beginning or end of your request,
as the worker will continue to handle some outstanding requests. Likewise,
it's ok to call retire multiple times.

   my $count = 0;

   sub my::worker {

      ++$count == 100
         and AnyEvent::Fork::Pool::retire ();

      ... normal code goes here
   }

=back
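
Putting the child side together, a worker module for the pool shown in the
SYNOPSIS could look roughly like this (a sketch - only the function names
C<MyWorker::init> and C<MyWorker::run> come from the SYNOPSIS, the rest is
made up):

   package MyWorker;

   my $requests;

   # called once per worker process, before the first request
   sub init {
      $requests = 0;
   }

   # called for each job - the values it returns are passed to the
   # callback given to $pool->(...)
   sub run {
      my ($cmd, @args) = @_;

      # ... do the actual work here ...

      # retire after roughly 1000 requests, e.g. to curb leaks
      ++$requests == 1000
         and AnyEvent::Fork::Pool::retire ();

      ($cmd, "ok") # results sent back to the parent
   }

   1;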

=head1 POOL PARAMETERS RECIPES

This section describes some recipes for pool parameters. These are mostly
meant for the synchronous RPC backend, as the asynchronous RPC backend
changes the rules considerably, making workers themselves responsible for
their scheduling.

=over 4

=item low latency - set load = 1

If you need a deterministic low latency, you should set the C<load>
parameter to C<1>. This ensures that never more than one job is sent to
each worker. This avoids having to wait for a previous job to finish.

This makes most sense with the synchronous (default) backend, as the
asynchronous backend can handle multiple requests concurrently.

=item lowest latency - set load = 1 and idle = max

To achieve the lowest latency, you additionally should disable any dynamic
resizing of the pool by setting C<idle> to the same value as C<max>.

=item high throughput, cpu bound jobs - set load >= 2, max = #cpus

To get high throughput with cpu-bound jobs, you should set the maximum
pool size to the number of cpus in your system, and C<load> to at least
C<2>, to make sure there can be another job waiting for the worker when it
has finished one.

The value of C<2> for C<load> is the minimum value that I<can> achieve
100% throughput, but if your parent process itself is sometimes busy, you
might need higher values. Also there is a limit on the amount of data that
can be "in flight" to the worker, so if you send big blobs of data to your
worker, C<load> might have much less of an effect.

=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high

When your jobs are I/O bound, using more workers usually translates into
higher throughput, depending very much on your actual workload - sometimes
having only one worker is best, for example, when you read or write big
files at maximum speed, as a second worker will increase seek times.

=back

=head1 EXCEPTIONS

The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions
will not be caught, and exceptions both in the worker and in callbacks
cause undesirable or undefined behaviour.

=head1 SEE ALSO

L<AnyEvent::Fork>, to create the processes in the first place.

L<AnyEvent::Fork::Remote>, likewise, but helpful for remote processes.

L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.

=head1 AUTHOR AND CONTACT INFORMATION
