ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
(Generate patch)

Comparing AnyEvent-Fork-RPC/RPC.pm (file contents):
Revision 1.7 by root, Wed Apr 17 20:19:41 2013 UTC vs.
Revision 1.9 by root, Wed Apr 17 21:48:35 2013 UTC

256If you need an external module for serialisation, then you can either 256If you need an external module for serialisation, then you can either
257pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use> 257pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
258or C<require> statement into the serialiser string. Or both. 258or C<require> statement into the serialiser string. Or both.
259 259
260=back 260=back
261
262See the examples section earlier in this document for some actual
263examples.
261 264
262=cut 265=cut
263 266
264our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; 267our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
265 268
279 # default for on_event is to raise an error 282 # default for on_event is to raise an error
280 $on_event ||= sub { $on_error->("event received, but no on_event handler") }; 283 $on_event ||= sub { $on_error->("event received, but no on_event handler") };
281 284
282 my ($f, $t) = eval $serialiser; die $@ if $@; 285 my ($f, $t) = eval $serialiser; die $@ if $@;
283 286
284 my (@rcb, $fh, $shutdown, $wbuf, $ww, $rw); 287 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
285 my ($rlen, $rbuf) = 512 - 16; 288 my ($rlen, $rbuf, $rw) = 512 - 16;
286 289
287 my $wcb = sub { 290 my $wcb = sub {
288 my $len = syswrite $fh, $wbuf; 291 my $len = syswrite $fh, $wbuf;
289 292
290 if (!defined $len) { 293 unless (defined $len) {
291 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 294 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
292 undef $rw; undef $ww; # it ends here 295 undef $rw; undef $ww; # it ends here
293 $on_error->("$!"); 296 $on_error->("$!");
294 } 297 }
295 } 298 }
306 309
307 $self->require ($module) 310 $self->require ($module)
308 ->send_arg ($function, $arg{init}, $serialiser) 311 ->send_arg ($function, $arg{init}, $serialiser)
309 ->run ("$module\::run", sub { 312 ->run ("$module\::run", sub {
310 $fh = shift; 313 $fh = shift;
314
315 my ($id, $len);
311 $rw = AE::io $fh, 0, sub { 316 $rw = AE::io $fh, 0, sub {
312 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; 317 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
313 my $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; 318 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
314 319
315 if ($len) { 320 if ($len) {
316 while (4 <= length $rbuf) { 321 while (8 <= length $rbuf) {
317 $len = unpack "L", $rbuf; 322 ($id, $len) = unpack "LL", $rbuf;
318 4 + $len <= length $rbuf 323 8 + $len <= length $rbuf
319 or last; 324 or last;
320 325
321 my @r = $t->(substr $rbuf, 4, $len); 326 my @r = $t->(substr $rbuf, 8, $len);
322 substr $rbuf, 0, $len + 4, ""; 327 substr $rbuf, 0, 8 + $len, "";
328
329 if ($id) {
330 if (@rcb) {
331 (shift @rcb)->(@r);
332 } elsif (my $cb = delete $rcb{$id}) {
333 $cb->(@r);
334 } else {
335 undef $rw; undef $ww;
336 $on_error->("unexpected data from child");
323 337 }
324 if (pop @r) { 338 } else {
325 $on_event->(@r); 339 $on_event->(@r);
326 } elsif (@rcb) {
327 (shift @rcb)->(@r);
328 } else {
329 undef $rw; undef $ww;
330 $on_error->("unexpected data from child");
331 } 340 }
332 } 341 }
333 } elsif (defined $len) { 342 } elsif (defined $len) {
334 undef $rw; undef $ww; # it ends here 343 undef $rw; undef $ww; # it ends here
335 344
336 if (@rcb) { 345 if (@rcb || %rcb) {
346 use Data::Dump;ddx[\@rcb,\%rcb];#d#
337 $on_error->("unexpected eof"); 347 $on_error->("unexpected eof");
338 } else { 348 } else {
339 $on_destroy->(); 349 $on_destroy->();
340 } 350 }
341 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 351 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
350 my $guard = Guard::guard { 360 my $guard = Guard::guard {
351 $shutdown = 1; 361 $shutdown = 1;
352 $ww ||= $fh && AE::io $fh, 1, $wcb; 362 $ww ||= $fh && AE::io $fh, 1, $wcb;
353 }; 363 };
354 364
365 my $id;
366
367 $arg{async}
355 sub { 368 ? sub {
356 push @rcb, pop; 369 $id = ($id == 0xffffffff ? 0 : $id) + 1;
370 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
357 371
372 $rcb{$id} = pop;
373
358 $guard; # keep it alive 374 $guard; # keep it alive
359 375
360 $wbuf .= pack "L/a*", &$f; 376 $wbuf .= pack "LL/a*", $id, &$f;
361 $ww ||= $fh && AE::io $fh, 1, $wcb; 377 $ww ||= $fh && AE::io $fh, 1, $wcb;
362 } 378 }
379 : sub {
380 push @rcb, pop;
381
382 $guard; # keep it alive
383
384 $wbuf .= pack "L/a*", &$f;
385 $ww ||= $fh && AE::io $fh, 1, $wcb;
386 }
363} 387}
364 388
365=item $rpc->(..., $cb->(...)) 389=item $rpc->(..., $cb->(...))
366 390
367The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code 391The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
382 406
383The other thing that can be done with the RPC object is to destroy it. In 407The other thing that can be done with the RPC object is to destroy it. In
384this case, the child process will execute all remaining RPC calls, report 408this case, the child process will execute all remaining RPC calls, report
385their results, and then exit. 409their results, and then exit.
386 410
411See the examples section earlier in this document for some actual
412examples.
413
387=back 414=back
388 415
389=head1 CHILD PROCESS USAGE 416=head1 CHILD PROCESS USAGE
390 417
391The following function is not available in this module. They are only 418The following function is not available in this module. They are only
399 426
400Send an event to the parent. Events are a bit like RPC calls made by the 427Send an event to the parent. Events are a bit like RPC calls made by the
401child process to the parent, except that there is no notion of return 428child process to the parent, except that there is no notion of return
402values. 429values.
403 430
431See the examples section earlier in this document for some actual
432examples.
433
404=back 434=back
405 435
406=head1 SEE ALSO 436=head1 SEE ALSO
407 437
408L<AnyEvent::Fork> (to create the processes in the first place), 438L<AnyEvent::Fork> (to create the processes in the first place),

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines