… | |
… | |
26 | $cv->recv; |
26 | $cv->recv; |
27 | |
27 | |
28 | =head1 DESCRIPTION |
28 | =head1 DESCRIPTION |
29 | |
29 | |
30 | This module implements a simple RPC protocol and backend for processes |
30 | This module implements a simple RPC protocol and backend for processes |
31 | created via L<AnyEvent::Fork>, allowing you to call a function in the |
31 | created via L<AnyEvent::Fork> (or L<AnyEvent::Fork::Remote>), allowing you |
32 | child process and receive its return values (up to 4GB serialised). |
32 | to call a function in the child process and receive its return values (up |
|
|
33 | to 4GB serialised). |
33 | |
34 | |
34 | It implements two different backends: a synchronous one that works like a |
35 | It implements two different backends: a synchronous one that works like a |
35 | normal function call, and an asynchronous one that can run multiple jobs |
36 | normal function call, and an asynchronous one that can run multiple jobs |
36 | concurrently in the child, using AnyEvent. |
37 | concurrently in the child, using AnyEvent. |
37 | |
38 | |
… | |
… | |
303 | |
304 | |
304 | use Errno (); |
305 | use Errno (); |
305 | use Guard (); |
306 | use Guard (); |
306 | |
307 | |
307 | use AnyEvent; |
308 | use AnyEvent; |
|
|
309 | # explicit version on next line, as some cpan-testers test with the 0.1 version, |
|
|
310 | # ignoring dependencies, and this line will at least give a clear indication of that. |
308 | use AnyEvent::Fork; # we don't actually depend on it, this is for convenience |
311 | use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience |
309 | |
312 | |
310 | our $VERSION = 0.1; |
313 | our $VERSION = 1.1; |
311 | |
314 | |
312 | =item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...] |
315 | =item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...] |
313 | |
316 | |
314 | The traditional way to call it. But it is way cooler to call it in the |
317 | The traditional way to call it. But it is way cooler to call it in the |
315 | following way: |
318 | following way: |
… | |
… | |
522 | $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; |
525 | $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; |
523 | $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; |
526 | $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; |
524 | |
527 | |
525 | if ($len) { |
528 | if ($len) { |
526 | while (8 <= length $rbuf) { |
529 | while (8 <= length $rbuf) { |
527 | ($id, $len) = unpack "LL", $rbuf; |
530 | ($id, $len) = unpack "NN", $rbuf; |
528 | 8 + $len <= length $rbuf |
531 | 8 + $len <= length $rbuf |
529 | or last; |
532 | or last; |
530 | |
533 | |
531 | my @r = $t->(substr $rbuf, 8, $len); |
534 | my @r = $t->(substr $rbuf, 8, $len); |
532 | substr $rbuf, 0, 8 + $len, ""; |
535 | substr $rbuf, 0, 8 + $len, ""; |
… | |
… | |
548 | undef $rw; undef $ww; # it ends here |
551 | undef $rw; undef $ww; # it ends here |
549 | |
552 | |
550 | if (@rcb || %rcb) { |
553 | if (@rcb || %rcb) { |
551 | $on_error->("unexpected eof"); |
554 | $on_error->("unexpected eof"); |
552 | } else { |
555 | } else { |
553 | $on_destroy->(); |
556 | $on_destroy->() |
|
|
557 | if $on_destroy; |
554 | } |
558 | } |
555 | } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { |
559 | } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { |
556 | undef $rw; undef $ww; # it ends here |
560 | undef $rw; undef $ww; # it ends here |
557 | $on_error->("read: $!"); |
561 | $on_error->("read: $!"); |
558 | } |
562 | } |
… | |
… | |
561 | $ww ||= AE::io $fh, 1, $wcb; |
565 | $ww ||= AE::io $fh, 1, $wcb; |
562 | }); |
566 | }); |
563 | |
567 | |
564 | my $guard = Guard::guard { |
568 | my $guard = Guard::guard { |
565 | $shutdown = 1; |
569 | $shutdown = 1; |
566 | $ww ||= $fh && AE::io $fh, 1, $wcb; |
570 | |
|
|
571 | shutdown $fh, 1 if $fh && !$ww; |
567 | }; |
572 | }; |
568 | |
573 | |
569 | my $id; |
574 | my $id; |
570 | |
575 | |
571 | $arg{async} |
576 | $arg{async} |
… | |
… | |
573 | $id = ($id == 0xffffffff ? 0 : $id) + 1; |
578 | $id = ($id == 0xffffffff ? 0 : $id) + 1; |
574 | $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops |
579 | $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops |
575 | |
580 | |
576 | $rcb{$id} = pop; |
581 | $rcb{$id} = pop; |
577 | |
582 | |
578 | $guard; # keep it alive |
583 | $guard if 0; # keep it alive |
579 | |
584 | |
580 | $wbuf .= pack "LL/a*", $id, &$f; |
585 | $wbuf .= pack "NN/a*", $id, &$f; |
581 | $ww ||= $fh && AE::io $fh, 1, $wcb; |
586 | $ww ||= $fh && AE::io $fh, 1, $wcb; |
582 | } |
587 | } |
583 | : sub { |
588 | : sub { |
584 | push @rcb, pop; |
589 | push @rcb, pop; |
585 | |
590 | |
586 | $guard; # keep it alive |
591 | $guard; # keep it alive |
587 | |
592 | |
588 | $wbuf .= pack "L/a*", &$f; |
593 | $wbuf .= pack "N/a*", &$f; |
589 | $ww ||= $fh && AE::io $fh, 1, $wcb; |
594 | $ww ||= $fh && AE::io $fh, 1, $wcb; |
590 | } |
595 | } |
591 | } |
596 | } |
592 | |
597 | |
593 | =item $rpc->(..., $cb->(...)) |
598 | =item $rpc->(..., $cb->(...)) |
… | |
… | |
769 | |
774 | |
770 | Of course, this might be blocking if you pass a lot of file descriptors, |
775 | Of course, this might be blocking if you pass a lot of file descriptors, |
771 | so you might want to look into L<AnyEvent::FDpasser> which can handle the |
776 | so you might want to look into L<AnyEvent::FDpasser> which can handle the |
772 | gory details. |
777 | gory details. |
773 | |
778 | |
|
|
779 | =head1 EXCEPTIONS |
|
|
780 | |
|
|
781 | There are no provisions whatsoever for catching exceptions at this time - |
|
|
782 | in the child, exeptions might kill the process, causing calls to be lost |
|
|
783 | and the parent encountering a fatal error. In the parent, exceptions in |
|
|
784 | the result callback will not be caught and cause undefined behaviour. |
|
|
785 | |
774 | =head1 SEE ALSO |
786 | =head1 SEE ALSO |
775 | |
787 | |
776 | L<AnyEvent::Fork>, to create the processes in the first place. |
788 | L<AnyEvent::Fork>, to create the processes in the first place. |
|
|
789 | |
|
|
790 | L<AnyEvent::Fork::Remote>, like above, but helpful for remote processes. |
777 | |
791 | |
778 | L<AnyEvent::Fork::Pool>, to manage whole pools of processes. |
792 | L<AnyEvent::Fork::Pool>, to manage whole pools of processes. |
779 | |
793 | |
780 | =head1 AUTHOR AND CONTACT INFORMATION |
794 | =head1 AUTHOR AND CONTACT INFORMATION |
781 | |
795 | |