ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
(Generate patch)

Comparing AnyEvent-Fork-RPC/RPC.pm (file contents):
Revision 1.8 by root, Wed Apr 17 20:24:36 2013 UTC vs.
Revision 1.9 by root, Wed Apr 17 21:48:35 2013 UTC

257pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use> 257pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
258or C<require> statement into the serialiser string. Or both. 258or C<require> statement into the serialiser string. Or both.
259 259
260=back 260=back
261 261
262See the examples section earlier in this document for some actual examples. 262See the examples section earlier in this document for some actual
263examples.
263 264
264=cut 265=cut
265 266
266our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; 267our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
267 268
281 # default for on_event is to raise an error 282 # default for on_event is to raise an error
282 $on_event ||= sub { $on_error->("event received, but no on_event handler") }; 283 $on_event ||= sub { $on_error->("event received, but no on_event handler") };
283 284
284 my ($f, $t) = eval $serialiser; die $@ if $@; 285 my ($f, $t) = eval $serialiser; die $@ if $@;
285 286
286 my (@rcb, $fh, $shutdown, $wbuf, $ww, $rw); 287 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
287 my ($rlen, $rbuf) = 512 - 16; 288 my ($rlen, $rbuf, $rw) = 512 - 16;
288 289
289 my $wcb = sub { 290 my $wcb = sub {
290 my $len = syswrite $fh, $wbuf; 291 my $len = syswrite $fh, $wbuf;
291 292
292 if (!defined $len) { 293 unless (defined $len) {
293 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 294 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
294 undef $rw; undef $ww; # it ends here 295 undef $rw; undef $ww; # it ends here
295 $on_error->("$!"); 296 $on_error->("$!");
296 } 297 }
297 } 298 }
308 309
309 $self->require ($module) 310 $self->require ($module)
310 ->send_arg ($function, $arg{init}, $serialiser) 311 ->send_arg ($function, $arg{init}, $serialiser)
311 ->run ("$module\::run", sub { 312 ->run ("$module\::run", sub {
312 $fh = shift; 313 $fh = shift;
314
315 my ($id, $len);
313 $rw = AE::io $fh, 0, sub { 316 $rw = AE::io $fh, 0, sub {
314 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; 317 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
315 my $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; 318 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
316 319
317 if ($len) { 320 if ($len) {
318 while (4 <= length $rbuf) { 321 while (8 <= length $rbuf) {
319 $len = unpack "L", $rbuf; 322 ($id, $len) = unpack "LL", $rbuf;
320 4 + $len <= length $rbuf 323 8 + $len <= length $rbuf
321 or last; 324 or last;
322 325
323 my @r = $t->(substr $rbuf, 4, $len); 326 my @r = $t->(substr $rbuf, 8, $len);
324 substr $rbuf, 0, $len + 4, ""; 327 substr $rbuf, 0, 8 + $len, "";
328
329 if ($id) {
330 if (@rcb) {
331 (shift @rcb)->(@r);
332 } elsif (my $cb = delete $rcb{$id}) {
333 $cb->(@r);
334 } else {
335 undef $rw; undef $ww;
336 $on_error->("unexpected data from child");
325 337 }
326 if (pop @r) { 338 } else {
327 $on_event->(@r); 339 $on_event->(@r);
328 } elsif (@rcb) {
329 (shift @rcb)->(@r);
330 } else {
331 undef $rw; undef $ww;
332 $on_error->("unexpected data from child");
333 } 340 }
334 } 341 }
335 } elsif (defined $len) { 342 } elsif (defined $len) {
336 undef $rw; undef $ww; # it ends here 343 undef $rw; undef $ww; # it ends here
337 344
338 if (@rcb) { 345 if (@rcb || %rcb) {
346 use Data::Dump;ddx[\@rcb,\%rcb];#d#
339 $on_error->("unexpected eof"); 347 $on_error->("unexpected eof");
340 } else { 348 } else {
341 $on_destroy->(); 349 $on_destroy->();
342 } 350 }
343 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 351 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
352 my $guard = Guard::guard { 360 my $guard = Guard::guard {
353 $shutdown = 1; 361 $shutdown = 1;
354 $ww ||= $fh && AE::io $fh, 1, $wcb; 362 $ww ||= $fh && AE::io $fh, 1, $wcb;
355 }; 363 };
356 364
365 my $id;
366
367 $arg{async}
357 sub { 368 ? sub {
358 push @rcb, pop; 369 $id = ($id == 0xffffffff ? 0 : $id) + 1;
370 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
359 371
372 $rcb{$id} = pop;
373
360 $guard; # keep it alive 374 $guard; # keep it alive
361 375
362 $wbuf .= pack "L/a*", &$f; 376 $wbuf .= pack "LL/a*", $id, &$f;
363 $ww ||= $fh && AE::io $fh, 1, $wcb; 377 $ww ||= $fh && AE::io $fh, 1, $wcb;
364 } 378 }
379 : sub {
380 push @rcb, pop;
381
382 $guard; # keep it alive
383
384 $wbuf .= pack "L/a*", &$f;
385 $ww ||= $fh && AE::io $fh, 1, $wcb;
386 }
365} 387}
366 388
367=item $rpc->(..., $cb->(...)) 389=item $rpc->(..., $cb->(...))
368 390
369The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code 391The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines