ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.4
Committed: Sat Apr 20 16:39:14 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Changes since 1.3: +0 -5 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent;
8     use AnyEvent::Fork::Pool;
9     # use AnyEvent::Fork is not needed
10    
11     # all parameters with default values
12 root 1.3 my $pool = AnyEvent::Fork
13     ->new
14     ->require ("MyWorker")
15     ->AnyEvent::Fork::Pool::run (
16     "MyWorker::run", # the worker function
17    
18     # pool management
19     max => 4, # absolute maximum # of processes
20     idle => 2, # minimum # of idle processes
21     load => 2, # queue at most this number of jobs per process
22     start => 0.1, # wait this many seconds before starting a new process
23     stop => 1, # wait this many seconds before stopping an idle process
24     on_destroy => (my $finish = AE::cv), # called when object is destroyed
25    
26     # parameters passed to AnyEvent::Fork::RPC
27     async => 0,
28     on_error => sub { die "FATAL: $_[0]\n" },
29     on_event => sub { my @ev = @_ },
30     init => "MyWorker::init",
31     serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
32     );
33 root 1.1
34     for (1..10) {
35 root 1.3 $pool->(doit => $_, sub {
36 root 1.1 print "MyWorker::run returned @_\n";
37     });
38     }
39    
40     undef $pool;
41    
42     $finish->recv;
43    
44     =head1 DESCRIPTION
45    
46     This module uses processes created via L<AnyEvent::Fork> and the RPC
47     protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced
48     pool of processes that handles jobs.
49    
50     Understanding of L<AnyEvent::Fork> is helpful but not critical to be able
51     to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC>
52     is, as it defines the actual API that needs to be implemented in the
53     children.
54    
55     =head1 EXAMPLES
56    
57 root 1.2 =head1 PARENT USAGE
58 root 1.1
59     =over 4
60    
61     =cut
62    
63     package AnyEvent::Fork::Pool;
64    
65     use common::sense;
66    
67 root 1.2 use Scalar::Util ();
68    
69 root 1.3 use Guard ();
70 root 1.2 use Array::Heap ();
71 root 1.1
72     use AnyEvent;
73     use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
74     use AnyEvent::Fork::RPC;
75    
76 root 1.3 # these are used for the first and last argument of events
77     # in the hope of not colliding. yes, I don't like it either,
78     # but didn't come up with an obviously better alternative.
79 root 1.2 my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
80 root 1.3 my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
81 root 1.2
82 root 1.1 our $VERSION = 0.1;
83    
84 root 1.3 =item my $rpc = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
85    
86     The traditional way to call it. But it is way cooler to call it in the
87     following way:
88    
89     =item my $rpc = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
90 root 1.2
91     Creates a new pool object with the specified C<$function> as function
92 root 1.3 (name) to call for each request. The pool uses the C<$fork> object as the
93     template when creating worker processes.
94 root 1.2
95     You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
96     to create one.
97    
98     A relatively large number of key/value pairs can be specified to influence
99     the behaviour. They are grouped into the categories "pool management",
100     "template process" and "rpc parameters".
101 root 1.1
102     =over 4
103    
104 root 1.2 =item Pool Management
105    
106     The pool consists of a certain number of worker processes. These options
107     decide how many of these processes exist and when they are started and
108     stopped.
109    
110     =over 4
111    
112 root 1.3 =item idle => $count (default: 0)
113 root 1.2
114 root 1.3 The minimum amount of idle processes in the pool - when there are fewer
115     than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
116     ones, subject to C<max> and C<start>.
117    
118     This is also the initial/minimum amount of workers in the pool. The
119     default of zero means that the pool starts empty and can shrink back to
120     zero workers over time.
121 root 1.2
122     =item max => $count (default: 4)
123    
124     The maximum number of processes in the pool, in addition to the template
125     process. C<AnyEvent::Fork::Pool> will never create more than this number
126 root 1.3 of worker processes, although there can be more temporarily when a worker
127     is shut down and hasn't exited yet.
128 root 1.2
129 root 1.3 =item load => $count (default: 2)
130 root 1.2
131 root 1.3 The maximum number of concurrent jobs sent to a single worker
132     process. Worker processes that handle this number of jobs already are
133     called "busy".
134 root 1.1
135 root 1.2 Jobs that cannot be sent to a worker immediately (because all workers are
136     busy) will be queued until a worker is available.
137 root 1.1
138 root 1.3 =item start => $seconds (default: 0.1)
139 root 1.1
140 root 1.2 When a job is queued and all workers are busy, a timer is started. If the
141     timer elapses and there are still jobs that cannot be queued to a worker,
142     a new worker is started.
143 root 1.1
144 root 1.2 This configures the time that all workers must be busy before a new worker
145     is started. Or, put differently, the minimum delay between starting new
146     workers.
147 root 1.1
148 root 1.2 The delay is 0.1 seconds by default, which means new workers will be
149     started after only a short delay.
150 root 1.1
151 root 1.3 =item stop => $seconds (default: 1)
152 root 1.1
153 root 1.2 When a worker has no jobs to execute it becomes idle. An idle worker that
154     hasn't executed a job within this amount of time will be stopped, unless
155     the other parameters say otherwise.
156    
157     =item on_destroy => $callback->() (default: none)
158    
159     When a pool object goes out of scope, it will still handle all outstanding
160     jobs. After that, it will destroy all workers (and also the template
161     process if it isn't referenced otherwise).
162    
163     =back
164    
165     =item Template Process
166    
167     The worker processes are all forked from a single template
168     process. Ideally, all modules and all code used by the worker, as well as
169     any shared data structures should be loaded into the template process, to
170     take advantage of data sharing via fork.
171    
172     You can create your own template process by creating a L<AnyEvent::Fork>
173     object yourself and passing it as the C<template> parameter, but
174     C<AnyEvent::Fork::Pool> can create one for you, including some standard
175     options.
176    
177     =over 4
178    
179     =item template => $fork (default: C<< AnyEvent::Fork->new >>)
180    
181     The template process to use, if you want to create your own.
182    
183     =item require => \@modules (default: C<[]>)
184    
185     The modules in this list will be loaded into the template process.
186    
187     =item eval => "# perl code to execute in template" (default: none)
188    
189     This is a perl string that is evaluated after creating the template
190     process and after requiring the modules. It can do whatever it wants to
191     configure the process, but it must not do anything that would keep a later
192     fork from working (so must not create event handlers or (real) threads for
193     example).
194    
195     =back
196    
197     =item AnyEvent::Fork::RPC Parameters
198    
199     These parameters are all passed directly to L<AnyEvent::Fork::RPC>. They
200     are only briefly mentioned here, for their full documentation
201     please refer to the L<AnyEvent::Fork::RPC> documentation. Also, the
202     default values mentioned here are only documented as a best effort -
203     L<AnyEvent::Fork::RPC> documentation is binding.
204    
205     =over 4
206 root 1.1
207     =item async => $boolean (default: 0)
208    
209 root 1.2 Whether to use the synchronous or asynchronous RPC backend.
210 root 1.1
211 root 1.2 =item on_error => $callback->($message) (default: die with message)
212 root 1.1
213 root 1.2 The callback to call on any (fatal) errors.
214 root 1.1
215 root 1.2 =item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
216 root 1.1
217 root 1.2 The callback to invoke on events.
218 root 1.1
219 root 1.2 =item init => $initfunction (default: none)
220 root 1.1
221 root 1.2 The function to call in the child, once before handling requests.
222 root 1.1
223 root 1.2 =item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
224 root 1.1
225 root 1.2 The serialiser to use.
226 root 1.1
227 root 1.2 =back
228 root 1.1
229     =back
230    
231     =cut
232    
# run $template, $function, key => value...
#
# Creates a process pool whose workers are forked from $template and serve
# $function through AnyEvent::Fork::RPC::run.
#
# Recognised keys: max, idle, load, start, stop, on_event, on_destroy, eval,
# plus async, init, serialiser and on_error, which are handed unchanged to
# AnyEvent::Fork::RPC::run for every worker.
#
# Returns a code reference. Calling it as $pool->(@args, $cb) queues one job;
# the last argument is taken as the result callback and is invoked with
# whatever the worker's RPC call reports. When the returned code reference is
# destroyed, the pool enters shutdown mode: queued jobs are still dispatched,
# then idle workers are removed. on_destroy (if given) runs once the guard
# created below is released.
#
# Each worker is tracked as an array [load, heap index, rpc handle] kept in
# @pool, an Array::Heap heap ordered by load, so $pool[0] is always the least
# loaded worker. The *_idx heap functions are used throughout because they
# are the variants that maintain element [1] as the heap position.
sub run {
    my ($template, $function, %arg) = @_;

    # Pool management parameters. A false value (including 0) selects the
    # default.
    my $max        = $arg{max}      || 4;
    my $idle       = $arg{idle}     || 0;
    my $load       = $arg{load}     || 2;
    my $start      = $arg{start}    || 0.1;   # delay before starting a worker
    my $stop       = $arg{stop}     || 1;     # delay before stopping a worker
    my $on_event   = $arg{on_event} || sub { };
    my $on_destroy = $arg{on_destroy};

    # Passed through to every AnyEvent::Fork::RPC::run call.
    my @rpc = (
        async      => $arg{async},
        init       => $arg{init},
        serialiser => $arg{serialiser},
        on_error   => $arg{on_error},
    );

    my (@pool, @queue);
    my $nidle = 0;                      # number of workers with load 0
    my ($start_w, $stop_w, $shutdown);  # start timer, stop timer, shutdown flag

    # The closures get their own names so that they do not shadow the
    # numeric $start/$stop delays that the timers below need.
    my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);

    my $destroy_guard = Guard::guard(sub {
        $on_destroy->()
            if $on_destroy;
    });

    # NOTE(review): the child-side helper calls AnyEvent::Fork::RPC::on_event;
    # confirm against the AnyEvent::Fork::RPC documentation that this is the
    # correct name of the child-side event function.
    $template
        ->require("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
        ->eval('
            my ($magic0, $magic1) = @_;
            sub AnyEvent::Fork::Pool::quit() {
                AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic1;
            }
        ', $magic0, $magic1);

    # Only evaluate user supplied code when there is some.
    my $user_eval = delete $arg{eval};
    $template->eval($user_eval)
        if defined $user_eval;

    # Fork one more worker from the template and add it to the pool as idle.
    $start_worker = sub {
        my $proc = [0, 0, undef]; # load, index, rpc

        $proc->[2] = $template
            ->fork
            ->AnyEvent::Fork::RPC::run(
                $function,
                @rpc,
                on_event => sub {
                    # events framed by the two magic strings are ours
                    if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
                        $destroy_guard if 0; # keep it alive

                        $_[1] eq "quit" and $stop_worker->($proc);
                        return;
                    }

                    # everything else goes to the user's handler unchanged
                    $on_event->(@_);
                },
            );

        ++$nidle;
        Array::Heap::push_heap_idx(@pool, $proc);

        # the on_event closure above must not keep the worker record alive
        Scalar::Util::weaken($proc);
    };

    # Remove a worker from the pool. Safe to call more than once for the
    # same worker, and with an already-freed (undef) weak reference.
    $stop_worker = sub {
        my $proc = shift;

        return
            unless $proc && defined $proc->[1];

        $proc->[0]
            or --$nidle;

        Array::Heap::splice_heap_idx(@pool, $proc->[1]);

        # Mark the record as removed: load 0, no index, rpc handle released.
        # Result callbacks still holding the record then see a non-zero load
        # after their decrement and no index, and leave the pool untouched.
        @$proc = (0);
    };

    # Arrange for a new worker to be started after $start seconds, if one
    # is still needed by then and the pool has not reached $max.
    $want_start = sub {
        undef $stop_w;

        $start_w ||= AE::timer($start, 0, sub {
            undef $start_w;

            if ((@queue || $nidle < $idle) && @pool < $max) {
                $start_worker->();
                $scheduler->();

                # still short of idle workers: keep going, one per interval
                $want_start->()
                    if $nidle < $idle && @pool < $max;
            }
        });
    };

    # Arrange for a surplus idle worker to be stopped after $stop seconds.
    $want_stop = sub {
        $stop_w ||= AE::timer($stop, 0, sub {
            undef $stop_w;

            if ($nidle > $idle) {
                # more idle workers than wanted, so the heap root is idle
                $stop_worker->($pool[0]);

                $want_stop->()
                    if $nidle > $idle;
            }
        });
    };

    # Hand queued jobs to the least loaded workers; in shutdown mode with an
    # empty queue, dismantle the pool instead.
    $scheduler = sub {
        if (@queue) {
            while (@queue) {
                # every worker may have quit while jobs were still queued
                @pool or $start_worker->();

                my $proc = $pool[0];

                if ($proc->[0] < $load) {
                    # found free worker; if it was idle, one idle worker less
                    $proc->[0]++
                        or --$nidle >= $idle
                        or $want_start->();

                    Array::Heap::adjust_heap_idx(@pool, 0);

                    my $job = shift @queue;
                    my $ocb = pop @$job;

                    $proc->[2]->(@$job, sub {
                        # reduce load; a worker that just became idle and
                        # exceeds the wanted idle count may be stopped later
                        --$proc->[0]
                            or ++$nidle <= $idle
                            or $want_stop->();

                        # skipped for workers already removed from the pool
                        Array::Heap::adjust_heap_idx(@pool, $proc->[1])
                            if defined $proc->[1];

                        $scheduler->();

                        $ocb->(@_);
                    });
                } else {
                    # all busy, delay (pointless once the pool is full)
                    $want_start->()
                        unless @pool >= $max;

                    last;
                }
            }
        } elsif ($shutdown) {
            # Release the rpc handles but keep the records, so result
            # callbacks of jobs still in flight find consistent bookkeeping.
            undef $_->[2]
                for @pool;

            undef $start_w;
            undef $stop_w;
            undef $start_worker; # frees $destroy_guard reference

            $stop_worker->($pool[0])
                while $nidle && @pool;
        }
    };

    my $shutdown_guard = Guard::guard(sub {
        $shutdown = 1;
        $scheduler->();
    });

    # start the initial set of idle workers, never more than $max
    $start_worker->()
        while @pool < $idle && @pool < $max;

    return sub {
        $shutdown_guard if 0; # keep it alive

        $start_worker->()
            unless @pool;

        push @queue, [@_];
        $scheduler->();
    };
}
394    
395     =item $pool->call (..., $cb->(...))
396    
397     Call the RPC function of a worker with the given arguments, and when the
398     worker is done, call the C<$cb> with the results, like just calling the
399     L<AnyEvent::Fork::RPC> object directly.
400    
401     If there is no free worker, the call will be queued.
402    
403     Note that there can be considerable time between calling this method and
404     the call actually being executed. During this time, the parameters passed
405     to this function are effectively read-only - modifying them after the call
406     and before the callback is invoked causes undefined behaviour.
407    
408     =cut
409 root 1.1
410     =back
411    
412     =head1 SEE ALSO
413    
414     L<AnyEvent::Fork>, to create the processes in the first place.
415    
416     L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.
417    
418     =head1 AUTHOR AND CONTACT INFORMATION
419    
420     Marc Lehmann <schmorp@schmorp.de>
421     http://software.schmorp.de/pkg/AnyEvent-Fork-Pool
422    
423     =cut
424    
425     1
426