/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.5
Committed: Sat Apr 20 19:33:23 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Changes since 1.4: +46 -43 lines

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent;
8     use AnyEvent::Fork::Pool;
9     # use AnyEvent::Fork is not needed
10    
11     # all parameters with default values
12 root 1.3 my $pool = AnyEvent::Fork
13     ->new
14     ->require ("MyWorker")
15     ->AnyEvent::Fork::Pool::run (
16     "MyWorker::run", # the worker function
17    
18     # pool management
19     max => 4, # absolute maximum # of processes
20     idle => 0, # minimum # of idle processes
21     load => 2, # queue at most this number of jobs per process
22     start => 0.1, # wait this many seconds before starting a new process
23     stop => 1, # wait this many seconds before stopping an idle process
24     on_destroy => (my $finish = AE::cv), # called when object is destroyed
25    
26     # parameters passed to AnyEvent::Fork::RPC
27     async => 0,
28     on_error => sub { die "FATAL: $_[0]\n" },
29     on_event => sub { my @ev = @_ },
30     init => "MyWorker::init",
31     serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
32     );
33 root 1.1
34     for (1..10) {
35 root 1.3 $pool->(doit => $_, sub {
36 root 1.1 print "MyWorker::run returned @_\n";
37     });
38     }
39    
40     undef $pool;
41    
42     $finish->recv;
43    
44     =head1 DESCRIPTION
45    
46     This module uses processes created via L<AnyEvent::Fork> and the RPC
47     protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced
48     pool of processes that handles jobs.
49    
50     Understanding L<AnyEvent::Fork> is helpful but not required to use this
51     module. A thorough understanding of L<AnyEvent::Fork::RPC> is required,
52     however, as it defines the actual API that needs to be implemented in the
53     children.
54    
55     =head1 EXAMPLES
56    
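As a minimal sketch of the child side used in the SYNOPSIS, the module
below shows what C<MyWorker> could look like with the synchronous backend,
where the worker function receives the request arguments and its return
values become the reply. The C<doit> command and the doubling logic are
assumptions made purely for illustration; L<AnyEvent::Fork::RPC> remains
the authoritative description of the worker API.

```perl
package MyWorker;

use strict;
use warnings;

my $initialised = 0;

# Hypothetical init function, named via "init" in the SYNOPSIS: it is
# meant to run once in each worker before any request is handled.
sub init {
    $initialised = 1;
}

# Hypothetical worker function, named as "MyWorker::run" in the SYNOPSIS.
# It is called with the request arguments and returns the reply values.
sub run {
    my ($cmd, @args) = @_;

    return "unknown command: " . (defined $cmd ? $cmd : "")
        unless defined $cmd && $cmd eq "doit";

    # with the string serialiser, arguments and results are plain strings
    return map { $_ * 2 } @args;
}

1;
```

With such a module, C<< $pool->(doit => 3, sub { ... }) >> would be
expected to pass C<6> to the callback.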
57 root 1.2 =head1 PARENT USAGE
58 root 1.1
59     =over 4
60    
61     =cut
62    
63     package AnyEvent::Fork::Pool;
64    
65     use common::sense;
66    
67 root 1.2 use Scalar::Util ();
68    
69 root 1.3 use Guard ();
70 root 1.2 use Array::Heap ();
71 root 1.1
72     use AnyEvent;
73     use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
74     use AnyEvent::Fork::RPC;
75    
76 root 1.3 # these are used for the first and last argument of events
77     # in the hope of not colliding. yes, I don't like it either,
78     # but didn't come up with an obviously better alternative.
79 root 1.2 my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
80 root 1.3 my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
81 root 1.2
82 root 1.1 our $VERSION = 0.1;
83    
84 root 1.5 =item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
85 root 1.3
86     The traditional way to call it. But it is way cooler to call it in the
87     following way:
88    
89 root 1.5 =item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
90 root 1.2
91     Creates a new pool object with the specified C<$function> as function
92 root 1.3 (name) to call for each request. The pool uses the C<$fork> object as the
93     template when creating worker processes.
94 root 1.2
95     You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
96     to create one.
97    
98     A relatively large number of key/value pairs can be specified to influence
99     the behaviour. They are grouped into the categories "pool management",
100     "template process" and "rpc parameters".
101 root 1.1
102     =over 4
103    
104 root 1.2 =item Pool Management
105    
106     The pool consists of a certain number of worker processes. These options
107     decide how many of these processes exist and when they are started and
108 root 1.5 stopped.
109 root 1.2
110     =over 4
111    
112 root 1.3 =item idle => $count (default: 0)
113 root 1.2
114 root 1.3 The minimum number of idle processes in the pool - when there are fewer
115     than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
116     ones, subject to C<max> and C<start>.
117    
118     This is also the initial/minimum number of workers in the pool. The
119     default of zero means that the pool starts empty and can shrink back to
120     zero workers over time.
121 root 1.2
122     =item max => $count (default: 4)
123    
124     The maximum number of processes in the pool, in addition to the template
125     process. C<AnyEvent::Fork::Pool> will never create more than this number
126 root 1.3 of worker processes, although there can be more temporarily when a worker
127     is shut down and hasn't exited yet.
128 root 1.2
129 root 1.3 =item load => $count (default: 2)
130 root 1.2
131 root 1.3 The maximum number of concurrent jobs sent to a single worker
132     process. Worker processes that handle this number of jobs already are
133     called "busy".
134 root 1.1
135 root 1.2 Jobs that cannot be sent to a worker immediately (because all workers are
136     busy) will be queued until a worker is available.
137 root 1.1
138 root 1.3 =item start => $seconds (default: 0.1)
139 root 1.1
140 root 1.2 When a job is queued and all workers are busy, a timer is started. If the
141     timer elapses and there are still jobs that cannot be queued to a worker,
142     a new worker is started.
143 root 1.1
144 root 1.2 This configures the time that all workers must be busy before a new worker
145     is started. Or, put differently, the minimum delay between starting new
146     workers.
147 root 1.1
148 root 1.2 The delay is 0.1 seconds by default, which means new workers will be
149     started relatively quickly.
150 root 1.1
151 root 1.3 =item stop => $seconds (default: 1)
152 root 1.1
153 root 1.2 When a worker has no jobs to execute it becomes idle. An idle worker that
154     hasn't executed a job within this amount of time will be stopped, unless
155     the other parameters say otherwise.
156    
157     =item on_destroy => $callback->() (default: none)
158    
159     When a pool object goes out of scope, it will still handle all outstanding
160     jobs. After that, it will destroy all workers (and also the template
161     process if it isn't referenced otherwise), and then call this callback.
162    
163     =back
164    
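The interaction between C<load> and the job queue can be sketched with a
simplified, single-process model. This is not the module's actual code
(which keeps workers in a heap via L<Array::Heap> and talks to real
processes); the C<@loads> array and the C<$dispatch> helper are
hypothetical names used only for this illustration.

```perl
use strict;
use warnings;

my $load  = 2;         # same meaning as the "load" parameter
my @loads = (0, 0);    # hypothetical: current job count of two workers
my @queue;             # jobs waiting for a non-busy worker

# Give a job to the least-loaded worker, or queue it if all workers are
# busy. Returns the worker index, or undef when the job was queued.
my $dispatch = sub {
    my ($job) = @_;

    # pick the worker with the lowest load, lowest index on ties
    my ($best) = sort { $loads[$a] <=> $loads[$b] || $a <=> $b } 0 .. $#loads;

    if ($loads[$best] < $load) {
        $loads[$best]++;
        return $best;
    }

    push @queue, $job;
    return undef;
};

# five jobs, two workers, at most two jobs each: the fifth job is queued
my @assigned = map { $dispatch->("job$_") } 1 .. 5;
```

In the real pool, a queued job is what triggers the C<start> timer, so
that a further worker can be started as long as C<max> permits.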
165     =item Template Process
166    
167     The worker processes are all forked from a single template
168     process. Ideally, all modules and all code used by the worker, as well as
169     any shared data structures should be loaded into the template process, to
170     take advantage of data sharing via fork.
171    
172     You can create your own template process by creating an L<AnyEvent::Fork>
173     object yourself and passing it as the C<template> parameter, but
174     C<AnyEvent::Fork::Pool> can create one for you, including some standard
175     options.
176    
177     =over 4
178    
179     =item template => $fork (default: C<< AnyEvent::Fork->new >>)
180    
181     The template process to use, if you want to create your own.
182    
183     =item require => \@modules (default: C<[]>)
184    
185     The modules in this list will be loaded into the template process.
186    
187     =item eval => "# perl code to execute in template" (default: none)
188    
189     This is a perl string that is evaluated after creating the template
190     process and after requiring the modules. It can do whatever it wants to
191     configure the process, but it must not do anything that would keep a later
192     fork from working (so must not create event handlers or (real) threads for
193     example).
194    
195     =back
196    
197     =item AnyEvent::Fork::RPC Parameters
198    
199     These parameters are all passed directly to L<AnyEvent::Fork::RPC>. They
200     are only briefly mentioned here, for their full documentation
201     please refer to the L<AnyEvent::Fork::RPC> documentation. Also, the
202     default values mentioned here are only documented as a best effort -
203     L<AnyEvent::Fork::RPC> documentation is binding.
204    
205     =over 4
206 root 1.1
207     =item async => $boolean (default: 0)
208    
209 root 1.2 Whether to use the synchronous or asynchronous RPC backend.
210 root 1.1
211 root 1.2 =item on_error => $callback->($message) (default: die with message)
212 root 1.1
213 root 1.2 The callback to call on any (fatal) errors.
214 root 1.1
215 root 1.2 =item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
216 root 1.1
217 root 1.2 The callback to invoke on events.
218 root 1.1
219 root 1.2 =item init => $initfunction (default: none)
220 root 1.1
221 root 1.2 The function to call in the child, once before handling requests.
222 root 1.1
223 root 1.2 =item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
224 root 1.1
225 root 1.2 The serialiser to use.
226 root 1.1
227 root 1.2 =back
228 root 1.1
229     =back
230    
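One detail worth knowing about the numeric defaults: in this revision,
C<run> applies them with C<||>, so an explicit false value such as
C<< start => 0 >> is treated like a missing parameter and replaced by the
default. The sketch below contrasts this with the defined-or operator
(perl 5.10 and later). Both helper subs are hypothetical and exist only
for this comparison.

```perl
use strict;
use warnings;

# Hypothetical helper mirroring the "$arg{start} || 0.1" idiom used by run.
my $with_or = sub { my (%arg) = @_; $arg{start} || 0.1 };

# Alternative using defined-or, shown only for comparison.
my $with_dor = sub { my (%arg) = @_; $arg{start} // 0.1 };

# missing, explicit zero, explicit non-zero
my @or  = ($with_or->(),  $with_or->(start => 0),  $with_or->(start => 2));
my @dor = ($with_dor->(), $with_dor->(start => 0), $with_dor->(start => 2));
```

With C<||>, the explicit zero ends up as C<0.1>; with C<//>, it stays
C<0>.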
231     =cut
232    
233 root 1.3 sub run {
234     my ($template, $function, %arg) = @_;
235    
236     my $max = $arg{max} || 4;
237     my $idle = $arg{idle} || 0;
238     my $load = $arg{load} || 2;
239     my $start = $arg{start} || 0.1;
240     my $stop = $arg{stop} || 1;
241     my $on_event = $arg{on_event} || sub { };
242     my $on_destroy = $arg{on_destroy};
243    
244     my @rpc = (
245 root 1.5 async => $arg{async},
246     init => $arg{init},
247     serialiser => delete $arg{serialiser},
248     on_error => $arg{on_error},
249 root 1.3 );
250    
251     my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
252 root 1.5 my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);
253 root 1.3
254     my $destroy_guard = Guard::guard {
255     $on_destroy->()
256     if $on_destroy;
257     };
258 root 1.2
259 root 1.3 $template
260     ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
261 root 1.2 ->eval ('
262 root 1.3 my ($magic0, $magic1) = @_;
263 root 1.2 sub AnyEvent::Fork::Pool::quit() {
264 root 1.3 AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic1;
265 root 1.2 }
266 root 1.3 ', $magic0, $magic1)
267 root 1.5 ->eval ($arg{eval});
268 root 1.3
269 root 1.5 $start_worker = sub {
270 root 1.3 my $proc = [0, 0, undef]; # load, index, rpc
271 root 1.2
272 root 1.3 $proc->[2] = $template
273     ->fork
274     ->AnyEvent::Fork::RPC::run ($function,
275     @rpc,
276     on_event => sub {
277     if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
278     $destroy_guard if 0; # keep it alive
279 root 1.2
280 root 1.5 $_[1] eq "quit" and $stop_worker->($proc);
281 root 1.3 return;
282 root 1.2 }
283    
284 root 1.3 &$on_event;
285     },
286     )
287     ;
288    
289     ++$nidle;
290 root 1.5 Array::Heap::push_heap_idx @pool, $proc;
291 root 1.3
292     Scalar::Util::weaken $proc;
293     };
294    
295 root 1.5 $stop_worker = sub {
296 root 1.3 my $proc = shift;
297    
298     $proc->[0]
299     or --$nidle;
300    
301     Array::Heap::splice_heap_idx @pool, $proc->[1]
302     if defined $proc->[1];
303     };
304    
305     $want_start = sub {
306     undef $stop_w;
307    
308 root 1.5 $start_w ||= AE::timer $start, $start, sub {
309     if (($nidle < $idle || @queue) && @pool < $max) {
310     $start_worker->();
311 root 1.3 $scheduler->();
312 root 1.5 } else {
313     undef $start_w;
314 root 1.3 }
315     };
316     };
317    
318     $want_stop = sub {
319 root 1.5 $stop_w ||= AE::timer $stop, $stop, sub {
320     $stop_worker->($pool[0])
321     if $nidle;
322 root 1.3
323 root 1.5 undef $stop_w
324     if $nidle <= $idle;
325 root 1.3 };
326     };
327    
328     $scheduler = sub {
329     if (@queue) {
330     while (@queue) {
331     my $proc = $pool[0];
332    
333     if ($proc->[0] < $load) {
334 root 1.5 # found free worker, increase load
335     unless ($proc->[0]++) {
336     # worker became busy
337     --$nidle
338     or undef $stop_w;
339    
340     $want_start->()
341     if $nidle < $idle && @pool < $max;
342     }
343 root 1.3
344 root 1.5 Array::Heap::adjust_heap_idx @pool, 0;
345 root 1.3
346     my $job = shift @queue;
347     my $ocb = pop @$job;
348    
349     $proc->[2]->(@$job, sub {
350 root 1.5 # reduce load
351     --$proc->[0] # worker still busy?
352     or ++$nidle > $idle # not too many idle processes?
353 root 1.3 or $want_stop->();
354    
355 root 1.5 Array::Heap::adjust_heap_idx @pool, $proc->[1]
356     if defined $proc->[1];
357 root 1.3
358     $scheduler->();
359    
360     &$ocb;
361     });
362     } else {
363 root 1.5 $want_start->()
364     unless @pool >= $max;
365 root 1.3
366     last;
367     }
368     }
369     } elsif ($shutdown) {
370     @pool = ();
371     undef $start_w;
372 root 1.5 undef $start_worker; # frees $destroy_guard reference
373 root 1.3
374 root 1.5 $stop_worker->($pool[0])
375 root 1.3 while $nidle;
376     }
377     };
378    
379     my $shutdown_guard = Guard::guard {
380     $shutdown = 1;
381     $scheduler->();
382     };
383    
384 root 1.5 $start_worker->()
385 root 1.3 while @pool < $idle;
386    
387     sub {
388     $shutdown_guard if 0; # keep it alive
389 root 1.2
390 root 1.5 $start_worker->()
391 root 1.3 unless @pool;
392    
393     push @queue, [@_];
394     $scheduler->();
395     }
396 root 1.2 }
397    
398 root 1.5 =item $pool->(..., $cb->(...))
399 root 1.2
400     Call the RPC function of a worker with the given arguments, and when the
401 root 1.5 worker is done, call the C<$cb> with the results, just like calling the
402 root 1.2 L<AnyEvent::Fork::RPC> object directly.
403    
404     If there is no free worker, the call will be queued.
405    
406     Note that there can be considerable time between calling this method and
407     the call actually being executed. During this time, the parameters passed
408     to this function are effectively read-only - modifying them after the call
409     and before the callback is invoked causes undefined behaviour.
410    
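To illustrate why the parameters should be treated as read-only, the
sketch below models how a queued job can still share data with the
caller. It assumes the job is stored as C<[@_]>, as this revision of
C<run> does, which copies only the top-level values. The C<$enqueue>
helper and the variable names are hypothetical, and the documented
contract remains that any such modification is undefined behaviour.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the pool's internal queue.
my @queue;
my $enqueue = sub { push @queue, [@_] };

my $name = "first";
my $data = { count => 1 };

$enqueue->($name, $data);

# modifications made while the job is still waiting in the queue
$name = "second";      # plain scalar: the queued copy is unaffected
$data->{count} = 2;    # nested data: still shared with the queued job

my ($queued_name, $queued_data) = @{ $queue[0] };
```

Here C<$queued_name> is still C<"first">, but C<< $queued_data->{count} >>
has silently become C<2>, so the worker would see data the caller never
intended to send.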
411     =cut
412 root 1.1
413     =back
414    
415     =head1 SEE ALSO
416    
417     L<AnyEvent::Fork>, to create the processes in the first place.
418    
419     L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.
420    
421     =head1 AUTHOR AND CONTACT INFORMATION
422    
423     Marc Lehmann <schmorp@schmorp.de>
424     http://software.schmorp.de/pkg/AnyEvent-Fork-Pool
425    
426     =cut
427    
428     1
429