
Comparing AnyEvent-Fork-Pool/Pool.pm (file contents):
Revision 1.2 by root, Thu Apr 18 21:37:27 2013 UTC vs.
Revision 1.6 by root, Sun Apr 21 11:17:02 2013 UTC

6 6
7 use AnyEvent; 7 use AnyEvent;
8 use AnyEvent::Fork::Pool; 8 use AnyEvent::Fork::Pool;
9 # use AnyEvent::Fork is not needed 9 # use AnyEvent::Fork is not needed
10 10
11 # all parameters with default values 11 # all possible parameters shown, with default values
12 my $pool = new AnyEvent::Fork::Pool 12 my $pool = AnyEvent::Fork
13 "MyWorker::run", 13 ->new
14 ->require ("MyWorker")
15 ->AnyEvent::Fork::Pool::run (
16 "MyWorker::run", # the worker function
14 17
15 # pool management 18 # pool management
16 min => 0, # minimum # of processes
17 max => 4, # maximum # of processes 19 max => 4, # absolute maximum # of processes
18 max_queue => 2, # queue at most this number of jobs per process 20 idle => 0, # minimum # of idle processes
21 load => 2, # queue at most this number of jobs per process
19 min_delay => 0, # wait this many seconds before starting a new process 22 start => 0.1, # wait this many seconds before starting a new process
20 min_idle => 0, # try to have at least this amount of idle processes
21 max_idle => 1, # at most this many idle processes
22 idle_time => 1, # wait this many seconds before killing an idle process 23 stop => 10, # wait this many seconds before stopping an idle process
23 on_destroy => (my $finish = AE::cv), 24 on_destroy => (my $finish = AE::cv), # called when object is destroyed
24 25
25 # template process
26 template => AnyEvent::Fork->new, # the template process to use
27 require => [MyWorker::], # module(s) to load
28 eval => "# perl code to execute in template",
29
30 # parameters passed to AnyEvent::Fork::RPC 26 # parameters passed to AnyEvent::Fork::RPC
31 async => 0, 27 async => 0,
32 on_error => sub { die "FATAL: $_[0]\n" }, 28 on_error => sub { die "FATAL: $_[0]\n" },
33 on_event => sub { my @ev = @_ }, 29 on_event => sub { my @ev = @_ },
34 init => "MyWorker::init", 30 init => "MyWorker::init",
35 serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, 31 serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
36 ; 32 );
37 33
38 for (1..10) { 34 for (1..10) {
39 $pool->call (doit => $_, sub { 35 $pool->(doit => $_, sub {
40 print "MyWorker::run returned @_\n"; 36 print "MyWorker::run returned @_\n";
41 }); 37 });
42 } 38 }
43 39
44 undef $pool; 40 undef $pool;
52pool of processes that handles jobs. 48pool of processes that handles jobs.
53 49
54Understanding of L<AnyEvent::Fork> is helpful but not critical to be able 50Understanding of L<AnyEvent::Fork> is helpful but not critical to be able
55to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC> 51to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC>
56is, as it defines the actual API that needs to be implemented in the 52is, as it defines the actual API that needs to be implemented in the
57children. 53worker processes.
58 54
59=head1 EXAMPLES 55=head1 EXAMPLES
60 56
61=head1 PARENT USAGE 57=head1 PARENT USAGE
62 58
59To create a pool, you first have to create a L<AnyEvent::Fork> object -
60this object becomes your template process. Whenever a new worker process
61is needed, it is forked from this template process. Then you need to
62"hand off" this template process to the C<AnyEvent::Fork::Pool> module by
63calling its run method on it:
64
65 my $template = AnyEvent::Fork
66 ->new
67 ->require ("SomeModule", "MyWorkerModule");
68
69 my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");
70
71The pool "object" is not a regular Perl object, but a code reference that
72you can call and that works roughly like calling the worker function
73directly, except that it returns nothing but instead you need to specify a
74callback to be invoked once results are in:
75
76 $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });
77
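As a sketch of how these pieces fit together in a complete parent process (the event loop must be waited on for the callback to fire; C<MyWorkerModule::myfunction> is the example name used above):

```perl
use AnyEvent;
use AnyEvent::Fork;
use AnyEvent::Fork::Pool;

# create the template process and hand it over to the pool
my $pool = AnyEvent::Fork
   ->new
   ->require ("MyWorkerModule")
   ->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");

# use a condition variable to wait for a single result
my $done = AE::cv;

$pool->(1, 2, 3, sub {
   warn "myfunction(1,2,3) returned @_\n";
   $done->send;
});

$done->recv; # blocks until the callback has run
```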
63=over 4 78=over 4
64 79
65=cut 80=cut
66 81
67package AnyEvent::Fork::Pool; 82package AnyEvent::Fork::Pool;
68 83
69use common::sense; 84use common::sense;
70 85
71use Scalar::Util (); 86use Scalar::Util ();
72 87
88use Guard ();
73use Array::Heap (); 89use Array::Heap ();
74 90
75use AnyEvent; 91use AnyEvent;
76use AnyEvent::Fork; # we don't actually depend on it, this is for convenience 92use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
77use AnyEvent::Fork::RPC; 93use AnyEvent::Fork::RPC;
78 94
95# these are used for the first and last argument of events
96# in the hope of not colliding. yes, I don't like it either,
97# but didn't come up with an obviously better alternative.
79my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P'; 98my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
80my $magic2 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*'; 99my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
81 100
82our $VERSION = 0.1; 101our $VERSION = 0.1;
83 102
84=item my $rpc = new AnyEvent::Fork::Pool $function, [key => value...] 103=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
104
105The traditional way to call the pool creation function. But it is way
106cooler to call it in the following way:
107
108=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
85 109
86Creates a new pool object with the specified C<$function> as function 110Creates a new pool object with the specified C<$function> as function
87(name) to call for each request. 111(name) to call for each request. The pool uses the C<$fork> object as the
88 112template when creating worker processes.
89A pool consists of a template process that contains the code and data that
90the worker processes need. And a number of worker processes that have been
91forked off of that template process.
92 113
93You can supply your own template process, or tell C<AnyEvent::Fork::Pool> 114You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
94to create one. 115to create one.
95 116
96A relatively large number of key/value pairs can be specified to influence 117A relatively large number of key/value pairs can be specified to influence
101 122
102=item Pool Management 123=item Pool Management
103 124
104The pool consists of a certain number of worker processes. These options 125The pool consists of a certain number of worker processes. These options
105decide how many of these processes exist and when they are started and 126decide how many of these processes exist and when they are started and
106stopp.ed 127stopped.
128
129The worker pool is dynamically resized, according to (perceived :)
130load. The minimum size is given by the C<idle> parameter and the maximum
131size is given by the C<max> parameter. A new worker is started every
132C<start> seconds at most, and an idle worker is stopped at most every
 133 C<stop> seconds.
134
135You can specify the number of jobs sent to a worker concurrently using the
136C<load> parameter.
107 137
108=over 4 138=over 4
109 139
110=item min => $count (default: 0) 140=item idle => $count (default: 0)
111 141
112The minimum number of processes in the pool, in addition to the template 142The minimum number of idle processes in the pool - when there are fewer
113process. Even when idle, there will never be fewer than this number of 143than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
114worker processes. The default means that the pool can be empty. 144ones, subject to the limits set by C<max> and C<start>.
145
146This is also the initial number of workers in the pool. The default of
147zero means that the pool starts empty and can shrink back to zero workers
148over time.
115 149
116=item max => $count (default: 4) 150=item max => $count (default: 4)
117 151
118The maximum number of processes in the pool, in addition to the template 152The maximum number of processes in the pool, in addition to the template
119process. C<AnyEvent::Fork::Pool> will never create more than this number 153process. C<AnyEvent::Fork::Pool> will never have more than this number of
120of processes. 154worker processes, although there can be more temporarily when a worker is
155shut down and hasn't exited yet.
121 156
122=item max_queue => $count (default: 2) 157=item load => $count (default: 2)
123 158
124The maximum number of jobs sent to a single worker process. Worker 159The maximum number of concurrent jobs sent to a single worker process.
125processes that handle this number of jobs already are called "busy".
126 160
127Jobs that cannot be sent to a worker immediately (because all workers are 161Jobs that cannot be sent to a worker immediately (because all workers are
128busy) will be queued until a worker is available. 162busy) will be queued until a worker is available.
129 163
164Setting this low improves latency. For example, at C<1>, every job that
165is sent to a worker is sent to a completely idle worker that doesn't run
166any other jobs. The downside is that throughput is reduced - a worker that
167finishes a job needs to wait for a new job from the parent.
168
169The default of C<2> is usually a good compromise.
170
130=item min_delay => $seconds (default: 0) 171=item start => $seconds (default: 0.1)
131 172
132When a job is queued and all workers are busy, a timer is started. If the 173When there are fewer than C<idle> workers (or all workers are completely
133timer elapses and there are still jobs that cannot be queued to a worker, 174busy), then a timer is started. If the timer elapses and there are still
134a new worker is started. 175jobs that cannot be queued to a worker, a new worker is started.
135 176
136This configurs the time that all workers must be busy before a new worker 177This sets the minimum time that all workers must be busy before a new
137is started. Or, put differently, the minimum delay betwene starting new 178worker is started. Or, put differently, the minimum delay between starting
138workers. 179new workers.
139 180
140The delay is zero by default, which means new workers will be started 181The delay is small by default, which means new workers will be started
141without delay. 182relatively quickly. A delay of C<0> is possible, and ensures that the pool
183will grow as quickly as possible under load.
142 184
143=item min_idle => $count (default: 0) 185Non-zero values are useful to avoid "exploding" a pool because a lot of
186jobs are queued in an instant.
144 187
145The minimum number of idle workers - when they are less, more 188Higher values are often useful to improve efficiency at the cost of
146are started. The C<min_delay> is still respected though, and 189latency - when fewer processes can do the job over time, starting more and
147C<min_idle>/C<min_delay> and C<max_idle>/C<idle_time> are useful to 190more is not necessarily going to help.
148dynamically adjust the pool.
149 191
150=item max_idle => $count (default: 1)
151
152The maximum number of idle workers. If a worker becomes idle and there are
153already this many idle workers, it will be stopped immediately instead of
154waiting for the idle timer to elapse.
155
156=item idle_time => $seconds (default: 1) 192=item stop => $seconds (default: 10)
157 193
158When a worker has no jobs to execute it becomes idle. An idle worker that 194When a worker has no jobs to execute it becomes idle. An idle worker that
159hasn't executed a job within this amount of time will be stopped, unless 195hasn't executed a job within this amount of time will be stopped, unless
160the other parameters say otherwise. 196the other parameters say otherwise.
161 197
198Setting this to a very high value means that workers stay around longer,
199even when they have nothing to do, which can be good as they don't have to
200be started on the next load spike again.
201
202Setting this to a lower value can be useful to avoid memory or simply
203process table wastage.
204
205Usually, setting this to a time longer than the time between load spikes
206is best - if you expect a lot of requests every minute and little work
207in between, setting this to longer than a minute avoids having to stop
208and start workers. On the other hand, you have to ask yourself if letting
209workers run idle is a good use of your resources. Try to find a good
210balance between resource usage of your workers and the time to start new
211workers - L<AnyEvent::Fork> itself is fast at
212creating workers while not using much memory for them, so most of the
213overhead is likely from your own code.
214
162=item on_destroy => $callback->() (default: none) 215=item on_destroy => $callback->() (default: none)
163 216
164When a pool object goes out of scope, it will still handle all outstanding 217When a pool object goes out of scope, the outstanding requests are still
165jobs. After that, it will destroy all workers (and also the template 218handled till completion. Only after handling all jobs will the workers
166process if it isn't referenced otherwise). 219be destroyed (and also the template process if it isn't referenced
220otherwise).
221
222To find out when a pool I<really> has finished its work, you can set this
223callback, which will be called when the pool has been destroyed.
167 224
168=back 225=back
169 226
170=item Template Process 227=item AnyEvent::Fork::RPC Parameters
171 228
172The worker processes are all forked from a single template 229These parameters are all passed more or less directly to
173process. Ideally, all modules and all cdoe used by the worker, as well as 230L<AnyEvent::Fork::RPC>. They are only briefly mentioned here, for
174any shared data structures should be loaded into the template process, to 231their full documentation please refer to the L<AnyEvent::Fork::RPC>
175take advantage of data sharing via fork. 232documentation. Also, the default values mentioned here are only documented
176 233as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.
177You can create your own template process by creating a L<AnyEvent::Fork>
178object yourself and passing it as the C<template> parameter, but
179C<AnyEvent::Fork::Pool> can create one for you, including some standard
180options.
181 234
182=over 4 235=over 4
183 236
184=item template => $fork (default: C<< AnyEvent::Fork->new >>) 237=item async => $boolean (default: 0)
185 238
186The template process to use, if you want to create your own. 239Whether to use the synchronous or asynchronous RPC backend.
187 240
188=item require => \@modules (default: C<[]>) 241=item on_error => $callback->($message) (default: die with message)
189 242
190The modules in this list will be laoded into the template process. 243The callback to call on any (fatal) errors.
191 244
192=item eval => "# perl code to execute in template" (default: none) 245=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
193 246
194This is a perl string that is evaluated after creating the template 247The callback to invoke on events.
195process and after requiring the modules. It can do whatever it wants to 248
196configure the process, but it must not do anything that would keep a later 249=item init => $initfunction (default: none)
197fork from working (so must not create event handlers or (real) threads for 250
198example). 251The function to call in the child, once before handling requests.
252
253=item serialiser => $serialiser (defailt: $AnyEvent::Fork::RPC::STRING_SERIALISER)
254
255The serialiser to use.
199 256
200=back 257=back
201 258
202=item AnyEvent::Fork::RPC Parameters
203
204These parameters are all passed directly to L<AnyEvent::Fork::RPC>. They
205are only briefly mentioned here, for their full documentation
206please refer to the L<AnyEvent::Fork::RPC> documentation. Also, the
207default values mentioned here are only documented as a best effort -
208L<AnyEvent::Fork::RPC> documentation is binding.
209
210=over 4
211
212=item async => $boolean (default: 0)
213
214Whether to sue the synchronous or asynchronous RPC backend.
215
216=item on_error => $callback->($message) (default: die with message)
217
218The callback to call on any (fatal) errors.
219
220=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
221
222The callback to invoke on events.
223
224=item init => $initfunction (default: none)
225
226The function to call in the child, once before handling requests.
227
228=item serialiser => $serialiser (defailt: $AnyEvent::Fork::RPC::STRING_SERIALISER)
229
230The serialiser to use.
231
232=back 259=back
233 260
234=back
235
236=cut 261=cut
237 262
238sub new { 263sub run {
239 my ($class, $function, %arg) = @_; 264 my ($template, $function, %arg) = @_;
240 265
241 my $self = bless { 266 my $max = $arg{max} || 4;
242 min => 0, 267 my $idle = $arg{idle} || 0,
243 max => 4, 268 my $load = $arg{load} || 2,
244 max_queue => 2, 269 my $start = $arg{start} || 0.1,
245 min_delay => 0, 270 my $stop = $arg{stop} || 10,
246 max_idle => 1, 271 my $on_event = $arg{on_event} || sub { },
247 idle_time => 1, 272 my $on_destroy = $arg{on_destroy};
248 on_event => sub { },
249 %arg,
250 pool => [],
251 queue => [],
252 }, $class;
253 273
254 $self->{function} = $function; 274 my @rpc = (
275 async => $arg{async},
276 init => $arg{init},
277 serialiser => delete $arg{serialiser},
278 on_error => $arg{on_error},
279 );
255 280
256 ($self->{template} ||= new AnyEvent::Fork) 281 my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
282 my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);
283
284 my $destroy_guard = Guard::guard {
285 $on_destroy->()
286 if $on_destroy;
287 };
288
289 $template
257 ->require ("AnyEvent::Fork::RPC::" . ($self->{async} ? "Async" : "Sync")) 290 ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
258 ->require (@{ delete $self->{require} })
259 ->eval (' 291 ->eval ('
260 my ($magic0, $magic2) = @_; 292 my ($magic0, $magic1) = @_;
261 sub AnyEvent::Fork::Pool::quit() { 293 sub AnyEvent::Fork::Pool::retire() {
262 AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic2; 294 AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
263 } 295 }
264 ', $magic0, $magic2) 296 ', $magic0, $magic1)
265 ->eval (delete $self->{eval}); 297 ;
266 298
267 $self->start 299 $start_worker = sub {
268 while @{ $self->{pool} } < $self->{min}; 300 my $proc = [0, 0, undef]; # load, index, rpc
269 301
270 $self 302 $proc->[2] = $template
303 ->fork
304 ->AnyEvent::Fork::RPC::run ($function,
305 @rpc,
306 on_event => sub {
307 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
308 $destroy_guard if 0; # keep it alive
309
310 $_[1] eq "quit" and $stop_worker->($proc);
311 return;
312 }
313
314 &$on_event;
315 },
316 )
317 ;
318
319 ++$nidle;
320 Array::Heap::push_heap_idx @pool, $proc;
321
322 Scalar::Util::weaken $proc;
323 };
324
325 $stop_worker = sub {
326 my $proc = shift;
327
328 $proc->[0]
329 or --$nidle;
330
331 Array::Heap::splice_heap_idx @pool, $proc->[1]
332 if defined $proc->[1];
333 };
334
335 $want_start = sub {
336 undef $stop_w;
337
338 $start_w ||= AE::timer $start, $start, sub {
339 if (($nidle < $idle || @queue) && @pool < $max) {
340 $start_worker->();
341 $scheduler->();
342 } else {
343 undef $start_w;
344 }
345 };
346 };
347
348 $want_stop = sub {
349 $stop_w ||= AE::timer $stop, $stop, sub {
350 $stop_worker->($pool[0])
351 if $nidle;
352
353 undef $stop_w
354 if $nidle <= $idle;
355 };
356 };
357
358 $scheduler = sub {
359 if (@queue) {
360 while (@queue) {
361 my $proc = $pool[0];
362
363 if ($proc->[0] < $load) {
364 # found free worker, increase load
365 unless ($proc->[0]++) {
366 # worker became busy
367 --$nidle
368 or undef $stop_w;
369
370 $want_start->()
371 if $nidle < $idle && @pool < $max;
372 }
373
374 Array::Heap::adjust_heap_idx @pool, 0;
375
376 my $job = shift @queue;
377 my $ocb = pop @$job;
378
379 $proc->[2]->(@$job, sub {
380 # reduce load
381 --$proc->[0] # worker still busy?
382 or ++$nidle > $idle # not too many idle processes?
383 or $want_stop->();
384
385 Array::Heap::adjust_heap_idx @pool, $proc->[1]
386 if defined $proc->[1];
387
388 $scheduler->();
389
390 &$ocb;
391 });
392 } else {
393 $want_start->()
394 unless @pool >= $max;
395
396 last;
397 }
398 }
399 } elsif ($shutdown) {
400 @pool = ();
401 undef $start_w;
402 undef $start_worker; # frees $destroy_guard reference
403
404 $stop_worker->($pool[0])
405 while $nidle;
406 }
407 };
408
409 my $shutdown_guard = Guard::guard {
410 $shutdown = 1;
411 $scheduler->();
412 };
413
414 $start_worker->()
415 while @pool < $idle;
416
417 sub {
418 $shutdown_guard if 0; # keep it alive
419
420 $start_worker->()
421 unless @pool;
422
423 push @queue, [@_];
424 $scheduler->();
425 }
271} 426}
272 427
273sub start {
274 my ($self) = @_;
275
276 warn "start\n";#d#
277
278 Scalar::Util::weaken $self;
279
280 my $proc = [0, undef, undef];
281
282 $proc->[1] = $self->{template}
283 ->fork
284 ->AnyEvent::Fork::RPC::run ($self->{function},
285 async => $self->{async},
286 init => $self->{init},
287 serialiser => $self->{serialiser},
288 on_error => $self->{on_error},
289 on_event => sub {
290 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic2) {
291 if ($_[1] eq "quit") {
292 my $pool = $self->{pool};
293 for (0 .. $#$pool) {
294 if ($pool->[$_] == $proc) {
295 Array::Heap::splice_heap @$pool, $_;
296 return;
297 }
298 }
299 die;
300 }
301 return;
302 }
303
304 &{ $self->{on_event} };
305 },
306 )
307 ;
308
309 ++$self->{idle};
310 Array::Heap::push_heap @{ $self->{pool} }, $proc;
311}
312
313=item $pool->call (..., $cb->(...)) 428=item $pool->(..., $cb->(...))
314 429
315Call the RPC function of a worker with the given arguments, and when the 430Call the RPC function of a worker with the given arguments, and when the
316worker is done, call the C<$cb> with the results, like just calling the 431worker is done, call the C<$cb> with the results, just like calling the
317L<AnyEvent::Fork::RPC> object directly. 432RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
433details on the RPC API.
318 434
319If there is no free worker, the call will be queued. 435If there is no free worker, the call will be queued until a worker becomes
436available.
320 437
321Note that there can be considerable time between calling this method and 438Note that there can be considerable time between calling this method and
322the call actually being executed. During this time, the parameters passed 439the call actually being executed. During this time, the parameters passed
323to this function are effectively read-only - modifying them after the call 440to this function are effectively read-only - modifying them after the call
324and before the callback is invoked causes undefined behaviour. 441and before the callback is invoked causes undefined behaviour.
325 442
326=cut 443=cut
327 444
328sub scheduler { 445=back
329 my $self = shift;
330 446
331 my $pool = $self->{pool}; 447=head1 CHILD USAGE
332 my $queue = $self->{queue};
333 448
334 $self->start 449In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
335 unless @$pool; 450more child-side function:
336 451
337 while (@$queue) { 452=over 4
338 my $proc = $pool->[0];
339 453
340 if ($proc->[0] < $self->{max_queue}) { 454=item AnyEvent::Fork::Pool::retire ()
341 warn "free $proc $proc->[0]\n";#d#
342 # found free worker
343 --$self->{idle}
344 unless $proc->[0]++;
345 455
346 undef $proc->[2]; 456This function sends an event to the parent process to request retirement:
457the worker is removed from the pool and no new jobs will be sent to it,
458but it has to handle the jobs that are already queued.
347 459
348 Array::Heap::adjust_heap @$pool, 0; 460The parentheses are part of the syntax: the function usually isn't defined
461when you compile your code (because that happens I<before> handing the
462template process over to C<AnyEvent::Fork::Pool::run>), so you need the
463empty parentheses to tell Perl that the function is indeed a function.
349 464
350 my $job = shift @$queue; 465Retiring a worker can be useful to gracefully shut it down when the worker
 351 my $ocb = pop @$job; 466deems this necessary. For example, after executing a job, one could check
467the process size or the number of jobs handled so far, and if either is
468too high, the worker could ask to get retired, to keep memory leaks
469from accumulating.
352 470
353 $proc->[1]->(@$job, sub { 471=back
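A hypothetical worker module (the name, the C<do_something> helper and the threshold are illustrative only, not part of this distribution) that retires itself after a fixed number of jobs might look like this:

```perl
package MyWorker;

my $jobs_handled = 0;

sub run {
   my ($arg) = @_;

   my $result = do_something ($arg); # the actual work, assumed

   # after 1000 jobs, ask the parent to retire this worker,
   # e.g. to keep slow memory leaks from accumulating
   AnyEvent::Fork::Pool::retire ()
      if ++$jobs_handled >= 1000;

   $result
}

1
```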
354 for (0 .. $#$pool) {
355 if ($pool->[$_] == $proc) {
356 # reduce queue counter
357 unless (--$pool->[$_][0]) {
358 # worker becomes idle
359 my $to = ++$self->{idle} > $self->{max_idle}
360 ? 0
361 : $self->{idle_time};
362 472
363 $proc->[2] = AE::timer $to, 0, sub { 473=head1 POOL PARAMETERS RECIPES
364 undef $proc->[2];
365 474
 366 warn "destroy $proc afzer $to\n";#d# 475This section describes some recipes for pool parameters. These are mostly
476meant for the synchronous RPC backend, as the asynchronous RPC backend
477changes the rules considerably, making workers themselves responsible for
478their scheduling.
367 479
368 for (0 .. $#$pool) { 480=over 4
369 if ($pool->[$_] == $proc) {
370 Array::Heap::splice_heap @$pool, $_;
371 --$self->{idle};
372 last;
373 }
374 }
375 };
376 }
377 481
378 Array::Heap::adjust_heap @$pool, $_; 482=item low latency - set load = 1
379 last;
380 }
381 }
382 &$ocb;
383 });
384 } else {
385 warn "busy $proc->[0]\n";#d#
386 # all busy, delay
387 483
388 $self->{min_delay_w} ||= AE::timer $self->{min_delay}, 0, sub { 484If you need a deterministic low latency, you should set the C<load>
389 delete $self->{min_delay_w}; 485parameter to C<1>. This ensures that never more than one job is sent to
486each worker. This avoids having to wait for a previous job to finish.
390 487
391 if (@{ $self->{queue} }) { 488This makes most sense with the synchronous (default) backend, as the
392 $self->start; 489asynchronous backend can handle multiple requests concurrently.
393 $self->scheduler;
394 }
395 };
396 last;
397 }
398 }
399 warn "last\n";#d#
400}
401 490
402sub call { 491=item lowest latency - set load = 1 and idle = max
403 my $self = shift;
404 492
405 push @{ $self->{queue} }, [@_]; 493To achieve the lowest latency, you additionally should disable any dynamic
406 $self->scheduler; 494resizing of the pool by setting C<idle> to the same value as C<max>.
407}
408 495
409sub DESTROY { 496=item high throughput, cpu bound jobs - set load >= 2, max = #cpus
410 $_[0]{on_destroy}->(); 497
411} 498To get high throughput with cpu-bound jobs, you should set the maximum
499pool size to the number of cpus in your system, and C<load> to at least
500C<2>, to make sure there can be another job waiting for the worker when it
501has finished one.
502
503The value of C<2> for C<load> is the minimum value that I<can> achieve
504100% throughput, but if your parent process itself is sometimes busy, you
505might need higher values. Also there is a limit on the amount of data that
506can be "in flight" to the worker, so if you send big blobs of data to your
507worker, C<load> might have much less of an effect.
508
509=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high
510
511When your jobs are I/O bound, using more workers usually results in
512higher throughput, depending very much on your actual workload - sometimes
513having only one worker is best, for example, when you read or write big
514files at maximum speed, as a second worker will increase seek times.
412 515
413=back 516=back
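As a sketch, the recipes above might translate into C<run> parameters like this (the C<MyWorker> module name and the cpu count of C<4> are assumptions for illustration):

```perl
use AnyEvent::Fork;
use AnyEvent::Fork::Pool;

# lowest latency: at most one job per worker, no dynamic resizing
my $low_latency = AnyEvent::Fork
   ->new
   ->require ("MyWorker")
   ->AnyEvent::Fork::Pool::run ("MyWorker::run",
      load => 1,
      idle => 8,
      max  => 8,
   );

# high throughput for cpu-bound jobs: one worker per cpu,
# and enough load so a finished worker need not wait for the parent
my $throughput = AnyEvent::Fork
   ->new
   ->require ("MyWorker")
   ->AnyEvent::Fork::Pool::run ("MyWorker::run",
      load => 2,
      max  => 4, # number of cpus, assumed
   );
```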
414 517
415=head1 SEE ALSO 518=head1 SEE ALSO
416 519
