ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
(Generate patch)

Comparing AnyEvent-Fork-RPC/RPC.pm (file contents):
Revision 1.11 by root, Thu Apr 18 07:59:46 2013 UTC vs.
Revision 1.21 by root, Sun Apr 21 12:01:54 2013 UTC

1=head1 NAME 1=head1 NAME
2 2
3AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork 3AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork
4
5THE API IS NOT FINISHED, CONSIDER THIS A TECHNOLOGY DEMO
4 6
5=head1 SYNOPSIS 7=head1 SYNOPSIS
6 8
7 use AnyEvent::Fork::RPC; 9 use AnyEvent::Fork::RPC;
8 # use AnyEvent::Fork is not needed 10 # use AnyEvent::Fork is not needed
11 ->new 13 ->new
12 ->require ("MyModule") 14 ->require ("MyModule")
13 ->AnyEvent::Fork::RPC::run ( 15 ->AnyEvent::Fork::RPC::run (
14 "MyModule::server", 16 "MyModule::server",
15 ); 17 );
18
19 use AnyEvent;
16 20
17 my $cv = AE::cv; 21 my $cv = AE::cv;
18 22
19 $rpc->(1, 2, 3, sub { 23 $rpc->(1, 2, 3, sub {
20 print "MyModule::server returned @_\n"; 24 print "MyModule::server returned @_\n";
372 376
373The default server used in the child does all I/O blockingly, and only 377The default server used in the child does all I/O blockingly, and only
374allows a single RPC call to execute concurrently. 378allows a single RPC call to execute concurrently.
375 379
376Setting C<async> to a true value switches to another implementation that 380Setting C<async> to a true value switches to another implementation that
377uses L<AnyEvent> in the child and allows multiple concurrent RPC calls. 381uses L<AnyEvent> in the child and allows multiple concurrent RPC calls (it
382does not support recursion in the event loop however, blocking condvar
383calls will fail).
378 384
379The actual API in the child is documented in the section that describes 385The actual API in the child is documented in the section that describes
380the calling semantics of the returned C<$rpc> function. 386the calling semantics of the returned C<$rpc> function.
381 387
382If you want to pre-load the actual back-end modules to enable memory 388If you want to pre-load the actual back-end modules to enable memory
384synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode. 390synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
385 391
386If you use a template process and want to fork both sync and async 392If you use a template process and want to fork both sync and async
387children, then it is permissible to load both modules. 393children, then it is permissible to load both modules.
388 394
389=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') 395=item serialiser => $string (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
390 396
391All arguments, result data and event data have to be serialised to be 397All arguments, result data and event data have to be serialised to be
392transferred between the processes. For this, they have to be frozen and 398transferred between the processes. For this, they have to be frozen and
393thawed in both parent and child processes. 399thawed in both parent and child processes.
394 400
395By default, only octet strings can be passed between the processes, which 401By default, only octet strings can be passed between the processes, which
396is reasonably fast and efficient. 402is reasonably fast and efficient and requires no extra modules.
397 403
398For more complicated use cases, you can provide your own freeze and thaw 404For more complicated use cases, you can provide your own freeze and thaw
399functions, by specifying a string with perl source code. It's supposed to 405functions, by specifying a string with perl source code. It's supposed to
400return two code references when evaluated: the first receives a list of 406return two code references when evaluated: the first receives a list of
401perl values and must return an octet string. The second receives the octet 407perl values and must return an octet string. The second receives the octet
403 409
404If you need an external module for serialisation, then you can either 410If you need an external module for serialisation, then you can either
405pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use> 411pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
406or C<require> statement into the serialiser string. Or both. 412or C<require> statement into the serialiser string. Or both.
407 413
414Here are some examples - some of them are also available as global
415variables that make them easier to use.
416
417=over 4
418
419=item octet strings - C<$AnyEvent::Fork::RPC::STRING_SERIALISER>
420
421This serialiser concatenates length-prefixes octet strings, and is the
422default.
423
424Implementation:
425
426 (
427 sub { pack "(w/a*)*", @_ },
428 sub { unpack "(w/a*)*", shift }
429 )
430
431=item json - C<$AnyEvent::Fork::RPC::JSON_SERIALISER>
432
433This serialiser creates JSON arrays - you have to make sure the L<JSON>
434module is installed for this serialiser to work. It can be beneficial for
435sharing when you preload the L<JSON> module in a template process.
436
437L<JSON> (with L<JSON::XS> installed) is slower than the octet string
438serialiser, but usually much faster than L<Storable>, unless big chunks of
439binary data need to be transferred.
440
441Implementation:
442
443 use JSON ();
444 (
445 sub { JSON::encode_json \@_ },
446 sub { @{ JSON::decode_json shift } }
447 )
448
449=item storable - C<$AnyEvent::Fork::RPC::STORABLE_SERIALISER>
450
451This serialiser uses L<Storable>, which means it has high chance of
452serialising just about anything you throw at it, at the cost of having
453very high overhead per operation. It also comes with perl.
454
455Implementation:
456
457 use Storable ();
458 (
459 sub { Storable::freeze \@_ },
460 sub { @{ Storable::thaw shift } }
461 )
462
463=back
464
408=back 465=back
409 466
410See the examples section earlier in this document for some actual 467See the examples section earlier in this document for some actual
411examples. 468examples.
412 469
413=cut 470=cut
414 471
415our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; 472our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
473our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })';
474our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';
416 475
417sub run { 476sub run {
418 my ($self, $function, %arg) = @_; 477 my ($self, $function, %arg) = @_;
419 478
420 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; 479 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
489 } 548 }
490 } elsif (defined $len) { 549 } elsif (defined $len) {
491 undef $rw; undef $ww; # it ends here 550 undef $rw; undef $ww; # it ends here
492 551
493 if (@rcb || %rcb) { 552 if (@rcb || %rcb) {
494 use Data::Dump;ddx[\@rcb,\%rcb];#d#
495 $on_error->("unexpected eof"); 553 $on_error->("unexpected eof");
496 } else { 554 } else {
497 $on_destroy->(); 555 $on_destroy->()
556 if $on_destroy;
498 } 557 }
499 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 558 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
500 undef $rw; undef $ww; # it ends here 559 undef $rw; undef $ww; # it ends here
501 $on_error->("read: $!"); 560 $on_error->("read: $!");
502 } 561 }
505 $ww ||= AE::io $fh, 1, $wcb; 564 $ww ||= AE::io $fh, 1, $wcb;
506 }); 565 });
507 566
508 my $guard = Guard::guard { 567 my $guard = Guard::guard {
509 $shutdown = 1; 568 $shutdown = 1;
510 $ww ||= $fh && AE::io $fh, 1, $wcb; 569
570 shutdown $fh, 1 if $fh && !$ww;
511 }; 571 };
512 572
513 my $id; 573 my $id;
514 574
515 $arg{async} 575 $arg{async}
517 $id = ($id == 0xffffffff ? 0 : $id) + 1; 577 $id = ($id == 0xffffffff ? 0 : $id) + 1;
518 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops 578 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
519 579
520 $rcb{$id} = pop; 580 $rcb{$id} = pop;
521 581
522 $guard; # keep it alive 582 $guard if 0; # keep it alive
523 583
524 $wbuf .= pack "LL/a*", $id, &$f; 584 $wbuf .= pack "LL/a*", $id, &$f;
525 $ww ||= $fh && AE::io $fh, 1, $wcb; 585 $ww ||= $fh && AE::io $fh, 1, $wcb;
526 } 586 }
527 : sub { 587 : sub {
579See the examples section earlier in this document for some actual 639See the examples section earlier in this document for some actual
580examples. 640examples.
581 641
582=back 642=back
583 643
644=head1 ADVANCED TOPICS
645
646=head2 Choosing a backend
647
648So how do you decide which backend to use? Well, that's your problem to
649solve, but here are some thoughts on the matter:
650
651=over 4
652
653=item Synchronous
654
655The synchronous backend does not rely on any external modules (well,
656except L<common::sense>, which works around a bug in how perl's warning
657system works). This keeps the process very small, for example, on my
658system, an empty perl interpreter uses 1492kB RSS, which becomes 2020kB
659after C<use warnings; use strict> (for people who grew up with C64s around
660them this is probably shocking every single time they see it). The worker
661process in the first example in this document uses 1792kB.
662
663Since the calls are done synchronously, slow jobs will keep newer jobs
664from executing.
665
666The synchronous backend also has no overhead due to running an event loop
667- reading requests is therefore very efficient, while writing responses is
668less so, as every response results in a write syscall.
669
670If the parent process is busy and a bit slow reading responses, the child
671waits instead of processing further requests. This also limits the amount
672of memory needed for buffering, as never more than one response has to be
673buffered.
674
675The API in the child is simple - you just have to define a function that
676does something and returns something.
677
678It's hard to use modules or code that relies on an event loop, as the
679child cannot execute anything while it waits for more input.
680
681=item Asynchronous
682
683The asynchronous backend relies on L<AnyEvent>, which tries to be small,
684but still comes at a price: On my system, the worker from example 1a uses
6853420kB RSS (for L<AnyEvent>, which loads L<EV>, which needs L<XSLoader>
686which in turn loads a lot of other modules such as L<warnings>, L<strict>,
687L<vars>, L<Exporter>...).
688
689It batches requests and responses reasonably efficiently, doing only as
690few reads and writes as needed, but needs to poll for events via the event
691loop.
692
693Responses are queued when the parent process is busy. This means the child
694can continue to execute any queued requests. It also means that a child
695might queue a lot of responses in memory when it generates them and the
696parent process is slow accepting them.
697
698The API is not a straightforward RPC pattern - you have to call a
699"done" callback to pass return values and signal completion. Also, more
700importantly, the API starts jobs as fast as possible - when 1000 jobs
701are queued and the jobs are slow, they will all run concurrently. The
702child must implement some queueing/limiting mechanism if this causes
703problems. Alternatively, the parent could limit the amount of rpc calls
704that are outstanding.
705
706Blocking use of condvars is not supported.
707
708Using event-based modules such as L<IO::AIO>, L<Gtk2>, L<Tk> and so on is
709easy.
710
711=back
712
713=head2 Passing file descriptors
714
715Unlike L<AnyEvent::Fork>, this module has no in-built file handle or file
716descriptor passing abilities.
717
718The reason is that passing file descriptors is extraordinary tricky
719business, and conflicts with efficient batching of messages.
720
721There still is a method you can use: Create a
722C<AnyEvent::Util::portable_socketpair> and C<send_fh> one half of it to
723the process before you pass control to C<AnyEvent::Fork::RPC::run>.
724
725Whenever you want to pass a file descriptor, send an rpc request to the
726child process (so it expects the descriptor), then send it over the other
727half of the socketpair. The child should fetch the descriptor from the
728half it has passed earlier.
729
730Here is some (untested) pseudocode to that effect:
731
732 use AnyEvent::Util;
733 use AnyEvent::Fork::RPC;
734 use IO::FDPass;
735
736 my ($s1, $s2) = AnyEvent::Util::portable_socketpair;
737
738 my $rpc = AnyEvent::Fork
739 ->new
740 ->send_fh ($s2)
741 ->require ("MyWorker")
742 ->AnyEvent::Fork::RPC::run ("MyWorker::run"
743 init => "MyWorker::init",
744 );
745
746 undef $s2; # no need to keep it around
747
748 # pass an fd
749 $rpc->("i'll send some fd now, please expect it!", my $cv = AE::cv);
750
751 IO::FDPass fileno $s1, fileno $handle_to_pass;
752
753 $cv->recv;
754
755The MyWorker module could look like this:
756
757 package MyWorker;
758
759 use IO::FDPass;
760
761 my $s2;
762
763 sub init {
764 $s2 = $_[0];
765 }
766
767 sub run {
768 if ($_[0] eq "i'll send some fd now, please expect it!") {
769 my $fd = IO::FDPass::recv fileno $s2;
770 ...
771 }
772 }
773
774Of course, this might be blocking if you pass a lot of file descriptors,
775so you might want to look into L<AnyEvent::FDpasser> which can handle the
776gory details.
777
778=head1 EXCEPTIONS
779
780There are no provisions whatsoever for catching exceptions at this time -
781in the child, exeptions might kill the process, causing calls to be lost
782and the parent encountering a fatal error. In the parent, exceptions in
783the result callback will not be caught and cause undefined behaviour.
784
584=head1 SEE ALSO 785=head1 SEE ALSO
585 786
586L<AnyEvent::Fork> (to create the processes in the first place), 787L<AnyEvent::Fork>, to create the processes in the first place.
788
587L<AnyEvent::Fork::Pool> (to manage whole pools of processes). 789L<AnyEvent::Fork::Pool>, to manage whole pools of processes.
588 790
589=head1 AUTHOR AND CONTACT INFORMATION 791=head1 AUTHOR AND CONTACT INFORMATION
590 792
591 Marc Lehmann <schmorp@schmorp.de> 793 Marc Lehmann <schmorp@schmorp.de>
592 http://software.schmorp.de/pkg/AnyEvent-Fork-RPC 794 http://software.schmorp.de/pkg/AnyEvent-Fork-RPC

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines