ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.1
Committed: Thu Apr 18 14:24:01 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4
5 =head1 SYNOPSIS
6
7 use AnyEvent;
8 use AnyEvent::Fork::Pool;
9 # use AnyEvent::Fork is not needed
10
11 # all parameters with default values
12 my $pool = new AnyEvent::Fork::Pool
13 "MyWorker::run",
14
15 # pool management
16 min => 0, # minimum # of processes
17 max => 8, # maximum # of processes
18 busy_time => 0, # wait this long before starting a new process
19 max_idle => 1, # at most this many idle processes
20 idle_time => 1, # wait this long before killing an idle process
21
22 # template process
23 template => AnyEvent::Fork->new, # the template process to use
24 require => [MyWorker::], # module(s) to load
25 eval => "# perl code to execute in template",
26 on_destroy => (my $finish = AE::cv),
27
28 # parameters passed to AnyEvent::Fork::RPC
29 async => 0,
30 on_error => sub { die "FATAL: $_[0]\n" },
31 on_event => sub { my @ev = @_ },
32 init => "MyWorker::init",
33 serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
34 ;
35
36 for (1..10) {
37 $pool->call (doit => $_, sub {
38 print "MyWorker::run returned @_\n";
39 });
40 }
41
42 undef $pool;
43
44 $finish->recv;
45
46 =head1 DESCRIPTION
47
48 This module uses processes created via L<AnyEvent::Fork> and the RPC
49 protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced
50 pool of processes that handles jobs.
51
52 Understanding of L<AnyEvent::Fork> is helpful but not critical to be able
53 to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC>
54 is, as it defines the actual API that needs to be implemented in the
55 children.
56
57 =head1 EXAMPLES
58
59 =head1 API
60
61 =over 4
62
63 =cut
64
65 package AnyEvent::Fork::Pool;
66
67 use common::sense;
68
69 use Guard ();
70
71 use AnyEvent;
72 use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
73 use AnyEvent::Fork::RPC;
74
75 our $VERSION = 0.1;
76
77 =item my $pool = new AnyEvent::Fork::Pool $function, [key => value...]
78
79 =over 4
80
81 =item on_error => $cb->($msg)
82
83 Called on (fatal) errors, with a descriptive (hopefully) message. If
84 this callback is not provided, but C<on_event> is, then the C<on_event>
85 callback is called with the first argument being the string C<error>,
86 followed by the error message.
87
88 If neither handler is provided it prints the error to STDERR and will
89 start failing badly.
90
91 =item on_event => $cb->(...)
92
93 Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
94 child, with the arguments of that function passed to the callback.
95
96 Also called on errors when no C<on_error> handler is provided.
97
98 =item on_destroy => $cb->()
99
100 Called when the C<$pool> object has been destroyed and all requests have
101 been successfully handled. This is useful when you queue some requests and
102 want the child to go away after it has handled them. The problem is that
103 the parent must not exit either until all requests have been handled, and
104 this can be accomplished by waiting for this callback.
105
106 =item init => $function (default none)
107
108 When specified (by name), this function is called in the child as the very
109 first thing when taking over the process, with all the arguments normally
110 passed to the C<AnyEvent::Fork::run> function, except the communications
111 socket.
112
113 It can be used to do one-time things in the child such as storing passed
114 parameters or opening database connections.
115
116 It is called very early - before the serialisers are created or the
117 C<$function> name is resolved into a function reference, so it could be
118 used to load any modules that provide the serialiser or function. It can
119 not, however, create events.
120
121 =item async => $boolean (default: 0)
122
123 The default server used in the child does all I/O blockingly, and only
124 allows a single RPC call to execute concurrently.
125
126 Setting C<async> to a true value switches to another implementation that
127 uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.
128
129 The actual API in the child is documented in the section that describes
130 the calling semantics of the returned C<$rpc> function.
131
132 If you want to pre-load the actual back-end modules to enable memory
133 sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
134 synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
135
136 If you use a template process and want to fork both sync and async
137 children, then it is permissible to load both modules.
138
139 =item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')
140
141 All arguments, result data and event data have to be serialised to be
142 transferred between the processes. For this, they have to be frozen and
143 thawed in both parent and child processes.
144
145 By default, only octet strings can be passed between the processes, which
146 is reasonably fast and efficient.
147
148 For more complicated use cases, you can provide your own freeze and thaw
149 functions, by specifying a string with perl source code. It's supposed to
150 return two code references when evaluated: the first receives a list of
151 perl values and must return an octet string. The second receives the octet
152 string and must return the original list of values.
153
154 If you need an external module for serialisation, then you can either
155 pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
156 or C<require> statement into the serialiser string. Or both.
157
158 =back
159
160 See the examples section earlier in this document for some actual
161 examples.
162
163 =cut
164
# new $class_or_fork, $function, key => value...
#
# Runs the RPC backend in an AnyEvent::Fork process and returns a code
# reference that queues calls to it.
#
# Arguments:
#    $self      - the class name, or an AnyEvent::Fork object to run the
#                 worker in (kept for backward compatibility)
#    $function  - name of the function passed to the child backend
#    %arg       - the keys read here are: template, serialiser, on_event,
#                 on_error, on_destroy, init and async
#
# Returns a closure. Call it with the request arguments followed by a
# callback; the callback receives the deserialised reply.
#
# Dies if the serialiser string fails to evaluate.
#
# NOTE(review): the pool management options shown in the SYNOPSIS (min, max,
# busy_time, ...) as well as "require" and "eval" are not acted upon here, and
# the SYNOPSIS calls $pool->call while this returns a plain code reference -
# confirm the intended API before relying on either.
sub new {
   my ($self, $function, %arg) = @_;

   # fully qualified: this package never declares $STRING_SERIALISER itself
   my $serialiser = delete $arg{serialiser} || $AnyEvent::Fork::RPC::STRING_SERIALISER;
   my $on_event   = delete $arg{on_event};
   my $on_error   = delete $arg{on_error};
   my $on_destroy = delete $arg{on_destroy};

   # process to run the worker in: an explicit template wins, then a fork
   # object passed as invocant, otherwise a freshly created process
   my $template = delete $arg{template};
   $template ||= ref ($self) ? $self : AnyEvent::Fork->new;

   # default for on_error is to on_event, if specified
   $on_error ||= $on_event
      ? sub { $on_event->(error => shift) }
      : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };

   # default for on_event is to raise an error
   $on_event ||= sub { $on_error->("event received, but no on_event handler") };

   # the serialiser string evaluates to a (freeze, thaw) pair of code refs
   my ($f, $t) = eval $serialiser; die $@ if $@;

   my (@rcb, %rcb, $fh, $shutdown, $ww, $rw);
   my $wbuf = "";
   my $rbuf = "";
   my $rlen = 512 - 16;

   # write watcher callback: flush as much of $wbuf as the socket accepts
   my $wcb = sub {
      my $len = syswrite $fh, $wbuf;

      unless (defined $len) {
         # %! avoids depending on Errno having been loaded by another module
         unless ($!{EAGAIN} || $!{EWOULDBLOCK}) {
            undef $rw; undef $ww; # it ends here
            $on_error->("$!");
         }

         return; # nothing was written, so $wbuf must stay untouched
      }

      substr $wbuf, 0, $len, "";

      unless (length $wbuf) {
         undef $ww;
         # once the guard has fired, half-close to signal eof to the child
         $shutdown and shutdown $fh, 1;
      }
   };

   my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");

   $template->require ($module)
      ->send_arg ($function, $arg{init}, $serialiser)
      ->run ("$module\::run", sub {
         $fh = shift;

         my ($id, $len);
         $rw = AE::io ($fh, 0, sub {
            # grow the read size when the buffer gets close to it
            $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
            $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;

            if ($len) {
               # each packet is an 8 octet header (id, length) plus payload
               while (8 <= length $rbuf) {
                  ($id, $len) = unpack "LL", $rbuf;
                  8 + $len <= length $rbuf
                     or last;

                  my @r = $t->(substr $rbuf, 8, $len);
                  substr $rbuf, 0, 8 + $len, "";

                  if ($id) {
                     # a reply: queued callbacks first, then by-id callbacks
                     if (@rcb) {
                        (shift @rcb)->(@r);
                     } elsif (my $cb = delete $rcb{$id}) {
                        $cb->(@r);
                     } else {
                        undef $rw; undef $ww;
                        $on_error->("unexpected data from child");
                     }
                  } else {
                     # id 0 is handed to the event handler
                     $on_event->(@r);
                  }
               }
            } elsif (defined $len) {
               undef $rw; undef $ww; # it ends here

               if (@rcb || %rcb) {
                  $on_error->("unexpected eof");
               } elsif ($on_destroy) {
                  # on_destroy is optional, so only call it when supplied
                  $on_destroy->();
               }
            } elsif (!$!{EAGAIN} && !$!{EWOULDBLOCK}) {
               undef $rw; undef $ww; # it ends here
               $on_error->("read: $!");
            }
         });

         # requests may have been queued before the handle became available
         $ww ||= AE::io ($fh, 1, $wcb);
      });

   # fires when the last reference to the returned closure goes away
   my $guard = Guard::guard (sub {
      $shutdown = 1;
      $ww ||= $fh && AE::io ($fh, 1, $wcb);
   });

   my $id = 0;

   $arg{async}
      ? sub {
           # pick the next free non-zero 32 bit id
           $id = ($id == 0xffffffff ? 0 : $id) + 1;
           $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops

           # the last argument is the callback, the remainder is the request
           $rcb{$id} = pop;

           $guard; # keep it alive

           $wbuf .= pack "LL/a*", $id, $f->(@_);
           $ww ||= $fh && AE::io ($fh, 1, $wcb);
        }
      : sub {
           # replies are matched to callbacks in queue order
           push @rcb, pop;

           $guard; # keep it alive

           $wbuf .= pack "L/a*", $f->(@_);
           $ww ||= $fh && AE::io ($fh, 1, $wcb);
        }
}
283
284 =item $pool->call (..., $cb->(...))
285
286 =back
287
288 =head1 SEE ALSO
289
290 L<AnyEvent::Fork>, to create the processes in the first place.
291
292 L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.
293
294 =head1 AUTHOR AND CONTACT INFORMATION
295
296 Marc Lehmann <schmorp@schmorp.de>
297 http://software.schmorp.de/pkg/AnyEvent-Fork-Pool
298
299 =cut
300
301 1
302