… | |
… | |
7 | use AnyEvent; |
7 | use AnyEvent; |
8 | use AnyEvent::Fork::Pool; |
8 | use AnyEvent::Fork::Pool; |
9 | # use AnyEvent::Fork is not needed |
9 | # use AnyEvent::Fork is not needed |
10 | |
10 | |
11 | # all parameters with default values |
11 | # all parameters with default values |
12 | my $pool = new AnyEvent::Fork::Pool |
12 | my $pool = AnyEvent::Fork |
13 | "MyWorker::run", |
13 | ->new |
|
|
14 | ->require ("MyWorker") |
|
|
15 | ->AnyEvent::Fork::Pool::run ( |
|
|
16 | "MyWorker::run", # the worker function |
14 | |
17 | |
15 | # pool management |
18 | # pool management |
16 | min => 0, # minimum # of processes |
|
|
17 | max => 4, # maximum # of processes |
19 | max => 4, # absolute maximum # of processes |
18 | max_queue => 2, # queue at most this number of jobs per process |
20 | idle => 2, # minimum # of idle processes |
|
|
21 | load => 2, # queue at most this number of jobs per process |
19 | min_delay => 0, # wait this many seconds before starting a new process |
22 | start => 0.1, # wait this many seconds before starting a new process |
20 | min_idle => 0, # try to have at least this amount of idle processes |
|
|
21 | max_idle => 1, # at most this many idle processes |
|
|
22 | idle_time => 1, # wait this many seconds before killing an idle process |
23 | stop => 1, # wait this many seconds before stopping an idle process |
23 | on_destroy => (my $finish = AE::cv), |
24 | on_destroy => (my $finish = AE::cv), # called when object is destroyed |
24 | |
25 | |
25 | # template process |
|
|
26 | template => AnyEvent::Fork->new, # the template process to use |
|
|
27 | require => [MyWorker::], # module(s) to load |
|
|
28 | eval => "# perl code to execute in template", |
|
|
29 | |
|
|
30 | # parameters passed to AnyEvent::Fork::RPC |
26 | # parameters passed to AnyEvent::Fork::RPC |
31 | async => 0, |
27 | async => 0, |
32 | on_error => sub { die "FATAL: $_[0]\n" }, |
28 | on_error => sub { die "FATAL: $_[0]\n" }, |
33 | on_event => sub { my @ev = @_ }, |
29 | on_event => sub { my @ev = @_ }, |
34 | init => "MyWorker::init", |
30 | init => "MyWorker::init", |
35 | serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, |
31 | serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, |
36 | ; |
32 | ); |
37 | |
33 | |
38 | for (1..10) { |
34 | for (1..10) { |
39 | $pool->call (doit => $_, sub { |
35 | $pool->(doit => $_, sub { |
40 | print "MyWorker::run returned @_\n"; |
36 | print "MyWorker::run returned @_\n"; |
41 | }); |
37 | }); |
42 | } |
38 | } |
43 | |
39 | |
44 | undef $pool; |
40 | undef $pool; |
… | |
… | |
68 | |
64 | |
69 | use common::sense; |
65 | use common::sense; |
70 | |
66 | |
71 | use Scalar::Util (); |
67 | use Scalar::Util (); |
72 | |
68 | |
|
|
69 | use Guard (); |
73 | use Array::Heap (); |
70 | use Array::Heap (); |
74 | |
71 | |
75 | use AnyEvent; |
72 | use AnyEvent; |
76 | use AnyEvent::Fork; # we don't actually depend on it, this is for convenience |
73 | use AnyEvent::Fork; # we don't actually depend on it, this is for convenience |
77 | use AnyEvent::Fork::RPC; |
74 | use AnyEvent::Fork::RPC; |
78 | |
75 | |
|
|
76 | # these are used for the first and last argument of events |
|
|
77 | # in the hope of not colliding. yes, I don't like it either, |
|
|
78 | # but didn't come up with an obviously better alternative. |
79 | my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P'; |
79 | my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P'; |
80 | my $magic2 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*'; |
80 | my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*'; |
81 | |
81 | |
82 | our $VERSION = 0.1; |
82 | our $VERSION = 0.1; |
83 | |
83 | |
84 | =item my $rpc = new AnyEvent::Fork::Pool $function, [key => value...] |
84 | =item my $rpc = AnyEvent::Fork::Pool::run $fork, $function, [key => value...] |
|
|
85 | |
|
|
86 | The traditional way to call it. But it is way cooler to call it in the |
|
|
87 | following way: |
|
|
88 | |
|
|
89 | =item my $rpc = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...]) |
85 | |
90 | |
86 | Creates a new pool object with the specified C<$function> as function |
91 | Creates a new pool object with the specified C<$function> as function |
87 | (name) to call for each request. |
92 | (name) to call for each request. The pool uses the C<$fork> object as the |
88 | |
93 | template when creating worker processes. |
89 | A pool consists of a template process that contains the code and data that |
|
|
90 | the worker processes need. And a number of worker processes that have been |
|
|
91 | forked off of that template process. |
|
|
92 | |
94 | |
93 | You can supply your own template process, or tell C<AnyEvent::Fork::Pool> |
95 | You can supply your own template process, or tell C<AnyEvent::Fork::Pool> |
94 | to create one. |
96 | to create one. |
95 | |
97 | |
96 | A relatively large number of key/value pairs can be specified to influence |
98 | A relatively large number of key/value pairs can be specified to influence |
… | |
… | |
105 | decide how many of these processes exist and when they are started and |
107 | decide how many of these processes exist and when they are started and |
106 | stopp.ed |
108 | stopp.ed |
107 | |
109 | |
108 | =over 4 |
110 | =over 4 |
109 | |
111 | |
110 | =item min => $count (default: 0) |
112 | =item idle => $count (default: 0) |
111 | |
113 | |
112 | The minimum number of processes in the pool, in addition to the template |
114 | The minimum amount of idle processes in the pool - when there are fewer |
113 | process. Even when idle, there will never be fewer than this number of |
115 | than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new |
114 | worker processes. The default means that the pool can be empty. |
116 | ones, subject to C<max> and C<start>. |
|
|
117 | |
|
|
118 | This is also the initial/minimum amount of workers in the pool. The |
|
|
119 | default of zero means that the pool starts empty and can shrink back to |
|
|
120 | zero workers over time. |
115 | |
121 | |
116 | =item max => $count (default: 4) |
122 | =item max => $count (default: 4) |
117 | |
123 | |
118 | The maximum number of processes in the pool, in addition to the template |
124 | The maximum number of processes in the pool, in addition to the template |
119 | process. C<AnyEvent::Fork::Pool> will never create more than this number |
125 | process. C<AnyEvent::Fork::Pool> will never create more than this number |
120 | of processes. |
126 | of worker processes, although there can be more temporarily when a worker |
|
|
127 | is shut down and hasn't exited yet. |
121 | |
128 | |
122 | =item max_queue => $count (default: 2) |
129 | =item load => $count (default: 2) |
123 | |
130 | |
124 | The maximum number of jobs sent to a single worker process. Worker |
131 | The maximum number of concurrent jobs sent to a single worker |
125 | processes that handle this number of jobs already are called "busy". |
132 | process. Worker processes that handle this number of jobs already are |
|
|
133 | called "busy". |
126 | |
134 | |
127 | Jobs that cannot be sent to a worker immediately (because all workers are |
135 | Jobs that cannot be sent to a worker immediately (because all workers are |
128 | busy) will be queued until a worker is available. |
136 | busy) will be queued until a worker is available. |
129 | |
137 | |
130 | =item min_delay => $seconds (default: 0) |
138 | =item start => $seconds (default: 0.1) |
131 | |
139 | |
132 | When a job is queued and all workers are busy, a timer is started. If the |
140 | When a job is queued and all workers are busy, a timer is started. If the |
133 | timer elapses and there are still jobs that cannot be queued to a worker, |
141 | timer elapses and there are still jobs that cannot be queued to a worker, |
134 | a new worker is started. |
142 | a new worker is started. |
135 | |
143 | |
… | |
… | |
138 | workers. |
146 | workers. |
139 | |
147 | |
140 | The delay is zero by default, which means new workers will be started |
148 | The delay is zero by default, which means new workers will be started |
141 | without delay. |
149 | without delay. |
142 | |
150 | |
143 | =item min_idle => $count (default: 0) |
|
|
144 | |
|
|
145 | The minimum number of idle workers - when they are less, more |
|
|
146 | are started. The C<min_delay> is still respected though, and |
|
|
147 | C<min_idle>/C<min_delay> and C<max_idle>/C<idle_time> are useful to |
|
|
148 | dynamically adjust the pool. |
|
|
149 | |
|
|
150 | =item max_idle => $count (default: 1) |
|
|
151 | |
|
|
152 | The maximum number of idle workers. If a worker becomes idle and there are |
|
|
153 | already this many idle workers, it will be stopped immediately instead of |
|
|
154 | waiting for the idle timer to elapse. |
|
|
155 | |
|
|
156 | =item idle_time => $seconds (default: 1) |
151 | =item stop => $seconds (default: 1) |
157 | |
152 | |
158 | When a worker has no jobs to execute it becomes idle. An idle worker that |
153 | When a worker has no jobs to execute it becomes idle. An idle worker that |
159 | hasn't executed a job within this amount of time will be stopped, unless |
154 | hasn't executed a job within this amount of time will be stopped, unless |
160 | the other parameters say otherwise. |
155 | the other parameters say otherwise. |
161 | |
156 | |
… | |
… | |
233 | |
228 | |
234 | =back |
229 | =back |
235 | |
230 | |
236 | =cut |
231 | =cut |
237 | |
232 | |
238 | sub new { |
233 | sub run { |
239 | my ($class, $function, %arg) = @_; |
234 | my ($template, $function, %arg) = @_; |
240 | |
235 | |
241 | my $self = bless { |
236 | my $max = $arg{max} || 4; |
242 | min => 0, |
237 | my $idle = $arg{idle} || 0, |
243 | max => 4, |
238 | my $load = $arg{load} || 2, |
244 | max_queue => 2, |
239 | my $start = $arg{start} || 0.1, |
245 | min_delay => 0, |
240 | my $stop = $arg{stop} || 1, |
246 | max_idle => 1, |
241 | my $on_event = $arg{on_event} || sub { }, |
247 | idle_time => 1, |
242 | my $on_destroy = $arg{on_destroy}; |
248 | on_event => sub { }, |
|
|
249 | %arg, |
|
|
250 | pool => [], |
|
|
251 | queue => [], |
|
|
252 | }, $class; |
|
|
253 | |
243 | |
254 | $self->{function} = $function; |
244 | my @rpc = ( |
|
|
245 | async => $arg{async}, |
|
|
246 | init => $arg{init}, |
|
|
247 | serialiser => $arg{serialiser}, |
|
|
248 | on_error => $arg{on_error}, |
|
|
249 | ); |
255 | |
250 | |
256 | ($self->{template} ||= new AnyEvent::Fork) |
251 | my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown); |
|
|
252 | my ($start, $stop, $want_start, $want_stop, $scheduler); |
|
|
253 | |
|
|
254 | my $destroy_guard = Guard::guard { |
|
|
255 | $on_destroy->() |
|
|
256 | if $on_destroy; |
|
|
257 | }; |
|
|
258 | |
|
|
259 | my $busy;#d# |
|
|
260 | |
|
|
261 | $template |
257 | ->require ("AnyEvent::Fork::RPC::" . ($self->{async} ? "Async" : "Sync")) |
262 | ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync")) |
258 | ->require (@{ delete $self->{require} }) |
|
|
259 | ->eval (' |
263 | ->eval (' |
260 | my ($magic0, $magic2) = @_; |
264 | my ($magic0, $magic1) = @_; |
261 | sub AnyEvent::Fork::Pool::quit() { |
265 | sub AnyEvent::Fork::Pool::quit() { |
262 | AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic2; |
266 | AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic1; |
263 | } |
267 | } |
264 | ', $magic0, $magic2) |
268 | ', $magic0, $magic1) |
265 | ->eval (delete $self->{eval}); |
269 | ->eval (delete $arg{eval}); |
266 | |
270 | |
267 | $self->start |
271 | $start = sub { |
268 | while @{ $self->{pool} } < $self->{min}; |
272 | my $proc = [0, 0, undef]; # load, index, rpc |
269 | |
273 | |
270 | $self |
|
|
271 | } |
|
|
272 | |
|
|
273 | sub start { |
|
|
274 | my ($self) = @_; |
|
|
275 | |
|
|
276 | warn "start\n";#d# |
274 | warn "start a worker\n";#d# |
277 | |
275 | |
278 | Scalar::Util::weaken $self; |
|
|
279 | |
|
|
280 | my $proc = [0, undef, undef]; |
|
|
281 | |
|
|
282 | $proc->[1] = $self->{template} |
276 | $proc->[2] = $template |
283 | ->fork |
277 | ->fork |
284 | ->AnyEvent::Fork::RPC::run ($self->{function}, |
278 | ->AnyEvent::Fork::RPC::run ($function, |
285 | async => $self->{async}, |
279 | @rpc, |
286 | init => $self->{init}, |
|
|
287 | serialiser => $self->{serialiser}, |
|
|
288 | on_error => $self->{on_error}, |
|
|
289 | on_event => sub { |
280 | on_event => sub { |
290 | if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic2) { |
281 | if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) { |
291 | if ($_[1] eq "quit") { |
282 | $destroy_guard if 0; # keep it alive |
292 | my $pool = $self->{pool}; |
283 | |
293 | for (0 .. $#$pool) { |
284 | $_[1] eq "quit" and $stop->($proc); |
294 | if ($pool->[$_] == $proc) { |
|
|
295 | Array::Heap::splice_heap @$pool, $_; |
|
|
296 | return; |
285 | return; |
297 | } |
|
|
298 | } |
|
|
299 | die; |
|
|
300 | } |
286 | } |
|
|
287 | |
301 | return; |
288 | &$on_event; |
302 | } |
289 | }, |
|
|
290 | ) |
|
|
291 | ; |
303 | |
292 | |
304 | &{ $self->{on_event} }; |
293 | ++$nidle; |
|
|
294 | Array::Heap::push_heap @pool, $proc; |
|
|
295 | |
|
|
296 | Scalar::Util::weaken $proc; |
|
|
297 | }; |
|
|
298 | |
|
|
299 | $stop = sub { |
|
|
300 | my $proc = shift; |
|
|
301 | |
|
|
302 | $proc->[0] |
|
|
303 | or --$nidle; |
|
|
304 | |
|
|
305 | Array::Heap::splice_heap_idx @pool, $proc->[1] |
|
|
306 | if defined $proc->[1]; |
|
|
307 | }; |
|
|
308 | |
|
|
309 | $want_start = sub { |
|
|
310 | undef $stop_w; |
|
|
311 | |
|
|
312 | $start_w ||= AE::timer $start, 0, sub { |
|
|
313 | undef $start_w; |
|
|
314 | |
|
|
315 | if (@queue) { |
|
|
316 | $start->(); |
|
|
317 | $scheduler->(); |
|
|
318 | } |
|
|
319 | }; |
|
|
320 | }; |
|
|
321 | |
|
|
322 | $want_stop = sub { |
|
|
323 | $stop_w ||= AE::timer $stop, 0, sub { |
|
|
324 | undef $stop_w; |
|
|
325 | |
|
|
326 | $stop->($pool[0]) |
|
|
327 | if $nidle; |
|
|
328 | }; |
|
|
329 | }; |
|
|
330 | |
|
|
331 | $scheduler = sub { |
|
|
332 | if (@queue) { |
|
|
333 | while (@queue) { |
|
|
334 | my $proc = $pool[0]; |
|
|
335 | |
|
|
336 | if ($proc->[0] < $load) { |
|
|
337 | warn "free $proc $proc->[0]\n";#d# |
|
|
338 | # found free worker |
|
|
339 | $proc->[0]++ |
|
|
340 | or --$nidle >= $idle |
|
|
341 | or $want_start->(); |
|
|
342 | |
|
|
343 | Array::Heap::adjust_heap @pool, 0; |
|
|
344 | |
|
|
345 | my $job = shift @queue; |
|
|
346 | my $ocb = pop @$job; |
|
|
347 | |
|
|
348 | $proc->[2]->(@$job, sub { |
|
|
349 | --$busy; warn "busy now $busy\n";#d# |
|
|
350 | # reduce queue counter |
|
|
351 | --$pool[$_][0] |
|
|
352 | or ++$nidle > $idle |
|
|
353 | or $want_stop->(); |
|
|
354 | |
|
|
355 | Array::Heap::adjust_heap @pool, $_; |
|
|
356 | |
|
|
357 | $scheduler->(); |
|
|
358 | |
|
|
359 | &$ocb; |
|
|
360 | }); |
|
|
361 | } else { |
|
|
362 | warn "busy $proc->[0]\n";#d# |
|
|
363 | # all busy, delay |
|
|
364 | |
|
|
365 | $want_start->(); |
|
|
366 | last; |
305 | }, |
367 | } |
306 | ) |
368 | } |
|
|
369 | } elsif ($shutdown) { |
|
|
370 | @pool = (); |
|
|
371 | undef $start_w; |
|
|
372 | undef $start; # frees $destroy_guard reference |
|
|
373 | |
|
|
374 | $stop->($pool[0]) |
|
|
375 | while $nidle; |
|
|
376 | } |
307 | ; |
377 | }; |
308 | |
378 | |
309 | ++$self->{idle}; |
379 | my $shutdown_guard = Guard::guard { |
310 | Array::Heap::push_heap @{ $self->{pool} }, $proc; |
380 | $shutdown = 1; |
|
|
381 | $scheduler->(); |
|
|
382 | }; |
|
|
383 | |
|
|
384 | $start->() |
|
|
385 | while @pool < $idle; |
|
|
386 | |
|
|
387 | sub { |
|
|
388 | $shutdown_guard if 0; # keep it alive |
|
|
389 | |
|
|
390 | ++$busy;#d# |
|
|
391 | |
|
|
392 | $start->() |
|
|
393 | unless @pool; |
|
|
394 | |
|
|
395 | push @queue, [@_]; |
|
|
396 | $scheduler->(); |
|
|
397 | } |
311 | } |
398 | } |
312 | |
399 | |
313 | =item $pool->call (..., $cb->(...)) |
400 | =item $pool->call (..., $cb->(...)) |
314 | |
401 | |
315 | Call the RPC function of a worker with the given arguments, and when the |
402 | Call the RPC function of a worker with the given arguments, and when the |
… | |
… | |
323 | to this function are effectively read-only - modifying them after the call |
410 | to this function are effectively read-only - modifying them after the call |
324 | and before the callback is invoked causes undefined behaviour. |
411 | and before the callback is invoked causes undefined behaviour. |
325 | |
412 | |
326 | =cut |
413 | =cut |
327 | |
414 | |
328 | sub scheduler { |
|
|
329 | my $self = shift; |
|
|
330 | |
|
|
331 | my $pool = $self->{pool}; |
|
|
332 | my $queue = $self->{queue}; |
|
|
333 | |
|
|
334 | $self->start |
|
|
335 | unless @$pool; |
|
|
336 | |
|
|
337 | while (@$queue) { |
|
|
338 | my $proc = $pool->[0]; |
|
|
339 | |
|
|
340 | if ($proc->[0] < $self->{max_queue}) { |
|
|
341 | warn "free $proc $proc->[0]\n";#d# |
|
|
342 | # found free worker |
|
|
343 | --$self->{idle} |
|
|
344 | unless $proc->[0]++; |
|
|
345 | |
|
|
346 | undef $proc->[2]; |
|
|
347 | |
|
|
348 | Array::Heap::adjust_heap @$pool, 0; |
|
|
349 | |
|
|
350 | my $job = shift @$queue; |
|
|
351 | my $ocb = pop @$job; |
|
|
352 | |
|
|
353 | $proc->[1]->(@$job, sub { |
|
|
354 | for (0 .. $#$pool) { |
|
|
355 | if ($pool->[$_] == $proc) { |
|
|
356 | # reduce queue counter |
|
|
357 | unless (--$pool->[$_][0]) { |
|
|
358 | # worker becomes idle |
|
|
359 | my $to = ++$self->{idle} > $self->{max_idle} |
|
|
360 | ? 0 |
|
|
361 | : $self->{idle_time}; |
|
|
362 | |
|
|
363 | $proc->[2] = AE::timer $to, 0, sub { |
|
|
364 | undef $proc->[2]; |
|
|
365 | |
|
|
366 | warn "destroy $proc afzer $to\n";#d# |
|
|
367 | |
|
|
368 | for (0 .. $#$pool) { |
|
|
369 | if ($pool->[$_] == $proc) { |
|
|
370 | Array::Heap::splice_heap @$pool, $_; |
|
|
371 | --$self->{idle}; |
|
|
372 | last; |
|
|
373 | } |
|
|
374 | } |
|
|
375 | }; |
|
|
376 | } |
|
|
377 | |
|
|
378 | Array::Heap::adjust_heap @$pool, $_; |
|
|
379 | last; |
|
|
380 | } |
|
|
381 | } |
|
|
382 | &$ocb; |
|
|
383 | }); |
|
|
384 | } else { |
|
|
385 | warn "busy $proc->[0]\n";#d# |
|
|
386 | # all busy, delay |
|
|
387 | |
|
|
388 | $self->{min_delay_w} ||= AE::timer $self->{min_delay}, 0, sub { |
|
|
389 | delete $self->{min_delay_w}; |
|
|
390 | |
|
|
391 | if (@{ $self->{queue} }) { |
|
|
392 | $self->start; |
|
|
393 | $self->scheduler; |
|
|
394 | } |
|
|
395 | }; |
|
|
396 | last; |
|
|
397 | } |
|
|
398 | } |
|
|
399 | warn "last\n";#d# |
|
|
400 | } |
|
|
401 | |
|
|
402 | sub call { |
|
|
403 | my $self = shift; |
|
|
404 | |
|
|
405 | push @{ $self->{queue} }, [@_]; |
|
|
406 | $self->scheduler; |
|
|
407 | } |
|
|
408 | |
|
|
409 | sub DESTROY { |
|
|
410 | $_[0]{on_destroy}->(); |
|
|
411 | } |
|
|
412 | |
|
|
413 | =back |
415 | =back |
414 | |
416 | |
415 | =head1 SEE ALSO |
417 | =head1 SEE ALSO |
416 | |
418 | |
417 | L<AnyEvent::Fork>, to create the processes in the first place. |
419 | L<AnyEvent::Fork>, to create the processes in the first place. |