=head1 NAME AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork =head1 SYNOPSIS use AnyEvent; use AnyEvent::Fork::Pool; # use AnyEvent::Fork is not needed # all parameters with default values my $pool = new AnyEvent::Fork::Pool "MyWorker::run", # pool management min => 0, # minimum # of processes max => 8, # maximum # of processes busy_time => 0, # wait this before starting a new process max_idle => 1, # at most this many idle processes idle_time => 1, # wait this before killing an idle process # template process template => AnyEvent::Fork->new, # the template process to use require => [MyWorker::], # module(s) to load eval => "# perl code to execute in template", on_destroy => (my $finish = AE::cv), # parameters passed to AnyEvent::Fork::RPC async => 0, on_error => sub { die "FATAL: $_[0]\n" }, on_event => sub { my @ev = @_ }, init => "MyWorker::init", serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, ; for (1..10) { $pool->call (doit => $_, sub { print "MyWorker::run returned @_\n"; }); } undef $pool; $finish->recv; =head1 DESCRIPTION This module uses processes created via L<AnyEvent::Fork> and the RPC protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced pool of processes that handles jobs. Understanding of L<AnyEvent::Fork> is helpful but not critical to be able to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC> is, as it defines the actual API that needs to be implemented in the children. =head1 EXAMPLES =head1 API =over 4 =cut package AnyEvent::Fork::Pool; use common::sense; use Guard (); use AnyEvent; use AnyEvent::Fork; # we don't actually depend on it, this is for convenience use AnyEvent::Fork::RPC; our $VERSION = 0.1; =item my $rpc = new AnyEvent::Fork::Pool $function, [key => value...] =over 4 =item on_error => $cb->($msg) Called on (fatal) errors, with a descriptive (hopefully) message. If this callback is not provided, but C<on_event> is, then the C<on_event> callback is called with the first argument being the string C<error>, followed by the error message. 
If neither handler is provided, it prints the error to STDERR and will start failing badly. =item on_event => $cb->(...) Called for every call to the C<AnyEvent::Fork::RPC::event> function in the child, with the arguments of that function passed to the callback. Also called on errors when no C<on_error> handler is provided. =item on_destroy => $cb->() Called when the C<$rpc> object has been destroyed and all requests have been successfully handled. This is useful when you queue some requests and want the child to go away after it has handled them. The problem is that the parent must not exit either until all requests have been handled, and this can be accomplished by waiting for this callback. =item init => $function (default: none) When specified (by name), this function is called in the child as the very first thing when taking over the process, with all the arguments normally passed to the C<AnyEvent::Fork::run> function, except the communications socket. It can be used to do one-time things in the child such as storing passed parameters or opening database connections. It is called very early - before the serialisers are created or the C<$function> name is resolved into a function reference, so it could be used to load any modules that provide the serialiser or function. It cannot, however, create events. =item async => $boolean (default: 0) The default server used in the child does all I/O blockingly, and only allows a single RPC call to execute concurrently. Setting C<async> to a true value switches to another implementation that uses L<AnyEvent> in the child and allows multiple concurrent RPC calls. The actual API in the child is documented in the section that describes the calling semantics of the returned C<$rpc> function. If you want to pre-load the actual back-end modules to enable memory sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode. If you use a template process and want to fork both sync and async children, then it is permissible to load both modules. 
=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')

All arguments, result data and event data have to be serialised to be
transferred between the processes. For this, they have to be frozen and
thawed in both parent and child processes.

By default, only octet strings can be passed between the processes, which
is reasonably fast and efficient.

For more complicated use cases, you can provide your own freeze and thaw
functions, by specifying a string with perl source code. It's supposed to
return two code references when evaluated: the first receives a list of
perl values and must return an octet string. The second receives the octet
string and must return the original list of values.

If you need an external module for serialisation, then you can either
pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
or C<require> statement into the serialiser string. Or both.

=back

See the examples section earlier in this document for some actual
examples.

=cut

