ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
(Generate patch)

Comparing AnyEvent-Fork-RPC/RPC.pm (file contents):
Revision 1.11 by root, Thu Apr 18 07:59:46 2013 UTC vs.
Revision 1.26 by root, Sun Apr 28 14:27:11 2013 UTC

12 ->require ("MyModule") 12 ->require ("MyModule")
13 ->AnyEvent::Fork::RPC::run ( 13 ->AnyEvent::Fork::RPC::run (
14 "MyModule::server", 14 "MyModule::server",
15 ); 15 );
16 16
17 use AnyEvent;
18
17 my $cv = AE::cv; 19 my $cv = AE::cv;
18 20
19 $rpc->(1, 2, 3, sub { 21 $rpc->(1, 2, 3, sub {
20 print "MyModule::server returned @_\n"; 22 print "MyModule::server returned @_\n";
21 $cv->send; 23 $cv->send;
24 $cv->recv; 26 $cv->recv;
25 27
26=head1 DESCRIPTION 28=head1 DESCRIPTION
27 29
28This module implements a simple RPC protocol and backend for processes 30This module implements a simple RPC protocol and backend for processes
29created via L<AnyEvent::Fork>, allowing you to call a function in the 31created via L<AnyEvent::Fork> (or L<AnyEvent::Fork::Remote>), allowing you
30child process and receive its return values (up to 4GB serialised). 32to call a function in the child process and receive its return values (up
33to 4GB serialised).
31 34
32It implements two different backends: a synchronous one that works like a 35It implements two different backends: a synchronous one that works like a
33normal function call, and an asynchronous one that can run multiple jobs 36normal function call, and an asynchronous one that can run multiple jobs
34concurrently in the child, using AnyEvent. 37concurrently in the child, using AnyEvent.
35 38
301 304
302use Errno (); 305use Errno ();
303use Guard (); 306use Guard ();
304 307
305use AnyEvent; 308use AnyEvent;
309# explicit version on next line, as some cpan-testers test with the 0.1 version,
310# ignoring dependencies, and this line will at least give a clear indication of that.
306use AnyEvent::Fork; # we don't actually depend on it, this is for convenience 311use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience
307 312
308our $VERSION = 0.1; 313our $VERSION = 1.1;
309 314
310=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...] 315=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]
311 316
312The traditional way to call it. But it is way cooler to call it in the 317The traditional way to call it. But it is way cooler to call it in the
313following way: 318following way:
372 377
373The default server used in the child does all I/O blockingly, and only 378The default server used in the child does all I/O blockingly, and only
374allows a single RPC call to execute concurrently. 379allows a single RPC call to execute concurrently.
375 380
376Setting C<async> to a true value switches to another implementation that 381Setting C<async> to a true value switches to another implementation that
377uses L<AnyEvent> in the child and allows multiple concurrent RPC calls. 382uses L<AnyEvent> in the child and allows multiple concurrent RPC calls (it
383does not support recursion in the event loop however, blocking condvar
384calls will fail).
378 385
379The actual API in the child is documented in the section that describes 386The actual API in the child is documented in the section that describes
380the calling semantics of the returned C<$rpc> function. 387the calling semantics of the returned C<$rpc> function.
381 388
382If you want to pre-load the actual back-end modules to enable memory 389If you want to pre-load the actual back-end modules to enable memory
384synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode. 391synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
385 392
386If you use a template process and want to fork both sync and async 393If you use a template process and want to fork both sync and async
387children, then it is permissible to load both modules. 394children, then it is permissible to load both modules.
388 395
389=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') 396=item serialiser => $string (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
390 397
391All arguments, result data and event data have to be serialised to be 398All arguments, result data and event data have to be serialised to be
392transferred between the processes. For this, they have to be frozen and 399transferred between the processes. For this, they have to be frozen and
393thawed in both parent and child processes. 400thawed in both parent and child processes.
394 401
395By default, only octet strings can be passed between the processes, which 402By default, only octet strings can be passed between the processes, which
396is reasonably fast and efficient. 403is reasonably fast and efficient and requires no extra modules.
397 404
398For more complicated use cases, you can provide your own freeze and thaw 405For more complicated use cases, you can provide your own freeze and thaw
399functions, by specifying a string with perl source code. It's supposed to 406functions, by specifying a string with perl source code. It's supposed to
400return two code references when evaluated: the first receives a list of 407return two code references when evaluated: the first receives a list of
401perl values and must return an octet string. The second receives the octet 408perl values and must return an octet string. The second receives the octet
403 410
404If you need an external module for serialisation, then you can either 411If you need an external module for serialisation, then you can either
405pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use> 412pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
406or C<require> statement into the serialiser string. Or both. 413or C<require> statement into the serialiser string. Or both.
407 414
415Here are some examples - some of them are also available as global
416variables that make them easier to use.
417
418=over 4
419
420=item octet strings - C<$AnyEvent::Fork::RPC::STRING_SERIALISER>
421
422This serialiser concatenates length-prefixes octet strings, and is the
423default.
424
425Implementation:
426
427 (
428 sub { pack "(w/a*)*", @_ },
429 sub { unpack "(w/a*)*", shift }
430 )
431
432=item json - C<$AnyEvent::Fork::RPC::JSON_SERIALISER>
433
434This serialiser creates JSON arrays - you have to make sure the L<JSON>
435module is installed for this serialiser to work. It can be beneficial for
436sharing when you preload the L<JSON> module in a template process.
437
438L<JSON> (with L<JSON::XS> installed) is slower than the octet string
439serialiser, but usually much faster than L<Storable>, unless big chunks of
440binary data need to be transferred.
441
442Implementation:
443
444 use JSON ();
445 (
446 sub { JSON::encode_json \@_ },
447 sub { @{ JSON::decode_json shift } }
448 )
449
450=item storable - C<$AnyEvent::Fork::RPC::STORABLE_SERIALISER>
451
452This serialiser uses L<Storable>, which means it has high chance of
453serialising just about anything you throw at it, at the cost of having
454very high overhead per operation. It also comes with perl.
455
456Implementation:
457
458 use Storable ();
459 (
460 sub { Storable::freeze \@_ },
461 sub { @{ Storable::thaw shift } }
462 )
463
464=back
465
408=back 466=back
409 467
410See the examples section earlier in this document for some actual 468See the examples section earlier in this document for some actual
411examples. 469examples.
412 470
413=cut 471=cut
414 472
415our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; 473our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
474our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })';
475our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';
416 476
417sub run { 477sub run {
418 my ($self, $function, %arg) = @_; 478 my ($self, $function, %arg) = @_;
419 479
420 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; 480 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
465 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; 525 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
466 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; 526 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
467 527
468 if ($len) { 528 if ($len) {
469 while (8 <= length $rbuf) { 529 while (8 <= length $rbuf) {
470 ($id, $len) = unpack "LL", $rbuf; 530 ($id, $len) = unpack "NN", $rbuf;
471 8 + $len <= length $rbuf 531 8 + $len <= length $rbuf
472 or last; 532 or last;
473 533
474 my @r = $t->(substr $rbuf, 8, $len); 534 my @r = $t->(substr $rbuf, 8, $len);
475 substr $rbuf, 0, 8 + $len, ""; 535 substr $rbuf, 0, 8 + $len, "";
489 } 549 }
490 } elsif (defined $len) { 550 } elsif (defined $len) {
491 undef $rw; undef $ww; # it ends here 551 undef $rw; undef $ww; # it ends here
492 552
493 if (@rcb || %rcb) { 553 if (@rcb || %rcb) {
494 use Data::Dump;ddx[\@rcb,\%rcb];#d#
495 $on_error->("unexpected eof"); 554 $on_error->("unexpected eof");
496 } else { 555 } else {
497 $on_destroy->(); 556 $on_destroy->()
557 if $on_destroy;
498 } 558 }
499 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 559 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
500 undef $rw; undef $ww; # it ends here 560 undef $rw; undef $ww; # it ends here
501 $on_error->("read: $!"); 561 $on_error->("read: $!");
502 } 562 }
505 $ww ||= AE::io $fh, 1, $wcb; 565 $ww ||= AE::io $fh, 1, $wcb;
506 }); 566 });
507 567
508 my $guard = Guard::guard { 568 my $guard = Guard::guard {
509 $shutdown = 1; 569 $shutdown = 1;
510 $ww ||= $fh && AE::io $fh, 1, $wcb; 570
571 shutdown $fh, 1 if $fh && !$ww;
511 }; 572 };
512 573
513 my $id; 574 my $id;
514 575
515 $arg{async} 576 $arg{async}
517 $id = ($id == 0xffffffff ? 0 : $id) + 1; 578 $id = ($id == 0xffffffff ? 0 : $id) + 1;
518 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops 579 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
519 580
520 $rcb{$id} = pop; 581 $rcb{$id} = pop;
521 582
522 $guard; # keep it alive 583 $guard if 0; # keep it alive
523 584
524 $wbuf .= pack "LL/a*", $id, &$f; 585 $wbuf .= pack "NN/a*", $id, &$f;
525 $ww ||= $fh && AE::io $fh, 1, $wcb; 586 $ww ||= $fh && AE::io $fh, 1, $wcb;
526 } 587 }
527 : sub { 588 : sub {
528 push @rcb, pop; 589 push @rcb, pop;
529 590
530 $guard; # keep it alive 591 $guard; # keep it alive
531 592
532 $wbuf .= pack "L/a*", &$f; 593 $wbuf .= pack "N/a*", &$f;
533 $ww ||= $fh && AE::io $fh, 1, $wcb; 594 $ww ||= $fh && AE::io $fh, 1, $wcb;
534 } 595 }
535} 596}
536 597
537=item $rpc->(..., $cb->(...)) 598=item $rpc->(..., $cb->(...))
579See the examples section earlier in this document for some actual 640See the examples section earlier in this document for some actual
580examples. 641examples.
581 642
582=back 643=back
583 644
645=head1 ADVANCED TOPICS
646
647=head2 Choosing a backend
648
649So how do you decide which backend to use? Well, that's your problem to
650solve, but here are some thoughts on the matter:
651
652=over 4
653
654=item Synchronous
655
656The synchronous backend does not rely on any external modules (well,
657except L<common::sense>, which works around a bug in how perl's warning
658system works). This keeps the process very small, for example, on my
659system, an empty perl interpreter uses 1492kB RSS, which becomes 2020kB
660after C<use warnings; use strict> (for people who grew up with C64s around
661them this is probably shocking every single time they see it). The worker
662process in the first example in this document uses 1792kB.
663
664Since the calls are done synchronously, slow jobs will keep newer jobs
665from executing.
666
667The synchronous backend also has no overhead due to running an event loop
668- reading requests is therefore very efficient, while writing responses is
669less so, as every response results in a write syscall.
670
671If the parent process is busy and a bit slow reading responses, the child
672waits instead of processing further requests. This also limits the amount
673of memory needed for buffering, as never more than one response has to be
674buffered.
675
676The API in the child is simple - you just have to define a function that
677does something and returns something.
678
679It's hard to use modules or code that relies on an event loop, as the
680child cannot execute anything while it waits for more input.
681
682=item Asynchronous
683
684The asynchronous backend relies on L<AnyEvent>, which tries to be small,
685but still comes at a price: On my system, the worker from example 1a uses
6863420kB RSS (for L<AnyEvent>, which loads L<EV>, which needs L<XSLoader>
687which in turn loads a lot of other modules such as L<warnings>, L<strict>,
688L<vars>, L<Exporter>...).
689
690It batches requests and responses reasonably efficiently, doing only as
691few reads and writes as needed, but needs to poll for events via the event
692loop.
693
694Responses are queued when the parent process is busy. This means the child
695can continue to execute any queued requests. It also means that a child
696might queue a lot of responses in memory when it generates them and the
697parent process is slow accepting them.
698
699The API is not a straightforward RPC pattern - you have to call a
700"done" callback to pass return values and signal completion. Also, more
701importantly, the API starts jobs as fast as possible - when 1000 jobs
702are queued and the jobs are slow, they will all run concurrently. The
703child must implement some queueing/limiting mechanism if this causes
704problems. Alternatively, the parent could limit the amount of rpc calls
705that are outstanding.
706
707Blocking use of condvars is not supported.
708
709Using event-based modules such as L<IO::AIO>, L<Gtk2>, L<Tk> and so on is
710easy.
711
712=back
713
714=head2 Passing file descriptors
715
716Unlike L<AnyEvent::Fork>, this module has no in-built file handle or file
717descriptor passing abilities.
718
719The reason is that passing file descriptors is extraordinary tricky
720business, and conflicts with efficient batching of messages.
721
722There still is a method you can use: Create a
723C<AnyEvent::Util::portable_socketpair> and C<send_fh> one half of it to
724the process before you pass control to C<AnyEvent::Fork::RPC::run>.
725
726Whenever you want to pass a file descriptor, send an rpc request to the
727child process (so it expects the descriptor), then send it over the other
728half of the socketpair. The child should fetch the descriptor from the
729half it has passed earlier.
730
731Here is some (untested) pseudocode to that effect:
732
733 use AnyEvent::Util;
734 use AnyEvent::Fork::RPC;
735 use IO::FDPass;
736
737 my ($s1, $s2) = AnyEvent::Util::portable_socketpair;
738
739 my $rpc = AnyEvent::Fork
740 ->new
741 ->send_fh ($s2)
742 ->require ("MyWorker")
743 ->AnyEvent::Fork::RPC::run ("MyWorker::run"
744 init => "MyWorker::init",
745 );
746
747 undef $s2; # no need to keep it around
748
749 # pass an fd
750 $rpc->("i'll send some fd now, please expect it!", my $cv = AE::cv);
751
752 IO::FDPass fileno $s1, fileno $handle_to_pass;
753
754 $cv->recv;
755
756The MyWorker module could look like this:
757
758 package MyWorker;
759
760 use IO::FDPass;
761
762 my $s2;
763
764 sub init {
765 $s2 = $_[0];
766 }
767
768 sub run {
769 if ($_[0] eq "i'll send some fd now, please expect it!") {
770 my $fd = IO::FDPass::recv fileno $s2;
771 ...
772 }
773 }
774
775Of course, this might be blocking if you pass a lot of file descriptors,
776so you might want to look into L<AnyEvent::FDpasser> which can handle the
777gory details.
778
779=head1 EXCEPTIONS
780
781There are no provisions whatsoever for catching exceptions at this time -
782in the child, exeptions might kill the process, causing calls to be lost
783and the parent encountering a fatal error. In the parent, exceptions in
784the result callback will not be caught and cause undefined behaviour.
785
584=head1 SEE ALSO 786=head1 SEE ALSO
585 787
586L<AnyEvent::Fork> (to create the processes in the first place), 788L<AnyEvent::Fork>, to create the processes in the first place.
789
790L<AnyEvent::Fork::Remote>, like above, but helpful for remote processes.
791
587L<AnyEvent::Fork::Pool> (to manage whole pools of processes). 792L<AnyEvent::Fork::Pool>, to manage whole pools of processes.
588 793
589=head1 AUTHOR AND CONTACT INFORMATION 794=head1 AUTHOR AND CONTACT INFORMATION
590 795
591 Marc Lehmann <schmorp@schmorp.de> 796 Marc Lehmann <schmorp@schmorp.de>
592 http://software.schmorp.de/pkg/AnyEvent-Fork-RPC 797 http://software.schmorp.de/pkg/AnyEvent-Fork-RPC

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines