--- AnyEvent-Fork-RPC/RPC.pm 2013/04/28 14:34:40 1.27 +++ AnyEvent-Fork-RPC/RPC.pm 2013/08/25 21:52:15 1.29 @@ -28,7 +28,7 @@ =head1 DESCRIPTION This module implements a simple RPC protocol and backend for processes -created via L (or L), allowing you +created via L or L, allowing you to call a function in the child process and receive its return values (up to 4GB serialised). @@ -60,7 +60,7 @@ ->new ->require ("MyWorker") ->AnyEvent::Fork::RPC::run ("MyWorker::run", - on_error => sub { warn "FATAL: $_[0]"; exit 1 }, + on_error => sub { warn "ERROR: $_[0]"; exit 1 }, on_event => sub { warn "$_[0] requests handled\n" }, on_destroy => $done, ); @@ -203,7 +203,7 @@ ->eval (do { local $/; }) ->AnyEvent::Fork::RPC::run ("run", async => 1, - on_error => sub { warn "FATAL: $_[0]"; exit 1 }, + on_error => sub { warn "ERROR: $_[0]"; exit 1 }, on_event => sub { print $_[0] }, on_destroy => $done, ); @@ -289,6 +289,55 @@ actually fork, you are free to use about any module in the child, not just L, but also L, or L for example. +=head2 Example 3: Asynchronous backend with Coro + +With L you can create a nice asynchronous backend implementation by +defining an rpc server function that creates a new Coro thread for every +request that calls a function "normally", i.e. the parameters from the +parent process are passed to it, and any return values are returned to the +parent process, e.g.: + + package My::Arith; + + sub add { + return $_[0] + $_[1]; + } + + sub mul { + return $_[0] * $_[1]; + } + + sub run { + my ($done, $func, @arg) = @_; + + Coro::async_pool { + $done->($func->(@arg)); + }; + } + +The C function creates a new thread for every invocation, using the +first argument as function name, and calls the C<$done> callback on it's +return values. This makes it quite natural to define the C and C +functions to add or multiply two numbers and return the result. + +Since this is the asynchronous backend, it's quite possible to define RPC +function that do I/O or wait for external events - their execution will +overlap as needed. + +The above could be used like this: + + my $rpc = AnyEvent::Fork + ->new + ->require ("MyWorker") + ->AnyEvent::Fork::RPC::run ("My::Arith::run", + on_error => ..., on_event => ..., on_destroy => ..., + ); + + $rpc->(add => 1, 3, Coro::rouse_cb); say Coro::rouse_wait; + $rpc->(mul => 3, 2, Coro::rouse_cb); say Coro::rouse_wait; + +The C's will print C<4> and C<6>. + =head1 PARENT PROCESS USAGE This module exports nothing, and only implements a single function: @@ -336,8 +385,8 @@ callback is called with the first argument being the string C, followed by the error message. -If neither handler is provided it prints the error to STDERR and will -start failing badly. +If neither handler is provided, then the error is reported with loglevel +C via C. =item on_event => $cb->(...) @@ -416,7 +465,8 @@ =item octet strings - C<$AnyEvent::Fork::RPC::STRING_SERIALISER> This serialiser concatenates length-prefixes octet strings, and is the -default. +default. That means you can only pass (and return) strings containing +character codes 0-255. Implementation: @@ -447,7 +497,8 @@ This serialiser uses L, which means it has high chance of serialising just about anything you throw at it, at the cost of having -very high overhead per operation. It also comes with perl. +very high overhead per operation. It also comes with perl. It should be +used when you need to serialise complex data structures. Implementation: @@ -457,6 +508,21 @@ sub { @{ Storable::thaw shift } } ) +=item portable storable - C<$AnyEvent::Fork::RPC::NSTORABLE_SERIALISER> + +This serialiser also uses L, but uses it's "network" format +to serialise data, which makes it possible to talk to different +perl binaries (for example, when talking to a process created with +L). + +Implementation: + + use Storable (); + ( + sub { Storable::nfreeze \@_ }, + sub { @{ Storable::thaw shift } } + ) + =back =back @@ -466,9 +532,10 @@ =cut -our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; -our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })'; -our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })'; +our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; +our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })'; +our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })'; +our $NSTORABLE_SERIALISER = 'use Storable (); (sub { Storable::nfreeze \@_ }, sub { @{ Storable::thaw shift } })'; sub run { my ($self, $function, %arg) = @_; @@ -481,7 +548,7 @@ # default for on_error is to on_event, if specified $on_error ||= $on_event ? sub { $on_event->(error => shift) } - : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" }; + : sub { AE::log die => "AnyEvent::Fork::RPC: uncaught error: $_[0]." }; # default for on_event is to raise an error $on_event ||= sub { $on_error->("event received, but no on_event handler") };