--- AnyEvent-Fork/Fork.pm 2013/04/04 01:54:40 1.8 +++ AnyEvent-Fork/Fork.pm 2013/04/04 03:45:12 1.9 @@ -2,10 +2,70 @@ AnyEvent::Fork - everything you wanted to use fork() for, but couldn't +ATTENTION, this is a very early release, and very untested. Consider it a +technology preview. + =head1 SYNOPSIS use AnyEvent::Fork; + ################################################################## + # create a single new process, tell it to run your worker function + + AnyEvent::Fork + ->new + ->require ("MyModule") + ->run ("MyModule::worker, sub { + my ($master_filehandle) = @_; + + # now $master_filehandle is connected to the + # $slave_filehandle in the new process. + }); + + # MyModule::worker might look like this + sub MyModule::worker { + my ($slave_filehandle) = @_; + + # now $slave_filehandle is connected to the $master_filehandle + # in the original prorcess. have fun! + } + + ################################################################## + # create a pool of server processes all accepting on the same socket + + # create listener socket + my $listener = ...; + + # create a pool template, initialise it and give it the socket + my $pool = AnyEvent::Fork + ->new + ->require ("Some::Stuff", "My::Server") + ->send_fh ($listener); + + # now create 10 identical workers + for my $id (1..10) { + $pool + ->fork + ->send_arg ($id) + ->run ("My::Server::run"); + } + + # now do other things - maybe use the filehandle provided by run + # to wait for the processes to die. or whatever. + + # My::Server::run might look like this + sub My::Server::run { + my ($slave, $listener, $id) = @_; + + close $slave; # we do not use the socket, so close it to save resources + + # we could go ballistic and use e.g. AnyEvent here, or IO::AIO, + # or anything we usually couldn't do in a process forked normally. + while (my $socket = $listener->accept) { + # do sth. with new socket + } + } + =head1 DESCRIPTION This module allows you to create new processes, without actually forking @@ -113,6 +173,15 @@ This is ideal for when you only need one extra process of a kind, with the option of starting and stipping it on demand. +Example: + + AnyEvent::Fork + ->new + ->require ("Some::Module") + ->run ("Some::Module::run", sub { + my ($fork_fh) = @_; + }); + =item fork a new template process, load code, then fork processes off of it and run the code @@ -130,6 +199,19 @@ only need a fixed number of proceses you can create them, and then destroy the template process. +Example: + + my $template = AnyEvent::Fork->new->require ("Some::Module"); + + for (1..10) { + $template->fork->run ("Some::Module::run", sub { + my ($fork_fh) = @_; + }); + } + + # at this point, you can keep $template around to fork new processes + # later, or you can destroy it, which causes it to vanish. + =item execute a new perl interpreter, load some code, run it This is relatively slow, and doesn't allow you to share memory between @@ -140,6 +222,15 @@ an advantage when there are long time spans where no extra processes are needed. +Example: + + AnyEvent::Fork + ->new_exec + ->require ("Some::Module") + ->run ("Some::Module::run", sub { + my ($fork_fh) = @_; + }); + =back =head1 FUNCTIONS @@ -179,16 +270,23 @@ sub _cmd { my $self = shift; + #TODO: maybe append the packet to any existing string command already in the queue + # ideally, we would want to use "a (w/a)*" as format string, but perl versions # from at least 5.8.9 to 5.16.3 are all buggy and can't unpack it. push @{ $self->[2] }, pack "N/a", pack "(w/a)*", @_; $self->[3] ||= AE::io $self->[1], 1, sub { + # send the next "thing" in the queue - either a reference to an fh, + # or a plain string. + if (ref $self->[2][0]) { + # send fh AnyEvent::Fork::Util::fd_send fileno $self->[1], fileno ${ $self->[2][0] } and shift @{ $self->[2] }; } else { + # send string my $len = syswrite $self->[1], $self->[2][0] or do { undef $self->[3]; die "AnyEvent::Fork: command write failure: $!" }; @@ -198,6 +296,7 @@ unless (@{ $self->[2] }) { undef $self->[3]; + # invoke run callback $self->[0]->($self->[1]) if $self->[0]; } }; @@ -215,15 +314,6 @@ undef, # AE watcher ], $self; -# my ($a, $b) = AnyEvent::Util::portable_socketpair; - -# queue_cmd $template, "Iabc"; -# push @{ $template->[2] }, \$b; - -# use Coro::AnyEvent; Coro::AnyEvent::sleep 1; -# undef $b; -# die "x" . <$a>; - $self } @@ -257,6 +347,11 @@ for this purpose. When it doesn't exist yet, it is created by a call to C and kept around for future calls. +When the process object is destroyed, it will release the file handle +that connects it with the new process. When the new process has not yet +called C, then the process will exit. Otherwise, what happens depends +entirely on the code that is executed. + =cut sub new { @@ -348,12 +443,48 @@ $self->_new ($fh) } +=item $proc = $proc->eval ($perlcode, @args) + +Evaluates the given C<$perlcode> as ... perl code, while setting C<@_> to +the strings specified by C<@args>. + +This call is meant to do any custom initialisation that might be required +(for example, the C method uses it). It's not supposed to be used +to completely take over the process, use C for that. + +The code will usually be executed after this call returns, and there is no +way to pass anything back to the calling process. Any evaluation errors +will be reported to stderr and cause the process to exit. + +Returns the process object for easy chaining of method calls. + +=cut + +sub eval { + my ($self, $code, @args) = @_; + + $self->_cmd (e => $code, @args); + + $self +} + =item $proc = $proc->require ($module, ...) -Tries to load the given modules into the process +Tries to load the given module(s) into the process Returns the process object for easy chaining of method calls. +=cut + +sub require { + my ($self, @modules) = @_; + + s%::%/%g for @modules; + $self->eval ('require "$_.pm" for @_', @modules); + + $self +} + =item $proc = $proc->send_fh ($handle, ...) Send one or more file handles (I file descriptors) to the process, @@ -366,6 +497,12 @@ Returns the process object for easy chaining of method calls. +Example: pass an fh to a process, and release it without closing. it will +be closed automatically when it is no longer used. + + $proc->send_fh ($my_fh); + undef $my_fh; # free the reference if you want, but DO NOT CLOSE IT + =cut sub send_fh { @@ -374,7 +511,6 @@ for my $fh (@fh) { $self->_cmd ("h"); push @{ $self->[2] }, \$fh; - push @$self, $fh; # dire hack } $self @@ -421,13 +557,44 @@ because exiting the process closes the socket (if it didn't create any children using fork). +Example: create a template for a process pool, pass a few strings, some +file handles, then fork, pass one more string, and run some code. + + my $pool = AnyEvent::Fork + ->new + ->send_arg ("str1", "str2") + ->send_fh ($fh1, $fh2); + + for (1..2) { + $pool + ->fork + ->send_arg ("str3") + ->run ("Some::function", sub { + my ($fh) = @_; + + # fh is nonblocking, but we trust that the OS can accept these + # extra 3 octets anyway. + syswrite $fh, "hi #$_\n"; + + # $fh is being closed here, as we don't store it anywhere + }); + } + + # Some::function might look like this - all parameters passed before fork + # and after will be passed, in order, after the communications socket. + sub Some::function { + my ($fh, $str1, $str2, $fh1, $fh2, $str3) = @_; + + print scalar <$fh>; # prints "hi 1\n" and "hi 2\n" + } + =cut sub run { my ($self, $func, $cb) = @_; $self->[0] = $cb; - $self->_cmd ("r", $func); + $self->_cmd (r => $func); } =back