=head1 NAME

AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork

THE API IS NOT FINISHED, CONSIDER THIS AN ALPHA RELEASE

=head1 SYNOPSIS

   use AnyEvent;
   use AnyEvent::Fork::Pool;
   # use AnyEvent::Fork is not needed

   # all possible parameters shown, with default values
   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
           "MyWorker::run", # the worker function

           # pool management
           max        => 4,   # absolute maximum # of processes
           idle       => 0,   # minimum # of idle processes
           load       => 2,   # queue at most this number of jobs per process
           start      => 0.1, # wait this many seconds before starting a new process
           stop       => 10,  # wait this many seconds before stopping an idle process
           on_destroy => (my $finish = AE::cv), # called when object is destroyed

           # parameters passed to AnyEvent::Fork::RPC
           async      => 0,
           on_error   => sub { die "FATAL: $_[0]\n" },
…
pool of processes that handles jobs.

Understanding of L<AnyEvent::Fork> is helpful but not critical to be able
to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC>
is, as it defines the actual API that needs to be implemented in the
worker processes.

=head1 EXAMPLES

=head1 PARENT USAGE

To create a pool, you first have to create a L<AnyEvent::Fork> object -
this object becomes your template process. Whenever a new worker process
is needed, it is forked from this template process. Then you need to
"hand off" this template process to the C<AnyEvent::Fork::Pool> module by
calling its run method on it:

   my $template = AnyEvent::Fork
      ->new
      ->require ("SomeModule", "MyWorkerModule");

   my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");

The pool "object" is not a regular Perl object, but a code reference that
you can call and that works roughly like calling the worker function
directly, except that it returns nothing but instead you need to specify a
callback to be invoked once results are in:

   $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });

=over 4

=cut

…
my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';

our $VERSION = 0.1;

=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]

The traditional way to call the pool creation function. But it is way
cooler to call it in the following way:

=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])

Creates a new pool object with the specified C<$function> as function
(name) to call for each request. The pool uses the C<$fork> object as the
template when creating worker processes.

…

=item Pool Management

The pool consists of a certain number of worker processes. These options
decide how many of these processes exist and when they are started and
stopped.

The worker pool is dynamically resized, according to (perceived :)
load. The minimum size is given by the C<idle> parameter and the maximum
size is given by the C<max> parameter. A new worker is started every
C<start> seconds at most, and an idle worker is stopped at most every
C<stop> seconds.

You can specify the amount of jobs sent to a worker concurrently using the
C<load> parameter.

=over 4

=item idle => $count (default: 0)

The minimum amount of idle processes in the pool - when there are fewer
than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
ones, subject to the limits set by C<max> and C<start>.

This is also the initial amount of workers in the pool. The default of
zero means that the pool starts empty and can shrink back to zero workers
over time.

=item max => $count (default: 4)

The maximum number of processes in the pool, in addition to the template
process. C<AnyEvent::Fork::Pool> will never have more than this number of
worker processes, although there can be more temporarily when a worker is
shut down and hasn't exited yet.

=item load => $count (default: 2)

The maximum number of concurrent jobs sent to a single worker process.

Jobs that cannot be sent to a worker immediately (because all workers are
busy) will be queued until a worker is available.

Setting this low improves latency. For example, at C<1>, every job that
is sent to a worker is sent to a completely idle worker that doesn't run
any other jobs. The downside is that throughput is reduced - a worker that
finishes a job needs to wait for a new job from the parent.

The default of C<2> is usually a good compromise.

=item start => $seconds (default: 0.1)

When there are fewer than C<idle> workers (or all workers are completely
busy), then a timer is started. If the timer elapses and there are still
jobs that cannot be queued to a worker, a new worker is started.

This sets the minimum time that all workers must be busy before a new
worker is started. Or, put differently, the minimum delay between starting
new workers.

The delay is small by default, which means new workers will be started
relatively quickly. A delay of C<0> is possible, and ensures that the pool
will grow as quickly as possible under load.

Non-zero values are useful to avoid "exploding" a pool because a lot of
jobs are queued in an instant.

Higher values are often useful to improve efficiency at the cost of
latency - when fewer processes can do the job over time, starting more and
more is not necessarily going to help.

=item stop => $seconds (default: 10)

When a worker has no jobs to execute it becomes idle. An idle worker that
hasn't executed a job within this amount of time will be stopped, unless
the other parameters say otherwise.

Setting this to a very high value means that workers stay around longer,
even when they have nothing to do, which can be good as they don't have to
be started on the next load spike again.

Setting this to a lower value can be useful to avoid memory or simply
process table wastage.

Usually, setting this to a time longer than the time between load spikes
is best - if you expect a lot of requests every minute and little work
in between, setting this to longer than a minute avoids having to stop
and start workers. On the other hand, you have to ask yourself if letting
workers run idle is a good use of your resources. Try to find a good
balance between resource usage of your workers and the time to start new
workers - L<AnyEvent::Fork> itself is fast at creating workers while not
using much memory for them, so most of the overhead is likely from your
own code.

=item on_destroy => $callback->() (default: none)

When a pool object goes out of scope, the outstanding requests are still
handled till completion. Only after handling all jobs will the workers
be destroyed (and also the template process if it isn't referenced
otherwise).

To find out when a pool I<really> has finished its work, you can set this
callback, which will be called when the pool has been destroyed.
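
For example, a parent that wants to drain the pool and then block until
every outstanding job has been handled can pass a condition variable as
the callback. This is only a sketch: it assumes a C<MyWorker> module with
a C<run> function, as in the SYNOPSIS, and hypothetical job arguments.

```perl
use AnyEvent;
use AnyEvent::Fork::Pool;

my $finish = AE::cv;

my $pool = AnyEvent::Fork
   ->new
   ->require ("MyWorker")
   ->AnyEvent::Fork::Pool::run (
        "MyWorker::run",
        on_destroy => $finish, # invoked after the last job and worker are gone
     );

# queue some jobs (hypothetical arguments)
$pool->($_, sub { }) for 1 .. 10;

undef $pool;   # dropping the last reference stops new submissions
$finish->recv; # returns only after all queued jobs were handled
```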

=back

=item AnyEvent::Fork::RPC Parameters

These parameters are all passed more or less directly to
L<AnyEvent::Fork::RPC>. They are only briefly mentioned here, for
their full documentation please refer to the L<AnyEvent::Fork::RPC>
documentation. Also, the default values mentioned here are only documented
as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.

=over 4

=item async => $boolean (default: 0)

Whether to use the synchronous or asynchronous RPC backend.

=item on_error => $callback->($message) (default: die with message)

The callback to call on any (fatal) errors.

…

   my $max        = $arg{max}        || 4;
   my $idle       = $arg{idle}       || 0;
   my $load       = $arg{load}       || 2;
   my $start      = $arg{start}      || 0.1;
   my $stop       = $arg{stop}       || 10;
   my $on_event   = $arg{on_event}   || sub { };
   my $on_destroy = $arg{on_destroy};

   my @rpc = (
      async      => $arg{async},
      init       => $arg{init},
      serialiser => delete $arg{serialiser},
      on_error   => $arg{on_error},
   );

   my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
   my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);

   my $destroy_guard = Guard::guard {
      $on_destroy->()
         if $on_destroy;
   };

   $template
      ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
      ->eval ('
           my ($magic0, $magic1) = @_;
           sub AnyEvent::Fork::Pool::retire() {
              AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
           }
        ', $magic0, $magic1)
   ;

   $start_worker = sub {
      my $proc = [0, 0, undef]; # load, index, rpc

      $proc->[2] = $template
         ->fork
         ->AnyEvent::Fork::RPC::run ($function,
              @rpc,
              on_event => sub {
                 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
                    $destroy_guard if 0; # keep it alive

                    $_[1] eq "quit" and $stop_worker->($proc);
                    return;
                 }

                 &$on_event;
              },
           )
      ;

      ++$nidle;
      Array::Heap::push_heap_idx @pool, $proc;

      Scalar::Util::weaken $proc;
   };

   $stop_worker = sub {
      my $proc = shift;

      $proc->[0]
         or --$nidle;

      Array::Heap::splice_heap_idx @pool, $proc->[1]
         if defined $proc->[1];

      @$proc = 0; # tell others to leave it be
   };

   $want_start = sub {
      undef $stop_w;

      $start_w ||= AE::timer $start, $start, sub {
         if (($nidle < $idle || @queue) && @pool < $max) {
            $start_worker->();
            $scheduler->();
         } else {
            undef $start_w;
         }
      };
   };

   $want_stop = sub {
      $stop_w ||= AE::timer $stop, $stop, sub {
         $stop_worker->($pool[0])
            if $nidle;

         undef $stop_w
            if $nidle <= $idle;
      };
   };

   $scheduler = sub {
      if (@queue) {
         while (@queue) {
            @pool or $start_worker->();

            my $proc = $pool[0];

            if ($proc->[0] < $load) {
               # found free worker, increase load
               unless ($proc->[0]++) {
                  # worker became busy
                  --$nidle
                     or undef $stop_w;

                  $want_start->()
                     if $nidle < $idle && @pool < $max;
               }

               Array::Heap::adjust_heap_idx @pool, 0;

               my $job = shift @queue;
               my $ocb = pop @$job;

               $proc->[2]->(@$job, sub {
                  # reduce load
                  --$proc->[0] # worker still busy?
                     or ++$nidle > $idle # not too many idle processes?
                     or $want_stop->();

                  Array::Heap::adjust_heap_idx @pool, $proc->[1]
                     if defined $proc->[1];

                  &$ocb;

                  $scheduler->();
               });
            } else {
               # all workers busy, delay
               $want_start->()
                  unless @pool >= $max;

               last;
            }
         }
      } elsif ($shutdown) {
         @pool = ();
         undef $start_w;
         undef $start_worker; # frees $destroy_guard reference

         $stop_worker->($pool[0])
            while $nidle;
      }
   };

   my $shutdown_guard = Guard::guard {
      $shutdown = 1;
      $scheduler->();
   };

   $start_worker->()
      while @pool < $idle;

   sub {
      $shutdown_guard if 0; # keep it alive

      $start_worker->()
         unless @pool;

      push @queue, [@_];
      $scheduler->();
   }
}

=item $pool->(..., $cb->(...))

Call the RPC function of a worker with the given arguments, and when the
worker is done, call the C<$cb> with the results, just like calling the
RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
details on the RPC API.

If there is no free worker, the call will be queued until a worker becomes
available.

Note that there can be considerable time between calling this method and
the call actually being executed. During this time, the parameters passed
to this function are effectively read-only - modifying them after the call
and before the callback is invoked causes undefined behaviour.

=cut

=back

=head1 CHILD USAGE

In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
more child-side function:

=over 4

=item AnyEvent::Fork::Pool::retire ()

This function sends an event to the parent process to request retirement:
the worker is removed from the pool and no new jobs will be sent to it,
but it has to handle the jobs that are already queued.

The parentheses are part of the syntax: the function usually isn't defined
when you compile your code (because that happens I<before> handing the
template process over to C<AnyEvent::Fork::Pool::run>), so you need the
empty parentheses to tell Perl that the function is indeed a function.

Retiring a worker can be useful to gracefully shut it down when the worker
deems this useful. For example, after executing a job, one could check
the process size or the number of jobs handled so far, and if either is
too high, the worker could ask to get retired, to avoid accumulating
memory leaks.
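
As a sketch, a worker module could count jobs and retire itself after a
fixed number; C<do_work> here is a hypothetical stand-in for the actual
job code, and the threshold of 100 jobs is arbitrary.

```perl
package MyWorker;

my $jobs_handled = 0;

sub run {
   my @args = @_;

   my $result = do_work (@args); # hypothetical job implementation

   # after 100 jobs, ask the parent to replace this worker, so that
   # slowly accumulating memory cannot grow without bound
   AnyEvent::Fork::Pool::retire ()
      if ++$jobs_handled >= 100;

   $result
}

1
```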

=back

=head1 POOL PARAMETERS RECIPES

This section describes some recipes for pool parameters. These are mostly
meant for the synchronous RPC backend, as the asynchronous RPC backend
changes the rules considerably, making workers themselves responsible for
their scheduling.

=over 4

=item low latency - set load = 1

If you need a deterministic low latency, you should set the C<load>
parameter to C<1>. This ensures that never more than one job is sent to
each worker. This avoids having to wait for a previous job to finish.

This makes most sense with the synchronous (default) backend, as the
asynchronous backend can handle multiple requests concurrently.

=item lowest latency - set load = 1 and idle = max

To achieve the lowest latency, you additionally should disable any dynamic
resizing of the pool by setting C<idle> to the same value as C<max>.
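
Put together, such a statically sized, low-latency pool could be created
like this (a sketch assuming eight workers and a C<MyWorker> module as in
the SYNOPSIS):

```perl
use AnyEvent::Fork::Pool;

my $pool = AnyEvent::Fork
   ->new
   ->require ("MyWorker")
   ->AnyEvent::Fork::Pool::run (
        "MyWorker::run",
        load => 1, # at most one job per worker at a time
        idle => 8, # always keep eight workers started...
        max  => 8, # ...and never start more - no dynamic resizing
     );
```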

=item high throughput, cpu bound jobs - set load >= 2, max = #cpus

To get high throughput with cpu-bound jobs, you should set the maximum
pool size to the number of cpus in your system, and C<load> to at least
C<2>, to make sure there can be another job waiting for the worker when it
has finished one.

The value of C<2> for C<load> is the minimum value that I<can> achieve
100% throughput, but if your parent process itself is sometimes busy, you
might need higher values. Also there is a limit on the amount of data that
can be "in flight" to the worker, so if you send big blobs of data to your
worker, C<load> might have much less of an effect.

=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high

When your jobs are I/O bound, using more workers usually boils down to
higher throughput, depending very much on your actual workload - sometimes
having only one worker is best, for example, when you read or write big
files at maximum speed, as a second worker will increase seek times.

=back

=head1 EXCEPTIONS

The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions will
not be caught, and exceptions in both worker and in callbacks cause
undesirable or undefined behaviour.

=head1 SEE ALSO

L<AnyEvent::Fork>, to create the processes in the first place.

L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.