… | |
… | |
79 | my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P'; |
79 | my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P'; |
80 | my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*'; |
80 | my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*'; |
81 | |
81 | |
82 | our $VERSION = 0.1; |
82 | our $VERSION = 0.1; |
83 | |
83 | |
84 | =item my $rpc = AnyEvent::Fork::Pool::run $fork, $function, [key => value...] |
84 | =item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...] |
85 | |
85 | |
86 | The traditional way to call it. But it is way cooler to call it in the |
86 | The traditional way to call it. But it is way cooler to call it in the |
87 | following way: |
87 | following way: |
88 | |
88 | |
89 | =item my $rpc = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...]) |
89 | =item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...]) |
90 | |
90 | |
91 | Creates a new pool object with the specified C<$function> as function |
91 | Creates a new pool object with the specified C<$function> as function |
92 | (name) to call for each request. The pool uses the C<$fork> object as the |
92 | (name) to call for each request. The pool uses the C<$fork> object as the |
93 | template when creating worker processes. |
93 | template when creating worker processes. |
94 | |
94 | |
… | |
… | |
103 | |
103 | |
104 | =item Pool Management |
104 | =item Pool Management |
105 | |
105 | |
106 | The pool consists of a certain number of worker processes. These options |
106 | The pool consists of a certain number of worker processes. These options |
107 | decide how many of these processes exist and when they are started and |
107 | decide how many of these processes exist and when they are started and |
108 | stopp.ed |
108 | stopped. |
109 | |
109 | |
110 | =over 4 |
110 | =over 4 |
111 | |
111 | |
112 | =item idle => $count (default: 0) |
112 | =item idle => $count (default: 0) |
113 | |
113 | |
… | |
… | |
240 | my $stop = $arg{stop} || 1, |
240 | my $stop = $arg{stop} || 1, |
241 | my $on_event = $arg{on_event} || sub { }, |
241 | my $on_event = $arg{on_event} || sub { }, |
242 | my $on_destroy = $arg{on_destroy}; |
242 | my $on_destroy = $arg{on_destroy}; |
243 | |
243 | |
244 | my @rpc = ( |
244 | my @rpc = ( |
245 | async => $arg{async}, |
245 | async => $arg{async}, |
246 | init => $arg{init}, |
246 | init => $arg{init}, |
247 | serialiser => $arg{serialiser}, |
247 | serialiser => delete $arg{serialiser}, |
248 | on_error => $arg{on_error}, |
248 | on_error => $arg{on_error}, |
249 | ); |
249 | ); |
250 | |
250 | |
251 | my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown); |
251 | my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown); |
252 | my ($start, $stop, $want_start, $want_stop, $scheduler); |
252 | my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler); |
253 | |
253 | |
254 | my $destroy_guard = Guard::guard { |
254 | my $destroy_guard = Guard::guard { |
255 | $on_destroy->() |
255 | $on_destroy->() |
256 | if $on_destroy; |
256 | if $on_destroy; |
257 | }; |
257 | }; |
258 | |
|
|
259 | my $busy;#d# |
|
|
260 | |
258 | |
261 | $template |
259 | $template |
262 | ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync")) |
260 | ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync")) |
263 | ->eval (' |
261 | ->eval (' |
264 | my ($magic0, $magic1) = @_; |
262 | my ($magic0, $magic1) = @_; |
265 | sub AnyEvent::Fork::Pool::quit() { |
263 | sub AnyEvent::Fork::Pool::quit() { |
266 | AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic1; |
264 | AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic1; |
267 | } |
265 | } |
268 | ', $magic0, $magic1) |
266 | ', $magic0, $magic1) |
269 | ->eval (delete $arg{eval}); |
267 | ->eval ($arg{eval}); |
270 | |
268 | |
271 | $start = sub { |
269 | $start_worker = sub { |
272 | my $proc = [0, 0, undef]; # load, index, rpc |
270 | my $proc = [0, 0, undef]; # load, index, rpc |
273 | |
|
|
274 | warn "start a worker\n";#d# |
|
|
275 | |
271 | |
276 | $proc->[2] = $template |
272 | $proc->[2] = $template |
277 | ->fork |
273 | ->fork |
278 | ->AnyEvent::Fork::RPC::run ($function, |
274 | ->AnyEvent::Fork::RPC::run ($function, |
279 | @rpc, |
275 | @rpc, |
280 | on_event => sub { |
276 | on_event => sub { |
281 | if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) { |
277 | if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) { |
282 | $destroy_guard if 0; # keep it alive |
278 | $destroy_guard if 0; # keep it alive |
283 | |
279 | |
284 | $_[1] eq "quit" and $stop->($proc); |
280 | $_[1] eq "quit" and $stop_worker->($proc); |
285 | return; |
281 | return; |
286 | } |
282 | } |
287 | |
283 | |
288 | &$on_event; |
284 | &$on_event; |
289 | }, |
285 | }, |
290 | ) |
286 | ) |
291 | ; |
287 | ; |
292 | |
288 | |
293 | ++$nidle; |
289 | ++$nidle; |
294 | Array::Heap::push_heap @pool, $proc; |
290 | Array::Heap::push_heap_idx @pool, $proc; |
295 | |
291 | |
296 | Scalar::Util::weaken $proc; |
292 | Scalar::Util::weaken $proc; |
297 | }; |
293 | }; |
298 | |
294 | |
299 | $stop = sub { |
295 | $stop_worker = sub { |
300 | my $proc = shift; |
296 | my $proc = shift; |
301 | |
297 | |
302 | $proc->[0] |
298 | $proc->[0] |
303 | or --$nidle; |
299 | or --$nidle; |
304 | |
300 | |
… | |
… | |
307 | }; |
303 | }; |
308 | |
304 | |
309 | $want_start = sub { |
305 | $want_start = sub { |
310 | undef $stop_w; |
306 | undef $stop_w; |
311 | |
307 | |
312 | $start_w ||= AE::timer $start, 0, sub { |
308 | $start_w ||= AE::timer $start, $start, sub { |
313 | undef $start_w; |
309 | if (($nidle < $idle || @queue) && @pool < $max) { |
314 | |
|
|
315 | if (@queue) { |
|
|
316 | $start->(); |
310 | $start_worker->(); |
317 | $scheduler->(); |
311 | $scheduler->(); |
|
|
312 | } else { |
|
|
313 | undef $start_w; |
318 | } |
314 | } |
319 | }; |
315 | }; |
320 | }; |
316 | }; |
321 | |
317 | |
322 | $want_stop = sub { |
318 | $want_stop = sub { |
323 | $stop_w ||= AE::timer $stop, 0, sub { |
319 | $stop_w ||= AE::timer $stop, $stop, sub { |
324 | undef $stop_w; |
|
|
325 | |
|
|
326 | $stop->($pool[0]) |
320 | $stop_worker->($pool[0]) |
327 | if $nidle; |
321 | if $nidle; |
|
|
322 | |
|
|
323 | undef $stop_w |
|
|
324 | if $nidle <= $idle; |
328 | }; |
325 | }; |
329 | }; |
326 | }; |
330 | |
327 | |
331 | $scheduler = sub { |
328 | $scheduler = sub { |
332 | if (@queue) { |
329 | if (@queue) { |
333 | while (@queue) { |
330 | while (@queue) { |
334 | my $proc = $pool[0]; |
331 | my $proc = $pool[0]; |
335 | |
332 | |
336 | if ($proc->[0] < $load) { |
333 | if ($proc->[0] < $load) { |
337 | warn "free $proc $proc->[0]\n";#d# |
|
|
338 | # found free worker |
334 | # found free worker, increase load |
339 | $proc->[0]++ |
335 | unless ($proc->[0]++) { |
|
|
336 | # worker became busy |
340 | or --$nidle >= $idle |
337 | --$nidle |
|
|
338 | or undef $stop_w; |
|
|
339 | |
341 | or $want_start->(); |
340 | $want_start->() |
|
|
341 | if $nidle < $idle && @pool < $max; |
|
|
342 | } |
342 | |
343 | |
343 | Array::Heap::adjust_heap @pool, 0; |
344 | Array::Heap::adjust_heap_idx @pool, 0; |
344 | |
345 | |
345 | my $job = shift @queue; |
346 | my $job = shift @queue; |
346 | my $ocb = pop @$job; |
347 | my $ocb = pop @$job; |
347 | |
348 | |
348 | $proc->[2]->(@$job, sub { |
349 | $proc->[2]->(@$job, sub { |
349 | --$busy; warn "busy now $busy\n";#d# |
|
|
350 | # reduce queue counter |
350 | # reduce load |
351 | --$pool[$_][0] |
351 | --$proc->[0] # worker still busy? |
352 | or ++$nidle > $idle |
352 | or ++$nidle > $idle # not too many idle processes? |
353 | or $want_stop->(); |
353 | or $want_stop->(); |
354 | |
354 | |
355 | Array::Heap::adjust_heap @pool, $_; |
355 | Array::Heap::adjust_heap_idx @pool, $proc->[1] |
|
|
356 | if defined $proc->[1]; |
356 | |
357 | |
357 | $scheduler->(); |
358 | $scheduler->(); |
358 | |
359 | |
359 | &$ocb; |
360 | &$ocb; |
360 | }); |
361 | }); |
361 | } else { |
362 | } else { |
362 | warn "busy $proc->[0]\n";#d# |
|
|
363 | # all busy, delay |
|
|
364 | |
|
|
365 | $want_start->(); |
363 | $want_start->() |
|
|
364 | unless @pool >= $max; |
|
|
365 | |
366 | last; |
366 | last; |
367 | } |
367 | } |
368 | } |
368 | } |
369 | } elsif ($shutdown) { |
369 | } elsif ($shutdown) { |
370 | @pool = (); |
370 | @pool = (); |
371 | undef $start_w; |
371 | undef $start_w; |
372 | undef $start; # frees $destroy_guard reference |
372 | undef $start_worker; # frees $destroy_guard reference |
373 | |
373 | |
374 | $stop->($pool[0]) |
374 | $stop_worker->($pool[0]) |
375 | while $nidle; |
375 | while $nidle; |
376 | } |
376 | } |
377 | }; |
377 | }; |
378 | |
378 | |
379 | my $shutdown_guard = Guard::guard { |
379 | my $shutdown_guard = Guard::guard { |
380 | $shutdown = 1; |
380 | $shutdown = 1; |
381 | $scheduler->(); |
381 | $scheduler->(); |
382 | }; |
382 | }; |
383 | |
383 | |
384 | $start->() |
384 | $start_worker->() |
385 | while @pool < $idle; |
385 | while @pool < $idle; |
386 | |
386 | |
387 | sub { |
387 | sub { |
388 | $shutdown_guard if 0; # keep it alive |
388 | $shutdown_guard if 0; # keep it alive |
389 | |
389 | |
390 | ++$busy;#d# |
|
|
391 | |
|
|
392 | $start->() |
390 | $start_worker->() |
393 | unless @pool; |
391 | unless @pool; |
394 | |
392 | |
395 | push @queue, [@_]; |
393 | push @queue, [@_]; |
396 | $scheduler->(); |
394 | $scheduler->(); |
397 | } |
395 | } |
398 | } |
396 | } |
399 | |
397 | |
400 | =item $pool->call (..., $cb->(...)) |
398 | =item $pool->(..., $cb->(...)) |
401 | |
399 | |
402 | Call the RPC function of a worker with the given arguments, and when the |
400 | Call the RPC function of a worker with the given arguments, and when the |
403 | worker is done, call the C<$cb> with the results, like just calling the |
401 | worker is done, call the C<$cb> with the results, just like calling the |
404 | L<AnyEvent::Fork::RPC> object directly. |
402 | L<AnyEvent::Fork::RPC> object directly. |
405 | |
403 | |
406 | If there is no free worker, the call will be queued. |
404 | If there is no free worker, the call will be queued. |
407 | |
405 | |
408 | Note that there can be considerable time between calling this method and |
406 | Note that there can be considerable time between calling this method and |