--- AnyEvent-Fork-RPC/RPC.pm 2013/04/17 15:55:59 1.1 +++ AnyEvent-Fork-RPC/RPC.pm 2013/04/17 20:24:36 1.8 @@ -4,8 +4,8 @@ =head1 SYNOPSIS - use AnyEvent::Fork; use AnyEvent::Fork::RPC; + # use AnyEvent::Fork is not needed my $rpc = AnyEvent::Fork ->new @@ -36,6 +36,109 @@ It also implements an asynchronous event mechanism from the child to the parent, that could be used for progress indications or other information. +Loading this module also always loads L, so you can make a +separate C if you wish, but you don't have to. + +=head1 EXAMPLES + +=head2 Synchronous Backend + +Here is a simple example that implements a backend that executes C +and C calls, and reports their status back. It also reports the +number of requests it has processed every three requests, which is clearly +silly, but illustrates the use of events. + +First the parent process: + + use AnyEvent; + use AnyEvent::Fork; + use AnyEvent::Fork::RPC; + + my $done = AE::cv; + + my $rpc = AnyEvent::Fork + ->new + ->require ("MyWorker") + ->AnyEvent::Fork::RPC::run ("MyWorker::run", + on_error => sub { warn "FATAL: $_[0]"; exit 1 }, + on_event => sub { warn "$_[0] requests handled\n" }, + on_destroy => $done, + ); + + for my $id (1..6) { + $rpc->(rmdir => "/tmp/somepath/$id", sub { + $_[0] + or warn "/tmp/somepath/$id: $_[1]\n"; + }); + } + + undef $rpc; + + $done->recv; + +The parent creates the process, queues a few rmdir's. It then forgets +about the C<$rpc> object, so that the child exits after it has handled the +requests, and then it waits till the requests have been handled. + +The child is implemented using a separate module, C, shown here: + + package MyWorker; + + my $count; + + sub run { + my ($cmd, $path) = @_; + + AnyEvent::Fork::RPC::event ($count) + unless ++$count % 3; + + my $status = $cmd eq "rmdir" ? rmdir $path + : $cmd eq "unlink" ? unlink $path + : die "fatal error, illegal command '$cmd'"; + + $status or (0, "$!") + } + + 1 + +The C function first sends a "progress" event every three calls, and +then executes C or C, depending on the first parameter (or +dies with a fatal error - obviously, you must never let this happen :). + +Eventually it returns the status value true if the command was successful, +or the status value 0 and the stringified error message. + +On my system, running the first code fragment with the given +F in the current directory yields: + + /tmp/somepath/1: No such file or directory + /tmp/somepath/2: No such file or directory + 3 requests handled + /tmp/somepath/3: No such file or directory + /tmp/somepath/4: No such file or directory + /tmp/somepath/5: No such file or directory + 6 requests handled + /tmp/somepath/6: No such file or directory + +Obviously, none of the directories I am trying to delete even exist. Also, +the events and responses are processed in exactly the same order as +they were created in the child, which is true for both synchronous and +asynchronous backends. + +Note that the parentheses in the call to C are +not optional. That is because the function isn't defined when the code is +compiled. You can make sure it is visible by pre-loading the correct +backend module in the call to C: + + ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker") + +Since the backend module declares the C function, loading it first +ensures that perl will correctly interpret calls to it. + +And as a final remark, there is a fine module on CPAN that can +asynchronously C and C and a lot more, and more efficiently +than this example, namely L. + =head1 PARENT PROCESS USAGE This module exports nothing, and only implements a single function: @@ -52,7 +155,7 @@ use Guard (); use AnyEvent; -#use AnyEvent::Fork; +use AnyEvent::Fork; # we don't actually depend on it, this is for convenience our $VERSION = 0.1; @@ -94,6 +197,14 @@ Also called on errors when no C handler is provided. +=item on_destroy => $cb->() + +Called when the C<$rpc> object has been destroyed and all requests have +been successfully handled. This is useful when you queue some requests and +want the child to go away after it has handled them. The problem is that +the parent must not exit either until all requests have been handled, and +this can be accomplished by waiting for this callback. + =item init => $function (default none) When specified (by name), this function is called in the child as the very @@ -104,6 +215,11 @@ It can be used to do one-time things in the child such as storing passed parameters or opening database connections. +It is called very early - before the serialisers are created or the +C<$function> name is resolved into a function reference, so it could be +used to load any modules that provide the serialiser or function. It can +not, however, create events. + =item async => $boolean (default: 0) The default server used in the child does all I/O blockingly, and only @@ -115,6 +231,13 @@ The actual API in the child is documented in the section that describes the calling semantics of the returned C<$rpc> function. +If you want to pre-load the actual back-end modules to enable memory +sharing, then you should load C for +synchronous, and C for asynchronous mode. + +If you use a template process and want to fork both sync and async +children, then it is permissible to load both modules. + =item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') All arguments, result data and event data have to be serialised to be @@ -130,18 +253,25 @@ perl values and must return an octet string. The second receives the octet string and must return the original list of values. +If you need an external module for serialisation, then you can either +pre-load it into your L process, or you can add a C +or C statement into the serialiser string. Or both. + =back +See the examples section earlier in this document for some actual examples. + =cut -our $SERIALISE_STRINGS = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; +our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; sub run { my ($self, $function, %arg) = @_; - my $serialiser = delete $arg{serialiser} || $SERIALISE_STRINGS; + my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; my $on_event = delete $arg{on_event}; my $on_error = delete $arg{on_error}; + my $on_destroy = delete $arg{on_destroy}; # default for on_error is to on_event, if specified $on_error ||= $on_event @@ -153,7 +283,8 @@ my ($f, $t) = eval $serialiser; die $@ if $@; - my (@rcb, $fh, $shutdown, $wbuf, $ww, $rbuf, $rw); + my (@rcb, $fh, $shutdown, $wbuf, $ww, $rw); + my ($rlen, $rbuf) = 512 - 16; my $wcb = sub { my $len = syswrite $fh, $wbuf; @@ -180,29 +311,35 @@ ->run ("$module\::run", sub { $fh = shift; $rw = AE::io $fh, 0, sub { - my $len = sysread $fh, $rbuf, 512 + length $rbuf, length $rbuf; + $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; + my $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; if ($len) { - while (5 <= length $rbuf) { + while (4 <= length $rbuf) { $len = unpack "L", $rbuf; - if (4 + $len <= length $rbuf) { - my @r = $t->(substr $rbuf, 4, $len); - substr $rbuf, 0, $len + 4, ""; - - if (pop @r) { - $on_event->(@r); - } elsif (@rcb) { - (shift @rcb)->(@r); - } else { - undef $rw; undef $ww; - $on_error->("unexpected data from child"); - } + 4 + $len <= length $rbuf + or last; + + my @r = $t->(substr $rbuf, 4, $len); + substr $rbuf, 0, $len + 4, ""; + + if (pop @r) { + $on_event->(@r); + } elsif (@rcb) { + (shift @rcb)->(@r); + } else { + undef $rw; undef $ww; + $on_error->("unexpected data from child"); } } } elsif (defined $len) { undef $rw; undef $ww; # it ends here - $on_error->("unexpected eof") - if @rcb; + + if (@rcb) { + $on_error->("unexpected eof"); + } else { + $on_destroy->(); + } } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { undef $rw; undef $ww; # it ends here $on_error->("read: $!"); @@ -227,29 +364,41 @@ } } -=back +=item $rpc->(..., $cb->(...)) -=head1 CHILD PROCESS USAGE +The RPC object returned by C is actually a code +reference. There are two things you can do with it: call it, and let it go +out of scope (let it get destroyed). + +If C was false when C<$rpc> was created (the default), then, if you +call C<$rpc>, the C<$function> is invoked with all arguments passed to +C<$rpc> except the last one (the callback). When the function returns, the +callback will be invoked with all the return values. + +If C was true, then the C<$function> receives an additional +initial argument, the result callback. In this case, returning from +C<$function> does nothing - the function only counts as "done" when the +result callback is called, and any arguments passed to it are considered +the return values. This makes it possible to "return" from event handlers +or e.g. Coro threads. + +The other thing that can be done with the RPC object is to destroy it. In +this case, the child process will execute all remaining RPC calls, report +their results, and then exit. -These functions are not available in this module. They are only available -in the namespace of this module when the child is running, without -having to load any extra module. They are part of the child-side API of -L. +See the examples section earlier in this document for some actual +examples. -=over 4 +=back -=item AnyEvent::Fork::RPC::quit +=head1 CHILD PROCESS USAGE -This function can be called to gracefully stop the child process when it -is idle. +The following function is not available in this module. They are only +available in the namespace of this module when the child is running, +without having to load any extra modules. They are part of the child-side +API of L. -After this function is called, the process stops handling incoming RPC -requests, but outstanding events and function return values will be sent -to the parent. When all data has been sent, the process calls C. - -Since the parent might not expect the child to exit at random points in -time, it is often better to signal the parent by sending an C and -letting the parent close down the child process. +=over 4 =item AnyEvent::Fork::RPC::event ... @@ -257,6 +406,9 @@ child process to the parent, except that there is no notion of return values. +See the examples section earlier in this document for some actual +examples. + =back =head1 SEE ALSO