--- AnyEvent-Fork-Pool/Pool.pm 2013/04/20 16:39:14 1.4 +++ AnyEvent-Fork-Pool/Pool.pm 2013/04/20 19:33:23 1.5 @@ -81,12 +81,12 @@ our $VERSION = 0.1; -=item my $rpc = AnyEvent::Fork::Pool::run $fork, $function, [key => value...] +=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...] The traditional way to call it. But it is way cooler to call it in the following way: -=item my $rpc = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...]) +=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...]) Creates a new pool object with the specified C<$function> as function (name) to call for each request. The pool uses the C<$fork> object as the @@ -105,7 +105,7 @@ The pool consists of a certain number of worker processes. These options decide how many of these processes exist and when they are started and -stopp.ed +stopped. =over 4 @@ -242,14 +242,14 @@ my $on_destroy = $arg{on_destroy}; my @rpc = ( - async => $arg{async}, - init => $arg{init}, - serialiser => $arg{serialiser}, - on_error => $arg{on_error}, + async => $arg{async}, + init => $arg{init}, + serialiser => delete $arg{serialiser}, + on_error => $arg{on_error}, ); my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown); - my ($start, $stop, $want_start, $want_stop, $scheduler); + my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler); my $destroy_guard = Guard::guard { $on_destroy->() @@ -264,13 +264,11 @@ AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic1; } ', $magic0, $magic1) - ->eval (delete $arg{eval}); + ->eval ($arg{eval}); - $start = sub { + $start_worker = sub { my $proc = [0, 0, undef]; # load, index, rpc - warn "start a worker\n";#d# - $proc->[2] = $template ->fork ->AnyEvent::Fork::RPC::run ($function, @@ -279,7 +277,7 @@ if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) { $destroy_guard if 0; # keep it alive - $_[1] eq "quit" and $stop->($proc); + $_[1] eq "quit" and $stop_worker->($proc); return; } @@ -289,12 +287,12 @@ ; ++$nidle; - Array::Heap::push_heap @pool, $proc; + Array::Heap::push_heap_idx @pool, $proc; Scalar::Util::weaken $proc; }; - $stop = sub { + $stop_worker = sub { my $proc = shift; $proc->[0] @@ -307,22 +305,23 @@ $want_start = sub { undef $stop_w; - $start_w ||= AE::timer $start, 0, sub { - undef $start_w; - - if (@queue) { - $start->(); + $start_w ||= AE::timer $start, $start, sub { + if (($nidle < $idle || @queue) && @pool < $max) { + $start_worker->(); $scheduler->(); + } else { + undef $start_w; } }; }; $want_stop = sub { - $stop_w ||= AE::timer $stop, 0, sub { - undef $stop_w; - - $stop->($pool[0]) + $stop_w ||= AE::timer $stop, $stop, sub { + $stop_worker->($pool[0]) if $nidle; + + undef $stop_w + if $nidle <= $idle; }; }; @@ -332,43 +331,47 @@ my $proc = $pool[0]; if ($proc->[0] < $load) { - warn "free $proc $proc->[0]\n";#d# - # found free worker - $proc->[0]++ - or --$nidle >= $idle - or $want_start->(); + # found free worker, increase load + unless ($proc->[0]++) { + # worker became busy + --$nidle + or undef $stop_w; + + $want_start->() + if $nidle < $idle && @pool < $max; + } - Array::Heap::adjust_heap @pool, 0; + Array::Heap::adjust_heap_idx @pool, 0; my $job = shift @queue; my $ocb = pop @$job; $proc->[2]->(@$job, sub { - # reduce queue counter - --$pool[$_][0] - or ++$nidle > $idle + # reduce load + --$proc->[0] # worker still busy? + or ++$nidle > $idle # not too many idle processes? or $want_stop->(); - Array::Heap::adjust_heap @pool, $_; + Array::Heap::adjust_heap_idx @pool, $proc->[1] + if defined $proc->[1]; $scheduler->(); &$ocb; }); } else { - warn "busy $proc->[0]\n";#d# - # all busy, delay + $want_start->() + unless @pool >= $max; - $want_start->(); last; } } } elsif ($shutdown) { @pool = (); undef $start_w; - undef $start; # frees $destroy_guard reference + undef $start_worker; # frees $destroy_guard reference - $stop->($pool[0]) + $stop_worker->($pool[0]) while $nidle; } }; @@ -378,13 +381,13 @@ $scheduler->(); }; - $start->() + $start_worker->() while @pool < $idle; sub { $shutdown_guard if 0; # keep it alive - $start->() + $start_worker->() unless @pool; push @queue, [@_]; @@ -392,10 +395,10 @@ } } -=item $pool->call (..., $cb->(...)) +=item $pool->(..., $cb->(...)) Call the RPC function of a worker with the given arguments, and when the -worker is done, call the C<$cb> with the results, like just calling the +worker is done, call the C<$cb> with the results, just like calling the L object directly. If there is no free worker, the call will be queued.