/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.4
Committed: Sat Apr 20 16:39:14 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Changes since 1.3: +0 -5 lines
Log Message:
*** empty log message ***

File Contents

=head1 NAME

AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork

=head1 SYNOPSIS

   use AnyEvent;
   use AnyEvent::Fork::Pool;
   # use AnyEvent::Fork is not needed

   # all parameters with default values
   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
         "MyWorker::run", # the worker function

         # pool management
         max        => 4,   # absolute maximum # of processes
         idle       => 0,   # minimum # of idle processes
         load       => 2,   # queue at most this number of jobs per process
         start      => 0.1, # wait this many seconds before starting a new process
         stop       => 1,   # wait this many seconds before stopping an idle process
         on_destroy => (my $finish = AE::cv), # called when object is destroyed

         # parameters passed to AnyEvent::Fork::RPC
         async      => 0,
         on_error   => sub { die "FATAL: $_[0]\n" },
         on_event   => sub { my @ev = @_ },
         init       => "MyWorker::init",
         serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
      );

   for (1..10) {
      $pool->(doit => $_, sub {
         print "MyWorker::run returned @_\n";
      });
   }

   undef $pool;

   $finish->recv;

=head1 DESCRIPTION

This module uses processes created via L<AnyEvent::Fork> and the RPC
protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced
pool of processes that handles jobs.

Understanding L<AnyEvent::Fork> is helpful but not required to use this
module. A thorough understanding of L<AnyEvent::Fork::RPC> is required,
however, as it defines the actual API that needs to be implemented in the
children.

=head1 EXAMPLES
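
The worker side of the synopsis could look like the following sketch. It
assumes the synchronous backend, where (as described in
L<AnyEvent::Fork::RPC>) the return values of the worker function are passed
back to the caller's callback. The module name C<MyWorker>, the C<doit>
command and the squaring it performs are made up to match the synopsis and
are not part of this distribution.

```perl
package MyWorker;

use strict;
use warnings;

my $jobs_done;

# named by the "init" parameter: runs once in the child, before any job
sub init {
   $jobs_done = 0;
}

# named by the $function argument: runs once per job with the job arguments
sub run {
   my ($cmd, $n) = @_;

   ++$jobs_done;

   # the default string serialiser only transports plain strings,
   # so the result is returned as a single string
   return $cmd eq "doit"
      ? "doit($n) = " . $n * $n
      : "unknown command '$cmd'";
}

1;
```

With this worker, each C<< $pool->(doit => $_, $cb) >> call from the synopsis
would invoke C<$cb> with one result string.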

=head1 PARENT USAGE

=over 4

=cut

package AnyEvent::Fork::Pool;

use common::sense;

use Scalar::Util ();

use Guard ();
use Array::Heap ();

use AnyEvent;
use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
use AnyEvent::Fork::RPC;

# these are used for the first and last argument of events
# in the hope of not colliding. yes, I don't like it either,
# but didn't come up with an obviously better alternative.
my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';

our $VERSION = 0.1;

=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]

The traditional way to call it. But it is way cooler to call it in the
following way:

=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])

Creates a new pool object with the specified C<$function> as function
(name) to call for each request. The pool uses the C<$fork> object as the
template when creating worker processes.

You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
to create one.

A relatively large number of key/value pairs can be specified to influence
the behaviour. They are grouped into the categories "pool management",
"template process" and "rpc parameters".

=over 4

=item Pool Management

The pool consists of a certain number of worker processes. These options
decide how many of these processes exist and when they are started and
stopped.

=over 4

=item idle => $count (default: 0)

The minimum number of idle processes in the pool - when there are fewer
than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
ones, subject to C<max> and C<start>.

This is also the initial/minimum number of workers in the pool. The
default of zero means that the pool starts empty and can shrink back to
zero workers over time.

=item max => $count (default: 4)

The maximum number of processes in the pool, in addition to the template
process. C<AnyEvent::Fork::Pool> will never create more than this number
of worker processes, although there can be more temporarily when a worker
is shut down and hasn't exited yet.

=item load => $count (default: 2)

The maximum number of concurrent jobs sent to a single worker
process. Worker processes that handle this number of jobs already are
called "busy".

Jobs that cannot be sent to a worker immediately (because all workers are
busy) will be queued until a worker is available.

=item start => $seconds (default: 0.1)

When a job is queued and all workers are busy, a timer is started. If the
timer elapses and there are still jobs that cannot be queued to a worker,
a new worker is started.

This configures the time that all workers must be busy before a new worker
is started. Or, put differently, the minimum delay between starting new
workers.

The delay is small by default, which means new workers will be started
relatively quickly.

=item stop => $seconds (default: 1)

When a worker has no jobs to execute it becomes idle. An idle worker that
hasn't executed a job within this amount of time will be stopped, unless
the other parameters say otherwise.

=item on_destroy => $callback->() (default: none)

When a pool object goes out of scope, it will still handle all outstanding
jobs. After that, it will destroy all workers (and also the template
process if it isn't referenced otherwise), and then call this callback.

=back
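
As a rough illustration of how C<load> and the job queue interact, the
following standalone sketch hands each job to the least-loaded worker and
queues it once every worker is busy. It is not the module's implementation:
the two-element C<@workers> array, the C<dispatch> function and the job
numbers are hypothetical, and no processes are started.

```perl
use strict;
use warnings;

my $load    = 2;        # same meaning as the "load" parameter
my @workers = (0, 0);   # current number of jobs on two hypothetical workers
my @queue;              # jobs waiting for a free worker

sub dispatch {
   my ($job) = @_;

   # pick the least-loaded worker, lowest index first on ties
   my ($best) = sort { $workers[$a] <=> $workers[$b] || $a <=> $b } 0 .. $#workers;

   if ($workers[$best] < $load) {
      ++$workers[$best];
      return "job $job -> worker $best";
   }

   # every worker already handles $load jobs ("busy"), so the job waits
   push @queue, $job;
   return "job $job queued";
}

my @log = map { dispatch ($_) } 1 .. 5;
print "$_\n" for @log;
```

With two workers and a C<load> of 2, the first four jobs are spread over
both workers and the fifth one has to wait in the queue.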

=item Template Process

The worker processes are all forked from a single template
process. Ideally, all modules and all code used by the worker, as well as
any shared data structures, should be loaded into the template process, to
take advantage of data sharing via fork.

You can create your own template process by creating an L<AnyEvent::Fork>
object yourself and passing it as the C<template> parameter, but
C<AnyEvent::Fork::Pool> can create one for you, including some standard
options.

=over 4

=item template => $fork (default: C<< AnyEvent::Fork->new >>)

The template process to use, if you want to create your own.

=item require => \@modules (default: C<[]>)

The modules in this list will be loaded into the template process.

=item eval => "# perl code to execute in template" (default: none)

This is a perl string that is evaluated after creating the template
process and after requiring the modules. It can do whatever it wants to
configure the process, but it must not do anything that would keep a later
fork from working (so must not create event handlers or (real) threads for
example).

=back

=item AnyEvent::Fork::RPC Parameters

These parameters are all passed directly to L<AnyEvent::Fork::RPC>. They
are only briefly mentioned here; for their full documentation
please refer to the L<AnyEvent::Fork::RPC> documentation. Also, the
default values mentioned here are only documented as a best effort - the
L<AnyEvent::Fork::RPC> documentation is binding.

=over 4

=item async => $boolean (default: 0)

Whether to use the synchronous or asynchronous RPC backend.

=item on_error => $callback->($message) (default: die with message)

The callback to call on any (fatal) errors.

=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)

The callback to invoke on events.

=item init => $initfunction (default: none)

The function to call in the child, once before handling requests.

=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)

The serialiser to use.

=back

=back

=cut

sub run {
   my ($template, $function, %arg) = @_;

   my $max        = $arg{max}        || 4;
   my $idle       = $arg{idle}       || 0;
   my $load       = $arg{load}       || 2;
   my $start      = $arg{start}      || 0.1;
   my $stop       = $arg{stop}       || 1;
   my $on_event   = $arg{on_event}   || sub { };
   my $on_destroy = $arg{on_destroy};

   my @rpc = (
      async      => $arg{async},
      init       => $arg{init},
      serialiser => $arg{serialiser},
      on_error   => $arg{on_error},
   );

   my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
   # the worker callbacks get their own names so that they do not shadow
   # the $start and $stop delays declared above
   my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);

   my $destroy_guard = Guard::guard {
      $on_destroy->()
         if $on_destroy;
   };

   $template
      ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
      ->eval ('
           my ($magic0, $magic1) = @_;
           sub AnyEvent::Fork::Pool::quit() {
              AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic1;
           }
        ', $magic0, $magic1)
      ->eval (delete $arg{eval});

   $start_worker = sub {
      my $proc = [0, 0, undef]; # load, index, rpc

      warn "start a worker\n";#d#

      $proc->[2] = $template
         ->fork
         ->AnyEvent::Fork::RPC::run ($function,
              @rpc,
              on_event => sub {
                 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
                    $destroy_guard if 0; # keep it alive

                    $_[1] eq "quit" and $stop_worker->($proc);
                    return;
                 }

                 &$on_event;
              },
           )
      ;

      ++$nidle;
      # the _idx variant keeps $proc->[1] in sync with the heap position,
      # which splice_heap_idx and adjust_heap_idx below rely on
      Array::Heap::push_heap_idx @pool, $proc;

      Scalar::Util::weaken $proc;
   };

   $stop_worker = sub {
      my $proc = shift;

      $proc->[0]
         or --$nidle;

      Array::Heap::splice_heap_idx @pool, $proc->[1]
         if defined $proc->[1];
   };

   $want_start = sub {
      undef $stop_w;

      $start_w ||= AE::timer $start, 0, sub {
         undef $start_w;

         # only grow while jobs are waiting and max is not reached
         if (@queue && @pool < $max) {
            $start_worker->();
            $scheduler->();
         }
      };
   };

   $want_stop = sub {
      $stop_w ||= AE::timer $stop, 0, sub {
         undef $stop_w;

         $stop_worker->($pool[0])
            if $nidle;
      };
   };

   $scheduler = sub {
      if (@queue) {
         while (@queue) {
            my $proc = $pool[0];

            if ($proc->[0] < $load) {
               warn "free $proc $proc->[0]\n";#d#
               # found free worker
               $proc->[0]++
                  or --$nidle >= $idle
                  or $want_start->();

               Array::Heap::adjust_heap_idx @pool, 0;

               my $job = shift @queue;
               my $ocb = pop @$job;

               $proc->[2]->(@$job, sub {
                  # reduce load; if the worker became idle and there are now
                  # more idle workers than wanted, schedule a stop
                  --$proc->[0]
                     or ++$nidle > $idle
                     and $want_stop->();

                  # reposition this worker, not whatever $_ happens to be
                  Array::Heap::adjust_heap_idx @pool, $proc->[1]
                     if defined $proc->[1];

                  $scheduler->();

                  &$ocb;
               });
            } else {
               warn "busy $proc->[0]\n";#d#
               # all busy, delay

               $want_start->();
               last;
            }
         }
      } elsif ($shutdown) {
         @pool = ();
         undef $start_w;
         undef $start_worker; # frees $destroy_guard reference

         $stop_worker->($pool[0])
            while $nidle;
      }
   };

   my $shutdown_guard = Guard::guard {
      $shutdown = 1;
      $scheduler->();
   };

   $start_worker->()
      while @pool < $idle;

   sub {
      $shutdown_guard if 0; # keep it alive

      $start_worker->()
         unless @pool;

      push @queue, [@_];
      $scheduler->();
   }
}

=item $pool->(..., $cb->(...))

Call the RPC function of a worker with the given arguments, and when the
worker is done, call the C<$cb> with the results, just like calling the
L<AnyEvent::Fork::RPC> object directly.

If there is no free worker, the call will be queued.

Note that there can be considerable time between calling this function and
the call actually being executed. During this time, the parameters passed
to this function are effectively read-only - modifying them after the call
and before the callback is invoked causes undefined behaviour.
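
One way to stay clear of this is to hand the pool a private copy of any
data structure that might change later. The sketch below only imitates the
queueing step (C<push @queue, [@_]>) with a hypothetical C<enqueue>
function, and it assumes a serialiser that can transport references, which
the default string serialiser cannot.

```perl
use strict;
use warnings;

my @queue;   # stand-in for the pool's internal job queue

# hypothetical stand-in for $pool->(...): it only queues the job
sub enqueue { push @queue, [@_] }

my @numbers = (1, 2, 3);

enqueue (sum => \@numbers,  sub { });   # queue shares @numbers with the caller
enqueue (sum => [@numbers], sub { });   # queue gets its own copy

push @numbers, 4;   # caller changes the data while both jobs are still queued

# number of elements each queued job would see when it finally runs
my @seen = map { scalar @{ $_->[1] } } @queue;
print "@seen\n";
```

The first job sees the later modification, while the second one still
carries the three elements it was queued with.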

=cut

=back

=head1 SEE ALSO

L<AnyEvent::Fork>, to create the processes in the first place.

L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.

=head1 AUTHOR AND CONTACT INFORMATION

 Marc Lehmann <schmorp@schmorp.de>
 http://software.schmorp.de/pkg/AnyEvent-Fork-Pool

=cut

1
