ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
(Generate patch)

Comparing AnyEvent-Fork-RPC/RPC.pm (file contents):
Revision 1.27 by root, Sun Apr 28 14:34:40 2013 UTC vs.
Revision 1.29 by root, Sun Aug 25 21:52:15 2013 UTC

26 $cv->recv; 26 $cv->recv;
27 27
28=head1 DESCRIPTION 28=head1 DESCRIPTION
29 29
30This module implements a simple RPC protocol and backend for processes 30This module implements a simple RPC protocol and backend for processes
31created via L<AnyEvent::Fork> (or L<AnyEvent::Fork::Remote>), allowing you 31created via L<AnyEvent::Fork> or L<AnyEvent::Fork::Remote>, allowing you
32to call a function in the child process and receive its return values (up 32to call a function in the child process and receive its return values (up
33to 4GB serialised). 33to 4GB serialised).
34 34
35It implements two different backends: a synchronous one that works like a 35It implements two different backends: a synchronous one that works like a
36normal function call, and an asynchronous one that can run multiple jobs 36normal function call, and an asynchronous one that can run multiple jobs
58 58
59 my $rpc = AnyEvent::Fork 59 my $rpc = AnyEvent::Fork
60 ->new 60 ->new
61 ->require ("MyWorker") 61 ->require ("MyWorker")
62 ->AnyEvent::Fork::RPC::run ("MyWorker::run", 62 ->AnyEvent::Fork::RPC::run ("MyWorker::run",
63 on_error => sub { warn "FATAL: $_[0]"; exit 1 }, 63 on_error => sub { warn "ERROR: $_[0]"; exit 1 },
64 on_event => sub { warn "$_[0] requests handled\n" }, 64 on_event => sub { warn "$_[0] requests handled\n" },
65 on_destroy => $done, 65 on_destroy => $done,
66 ); 66 );
67 67
68 for my $id (1..6) { 68 for my $id (1..6) {
201 ->new 201 ->new
202 ->require ("AnyEvent::Fork::RPC::Async") 202 ->require ("AnyEvent::Fork::RPC::Async")
203 ->eval (do { local $/; <DATA> }) 203 ->eval (do { local $/; <DATA> })
204 ->AnyEvent::Fork::RPC::run ("run", 204 ->AnyEvent::Fork::RPC::run ("run",
205 async => 1, 205 async => 1,
206 on_error => sub { warn "FATAL: $_[0]"; exit 1 }, 206 on_error => sub { warn "ERROR: $_[0]"; exit 1 },
207 on_event => sub { print $_[0] }, 207 on_event => sub { print $_[0] },
208 on_destroy => $done, 208 on_destroy => $done,
209 ); 209 );
210 210
211 for my $count (3, 2, 1) { 211 for my $count (3, 2, 1) {
287 287
288This concludes the async example. Since L<AnyEvent::Fork> does not 288This concludes the async example. Since L<AnyEvent::Fork> does not
289actually fork, you are free to use about any module in the child, not just 289actually fork, you are free to use about any module in the child, not just
290L<AnyEvent>, but also L<IO::AIO>, or L<Tk> for example. 290L<AnyEvent>, but also L<IO::AIO>, or L<Tk> for example.
291 291
292=head2 Example 3: Asynchronous backend with Coro
293
294With L<Coro> you can create a nice asynchronous backend implementation by
295defining an rpc server function that creates a new Coro thread for every
296request that calls a function "normally", i.e. the parameters from the
297parent process are passed to it, and any return values are returned to the
298parent process, e.g.:
299
300 package My::Arith;
301
302 sub add {
303 return $_[0] + $_[1];
304 }
305
306 sub mul {
307 return $_[0] * $_[1];
308 }
309
310 sub run {
311 my ($done, $func, @arg) = @_;
312
313 Coro::async_pool {
314 $done->($func->(@arg));
315 };
316 }
317
318The C<run> function creates a new thread for every invocation, using the
319first argument as function name, and calls the C<$done> callback on it's
320return values. This makes it quite natural to define the C<add> and C<mul>
321functions to add or multiply two numbers and return the result.
322
323Since this is the asynchronous backend, it's quite possible to define RPC
324function that do I/O or wait for external events - their execution will
325overlap as needed.
326
327The above could be used like this:
328
329 my $rpc = AnyEvent::Fork
330 ->new
331 ->require ("MyWorker")
332 ->AnyEvent::Fork::RPC::run ("My::Arith::run",
333 on_error => ..., on_event => ..., on_destroy => ...,
334 );
335
336 $rpc->(add => 1, 3, Coro::rouse_cb); say Coro::rouse_wait;
337 $rpc->(mul => 3, 2, Coro::rouse_cb); say Coro::rouse_wait;
338
339The C<say>'s will print C<4> and C<6>.
340
292=head1 PARENT PROCESS USAGE 341=head1 PARENT PROCESS USAGE
293 342
294This module exports nothing, and only implements a single function: 343This module exports nothing, and only implements a single function:
295 344
296=over 4 345=over 4
334Called on (fatal) errors, with a descriptive (hopefully) message. If 383Called on (fatal) errors, with a descriptive (hopefully) message. If
335this callback is not provided, but C<on_event> is, then the C<on_event> 384this callback is not provided, but C<on_event> is, then the C<on_event>
336callback is called with the first argument being the string C<error>, 385callback is called with the first argument being the string C<error>,
337followed by the error message. 386followed by the error message.
338 387
339If neither handler is provided it prints the error to STDERR and will 388If neither handler is provided, then the error is reported with loglevel
340start failing badly. 389C<error> via C<AE::log>.
341 390
342=item on_event => $cb->(...) 391=item on_event => $cb->(...)
343 392
344Called for every call to the C<AnyEvent::Fork::RPC::event> function in the 393Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
345child, with the arguments of that function passed to the callback. 394child, with the arguments of that function passed to the callback.
414=over 4 463=over 4
415 464
416=item octet strings - C<$AnyEvent::Fork::RPC::STRING_SERIALISER> 465=item octet strings - C<$AnyEvent::Fork::RPC::STRING_SERIALISER>
417 466
418This serialiser concatenates length-prefixes octet strings, and is the 467This serialiser concatenates length-prefixes octet strings, and is the
419default. 468default. That means you can only pass (and return) strings containing
469character codes 0-255.
420 470
421Implementation: 471Implementation:
422 472
423 ( 473 (
424 sub { pack "(w/a*)*", @_ }, 474 sub { pack "(w/a*)*", @_ },
445 495
446=item storable - C<$AnyEvent::Fork::RPC::STORABLE_SERIALISER> 496=item storable - C<$AnyEvent::Fork::RPC::STORABLE_SERIALISER>
447 497
448This serialiser uses L<Storable>, which means it has high chance of 498This serialiser uses L<Storable>, which means it has high chance of
449serialising just about anything you throw at it, at the cost of having 499serialising just about anything you throw at it, at the cost of having
450very high overhead per operation. It also comes with perl. 500very high overhead per operation. It also comes with perl. It should be
501used when you need to serialise complex data structures.
451 502
452Implementation: 503Implementation:
453 504
454 use Storable (); 505 use Storable ();
455 ( 506 (
456 sub { Storable::freeze \@_ }, 507 sub { Storable::freeze \@_ },
457 sub { @{ Storable::thaw shift } } 508 sub { @{ Storable::thaw shift } }
458 ) 509 )
459 510
511=item portable storable - C<$AnyEvent::Fork::RPC::NSTORABLE_SERIALISER>
512
513This serialiser also uses L<Storable>, but uses it's "network" format
514to serialise data, which makes it possible to talk to different
515perl binaries (for example, when talking to a process created with
516L<AnyEvent::Fork::Remote>).
517
518Implementation:
519
520 use Storable ();
521 (
522 sub { Storable::nfreeze \@_ },
523 sub { @{ Storable::thaw shift } }
524 )
525
460=back 526=back
461 527
462=back 528=back
463 529
464See the examples section earlier in this document for some actual 530See the examples section earlier in this document for some actual
465examples. 531examples.
466 532
467=cut 533=cut
468 534
469our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; 535our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
470our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })'; 536our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })';
471our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })'; 537our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';
538our $NSTORABLE_SERIALISER = 'use Storable (); (sub { Storable::nfreeze \@_ }, sub { @{ Storable::thaw shift } })';
472 539
473sub run { 540sub run {
474 my ($self, $function, %arg) = @_; 541 my ($self, $function, %arg) = @_;
475 542
476 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; 543 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
479 my $on_destroy = delete $arg{on_destroy}; 546 my $on_destroy = delete $arg{on_destroy};
480 547
481 # default for on_error is to on_event, if specified 548 # default for on_error is to on_event, if specified
482 $on_error ||= $on_event 549 $on_error ||= $on_event
483 ? sub { $on_event->(error => shift) } 550 ? sub { $on_event->(error => shift) }
484 : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" }; 551 : sub { AE::log die => "AnyEvent::Fork::RPC: uncaught error: $_[0]." };
485 552
486 # default for on_event is to raise an error 553 # default for on_event is to raise an error
487 $on_event ||= sub { $on_error->("event received, but no on_event handler") }; 554 $on_event ||= sub { $on_error->("event received, but no on_event handler") };
488 555
489 my ($f, $t) = eval $serialiser; die $@ if $@; 556 my ($f, $t) = eval $serialiser; die $@ if $@;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines