=head1 NAME

AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork

=head1 SYNOPSIS

   use AnyEvent::Fork::RPC;
   # use AnyEvent::Fork is not needed

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyModule")
      ->AnyEvent::Fork::RPC::run (
         "MyModule::server",
      );

   my $cv = AE::cv;

   $rpc->(1, 2, 3, sub {
      print "MyModule::server returned @_\n";
      $cv->send;
   });

   $cv->recv;

=head1 DESCRIPTION

This module implements a simple RPC protocol and backend for processes
created via L, allowing you to call a function in the child process
and receive its return values (up to 4GB serialised).

It implements two different backends: a synchronous one that works like a
normal function call, and an asynchronous one that can run multiple jobs
concurrently in the child, using AnyEvent.

It also implements an asynchronous event mechanism from the child to the
parent, which can be used for progress indications or other information.

Loading this module also always loads L, so you can make a
separate C if you wish, but you don't have to.

=head1 EXAMPLES

=head2 Synchronous Backend

Here is a simple example that implements a backend that executes C
and C calls, and reports their status back. It also reports the
number of requests it has processed every three requests, which is clearly
silly, but illustrates the use of events.

First the parent process:

   use AnyEvent;
   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;

   my $done = AE::cv;

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::RPC::run ("MyWorker::run",
         on_error   => sub { warn "FATAL: $_[0]"; exit 1 },
         on_event   => sub { warn "$_[0] requests handled\n" },
         on_destroy => $done,
      );

   for my $id (1..6) {
      $rpc->(rmdir => "/tmp/somepath/$id", sub {
         $_[0]
            or warn "/tmp/somepath/$id: $_[1]\n";
      });
   }

   undef $rpc;

   $done->recv;

The parent creates the process and queues a few rmdir's. It then forgets
about the C<$rpc> object, so that the child exits after it has handled the
requests, and then it waits until the requests have been handled.
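
Each C<< $rpc->(...) >> call in the loop above returns immediately. As the code of C<run> further down shows, the synchronous backend appends the frozen arguments to a write buffer as a length-prefixed C<pack "L/a*"> frame and pushes the callback onto a FIFO queue; the child answers with an 8-byte C<pack "LL"> header (id, length) followed by the payload, where id 0 marks an event. The sketch below simulates both ends in a single script to illustrate that bookkeeping. It does not use the module at all; the reply id of 1 and the canned reply C<(0, "No such file or directory")> are assumptions made for illustration (the parent only checks that reply ids are nonzero).

```perl
   use strict;
   use warnings;

   # The module's default serialiser (octet strings only).
   my $freeze = sub { pack "(w/a*)*", @_ };
   my $thaw   = sub { unpack "(w/a*)*", shift };

   # Parent, synchronous mode: each call queues its callback (FIFO) and
   # appends one "L/a*" frame: a 32-bit length plus the frozen arguments.
   my (@rcb, @results, @events);
   my $wbuf = "";
   for my $id (1 .. 3) {
      push @rcb, sub { push @results, "$id: @_" };
      $wbuf .= pack "L/a*", $freeze->(rmdir => "/tmp/somepath/$id");
   }

   # Child (simulated): split the stream back into requests, then answer
   # each with an "LL" header (id, length) plus payload. Id 0 is an event;
   # replies use id 1 here (an assumption, the parent only tests nonzero).
   my ($rbuf, $count, @seen) = ("", 0);
   while (length $wbuf) {
      my ($frame) = unpack "L/a*", $wbuf;
      substr $wbuf, 0, 4 + length $frame, "";
      push @seen, [$thaw->($frame)];

      # every third request, emit a progress event first, like MyWorker
      unless (++$count % 3) {
         my $event = $freeze->($count);
         $rbuf .= pack ("LL", 0, length $event) . $event;
      }

      my $reply = $freeze->(0, "No such file or directory");
      $rbuf .= pack ("LL", 1, length $reply) . $reply;
   }

   # Parent reader: sync-mode replies carry no request id, so each one goes
   # to the oldest queued callback; id 0 frames go to the event handler.
   while (8 <= length $rbuf) {
      my ($id, $len) = unpack "LL", $rbuf;
      last if 8 + $len > length $rbuf;

      my @r = $thaw->(substr $rbuf, 8, $len);
      substr $rbuf, 0, 8 + $len, "";

      if ($id) {
         (shift @rcb)->(@r);
      } else {
         push @events, "@r requests handled";
      }
   }

   print "$_\n" for @results, @events;
```

A plain queue is enough here because the synchronous child answers strictly in order. The asynchronous backend instead tags each request with a 32-bit id (C<pack "LL/a*">) and looks the callback up in a hash, since concurrent jobs may finish in a different order than they were submitted.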

The child is implemented using a separate module, C, shown here:

   package MyWorker;

   my $count;

   sub run {
      my ($cmd, $path) = @_;

      AnyEvent::Fork::RPC::event ($count)
         unless ++$count % 3;

      my $status = $cmd eq "rmdir"  ? rmdir  $path
                 : $cmd eq "unlink" ? unlink $path
                 : die "fatal error, illegal command '$cmd'";

      $status
         or (0, "$!")
   }

   1

The C function first sends a "progress" event every three calls, and
then executes C or C, depending on the first parameter (or
dies with a fatal error - obviously, you must never let this happen :).

Finally, it returns the (true) status value if the command was successful,
or the value 0 followed by the stringified error message otherwise.

On my system, running the first code fragment with the given
F in the current directory yields:

   /tmp/somepath/1: No such file or directory
   /tmp/somepath/2: No such file or directory
   3 requests handled
   /tmp/somepath/3: No such file or directory
   /tmp/somepath/4: No such file or directory
   /tmp/somepath/5: No such file or directory
   6 requests handled
   /tmp/somepath/6: No such file or directory

Obviously, none of the directories I am trying to delete even exist. Also,
the events and responses are processed in exactly the same order as they
were created in the child, which is true for both synchronous and
asynchronous backends.

Note that the parentheses in the call to C are not
optional. That is because the function isn't defined when the code is
compiled. You can make sure it is visible by pre-loading the correct
backend module in the call to C:

   ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")

Since the backend module declares the C function, loading it first
ensures that perl will correctly interpret calls to it.

And as a final remark, there is a fine module on CPAN that can
asynchronously C and C and a lot more, and more efficiently
than this example, namely L.
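
The worker above only exchanges flat strings and numbers, such as C<(0, "$!")>, which the default serialiser handles; richer data needs a custom C<serialiser> string, as described later in this document. The sketch below evaluates serialiser strings the way the code of C<run> does (the C<eval> must yield a freeze and a thaw code reference) and round-trips values through the default serialiser and through a hypothetical JSON variant built on the core C<JSON::PP> module. The JSON string and the C<roundtrip> helper are illustrations only, not part of this module.

```perl
   use strict;
   use warnings;

   # The module's default serialiser string, evaluated the way run() does.
   my $STRING_SERIALISER =
      '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

   # Hypothetical alternative built on the core JSON::PP module: nested
   # arrays and hashes survive, but code refs and objects still do not.
   my $JSON_SERIALISER =
      'use JSON::PP (); (sub { JSON::PP::encode_json ([@_]) }, sub { @{ JSON::PP::decode_json (shift) } })';

   # Hypothetical helper: freeze a list with the given serialiser, then thaw it.
   sub roundtrip {
      my ($serialiser, @values) = @_;
      my ($freeze, $thaw) = eval $serialiser;
      die $@ if $@;
      return $thaw->($freeze->(@values));
   }

   # Flat strings and numbers pass through the default serialiser
   # (numbers come back as strings).
   my @flat = roundtrip ($STRING_SERIALISER, 0, "No such file or directory");

   # A hash reference needs the JSON variant; the default serialiser would
   # only transfer its stringified form, such as "HASH(0x...)".
   my @nested = roundtrip ($JSON_SERIALISER, { path => "/tmp/somepath/1", ok => 0 });

   print "flat: @flat\n";
   print "nested path: $nested[0]{path}\n";
```

Whichever serialiser you choose, both processes evaluate the same string: C<run> passes it to the child together with the function name (via C<send_arg>), so any module it C<use>s must be loadable in the child as well.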

=head1 PARENT PROCESS USAGE

This module exports nothing, and only implements a single function:

=over 4

=cut

package AnyEvent::Fork::RPC;

use common::sense;

use Errno ();
use Guard ();

use AnyEvent;
use AnyEvent::Fork; # we don't actually depend on it, this is for convenience

our $VERSION = 0.1;

=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]

The traditional way to call it. But it is way cooler to call it in the
following way:

=item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])

This C function/method can be used in place of the
L method. Just like that method, it takes over
the L process, but instead of calling the specified
C<$function> directly, it runs a server that accepts RPC calls and handles
responses.

It returns a function reference that can be used to call the function in
the child process, handling serialisation and data transfers.

The following key/value pairs are allowed. It is recommended to have at
least an C or C handler set.

=over 4

=item on_error => $cb->($msg)

Called on (fatal) errors, with a (hopefully) descriptive message. If this
callback is not provided, but C is, then the C callback
is called with the first argument being the string C, followed by
the error message.

If neither handler is provided, it prints the error to STDERR and will
start failing badly.

=item on_event => $cb->(...)

Called for every call to the C function in the child, with the
arguments of that function passed to the callback.

Also called on errors when no C handler is provided.

=item on_destroy => $cb->()

Called when the C<$rpc> object has been destroyed and all requests have
been successfully handled. This is useful when you queue some requests and
want the child to go away after it has handled them. The problem is that
the parent must not exit either until all requests have been handled, and
this can be accomplished by waiting for this callback.

=item init => $function (default none)

When specified (by name), this function is called in the child as the very
first thing when taking over the process, with all the arguments normally
passed to the C function, except the communications
socket.

It can be used to do one-time things in the child such as storing passed
parameters or opening database connections.

It is called very early - before the serialisers are created or the
C<$function> name is resolved into a function reference, so it could be
used to load any modules that provide the serialiser or function. It
cannot, however, create events.

=item async => $boolean (default: 0)

The default server used in the child does all I/O blockingly, and only
allows a single RPC call to execute concurrently.

Setting C to a true value switches to another implementation that
uses L in the child and allows multiple concurrent RPC calls.

The actual API in the child is documented in the section that describes
the calling semantics of the returned C<$rpc> function.

If you want to pre-load the actual back-end modules to enable memory
sharing, then you should load C for
synchronous, and C for asynchronous mode.

If you use a template process and want to fork both sync and async
children, then it is permissible to load both modules.

=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')

All arguments, result data and event data have to be serialised to be
transferred between the processes. For this, they have to be frozen and
thawed in both parent and child processes.

By default, only octet strings can be passed between the processes, which
is reasonably fast and efficient.

For more complicated use cases, you can provide your own freeze and thaw
functions, by specifying a string with perl source code. It's supposed to
return two code references when evaluated: the first receives a list of
perl values and must return an octet string.
The second receives the octet string and must return the original list of
values.

If you need an external module for serialisation, then you can either
pre-load it into your L process, or you can add a C
or C statement into the serialiser string. Or both.

=back

See the examples section earlier in this document for some actual
examples.

=cut

our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

sub run {
   my ($self, $function, %arg) = @_;

   my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
   my $on_event   = delete $arg{on_event};
   my $on_error   = delete $arg{on_error};
   my $on_destroy = delete $arg{on_destroy};

   # on_error defaults to forwarding to on_event, if specified
   $on_error ||= $on_event
               ? sub { $on_event->(error => shift) }
               : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };

   # default for on_event is to raise an error
   $on_event ||= sub { $on_error->("event received, but no on_event handler") };

   my ($f, $t) = eval $serialiser; die $@ if $@;

   my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
   my ($rlen, $rbuf, $rw) = 512 - 16;

   my $wcb = sub {
      my $len = syswrite $fh, $wbuf;

      unless (defined $len) {
         if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("$!");
         }

         return; # nothing was written, keep $wbuf as it is
      }

      substr $wbuf, 0, $len, "";

      unless (length $wbuf) {
         undef $ww;
         $shutdown and shutdown $fh, 1;
      }
   };

   my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");

   $self->require ($module)
        ->send_arg ($function, $arg{init}, $serialiser)
        ->run ("$module\::run", sub {
      $fh = shift;

      my ($id, $len);
      $rw = AE::io $fh, 0, sub {
         $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
         $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;

         if ($len) {
            # each frame: 32-bit id (0 = event), 32-bit length, payload
            while (8 <= length $rbuf) {
               ($id, $len) = unpack "LL", $rbuf;
               8 + $len <= length $rbuf
                  or last;

               my @r = $t->(substr $rbuf, 8, $len);
               substr $rbuf, 0, 8 + $len, "";

               if ($id) {
                  if (@rcb) {
                     (shift @rcb)->(@r);
                  } elsif (my $cb = delete $rcb{$id}) {
                     $cb->(@r);
                  } else {
                     undef $rw; undef $ww;
                     $on_error->("unexpected data from child");
                  }
               } else {
                  $on_event->(@r);
               }
            }
         } elsif (defined $len) {
            undef $rw; undef $ww; # it ends here

            if (@rcb || %rcb) {
               $on_error->("unexpected eof");
            } else {
               # on_destroy is optional
               $on_destroy->()
                  if $on_destroy;
            }
         } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("read: $!");
         }
      };

      $ww ||= AE::io $fh, 1, $wcb;
   });

   my $guard = Guard::guard {
      $shutdown = 1;
      $ww ||= $fh && AE::io $fh, 1, $wcb;
   };

   my $id;

   $arg{async}
      ? sub {
           $id = ($id == 0xffffffff ? 0 : $id) + 1;
           $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops

           $rcb{$id} = pop;

           $guard; # keep it alive

           # &$f passes the remaining @_ (the call arguments) to the freezer
           $wbuf .= pack "LL/a*", $id, &$f;
           $ww ||= $fh && AE::io $fh, 1, $wcb;
        }
      : sub {
           push @rcb, pop;

           $guard; # keep it alive

           $wbuf .= pack "L/a*", &$f;
           $ww ||= $fh && AE::io $fh, 1, $wcb;
        }
}

=item $rpc->(..., $cb->(...))

The RPC object returned by C is actually a code
reference. There are two things you can do with it: call it, and let it go
out of scope (let it get destroyed).

If C was false when C<$rpc> was created (the default), then, if you
call C<$rpc>, the C<$function> is invoked with all arguments passed to
C<$rpc> except the last one (the callback). When the function returns, the
callback will be invoked with all the return values.

If C was true, then the C<$function> receives an additional initial
argument, the result callback. In this case, returning from C<$function>
does nothing - the function only counts as "done" when the result callback
is called, and any arguments passed to it are considered the return
values. This makes it possible to "return" from event handlers or e.g. Coro
threads.

The other thing that can be done with the RPC object is to destroy it. In
this case, the child process will execute all remaining RPC calls, report
their results, and then exit.

See the examples section earlier in this document for some actual
examples.

=back

=head1 CHILD PROCESS USAGE

The following function is not available to the parent process. It is only
available in the namespace of this module when the child is running,
without having to load any extra modules. It is part of the child-side API
of L.

=over 4

=item AnyEvent::Fork::RPC::event ...

Send an event to the parent. Events are a bit like RPC calls made by the
child process to the parent, except that there is no notion of return
values.

See the examples section earlier in this document for some actual
examples.

=back

=head1 SEE ALSO

L (to create the processes in the first place),

L (to manage whole pools of processes).

=head1 AUTHOR AND CONTACT INFORMATION

 Marc Lehmann
 http://software.schmorp.de/pkg/AnyEvent-Fork-RPC

=cut

1