ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork/Fork.pm
(Generate patch)

Comparing AnyEvent-Fork/Fork.pm (file contents):
Revision 1.38 by root, Sat Apr 6 20:07:30 2013 UTC vs.
Revision 1.43 by root, Thu Apr 18 10:47:48 2013 UTC

27 27
28Special care has been taken to make this module useful from other modules, 28Special care has been taken to make this module useful from other modules,
29while still supporting specialised environments such as L<App::Staticperl> 29while still supporting specialised environments such as L<App::Staticperl>
30or L<PAR::Packer>. 30or L<PAR::Packer>.
31 31
32=head1 WHAT THIS MODULE IS NOT 32=head2 WHAT THIS MODULE IS NOT
33 33
34This module only creates processes and lets you pass file handles and 34This module only creates processes and lets you pass file handles and
35strings to it, and run perl code. It does not implement any kind of RPC - 35strings to it, and run perl code. It does not implement any kind of RPC -
36there is no back channel from the process back to you, and there is no RPC 36there is no back channel from the process back to you, and there is no RPC
37or message passing going on. 37or message passing going on.
38 38
39If you need some form of RPC, you can either implement it yourself 39If you need some form of RPC, you could use the L<AnyEvent::Fork::RPC>
40in whatever way you like, use some message-passing module such 40companion module, which adds simple RPC/job queueing to a process created
41as L<AnyEvent::MP>, some pipe such as L<AnyEvent::ZeroMQ>, use 41by this module.
42L<AnyEvent::Handle> on both sides to send e.g. JSON or Storable messages,
43and so on.
44 42
43Or you can implement it yourself in whatever way you like, use some
44message-passing module such as L<AnyEvent::MP>, some pipe such as
45L<AnyEvent::ZeroMQ>, use L<AnyEvent::Handle> on both sides to send
46e.g. JSON or Storable messages, and so on.
47
48=head2 COMPARISON TO OTHER MODULES
49
50There is an abundance of modules on CPAN that do "something fork", such as
51L<Parallel::ForkManager>, L<AnyEvent::ForkManager>, L<AnyEvent::Worker>
52or L<AnyEvent::Subprocess>. There are modules that implement their own
53process management, such as L<AnyEvent::DBI>.
54
55The problems that all these modules try to solve are real, however, none
56of them (from what I have seen) tackle the very real problems of unwanted
57memory sharing, efficiency, not being able to use event processing or
58similar modules in the processes they create.
59
60This module doesn't try to replace any of them - instead it tries to solve
61the problem of creating processes with a minimum of fuss and overhead (and
62also luxury). Ideally, most of these would use AnyEvent::Fork internally,
63except they were written before AnyEvent:Fork was available, so obviously
64had to roll their own.
65
45=head1 PROBLEM STATEMENT 66=head2 PROBLEM STATEMENT
46 67
47There are two traditional ways to implement parallel processing on UNIX 68There are two traditional ways to implement parallel processing on UNIX
48like operating systems - fork and process, and fork+exec and process. They 69like operating systems - fork and process, and fork+exec and process. They
49have different advantages and disadvantages that I describe below, 70have different advantages and disadvantages that I describe below,
50together with how this module tries to mitigate the disadvantages. 71together with how this module tries to mitigate the disadvantages.
212 open my $output, ">/tmp/log" or die "$!"; 233 open my $output, ">/tmp/log" or die "$!";
213 234
214 AnyEvent::Fork 235 AnyEvent::Fork
215 ->new 236 ->new
216 ->eval (' 237 ->eval ('
238 # compile a helper function for later use
217 sub run { 239 sub run {
218 my ($fh, $output, @cmd) = @_; 240 my ($fh, $output, @cmd) = @_;
219 241
220 # perl will clear close-on-exec on STDOUT/STDERR 242 # perl will clear close-on-exec on STDOUT/STDERR
221 open STDOUT, ">&", $output or die; 243 open STDOUT, ">&", $output or die;
351use AnyEvent; 373use AnyEvent;
352use AnyEvent::Util (); 374use AnyEvent::Util ();
353 375
354use IO::FDPass; 376use IO::FDPass;
355 377
356our $VERSION = 0.5; 378our $VERSION = 0.6;
357
358our $PERL; # the path to the perl interpreter, deduces with various forms of magic
359
360=over 4
361
362=back
363
364=cut
365 379
366# the early fork template process 380# the early fork template process
367our $EARLY; 381our $EARLY;
368 382
369# the empty template process 383# the empty template process
370our $TEMPLATE; 384our $TEMPLATE;
385
386sub QUEUE() { 0 }
387sub FH() { 1 }
388sub WW() { 2 }
389sub PID() { 3 }
390sub CB() { 4 }
391
392sub _new {
393 my ($self, $fh, $pid) = @_;
394
395 AnyEvent::Util::fh_nonblocking $fh, 1;
396
397 $self = bless [
398 [], # write queue - strings or fd's
399 $fh,
400 undef, # AE watcher
401 $pid,
402 ], $self;
403
404 $self
405}
371 406
372sub _cmd { 407sub _cmd {
373 my $self = shift; 408 my $self = shift;
374 409
375 # ideally, we would want to use "a (w/a)*" as format string, but perl 410 # ideally, we would want to use "a (w/a)*" as format string, but perl
376 # versions from at least 5.8.9 to 5.16.3 are all buggy and can't unpack 411 # versions from at least 5.8.9 to 5.16.3 are all buggy and can't unpack
377 # it. 412 # it.
378 push @{ $self->[2] }, pack "a L/a*", $_[0], $_[1]; 413 push @{ $self->[QUEUE] }, pack "a L/a*", $_[0], $_[1];
379 414
380 $self->[3] ||= AE::io $self->[1], 1, sub { 415 $self->[WW] ||= AE::io $self->[FH], 1, sub {
381 do { 416 do {
382 # send the next "thing" in the queue - either a reference to an fh, 417 # send the next "thing" in the queue - either a reference to an fh,
383 # or a plain string. 418 # or a plain string.
384 419
385 if (ref $self->[2][0]) { 420 if (ref $self->[QUEUE][0]) {
386 # send fh 421 # send fh
387 unless (IO::FDPass::send fileno $self->[1], fileno ${ $self->[2][0] }) { 422 unless (IO::FDPass::send fileno $self->[FH], fileno ${ $self->[QUEUE][0] }) {
388 return if $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK; 423 return if $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK;
389 undef $self->[3]; 424 undef $self->[WW];
390 die "AnyEvent::Fork: file descriptor send failure: $!"; 425 die "AnyEvent::Fork: file descriptor send failure: $!";
391 } 426 }
392 427
393 shift @{ $self->[2] }; 428 shift @{ $self->[QUEUE] };
394 429
395 } else { 430 } else {
396 # send string 431 # send string
397 my $len = syswrite $self->[1], $self->[2][0]; 432 my $len = syswrite $self->[FH], $self->[QUEUE][0];
398 433
399 unless ($len) { 434 unless ($len) {
400 return if $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK; 435 return if $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK;
401 undef $self->[3]; 436 undef $self->[3];
402 die "AnyEvent::Fork: command write failure: $!"; 437 die "AnyEvent::Fork: command write failure: $!";
403 } 438 }
404 439
405 substr $self->[2][0], 0, $len, ""; 440 substr $self->[QUEUE][0], 0, $len, "";
406 shift @{ $self->[2] } unless length $self->[2][0]; 441 shift @{ $self->[QUEUE] } unless length $self->[QUEUE][0];
407 } 442 }
408 } while @{ $self->[2] }; 443 } while @{ $self->[QUEUE] };
409 444
410 # everything written 445 # everything written
411 undef $self->[3]; 446 undef $self->[WW];
412 447
413 # invoke run callback, if any 448 # invoke run callback, if any
414 $self->[4]->($self->[1]) if $self->[4]; 449 $self->[CB]->($self->[FH]) if $self->[CB];
415 }; 450 };
416 451
417 () # make sure we don't leak the watcher 452 () # make sure we don't leak the watcher
418}
419
420sub _new {
421 my ($self, $fh, $pid) = @_;
422
423 AnyEvent::Util::fh_nonblocking $fh, 1;
424
425 $self = bless [
426 $pid,
427 $fh,
428 [], # write queue - strings or fd's
429 undef, # AE watcher
430 ], $self;
431
432 $self
433} 453}
434 454
435# fork template from current process, used by AnyEvent::Fork::Early/Template 455# fork template from current process, used by AnyEvent::Fork::Early/Template
436sub _new_fork { 456sub _new_fork {
437 my ($fh, $slave) = AnyEvent::Util::portable_socketpair; 457 my ($fh, $slave) = AnyEvent::Util::portable_socketpair;
442 if ($pid eq 0) { 462 if ($pid eq 0) {
443 require AnyEvent::Fork::Serve; 463 require AnyEvent::Fork::Serve;
444 $AnyEvent::Fork::Serve::OWNER = $parent; 464 $AnyEvent::Fork::Serve::OWNER = $parent;
445 close $fh; 465 close $fh;
446 $0 = "$_[1] of $parent"; 466 $0 = "$_[1] of $parent";
447 $SIG{CHLD} = 'IGNORE';
448 AnyEvent::Fork::Serve::serve ($slave); 467 AnyEvent::Fork::Serve::serve ($slave);
449 exit 0; 468 exit 0;
450 } elsif (!$pid) { 469 } elsif (!$pid) {
451 die "AnyEvent::Fork::Early/Template: unable to fork template process: $!"; 470 die "AnyEvent::Fork::Early/Template: unable to fork template process: $!";
452 } 471 }
571AnyEvent::Fork itself. 590AnyEvent::Fork itself.
572 591
573=cut 592=cut
574 593
575sub pid { 594sub pid {
576 $_[0][0] 595 $_[0][PID]
577} 596}
578 597
579=item $proc = $proc->eval ($perlcode, @args) 598=item $proc = $proc->eval ($perlcode, @args)
580 599
581Evaluates the given C<$perlcode> as ... perl code, while setting C<@_> to 600Evaluates the given C<$perlcode> as ... perl code, while setting C<@_> to
648sub send_fh { 667sub send_fh {
649 my ($self, @fh) = @_; 668 my ($self, @fh) = @_;
650 669
651 for my $fh (@fh) { 670 for my $fh (@fh) {
652 $self->_cmd ("h"); 671 $self->_cmd ("h");
653 push @{ $self->[2] }, \$fh; 672 push @{ $self->[QUEUE] }, \$fh;
654 } 673 }
655 674
656 $self 675 $self
657} 676}
658 677
744=cut 763=cut
745 764
746sub run { 765sub run {
747 my ($self, $func, $cb) = @_; 766 my ($self, $func, $cb) = @_;
748 767
749 $self->[4] = $cb; 768 $self->[CB] = $cb;
750 $self->_cmd (r => $func); 769 $self->_cmd (r => $func);
751} 770}
752 771
753=back 772=back
754 773
782So how can C<< AnyEvent->new >> be faster than a standard fork, even 801So how can C<< AnyEvent->new >> be faster than a standard fork, even
783though it uses the same operations, but adds a lot of overhead? 802though it uses the same operations, but adds a lot of overhead?
784 803
785The difference is simply the process size: forking the 5MB process takes 804The difference is simply the process size: forking the 5MB process takes
786so much longer than forking the 2.5MB template process that the extra 805so much longer than forking the 2.5MB template process that the extra
787overhead introduced is canceled out. 806overhead is canceled out.
788 807
789If the benchmark process grows, the normal fork becomes even slower: 808If the benchmark process grows, the normal fork becomes even slower:
790 809
791 1340 new processes, manual fork of a 20MB process 810 1340 new processes, manual fork of a 20MB process
792 731 new processes, manual fork of a 200MB process 811 731 new processes, manual fork of a 200MB process
891 910
892L<AnyEvent::Fork::Early> (to avoid executing a perl interpreter), 911L<AnyEvent::Fork::Early> (to avoid executing a perl interpreter),
893L<AnyEvent::Fork::Template> (to create a process by forking the main 912L<AnyEvent::Fork::Template> (to create a process by forking the main
894program at a convenient time). 913program at a convenient time).
895 914
896=head1 AUTHOR 915=head1 AUTHOR AND CONTACT INFORMATION
897 916
898 Marc Lehmann <schmorp@schmorp.de> 917 Marc Lehmann <schmorp@schmorp.de>
899 http://home.schmorp.de/ 918 http://software.schmorp.de/pkg/AnyEvent-Fork
900 919
901=cut 920=cut
902 921
9031 9221
904 923

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines