ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork/Fork.pm
(Generate patch)

Comparing AnyEvent-Fork/Fork.pm (file contents):
Revision 1.41 by root, Mon Apr 8 03:20:53 2013 UTC vs.
Revision 1.48 by root, Fri Apr 19 10:51:41 2013 UTC

34This module only creates processes and lets you pass file handles and 34This module only creates processes and lets you pass file handles and
35strings to it, and run perl code. It does not implement any kind of RPC - 35strings to it, and run perl code. It does not implement any kind of RPC -
36there is no back channel from the process back to you, and there is no RPC 36there is no back channel from the process back to you, and there is no RPC
37or message passing going on. 37or message passing going on.
38 38
39If you need some form of RPC, you can either implement it yourself 39If you need some form of RPC, you could use the L<AnyEvent::Fork::RPC>
40in whatever way you like, use some message-passing module such 40companion module, which adds simple RPC/job queueing to a process created
41as L<AnyEvent::MP>, some pipe such as L<AnyEvent::ZeroMQ>, use 41by this module.
42L<AnyEvent::Handle> on both sides to send e.g. JSON or Storable messages, 42
43and so on. 43Or you can implement it yourself in whatever way you like, use some
44message-passing module such as L<AnyEvent::MP>, some pipe such as
45L<AnyEvent::ZeroMQ>, use L<AnyEvent::Handle> on both sides to send
46e.g. JSON or Storable messages, and so on.
44 47
45=head2 COMPARISON TO OTHER MODULES 48=head2 COMPARISON TO OTHER MODULES
46 49
47There is an abundance of modules on CPAN that do "something fork", such as 50There is an abundance of modules on CPAN that do "something fork", such as
48L<Parallel::ForkManager>, L<AnyEvent::ForkManager>, L<AnyEvent::Worker> 51L<Parallel::ForkManager>, L<AnyEvent::ForkManager>, L<AnyEvent::Worker>
221 } 224 }
222 } 225 }
223 226
224=head2 use AnyEvent::Fork as a faster fork+exec 227=head2 use AnyEvent::Fork as a faster fork+exec
225 228
226This runs C</bin/echo hi>, with stdandard output redirected to /tmp/log 229This runs C</bin/echo hi>, with standard output redirected to F</tmp/log>
227and standard error redirected to the communications socket. It is usually 230and standard error redirected to the communications socket. It is usually
228faster than fork+exec, but still lets you prepare the environment. 231faster than fork+exec, but still lets you prepare the environment.
229 232
230 open my $output, ">/tmp/log" or die "$!"; 233 open my $output, ">/tmp/log" or die "$!";
231 234
251 254
252=head1 CONCEPTS 255=head1 CONCEPTS
253 256
254This module can create new processes either by executing a new perl 257This module can create new processes either by executing a new perl
255process, or by forking from an existing "template" process. 258process, or by forking from an existing "template" process.
259
260All these processes are called "child processes" (whether they are direct
261children or not), while the process that manages them is called the
262"parent process".
256 263
257Each such process comes with its own file handle that can be used to 264Each such process comes with its own file handle that can be used to
258communicate with it (it's actually a socket - one end in the new process, 265communicate with it (it's actually a socket - one end in the new process,
259one end in the main process), and among the things you can do in it are 266one end in the main process), and among the things you can do in it are
260load modules, fork new processes, send file handles to it, and execute 267load modules, fork new processes, send file handles to it, and execute
370use AnyEvent; 377use AnyEvent;
371use AnyEvent::Util (); 378use AnyEvent::Util ();
372 379
373use IO::FDPass; 380use IO::FDPass;
374 381
375our $VERSION = 0.6; 382our $VERSION = 0.7;
376
377=over 4
378
379=back
380
381=cut
382 383
383# the early fork template process 384# the early fork template process
384our $EARLY; 385our $EARLY;
385 386
386# the empty template process 387# the empty template process
387our $TEMPLATE; 388our $TEMPLATE;
389
390sub QUEUE() { 0 }
391sub FH() { 1 }
392sub WW() { 2 }
393sub PID() { 3 }
394sub CB() { 4 }
395
396sub _new {
397 my ($self, $fh, $pid) = @_;
398
399 AnyEvent::Util::fh_nonblocking $fh, 1;
400
401 $self = bless [
402 [], # write queue - strings or fd's
403 $fh,
404 undef, # AE watcher
405 $pid,
406 ], $self;
407
408 $self
409}
388 410
389sub _cmd { 411sub _cmd {
390 my $self = shift; 412 my $self = shift;
391 413
392 # ideally, we would want to use "a (w/a)*" as format string, but perl 414 # ideally, we would want to use "a (w/a)*" as format string, but perl
393 # versions from at least 5.8.9 to 5.16.3 are all buggy and can't unpack 415 # versions from at least 5.8.9 to 5.16.3 are all buggy and can't unpack
394 # it. 416 # it.
395 push @{ $self->[2] }, pack "a L/a*", $_[0], $_[1]; 417 push @{ $self->[QUEUE] }, pack "a L/a*", $_[0], $_[1];
396 418
397 $self->[3] ||= AE::io $self->[1], 1, sub { 419 $self->[WW] ||= AE::io $self->[FH], 1, sub {
398 do { 420 do {
399 # send the next "thing" in the queue - either a reference to an fh, 421 # send the next "thing" in the queue - either a reference to an fh,
400 # or a plain string. 422 # or a plain string.
401 423
402 if (ref $self->[2][0]) { 424 if (ref $self->[QUEUE][0]) {
403 # send fh 425 # send fh
404 unless (IO::FDPass::send fileno $self->[1], fileno ${ $self->[2][0] }) { 426 unless (IO::FDPass::send fileno $self->[FH], fileno ${ $self->[QUEUE][0] }) {
405 return if $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK; 427 return if $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK;
406 undef $self->[3]; 428 undef $self->[WW];
407 die "AnyEvent::Fork: file descriptor send failure: $!"; 429 die "AnyEvent::Fork: file descriptor send failure: $!";
408 } 430 }
409 431
410 shift @{ $self->[2] }; 432 shift @{ $self->[QUEUE] };
411 433
412 } else { 434 } else {
413 # send string 435 # send string
414 my $len = syswrite $self->[1], $self->[2][0]; 436 my $len = syswrite $self->[FH], $self->[QUEUE][0];
415 437
416 unless ($len) { 438 unless ($len) {
417 return if $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK; 439 return if $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK;
418 undef $self->[3]; 440 undef $self->[3];
419 die "AnyEvent::Fork: command write failure: $!"; 441 die "AnyEvent::Fork: command write failure: $!";
420 } 442 }
421 443
422 substr $self->[2][0], 0, $len, ""; 444 substr $self->[QUEUE][0], 0, $len, "";
423 shift @{ $self->[2] } unless length $self->[2][0]; 445 shift @{ $self->[QUEUE] } unless length $self->[QUEUE][0];
424 } 446 }
425 } while @{ $self->[2] }; 447 } while @{ $self->[QUEUE] };
426 448
427 # everything written 449 # everything written
428 undef $self->[3]; 450 undef $self->[WW];
429 451
430 # invoke run callback, if any 452 # invoke run callback, if any
431 $self->[4]->($self->[1]) if $self->[4]; 453 if ($self->[CB]) {
454 $self->[CB]->($self->[FH]);
455 @$self = ();
456 }
432 }; 457 };
433 458
434 () # make sure we don't leak the watcher 459 () # make sure we don't leak the watcher
435}
436
437sub _new {
438 my ($self, $fh, $pid) = @_;
439
440 AnyEvent::Util::fh_nonblocking $fh, 1;
441
442 $self = bless [
443 $pid,
444 $fh,
445 [], # write queue - strings or fd's
446 undef, # AE watcher
447 ], $self;
448
449 $self
450} 460}
451 461
452# fork template from current process, used by AnyEvent::Fork::Early/Template 462# fork template from current process, used by AnyEvent::Fork::Early/Template
453sub _new_fork { 463sub _new_fork {
454 my ($fh, $slave) = AnyEvent::Util::portable_socketpair; 464 my ($fh, $slave) = AnyEvent::Util::portable_socketpair;
587AnyEvent::Fork itself. 597AnyEvent::Fork itself.
588 598
589=cut 599=cut
590 600
591sub pid { 601sub pid {
592 $_[0][0] 602 $_[0][PID]
593} 603}
594 604
595=item $proc = $proc->eval ($perlcode, @args) 605=item $proc = $proc->eval ($perlcode, @args)
596 606
597Evaluates the given C<$perlcode> as ... perl code, while setting C<@_> to 607Evaluates the given C<$perlcode> as ... Perl code, while setting C<@_> to
598the strings specified by C<@args>, in the "main" package. 608the strings specified by C<@args>, in the "main" package.
599 609
600This call is meant to do any custom initialisation that might be required 610This call is meant to do any custom initialisation that might be required
601(for example, the C<require> method uses it). It's not supposed to be used 611(for example, the C<require> method uses it). It's not supposed to be used
602to completely take over the process, use C<run> for that. 612to completely take over the process, use C<run> for that.
664sub send_fh { 674sub send_fh {
665 my ($self, @fh) = @_; 675 my ($self, @fh) = @_;
666 676
667 for my $fh (@fh) { 677 for my $fh (@fh) {
668 $self->_cmd ("h"); 678 $self->_cmd ("h");
669 push @{ $self->[2] }, \$fh; 679 push @{ $self->[QUEUE] }, \$fh;
670 } 680 }
671 681
672 $self 682 $self
673} 683}
674 684
760=cut 770=cut
761 771
762sub run { 772sub run {
763 my ($self, $func, $cb) = @_; 773 my ($self, $func, $cb) = @_;
764 774
765 $self->[4] = $cb; 775 $self->[CB] = $cb;
766 $self->_cmd (r => $func); 776 $self->_cmd (r => $func);
777}
778
779=item $proc->to_fh ($cb->($fh))
780
781Flushes all commands out to the process and then calls the callback with
782the communications socket.
783
784The process object becomes unusable on return from this function - any
785further method calls result in undefined behaviour.
786
787The point of this method is to give you a file handle thta you cna pass
788to another process. In that other process, you can call C<new_from_fh
789AnyEvent::Fork::RPC> to create a new C<AnyEvent::Fork> object from it,
790thereby effectively passing a fork object to another process.
791
792=cut
793
794sub to_fh {
795 my ($self, $cb) = @_;
796
797 $self->[CB] = $cb;
798
799 unless ($self->[WW]) {
800 $self->[CB]->($self->[FH]);
801 @$self = ();
802 }
803}
804
805=item new_from_fh AnyEvent::Fork $fh
806
807Takes a file handle originally rceeived by the C<to_fh> method and creates
808a new C<AnyEvent:Fork> object. The child process itself will not change in
809any way, i.e. it will keep all the modifications done to it before calling
810C<to_fh>.
811
812The new object is very much like the original object, except that the
813C<pid> method will return C<undef> even if the process is a direct child.
814
815=cut
816
817sub new_from_fh {
818 my ($class, $fh) = @_;
819
820 $class->_new ($fh)
767} 821}
768 822
769=back 823=back
770 824
771=head1 PERFORMANCE 825=head1 PERFORMANCE
781 835
782 2079 new processes per second, using manual socketpair + fork 836 2079 new processes per second, using manual socketpair + fork
783 837
784Then I did the same thing, but instead of calling fork, I called 838Then I did the same thing, but instead of calling fork, I called
785AnyEvent::Fork->new->run ("CORE::exit") and then again waited for the 839AnyEvent::Fork->new->run ("CORE::exit") and then again waited for the
786socket form the child to close on exit. This does the same thing as manual 840socket from the child to close on exit. This does the same thing as manual
787socket pair + fork, except that what is forked is the template process 841socket pair + fork, except that what is forked is the template process
788(2440kB), and the socket needs to be passed to the server at the other end 842(2440kB), and the socket needs to be passed to the server at the other end
789of the socket first. 843of the socket first.
790 844
791 2307 new processes per second, using AnyEvent::Fork->new 845 2307 new processes per second, using AnyEvent::Fork->new
798So how can C<< AnyEvent->new >> be faster than a standard fork, even 852So how can C<< AnyEvent->new >> be faster than a standard fork, even
799though it uses the same operations, but adds a lot of overhead? 853though it uses the same operations, but adds a lot of overhead?
800 854
801The difference is simply the process size: forking the 5MB process takes 855The difference is simply the process size: forking the 5MB process takes
802so much longer than forking the 2.5MB template process that the extra 856so much longer than forking the 2.5MB template process that the extra
803overhead introduced is canceled out. 857overhead is canceled out.
804 858
805If the benchmark process grows, the normal fork becomes even slower: 859If the benchmark process grows, the normal fork becomes even slower:
806 860
807 1340 new processes, manual fork of a 20MB process 861 1340 new processes, manual fork of a 20MB process
808 731 new processes, manual fork of a 200MB process 862 731 new processes, manual fork of a 200MB process
868initialising them, for example, by calling C<init Gtk2> manually. 922initialising them, for example, by calling C<init Gtk2> manually.
869 923
870=item exiting calls object destructors 924=item exiting calls object destructors
871 925
872This only applies to users of L<AnyEvent::Fork:Early> and 926This only applies to users of L<AnyEvent::Fork:Early> and
873L<AnyEvent::Fork::Template>, or when initialiasing code creates objects 927L<AnyEvent::Fork::Template>, or when initialising code creates objects
874that reference external resources. 928that reference external resources.
875 929
876When a process created by AnyEvent::Fork exits, it might do so by calling 930When a process created by AnyEvent::Fork exits, it might do so by calling
877exit, or simply letting perl reach the end of the program. At which point 931exit, or simply letting perl reach the end of the program. At which point
878Perl runs all destructors. 932Perl runs all destructors.
903Cygwin perl is not supported at the moment due to some hilarious 957Cygwin perl is not supported at the moment due to some hilarious
904shortcomings of its API - see L<IO::FDPoll> for more details. 958shortcomings of its API - see L<IO::FDPoll> for more details.
905 959
906=head1 SEE ALSO 960=head1 SEE ALSO
907 961
908L<AnyEvent::Fork::Early> (to avoid executing a perl interpreter), 962L<AnyEvent::Fork::Early>, to avoid executing a perl interpreter at all
963(part of this distribution).
964
909L<AnyEvent::Fork::Template> (to create a process by forking the main 965L<AnyEvent::Fork::Template>, to create a process by forking the main
910program at a convenient time). 966program at a convenient time (part of this distribution).
911 967
912=head1 AUTHOR 968L<AnyEvent::Fork::RPC>, for simple RPC to child processes (on CPAN).
969
970=head1 AUTHOR AND CONTACT INFORMATION
913 971
914 Marc Lehmann <schmorp@schmorp.de> 972 Marc Lehmann <schmorp@schmorp.de>
915 http://home.schmorp.de/ 973 http://software.schmorp.de/pkg/AnyEvent-Fork
916 974
917=cut 975=cut
918 976
9191 9771
920 978

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines