… | |
… | |
3 | AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork |
3 | AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork |
4 | |
4 | |
5 | =head1 SYNOPSIS |
5 | =head1 SYNOPSIS |
6 | |
6 | |
7 | use AnyEvent; |
7 | use AnyEvent; |
|
|
8 | use AnyEvent::Fork; |
8 | use AnyEvent::Fork::Pool; |
9 | use AnyEvent::Fork::Pool; |
9 | # use AnyEvent::Fork is not needed |
|
|
10 | |
10 | |
11 | # all parameters with default values |
11 | # all possible parameters shown, with default values |
12 | my $pool = new AnyEvent::Fork::Pool |
12 | my $pool = AnyEvent::Fork |
13 | "MyWorker::run", |
13 | ->new |
|
|
14 | ->require ("MyWorker") |
|
|
15 | ->AnyEvent::Fork::Pool::run ( |
|
|
16 | "MyWorker::run", # the worker function |
14 | |
17 | |
15 | # pool management |
18 | # pool management |
16 | min => 0, # minimum # of processes |
|
|
17 | max => 4, # maximum # of processes |
19 | max => 4, # absolute maximum # of processes |
18 | max_queue => 2, # queue at most this number of jobs per process |
20 | idle => 0, # minimum # of idle processes |
|
|
21 | load => 2, # queue at most this number of jobs per process |
19 | min_delay => 0, # wait this many seconds before starting a new process |
22 | start => 0.1, # wait this many seconds before starting a new process |
20 | min_idle => 0, # try to have at least this amount of idle processes |
|
|
21 | max_idle => 1, # at most this many idle processes |
|
|
22 | idle_time => 1, # wait this many seconds before killing an idle process |
23 | stop => 10, # wait this many seconds before stopping an idle process |
23 | on_destroy => (my $finish = AE::cv), |
24 | on_destroy => (my $finish = AE::cv), # called when object is destroyed |
24 | |
25 | |
25 | # template process |
|
|
26 | template => AnyEvent::Fork->new, # the template process to use |
|
|
27 | require => [MyWorker::], # module(s) to load |
|
|
28 | eval => "# perl code to execute in template", |
|
|
29 | |
|
|
30 | # parameters passed to AnyEvent::Fork::RPC |
26 | # parameters passed to AnyEvent::Fork::RPC |
31 | async => 0, |
27 | async => 0, |
32 | on_error => sub { die "FATAL: $_[0]\n" }, |
28 | on_error => sub { die "FATAL: $_[0]\n" }, |
33 | on_event => sub { my @ev = @_ }, |
29 | on_event => sub { my @ev = @_ }, |
34 | init => "MyWorker::init", |
30 | init => "MyWorker::init", |
35 | serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, |
31 | serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, |
36 | ; |
32 | ); |
37 | |
33 | |
38 | for (1..10) { |
34 | for (1..10) { |
39 | $pool->call (doit => $_, sub { |
35 | $pool->(doit => $_, sub { |
40 | print "MyWorker::run returned @_\n"; |
36 | print "MyWorker::run returned @_\n"; |
41 | }); |
37 | }); |
42 | } |
38 | } |
43 | |
39 | |
44 | undef $pool; |
40 | undef $pool; |
45 | |
41 | |
46 | $finish->recv; |
42 | $finish->recv; |
47 | |
43 | |
48 | =head1 DESCRIPTION |
44 | =head1 DESCRIPTION |
49 | |
45 | |
50 | This module uses processes created via L<AnyEvent::Fork> and the RPC |
46 | This module uses processes created via L<AnyEvent::Fork> (or |
51 | protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced
47 | L<AnyEvent::Fork::Remote>) and the RPC protocol implemented in
52 | pool of processes that handles jobs. |
48 | L<AnyEvent::Fork::RPC> to create a load-balanced pool of processes that |
|
|
49 | handles jobs. |
53 | |
50 | |
54 | Understanding of L<AnyEvent::Fork> is helpful but not critical to be able |
51 | Understanding L<AnyEvent::Fork> is helpful but not required to use this |
55 | to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC> |
52 | module, but a thorough understanding of L<AnyEvent::Fork::RPC> is, as |
56 | is, as it defines the actual API that needs to be implemented in the |
53 | it defines the actual API that needs to be implemented in the worker |
57 | children. |
54 | processes. |
58 | |
|
|
59 | =head1 EXAMPLES |
|
|
60 | |
55 | |
61 | =head1 PARENT USAGE |
56 | =head1 PARENT USAGE |
62 | |
57 | |
|
|
58 | To create a pool, you first have to create a L<AnyEvent::Fork> object - |
|
|
59 | this object becomes your template process. Whenever a new worker process |
|
|
60 | is needed, it is forked from this template process. Then you need to |
|
|
61 | "hand off" this template process to the C<AnyEvent::Fork::Pool> module by |
|
|
62 | calling its run method on it: |
|
|
63 | |
|
|
64 | my $template = AnyEvent::Fork |
|
|
65 | ->new |
|
|
66 | ->require ("SomeModule", "MyWorkerModule"); |
|
|
67 | |
|
|
68 | my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction"); |
|
|
69 | |
|
|
70 | The pool "object" is not a regular Perl object, but a code reference that |
|
|
71 | you can call and that works roughly like calling the worker function |
|
|
72 | directly, except that it returns nothing but instead you need to specify a |
|
|
73 | callback to be invoked once results are in: |
|
|
74 | |
|
|
75 | $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" }); |
|
|
76 | |
63 | =over 4 |
77 | =over 4 |
64 | |
78 | |
65 | =cut |
79 | =cut |
66 | |
80 | |
67 | package AnyEvent::Fork::Pool; |
81 | package AnyEvent::Fork::Pool; |
68 | |
82 | |
69 | use common::sense; |
83 | use common::sense; |
70 | |
84 | |
71 | use Scalar::Util (); |
85 | use Scalar::Util (); |
72 | |
86 | |
|
|
87 | use Guard (); |
73 | use Array::Heap (); |
88 | use Array::Heap (); |
74 | |
89 | |
75 | use AnyEvent; |
90 | use AnyEvent; |
76 | use AnyEvent::Fork; # we don't actually depend on it, this is for convenience |
|
|
77 | use AnyEvent::Fork::RPC; |
91 | use AnyEvent::Fork::RPC; |
78 | |
92 | |
|
|
93 | # these are used for the first and last argument of events |
|
|
94 | # in the hope of not colliding. yes, I don't like it either, |
|
|
95 | # but didn't come up with an obviously better alternative. |
79 | my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P'; |
96 | my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P'; |
80 | my $magic2 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*'; |
97 | my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*'; |
81 | |
98 | |
82 | our $VERSION = 0.1; |
99 | our $VERSION = 1.3; |
83 | |
100 | |
84 | =item my $rpc = new AnyEvent::Fork::Pool $function, [key => value...] |
101 | =item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...] |
|
|
102 | |
|
|
103 | The traditional way to call the pool creation function. But it is way |
|
|
104 | cooler to call it in the following way: |
|
|
105 | |
|
|
106 | =item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...]) |
85 | |
107 | |
86 | Creates a new pool object with the specified C<$function> as function |
108 | Creates a new pool object with the specified C<$function> as function |
87 | (name) to call for each request. |
109 | (name) to call for each request. The pool uses the C<$fork> object as the |
88 | |
110 | template when creating worker processes. |
89 | A pool consists of a template process that contains the code and data that |
|
|
90 | the worker processes need. And a number of worker processes that have been |
|
|
91 | forked off of that template process. |
|
|
92 | |
111 | |
93 | You can supply your own template process, or tell C<AnyEvent::Fork::Pool> |
112 | You can supply your own template process, or tell C<AnyEvent::Fork::Pool> |
94 | to create one. |
113 | to create one. |
95 | |
114 | |
96 | A relatively large number of key/value pairs can be specified to influence |
115 | A relatively large number of key/value pairs can be specified to influence |
… | |
… | |
101 | |
120 | |
102 | =item Pool Management |
121 | =item Pool Management |
103 | |
122 | |
104 | The pool consists of a certain number of worker processes. These options |
123 | The pool consists of a certain number of worker processes. These options |
105 | decide how many of these processes exist and when they are started and |
124 | decide how many of these processes exist and when they are started and |
106 | stopped.
125 | stopped. |
|
|
126 | |
|
|
127 | The worker pool is dynamically resized, according to (perceived :) |
|
|
128 | load. The minimum size is given by the C<idle> parameter and the maximum |
|
|
129 | size is given by the C<max> parameter. A new worker is started every |
|
|
130 | C<start> seconds at most, and an idle worker is stopped at most every |
|
|
131 | C<stop> second. |
|
|
132 | |
|
|
133 | You can specify the amount of jobs sent to a worker concurrently using the |
|
|
134 | C<load> parameter. |
107 | |
135 | |
108 | =over 4 |
136 | =over 4 |
109 | |
137 | |
110 | =item min => $count (default: 0) |
138 | =item idle => $count (default: 0) |
111 | |
139 | |
112 | The minimum number of processes in the pool, in addition to the template |
140 | The minimum amount of idle processes in the pool - when there are fewer |
113 | process. Even when idle, there will never be fewer than this number of |
141 | than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new |
114 | worker processes. The default means that the pool can be empty. |
142 | ones, subject to the limits set by C<max> and C<start>. |
|
|
143 | |
|
|
144 | This is also the initial amount of workers in the pool. The default of |
|
|
145 | zero means that the pool starts empty and can shrink back to zero workers |
|
|
146 | over time. |
115 | |
147 | |
116 | =item max => $count (default: 4) |
148 | =item max => $count (default: 4) |
117 | |
149 | |
118 | The maximum number of processes in the pool, in addition to the template |
150 | The maximum number of processes in the pool, in addition to the template |
119 | process. C<AnyEvent::Fork::Pool> will never create more than this number |
151 | process. C<AnyEvent::Fork::Pool> will never have more than this number of |
120 | of processes. |
152 | worker processes, although there can be more temporarily when a worker is |
|
|
153 | shut down and hasn't exited yet. |
121 | |
154 | |
122 | =item max_queue => $count (default: 2) |
155 | =item load => $count (default: 2) |
123 | |
156 | |
124 | The maximum number of jobs sent to a single worker process. Worker |
157 | The maximum number of concurrent jobs sent to a single worker process. |
125 | processes that handle this number of jobs already are called "busy". |
|
|
126 | |
158 | |
127 | Jobs that cannot be sent to a worker immediately (because all workers are |
159 | Jobs that cannot be sent to a worker immediately (because all workers are |
128 | busy) will be queued until a worker is available. |
160 | busy) will be queued until a worker is available. |
129 | |
161 | |
|
|
162 | Setting this low improves latency. For example, at C<1>, every job that |
|
|
163 | is sent to a worker is sent to a completely idle worker that doesn't run |
|
|
164 | any other jobs. The downside is that throughput is reduced - a worker that |
|
|
165 | finishes a job needs to wait for a new job from the parent. |
|
|
166 | |
|
|
167 | The default of C<2> is usually a good compromise. |
|
|
168 | |
130 | =item min_delay => $seconds (default: 0) |
169 | =item start => $seconds (default: 0.1) |
131 | |
170 | |
132 | When a job is queued and all workers are busy, a timer is started. If the |
171 | When there are fewer than C<idle> workers (or all workers are completely |
133 | timer elapses and there are still jobs that cannot be queued to a worker, |
172 | busy), then a timer is started. If the timer elapses and there are still |
134 | a new worker is started. |
173 | jobs that cannot be queued to a worker, a new worker is started. |
135 | |
174 | |
136 | This configures the time that all workers must be busy before a new worker
175 | This sets the minimum time that all workers must be busy before a new |
137 | is started. Or, put differently, the minimum delay between starting new
176 | worker is started. Or, put differently, the minimum delay between starting |
138 | workers. |
177 | new workers. |
139 | |
178 | |
140 | The delay is zero by default, which means new workers will be started |
179 | The delay is small by default, which means new workers will be started |
141 | without delay. |
180 | relatively quickly. A delay of C<0> is possible, and ensures that the pool |
|
|
181 | will grow as quickly as possible under load. |
142 | |
182 | |
143 | =item min_idle => $count (default: 0) |
183 | Non-zero values are useful to avoid "exploding" a pool because a lot of |
|
|
184 | jobs are queued in an instant. |
144 | |
185 | |
145 | The minimum number of idle workers - when they are less, more |
186 | Higher values are often useful to improve efficiency at the cost of |
146 | are started. The C<min_delay> is still respected though, and |
187 | latency - when fewer processes can do the job over time, starting more and |
147 | C<min_idle>/C<min_delay> and C<max_idle>/C<idle_time> are useful to |
188 | more is not necessarily going to help. |
148 | dynamically adjust the pool. |
|
|
149 | |
189 | |
150 | =item max_idle => $count (default: 1) |
|
|
151 | |
|
|
152 | The maximum number of idle workers. If a worker becomes idle and there are |
|
|
153 | already this many idle workers, it will be stopped immediately instead of |
|
|
154 | waiting for the idle timer to elapse. |
|
|
155 | |
|
|
156 | =item idle_time => $seconds (default: 1) |
190 | =item stop => $seconds (default: 10) |
157 | |
191 | |
158 | When a worker has no jobs to execute it becomes idle. An idle worker that |
192 | When a worker has no jobs to execute it becomes idle. An idle worker that |
159 | hasn't executed a job within this amount of time will be stopped, unless |
193 | hasn't executed a job within this amount of time will be stopped, unless |
160 | the other parameters say otherwise. |
194 | the other parameters say otherwise. |
161 | |
195 | |
|
|
196 | Setting this to a very high value means that workers stay around longer, |
|
|
197 | even when they have nothing to do, which can be good as they don't have to |
|
|
198 | be started on the next load spike again.
|
|
199 | |
|
|
200 | Setting this to a lower value can be useful to avoid memory or simply |
|
|
201 | process table wastage. |
|
|
202 | |
|
|
203 | Usually, setting this to a time longer than the time between load spikes |
|
|
204 | is best - if you expect a lot of requests every minute and little work |
|
|
205 | in between, setting this to longer than a minute avoids having to stop |
|
|
206 | and start workers. On the other hand, you have to ask yourself if letting |
|
|
207 | workers run idle is a good use of your resources. Try to find a good |
|
|
208 | balance between resource usage of your workers and the time to start new |
|
|
209 | workers - the processes created by L<AnyEvent::Fork> itself are fast at
|
|
210 | creating workers while not using much memory for them, so most of the |
|
|
211 | overhead is likely from your own code. |
|
|
212 | |
162 | =item on_destroy => $callback->() (default: none) |
213 | =item on_destroy => $callback->() (default: none) |
163 | |
214 | |
164 | When a pool object goes out of scope, it will still handle all outstanding |
215 | When a pool object goes out of scope, the outstanding requests are still |
165 | jobs. After that, it will destroy all workers (and also the template |
216 | handled till completion. Only after handling all jobs will the workers |
166 | process if it isn't referenced otherwise). |
217 | be destroyed (and also the template process if it isn't referenced |
|
|
218 | otherwise). |
|
|
219 | |
|
|
220 | To find out when a pool I<really> has finished its work, you can set this |
|
|
221 | callback, which will be called when the pool has been destroyed. |
167 | |
222 | |
168 | =back |
223 | =back |
169 | |
224 | |
170 | =item Template Process |
225 | =item AnyEvent::Fork::RPC Parameters |
171 | |
226 | |
172 | The worker processes are all forked from a single template |
227 | These parameters are all passed more or less directly to |
173 | process. Ideally, all modules and all code used by the worker, as well as
228 | L<AnyEvent::Fork::RPC>. They are only briefly mentioned here, for |
174 | any shared data structures should be loaded into the template process, to |
229 | their full documentation please refer to the L<AnyEvent::Fork::RPC> |
175 | take advantage of data sharing via fork. |
230 | documentation. Also, the default values mentioned here are only documented |
176 | |
231 | as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding. |
177 | You can create your own template process by creating a L<AnyEvent::Fork> |
|
|
178 | object yourself and passing it as the C<template> parameter, but |
|
|
179 | C<AnyEvent::Fork::Pool> can create one for you, including some standard |
|
|
180 | options. |
|
|
181 | |
232 | |
182 | =over 4 |
233 | =over 4 |
183 | |
234 | |
184 | =item template => $fork (default: C<< AnyEvent::Fork->new >>) |
235 | =item async => $boolean (default: 0) |
185 | |
236 | |
186 | The template process to use, if you want to create your own. |
237 | Whether to use the synchronous or asynchronous RPC backend. |
187 | |
238 | |
188 | =item require => \@modules (default: C<[]>) |
239 | =item on_error => $callback->($message) (default: die with message) |
189 | |
240 | |
190 | The modules in this list will be loaded into the template process.
241 | The callback to call on any (fatal) errors. |
191 | |
242 | |
192 | =item eval => "# perl code to execute in template" (default: none) |
243 | =item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>) |
193 | |
244 | |
194 | This is a perl string that is evaluated after creating the template |
245 | The callback to invoke on events. |
195 | process and after requiring the modules. It can do whatever it wants to |
246 | |
196 | configure the process, but it must not do anything that would keep a later |
247 | =item init => $initfunction (default: none) |
197 | fork from working (so must not create event handlers or (real) threads for |
248 | |
198 | example). |
249 | The function to call in the child, once before handling requests. |
|
|
250 | |
|
|
251 | =item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
|
|
252 | |
|
|
253 | The serialiser to use. |
199 | |
254 | |
200 | =back |
255 | =back |
201 | |
256 | |
202 | =item AnyEvent::Fork::RPC Parameters |
|
|
203 | |
|
|
204 | These parameters are all passed directly to L<AnyEvent::Fork::RPC>. They |
|
|
205 | are only briefly mentioned here, for their full documentation |
|
|
206 | please refer to the L<AnyEvent::Fork::RPC> documentation. Also, the |
|
|
207 | default values mentioned here are only documented as a best effort - |
|
|
208 | L<AnyEvent::Fork::RPC> documentation is binding. |
|
|
209 | |
|
|
210 | =over 4 |
|
|
211 | |
|
|
212 | =item async => $boolean (default: 0) |
|
|
213 | |
|
|
214 | Whether to use the synchronous or asynchronous RPC backend.
|
|
215 | |
|
|
216 | =item on_error => $callback->($message) (default: die with message) |
|
|
217 | |
|
|
218 | The callback to call on any (fatal) errors. |
|
|
219 | |
|
|
220 | =item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>) |
|
|
221 | |
|
|
222 | The callback to invoke on events. |
|
|
223 | |
|
|
224 | =item init => $initfunction (default: none) |
|
|
225 | |
|
|
226 | The function to call in the child, once before handling requests. |
|
|
227 | |
|
|
228 | =item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
|
|
229 | |
|
|
230 | The serialiser to use. |
|
|
231 | |
|
|
232 | =back |
257 | =back |
233 | |
258 | |
234 | =back |
|
|
235 | |
|
|
236 | =cut |
259 | =cut |
237 | |
260 | |
238 | sub new { |
261 | sub run { |
239 | my ($class, $function, %arg) = @_; |
262 | my ($template, $function, %arg) = @_; |
240 | |
263 | |
241 | my $self = bless { |
264 | my $max = $arg{max} || 4; |
242 | min => 0, |
265 | my $idle = $arg{idle} || 0, |
243 | max => 4, |
266 | my $load = $arg{load} || 2, |
244 | max_queue => 2, |
267 | my $start = $arg{start} || 0.1, |
245 | min_delay => 0, |
268 | my $stop = $arg{stop} || 10, |
246 | max_idle => 1, |
269 | my $on_event = $arg{on_event} || sub { }, |
247 | idle_time => 1, |
270 | my $on_destroy = $arg{on_destroy}; |
248 | on_event => sub { }, |
|
|
249 | %arg, |
|
|
250 | pool => [], |
|
|
251 | queue => [], |
|
|
252 | }, $class; |
|
|
253 | |
271 | |
254 | $self->{function} = $function; |
272 | my @rpc = ( |
|
|
273 | async => $arg{async}, |
|
|
274 | init => $arg{init}, |
|
|
275 | serialiser => delete $arg{serialiser}, |
|
|
276 | on_error => $arg{on_error}, |
|
|
277 | ); |
255 | |
278 | |
256 | ($self->{template} ||= new AnyEvent::Fork) |
279 | my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown); |
|
|
280 | my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler); |
|
|
281 | |
|
|
282 | my $destroy_guard = Guard::guard { |
|
|
283 | $on_destroy->() |
|
|
284 | if $on_destroy; |
|
|
285 | }; |
|
|
286 | |
|
|
287 | $template |
257 | ->require ("AnyEvent::Fork::RPC::" . ($self->{async} ? "Async" : "Sync")) |
288 | ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync")) |
258 | ->require (@{ delete $self->{require} }) |
|
|
259 | ->eval (' |
289 | ->eval (' |
260 | my ($magic0, $magic2) = @_; |
290 | my ($magic0, $magic1) = @_; |
261 | sub AnyEvent::Fork::Pool::quit() { |
291 | sub AnyEvent::Fork::Pool::retire() { |
262 | AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic2; |
292 | AnyEvent::Fork::RPC::event $magic0, "quit", $magic1; |
263 | } |
293 | } |
264 | ', $magic0, $magic2) |
294 | ', $magic0, $magic1) |
265 | ->eval (delete $self->{eval}); |
295 | ; |
266 | |
296 | |
267 | $self->start |
297 | $start_worker = sub { |
268 | while @{ $self->{pool} } < $self->{min}; |
298 | my $proc = [0, 0, undef]; # load, index, rpc |
269 | |
299 | |
270 | $self |
300 | $proc->[2] = $template |
|
|
301 | ->fork |
|
|
302 | ->AnyEvent::Fork::RPC::run ($function, |
|
|
303 | @rpc, |
|
|
304 | on_event => sub { |
|
|
305 | if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) { |
|
|
306 | $destroy_guard if 0; # keep it alive |
|
|
307 | |
|
|
308 | $_[1] eq "quit" and $stop_worker->($proc); |
|
|
309 | return; |
|
|
310 | } |
|
|
311 | |
|
|
312 | &$on_event; |
|
|
313 | }, |
|
|
314 | ) |
|
|
315 | ; |
|
|
316 | |
|
|
317 | ++$nidle; |
|
|
318 | Array::Heap::push_heap_idx @pool, $proc; |
|
|
319 | |
|
|
320 | Scalar::Util::weaken $proc; |
|
|
321 | }; |
|
|
322 | |
|
|
323 | $stop_worker = sub { |
|
|
324 | my $proc = shift; |
|
|
325 | |
|
|
326 | $proc->[0] |
|
|
327 | or --$nidle; |
|
|
328 | |
|
|
329 | Array::Heap::splice_heap_idx @pool, $proc->[1] |
|
|
330 | if defined $proc->[1]; |
|
|
331 | |
|
|
332 | @$proc = 0; # tell others to leave it be |
|
|
333 | }; |
|
|
334 | |
|
|
335 | $want_start = sub { |
|
|
336 | undef $stop_w; |
|
|
337 | |
|
|
338 | $start_w ||= AE::timer $start, $start, sub { |
|
|
339 | if (($nidle < $idle || @queue) && @pool < $max) { |
|
|
340 | $start_worker->(); |
|
|
341 | $scheduler->(); |
|
|
342 | } else { |
|
|
343 | undef $start_w; |
|
|
344 | } |
|
|
345 | }; |
|
|
346 | }; |
|
|
347 | |
|
|
348 | $want_stop = sub { |
|
|
349 | $stop_w ||= AE::timer $stop, $stop, sub { |
|
|
350 | $stop_worker->($pool[0]) |
|
|
351 | if $nidle; |
|
|
352 | |
|
|
353 | undef $stop_w |
|
|
354 | if $nidle <= $idle; |
|
|
355 | }; |
|
|
356 | }; |
|
|
357 | |
|
|
358 | $scheduler = sub { |
|
|
359 | if (@queue) { |
|
|
360 | while (@queue) { |
|
|
361 | @pool or $start_worker->(); |
|
|
362 | |
|
|
363 | my $proc = $pool[0]; |
|
|
364 | |
|
|
365 | if ($proc->[0] < $load) { |
|
|
366 | # found free worker, increase load |
|
|
367 | unless ($proc->[0]++) { |
|
|
368 | # worker became busy |
|
|
369 | --$nidle |
|
|
370 | or undef $stop_w; |
|
|
371 | |
|
|
372 | $want_start->() |
|
|
373 | if $nidle < $idle && @pool < $max; |
|
|
374 | } |
|
|
375 | |
|
|
376 | Array::Heap::adjust_heap_idx @pool, 0; |
|
|
377 | |
|
|
378 | my $job = shift @queue; |
|
|
379 | my $ocb = pop @$job; |
|
|
380 | |
|
|
381 | $proc->[2]->(@$job, sub { |
|
|
382 | # reduce load |
|
|
383 | --$proc->[0] # worker still busy? |
|
|
384 | or ++$nidle > $idle # not too many idle processes? |
|
|
385 | or $want_stop->(); |
|
|
386 | |
|
|
387 | Array::Heap::adjust_heap_idx @pool, $proc->[1] |
|
|
388 | if defined $proc->[1]; |
|
|
389 | |
|
|
390 | &$ocb; |
|
|
391 | |
|
|
392 | $scheduler->(); |
|
|
393 | }); |
|
|
394 | } else { |
|
|
395 | $want_start->() |
|
|
396 | unless @pool >= $max; |
|
|
397 | |
|
|
398 | last; |
|
|
399 | } |
|
|
400 | } |
|
|
401 | } elsif ($shutdown) { |
|
|
402 | undef $_->[2] |
|
|
403 | for @pool; |
|
|
404 | |
|
|
405 | undef $start_w; |
|
|
406 | undef $start_worker; # frees $destroy_guard reference |
|
|
407 | |
|
|
408 | $stop_worker->($pool[0]) |
|
|
409 | while $nidle; |
|
|
410 | } |
|
|
411 | }; |
|
|
412 | |
|
|
413 | my $shutdown_guard = Guard::guard { |
|
|
414 | $shutdown = 1; |
|
|
415 | $scheduler->(); |
|
|
416 | }; |
|
|
417 | |
|
|
418 | $start_worker->() |
|
|
419 | while @pool < $idle; |
|
|
420 | |
|
|
421 | sub { |
|
|
422 | $shutdown_guard if 0; # keep it alive |
|
|
423 | |
|
|
424 | $start_worker->() |
|
|
425 | unless @pool; |
|
|
426 | |
|
|
427 | push @queue, [@_]; |
|
|
428 | $scheduler->(); |
|
|
429 | } |
271 | } |
430 | } |
272 | |
431 | |
273 | sub start { |
|
|
274 | my ($self) = @_; |
|
|
275 | |
|
|
276 | warn "start\n";#d# |
|
|
277 | |
|
|
278 | Scalar::Util::weaken $self; |
|
|
279 | |
|
|
280 | my $proc = [0, undef, undef]; |
|
|
281 | |
|
|
282 | $proc->[1] = $self->{template} |
|
|
283 | ->fork |
|
|
284 | ->AnyEvent::Fork::RPC::run ($self->{function}, |
|
|
285 | async => $self->{async}, |
|
|
286 | init => $self->{init}, |
|
|
287 | serialiser => $self->{serialiser}, |
|
|
288 | on_error => $self->{on_error}, |
|
|
289 | on_event => sub { |
|
|
290 | if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic2) { |
|
|
291 | if ($_[1] eq "quit") { |
|
|
292 | my $pool = $self->{pool}; |
|
|
293 | for (0 .. $#$pool) { |
|
|
294 | if ($pool->[$_] == $proc) { |
|
|
295 | Array::Heap::splice_heap @$pool, $_; |
|
|
296 | return; |
|
|
297 | } |
|
|
298 | } |
|
|
299 | die; |
|
|
300 | } |
|
|
301 | return; |
|
|
302 | } |
|
|
303 | |
|
|
304 | &{ $self->{on_event} }; |
|
|
305 | }, |
|
|
306 | ) |
|
|
307 | ; |
|
|
308 | |
|
|
309 | ++$self->{idle}; |
|
|
310 | Array::Heap::push_heap @{ $self->{pool} }, $proc; |
|
|
311 | } |
|
|
312 | |
|
|
313 | =item $pool->call (..., $cb->(...)) |
432 | =item $pool->(..., $cb->(...)) |
314 | |
433 | |
315 | Call the RPC function of a worker with the given arguments, and when the |
434 | Call the RPC function of a worker with the given arguments, and when the |
316 | worker is done, call the C<$cb> with the results, like just calling the |
435 | worker is done, call the C<$cb> with the results, just like calling the |
317 | L<AnyEvent::Fork::RPC> object directly. |
436 | RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
|
|
437 | details on the RPC API. |
318 | |
438 | |
319 | If there is no free worker, the call will be queued. |
439 | If there is no free worker, the call will be queued until a worker becomes |
|
|
440 | available. |
320 | |
441 | |
321 | Note that there can be considerable time between calling this method and |
442 | Note that there can be considerable time between calling this method and |
322 | the call actually being executed. During this time, the parameters passed |
443 | the call actually being executed. During this time, the parameters passed |
323 | to this function are effectively read-only - modifying them after the call |
444 | to this function are effectively read-only - modifying them after the call |
324 | and before the callback is invoked causes undefined behaviour. |
445 | and before the callback is invoked causes undefined behaviour. |
325 | |
446 | |
326 | =cut |
447 | =cut |
327 | |
448 | |
328 | sub scheduler { |
449 | =item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus] |
329 | my $self = shift; |
|
|
330 | |
450 | |
331 | my $pool = $self->{pool}; |
451 | =item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus] |
332 | my $queue = $self->{queue}; |
|
|
333 | |
452 | |
334 | $self->start |
453 | Tries to detect the number of CPUs (C<$cpus> often called CPU cores |
335 | unless @$pool; |
454 | nowadays) and execution units (C<$eus>, which include e.g. extra
|
|
455 | hyperthreaded units). When C<$cpus> cannot be determined reliably, |
|
|
456 | C<$default_cpus> is returned for both values, or C<1> if it is missing. |
336 | |
457 | |
337 | while (@$queue) { |
458 | For normal CPU bound uses, it is wise to have as many worker processes |
338 | my $proc = $pool->[0]; |
459 | as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using |
|
|
460 | hyperthreading is usually detrimental to performance, but in those rare |
|
|
461 | cases where that really helps it might be beneficial to use more workers |
|
|
462 | (C<$eus>). |
339 | |
463 | |
340 | if ($proc->[0] < $self->{max_queue}) { |
464 | Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both |
341 | warn "free $proc $proc->[0]\n";#d# |
465 | C<$cpus> and C<$eus>, and on {Free,Net,Open}BSD, F<sysctl -n hw.ncpu> is |
342 | # found free worker |
466 | used for C<$cpus>. |
343 | --$self->{idle} |
|
|
344 | unless $proc->[0]++; |
|
|
345 | |
467 | |
346 | undef $proc->[2]; |
468 | Example: create a worker pool with as many workers as CPU cores, or C<2>, |
|
|
469 | if the actual number could not be determined. |
347 | |
470 | |
348 | Array::Heap::adjust_heap @$pool, 0; |
471 | $fork->AnyEvent::Fork::Pool::run ("myworker::function", |
|
|
472 | max => (scalar AnyEvent::Fork::Pool::ncpu 2), |
|
|
473 | ); |
349 | |
474 | |
350 | my $job = shift @$queue; |
475 | =cut |
351 | my $ocb = pop @$job; |
|
|
352 | |
476 | |
353 | $proc->[1]->(@$job, sub { |
477 | BEGIN { |
354 | for (0 .. $#$pool) { |
478 | if ($^O eq "linux") { |
355 | if ($pool->[$_] == $proc) { |
479 | *ncpu = sub(;$) { |
356 | # reduce queue counter |
480 | my ($cpus, $eus); |
357 | unless (--$pool->[$_][0]) { |
|
|
358 | # worker becomes idle |
|
|
359 | my $to = ++$self->{idle} > $self->{max_idle} |
|
|
360 | ? 0 |
|
|
361 | : $self->{idle_time}; |
|
|
362 | |
481 | |
363 | $proc->[2] = AE::timer $to, 0, sub { |
482 | if (open my $fh, "<", "/proc/cpuinfo") { |
364 | undef $proc->[2]; |
483 | my %id; |
365 | |
484 | |
366 | warn "destroy $proc afzer $to\n";#d# |
485 | while (<$fh>) { |
367 | |
486 | if (/^core id\s*:\s*(\d+)/) { |
368 | for (0 .. $#$pool) { |
|
|
369 | if ($pool->[$_] == $proc) { |
|
|
370 | Array::Heap::splice_heap @$pool, $_; |
|
|
371 | --$self->{idle}; |
|
|
372 | last; |
|
|
373 | } |
|
|
374 | } |
|
|
375 | }; |
|
|
376 | } |
|
|
377 | |
|
|
378 | Array::Heap::adjust_heap @$pool, $_; |
|
|
379 | last; |
487 | ++$eus; |
|
|
488 | undef $id{$1}; |
380 | } |
489 | } |
381 | } |
490 | } |
382 | &$ocb; |
491 | |
383 | }); |
492 | $cpus = scalar keys %id; |
384 | } else { |
493 | } else { |
385 | warn "busy $proc->[0]\n";#d# |
494 | $cpus = $eus = @_ ? shift : 1; |
386 | # all busy, delay |
|
|
387 | |
|
|
388 | $self->{min_delay_w} ||= AE::timer $self->{min_delay}, 0, sub { |
|
|
389 | delete $self->{min_delay_w}; |
|
|
390 | |
|
|
391 | if (@{ $self->{queue} }) { |
|
|
392 | $self->start; |
|
|
393 | $self->scheduler; |
|
|
394 | } |
|
|
395 | }; |
495 | } |
396 | last; |
496 | wantarray ? ($cpus, $eus) : $cpus |
397 | } |
497 | }; |
|
|
498 | } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") { |
|
|
499 | *ncpu = sub(;$) { |
|
|
500 | my $cpus = qx<sysctl -n hw.ncpu> * 1 |
|
|
501 | || (@_ ? shift : 1); |
|
|
502 | wantarray ? ($cpus, $cpus) : $cpus |
|
|
503 | }; |
|
|
504 | } else { |
|
|
505 | *ncpu = sub(;$) { |
|
|
506 | my $cpus = @_ ? shift : 1; |
|
|
507 | wantarray ? ($cpus, $cpus) : $cpus |
|
|
508 | }; |
398 | } |
509 | } |
399 | warn "last\n";#d# |
|
|
400 | } |
510 | } |
401 | |
511 | |
# Queue a job for execution by the pool. All arguments — the job
# parameters followed by the completion callback, as per the
# AnyEvent::Fork::RPC calling convention — are stored as a single queue
# entry; the scheduler is then kicked so the job is dispatched to a
# worker as soon as one is available.
sub call {
   my ($self, @job_with_cb) = @_;

   push @{ $self->{queue} }, \@job_with_cb;
   $self->scheduler;
}
|
|
408 | |
|
|
# Destructor: runs when the last reference to the pool object goes away.
# If the user supplied an on_destroy callback (typically an AE::cv used
# to wait for the pool to wind down), invoke it; otherwise do nothing.
# The guard is required because on_destroy is a documented-optional
# parameter — calling an undef coderef here would die inside DESTROY,
# possibly during global destruction.
sub DESTROY {
   $_[0]{on_destroy}->()
      if $_[0]{on_destroy};
}
|
|
412 | |
|
|
413 | =back |
512 | =back |
414 | |
513 | |
|
|
514 | =head1 CHILD USAGE |
|
|
515 | |
|
|
516 | In addition to the L<AnyEvent::Fork::RPC> API, this module implements one |
|
|
517 | more child-side function: |
|
|
518 | |
|
|
519 | =over 4 |
|
|
520 | |
|
|
521 | =item AnyEvent::Fork::Pool::retire () |
|
|
522 | |
|
|
523 | This function sends an event to the parent process to request retirement: |
|
|
524 | the worker is removed from the pool and no new jobs will be sent to it, |
|
|
525 | but it still has to handle the jobs that are already queued. |
|
|
526 | |
|
|
527 | The parentheses are part of the syntax: the function usually isn't defined |
|
|
528 | when you compile your code (because that happens I<before> handing the |
|
|
529 | template process over to C<AnyEvent::Fork::Pool::run>), so you need the
|
|
530 | empty parentheses to tell Perl that the function is indeed a function. |
|
|
531 | |
|
|
532 | Retiring a worker can be useful to gracefully shut it down when the worker |
|
|
533 | deems this useful. For example, after executing a job, it could check the |
|
|
534 | process size or the number of jobs handled so far, and if either is too |
|
|
535 | high, the worker could request to be retired, to avoid letting memory


536 | leaks accumulate.
|
|
537 | |
|
|
538 | Example: retire a worker after it has handled roughly 100 requests. It |
|
|
539 | doesn't matter whether you retire at the beginning or end of your request, |
|
|
540 | as the worker will continue to handle some outstanding requests. Likewise, |
|
|
541 | it's ok to call retire multiple times. |
|
|
542 | |
|
|
543 | my $count = 0; |
|
|
544 | |
|
|
545 | sub my::worker { |
|
|
546 | |
|
|
547 | ++$count == 100 |
|
|
548 | and AnyEvent::Fork::Pool::retire (); |
|
|
549 | |
|
|
550 | ... normal code goes here |
|
|
551 | } |
|
|
552 | |
|
|
553 | =back |
|
|
554 | |
|
|
555 | =head1 POOL PARAMETERS RECIPES |
|
|
556 | |
|
|
557 | This section describes some recipes for pool parameters. These are mostly |
|
|
558 | meant for the synchronous RPC backend, as the asynchronous RPC backend |
|
|
559 | changes the rules considerably, making workers themselves responsible for |
|
|
560 | their scheduling. |
|
|
561 | |
|
|
562 | =over 4 |
|
|
563 | |
|
|
564 | =item low latency - set load = 1 |
|
|
565 | |
|
|
566 | If you need a deterministic low latency, you should set the C<load> |
|
|
567 | parameter to C<1>. This ensures that no more than one job is ever sent to
|
|
568 | each worker. This avoids having to wait for a previous job to finish. |
|
|
569 | |
|
|
570 | This makes most sense with the synchronous (default) backend, as the |
|
|
571 | asynchronous backend can handle multiple requests concurrently. |
|
|
572 | |
|
|
573 | =item lowest latency - set load = 1 and idle = max |
|
|
574 | |
|
|
575 | To achieve the lowest latency, you additionally should disable any dynamic |
|
|
576 | resizing of the pool by setting C<idle> to the same value as C<max>. |
|
|
577 | |
|
|
578 | =item high throughput, cpu bound jobs - set load >= 2, max = #cpus |
|
|
579 | |
|
|
580 | To get high throughput with cpu-bound jobs, you should set the maximum |
|
|
581 | pool size to the number of cpus in your system, and C<load> to at least |
|
|
582 | C<2>, to make sure there can be another job waiting for the worker when it |
|
|
583 | has finished one. |
|
|
584 | |
|
|
585 | The value of C<2> for C<load> is the minimum value that I<can> achieve |
|
|
586 | 100% throughput, but if your parent process itself is sometimes busy, you |
|
|
587 | might need higher values. Also there is a limit on the amount of data that |
|
|
588 | can be "in flight" to the worker, so if you send big blobs of data to your |
|
|
589 | worker, C<load> might have much less of an effect. |
|
|
590 | |
|
|
591 | =item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high |
|
|
592 | |
|
|
593 | When your jobs are I/O bound, using more workers usually leads to
|
|
594 | higher throughput, depending very much on your actual workload - sometimes |
|
|
595 | having only one worker is best, for example, when you read or write big |
|
|
596 | files at maximum speed, as a second worker will increase seek times. |
|
|
597 | |
|
|
598 | =back |
|
|
599 | |
|
|
600 | =head1 EXCEPTIONS |
|
|
601 | |
|
|
602 | The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions |
|
|
603 | will not be caught, and exceptions in both the worker and in callbacks cause
|
|
604 | undesirable or undefined behaviour. |
|
|
605 | |
415 | =head1 SEE ALSO |
606 | =head1 SEE ALSO |
416 | |
607 | |
417 | L<AnyEvent::Fork>, to create the processes in the first place. |
608 | L<AnyEvent::Fork>, to create the processes in the first place. |
|
|
609 | |
|
|
610 | L<AnyEvent::Fork::Remote>, likewise, but helpful for remote processes. |
418 | |
611 | |
419 | L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API. |
612 | L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API. |
420 | |
613 | |
421 | =head1 AUTHOR AND CONTACT INFORMATION |
614 | =head1 AUTHOR AND CONTACT INFORMATION |
422 | |
615 | |