--- AnyEvent-Fork-RPC/RPC.pm 2013/04/17 20:19:41 1.7 +++ AnyEvent-Fork-RPC/RPC.pm 2013/04/17 21:48:35 1.9 @@ -259,6 +259,9 @@ =back +See the examples section earlier in this document for some actual +examples. + =cut our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; @@ -281,13 +284,13 @@ my ($f, $t) = eval $serialiser; die $@ if $@; - my (@rcb, $fh, $shutdown, $wbuf, $ww, $rw); - my ($rlen, $rbuf) = 512 - 16; + my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww); + my ($rlen, $rbuf, $rw) = 512 - 16; my $wcb = sub { my $len = syswrite $fh, $wbuf; - if (!defined $len) { + unless (defined $len) { if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { undef $rw; undef $ww; # it ends here $on_error->("$!"); @@ -308,32 +311,39 @@ ->send_arg ($function, $arg{init}, $serialiser) ->run ("$module\::run", sub { $fh = shift; + + my ($id, $len); $rw = AE::io $fh, 0, sub { $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; - my $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; + $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; if ($len) { - while (4 <= length $rbuf) { - $len = unpack "L", $rbuf; - 4 + $len <= length $rbuf + while (8 <= length $rbuf) { + ($id, $len) = unpack "LL", $rbuf; + 8 + $len <= length $rbuf or last; - my @r = $t->(substr $rbuf, 4, $len); - substr $rbuf, 0, $len + 4, ""; - - if (pop @r) { - $on_event->(@r); - } elsif (@rcb) { - (shift @rcb)->(@r); + my @r = $t->(substr $rbuf, 8, $len); + substr $rbuf, 0, 8 + $len, ""; + + if ($id) { + if (@rcb) { + (shift @rcb)->(@r); + } elsif (my $cb = delete $rcb{$id}) { + $cb->(@r); + } else { + undef $rw; undef $ww; + $on_error->("unexpected data from child"); + } } else { - undef $rw; undef $ww; - $on_error->("unexpected data from child"); + $on_event->(@r); } } } elsif (defined $len) { undef $rw; undef $ww; # it ends here - if (@rcb) { + if (@rcb || %rcb) { + use Data::Dump;ddx[\@rcb,\%rcb];#d# $on_error->("unexpected eof"); } else { $on_destroy->(); @@ -352,14 +362,28 @@ $ww ||= $fh && AE::io $fh, 1, $wcb; }; - sub { - push @rcb, pop; + my $id; - $guard; # keep it alive + $arg{async} + ? sub { + $id = ($id == 0xffffffff ? 0 : $id) + 1; + $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops - $wbuf .= pack "L/a*", &$f; - $ww ||= $fh && AE::io $fh, 1, $wcb; - } + $rcb{$id} = pop; + + $guard; # keep it alive + + $wbuf .= pack "LL/a*", $id, &$f; + $ww ||= $fh && AE::io $fh, 1, $wcb; + } + : sub { + push @rcb, pop; + + $guard; # keep it alive + + $wbuf .= pack "L/a*", &$f; + $ww ||= $fh && AE::io $fh, 1, $wcb; + } } =item $rpc->(..., $cb->(...)) @@ -384,6 +408,9 @@ this case, the child process will execute all remaining RPC calls, report their results, and then exit. +See the examples section earlier in this document for some actual +examples. + =back =head1 CHILD PROCESS USAGE @@ -401,6 +428,9 @@ child process to the parent, except that there is no notion of return values. +See the examples section earlier in this document for some actual +examples. + =back =head1 SEE ALSO