Comparing AnyEvent-Fork-Pool/Pool.pm (file contents):
Revision 1.1 by root, Thu Apr 18 14:24:01 2013 UTC vs.
Revision 1.17 by root, Thu Oct 27 07:27:56 2022 UTC

=head1 NAME

AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork

=head1 SYNOPSIS

   use AnyEvent;
   use AnyEvent::Fork;
   use AnyEvent::Fork::Pool;

   # all possible parameters shown, with default values
   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
         "MyWorker::run", # the worker function

         # pool management
         max        => 4,   # absolute maximum # of processes
         idle       => 0,   # minimum # of idle processes
         load       => 2,   # queue at most this number of jobs per process
         start      => 0.1, # wait this many seconds before starting a new process
         stop       => 10,  # wait this many seconds before stopping an idle process
         on_destroy => (my $finish = AE::cv), # called when object is destroyed

         # parameters passed to AnyEvent::Fork::RPC
         async      => 0,
         on_error   => sub { die "FATAL: $_[0]\n" },
         on_event   => sub { my @ev = @_ },
         init       => "MyWorker::init",
         serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
      );

   for (1..10) {
      $pool->(doit => $_, sub {
         print "MyWorker::run returned @_\n";
      });
   }

   undef $pool;

   $finish->recv;

=head1 DESCRIPTION

This module uses processes created via L<AnyEvent::Fork> (or
L<AnyEvent::Fork::Remote>) and the RPC protocol implemented in
L<AnyEvent::Fork::RPC> to create a load-balanced pool of processes that
handles jobs.

Understanding L<AnyEvent::Fork> is helpful but not required to use this
module, but a thorough understanding of L<AnyEvent::Fork::RPC> is, as
it defines the actual API that needs to be implemented in the worker
processes.

=head1 PARENT USAGE

To create a pool, you first have to create a L<AnyEvent::Fork> object -
this object becomes your template process. Whenever a new worker process
is needed, it is forked from this template process. Then you need to
"hand off" this template process to the C<AnyEvent::Fork::Pool> module by
calling its C<run> function on it:

   my $template = AnyEvent::Fork
                     ->new
                     ->require ("SomeModule", "MyWorkerModule");

   my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");

The pool "object" is not a regular Perl object, but a code reference that
you can call and that works roughly like calling the worker function
directly, except that it returns nothing; instead, you need to specify a
callback to be invoked once results are in:

   $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });
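
With the default synchronous backend, the worker side only has to define
the function whose name is passed to C<run>: it receives the job arguments
and its return values are sent back to the parent as the callback
arguments. A minimal sketch (module and function names are made up for
illustration; for the asynchronous backend the function is instead passed
a completion callback first, see L<AnyEvent::Fork::RPC> for the exact
API):

```perl
package MyWorkerModule;

use strict;
use warnings;

# called once per job with the arguments given to $pool->(...);
# with the synchronous backend, the return values become the
# arguments of the result callback in the parent
sub myfunction {
   my ($a, $b, $c) = @_;

   $a + $b + $c # result sent back to the parent
}

1;
```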

=over 4

=cut

package AnyEvent::Fork::Pool;

use common::sense;

use Scalar::Util ();

use Guard ();
use Array::Heap ();

use AnyEvent;
use AnyEvent::Fork::RPC;

# these are used for the first and last argument of events
# in the hope of not colliding. yes, I don't like it either,
# but didn't come up with an obviously better alternative.
my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';

our $VERSION = 1.3;

=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]

The traditional way to call the pool creation function. But it is way
cooler to call it in the following way:

=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])

Creates a new pool object with the specified C<$function> as function
(name) to call for each request. The pool uses the C<$fork> object as the
template when creating worker processes.

You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
to create one.

A relatively large number of key/value pairs can be specified to influence
the behaviour. They are grouped into the categories "pool management" and
"rpc parameters".

=over 4

=item Pool Management

The pool consists of a certain number of worker processes. These options
decide how many of these processes exist and when they are started and
stopped.

The worker pool is dynamically resized, according to (perceived :)
load. The minimum size is given by the C<idle> parameter and the maximum
size is given by the C<max> parameter. A new worker is started every
C<start> seconds at most, and an idle worker is stopped at most every
C<stop> seconds.

You can specify the number of jobs sent to a worker concurrently using the
C<load> parameter.

=over 4

=item idle => $count (default: 0)

The minimum number of idle processes in the pool - when there are fewer
than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
ones, subject to the limits set by C<max> and C<start>.

This is also the initial number of workers in the pool. The default of
zero means that the pool starts empty and can shrink back to zero workers
over time.

=item max => $count (default: 4)

The maximum number of processes in the pool, in addition to the template
process. C<AnyEvent::Fork::Pool> will never have more than this number of
worker processes, although there can be more temporarily when a worker is
shut down and hasn't exited yet.

=item load => $count (default: 2)

The maximum number of concurrent jobs sent to a single worker process.

Jobs that cannot be sent to a worker immediately (because all workers are
busy) will be queued until a worker is available.

Setting this low improves latency. For example, at C<1>, every job that
is sent to a worker is sent to a completely idle worker that doesn't run
any other jobs. The downside is that throughput is reduced - a worker that
finishes a job needs to wait for a new job from the parent.

The default of C<2> is usually a good compromise.

=item start => $seconds (default: 0.1)

When there are fewer than C<idle> workers (or all workers are completely
busy), then a timer is started. If the timer elapses and there are still
jobs that cannot be queued to a worker, a new worker is started.

This sets the minimum time that all workers must be busy before a new
worker is started. Or, put differently, the minimum delay between starting
new workers.

The delay is small by default, which means new workers will be started
relatively quickly. A delay of C<0> is possible, and ensures that the pool
will grow as quickly as possible under load.

Non-zero values are useful to avoid "exploding" a pool because a lot of
jobs are queued in an instant.

Higher values are often useful to improve efficiency at the cost of
latency - when fewer processes can do the job over time, starting more and
more is not necessarily going to help.

=item stop => $seconds (default: 10)

When a worker has no jobs to execute it becomes idle. An idle worker that
hasn't executed a job within this amount of time will be stopped, unless
the other parameters say otherwise.

Setting this to a very high value means that workers stay around longer,
even when they have nothing to do, which can be good as they don't have to
be started on the next load spike again.

Setting this to a lower value can be useful to avoid memory or simply
process table wastage.

Usually, setting this to a time longer than the time between load spikes
is best - if you expect a lot of requests every minute and little work
in between, setting this to longer than a minute avoids having to stop
and start workers. On the other hand, you have to ask yourself if letting
workers run idle is a good use of your resources. Try to find a good
balance between resource usage of your workers and the time to start new
workers - L<AnyEvent::Fork> itself is fast at creating workers while not
using much memory for them, so most of the overhead is likely from your
own code.

=item on_destroy => $callback->() (default: none)

When a pool object goes out of scope, the outstanding requests are still
handled till completion. Only after handling all jobs will the workers
be destroyed (and also the template process if it isn't referenced
otherwise).

To find out when a pool I<really> has finished its work, you can set this
callback, which will be called when the pool has been destroyed.

=back
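
Putting these parameters together: a sketch of a pool that keeps two warm
workers around, never grows beyond eight processes, and reaps idle workers
after a minute (the worker module and function are hypothetical, and the
values are illustrative rather than recommendations):

```perl
my $pool = AnyEvent::Fork
   ->new
   ->require ("MyWorkerModule")   # hypothetical worker module
   ->AnyEvent::Fork::Pool::run (
      "MyWorkerModule::myfunction",
      idle  => 2,   # keep at least two idle workers ready
      max   => 8,   # hard ceiling on the number of workers
      load  => 2,   # queue at most two jobs per worker
      start => 0.5, # grow by at most one worker per half second
      stop  => 60,  # stop workers that have been idle for a minute
   );
```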

=item AnyEvent::Fork::RPC Parameters

These parameters are all passed more or less directly to
L<AnyEvent::Fork::RPC>. They are only briefly mentioned here; for
their full documentation please refer to the L<AnyEvent::Fork::RPC>
documentation. Also, the default values mentioned here are only documented
as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.

=over 4
120 234
121=item async => $boolean (default: 0) 235=item async => $boolean (default: 0)
122 236
123The default server used in the child does all I/O blockingly, and only 237Whether to use the synchronous or asynchronous RPC backend.
124allows a single RPC call to execute concurrently.
125 238
126Setting C<async> to a true value switches to another implementation that 239=item on_error => $callback->($message) (default: die with message)
127uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.
128 240
129The actual API in the child is documented in the section that describes 241The callback to call on any (fatal) errors.
130the calling semantics of the returned C<$rpc> function.
131 242
132If you want to pre-load the actual back-end modules to enable memory 243=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
133sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
134synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
135 244
136If you use a template process and want to fork both sync and async 245The callback to invoke on events.
137children, then it is permissible to load both modules.
138 246
139=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') 247=item init => $initfunction (default: none)
140 248
141All arguments, result data and event data have to be serialised to be 249The function to call in the child, once before handling requests.
142transferred between the processes. For this, they have to be frozen and
143thawed in both parent and child processes.
144 250
145By default, only octet strings can be passed between the processes, which 251=item serialiser => $serialiser (defailt: $AnyEvent::Fork::RPC::STRING_SERIALISER)
146is reasonably fast and efficient.
147 252
148For more complicated use cases, you can provide your own freeze and thaw 253The serialiser to use.
149functions, by specifying a string with perl source code. It's supposed to
150return two code references when evaluated: the first receives a list of
151perl values and must return an octet string. The second receives the octet
152string and must return the original list of values.
153
154If you need an external module for serialisation, then you can either
155pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
156or C<require> statement into the serialiser string. Or both.
157 254
158=back 255=back
159 256
160See the examples section earlier in this document for some actual 257=back
161examples.
162 258
163=cut 259=cut
164 260

sub run {
   my ($template, $function, %arg) = @_;

   my $max        = $arg{max} || 4;
   my $idle       = $arg{idle} || 0,
   my $load       = $arg{load} || 2,
   my $start      = $arg{start} || 0.1,
   my $stop       = $arg{stop} || 10,
   my $on_event   = $arg{on_event} || sub { },
   my $on_destroy = $arg{on_destroy};

   my @rpc = (
      async      => $arg{async},
      init       => $arg{init},
      serialiser => delete $arg{serialiser},
      on_error   => $arg{on_error},
   );

   my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
   my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);

   my $destroy_guard = Guard::guard {
      $on_destroy->()
         if $on_destroy;
   };

   $template
      ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
      ->eval ('
           my ($magic0, $magic1) = @_;
           sub AnyEvent::Fork::Pool::retire() {
              AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
           }
        ', $magic0, $magic1)
   ;

   $start_worker = sub {
      my $proc = [0, 0, undef]; # load, index, rpc

      $proc->[2] = $template
         ->fork
         ->AnyEvent::Fork::RPC::run ($function,
              @rpc,
              on_event => sub {
                 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
                    $destroy_guard if 0; # keep it alive

                    $_[1] eq "quit" and $stop_worker->($proc);
                    return;
                 }

                 &$on_event;
              },
           )
      ;

      ++$nidle;
      Array::Heap::push_heap_idx @pool, $proc;

      Scalar::Util::weaken $proc;
   };

   $stop_worker = sub {
      my $proc = shift;

      $proc->[0]
         or --$nidle;

      Array::Heap::splice_heap_idx @pool, $proc->[1]
         if defined $proc->[1];

      @$proc = 0; # tell others to leave it be
   };

   $want_start = sub {
      undef $stop_w;

      $start_w ||= AE::timer $start, $start, sub {
         if (($nidle < $idle || @queue) && @pool < $max) {
            $start_worker->();
            $scheduler->();
         } else {
            undef $start_w;
         }
      };
   };

   $want_stop = sub {
      $stop_w ||= AE::timer $stop, $stop, sub {
         $stop_worker->($pool[0])
            if $nidle;

         undef $stop_w
            if $nidle <= $idle;
      };
   };

   $scheduler = sub {
      if (@queue) {
         while (@queue) {
            @pool or $start_worker->();

            my $proc = $pool[0];

            if ($proc->[0] < $load) {
               # found free worker, increase load
               unless ($proc->[0]++) {
                  # worker became busy
                  --$nidle
                     or undef $stop_w;

                  $want_start->()
                     if $nidle < $idle && @pool < $max;
               }

               Array::Heap::adjust_heap_idx @pool, 0;

               my $job = shift @queue;
               my $ocb = pop @$job;

               $proc->[2]->(@$job, sub {
                  # reduce load
                  --$proc->[0] # worker still busy?
                     or ++$nidle > $idle # not too many idle processes?
                     or $want_stop->();

                  Array::Heap::adjust_heap_idx @pool, $proc->[1]
                     if defined $proc->[1];

                  &$ocb;

                  $scheduler->();
               });
            } else {
               $want_start->()
                  unless @pool >= $max;

               last;
            }
         }
      } elsif ($shutdown) {
         undef $_->[2]
            for @pool;

         undef $start_w;
         undef $start_worker; # frees $destroy_guard reference

         $stop_worker->($pool[0])
            while $nidle;
      }
   };

   my $shutdown_guard = Guard::guard {
      $shutdown = 1;
      $scheduler->();
   };

   $start_worker->()
      while @pool < $idle;

   sub {
      $shutdown_guard if 0; # keep it alive

      $start_worker->()
         unless @pool;

      push @queue, [@_];
      $scheduler->();
   }
}

=item $pool->(..., $cb->(...))

Call the RPC function of a worker with the given arguments, and when the
worker is done, call the C<$cb> with the results, just like calling the
RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
details on the RPC API.

If there is no free worker, the call will be queued until a worker becomes
available.

Note that there can be considerable time between calling this method and
the call actually being executed. During this time, the parameters passed
to this function are effectively read-only - modifying them after the call
and before the callback is invoked causes undefined behaviour.

=cut
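
Because results arrive asynchronously, waiting for a whole batch of jobs
is commonly done with a condvar used as a counter - a sketch, assuming
C<$pool> was created as above and AnyEvent is loaded:

```perl
my $done = AE::cv;

for my $job (1 .. 10) {
   $done->begin;             # one pending result per job
   $pool->($job, sub {
      print "job $job returned @_\n";
      $done->end;            # one result arrived
   });
}

$done->recv; # returns after all ten callbacks have run
```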

=item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]

=item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]

Tries to detect the number of CPUs (C<$cpus>, often called CPU cores
nowadays) and execution units (C<$eus>, which include e.g. extra
hyperthreaded units). When C<$cpus> cannot be determined reliably,
C<$default_cpus> is returned for both values, or C<1> if it is missing.

For normal CPU-bound uses, it is wise to have as many worker processes
as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
hyperthreading is usually detrimental to performance, but in those rare
cases where that really helps it might be beneficial to use more workers
(C<$eus>).

Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
C<$cpus> and C<$eus>, and on {Free,Net,Open}BSD, F<sysctl -n hw.ncpu> is
used for C<$cpus>.

Example: create a worker pool with as many workers as CPU cores, or C<2>,
if the actual number could not be determined.

   $fork->AnyEvent::Fork::Pool::run ("myworker::function",
      max => (scalar AnyEvent::Fork::Pool::ncpu 2),
   );

=cut

BEGIN {
   if ($^O eq "linux") {
      *ncpu = sub(;$) {
         my ($cpus, $eus);

         if (open my $fh, "<", "/proc/cpuinfo") {
            my %id;

            while (<$fh>) {
               if (/^core id\s*:\s*(\d+)/) {
                  ++$eus;
                  undef $id{$1};
               }
            }

            $cpus = scalar keys %id;
         } else {
            $cpus = $eus = @_ ? shift : 1;
         }
         wantarray ? ($cpus, $eus) : $cpus
      };
   } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
      *ncpu = sub(;$) {
         my $cpus = qx<sysctl -n hw.ncpu> * 1
                    || (@_ ? shift : 1);
         wantarray ? ($cpus, $cpus) : $cpus
      };
   } else {
      *ncpu = sub(;$) {
         my $cpus = @_ ? shift : 1;
         wantarray ? ($cpus, $cpus) : $cpus
      };
   }
}

=back

=head1 CHILD USAGE

In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
more child-side function:

=over 4

=item AnyEvent::Fork::Pool::retire ()

This function sends an event to the parent process to request retirement:
the worker is removed from the pool and no new jobs will be sent to it,
but it still has to handle the jobs that are already queued.

The parentheses are part of the syntax: the function usually isn't defined
when you compile your code (because that happens I<before> handing the
template process over to C<AnyEvent::Fork::Pool::run>), so you need the
empty parentheses to tell Perl that the function is indeed a function.

Retiring a worker can be useful to gracefully shut it down when the
worker deems this necessary. For example, after executing a job, it could
check the process size or the number of jobs handled so far, and if either
is too high, the worker could request to be retired, to keep memory leaks
from accumulating.

Example: retire a worker after it has handled roughly 100 requests. It
doesn't matter whether you retire at the beginning or end of your request,
as the worker will continue to handle some outstanding requests. Likewise,
it's ok to call retire multiple times.

   my $count = 0;

   sub my::worker {

      ++$count == 100
         and AnyEvent::Fork::Pool::retire ();

      ...; # normal code goes here
   }

=back

=head1 POOL PARAMETERS RECIPES

This section describes some recipes for pool parameters. These are mostly
meant for the synchronous RPC backend, as the asynchronous RPC backend
changes the rules considerably, making workers themselves responsible for
their scheduling.

=over 4

=item low latency - set load = 1

If you need deterministic low latency, you should set the C<load>
parameter to C<1>. This ensures that never more than one job is sent to
each worker, which avoids having to wait for a previous job to finish.

This makes most sense with the synchronous (default) backend, as the
asynchronous backend can handle multiple requests concurrently.

=item lowest latency - set load = 1 and idle = max

To achieve the lowest latency, you additionally should disable any dynamic
resizing of the pool by setting C<idle> to the same value as C<max>.

=item high throughput, cpu bound jobs - set load >= 2, max = #cpus

To get high throughput with cpu-bound jobs, you should set the maximum
pool size to the number of cpus in your system, and C<load> to at least
C<2>, to make sure there can be another job waiting for the worker when it
has finished one.

The value of C<2> for C<load> is the minimum value that I<can> achieve
100% throughput, but if your parent process itself is sometimes busy, you
might need higher values. Also there is a limit on the amount of data that
can be "in flight" to the worker, so if you send big blobs of data to your
worker, C<load> might have much less of an effect.

=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high

When your jobs are I/O bound, using more workers usually translates into
higher throughput, depending very much on your actual workload - sometimes
having only one worker is best, for example, when you read or write big
files at maximum speed, as a second worker will increase seek times.

=back
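
As a concrete sketch of the cpu-bound recipe above (the worker module and
function names are hypothetical):

```perl
# one worker per detected CPU (falling back to 2), two jobs in flight each
my $pool = $fork->AnyEvent::Fork::Pool::run (
   "MyWorkerModule::myfunction",
   max  => (scalar AnyEvent::Fork::Pool::ncpu 2),
   load => 2,
);
```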

=head1 EXCEPTIONS

The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions
will not be caught, and exceptions in both workers and in callbacks cause
undesirable or undefined behaviour.

=head1 SEE ALSO

L<AnyEvent::Fork>, to create the processes in the first place.

L<AnyEvent::Fork::Remote>, likewise, but helpful for remote processes.

L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.

=head1 AUTHOR AND CONTACT INFORMATION
