/cvs/AnyEvent-Fork-Pool/Pool.pm

Comparing AnyEvent-Fork-Pool/Pool.pm (file contents):
Revision 1.1 by root, Thu Apr 18 14:24:01 2013 UTC vs.
Revision 1.9 by root, Thu Apr 25 00:27:22 2013 UTC

1=head1 NAME 1=head1 NAME
2 2
3AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork 3AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4
5THE API IS NOT FINISHED, CONSIDER THIS AN ALPHA RELEASE
4 6
5=head1 SYNOPSIS 7=head1 SYNOPSIS
6 8
7 use AnyEvent; 9 use AnyEvent;
8 use AnyEvent::Fork::Pool; 10 use AnyEvent::Fork::Pool;
9 # use AnyEvent::Fork is not needed 11 # use AnyEvent::Fork is not needed
10 12
11 # all parameters with default values 13 # all possible parameters shown, with default values
12 my $pool = new AnyEvent::Fork::Pool 14 my $pool = AnyEvent::Fork
13 "MyWorker::run", 15 ->new
16 ->require ("MyWorker")
17 ->AnyEvent::Fork::Pool::run (
18 "MyWorker::run", # the worker function
14 19
15 # pool management 20 # pool management
16 min => 0, # minimum # of processes
17 max => 8, # maximum # of processes 21 max => 4, # absolute maximum # of processes
22 idle => 0, # minimum # of idle processes
23 load => 2, # queue at most this number of jobs per process
18 busy_time => 0, # wait this before starting a new process 24 start => 0.1, # wait this many seconds before starting a new process
19 max_idle => 1, # wait this before killing an idle process 25 stop => 10, # wait this many seconds before stopping an idle process
20 idle_time => 1, # at most this many idle processes 26 on_destroy => (my $finish = AE::cv), # called when object is destroyed
21 27
22 # template process
23 template => AnyEvent::Fork->new, # the template process to use
24 require => [MyWorker::], # module(s) to load
25 eval => "# perl code to execute in template",
26 on_destroy => (my $finish = AE::cv),
27
28 # parameters passed to AnyEvent::Fork::RPC 28 # parameters passed to AnyEvent::Fork::RPC
29 async => 0, 29 async => 0,
30 on_error => sub { die "FATAL: $_[0]\n" }, 30 on_error => sub { die "FATAL: $_[0]\n" },
31 on_event => sub { my @ev = @_ }, 31 on_event => sub { my @ev = @_ },
32 init => "MyWorker::init", 32 init => "MyWorker::init",
33 serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, 33 serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
34 ; 34 );
35 35
36 for (1..10) { 36 for (1..10) {
37 $pool->call (doit => $_, sub { 37 $pool->(doit => $_, sub {
38 print "MyWorker::run returned @_\n"; 38 print "MyWorker::run returned @_\n";
39 }); 39 });
40 } 40 }
41 41
42 undef $pool; 42 undef $pool;
50pool of processes that handles jobs. 50pool of processes that handles jobs.
51 51
52Understanding of L<AnyEvent::Fork> is helpful but not critical to be able 52Understanding of L<AnyEvent::Fork> is helpful but not critical to be able
53to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC> 53to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC>
54is, as it defines the actual API that needs to be implemented in the 54is, as it defines the actual API that needs to be implemented in the
55children. 55worker processes.
56 56
57=head1 EXAMPLES 57=head1 EXAMPLES
58 58
59=head1 API 59=head1 PARENT USAGE
60
61To create a pool, you first have to create a L<AnyEvent::Fork> object -
62this object becomes your template process. Whenever a new worker process
63is needed, it is forked from this template process. Then you need to
64"hand off" this template process to the C<AnyEvent::Fork::Pool> module by
65calling its run method on it:
66
67 my $template = AnyEvent::Fork
68 ->new
69 ->require ("SomeModule", "MyWorkerModule");
70
71 my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");
72
73The pool "object" is not a regular Perl object, but a code reference that
74you can call and that works roughly like calling the worker function
75directly, except that it returns nothing but instead you need to specify a
76callback to be invoked once results are in:
77
78 $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });
60 79
61=over 4 80=over 4
62 81
63=cut 82=cut
64 83
65package AnyEvent::Fork::Pool; 84package AnyEvent::Fork::Pool;
66 85
67use common::sense; 86use common::sense;
68 87
88use Scalar::Util ();
89
69use Guard (); 90use Guard ();
91use Array::Heap ();
70 92
71use AnyEvent; 93use AnyEvent;
94# explicit version on next line, as some cpan-testers test with the 0.1 version,
95# ignoring dependencies, and this line will at least give a clear indication of that.
72use AnyEvent::Fork; # we don't actually depend on it, this is for convenience 96use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience
73use AnyEvent::Fork::RPC; 97use AnyEvent::Fork::RPC;
74 98
99# these are used for the first and last argument of events
100# in the hope of not colliding. yes, I don't like it either,
101# but didn't come up with an obviously better alternative.
102my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
103my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
104
75our $VERSION = 0.1; 105our $VERSION = 0.1;
76 106
77=item my $rpc = new AnyEvent::Fork::RPC::pool $function, [key => value...] 107=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
108
109The traditional way to call the pool creation function. But it is way
110cooler to call it in the following way:
111
112=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
113
114Creates a new pool object with the specified C<$function> as function
115(name) to call for each request. The pool uses the C<$fork> object as the
116template when creating worker processes.
117
118You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
119to create one.
120
121A relatively large number of key/value pairs can be specified to influence
122the behaviour. They are grouped into the categories "pool management",
123"template process" and "rpc parameters".
78 124
79=over 4 125=over 4
80 126
81=item on_error => $cb->($msg) 127=item Pool Management
82 128
83Called on (fatal) errors, with a descriptive (hopefully) message. If 129The pool consists of a certain number of worker processes. These options
84this callback is not provided, but C<on_event> is, then the C<on_event> 130decide how many of these processes exist and when they are started and
85callback is called with the first argument being the string C<error>, 131stopped.
86followed by the error message.
87 132
88If neither handler is provided it prints the error to STDERR and will 133The worker pool is dynamically resized, according to (perceived :)
89start failing badly. 134load. The minimum size is given by the C<idle> parameter and the maximum
135size is given by the C<max> parameter. A new worker is started every
136C<start> seconds at most, and an idle worker is stopped at most every
137C<stop> seconds.
90 138
 91=item on_event => $cb->(...) 139You can specify the number of jobs sent to a worker concurrently using the
140C<load> parameter.
92 141
93Called for every call to the C<AnyEvent::Fork::RPC::event> function in the 142=over 4
94child, with the arguments of that function passed to the callback.
95 143
96Also called on errors when no C<on_error> handler is provided. 144=item idle => $count (default: 0)
97 145
 98=item on_destroy => $cb->() 146The minimum number of idle processes in the pool - when there are fewer
147than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
148ones, subject to the limits set by C<max> and C<start>.
99 149
100Called when the C<$rpc> object has been destroyed and all requests have 150This is also the initial amount of workers in the pool. The default of
101been successfully handled. This is useful when you queue some requests and 151zero means that the pool starts empty and can shrink back to zero workers
102want the child to go away after it has handled them. The problem is that 152over time.
103the parent must not exit either until all requests have been handled, and
104this can be accomplished by waiting for this callback.
105 153
106=item init => $function (default none) 154=item max => $count (default: 4)
107 155
108When specified (by name), this function is called in the child as the very 156The maximum number of processes in the pool, in addition to the template
109first thing when taking over the process, with all the arguments normally 157process. C<AnyEvent::Fork::Pool> will never have more than this number of
110passed to the C<AnyEvent::Fork::run> function, except the communications 158worker processes, although there can be more temporarily when a worker is
111socket. 159shut down and hasn't exited yet.
112 160
113It can be used to do one-time things in the child such as storing passed 161=item load => $count (default: 2)
114parameters or opening database connections.
115 162
116It is called very early - before the serialisers are created or the 163The maximum number of concurrent jobs sent to a single worker process.
117C<$function> name is resolved into a function reference, so it could be 164
118used to load any modules that provide the serialiser or function. It can 165Jobs that cannot be sent to a worker immediately (because all workers are
119not, however, create events. 166busy) will be queued until a worker is available.
167
168Setting this low improves latency. For example, at C<1>, every job that
169is sent to a worker is sent to a completely idle worker that doesn't run
170any other jobs. The downside is that throughput is reduced - a worker that
171finishes a job needs to wait for a new job from the parent.
172
173The default of C<2> is usually a good compromise.
174
175=item start => $seconds (default: 0.1)
176
177When there are fewer than C<idle> workers (or all workers are completely
178busy), then a timer is started. If the timer elapses and there are still
179jobs that cannot be queued to a worker, a new worker is started.
180
181This sets the minimum time that all workers must be busy before a new
182worker is started. Or, put differently, the minimum delay between starting
183new workers.
184
185The delay is small by default, which means new workers will be started
186relatively quickly. A delay of C<0> is possible, and ensures that the pool
187will grow as quickly as possible under load.
188
189Non-zero values are useful to avoid "exploding" a pool because a lot of
190jobs are queued in an instant.
191
192Higher values are often useful to improve efficiency at the cost of
193latency - when fewer processes can do the job over time, starting more and
194more is not necessarily going to help.
195
196=item stop => $seconds (default: 10)
197
198When a worker has no jobs to execute it becomes idle. An idle worker that
199hasn't executed a job within this amount of time will be stopped, unless
200the other parameters say otherwise.
201
202Setting this to a very high value means that workers stay around longer,
203even when they have nothing to do, which can be good as they don't have to
204be started on the next load spike again.
205
206Setting this to a lower value can be useful to avoid memory or simply
207process table wastage.
208
209Usually, setting this to a time longer than the time between load spikes
210is best - if you expect a lot of requests every minute and little work
211in between, setting this to longer than a minute avoids having to stop
212and start workers. On the other hand, you have to ask yourself if letting
213workers run idle is a good use of your resources. Try to find a good
214balance between resource usage of your workers and the time to start new
215workers - L<AnyEvent::Fork> itself is fast at
216creating workers while not using much memory for them, so most of the
217overhead is likely from your own code.
218
219=item on_destroy => $callback->() (default: none)
220
221When a pool object goes out of scope, the outstanding requests are still
222handled till completion. Only after handling all jobs will the workers
223be destroyed (and also the template process if it isn't referenced
224otherwise).
225
226To find out when a pool I<really> has finished its work, you can set this
227callback, which will be called when the pool has been destroyed.
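The waiting pattern described above can be sketched as follows. This is a sketch, assuming a hypothetical C<MyWorker> module with a C<run> function; it mirrors the synopsis, where a condition variable is passed as the C<on_destroy> callback:

```perl
use AnyEvent;
use AnyEvent::Fork;
use AnyEvent::Fork::Pool;

my $pool = AnyEvent::Fork
   ->new
   ->require ("MyWorker")                 # hypothetical worker module
   ->AnyEvent::Fork::Pool::run (
      "MyWorker::run",
      on_destroy => (my $finish = AE::cv), # condvars can be used as callbacks
   );

$pool->($_, sub { }) for 1..10;  # queue some jobs

undef $pool;     # no new jobs can be submitted, queued jobs still run
$finish->recv;   # returns only after all queued jobs were handled
```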
228
229=back
230
231=item AnyEvent::Fork::RPC Parameters
232
233These parameters are all passed more or less directly to
234L<AnyEvent::Fork::RPC>. They are only briefly mentioned here, for
235their full documentation please refer to the L<AnyEvent::Fork::RPC>
236documentation. Also, the default values mentioned here are only documented
237as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.
238
239=over 4
120 240
121=item async => $boolean (default: 0) 241=item async => $boolean (default: 0)
122 242
123The default server used in the child does all I/O blockingly, and only 243Whether to use the synchronous or asynchronous RPC backend.
124allows a single RPC call to execute concurrently.
125 244
126Setting C<async> to a true value switches to another implementation that 245=item on_error => $callback->($message) (default: die with message)
127uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.
128 246
129The actual API in the child is documented in the section that describes 247The callback to call on any (fatal) errors.
130the calling semantics of the returned C<$rpc> function.
131 248
132If you want to pre-load the actual back-end modules to enable memory 249=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
133sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
134synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
135 250
136If you use a template process and want to fork both sync and async 251The callback to invoke on events.
137children, then it is permissible to load both modules.
138 252
139=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') 253=item init => $initfunction (default: none)
140 254
141All arguments, result data and event data have to be serialised to be 255The function to call in the child, once before handling requests.
142transferred between the processes. For this, they have to be frozen and
143thawed in both parent and child processes.
144 256
145By default, only octet strings can be passed between the processes, which 257=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
146is reasonably fast and efficient.
147 258
148For more complicated use cases, you can provide your own freeze and thaw 259The serialiser to use.
149functions, by specifying a string with perl source code. It's supposed to
150return two code references when evaluated: the first receives a list of
151perl values and must return an octet string. The second receives the octet
152string and must return the original list of values.
153
154If you need an external module for serialisation, then you can either
155pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
156or C<require> statement into the serialiser string. Or both.
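The freeze/thaw contract described above can be illustrated with a serialiser string based on the core C<Storable> module. This is only a sketch of the contract - the stock serialisers are provided by L<AnyEvent::Fork::RPC>:

```perl
use strict;

# A serialiser is perl source code that, when evaluated, yields a
# (freeze, thaw) pair of code references. This one uses the core
# Storable module so references survive the trip between processes.
my $serialiser = 'use Storable ();
   (sub { Storable::nfreeze [@_] },
    sub { @{ Storable::thaw shift } })';

my ($freeze, $thaw) = eval $serialiser;
die $@ if $@;

my $octets = $freeze->(1, "two", [3, 4]);   # list -> octet string
my @values = $thaw->($octets);              # octet string -> original list
```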
157 260
158=back 261=back
159 262
160See the examples section earlier in this document for some actual 263=back
161examples.
162 264
163=cut 265=cut
164 266
165sub new { 267sub run {
166 my ($self, $function, %arg) = @_; 268 my ($template, $function, %arg) = @_;
167 269
168 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; 270 my $max = $arg{max} || 4;
271 my $idle = $arg{idle} || 0,
272 my $load = $arg{load} || 2,
273 my $start = $arg{start} || 0.1,
274 my $stop = $arg{stop} || 10,
169 my $on_event = delete $arg{on_event}; 275 my $on_event = $arg{on_event} || sub { },
170 my $on_error = delete $arg{on_error};
171 my $on_destroy = delete $arg{on_destroy}; 276 my $on_destroy = $arg{on_destroy};
277
278 my @rpc = (
279 async => $arg{async},
280 init => $arg{init},
281 serialiser => delete $arg{serialiser},
282 on_error => $arg{on_error},
283 );
284
285 my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
286 my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);
287
288 my $destroy_guard = Guard::guard {
289 $on_destroy->()
290 if $on_destroy;
291 };
292
293 $template
294 ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
295 ->eval ('
296 my ($magic0, $magic1) = @_;
297 sub AnyEvent::Fork::Pool::retire() {
298 AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
299 }
300 ', $magic0, $magic1)
172 301 ;
173 # default for on_error is to on_event, if specified
174 $on_error ||= $on_event
175 ? sub { $on_event->(error => shift) }
176 : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };
177 302
178 # default for on_event is to raise an error 303 $start_worker = sub {
179 $on_event ||= sub { $on_error->("event received, but no on_event handler") }; 304 my $proc = [0, 0, undef]; # load, index, rpc
180 305
181 my ($f, $t) = eval $serialiser; die $@ if $@; 306 $proc->[2] = $template
307 ->fork
308 ->AnyEvent::Fork::RPC::run ($function,
309 @rpc,
310 on_event => sub {
311 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
312 $destroy_guard if 0; # keep it alive
182 313
183 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww); 314 $_[1] eq "quit" and $stop_worker->($proc);
184 my ($rlen, $rbuf, $rw) = 512 - 16; 315 return;
316 }
185 317
186 my $wcb = sub { 318 &$on_event;
187 my $len = syswrite $fh, $wbuf; 319 },
320 )
321 ;
188 322
189 unless (defined $len) { 323 ++$nidle;
190 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 324 Array::Heap::push_heap_idx @pool, $proc;
191 undef $rw; undef $ww; # it ends here 325
192 $on_error->("$!"); 326 Scalar::Util::weaken $proc;
327 };
328
329 $stop_worker = sub {
330 my $proc = shift;
331
332 $proc->[0]
333 or --$nidle;
334
335 Array::Heap::splice_heap_idx @pool, $proc->[1]
336 if defined $proc->[1];
337
338 @$proc = 0; # tell others to leave it be
339 };
340
341 $want_start = sub {
342 undef $stop_w;
343
344 $start_w ||= AE::timer $start, $start, sub {
345 if (($nidle < $idle || @queue) && @pool < $max) {
346 $start_worker->();
347 $scheduler->();
348 } else {
349 undef $start_w;
193 } 350 }
194 } 351 };
352 };
195 353
196 substr $wbuf, 0, $len, ""; 354 $want_stop = sub {
355 $stop_w ||= AE::timer $stop, $stop, sub {
356 $stop_worker->($pool[0])
357 if $nidle;
197 358
198 unless (length $wbuf) { 359 undef $stop_w
360 if $nidle <= $idle;
361 };
362 };
363
364 $scheduler = sub {
365 if (@queue) {
366 while (@queue) {
367 @pool or $start_worker->();
368
369 my $proc = $pool[0];
370
371 if ($proc->[0] < $load) {
372 # found free worker, increase load
373 unless ($proc->[0]++) {
374 # worker became busy
375 --$nidle
376 or undef $stop_w;
377
378 $want_start->()
379 if $nidle < $idle && @pool < $max;
380 }
381
382 Array::Heap::adjust_heap_idx @pool, 0;
383
384 my $job = shift @queue;
385 my $ocb = pop @$job;
386
387 $proc->[2]->(@$job, sub {
388 # reduce load
389 --$proc->[0] # worker still busy?
390 or ++$nidle > $idle # not too many idle processes?
391 or $want_stop->();
392
393 Array::Heap::adjust_heap_idx @pool, $proc->[1]
394 if defined $proc->[1];
395
396 &$ocb;
397
398 $scheduler->();
399 });
400 } else {
401 $want_start->()
402 unless @pool >= $max;
403
404 last;
405 }
406 }
407 } elsif ($shutdown) {
408 @pool = ();
199 undef $ww; 409 undef $start_w;
200 $shutdown and shutdown $fh, 1; 410 undef $start_worker; # frees $destroy_guard reference
411
412 $stop_worker->($pool[0])
413 while $nidle;
201 } 414 }
202 }; 415 };
203 416
204 my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"); 417 my $shutdown_guard = Guard::guard {
418 $shutdown = 1;
419 $scheduler->();
420 };
205 421
206 $self->require ($module) 422 $start_worker->()
207 ->send_arg ($function, $arg{init}, $serialiser) 423 while @pool < $idle;
208 ->run ("$module\::run", sub {
209 $fh = shift;
210 424
211 my ($id, $len); 425 sub {
212 $rw = AE::io $fh, 0, sub { 426 $shutdown_guard if 0; # keep it alive
213 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
214 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
215 427
216 if ($len) { 428 $start_worker->()
217 while (8 <= length $rbuf) { 429 unless @pool;
218 ($id, $len) = unpack "LL", $rbuf;
219 8 + $len <= length $rbuf
220 or last;
221 430
222 my @r = $t->(substr $rbuf, 8, $len); 431 push @queue, [@_];
223 substr $rbuf, 0, 8 + $len, ""; 432 $scheduler->();
433 }
434}
224 435
225 if ($id) { 436=item $pool->(..., $cb->(...))
226 if (@rcb) { 437
227 (shift @rcb)->(@r); 438Call the RPC function of a worker with the given arguments, and when the
228 } elsif (my $cb = delete $rcb{$id}) { 439worker is done, call the C<$cb> with the results, just like calling the
229 $cb->(@r); 440RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
230 } else { 441details on the RPC API.
231 undef $rw; undef $ww; 442
232 $on_error->("unexpected data from child"); 443If there is no free worker, the call will be queued until a worker becomes
444available.
445
446Note that there can be considerable time between calling this method and
447the call actually being executed. During this time, the parameters passed
448to this function are effectively read-only - modifying them after the call
449and before the callback is invoked causes undefined behaviour.
450
451=cut
452
453=item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]
454
455=item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]
456
457Tries to detect the number of CPUs (C<$cpus>, often called cpu cores
458nowadays) and execution units (C<$eus>, which include e.g. extra
459hyperthreaded units). When C<$cpus> cannot be determined reliably,
460C<$default_cpus> is returned for both values, or C<1> if it is missing.
461
462For normal CPU bound uses, it is wise to have as many worker processes
463as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
464hyperthreading is usually detrimental to performance, but in those rare
465cases where that really helps it might be beneficial to use more workers
466(C<$eus>).
467
468Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
469C<$cpus> and C<$eus>, and on {Free,Net,Open}BSD, F<sysctl -n hw.ncpu> is
470used for C<$cpus>.
471
472Example: create a worker pool with as many workers as cpu cores, or C<2>,
473if the actual number could not be determined.
474
475 $fork->AnyEvent::Fork::Pool::run ("myworker::function",
476 max => (scalar AnyEvent::Fork::Pool::ncpu 2),
477 );
478
479=cut
480
481BEGIN {
482 if ($^O eq "linux") {
483 *ncpu = sub(;$) {
484 my ($cpus, $eus);
485
486 if (open my $fh, "<", "/proc/cpuinfo") {
487 my %id;
488
489 while (<$fh>) {
490 if (/^core id\s*:\s*(\d+)/) {
233 } 491 ++$eus;
234 } else { 492 undef $id{$1};
235 $on_event->(@r);
236 } 493 }
237 } 494 }
238 } elsif (defined $len) {
239 undef $rw; undef $ww; # it ends here
240 495
241 if (@rcb || %rcb) { 496 $cpus = scalar keys %id;
242 $on_error->("unexpected eof");
243 } else { 497 } else {
244 $on_destroy->(); 498 $cpus = $eus = @_ ? shift : 1;
245 }
246 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
247 undef $rw; undef $ww; # it ends here
248 $on_error->("read: $!");
249 } 499 }
500 wantarray ? ($cpus, $eus) : $cpus
250 }; 501 };
251 502 } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
252 $ww ||= AE::io $fh, 1, $wcb; 503 *ncpu = sub(;$) {
253 }); 504 my $cpus = qx<sysctl -n hw.ncpu> * 1
254 505 || (@_ ? shift : 1);
255 my $guard = Guard::guard { 506 wantarray ? ($cpus, $cpus) : $cpus
256 $shutdown = 1; 507 };
257 $ww ||= $fh && AE::io $fh, 1, $wcb; 508 } else {
509 *ncpu = sub(;$) {
510 my $cpus = @_ ? shift : 1;
511 wantarray ? ($cpus, $cpus) : $cpus
512 };
258 }; 513 }
259
260 my $id;
261
262 $arg{async}
263 ? sub {
264 $id = ($id == 0xffffffff ? 0 : $id) + 1;
265 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
266
267 $rcb{$id} = pop;
268
269 $guard; # keep it alive
270
271 $wbuf .= pack "LL/a*", $id, &$f;
272 $ww ||= $fh && AE::io $fh, 1, $wcb;
273 }
274 : sub {
275 push @rcb, pop;
276
277 $guard; # keep it alive
278
279 $wbuf .= pack "L/a*", &$f;
280 $ww ||= $fh && AE::io $fh, 1, $wcb;
281 }
282} 514}
283 515
284=item $pool->call (..., $cb->(...))
285
286=back 516=back
517
518=head1 CHILD USAGE
519
520In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
521more child-side function:
522
523=over 4
524
525=item AnyEvent::Fork::Pool::retire ()
526
527This function sends an event to the parent process to request retirement:
528the worker is removed from the pool and no new jobs will be sent to it,
529but it has to handle the jobs that are already queued.
530
531The parentheses are part of the syntax: the function usually isn't defined
532when you compile your code (because that happens I<before> handing the
533template process over to C<AnyEvent::Fork::Pool::run>), so you need the
534empty parentheses to tell Perl that the function is indeed a function.
535
536Retiring a worker can be useful to gracefully shut it down when the worker
537deems this useful. For example, after executing a job, one could check
538the process size or the number of jobs handled so far, and if either is
539too high, the worker could ask to get retired, to keep memory leaks from
540accumulating.
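As a sketch, a worker could retire itself after handling a fixed number of jobs. The C<MyWorker> package, the job limit and the C<do_something> helper are all hypothetical:

```perl
package MyWorker;

my $jobs_handled = 0;

sub run {
   my (@args) = @_;

   my $result = do_something (@args);   # hypothetical job logic

   # ask the parent to retire this worker after 100 jobs,
   # e.g. to keep leaked memory from accumulating
   AnyEvent::Fork::Pool::retire ()
      if ++$jobs_handled >= 100;

   $result
}
```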
541
542=back
543
544=head1 POOL PARAMETERS RECIPES
545
546This section describes some recipes for pool parameters. These are mostly
547meant for the synchronous RPC backend, as the asynchronous RPC backend
548changes the rules considerably, making workers themselves responsible for
549their scheduling.
550
551=over 4
552
553=item low latency - set load = 1
554
555If you need a deterministic low latency, you should set the C<load>
556parameter to C<1>. This ensures that never more than one job is sent to
557each worker. This avoids having to wait for a previous job to finish.
558
559This makes most sense with the synchronous (default) backend, as the
560asynchronous backend can handle multiple requests concurrently.
561
562=item lowest latency - set load = 1 and idle = max
563
564To achieve the lowest latency, you additionally should disable any dynamic
565resizing of the pool by setting C<idle> to the same value as C<max>.
566
567=item high throughput, cpu bound jobs - set load >= 2, max = #cpus
568
569To get high throughput with cpu-bound jobs, you should set the maximum
570pool size to the number of cpus in your system, and C<load> to at least
571C<2>, to make sure there can be another job waiting for the worker when it
572has finished one.
573
574The value of C<2> for C<load> is the minimum value that I<can> achieve
575100% throughput, but if your parent process itself is sometimes busy, you
576might need higher values. Also there is a limit on the amount of data that
577can be "in flight" to the worker, so if you send big blobs of data to your
578worker, C<load> might have much less of an effect.
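Putting this recipe together with C<ncpu> might look like this sketch (assuming a hypothetical C<MyWorker::run> function):

```perl
use AnyEvent::Fork;
use AnyEvent::Fork::Pool;

# one worker per detected cpu core (2 if detection fails), and a
# load of 2 so a finished worker never has to wait for its next job
my $pool = AnyEvent::Fork
   ->new
   ->require ("MyWorker")
   ->AnyEvent::Fork::Pool::run (
      "MyWorker::run",
      max  => (scalar AnyEvent::Fork::Pool::ncpu 2),
      load => 2,
   );
```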
579
580=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high
581
582When your jobs are I/O bound, using more workers usually translates into
583higher throughput, depending very much on your actual workload - sometimes
584having only one worker is best, for example, when you read or write big
585files at maximum speed, as a second worker will increase seek times.
586
587=back
588
589=head1 EXCEPTIONS
590
591The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions will
592not be caught, and exceptions in both workers and in callbacks cause
593undesirable or undefined behaviour.
287 594
288=head1 SEE ALSO 595=head1 SEE ALSO
289 596
290L<AnyEvent::Fork>, to create the processes in the first place. 597L<AnyEvent::Fork>, to create the processes in the first place.
291 598