# new $self, $function, key => value...
#
# Starts the RPC back-end ("AnyEvent::Fork::RPC::Async" when the "async"
# argument is true, "AnyEvent::Fork::RPC::Sync" otherwise) through $self and
# returns a code reference that queues one request per call. The last
# argument of each call is the result callback, everything before it is
# passed through the freeze half of the serialiser.
#
# Recognised keys: serialiser, on_event, on_error, on_destroy, init, async.
#
# Dies if the serialiser source fails to evaluate.
#
# NOTE(review): $self must provide the require/send_arg/run methods (the
# AnyEvent::Fork API). When this sub is invoked as a class constructor, as
# shown in the SYNOPSIS, $self is only the class name - confirm how the
# template process is meant to be passed in.
sub new {
   my ($self, $function, %arg) = @_;

   # $STRING_SERIALISER lives in AnyEvent::Fork::RPC, not in this package,
   # so it has to be fully qualified to compile under strict vars.
   my $serialiser = delete $arg{serialiser} || $AnyEvent::Fork::RPC::STRING_SERIALISER;
   my $on_event   = delete $arg{on_event};
   my $on_error   = delete $arg{on_error};
   my $on_destroy = delete $arg{on_destroy};

   # default for on_error is to on_event, if specified
   $on_error ||= $on_event
      ? sub { $on_event->(error => shift) }
      : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };

   # default for on_event is to raise an error
   $on_event ||= sub { $on_error->("event received, but no on_event handler") };

   # $f freezes a list into an octet string, $t thaws it again
   my ($f, $t) = eval $serialiser; die $@ if $@;

   # @rcb: FIFO of result callbacks (sync mode)
   # %rcb: result callbacks by request id (async mode)
   my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
   my ($rlen, $rbuf, $rw) = 512 - 16;

   # write watcher callback: flushes as much of $wbuf as the socket accepts
   my $wcb = sub {
      my $len = syswrite $fh, $wbuf;

      unless (defined $len) {
         if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("$!");
         }

         # nothing was written, so there is nothing to remove from $wbuf
         return;
      }

      substr $wbuf, 0, $len, "";

      unless (length $wbuf) {
         undef $ww;
         # everything is flushed - if the rpc function is gone, close our
         # sending side
         $shutdown and shutdown $fh, 1;
      }
   };

   my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");

   $self->require ($module)
        ->send_arg ($function, $arg{init}, $serialiser)
        ->run ("$module\::run", sub {
      $fh = shift;

      my ($id, $len);
      $rw = AE::io $fh, 0, sub {
         # grow the read size when the buffer is getting close to it
         $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
         $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;

         if ($len) {
            # each packet is an 8 octet header (id, length) plus payload
            while (8 <= length $rbuf) {
               ($id, $len) = unpack "LL", $rbuf;
               8 + $len <= length $rbuf
                  or last;

               my @r = $t->(substr $rbuf, 8, $len);
               substr $rbuf, 0, 8 + $len, "";

               if ($id) {
                  if (@rcb) {
                     (shift @rcb)->(@r);
                  } elsif (my $cb = delete $rcb{$id}) {
                     $cb->(@r);
                  } else {
                     undef $rw; undef $ww;
                     $on_error->("unexpected data from child");
                  }
               } else {
                  # id 0 is not a result, it is passed to on_event
                  $on_event->(@r);
               }
            }
         } elsif (defined $len) {
            undef $rw; undef $ww; # it ends here

            if (@rcb || %rcb) {
               $on_error->("unexpected eof");
            } else {
               # on_destroy is optional, so only call it when it was given
               $on_destroy->()
                  if $on_destroy;
            }
         } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("read: $!");
         }
      };

      $ww ||= AE::io $fh, 1, $wcb;
   });

   # runs once the returned closure (the only thing referencing it) is
   # freed: request a shutdown and make sure the write watcher is active
   my $guard = Guard::guard {
      $shutdown = 1;
      $ww ||= $fh && AE::io $fh, 1, $wcb;
   };

   my $id;

   $arg{async}
      ? sub {
           # ids wrap around before 0xffffffff and never become 0
           $id = ($id == 0xffffffff ? 0 : $id) + 1;
           $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops

           $rcb{$id} = pop;

           $guard; # keep it alive

           # &$f is called with the current @_, i.e. without the callback
           $wbuf .= pack "LL/a*", $id, &$f;
           $ww ||= $fh && AE::io $fh, 1, $wcb;
        }
      : sub {
           push @rcb, pop;

           $guard; # keep it alive

           # &$f is called with the current @_, i.e. without the callback
           $wbuf .= pack "L/a*", &$f;
           $ww ||= $fh && AE::io $fh, 1, $wcb;
        }
}

=item $pool->call (..., $cb->(...))

=back

=head1 SEE ALSO

L<AnyEvent::Fork>, to create the processes in the first place.

L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.

=head1 AUTHOR AND CONTACT INFORMATION

 Marc Lehmann
 http://software.schmorp.de/pkg/AnyEvent-Fork-Pool

=cut

1