/cvs/AnyEvent-Fork-Pool/Pool.pm

Comparing AnyEvent-Fork-Pool/Pool.pm (file contents):
Revision 1.1 by root, Thu Apr 18 14:24:01 2013 UTC vs.
Revision 1.13 by root, Sun Apr 28 14:27:31 2013 UTC

6 6
7 use AnyEvent; 7 use AnyEvent;
8 use AnyEvent::Fork::Pool; 8 use AnyEvent::Fork::Pool;
9 # use AnyEvent::Fork is not needed 9 # use AnyEvent::Fork is not needed
10 10
11 # all parameters with default values 11 # all possible parameters shown, with default values
12 my $pool = new AnyEvent::Fork::Pool 12 my $pool = AnyEvent::Fork
13 "MyWorker::run", 13 ->new
14 ->require ("MyWorker")
15 ->AnyEvent::Fork::Pool::run (
16 "MyWorker::run", # the worker function
14 17
15 # pool management 18 # pool management
16 min => 0, # minimum # of processes
17 max => 8, # maximum # of processes 19 max => 4, # absolute maximum # of processes
20 idle => 0, # minimum # of idle processes
21 load => 2, # queue at most this number of jobs per process
18 busy_time => 0, # wait this before starting a new process 22 start => 0.1, # wait this many seconds before starting a new process
19 max_idle => 1, # wait this before killing an idle process 23 stop => 10, # wait this many seconds before stopping an idle process
20 idle_time => 1, # at most this many idle processes 24 on_destroy => (my $finish = AE::cv), # called when object is destroyed
21 25
22 # template process
23 template => AnyEvent::Fork->new, # the template process to use
24 require => [MyWorker::], # module(s) to load
25 eval => "# perl code to execute in template",
26 on_destroy => (my $finish = AE::cv),
27
28 # parameters passed to AnyEvent::Fork::RPC 26 # parameters passed to AnyEvent::Fork::RPC
29 async => 0, 27 async => 0,
30 on_error => sub { die "FATAL: $_[0]\n" }, 28 on_error => sub { die "FATAL: $_[0]\n" },
31 on_event => sub { my @ev = @_ }, 29 on_event => sub { my @ev = @_ },
32 init => "MyWorker::init", 30 init => "MyWorker::init",
33 serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, 31 serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
34 ; 32 );
35 33
36 for (1..10) { 34 for (1..10) {
37 $pool->call (doit => $_, sub { 35 $pool->(doit => $_, sub {
38 print "MyWorker::run returned @_\n"; 36 print "MyWorker::run returned @_\n";
39 }); 37 });
40 } 38 }
41 39
42 undef $pool; 40 undef $pool;
43 41
44 $finish->recv; 42 $finish->recv;
45 43
46=head1 DESCRIPTION 44=head1 DESCRIPTION
47 45
48This module uses processes created via L<AnyEvent::Fork> and the RPC 46This module uses processes created via L<AnyEvent::Fork> (or
 49protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced 47L<AnyEvent::Fork::Remote>) and the RPC protocol implemented in
50pool of processes that handles jobs. 48L<AnyEvent::Fork::RPC> to create a load-balanced pool of processes that
49handles jobs.
51 50
 52An understanding of L<AnyEvent::Fork> is helpful but not critical for 51An understanding of L<AnyEvent::Fork> is helpful but not critical for
 53using this module; a thorough understanding of L<AnyEvent::Fork::RPC> 52using this module; a thorough understanding of L<AnyEvent::Fork::RPC>
54is, as it defines the actual API that needs to be implemented in the 53is, as it defines the actual API that needs to be implemented in the
55children. 54worker processes.
56 55
57=head1 EXAMPLES 56=head1 PARENT USAGE
58 57
 59=head1 API 58To create a pool, you first have to create an L<AnyEvent::Fork> object -
59this object becomes your template process. Whenever a new worker process
60is needed, it is forked from this template process. Then you need to
61"hand off" this template process to the C<AnyEvent::Fork::Pool> module by
62calling its run method on it:
63
64 my $template = AnyEvent::Fork
65 ->new
66 ->require ("SomeModule", "MyWorkerModule");
67
68 my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");
69
70The pool "object" is not a regular Perl object, but a code reference that
71you can call and that works roughly like calling the worker function
 72directly, except that it returns nothing; instead, you specify a
 73callback to be invoked once the results are in:
74
75 $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });
60 76
61=over 4 77=over 4
62 78
63=cut 79=cut
64 80
65package AnyEvent::Fork::Pool; 81package AnyEvent::Fork::Pool;
66 82
67use common::sense; 83use common::sense;
68 84
85use Scalar::Util ();
86
69use Guard (); 87use Guard ();
88use Array::Heap ();
70 89
71use AnyEvent; 90use AnyEvent;
91# explicit version on next line, as some cpan-testers test with the 0.1 version,
92# ignoring dependencies, and this line will at least give a clear indication of that.
72use AnyEvent::Fork; # we don't actually depend on it, this is for convenience 93use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience
73use AnyEvent::Fork::RPC; 94use AnyEvent::Fork::RPC;
74 95
96# these are used for the first and last argument of events
97# in the hope of not colliding. yes, I don't like it either,
98# but didn't come up with an obviously better alternative.
99my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
100my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
101
75our $VERSION = 0.1; 102our $VERSION = 1.1;
76 103
77=item my $rpc = new AnyEvent::Fork::RPC::pool $function, [key => value...] 104=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
105
106The traditional way to call the pool creation function. But it is way
107cooler to call it in the following way:
108
109=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
110
111Creates a new pool object with the specified C<$function> as function
112(name) to call for each request. The pool uses the C<$fork> object as the
113template when creating worker processes.
114
115You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
116to create one.
117
118A relatively large number of key/value pairs can be specified to influence
 119the behaviour. They are grouped into the categories "pool management"
 120and "rpc parameters".
78 121
79=over 4 122=over 4
80 123
81=item on_error => $cb->($msg) 124=item Pool Management
82 125
83Called on (fatal) errors, with a descriptive (hopefully) message. If 126The pool consists of a certain number of worker processes. These options
84this callback is not provided, but C<on_event> is, then the C<on_event> 127decide how many of these processes exist and when they are started and
85callback is called with the first argument being the string C<error>, 128stopped.
86followed by the error message.
87 129
88If neither handler is provided it prints the error to STDERR and will 130The worker pool is dynamically resized, according to (perceived :)
89start failing badly. 131load. The minimum size is given by the C<idle> parameter and the maximum
132size is given by the C<max> parameter. A new worker is started every
133C<start> seconds at most, and an idle worker is stopped at most every
 134C<stop> seconds.
90 135
 91=item on_event => $cb->(...) 136You can specify the number of jobs sent to a worker concurrently using the
137C<load> parameter.
92 138
93Called for every call to the C<AnyEvent::Fork::RPC::event> function in the 139=over 4
94child, with the arguments of that function passed to the callback.
95 140
96Also called on errors when no C<on_error> handler is provided. 141=item idle => $count (default: 0)
97 142
 98=item on_destroy => $cb->() 143The minimum number of idle processes in the pool - when there are fewer
144than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
145ones, subject to the limits set by C<max> and C<start>.
99 146
 100Called when the C<$rpc> object has been destroyed and all requests have 147This is also the initial number of workers in the pool. The default of
101been successfully handled. This is useful when you queue some requests and 148zero means that the pool starts empty and can shrink back to zero workers
102want the child to go away after it has handled them. The problem is that 149over time.
103the parent must not exit either until all requests have been handled, and
104this can be accomplished by waiting for this callback.
105 150
106=item init => $function (default none) 151=item max => $count (default: 4)
107 152
108When specified (by name), this function is called in the child as the very 153The maximum number of processes in the pool, in addition to the template
109first thing when taking over the process, with all the arguments normally 154process. C<AnyEvent::Fork::Pool> will never have more than this number of
110passed to the C<AnyEvent::Fork::run> function, except the communications 155worker processes, although there can be more temporarily when a worker is
111socket. 156shut down and hasn't exited yet.
112 157
113It can be used to do one-time things in the child such as storing passed 158=item load => $count (default: 2)
114parameters or opening database connections.
115 159
116It is called very early - before the serialisers are created or the 160The maximum number of concurrent jobs sent to a single worker process.
117C<$function> name is resolved into a function reference, so it could be 161
118used to load any modules that provide the serialiser or function. It can 162Jobs that cannot be sent to a worker immediately (because all workers are
119not, however, create events. 163busy) will be queued until a worker is available.
164
165Setting this low improves latency. For example, at C<1>, every job that
166is sent to a worker is sent to a completely idle worker that doesn't run
167any other jobs. The downside is that throughput is reduced - a worker that
168finishes a job needs to wait for a new job from the parent.
169
170The default of C<2> is usually a good compromise.
171
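For example, to favour latency over throughput, a pool might combine a low
C<load> with a fixed size (an illustrative sketch only - C<MyWorker> is the
hypothetical worker module from the SYNOPSIS):

   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
         "MyWorker::run",
         max  => 4,
         idle => 4, # fixed-size pool, no dynamic resizing
         load => 1, # at most one job per worker at a time
      );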
172=item start => $seconds (default: 0.1)
173
174When there are fewer than C<idle> workers (or all workers are completely
175busy), then a timer is started. If the timer elapses and there are still
176jobs that cannot be queued to a worker, a new worker is started.
177
178This sets the minimum time that all workers must be busy before a new
179worker is started. Or, put differently, the minimum delay between starting
180new workers.
181
182The delay is small by default, which means new workers will be started
183relatively quickly. A delay of C<0> is possible, and ensures that the pool
184will grow as quickly as possible under load.
185
186Non-zero values are useful to avoid "exploding" a pool because a lot of
187jobs are queued in an instant.
188
189Higher values are often useful to improve efficiency at the cost of
190latency - when fewer processes can do the job over time, starting more and
191more is not necessarily going to help.
192
193=item stop => $seconds (default: 10)
194
195When a worker has no jobs to execute it becomes idle. An idle worker that
196hasn't executed a job within this amount of time will be stopped, unless
197the other parameters say otherwise.
198
199Setting this to a very high value means that workers stay around longer,
200even when they have nothing to do, which can be good as they don't have to
 201be started on the next load spike again.
202
203Setting this to a lower value can be useful to avoid memory or simply
204process table wastage.
205
206Usually, setting this to a time longer than the time between load spikes
207is best - if you expect a lot of requests every minute and little work
208in between, setting this to longer than a minute avoids having to stop
209and start workers. On the other hand, you have to ask yourself if letting
210workers run idle is a good use of your resources. Try to find a good
211balance between resource usage of your workers and the time to start new
 212workers - L<AnyEvent::Fork> itself is fast at
213creating workers while not using much memory for them, so most of the
214overhead is likely from your own code.
215
216=item on_destroy => $callback->() (default: none)
217
218When a pool object goes out of scope, the outstanding requests are still
219handled till completion. Only after handling all jobs will the workers
220be destroyed (and also the template process if it isn't referenced
221otherwise).
222
223To find out when a pool I<really> has finished its work, you can set this
224callback, which will be called when the pool has been destroyed.
225
226=back
227
228=item AnyEvent::Fork::RPC Parameters
229
230These parameters are all passed more or less directly to
231L<AnyEvent::Fork::RPC>. They are only briefly mentioned here, for
232their full documentation please refer to the L<AnyEvent::Fork::RPC>
233documentation. Also, the default values mentioned here are only documented
234as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.
235
236=over 4
120 237
121=item async => $boolean (default: 0) 238=item async => $boolean (default: 0)
122 239
123The default server used in the child does all I/O blockingly, and only 240Whether to use the synchronous or asynchronous RPC backend.
124allows a single RPC call to execute concurrently.
125 241
126Setting C<async> to a true value switches to another implementation that 242=item on_error => $callback->($message) (default: die with message)
127uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.
128 243
129The actual API in the child is documented in the section that describes 244The callback to call on any (fatal) errors.
130the calling semantics of the returned C<$rpc> function.
131 245
132If you want to pre-load the actual back-end modules to enable memory 246=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
133sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
134synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
135 247
136If you use a template process and want to fork both sync and async 248The callback to invoke on events.
137children, then it is permissible to load both modules.
138 249
139=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') 250=item init => $initfunction (default: none)
140 251
141All arguments, result data and event data have to be serialised to be 252The function to call in the child, once before handling requests.
142transferred between the processes. For this, they have to be frozen and
143thawed in both parent and child processes.
144 253
 145By default, only octet strings can be passed between the processes, which 254serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
146is reasonably fast and efficient.
147 255
148For more complicated use cases, you can provide your own freeze and thaw 256The serialiser to use.
149functions, by specifying a string with perl source code. It's supposed to
150return two code references when evaluated: the first receives a list of
151perl values and must return an octet string. The second receives the octet
152string and must return the original list of values.
153
154If you need an external module for serialisation, then you can either
155pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
156or C<require> statement into the serialiser string. Or both.
157 257
158=back 258=back
159 259
160See the examples section earlier in this document for some actual 260=back
161examples.
162 261
163=cut 262=cut
164 263
165sub new { 264sub run {
166 my ($self, $function, %arg) = @_; 265 my ($template, $function, %arg) = @_;
167 266
168 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; 267 my $max = $arg{max} || 4;
268 my $idle = $arg{idle} || 0,
269 my $load = $arg{load} || 2,
270 my $start = $arg{start} || 0.1,
271 my $stop = $arg{stop} || 10,
169 my $on_event = delete $arg{on_event}; 272 my $on_event = $arg{on_event} || sub { },
170 my $on_error = delete $arg{on_error};
171 my $on_destroy = delete $arg{on_destroy}; 273 my $on_destroy = $arg{on_destroy};
274
275 my @rpc = (
276 async => $arg{async},
277 init => $arg{init},
278 serialiser => delete $arg{serialiser},
279 on_error => $arg{on_error},
280 );
281
282 my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
283 my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);
284
285 my $destroy_guard = Guard::guard {
286 $on_destroy->()
287 if $on_destroy;
288 };
289
290 $template
291 ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
292 ->eval ('
293 my ($magic0, $magic1) = @_;
294 sub AnyEvent::Fork::Pool::retire() {
295 AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
296 }
297 ', $magic0, $magic1)
172 298 ;
173 # default for on_error is to on_event, if specified
174 $on_error ||= $on_event
175 ? sub { $on_event->(error => shift) }
176 : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };
177 299
178 # default for on_event is to raise an error 300 $start_worker = sub {
179 $on_event ||= sub { $on_error->("event received, but no on_event handler") }; 301 my $proc = [0, 0, undef]; # load, index, rpc
180 302
181 my ($f, $t) = eval $serialiser; die $@ if $@; 303 $proc->[2] = $template
304 ->fork
305 ->AnyEvent::Fork::RPC::run ($function,
306 @rpc,
307 on_event => sub {
308 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
309 $destroy_guard if 0; # keep it alive
182 310
183 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww); 311 $_[1] eq "quit" and $stop_worker->($proc);
184 my ($rlen, $rbuf, $rw) = 512 - 16; 312 return;
313 }
185 314
186 my $wcb = sub { 315 &$on_event;
187 my $len = syswrite $fh, $wbuf; 316 },
317 )
318 ;
188 319
189 unless (defined $len) { 320 ++$nidle;
190 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 321 Array::Heap::push_heap_idx @pool, $proc;
191 undef $rw; undef $ww; # it ends here 322
192 $on_error->("$!"); 323 Scalar::Util::weaken $proc;
324 };
325
326 $stop_worker = sub {
327 my $proc = shift;
328
329 $proc->[0]
330 or --$nidle;
331
332 Array::Heap::splice_heap_idx @pool, $proc->[1]
333 if defined $proc->[1];
334
335 @$proc = 0; # tell others to leave it be
336 };
337
338 $want_start = sub {
339 undef $stop_w;
340
341 $start_w ||= AE::timer $start, $start, sub {
342 if (($nidle < $idle || @queue) && @pool < $max) {
343 $start_worker->();
344 $scheduler->();
345 } else {
346 undef $start_w;
193 } 347 }
194 } 348 };
349 };
195 350
196 substr $wbuf, 0, $len, ""; 351 $want_stop = sub {
352 $stop_w ||= AE::timer $stop, $stop, sub {
353 $stop_worker->($pool[0])
354 if $nidle;
197 355
198 unless (length $wbuf) { 356 undef $stop_w
357 if $nidle <= $idle;
358 };
359 };
360
361 $scheduler = sub {
362 if (@queue) {
363 while (@queue) {
364 @pool or $start_worker->();
365
366 my $proc = $pool[0];
367
368 if ($proc->[0] < $load) {
369 # found free worker, increase load
370 unless ($proc->[0]++) {
371 # worker became busy
372 --$nidle
373 or undef $stop_w;
374
375 $want_start->()
376 if $nidle < $idle && @pool < $max;
377 }
378
379 Array::Heap::adjust_heap_idx @pool, 0;
380
381 my $job = shift @queue;
382 my $ocb = pop @$job;
383
384 $proc->[2]->(@$job, sub {
385 # reduce load
386 --$proc->[0] # worker still busy?
387 or ++$nidle > $idle # not too many idle processes?
388 or $want_stop->();
389
390 Array::Heap::adjust_heap_idx @pool, $proc->[1]
391 if defined $proc->[1];
392
393 &$ocb;
394
395 $scheduler->();
396 });
397 } else {
398 $want_start->()
399 unless @pool >= $max;
400
401 last;
402 }
403 }
404 } elsif ($shutdown) {
405 @pool = ();
199 undef $ww; 406 undef $start_w;
200 $shutdown and shutdown $fh, 1; 407 undef $start_worker; # frees $destroy_guard reference
408
409 $stop_worker->($pool[0])
410 while $nidle;
201 } 411 }
202 }; 412 };
203 413
204 my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"); 414 my $shutdown_guard = Guard::guard {
415 $shutdown = 1;
416 $scheduler->();
417 };
205 418
206 $self->require ($module) 419 $start_worker->()
207 ->send_arg ($function, $arg{init}, $serialiser) 420 while @pool < $idle;
208 ->run ("$module\::run", sub {
209 $fh = shift;
210 421
211 my ($id, $len); 422 sub {
212 $rw = AE::io $fh, 0, sub { 423 $shutdown_guard if 0; # keep it alive
213 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
214 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
215 424
216 if ($len) { 425 $start_worker->()
217 while (8 <= length $rbuf) { 426 unless @pool;
218 ($id, $len) = unpack "LL", $rbuf;
219 8 + $len <= length $rbuf
220 or last;
221 427
222 my @r = $t->(substr $rbuf, 8, $len); 428 push @queue, [@_];
223 substr $rbuf, 0, 8 + $len, ""; 429 $scheduler->();
430 }
431}
224 432
225 if ($id) { 433=item $pool->(..., $cb->(...))
226 if (@rcb) { 434
227 (shift @rcb)->(@r); 435Call the RPC function of a worker with the given arguments, and when the
228 } elsif (my $cb = delete $rcb{$id}) { 436worker is done, call the C<$cb> with the results, just like calling the
 229 $cb->(@r); 437RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
230 } else { 438details on the RPC API.
231 undef $rw; undef $ww; 439
232 $on_error->("unexpected data from child"); 440If there is no free worker, the call will be queued until a worker becomes
441available.
442
443Note that there can be considerable time between calling this method and
444the call actually being executed. During this time, the parameters passed
445to this function are effectively read-only - modifying them after the call
446and before the callback is invoked causes undefined behaviour.
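For example, when the caller reuses a variable for successive calls, passing
a copy avoids modifying parameters that are still queued - a hypothetical
sketch (C<get_next_item> is made up for illustration):

   my $buf;

   while (defined ($buf = get_next_item)) {
      my $copy = $buf; # take a copy, $buf is overwritten on the next read
      $pool->($copy, sub {
         print "result for $copy: @_\n";
      });
   }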
447
448=cut
449
450=item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]
451
452=item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]
453
 454Tries to detect the number of CPUs (C<$cpus>, often called cpu cores
 455nowadays) and execution units (C<$eus>, which include e.g. extra
456hyperthreaded units). When C<$cpus> cannot be determined reliably,
457C<$default_cpus> is returned for both values, or C<1> if it is missing.
458
459For normal CPU bound uses, it is wise to have as many worker processes
460as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
461hyperthreading is usually detrimental to performance, but in those rare
462cases where that really helps it might be beneficial to use more workers
463(C<$eus>).
464
465Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
 466C<$cpus> and C<$eus>, and on {Free,Net,Open}BSD, F<sysctl -n hw.ncpu> is
467used for C<$cpus>.
468
469Example: create a worker pool with as many workers as cpu cores, or C<2>,
470if the actual number could not be determined.
471
472 $fork->AnyEvent::Fork::Pool::run ("myworker::function",
473 max => (scalar AnyEvent::Fork::Pool::ncpu 2),
474 );
475
476=cut
477
478BEGIN {
479 if ($^O eq "linux") {
480 *ncpu = sub(;$) {
481 my ($cpus, $eus);
482
483 if (open my $fh, "<", "/proc/cpuinfo") {
484 my %id;
485
486 while (<$fh>) {
487 if (/^core id\s*:\s*(\d+)/) {
233 } 488 ++$eus;
234 } else { 489 undef $id{$1};
235 $on_event->(@r);
236 } 490 }
237 } 491 }
238 } elsif (defined $len) {
239 undef $rw; undef $ww; # it ends here
240 492
241 if (@rcb || %rcb) { 493 $cpus = scalar keys %id;
242 $on_error->("unexpected eof");
243 } else { 494 } else {
244 $on_destroy->(); 495 $cpus = $eus = @_ ? shift : 1;
245 }
246 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
247 undef $rw; undef $ww; # it ends here
248 $on_error->("read: $!");
249 } 496 }
497 wantarray ? ($cpus, $eus) : $cpus
250 }; 498 };
251 499 } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
252 $ww ||= AE::io $fh, 1, $wcb; 500 *ncpu = sub(;$) {
253 }); 501 my $cpus = qx<sysctl -n hw.ncpu> * 1
254 502 || (@_ ? shift : 1);
255 my $guard = Guard::guard { 503 wantarray ? ($cpus, $cpus) : $cpus
256 $shutdown = 1; 504 };
257 $ww ||= $fh && AE::io $fh, 1, $wcb; 505 } else {
506 *ncpu = sub(;$) {
507 my $cpus = @_ ? shift : 1;
508 wantarray ? ($cpus, $cpus) : $cpus
509 };
258 }; 510 }
259
260 my $id;
261
262 $arg{async}
263 ? sub {
264 $id = ($id == 0xffffffff ? 0 : $id) + 1;
265 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
266
267 $rcb{$id} = pop;
268
269 $guard; # keep it alive
270
271 $wbuf .= pack "LL/a*", $id, &$f;
272 $ww ||= $fh && AE::io $fh, 1, $wcb;
273 }
274 : sub {
275 push @rcb, pop;
276
277 $guard; # keep it alive
278
279 $wbuf .= pack "L/a*", &$f;
280 $ww ||= $fh && AE::io $fh, 1, $wcb;
281 }
282} 511}
283 512
284=item $pool->call (..., $cb->(...))
285
286=back 513=back
287 514
515=head1 CHILD USAGE
516
517In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
518more child-side function:
519
520=over 4
521
522=item AnyEvent::Fork::Pool::retire ()
523
524This function sends an event to the parent process to request retirement:
525the worker is removed from the pool and no new jobs will be sent to it,
526but it has to handle the jobs that are already queued.
527
528The parentheses are part of the syntax: the function usually isn't defined
529when you compile your code (because that happens I<before> handing the
 530template process over to C<AnyEvent::Fork::Pool::run>), so you need the
531empty parentheses to tell Perl that the function is indeed a function.
532
533Retiring a worker can be useful to gracefully shut it down when the worker
534deems this useful. For example, after executing a job, one could check
535the process size or the number of jobs handled so far, and if either is
 536too high, the worker could ask to be retired, to keep memory leaks from
 537accumulating.
538
539Example: retire a worker after it has handled roughly 100 requests.
540
541 my $count = 0;
542
543 sub my::worker {
544
545 ++$count == 100
546 and AnyEvent::Fork::Pool::retire ();
547
548 ... normal code goes here
549 }
550
551=back
552
553=head1 POOL PARAMETERS RECIPES
554
 555This section describes some recipes for pool parameters. These are mostly
556meant for the synchronous RPC backend, as the asynchronous RPC backend
557changes the rules considerably, making workers themselves responsible for
558their scheduling.
559
560=over 4
561
562=item low latency - set load = 1
563
564If you need a deterministic low latency, you should set the C<load>
565parameter to C<1>. This ensures that never more than one job is sent to
566each worker. This avoids having to wait for a previous job to finish.
567
568This makes most sense with the synchronous (default) backend, as the
569asynchronous backend can handle multiple requests concurrently.
570
571=item lowest latency - set load = 1 and idle = max
572
573To achieve the lowest latency, you additionally should disable any dynamic
574resizing of the pool by setting C<idle> to the same value as C<max>.
575
576=item high throughput, cpu bound jobs - set load >= 2, max = #cpus
577
578To get high throughput with cpu-bound jobs, you should set the maximum
579pool size to the number of cpus in your system, and C<load> to at least
580C<2>, to make sure there can be another job waiting for the worker when it
581has finished one.
582
583The value of C<2> for C<load> is the minimum value that I<can> achieve
584100% throughput, but if your parent process itself is sometimes busy, you
585might need higher values. Also there is a limit on the amount of data that
586can be "in flight" to the worker, so if you send big blobs of data to your
587worker, C<load> might have much less of an effect.
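Such a pool could be set up along the lines of the C<ncpu> example earlier
(C<MyWorker::crunch> being a hypothetical worker function):

   my $pool = $fork->AnyEvent::Fork::Pool::run (
      "MyWorker::crunch",
      max  => (scalar AnyEvent::Fork::Pool::ncpu 2),
      load => 2, # keep a second job queued at each worker
   );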
588
589=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high
590
 591When your jobs are I/O bound, using more workers usually results in
592higher throughput, depending very much on your actual workload - sometimes
593having only one worker is best, for example, when you read or write big
 594files at maximum speed, as a second worker will increase seek times.
595
596=back
597
598=head1 EXCEPTIONS
599
600The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptins will
601not be caught, and exceptions in both worker and in callbacks causes
602undesirable or undefined behaviour.
603
288=head1 SEE ALSO 604=head1 SEE ALSO
289 605
290L<AnyEvent::Fork>, to create the processes in the first place. 606L<AnyEvent::Fork>, to create the processes in the first place.
607
608L<AnyEvent::Fork::Remote>, likewise, but helpful for remote processes.
291 609
292L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API. 610L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.
293 611
294=head1 AUTHOR AND CONTACT INFORMATION 612=head1 AUTHOR AND CONTACT INFORMATION
295 613
