ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.2
Committed: Thu Apr 18 21:37:27 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Changes since 1.1: +298 -171 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4
5 =head1 SYNOPSIS
6
7 use AnyEvent;
8 use AnyEvent::Fork::Pool;
9 # use AnyEvent::Fork is not needed
10
11 # all parameters with default values
12 my $pool = new AnyEvent::Fork::Pool
13 "MyWorker::run",
14
15 # pool management
16 min => 0, # minimum # of processes
17 max => 4, # maximum # of processes
18 max_queue => 2, # queue at most this number of jobs per process
19 min_delay => 0, # wait this many seconds before starting a new process
20 min_idle => 0, # try to have at least this amount of idle processes
21 max_idle => 1, # at most this many idle processes
22 idle_time => 1, # wait this many seconds before killing an idle process
23 on_destroy => (my $finish = AE::cv),
24
25 # template process
26 template => AnyEvent::Fork->new, # the template process to use
27 require => [MyWorker::], # module(s) to load
28 eval => "# perl code to execute in template",
29
30 # parameters passed to AnyEvent::Fork::RPC
31 async => 0,
32 on_error => sub { die "FATAL: $_[0]\n" },
33 on_event => sub { my @ev = @_ },
34 init => "MyWorker::init",
35 serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
36 ;
37
38 for (1..10) {
39 $pool->call (doit => $_, sub {
40 print "MyWorker::run returned @_\n";
41 });
42 }
43
44 undef $pool;
45
46 $finish->recv;
47
48 =head1 DESCRIPTION
49
50 This module uses processes created via L<AnyEvent::Fork> and the RPC
51 protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced
52 pool of processes that handles jobs.
53
54 Understanding of L<AnyEvent::Fork> is helpful but not critical to be able
55 to use this module; a thorough understanding of L<AnyEvent::Fork::RPC>, however,
56 is, as it defines the actual API that needs to be implemented in the
57 children.
58
59 =head1 EXAMPLES
60
61 =head1 PARENT USAGE
62
63 =over 4
64
65 =cut
66
67 package AnyEvent::Fork::Pool;
68
69 use common::sense;
70
71 use Scalar::Util ();
72
73 use Array::Heap ();
74
75 use AnyEvent;
76 use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
77 use AnyEvent::Fork::RPC;
78
79 my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
80 my $magic2 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
81
82 our $VERSION = 0.1;
83
84 =item my $pool = new AnyEvent::Fork::Pool $function, [key => value...]
85
86 Creates a new pool object with the specified C<$function> as function
87 (name) to call for each request.
88
89 A pool consists of a template process that contains the code and data that
90 the worker processes need, and a number of worker processes that have been
91 forked off of that template process.
92
93 You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
94 to create one.
95
96 A relatively large number of key/value pairs can be specified to influence
97 the behaviour. They are grouped into the categories "pool management",
98 "template process" and "rpc parameters".
99
100 =over 4
101
102 =item Pool Management
103
104 The pool consists of a certain number of worker processes. These options
105 decide how many of these processes exist and when they are started and
106 stopped.
107
108 =over 4
109
110 =item min => $count (default: 0)
111
112 The minimum number of processes in the pool, in addition to the template
113 process. Even when idle, there will never be fewer than this number of
114 worker processes. The default means that the pool can be empty.
115
116 =item max => $count (default: 4)
117
118 The maximum number of processes in the pool, in addition to the template
119 process. C<AnyEvent::Fork::Pool> will never create more than this number
120 of processes.
121
122 =item max_queue => $count (default: 2)
123
124 The maximum number of jobs sent to a single worker process. Worker
125 processes that handle this number of jobs already are called "busy".
126
127 Jobs that cannot be sent to a worker immediately (because all workers are
128 busy) will be queued until a worker is available.
129
130 =item min_delay => $seconds (default: 0)
131
132 When a job is queued and all workers are busy, a timer is started. If the
133 timer elapses and there are still jobs that cannot be queued to a worker,
134 a new worker is started.
135
136 This configures the time that all workers must be busy before a new worker
137 is started. Or, put differently, the minimum delay between starting new
138 workers.
139
140 The delay is zero by default, which means new workers will be started
141 without delay.
142
143 =item min_idle => $count (default: 0)
144
145 The minimum number of idle workers - when there are fewer, more
146 are started. The C<min_delay> is still respected though, and
147 C<min_idle>/C<min_delay> and C<max_idle>/C<idle_time> are useful to
148 dynamically adjust the pool.
149
150 =item max_idle => $count (default: 1)
151
152 The maximum number of idle workers. If a worker becomes idle and there are
153 already this many idle workers, it will be stopped immediately instead of
154 waiting for the idle timer to elapse.
155
156 =item idle_time => $seconds (default: 1)
157
158 When a worker has no jobs to execute it becomes idle. An idle worker that
159 hasn't executed a job within this amount of time will be stopped, unless
160 the other parameters say otherwise.
161
162 =item on_destroy => $callback->() (default: none)
163
164 When a pool object goes out of scope, it will still handle all outstanding
165 jobs. After that, it will destroy all workers (and also the template
166 process if it isn't referenced otherwise). The C<on_destroy> callback is invoked when the pool object itself is destroyed.
167
168 =back
169
170 =item Template Process
171
172 The worker processes are all forked from a single template
173 process. Ideally, all modules and all code used by the worker, as well as
174 any shared data structures should be loaded into the template process, to
175 take advantage of data sharing via fork.
176
177 You can create your own template process by creating a L<AnyEvent::Fork>
178 object yourself and passing it as the C<template> parameter, but
179 C<AnyEvent::Fork::Pool> can create one for you, including some standard
180 options.
181
182 =over 4
183
184 =item template => $fork (default: C<< AnyEvent::Fork->new >>)
185
186 The template process to use, if you want to create your own.
187
188 =item require => \@modules (default: C<[]>)
189
190 The modules in this list will be loaded into the template process.
191
192 =item eval => "# perl code to execute in template" (default: none)
193
194 This is a perl string that is evaluated after creating the template
195 process and after requiring the modules. It can do whatever it wants to
196 configure the process, but it must not do anything that would keep a later
197 fork from working (so must not create event handlers or (real) threads for
198 example).
199
200 =back
201
202 =item AnyEvent::Fork::RPC Parameters
203
204 These parameters are all passed directly to L<AnyEvent::Fork::RPC>. They
205 are only briefly mentioned here, for their full documentation
206 please refer to the L<AnyEvent::Fork::RPC> documentation. Also, the
207 default values mentioned here are only documented as a best effort -
208 L<AnyEvent::Fork::RPC> documentation is binding.
209
210 =over 4
211
212 =item async => $boolean (default: 0)
213
214 Whether to use the synchronous or asynchronous RPC backend.
215
216 =item on_error => $callback->($message) (default: die with message)
217
218 The callback to call on any (fatal) errors.
219
220 =item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
221
222 The callback to invoke on events.
223
224 =item init => $initfunction (default: none)
225
226 The function to call in the child, once before handling requests.
227
228 =item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
229
230 The serialiser to use.
231
232 =back
233
234 =back
235
236 =cut
237
# Constructor - see the POD above for all accepted key/value pairs.
#
# Fills in the documented defaults, prepares the template process (RPC
# backend, user modules, the AnyEvent::Fork::Pool::quit helper, user code)
# and then starts "min" workers. Returns the pool object.
sub new {
   my ($class, $function, %arg) = @_;

   my $self = bless {
      min        => 0,
      max        => 4,
      max_queue  => 2,
      min_delay  => 0,
      min_idle   => 0,
      max_idle   => 1,
      idle_time  => 1,
      on_event   => sub { },
      # Documented default is "die with message". NOTE(review): without an
      # explicit handler, AnyEvent::Fork::RPC may forward errors to the
      # (empty) on_event handler instead - confirm against the RPC docs.
      on_error   => sub { die "AnyEvent::Fork::Pool: $_[0]\n" },
      %arg,
      pool       => [], # heap of workers, keyed on outstanding job count
      queue      => [], # jobs waiting for a worker with spare capacity
   }, $class;

   $self->{function} = $function;

   # "require" and "eval" are only needed while building the template;
   # both are optional (documented defaults: [] and none)
   my @require = @{ delete $self->{require} || [] };
   my $eval    = delete $self->{eval};

   my $template = $self->{template} ||= AnyEvent::Fork->new;

   $template->require ("AnyEvent::Fork::RPC::" . ($self->{async} ? "Async" : "Sync"));

   $template->require (@require)
      if @require;

   # Defines AnyEvent::Fork::Pool::quit in the workers; it sends an event
   # bracketed by the magic strings, which start() recognises as internal.
   $template->eval ('
      my ($magic0, $magic2) = @_;
      sub AnyEvent::Fork::Pool::quit() {
         AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic2;
      }
   ', $magic0, $magic2);

   $template->eval ($eval)
      if defined $eval;

   $self->start
      while @{ $self->{pool} } < $self->{min};

   $self
}
272
# Internal: forks a new worker from the template process, connects it via
# AnyEvent::Fork::RPC and adds it to the pool heap as an idle worker.
#
# A worker is an array: [0] number of jobs sent but not yet answered (the
# heap key), [1] the AnyEvent::Fork::RPC object, [2] pending idle timer.
sub start {
   my ($self) = @_;

   # the on_event closure ends up stored inside the pool itself, so it
   # must only hold a weak reference to the pool object
   Scalar::Util::weaken $self;

   my $proc = [0, undef, undef];

   # NOTE(review): this closure captures $proc while $proc->[1] holds the
   # RPC object; confirm that removed workers are really freed.
   $proc->[1] = $self->{template}
      ->fork
      ->AnyEvent::Fork::RPC::run ($self->{function},
         async      => $self->{async},
         init       => $self->{init},
         serialiser => $self->{serialiser},
         on_error   => $self->{on_error},
         on_event   => sub {
            # events arriving after the pool object is gone are ignored
            return unless $self;

            # internal control messages are bracketed by the magic strings
            if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic2) {
               if ($_[1] eq "quit") {
                  my $pool = $self->{pool};

                  for my $i (0 .. $#$pool) {
                     next unless $pool->[$i] == $proc;

                     # an idle worker that quits no longer counts as idle,
                     # and its idle timer must not try to remove it again
                     --$self->{idle}
                        unless $proc->[0];
                     undef $proc->[2];

                     Array::Heap::splice_heap @$pool, $i;
                     return;
                  }

                  die "AnyEvent::Fork::Pool: quit request from a worker that is not in the pool\n";
               }

               return;
            }

            # everything else is a user event
            $self->{on_event}->(@_);
         },
      );

   ++$self->{idle};
   Array::Heap::push_heap @{ $self->{pool} }, $proc;
}
312
313 =item $pool->call (..., $cb->(...))
314
315 Call the RPC function of a worker with the given arguments, and when the
316 worker is done, call the C<$cb> with the results, like just calling the
317 L<AnyEvent::Fork::RPC> object directly.
318
319 If there is no free worker, the call will be queued.
320
321 Note that there can be considerable time between calling this method and
322 the call actually being executed. During this time, the parameters passed
323 to this function are effectively read-only - modifying them after the call
324 and before the callback is invoked causes undefined behaviour.
325
326 =cut
327
# Internal: hands queued jobs to workers.
#
# Workers live in a heap keyed on their outstanding job count, so
# $pool->[0] is always the least busy one. Jobs are sent while that worker
# has fewer than max_queue jobs. Otherwise a timer starts one extra worker
# after min_delay, as long as the pool has fewer than max workers. Whenever
# a job finishes, the scheduler runs again so that queued jobs do not have
# to wait for the next call.
sub scheduler {
   my $self = shift;

   my $pool  = $self->{pool};
   my $queue = $self->{queue};

   # always have at least one worker to send jobs to
   $self->start
      unless @$pool;

   while (@$queue) {
      my $proc = $pool->[0];

      if ($proc->[0] >= $self->{max_queue}) {
         # all workers are busy - start another one after min_delay,
         # but never grow the pool beyond max workers
         if (@$pool < $self->{max}) {
            $self->{min_delay_w} ||= AE::timer $self->{min_delay}, 0, sub {
               delete $self->{min_delay_w};

               if (@$queue) {
                  $self->start
                     if @$pool < $self->{max};
                  $self->scheduler;
               }
            };
         }

         last;
      }

      # found a worker with spare capacity; it stops being idle once it
      # receives its first job
      --$self->{idle}
         unless $proc->[0]++;

      # cancel any pending idle timer for this worker
      undef $proc->[2];

      Array::Heap::adjust_heap @$pool, 0;

      my $job = shift @$queue;
      my $ocb = pop @$job;

      $proc->[1]->(@$job, sub {
         for my $i (0 .. $#$pool) {
            next unless $pool->[$i] == $proc;

            unless (--$proc->[0]) {
               # worker becomes idle: stop it right away if there are
               # already more than max_idle idle workers, else after idle_time
               my $to = ++$self->{idle} > $self->{max_idle}
                  ? 0
                  : $self->{idle_time};

               $proc->[2] = AE::timer $to, 0, sub {
                  undef $proc->[2];

                  # never shrink the pool below min workers
                  return if @$pool <= $self->{min};

                  for my $j (0 .. $#$pool) {
                     if ($pool->[$j] == $proc) {
                        Array::Heap::splice_heap @$pool, $j;
                        --$self->{idle};
                        last;
                     }
                  }
               };
            }

            Array::Heap::adjust_heap @$pool, $i;
            last;
         }

         # this worker may accept more jobs now - dispatch waiting ones
         $self->scheduler
            if @$queue;

         $ocb->(@_);
      });
   }

   return;
}
401
# Enqueue one job - the RPC arguments followed by the result callback -
# and let the scheduler dispatch whatever can be dispatched right now.
sub call {
   my ($self, @job) = @_;

   push @{ $self->{queue} }, \@job;

   $self->scheduler
}
408
# Destructor: invokes the optional on_destroy callback (default: none).
sub DESTROY {
   my ($self) = @_;

   # on_destroy is optional, so only call it when one was given
   my $cb = $self->{on_destroy};
   $cb->() if $cb;
}
412
413 =back
414
415 =head1 SEE ALSO
416
417 L<AnyEvent::Fork>, to create the processes in the first place.
418
419 L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.
420
421 =head1 AUTHOR AND CONTACT INFORMATION
422
423 Marc Lehmann <schmorp@schmorp.de>
424 http://software.schmorp.de/pkg/AnyEvent-Fork-Pool
425
426 =cut
427
428 1
429