--- AnyEvent-Fork-RPC/RPC.pm 2013/04/18 20:20:42 1.18 +++ AnyEvent-Fork-RPC/RPC.pm 2019/09/15 20:18:14 1.46 @@ -2,12 +2,10 @@ AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork -THE API IS NOT FINISHED, CONSIDER THIS A TECHNOLOGY DEMO - =head1 SYNOPSIS + use AnyEvent::Fork; use AnyEvent::Fork::RPC; - # use AnyEvent::Fork is not needed my $rpc = AnyEvent::Fork ->new @@ -30,8 +28,9 @@ =head1 DESCRIPTION This module implements a simple RPC protocol and backend for processes -created via L, allowing you to call a function in the -child process and receive its return values (up to 4GB serialised). +created via L or L, allowing you +to call a function in the child process and receive its return values (up +to 4GB serialised). It implements two different backends: a synchronous one that works like a normal function call, and an asynchronous one that can run multiple jobs @@ -40,9 +39,6 @@ It also implements an asynchronous event mechanism from the child to the parent, that could be used for progress indications or other information. -Loading this module also always loads L, so you can make a -separate C if you wish, but you don't have to. - =head1 EXAMPLES =head2 Example 1: Synchronous Backend @@ -55,6 +51,7 @@ First the parent process: use AnyEvent; + use AnyEvent::Fork; use AnyEvent::Fork::RPC; my $done = AE::cv; @@ -63,7 +60,7 @@ ->new ->require ("MyWorker") ->AnyEvent::Fork::RPC::run ("MyWorker::run", - on_error => sub { warn "FATAL: $_[0]"; exit 1 }, + on_error => sub { warn "ERROR: $_[0]"; exit 1 }, on_event => sub { warn "$_[0] requests handled\n" }, on_destroy => $done, ); @@ -180,7 +177,7 @@ =head2 Example 2: Asynchronous Backend This example implements multiple count-downs in the child, using -L timers. While this is a bit silly (one could use timers in te +L timers. While this is a bit silly (one could use timers in the parent just as well), it illustrates the ability to use AnyEvent in the child and the fact that responses can arrive in a different order then the requests. @@ -195,6 +192,7 @@ Without further ado, here is the code: use AnyEvent; + use AnyEvent::Fork; use AnyEvent::Fork::RPC; my $done = AE::cv; @@ -205,7 +203,7 @@ ->eval (do { local $/; }) ->AnyEvent::Fork::RPC::run ("run", async => 1, - on_error => sub { warn "FATAL: $_[0]"; exit 1 }, + on_error => sub { warn "ERROR: $_[0]"; exit 1 }, on_event => sub { print $_[0] }, on_destroy => $done, ); @@ -291,6 +289,93 @@ actually fork, you are free to use about any module in the child, not just L, but also L, or L for example. +=head2 Example 3: Asynchronous backend with Coro + +With L you can create a nice asynchronous backend implementation by +defining an rpc server function that creates a new Coro thread for every +request that calls a function "normally", i.e. the parameters from the +parent process are passed to it, and any return values are returned to the +parent process, e.g.: + + package My::Arith; + + sub add { + return $_[0] + $_[1]; + } + + sub mul { + return $_[0] * $_[1]; + } + + sub run { + my ($done, $func, @arg) = @_; + + Coro::async_pool { + $done->($func->(@arg)); + }; + } + +The C function creates a new thread for every invocation, using the +first argument as function name, and calls the C<$done> callback on it's +return values. This makes it quite natural to define the C and C +functions to add or multiply two numbers and return the result. + +Since this is the asynchronous backend, it's quite possible to define RPC +function that do I/O or wait for external events - their execution will +overlap as needed. + +The above could be used like this: + + my $rpc = AnyEvent::Fork + ->new + ->require ("MyWorker") + ->AnyEvent::Fork::RPC::run ("My::Arith::run", + on_error => ..., on_event => ..., on_destroy => ..., + ); + + $rpc->(add => 1, 3, Coro::rouse_cb); say Coro::rouse_wait; + $rpc->(mul => 3, 2, Coro::rouse_cb); say Coro::rouse_wait; + +The C's will print C<4> and C<6>. + +=head2 Example 4: Forward AnyEvent::Log messages using C + +This partial example shows how to use the C function to forward +L messages to the parent. + +For this, the parent needs to provide a suitable C: + + ->AnyEvent::Fork::RPC::run ( + on_event => sub { + if ($_[0] eq "ae_log") { + my (undef, $level, $message) = @_; + AE::log $level, $message; + } else { + # other event types + } + }, + ) + +In the child, as early as possible, the following code should reconfigure +L to log via C: + + $AnyEvent::Log::LOG->log_cb (sub { + my ($timestamp, $orig_ctx, $level, $message) = @{+shift}; + + if (defined &AnyEvent::Fork::RPC::event) { + AnyEvent::Fork::RPC::event (ae_log => $level, $message); + } else { + warn "[$$ before init] $message\n"; + } + }); + +There is an important twist - the C function +is only defined when the child is fully initialised. If you redirect the +log messages in your C function for example, then the C +function might not yet be available. This is why the log callback checks +whether the function is there using C, and only then uses it to +log the message. + =head1 PARENT PROCESS USAGE This module exports nothing, and only implements a single function: @@ -307,9 +392,8 @@ use Guard (); use AnyEvent; -use AnyEvent::Fork; # we don't actually depend on it, this is for convenience -our $VERSION = 0.1; +our $VERSION = '2.0'; =item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...] @@ -339,8 +423,8 @@ callback is called with the first argument being the string C, followed by the error message. -If neither handler is provided it prints the error to STDERR and will -start failing badly. +If neither handler is provided, then the error is reported with loglevel +C via C. =item on_event => $cb->(...) @@ -357,7 +441,7 @@ the parent must not exit either until all requests have been handled, and this can be accomplished by waiting for this callback. -=item init => $function (default none) +=item init => $function (default: none) When specified (by name), this function is called in the child as the very first thing when taking over the process, with all the arguments normally @@ -372,7 +456,21 @@ used to load any modules that provide the serialiser or function. It can not, however, create events. -=item async => $boolean (default: 0) +=item done => $function (default: C) + +The function to call when the asynchronous backend detects an end of file +condition when reading from the communications socket I there are no +outstanding requests. It is ignored by the synchronous backend. + +By overriding this you can prolong the life of a RPC process after e.g. +the parent has exited by running the event loop in the provided function +(or simply calling it, for example, when your child process uses L you +could provide L as C function). + +Of course, in that case you are responsible for exiting at the appropriate +time and not returning from + +=item async => $boolean (default: C<0>) The default server used in the child does all I/O blockingly, and only allows a single RPC call to execute concurrently. @@ -392,14 +490,16 @@ If you use a template process and want to fork both sync and async children, then it is permissible to load both modules. -=item serialiser => $string (default: $AnyEvent::Fork::RPC::STRING_SERIALISER) +=item serialiser => $string (default: C<$AnyEvent::Fork::RPC::STRING_SERIALISER>) All arguments, result data and event data have to be serialised to be transferred between the processes. For this, they have to be frozen and thawed in both parent and child processes. -By default, only octet strings can be passed between the processes, which -is reasonably fast and efficient and requires no extra modules. +By default, only octet strings can be passed between the processes, +which is reasonably fast and efficient and requires no extra modules +(the C distribution does not provide these extra +serialiser modules). For more complicated use cases, you can provide your own freeze and thaw functions, by specifying a string with perl source code. It's supposed to @@ -411,15 +511,20 @@ pre-load it into your L process, or you can add a C or C statement into the serialiser string. Or both. -Here are some examples - some of them are also available as global +Here are some examples - all of them are also available as global variables that make them easier to use. =over 4 -=item octet strings - C<$AnyEvent::Fork::RPC::STRING_SERIALISER> +=item C<$AnyEvent::Fork::RPC::STRING_SERIALISER> - octet strings only -This serialiser concatenates length-prefixes octet strings, and is the -default. +This serialiser (currently the default) concatenates length-prefixes octet +strings, and is the default. That means you can only pass (and return) +strings containing character codes 0-255. + +The main advantages of this serialiser are the high speed and that it +doesn't need another module. The main disadvantage is that you are very +limited in what you can pass - only octet strings. Implementation: @@ -428,7 +533,30 @@ sub { unpack "(w/a*)*", shift } ) -=item json - C<$AnyEvent::Fork::RPC::JSON_SERIALISER> +=item C<$AnyEvent::Fork::RPC::CBOR_XS_SERIALISER> - uses L + +This serialiser creates CBOR::XS arrays - you have to make sure the +L module is installed for this serialiser to work. It can be +beneficial for sharing when you preload the L module in a template +process. + +L is about as fast as the octet string serialiser, but supports +complex data structures (similar to JSON) and is faster than any of the +other serialisers. If you have the L module available, it's the +best choice. + +The encoder enables C (so this serialisation method can +encode cyclic and self-referencing data structures). + +Implementation: + + use CBOR::XS (); + ( + sub { CBOR::XS::encode_cbor_sharing \@_ }, + sub { @{ CBOR::XS::decode_cbor shift } } + ) + +=item C<$AnyEvent::Fork::RPC::JSON_SERIALISER> - uses L or L This serialiser creates JSON arrays - you have to make sure the L module is installed for this serialiser to work. It can be beneficial for @@ -446,11 +574,12 @@ sub { @{ JSON::decode_json shift } } ) -=item storable - C<$AnyEvent::Fork::RPC::STORABLE_SERIALISER> +=item C<$AnyEvent::Fork::RPC::STORABLE_SERIALISER> - L This serialiser uses L, which means it has high chance of serialising just about anything you throw at it, at the cost of having -very high overhead per operation. It also comes with perl. +very high overhead per operation. It also comes with perl. It should be +used when you need to serialise complex data structures. Implementation: @@ -460,8 +589,44 @@ sub { @{ Storable::thaw shift } } ) +=item C<$AnyEvent::Fork::RPC::NSTORABLE_SERIALISER> - portable Storable + +This serialiser also uses L, but uses it's "network" format +to serialise data, which makes it possible to talk to different +perl binaries (for example, when talking to a process created with +L). + +Implementation: + + use Storable (); + ( + sub { Storable::nfreeze \@_ }, + sub { @{ Storable::thaw shift } } + ) + =back +=item buflen => $bytes (default: C<512 - 16>) + +The starting size of the read buffer for request and response data. + +C ensures that the buffer for reeading request and +response data is large enough for at leats aingle request or response, and +will dynamically enlarge the buffer if needed. + +While this ensures that memory is not overly wasted, it typically leads +to having to do one syscall per request, which can be inefficient in some +cases. In such cases, it can be beneficient to increase the buffer size to +hold more than one request. + +=item buflen_req => $bytes (default: same as C) + +Overrides C for request data (as read by the forked process). + +=item buflen_res => $bytes (default: same as C) + +Overrides C for response data (replies read by the parent process). + =back See the examples section earlier in this document for some actual @@ -469,9 +634,11 @@ =cut -our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; -our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })'; -our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })'; +our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; +our $CBOR_XS_SERIALISER = 'use CBOR::XS (); (sub { CBOR::XS::encode_cbor_sharing \@_ }, sub { @{ CBOR::XS::decode_cbor shift } })'; +our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })'; +our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })'; +our $NSTORABLE_SERIALISER = 'use Storable (); (sub { Storable::nfreeze \@_ }, sub { @{ Storable::thaw shift } })'; sub run { my ($self, $function, %arg) = @_; @@ -484,7 +651,7 @@ # default for on_error is to on_event, if specified $on_error ||= $on_event ? sub { $on_event->(error => shift) } - : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" }; + : sub { AE::log die => "AnyEvent::Fork::RPC: uncaught error: $_[0]." }; # default for on_event is to raise an error $on_event ||= sub { $on_error->("event received, but no on_event handler") }; @@ -492,7 +659,7 @@ my ($f, $t) = eval $serialiser; die $@ if $@; my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww); - my ($rlen, $rbuf, $rw) = 512 - 16; + my ($rlen, $rbuf, $rw) = $arg{buflen_res} || $arg{buflen} || 512 - 16; my $wcb = sub { my $len = syswrite $fh, $wbuf; @@ -514,10 +681,18 @@ my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"); - $self->require ($module) - ->send_arg ($function, $arg{init}, $serialiser) + $self->eval ("use $module 2 ()") + ->send_arg ( + function => $function, + init => $arg{init}, + serialiser => $serialiser, + done => $arg{done} || "$module\::do_exit", + rlen => $arg{buflen_req} || $arg{buflen} || 512 - 16, + -10 # the above are 10 arguments + ) ->run ("$module\::run", sub { - $fh = shift; + $fh = shift + or return $on_error->("connection failed"); my ($id, $len); $rw = AE::io $fh, 0, sub { @@ -526,7 +701,7 @@ if ($len) { while (8 <= length $rbuf) { - ($id, $len) = unpack "LL", $rbuf; + ($id, $len) = unpack "NN", $rbuf; 8 + $len <= length $rbuf or last; @@ -552,7 +727,8 @@ if (@rcb || %rcb) { $on_error->("unexpected eof"); } else { - $on_destroy->(); + $on_destroy->() + if $on_destroy; } } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { undef $rw; undef $ww; # it ends here @@ -566,7 +742,7 @@ my $guard = Guard::guard { $shutdown = 1; - $wcb->() if $fh && !$ww; + shutdown $fh, 1 if $fh && !$ww; }; my $id; @@ -578,9 +754,9 @@ $rcb{$id} = pop; - $guard; # keep it alive + $guard if 0; # keep it alive - $wbuf .= pack "LL/a*", $id, &$f; + $wbuf .= pack "NN/a*", $id, &$f; $ww ||= $fh && AE::io $fh, 1, $wcb; } : sub { @@ -588,7 +764,7 @@ $guard; # keep it alive - $wbuf .= pack "L/a*", &$f; + $wbuf .= pack "N/a*", &$f; $ww ||= $fh && AE::io $fh, 1, $wcb; } } @@ -627,9 +803,21 @@ without having to load any extra modules. They are part of the child-side API of L. +Note that these functions are typically not yet declared when code is +compiled into the child, because the backend module is only loaded when +you call C, which is typically the last method you call on the fork +object. + +Therefore, you either have to explicitly pre-load the right backend module +or mark calls to these functions as function calls, e.g.: + + AnyEvent::Fork::RPC::event (0 => "five"); + AnyEvent::Fork::RPC::event->(0 => "five"); + &AnyEvent::Fork::RPC::flush; + =over 4 -=item AnyEvent::Fork::RPC::event ... +=item AnyEvent::Fork::RPC::event (...) Send an event to the parent. Events are a bit like RPC calls made by the child process to the parent, except that there is no notion of return @@ -638,6 +826,100 @@ See the examples section earlier in this document for some actual examples. +Note: the event data, like any data send to the parent, might not be sent +immediatelly but queued for later sending, so there is no guarantee that +the event has been sent to the parent when the call returns - when you +e.g. exit directly after calling this function, the parent might never +receive the event. See the next function for a remedy. + +=item $success = AnyEvent::Fork::RPC::flush () + +Synchronously wait and flush the reply data to the parent. Returns true on +success and false otherwise (i.e. when the reply data cannot be written at +all). Ignoring the success status is a common and healthy behaviour. + +Only the "async" backend does something on C - the "sync" backend +is not buffering reply data and always returns true from this function. + +Normally, reply data might or might not be written to the parent +immediatelly but is buffered. This can greatly improve performance and +efficiency, but sometimes can get in your way: for example. when you want +to send an error message just before exiting, or when you want to ensure +replies timely reach the parent before starting a long blocking operation. + +In these cases, you can call this function to flush any outstanding reply +data to the parent. This is done blockingly, so no requests will be +handled and no event callbacks will be called. + +For example, you could wrap your request function in a C block and +report the exception string back to the caller just before exiting: + + sub req { + ... + + eval { + ... + }; + + if ($@) { + AnyEvent::RPC::event (throw => "$@"); + AnyEvent::RPC::flush (); + exit; + } + + ... + } + +=back + +=head2 PROCESS EXIT + +If and when the child process exits depends on the backend and +configuration. Apart from explicit exits (e.g. by calling C) or +runtime conditions (uncaught exceptions, signals etc.), the backends exit +under these conditions: + +=over 4 + +=item Synchronous Backend + +The synchronous backend is very simple: when the process waits for another +request to arrive and the writing side (usually in the parent) is closed, +it will exit normally, i.e. as if your main program reached the end of the +file. + +That means that if your parent process exits, the RPC process will usually +exit as well, either because it is idle anyway, or because it executes a +request. In the latter case, you will likely get an error when the RPc +process tries to send the results to the parent (because agruably, you +shouldn't exit your parent while there are still outstanding requests). + +The process is usually quiescent when it happens, so it should rarely be a +problem, and C handlers can be used to clean up. + +=item Asynchronous Backend + +For the asynchronous backend, things are more complicated: Whenever it +listens for another request by the parent, it might detect that the socket +was closed (e.g. because the parent exited). It will sotp listening for +new requests and instead try to write out any remaining data (if any) or +simply check whether the socket can be written to. After this, the RPC +process is effectively done - no new requests are incoming, no outstanding +request data can be written back. + +Since chances are high that there are event watchers that the RPC server +knows nothing about (why else would one use the async backend if not for +the ability to register watchers?), the event loop would often happily +continue. + +This is why the asynchronous backend explicitly calls C when +it is done (under other circumstances, such as when there is an I/O error +and there is outstanding data to write, it will log a fatal message via +L, also causing the program to exit). + +You can override this by specifying a function name to call via the C +parameter instead. + =back =head1 ADVANCED TOPICS @@ -702,7 +984,8 @@ problems. Alternatively, the parent could limit the amount of rpc calls that are outstanding. -Blocking use of condvars is not supported. +Blocking use of condvars is not supported (in the main thread, outside of +e.g. L threads). Using event-based modules such as L, L, L and so on is easy. @@ -729,6 +1012,7 @@ Here is some (untested) pseudocode to that effect: use AnyEvent::Util; + use AnyEvent::Fork; use AnyEvent::Fork::RPC; use IO::FDPass; @@ -774,10 +1058,19 @@ so you might want to look into L which can handle the gory details. +=head1 EXCEPTIONS + +There are no provisions whatsoever for catching exceptions at this time - +in the child, exceptions might kill the process, causing calls to be lost +and the parent encountering a fatal error. In the parent, exceptions in +the result callback will not be caught and cause undefined behaviour. + =head1 SEE ALSO L, to create the processes in the first place. +L, likewise, but helpful for remote processes. + L, to manage whole pools of processes. =head1 AUTHOR AND CONTACT INFORMATION