--- AnyEvent-Fork-Pool/Pool.pm 2013/04/18 14:24:01 1.1 +++ AnyEvent-Fork-Pool/Pool.pm 2013/04/25 01:21:18 1.10 @@ -2,39 +2,39 @@ AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork +THE API IS NOT FINISHED, CONSIDER THIS AN ALPHA RELEASE + =head1 SYNOPSIS use AnyEvent; use AnyEvent::Fork::Pool; # use AnyEvent::Fork is not needed - # all parameters with default values - my $pool = new AnyEvent::Fork::Pool - "MyWorker::run", - - # pool management - min => 0, # minimum # of processes - max => 8, # maximum # of processes - busy_time => 0, # wait this before starting a new process - max_idle => 1, # wait this before killing an idle process - idle_time => 1, # at most this many idle processes - - # template process - template => AnyEvent::Fork->new, # the template process to use - require => [MyWorker::], # module(s) to load - eval => "# perl code to execute in template", - on_destroy => (my $finish = AE::cv), - - # parameters passed to AnyEvent::Fork::RPC - async => 0, - on_error => sub { die "FATAL: $_[0]\n" }, - on_event => sub { my @ev = @_ }, - init => "MyWorker::init", - serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, - ; + # all possible parameters shown, with default values + my $pool = AnyEvent::Fork + ->new + ->require ("MyWorker") + ->AnyEvent::Fork::Pool::run ( + "MyWorker::run", # the worker function + + # pool management + max => 4, # absolute maximum # of processes + idle => 0, # minimum # of idle processes + load => 2, # queue at most this number of jobs per process + start => 0.1, # wait this many seconds before starting a new process + stop => 10, # wait this many seconds before stopping an idle process + on_destroy => (my $finish = AE::cv), # called when object is destroyed + + # parameters passed to AnyEvent::Fork::RPC + async => 0, + on_error => sub { die "FATAL: $_[0]\n" }, + on_event => sub { my @ev = @_ }, + init => "MyWorker::init", + serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, + ); for (1..10) { - $pool->call (doit => $_, sub { + $pool->(doit => $_, sub { print "MyWorker::run returned @_\n"; }); } @@ -52,11 +52,30 @@ Understanding of L is helpful but not critical to be able to use this module, but a thorough understanding of L is, as it defines the actual API that needs to be implemented in the -children. +worker processes. =head1 EXAMPLES -=head1 API +=head1 PARENT USAGE + +To create a pool, you first have to create a L object - +this object becomes your template process. Whenever a new worker process +is needed, it is forked from this template process. Then you need to +"hand off" this template process to the C module by +calling its run method on it: + + my $template = AnyEvent::Fork + ->new + ->require ("SomeModule", "MyWorkerModule"); + + my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction"); + +The pool "object" is not a regular Perl object, but a code reference that +you can call and that works roughly like calling the worker function +directly, except that it returns nothing but instead you need to specify a +callback to be invoked once results are in: + + $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" }); =over 4 @@ -66,225 +85,525 @@ use common::sense; +use Scalar::Util (); + use Guard (); +use Array::Heap (); use AnyEvent; -use AnyEvent::Fork; # we don't actually depend on it, this is for convenience +# explicit version on next line, as some cpan-testers test with the 0.1 version, +# ignoring dependencies, and this line will at least give a clear indication of that. +use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience use AnyEvent::Fork::RPC; +# these are used for the first and last argument of events +# in the hope of not colliding. yes, I don't like it either, +# but didn't come up with an obviously better alternative. +my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P'; +my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*'; + our $VERSION = 0.1; -=item my $rpc = new AnyEvent::Fork::RPC::pool $function, [key => value...] +=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...] + +The traditional way to call the pool creation function. But it is way +cooler to call it in the following way: + +=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...]) + +Creates a new pool object with the specified C<$function> as function +(name) to call for each request. The pool uses the C<$fork> object as the +template when creating worker processes. + +You can supply your own template process, or tell C +to create one. + +A relatively large number of key/value pairs can be specified to influence +the behaviour. They are grouped into the categories "pool management", +"template process" and "rpc parameters". =over 4 -=item on_error => $cb->($msg) +=item Pool Management -Called on (fatal) errors, with a descriptive (hopefully) message. If -this callback is not provided, but C is, then the C -callback is called with the first argument being the string C, -followed by the error message. +The pool consists of a certain number of worker processes. These options +decide how many of these processes exist and when they are started and +stopped. + +The worker pool is dynamically resized, according to (perceived :) +load. The minimum size is given by the C parameter and the maximum +size is given by the C parameter. A new worker is started every +C seconds at most, and an idle worker is stopped at most every +C second. -If neither handler is provided it prints the error to STDERR and will -start failing badly. +You can specify the amount of jobs sent to a worker concurrently using the +C parameter. -=item on_event => $cb->(...) +=over 4 -Called for every call to the C function in the -child, with the arguments of that function passed to the callback. +=item idle => $count (default: 0) -Also called on errors when no C handler is provided. +The minimum amount of idle processes in the pool - when there are fewer +than this many idle workers, C will try to start new +ones, subject to the limits set by C and C. -=item on_destroy => $cb->() +This is also the initial amount of workers in the pool. The default of +zero means that the pool starts empty and can shrink back to zero workers +over time. -Called when the C<$rpc> object has been destroyed and all requests have -been successfully handled. This is useful when you queue some requests and -want the child to go away after it has handled them. The problem is that -the parent must not exit either until all requests have been handled, and -this can be accomplished by waiting for this callback. +=item max => $count (default: 4) -=item init => $function (default none) +The maximum number of processes in the pool, in addition to the template +process. C will never have more than this number of +worker processes, although there can be more temporarily when a worker is +shut down and hasn't exited yet. -When specified (by name), this function is called in the child as the very -first thing when taking over the process, with all the arguments normally -passed to the C function, except the communications -socket. +=item load => $count (default: 2) -It can be used to do one-time things in the child such as storing passed -parameters or opening database connections. +The maximum number of concurrent jobs sent to a single worker process. -It is called very early - before the serialisers are created or the -C<$function> name is resolved into a function reference, so it could be -used to load any modules that provide the serialiser or function. It can -not, however, create events. +Jobs that cannot be sent to a worker immediately (because all workers are +busy) will be queued until a worker is available. -=item async => $boolean (default: 0) +Setting this low improves latency. For example, at C<1>, every job that +is sent to a worker is sent to a completely idle worker that doesn't run +any other jobs. The downside is that throughput is reduced - a worker that +finishes a job needs to wait for a new job from the parent. + +The default of C<2> is usually a good compromise. + +=item start => $seconds (default: 0.1) + +When there are fewer than C workers (or all workers are completely +busy), then a timer is started. If the timer elapses and there are still +jobs that cannot be queued to a worker, a new worker is started. + +This sets the minimum time that all workers must be busy before a new +worker is started. Or, put differently, the minimum delay between starting +new workers. + +The delay is small by default, which means new workers will be started +relatively quickly. A delay of C<0> is possible, and ensures that the pool +will grow as quickly as possible under load. + +Non-zero values are useful to avoid "exploding" a pool because a lot of +jobs are queued in an instant. + +Higher values are often useful to improve efficiency at the cost of +latency - when fewer processes can do the job over time, starting more and +more is not necessarily going to help. + +=item stop => $seconds (default: 10) + +When a worker has no jobs to execute it becomes idle. An idle worker that +hasn't executed a job within this amount of time will be stopped, unless +the other parameters say otherwise. + +Setting this to a very high value means that workers stay around longer, +even when they have nothing to do, which can be good as they don't have to +be started on the netx load spike again. + +Setting this to a lower value can be useful to avoid memory or simply +process table wastage. + +Usually, setting this to a time longer than the time between load spikes +is best - if you expect a lot of requests every minute and little work +in between, setting this to longer than a minute avoids having to stop +and start workers. On the other hand, you have to ask yourself if letting +workers run idle is a good use of your resources. Try to find a good +balance between resource usage of your workers and the time to start new +workers - the processes created by L itself is fats at +creating workers while not using much memory for them, so most of the +overhead is likely from your own code. + +=item on_destroy => $callback->() (default: none) + +When a pool object goes out of scope, the outstanding requests are still +handled till completion. Only after handling all jobs will the workers +be destroyed (and also the template process if it isn't referenced +otherwise). + +To find out when a pool I has finished its work, you can set this +callback, which will be called when the pool has been destroyed. + +=back + +=item AnyEvent::Fork::RPC Parameters + +These parameters are all passed more or less directly to +L. They are only briefly mentioned here, for +their full documentation please refer to the L +documentation. Also, the default values mentioned here are only documented +as a best effort - the L documentation is binding. + +=over 4 -The default server used in the child does all I/O blockingly, and only -allows a single RPC call to execute concurrently. +=item async => $boolean (default: 0) -Setting C to a true value switches to another implementation that -uses L in the child and allows multiple concurrent RPC calls. +Whether to use the synchronous or asynchronous RPC backend. -The actual API in the child is documented in the section that describes -the calling semantics of the returned C<$rpc> function. +=item on_error => $callback->($message) (default: die with message) -If you want to pre-load the actual back-end modules to enable memory -sharing, then you should load C for -synchronous, and C for asynchronous mode. +The callback to call on any (fatal) errors. -If you use a template process and want to fork both sync and async -children, then it is permissible to load both modules. +=item on_event => $callback->(...) (default: C, unlike L) -=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') +The callback to invoke on events. -All arguments, result data and event data have to be serialised to be -transferred between the processes. For this, they have to be frozen and -thawed in both parent and child processes. +=item init => $initfunction (default: none) -By default, only octet strings can be passed between the processes, which -is reasonably fast and efficient. +The function to call in the child, once before handling requests. -For more complicated use cases, you can provide your own freeze and thaw -functions, by specifying a string with perl source code. It's supposed to -return two code references when evaluated: the first receives a list of -perl values and must return an octet string. The second receives the octet -string and must return the original list of values. +=item serialiser => $serialiser (defailt: $AnyEvent::Fork::RPC::STRING_SERIALISER) -If you need an external module for serialisation, then you can either -pre-load it into your L process, or you can add a C -or C statement into the serialiser string. Or both. +The serialiser to use. =back -See the examples section earlier in this document for some actual -examples. +=back =cut -sub new { - my ($self, $function, %arg) = @_; +sub run { + my ($template, $function, %arg) = @_; - my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; - my $on_event = delete $arg{on_event}; - my $on_error = delete $arg{on_error}; - my $on_destroy = delete $arg{on_destroy}; - - # default for on_error is to on_event, if specified - $on_error ||= $on_event - ? sub { $on_event->(error => shift) } - : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" }; - - # default for on_event is to raise an error - $on_event ||= sub { $on_error->("event received, but no on_event handler") }; - - my ($f, $t) = eval $serialiser; die $@ if $@; - - my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww); - my ($rlen, $rbuf, $rw) = 512 - 16; - - my $wcb = sub { - my $len = syswrite $fh, $wbuf; - - unless (defined $len) { - if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { - undef $rw; undef $ww; # it ends here - $on_error->("$!"); + my $max = $arg{max} || 4; + my $idle = $arg{idle} || 0, + my $load = $arg{load} || 2, + my $start = $arg{start} || 0.1, + my $stop = $arg{stop} || 10, + my $on_event = $arg{on_event} || sub { }, + my $on_destroy = $arg{on_destroy}; + + my @rpc = ( + async => $arg{async}, + init => $arg{init}, + serialiser => delete $arg{serialiser}, + on_error => $arg{on_error}, + ); + + my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown); + my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler); + + my $destroy_guard = Guard::guard { + $on_destroy->() + if $on_destroy; + }; + + $template + ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync")) + ->eval (' + my ($magic0, $magic1) = @_; + sub AnyEvent::Fork::Pool::retire() { + AnyEvent::Fork::RPC::event $magic0, "quit", $magic1; + } + ', $magic0, $magic1) + ; + + $start_worker = sub { + my $proc = [0, 0, undef]; # load, index, rpc + + $proc->[2] = $template + ->fork + ->AnyEvent::Fork::RPC::run ($function, + @rpc, + on_event => sub { + if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) { + $destroy_guard if 0; # keep it alive + + $_[1] eq "quit" and $stop_worker->($proc); + return; + } + + &$on_event; + }, + ) + ; + + ++$nidle; + Array::Heap::push_heap_idx @pool, $proc; + + Scalar::Util::weaken $proc; + }; + + $stop_worker = sub { + my $proc = shift; + + $proc->[0] + or --$nidle; + + Array::Heap::splice_heap_idx @pool, $proc->[1] + if defined $proc->[1]; + + @$proc = 0; # tell others to leave it be + }; + + $want_start = sub { + undef $stop_w; + + $start_w ||= AE::timer $start, $start, sub { + if (($nidle < $idle || @queue) && @pool < $max) { + $start_worker->(); + $scheduler->(); + } else { + undef $start_w; } - } + }; + }; - substr $wbuf, 0, $len, ""; + $want_stop = sub { + $stop_w ||= AE::timer $stop, $stop, sub { + $stop_worker->($pool[0]) + if $nidle; - unless (length $wbuf) { - undef $ww; - $shutdown and shutdown $fh, 1; - } + undef $stop_w + if $nidle <= $idle; + }; }; - my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"); + $scheduler = sub { + if (@queue) { + while (@queue) { + @pool or $start_worker->(); + + my $proc = $pool[0]; + + if ($proc->[0] < $load) { + # found free worker, increase load + unless ($proc->[0]++) { + # worker became busy + --$nidle + or undef $stop_w; - $self->require ($module) - ->send_arg ($function, $arg{init}, $serialiser) - ->run ("$module\::run", sub { - $fh = shift; - - my ($id, $len); - $rw = AE::io $fh, 0, sub { - $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; - $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; - - if ($len) { - while (8 <= length $rbuf) { - ($id, $len) = unpack "LL", $rbuf; - 8 + $len <= length $rbuf - or last; - - my @r = $t->(substr $rbuf, 8, $len); - substr $rbuf, 0, 8 + $len, ""; - - if ($id) { - if (@rcb) { - (shift @rcb)->(@r); - } elsif (my $cb = delete $rcb{$id}) { - $cb->(@r); - } else { - undef $rw; undef $ww; - $on_error->("unexpected data from child"); - } - } else { - $on_event->(@r); + $want_start->() + if $nidle < $idle && @pool < $max; } - } - } elsif (defined $len) { - undef $rw; undef $ww; # it ends here - if (@rcb || %rcb) { - $on_error->("unexpected eof"); + Array::Heap::adjust_heap_idx @pool, 0; + + my $job = shift @queue; + my $ocb = pop @$job; + + $proc->[2]->(@$job, sub { + # reduce load + --$proc->[0] # worker still busy? + or ++$nidle > $idle # not too many idle processes? + or $want_stop->(); + + Array::Heap::adjust_heap_idx @pool, $proc->[1] + if defined $proc->[1]; + + &$ocb; + + $scheduler->(); + }); } else { - $on_destroy->(); + $want_start->() + unless @pool >= $max; + + last; } - } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { - undef $rw; undef $ww; # it ends here - $on_error->("read: $!"); } - }; + } elsif ($shutdown) { + @pool = (); + undef $start_w; + undef $start_worker; # frees $destroy_guard reference - $ww ||= AE::io $fh, 1, $wcb; - }); + $stop_worker->($pool[0]) + while $nidle; + } + }; - my $guard = Guard::guard { + my $shutdown_guard = Guard::guard { $shutdown = 1; - $ww ||= $fh && AE::io $fh, 1, $wcb; + $scheduler->(); }; - my $id; + $start_worker->() + while @pool < $idle; - $arg{async} - ? sub { - $id = ($id == 0xffffffff ? 0 : $id) + 1; - $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops + sub { + $shutdown_guard if 0; # keep it alive - $rcb{$id} = pop; + $start_worker->() + unless @pool; + + push @queue, [@_]; + $scheduler->(); + } +} + +=item $pool->(..., $cb->(...)) + +Call the RPC function of a worker with the given arguments, and when the +worker is done, call the C<$cb> with the results, just like calling the +RPC object durectly - see the L documentation for +details on the RPC API. + +If there is no free worker, the call will be queued until a worker becomes +available. + +Note that there can be considerable time between calling this method and +the call actually being executed. During this time, the parameters passed +to this function are effectively read-only - modifying them after the call +and before the callback is invoked causes undefined behaviour. + +=cut - $guard; # keep it alive +=item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus] - $wbuf .= pack "LL/a*", $id, &$f; - $ww ||= $fh && AE::io $fh, 1, $wcb; - } - : sub { - push @rcb, pop; +=item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus] - $guard; # keep it alive +Tries to detect the number of CPUs (C<$cpus> often called cpu cores +nowadays) and execution units (C<$eus>) which include e.g. extra +hyperthreaded units). When C<$cpus> cannot be determined reliably, +C<$default_cpus> is returned for both values, or C<1> if it is missing. + +For normal CPU bound uses, it is wise to have as many worker processes +as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using +hyperthreading is usually detrimental to performance, but in those rare +cases where that really helps it might be beneficial to use more workers +(C<$eus>). + +Currently, F is parsed on GNU/Linux systems for both +C<$cpus> and C<$eu>, and on {Free,Net,Open}BSD, F is +used for C<$cpus>. + +Example: create a worker pool with as many workers as cpu cores, or C<2>, +if the actual number could not be determined. + + $fork->AnyEvent::Fork::Pool::run ("myworker::function", + max => (scalar AnyEvent::Fork::Pool::ncpu 2), + ); - $wbuf .= pack "L/a*", &$f; - $ww ||= $fh && AE::io $fh, 1, $wcb; - } +=cut + +BEGIN { + if ($^O eq "linux") { + *ncpu = sub(;$) { + my ($cpus, $eus); + + if (open my $fh, "<", "/proc/cpuinfo") { + my %id; + + while (<$fh>) { + if (/^core id\s*:\s*(\d+)/) { + ++$eus; + undef $id{$1}; + } + } + + $cpus = scalar keys %id; + } else { + $cpus = $eus = @_ ? shift : 1; + } + wantarray ? ($cpus, $eus) : $cpus + }; + } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") { + *ncpu = sub(;$) { + my $cpus = qx * 1 + || (@_ ? shift : 1); + wantarray ? ($cpus, $cpus) : $cpus + }; + } else { + *ncpu = sub(;$) { + my $cpus = @_ ? shift : 1; + wantarray ? ($cpus, $cpus) : $cpus + }; + } } -=item $pool->call (..., $cb->(...)) +=back + +=head1 CHILD USAGE + +In addition to the L API, this module implements one +more child-side function: + +=over 4 + +=item AnyEvent::Fork::Pool::retire () + +This function sends an event to the parent process to request retirement: +the worker is removed from the pool and no new jobs will be sent to it, +but it has to handle the jobs that are already queued. + +The parentheses are part of the syntax: the function usually isn't defined +when you compile your code (because that happens I handing the +template process over to C, so you need the +empty parentheses to tell Perl that the function is indeed a function. + +Retiring a worker can be useful to gracefully shut it down when the worker +deems this useful. For example, after executing a job, one could check +the process size or the number of jobs handled so far, and if either is +too high, the worker could ask to get retired, to avoid memory leaks to +accumulate. + +Example: retire a worker after it has handled roughly 100 requests. + + my $count = 0; + + sub my::worker { + + ++$count == 100 + and AnyEvent::Fork::Pool::retire (); + + ... normal code goes here + } =back +=head1 POOL PARAMETERS RECIPES + +This section describes some recipes for pool paramaters. These are mostly +meant for the synchronous RPC backend, as the asynchronous RPC backend +changes the rules considerably, making workers themselves responsible for +their scheduling. + +=over 4 + +=item low latency - set load = 1 + +If you need a deterministic low latency, you should set the C +parameter to C<1>. This ensures that never more than one job is sent to +each worker. This avoids having to wait for a previous job to finish. + +This makes most sense with the synchronous (default) backend, as the +asynchronous backend can handle multiple requests concurrently. + +=item lowest latency - set load = 1 and idle = max + +To achieve the lowest latency, you additionally should disable any dynamic +resizing of the pool by setting C to the same value as C. + +=item high throughput, cpu bound jobs - set load >= 2, max = #cpus + +To get high throughput with cpu-bound jobs, you should set the maximum +pool size to the number of cpus in your system, and C to at least +C<2>, to make sure there can be another job waiting for the worker when it +has finished one. + +The value of C<2> for C is the minimum value that I achieve +100% throughput, but if your parent process itself is sometimes busy, you +might need higher values. Also there is a limit on the amount of data that +can be "in flight" to the worker, so if you send big blobs of data to your +worker, C might have much less of an effect. + +=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high + +When your jobs are I/O bound, using more workers usually boils down to +higher throughput, depending very much on your actual workload - sometimes +having only one worker is best, for example, when you read or write big +files at maixmum speed, as a second worker will increase seek times. + +=back + +=head1 EXCEPTIONS + +The same "policy" as with L applies - exceptins will +not be caught, and exceptions in both worker and in callbacks causes +undesirable or undefined behaviour. + =head1 SEE ALSO L, to create the processes in the first place.