
Comparing AnyEvent-Fork/Fork.pm (file contents):
Revision 1.3 by root, Tue Apr 2 18:00:04 2013 UTC vs.
Revision 1.4 by root, Wed Apr 3 07:35:57 2013 UTC

1=head1 NAME 1=head1 NAME
2 2
3AnyEvent::ProcessPool - manage pools of perl worker processes, exec'ed or fork'ed 3AnyEvent::Fork - everything you wanted to use fork() for, but couldn't
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::ProcessPool; 7 use AnyEvent::Fork;
8 8
9=head1 DESCRIPTION 9=head1 DESCRIPTION
10 10
11This module allows you to create single worker processes but also worker 11This module allows you to create new processes, without actually forking
12pool that share memory, by forking from the main program, or exec'ing new 12them from your current process (avoiding the problems of forking), but
13perl interpreters from a module. 13preserving most of the advantages of fork.
14 14
15You create a new processes in a pool by specifying a function to call 15It can be used to create new worker processes or new independent
16with any combination of string values and file handles. 16subprocesses for short- and long-running jobs, process pools (e.g. for use
17 17in pre-forked servers) but also to spawn new external processes (such as
18A pool can have initialisation code which is executed before forking. The 18CGI scripts from a webserver), which can be faster (and more well behaved)
19initialisation code is only executed once and the resulting process is 19than using fork+exec in big processes.
20cached, to be used as a template.
21
22Pools without such initialisation code don't cache an extra process.
23 20
24=head1 PROBLEM STATEMENT 21=head1 PROBLEM STATEMENT
25 22
26There are two ways to implement parallel processing on UNIX like operating 23There are two ways to implement parallel processing on UNIX like operating
27systems - fork and process, and fork+exec and process. They have different 24systems - fork and process, and fork+exec and process. They have different
145 142
146=over 4 143=over 4
147 144
148=cut 145=cut
149 146
150package AnyEvent::ProcessPool; 147package AnyEvent::Fork;
151 148
152use common::sense; 149use common::sense;
153 150
154use Socket (); 151use Socket ();
155 152
156use Proc::FastSpawn;
157use AnyEvent; 153use AnyEvent;
158use AnyEvent::ProcessPool::Util; 154use AnyEvent::Fork::Util;
159use AnyEvent::Util (); 155use AnyEvent::Util ();
160 156
161BEGIN { 157our $PERL; # the path to the perl interpreter, deduced with various forms of magic
162# require Exporter;
163}
164 158
165=item my $pool = new AnyEvent::ProcessPool key => value... 159=item my $pool = new AnyEvent::Fork key => value...
166 160
167Create a new process pool. The following named parameters are supported: 161Create a new process pool. The following named parameters are supported:
168 162
169=over 4 163=over 4
170 164
171=back 165=back
172 166
173=cut 167=cut
174 168
175# the template process 169# the empty template process
176our $template; 170our $TEMPLATE;
177 171
178sub _queue { 172sub _cmd {
173 my $self = shift;
174
175 # ideally, we would want to use "a (w/a)*" as format string, but perl versions
176 # from at least 5.8.9 to 5.16.3 are all buggy and can't unpack it.
177 push @{ $self->[2] }, pack "N/a", pack "(w/a)*", @_;
178
179 $self->[3] ||= AE::io $self->[1], 1, sub {
180 if (ref $self->[2][0]) {
181 AnyEvent::Fork::Util::fd_send fileno $self->[1], fileno ${ $self->[2][0] }
182 and shift @{ $self->[2] };
183 } else {
184 my $len = syswrite $self->[1], $self->[2][0]
185 or do { undef $self->[3]; die "AnyEvent::Fork: command write failure: $!" };
186 substr $self->[2][0], 0, $len, "";
187 shift @{ $self->[2] } unless length $self->[2][0];
188 }
189
190 unless (@{ $self->[2] }) {
191 undef $self->[3];
192 $self->[0]->($self->[1]) if $self->[0];
193 }
194 };
195}
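
The framing that `_cmd` applies in revision 1.4 can be tried in isolation. The following is a minimal sketch using only core Perl. The helper names `encode_cmd` and `decode_cmd` are hypothetical and not part of AnyEvent::Fork, and the decoding side is an assumption about how a receiver might undo the framing, not the module's actual server code.

```perl
use strict;
use warnings;

# Hypothetical helper: frame a command the way _cmd does in the diff.
# Inner layer: every argument as BER-compressed length + bytes ("w/a").
# Outer layer: 32-bit big-endian length + the inner payload ("N/a").
sub encode_cmd {
    return pack "N/a", pack "(w/a)*", @_;
}

# Hypothetical helper: reverse the framing (assumed receiver logic).
sub decode_cmd {
    my ($frame) = @_;
    my ($payload) = unpack "N/a", $frame;   # strip the outer length prefix
    return unpack "(w/a)*", $payload;       # split into the original strings
}

# "r" is the command letter that run() queues, followed by a function name.
my @cmd = decode_cmd(encode_cmd("r", "My::Worker::run"));
print join(" ", @cmd), "\n";
```

Because every argument carries its own length, arbitrary octet strings (including empty ones) survive the round trip, which matches what the `send_arg` documentation promises.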
196
197sub _new {
179 my ($pid, $fh) = @_; 198 my ($self, $fh) = @_;
180 199
181 [ 200 $self = bless [
182 $pid, 201 undef, # run callback
183 $fh, 202 $fh,
184 [], 203 [], # write queue - strings or fd's
185 undef 204 undef, # AE watcher
186 ] 205 ], $self;
187}
188 206
189sub queue_cmd { 207# my ($a, $b) = AnyEvent::Util::portable_socketpair;
208
209# queue_cmd $template, "Iabc";
210# push @{ $template->[2] }, \$b;
211
212# use Coro::AnyEvent; Coro::AnyEvent::sleep 1;
213# undef $b;
214# die "x" . <$a>;
215
216 $self
217}
218
219=item my $proc = new AnyEvent::Fork
220
221Creates a new "empty" perl interpreter process and returns its process
222object for further manipulation.
223
224The new process is forked from a template process that is kept around
225for this purpose. When it doesn't exist yet, it is created by a call to
226C<new_exec> and kept around for future calls.
227
228=cut
229
230sub new {
190 my $queue = shift; 231 my $class = shift;
191 232
192 push @{ $queue->[2] }, pack "N/a", pack "a (w/a)*", @_; 233 $TEMPLATE ||= $class->new_exec;
234 $TEMPLATE->fork
235}
193 236
194 $queue->[3] ||= AE::io $queue->[1], 1, sub { 237=item $new_proc = $proc->fork
195 if (ref $queue->[2][0]) {
196 AnyEvent::ProcessPool::Util::fd_send fileno $queue->[1], fileno ${ $queue->[2][0] }
197 and shift @{ $queue->[2] };
198 } else {
199 my $len = syswrite $queue->[1], $queue->[2][0]
200 or do { undef $queue->[3]; die "AnyEvent::ProcessPool::queue write failure: $!" };
201 substr $queue->[2][0], 0, $len, "";
202 shift @{ $queue->[2] } unless length $queue->[2][0];
203 }
204 238
205 undef $queue->[3] unless @{ $queue->[2] }; 239Forks C<$proc>, creating a new process, and returns the process object
240of the new process.
241
242If any of the C<send_> functions have been called before fork, then they
243will be cloned in the child. For example, in a pre-forked server, you
244might C<send_fh> the listening socket into the template process, and then
245keep calling C<fork> and C<run>.
246
247=cut
248
249sub fork {
250 my ($self) = @_;
251
252 my ($fh, $slave) = AnyEvent::Util::portable_socketpair;
253
254 $self->send_fh ($slave);
255 $self->_cmd ("f");
256
257 AnyEvent::Util::fh_nonblocking $fh, 1;
258
259 AnyEvent::Fork->_new ($fh)
260}
261
262=item my $proc = new_exec AnyEvent::Fork
263
264Creates a new "empty" perl interpreter process and returns its process
265object for further manipulation.
266
267Unlike the C<new> method, this method I<always> spawns a new perl process
268(except in some cases, see L<AnyEvent::Fork::Early> for details). This
269reduces the amount of memory sharing that is possible, and is also slower.
270
271You should use C<new> whenever possible, except when having a template
272process around is unacceptable.
273
274The path to the perl interpreter is divined using various methods - first
275C<$^X> is investigated to see if the path ends with something that sounds
276as if it were the perl interpreter. Failing this, the module falls back to
277using C<$Config::Config{perlpath}>.
278
279=cut
280
281sub new_exec {
282 my ($self) = @_;
283
284 # first find path of perl
285 my $perl = $^X;
286
287 # first we try $^X, but the path must be absolute (always on win32), and end in sth.
288 # that looks like perl. this obviously only works for posix and win32
289 unless (
290 (AnyEvent::Fork::Util::WIN32 || $perl =~ m%^/%)
291 && $perl =~ m%[/\\]perl(?:[0-9]+(\.[0-9]+)+)?(\.exe)?$%i
292 ) {
293 # if it doesn't look perlish enough, try Config
294 require Config;
295 $perl = $Config::Config{perlpath};
296 $perl =~ s/(?:\Q$Config::Config{_exe}\E)?$/$Config::Config{_exe}/;
206 }; 297 }
207}
208 298
209sub run_template { 299 require Proc::FastSpawn;
210 return if $template;
211 300
212 my ($fh, $slave) = AnyEvent::Util::portable_socketpair; 301 my ($fh, $slave) = AnyEvent::Util::portable_socketpair;
213 AnyEvent::Util::fh_nonblocking $fh, 1; 302 AnyEvent::Util::fh_nonblocking $fh, 1;
214 fd_inherit fileno $slave; 303 Proc::FastSpawn::fd_inherit (fileno $slave);
215 304
305 # quick. also doesn't work in win32. of course. what did you expect
306 #local $ENV{PERL5LIB} = join ":", grep !ref, @INC;
216 my %env = %ENV; 307 my %env = %ENV;
217 $env{PERL5LIB} = join ":", grep !ref, @INC; 308 $env{PERL5LIB} = join ":", grep !ref, @INC;
218 309
219 my $pid = spawn 310 Proc::FastSpawn::spawn (
220 $^X, 311 $perl,
221 ["perl", "-MAnyEvent::ProcessPool::Serve", "-e", "AnyEvent::ProcessPool::Serve::me", fileno $slave], 312 ["perl", "-MAnyEvent::Fork::Serve", "-e", "AnyEvent::Fork::Serve::me", fileno $slave],
222 [map "$_=$env{$_}", keys %env], 313 [map "$_=$env{$_}", keys %env],
223 or die "unable to spawn AnyEvent::ProcessPool server: $!"; 314 ) or die "unable to spawn AnyEvent::Fork server: $!";
224 315
225 close $slave; 316 $self->_new ($fh)
317}
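
The path check in `new_exec` can be exercised on its own. This is a minimal sketch in core Perl that reuses the regular expression shown in the diff. The function name `looks_like_perl` and its `$is_win32` parameter are hypothetical stand-ins (the source uses the `AnyEvent::Fork::Util::WIN32` constant), and the sample paths are made up for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper mirroring the test in new_exec: the path must be
# absolute (check skipped on win32, as in the source) and its last component
# must be "perl", optionally followed by a dotted version and/or ".exe".
sub looks_like_perl {
    my ($path, $is_win32) = @_;
    return 0 unless $is_win32 || $path =~ m%^/%;
    return $path =~ m%[/\\]perl(?:[0-9]+(\.[0-9]+)+)?(\.exe)?$%i ? 1 : 0;
}

# Illustrative paths only; anything rejected would fall back to
# $Config::Config{perlpath} in the real code.
for my $path ("/usr/bin/perl", "/opt/perl/bin/perl5.16.3", "perl", "/usr/bin/python") {
    printf "%-26s %s\n", $path,
        looks_like_perl($path) ? "accepted" : "fall back to Config";
}
```

A relative `perl` is rejected because the spawned process may not share the caller's working directory or search path, which is presumably why the source insists on an absolute path.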
226 318
227 $template = _queue $pid, $fh; 319=item $proc = $proc->require ($module, ...)
228 320
229 my ($a, $b) = AnyEvent::Util::portable_socketpair; 321Tries to load the given modules into the process.
230 322
231 queue_cmd $template, "Iabc"; 323Returns the process object for easy chaining of method calls.
324
325=item $proc = $proc->send_fh ($handle, ...)
326
327Send one or more file handles (I<not> file descriptors) to the process,
328to prepare a call to C<run>.
329
330The process object keeps a reference to the handles until this is done,
331so you must not explicitly close the handles. This is most easily
332accomplished by simply not storing the file handles anywhere after passing
333them to this method.
334
335Returns the process object for easy chaining of method calls.
336
337=cut
338
339sub send_fh {
340 my ($self, @fh) = @_;
341
342 for my $fh (@fh) {
343 $self->_cmd ("h");
232 push @{ $template->[2] }, \$b; 344 push @{ $self->[2] }, \$fh;
233 345 }
234 use Coro::AnyEvent; Coro::AnyEvent::sleep 1;
235 undef $b;
236 die "x" . <$a>;
237}
238
239sub new {
240 my $class = shift;
241
242 my $self = bless {
243 @_
244 }, $class;
245
246 run_template;
247 346
248 $self 347 $self
348}
349
350=item $proc = $proc->send_arg ($string, ...)
351
352Send one or more argument strings to the process, to prepare a call to
353C<run>. The strings can be any octet string.
354
355Returns the process object for easy chaining of method calls.
356
357=cut
358
359sub send_arg {
360 my ($self, @arg) = @_;
361
362 $self->_cmd (a => @arg);
363
364 $self
365}
366
367=item $proc->run ($func, $cb->($fh))
368
369Enter the function specified by the fully qualified name in C<$func> in
370the process. The function is called with the communication socket as first
371argument, followed by all file handles and string arguments sent earlier
372via C<send_fh> and C<send_arg> methods, in the order they were called.
373
374If the called function returns, the process exits.
375
376Preparing the process can take time - when the process is ready, the
377callback is invoked with the local communications socket as argument.
378
379The process object becomes unusable on return from this function.
380
381If the communication socket isn't used, it should be closed on both sides,
382to save on kernel memory.
383
384The socket is non-blocking in the parent, and blocking in the newly
385created process. The close-on-exec flag is set on both. Even if not used
386otherwise, the socket can be a good indicator for the existence of the
387process - if the other process exits, you get a readable event on it,
388because exiting the process closes the socket (if it didn't create any
389children using fork).
390
391=cut
392
393sub run {
394 my ($self, $func, $cb) = @_;
395
396 $self->[0] = $cb;
397 $self->_cmd ("r", $func);
249} 398}
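
The remark in the `run` documentation that the socket signals process exit can be checked with core Perl alone. This sketch deliberately uses a plain `socketpair` and `fork` instead of AnyEvent::Fork, so it only illustrates the kernel behaviour the documentation relies on. The function name `child_exit_is_visible` is hypothetical, and a Unix-like system is assumed.

```perl
use strict;
use warnings;
use Socket;
use POSIX ();

# Hypothetical demo: returns true if the parent sees end-of-file on its end
# of a socketpair once the child process has exited.
sub child_exit_is_visible {
    socketpair(my $parent_fh, my $child_fh, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
        or die "socketpair: $!";

    my $pid = fork;
    die "fork: $!" unless defined $pid;

    if ($pid == 0) {
        close $parent_fh;
        POSIX::_exit(0);    # exiting closes the child's end of the socket
    }

    close $child_fh;        # the parent must drop its copy, or EOF never arrives
    my $n = sysread $parent_fh, my $buf, 1;   # blocks until data or EOF
    waitpid $pid, 0;

    return defined $n && $n == 0;             # 0 bytes read means EOF
}

print child_exit_is_visible() ? "EOF after child exit\n" : "unexpected data\n";
```

The same caveat as in the documentation applies: if the child had forked further children that kept the socket open, the end-of-file would only arrive once all of them had closed it.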
250 399
251=back 400=back
252 401
253=head1 AUTHOR 402=head1 AUTHOR
