1 |
root |
1.1 |
=head1 NAME |
2 |
|
|
|
3 |
|
|
AnyEvent::ProcessPool - manage pools of perl worker processes, exec'ed or fork'ed |
4 |
|
|
|
5 |
|
|
=head1 SYNOPSIS |
6 |
|
|
|
7 |
|
|
use AnyEvent::ProcessPool; |
8 |
|
|
|
9 |
|
|
=head1 DESCRIPTION |
10 |
|
|
|
11 |
|
|
This module allows you to create single worker processes but also worker |
12 |
|
|
pools that share memory, by forking from the main program, or exec'ing new |
13 |
|
|
perl interpreters from a module. |
14 |
|
|
|
15 |
|
|
You create new processes in a pool by specifying a function to call |
16 |
|
|
with any combination of string values and file handles. |
17 |
|
|
|
18 |
|
|
A pool can have initialisation code which is executed before forking. The |
19 |
|
|
initialisation code is only executed once and the resulting process is |
20 |
|
|
cached, to be used as a template. |
21 |
|
|
|
22 |
|
|
Pools without such initialisation code don't cache an extra process. |
23 |
|
|
|
24 |
|
|
=head1 PROBLEM STATEMENT |
25 |
|
|
|
26 |
|
|
There are two ways to implement parallel processing on UNIX-like operating |
27 |
|
|
systems - fork and process, and fork+exec and process. They have different |
28 |
|
|
advantages and disadvantages that I describe below, together with how this |
29 |
|
|
module tries to mitigate the disadvantages. |
30 |
|
|
|
31 |
|
|
=over 4 |
32 |
|
|
|
33 |
|
|
=item Forking from a big process can be very slow (a 5GB process needs |
34 |
|
|
0.05s to fork on my 3.6GHz amd64 GNU/Linux box for example). This overhead |
35 |
|
|
is often shared with exec (because you have to fork first), but in some |
36 |
|
|
circumstances (e.g. when vfork is used), fork+exec can be much faster. |
37 |
|
|
|
38 |
|
|
This module can help here by telling a small(er) helper process to fork, |
39 |
|
|
or fork+exec instead. |
40 |
|
|
|
41 |
|
|
=item Forking usually creates a copy-on-write copy of the parent |
42 |
|
|
process. Memory (for example, modules or data files that have been |
43 |
|
|
loaded) will not take additional memory. When exec'ing a new process, modules |
44 |
|
|
and data files might need to be loaded again, at extra cpu and memory |
45 |
|
|
cost. Likewise when forking, all data structures are copied as well - if |
46 |
|
|
the program frees them and replaces them by new data, the child processes |
47 |
|
|
will retain the memory even if it isn't used. |
48 |
|
|
|
49 |
|
|
This module allows the main program to do a controlled fork, and allows |
50 |
|
|
modules to exec processes safely at any time. When creating a custom |
51 |
|
|
process pool you can take advantage of data sharing via fork without |
52 |
|
|
risking to share large dynamic data structures that will blow up child |
53 |
|
|
memory usage. |
54 |
|
|
|
55 |
|
|
=item Exec'ing a new perl process might be difficult and slow. For |
56 |
|
|
example, it is not easy to find the correct path to the perl interpreter, |
57 |
|
|
and all modules have to be loaded from disk again. Long running processes |
58 |
|
|
might run into problems when perl is upgraded for example. |
59 |
|
|
|
60 |
|
|
This module supports creating pre-initialised perl processes to be used |
61 |
|
|
as template, and also tries hard to identify the correct path to the perl |
62 |
|
|
interpreter. With a cooperative main program, exec'ing the interpreter |
63 |
|
|
might not even be necessary. |
64 |
|
|
|
65 |
|
|
=item Forking might be impossible when a program is running. For example, |
66 |
|
|
POSIX makes it almost impossible to fork from a multithreaded program and |
67 |
|
|
do anything useful in the child - strictly speaking, if your perl program |
68 |
|
|
uses posix threads (even indirectly via e.g. L<IO::AIO> or L<threads>), |
69 |
|
|
you cannot call fork on the perl level anymore, at all. |
70 |
|
|
|
71 |
|
|
This module can safely fork helper processes at any time, by calling |
72 |
|
|
fork+exec in C, in a POSIX-compatible way. |
73 |
|
|
|
74 |
|
|
=item Parallel processing with fork might be inconvenient or difficult |
75 |
|
|
to implement. For example, when a program uses an event loop and creates |
76 |
|
|
watchers it becomes very hard to use the event loop from a child |
77 |
|
|
program, as the watchers already exist but are only meaningful in the |
78 |
|
|
parent. Worse, a module might want to use such a system, not knowing |
79 |
|
|
whether another module or the main program also does, leading to problems. |
80 |
|
|
|
81 |
|
|
This module only lets the main program create pools by forking (because |
82 |
|
|
only the main program can know when it is still safe to do so) - all other |
83 |
|
|
pools are created by fork+exec, after which such modules can again be |
84 |
|
|
loaded. |
85 |
|
|
|
86 |
|
|
=back |
87 |
|
|
|
88 |
|
|
=over 4 |
89 |
|
|
|
90 |
|
|
=cut |
91 |
|
|
|
92 |
|
|
package AnyEvent::ProcessPool; |
93 |
|
|
|
94 |
|
|
use common::sense; |
95 |
|
|
|
96 |
|
|
use Socket (); |
97 |
|
|
|
98 |
|
|
use Proc::FastSpawn; |
99 |
|
|
use AnyEvent; |
100 |
|
|
use AnyEvent::ProcessPool::Util; |
101 |
|
|
use AnyEvent::Util (); |
102 |
|
|
|
103 |
|
|
BEGIN { |
104 |
|
|
# require Exporter; |
105 |
|
|
} |
106 |
|
|
|
107 |
|
|
=item my $pool = new AnyEvent::ProcessPool key => value... |
108 |
|
|
|
109 |
|
|
Create a new process pool. The following named parameters are supported: |
110 |
|
|
|
111 |
|
|
=over 4 |
112 |
|
|
|
113 |
|
|
=back |
114 |
|
|
|
115 |
|
|
=cut |
116 |
|
|
|
117 |
|
|
# the template process |
118 |
|
|
our $template; |
119 |
|
|
|
120 |
|
|
# Build the bookkeeping record for one process connection.
#
# The record is a plain array reference with four slots:
#   [0] process id as passed in
#   [1] file handle as passed in
#   [2] list of items still waiting to be written (starts empty)
#   [3] write watcher slot (starts undefined, filled in by queue_cmd)
sub _queue {
   my ($child_pid, $sock) = @_;

   my @record = ($child_pid, $sock, [], undef);

   \@record
}
130 |
|
|
|
131 |
|
|
# Queue one command for delivery over $queue's file handle and make sure a
# write watcher is active to flush the pending list.
#
#   $queue - record as built by _queue: [pid, fh, pending items, watcher]
#   $cmd   - command string; it is framed with a 32 bit big-endian length
#            prefix (pack "N/a") before being queued
#
# Pending items are either framed strings (written with syswrite, possibly
# over several partial writes) or references to file handles (passed on with
# AnyEvent::ProcessPool::Util::fd_send). The watcher removes itself once the
# pending list is empty. A hard write error stops the watcher and dies from
# inside the watcher callback.
#
# Returns the (possibly already existing) write watcher.
sub queue_cmd {
   my ($queue, $cmd) = @_;

   push @{ $queue->[2] }, pack "N/a", $cmd;

   $queue->[3] ||= AE::io($queue->[1], 1, sub {
      my $pending = $queue->[2];

      if (ref $pending->[0]) {
         # a reference is a file handle to pass on; a false result leaves it
         # queued so it is retried on the next writable event
         AnyEvent::ProcessPool::Util::fd_send(fileno($queue->[1]), fileno(${ $pending->[0] }))
            and shift @$pending;
      } else {
         my $len = syswrite $queue->[1], $pending->[0];

         unless ($len) {
            # the handle may be non-blocking: a spurious wakeup or an
            # interrupted call is not an error, just wait for the next
            # writable event instead of dying
            return if !defined $len && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR});

            undef $queue->[3];
            die "AnyEvent::ProcessPool::queue write failure: $!";
         }

         # drop what was written, keep the remainder for the next event
         substr $pending->[0], 0, $len, "";
         shift @$pending unless length $pending->[0];
      }

      undef $queue->[3] unless @$pending;
   });
}
150 |
|
|
|
151 |
|
|
# Start the shared template process, unless one has already been started.
#
# Spawns a fresh perl interpreter ($^X) that loads
# AnyEvent::ProcessPool::Serve and runs AnyEvent::ProcessPool::Serve::me,
# passing the file descriptor number of one end of a socket pair as its
# argument. The other end is switched to non-blocking mode, wrapped in a
# queue record (_queue) and stored in $template.
#
# Takes no arguments and returns nothing. Dies if the socket pair cannot be
# created or the process cannot be spawned.
sub run_template {
   return if $template;

   my ($fh, $slave) = AnyEvent::Util::portable_socketpair()
      or die "unable to create AnyEvent::ProcessPool socket pair: $!";

   AnyEvent::Util::fh_nonblocking($fh, 1);
   fd_inherit(fileno $slave);

   # hand the current module search path to the new interpreter; references
   # in @INC cannot be expressed in an environment variable and are skipped
   my %env = (%ENV, PERL5LIB => join ":", grep !ref, @INC);

   my $pid = spawn(
      $^X,
      ["perl", "-MAnyEvent::ProcessPool::Serve", "-e", "AnyEvent::ProcessPool::Serve::me", fileno $slave],
      [map "$_=$env{$_}", keys %env],
   ) or die "unable to spawn AnyEvent::ProcessPool server: $!";

   # this end was only created for the spawned process
   close $slave;

   $template = _queue($pid, $fh);

   return;
}
180 |
|
|
|
181 |
|
|
# Constructor, called with the class name followed by key => value pairs.
#
# The pairs are stored verbatim as the fields of the new object. Before the
# object is returned, run_template is called, which starts the shared
# template process if it is not running yet.
sub new {
   my ($class, @options) = @_;

   my $pool = bless { @options }, $class;

   run_template();

   $pool
}
192 |
|
|
|
193 |
|
|
=back |
194 |
|
|
|
195 |
|
|
=head1 AUTHOR |
196 |
|
|
|
197 |
|
|
Marc Lehmann <schmorp@schmorp.de> |
198 |
|
|
http://home.schmorp.de/ |
199 |
|
|
|
200 |
|
|
=cut |
201 |
|
|
|
202 |
|
|
1 |
203 |
|
|
|