ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
(Generate patch)

Comparing AnyEvent-Fork-RPC/RPC.pm (file contents):
Revision 1.13 by root, Thu Apr 18 11:11:26 2013 UTC vs.
Revision 1.24 by root, Sat Apr 27 23:49:01 2013 UTC

1=head1 NAME 1=head1 NAME
2 2
3AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork 3AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork
4
5THE API IS NOT FINISHED, CONSIDER THIS A BETA RELEASE
4 6
5=head1 SYNOPSIS 7=head1 SYNOPSIS
6 8
7 use AnyEvent::Fork::RPC; 9 use AnyEvent::Fork::RPC;
8 # use AnyEvent::Fork is not needed 10 # use AnyEvent::Fork is not needed
11 ->new 13 ->new
12 ->require ("MyModule") 14 ->require ("MyModule")
13 ->AnyEvent::Fork::RPC::run ( 15 ->AnyEvent::Fork::RPC::run (
14 "MyModule::server", 16 "MyModule::server",
15 ); 17 );
18
19 use AnyEvent;
16 20
17 my $cv = AE::cv; 21 my $cv = AE::cv;
18 22
19 $rpc->(1, 2, 3, sub { 23 $rpc->(1, 2, 3, sub {
20 print "MyModule::server returned @_\n"; 24 print "MyModule::server returned @_\n";
301 305
302use Errno (); 306use Errno ();
303use Guard (); 307use Guard ();
304 308
305use AnyEvent; 309use AnyEvent;
310# explicit version on next line, as some cpan-testers test with the 0.1 version,
311# ignoring dependencies, and this line will at least give a clear indication of that.
306use AnyEvent::Fork; # we don't actually depend on it, this is for convenience 312use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience
307 313
308our $VERSION = 0.1; 314our $VERSION = 0.2;
309 315
310=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...] 316=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]
311 317
312The traditional way to call it. But it is way cooler to call it in the 318The traditional way to call it. But it is way cooler to call it in the
313following way: 319following way:
372 378
373The default server used in the child does all I/O blockingly, and only 379The default server used in the child does all I/O blockingly, and only
374allows a single RPC call to execute concurrently. 380allows a single RPC call to execute concurrently.
375 381
376Setting C<async> to a true value switches to another implementation that 382Setting C<async> to a true value switches to another implementation that
377uses L<AnyEvent> in the child and allows multiple concurrent RPC calls. 383uses L<AnyEvent> in the child and allows multiple concurrent RPC calls (it
384does not support recursion in the event loop however, blocking condvar
385calls will fail).
378 386
379The actual API in the child is documented in the section that describes 387The actual API in the child is documented in the section that describes
380the calling semantics of the returned C<$rpc> function. 388the calling semantics of the returned C<$rpc> function.
381 389
382If you want to pre-load the actual back-end modules to enable memory 390If you want to pre-load the actual back-end modules to enable memory
384synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode. 392synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
385 393
386If you use a template process and want to fork both sync and async 394If you use a template process and want to fork both sync and async
387children, then it is permissible to load both modules. 395children, then it is permissible to load both modules.
388 396
389=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') 397=item serialiser => $string (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
390 398
391All arguments, result data and event data have to be serialised to be 399All arguments, result data and event data have to be serialised to be
392transferred between the processes. For this, they have to be frozen and 400transferred between the processes. For this, they have to be frozen and
393thawed in both parent and child processes. 401thawed in both parent and child processes.
394 402
395By default, only octet strings can be passed between the processes, which 403By default, only octet strings can be passed between the processes, which
396is reasonably fast and efficient. 404is reasonably fast and efficient and requires no extra modules.
397 405
398For more complicated use cases, you can provide your own freeze and thaw 406For more complicated use cases, you can provide your own freeze and thaw
399functions, by specifying a string with perl source code. It's supposed to 407functions, by specifying a string with perl source code. It's supposed to
400return two code references when evaluated: the first receives a list of 408return two code references when evaluated: the first receives a list of
401perl values and must return an octet string. The second receives the octet 409perl values and must return an octet string. The second receives the octet
403 411
404If you need an external module for serialisation, then you can either 412If you need an external module for serialisation, then you can either
405pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use> 413pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
406or C<require> statement into the serialiser string. Or both. 414or C<require> statement into the serialiser string. Or both.
407 415
416Here are some examples - some of them are also available as global
417variables that make them easier to use.
418
419=over 4
420
421=item octet strings - C<$AnyEvent::Fork::RPC::STRING_SERIALISER>
422
423This serialiser concatenates length-prefixes octet strings, and is the
424default.
425
426Implementation:
427
428 (
429 sub { pack "(w/a*)*", @_ },
430 sub { unpack "(w/a*)*", shift }
431 )
432
433=item json - C<$AnyEvent::Fork::RPC::JSON_SERIALISER>
434
435This serialiser creates JSON arrays - you have to make sure the L<JSON>
436module is installed for this serialiser to work. It can be beneficial for
437sharing when you preload the L<JSON> module in a template process.
438
439L<JSON> (with L<JSON::XS> installed) is slower than the octet string
440serialiser, but usually much faster than L<Storable>, unless big chunks of
441binary data need to be transferred.
442
443Implementation:
444
445 use JSON ();
446 (
447 sub { JSON::encode_json \@_ },
448 sub { @{ JSON::decode_json shift } }
449 )
450
451=item storable - C<$AnyEvent::Fork::RPC::STORABLE_SERIALISER>
452
453This serialiser uses L<Storable>, which means it has high chance of
454serialising just about anything you throw at it, at the cost of having
455very high overhead per operation. It also comes with perl.
456
457Implementation:
458
459 use Storable ();
460 (
461 sub { Storable::freeze \@_ },
462 sub { @{ Storable::thaw shift } }
463 )
464
465=back
466
408=back 467=back
409 468
410See the examples section earlier in this document for some actual 469See the examples section earlier in this document for some actual
411examples. 470examples.
412 471
413=cut 472=cut
414 473
415our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; 474our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
475our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })';
476our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';
416 477
417sub run { 478sub run {
418 my ($self, $function, %arg) = @_; 479 my ($self, $function, %arg) = @_;
419 480
420 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; 481 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
465 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; 526 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
466 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; 527 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
467 528
468 if ($len) { 529 if ($len) {
469 while (8 <= length $rbuf) { 530 while (8 <= length $rbuf) {
470 ($id, $len) = unpack "LL", $rbuf; 531 ($id, $len) = unpack "NN", $rbuf;
471 8 + $len <= length $rbuf 532 8 + $len <= length $rbuf
472 or last; 533 or last;
473 534
474 my @r = $t->(substr $rbuf, 8, $len); 535 my @r = $t->(substr $rbuf, 8, $len);
475 substr $rbuf, 0, 8 + $len, ""; 536 substr $rbuf, 0, 8 + $len, "";
491 undef $rw; undef $ww; # it ends here 552 undef $rw; undef $ww; # it ends here
492 553
493 if (@rcb || %rcb) { 554 if (@rcb || %rcb) {
494 $on_error->("unexpected eof"); 555 $on_error->("unexpected eof");
495 } else { 556 } else {
496 $on_destroy->(); 557 $on_destroy->()
558 if $on_destroy;
497 } 559 }
498 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 560 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
499 undef $rw; undef $ww; # it ends here 561 undef $rw; undef $ww; # it ends here
500 $on_error->("read: $!"); 562 $on_error->("read: $!");
501 } 563 }
504 $ww ||= AE::io $fh, 1, $wcb; 566 $ww ||= AE::io $fh, 1, $wcb;
505 }); 567 });
506 568
507 my $guard = Guard::guard { 569 my $guard = Guard::guard {
508 $shutdown = 1; 570 $shutdown = 1;
509 $ww ||= $fh && AE::io $fh, 1, $wcb; 571
572 shutdown $fh, 1 if $fh && !$ww;
510 }; 573 };
511 574
512 my $id; 575 my $id;
513 576
514 $arg{async} 577 $arg{async}
516 $id = ($id == 0xffffffff ? 0 : $id) + 1; 579 $id = ($id == 0xffffffff ? 0 : $id) + 1;
517 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops 580 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
518 581
519 $rcb{$id} = pop; 582 $rcb{$id} = pop;
520 583
521 $guard; # keep it alive 584 $guard if 0; # keep it alive
522 585
523 $wbuf .= pack "LL/a*", $id, &$f; 586 $wbuf .= pack "NN/a*", $id, &$f;
524 $ww ||= $fh && AE::io $fh, 1, $wcb; 587 $ww ||= $fh && AE::io $fh, 1, $wcb;
525 } 588 }
526 : sub { 589 : sub {
527 push @rcb, pop; 590 push @rcb, pop;
528 591
529 $guard; # keep it alive 592 $guard; # keep it alive
530 593
531 $wbuf .= pack "L/a*", &$f; 594 $wbuf .= pack "N/a*", &$f;
532 $ww ||= $fh && AE::io $fh, 1, $wcb; 595 $ww ||= $fh && AE::io $fh, 1, $wcb;
533 } 596 }
534} 597}
535 598
536=item $rpc->(..., $cb->(...)) 599=item $rpc->(..., $cb->(...))
640are queued and the jobs are slow, they will all run concurrently. The 703are queued and the jobs are slow, they will all run concurrently. The
641child must implement some queueing/limiting mechanism if this causes 704child must implement some queueing/limiting mechanism if this causes
642problems. Alternatively, the parent could limit the amount of rpc calls 705problems. Alternatively, the parent could limit the amount of rpc calls
643that are outstanding. 706that are outstanding.
644 707
708Blocking use of condvars is not supported.
709
645Using event-based modules such as L<IO::AIO>, L<Gtk2>, L<Tk> and so on is 710Using event-based modules such as L<IO::AIO>, L<Gtk2>, L<Tk> and so on is
646easy. 711easy.
647 712
648=back 713=back
649 714
710 775
711Of course, this might be blocking if you pass a lot of file descriptors, 776Of course, this might be blocking if you pass a lot of file descriptors,
712so you might want to look into L<AnyEvent::FDpasser> which can handle the 777so you might want to look into L<AnyEvent::FDpasser> which can handle the
713gory details. 778gory details.
714 779
780=head1 EXCEPTIONS
781
782There are no provisions whatsoever for catching exceptions at this time -
783in the child, exeptions might kill the process, causing calls to be lost
784and the parent encountering a fatal error. In the parent, exceptions in
785the result callback will not be caught and cause undefined behaviour.
786
715=head1 SEE ALSO 787=head1 SEE ALSO
716 788
717L<AnyEvent::Fork> (to create the processes in the first place), 789L<AnyEvent::Fork>, to create the processes in the first place.
790
718L<AnyEvent::Fork::Pool> (to manage whole pools of processes). 791L<AnyEvent::Fork::Pool>, to manage whole pools of processes.
719 792
720=head1 AUTHOR AND CONTACT INFORMATION 793=head1 AUTHOR AND CONTACT INFORMATION
721 794
722 Marc Lehmann <schmorp@schmorp.de> 795 Marc Lehmann <schmorp@schmorp.de>
723 http://software.schmorp.de/pkg/AnyEvent-Fork-RPC 796 http://software.schmorp.de/pkg/AnyEvent-Fork-RPC

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines