1 |
root |
1.1 |
=head1 NAME |
2 |
|
|
|
3 |
|
|
AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork |
4 |
|
|
|
5 |
|
|
=head1 SYNOPSIS |
6 |
|
|
|
7 |
|
|
use AnyEvent; |
8 |
|
|
use AnyEvent::Fork::Pool; |
9 |
|
|
# use AnyEvent::Fork is not needed |
10 |
|
|
|
11 |
|
|
# all parameters with default values |
12 |
|
|
my $pool = new AnyEvent::Fork::Pool |
13 |
|
|
"MyWorker::run", |
14 |
|
|
|
15 |
|
|
# pool management |
16 |
|
|
min => 0, # minimum # of processes |
17 |
|
|
max => 8, # maximum # of processes |
18 |
|
|
busy_time => 0, # wait this before starting a new process |
19 |
|
|
max_idle => 1, # at most this many idle processes |
20 |
|
|
idle_time => 1, # wait this before killing an idle process |
21 |
|
|
|
22 |
|
|
# template process |
23 |
|
|
template => AnyEvent::Fork->new, # the template process to use |
24 |
|
|
require => [MyWorker::], # module(s) to load |
25 |
|
|
eval => "# perl code to execute in template", |
26 |
|
|
on_destroy => (my $finish = AE::cv), |
27 |
|
|
|
28 |
|
|
# parameters passed to AnyEvent::Fork::RPC |
29 |
|
|
async => 0, |
30 |
|
|
on_error => sub { die "FATAL: $_[0]\n" }, |
31 |
|
|
on_event => sub { my @ev = @_ }, |
32 |
|
|
init => "MyWorker::init", |
33 |
|
|
serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, |
34 |
|
|
; |
35 |
|
|
|
36 |
|
|
for (1..10) { |
37 |
|
|
$pool->call (doit => $_, sub { |
38 |
|
|
print "MyWorker::run returned @_\n"; |
39 |
|
|
}); |
40 |
|
|
} |
41 |
|
|
|
42 |
|
|
undef $pool; |
43 |
|
|
|
44 |
|
|
$finish->recv; |
45 |
|
|
|
46 |
|
|
=head1 DESCRIPTION |
47 |
|
|
|
48 |
|
|
This module uses processes created via L<AnyEvent::Fork> and the RPC |
49 |
|
|
protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced |
50 |
|
|
pool of processes that handles jobs. |
51 |
|
|
|
52 |
|
|
Understanding of L<AnyEvent::Fork> is helpful but not critical to be able |
53 |
|
|
to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC> |
54 |
|
|
is, as it defines the actual API that needs to be implemented in the |
55 |
|
|
children. |
56 |
|
|
|
57 |
|
|
=head1 EXAMPLES |
58 |
|
|
|
59 |
|
|
=head1 API |
60 |
|
|
|
61 |
|
|
=over 4 |
62 |
|
|
|
63 |
|
|
=cut |
64 |
|
|
|
65 |
|
|
package AnyEvent::Fork::Pool; |
66 |
|
|
|
67 |
|
|
use common::sense; |
68 |
|
|
|
69 |
|
|
use Guard (); |
70 |
|
|
|
71 |
|
|
use AnyEvent; |
72 |
|
|
use AnyEvent::Fork; # we don't actually depend on it, this is for convenience |
73 |
|
|
use AnyEvent::Fork::RPC; |
74 |
|
|
|
75 |
|
|
our $VERSION = 0.1; |
76 |
|
|
|
77 |
|
|
=item my $pool = new AnyEvent::Fork::Pool $function, [key => value...] |
78 |
|
|
|
79 |
|
|
=over 4 |
80 |
|
|
|
81 |
|
|
=item on_error => $cb->($msg) |
82 |
|
|
|
83 |
|
|
Called on (fatal) errors, with a descriptive (hopefully) message. If |
84 |
|
|
this callback is not provided, but C<on_event> is, then the C<on_event> |
85 |
|
|
callback is called with the first argument being the string C<error>, |
86 |
|
|
followed by the error message. |
87 |
|
|
|
88 |
|
|
If neither handler is provided it prints the error to STDERR and will |
89 |
|
|
start failing badly. |
90 |
|
|
|
91 |
|
|
=item on_event => $cb->(...) |
92 |
|
|
|
93 |
|
|
Called for every call to the C<AnyEvent::Fork::RPC::event> function in the |
94 |
|
|
child, with the arguments of that function passed to the callback. |
95 |
|
|
|
96 |
|
|
Also called on errors when no C<on_error> handler is provided. |
97 |
|
|
|
98 |
|
|
=item on_destroy => $cb->() |
99 |
|
|
|
100 |
|
|
Called when the C<$rpc> object has been destroyed and all requests have |
101 |
|
|
been successfully handled. This is useful when you queue some requests and |
102 |
|
|
want the child to go away after it has handled them. The problem is that |
103 |
|
|
the parent must not exit either until all requests have been handled, and |
104 |
|
|
this can be accomplished by waiting for this callback. |
105 |
|
|
|
106 |
|
|
=item init => $function (default none) |
107 |
|
|
|
108 |
|
|
When specified (by name), this function is called in the child as the very |
109 |
|
|
first thing when taking over the process, with all the arguments normally |
110 |
|
|
passed to the C<AnyEvent::Fork::run> function, except the communications |
111 |
|
|
socket. |
112 |
|
|
|
113 |
|
|
It can be used to do one-time things in the child such as storing passed |
114 |
|
|
parameters or opening database connections. |
115 |
|
|
|
116 |
|
|
It is called very early - before the serialisers are created or the |
117 |
|
|
C<$function> name is resolved into a function reference, so it could be |
118 |
|
|
used to load any modules that provide the serialiser or function. It can |
119 |
|
|
not, however, create events. |
120 |
|
|
|
121 |
|
|
=item async => $boolean (default: 0) |
122 |
|
|
|
123 |
|
|
The default server used in the child does all I/O blockingly, and only |
124 |
|
|
allows a single RPC call to execute concurrently. |
125 |
|
|
|
126 |
|
|
Setting C<async> to a true value switches to another implementation that |
127 |
|
|
uses L<AnyEvent> in the child and allows multiple concurrent RPC calls. |
128 |
|
|
|
129 |
|
|
The actual API in the child is documented in the section that describes |
130 |
|
|
the calling semantics of the returned C<$rpc> function. |
131 |
|
|
|
132 |
|
|
If you want to pre-load the actual back-end modules to enable memory |
133 |
|
|
sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for |
134 |
|
|
synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode. |
135 |
|
|
|
136 |
|
|
If you use a template process and want to fork both sync and async |
137 |
|
|
children, then it is permissible to load both modules. |
138 |
|
|
|
139 |
|
|
=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') |
140 |
|
|
|
141 |
|
|
All arguments, result data and event data have to be serialised to be |
142 |
|
|
transferred between the processes. For this, they have to be frozen and |
143 |
|
|
thawed in both parent and child processes. |
144 |
|
|
|
145 |
|
|
By default, only octet strings can be passed between the processes, which |
146 |
|
|
is reasonably fast and efficient. |
147 |
|
|
|
148 |
|
|
For more complicated use cases, you can provide your own freeze and thaw |
149 |
|
|
functions, by specifying a string with perl source code. It's supposed to |
150 |
|
|
return two code references when evaluated: the first receives a list of |
151 |
|
|
perl values and must return an octet string. The second receives the octet |
152 |
|
|
string and must return the original list of values. |
153 |
|
|
|
154 |
|
|
If you need an external module for serialisation, then you can either |
155 |
|
|
pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use> |
156 |
|
|
or C<require> statement into the serialiser string. Or both. |
157 |
|
|
|
158 |
|
|
=back |
159 |
|
|
|
160 |
|
|
See the examples section earlier in this document for some actual |
161 |
|
|
examples. |
162 |
|
|
|
163 |
|
|
=cut |
164 |
|
|
|
165 |
|
|
# Start a worker running $function and return a code reference used to
# queue calls to it.
#
# Arguments:
#    $self     - invocant; require/send_arg/run are called on it.
#                NOTE(review): that means it is presumably expected to be
#                an AnyEvent::Fork process object, while the synopsis
#                invokes this as a class method and passes a "template"
#                argument that is not consulted here - confirm.
#    $function - name of the function to run in the child
#    %arg      - options; this sub reads serialiser, on_event, on_error,
#                on_destroy, async and init
#
# Returns a closure. Call it with the request arguments followed by a
# callback (taken from the end of the argument list); the callback receives
# the deserialised reply.
sub new {
   my ($self, $function, %arg) = @_;

   # Fully qualified: this package declares no $STRING_SERIALISER of its own,
   # so the unqualified name would not compile under strict vars.
   my $serialiser = delete $arg{serialiser} || $AnyEvent::Fork::RPC::STRING_SERIALISER;
   my $on_event   = delete $arg{on_event};
   my $on_error   = delete $arg{on_error};
   my $on_destroy = delete $arg{on_destroy};

   # default for on_error is to on_event, if specified
   $on_error ||= $on_event
      ? sub { $on_event->(error => shift) }
      : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };

   # default for on_event is to raise an error
   $on_event ||= sub { $on_error->("event received, but no on_event handler") };

   # the serialiser string evaluates to a (freeze, thaw) pair of code refs
   my ($f, $t) = eval $serialiser; die $@ if $@;

   # @rcb: FIFO of reply callbacks (sync mode), %rcb: callbacks by id (async mode)
   # $wbuf/$ww: pending output and its write watcher
   my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
   # only $rlen is initialised (to 496); $rbuf and $rw start out undefined
   my ($rlen, $rbuf, $rw) = 512 - 16;

   # write watcher callback: push as much of $wbuf to the child as possible
   my $wcb = sub {
      my $len = syswrite $fh, $wbuf;

      unless (defined $len) {
         if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("$!");
         }

         # Nothing was written: either the fatal error was reported above or
         # the write should be retried later. Do not touch $wbuf with an
         # undefined length.
         return;
      }

      substr $wbuf, 0, $len, "";

      unless (length $wbuf) {
         undef $ww;
         # once the guard has fired and all data is flushed, close our write side
         $shutdown and shutdown $fh, 1;
      }
   };

   my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");

   $self->require ($module)
      ->send_arg ($function, $arg{init}, $serialiser)
      ->run ("$module\::run", sub {
         $fh = shift;

         my ($id, $len);
         $rw = AE::io $fh, 0, sub {
            # enlarge the read size when the buffer gets within 128 octets of it
            $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
            $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;

            if ($len) {
               # each packet is an 8 octet header (id, length) plus payload
               while (8 <= length $rbuf) {
                  ($id, $len) = unpack "LL", $rbuf;
                  8 + $len <= length $rbuf
                     or last;

                  my @r = $t->(substr $rbuf, 8, $len);
                  substr $rbuf, 0, 8 + $len, "";

                  if ($id) {
                     # non-zero id: a reply to a request
                     if (@rcb) {
                        (shift @rcb)->(@r);
                     } elsif (my $cb = delete $rcb{$id}) {
                        $cb->(@r);
                     } else {
                        undef $rw; undef $ww;
                        $on_error->("unexpected data from child");
                     }
                  } else {
                     # id zero: an event
                     $on_event->(@r);
                  }
               }
            } elsif (defined $len) {
               # sysread returned 0: end of file
               undef $rw; undef $ww; # it ends here

               if (@rcb || %rcb) {
                  $on_error->("unexpected eof");
               } else {
                  # on_destroy is optional, so only call it when supplied
                  $on_destroy->() if $on_destroy;
               }
            } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
               undef $rw; undef $ww; # it ends here
               $on_error->("read: $!");
            }
         };

         # start the write watcher now that the handle exists
         $ww ||= AE::io $fh, 1, $wcb;
      });

   # Runs when the last reference to it (held by the returned closure) goes
   # away: request shutdown and make sure the write callback gets to run.
   my $guard = Guard::guard {
      $shutdown = 1;
      $ww ||= $fh && AE::io $fh, 1, $wcb;
   };

   my $id;

   $arg{async}
      ? sub {
           # pick the next id in 1..0xffffffff that is not in use
           $id = ($id == 0xffffffff ? 0 : $id) + 1;
           $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops

           $rcb{$id} = pop;

           $guard; # keep it alive

           # &$f passes the remaining @_ (callback already popped) to the freezer
           $wbuf .= pack "LL/a*", $id, &$f;
           $ww ||= $fh && AE::io $fh, 1, $wcb;
        }
      : sub {
           push @rcb, pop;

           $guard; # keep it alive

           # &$f passes the remaining @_ (callback already popped) to the freezer
           $wbuf .= pack "L/a*", &$f;
           $ww ||= $fh && AE::io $fh, 1, $wcb;
        }
}
283 |
|
|
|
284 |
|
|
=item $pool->call (..., $cb->(...)) |
285 |
|
|
|
286 |
|
|
=back |
287 |
|
|
|
288 |
|
|
=head1 SEE ALSO |
289 |
|
|
|
290 |
|
|
L<AnyEvent::Fork>, to create the processes in the first place. |
291 |
|
|
|
292 |
|
|
L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API. |
293 |
|
|
|
294 |
|
|
=head1 AUTHOR AND CONTACT INFORMATION |
295 |
|
|
|
296 |
|
|
Marc Lehmann <schmorp@schmorp.de> |
297 |
|
|
http://software.schmorp.de/pkg/AnyEvent-Fork-Pool |
298 |
|
|
|
299 |
|
|
=cut |
300 |
|
|
|
301 |
|
|
1 |
302 |
|
|
|