ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.3
Committed: Sat Apr 20 16:38:10 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Changes since 1.2: +206 -204 lines
Log Message:
api mostly finalised

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4
5 =head1 SYNOPSIS
6
7 use AnyEvent;
8 use AnyEvent::Fork::Pool;
9 # use AnyEvent::Fork is not needed
10
11 # all parameters with default values
12 my $pool = AnyEvent::Fork
13 ->new
14 ->require ("MyWorker")
15 ->AnyEvent::Fork::Pool::run (
16 "MyWorker::run", # the worker function
17
18 # pool management
19 max => 4, # absolute maximum # of processes
20 idle => 2, # minimum # of idle processes
21 load => 2, # queue at most this number of jobs per process
22 start => 0.1, # wait this many seconds before starting a new process
23 stop => 1, # wait this many seconds before stopping an idle process
24 on_destroy => (my $finish = AE::cv), # called when object is destroyed
25
26 # parameters passed to AnyEvent::Fork::RPC
27 async => 0,
28 on_error => sub { die "FATAL: $_[0]\n" },
29 on_event => sub { my @ev = @_ },
30 init => "MyWorker::init",
31 serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
32 );
33
34 for (1..10) {
35 $pool->(doit => $_, sub {
36 print "MyWorker::run returned @_\n";
37 });
38 }
39
40 undef $pool;
41
42 $finish->recv;
43
44 =head1 DESCRIPTION
45
46 This module uses processes created via L<AnyEvent::Fork> and the RPC
47 protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced
48 pool of processes that handles jobs.
49
50 Understanding of L<AnyEvent::Fork> is helpful but not critical to be able
51 to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC>
52 is, as it defines the actual API that needs to be implemented in the
53 children.
54
55 =head1 EXAMPLES
56
57 =head1 PARENT USAGE
58
59 =over 4
60
61 =cut
62
63 package AnyEvent::Fork::Pool;
64
65 use common::sense;
66
67 use Scalar::Util ();
68
69 use Guard ();
70 use Array::Heap ();
71
72 use AnyEvent;
73 use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
74 use AnyEvent::Fork::RPC;
75
# Sentinel values placed as the first and last argument of pool-internal
# events (e.g. the "quit" event a worker sends), so that they can be told
# apart from user events without a separate channel. Collisions with user
# data are merely unlikely, not impossible.
my ($magic0, $magic1) = (
   ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P',
   '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*',
);

