ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.3
Committed: Sat Apr 20 16:38:10 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Changes since 1.2: +206 -204 lines
Log Message:
api mostly finalised

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent;
8     use AnyEvent::Fork::Pool;
9     # use AnyEvent::Fork is not needed
10    
11     # all parameters with default values
12 root 1.3 my $pool = AnyEvent::Fork
13     ->new
14     ->require ("MyWorker")
15     ->AnyEvent::Fork::Pool::run (
16     "MyWorker::run", # the worker function
17    
18     # pool management
19     max => 4, # absolute maximum # of processes
20     idle => 2, # minimum # of idle processes
21     load => 2, # queue at most this number of jobs per process
22     start => 0.1, # wait this many seconds before starting a new process
23     stop => 1, # wait this many seconds before stopping an idle process
24     on_destroy => (my $finish = AE::cv), # called when object is destroyed
25    
26     # parameters passed to AnyEvent::Fork::RPC
27     async => 0,
28     on_error => sub { die "FATAL: $_[0]\n" },
29     on_event => sub { my @ev = @_ },
30     init => "MyWorker::init",
31     serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
32     );
33 root 1.1
34     for (1..10) {
35 root 1.3 $pool->(doit => $_, sub {
36 root 1.1 print "MyWorker::run returned @_\n";
37     });
38     }
39    
40     undef $pool;
41    
42     $finish->recv;
43    
44     =head1 DESCRIPTION
45    
46     This module uses processes created via L<AnyEvent::Fork> and the RPC
47     protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced
48     pool of processes that handles jobs.
49    
50     Understanding of L<AnyEvent::Fork> is helpful but not critical to be able
51     to use this module, whereas a thorough understanding of L<AnyEvent::Fork::RPC>
52     is essential, as it defines the actual API that needs to be implemented in
53     the children.
54    
55     =head1 EXAMPLES
56    
57 root 1.2 =head1 PARENT USAGE
58 root 1.1
59     =over 4
60    
61     =cut
62    
63     package AnyEvent::Fork::Pool;
64    
65     use common::sense;
66    
67 root 1.2 use Scalar::Util ();
68    
69 root 1.3 use Guard ();
70 root 1.2 use Array::Heap ();
71 root 1.1
72     use AnyEvent;
73     use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
74     use AnyEvent::Fork::RPC;
75    
# Sentinel strings used as the first and last argument of the "quit"
# event that the child-side AnyEvent::Fork::Pool::quit function sends to
# the parent. The parent's on_event handler only treats a 3-argument
# event framed by exactly these two strings as internal; anything else
# is forwarded to the user's on_event callback. They are random-looking
# in the hope of never colliding with real user events - yes, I don't
# like it either, but didn't come up with an obviously better alternative.
my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';

