1 |
root |
1.1 |
=head1 NAME |
2 |
|
|
|
3 |
|
|
AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork |
4 |
|
|
|
5 |
|
|
=head1 SYNOPSIS |
6 |
|
|
|
7 |
|
|
use AnyEvent; |
8 |
|
|
use AnyEvent::Fork::Pool; |
9 |
|
|
# use AnyEvent::Fork is not needed |
10 |
|
|
|
11 |
|
|
# all parameters with default values |
12 |
root |
1.3 |
my $pool = AnyEvent::Fork |
13 |
|
|
->new |
14 |
|
|
->require ("MyWorker") |
15 |
|
|
->AnyEvent::Fork::Pool::run ( |
16 |
|
|
"MyWorker::run", # the worker function |
17 |
|
|
|
18 |
|
|
# pool management |
19 |
|
|
max => 4, # absolute maximum # of processes |
20 |
|
|
idle => 0, # minimum # of idle processes |
21 |
|
|
load => 2, # queue at most this number of jobs per process |
22 |
|
|
start => 0.1, # wait this many seconds before starting a new process |
23 |
|
|
stop => 1, # wait this many seconds before stopping an idle process |
24 |
|
|
on_destroy => (my $finish = AE::cv), # called when object is destroyed |
25 |
|
|
|
26 |
|
|
# parameters passed to AnyEvent::Fork::RPC |
27 |
|
|
async => 0, |
28 |
|
|
on_error => sub { die "FATAL: $_[0]\n" }, |
29 |
|
|
on_event => sub { my @ev = @_ }, |
30 |
|
|
init => "MyWorker::init", |
31 |
|
|
serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, |
32 |
|
|
); |
33 |
root |
1.1 |
|
34 |
|
|
for (1..10) { |
35 |
root |
1.3 |
$pool->(doit => $_, sub { |
36 |
root |
1.1 |
print "MyWorker::run returned @_\n"; |
37 |
|
|
}); |
38 |
|
|
} |
39 |
|
|
|
40 |
|
|
undef $pool; |
41 |
|
|
|
42 |
|
|
$finish->recv; |
43 |
|
|
|
44 |
|
|
=head1 DESCRIPTION |
45 |
|
|
|
46 |
|
|
This module uses processes created via L<AnyEvent::Fork> and the RPC |
47 |
|
|
protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced |
48 |
|
|
pool of processes that handles jobs. |
49 |
|
|
|
50 |
|
|
Understanding of L<AnyEvent::Fork> is helpful but not critical to be able |
51 |
|
|
to use this module, whereas a thorough understanding of L<AnyEvent::Fork::RPC> |
52 |
|
|
is, as it defines the actual API that needs to be implemented in the |
53 |
|
|
children. |
54 |
|
|
|
55 |
|
|
=head1 EXAMPLES |
56 |
|
|
|
57 |
root |
1.2 |
=head1 PARENT USAGE |
58 |
root |
1.1 |
|
59 |
|
|
=over 4 |
60 |
|
|
|
61 |
|
|
=cut |
62 |
|
|
|
63 |
|
|
package AnyEvent::Fork::Pool; |
64 |
|
|
|
65 |
|
|
use common::sense; |
66 |
|
|
|
67 |
root |
1.2 |
use Scalar::Util (); |
68 |
|
|
|
69 |
root |
1.3 |
use Guard (); |
70 |
root |
1.2 |
use Array::Heap (); |
71 |
root |
1.1 |
|
72 |
|
|
use AnyEvent; |
73 |
|
|
use AnyEvent::Fork; # we don't actually depend on it, this is for convenience |
74 |
|
|
use AnyEvent::Fork::RPC; |
75 |
|
|
|
76 |
root |
1.3 |
# Sentinels passed as the first and last argument of the internal "quit"
# event, so the pool can tell it apart from events sent by user code.
# A collision is merely unlikely, not impossible - a pragmatic compromise.
my ($magic0, $magic1) = (
   ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P',
   '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*',
);

our $VERSION = 0.1;
83 |
|
|
|
84 |
root |
1.3 |
=item my $rpc = AnyEvent::Fork::Pool::run $fork, $function, [key => value...] |
85 |
|
|
|
86 |
|
|
The traditional way to call it. But it is way cooler to call it in the |
87 |
|
|
following way: |
88 |
|
|
|
89 |
|
|
=item my $rpc = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...]) |
90 |
root |
1.2 |
|
91 |
|
|
Creates a new pool object with the specified C<$function> as function |
92 |
root |
1.3 |
(name) to call for each request. The pool uses the C<$fork> object as the |
93 |
|
|
template when creating worker processes. |
94 |
root |
1.2 |
|
95 |
|
|
You can supply your own template process, or tell C<AnyEvent::Fork::Pool> |
96 |
|
|
to create one. |
97 |
|
|
|
98 |
|
|
A relatively large number of key/value pairs can be specified to influence |
99 |
|
|
the behaviour. They are grouped into the categories "pool management", |
100 |
|
|
"template process" and "rpc parameters". |
101 |
root |
1.1 |
|
102 |
|
|
=over 4 |
103 |
|
|
|
104 |
root |
1.2 |
=item Pool Management |
105 |
|
|
|
106 |
|
|
The pool consists of a certain number of worker processes. These options |
107 |
|
|
decide how many of these processes exist and when they are started and |
108 |
|
|
stopped. |
109 |
|
|
|
110 |
|
|
=over 4 |
111 |
|
|
|
112 |
root |
1.3 |
=item idle => $count (default: 0) |
113 |
root |
1.2 |
|
114 |
root |
1.3 |
The minimum number of idle processes in the pool - when there are fewer |
115 |
|
|
than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new |
116 |
|
|
ones, subject to C<max> and C<start>. |
117 |
|
|
|
118 |
|
|
This is also the initial/minimum number of workers in the pool. The |
119 |
|
|
default of zero means that the pool starts empty and can shrink back to |
120 |
|
|
zero workers over time. |
121 |
root |
1.2 |
|
122 |
|
|
=item max => $count (default: 4) |
123 |
|
|
|
124 |
|
|
The maximum number of processes in the pool, in addition to the template |
125 |
|
|
process. C<AnyEvent::Fork::Pool> will never create more than this number |
126 |
root |
1.3 |
of worker processes, although there can be more temporarily when a worker |
127 |
|
|
is shut down and hasn't exited yet. |
128 |
root |
1.2 |
|
129 |
root |
1.3 |
=item load => $count (default: 2) |
130 |
root |
1.2 |
|
131 |
root |
1.3 |
The maximum number of concurrent jobs sent to a single worker |
132 |
|
|
process. Worker processes that handle this number of jobs already are |
133 |
|
|
called "busy". |
134 |
root |
1.1 |
|
135 |
root |
1.2 |
Jobs that cannot be sent to a worker immediately (because all workers are |
136 |
|
|
busy) will be queued until a worker is available. |
137 |
root |
1.1 |
|
138 |
root |
1.3 |
=item start => $seconds (default: 0.1) |
139 |
root |
1.1 |
|
140 |
root |
1.2 |
When a job is queued and all workers are busy, a timer is started. If the |
141 |
|
|
timer elapses and there are still jobs that cannot be queued to a worker, |
142 |
|
|
a new worker is started. |
143 |
root |
1.1 |
|
144 |
root |
1.2 |
This configures the time that all workers must be busy before a new worker |
145 |
|
|
is started. Or, put differently, the minimum delay between starting new |
146 |
|
|
workers. |
147 |
root |
1.1 |
|
148 |
root |
1.2 |
The delay defaults to a short 0.1 seconds, so new workers will be started almost |
149 |
|
|
without delay. |
150 |
root |
1.1 |
|
151 |
root |
1.3 |
=item stop => $seconds (default: 1) |
152 |
root |
1.1 |
|
153 |
root |
1.2 |
When a worker has no jobs to execute it becomes idle. An idle worker that |
154 |
|
|
hasn't executed a job within this amount of time will be stopped, unless |
155 |
|
|
the other parameters say otherwise. |
156 |
|
|
|
157 |
|
|
=item on_destroy => $callback->() (default: none) |
158 |
|
|
|
159 |
|
|
When a pool object goes out of scope, it will still handle all outstanding |
160 |
|
|
jobs. After that, it will destroy all workers (and also the template |
161 |
|
|
process if it isn't referenced otherwise), and finally call C<$callback>. |
162 |
|
|
|
163 |
|
|
=back |
164 |
|
|
|
165 |
|
|
=item Template Process |
166 |
|
|
|
167 |
|
|
The worker processes are all forked from a single template |
168 |
|
|
process. Ideally, all modules and all code used by the worker, as well as |
169 |
|
|
any shared data structures should be loaded into the template process, to |
170 |
|
|
take advantage of data sharing via fork. |
171 |
|
|
|
172 |
|
|
You can create your own template process by creating a L<AnyEvent::Fork> |
173 |
|
|
object yourself and passing it as the C<template> parameter, but |
174 |
|
|
C<AnyEvent::Fork::Pool> can create one for you, including some standard |
175 |
|
|
options. |
176 |
|
|
|
177 |
|
|
=over 4 |
178 |
|
|
|
179 |
|
|
=item template => $fork (default: C<< AnyEvent::Fork->new >>) |
180 |
|
|
|
181 |
|
|
The template process to use, if you want to create your own. |
182 |
|
|
|
183 |
|
|
=item require => \@modules (default: C<[]>) |
184 |
|
|
|
185 |
|
|
The modules in this list will be loaded into the template process. |
186 |
|
|
|
187 |
|
|
=item eval => "# perl code to execute in template" (default: none) |
188 |
|
|
|
189 |
|
|
This is a perl string that is evaluated after creating the template |
190 |
|
|
process and after requiring the modules. It can do whatever it wants to |
191 |
|
|
configure the process, but it must not do anything that would keep a later |
192 |
|
|
fork from working (so must not create event handlers or (real) threads for |
193 |
|
|
example). |
194 |
|
|
|
195 |
|
|
=back |
196 |
|
|
|
197 |
|
|
=item AnyEvent::Fork::RPC Parameters |
198 |
|
|
|
199 |
|
|
These parameters are all passed directly to L<AnyEvent::Fork::RPC>. They |
200 |
|
|
are only briefly mentioned here; for their full documentation |
201 |
|
|
please refer to the L<AnyEvent::Fork::RPC> documentation. Also, the |
202 |
|
|
default values mentioned here are only documented as a best effort - |
203 |
|
|
the L<AnyEvent::Fork::RPC> documentation is binding. |
204 |
|
|
|
205 |
|
|
=over 4 |
206 |
root |
1.1 |
|
207 |
|
|
=item async => $boolean (default: 0) |
208 |
|
|
|
209 |
root |
1.2 |
Whether to use the synchronous or asynchronous RPC backend. |
210 |
root |
1.1 |
|
211 |
root |
1.2 |
=item on_error => $callback->($message) (default: die with message) |
212 |
root |
1.1 |
|
213 |
root |
1.2 |
The callback to call on any (fatal) errors. |
214 |
root |
1.1 |
|
215 |
root |
1.2 |
=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>) |
216 |
root |
1.1 |
|
217 |
root |
1.2 |
The callback to invoke on events. |
218 |
root |
1.1 |
|
219 |
root |
1.2 |
=item init => $initfunction (default: none) |
220 |
root |
1.1 |
|
221 |
root |
1.2 |
The function to call in the child, once before handling requests. |
222 |
root |
1.1 |
|
223 |
root |
1.2 |
=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER) |
224 |
root |
1.1 |
|
225 |
root |
1.2 |
The serialiser to use. |
226 |
root |
1.1 |
|
227 |
root |
1.2 |
=back |
228 |
root |
1.1 |
|
229 |
|
|
=back |
230 |
|
|
|
231 |
|
|
=cut |
232 |
|
|
|
233 |
root |
1.3 |
# Implementation of AnyEvent::Fork::Pool::run (see the POD above).
#
# Shared state used by the closures below:
#   @pool   heap of workers (Array::Heap *_idx functions), lowest load on
#           top; each worker is [load, heap index, rpc object], and the
#           heap index is set to undef once the worker has left the pool
#   @queue  jobs not yet handed to a worker, each [@args, $cb]
#   $nidle  number of workers in @pool whose load is zero
sub run {
   my ($template, $function, %arg) = @_;

   # pool management; false values fall back to the documented defaults
   my $max        = $arg{max}      || 4;
   my $idle       = $arg{idle}     || 0;
   my $load       = $arg{load}     || 2;
   my $start      = $arg{start}    || 0.1;
   my $stop       = $arg{stop}     || 1;
   my $on_event   = $arg{on_event} || sub { };
   my $on_destroy = $arg{on_destroy};

   # passed through unchanged to AnyEvent::Fork::RPC::run
   my @rpc = (
      async      => $arg{async},
      init       => $arg{init},
      serialiser => $arg{serialiser},
      on_error   => $arg{on_error},
   );

   my (@pool, @queue);
   my $nidle = 0;
   my ($start_w, $stop_w, $shutdown);

   # the closures are named *_worker so they do not mask the numeric
   # $start/$stop delays, which the timers below rely on
   my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);

   # calls on_destroy once $start_worker and every worker's on_event
   # callback (the only holders of this guard) have been freed
   my $destroy_guard = Guard::guard {
      $on_destroy->()
         if $on_destroy;
   };

   $template
      ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
      ->eval ('
           my ($magic0, $magic1) = @_;
           sub AnyEvent::Fork::Pool::quit() {
              AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic1;
           }
        ', $magic0, $magic1);

   # optional user code for the template, run after the modules are loaded
   $template->eval ($arg{eval})
      if defined $arg{eval};

   # fork a new worker from the template and add it to the pool as idle
   $start_worker = sub {
      my $proc = [0, 0, undef]; # load, heap index, rpc

      $proc->[2] = $template
         ->fork
         ->AnyEvent::Fork::RPC::run ($function,
              @rpc,
              on_event => sub {
                 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
                    $destroy_guard if 0; # keep it alive

                    # internal event sent by AnyEvent::Fork::Pool::quit
                    $stop_worker->($proc)
                       if $_[1] eq "quit";

                    return;
                 }

                 $on_event->(@_);
              },
           );

      ++$nidle;
      Array::Heap::push_heap_idx @pool, $proc;

      # @pool (and in-flight job callbacks) own the worker; the on_event
      # closure above must not keep it alive
      Scalar::Util::weaken $proc;
   };

   # take a worker out of the pool; once nothing else references it, its
   # rpc object is freed. Calling this again for the same worker is a no-op.
   $stop_worker = sub {
      my ($proc) = @_;

      return
         unless $proc && defined $proc->[1];

      --$nidle
         unless $proc->[0];

      Array::Heap::splice_heap_idx @pool, $proc->[1];
      $proc->[1] = undef; # no longer in the pool
   };

   # after $start seconds, start a worker if jobs are still waiting or too
   # few workers are idle - but never run more than $max workers
   $want_start = sub {
      undef $stop_w; # more capacity is wanted, so do not shrink the pool

      $start_w ||= AE::timer $start, 0, sub {
         undef $start_w;

         if ((@queue || $nidle < $idle) && @pool < $max) {
            $start_worker->();
            $scheduler->();
         }
      };
   };

   # after $stop seconds, stop one idle worker if there are more idle
   # workers than the configured minimum, and repeat while that holds
   $want_stop = sub {
      $stop_w ||= AE::timer $stop, 0, sub {
         undef $stop_w;

         if ($nidle > $idle) {
            # the heap is ordered by load, so with idle workers present the
            # top one is idle
            $stop_worker->($pool[0]);

            $want_stop->()
               if $nidle > $idle;
         }
      };
   };

   # hand queued jobs to the least loaded worker; once the pool object has
   # been destroyed and the queue has drained, shut the pool down
   $scheduler = sub {
      if (@queue) {
         while (@queue) {
            my $proc = $pool[0];

            unless ($proc && $proc->[0] < $load) {
               # no worker can take another job right now
               $want_start->();
               last;
            }

            # a previously idle worker becomes busy
            unless ($proc->[0]++) {
               --$nidle;
               $want_start->()
                  if $nidle < $idle;
            }

            Array::Heap::adjust_heap_idx @pool, 0;

            my $job = shift @queue;
            my $ocb = pop @$job;

            $proc->[2]->(@$job, sub {
               # only update the pool if the worker is still part of it
               if (defined $proc->[1]) {
                  unless (--$proc->[0]) {
                     # the worker became idle
                     ++$nidle;
                     $want_stop->()
                        if $nidle > $idle;
                  }

                  Array::Heap::adjust_heap_idx @pool, $proc->[1];
               }

               $scheduler->();

               $ocb->(@_);
            });
         }
      } elsif ($shutdown) {
         # no queued jobs and no new ones can arrive: release all workers.
         # A worker with a job in flight stays referenced by that job's
         # callback until the result has been delivered.
         $_->[1] = undef
            for @pool;

         @pool  = ();
         $nidle = 0;

         undef $start_w;
         undef $stop_w;
         undef $start_worker; # frees its $destroy_guard reference
      }
   };

   # fires when the returned pool object is destroyed
   my $shutdown_guard = Guard::guard {
      $shutdown = 1;
      $scheduler->();
   };

   # pre-start the minimum number of idle workers
   $start_worker->()
      while @pool < $idle && @pool < $max;

   # the pool object: $pool->(@args, $cb) queues a job
   sub {
      $shutdown_guard if 0; # keep it alive

      $start_worker->()
         unless @pool;

      push @queue, [@_];
      $scheduler->();
   }
}
399 |
|
|
|
400 |
|
|
=item $pool->(..., $cb->(...)) |
401 |
|
|
|
402 |
|
|
Call the RPC function of a worker with the given arguments, and when the |
403 |
|
|
worker is done, call the C<$cb> with the results, like just calling the |
404 |
|
|
L<AnyEvent::Fork::RPC> object directly. |
405 |
|
|
|
406 |
|
|
If there is no free worker, the call will be queued. |
407 |
|
|
|
408 |
|
|
Note that there can be considerable time between calling this method and |
409 |
|
|
the call actually being executed. During this time, the parameters passed |
410 |
|
|
to this function are effectively read-only - modifying them after the call |
411 |
|
|
and before the callback is invoked causes undefined behaviour. |
412 |
|
|
|
413 |
|
|
=cut |
414 |
root |
1.1 |
|
415 |
|
|
=back |
416 |
|
|
|
417 |
|
|
=head1 SEE ALSO |
418 |
|
|
|
419 |
|
|
L<AnyEvent::Fork>, to create the processes in the first place. |
420 |
|
|
|
421 |
|
|
L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API. |
422 |
|
|
|
423 |
|
|
=head1 AUTHOR AND CONTACT INFORMATION |
424 |
|
|
|
425 |
|
|
Marc Lehmann <schmorp@schmorp.de> |
426 |
|
|
http://software.schmorp.de/pkg/AnyEvent-Fork-Pool |
427 |
|
|
|
428 |
|
|
=cut |
429 |
|
|
|
430 |
|
|
1 |
431 |
|
|
|