--- AnyEvent-Fork-RPC/RPC.pm 2013/04/17 17:08:16 1.2 +++ AnyEvent-Fork-RPC/RPC.pm 2013/04/18 07:59:46 1.11 @@ -4,8 +4,8 @@ =head1 SYNOPSIS - use AnyEvent::Fork; use AnyEvent::Fork::RPC; + # use AnyEvent::Fork is not needed my $rpc = AnyEvent::Fork ->new @@ -36,6 +36,257 @@ It also implements an asynchronous event mechanism from the child to the parent, that could be used for progress indications or other information. +Loading this module also always loads L, so you can make a +separate C if you wish, but you don't have to. + +=head1 EXAMPLES + +=head2 Example 1: Synchronous Backend + +Here is a simple example that implements a backend that executes C +and C calls, and reports their status back. It also reports the +number of requests it has processed every three requests, which is clearly +silly, but illustrates the use of events. + +First the parent process: + + use AnyEvent; + use AnyEvent::Fork::RPC; + + my $done = AE::cv; + + my $rpc = AnyEvent::Fork + ->new + ->require ("MyWorker") + ->AnyEvent::Fork::RPC::run ("MyWorker::run", + on_error => sub { warn "FATAL: $_[0]"; exit 1 }, + on_event => sub { warn "$_[0] requests handled\n" }, + on_destroy => $done, + ); + + for my $id (1..6) { + $rpc->(rmdir => "/tmp/somepath/$id", sub { + $_[0] + or warn "/tmp/somepath/$id: $_[1]\n"; + }); + } + + undef $rpc; + + $done->recv; + +The parent creates the process, queues a few rmdir's. It then forgets +about the C<$rpc> object, so that the child exits after it has handled the +requests, and then it waits till the requests have been handled. + +The child is implemented using a separate module, C, shown here: + + package MyWorker; + + my $count; + + sub run { + my ($cmd, $path) = @_; + + AnyEvent::Fork::RPC::event ($count) + unless ++$count % 3; + + my $status = $cmd eq "rmdir" ? rmdir $path + : $cmd eq "unlink" ? unlink $path + : die "fatal error, illegal command '$cmd'"; + + $status or (0, "$!") + } + + 1 + +The C function first sends a "progress" event every three calls, and +then executes C or C, depending on the first parameter (or +dies with a fatal error - obviously, you must never let this happen :). + +Eventually it returns the status value true if the command was successful, +or the status value 0 and the stringified error message. + +On my system, running the first code fragment with the given +F in the current directory yields: + + /tmp/somepath/1: No such file or directory + /tmp/somepath/2: No such file or directory + 3 requests handled + /tmp/somepath/3: No such file or directory + /tmp/somepath/4: No such file or directory + /tmp/somepath/5: No such file or directory + 6 requests handled + /tmp/somepath/6: No such file or directory + +Obviously, none of the directories I am trying to delete even exist. Also, +the events and responses are processed in exactly the same order as +they were created in the child, which is true for both synchronous and +asynchronous backends. + +Note that the parentheses in the call to C are +not optional. That is because the function isn't defined when the code is +compiled. You can make sure it is visible by pre-loading the correct +backend module in the call to C: + + ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker") + +Since the backend module declares the C function, loading it first +ensures that perl will correctly interpret calls to it. + +And as a final remark, there is a fine module on CPAN that can +asynchronously C and C and a lot more, and more efficiently +than this example, namely L. + +=head3 Example 1a: the same with the asynchronous backend + +This example only shows what needs to be changed to use the async backend +instead. Doing this is not very useful, the purpose of this example is +to show the minimum amount of change that is required to go from the +synchronous to the asynchronous backend. + +To use the async backend in the previous example, you need to add the +C parameter to the C call: + + ->AnyEvent::Fork::RPC::run ("MyWorker::run", + async => 1, + ... + +And since the function call protocol is now changed, you need to adopt +C to the async API. + +First, you need to accept the extra initial C<$done> callback: + + sub run { + my ($done, $cmd, $path) = @_; + +And since a response is now generated when C<$done> is called, as opposed +to when the function returns, we need to call the C<$done> function with +the status: + + $done->($status or (0, "$!")); + +A few remarks are in order. First, it's quite pointless to use the async +backend for this example - but it I possible. Second, you can call +C<$done> before or after returning from the function. Third, having both +returned from the function and having called the C<$done> callback, the +child process may exit at any time, so you should call C<$done> only when +you really I done. + +=head2 Example 2: Asynchronous Backend + +This example implements multiple count-downs in the child, using +L timers. While this is a bit silly (one could use timers in te +parent just as well), it illustrates the ability to use AnyEvent in the +child and the fact that responses can arrive in a different order then the +requests. + +It also shows how to embed the actual child code into a C<__DATA__> +section, so it doesn't need any external files at all. + +And when your parent process is often busy, and you have stricter timing +requirements, then running timers in a child process suddenly doesn't look +so silly anymore. + +Without further ado, here is the code: + + use AnyEvent; + use AnyEvent::Fork::RPC; + + my $done = AE::cv; + + my $rpc = AnyEvent::Fork + ->new + ->require ("AnyEvent::Fork::RPC::Async") + ->eval (do { local $/; }) + ->AnyEvent::Fork::RPC::run ("run", + async => 1, + on_error => sub { warn "FATAL: $_[0]"; exit 1 }, + on_event => sub { print $_[0] }, + on_destroy => $done, + ); + + for my $count (3, 2, 1) { + $rpc->($count, sub { + warn "job $count finished\n"; + }); + } + + undef $rpc; + + $done->recv; + + __DATA__ + + # this ends up in main, as we don't use a package declaration + + use AnyEvent; + + sub run { + my ($done, $count) = @_; + + my $n; + + AnyEvent::Fork::RPC::event "starting to count up to $count\n"; + + my $w; $w = AE::timer 1, 1, sub { + ++$n; + + AnyEvent::Fork::RPC::event "count $n of $count\n"; + + if ($n == $count) { + undef $w; + $done->(); + } + }; + } + +The parent part (the one before the C<__DATA__> section) isn't very +different from the earlier examples. It sets async mode, preloads +the backend module (so the C function is +declared), uses a slightly different C handler (which we use +simply for logging purposes) and then, instead of loading a module with +the actual worker code, it C's the code from the data section in the +child process. + +It then starts three countdowns, from 3 to 1 seconds downwards, destroys +the rpc object so the example finishes eventually, and then just waits for +the stuff to trickle in. + +The worker code uses the event function to log some progress messages, but +mostly just creates a recurring one-second timer. + +The timer callback increments a counter, logs a message, and eventually, +when the count has been reached, calls the finish callback. + +On my system, this results in the following output. Since all timers fire +at roughly the same time, the actual order isn't guaranteed, but the order +shown is very likely what you would get, too. + + starting to count up to 3 + starting to count up to 2 + starting to count up to 1 + count 1 of 3 + count 1 of 2 + count 1 of 1 + job 1 finished + count 2 of 2 + job 2 finished + count 2 of 3 + count 3 of 3 + job 3 finished + +While the overall ordering isn't guaranteed, the async backend still +guarantees that events and responses are delivered to the parent process +in the exact same ordering as they were generated in the child process. + +And unless your system is I busy, it should clearly show that the +job started last will finish first, as it has the lowest count. + +This concludes the async example. Since L does not +actually fork, you are free to use about any module in the child, not just +L, but also L, or L for example. + =head1 PARENT PROCESS USAGE This module exports nothing, and only implements a single function: @@ -52,7 +303,7 @@ use Guard (); use AnyEvent; -#use AnyEvent::Fork; +use AnyEvent::Fork; # we don't actually depend on it, this is for convenience our $VERSION = 0.1; @@ -94,6 +345,14 @@ Also called on errors when no C handler is provided. +=item on_destroy => $cb->() + +Called when the C<$rpc> object has been destroyed and all requests have +been successfully handled. This is useful when you queue some requests and +want the child to go away after it has handled them. The problem is that +the parent must not exit either until all requests have been handled, and +this can be accomplished by waiting for this callback. + =item init => $function (default none) When specified (by name), this function is called in the child as the very @@ -104,6 +363,11 @@ It can be used to do one-time things in the child such as storing passed parameters or opening database connections. +It is called very early - before the serialisers are created or the +C<$function> name is resolved into a function reference, so it could be +used to load any modules that provide the serialiser or function. It can +not, however, create events. + =item async => $boolean (default: 0) The default server used in the child does all I/O blockingly, and only @@ -119,6 +383,9 @@ sharing, then you should load C for synchronous, and C for asynchronous mode. +If you use a template process and want to fork both sync and async +children, then it is permissible to load both modules. + =item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') All arguments, result data and event data have to be serialised to be @@ -140,19 +407,20 @@ =back +See the examples section earlier in this document for some actual +examples. + =cut our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; -# ideally, we want (SvLEN - SvCUR) || 1024 or somesuch... -sub rlen($) { ($_[0] < 384 ? 512 + 16 : 2 << int +(log $_[0] + 512) / log 2) - $_[0] - 16 } - sub run { my ($self, $function, %arg) = @_; my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; my $on_event = delete $arg{on_event}; my $on_error = delete $arg{on_error}; + my $on_destroy = delete $arg{on_destroy}; # default for on_error is to on_event, if specified $on_error ||= $on_event @@ -164,12 +432,13 @@ my ($f, $t) = eval $serialiser; die $@ if $@; - my (@rcb, $fh, $shutdown, $wbuf, $ww, $rbuf, $rw); + my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww); + my ($rlen, $rbuf, $rw) = 512 - 16; my $wcb = sub { my $len = syswrite $fh, $wbuf; - if (!defined $len) { + unless (defined $len) { if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { undef $rw; undef $ww; # it ends here $on_error->("$!"); @@ -190,31 +459,43 @@ ->send_arg ($function, $arg{init}, $serialiser) ->run ("$module\::run", sub { $fh = shift; + + my ($id, $len); $rw = AE::io $fh, 0, sub { - my $len = sysread $fh, $rbuf, rlen length $rbuf, length $rbuf; + $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; + $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; if ($len) { - while (5 <= length $rbuf) { - $len = unpack "L", $rbuf; - 4 + $len <= length $rbuf + while (8 <= length $rbuf) { + ($id, $len) = unpack "LL", $rbuf; + 8 + $len <= length $rbuf or last; - my @r = $t->(substr $rbuf, 4, $len); - substr $rbuf, 0, $len + 4, ""; - - if (pop @r) { - $on_event->(@r); - } elsif (@rcb) { - (shift @rcb)->(@r); + my @r = $t->(substr $rbuf, 8, $len); + substr $rbuf, 0, 8 + $len, ""; + + if ($id) { + if (@rcb) { + (shift @rcb)->(@r); + } elsif (my $cb = delete $rcb{$id}) { + $cb->(@r); + } else { + undef $rw; undef $ww; + $on_error->("unexpected data from child"); + } } else { - undef $rw; undef $ww; - $on_error->("unexpected data from child"); + $on_event->(@r); } } } elsif (defined $len) { undef $rw; undef $ww; # it ends here - $on_error->("unexpected eof") - if @rcb; + + if (@rcb || %rcb) { + use Data::Dump;ddx[\@rcb,\%rcb];#d# + $on_error->("unexpected eof"); + } else { + $on_destroy->(); + } } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { undef $rw; undef $ww; # it ends here $on_error->("read: $!"); @@ -229,39 +510,65 @@ $ww ||= $fh && AE::io $fh, 1, $wcb; }; - sub { - push @rcb, pop; + my $id; - $guard; # keep it alive + $arg{async} + ? sub { + $id = ($id == 0xffffffff ? 0 : $id) + 1; + $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops - $wbuf .= pack "L/a*", &$f; - $ww ||= $fh && AE::io $fh, 1, $wcb; - } + $rcb{$id} = pop; + + $guard; # keep it alive + + $wbuf .= pack "LL/a*", $id, &$f; + $ww ||= $fh && AE::io $fh, 1, $wcb; + } + : sub { + push @rcb, pop; + + $guard; # keep it alive + + $wbuf .= pack "L/a*", &$f; + $ww ||= $fh && AE::io $fh, 1, $wcb; + } } -=back +=item $rpc->(..., $cb->(...)) -=head1 CHILD PROCESS USAGE +The RPC object returned by C is actually a code +reference. There are two things you can do with it: call it, and let it go +out of scope (let it get destroyed). + +If C was false when C<$rpc> was created (the default), then, if you +call C<$rpc>, the C<$function> is invoked with all arguments passed to +C<$rpc> except the last one (the callback). When the function returns, the +callback will be invoked with all the return values. + +If C was true, then the C<$function> receives an additional +initial argument, the result callback. In this case, returning from +C<$function> does nothing - the function only counts as "done" when the +result callback is called, and any arguments passed to it are considered +the return values. This makes it possible to "return" from event handlers +or e.g. Coro threads. + +The other thing that can be done with the RPC object is to destroy it. In +this case, the child process will execute all remaining RPC calls, report +their results, and then exit. -These functions are not available in this module. They are only available -in the namespace of this module when the child is running, without -having to load any extra module. They are part of the child-side API of -L. +See the examples section earlier in this document for some actual +examples. -=over 4 +=back -=item AnyEvent::Fork::RPC::quit +=head1 CHILD PROCESS USAGE -This function can be called to gracefully stop the child process when it -is idle. +The following function is not available in this module. They are only +available in the namespace of this module when the child is running, +without having to load any extra modules. They are part of the child-side +API of L. -After this function is called, the process stops handling incoming RPC -requests, but outstanding events and function return values will be sent -to the parent. When all data has been sent, the process calls C. - -Since the parent might not expect the child to exit at random points in -time, it is often better to signal the parent by sending an C and -letting the parent close down the child process. +=over 4 =item AnyEvent::Fork::RPC::event ... @@ -269,6 +576,9 @@ child process to the parent, except that there is no notion of return values. +See the examples section earlier in this document for some actual +examples. + =back =head1 SEE ALSO