1 | =head1 NAME |
1 | =head1 NAME |
2 | |
2 | |
3 | AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork |
3 | AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork |
|
|
4 | |
|
|
5 | THE API IS NOT FINISHED, CONSIDER THIS AN ALPHA RELEASE |
4 | |
6 | |
5 | =head1 SYNOPSIS |
7 | =head1 SYNOPSIS |
6 | |
8 | |
7 | use AnyEvent; |
9 | use AnyEvent; |
8 | use AnyEvent::Fork::Pool; |
10 | use AnyEvent::Fork::Pool; |
9 | # use AnyEvent::Fork is not needed |
11 | # use AnyEvent::Fork is not needed |
10 | |
12 | |
11 | # all parameters with default values |
13 | # all possible parameters shown, with default values |
12 | my $pool = new AnyEvent::Fork::Pool |
14 | my $pool = AnyEvent::Fork |
13 | "MyWorker::run", |
15 | ->new |
|
|
16 | ->require ("MyWorker") |
|
|
17 | ->AnyEvent::Fork::Pool::run ( |
|
|
18 | "MyWorker::run", # the worker function |
14 | |
19 | |
15 | # pool management |
20 | # pool management |
16 | min => 0, # minimum # of processes |
|
|
17 | max => 8, # maximum # of processes |
21 | max => 4, # absolute maximum # of processes |
|
|
22 | idle => 0, # minimum # of idle processes |
|
|
23 | load => 2, # queue at most this number of jobs per process |
18 | busy_time => 0, # wait this before starting a new process |
24 | start => 0.1, # wait this many seconds before starting a new process |
19 | max_idle => 1, # wait this before killing an idle process |
25 | stop => 10, # wait this many seconds before stopping an idle process |
20 | idle_time => 1, # at most this many idle processes |
26 | on_destroy => (my $finish = AE::cv), # called when object is destroyed |
21 | |
27 | |
22 | # template process |
|
|
23 | template => AnyEvent::Fork->new, # the template process to use |
|
|
24 | require => [MyWorker::], # module(s) to load |
|
|
25 | eval => "# perl code to execute in template", |
|
|
26 | on_destroy => (my $finish = AE::cv), |
|
|
27 | |
|
|
28 | # parameters passed to AnyEvent::Fork::RPC |
28 | # parameters passed to AnyEvent::Fork::RPC |
29 | async => 0, |
29 | async => 0, |
30 | on_error => sub { die "FATAL: $_[0]\n" }, |
30 | on_error => sub { die "FATAL: $_[0]\n" }, |
31 | on_event => sub { my @ev = @_ }, |
31 | on_event => sub { my @ev = @_ }, |
32 | init => "MyWorker::init", |
32 | init => "MyWorker::init", |
33 | serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, |
33 | serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, |
34 | ; |
34 | ); |
35 | |
35 | |
36 | for (1..10) { |
36 | for (1..10) { |
37 | $pool->call (doit => $_, sub { |
37 | $pool->(doit => $_, sub { |
38 | print "MyWorker::run returned @_\n"; |
38 | print "MyWorker::run returned @_\n"; |
39 | }); |
39 | }); |
40 | } |
40 | } |
41 | |
41 | |
42 | undef $pool; |
42 | undef $pool; |
… | |
… | |
50 | pool of processes that handles jobs. |
50 | pool of processes that handles jobs. |
51 | |
51 | |
52 | Understanding of L<AnyEvent::Fork> is helpful but not critical to be able |
52 | Understanding of L<AnyEvent::Fork> is helpful but not critical to be able |
53 | to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC> |
53 | to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC> |
54 | is, as it defines the actual API that needs to be implemented in the |
54 | is, as it defines the actual API that needs to be implemented in the |
55 | children. |
55 | worker processes. |
56 | |
56 | |
57 | =head1 EXAMPLES |
57 | =head1 EXAMPLES |
58 | |
58 | |
59 | =head1 API |
59 | =head1 PARENT USAGE |
|
|
60 | |
|
|
61 | To create a pool, you first have to create a L<AnyEvent::Fork> object - |
|
|
62 | this object becomes your template process. Whenever a new worker process |
|
|
63 | is needed, it is forked from this template process. Then you need to |
|
|
64 | "hand off" this template process to the C<AnyEvent::Fork::Pool> module by |
|
|
65 | calling its run method on it: |
|
|
66 | |
|
|
67 | my $template = AnyEvent::Fork |
|
|
68 | ->new |
|
|
69 | ->require ("SomeModule", "MyWorkerModule"); |
|
|
70 | |
|
|
71 | my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction"); |
|
|
72 | |
|
|
73 | The pool "object" is not a regular Perl object, but a code reference that |
|
|
74 | you can call and that works roughly like calling the worker function |
|
|
75 | directly, except that it returns nothing but instead you need to specify a |
|
|
76 | callback to be invoked once results are in: |
|
|
77 | |
|
|
78 | $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" }); |
60 | |
79 | |
61 | =over 4 |
80 | =over 4 |
62 | |
81 | |
63 | =cut |
82 | =cut |
64 | |
83 | |
65 | package AnyEvent::Fork::Pool; |
84 | package AnyEvent::Fork::Pool; |
66 | |
85 | |
67 | use common::sense; |
86 | use common::sense; |
68 | |
87 | |
|
|
88 | use Scalar::Util (); |
|
|
89 | |
69 | use Guard (); |
90 | use Guard (); |
|
|
91 | use Array::Heap (); |
70 | |
92 | |
71 | use AnyEvent; |
93 | use AnyEvent; |
|
|
94 | # explicit version on next line, as some cpan-testers test with the 0.1 version, |
|
|
95 | # ignoring dependencies, and this line will at least give a clear indication of that. |
72 | use AnyEvent::Fork; # we don't actually depend on it, this is for convenience |
96 | use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience |
73 | use AnyEvent::Fork::RPC; |
97 | use AnyEvent::Fork::RPC; |
74 | |
98 | |
|
|
99 | # these are used for the first and last argument of events |
|
|
100 | # in the hope of not colliding. yes, I don't like it either, |
|
|
101 | # but didn't come up with an obviously better alternative. |
|
|
102 | my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P'; |
|
|
103 | my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*'; |
|
|
104 | |
75 | our $VERSION = 0.1; |
105 | our $VERSION = 0.1; |
76 | |
106 | |
77 | =item my $rpc = new AnyEvent::Fork::RPC::pool $function, [key => value...] |
107 | =item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...] |
|
|
108 | |
|
|
109 | The traditional way to call the pool creation function. But it is way |
|
|
110 | cooler to call it in the following way: |
|
|
111 | |
|
|
112 | =item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...]) |
|
|
113 | |
|
|
114 | Creates a new pool object with the specified C<$function> as function |
|
|
115 | (name) to call for each request. The pool uses the C<$fork> object as the |
|
|
116 | template when creating worker processes. |
|
|
117 | |
|
|
118 | You can supply your own template process, or tell C<AnyEvent::Fork::Pool> |
|
|
119 | to create one. |
|
|
120 | |
|
|
121 | A relatively large number of key/value pairs can be specified to influence |
|
|
122 | the behaviour. They are grouped into the categories "pool management", |
|
|
123 | "template process" and "rpc parameters". |
78 | |
124 | |
79 | =over 4 |
125 | =over 4 |
80 | |
126 | |
81 | =item on_error => $cb->($msg) |
127 | =item Pool Management |
82 | |
128 | |
83 | Called on (fatal) errors, with a descriptive (hopefully) message. If |
129 | The pool consists of a certain number of worker processes. These options |
84 | this callback is not provided, but C<on_event> is, then the C<on_event> |
130 | decide how many of these processes exist and when they are started and |
85 | callback is called with the first argument being the string C<error>, |
131 | stopped. |
86 | followed by the error message. |
|
|
87 | |
132 | |
88 | If neither handler is provided it prints the error to STDERR and will |
133 | The worker pool is dynamically resized, according to (perceived :) |
89 | start failing badly. |
134 | load. The minimum size is given by the C<idle> parameter and the maximum |
|
|
135 | size is given by the C<max> parameter. A new worker is started every |
|
|
136 | C<start> seconds at most, and an idle worker is stopped at most every |
|
|
137 | C<stop> second. |
90 | |
138 | |
91 | =item on_event => $cb->(...) |
139 | You can specify the amount of jobs sent to a worker concurrently using the |
|
|
140 | C<load> parameter. |
92 | |
141 | |
93 | Called for every call to the C<AnyEvent::Fork::RPC::event> function in the |
142 | =over 4 |
94 | child, with the arguments of that function passed to the callback. |
|
|
95 | |
143 | |
96 | Also called on errors when no C<on_error> handler is provided. |
144 | =item idle => $count (default: 0) |
97 | |
145 | |
98 | =item on_destroy => $cb->() |
146 | The minimum amount of idle processes in the pool - when there are fewer |
|
|
147 | than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new |
|
|
148 | ones, subject to the limits set by C<max> and C<start>. |
99 | |
149 | |
100 | Called when the C<$rpc> object has been destroyed and all requests have |
150 | This is also the initial amount of workers in the pool. The default of |
101 | been successfully handled. This is useful when you queue some requests and |
151 | zero means that the pool starts empty and can shrink back to zero workers |
102 | want the child to go away after it has handled them. The problem is that |
152 | over time. |
103 | the parent must not exit either until all requests have been handled, and |
|
|
104 | this can be accomplished by waiting for this callback. |
|
|
105 | |
153 | |
106 | =item init => $function (default none) |
154 | =item max => $count (default: 4) |
107 | |
155 | |
108 | When specified (by name), this function is called in the child as the very |
156 | The maximum number of processes in the pool, in addition to the template |
109 | first thing when taking over the process, with all the arguments normally |
157 | process. C<AnyEvent::Fork::Pool> will never have more than this number of |
110 | passed to the C<AnyEvent::Fork::run> function, except the communications |
158 | worker processes, although there can be more temporarily when a worker is |
111 | socket. |
159 | shut down and hasn't exited yet. |
112 | |
160 | |
113 | It can be used to do one-time things in the child such as storing passed |
161 | =item load => $count (default: 2) |
114 | parameters or opening database connections. |
|
|
115 | |
162 | |
116 | It is called very early - before the serialisers are created or the |
163 | The maximum number of concurrent jobs sent to a single worker process. |
117 | C<$function> name is resolved into a function reference, so it could be |
164 | |
118 | used to load any modules that provide the serialiser or function. It can |
165 | Jobs that cannot be sent to a worker immediately (because all workers are |
119 | not, however, create events. |
166 | busy) will be queued until a worker is available. |
|
|
167 | |
|
|
168 | Setting this low improves latency. For example, at C<1>, every job that |
|
|
169 | is sent to a worker is sent to a completely idle worker that doesn't run |
|
|
170 | any other jobs. The downside is that throughput is reduced - a worker that |
|
|
171 | finishes a job needs to wait for a new job from the parent. |
|
|
172 | |
|
|
173 | The default of C<2> is usually a good compromise. |
|
|
174 | |
|
|
175 | =item start => $seconds (default: 0.1) |
|
|
176 | |
|
|
177 | When there are fewer than C<idle> workers (or all workers are completely |
|
|
178 | busy), then a timer is started. If the timer elapses and there are still |
|
|
179 | jobs that cannot be queued to a worker, a new worker is started. |
|
|
180 | |
|
|
181 | This sets the minimum time that all workers must be busy before a new |
|
|
182 | worker is started. Or, put differently, the minimum delay between starting |
|
|
183 | new workers. |
|
|
184 | |
|
|
185 | The delay is small by default, which means new workers will be started |
|
|
186 | relatively quickly. A delay of C<0> is possible, and ensures that the pool |
|
|
187 | will grow as quickly as possible under load. |
|
|
188 | |
|
|
189 | Non-zero values are useful to avoid "exploding" a pool because a lot of |
|
|
190 | jobs are queued in an instant. |
|
|
191 | |
|
|
192 | Higher values are often useful to improve efficiency at the cost of |
|
|
193 | latency - when fewer processes can do the job over time, starting more and |
|
|
194 | more is not necessarily going to help. |
|
|
195 | |
|
|
196 | =item stop => $seconds (default: 10) |
|
|
197 | |
|
|
198 | When a worker has no jobs to execute it becomes idle. An idle worker that |
|
|
199 | hasn't executed a job within this amount of time will be stopped, unless |
|
|
200 | the other parameters say otherwise. |
|
|
201 | |
|
|
202 | Setting this to a very high value means that workers stay around longer, |
|
|
203 | even when they have nothing to do, which can be good as they don't have to |
|
|
204 | be started on the next load spike again. |
|
|
205 | |
|
|
206 | Setting this to a lower value can be useful to avoid memory or simply |
|
|
207 | process table wastage. |
|
|
208 | |
|
|
209 | Usually, setting this to a time longer than the time between load spikes |
|
|
210 | is best - if you expect a lot of requests every minute and little work |
|
|
211 | in between, setting this to longer than a minute avoids having to stop |
|
|
212 | and start workers. On the other hand, you have to ask yourself if letting |
|
|
213 | workers run idle is a good use of your resources. Try to find a good |
|
|
214 | balance between resource usage of your workers and the time to start new |
|
|
215 | workers - the processes created by L<AnyEvent::Fork> itself are fast at |
|
|
216 | creating workers while not using much memory for them, so most of the |
|
|
217 | overhead is likely from your own code. |
|
|
218 | |
|
|
219 | =item on_destroy => $callback->() (default: none) |
|
|
220 | |
|
|
221 | When a pool object goes out of scope, the outstanding requests are still |
|
|
222 | handled till completion. Only after handling all jobs will the workers |
|
|
223 | be destroyed (and also the template process if it isn't referenced |
|
|
224 | otherwise). |
|
|
225 | |
|
|
226 | To find out when a pool I<really> has finished its work, you can set this |
|
|
227 | callback, which will be called when the pool has been destroyed. |
|
|
228 | |
|
|
229 | =back |
|
|
230 | |
|
|
231 | =item AnyEvent::Fork::RPC Parameters |
|
|
232 | |
|
|
233 | These parameters are all passed more or less directly to |
|
|
234 | L<AnyEvent::Fork::RPC>. They are only briefly mentioned here, for |
|
|
235 | their full documentation please refer to the L<AnyEvent::Fork::RPC> |
|
|
236 | documentation. Also, the default values mentioned here are only documented |
|
|
237 | as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding. |
|
|
238 | |
|
|
239 | =over 4 |
120 | |
240 | |
121 | =item async => $boolean (default: 0) |
241 | =item async => $boolean (default: 0) |
122 | |
242 | |
123 | The default server used in the child does all I/O blockingly, and only |
243 | Whether to use the synchronous or asynchronous RPC backend. |
124 | allows a single RPC call to execute concurrently. |
|
|
125 | |
244 | |
126 | Setting C<async> to a true value switches to another implementation that |
245 | =item on_error => $callback->($message) (default: die with message) |
127 | uses L<AnyEvent> in the child and allows multiple concurrent RPC calls. |
|
|
128 | |
246 | |
129 | The actual API in the child is documented in the section that describes |
247 | The callback to call on any (fatal) errors. |
130 | the calling semantics of the returned C<$rpc> function. |
|
|
131 | |
248 | |
132 | If you want to pre-load the actual back-end modules to enable memory |
249 | =item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>) |
133 | sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for |
|
|
134 | synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode. |
|
|
135 | |
250 | |
136 | If you use a template process and want to fork both sync and async |
251 | The callback to invoke on events. |
137 | children, then it is permissible to load both modules. |
|
|
138 | |
252 | |
139 | =item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') |
253 | =item init => $initfunction (default: none) |
140 | |
254 | |
141 | All arguments, result data and event data have to be serialised to be |
255 | The function to call in the child, once before handling requests. |
142 | transferred between the processes. For this, they have to be frozen and |
|
|
143 | thawed in both parent and child processes. |
|
|
144 | |
256 | |
145 | By default, only octet strings can be passed between the processes, which |
257 | =item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER) |
146 | is reasonably fast and efficient. |
|
|
147 | |
258 | |
148 | For more complicated use cases, you can provide your own freeze and thaw |
259 | The serialiser to use. |
149 | functions, by specifying a string with perl source code. It's supposed to |
|
|
150 | return two code references when evaluated: the first receives a list of |
|
|
151 | perl values and must return an octet string. The second receives the octet |
|
|
152 | string and must return the original list of values. |
|
|
153 | |
|
|
154 | If you need an external module for serialisation, then you can either |
|
|
155 | pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use> |
|
|
156 | or C<require> statement into the serialiser string. Or both. |
|
|
157 | |
260 | |
158 | =back |
261 | =back |
159 | |
262 | |
160 | See the examples section earlier in this document for some actual |
263 | =back |
161 | examples. |
|
|
162 | |
264 | |
163 | =cut |
265 | =cut |
164 | |
266 | |
165 | sub new { |
267 | sub run { |
166 | my ($self, $function, %arg) = @_; |
268 | my ($template, $function, %arg) = @_; |
167 | |
269 | |
168 | my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; |
270 | my $max = $arg{max} || 4; |
|
|
271 | my $idle = $arg{idle} || 0, |
|
|
272 | my $load = $arg{load} || 2, |
|
|
273 | my $start = $arg{start} || 0.1, |
|
|
274 | my $stop = $arg{stop} || 10, |
169 | my $on_event = delete $arg{on_event}; |
275 | my $on_event = $arg{on_event} || sub { }, |
170 | my $on_error = delete $arg{on_error}; |
|
|
171 | my $on_destroy = delete $arg{on_destroy}; |
276 | my $on_destroy = $arg{on_destroy}; |
|
|
277 | |
|
|
278 | my @rpc = ( |
|
|
279 | async => $arg{async}, |
|
|
280 | init => $arg{init}, |
|
|
281 | serialiser => delete $arg{serialiser}, |
|
|
282 | on_error => $arg{on_error}, |
|
|
283 | ); |
|
|
284 | |
|
|
285 | my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown); |
|
|
286 | my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler); |
|
|
287 | |
|
|
288 | my $destroy_guard = Guard::guard { |
|
|
289 | $on_destroy->() |
|
|
290 | if $on_destroy; |
|
|
291 | }; |
|
|
292 | |
|
|
293 | $template |
|
|
294 | ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync")) |
|
|
295 | ->eval (' |
|
|
296 | my ($magic0, $magic1) = @_; |
|
|
297 | sub AnyEvent::Fork::Pool::retire() { |
|
|
298 | AnyEvent::Fork::RPC::event $magic0, "quit", $magic1; |
|
|
299 | } |
|
|
300 | ', $magic0, $magic1) |
172 | |
301 | ; |
173 | # default for on_error is to on_event, if specified |
|
|
174 | $on_error ||= $on_event |
|
|
175 | ? sub { $on_event->(error => shift) } |
|
|
176 | : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" }; |
|
|
177 | |
302 | |
178 | # default for on_event is to raise an error |
303 | $start_worker = sub { |
179 | $on_event ||= sub { $on_error->("event received, but no on_event handler") }; |
304 | my $proc = [0, 0, undef]; # load, index, rpc |
180 | |
305 | |
181 | my ($f, $t) = eval $serialiser; die $@ if $@; |
306 | $proc->[2] = $template |
|
|
307 | ->fork |
|
|
308 | ->AnyEvent::Fork::RPC::run ($function, |
|
|
309 | @rpc, |
|
|
310 | on_event => sub { |
|
|
311 | if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) { |
|
|
312 | $destroy_guard if 0; # keep it alive |
182 | |
313 | |
183 | my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww); |
314 | $_[1] eq "quit" and $stop_worker->($proc); |
184 | my ($rlen, $rbuf, $rw) = 512 - 16; |
315 | return; |
|
|
316 | } |
185 | |
317 | |
186 | my $wcb = sub { |
318 | &$on_event; |
187 | my $len = syswrite $fh, $wbuf; |
319 | }, |
|
|
320 | ) |
|
|
321 | ; |
188 | |
322 | |
189 | unless (defined $len) { |
323 | ++$nidle; |
190 | if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { |
324 | Array::Heap::push_heap_idx @pool, $proc; |
191 | undef $rw; undef $ww; # it ends here |
325 | |
192 | $on_error->("$!"); |
326 | Scalar::Util::weaken $proc; |
|
|
327 | }; |
|
|
328 | |
|
|
329 | $stop_worker = sub { |
|
|
330 | my $proc = shift; |
|
|
331 | |
|
|
332 | $proc->[0] |
|
|
333 | or --$nidle; |
|
|
334 | |
|
|
335 | Array::Heap::splice_heap_idx @pool, $proc->[1] |
|
|
336 | if defined $proc->[1]; |
|
|
337 | |
|
|
338 | @$proc = 0; # tell others to leave it be |
|
|
339 | }; |
|
|
340 | |
|
|
341 | $want_start = sub { |
|
|
342 | undef $stop_w; |
|
|
343 | |
|
|
344 | $start_w ||= AE::timer $start, $start, sub { |
|
|
345 | if (($nidle < $idle || @queue) && @pool < $max) { |
|
|
346 | $start_worker->(); |
|
|
347 | $scheduler->(); |
|
|
348 | } else { |
|
|
349 | undef $start_w; |
193 | } |
350 | } |
194 | } |
351 | }; |
|
|
352 | }; |
195 | |
353 | |
196 | substr $wbuf, 0, $len, ""; |
354 | $want_stop = sub { |
|
|
355 | $stop_w ||= AE::timer $stop, $stop, sub { |
|
|
356 | $stop_worker->($pool[0]) |
|
|
357 | if $nidle; |
197 | |
358 | |
198 | unless (length $wbuf) { |
359 | undef $stop_w |
|
|
360 | if $nidle <= $idle; |
|
|
361 | }; |
|
|
362 | }; |
|
|
363 | |
|
|
364 | $scheduler = sub { |
|
|
365 | if (@queue) { |
|
|
366 | while (@queue) { |
|
|
367 | @pool or $start_worker->(); |
|
|
368 | |
|
|
369 | my $proc = $pool[0]; |
|
|
370 | |
|
|
371 | if ($proc->[0] < $load) { |
|
|
372 | # found free worker, increase load |
|
|
373 | unless ($proc->[0]++) { |
|
|
374 | # worker became busy |
|
|
375 | --$nidle |
|
|
376 | or undef $stop_w; |
|
|
377 | |
|
|
378 | $want_start->() |
|
|
379 | if $nidle < $idle && @pool < $max; |
|
|
380 | } |
|
|
381 | |
|
|
382 | Array::Heap::adjust_heap_idx @pool, 0; |
|
|
383 | |
|
|
384 | my $job = shift @queue; |
|
|
385 | my $ocb = pop @$job; |
|
|
386 | |
|
|
387 | $proc->[2]->(@$job, sub { |
|
|
388 | # reduce load |
|
|
389 | --$proc->[0] # worker still busy? |
|
|
390 | or ++$nidle > $idle # not too many idle processes? |
|
|
391 | or $want_stop->(); |
|
|
392 | |
|
|
393 | Array::Heap::adjust_heap_idx @pool, $proc->[1] |
|
|
394 | if defined $proc->[1]; |
|
|
395 | |
|
|
396 | &$ocb; |
|
|
397 | |
|
|
398 | $scheduler->(); |
|
|
399 | }); |
|
|
400 | } else { |
|
|
401 | $want_start->() |
|
|
402 | unless @pool >= $max; |
|
|
403 | |
|
|
404 | last; |
|
|
405 | } |
|
|
406 | } |
|
|
407 | } elsif ($shutdown) { |
|
|
408 | @pool = (); |
199 | undef $ww; |
409 | undef $start_w; |
200 | $shutdown and shutdown $fh, 1; |
410 | undef $start_worker; # frees $destroy_guard reference |
|
|
411 | |
|
|
412 | $stop_worker->($pool[0]) |
|
|
413 | while $nidle; |
201 | } |
414 | } |
202 | }; |
415 | }; |
203 | |
416 | |
204 | my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"); |
417 | my $shutdown_guard = Guard::guard { |
|
|
418 | $shutdown = 1; |
|
|
419 | $scheduler->(); |
|
|
420 | }; |
205 | |
421 | |
206 | $self->require ($module) |
422 | $start_worker->() |
207 | ->send_arg ($function, $arg{init}, $serialiser) |
423 | while @pool < $idle; |
208 | ->run ("$module\::run", sub { |
|
|
209 | $fh = shift; |
|
|
210 | |
424 | |
211 | my ($id, $len); |
425 | sub { |
212 | $rw = AE::io $fh, 0, sub { |
426 | $shutdown_guard if 0; # keep it alive |
213 | $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; |
|
|
214 | $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; |
|
|
215 | |
427 | |
216 | if ($len) { |
428 | $start_worker->() |
217 | while (8 <= length $rbuf) { |
429 | unless @pool; |
218 | ($id, $len) = unpack "LL", $rbuf; |
|
|
219 | 8 + $len <= length $rbuf |
|
|
220 | or last; |
|
|
221 | |
430 | |
222 | my @r = $t->(substr $rbuf, 8, $len); |
431 | push @queue, [@_]; |
223 | substr $rbuf, 0, 8 + $len, ""; |
432 | $scheduler->(); |
|
|
433 | } |
|
|
434 | } |
224 | |
435 | |
225 | if ($id) { |
436 | =item $pool->(..., $cb->(...)) |
226 | if (@rcb) { |
437 | |
227 | (shift @rcb)->(@r); |
438 | Call the RPC function of a worker with the given arguments, and when the |
228 | } elsif (my $cb = delete $rcb{$id}) { |
439 | worker is done, call the C<$cb> with the results, just like calling the |
229 | $cb->(@r); |
440 | RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for |
230 | } else { |
441 | details on the RPC API. |
231 | undef $rw; undef $ww; |
442 | |
232 | $on_error->("unexpected data from child"); |
443 | If there is no free worker, the call will be queued until a worker becomes |
|
|
444 | available. |
|
|
445 | |
|
|
446 | Note that there can be considerable time between calling this method and |
|
|
447 | the call actually being executed. During this time, the parameters passed |
|
|
448 | to this function are effectively read-only - modifying them after the call |
|
|
449 | and before the callback is invoked causes undefined behaviour. |
|
|
450 | |
|
|
451 | =cut |
|
|
452 | |
|
|
453 | =item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus] |
|
|
454 | |
|
|
455 | =item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus] |
|
|
456 | |
|
|
457 | Tries to detect the number of CPUs (C<$cpus>, often called cpu cores |
|
|
458 | nowadays) and execution units (C<$eus>) which include e.g. extra |
|
|
459 | hyperthreaded units. When C<$cpus> cannot be determined reliably, |
|
|
460 | C<$default_cpus> is returned for both values, or C<1> if it is missing. |
|
|
461 | |
|
|
462 | For normal CPU bound uses, it is wise to have as many worker processes |
|
|
463 | as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using |
|
|
464 | hyperthreading is usually detrimental to performance, but in those rare |
|
|
465 | cases where that really helps it might be beneficial to use more workers |
|
|
466 | (C<$eus>). |
|
|
467 | |
|
|
468 | Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both |
|
|
469 | C<$cpus> and C<$eus>, and on {Free,Net,Open}BSD, F<sysctl -n hw.ncpu> is |
|
|
470 | used for C<$cpus>. |
|
|
471 | |
|
|
472 | Example: create a worker pool with as many workers as cpu cores, or C<2>, |
|
|
473 | if the actual number could not be determined. |
|
|
474 | |
|
|
475 | $fork->AnyEvent::Fork::Pool::run ("myworker::function", |
|
|
476 | max => (scalar AnyEvent::Fork::Pool::ncpu 2), |
|
|
477 | ); |
|
|
478 | |
|
|
479 | =cut |
|
|
480 | |
|
|
481 | BEGIN { |
|
|
482 | if ($^O eq "linux") { |
|
|
483 | *ncpu = sub(;$) { |
|
|
484 | my ($cpus, $eus); |
|
|
485 | |
|
|
486 | if (open my $fh, "<", "/proc/cpuinfo") { |
|
|
487 | my %id; |
|
|
488 | |
|
|
489 | while (<$fh>) { |
|
|
490 | if (/^core id\s*:\s*(\d+)/) { |
233 | } |
491 | ++$eus; |
234 | } else { |
492 | undef $id{$1}; |
235 | $on_event->(@r); |
|
|
236 | } |
493 | } |
237 | } |
494 | } |
238 | } elsif (defined $len) { |
|
|
239 | undef $rw; undef $ww; # it ends here |
|
|
240 | |
495 | |
241 | if (@rcb || %rcb) { |
496 | $cpus = scalar keys %id; |
242 | $on_error->("unexpected eof"); |
|
|
243 | } else { |
497 | } else { |
244 | $on_destroy->(); |
498 | $cpus = $eus = @_ ? shift : 1; |
245 | } |
|
|
246 | } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { |
|
|
247 | undef $rw; undef $ww; # it ends here |
|
|
248 | $on_error->("read: $!"); |
|
|
249 | } |
499 | } |
|
|
500 | wantarray ? ($cpus, $eus) : $cpus |
250 | }; |
501 | }; |
251 | |
502 | } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") { |
252 | $ww ||= AE::io $fh, 1, $wcb; |
503 | *ncpu = sub(;$) { |
253 | }); |
504 | my $cpus = qx<sysctl -n hw.ncpu> * 1 |
254 | |
505 | || (@_ ? shift : 1); |
255 | my $guard = Guard::guard { |
506 | wantarray ? ($cpus, $cpus) : $cpus |
256 | $shutdown = 1; |
507 | }; |
257 | $ww ||= $fh && AE::io $fh, 1, $wcb; |
508 | } else { |
|
|
509 | *ncpu = sub(;$) { |
|
|
510 | my $cpus = @_ ? shift : 1; |
|
|
511 | wantarray ? ($cpus, $cpus) : $cpus |
|
|
512 | }; |
258 | }; |
513 | } |
259 | |
|
|
260 | my $id; |
|
|
261 | |
|
|
262 | $arg{async} |
|
|
263 | ? sub { |
|
|
264 | $id = ($id == 0xffffffff ? 0 : $id) + 1; |
|
|
265 | $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops |
|
|
266 | |
|
|
267 | $rcb{$id} = pop; |
|
|
268 | |
|
|
269 | $guard; # keep it alive |
|
|
270 | |
|
|
271 | $wbuf .= pack "LL/a*", $id, &$f; |
|
|
272 | $ww ||= $fh && AE::io $fh, 1, $wcb; |
|
|
273 | } |
|
|
274 | : sub { |
|
|
275 | push @rcb, pop; |
|
|
276 | |
|
|
277 | $guard; # keep it alive |
|
|
278 | |
|
|
279 | $wbuf .= pack "L/a*", &$f; |
|
|
280 | $ww ||= $fh && AE::io $fh, 1, $wcb; |
|
|
281 | } |
|
|
282 | } |
514 | } |
283 | |
515 | |
284 | =item $pool->call (..., $cb->(...)) |
|
|
285 | |
|
|
286 | =back |
516 | =back |
|
|
517 | |
|
|
518 | =head1 CHILD USAGE |
|
|
519 | |
|
|
520 | In addition to the L<AnyEvent::Fork::RPC> API, this module implements one |
|
|
521 | more child-side function: |
|
|
522 | |
|
|
523 | =over 4 |
|
|
524 | |
|
|
525 | =item AnyEvent::Fork::Pool::retire () |
|
|
526 | |
|
|
527 | This function sends an event to the parent process to request retirement: |
|
|
528 | the worker is removed from the pool and no new jobs will be sent to it, |
|
|
529 | but it has to handle the jobs that are already queued. |
|
|
530 | |
|
|
531 | The parentheses are part of the syntax: the function usually isn't defined |
|
|
532 | when you compile your code (because that happens I<before> handing the |
|
|
533 | template process over to C<AnyEvent::Fork::Pool::run>), so you need the |
|
|
534 | empty parentheses to tell Perl that the function is indeed a function. |
|
|
535 | |
|
|
536 | Retiring a worker can be useful to gracefully shut it down when the worker |
|
|
537 | deems this useful. For example, after executing a job, one could check |
|
|
538 | the process size or the number of jobs handled so far, and if either is |
|
|
539 | too high, the worker could ask to get retired, to avoid memory leaks to |
|
|
540 | accumulate. |
|
|
541 | |
|
|
542 | =back |
|
|
543 | |
|
|
544 | =head1 POOL PARAMETERS RECIPES |
|
|
545 | |
|
|
546 | This section describes some recipes for pool parameters. These are mostly |
|
|
547 | meant for the synchronous RPC backend, as the asynchronous RPC backend |
|
|
548 | changes the rules considerably, making workers themselves responsible for |
|
|
549 | their scheduling. |
|
|
550 | |
|
|
551 | =over 4 |
|
|
552 | |
|
|
553 | =item low latency - set load = 1 |
|
|
554 | |
|
|
555 | If you need a deterministic low latency, you should set the C<load> |
|
|
556 | parameter to C<1>. This ensures that never more than one job is sent to |
|
|
557 | each worker. This avoids having to wait for a previous job to finish. |
|
|
558 | |
|
|
559 | This makes most sense with the synchronous (default) backend, as the |
|
|
560 | asynchronous backend can handle multiple requests concurrently. |
|
|
561 | |
|
|
562 | =item lowest latency - set load = 1 and idle = max |
|
|
563 | |
|
|
564 | To achieve the lowest latency, you additionally should disable any dynamic |
|
|
565 | resizing of the pool by setting C<idle> to the same value as C<max>. |
|
|
566 | |
|
|
567 | =item high throughput, cpu bound jobs - set load >= 2, max = #cpus |
|
|
568 | |
|
|
569 | To get high throughput with cpu-bound jobs, you should set the maximum |
|
|
570 | pool size to the number of cpus in your system, and C<load> to at least |
|
|
571 | C<2>, to make sure there can be another job waiting for the worker when it |
|
|
572 | has finished one. |
|
|
573 | |
|
|
574 | The value of C<2> for C<load> is the minimum value that I<can> achieve |
|
|
575 | 100% throughput, but if your parent process itself is sometimes busy, you |
|
|
576 | might need higher values. Also there is a limit on the amount of data that |
|
|
577 | can be "in flight" to the worker, so if you send big blobs of data to your |
|
|
578 | worker, C<load> might have much less of an effect. |
|
|
579 | |
|
|
580 | =item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high |
|
|
581 | |
|
|
582 | When your jobs are I/O bound, using more workers usually boils down to |
|
|
583 | higher throughput, depending very much on your actual workload - sometimes |
|
|
584 | having only one worker is best, for example, when you read or write big |
|
|
585 | files at maximum speed, as a second worker will increase seek times. |
|
|
586 | |
|
|
587 | =back |
|
|
588 | |
|
|
589 | =head1 EXCEPTIONS |
|
|
590 | |
|
|
591 | The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions will |
|
|
592 | not be caught, and exceptions in both worker and in callbacks cause |
|
|
593 | undesirable or undefined behaviour. |
287 | |
594 | |
288 | =head1 SEE ALSO |
595 | =head1 SEE ALSO |
289 | |
596 | |
290 | L<AnyEvent::Fork>, to create the processes in the first place. |
597 | L<AnyEvent::Fork>, to create the processes in the first place. |
291 | |
598 | |