our $VERSION = 0.1;
83    
84 root 1.3 =item my $rpc = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
85    
86     The traditional way to call it. But it is way cooler to call it in the
87     following way:
88    
89     =item my $rpc = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
90 root 1.2
91     Creates a new pool object with the specified C<$function> as function
92 root 1.3 (name) to call for each request. The pool uses the C<$fork> object as the
93     template when creating worker processes.
94 root 1.2
95     You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
96     to create one.
97    
98     A relatively large number of key/value pairs can be specified to influence
99     the behaviour. They are grouped into the categories "pool management",
100     "template process" and "rpc parameters".
101 root 1.1
102     =over 4
103    
104 root 1.2 =item Pool Management
105    
106     The pool consists of a certain number of worker processes. These options
107     decide how many of these processes exist and when they are started and
108     stopped.
109    
110     =over 4
111    
112 root 1.3 =item idle => $count (default: 0)
113 root 1.2
114 root 1.3 The minimum amount of idle processes in the pool - when there are fewer
115     than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
116     ones, subject to C<max> and C<start>.
117    
118     This is also the initial/minimum amount of workers in the pool. The
119     default of zero means that the pool starts empty and can shrink back to
120     zero workers over time.
121 root 1.2
122     =item max => $count (default: 4)
123    
124     The maximum number of processes in the pool, in addition to the template
125     process. C<AnyEvent::Fork::Pool> will never create more than this number
126 root 1.3 of worker processes, although there can be more temporarily when a worker
127     is shut down and hasn't exited yet.
128 root 1.2
129 root 1.3 =item load => $count (default: 2)
130 root 1.2
131 root 1.3 The maximum number of concurrent jobs sent to a single worker
132     process. Worker processes that handle this number of jobs already are
133     called "busy".
134 root 1.1
135 root 1.2 Jobs that cannot be sent to a worker immediately (because all workers are
136     busy) will be queued until a worker is available.
137 root 1.1
138 root 1.3 =item start => $seconds (default: 0.1)
139 root 1.1
140 root 1.2 When a job is queued and all workers are busy, a timer is started. If the
141     timer elapses and there are still jobs that cannot be queued to a worker,
142     a new worker is started.
143 root 1.1
144 root 1.2 This configures the time that all workers must be busy before a new worker
145     is started. Or, put differently, the minimum delay between starting new
146     workers.
147 root 1.1
148 root 1.2 The delay is 0.1 seconds by default, so new workers are started shortly
149     after all existing workers have become busy.
150 root 1.1
151 root 1.3 =item stop => $seconds (default: 1)
152 root 1.1
153 root 1.2 When a worker has no jobs to execute it becomes idle. An idle worker that
154     hasn't executed a job within this amount of time will be stopped, unless
155     the other parameters say otherwise.
156    
157     =item on_destroy => $callback->() (default: none)
158    
159     When a pool object goes out of scope, it will still handle all outstanding
160     jobs. After that, it will destroy all workers (and also the template
161     process if it isn't referenced otherwise). Finally, C<$callback> is invoked.
162    
163     =back
164    
165     =item Template Process
166    
167     process. Ideally, all modules and all code used by the worker, as well as
168     any shared data structures should be loaded into the template process, to
169     take advantage of data sharing via fork.
170    
171    
172     You can create your own template process by creating a L<AnyEvent::Fork>
173     object yourself and passing it as the C<template> parameter, but
174     C<AnyEvent::Fork::Pool> can create one for you, including some standard
175     options.
176    
177     =over 4
178    
179     =item template => $fork (default: C<< AnyEvent::Fork->new >>)
180    
181     The template process to use, if you want to create your own.
182    
183     =item require => \@modules (default: C<[]>)
184    
185     The modules in this list will be loaded into the template process.
186    
187     =item eval => "# perl code to execute in template" (default: none)
188    
189     This is a perl string that is evaluated after creating the template
190     process and after requiring the modules. It can do whatever it wants to
191     configure the process, but it must not do anything that would keep a later
192     fork from working (so must not create event handlers or (real) threads for
193     example).
194    
195     =back
196    
197     =item AnyEvent::Fork::RPC Parameters
198    
199     These parameters are all passed directly to L<AnyEvent::Fork::RPC>. They
200     are only briefly mentioned here, for their full documentation
201     please refer to the L<AnyEvent::Fork::RPC> documentation. Also, the
202     default values mentioned here are only documented as a best effort -
203     L<AnyEvent::Fork::RPC> documentation is binding.
204    
205     =over 4
206 root 1.1
207     =item async => $boolean (default: 0)
208    
209 root 1.2 Whether to use the synchronous or asynchronous RPC backend.
210 root 1.1
211 root 1.2 =item on_error => $callback->($message) (default: die with message)
212 root 1.1
213 root 1.2 The callback to call on any (fatal) errors.
214 root 1.1
215 root 1.2 =item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
216 root 1.1
217 root 1.2 The callback to invoke on events.
218 root 1.1
219 root 1.2 =item init => $initfunction (default: none)
220 root 1.1
221 root 1.2 The function to call in the child, once before handling requests.
222 root 1.1
223 root 1.2 =item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
224 root 1.1
225 root 1.2 The serialiser to use.
226 root 1.1
227 root 1.2 =back
228 root 1.1
229     =back
230    
231     =cut
232    
# Create a pool of worker processes forked from $template (an AnyEvent::Fork
# object) that run $function via AnyEvent::Fork::RPC, and return a code
# reference that queues jobs for them: $pool->(@args, $result_cb).
# See the POD above for the meaning of the %arg key/value pairs.
#
# Each worker is represented by an array reference [load, index, rpc]:
#   [0] number of jobs currently sent to the worker (the heap key),
#   [1] the worker's position in @pool, maintained by the Array::Heap
#       *_idx functions (undef once the worker has been removed),
#   [2] the AnyEvent::Fork::RPC object; dropping it shuts the worker down.
sub run {
   my ($template, $function, %arg) = @_;

   my $max        = $arg{max}  || 4;
   my $idle       = $arg{idle} || 0;
   my $load       = $arg{load} || 2;
   # start/stop may legitimately be 0 (no delay), so only default when unset
   my $start      = defined $arg{start} ? $arg{start} : 0.1;
   my $stop       = defined $arg{stop}  ? $arg{stop}  : 1;
   my $on_event   = $arg{on_event} || sub { };
   my $on_destroy = $arg{on_destroy};

   # forward only the RPC options that were actually given, so that
   # AnyEvent::Fork::RPC applies its own defaults for the rest
   my @rpc = map { defined $arg{$_} ? ($_ => $arg{$_}) : () }
                 qw(async init serialiser on_error);

   my (@pool, @queue, $start_w, $stop_w, $shutdown);
   my $nidle = 0; # number of workers with load 0

   # NB: these closures must not be called $start/$stop - that would mask
   # the numeric delays above, which the timers below depend on
   my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);

   # fires once nothing references it any more: every worker's on_event
   # closure and $start_worker hold it, so this happens after shutdown
   my $destroy_guard = Guard::guard {
      $on_destroy->()
         if $on_destroy;
   };

   # give the template the child-side helper AnyEvent::Fork::Pool::quit,
   # which sends the magic-framed "quit" event to the parent
   $template
      ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
      ->eval ('
           my ($magic0, $magic1) = @_;
           sub AnyEvent::Fork::Pool::quit() {
              AnyEvent::Fork::RPC::event ($magic0, "quit", $magic1);
           }
        ', $magic0, $magic1);

   $template->eval ($arg{eval})
      if defined $arg{eval};

   # fork a new worker from the template and add it to the pool as idle
   $start_worker = sub {
      my $proc = [0, 0, undef]; # load, index, rpc

      $proc->[2] = $template
         ->fork
         ->AnyEvent::Fork::RPC::run ($function,
              @rpc,
              on_event => sub {
                 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
                    $destroy_guard if 0; # keep it alive

                    $stop_worker->($proc)
                       if $_[1] eq "quit";
                    return;
                 }

                 $on_event->(@_);
              },
           );

      ++$nidle;
      Array::Heap::push_heap_idx @pool, $proc;

      # $proc holds the rpc object, which holds the on_event closure above,
      # which refers to $proc - weaken to break that reference cycle
      Scalar::Util::weaken $proc;
   };

   # remove a worker from the pool and drop its rpc object; safe to call
   # more than once for the same worker
   $stop_worker = sub {
      my $proc = shift;

      return unless $proc && defined $proc->[1];

      $proc->[0]
         or --$nidle;

      Array::Heap::splice_heap_idx @pool, $proc->[1];

      # drops the rpc object and marks the record as removed ([1] undef);
      # late job callbacks then see a negative load and leave $nidle alone
      @$proc = 0;
   };

   # start another worker after $start seconds, if jobs are still waiting
   # by then and the pool has not reached $max workers
   $want_start = sub {
      undef $stop_w;

      $start_w ||= AE::timer $start, 0, sub {
         undef $start_w;

         if (@queue && @pool < $max) {
            $start_worker->();
            $scheduler->();
         }
      };
   };

   # stop one surplus idle worker after $stop seconds, re-arming while
   # there are still more idle workers than the configured minimum
   $want_stop = sub {
      $stop_w ||= AE::timer $stop, 0, sub {
         undef $stop_w;

         if ($nidle > $idle && @pool) {
            $stop_worker->($pool[0]); # heap top is the least loaded worker
            $want_stop->()
               if $nidle > $idle;
         }
      };
   };

   # hand queued jobs to the least loaded worker until the queue is empty
   # or every worker is busy; on shutdown, dismantle the pool instead
   $scheduler = sub {
      if (@queue) {
         while (@queue) {
            # all workers may have quit in the meantime
            $start_worker->()
               unless @pool;

            my $proc = $pool[0];

            if ($proc->[0] < $load) {
               # found a free worker; if it was idle, it no longer is
               if (!$proc->[0]++) {
                  $want_start->()
                     if --$nidle < $idle;
               }

               Array::Heap::adjust_heap_idx @pool, 0;

               my $job = shift @queue;
               my $ocb = pop @$job;

               $proc->[2]->(@$job, sub {
                  # job done: reduce this worker's load; if it became idle
                  # and there are now too many idle workers, stop one later
                  unless (--$proc->[0]) {
                     $want_stop->()
                        if ++$nidle > $idle;
                  }

                  Array::Heap::adjust_heap_idx @pool, $proc->[1]
                     if defined $proc->[1];

                  $scheduler->();

                  $ocb->(@_);
               });
            } else {
               # every worker is busy - maybe start another one later
               $want_start->();
               last;
            }
         }
      } elsif ($shutdown) {
         undef $start_w;
         undef $stop_w;
         undef $start_worker; # frees its $destroy_guard reference

         # stop idle workers now; busy ones are stopped by this branch
         # again once their last job callback has run
         $stop_worker->($pool[0])
            while $nidle && @pool;
      }
   };

   # fires when the returned pool code reference is destroyed
   my $shutdown_guard = Guard::guard {
      $shutdown = 1;
      $scheduler->();
   };

   # pre-start the minimum number of idle workers (never more than $max)
   $start_worker->()
      while @pool < $idle && @pool < $max;

   # the pool itself: queue a job (arguments followed by a result callback)
   # and try to dispatch it right away
   return sub {
      $shutdown_guard if 0; # keep it alive

      $start_worker->()
         unless @pool;

      push @queue, [@_];
      $scheduler->();
   };
}
399    
400     =item $pool->(..., $cb->(...))
401    
402     Call the RPC function of a worker with the given arguments, and when the
403     worker is done, call the C<$cb> with the results, like just calling the
404     L<AnyEvent::Fork::RPC> object directly.
405    
406     If there is no free worker, the call will be queued.
407    
408     Note that there can be considerable time between calling this method and
409     the call actually being executed. During this time, the parameters passed
410     to this function are effectively read-only - modifying them after the call
411     and before the callback is invoked causes undefined behaviour.
412    
413     =cut
414 root 1.1
415     =back
416    
417     =head1 SEE ALSO
418    
419     L<AnyEvent::Fork>, to create the processes in the first place.
420    
421     L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.
422    
423     =head1 AUTHOR AND CONTACT INFORMATION
424    
425     Marc Lehmann <schmorp@schmorp.de>
426     http://software.schmorp.de/pkg/AnyEvent-Fork-Pool
427    
428     =cut
429    
430     1
431