our $VERSION = 0.1;
83
84 =item my $rpc = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
85
86 The traditional way to call it. But it is way cooler to call it in the
87 following way:
88
89 =item my $rpc = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
90
91 Creates a new pool object with the specified C<$function> as function
92 (name) to call for each request. The pool uses the C<$fork> object as the
93 template when creating worker processes.
94
95 You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
96 to create one.
97
98 A relatively large number of key/value pairs can be specified to influence
99 the behaviour. They are grouped into the categories "pool management",
100 "template process" and "rpc parameters".
101
102 =over 4
103
104 =item Pool Management
105
106 The pool consists of a certain number of worker processes. These options
107 decide how many of these processes exist and when they are started and
108 stopped.
109
110 =over 4
111
112 =item idle => $count (default: 0)
113
114 The minimum amount of idle processes in the pool - when there are fewer
115 than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
116 ones, subject to C<max> and C<start>.
117
118 This is also the initial/minimum amount of workers in the pool. The
119 default of zero means that the pool starts empty and can shrink back to
120 zero workers over time.
121
122 =item max => $count (default: 4)
123
124 The maximum number of processes in the pool, in addition to the template
125 process. C<AnyEvent::Fork::Pool> will never create more than this number
126 of worker processes, although there can be more temporarily when a worker
127 is shut down and hasn't exited yet.
128
129 =item load => $count (default: 2)
130
131 The maximum number of concurrent jobs sent to a single worker
132 process. Worker processes that handle this number of jobs already are
133 called "busy".
134
135 Jobs that cannot be sent to a worker immediately (because all workers are
136 busy) will be queued until a worker is available.
137
138 =item start => $seconds (default: 0.1)
139
140 When a job is queued and all workers are busy, a timer is started. If the
141 timer elapses and there are still jobs that cannot be queued to a worker,
142 a new worker is started.
143
144 This configures the time that all workers must be busy before a new worker
145 is started. Or, put differently, the minimum delay between starting new
146 workers.
147
148 The delay is 0.1 seconds by default; smaller values make the pool react
149 faster when all workers are busy.
150
151 =item stop => $seconds (default: 1)
152
153 When a worker has no jobs to execute it becomes idle. An idle worker that
154 hasn't executed a job within this amount of time will be stopped, unless
155 the other parameters say otherwise.
156
157 =item on_destroy => $callback->() (default: none)
158
159 When a pool object goes out of scope, it will still handle all outstanding
160 jobs. After that, it will destroy all workers (and also the template
161 process if it isn't referenced otherwise), and finally call C<$callback>.
162
163 =back
164
165 =item Template Process
166
167 The worker processes are all forked from a single template
168 process. Ideally, all modules and all code used by the worker, as well as
169 any shared data structures should be loaded into the template process, to
170 take advantage of data sharing via fork.
171
172 You can create your own template process by creating a L<AnyEvent::Fork>
173 object yourself and passing it as the C<template> parameter, but
174 C<AnyEvent::Fork::Pool> can create one for you, including some standard
175 options.
176
177 =over 4
178
179 =item template => $fork (default: C<< AnyEvent::Fork->new >>)
180
181 The template process to use, if you want to create your own.
182
183 =item require => \@modules (default: C<[]>)
184
185 The modules in this list will be loaded into the template process.
186
187 =item eval => "# perl code to execute in template" (default: none)
188
189 This is a perl string that is evaluated after creating the template
190 process and after requiring the modules. It can do whatever it wants to
191 configure the process, but it must not do anything that would keep a later
192 fork from working (so must not create event handlers or (real) threads for
193 example).
194
195 =back
196
197 =item AnyEvent::Fork::RPC Parameters
198
199 These parameters are all passed directly to L<AnyEvent::Fork::RPC>. They
200 are only briefly mentioned here, for their full documentation
201 please refer to the L<AnyEvent::Fork::RPC> documentation. Also, the
202 default values mentioned here are only documented as a best effort -
203 L<AnyEvent::Fork::RPC> documentation is binding.
204
205 =over 4
206
207 =item async => $boolean (default: 0)
208
209 Whether to use the synchronous or asynchronous RPC backend.
210
211 =item on_error => $callback->($message) (default: die with message)
212
213 The callback to call on any (fatal) errors.
214
215 =item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
216
217 The callback to invoke on events.
218
219 =item init => $initfunction (default: none)
220
221 The function to call in the child, once before handling requests.
222
223 =item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
224
225 The serialiser to use.
226
227 =back
228
229 =back
230
231 =cut
232
# Build a pool of worker processes forked from $template and return the
# pool object: a code reference that queues one job per call,
# $pool->(@args, $cb), where @args are passed to $function in a worker
# and $cb receives the results (see the POD above for all %arg keys).
#
# Internal state:
#   @pool  - Array::Heap of workers ordered by load; every worker is
#            [load, heap index, rpc]. The heap index is maintained by the
#            *_idx functions and set to undef once a worker has been removed.
#   @queue - jobs not yet handed to a worker, each [@args, $cb]
#   $nidle - number of pooled workers with a load of zero
sub run {
   my ($template, $function, %arg) = @_;

   # max and load fall back to their defaults for any false value (zero
   # would make the pool unusable); idle, start and stop accept 0.
   my $max        = $arg{max}  || 4;
   my $load       = $arg{load} || 2;
   my $idle       = defined $arg{idle}  ? $arg{idle}  : 0;
   my $start      = defined $arg{start} ? $arg{start} : 0.1;
   my $stop       = defined $arg{stop}  ? $arg{stop}  : 1;
   my $on_event   = $arg{on_event} || sub { };
   my $on_destroy = $arg{on_destroy};

   # handed through unchanged to AnyEvent::Fork::RPC::run
   my @rpc = (
      async      => $arg{async},
      init       => $arg{init},
      serialiser => $arg{serialiser},
      on_error   => $arg{on_error},
   );

   my (@pool, @queue, $start_w, $stop_w, $shutdown);
   my $nidle = 0;

   # The closures are named *_worker so they do not shadow the numeric
   # $start/$stop delays above.
   my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);

   # Freed (and thus on_destroy invoked) only once $start_worker and every
   # worker's RPC object - which all reference it - are gone.
   my $destroy_guard = Guard::guard {
      $on_destroy->()
         if $on_destroy;
   };

   $template
      ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
      ->eval ('
         my ($magic0, $magic1) = @_;
         sub AnyEvent::Fork::Pool::quit() {
            AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic1;
         }
      ', $magic0, $magic1);

   # user-supplied modules and code for the template process, in that order
   $template->require (@{ $arg{require} })
      if $arg{require};
   $template->eval ($arg{eval})
      if defined $arg{eval};

   # Fork a new worker from the template and add it to the pool as idle.
   $start_worker = sub {
      my $proc = [0, 0, undef]; # load, heap index, rpc

      $proc->[2] = $template
         ->fork
         ->AnyEvent::Fork::RPC::run ($function,
              @rpc,
              on_event => sub {
                 # pool-internal event, e.g. sent by AnyEvent::Fork::Pool::quit
                 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
                    $destroy_guard if 0; # keep it alive

                    $stop_worker->($proc)
                       if $_[1] eq "quit";
                    return;
                 }

                 &$on_event;
              },
           );

      ++$nidle;
      Array::Heap::push_heap_idx @pool, $proc;

      # @pool holds the only long-lived strong reference; the on_event
      # closure must not keep its own worker alive
      Scalar::Util::weaken $proc;
   };

   # Remove a worker from the pool. Its RPC object is freed as soon as no
   # pending job callback references the worker any more. Workers that
   # were already removed (undef heap index) or freed are ignored, so a
   # repeated "quit" cannot corrupt the idle count.
   $stop_worker = sub {
      my $proc = shift;

      return
         unless $proc && defined $proc->[1];

      --$nidle
         unless $proc->[0];

      Array::Heap::splice_heap_idx @pool, $proc->[1];
      undef $proc->[1]; # mark as removed for pending job callbacks
   };

   # After $start seconds, start one more worker if jobs are still queued
   # (or there are fewer than $idle idle workers) and the pool is below
   # $max. Needing more workers also cancels a pending idle stop.
   $want_start = sub {
      undef $stop_w;

      $start_w ||= AE::timer $start, 0, sub {
         undef $start_w;

         if (($nidle < $idle || @queue) && @pool < $max) {
            $start_worker->();
            $scheduler->();

            # still short of idle workers: schedule another start
            $want_start->()
               if $nidle < $idle && @pool < $max;
         }
      };
   };

   # Stop one surplus idle worker (beyond $idle) every $stop seconds for
   # as long as the surplus persists.
   $want_stop = sub {
      $stop_w ||= AE::timer $stop, 0, sub {
         undef $stop_w;

         if ($nidle > $idle) {
            # the heap is ordered by load, so with any idle worker
            # present, $pool[0] is idle
            $stop_worker->($pool[0]);

            $want_stop->()
               if $nidle > $idle;
         }
      };
   };

   # Hand queued jobs to the least-loaded worker while one has capacity.
   # Once the pool object is gone, the queue is empty and every worker is
   # idle, tear the pool down.
   $scheduler = sub {
      if (@queue) {
         while (@queue) {
            # all workers may have quit on their own
            @pool
               or $start_worker->();

            my $proc = $pool[0];

            if ($proc->[0] < $load) {
               # found a worker with spare capacity
               unless ($proc->[0]++) {
                  # it was idle until now
                  --$nidle;

                  $want_start->()
                     if $nidle < $idle && @pool < $max;
               }

               Array::Heap::adjust_heap_idx @pool, 0;

               my $job = shift @queue;
               my $ocb = pop @$job;

               $proc->[2]->(@$job, sub {
                  # only account for workers that are still in the pool
                  if (defined $proc->[1]) {
                     --$proc->[0]           # worker still busy?
                        or ++$nidle <= $idle # idle surplus?
                        or $want_stop->();

                     Array::Heap::adjust_heap_idx @pool, $proc->[1];
                  }

                  $scheduler->();

                  &$ocb;
               });
            } else {
               # all workers are busy: try adding one after a delay
               $want_start->()
                  if @pool < $max;
               last;
            }
         }
      } elsif ($shutdown && $nidle == @pool) {
         # busy workers call back into the scheduler when they finish,
         # so teardown is retried until every worker is idle
         undef $start_w;
         undef $stop_w;

         @pool  = (); # drops the last references to the idle workers
         $nidle = 0;

         undef $start_worker; # frees $destroy_guard reference
      }
   };

   # fires when the returned pool object is destroyed
   my $shutdown_guard = Guard::guard {
      $shutdown = 1;
      $scheduler->();
   };

   # pre-start the minimum number of idle workers
   $start_worker->()
      while @pool < $idle && @pool < $max;

   sub {
      $shutdown_guard if 0; # keep it alive

      push @queue, [@_];
      $scheduler->();
   }
}
399
400 =item $pool->(..., $cb->(...))
401
402 Call the RPC function of a worker with the given arguments, and when the
403 worker is done, call the C<$cb> with the results, like just calling the
404 L<AnyEvent::Fork::RPC> object directly.
405
406 If there is no free worker, the call will be queued.
407
408 Note that there can be considerable time between calling this method and
409 the call actually being executed. During this time, the parameters passed
410 to this function are effectively read-only - modifying them after the call
411 and before the callback is invoked causes undefined behaviour.
412
413 =cut
414
415 =back
416
417 =head1 SEE ALSO
418
419 L<AnyEvent::Fork>, to create the processes in the first place.
420
421 L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.
422
423 =head1 AUTHOR AND CONTACT INFORMATION
424
425 Marc Lehmann <schmorp@schmorp.de>
426 http://software.schmorp.de/pkg/AnyEvent-Fork-Pool
427
428 =cut
429
430 1
431