/cvs/AnyEvent-Fork-Pool/Pool.pm

Comparing AnyEvent-Fork-Pool/Pool.pm (file contents):
Revision 1.2 by root, Thu Apr 18 21:37:27 2013 UTC vs.
Revision 1.12 by root, Sun Apr 28 14:26:11 2013 UTC

6 6
7 use AnyEvent; 7 use AnyEvent;
8 use AnyEvent::Fork::Pool; 8 use AnyEvent::Fork::Pool;
9 # use AnyEvent::Fork is not needed 9 # use AnyEvent::Fork is not needed
10 10
11 # all parameters with default values 11 # all possible parameters shown, with default values
12 my $pool = new AnyEvent::Fork::Pool 12 my $pool = AnyEvent::Fork
13 "MyWorker::run", 13 ->new
14 ->require ("MyWorker")
15 ->AnyEvent::Fork::Pool::run (
16 "MyWorker::run", # the worker function
14 17
15 # pool management 18 # pool management
16 min => 0, # minimum # of processes
17 max => 4, # maximum # of processes 19 max => 4, # absolute maximum # of processes
18 max_queue => 2, # queue at most this number of jobs per process 20 idle => 0, # minimum # of idle processes
21 load => 2, # queue at most this number of jobs per process
19 min_delay => 0, # wait this many seconds before starting a new process 22 start => 0.1, # wait this many seconds before starting a new process
20 min_idle => 0, # try to have at least this amount of idle processes
21 max_idle => 1, # at most this many idle processes
22 idle_time => 1, # wait this many seconds before killing an idle process 23 stop => 10, # wait this many seconds before stopping an idle process
23 on_destroy => (my $finish = AE::cv), 24 on_destroy => (my $finish = AE::cv), # called when object is destroyed
24 25
25 # template process
26 template => AnyEvent::Fork->new, # the template process to use
27 require => [MyWorker::], # module(s) to load
28 eval => "# perl code to execute in template",
29
30 # parameters passed to AnyEvent::Fork::RPC 26 # parameters passed to AnyEvent::Fork::RPC
31 async => 0, 27 async => 0,
32 on_error => sub { die "FATAL: $_[0]\n" }, 28 on_error => sub { die "FATAL: $_[0]\n" },
33 on_event => sub { my @ev = @_ }, 29 on_event => sub { my @ev = @_ },
34 init => "MyWorker::init", 30 init => "MyWorker::init",
35 serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, 31 serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
36 ; 32 );
37 33
38 for (1..10) { 34 for (1..10) {
39 $pool->call (doit => $_, sub { 35 $pool->(doit => $_, sub {
40 print "MyWorker::run returned @_\n"; 36 print "MyWorker::run returned @_\n";
41 }); 37 });
42 } 38 }
43 39
44 undef $pool; 40 undef $pool;
45 41
46 $finish->recv; 42 $finish->recv;
47 43
48=head1 DESCRIPTION 44=head1 DESCRIPTION
49 45
50This module uses processes created via L<AnyEvent::Fork> and the RPC 46This module uses processes created via L<AnyEvent::Fork> (or
51protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced 47L<AnyEvent::Fork::Remote>) and the RPC protocol implemented in
52pool of processes that handles jobs. 48L<AnyEvent::Fork::RPC> to create a load-balanced pool of processes that
49handles jobs.
53 50
54Understanding of L<AnyEvent::Fork> is helpful but not critical to be able 51Understanding of L<AnyEvent::Fork> is helpful but not critical to be able
55to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC> 52to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC>
56is, as it defines the actual API that needs to be implemented in the 53is, as it defines the actual API that needs to be implemented in the
57children. 54worker processes.
58
59=head1 EXAMPLES
60 55
61=head1 PARENT USAGE 56=head1 PARENT USAGE
62 57
58To create a pool, you first have to create a L<AnyEvent::Fork> object -
59this object becomes your template process. Whenever a new worker process
60is needed, it is forked from this template process. Then you need to
61"hand off" this template process to the C<AnyEvent::Fork::Pool> module by
62calling its run method on it:
63
64 my $template = AnyEvent::Fork
65 ->new
66 ->require ("SomeModule", "MyWorkerModule");
67
68 my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");
69
70The pool "object" is not a regular Perl object, but a code reference that
71you can call and that works roughly like calling the worker function
72directly, except that it returns nothing but instead you need to specify a
73callback to be invoked once results are in:
74
75 $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });
76
63=over 4 77=over 4
64 78
65=cut 79=cut
66 80
67package AnyEvent::Fork::Pool; 81package AnyEvent::Fork::Pool;
68 82
69use common::sense; 83use common::sense;
70 84
71use Scalar::Util (); 85use Scalar::Util ();
72 86
87use Guard ();
73use Array::Heap (); 88use Array::Heap ();
74 89
75use AnyEvent; 90use AnyEvent;
91# explicit version on next line, as some cpan-testers test with the 0.1 version,
92# ignoring dependencies, and this line will at least give a clear indication of that.
76use AnyEvent::Fork; # we don't actually depend on it, this is for convenience 93use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience
77use AnyEvent::Fork::RPC; 94use AnyEvent::Fork::RPC;
78 95
96# these are used for the first and last argument of events
97# in the hope of not colliding. yes, I don't like it either,
98# but didn't come up with an obviously better alternative.
79my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P'; 99my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
80my $magic2 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*'; 100my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
81 101
82our $VERSION = 0.1; 102our $VERSION = 1.1;
83 103
84=item my $rpc = new AnyEvent::Fork::Pool $function, [key => value...] 104=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
105
106The traditional way to call the pool creation function. But it is way
107cooler to call it in the following way:
108
109=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
85 110
86Creates a new pool object with the specified C<$function> as function 111Creates a new pool object with the specified C<$function> as function
87(name) to call for each request. 112(name) to call for each request. The pool uses the C<$fork> object as the
88 113template when creating worker processes.
89A pool consists of a template process that contains the code and data that
90the worker processes need. And a number of worker processes that have been
91forked off of that template process.
92 114
93You can supply your own template process, or tell C<AnyEvent::Fork::Pool> 115You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
94to create one. 116to create one.
95 117
96A relatively large number of key/value pairs can be specified to influence 118A relatively large number of key/value pairs can be specified to influence
101 123
102=item Pool Management 124=item Pool Management
103 125
104The pool consists of a certain number of worker processes. These options 126The pool consists of a certain number of worker processes. These options
105decide how many of these processes exist and when they are started and 127decide how many of these processes exist and when they are started and
106stopp.ed 128stopped.
129
130The worker pool is dynamically resized, according to (perceived :)
131load. The minimum size is given by the C<idle> parameter and the maximum
132size is given by the C<max> parameter. A new worker is started every
133C<start> seconds at most, and an idle worker is stopped at most every
134C<stop> seconds.
135
136You can specify the amount of jobs sent to a worker concurrently using the
137C<load> parameter.
107 138
108=over 4 139=over 4
109 140
110=item min => $count (default: 0) 141=item idle => $count (default: 0)
111 142
112The minimum number of processes in the pool, in addition to the template 143The minimum amount of idle processes in the pool - when there are fewer
113process. Even when idle, there will never be fewer than this number of 144than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
114worker processes. The default means that the pool can be empty. 145ones, subject to the limits set by C<max> and C<start>.
146
147This is also the initial amount of workers in the pool. The default of
148zero means that the pool starts empty and can shrink back to zero workers
149over time.
115 150
116=item max => $count (default: 4) 151=item max => $count (default: 4)
117 152
118The maximum number of processes in the pool, in addition to the template 153The maximum number of processes in the pool, in addition to the template
119process. C<AnyEvent::Fork::Pool> will never create more than this number 154process. C<AnyEvent::Fork::Pool> will never have more than this number of
120of processes. 155worker processes, although there can be more temporarily when a worker is
156shut down and hasn't exited yet.
121 157
122=item max_queue => $count (default: 2) 158=item load => $count (default: 2)
123 159
124The maximum number of jobs sent to a single worker process. Worker 160The maximum number of concurrent jobs sent to a single worker process.
125processes that handle this number of jobs already are called "busy".
126 161
127Jobs that cannot be sent to a worker immediately (because all workers are 162Jobs that cannot be sent to a worker immediately (because all workers are
128busy) will be queued until a worker is available. 163busy) will be queued until a worker is available.
129 164
165Setting this low improves latency. For example, at C<1>, every job that
166is sent to a worker is sent to a completely idle worker that doesn't run
167any other jobs. The downside is that throughput is reduced - a worker that
168finishes a job needs to wait for a new job from the parent.
169
170The default of C<2> is usually a good compromise.
171
130=item min_delay => $seconds (default: 0) 172=item start => $seconds (default: 0.1)
131 173
132When a job is queued and all workers are busy, a timer is started. If the 174When there are fewer than C<idle> workers (or all workers are completely
133timer elapses and there are still jobs that cannot be queued to a worker, 175busy), then a timer is started. If the timer elapses and there are still
134a new worker is started. 176jobs that cannot be queued to a worker, a new worker is started.
135 177
136This configurs the time that all workers must be busy before a new worker 178This sets the minimum time that all workers must be busy before a new
137is started. Or, put differently, the minimum delay betwene starting new 179worker is started. Or, put differently, the minimum delay between starting
138workers. 180new workers.
139 181
140The delay is zero by default, which means new workers will be started 182The delay is small by default, which means new workers will be started
141without delay. 183relatively quickly. A delay of C<0> is possible, and ensures that the pool
184will grow as quickly as possible under load.
142 185
143=item min_idle => $count (default: 0) 186Non-zero values are useful to avoid "exploding" a pool because a lot of
187jobs are queued in an instant.
144 188
145The minimum number of idle workers - when they are less, more 189Higher values are often useful to improve efficiency at the cost of
146are started. The C<min_delay> is still respected though, and 190latency - when fewer processes can do the job over time, starting more and
147C<min_idle>/C<min_delay> and C<max_idle>/C<idle_time> are useful to 191more is not necessarily going to help.
148dynamically adjust the pool.
149 192
150=item max_idle => $count (default: 1)
151
152The maximum number of idle workers. If a worker becomes idle and there are
153already this many idle workers, it will be stopped immediately instead of
154waiting for the idle timer to elapse.
155
156=item idle_time => $seconds (default: 1) 193=item stop => $seconds (default: 10)
157 194
158When a worker has no jobs to execute it becomes idle. An idle worker that 195When a worker has no jobs to execute it becomes idle. An idle worker that
159hasn't executed a job within this amount of time will be stopped, unless 196hasn't executed a job within this amount of time will be stopped, unless
160the other parameters say otherwise. 197the other parameters say otherwise.
161 198
199Setting this to a very high value means that workers stay around longer,
200even when they have nothing to do, which can be good as they don't have to
201be started again on the next load spike.
202
203Setting this to a lower value can be useful to avoid memory or simply
204process table wastage.
205
206Usually, setting this to a time longer than the time between load spikes
207is best - if you expect a lot of requests every minute and little work
208in between, setting this to longer than a minute avoids having to stop
209and start workers. On the other hand, you have to ask yourself if letting
210workers run idle is a good use of your resources. Try to find a good
211balance between resource usage of your workers and the time to start new
212workers - L<AnyEvent::Fork> itself is fast at creating workers
213while not using much memory for them, so most of the
214overhead is likely from your own code.
215
162=item on_destroy => $callback->() (default: none) 216=item on_destroy => $callback->() (default: none)
163 217
164When a pool object goes out of scope, it will still handle all outstanding 218When a pool object goes out of scope, the outstanding requests are still
165jobs. After that, it will destroy all workers (and also the template 219handled till completion. Only after handling all jobs will the workers
166process if it isn't referenced otherwise). 220be destroyed (and also the template process if it isn't referenced
221otherwise).
222
223To find out when a pool I<really> has finished its work, you can set this
224callback, which will be called when the pool has been destroyed.
167 225
168=back 226=back
169 227
170=item Template Process 228=item AnyEvent::Fork::RPC Parameters
171 229
172The worker processes are all forked from a single template 230These parameters are all passed more or less directly to
173process. Ideally, all modules and all cdoe used by the worker, as well as 231L<AnyEvent::Fork::RPC>. They are only briefly mentioned here, for
174any shared data structures should be loaded into the template process, to 232their full documentation please refer to the L<AnyEvent::Fork::RPC>
175take advantage of data sharing via fork. 233documentation. Also, the default values mentioned here are only documented
176 234as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.
177You can create your own template process by creating a L<AnyEvent::Fork>
178object yourself and passing it as the C<template> parameter, but
179C<AnyEvent::Fork::Pool> can create one for you, including some standard
180options.
181 235
182=over 4 236=over 4
183 237
184=item template => $fork (default: C<< AnyEvent::Fork->new >>) 238=item async => $boolean (default: 0)
185 239
186The template process to use, if you want to create your own. 240Whether to use the synchronous or asynchronous RPC backend.
187 241
188=item require => \@modules (default: C<[]>) 242=item on_error => $callback->($message) (default: die with message)
189 243
190The modules in this list will be laoded into the template process. 244The callback to call on any (fatal) errors.
191 245
192=item eval => "# perl code to execute in template" (default: none) 246=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
193 247
194This is a perl string that is evaluated after creating the template 248The callback to invoke on events.
195process and after requiring the modules. It can do whatever it wants to 249
196configure the process, but it must not do anything that would keep a later 250=item init => $initfunction (default: none)
197fork from working (so must not create event handlers or (real) threads for 251
198example). 252The function to call in the child, once before handling requests.
253
254=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
255
256The serialiser to use.
199 257
200=back 258=back
201 259
202=item AnyEvent::Fork::RPC Parameters
203
204These parameters are all passed directly to L<AnyEvent::Fork::RPC>. They
205are only briefly mentioned here, for their full documentation
206please refer to the L<AnyEvent::Fork::RPC> documentation. Also, the
207default values mentioned here are only documented as a best effort -
208L<AnyEvent::Fork::RPC> documentation is binding.
209
210=over 4
211
212=item async => $boolean (default: 0)
213
214Whether to sue the synchronous or asynchronous RPC backend.
215
216=item on_error => $callback->($message) (default: die with message)
217
218The callback to call on any (fatal) errors.
219
220=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
221
222The callback to invoke on events.
223
224=item init => $initfunction (default: none)
225
226The function to call in the child, once before handling requests.
227
228=item serialiser => $serialiser (defailt: $AnyEvent::Fork::RPC::STRING_SERIALISER)
229
230The serialiser to use.
231
232=back 260=back
233 261
234=back
235
236=cut 262=cut
237 263
238sub new { 264sub run {
239 my ($class, $function, %arg) = @_; 265 my ($template, $function, %arg) = @_;
240 266
241 my $self = bless { 267 my $max = $arg{max} || 4;
242 min => 0, 268 my $idle = $arg{idle} || 0,
243 max => 4, 269 my $load = $arg{load} || 2,
244 max_queue => 2, 270 my $start = $arg{start} || 0.1,
245 min_delay => 0, 271 my $stop = $arg{stop} || 10,
246 max_idle => 1, 272 my $on_event = $arg{on_event} || sub { },
247 idle_time => 1, 273 my $on_destroy = $arg{on_destroy};
248 on_event => sub { },
249 %arg,
250 pool => [],
251 queue => [],
252 }, $class;
253 274
254 $self->{function} = $function; 275 my @rpc = (
276 async => $arg{async},
277 init => $arg{init},
278 serialiser => delete $arg{serialiser},
279 on_error => $arg{on_error},
280 );
255 281
256 ($self->{template} ||= new AnyEvent::Fork) 282 my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
283 my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);
284
285 my $destroy_guard = Guard::guard {
286 $on_destroy->()
287 if $on_destroy;
288 };
289
290 $template
257 ->require ("AnyEvent::Fork::RPC::" . ($self->{async} ? "Async" : "Sync")) 291 ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
258 ->require (@{ delete $self->{require} })
259 ->eval (' 292 ->eval ('
260 my ($magic0, $magic2) = @_; 293 my ($magic0, $magic1) = @_;
261 sub AnyEvent::Fork::Pool::quit() { 294 sub AnyEvent::Fork::Pool::retire() {
262 AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic2; 295 AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
263 } 296 }
264 ', $magic0, $magic2) 297 ', $magic0, $magic1)
265 ->eval (delete $self->{eval}); 298 ;
266 299
267 $self->start 300 $start_worker = sub {
268 while @{ $self->{pool} } < $self->{min}; 301 my $proc = [0, 0, undef]; # load, index, rpc
269 302
270 $self 303 $proc->[2] = $template
304 ->fork
305 ->AnyEvent::Fork::RPC::run ($function,
306 @rpc,
307 on_event => sub {
308 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
309 $destroy_guard if 0; # keep it alive
310
311 $_[1] eq "quit" and $stop_worker->($proc);
312 return;
313 }
314
315 &$on_event;
316 },
317 )
318 ;
319
320 ++$nidle;
321 Array::Heap::push_heap_idx @pool, $proc;
322
323 Scalar::Util::weaken $proc;
324 };
325
326 $stop_worker = sub {
327 my $proc = shift;
328
329 $proc->[0]
330 or --$nidle;
331
332 Array::Heap::splice_heap_idx @pool, $proc->[1]
333 if defined $proc->[1];
334
335 @$proc = 0; # tell others to leave it be
336 };
337
338 $want_start = sub {
339 undef $stop_w;
340
341 $start_w ||= AE::timer $start, $start, sub {
342 if (($nidle < $idle || @queue) && @pool < $max) {
343 $start_worker->();
344 $scheduler->();
345 } else {
346 undef $start_w;
347 }
348 };
349 };
350
351 $want_stop = sub {
352 $stop_w ||= AE::timer $stop, $stop, sub {
353 $stop_worker->($pool[0])
354 if $nidle;
355
356 undef $stop_w
357 if $nidle <= $idle;
358 };
359 };
360
361 $scheduler = sub {
362 if (@queue) {
363 while (@queue) {
364 @pool or $start_worker->();
365
366 my $proc = $pool[0];
367
368 if ($proc->[0] < $load) {
369 # found free worker, increase load
370 unless ($proc->[0]++) {
371 # worker became busy
372 --$nidle
373 or undef $stop_w;
374
375 $want_start->()
376 if $nidle < $idle && @pool < $max;
377 }
378
379 Array::Heap::adjust_heap_idx @pool, 0;
380
381 my $job = shift @queue;
382 my $ocb = pop @$job;
383
384 $proc->[2]->(@$job, sub {
385 # reduce load
386 --$proc->[0] # worker still busy?
387 or ++$nidle > $idle # not too many idle processes?
388 or $want_stop->();
389
390 Array::Heap::adjust_heap_idx @pool, $proc->[1]
391 if defined $proc->[1];
392
393 &$ocb;
394
395 $scheduler->();
396 });
397 } else {
398 $want_start->()
399 unless @pool >= $max;
400
401 last;
402 }
403 }
404 } elsif ($shutdown) {
405 @pool = ();
406 undef $start_w;
407 undef $start_worker; # frees $destroy_guard reference
408
409 $stop_worker->($pool[0])
410 while $nidle;
411 }
412 };
413
414 my $shutdown_guard = Guard::guard {
415 $shutdown = 1;
416 $scheduler->();
417 };
418
419 $start_worker->()
420 while @pool < $idle;
421
422 sub {
423 $shutdown_guard if 0; # keep it alive
424
425 $start_worker->()
426 unless @pool;
427
428 push @queue, [@_];
429 $scheduler->();
430 }
271} 431}
272 432
273sub start {
274 my ($self) = @_;
275
276 warn "start\n";#d#
277
278 Scalar::Util::weaken $self;
279
280 my $proc = [0, undef, undef];
281
282 $proc->[1] = $self->{template}
283 ->fork
284 ->AnyEvent::Fork::RPC::run ($self->{function},
285 async => $self->{async},
286 init => $self->{init},
287 serialiser => $self->{serialiser},
288 on_error => $self->{on_error},
289 on_event => sub {
290 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic2) {
291 if ($_[1] eq "quit") {
292 my $pool = $self->{pool};
293 for (0 .. $#$pool) {
294 if ($pool->[$_] == $proc) {
295 Array::Heap::splice_heap @$pool, $_;
296 return;
297 }
298 }
299 die;
300 }
301 return;
302 }
303
304 &{ $self->{on_event} };
305 },
306 )
307 ;
308
309 ++$self->{idle};
310 Array::Heap::push_heap @{ $self->{pool} }, $proc;
311}
312
313=item $pool->call (..., $cb->(...)) 433=item $pool->(..., $cb->(...))
314 434
315Call the RPC function of a worker with the given arguments, and when the 435Call the RPC function of a worker with the given arguments, and when the
316worker is done, call the C<$cb> with the results, like just calling the 436worker is done, call the C<$cb> with the results, just like calling the
317L<AnyEvent::Fork::RPC> object directly. 437RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
438details on the RPC API.
318 439
319If there is no free worker, the call will be queued. 440If there is no free worker, the call will be queued until a worker becomes
441available.
320 442
321Note that there can be considerable time between calling this method and 443Note that there can be considerable time between calling this method and
322the call actually being executed. During this time, the parameters passed 444the call actually being executed. During this time, the parameters passed
323to this function are effectively read-only - modifying them after the call 445to this function are effectively read-only - modifying them after the call
324and before the callback is invoked causes undefined behaviour. 446and before the callback is invoked causes undefined behaviour.
325 447
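Because of this, when you reuse a buffer between calls, it is safest to hand the pool its own copy of the data. A minimal sketch (the C<doit> function name and the data are made up for illustration):

```perl
my $buf = "first chunk";

# pass a private copy ("$buf" creates a new string), so reusing
# $buf below cannot affect the still-queued job
$pool->(doit => "$buf", sub {
   print "worker returned @_\n";
});

$buf = "second chunk"; # safe - the queued job received its own copy
```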
326=cut 448=cut
327 449
328sub scheduler { 450=item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]
329 my $self = shift;
330 451
331 my $pool = $self->{pool}; 452=item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]
332 my $queue = $self->{queue};
333 453
334 $self->start 454Tries to detect the number of CPUs (C<$cpus>, often called cpu cores
335 unless @$pool; 455nowadays) and execution units (C<$eus>, which include e.g. extra
456hyperthreaded units). When C<$cpus> cannot be determined reliably,
457C<$default_cpus> is returned for both values, or C<1> if it is missing.
336 458
337 while (@$queue) { 459For normal CPU bound uses, it is wise to have as many worker processes
338 my $proc = $pool->[0]; 460as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
461hyperthreading is usually detrimental to performance, but in those rare
462cases where that really helps it might be beneficial to use more workers
463(C<$eus>).
339 464
340 if ($proc->[0] < $self->{max_queue}) { 465Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
341 warn "free $proc $proc->[0]\n";#d# 466C<$cpus> and C<$eu>, and on {Free,Net,Open}BSD, F<sysctl -n hw.ncpu> is
342 # found free worker 467used for C<$cpus>.
343 --$self->{idle}
344 unless $proc->[0]++;
345 468
346 undef $proc->[2]; 469Example: create a worker pool with as many workers as cpu cores, or C<2>,
470if the actual number could not be determined.
347 471
348 Array::Heap::adjust_heap @$pool, 0; 472 $fork->AnyEvent::Fork::Pool::run ("myworker::function",
473 max => (scalar AnyEvent::Fork::Pool::ncpu 2),
474 );
349 475
350 my $job = shift @$queue; 476=cut
351 my $ocb = pop @$job;
352 477
353 $proc->[1]->(@$job, sub { 478BEGIN {
354 for (0 .. $#$pool) { 479 if ($^O eq "linux") {
355 if ($pool->[$_] == $proc) { 480 *ncpu = sub(;$) {
356 # reduce queue counter 481 my ($cpus, $eus);
357 unless (--$pool->[$_][0]) {
358 # worker becomes idle
359 my $to = ++$self->{idle} > $self->{max_idle}
360 ? 0
361 : $self->{idle_time};
362 482
363 $proc->[2] = AE::timer $to, 0, sub { 483 if (open my $fh, "<", "/proc/cpuinfo") {
364 undef $proc->[2]; 484 my %id;
365 485
366 warn "destroy $proc afzer $to\n";#d# 486 while (<$fh>) {
367 487 if (/^core id\s*:\s*(\d+)/) {
368 for (0 .. $#$pool) {
369 if ($pool->[$_] == $proc) {
370 Array::Heap::splice_heap @$pool, $_;
371 --$self->{idle};
372 last;
373 }
374 }
375 };
376 }
377
378 Array::Heap::adjust_heap @$pool, $_;
379 last; 488 ++$eus;
489 undef $id{$1};
380 } 490 }
381 } 491 }
382 &$ocb; 492
383 }); 493 $cpus = scalar keys %id;
384 } else { 494 } else {
385 warn "busy $proc->[0]\n";#d# 495 $cpus = $eus = @_ ? shift : 1;
386 # all busy, delay
387
388 $self->{min_delay_w} ||= AE::timer $self->{min_delay}, 0, sub {
389 delete $self->{min_delay_w};
390
391 if (@{ $self->{queue} }) {
392 $self->start;
393 $self->scheduler;
394 }
395 }; 496 }
396 last; 497 wantarray ? ($cpus, $eus) : $cpus
397 } 498 };
499 } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
500 *ncpu = sub(;$) {
501 my $cpus = qx<sysctl -n hw.ncpu> * 1
502 || (@_ ? shift : 1);
503 wantarray ? ($cpus, $cpus) : $cpus
504 };
505 } else {
506 *ncpu = sub(;$) {
507 my $cpus = @_ ? shift : 1;
508 wantarray ? ($cpus, $cpus) : $cpus
509 };
398 } 510 }
399 warn "last\n";#d#
400} 511}
401 512
402sub call {
403 my $self = shift;
404
405 push @{ $self->{queue} }, [@_];
406 $self->scheduler;
407}
408
409sub DESTROY {
410 $_[0]{on_destroy}->();
411}
412
413=back 513=back
514
515=head1 CHILD USAGE
516
517In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
518more child-side function:
519
520=over 4
521
522=item AnyEvent::Fork::Pool::retire ()
523
524This function sends an event to the parent process to request retirement:
525the worker is removed from the pool and no new jobs will be sent to it,
526but it has to handle the jobs that are already queued.
527
528The parentheses are part of the syntax: the function usually isn't defined
529when you compile your code (because that happens I<before> handing the
530template process over to C<AnyEvent::Fork::Pool::run>), so you need the
531empty parentheses to tell Perl that the function is indeed a function.
532
533Retiring a worker can be useful to gracefully shut it down when the worker
534deems this necessary. For example, after executing a job, one could check
535the process size or the number of jobs handled so far, and if either is
536too high, the worker could ask to get retired, to keep memory leaks
537from accumulating.
538
539Example: retire a worker after it has handled roughly 100 requests.
540
541 my $count = 0;
542
543 sub my::worker {
544
545 ++$count == 100
546 and AnyEvent::Fork::Pool::retire ();
547
548 ... normal code goes here
549 }
550
551=back
552
553=head1 POOL PARAMETERS RECIPES
554
555This section describes some recipes for pool parameters. These are mostly
556meant for the synchronous RPC backend, as the asynchronous RPC backend
557changes the rules considerably, making workers themselves responsible for
558their scheduling.
559
560=over 4
561
562=item low latency - set load = 1
563
564If you need a deterministic low latency, you should set the C<load>
565parameter to C<1>. This ensures that never more than one job is sent to
566each worker. This avoids having to wait for a previous job to finish.
567
568This makes most sense with the synchronous (default) backend, as the
569asynchronous backend can handle multiple requests concurrently.
570
571=item lowest latency - set load = 1 and idle = max
572
573To achieve the lowest latency, you additionally should disable any dynamic
574resizing of the pool by setting C<idle> to the same value as C<max>.
575
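A fixed-size, lowest-latency pool might thus be created roughly like this, assuming a C<$template> L<AnyEvent::Fork> object as in the PARENT USAGE section; the worker function name and the pool size are placeholders:

```perl
my $pool = $template->AnyEvent::Fork::Pool::run (
   "MyWorker::run", # hypothetical worker function
   load => 1,       # never send more than one job per worker
   idle => 8,       # keep eight workers around, even when idle...
   max  => 8,       # ...and never start more than eight
);
```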
576=item high throughput, cpu bound jobs - set load >= 2, max = #cpus
577
578To get high throughput with cpu-bound jobs, you should set the maximum
579pool size to the number of cpus in your system, and C<load> to at least
580C<2>, to make sure there can be another job waiting for the worker when it
581has finished one.
582
583The value of C<2> for C<load> is the minimum value that I<can> achieve
584100% throughput, but if your parent process itself is sometimes busy, you
585might need higher values. Also there is a limit on the amount of data that
586can be "in flight" to the worker, so if you send big blobs of data to your
587worker, C<load> might have much less of an effect.
588
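Combining these settings with C<AnyEvent::Fork::Pool::ncpu> (described above), a cpu-bound pool could be sketched like this, again assuming a C<$template> L<AnyEvent::Fork> object and a placeholder worker function:

```perl
my $pool = $template->AnyEvent::Fork::Pool::run (
   "MyWorker::crunch", # hypothetical worker function
   # one worker per detected cpu, assume 4 cpus if detection fails
   max  => (scalar AnyEvent::Fork::Pool::ncpu 4),
   load => 2, # keep a second job queued behind the running one
);
```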
589=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high
590
591When your jobs are I/O bound, using more workers usually translates into
592higher throughput, depending very much on your actual workload - sometimes
593having only one worker is best, for example, when you read or write big
594files at maximum speed, as a second worker will increase seek times.
595
596=back
597
598=head1 EXCEPTIONS
599
600The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions will
601not be caught, and exceptions in both workers and in callbacks cause
602undesirable or undefined behaviour.
414 603
415=head1 SEE ALSO 604=head1 SEE ALSO
416 605
417L<AnyEvent::Fork>, to create the processes in the first place. 606L<AnyEvent::Fork>, to create the processes in the first place.
418 607
