ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork/Fork.pm
(Generate patch)

Comparing AnyEvent-Fork/Fork.pm (file contents):
Revision 1.3 by root, Tue Apr 2 18:00:04 2013 UTC vs.
Revision 1.7 by root, Thu Apr 4 00:27:06 2013 UTC

1=head1 NAME 1=head1 NAME
2 2
3AnyEvent::ProcessPool - manage pools of perl worker processes, exec'ed or fork'ed 3AnyEvent::Fork - everything you wanted to use fork() for, but couldn't
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::ProcessPool; 7 use AnyEvent::Fork;
8 8
9=head1 DESCRIPTION 9=head1 DESCRIPTION
10 10
11This module allows you to create single worker processes but also worker 11This module allows you to create new processes, without actually forking
12pool that share memory, by forking from the main program, or exec'ing new 12them from your current process (avoiding the problems of forking), but
13perl interpreters from a module. 13preserving most of the advantages of fork.
14 14
15You create a new processes in a pool by specifying a function to call 15It can be used to create new worker processes or new independent
16with any combination of string values and file handles. 16subprocesses for short- and long-running jobs, process pools (e.g. for use
17in pre-forked servers) but also to spawn new external processes (such as
18CGI scripts from a webserver), which can be faster (and more well behaved)
19than using fork+exec in big processes.
17 20
18A pool can have initialisation code which is executed before forking. The 21Special care has been taken to make this module useful from other modules,
19initialisation code is only executed once and the resulting process is 22while still supporting specialised environments such as L<App::Staticperl>
20cached, to be used as a template. 23or L<PAR::Packer>.
21
22Pools without such initialisation code don't cache an extra process.
23 24
24=head1 PROBLEM STATEMENT 25=head1 PROBLEM STATEMENT
25 26
26There are two ways to implement parallel processing on UNIX like operating 27There are two ways to implement parallel processing on UNIX like operating
27systems - fork and process, and fork+exec and process. They have different 28systems - fork and process, and fork+exec and process. They have different
145 146
146=over 4 147=over 4
147 148
148=cut 149=cut
149 150
150package AnyEvent::ProcessPool; 151package AnyEvent::Fork;
151 152
152use common::sense; 153use common::sense;
153 154
154use Socket (); 155use Socket ();
155 156
156use Proc::FastSpawn;
157use AnyEvent; 157use AnyEvent;
158use AnyEvent::ProcessPool::Util; 158use AnyEvent::Fork::Util;
159use AnyEvent::Util (); 159use AnyEvent::Util ();
160 160
161BEGIN { 161our $PERL; # the path to the perl interpreter, deduced with various forms of magic
162# require Exporter;
163}
164 162
165=item my $pool = new AnyEvent::ProcessPool key => value... 163=item my $pool = new AnyEvent::Fork key => value...
166 164
167Create a new process pool. The following named parameters are supported: 165Create a new process pool. The following named parameters are supported:
168 166
169=over 4 167=over 4
170 168
171=back 169=back
172 170
173=cut 171=cut
174 172
173# the early fork template process
174our $EARLY;
175
175# the template process 176# the empty template process
176our $template; 177our $TEMPLATE;
177 178
178sub _queue { 179sub _cmd {
180 my $self = shift;
181
182 # ideally, we would want to use "a (w/a)*" as format string, but perl versions
183 # from at least 5.8.9 to 5.16.3 are all buggy and can't unpack it.
184 push @{ $self->[2] }, pack "N/a", pack "(w/a)*", @_;
185
186 $self->[3] ||= AE::io $self->[1], 1, sub {
187 if (ref $self->[2][0]) {
188 AnyEvent::Fork::Util::fd_send fileno $self->[1], fileno ${ $self->[2][0] }
189 and shift @{ $self->[2] };
190
191 } else {
192 my $len = syswrite $self->[1], $self->[2][0]
193 or do { undef $self->[3]; die "AnyEvent::Fork: command write failure: $!" };
194
195 substr $self->[2][0], 0, $len, "";
196 shift @{ $self->[2] } unless length $self->[2][0];
197 }
198
199 unless (@{ $self->[2] }) {
200 undef $self->[3];
201 $self->[0]->($self->[1]) if $self->[0];
202 }
203 };
204}
205
206sub _new {
179 my ($pid, $fh) = @_; 207 my ($self, $fh) = @_;
180 208
181 [ 209 AnyEvent::Util::fh_nonblocking $fh, 1;
182 $pid, 210
211 $self = bless [
212 undef, # run callback
183 $fh, 213 $fh,
184 [], 214 [], # write queue - strings or fd's
185 undef 215 undef, # AE watcher
186 ] 216 ], $self;
187}
188 217
189sub queue_cmd { 218# my ($a, $b) = AnyEvent::Util::portable_socketpair;
190 my $queue = shift;
191 219
192 push @{ $queue->[2] }, pack "N/a", pack "a (w/a)*", @_; 220# queue_cmd $template, "Iabc";
221# push @{ $template->[2] }, \$b;
193 222
194 $queue->[3] ||= AE::io $queue->[1], 1, sub { 223# use Coro::AnyEvent; Coro::AnyEvent::sleep 1;
195 if (ref $queue->[2][0]) { 224# undef $b;
196 AnyEvent::ProcessPool::Util::fd_send fileno $queue->[1], fileno ${ $queue->[2][0] } 225# die "x" . <$a>;
197 and shift @{ $queue->[2] };
198 } else {
199 my $len = syswrite $queue->[1], $queue->[2][0]
200 or do { undef $queue->[3]; die "AnyEvent::ProcessPool::queue write failure: $!" };
201 substr $queue->[2][0], 0, $len, "";
202 shift @{ $queue->[2] } unless length $queue->[2][0];
203 }
204 226
205 undef $queue->[3] unless @{ $queue->[2] }; 227 $self
206 };
207} 228}
208 229
209sub run_template { 230# fork template from current process, used by AnyEvent::Fork::Early/Template
210 return if $template; 231sub _new_fork {
211
212 my ($fh, $slave) = AnyEvent::Util::portable_socketpair; 232 my ($fh, $slave) = AnyEvent::Util::portable_socketpair;
213 AnyEvent::Util::fh_nonblocking $fh, 1; 233 my $parent = $$;
214 fd_inherit fileno $slave;
215 234
235 my $pid = fork;
236
237 if ($pid eq 0) {
238 require AnyEvent::Fork::Serve;
239 $AnyEvent::Fork::Serve::OWNER = $parent;
240 close $fh;
241 $0 = "$_[1] of $parent";
242 AnyEvent::Fork::Serve::serve ($slave);
243 AnyEvent::Fork::Util::_exit 0;
244 } elsif (!$pid) {
245 die "AnyEvent::Fork::Early/Template: unable to fork template process: $!";
246 }
247
248 AnyEvent::Fork->_new ($fh)
249}
250
251=item my $proc = new AnyEvent::Fork
252
253Creates a new "empty" perl interpreter process and returns its process
254object for further manipulation.
255
256The new process is forked from a template process that is kept around
257for this purpose. When it doesn't exist yet, it is created by a call to
258C<new_exec> and kept around for future calls.
259
260=cut
261
262sub new {
263 my $class = shift;
264
265 $TEMPLATE ||= $class->new_exec;
266 $TEMPLATE->fork
267}
268
269=item $new_proc = $proc->fork
270
271Forks C<$proc>, creating a new process, and returns the process object
272of the new process.
273
274If any of the C<send_> functions have been called before fork, then they
275will be cloned in the child. For example, in a pre-forked server, you
276might C<send_fh> the listening socket into the template process, and then
277keep calling C<fork> and C<run>.
278
279=cut
280
281sub fork {
282 my ($self) = @_;
283
284 my ($fh, $slave) = AnyEvent::Util::portable_socketpair;
285
286 $self->send_fh ($slave);
287 $self->_cmd ("f");
288
289 AnyEvent::Fork->_new ($fh)
290}
291
292=item my $proc = new_exec AnyEvent::Fork
293
294Creates a new "empty" perl interpreter process and returns its process
295object for further manipulation.
296
297Unlike the C<new> method, this method I<always> spawns a new perl process
298(except in some cases, see L<AnyEvent::Fork::Early> for details). This
299reduces the amount of memory sharing that is possible, and is also slower.
300
301You should use C<new> whenever possible, except when having a template
302process around is unacceptable.
303
304The path to the perl interpreter is divined using various methods - first
305C<$^X> is investigated to see if the path ends with something that sounds
306as if it were the perl interpreter. Failing this, the module falls back to
307using C<$Config::Config{perlpath}>.
308
309=cut
310
311sub new_exec {
312 my ($self) = @_;
313
314 return $EARLY->fork
315 if $EARLY;
316
317 # first find path of perl
318 my $perl = $^X;
319
320 # first we try $^X, but the path must be absolute (always on win32), and end in sth.
321 # that looks like perl. this obviously only works for posix and win32
322 unless (
323 (AnyEvent::Fork::Util::WIN32 || $perl =~ m%^/%)
324 && $perl =~ m%[/\\]perl(?:[0-9]+(\.[0-9]+)+)?(\.exe)?$%i
325 ) {
326 # if it doesn't look perlish enough, try Config
327 require Config;
328 $perl = $Config::Config{perlpath};
329 $perl =~ s/(?:\Q$Config::Config{_exe}\E)?$/$Config::Config{_exe}/;
330 }
331
332 require Proc::FastSpawn;
333
334 my ($fh, $slave) = AnyEvent::Util::portable_socketpair;
335 Proc::FastSpawn::fd_inherit (fileno $slave);
336
337 # quick. also doesn't work in win32. of course. what did you expect
338 #local $ENV{PERL5LIB} = join ":", grep !ref, @INC;
216 my %env = %ENV; 339 my %env = %ENV;
217 $env{PERL5LIB} = join ":", grep !ref, @INC; 340 $env{PERL5LIB} = join ":", grep !ref, @INC;
218 341
219 my $pid = spawn 342 Proc::FastSpawn::spawn (
220 $^X, 343 $perl,
221 ["perl", "-MAnyEvent::ProcessPool::Serve", "-e", "AnyEvent::ProcessPool::Serve::me", fileno $slave], 344 ["perl", "-MAnyEvent::Fork::Serve", "-e", "AnyEvent::Fork::Serve::me", fileno $slave, $$],
222 [map "$_=$env{$_}", keys %env], 345 [map "$_=$env{$_}", keys %env],
223 or die "unable to spawn AnyEvent::ProcessPool server: $!"; 346 ) or die "unable to spawn AnyEvent::Fork server: $!";
224 347
225 close $slave; 348 $self->_new ($fh)
349}
226 350
227 $template = _queue $pid, $fh; 351=item $proc = $proc->require ($module, ...)
228 352
229 my ($a, $b) = AnyEvent::Util::portable_socketpair; 353Tries to load the given modules into the process.
230 354
231 queue_cmd $template, "Iabc"; 355Returns the process object for easy chaining of method calls.
356
357=item $proc = $proc->send_fh ($handle, ...)
358
359Send one or more file handles (I<not> file descriptors) to the process,
360to prepare a call to C<run>.
361
362The process object keeps a reference to the handles until this is done,
363so you must not explicitly close the handles. This is most easily
364accomplished by simply not storing the file handles anywhere after passing
365them to this method.
366
367Returns the process object for easy chaining of method calls.
368
369=cut
370
371sub send_fh {
372 my ($self, @fh) = @_;
373
374 for my $fh (@fh) {
375 $self->_cmd ("h");
232 push @{ $template->[2] }, \$b; 376 push @{ $self->[2] }, \$fh;
233 377 }
234 use Coro::AnyEvent; Coro::AnyEvent::sleep 1;
235 undef $b;
236 die "x" . <$a>;
237}
238
239sub new {
240 my $class = shift;
241
242 my $self = bless {
243 @_
244 }, $class;
245
246 run_template;
247 378
248 $self 379 $self
380}
381
382=item $proc = $proc->send_arg ($string, ...)
383
384Send one or more argument strings to the process, to prepare a call to
385C<run>. The strings can be any octet string.
386
387Returns the process object for easy chaining of method calls.
388
389=cut
390
391sub send_arg {
392 my ($self, @arg) = @_;
393
394 $self->_cmd (a => @arg);
395
396 $self
397}
398
399=item $proc->run ($func, $cb->($fh))
400
401Enter the function specified by the fully qualified name in C<$func> in
402the process. The function is called with the communication socket as first
403argument, followed by all file handles and string arguments sent earlier
404via C<send_fh> and C<send_arg> methods, in the order they were called.
405
406If the called function returns, the process exits.
407
408Preparing the process can take time - when the process is ready, the
409callback is invoked with the local communications socket as argument.
410
411The process object becomes unusable on return from this function.
412
413If the communication socket isn't used, it should be closed on both sides,
414to save on kernel memory.
415
416The socket is non-blocking in the parent, and blocking in the newly
417created process. The close-on-exec flag is set on both. Even if not used
418otherwise, the socket can be a good indicator for the existence of the
419process - if the other process exits, you get a readable event on it,
420because exiting the process closes the socket (if it didn't create any
421children using fork).
422
423=cut
424
425sub run {
426 my ($self, $func, $cb) = @_;
427
428 $self->[0] = $cb;
429 $self->_cmd ("r", $func);
249} 430}
250 431
251=back 432=back
252 433
253=head1 AUTHOR 434=head1 AUTHOR

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines