1 |
=head1 NAME |
2 |
|
3 |
AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork |
4 |
|
5 |
=head1 SYNOPSIS |
6 |
|
7 |
use AnyEvent; |
8 |
use AnyEvent::Fork::Pool; |
9 |
# use AnyEvent::Fork is not needed |
10 |
|
11 |
# all parameters with default values |
12 |
my $pool = AnyEvent::Fork |
13 |
->new |
14 |
->require ("MyWorker") |
15 |
->AnyEvent::Fork::Pool::run ( |
16 |
"MyWorker::run", # the worker function |
17 |
|
18 |
# pool management |
19 |
max => 4, # absolute maximum # of processes |
20 |
idle => 2, # minimum # of idle processes |
21 |
load => 2, # queue at most this number of jobs per process |
22 |
start => 0.1, # wait this many seconds before starting a new process |
23 |
stop => 1, # wait this many seconds before stopping an idle process |
24 |
on_destroy => (my $finish = AE::cv), # called when object is destroyed |
25 |
|
26 |
# parameters passed to AnyEvent::Fork::RPC |
27 |
async => 0, |
28 |
on_error => sub { die "FATAL: $_[0]\n" }, |
29 |
on_event => sub { my @ev = @_ }, |
30 |
init => "MyWorker::init", |
31 |
serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, |
32 |
); |
33 |
|
34 |
for (1..10) { |
35 |
$pool->(doit => $_, sub { |
36 |
print "MyWorker::run returned @_\n"; |
37 |
}); |
38 |
} |
39 |
|
40 |
undef $pool; |
41 |
|
42 |
$finish->recv; |
43 |
|
44 |
=head1 DESCRIPTION |
45 |
|
46 |
This module uses processes created via L<AnyEvent::Fork> and the RPC |
47 |
protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced |
48 |
pool of processes that handles jobs. |
49 |
|
50 |
Understanding of L<AnyEvent::Fork> is helpful but not critical to be able |
51 |
to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC> |
52 |
is, as it defines the actual API that needs to be implemented in the |
53 |
children. |
54 |
|
55 |
=head1 EXAMPLES |
56 |
|
57 |
=head1 PARENT USAGE |
58 |
|
59 |
=over 4 |
60 |
|
61 |
=cut |
62 |
|
63 |
package AnyEvent::Fork::Pool; |
64 |
|
65 |
use common::sense; |
66 |
|
67 |
use Scalar::Util (); |
68 |
|
69 |
use Guard (); |
70 |
use Array::Heap (); |
71 |
|
72 |
use AnyEvent; |
73 |
use AnyEvent::Fork; # we don't actually depend on it, this is for convenience |
74 |
use AnyEvent::Fork::RPC; |
75 |
|
76 |
# Sentinel strings placed around the internal "quit" event that a worker
# sends to the parent (first and last argument). They are only meant to be
# unlikely to clash with events emitted by user code - a heuristic, not a
# guarantee - but no clearly better scheme was found.
my ($magic0, $magic1) = (
   ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P',
   '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*',
);

our $VERSION = 0.1;
83 |
|
84 |
=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...] |
85 |
|
86 |
The traditional way to call it. But it is way cooler to call it in the |
87 |
following way: |
88 |
|
89 |
=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...]) |
90 |
|
91 |
Creates a new pool object with the specified C<$function> as function |
92 |
(name) to call for each request. The pool uses the C<$fork> object as the |
93 |
template when creating worker processes. |
94 |
|
95 |
You can supply your own template process, or tell C<AnyEvent::Fork::Pool> |
96 |
to create one. |
97 |
|
98 |
A relatively large number of key/value pairs can be specified to influence |
99 |
the behaviour. They are grouped into the categories "pool management", |
100 |
"template process" and "rpc parameters". |
101 |
|
102 |
=over 4 |
103 |
|
104 |
=item Pool Management |
105 |
|
106 |
The pool consists of a certain number of worker processes. These options |
107 |
decide how many of these processes exist and when they are started and |
108 |
stopped. |
109 |
|
110 |
=over 4 |
111 |
|
112 |
=item idle => $count (default: 0) |
113 |
|
114 |
The minimum number of idle processes in the pool - when there are fewer |
115 |
than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new |
116 |
ones, subject to C<max> and C<start>. |
117 |
|
118 |
This is also the initial/minimum number of workers in the pool. The |
119 |
default of zero means that the pool starts empty and can shrink back to |
120 |
zero workers over time. |
121 |
|
122 |
=item max => $count (default: 4) |
123 |
|
124 |
The maximum number of processes in the pool, in addition to the template |
125 |
process. C<AnyEvent::Fork::Pool> will never create more than this number |
126 |
of worker processes, although there can be more temporarily when a worker |
127 |
is shut down and hasn't exited yet. |
128 |
|
129 |
=item load => $count (default: 2) |
130 |
|
131 |
The maximum number of concurrent jobs sent to a single worker |
132 |
process. Worker processes that handle this number of jobs already are |
133 |
called "busy". |
134 |
|
135 |
Jobs that cannot be sent to a worker immediately (because all workers are |
136 |
busy) will be queued until a worker is available. |
137 |
|
138 |
=item start => $seconds (default: 0.1) |
139 |
|
140 |
When a job is queued and all workers are busy, a timer is started. If the |
141 |
timer elapses and there are still jobs that cannot be queued to a worker, |
142 |
a new worker is started. |
143 |
|
144 |
This configures the time that all workers must be busy before a new worker |
145 |
is started. Or, put differently, the minimum delay between starting new |
146 |
workers. |
147 |
|
148 |
The delay is 0.1 seconds by default, which means new workers will be started |
149 |
after only a short delay. |
150 |
|
151 |
=item stop => $seconds (default: 1) |
152 |
|
153 |
When a worker has no jobs to execute it becomes idle. An idle worker that |
154 |
hasn't executed a job within this amount of time will be stopped, unless |
155 |
the other parameters say otherwise. |
156 |
|
157 |
=item on_destroy => $callback->() (default: none) |
158 |
|
159 |
When a pool object goes out of scope, it will still handle all outstanding |
160 |
jobs. After that, it will destroy all workers (and also the template |
161 |
process if it isn't referenced otherwise), and then invokes the C<$callback>. |
162 |
|
163 |
=back |
164 |
|
165 |
=item Template Process |
166 |
|
167 |
The worker processes are all forked from a single template |
168 |
process. Ideally, all modules and all code used by the worker, as well as |
169 |
any shared data structures should be loaded into the template process, to |
170 |
take advantage of data sharing via fork. |
171 |
|
172 |
You can create your own template process by creating a L<AnyEvent::Fork> |
173 |
object yourself and passing it as the C<template> parameter, but |
174 |
C<AnyEvent::Fork::Pool> can create one for you, including some standard |
175 |
options. |
176 |
|
177 |
=over 4 |
178 |
|
179 |
=item template => $fork (default: C<< AnyEvent::Fork->new >>) |
180 |
|
181 |
The template process to use, if you want to create your own. |
182 |
|
183 |
=item require => \@modules (default: C<[]>) |
184 |
|
185 |
The modules in this list will be loaded into the template process. |
186 |
|
187 |
=item eval => "# perl code to execute in template" (default: none) |
188 |
|
189 |
This is a perl string that is evaluated after creating the template |
190 |
process and after requiring the modules. It can do whatever it wants to |
191 |
configure the process, but it must not do anything that would keep a later |
192 |
fork from working (so must not create event handlers or (real) threads for |
193 |
example). |
194 |
|
195 |
=back |
196 |
|
197 |
=item AnyEvent::Fork::RPC Parameters |
198 |
|
199 |
These parameters are all passed directly to L<AnyEvent::Fork::RPC>. They |
200 |
are only briefly mentioned here, for their full documentation |
201 |
please refer to the L<AnyEvent::Fork::RPC> documentation. Also, the |
202 |
default values mentioned here are only documented as a best effort - |
203 |
L<AnyEvent::Fork::RPC> documentation is binding. |
204 |
|
205 |
=over 4 |
206 |
|
207 |
=item async => $boolean (default: 0) |
208 |
|
209 |
Whether to use the synchronous or asynchronous RPC backend. |
210 |
|
211 |
=item on_error => $callback->($message) (default: die with message) |
212 |
|
213 |
The callback to call on any (fatal) errors. |
214 |
|
215 |
=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>) |
216 |
|
217 |
The callback to invoke on events. |
218 |
|
219 |
=item init => $initfunction (default: none) |
220 |
|
221 |
The function to call in the child, once before handling requests. |
222 |
|
223 |
=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER) |
224 |
|
225 |
The serialiser to use. |
226 |
|
227 |
=back |
228 |
|
229 |
=back |
230 |
|
231 |
=cut |
232 |
|
233 |
# Create a pool of workers forked from $template and return a code reference
# that queues a job: $pool->(@args, $cb). See the POD above for %arg.
#
# Each worker is an array ref [load, heap index, rpc] stored in the min-heap
# @pool, which Array::Heap orders by element [0] (the load), so $pool[0] is
# always the least loaded worker. Only the *_idx heap functions are used, as
# they keep element [1] equal to the worker's current position in @pool.
sub run {
   my ($template, $function, %arg) = @_;

   # max and load must be positive, so a false value selects the default;
   # idle, start and stop accept an explicit 0.
   my $max         = $arg{max}  || 4;
   my $load        = $arg{load} || 2;
   my $idle        = defined $arg{idle}  ? $arg{idle}  : 0;
   my $start_delay = defined $arg{start} ? $arg{start} : 0.1;
   my $stop_delay  = defined $arg{stop}  ? $arg{stop}  : 1;
   my $on_event    = $arg{on_event} || sub { };
   my $on_destroy  = $arg{on_destroy};

   my @rpc = (
      async      => $arg{async},
      init       => $arg{init},
      serialiser => $arg{serialiser},
      on_error   => $arg{on_error},
   );

   my (@pool, @queue, $start_w, $stop_w, $shutdown);
   my $nidle = 0; # number of workers in @pool with a load of 0
   my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);

   # Runs on_destroy when its last reference goes away. References are held
   # by $start_worker and by the on_event closure of every worker.
   my $destroy_guard = Guard::guard {
      $on_destroy->()
         if $on_destroy;
   };

   $template->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"));

   $template->require (@{ $arg{require} })
      if $arg{require};

   # Child-side AnyEvent::Fork::Pool::quit: sends the magic "quit" event,
   # upon which the parent removes that worker from the pool.
   $template->eval ('
      my ($magic0, $magic1) = @_;
      sub AnyEvent::Fork::Pool::quit() {
         AnyEvent::Fork::RPC::event ($magic0, "quit", $magic1);
      }
   ', $magic0, $magic1);

   $template->eval ($arg{eval})
      if defined $arg{eval};

   # fork a new worker from the template and add it to the pool as idle
   $start_worker = sub {
      my $proc = [0, 0, undef]; # load, heap index, rpc

      $proc->[2] = $template
         ->fork
         ->AnyEvent::Fork::RPC::run ($function,
              @rpc,
              on_event => sub {
                 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
                    $destroy_guard if 0; # keep it alive

                    $stop_worker->($proc)
                       if $_[1] eq "quit";
                    return;
                 }

                 &$on_event;
              },
           );

      ++$nidle;
      Array::Heap::push_heap_idx @pool, $proc;

      # @pool holds the strong reference; closures only keep a weak one
      Scalar::Util::weaken $proc;
   };

   # remove a worker from @pool, dropping the pool's reference to it
   $stop_worker = sub {
      my ($proc) = @_;

      # already removed (or freed): nothing to do, avoids double accounting
      return unless $proc && defined $proc->[1];

      --$nidle unless $proc->[0];

      Array::Heap::splice_heap_idx @pool, $proc->[1];
      undef $proc->[1]; # mark as no longer part of the pool
   };

   # after $start_delay seconds, start one more worker if there is queued
   # work or too few idle workers, and the pool is below $max
   $want_start = sub {
      undef $stop_w; # capacity is wanted, so don't stop an idle worker now

      $start_w ||= AE::timer ($start_delay, 0, sub {
         undef $start_w;

         if ((@queue || $nidle < $idle) && @pool < $max) {
            $start_worker->();
            $scheduler->();
         }
      });
   };

   # after $stop_delay seconds, stop one surplus idle worker, and re-arm
   # while more than $idle workers are still idle
   $want_stop = sub {
      $stop_w ||= AE::timer ($stop_delay, 0, sub {
         undef $stop_w;

         if ($nidle > $idle) {
            # heap order: with idle workers present, $pool[0] has load 0
            $stop_worker->($pool[0]);

            $want_stop->()
               if $nidle > $idle;
         }
      });
   };

   # hand queued jobs to the least loaded worker while it has spare
   # capacity; once the queue is empty after shutdown, stop idle workers
   $scheduler = sub {
      if (@queue) {
         while (@queue) {
            # workers can retire themselves via quit, so the pool may be empty
            $start_worker->()
               unless @pool;

            my $proc = $pool[0];

            if ($proc->[0] < $load) {
               # found a worker with spare capacity; if it was idle, we may
               # now have fewer idle workers than requested
               $proc->[0]++
                  or --$nidle >= $idle
                  or $want_start->();

               Array::Heap::adjust_heap_idx @pool, 0;

               my $job = shift @queue;
               my $ocb = pop @$job;

               $proc->[2]->(@$job, sub {
                  # only account for workers that are still in the pool
                  if (defined $proc->[1]) {
                     # worker became idle: stop surplus idle workers later
                     --$proc->[0]
                        or ++$nidle <= $idle
                        or $want_stop->();

                     Array::Heap::adjust_heap_idx @pool, $proc->[1];
                  }

                  $scheduler->();

                  &$ocb;
               });
            } else {
               # all workers are busy, maybe start another one
               $want_start->()
                  if @pool < $max;

               last;
            }
         }
      } elsif ($shutdown) {
         undef $start_w;
         undef $start_worker; # frees this closure's $destroy_guard reference

         # busy workers end up here again once their last job has finished
         $stop_worker->($pool[0])
            while $nidle && @pool;
      }
   };

   # triggered when the caller drops the returned code reference
   my $shutdown_guard = Guard::guard {
      $shutdown = 1;
      $scheduler->();
   };

   $start_worker->()
      while @pool < $idle && @pool < $max;

   sub {
      $shutdown_guard if 0; # keep it alive

      $start_worker->()
         unless @pool;

      push @queue, [@_];
      $scheduler->();
   }
}
394 |
|
395 |
=item $pool->(..., $cb->(...)) |
396 |
|
397 |
Call the RPC function of a worker with the given arguments, and when the |
398 |
worker is done, call the C<$cb> with the results, like just calling the |
399 |
L<AnyEvent::Fork::RPC> object directly. |
400 |
|
401 |
If there is no free worker, the call will be queued. |
402 |
|
403 |
Note that there can be considerable time between calling this method and |
404 |
the call actually being executed. During this time, the parameters passed |
405 |
to this function are effectively read-only - modifying them after the call |
406 |
and before the callback is invoked causes undefined behaviour. |
407 |
|
408 |
=cut |
409 |
|
410 |
=back |
411 |
|
412 |
=head1 SEE ALSO |
413 |
|
414 |
L<AnyEvent::Fork>, to create the processes in the first place. |
415 |
|
416 |
L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API. |
417 |
|
418 |
=head1 AUTHOR AND CONTACT INFORMATION |
419 |
|
420 |
Marc Lehmann <schmorp@schmorp.de> |
421 |
http://software.schmorp.de/pkg/AnyEvent-Fork-Pool |
422 |
|
423 |
=cut |
424 |
|
425 |
1 |
426 |
|