ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
(Generate patch)

Comparing AnyEvent-Fork-RPC/RPC.pm (file contents):
Revision 1.27 by root, Sun Apr 28 14:34:40 2013 UTC vs.
Revision 1.31 by root, Sat Aug 31 16:35:33 2013 UTC

26 $cv->recv; 26 $cv->recv;
27 27
28=head1 DESCRIPTION 28=head1 DESCRIPTION
29 29
30This module implements a simple RPC protocol and backend for processes 30This module implements a simple RPC protocol and backend for processes
31created via L<AnyEvent::Fork> (or L<AnyEvent::Fork::Remote>), allowing you 31created via L<AnyEvent::Fork> or L<AnyEvent::Fork::Remote>, allowing you
32to call a function in the child process and receive its return values (up 32to call a function in the child process and receive its return values (up
33to 4GB serialised). 33to 4GB serialised).
34 34
35It implements two different backends: a synchronous one that works like a 35It implements two different backends: a synchronous one that works like a
36normal function call, and an asynchronous one that can run multiple jobs 36normal function call, and an asynchronous one that can run multiple jobs
58 58
59 my $rpc = AnyEvent::Fork 59 my $rpc = AnyEvent::Fork
60 ->new 60 ->new
61 ->require ("MyWorker") 61 ->require ("MyWorker")
62 ->AnyEvent::Fork::RPC::run ("MyWorker::run", 62 ->AnyEvent::Fork::RPC::run ("MyWorker::run",
63 on_error => sub { warn "FATAL: $_[0]"; exit 1 }, 63 on_error => sub { warn "ERROR: $_[0]"; exit 1 },
64 on_event => sub { warn "$_[0] requests handled\n" }, 64 on_event => sub { warn "$_[0] requests handled\n" },
65 on_destroy => $done, 65 on_destroy => $done,
66 ); 66 );
67 67
68 for my $id (1..6) { 68 for my $id (1..6) {
201 ->new 201 ->new
202 ->require ("AnyEvent::Fork::RPC::Async") 202 ->require ("AnyEvent::Fork::RPC::Async")
203 ->eval (do { local $/; <DATA> }) 203 ->eval (do { local $/; <DATA> })
204 ->AnyEvent::Fork::RPC::run ("run", 204 ->AnyEvent::Fork::RPC::run ("run",
205 async => 1, 205 async => 1,
206 on_error => sub { warn "FATAL: $_[0]"; exit 1 }, 206 on_error => sub { warn "ERROR: $_[0]"; exit 1 },
207 on_event => sub { print $_[0] }, 207 on_event => sub { print $_[0] },
208 on_destroy => $done, 208 on_destroy => $done,
209 ); 209 );
210 210
211 for my $count (3, 2, 1) { 211 for my $count (3, 2, 1) {
287 287
288This concludes the async example. Since L<AnyEvent::Fork> does not 288This concludes the async example. Since L<AnyEvent::Fork> does not
289actually fork, you are free to use about any module in the child, not just 289actually fork, you are free to use about any module in the child, not just
290L<AnyEvent>, but also L<IO::AIO>, or L<Tk> for example. 290L<AnyEvent>, but also L<IO::AIO>, or L<Tk> for example.
291 291
292=head2 Example 3: Asynchronous backend with Coro
293
294With L<Coro> you can create a nice asynchronous backend implementation by
295defining an rpc server function that creates a new Coro thread for every
296request that calls a function "normally", i.e. the parameters from the
297parent process are passed to it, and any return values are returned to the
298parent process, e.g.:
299
300 package My::Arith;
301
302 sub add {
303 return $_[0] + $_[1];
304 }
305
306 sub mul {
307 return $_[0] * $_[1];
308 }
309
310 sub run {
311 my ($done, $func, @arg) = @_;
312
313 Coro::async_pool {
314 $done->($func->(@arg));
315 };
316 }
317
318The C<run> function creates a new thread for every invocation, using the
319first argument as function name, and calls the C<$done> callback on it's
320return values. This makes it quite natural to define the C<add> and C<mul>
321functions to add or multiply two numbers and return the result.
322
323Since this is the asynchronous backend, it's quite possible to define RPC
324function that do I/O or wait for external events - their execution will
325overlap as needed.
326
327The above could be used like this:
328
329 my $rpc = AnyEvent::Fork
330 ->new
331 ->require ("MyWorker")
332 ->AnyEvent::Fork::RPC::run ("My::Arith::run",
333 on_error => ..., on_event => ..., on_destroy => ...,
334 );
335
336 $rpc->(add => 1, 3, Coro::rouse_cb); say Coro::rouse_wait;
337 $rpc->(mul => 3, 2, Coro::rouse_cb); say Coro::rouse_wait;
338
339The C<say>'s will print C<4> and C<6>.
340
341=head2 Example 4: Forward AnyEvent::Log messages using C<on_event>
342
343This partial example shows how to use the C<event> function to forward
344L<AnyEvent::Log> messages to the parent.
345
346For this, the parent needs to provide a suitable C<on_event>:
347
348 ->AnyEvent::Fork::RPC::run (
349 on_event => sub {
350 if ($_[0] eq "ae_log") {
351 my (undef, $level, $message) = @_;
352 AE::log $level, $message;
353 } else {
354 # other event types
355 }
356 },
357 )
358
359In the child, as early as possible, the following code should reconfigure
360L<AnyEvent::Log> to log via C<AnyEvent::Fork::RPC::event>:
361
362 $AnyEvent::Log::LOG->log_cb (sub {
363 my ($timestamp, $orig_ctx, $level, $message) = @{+shift};
364
365 if (defined &AnyEvent::Fork::RPC::event) {
366 AnyEvent::Fork::RPC::event (ae_log => $level, $message);
367 } else {
368 warn "[$$ before init] $message\n";
369 }
370 });
371
372There is an important twist - the C<AnyEvent::Fork::RPC::event> function
373is only defined when the child is fully initialised. If you redirect the
374log messages in your C<init> function for example, then the C<event>
375function might not yet be available. This is why the log callback checks
376whether the fucntion is there using C<defined>, and only then uses it to
377log the message.
378
292=head1 PARENT PROCESS USAGE 379=head1 PARENT PROCESS USAGE
293 380
294This module exports nothing, and only implements a single function: 381This module exports nothing, and only implements a single function:
295 382
296=over 4 383=over 4
334Called on (fatal) errors, with a descriptive (hopefully) message. If 421Called on (fatal) errors, with a descriptive (hopefully) message. If
335this callback is not provided, but C<on_event> is, then the C<on_event> 422this callback is not provided, but C<on_event> is, then the C<on_event>
336callback is called with the first argument being the string C<error>, 423callback is called with the first argument being the string C<error>,
337followed by the error message. 424followed by the error message.
338 425
339If neither handler is provided it prints the error to STDERR and will 426If neither handler is provided, then the error is reported with loglevel
340start failing badly. 427C<error> via C<AE::log>.
341 428
342=item on_event => $cb->(...) 429=item on_event => $cb->(...)
343 430
344Called for every call to the C<AnyEvent::Fork::RPC::event> function in the 431Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
345child, with the arguments of that function passed to the callback. 432child, with the arguments of that function passed to the callback.
367It is called very early - before the serialisers are created or the 454It is called very early - before the serialisers are created or the
368C<$function> name is resolved into a function reference, so it could be 455C<$function> name is resolved into a function reference, so it could be
369used to load any modules that provide the serialiser or function. It can 456used to load any modules that provide the serialiser or function. It can
370not, however, create events. 457not, however, create events.
371 458
459=item done => $function (default C<CORE::exit>)
460
461The function to call when the asynchronous backend detects an end of file
462condition when reading from the communications socket I<and> there are no
463outstanding requests. It's ignored by the synchronous backend.
464
465By overriding this you can prolong the life of a RPC process after e.g.
466the parent has exited by running the event loop in the provided function
467(or simply calling it, for example, when your child process uses L<EV> you
468could provide L<EV::loop> as C<done> function).
469
470Of course, in that case you are responsible for exiting at the appropriate
471time and not returning from
472
372=item async => $boolean (default: 0) 473=item async => $boolean (default: 0)
373 474
374The default server used in the child does all I/O blockingly, and only 475The default server used in the child does all I/O blockingly, and only
375allows a single RPC call to execute concurrently. 476allows a single RPC call to execute concurrently.
376 477
414=over 4 515=over 4
415 516
416=item octet strings - C<$AnyEvent::Fork::RPC::STRING_SERIALISER> 517=item octet strings - C<$AnyEvent::Fork::RPC::STRING_SERIALISER>
417 518
418This serialiser concatenates length-prefixes octet strings, and is the 519This serialiser concatenates length-prefixes octet strings, and is the
419default. 520default. That means you can only pass (and return) strings containing
521character codes 0-255.
420 522
421Implementation: 523Implementation:
422 524
423 ( 525 (
424 sub { pack "(w/a*)*", @_ }, 526 sub { pack "(w/a*)*", @_ },
445 547
446=item storable - C<$AnyEvent::Fork::RPC::STORABLE_SERIALISER> 548=item storable - C<$AnyEvent::Fork::RPC::STORABLE_SERIALISER>
447 549
448This serialiser uses L<Storable>, which means it has high chance of 550This serialiser uses L<Storable>, which means it has high chance of
449serialising just about anything you throw at it, at the cost of having 551serialising just about anything you throw at it, at the cost of having
450very high overhead per operation. It also comes with perl. 552very high overhead per operation. It also comes with perl. It should be
553used when you need to serialise complex data structures.
451 554
452Implementation: 555Implementation:
453 556
454 use Storable (); 557 use Storable ();
455 ( 558 (
456 sub { Storable::freeze \@_ }, 559 sub { Storable::freeze \@_ },
457 sub { @{ Storable::thaw shift } } 560 sub { @{ Storable::thaw shift } }
458 ) 561 )
459 562
563=item portable storable - C<$AnyEvent::Fork::RPC::NSTORABLE_SERIALISER>
564
565This serialiser also uses L<Storable>, but uses it's "network" format
566to serialise data, which makes it possible to talk to different
567perl binaries (for example, when talking to a process created with
568L<AnyEvent::Fork::Remote>).
569
570Implementation:
571
572 use Storable ();
573 (
574 sub { Storable::nfreeze \@_ },
575 sub { @{ Storable::thaw shift } }
576 )
577
460=back 578=back
461 579
462=back 580=back
463 581
464See the examples section earlier in this document for some actual 582See the examples section earlier in this document for some actual
465examples. 583examples.
466 584
467=cut 585=cut
468 586
469our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; 587our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
470our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })'; 588our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })';
471our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })'; 589our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';
590our $NSTORABLE_SERIALISER = 'use Storable (); (sub { Storable::nfreeze \@_ }, sub { @{ Storable::thaw shift } })';
472 591
473sub run { 592sub run {
474 my ($self, $function, %arg) = @_; 593 my ($self, $function, %arg) = @_;
475 594
476 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; 595 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
479 my $on_destroy = delete $arg{on_destroy}; 598 my $on_destroy = delete $arg{on_destroy};
480 599
481 # default for on_error is to on_event, if specified 600 # default for on_error is to on_event, if specified
482 $on_error ||= $on_event 601 $on_error ||= $on_event
483 ? sub { $on_event->(error => shift) } 602 ? sub { $on_event->(error => shift) }
484 : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" }; 603 : sub { AE::log die => "AnyEvent::Fork::RPC: uncaught error: $_[0]." };
485 604
486 # default for on_event is to raise an error 605 # default for on_event is to raise an error
487 $on_event ||= sub { $on_error->("event received, but no on_event handler") }; 606 $on_event ||= sub { $on_error->("event received, but no on_event handler") };
488 607
489 my ($f, $t) = eval $serialiser; die $@ if $@; 608 my ($f, $t) = eval $serialiser; die $@ if $@;
510 }; 629 };
511 630
512 my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"); 631 my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");
513 632
514 $self->require ($module) 633 $self->require ($module)
515 ->send_arg ($function, $arg{init}, $serialiser) 634 ->send_arg ($function, $arg{init}, $serialiser, $arg{done} || "CORE::exit")
516 ->run ("$module\::run", sub { 635 ->run ("$module\::run", sub {
517 $fh = shift; 636 $fh = shift;
518 637
519 my ($id, $len); 638 my ($id, $len);
520 $rw = AE::io $fh, 0, sub { 639 $rw = AE::io $fh, 0, sub {
636See the examples section earlier in this document for some actual 755See the examples section earlier in this document for some actual
637examples. 756examples.
638 757
639=back 758=back
640 759
760=head2 PROCESS EXIT
761
762If and when the child process exits depends on the backend and
763configuration. Apart from explicit exits (e.g. by calling C<exit>) or
764runtime conditions (uncaught exceptions, signals etc.), the backends exit
765under these conditions:
766
767=over 4
768
769=item Synchronous Backend
770
771The synchronous backend is very simple: when the process waits for another
772request to arrive and the writing side (usually in the parent) is closed,
773it will exit normally, i.e. as if your main program reached the end of the
774file.
775
776That means that if your parent process exits, the RPC process will usually
777exit as well, either because it is idle anyway, or because it executes a
778request. In the latter case, you will likely get an error when the RPc
779process tries to send the results to the parent (because agruably, you
780shouldn't exit your parent while there are still outstanding requests).
781
782The process is usually quiescent when it happens, so it should rarely be a
783problem, and C<END> handlers can be used to clean up.
784
785=item Asynchronous Backend
786
787For the asynchronous backend, things are more complicated: Whenever it
788listens for another request by the parent, it might detect that the socket
789was closed (e.g. because the parent exited). It will sotp listening for
790new requests and instead try to write out any remaining data (if any) or
791simply check whether the socket cna be written to. After this, the RPC
792process is effectively done - no new requests are incoming, no outstanding
793request data can be written back.
794
795Since chances are high that there are event watchers that the RPC server
796knows nothing about (why else would one use the async backend if not for
797the ability to register watchers?), the event loop would often happily
798continue.
799
800This is why the asynchronous backend explicitly calls C<CORE::exit> when
801it is done (it will raise an exception under other circumstances, which
802might lead to the process not exiting on it's own).
803
804You can override this by specifying a function name to call via the C<done>
805parameter instead.
806
807=back
808
641=head1 ADVANCED TOPICS 809=head1 ADVANCED TOPICS
642 810
643=head2 Choosing a backend 811=head2 Choosing a backend
644 812
645So how do you decide which backend to use? Well, that's your problem to 813So how do you decide which backend to use? Well, that's your problem to

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines