ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Pool/Pool.pm
(Generate patch)

Comparing AnyEvent-Fork-Pool/Pool.pm (file contents):
Revision 1.1 by root, Thu Apr 18 14:24:01 2013 UTC vs.
Revision 1.4 by root, Sat Apr 20 16:39:14 2013 UTC

7 use AnyEvent; 7 use AnyEvent;
8 use AnyEvent::Fork::Pool; 8 use AnyEvent::Fork::Pool;
9 # use AnyEvent::Fork is not needed 9 # use AnyEvent::Fork is not needed
10 10
11 # all parameters with default values 11 # all parameters with default values
12 my $pool = new AnyEvent::Fork::Pool 12 my $pool = AnyEvent::Fork
13 "MyWorker::run", 13 ->new
14 ->require ("MyWorker")
15 ->AnyEvent::Fork::Pool::run (
16 "MyWorker::run", # the worker function
14 17
15 # pool management 18 # pool management
16 min => 0, # minimum # of processes
17 max => 8, # maximum # of processes 19 max => 4, # absolute maximum # of processes
20 idle => 2, # minimum # of idle processes
21 load => 2, # queue at most this number of jobs per process
18 busy_time => 0, # wait this before starting a new process 22 start => 0.1, # wait this many seconds before starting a new process
19 max_idle => 1, # wait this before killing an idle process 23 stop => 1, # wait this many seconds before stopping an idle process
20 idle_time => 1, # at most this many idle processes 24 on_destroy => (my $finish = AE::cv), # called when object is destroyed
21 25
22 # template process
23 template => AnyEvent::Fork->new, # the template process to use
24 require => [MyWorker::], # module(s) to load
25 eval => "# perl code to execute in template",
26 on_destroy => (my $finish = AE::cv),
27
28 # parameters passed to AnyEvent::Fork::RPC 26 # parameters passed to AnyEvent::Fork::RPC
29 async => 0, 27 async => 0,
30 on_error => sub { die "FATAL: $_[0]\n" }, 28 on_error => sub { die "FATAL: $_[0]\n" },
31 on_event => sub { my @ev = @_ }, 29 on_event => sub { my @ev = @_ },
32 init => "MyWorker::init", 30 init => "MyWorker::init",
33 serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, 31 serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
34 ; 32 );
35 33
36 for (1..10) { 34 for (1..10) {
37 $pool->call (doit => $_, sub { 35 $pool->(doit => $_, sub {
38 print "MyWorker::run returned @_\n"; 36 print "MyWorker::run returned @_\n";
39 }); 37 });
40 } 38 }
41 39
42 undef $pool; 40 undef $pool;
54is, as it defines the actual API that needs to be implemented in the 52is, as it defines the actual API that needs to be implemented in the
55children. 53children.
56 54
57=head1 EXAMPLES 55=head1 EXAMPLES
58 56
59=head1 API 57=head1 PARENT USAGE
60 58
61=over 4 59=over 4
62 60
63=cut 61=cut
64 62
65package AnyEvent::Fork::Pool; 63package AnyEvent::Fork::Pool;
66 64
67use common::sense; 65use common::sense;
68 66
67use Scalar::Util ();
68
69use Guard (); 69use Guard ();
70use Array::Heap ();
70 71
71use AnyEvent; 72use AnyEvent;
72use AnyEvent::Fork; # we don't actually depend on it, this is for convenience 73use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
73use AnyEvent::Fork::RPC; 74use AnyEvent::Fork::RPC;
74 75
76# these are used for the first and last argument of events
77# in the hope of not colliding. yes, I don't like it either,
78# but didn't come up with an obviously better alternative.
79my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
80my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
81
75our $VERSION = 0.1; 82our $VERSION = 0.1;
76 83
77=item my $rpc = new AnyEvent::Fork::RPC::pool $function, [key => value...] 84=item my $rpc = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
85
86The traditional way to call it. But it is way cooler to call it in the
87following way:
88
89=item my $rpc = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
90
91Creates a new pool object with the specified C<$function> as function
92(name) to call for each request. The pool uses the C<$fork> object as the
93template when creating worker processes.
94
95You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
96to create one.
97
98A relatively large number of key/value pairs can be specified to influence
99the behaviour. They are grouped into the categories "pool management",
100"template process" and "rpc parameters".
78 101
79=over 4 102=over 4
80 103
81=item on_error => $cb->($msg) 104=item Pool Management
82 105
83Called on (fatal) errors, with a descriptive (hopefully) message. If 106The pool consists of a certain number of worker processes. These options
84this callback is not provided, but C<on_event> is, then the C<on_event> 107decide how many of these processes exist and when they are started and
85callback is called with the first argument being the string C<error>, 108stopp.ed
86followed by the error message.
87 109
88If neither handler is provided it prints the error to STDERR and will 110=over 4
89start failing badly.
90 111
91=item on_event => $cb->(...) 112=item idle => $count (default: 0)
92 113
93Called for every call to the C<AnyEvent::Fork::RPC::event> function in the 114The minimum amount of idle processes in the pool - when there are fewer
94child, with the arguments of that function passed to the callback. 115than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
116ones, subject to C<max> and C<start>.
95 117
96Also called on errors when no C<on_error> handler is provided. 118This is also the initial/minimum amount of workers in the pool. The
119default of zero means that the pool starts empty and can shrink back to
120zero workers over time.
97 121
98=item on_destroy => $cb->() 122=item max => $count (default: 4)
99 123
100Called when the C<$rpc> object has been destroyed and all requests have 124The maximum number of processes in the pool, in addition to the template
101been successfully handled. This is useful when you queue some requests and 125process. C<AnyEvent::Fork::Pool> will never create more than this number
102want the child to go away after it has handled them. The problem is that 126of worker processes, although there can be more temporarily when a worker
103the parent must not exit either until all requests have been handled, and 127is shut down and hasn't exited yet.
104this can be accomplished by waiting for this callback.
105 128
106=item init => $function (default none) 129=item load => $count (default: 2)
107 130
108When specified (by name), this function is called in the child as the very 131The maximum number of concurrent jobs sent to a single worker
109first thing when taking over the process, with all the arguments normally 132process. Worker processes that handle this number of jobs already are
110passed to the C<AnyEvent::Fork::run> function, except the communications 133called "busy".
111socket.
112 134
113It can be used to do one-time things in the child such as storing passed 135Jobs that cannot be sent to a worker immediately (because all workers are
114parameters or opening database connections. 136busy) will be queued until a worker is available.
115 137
116It is called very early - before the serialisers are created or the 138=item start => $seconds (default: 0.1)
117C<$function> name is resolved into a function reference, so it could be 139
118used to load any modules that provide the serialiser or function. It can 140When a job is queued and all workers are busy, a timer is started. If the
119not, however, create events. 141timer elapses and there are still jobs that cannot be queued to a worker,
142a new worker is started.
143
144This configurs the time that all workers must be busy before a new worker
145is started. Or, put differently, the minimum delay betwene starting new
146workers.
147
148The delay is zero by default, which means new workers will be started
149without delay.
150
151=item stop => $seconds (default: 1)
152
153When a worker has no jobs to execute it becomes idle. An idle worker that
154hasn't executed a job within this amount of time will be stopped, unless
155the other parameters say otherwise.
156
157=item on_destroy => $callback->() (default: none)
158
159When a pool object goes out of scope, it will still handle all outstanding
160jobs. After that, it will destroy all workers (and also the template
161process if it isn't referenced otherwise).
162
163=back
164
165=item Template Process
166
167The worker processes are all forked from a single template
168process. Ideally, all modules and all cdoe used by the worker, as well as
169any shared data structures should be loaded into the template process, to
170take advantage of data sharing via fork.
171
172You can create your own template process by creating a L<AnyEvent::Fork>
173object yourself and passing it as the C<template> parameter, but
174C<AnyEvent::Fork::Pool> can create one for you, including some standard
175options.
176
177=over 4
178
179=item template => $fork (default: C<< AnyEvent::Fork->new >>)
180
181The template process to use, if you want to create your own.
182
183=item require => \@modules (default: C<[]>)
184
185The modules in this list will be laoded into the template process.
186
187=item eval => "# perl code to execute in template" (default: none)
188
189This is a perl string that is evaluated after creating the template
190process and after requiring the modules. It can do whatever it wants to
191configure the process, but it must not do anything that would keep a later
192fork from working (so must not create event handlers or (real) threads for
193example).
194
195=back
196
197=item AnyEvent::Fork::RPC Parameters
198
199These parameters are all passed directly to L<AnyEvent::Fork::RPC>. They
200are only briefly mentioned here, for their full documentation
201please refer to the L<AnyEvent::Fork::RPC> documentation. Also, the
202default values mentioned here are only documented as a best effort -
203L<AnyEvent::Fork::RPC> documentation is binding.
204
205=over 4
120 206
121=item async => $boolean (default: 0) 207=item async => $boolean (default: 0)
122 208
123The default server used in the child does all I/O blockingly, and only 209Whether to sue the synchronous or asynchronous RPC backend.
124allows a single RPC call to execute concurrently.
125 210
126Setting C<async> to a true value switches to another implementation that 211=item on_error => $callback->($message) (default: die with message)
127uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.
128 212
129The actual API in the child is documented in the section that describes 213The callback to call on any (fatal) errors.
130the calling semantics of the returned C<$rpc> function.
131 214
132If you want to pre-load the actual back-end modules to enable memory 215=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
133sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
134synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
135 216
136If you use a template process and want to fork both sync and async 217The callback to invoke on events.
137children, then it is permissible to load both modules.
138 218
139=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') 219=item init => $initfunction (default: none)
140 220
141All arguments, result data and event data have to be serialised to be 221The function to call in the child, once before handling requests.
142transferred between the processes. For this, they have to be frozen and
143thawed in both parent and child processes.
144 222
145By default, only octet strings can be passed between the processes, which 223=item serialiser => $serialiser (defailt: $AnyEvent::Fork::RPC::STRING_SERIALISER)
146is reasonably fast and efficient.
147 224
148For more complicated use cases, you can provide your own freeze and thaw 225The serialiser to use.
149functions, by specifying a string with perl source code. It's supposed to
150return two code references when evaluated: the first receives a list of
151perl values and must return an octet string. The second receives the octet
152string and must return the original list of values.
153
154If you need an external module for serialisation, then you can either
155pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
156or C<require> statement into the serialiser string. Or both.
157 226
158=back 227=back
159 228
160See the examples section earlier in this document for some actual 229=back
161examples.
162 230
163=cut 231=cut
164 232
165sub new { 233sub run {
166 my ($self, $function, %arg) = @_; 234 my ($template, $function, %arg) = @_;
167 235
168 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; 236 my $max = $arg{max} || 4;
237 my $idle = $arg{idle} || 0,
238 my $load = $arg{load} || 2,
239 my $start = $arg{start} || 0.1,
240 my $stop = $arg{stop} || 1,
169 my $on_event = delete $arg{on_event}; 241 my $on_event = $arg{on_event} || sub { },
170 my $on_error = delete $arg{on_error};
171 my $on_destroy = delete $arg{on_destroy}; 242 my $on_destroy = $arg{on_destroy};
243
244 my @rpc = (
245 async => $arg{async},
246 init => $arg{init},
247 serialiser => $arg{serialiser},
248 on_error => $arg{on_error},
172 249 );
173 # default for on_error is to on_event, if specified
174 $on_error ||= $on_event
175 ? sub { $on_event->(error => shift) }
176 : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };
177 250
178 # default for on_event is to raise an error 251 my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
179 $on_event ||= sub { $on_error->("event received, but no on_event handler") }; 252 my ($start, $stop, $want_start, $want_stop, $scheduler);
180 253
181 my ($f, $t) = eval $serialiser; die $@ if $@; 254 my $destroy_guard = Guard::guard {
255 $on_destroy->()
256 if $on_destroy;
257 };
182 258
183 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww); 259 $template
184 my ($rlen, $rbuf, $rw) = 512 - 16; 260 ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
185 261 ->eval ('
186 my $wcb = sub { 262 my ($magic0, $magic1) = @_;
187 my $len = syswrite $fh, $wbuf; 263 sub AnyEvent::Fork::Pool::quit() {
188 264 AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic1;
189 unless (defined $len) {
190 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
191 undef $rw; undef $ww; # it ends here
192 $on_error->("$!");
193 } 265 }
194 } 266 ', $magic0, $magic1)
267 ->eval (delete $arg{eval});
195 268
196 substr $wbuf, 0, $len, ""; 269 $start = sub {
270 my $proc = [0, 0, undef]; # load, index, rpc
197 271
198 unless (length $wbuf) { 272 warn "start a worker\n";#d#
199 undef $ww;
200 $shutdown and shutdown $fh, 1;
201 }
202 };
203 273
204 my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"); 274 $proc->[2] = $template
275 ->fork
276 ->AnyEvent::Fork::RPC::run ($function,
277 @rpc,
278 on_event => sub {
279 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
280 $destroy_guard if 0; # keep it alive
205 281
206 $self->require ($module) 282 $_[1] eq "quit" and $stop->($proc);
207 ->send_arg ($function, $arg{init}, $serialiser)
208 ->run ("$module\::run", sub {
209 $fh = shift;
210
211 my ($id, $len);
212 $rw = AE::io $fh, 0, sub {
213 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
214 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
215
216 if ($len) {
217 while (8 <= length $rbuf) {
218 ($id, $len) = unpack "LL", $rbuf;
219 8 + $len <= length $rbuf
220 or last; 283 return;
221
222 my @r = $t->(substr $rbuf, 8, $len);
223 substr $rbuf, 0, 8 + $len, "";
224
225 if ($id) {
226 if (@rcb) {
227 (shift @rcb)->(@r);
228 } elsif (my $cb = delete $rcb{$id}) {
229 $cb->(@r);
230 } else {
231 undef $rw; undef $ww;
232 $on_error->("unexpected data from child");
233 } 284 }
234 } else { 285
235 $on_event->(@r); 286 &$on_event;
236 } 287 },
237 } 288 )
238 } elsif (defined $len) { 289 ;
239 undef $rw; undef $ww; # it ends here
240 290
241 if (@rcb || %rcb) { 291 ++$nidle;
242 $on_error->("unexpected eof"); 292 Array::Heap::push_heap @pool, $proc;
243 } else { 293
294 Scalar::Util::weaken $proc;
295 };
296
297 $stop = sub {
298 my $proc = shift;
299
300 $proc->[0]
301 or --$nidle;
302
303 Array::Heap::splice_heap_idx @pool, $proc->[1]
304 if defined $proc->[1];
305 };
306
307 $want_start = sub {
308 undef $stop_w;
309
310 $start_w ||= AE::timer $start, 0, sub {
311 undef $start_w;
312
313 if (@queue) {
244 $on_destroy->(); 314 $start->();
245 } 315 $scheduler->();
246 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
247 undef $rw; undef $ww; # it ends here
248 $on_error->("read: $!");
249 } 316 }
250 }; 317 };
251
252 $ww ||= AE::io $fh, 1, $wcb;
253 }); 318 };
254 319
320 $want_stop = sub {
321 $stop_w ||= AE::timer $stop, 0, sub {
322 undef $stop_w;
323
324 $stop->($pool[0])
325 if $nidle;
326 };
327 };
328
329 $scheduler = sub {
330 if (@queue) {
331 while (@queue) {
332 my $proc = $pool[0];
333
334 if ($proc->[0] < $load) {
335 warn "free $proc $proc->[0]\n";#d#
336 # found free worker
337 $proc->[0]++
338 or --$nidle >= $idle
339 or $want_start->();
340
341 Array::Heap::adjust_heap @pool, 0;
342
343 my $job = shift @queue;
344 my $ocb = pop @$job;
345
346 $proc->[2]->(@$job, sub {
347 # reduce queue counter
348 --$pool[$_][0]
349 or ++$nidle > $idle
350 or $want_stop->();
351
352 Array::Heap::adjust_heap @pool, $_;
353
354 $scheduler->();
355
356 &$ocb;
357 });
358 } else {
359 warn "busy $proc->[0]\n";#d#
360 # all busy, delay
361
362 $want_start->();
363 last;
364 }
365 }
366 } elsif ($shutdown) {
367 @pool = ();
368 undef $start_w;
369 undef $start; # frees $destroy_guard reference
370
371 $stop->($pool[0])
372 while $nidle;
373 }
374 };
375
255 my $guard = Guard::guard { 376 my $shutdown_guard = Guard::guard {
256 $shutdown = 1; 377 $shutdown = 1;
257 $ww ||= $fh && AE::io $fh, 1, $wcb; 378 $scheduler->();
379 };
380
381 $start->()
382 while @pool < $idle;
383
384 sub {
385 $shutdown_guard if 0; # keep it alive
386
387 $start->()
388 unless @pool;
389
390 push @queue, [@_];
391 $scheduler->();
258 }; 392 }
259
260 my $id;
261
262 $arg{async}
263 ? sub {
264 $id = ($id == 0xffffffff ? 0 : $id) + 1;
265 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
266
267 $rcb{$id} = pop;
268
269 $guard; # keep it alive
270
271 $wbuf .= pack "LL/a*", $id, &$f;
272 $ww ||= $fh && AE::io $fh, 1, $wcb;
273 }
274 : sub {
275 push @rcb, pop;
276
277 $guard; # keep it alive
278
279 $wbuf .= pack "L/a*", &$f;
280 $ww ||= $fh && AE::io $fh, 1, $wcb;
281 }
282} 393}
283 394
284=item $pool->call (..., $cb->(...)) 395=item $pool->call (..., $cb->(...))
396
397Call the RPC function of a worker with the given arguments, and when the
398worker is done, call the C<$cb> with the results, like just calling the
399L<AnyEvent::Fork::RPC> object directly.
400
401If there is no free worker, the call will be queued.
402
403Note that there can be considerable time between calling this method and
404the call actually being executed. During this time, the parameters passed
405to this function are effectively read-only - modifying them after the call
406and before the callback is invoked causes undefined behaviour.
407
408=cut
285 409
286=back 410=back
287 411
288=head1 SEE ALSO 412=head1 SEE ALSO
289 413

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines