ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
(Generate patch)

Comparing AnyEvent-Fork-RPC/RPC.pm (file contents):
Revision 1.16 by root, Thu Apr 18 14:07:15 2013 UTC vs.
Revision 1.26 by root, Sun Apr 28 14:27:11 2013 UTC

26 $cv->recv; 26 $cv->recv;
27 27
28=head1 DESCRIPTION 28=head1 DESCRIPTION
29 29
30This module implements a simple RPC protocol and backend for processes 30This module implements a simple RPC protocol and backend for processes
31created via L<AnyEvent::Fork>, allowing you to call a function in the 31created via L<AnyEvent::Fork> (or L<AnyEvent::Fork::Remote>), allowing you
32child process and receive its return values (up to 4GB serialised). 32to call a function in the child process and receive its return values (up
33to 4GB serialised).
33 34
34It implements two different backends: a synchronous one that works like a 35It implements two different backends: a synchronous one that works like a
35normal function call, and an asynchronous one that can run multiple jobs 36normal function call, and an asynchronous one that can run multiple jobs
36concurrently in the child, using AnyEvent. 37concurrently in the child, using AnyEvent.
37 38
303 304
304use Errno (); 305use Errno ();
305use Guard (); 306use Guard ();
306 307
307use AnyEvent; 308use AnyEvent;
309# explicit version on next line, as some cpan-testers test with the 0.1 version,
310# ignoring dependencies, and this line will at least give a clear indication of that.
308use AnyEvent::Fork; # we don't actually depend on it, this is for convenience 311use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience
309 312
310our $VERSION = 0.1; 313our $VERSION = 1.1;
311 314
312=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...] 315=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]
313 316
314The traditional way to call it. But it is way cooler to call it in the 317The traditional way to call it. But it is way cooler to call it in the
315following way: 318following way:
522 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; 525 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
523 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; 526 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
524 527
525 if ($len) { 528 if ($len) {
526 while (8 <= length $rbuf) { 529 while (8 <= length $rbuf) {
527 ($id, $len) = unpack "LL", $rbuf; 530 ($id, $len) = unpack "NN", $rbuf;
528 8 + $len <= length $rbuf 531 8 + $len <= length $rbuf
529 or last; 532 or last;
530 533
531 my @r = $t->(substr $rbuf, 8, $len); 534 my @r = $t->(substr $rbuf, 8, $len);
532 substr $rbuf, 0, 8 + $len, ""; 535 substr $rbuf, 0, 8 + $len, "";
548 undef $rw; undef $ww; # it ends here 551 undef $rw; undef $ww; # it ends here
549 552
550 if (@rcb || %rcb) { 553 if (@rcb || %rcb) {
551 $on_error->("unexpected eof"); 554 $on_error->("unexpected eof");
552 } else { 555 } else {
553 $on_destroy->(); 556 $on_destroy->()
557 if $on_destroy;
554 } 558 }
555 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 559 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
556 undef $rw; undef $ww; # it ends here 560 undef $rw; undef $ww; # it ends here
557 $on_error->("read: $!"); 561 $on_error->("read: $!");
558 } 562 }
561 $ww ||= AE::io $fh, 1, $wcb; 565 $ww ||= AE::io $fh, 1, $wcb;
562 }); 566 });
563 567
564 my $guard = Guard::guard { 568 my $guard = Guard::guard {
565 $shutdown = 1; 569 $shutdown = 1;
566 $ww ||= $fh && AE::io $fh, 1, $wcb; 570
571 shutdown $fh, 1 if $fh && !$ww;
567 }; 572 };
568 573
569 my $id; 574 my $id;
570 575
571 $arg{async} 576 $arg{async}
573 $id = ($id == 0xffffffff ? 0 : $id) + 1; 578 $id = ($id == 0xffffffff ? 0 : $id) + 1;
574 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops 579 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
575 580
576 $rcb{$id} = pop; 581 $rcb{$id} = pop;
577 582
578 $guard; # keep it alive 583 $guard if 0; # keep it alive
579 584
580 $wbuf .= pack "LL/a*", $id, &$f; 585 $wbuf .= pack "NN/a*", $id, &$f;
581 $ww ||= $fh && AE::io $fh, 1, $wcb; 586 $ww ||= $fh && AE::io $fh, 1, $wcb;
582 } 587 }
583 : sub { 588 : sub {
584 push @rcb, pop; 589 push @rcb, pop;
585 590
586 $guard; # keep it alive 591 $guard; # keep it alive
587 592
588 $wbuf .= pack "L/a*", &$f; 593 $wbuf .= pack "N/a*", &$f;
589 $ww ||= $fh && AE::io $fh, 1, $wcb; 594 $ww ||= $fh && AE::io $fh, 1, $wcb;
590 } 595 }
591} 596}
592 597
593=item $rpc->(..., $cb->(...)) 598=item $rpc->(..., $cb->(...))
769 774
770Of course, this might be blocking if you pass a lot of file descriptors, 775Of course, this might be blocking if you pass a lot of file descriptors,
771so you might want to look into L<AnyEvent::FDpasser> which can handle the 776so you might want to look into L<AnyEvent::FDpasser> which can handle the
772gory details. 777gory details.
773 778
779=head1 EXCEPTIONS
780
781There are no provisions whatsoever for catching exceptions at this time -
782in the child, exeptions might kill the process, causing calls to be lost
783and the parent encountering a fatal error. In the parent, exceptions in
784the result callback will not be caught and cause undefined behaviour.
785
774=head1 SEE ALSO 786=head1 SEE ALSO
775 787
776L<AnyEvent::Fork>, to create the processes in the first place. 788L<AnyEvent::Fork>, to create the processes in the first place.
789
790L<AnyEvent::Fork::Remote>, like above, but helpful for remote processes.
777 791
778L<AnyEvent::Fork::Pool>, to manage whole pools of processes. 792L<AnyEvent::Fork::Pool>, to manage whole pools of processes.
779 793
780=head1 AUTHOR AND CONTACT INFORMATION 794=head1 AUTHOR AND CONTACT INFORMATION
781 795

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines