1 |
=head1 NAME |
2 |
|
3 |
AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork |
4 |
|
5 |
=head1 SYNOPSIS |
6 |
|
7 |
use AnyEvent; |
8 |
use AnyEvent::Fork::Pool; |
9 |
# use AnyEvent::Fork is not needed |
10 |
|
11 |
# all parameters with default values |
12 |
my $pool = new AnyEvent::Fork::Pool |
13 |
"MyWorker::run", |
14 |
|
15 |
# pool management |
16 |
min => 0, # minimum # of processes |
17 |
max => 8, # maximum # of processes |
18 |
busy_time => 0, # wait this long before starting a new process |
19 |
max_idle => 1, # at most this many idle processes |
20 |
idle_time => 1, # wait this long before killing an idle process |
21 |
|
22 |
# template process |
23 |
template => AnyEvent::Fork->new, # the template process to use |
24 |
require => [MyWorker::], # module(s) to load |
25 |
eval => "# perl code to execute in template", |
26 |
on_destroy => (my $finish = AE::cv), |
27 |
|
28 |
# parameters passed to AnyEvent::Fork::RPC |
29 |
async => 0, |
30 |
on_error => sub { die "FATAL: $_[0]\n" }, |
31 |
on_event => sub { my @ev = @_ }, |
32 |
init => "MyWorker::init", |
33 |
serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, |
34 |
; |
35 |
|
36 |
for (1..10) { |
37 |
$pool->call (doit => $_, sub { |
38 |
print "MyWorker::run returned @_\n"; |
39 |
}); |
40 |
} |
41 |
|
42 |
undef $pool; |
43 |
|
44 |
$finish->recv; |
45 |
|
46 |
=head1 DESCRIPTION |
47 |
|
48 |
This module uses processes created via L<AnyEvent::Fork> and the RPC |
49 |
protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced |
50 |
pool of processes that handles jobs. |
51 |
|
52 |
Understanding of L<AnyEvent::Fork> is helpful but not critical to be able |
53 |
to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC> |
54 |
is, as it defines the actual API that needs to be implemented in the |
55 |
children. |
56 |
|
57 |
=head1 EXAMPLES |
58 |
|
59 |
=head1 API |
60 |
|
61 |
=over 4 |
62 |
|
63 |
=cut |
64 |
|
65 |
package AnyEvent::Fork::Pool; |
66 |
|
67 |
use common::sense; |
68 |
|
69 |
use Guard (); |
70 |
|
71 |
use AnyEvent; |
72 |
use AnyEvent::Fork; # we don't actually depend on it, this is for convenience |
73 |
use AnyEvent::Fork::RPC; |
74 |
|
75 |
our $VERSION = 0.1; |
76 |
|
77 |
=item my $pool = new AnyEvent::Fork::Pool $function, [key => value...] |
78 |
|
79 |
=over 4 |
80 |
|
81 |
=item on_error => $cb->($msg) |
82 |
|
83 |
Called on (fatal) errors, with a descriptive (hopefully) message. If |
84 |
this callback is not provided, but C<on_event> is, then the C<on_event> |
85 |
callback is called with the first argument being the string C<error>, |
86 |
followed by the error message. |
87 |
|
88 |
If neither handler is provided it prints the error to STDERR and will |
89 |
start failing badly. |
90 |
|
91 |
=item on_event => $cb->(...) |
92 |
|
93 |
Called for every call to the C<AnyEvent::Fork::RPC::event> function in the |
94 |
child, with the arguments of that function passed to the callback. |
95 |
|
96 |
Also called on errors when no C<on_error> handler is provided. |
97 |
|
98 |
=item on_destroy => $cb->() |
99 |
|
100 |
Called when the C<$rpc> object has been destroyed and all requests have |
101 |
been successfully handled. This is useful when you queue some requests and |
102 |
want the child to go away after it has handled them. The problem is that |
103 |
the parent must not exit either until all requests have been handled, and |
104 |
this can be accomplished by waiting for this callback. |
105 |
|
106 |
=item init => $function (default none) |
107 |
|
108 |
When specified (by name), this function is called in the child as the very |
109 |
first thing when taking over the process, with all the arguments normally |
110 |
passed to the C<AnyEvent::Fork::run> function, except the communications |
111 |
socket. |
112 |
|
113 |
It can be used to do one-time things in the child such as storing passed |
114 |
parameters or opening database connections. |
115 |
|
116 |
It is called very early - before the serialisers are created or the |
117 |
C<$function> name is resolved into a function reference, so it could be |
118 |
used to load any modules that provide the serialiser or function. It can |
119 |
not, however, create events. |
120 |
|
121 |
=item async => $boolean (default: 0) |
122 |
|
123 |
The default server used in the child does all I/O blockingly, and only |
124 |
allows a single RPC call to execute concurrently. |
125 |
|
126 |
Setting C<async> to a true value switches to another implementation that |
127 |
uses L<AnyEvent> in the child and allows multiple concurrent RPC calls. |
128 |
|
129 |
The actual API in the child is documented in the section that describes |
130 |
the calling semantics of the returned C<$rpc> function. |
131 |
|
132 |
If you want to pre-load the actual back-end modules to enable memory |
133 |
sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for |
134 |
synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode. |
135 |
|
136 |
If you use a template process and want to fork both sync and async |
137 |
children, then it is permissible to load both modules. |
138 |
|
139 |
=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') |
140 |
|
141 |
All arguments, result data and event data have to be serialised to be |
142 |
transferred between the processes. For this, they have to be frozen and |
143 |
thawed in both parent and child processes. |
144 |
|
145 |
By default, only octet strings can be passed between the processes, which |
146 |
is reasonably fast and efficient. |
147 |
|
148 |
For more complicated use cases, you can provide your own freeze and thaw |
149 |
functions, by specifying a string with perl source code. It's supposed to |
150 |
return two code references when evaluated: the first receives a list of |
151 |
perl values and must return an octet string. The second receives the octet |
152 |
string and must return the original list of values. |
153 |
|
154 |
If you need an external module for serialisation, then you can either |
155 |
pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use> |
156 |
or C<require> statement into the serialiser string. Or both. |
157 |
|
158 |
=back |
159 |
|
160 |
See the examples section earlier in this document for some actual |
161 |
examples. |
162 |
|
163 |
=cut |
164 |
|
165 |
# Sets up the RPC channel to a worker process and returns a code reference
# that is used to queue calls to it.
#
# Arguments:
#    $self     - the invocant; ->require, ->send_arg and ->run are called on it.
#                NOTE(review): these look like AnyEvent::Fork methods, so a
#                plain class-name invocant (as shown in the SYNOPSIS) presumably
#                cannot work yet - confirm the intended calling convention.
#    $function - name of the worker function, passed to the child via send_arg
#    %arg      - serialiser, on_event, on_error, on_destroy, init and async
#
# Returns a code reference. It expects the call arguments followed by a
# callback; the arguments are serialised and queued for writing, and the
# callback is invoked with the deserialised reply once it has been read.
#
# Dies if the serialiser source string fails to evaluate.
sub new {
    my ($self, $function, %arg) = @_;

    # the default serialiser is defined by AnyEvent::Fork::RPC, not by this
    # package, so it has to be referenced by its fully qualified name
    my $serialiser = delete $arg{serialiser} || $AnyEvent::Fork::RPC::STRING_SERIALISER;
    my $on_event   = delete $arg{on_event};
    my $on_error   = delete $arg{on_error};
    my $on_destroy = delete $arg{on_destroy};

    # default for on_error is to on_event, if specified
    $on_error ||= $on_event
        ? sub { $on_event->(error => shift) }
        : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };

    # default for on_event is to raise an error
    $on_event ||= sub { $on_error->("event received, but no on_event handler") };

    # the serialiser string evaluates to a (freeze, thaw) pair of code references
    my ($f, $t) = eval $serialiser; die $@ if $@;

    # @rcb: reply callbacks in call order (sync mode)
    # %rcb: reply callbacks keyed by request id (async mode)
    my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
    my ($rlen, $rbuf, $rw) = 512 - 16;

    # write watcher callback: flushes as much of $wbuf as the handle accepts
    my $wcb = sub {
        my $len = syswrite $fh, $wbuf;

        unless (defined $len) {
            if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
                undef $rw; undef $ww; # it ends here
                $on_error->("$!");
            }

            # nothing was written, so there is nothing to remove from $wbuf;
            # on EAGAIN/EWOULDBLOCK the watcher stays active and retries later
            return;
        }

        substr $wbuf, 0, $len, "";

        unless (length $wbuf) {
            undef $ww;
            # once the call function is gone and everything has been written,
            # close the write side of the socket
            $shutdown and shutdown $fh, 1;
        }
    };

    my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");

    $self->require ($module)
        ->send_arg ($function, $arg{init}, $serialiser)
        ->run ("$module\::run", sub {
            $fh = shift;

            my ($id, $len);
            $rw = AE::io $fh, 0, sub {
                # enlarge the read size when the buffer is close to it
                $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
                $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;

                if ($len) {
                    # each packet has an 8 byte header (id and payload length,
                    # both "L"), followed by the serialised payload
                    while (8 <= length $rbuf) {
                        ($id, $len) = unpack "LL", $rbuf;
                        8 + $len <= length $rbuf
                            or last; # packet not complete yet

                        my @r = $t->(substr $rbuf, 8, $len);
                        substr $rbuf, 0, 8 + $len, "";

                        if ($id) {
                            # a reply: sync replies are matched in call order,
                            # async replies by their id
                            if (@rcb) {
                                (shift @rcb)->(@r);
                            } elsif (my $cb = delete $rcb{$id}) {
                                $cb->(@r);
                            } else {
                                undef $rw; undef $ww;
                                $on_error->("unexpected data from child");
                            }
                        } else {
                            # id 0 is not a reply but an event
                            $on_event->(@r);
                        }
                    }
                } elsif (defined $len) {
                    # sysread returned 0: end of file
                    undef $rw; undef $ww; # it ends here

                    if (@rcb || %rcb) {
                        $on_error->("unexpected eof");
                    } else {
                        # on_destroy is optional, so only call it when given
                        $on_destroy->() if $on_destroy;
                    }
                } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
                    undef $rw; undef $ww; # it ends here
                    $on_error->("read: $!");
                }
            };

            # flush whatever was queued before the handle became available
            $ww ||= AE::io $fh, 1, $wcb;
        });

    # the guard is referenced by the returned closure; its block requests the
    # shutdown and makes sure the write watcher runs so the shutdown happens
    my $guard = Guard::guard {
        $shutdown = 1;
        $ww ||= $fh && AE::io $fh, 1, $wcb;
    };

    my $id;

    $arg{async}
        ? sub {
            # pick the next request id, wrapping around and never using 0
            $id = ($id == 0xffffffff ? 0 : $id) + 1;
            $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops

            $rcb{$id} = pop;

            $guard; # keep it alive

            # &$f without parentheses passes the remaining @_ to the serialiser
            $wbuf .= pack "LL/a*", $id, &$f;
            $ww ||= $fh && AE::io $fh, 1, $wcb;
        }
        : sub {
            push @rcb, pop;

            $guard; # keep it alive

            # &$f without parentheses passes the remaining @_ to the serialiser
            $wbuf .= pack "L/a*", &$f;
            $ww ||= $fh && AE::io $fh, 1, $wcb;
        }
}
283 |
|
284 |
=item $pool->call (..., $cb->(...)) |
285 |
|
286 |
=back |
287 |
|
288 |
=head1 SEE ALSO |
289 |
|
290 |
L<AnyEvent::Fork>, to create the processes in the first place. |
291 |
|
292 |
L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API. |
293 |
|
294 |
=head1 AUTHOR AND CONTACT INFORMATION |
295 |
|
296 |
Marc Lehmann <schmorp@schmorp.de> |
297 |
http://software.schmorp.de/pkg/AnyEvent-Fork-Pool |
298 |
|
299 |
=cut |
300 |
|
301 |
1 |
302 |
|