1 |
=head1 NAME |
2 |
|
3 |
AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork |
4 |
|
5 |
=head1 SYNOPSIS |
6 |
|
7 |
use AnyEvent; |
8 |
use AnyEvent::Fork::Pool; |
9 |
# use AnyEvent::Fork is not needed |
10 |
|
11 |
# all parameters with default values |
12 |
my $pool = new AnyEvent::Fork::Pool |
13 |
"MyWorker::run", |
14 |
|
15 |
# pool management |
16 |
min => 0, # minimum # of processes |
17 |
max => 4, # maximum # of processes |
18 |
max_queue => 2, # queue at most this number of jobs per process |
19 |
min_delay => 0, # wait this many seconds before starting a new process |
20 |
min_idle => 0, # try to have at least this amount of idle processes |
21 |
max_idle => 1, # at most this many idle processes |
22 |
idle_time => 1, # wait this many seconds before killing an idle process |
23 |
on_destroy => (my $finish = AE::cv), |
24 |
|
25 |
# template process |
26 |
template => AnyEvent::Fork->new, # the template process to use |
27 |
require => [MyWorker::], # module(s) to load |
28 |
eval => "# perl code to execute in template", |
29 |
|
30 |
# parameters passed to AnyEvent::Fork::RPC |
31 |
async => 0, |
32 |
on_error => sub { die "FATAL: $_[0]\n" }, |
33 |
on_event => sub { my @ev = @_ }, |
34 |
init => "MyWorker::init", |
35 |
serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, |
36 |
; |
37 |
|
38 |
for (1..10) { |
39 |
$pool->call (doit => $_, sub { |
40 |
print "MyWorker::run returned @_\n"; |
41 |
}); |
42 |
} |
43 |
|
44 |
undef $pool; |
45 |
|
46 |
$finish->recv; |
47 |
|
48 |
=head1 DESCRIPTION |
49 |
|
50 |
This module uses processes created via L<AnyEvent::Fork> and the RPC
protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced
pool of processes that handles jobs.
53 |
|
54 |
An understanding of L<AnyEvent::Fork> is helpful but not required to use
this module. A thorough understanding of L<AnyEvent::Fork::RPC> is needed,
however, as it defines the actual API that needs to be implemented in the
children.
58 |
|
59 |
=head1 EXAMPLES |
60 |
|
61 |
=head1 PARENT USAGE |
62 |
|
63 |
=over 4 |
64 |
|
65 |
=cut |
66 |
|
67 |
package AnyEvent::Fork::Pool; |
68 |
|
69 |
use common::sense; |
70 |
|
71 |
use Scalar::Util (); |
72 |
|
73 |
use Array::Heap (); |
74 |
|
75 |
use AnyEvent; |
76 |
use AnyEvent::Fork; # we don't actually depend on it, this is for convenience |
77 |
use AnyEvent::Fork::RPC; |
78 |
|
79 |
# Arbitrary fixed tokens that bracket internal control messages a worker
# sends through the AnyEvent::Fork::RPC event channel (see the
# AnyEvent::Fork::Pool::quit helper installed by new and the matching check
# in start's on_event handler), so they can be told apart from user events,
# which are forwarded to the user's on_event callback.
my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
my $magic2 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';

our $VERSION = 0.1;
83 |
|
84 |
=item my $pool = new AnyEvent::Fork::Pool $function, [key => value...]
85 |
|
86 |
Creates a new pool object with the specified C<$function> as function |
87 |
(name) to call for each request. |
88 |
|
89 |
A pool consists of a template process that contains the code and data that
the worker processes need, and a number of worker processes that have been
forked off of that template process.
92 |
|
93 |
You can supply your own template process, or tell C<AnyEvent::Fork::Pool> |
94 |
to create one. |
95 |
|
96 |
A relatively large number of key/value pairs can be specified to influence |
97 |
the behaviour. They are grouped into the categories "pool management", |
98 |
"template process" and "rpc parameters". |
99 |
|
100 |
=over 4 |
101 |
|
102 |
=item Pool Management |
103 |
|
104 |
The pool consists of a certain number of worker processes. These options
decide how many of these processes exist and when they are started and
stopped.
107 |
|
108 |
=over 4 |
109 |
|
110 |
=item min => $count (default: 0) |
111 |
|
112 |
The minimum number of processes in the pool, in addition to the template |
113 |
process. Even when idle, there will never be fewer than this number of |
114 |
worker processes. The default means that the pool can be empty. |
115 |
|
116 |
=item max => $count (default: 4) |
117 |
|
118 |
The maximum number of processes in the pool, in addition to the template |
119 |
process. C<AnyEvent::Fork::Pool> will never create more than this number |
120 |
of processes. |
121 |
|
122 |
=item max_queue => $count (default: 2) |
123 |
|
124 |
The maximum number of jobs sent to a single worker process. Worker |
125 |
processes that handle this number of jobs already are called "busy". |
126 |
|
127 |
Jobs that cannot be sent to a worker immediately (because all workers are |
128 |
busy) will be queued until a worker is available. |
129 |
|
130 |
=item min_delay => $seconds (default: 0) |
131 |
|
132 |
When a job is queued and all workers are busy, a timer is started. If the |
133 |
timer elapses and there are still jobs that cannot be queued to a worker, |
134 |
a new worker is started. |
135 |
|
136 |
This configures the time that all workers must be busy before a new worker
is started. Or, put differently, the minimum delay between starting new
workers.
139 |
|
140 |
The delay is zero by default, which means new workers will be started |
141 |
without delay. |
142 |
|
143 |
=item min_idle => $count (default: 0) |
144 |
|
145 |
The minimum number of idle workers - when there are fewer, more
are started. The C<min_delay> is still respected though, and
C<min_idle>/C<min_delay> and C<max_idle>/C<idle_time> are useful to
dynamically adjust the pool.
149 |
|
150 |
=item max_idle => $count (default: 1) |
151 |
|
152 |
The maximum number of idle workers. If a worker becomes idle and there are |
153 |
already this many idle workers, it will be stopped immediately instead of |
154 |
waiting for the idle timer to elapse. |
155 |
|
156 |
=item idle_time => $seconds (default: 1) |
157 |
|
158 |
When a worker has no jobs to execute it becomes idle. An idle worker that |
159 |
hasn't executed a job within this amount of time will be stopped, unless |
160 |
the other parameters say otherwise. |
161 |
|
162 |
=item on_destroy => $callback->() (default: none) |
163 |
|
164 |
When a pool object goes out of scope, it will still handle all outstanding
jobs. After that, it will destroy all workers (and also the template
process if it isn't referenced otherwise). The C<on_destroy> callback, if
given, is invoked when the pool object is destroyed.
167 |
|
168 |
=back |
169 |
|
170 |
=item Template Process |
171 |
|
172 |
The worker processes are all forked from a single template |
173 |
process. Ideally, all modules and all code used by the worker, as well as
174 |
any shared data structures should be loaded into the template process, to |
175 |
take advantage of data sharing via fork. |
176 |
|
177 |
You can create your own template process by creating a L<AnyEvent::Fork> |
178 |
object yourself and passing it as the C<template> parameter, but |
179 |
C<AnyEvent::Fork::Pool> can create one for you, including some standard |
180 |
options. |
181 |
|
182 |
=over 4 |
183 |
|
184 |
=item template => $fork (default: C<< AnyEvent::Fork->new >>) |
185 |
|
186 |
The template process to use, if you want to create your own. |
187 |
|
188 |
=item require => \@modules (default: C<[]>) |
189 |
|
190 |
The modules in this list will be loaded into the template process.
191 |
|
192 |
=item eval => "# perl code to execute in template" (default: none) |
193 |
|
194 |
This is a perl string that is evaluated after creating the template |
195 |
process and after requiring the modules. It can do whatever it wants to |
196 |
configure the process, but it must not do anything that would keep a later |
197 |
fork from working (so must not create event handlers or (real) threads for |
198 |
example). |
199 |
|
200 |
=back |
201 |
|
202 |
=item AnyEvent::Fork::RPC Parameters |
203 |
|
204 |
These parameters are all passed directly to L<AnyEvent::Fork::RPC>. They
are only briefly mentioned here; for their full documentation, please
refer to the L<AnyEvent::Fork::RPC> documentation. Also, the default
values mentioned here are only documented as a best effort - the
L<AnyEvent::Fork::RPC> documentation is binding.
209 |
|
210 |
=over 4 |
211 |
|
212 |
=item async => $boolean (default: 0) |
213 |
|
214 |
Whether to use the synchronous or asynchronous RPC backend.
215 |
|
216 |
=item on_error => $callback->($message) (default: die with message) |
217 |
|
218 |
The callback to call on any (fatal) errors. |
219 |
|
220 |
=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>) |
221 |
|
222 |
The callback to invoke on events. |
223 |
|
224 |
=item init => $initfunction (default: none) |
225 |
|
226 |
The function to call in the child, once before handling requests. |
227 |
|
228 |
=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
229 |
|
230 |
The serialiser to use. |
231 |
|
232 |
=back |
233 |
|
234 |
=back |
235 |
|
236 |
=cut |
237 |
|
238 |
# Constructor - see the POD above for the meaning of every option.
#
# Applies the documented defaults, prepares the template process (RPC
# backend, the "require" modules, the AnyEvent::Fork::Pool::quit helper and
# the optional "eval" code) and then starts the initial "min" workers.
sub new {
   my ($class, $function, %arg) = @_;

   my $self = bless {
      min       => 0,
      max       => 4,
      max_queue => 2,
      min_delay => 0,
      min_idle  => 0,
      max_idle  => 1,
      idle_time => 1,
      on_event  => sub { },
      %arg,
      pool      => [],
      queue     => [],
   }, $class;

   $self->{function} = $function;

   my $template = $self->{template} ||= AnyEvent::Fork->new;

   # both options are optional: "require" defaults to [] (dereferencing a
   # missing value would die under strict refs), "eval" to no code at all
   my $modules = delete $self->{require} || [];
   my $code    = delete $self->{eval};

   $template
      ->require ("AnyEvent::Fork::RPC::" . ($self->{async} ? "Async" : "Sync"))
      ->require (@$modules)
      # install a helper in the child that lets worker code ask to be
      # removed from the pool; the request travels as an RPC event bracketed
      # by the magic tokens, which start's on_event handler recognises.
      # AnyEvent::Fork::RPC::event is the child-side function for sending
      # events to the parent.
      ->eval ('
           my ($magic0, $magic2) = @_;
           sub AnyEvent::Fork::Pool::quit() {
              AnyEvent::Fork::RPC::event ($magic0, "quit", $magic2);
           }
        ', $magic0, $magic2);

   $template->eval ($code)
      if defined $code;

   $self->start
      while @{ $self->{pool} } < $self->{min};

   $self
}
272 |
|
273 |
# Fork a new worker from the template process, connect it via
# AnyEvent::Fork::RPC and add it to the pool as an idle worker.
#
# Each pool entry is an array reference [$jobs, $rpc, $idle_timer]: the
# number of jobs currently sent to the worker (the heap key), the
# AnyEvent::Fork::RPC object, and a pending idle timer, if any.
sub start {
   my ($self) = @_;

   # the on_event closure below is handed to the worker's RPC object, which
   # the pool itself holds - a weak reference avoids a reference cycle
   Scalar::Util::weaken $self;

   my $proc = [0, undef, undef];

   $proc->[1] = $self->{template}
      ->fork
      ->AnyEvent::Fork::RPC::run ($self->{function},
         async      => $self->{async},
         init       => $self->{init},
         serialiser => $self->{serialiser},
         on_error   => $self->{on_error},
         on_event   => sub {
            # the pool object is already gone - nobody left to notify
            return unless $self;

            if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic2) {
               # internal control message sent by AnyEvent::Fork::Pool::quit
               if ($_[1] eq "quit") {
                  my $pool = $self->{pool};

                  for my $idx (0 .. $#$pool) {
                     next unless $pool->[$idx] == $proc;

                     # an idle worker is counted in {idle} and may still
                     # have an idle timer pending
                     unless ($proc->[0]) {
                        --$self->{idle};
                        undef $proc->[2];
                     }

                     Array::Heap::splice_heap @$pool, $idx;
                     return;
                  }

                  die "AnyEvent::Fork::Pool: quit request from a worker that is not in the pool\n";
               }

               return;
            }

            # ordinary event - pass it on to the user's callback
            $self->{on_event}->(@_);
         },
      );

   ++$self->{idle};
   Array::Heap::push_heap @{ $self->{pool} }, $proc;
}
312 |
|
313 |
=item $pool->call (..., $cb->(...)) |
314 |
|
315 |
Call the RPC function of a worker with the given arguments, and when the |
316 |
worker is done, call the C<$cb> with the results, like just calling the |
317 |
L<AnyEvent::Fork::RPC> object directly. |
318 |
|
319 |
If there is no free worker, the call will be queued. |
320 |
|
321 |
Note that there can be considerable time between calling this method and |
322 |
the call actually being executed. During this time, the parameters passed |
323 |
to this function are effectively read-only - modifying them after the call |
324 |
and before the callback is invoked causes undefined behaviour. |
325 |
|
326 |
=cut |
327 |
|
328 |
# Hand queued jobs to the least-loaded worker (the top of the pool heap) as
# long as it has fewer than max_queue outstanding jobs.
#
# When every worker is busy and the pool is still below "max", a min_delay
# timer is armed that starts one more worker and re-runs the scheduler. At
# "max" workers the remaining jobs simply wait: _job_done re-runs the
# scheduler whenever a worker finishes a job.
sub scheduler {
   my ($self) = @_;

   my $pool  = $self->{pool};
   my $queue = $self->{queue};

   $self->start
      unless @$pool;

   while (@$queue) {
      my $proc = $pool->[0];

      if ($proc->[0] >= $self->{max_queue}) {
         # all workers are busy
         if (@$pool < $self->{max}) {
            $self->{min_delay_w} ||= AE::timer $self->{min_delay}, 0, sub {
               delete $self->{min_delay_w};

               if (@{ $self->{queue} }) {
                  $self->start
                     if @{ $self->{pool} } < $self->{max};
                  $self->scheduler;
               }
            };
         }

         last;
      }

      # found a worker with room - it counted as idle if it had no jobs
      --$self->{idle}
         unless $proc->[0]++;

      undef $proc->[2]; # cancel a pending idle timer

      Array::Heap::adjust_heap @$pool, 0;

      my $job = shift @$queue;
      my $ocb = pop @$job; # the last argument is the result callback

      $proc->[1]->(@$job, sub {
         $self->_job_done ($proc);
         $ocb->(@_);
      });
   }
}

# Bookkeeping after worker $proc finished a job: decrement its job count
# and, if it became idle, arm its idle timer - immediately when more than
# max_idle workers are idle, otherwise after idle_time. The idle timer never
# shrinks the pool below "min" workers. Finally, jobs that are still queued
# are handed to the scheduler, since no timer is armed for them once the
# pool has reached "max" workers.
sub _job_done {
   my ($self, $proc) = @_;

   my $pool = $self->{pool};

   for my $idx (0 .. $#$pool) {
      next unless $pool->[$idx] == $proc;

      unless (--$proc->[0]) {
         # worker becomes idle
         my $to = ++$self->{idle} > $self->{max_idle}
                  ? 0
                  : $self->{idle_time};

         $proc->[2] = AE::timer $to, 0, sub {
            undef $proc->[2];

            # keep at least "min" workers, even when they are idle
            return if @$pool <= $self->{min};

            for my $pos (0 .. $#$pool) {
               if ($pool->[$pos] == $proc) {
                  Array::Heap::splice_heap @$pool, $pos;
                  --$self->{idle};
                  last;
               }
            }
         };
      }

      Array::Heap::adjust_heap @$pool, $idx;
      last;
   }

   $self->scheduler
      if @{ $self->{queue} };
}
401 |
|
402 |
# Queue one job - the RPC arguments followed by the result callback - and
# let the scheduler hand it to a worker as soon as one has room.
sub call {
   my ($self, @job) = @_;

   push @{ $self->{queue} }, \@job;

   $self->scheduler;
}
408 |
|
409 |
# Destructor: invoke the user's on_destroy callback, if one was supplied.
# The option is documented as optional (default: none), so a missing
# callback must not make object destruction die.
sub DESTROY {
   my ($self) = @_;

   if (my $cb = $self->{on_destroy}) {
      $cb->();
   }
}
412 |
|
413 |
=back |
414 |
|
415 |
=head1 SEE ALSO |
416 |
|
417 |
L<AnyEvent::Fork>, to create the processes in the first place. |
418 |
|
419 |
L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API. |
420 |
|
421 |
=head1 AUTHOR AND CONTACT INFORMATION |
422 |
|
423 |
Marc Lehmann <schmorp@schmorp.de> |
424 |
http://software.schmorp.de/pkg/AnyEvent-Fork-Pool |
425 |
|
426 |
=cut |
427 |
|
428 |
# a perl module file must end with a true value
1
429 |
|