ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Pool/Pool.pm
(Generate patch)

Comparing AnyEvent-Fork-Pool/Pool.pm (file contents):
Revision 1.2 by root, Thu Apr 18 21:37:27 2013 UTC vs.
Revision 1.3 by root, Sat Apr 20 16:38:10 2013 UTC

7 use AnyEvent; 7 use AnyEvent;
8 use AnyEvent::Fork::Pool; 8 use AnyEvent::Fork::Pool;
9 # use AnyEvent::Fork is not needed 9 # use AnyEvent::Fork is not needed
10 10
11 # all parameters with default values 11 # all parameters with default values
12 my $pool = new AnyEvent::Fork::Pool 12 my $pool = AnyEvent::Fork
13 "MyWorker::run", 13 ->new
14 ->require ("MyWorker")
15 ->AnyEvent::Fork::Pool::run (
16 "MyWorker::run", # the worker function
14 17
15 # pool management 18 # pool management
16 min => 0, # minimum # of processes
17 max => 4, # maximum # of processes 19 max => 4, # absolute maximum # of processes
18 max_queue => 2, # queue at most this number of jobs per process 20 idle => 2, # minimum # of idle processes
21 load => 2, # queue at most this number of jobs per process
19 min_delay => 0, # wait this many seconds before starting a new process 22 start => 0.1, # wait this many seconds before starting a new process
20 min_idle => 0, # try to have at least this amount of idle processes
21 max_idle => 1, # at most this many idle processes
22 idle_time => 1, # wait this many seconds before killing an idle process 23 stop => 1, # wait this many seconds before stopping an idle process
23 on_destroy => (my $finish = AE::cv), 24 on_destroy => (my $finish = AE::cv), # called when object is destroyed
24 25
25 # template process
26 template => AnyEvent::Fork->new, # the template process to use
27 require => [MyWorker::], # module(s) to load
28 eval => "# perl code to execute in template",
29
30 # parameters passed to AnyEvent::Fork::RPC 26 # parameters passed to AnyEvent::Fork::RPC
31 async => 0, 27 async => 0,
32 on_error => sub { die "FATAL: $_[0]\n" }, 28 on_error => sub { die "FATAL: $_[0]\n" },
33 on_event => sub { my @ev = @_ }, 29 on_event => sub { my @ev = @_ },
34 init => "MyWorker::init", 30 init => "MyWorker::init",
35 serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, 31 serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
36 ; 32 );
37 33
38 for (1..10) { 34 for (1..10) {
39 $pool->call (doit => $_, sub { 35 $pool->(doit => $_, sub {
40 print "MyWorker::run returned @_\n"; 36 print "MyWorker::run returned @_\n";
41 }); 37 });
42 } 38 }
43 39
44 undef $pool; 40 undef $pool;
68 64
69use common::sense; 65use common::sense;
70 66
71use Scalar::Util (); 67use Scalar::Util ();
72 68
69use Guard ();
73use Array::Heap (); 70use Array::Heap ();
74 71
75use AnyEvent; 72use AnyEvent;
76use AnyEvent::Fork; # we don't actually depend on it, this is for convenience 73use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
77use AnyEvent::Fork::RPC; 74use AnyEvent::Fork::RPC;
78 75
76# these are used for the first and last argument of events
77# in the hope of not colliding. yes, I don't like it either,
78# but didn't come up with an obviously better alternative.
79my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P'; 79my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
80my $magic2 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*'; 80my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
81 81
82our $VERSION = 0.1; 82our $VERSION = 0.1;
83 83
84=item my $rpc = new AnyEvent::Fork::Pool $function, [key => value...] 84=item my $rpc = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
85
86The traditional way to call it. But it is way cooler to call it in the
87following way:
88
89=item my $rpc = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
85 90
86Creates a new pool object with the specified C<$function> as function 91Creates a new pool object with the specified C<$function> as function
87(name) to call for each request. 92(name) to call for each request. The pool uses the C<$fork> object as the
88 93template when creating worker processes.
89A pool consists of a template process that contains the code and data that
90the worker processes need. And a number of worker processes that have been
91forked off of that template process.
92 94
93You can supply your own template process, or tell C<AnyEvent::Fork::Pool> 95You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
94to create one. 96to create one.
95 97
96A relatively large number of key/value pairs can be specified to influence 98A relatively large number of key/value pairs can be specified to influence
105decide how many of these processes exist and when they are started and 107decide how many of these processes exist and when they are started and
106stopp.ed 108stopp.ed
107 109
108=over 4 110=over 4
109 111
110=item min => $count (default: 0) 112=item idle => $count (default: 0)
111 113
112The minimum number of processes in the pool, in addition to the template 114The minimum amount of idle processes in the pool - when there are fewer
113process. Even when idle, there will never be fewer than this number of 115than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
114worker processes. The default means that the pool can be empty. 116ones, subject to C<max> and C<start>.
117
118This is also the initial/minimum amount of workers in the pool. The
119default of zero means that the pool starts empty and can shrink back to
120zero workers over time.
115 121
116=item max => $count (default: 4) 122=item max => $count (default: 4)
117 123
118The maximum number of processes in the pool, in addition to the template 124The maximum number of processes in the pool, in addition to the template
119process. C<AnyEvent::Fork::Pool> will never create more than this number 125process. C<AnyEvent::Fork::Pool> will never create more than this number
120of processes. 126of worker processes, although there can be more temporarily when a worker
127is shut down and hasn't exited yet.
121 128
122=item max_queue => $count (default: 2) 129=item load => $count (default: 2)
123 130
124The maximum number of jobs sent to a single worker process. Worker 131The maximum number of concurrent jobs sent to a single worker
125processes that handle this number of jobs already are called "busy". 132process. Worker processes that handle this number of jobs already are
133called "busy".
126 134
127Jobs that cannot be sent to a worker immediately (because all workers are 135Jobs that cannot be sent to a worker immediately (because all workers are
128busy) will be queued until a worker is available. 136busy) will be queued until a worker is available.
129 137
130=item min_delay => $seconds (default: 0) 138=item start => $seconds (default: 0.1)
131 139
132When a job is queued and all workers are busy, a timer is started. If the 140When a job is queued and all workers are busy, a timer is started. If the
133timer elapses and there are still jobs that cannot be queued to a worker, 141timer elapses and there are still jobs that cannot be queued to a worker,
134a new worker is started. 142a new worker is started.
135 143
138workers. 146workers.
139 147
140The delay is zero by default, which means new workers will be started 148The delay is zero by default, which means new workers will be started
141without delay. 149without delay.
142 150
143=item min_idle => $count (default: 0)
144
145The minimum number of idle workers - when they are less, more
146are started. The C<min_delay> is still respected though, and
147C<min_idle>/C<min_delay> and C<max_idle>/C<idle_time> are useful to
148dynamically adjust the pool.
149
150=item max_idle => $count (default: 1)
151
152The maximum number of idle workers. If a worker becomes idle and there are
153already this many idle workers, it will be stopped immediately instead of
154waiting for the idle timer to elapse.
155
156=item idle_time => $seconds (default: 1) 151=item stop => $seconds (default: 1)
157 152
158When a worker has no jobs to execute it becomes idle. An idle worker that 153When a worker has no jobs to execute it becomes idle. An idle worker that
159hasn't executed a job within this amount of time will be stopped, unless 154hasn't executed a job within this amount of time will be stopped, unless
160the other parameters say otherwise. 155the other parameters say otherwise.
161 156
233 228
234=back 229=back
235 230
236=cut 231=cut
237 232
238sub new { 233sub run {
239 my ($class, $function, %arg) = @_; 234 my ($template, $function, %arg) = @_;
240 235
241 my $self = bless { 236 my $max = $arg{max} || 4;
242 min => 0, 237 my $idle = $arg{idle} || 0,
243 max => 4, 238 my $load = $arg{load} || 2,
244 max_queue => 2, 239 my $start = $arg{start} || 0.1,
245 min_delay => 0, 240 my $stop = $arg{stop} || 1,
246 max_idle => 1, 241 my $on_event = $arg{on_event} || sub { },
247 idle_time => 1, 242 my $on_destroy = $arg{on_destroy};
248 on_event => sub { },
249 %arg,
250 pool => [],
251 queue => [],
252 }, $class;
253 243
254 $self->{function} = $function; 244 my @rpc = (
245 async => $arg{async},
246 init => $arg{init},
247 serialiser => $arg{serialiser},
248 on_error => $arg{on_error},
249 );
255 250
256 ($self->{template} ||= new AnyEvent::Fork) 251 my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
252 my ($start, $stop, $want_start, $want_stop, $scheduler);
253
254 my $destroy_guard = Guard::guard {
255 $on_destroy->()
256 if $on_destroy;
257 };
258
259 my $busy;#d#
260
261 $template
257 ->require ("AnyEvent::Fork::RPC::" . ($self->{async} ? "Async" : "Sync")) 262 ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
258 ->require (@{ delete $self->{require} })
259 ->eval (' 263 ->eval ('
260 my ($magic0, $magic2) = @_; 264 my ($magic0, $magic1) = @_;
261 sub AnyEvent::Fork::Pool::quit() { 265 sub AnyEvent::Fork::Pool::quit() {
262 AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic2; 266 AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic1;
263 } 267 }
264 ', $magic0, $magic2) 268 ', $magic0, $magic1)
265 ->eval (delete $self->{eval}); 269 ->eval (delete $arg{eval});
266 270
267 $self->start 271 $start = sub {
268 while @{ $self->{pool} } < $self->{min}; 272 my $proc = [0, 0, undef]; # load, index, rpc
269 273
270 $self
271}
272
273sub start {
274 my ($self) = @_;
275
276 warn "start\n";#d# 274 warn "start a worker\n";#d#
277 275
278 Scalar::Util::weaken $self;
279
280 my $proc = [0, undef, undef];
281
282 $proc->[1] = $self->{template} 276 $proc->[2] = $template
283 ->fork 277 ->fork
284 ->AnyEvent::Fork::RPC::run ($self->{function}, 278 ->AnyEvent::Fork::RPC::run ($function,
285 async => $self->{async}, 279 @rpc,
286 init => $self->{init},
287 serialiser => $self->{serialiser},
288 on_error => $self->{on_error},
289 on_event => sub { 280 on_event => sub {
290 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic2) { 281 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
291 if ($_[1] eq "quit") { 282 $destroy_guard if 0; # keep it alive
292 my $pool = $self->{pool}; 283
293 for (0 .. $#$pool) { 284 $_[1] eq "quit" and $stop->($proc);
294 if ($pool->[$_] == $proc) {
295 Array::Heap::splice_heap @$pool, $_;
296 return; 285 return;
297 }
298 }
299 die;
300 } 286 }
287
301 return; 288 &$on_event;
302 } 289 },
290 )
291 ;
303 292
304 &{ $self->{on_event} }; 293 ++$nidle;
294 Array::Heap::push_heap @pool, $proc;
295
296 Scalar::Util::weaken $proc;
297 };
298
299 $stop = sub {
300 my $proc = shift;
301
302 $proc->[0]
303 or --$nidle;
304
305 Array::Heap::splice_heap_idx @pool, $proc->[1]
306 if defined $proc->[1];
307 };
308
309 $want_start = sub {
310 undef $stop_w;
311
312 $start_w ||= AE::timer $start, 0, sub {
313 undef $start_w;
314
315 if (@queue) {
316 $start->();
317 $scheduler->();
318 }
319 };
320 };
321
322 $want_stop = sub {
323 $stop_w ||= AE::timer $stop, 0, sub {
324 undef $stop_w;
325
326 $stop->($pool[0])
327 if $nidle;
328 };
329 };
330
331 $scheduler = sub {
332 if (@queue) {
333 while (@queue) {
334 my $proc = $pool[0];
335
336 if ($proc->[0] < $load) {
337 warn "free $proc $proc->[0]\n";#d#
338 # found free worker
339 $proc->[0]++
340 or --$nidle >= $idle
341 or $want_start->();
342
343 Array::Heap::adjust_heap @pool, 0;
344
345 my $job = shift @queue;
346 my $ocb = pop @$job;
347
348 $proc->[2]->(@$job, sub {
349 --$busy; warn "busy now $busy\n";#d#
350 # reduce queue counter
351 --$pool[$_][0]
352 or ++$nidle > $idle
353 or $want_stop->();
354
355 Array::Heap::adjust_heap @pool, $_;
356
357 $scheduler->();
358
359 &$ocb;
360 });
361 } else {
362 warn "busy $proc->[0]\n";#d#
363 # all busy, delay
364
365 $want_start->();
366 last;
305 }, 367 }
306 ) 368 }
369 } elsif ($shutdown) {
370 @pool = ();
371 undef $start_w;
372 undef $start; # frees $destroy_guard reference
373
374 $stop->($pool[0])
375 while $nidle;
376 }
307 ; 377 };
308 378
309 ++$self->{idle}; 379 my $shutdown_guard = Guard::guard {
310 Array::Heap::push_heap @{ $self->{pool} }, $proc; 380 $shutdown = 1;
381 $scheduler->();
382 };
383
384 $start->()
385 while @pool < $idle;
386
387 sub {
388 $shutdown_guard if 0; # keep it alive
389
390 ++$busy;#d#
391
392 $start->()
393 unless @pool;
394
395 push @queue, [@_];
396 $scheduler->();
397 }
311} 398}
312 399
313=item $pool->call (..., $cb->(...)) 400=item $pool->call (..., $cb->(...))
314 401
315Call the RPC function of a worker with the given arguments, and when the 402Call the RPC function of a worker with the given arguments, and when the
323to this function are effectively read-only - modifying them after the call 410to this function are effectively read-only - modifying them after the call
324and before the callback is invoked causes undefined behaviour. 411and before the callback is invoked causes undefined behaviour.
325 412
326=cut 413=cut
327 414
328sub scheduler {
329 my $self = shift;
330
331 my $pool = $self->{pool};
332 my $queue = $self->{queue};
333
334 $self->start
335 unless @$pool;
336
337 while (@$queue) {
338 my $proc = $pool->[0];
339
340 if ($proc->[0] < $self->{max_queue}) {
341 warn "free $proc $proc->[0]\n";#d#
342 # found free worker
343 --$self->{idle}
344 unless $proc->[0]++;
345
346 undef $proc->[2];
347
348 Array::Heap::adjust_heap @$pool, 0;
349
350 my $job = shift @$queue;
351 my $ocb = pop @$job;
352
353 $proc->[1]->(@$job, sub {
354 for (0 .. $#$pool) {
355 if ($pool->[$_] == $proc) {
356 # reduce queue counter
357 unless (--$pool->[$_][0]) {
358 # worker becomes idle
359 my $to = ++$self->{idle} > $self->{max_idle}
360 ? 0
361 : $self->{idle_time};
362
363 $proc->[2] = AE::timer $to, 0, sub {
364 undef $proc->[2];
365
366 warn "destroy $proc afzer $to\n";#d#
367
368 for (0 .. $#$pool) {
369 if ($pool->[$_] == $proc) {
370 Array::Heap::splice_heap @$pool, $_;
371 --$self->{idle};
372 last;
373 }
374 }
375 };
376 }
377
378 Array::Heap::adjust_heap @$pool, $_;
379 last;
380 }
381 }
382 &$ocb;
383 });
384 } else {
385 warn "busy $proc->[0]\n";#d#
386 # all busy, delay
387
388 $self->{min_delay_w} ||= AE::timer $self->{min_delay}, 0, sub {
389 delete $self->{min_delay_w};
390
391 if (@{ $self->{queue} }) {
392 $self->start;
393 $self->scheduler;
394 }
395 };
396 last;
397 }
398 }
399 warn "last\n";#d#
400}
401
402sub call {
403 my $self = shift;
404
405 push @{ $self->{queue} }, [@_];
406 $self->scheduler;
407}
408
409sub DESTROY {
410 $_[0]{on_destroy}->();
411}
412
413=back 415=back
414 416
415=head1 SEE ALSO 417=head1 SEE ALSO
416 418
417L<AnyEvent::Fork>, to create the processes in the first place. 419L<AnyEvent::Fork>, to create the processes in the first place.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines