ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.2
Committed: Thu Apr 18 21:37:27 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Changes since 1.1: +298 -171 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent;
8     use AnyEvent::Fork::Pool;
9     # use AnyEvent::Fork is not needed
10    
11     # all parameters with default values
12     my $pool = new AnyEvent::Fork::Pool
13     "MyWorker::run",
14    
15     # pool management
16     min => 0, # minimum # of processes
17 root 1.2 max => 4, # maximum # of processes
18     max_queue => 2, # queue at most this number of jobs per process
19     min_delay => 0, # wait this many seconds before starting a new process
20     min_idle => 0, # try to have at least this amount of idle processes
21     max_idle => 1, # at most this many idle processes
22     idle_time => 1, # wait this many seconds before killing an idle process
23     on_destroy => (my $finish = AE::cv),
24 root 1.1
25     # template process
26     template => AnyEvent::Fork->new, # the template process to use
27     require => [MyWorker::], # module(s) to load
28     eval => "# perl code to execute in template",
29    
30     # parameters passed to AnyEvent::Fork::RPC
31     async => 0,
32     on_error => sub { die "FATAL: $_[0]\n" },
33     on_event => sub { my @ev = @_ },
34     init => "MyWorker::init",
35     serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
36     ;
37    
38     for (1..10) {
39     $pool->call (doit => $_, sub {
40     print "MyWorker::run returned @_\n";
41     });
42     }
43    
44     undef $pool;
45    
46     $finish->recv;
47    
48     =head1 DESCRIPTION
49    
50     This module uses processes created via L<AnyEvent::Fork> and the RPC
51     protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced
52     pool of processes that handles jobs.
53    
54     Understanding of L<AnyEvent::Fork> is helpful but not critical to be able
55     to use this module; a thorough understanding of L<AnyEvent::Fork::RPC>
56     is, as it defines the actual API that needs to be implemented in the
57     children.
58    
59     =head1 EXAMPLES
60    
61 root 1.2 =head1 PARENT USAGE
62 root 1.1
63     =over 4
64    
65     =cut
66    
67     package AnyEvent::Fork::Pool;
68    
69     use common::sense;
70    
71 root 1.2 use Scalar::Util ();
72    
73     use Array::Heap ();
74 root 1.1
75     use AnyEvent;
76     use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
77     use AnyEvent::Fork::RPC;
78    
# Marker strings that bracket the internal "quit" event a worker sends
# through AnyEvent::Fork::Pool::quit (defined in the template by new).
# start only treats a 3-element event as a control message when its first
# and last elements match these exactly, so ordinary user events are not
# mistaken for it.
my ($magic0, $magic2) = (
   ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P',
   '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*',
);

our $VERSION = 0.1;
83    
84 root 1.2 =item my $pool = new AnyEvent::Fork::Pool $function, [key => value...]
85    
86     Creates a new pool object with the specified C<$function> as function
87     (name) to call for each request.
88    
89     A pool consists of a template process that contains the code and data that
90     the worker processes need, and a number of worker processes that have been
91     forked off of that template process.
92    
93     You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
94     to create one.
95    
96     A relatively large number of key/value pairs can be specified to influence
97     the behaviour. They are grouped into the categories "pool management",
98     "template process" and "rpc parameters".
99 root 1.1
100     =over 4
101    
102 root 1.2 =item Pool Management
103    
104     The pool consists of a certain number of worker processes. These options
105     decide how many of these processes exist and when they are started and
106     stopped.
107    
108     =over 4
109    
110     =item min => $count (default: 0)
111    
112     The minimum number of processes in the pool, in addition to the template
113     process. Even when idle, there will never be fewer than this number of
114     worker processes. The default means that the pool can be empty.
115    
116     =item max => $count (default: 4)
117    
118     The maximum number of processes in the pool, in addition to the template
119     process. C<AnyEvent::Fork::Pool> will never create more than this number
120     of processes.
121    
122     =item max_queue => $count (default: 2)
123    
124     The maximum number of jobs sent to a single worker process. Worker
125     processes that handle this number of jobs already are called "busy".
126 root 1.1
127 root 1.2 Jobs that cannot be sent to a worker immediately (because all workers are
128     busy) will be queued until a worker is available.
129 root 1.1
130 root 1.2 =item min_delay => $seconds (default: 0)
131 root 1.1
132 root 1.2 When a job is queued and all workers are busy, a timer is started. If the
133     timer elapses and there are still jobs that cannot be queued to a worker,
134     a new worker is started.
135 root 1.1
136 root 1.2 This configures the time that all workers must be busy before a new worker
137     is started. Or, put differently, the minimum delay between starting new
138     workers.
139 root 1.1
140 root 1.2 The delay is zero by default, which means new workers will be started
141     without delay.
142 root 1.1
143 root 1.2 =item min_idle => $count (default: 0)
144 root 1.1
145 root 1.2 The minimum number of idle workers - when there are fewer, more
146     are started. The C<min_delay> is still respected though, and
147     C<min_idle>/C<min_delay> and C<max_idle>/C<idle_time> are useful to
148     dynamically adjust the pool.
149 root 1.1
150 root 1.2 =item max_idle => $count (default: 1)
151 root 1.1
152 root 1.2 The maximum number of idle workers. If a worker becomes idle and there are
153     already this many idle workers, it will be stopped immediately instead of
154     waiting for the idle timer to elapse.
155 root 1.1
156 root 1.2 =item idle_time => $seconds (default: 1)
157 root 1.1
158 root 1.2 When a worker has no jobs to execute it becomes idle. An idle worker that
159     hasn't executed a job within this amount of time will be stopped, unless
160     the other parameters say otherwise.
161    
162     =item on_destroy => $callback->() (default: none)
163    
164     When a pool object goes out of scope, it will still handle all outstanding
165     jobs. After that, it will call the C<on_destroy> callback and destroy all
166     workers (and also the template process if it isn't referenced otherwise).
167    
168     =back
169    
170     =item Template Process
171    
172     The worker processes are all forked from a single template
173     process. Ideally, all modules and all code used by the worker, as well as
174     any shared data structures should be loaded into the template process, to
175     take advantage of data sharing via fork.
176    
177     You can create your own template process by creating a L<AnyEvent::Fork>
178     object yourself and passing it as the C<template> parameter, but
179     C<AnyEvent::Fork::Pool> can create one for you, including some standard
180     options.
181    
182     =over 4
183    
184     =item template => $fork (default: C<< AnyEvent::Fork->new >>)
185    
186     The template process to use, if you want to create your own.
187    
188     =item require => \@modules (default: C<[]>)
189    
190     The modules in this list will be loaded into the template process.
191    
192     =item eval => "# perl code to execute in template" (default: none)
193    
194     This is a perl string that is evaluated after creating the template
195     process and after requiring the modules. It can do whatever it wants to
196     configure the process, but it must not do anything that would keep a later
197     fork from working (so must not create event handlers or (real) threads for
198     example).
199    
200     =back
201    
202     =item AnyEvent::Fork::RPC Parameters
203    
204     These parameters are all passed directly to L<AnyEvent::Fork::RPC>. They
205     are only briefly mentioned here, for their full documentation
206     please refer to the L<AnyEvent::Fork::RPC> documentation. Also, the
207     default values mentioned here are only documented as a best effort -
208     L<AnyEvent::Fork::RPC> documentation is binding.
209    
210     =over 4
211 root 1.1
212     =item async => $boolean (default: 0)
213    
214 root 1.2 Whether to use the synchronous or asynchronous RPC backend.
215 root 1.1
216 root 1.2 =item on_error => $callback->($message) (default: die with message)
217 root 1.1
218 root 1.2 The callback to call on any (fatal) errors.
219 root 1.1
220 root 1.2 =item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
221 root 1.1
222 root 1.2 The callback to invoke on events.
223 root 1.1
224 root 1.2 =item init => $initfunction (default: none)
225 root 1.1
226 root 1.2 The function to call in the child, once before handling requests.
227 root 1.1
228 root 1.2 =item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
229 root 1.1
230 root 1.2 The serialiser to use.
231 root 1.1
232 root 1.2 =back
233 root 1.1
234     =back
235    
236     =cut
237    
# Constructor: merge the documented defaults with the caller's key/value
# pairs, prepare the template process (RPC backend, optional modules, the
# in-worker AnyEvent::Fork::Pool::quit helper, optional extra code) and
# start "min" workers right away. Returns the pool object.
sub new {
   my ($class, $function, %arg) = @_;

   my $self = bless {
      min       => 0,
      max       => 4,
      max_queue => 2,
      min_delay => 0,
      min_idle  => 0,
      max_idle  => 1,
      idle_time => 1,
      on_event  => sub { },
      %arg,
      # internal state, never taken from %arg
      pool      => [],
      queue     => [],
   }, $class;

   $self->{function} = $function;

   my $template = $self->{template} ||= AnyEvent::Fork->new;

   $template->require ("AnyEvent::Fork::RPC::" . ($self->{async} ? "Async" : "Sync"));

   # "require" is optional (documented default: []), so only dereference
   # it when it was actually supplied
   if (my $modules = delete $self->{require}) {
      $template->require (@$modules);
   }

   # give workers a way to ask the parent to remove them from the pool;
   # the magic strings let start recognise this event
   $template->eval ('
      my ($magic0, $magic2) = @_;
      sub AnyEvent::Fork::Pool::quit() {
         AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic2;
      }
   ', $magic0, $magic2);

   # "eval" is optional as well - don't send undef code to the template
   if (defined (my $code = delete $self->{eval})) {
      $template->eval ($code);
   }

   $self->start
      while @{ $self->{pool} } < $self->{min};

   $self
}
272    
# Fork one new worker from the template, connect it via
# AnyEvent::Fork::RPC and add it to the pool heap as an idle worker.
sub start {
   my ($self) = @_;

   # the worker's on_event closure must not keep the pool object alive
   Scalar::Util::weaken $self;

   # captured directly so user events are still delivered even if the
   # pool object itself has already been destroyed
   my $on_event = $self->{on_event};

   # worker record: [outstanding job count, AnyEvent::Fork::RPC object,
   # idle timer]; the pool heap is ordered by slot 0
   my $proc = [0, undef, undef];

   $proc->[1] = $self->{template}
      ->fork
      ->AnyEvent::Fork::RPC::run ($self->{function},
         async      => $self->{async},
         init       => $self->{init},
         serialiser => $self->{serialiser},
         on_error   => $self->{on_error},
         on_event   => sub {
            # internal control message, bracketed by the magic strings
            if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic2) {
               if ($_[1] eq "quit" && $self) {
                  my $pool = $self->{pool};

                  for my $idx (0 .. $#$pool) {
                     next unless $pool->[$idx] == $proc;

                     # an idle worker is counted in {idle} and may have an
                     # idle timer pending - undo both before dropping it
                     --$self->{idle}
                        unless $proc->[0];
                     undef $proc->[2];

                     Array::Heap::splice_heap @$pool, $idx;
                     return;
                  }

                  die "AnyEvent::Fork::Pool: quitting worker not found in pool\n";
               }

               return;
            }

            $on_event->(@_);
         },
      )
   ;

   # a freshly started worker has no jobs yet, i.e. it is idle
   ++$self->{idle};
   Array::Heap::push_heap @{ $self->{pool} }, $proc;
}
312    
313     =item $pool->call (..., $cb->(...))
314    
315     Call the RPC function of a worker with the given arguments, and when the
316     worker is done, call the C<$cb> with the results, like just calling the
317     L<AnyEvent::Fork::RPC> object directly.
318    
319     If there is no free worker, the call will be queued.
320    
321     Note that there can be considerable time between calling this method and
322     the call actually being executed. During this time, the parameters passed
323     to this function are effectively read-only - modifying them after the call
324     and before the callback is invoked causes undefined behaviour.
325    
326     =cut
327 root 1.1
# Internal: hand queued jobs to workers.
#
# $self->{pool} is an Array::Heap of worker records
# [outstanding job count, AnyEvent::Fork::RPC object, idle timer], so
# $pool->[0] is a least-loaded worker. Jobs are dispatched while that
# worker has fewer than max_queue outstanding jobs. Otherwise a min_delay
# timer is armed that may start one more worker, but never more than
# "max". Whenever a job finishes, the scheduler runs again so that jobs
# still waiting in the queue are picked up by the freed worker.
sub scheduler {
   my $self = shift;

   my $pool  = $self->{pool};
   my $queue = $self->{queue};

   # there must be at least one worker to look at
   $self->start
      unless @$pool;

   while (@$queue) {
      my $proc = $pool->[0];

      unless ($proc->[0] < $self->{max_queue}) {
         # All workers are busy. After min_delay, add a worker unless the
         # pool is already at its maximum size; in that case the queue is
         # drained as running jobs complete (see the job callback below).
         $self->{min_delay_w} ||= AE::timer $self->{min_delay}, 0, sub {
            delete $self->{min_delay_w};

            if (@{ $self->{queue} } && @{ $self->{pool} } < $self->{max}) {
               $self->start;
               $self->scheduler;
            }
         };
         last;
      }

      # found a worker with spare capacity; it stops being idle if this
      # is its first outstanding job
      --$self->{idle}
         unless $proc->[0]++;

      # cancel a pending idle timeout, if any
      undef $proc->[2];

      Array::Heap::adjust_heap @$pool, 0;

      my $job = shift @$queue;
      my $ocb = pop @$job;

      $proc->[1]->(@$job, sub {
         for my $idx (0 .. $#$pool) {
            next unless $pool->[$idx] == $proc;

            unless (--$proc->[0]) {
               # Worker becomes idle. Stop it right away when there are
               # already more than max_idle idle workers, otherwise after
               # idle_time without new work.
               my $to = ++$self->{idle} > $self->{max_idle}
                      ? 0
                      : $self->{idle_time};

               $proc->[2] = AE::timer $to, 0, sub {
                  undef $proc->[2];

                  # never shrink the pool below "min" workers
                  return if @$pool <= $self->{min};

                  for my $i (0 .. $#$pool) {
                     if ($pool->[$i] == $proc) {
                        Array::Heap::splice_heap @$pool, $i;
                        --$self->{idle};
                        last;
                     }
                  }
               };
            }

            Array::Heap::adjust_heap @$pool, $idx;
            last;
         }

         # a slot has been freed - dispatch jobs that are still waiting
         $self->scheduler
            if @$queue;

         $ocb->(@_);
      });
   }
}
401 root 1.1
# Queue one job - the RPC arguments followed by the result callback, which
# the scheduler pops off the end - and let the scheduler try to dispatch
# it immediately. Returns whatever the scheduler returns.
sub call {
   my ($self, @job) = @_;

   push @{ $self->{queue} }, \@job;
   $self->scheduler;
}
408    
# Destructor: invoke the user's on_destroy callback, if one was given.
# on_destroy is optional (default: none). Without the check, a pool
# created without it would die inside DESTROY, which Perl reports as an
# "(in cleanup)" warning.
sub DESTROY {
   my ($self) = @_;

   $self->{on_destroy}->()
      if $self->{on_destroy};
}
412 root 1.1
413     =back
414    
415     =head1 SEE ALSO
416    
417     L<AnyEvent::Fork>, to create the processes in the first place.
418    
419     L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.
420    
421     =head1 AUTHOR AND CONTACT INFORMATION
422    
423     Marc Lehmann <schmorp@schmorp.de>
424     http://software.schmorp.de/pkg/AnyEvent-Fork-Pool
425    
426     =cut
427    
428     1
429