=head1 NAME

AnyEvent::ProcessPool - manage pools of perl worker processes, exec'ed or fork'ed

=head1 SYNOPSIS

   use AnyEvent::ProcessPool;

=head1 DESCRIPTION

This module allows you to create single worker processes as well as worker
pools that share memory, either by forking from the main program or by
exec'ing new perl interpreters from a module.

You create new processes in a pool by specifying a function to call
with any combination of string values and file handles.

A pool can have initialisation code which is executed before forking. The
initialisation code is only executed once and the resulting process is
cached, to be used as a template.

Pools without such initialisation code don't cache an extra process.

=head1 PROBLEM STATEMENT

There are two ways to implement parallel processing on UNIX-like operating
systems - fork and process, and fork+exec and process. They have different
advantages and disadvantages that I describe below, together with how this
module tries to mitigate the disadvantages.

=over 4

=item Forking from a big process can be very slow (a 5GB process needs
0.05s to fork on my 3.6GHz amd64 GNU/Linux box for example). This overhead
is often shared with exec (because you have to fork first), but in some
circumstances (e.g. when vfork is used), fork+exec can be much faster.

This module can help here by telling a small(er) helper process to fork,
or fork+exec instead.

=item Forking usually creates a copy-on-write copy of the parent
process. Memory is shared (for example, modules or data files that have
been loaded will not take additional memory). When exec'ing a new process,
modules and data files might need to be loaded again, at extra CPU and
memory cost. On the other hand, when forking, all data structures are
copied as well - if the program frees them and replaces them by new data,
the child processes will retain the memory even if it isn't used.

This module allows the main program to do a controlled fork, and allows
modules to exec processes safely at any time. When creating a custom
process pool you can take advantage of data sharing via fork without
the risk of sharing large dynamic data structures that would blow up child
memory usage.

=item Exec'ing a new perl process might be difficult and slow. For
example, it is not easy to find the correct path to the perl interpreter,
and all modules have to be loaded from disk again. Long-running processes
might also run into problems when perl is upgraded.

This module supports creating pre-initialised perl processes to be used
as templates, and also tries hard to identify the correct path to the perl
interpreter. With a cooperative main program, exec'ing the interpreter
might not even be necessary.

=item Forking might be impossible when a program is running. For example,
POSIX makes it almost impossible to fork from a multithreaded program and
do anything useful in the child - strictly speaking, if your perl program
uses POSIX threads (even indirectly via e.g. L<IO::AIO> or L<threads>),
you cannot call fork on the perl level anymore, at all.

This module can safely fork helper processes at any time, by calling
fork+exec in C, in a POSIX-compatible way.

=item Parallel processing with fork might be inconvenient or difficult
to implement. For example, when a program uses an event loop and creates
watchers, it becomes very hard to use the event loop from a child
program, as the watchers already exist but are only meaningful in the
parent. Worse, a module might want to use such a system, not knowing
whether another module or the main program also does, leading to problems.

This module only lets the main program create pools by forking (because
only the main program can know when it is still safe to do so) - all other
pools are created by fork+exec, after which such modules can again be
loaded.

=back

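The copy-on-write behaviour described in the list above can be observed
with plain C<fork>. The following is a minimal sketch that uses only core
perl and not this module; the variable names and the pipe used to report
back are assumptions made for the illustration. The child keeps its own
view of the data that existed at fork time, even after the parent has
replaced it:

```perl
use strict;
use warnings;

# data "loaded" before the fork is available in the child without reloading
my @table = map { $_ * $_ } 1 .. 5;

pipe my $reader, my $writer
   or die "pipe: $!";

my $pid = fork;
die "fork: $!" unless defined $pid;

if ($pid == 0) {
   # child: report the copy of @table it inherited at fork time
   close $reader;
   print {$writer} join (",", @table), "\n";
   close $writer;
   exit 0;
}

# parent: freeing the data after the fork does not affect the child's copy
@table = ();
close $writer;

chomp (my $seen_by_child = <$reader>);
waitpid $pid, 0;

print "child saw: $seen_by_child\n";
```

The same mechanism is what makes a child hold on to memory the parent has
long since freed, which is why it can pay off to fork from a small process.
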
=head1 CONCEPTS

This module can create new processes either by executing a new perl
process, or by forking from an existing "template" process.

Each such process comes with its own file handle that can be used to
communicate with it (it's actually a socket - one end in the new process,
one end in the main process), and among the things you can do with it are
loading modules, forking new processes, sending file handles to it, and
executing functions.

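To illustrate the idea of one socket end in each process, here is a
minimal sketch using the core C<socketpair> function and C<fork>. It is
not how this module is implemented (the module uses
C<AnyEvent::Util::portable_socketpair> and a non-blocking handle); the
request text and the variable names are made up for the example:

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_STREAM PF_UNSPEC);
use IO::Handle;

# create a connected pair of sockets, one end for each process
socketpair my $parent_end, my $child_end, AF_UNIX, SOCK_STREAM, PF_UNSPEC
   or die "socketpair: $!";
$_->autoflush(1) for $parent_end, $child_end;

my $pid = fork;
die "fork: $!" unless defined $pid;

if ($pid == 0) {
   # child: keep only its own end, read one request line and answer it
   close $parent_end;
   chomp (my $request = <$child_end>);
   print {$child_end} "done: $request\n";
   exit 0;
}

# parent: keep only its own end, send a request and wait for the reply
close $child_end;
print {$parent_end} "resize image.png\n";
chomp (my $reply = <$parent_end>);
waitpid $pid, 0;

print "$reply\n";
```
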
There are multiple ways to create additional processes to execute some
jobs:

=over 4

=item fork a new process from the "default" template process, load code,
run it

This module has a "default" template process which it executes when it is
needed the first time. Forking from this process shares the memory used
for the perl interpreter with the new process, but loading modules takes
time, and the memory is not shared with anything else.

This is ideal for when you only need one extra process of a kind, with the
option of starting and stopping it on demand.

=item fork a new template process, load code, then fork processes off of
it and run the code

When you need to have a bunch of processes that all execute the same (or
very similar) tasks, then a good way is to create a new template process
for them, loading all the modules you need, and then create your worker
processes from this new template process.

This way, all code (and data structures) that can be shared (e.g. the
modules you loaded) is shared between the processes, and each new process
consumes relatively little memory of its own.

The disadvantage of this approach is that you need to create a template
process for the sole purpose of forking new processes from it, but if you
only need a fixed number of processes you can create them, and then destroy
the template process.

=item execute a new perl interpreter, load some code, run it

This is relatively slow, and doesn't allow you to share memory between
multiple processes.

The only advantage is that you don't have to have a template process
hanging around all the time to fork off some new processes, which might be
an advantage when there are long time spans where no extra processes are
needed.

=back

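The template approach from the list above can be sketched with core perl
alone. This is only an illustration under simplifying assumptions, not
this module's protocol: jobs are sent as text lines, the template forks
one worker per line, the worker answers over the template's own socket
(the module gives each process its own handle), and C<List::Util> stands
in for whatever modules you would load into the template:

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_STREAM PF_UNSPEC);
use IO::Handle;

socketpair my $main_end, my $template_end, AF_UNIX, SOCK_STREAM, PF_UNSPEC
   or die "socketpair: $!";
$_->autoflush(1) for $main_end, $template_end;

my $template_pid = fork;
die "fork: $!" unless defined $template_pid;

if ($template_pid == 0) {
   # template process: load shared code once, then fork a worker per job
   close $main_end;
   require List::Util;

   while (defined (my $job = <$template_end>)) {
      chomp $job;

      my $worker_pid = fork;
      die "fork: $!" unless defined $worker_pid;

      if ($worker_pid == 0) {
         # worker: List::Util is already loaded, inherited from the template
         print {$template_end} List::Util::sum (split /,/, $job), "\n";
         exit 0;
      }

      waitpid $worker_pid, 0;
   }

   exit 0;
}

# main program: it never loads List::Util itself
close $template_end;

my @results;
for my $job ("1,2,3", "10,20") {
   print {$main_end} "$job\n";
   chomp (my $sum = <$main_end>);
   push @results, $sum;
}

close $main_end; # end of file tells the template to exit
waitpid $template_pid, 0;

print "sums: @results\n";
```

Because the workers are forked from the template and not from the main
program, they share the loaded module with the template while the main
program stays small.
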
=head1 FUNCTIONS

=over 4

=cut

package AnyEvent::ProcessPool;

use common::sense;

use Socket ();

use Proc::FastSpawn;
use AnyEvent;
use AnyEvent::ProcessPool::Util;
use AnyEvent::Util ();

BEGIN {
   # require Exporter;
}

=item my $pool = new AnyEvent::ProcessPool key => value...

Create a new process pool. The following named parameters are supported:

=over 4

=back

=cut

# the template process
our $template;

# build a queue record: [pid, socket handle, pending writes, write watcher]
sub _queue {
   my ($pid, $fh) = @_;

   [
      $pid,
      $fh,
      [],
      undef
   ]
}

sub queue_cmd {
   my $queue = shift;

   # frame the command: 32 bit length, type character, length-prefixed arguments
   push @{ $queue->[2] }, pack "N/a", pack "a (w/a)*", @_;

   # start a write watcher unless one is already active
   $queue->[3] ||= AE::io $queue->[1], 1, sub {
      if (ref $queue->[2][0]) {
         # a reference in the queue is a file handle to pass to the other side
         AnyEvent::ProcessPool::Util::fd_send fileno $queue->[1], fileno ${ $queue->[2][0] }
            and shift @{ $queue->[2] };
      } else {
         # a string is written out, possibly in several partial writes
         my $len = syswrite $queue->[1], $queue->[2][0]
            or do { undef $queue->[3]; die "AnyEvent::ProcessPool::queue write failure: $!" };
         substr $queue->[2][0], 0, $len, "";
         shift @{ $queue->[2] } unless length $queue->[2][0];
      }

      # stop watching once everything has been sent
      undef $queue->[3] unless @{ $queue->[2] };
   };
}

sub run_template {
   return if $template;

   my ($fh, $slave) = AnyEvent::Util::portable_socketpair;
   AnyEvent::Util::fh_nonblocking $fh, 1;
   fd_inherit fileno $slave;

   # pass the current module search path on to the new interpreter
   my %env = %ENV;
   $env{PERL5LIB} = join ":", grep !ref, @INC;

   my $pid = spawn
      $^X,
      ["perl", "-MAnyEvent::ProcessPool::Serve", "-e", "AnyEvent::ProcessPool::Serve::me", fileno $slave],
      [map "$_=$env{$_}", keys %env],
   or die "unable to spawn AnyEvent::ProcessPool server: $!";

   close $slave;

   $template = _queue $pid, $fh;

   my ($a, $b) = AnyEvent::Util::portable_socketpair;

   queue_cmd $template, "Iabc";
   push @{ $template->[2] }, \$b;

   use Coro::AnyEvent; Coro::AnyEvent::sleep 1;
   undef $b;
   die "x" . <$a>;
}

sub new {
   my $class = shift;

   my $self = bless {
      @_
   }, $class;

   run_template;

   $self
}

=back

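The command framing built by the C<pack> call in C<queue_cmd> can be tried
in isolation. The following sketch assumes the receiving side simply
reverses the two C<pack> templates; C<encode_cmd>, C<decode_cmd>, the
command type C<e> and its arguments are hypothetical names chosen for the
example and are not part of this module's interface:

```perl
use strict;
use warnings;

# encode one command: a one-character type followed by string arguments,
# each argument prefixed with its BER-compressed length, and the whole
# payload prefixed with its length as a 32 bit big-endian integer
sub encode_cmd {
   my ($type, @args) = @_;

   my $payload = pack "a (w/a)*", $type, @args;
   pack "N/a", $payload
}

# decode one complete frame back into the type and the argument list
sub decode_cmd {
   my ($frame) = @_;

   my ($payload) = unpack "N/a", $frame;
   unpack "a (w/a)*", $payload
}

my $frame = encode_cmd "e", "My::Module", "run";
my ($type, @args) = decode_cmd $frame;

printf "%d bytes, type=%s, args=%s\n", length ($frame), $type, join ",", @args;
```

Note that the C<a> at the start of the template packs exactly one
character, so only the first character of the first argument is sent as
the command type.
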
=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut

1
