--- AnyEvent-Fork-RPC/RPC.pm 2013/04/17 19:43:48 1.6 +++ AnyEvent-Fork-RPC/RPC.pm 2013/04/17 22:04:49 1.10 @@ -4,8 +4,8 @@ =head1 SYNOPSIS - use AnyEvent::Fork; use AnyEvent::Fork::RPC; + # use AnyEvent::Fork is not needed my $rpc = AnyEvent::Fork ->new @@ -36,9 +36,12 @@ It also implements an asynchronous event mechanism from the child to the parent, that could be used for progress indications or other information. +Loading this module also always loads L, so you can make a +separate C if you wish, but you don't have to. + =head1 EXAMPLES -=head2 Synchronous Backend +=head2 Example 1: Synchronous Backend Here is a simple example that implements a backend that executes C and C calls, and reports their status back. It also reports the @@ -136,6 +139,45 @@ asynchronously C and C and a lot more, and more efficiently than this example, namely L. +=head3 Example 1a: the same with the asynchronous backend + +This example only shows what needs to be changed to use the async backend +instead. Doing this is not very useful, the purpose of this example is +to show the minimum amount of change that is required to go from the +synchronous to the asynchronous backend. + +To use the async backend in the previous example, you need to add the +C parameter to the C call: + + ->AnyEvent::Fork::RPC::run ("MyWorker::run", + async => 1, + ... + +And since the function call protocol is now changed, you need to adopt +C to the async API. + +First, you need to accept the extra initial C<$done> callback: + + sub run { + my ($done, $cmd, $path) = @_; + +And since a response is now generated when C<$done> is called, as opposed +to when the function returns, we need to call the C<$done> function with +the status: + + $done->($status or (0, "$!")); + +A few remarks are in order. First, it's quite pointless to use the async +backend for this example - but it I possible. Second, you can call +C<$done> before or after returning from the function. Third, having both +returned from the function and having called the C<$done> callback, the +child process may exit at any time, so you should call C<$done> only when +you really I done. + +=head2 Example 2: Asynchronous Backend + +#TODO + =head1 PARENT PROCESS USAGE This module exports nothing, and only implements a single function: @@ -152,7 +194,7 @@ use Guard (); use AnyEvent; -#use AnyEvent::Fork; +use AnyEvent::Fork; # we don't actually depend on it, this is for convenience our $VERSION = 0.1; @@ -256,6 +298,9 @@ =back +See the examples section earlier in this document for some actual +examples. + =cut our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; @@ -278,13 +323,13 @@ my ($f, $t) = eval $serialiser; die $@ if $@; - my (@rcb, $fh, $shutdown, $wbuf, $ww, $rw); - my ($rlen, $rbuf) = 512 - 16; + my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww); + my ($rlen, $rbuf, $rw) = 512 - 16; my $wcb = sub { my $len = syswrite $fh, $wbuf; - if (!defined $len) { + unless (defined $len) { if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { undef $rw; undef $ww; # it ends here $on_error->("$!"); @@ -305,32 +350,39 @@ ->send_arg ($function, $arg{init}, $serialiser) ->run ("$module\::run", sub { $fh = shift; + + my ($id, $len); $rw = AE::io $fh, 0, sub { $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; - my $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; + $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; if ($len) { - while (5 <= length $rbuf) { - $len = unpack "L", $rbuf; - 4 + $len <= length $rbuf + while (8 <= length $rbuf) { + ($id, $len) = unpack "LL", $rbuf; + 8 + $len <= length $rbuf or last; - my @r = $t->(substr $rbuf, 4, $len); - substr $rbuf, 0, $len + 4, ""; - - if (pop @r) { - $on_event->(@r); - } elsif (@rcb) { - (shift @rcb)->(@r); + my @r = $t->(substr $rbuf, 8, $len); + substr $rbuf, 0, 8 + $len, ""; + + if ($id) { + if (@rcb) { + (shift @rcb)->(@r); + } elsif (my $cb = delete $rcb{$id}) { + $cb->(@r); + } else { + undef $rw; undef $ww; + $on_error->("unexpected data from child"); + } } else { - undef $rw; undef $ww; - $on_error->("unexpected data from child"); + $on_event->(@r); } } } elsif (defined $len) { undef $rw; undef $ww; # it ends here - if (@rcb) { + if (@rcb || %rcb) { + use Data::Dump;ddx[\@rcb,\%rcb];#d# $on_error->("unexpected eof"); } else { $on_destroy->(); @@ -349,14 +401,28 @@ $ww ||= $fh && AE::io $fh, 1, $wcb; }; - sub { - push @rcb, pop; + my $id; - $guard; # keep it alive + $arg{async} + ? sub { + $id = ($id == 0xffffffff ? 0 : $id) + 1; + $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops - $wbuf .= pack "L/a*", &$f; - $ww ||= $fh && AE::io $fh, 1, $wcb; - } + $rcb{$id} = pop; + + $guard; # keep it alive + + $wbuf .= pack "LL/a*", $id, &$f; + $ww ||= $fh && AE::io $fh, 1, $wcb; + } + : sub { + push @rcb, pop; + + $guard; # keep it alive + + $wbuf .= pack "L/a*", &$f; + $ww ||= $fh && AE::io $fh, 1, $wcb; + } } =item $rpc->(..., $cb->(...)) @@ -381,6 +447,9 @@ this case, the child process will execute all remaining RPC calls, report their results, and then exit. +See the examples section earlier in this document for some actual +examples. + =back =head1 CHILD PROCESS USAGE @@ -398,6 +467,9 @@ child process to the parent, except that there is no notion of return values. +See the examples section earlier in this document for some actual +examples. + =back =head1 SEE ALSO