=head1 NAME

AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork

=head1 SYNOPSIS

   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyModule")
      ->AnyEvent::Fork::RPC::run (
         "MyModule::server",
      );

   my $cv = AE::cv;

   $rpc->(1, 2, 3, sub {
      print "MyModule::server returned @_\n";
      $cv->send;
   });

   $cv->recv;

=head1 DESCRIPTION

This module implements a simple RPC protocol and backend for processes
created via L<AnyEvent::Fork>, allowing you to call a function in the
child process and receive its return values (up to 4GB serialised).

It implements two different backends: a synchronous one that works like a
normal function call, and an asynchronous one that can run multiple jobs
concurrently in the child, using AnyEvent.

It also implements an asynchronous event mechanism from the child to the
parent, which can be used for progress indications or other information.

=head1 PARENT PROCESS USAGE

This module exports nothing, and only implements a single function:

=over 4

=cut

package AnyEvent::Fork::RPC;

use common::sense;

use Errno ();
use Guard ();

use AnyEvent;
#use AnyEvent::Fork;

our $VERSION = 0.1;

=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]

This is the traditional way to call it, but it is way cooler to call it
in the following way:

=item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])

This C<run> function/method can be used in place of the
L<AnyEvent::Fork/run> method. Just like that method, it takes over the
L<AnyEvent::Fork> process, but instead of calling the specified
C<$function> directly, it runs a server that accepts RPC calls and handles
responses.

It returns a function reference that can be used to call the function in
the child process, handling serialisation and data transfers.

The following key/value pairs are allowed. It is recommended to have at
least an C<on_error> or C<on_event> handler set.

=over 4

=item on_error => $cb->($msg)

Called on (fatal) errors, with a (hopefully) descriptive message.

If this callback is not provided, but C<on_event> is, then the
C<on_event> callback is called with the first argument being the string
C<error>, followed by the error message.

If neither handler is provided, the error is raised with C<die> from
within the event loop (which, depending on the event loop, may only print
it to STDERR), and things will start failing badly.

=item on_event => $cb->(...)

Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
child, with the arguments of that function passed to the callback.

Also called on errors when no C<on_error> handler is provided.

=item init => $function (default none)

When specified (by name), this function is called in the child as the very
first thing when taking over the process, with all the arguments normally
passed to the C<run> function, except the communications socket.

It can be used to do one-time things in the child such as storing passed
parameters or opening database connections.

=item async => $boolean (default: 0)

The default server used in the child does all I/O blockingly, and only
allows a single RPC call to execute at any one time.

Setting C<async> to a true value switches to another implementation that
uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.

The actual API in the child is documented in the section that describes
the calling semantics of the returned C<$rpc> function.

=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')

All arguments, result data and event data have to be serialised to be
transferred between the processes. For this, they have to be frozen and
thawed in both parent and child processes.

By default, only octet strings can be passed between the processes, which
is reasonably fast and efficient.

For more complicated use cases, you can provide your own freeze and thaw
functions by specifying a string with Perl source code. When evaluated, it
must return two code references: the first receives a list of Perl values
and must return an octet string; the second receives that octet string and
must return the original list of values. Since the string is evaluated in
both the parent and the child, any module it relies on must be loaded in
both.
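
The following is a minimal sketch, in plain core Perl, of what a
serialiser string does: it is evaluated into a freeze/thaw pair, the
frozen string is framed with a 32-bit length prefix (the same
C<pack "L/a*"> framing that C<run> uses when sending calls), and the
payload is then unpacked and thawed again. The Storable-based string and
the helper names C<serialiser_pair> and C<round_trip> are hypothetical
illustrations, not part of this module's API.

```perl
   use strict;
   use warnings;
   use Storable ();

   # The default serialiser string, as documented above: octet strings only.
   my $default = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

   # Hypothetical alternative (an assumption, not shipped by this module):
   # Storable can carry nested Perl data structures.
   my $storable = '(sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';

   # Evaluate a serialiser string into its (freeze, thaw) pair, like run does.
   sub serialiser_pair {
      my ($src) = @_;
      my ($freeze, $thaw) = eval $src;
      die $@ if $@;
      return ($freeze, $thaw);
   }

   # Freeze a list, frame it with a 32-bit length prefix (pack "L/a*"),
   # then read the prefix back and thaw the payload.
   sub round_trip {
      my ($src, @values) = @_;
      my ($freeze, $thaw) = serialiser_pair ($src);

      my $frame = pack "L/a*", $freeze->(@values);
      my $len   = unpack "L", $frame;

      return $thaw->(substr $frame, 4, $len);
   }

   my @plain = round_trip ($default, "add", 1, 2);
   print "default:  @plain\n";

   my @rich = round_trip ($storable, { a => [1, 2] }, "x");
   print "storable: ", $rich[0]{a}[1], " ", $rich[1], "\n";
```

If you pass a Storable-based string like this as C<serialiser>, keep in
mind that the child evaluates it as well, so the child would also need
Storable loaded, for example via C<require ("Storable")> on the fork
object before calling C<run>.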

=back

=cut

our $SERIALISE_STRINGS = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

sub run {
   my ($self, $function, %arg) = @_;

   my $serialiser = delete $arg{serialiser} || $SERIALISE_STRINGS;
   my $on_event   = delete $arg{on_event};
   my $on_error   = delete $arg{on_error};

   # default for on_error is to forward to on_event, if specified
   $on_error ||= $on_event
               ? sub { $on_event->(error => shift) }
               : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };

   # default for on_event is to raise an error
   $on_event ||= sub { $on_error->("event received, but no on_event handler") };

   my ($f, $t) = eval $serialiser; die $@ if $@;

   my (@rcb, $fh, $shutdown, $wbuf, $ww, $rbuf, $rw);

   my $wcb = sub {
      my $len = syswrite $fh, $wbuf;

      unless (defined $len) {
         if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("$!");
         }

         return; # nothing was written, so nothing to remove from $wbuf
      }

      substr $wbuf, 0, $len, "";

      unless (length $wbuf) {
         undef $ww;
         $shutdown and shutdown $fh, 1;
      }
   };

   my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");

   $self->require ($module)
        ->send_arg ($function, $arg{init}, $serialiser)
        ->run ("$module\::run", sub {
      $fh = shift;
      $rw = AE::io $fh, 0, sub {
         my $len = sysread $fh, $rbuf, 512 + length $rbuf, length $rbuf;

         if ($len) {
            # each message is a 32-bit length followed by the frozen data
            while (5 <= length $rbuf) {
               $len = unpack "L", $rbuf;

               # message not complete yet, wait for more data
               last if 4 + $len > length $rbuf;

               my @r = $t->(substr $rbuf, 4, $len);
               substr $rbuf, 0, $len + 4, "";

               if (pop @r) {
                  $on_event->(@r);
               } elsif (@rcb) {
                  (shift @rcb)->(@r);
               } else {
                  undef $rw; undef $ww;
                  $on_error->("unexpected data from child");
               }
            }
         } elsif (defined $len) {
            undef $rw; undef $ww; # it ends here
            $on_error->("unexpected eof")
               if @rcb;
         } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("read: $!");
         }
      };

      $ww ||= AE::io $fh, 1, $wcb;
   });

   my $guard = Guard::guard {
      $shutdown = 1;
      $ww ||= $fh && AE::io $fh, 1, $wcb;
   };

   sub {
      push @rcb, pop;

      $guard; # keep it alive

      $wbuf .= pack "L/a*", &$f;
      $ww ||= $fh && AE::io $fh, 1, $wcb;
   }
}

=back

=head1 CHILD PROCESS USAGE

These functions are not defined when this module is loaded in the parent.
They only exist in the namespace of this module while the child is
running (without having to load any extra module), and they make up this
module's child-side API.

=over 4

=item AnyEvent::Fork::RPC::quit

This function can be called to gracefully stop the child process when it
is idle.

After this function is called, the process stops handling incoming RPC
requests, but outstanding events and function return values will be sent
to the parent. When all data has been sent, the process calls C<exit>.

Since the parent might not expect the child to exit at random points in
time, it is often better to signal the parent by sending an C<event> and
letting the parent close down the child process.

=item AnyEvent::Fork::RPC::event ...

Send an event to the parent. Events are a bit like RPC calls made by the
child process to the parent, except that there is no notion of return
values.

=back

=head1 SEE ALSO

L<AnyEvent::Fork> (to create the processes in the first place),
L<AnyEvent::Fork::Pool> (to manage whole pools of processes).

=head1 AUTHOR AND CONTACT INFORMATION

 Marc Lehmann
 http://software.schmorp.de/pkg/AnyEvent-Fork-RPC

=cut

1