--- AnyEvent-Fork-RPC/RPC.pm	2013/04/18 14:07:15	1.16
+++ AnyEvent-Fork-RPC/RPC.pm	2016/05/12 16:54:43	1.39
@@ -4,8 +4,8 @@

=head1 SYNOPSIS

+   use AnyEvent::Fork;
    use AnyEvent::Fork::RPC;
-   # use AnyEvent::Fork is not needed

   my $rpc = AnyEvent::Fork
      ->new
@@ -28,8 +28,9 @@
=head1 DESCRIPTION

This module implements a simple RPC protocol and backend for processes
-created via L, allowing you to call a function in the
-child process and receive its return values (up to 4GB serialised).
+created via L<AnyEvent::Fork> or L<AnyEvent::Fork::Remote>, allowing you
+to call a function in the child process and receive its return values (up
+to 4GB serialised).

It implements two different backends: a synchronous one that works like
a normal function call, and an asynchronous one that can run multiple jobs
@@ -38,9 +39,6 @@
It also implements an asynchronous event mechanism from the child to the
parent, that could be used for progress indications or other information.

-Loading this module also always loads L, so you can make a
-separate C if you wish, but you don't have to.
-
=head1 EXAMPLES

=head2 Example 1: Synchronous Backend
@@ -53,6 +51,7 @@
First the parent process:

   use AnyEvent;
+   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;

   my $done = AE::cv;
@@ -61,7 +60,7 @@
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::RPC::run ("MyWorker::run",
-         on_error   => sub { warn "FATAL: $_[0]"; exit 1 },
+         on_error   => sub { warn "ERROR: $_[0]"; exit 1 },
         on_event   => sub { warn "$_[0] requests handled\n" },
         on_destroy => $done,
      );
@@ -178,7 +177,7 @@
=head2 Example 2: Asynchronous Backend

This example implements multiple count-downs in the child, using
-L timers. While this is a bit silly (one could use timers in te
+L timers. While this is a bit silly (one could use timers in the
parent just as well), it illustrates the ability to use AnyEvent in the
child and the fact that responses can arrive in a different order than
the requests.
@@ -193,6 +192,7 @@
Without further ado, here is the code:

   use AnyEvent;
+   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;

   my $done = AE::cv;
@@ -203,7 +203,7 @@
      ->eval (do { local $/; <DATA> })
      ->AnyEvent::Fork::RPC::run ("run",
         async      => 1,
-         on_error   => sub { warn "FATAL: $_[0]"; exit 1 },
+         on_error   => sub { warn "ERROR: $_[0]"; exit 1 },
         on_event   => sub { print $_[0] },
         on_destroy => $done,
      );
@@ -289,6 +289,93 @@
actually fork, you are free to use about any module in the child, not
just L, but also L, or L for example.

+=head2 Example 3: Asynchronous backend with Coro
+
+With L<Coro> you can create a nice asynchronous backend implementation
+by defining an RPC server function that creates a new Coro thread for
+every request. That thread then calls the requested function "normally",
+i.e. the parameters from the parent process are passed to it, and any
+return values are returned to the parent process, e.g.:
+
+   package My::Arith;
+
+   sub add {
+      return $_[0] + $_[1];
+   }
+
+   sub mul {
+      return $_[0] * $_[1];
+   }
+
+   sub run {
+      my ($done, $func, @arg) = @_;
+
+      Coro::async_pool {
+         $done->($func->(@arg));
+      };
+   }
+
+The C<run> function creates a new thread for every invocation, using the
+first argument as function name, and calls the C<$done> callback on its
+return values. This makes it quite natural to define the C<add> and
+C<mul> functions to add or multiply two numbers and return the result.
+
+Since this is the asynchronous backend, it's quite possible to define RPC
+functions that do I/O or wait for external events - their execution will
+overlap as needed.
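+
+For example (a hypothetical extension of the package above - C<slow_add>
+is not part of any distribution), a request handler can block inside its
+own thread without delaying other requests, using C<Coro::AnyEvent>
+(which ships with L<Coro>):
+
+   use Coro::AnyEvent;
+
+   # sleeps for a while, then adds - several such calls simply overlap
+   sub slow_add {
+      my ($delay, $x, $y) = @_;
+      Coro::AnyEvent::sleep $delay;
+      $x + $y
+   }
+
+It would be called in exactly the same way as C<add> and C<mul> below.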
+
+The above could be used like this:
+
+   my $rpc = AnyEvent::Fork
+      ->new
+      ->require ("MyWorker")
+      ->AnyEvent::Fork::RPC::run ("My::Arith::run",
+         on_error => ..., on_event => ..., on_destroy => ...,
+      );
+
+   $rpc->(add => 1, 3, Coro::rouse_cb); say Coro::rouse_wait;
+   $rpc->(mul => 3, 2, Coro::rouse_cb); say Coro::rouse_wait;
+
+The C<say>'s will print C<4> and C<6>.
+
+=head2 Example 4: Forward AnyEvent::Log messages using C<AnyEvent::Fork::RPC::event>
+
+This partial example shows how to use the C<AnyEvent::Fork::RPC::event>
+function to forward L<AnyEvent::Log> messages to the parent.
+
+For this, the parent needs to provide a suitable C<on_event> callback:
+
+   ->AnyEvent::Fork::RPC::run (
+      on_event => sub {
+         if ($_[0] eq "ae_log") {
+            my (undef, $level, $message) = @_;
+            AE::log $level, $message;
+         } else {
+            # other event types
+         }
+      },
+   )
+
+In the child, as early as possible, the following code should reconfigure
+L<AnyEvent::Log> to log via C<AnyEvent::Fork::RPC::event>:
+
+   $AnyEvent::Log::LOG->log_cb (sub {
+      my ($timestamp, $orig_ctx, $level, $message) = @{+shift};
+
+      if (defined &AnyEvent::Fork::RPC::event) {
+         AnyEvent::Fork::RPC::event (ae_log => $level, $message);
+      } else {
+         warn "[$$ before init] $message\n";
+      }
+   });
+
+There is an important twist - the C<AnyEvent::Fork::RPC::event> function
+is only defined when the child is fully initialised. If you redirect the
+log messages in your C<init> function for example, then the C<event>
+function might not yet be available. This is why the log callback checks
+whether the function is there using C<defined>, and only then uses it to
+log the message.
+
=head1 PARENT PROCESS USAGE

This module exports nothing, and only implements a single function:

@@ -305,9 +392,8 @@
use Guard ();
use AnyEvent;

-use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
-our $VERSION = 0.1;
+our $VERSION = 1.22;

=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]

@@ -337,8 +423,8 @@
callback is called with the first argument being the string C<error>,
followed by the error message.

-If neither handler is provided it prints the error to STDERR and will
-start failing badly.
+If neither handler is provided, then the error is reported with loglevel
+C<die> via C<AE::log>.

=item on_event => $cb->(...)

@@ -370,6 +456,20 @@
used to load any modules that provide the serialiser or function. It can
not, however, create events.

+=item done => $function (default C<do_exit>)
+
+The function to call when the asynchronous backend detects an end of file
+condition when reading from the communications socket I<and> there are no
+outstanding requests. It's ignored by the synchronous backend.
+
+By overriding this you can prolong the life of an RPC process after e.g.
+the parent has exited, by running the event loop in the provided function
+(or simply calling it - for example, when your child process uses L, you
+could provide L as the C<done> function).
+
+Of course, in that case you are responsible for exiting at the appropriate
+time and not returning from it prematurely.
+
=item async => $boolean (default: 0)

The default server used in the child does all I/O blockingly, and only
@@ -396,8 +496,10 @@
transferred between the processes. For this, they have to be frozen and
thawed in both parent and child processes.

-By default, only octet strings can be passed between the processes, which
-is reasonably fast and efficient and requires no extra modules.
+By default, only octet strings can be passed between the processes,
+which is reasonably fast and efficient and requires no extra modules
+(the C<AnyEvent::Fork::RPC> distribution does not provide these extra
+serialiser modules).
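+
+As a (hypothetical) illustration of what that means in practice, anything
+that is not already an octet string has to be encoded by hand, e.g. with
+C<pack> - C<sum_doubles> below is a made-up worker function, not part of
+this module:
+
+   # child side, e.g. in MyWorker.pm - decode, work, encode again
+   sub sum_doubles {
+      my @nums = unpack "d*", $_[0];
+      my $sum = 0;
+      $sum += $_ for @nums;
+      pack "d", $sum
+   }
+
+   # parent side - only octet strings cross the process boundary
+   $rpc->(pack ("d*", 1.5, 2.25, 4), sub {
+      my ($sum) = unpack "d", $_[0];
+      print "sum is $sum\n";
+   });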

For more complicated use cases, you can provide your own freeze and thaw
functions, by specifying a string with perl source code. It's supposed to
@@ -409,15 +511,20 @@
pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
or C<require> statement into the serialiser string. Or both.

-Here are some examples - some of them are also available as global
+Here are some examples - all of them are also available as global
variables that make them easier to use.

=over 4

-=item octet strings - C<$AnyEvent::Fork::RPC::STRING_SERIALISER>
+=item C<$AnyEvent::Fork::RPC::STRING_SERIALISER> - octet strings only

-This serialiser concatenates length-prefixes octet strings, and is the
-default.
+This serialiser (currently the default) concatenates length-prefixed
+octet strings. That means you can only pass (and return) strings
+containing character codes 0-255.
+
+The main advantages of this serialiser are the high speed and that it
+doesn't need another module. The main disadvantage is that you are very
+limited in what you can pass - only octet strings.

Implementation:

@@ -426,7 +533,30 @@
      sub { unpack "(w/a*)*", shift }
   )

-=item json - C<$AnyEvent::Fork::RPC::JSON_SERIALISER>
+=item C<$AnyEvent::Fork::RPC::CBOR_XS_SERIALISER> - uses L<CBOR::XS>
+
+This serialiser creates CBOR::XS arrays - you have to make sure the
+L<CBOR::XS> module is installed for this serialiser to work. It can be
+beneficial for sharing when you preload the L<CBOR::XS> module in a
+template process.
+
+L<CBOR::XS> is about as fast as the octet string serialiser, but supports
+complex data structures (similar to JSON) and is faster than any of the
+other serialisers. If you have the L<CBOR::XS> module available, it's the
+best choice.
+
+The encoder enables C<allow_sharing> (so this serialisation method can
+encode cyclic and self-referencing data structures).
+
+Implementation:
+
+   use CBOR::XS ();
+   (
+      sub { CBOR::XS::encode_cbor_sharing \@_ },
+      sub { @{ CBOR::XS::decode_cbor shift } }
+   )
+
+=item C<$AnyEvent::Fork::RPC::JSON_SERIALISER> - uses L or L

This serialiser creates JSON arrays - you have to make sure the L<JSON>
module is installed for this serialiser to work. It can be beneficial for
@@ -444,11 +574,12 @@
      sub { @{ JSON::decode_json shift } }
   )

-=item storable - C<$AnyEvent::Fork::RPC::STORABLE_SERIALISER>
+=item C<$AnyEvent::Fork::RPC::STORABLE_SERIALISER> - L<Storable>

This serialiser uses L<Storable>, which means it has high chance of
serialising just about anything you throw at it, at the cost of having
-very high overhead per operation. It also comes with perl.
+very high overhead per operation. It also comes with perl. It should be
+used when you need to serialise complex data structures.

Implementation:

@@ -458,6 +589,21 @@
      sub { @{ Storable::thaw shift } }
   )

+=item C<$AnyEvent::Fork::RPC::NSTORABLE_SERIALISER> - portable Storable
+
+This serialiser also uses L<Storable>, but uses its "network" format
+to serialise data, which makes it possible to talk to different
+perl binaries (for example, when talking to a process created with
+L<AnyEvent::Fork::Remote>).
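+
+As a usage sketch (C<MyWorker::run> is a placeholder for your own worker
+function), any of these serialisers is selected by passing the
+corresponding variable as the C<serialiser> argument:
+
+   my $rpc = AnyEvent::Fork
+      ->new
+      ->require ("MyWorker")
+      ->AnyEvent::Fork::RPC::run ("MyWorker::run",
+         serialiser => $AnyEvent::Fork::RPC::NSTORABLE_SERIALISER,
+         on_error   => sub { warn "ERROR: $_[0]"; exit 1 },
+      );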
+
+Implementation:
+
+   use Storable ();
+   (
+      sub { Storable::nfreeze \@_ },
+      sub { @{ Storable::thaw shift } }
+   )
+
=back

=back

@@ -467,9 +613,11 @@
=cut

-our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
-our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })';
-our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';
+our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
+our $CBOR_XS_SERIALISER = 'use CBOR::XS (); (sub { CBOR::XS::encode_cbor_sharing \@_ }, sub { @{ CBOR::XS::decode_cbor shift } })';
+our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })';
+our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';
+our $NSTORABLE_SERIALISER = 'use Storable (); (sub { Storable::nfreeze \@_ }, sub { @{ Storable::thaw shift } })';

sub run {
   my ($self, $function, %arg) = @_;

@@ -482,7 +630,7 @@
   # default for on_error is to on_event, if specified
   $on_error ||= $on_event
      ? sub { $on_event->(error => shift) }
-      : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };
+      : sub { AE::log die => "AnyEvent::Fork::RPC: uncaught error: $_[0]." };

   # default for on_event is to raise an error
   $on_event ||= sub { $on_error->("event received, but no on_event handler") };

@@ -513,7 +661,7 @@
   my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");

   $self->require ($module)
-        ->send_arg ($function, $arg{init}, $serialiser)
+        ->send_arg ($function, $arg{init}, $serialiser, $arg{done} || "$module\::do_exit")
        ->run ("$module\::run", sub {
           $fh = shift;

@@ -524,7 +672,7 @@
           if ($len) {
              while (8 <= length $rbuf) {
-                 ($id, $len) = unpack "LL", $rbuf;
+                 ($id, $len) = unpack "NN", $rbuf;
                 8 + $len <= length $rbuf
                    or last;

@@ -550,7 +698,8 @@
              if (@rcb || %rcb) {
                 $on_error->("unexpected eof");
              } else {
-                 $on_destroy->();
+                 $on_destroy->()
+                    if $on_destroy;
              }
           } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
              undef $rw; undef $ww; # it ends here

@@ -563,7 +712,8 @@
   my $guard = Guard::guard {
      $shutdown = 1;

-      $ww ||= $fh && AE::io $fh, 1, $wcb;
+
+      shutdown $fh, 1 if $fh && !$ww;
   };

   my $id;

@@ -575,9 +725,9 @@
         $rcb{$id} = pop;

-          $guard; # keep it alive
+          $guard if 0; # keep it alive

-          $wbuf .= pack "LL/a*", $id, &$f;
+          $wbuf .= pack "NN/a*", $id, &$f;
         $ww ||= $fh && AE::io $fh, 1, $wcb;
      }
   : sub {
@@ -585,7 +735,7 @@
         $guard; # keep it alive

-          $wbuf .= pack "L/a*", &$f;
+          $wbuf .= pack "N/a*", &$f;
         $ww ||= $fh && AE::io $fh, 1, $wcb;
      }
}

@@ -637,6 +787,56 @@
=back

+=head2 PROCESS EXIT
+
+If and when the child process exits depends on the backend and
+configuration. Apart from explicit exits (e.g. by calling C) or
+runtime conditions (uncaught exceptions, signals etc.), the backends exit
+under these conditions:
+
+=over 4
+
+=item Synchronous Backend
+
+The synchronous backend is very simple: when the process waits for another
+request to arrive and the writing side (usually in the parent) is closed,
+it will exit normally, i.e. as if your main program reached the end of the
+file.
+
+That means that if your parent process exits, the RPC process will usually
+exit as well, either because it is idle anyway, or because it executes a
+request.
+In the latter case, you will likely get an error when the RPC process
+tries to send the results to the parent (because arguably, you shouldn't
+exit your parent while there are still outstanding requests).
+
+The process is usually quiescent when it happens, so it should rarely be a
+problem, and C handlers can be used to clean up.
+
+=item Asynchronous Backend
+
+For the asynchronous backend, things are more complicated: Whenever it
+listens for another request from the parent, it might detect that the
+socket was closed (e.g. because the parent exited). It will stop listening
+for new requests and instead try to write out any remaining data (if any)
+or simply check whether the socket can be written to. After this, the RPC
+process is effectively done - no new requests are incoming, no outstanding
+request data can be written back.
+
+Since chances are high that there are event watchers that the RPC server
+knows nothing about (why else would one use the async backend if not for
+the ability to register watchers?), the event loop would often happily
+continue.
+
+This is why the asynchronous backend explicitly calls C when
+it is done (under other circumstances, such as when there is an I/O error
+and there is outstanding data to write, it will log a fatal message via
+L<AnyEvent::Log>, also causing the program to exit).
+
+You can override this by specifying a function name to call via the
+C<done> parameter instead.
+
+=back
+
=head1 ADVANCED TOPICS

=head2 Choosing a backend
@@ -699,7 +899,8 @@
problems. Alternatively, the parent could limit the number of RPC calls
that are outstanding.

-Blocking use of condvars is not supported.
+Blocking use of condvars is not supported (in the main thread, outside of
+e.g. L<Coro> threads).

Using event-based modules such as L, L, L and so on is easy.

@@ -726,6 +927,7 @@
Here is some (untested) pseudocode to that effect:

   use AnyEvent::Util;
+   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;
   use IO::FDPass;

@@ -771,10 +973,19 @@
so you might want to look into L which can handle the gory
details.

+=head1 EXCEPTIONS
+
+There are no provisions whatsoever for catching exceptions at this time -
+in the child, exceptions might kill the process, causing calls to be lost
+and the parent to encounter a fatal error. In the parent, exceptions in
+the result callback will not be caught and cause undefined behaviour.
+
=head1 SEE ALSO

L<AnyEvent::Fork>, to create the processes in the first place.

+L<AnyEvent::Fork::Remote>, likewise, but helpful for remote processes.
+
L<AnyEvent::Fork::Pool>, to manage whole pools of processes.

=head1 AUTHOR AND CONTACT INFORMATION