ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Pool/Pool.pm
(Generate patch)

Comparing AnyEvent-Fork-Pool/Pool.pm (file contents):
Revision 1.1 by root, Thu Apr 18 14:24:01 2013 UTC vs.
Revision 1.2 by root, Thu Apr 18 21:37:27 2013 UTC

12 my $pool = new AnyEvent::Fork::Pool 12 my $pool = new AnyEvent::Fork::Pool
13 "MyWorker::run", 13 "MyWorker::run",
14 14
15 # pool management 15 # pool management
16 min => 0, # minimum # of processes 16 min => 0, # minimum # of processes
17 max => 8, # maximum # of processes 17 max => 4, # maximum # of processes
18 max_queue => 2, # queue at most this number of jobs per process
18 busy_time => 0, # wait this before starting a new process 19 min_delay => 0, # wait this many seconds before starting a new process
19 max_idle => 1, # wait this before killing an idle process 20 min_idle => 0, # try to have at least this amount of idle processes
20 idle_time => 1, # at most this many idle processes 21 max_idle => 1, # at most this many idle processes
22 idle_time => 1, # wait this many seconds before killing an idle process
23 on_destroy => (my $finish = AE::cv),
21 24
22 # template process 25 # template process
23 template => AnyEvent::Fork->new, # the template process to use 26 template => AnyEvent::Fork->new, # the template process to use
24 require => [MyWorker::], # module(s) to load 27 require => [MyWorker::], # module(s) to load
25 eval => "# perl code to execute in template", 28 eval => "# perl code to execute in template",
26 on_destroy => (my $finish = AE::cv),
27 29
28 # parameters passed to AnyEvent::Fork::RPC 30 # parameters passed to AnyEvent::Fork::RPC
29 async => 0, 31 async => 0,
30 on_error => sub { die "FATAL: $_[0]\n" }, 32 on_error => sub { die "FATAL: $_[0]\n" },
31 on_event => sub { my @ev = @_ }, 33 on_event => sub { my @ev = @_ },
54is, as it defines the actual API that needs to be implemented in the 56is, as it defines the actual API that needs to be implemented in the
55children. 57children.
56 58
57=head1 EXAMPLES 59=head1 EXAMPLES
58 60
59=head1 API 61=head1 PARENT USAGE
60 62
61=over 4 63=over 4
62 64
63=cut 65=cut
64 66
65package AnyEvent::Fork::Pool; 67package AnyEvent::Fork::Pool;
66 68
67use common::sense; 69use common::sense;
68 70
69use Guard (); 71use Scalar::Util ();
72
73use Array::Heap ();
70 74
71use AnyEvent; 75use AnyEvent;
72use AnyEvent::Fork; # we don't actually depend on it, this is for convenience 76use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
73use AnyEvent::Fork::RPC; 77use AnyEvent::Fork::RPC;
74 78
79my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
80my $magic2 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
81
75our $VERSION = 0.1; 82our $VERSION = 0.1;
76 83
77=item my $rpc = new AnyEvent::Fork::RPC::pool $function, [key => value...] 84=item my $rpc = new AnyEvent::Fork::Pool $function, [key => value...]
85
86Creates a new pool object with the specified C<$function> as function
87(name) to call for each request.
88
89A pool consists of a template process that contains the code and data that
90the worker processes need, and a number of worker processes that have been
91forked off of that template process.
92
93You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
94to create one.
95
96A relatively large number of key/value pairs can be specified to influence
97the behaviour. They are grouped into the categories "pool management",
98"template process" and "rpc parameters".
78 99
79=over 4 100=over 4
80 101
81=item on_error => $cb->($msg) 102=item Pool Management
82 103
83Called on (fatal) errors, with a descriptive (hopefully) message. If 104The pool consists of a certain number of worker processes. These options
84this callback is not provided, but C<on_event> is, then the C<on_event> 105decide how many of these processes exist and when they are started and
85callback is called with the first argument being the string C<error>, 106stopped.
86followed by the error message.
87 107
88If neither handler is provided it prints the error to STDERR and will 108=over 4
89start failing badly.
90 109
91=item on_event => $cb->(...) 110=item min => $count (default: 0)
92 111
93Called for every call to the C<AnyEvent::Fork::RPC::event> function in the 112The minimum number of processes in the pool, in addition to the template
94child, with the arguments of that function passed to the callback. 113process. Even when idle, there will never be fewer than this number of
114worker processes. The default means that the pool can be empty.
95 115
96Also called on errors when no C<on_error> handler is provided. 116=item max => $count (default: 4)
97 117
98=item on_destroy => $cb->() 118The maximum number of processes in the pool, in addition to the template
119process. C<AnyEvent::Fork::Pool> will never create more than this number
120of processes.
99 121
100Called when the C<$rpc> object has been destroyed and all requests have 122=item max_queue => $count (default: 2)
101been successfully handled. This is useful when you queue some requests and
102want the child to go away after it has handled them. The problem is that
103the parent must not exit either until all requests have been handled, and
104this can be accomplished by waiting for this callback.
105 123
106=item init => $function (default none) 124The maximum number of jobs sent to a single worker process. Worker
125processes that handle this number of jobs already are called "busy".
107 126
108When specified (by name), this function is called in the child as the very 127Jobs that cannot be sent to a worker immediately (because all workers are
109first thing when taking over the process, with all the arguments normally 128busy) will be queued until a worker is available.
110passed to the C<AnyEvent::Fork::run> function, except the communications
111socket.
112 129
113It can be used to do one-time things in the child such as storing passed 130=item min_delay => $seconds (default: 0)
114parameters or opening database connections.
115 131
116It is called very early - before the serialisers are created or the 132When a job is queued and all workers are busy, a timer is started. If the
117C<$function> name is resolved into a function reference, so it could be 133timer elapses and there are still jobs that cannot be queued to a worker,
118used to load any modules that provide the serialiser or function. It can 134a new worker is started.
119not, however, create events. 135
136This configures the time that all workers must be busy before a new worker
137is started. Or, put differently, the minimum delay between starting new
138workers.
139
140The delay is zero by default, which means new workers will be started
141without delay.
142
143=item min_idle => $count (default: 0)
144
145The minimum number of idle workers - when there are fewer, more
146are started. The C<min_delay> is still respected though, and
147C<min_idle>/C<min_delay> and C<max_idle>/C<idle_time> are useful to
148dynamically adjust the pool.
149
150=item max_idle => $count (default: 1)
151
152The maximum number of idle workers. If a worker becomes idle and there are
153already this many idle workers, it will be stopped immediately instead of
154waiting for the idle timer to elapse.
155
156=item idle_time => $seconds (default: 1)
157
158When a worker has no jobs to execute it becomes idle. An idle worker that
159hasn't executed a job within this amount of time will be stopped, unless
160the other parameters say otherwise.
161
162=item on_destroy => $callback->() (default: none)
163
164When a pool object goes out of scope, it will still handle all outstanding
165jobs. After that, it will destroy all workers (and also the template
166process if it isn't referenced otherwise).
167
168=back
169
170=item Template Process
171
172The worker processes are all forked from a single template
173process. Ideally, all modules and all code used by the worker, as well as
174any shared data structures should be loaded into the template process, to
175take advantage of data sharing via fork.
176
177You can create your own template process by creating a L<AnyEvent::Fork>
178object yourself and passing it as the C<template> parameter, but
179C<AnyEvent::Fork::Pool> can create one for you, including some standard
180options.
181
182=over 4
183
184=item template => $fork (default: C<< AnyEvent::Fork->new >>)
185
186The template process to use, if you want to create your own.
187
188=item require => \@modules (default: C<[]>)
189
190The modules in this list will be loaded into the template process.
191
192=item eval => "# perl code to execute in template" (default: none)
193
194This is a perl string that is evaluated after creating the template
195process and after requiring the modules. It can do whatever it wants to
196configure the process, but it must not do anything that would keep a later
197fork from working (so must not create event handlers or (real) threads for
198example).
199
200=back
201
202=item AnyEvent::Fork::RPC Parameters
203
204These parameters are all passed directly to L<AnyEvent::Fork::RPC>. They
205are only briefly mentioned here, for their full documentation
206please refer to the L<AnyEvent::Fork::RPC> documentation. Also, the
207default values mentioned here are only documented as a best effort -
208L<AnyEvent::Fork::RPC> documentation is binding.
209
210=over 4
120 211
121=item async => $boolean (default: 0) 212=item async => $boolean (default: 0)
122 213
123The default server used in the child does all I/O blockingly, and only 214Whether to use the synchronous or asynchronous RPC backend.
124allows a single RPC call to execute concurrently.
125 215
126Setting C<async> to a true value switches to another implementation that 216=item on_error => $callback->($message) (default: die with message)
127uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.
128 217
129The actual API in the child is documented in the section that describes 218The callback to call on any (fatal) errors.
130the calling semantics of the returned C<$rpc> function.
131 219
132If you want to pre-load the actual back-end modules to enable memory 220=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
133sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
134synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
135 221
136If you use a template process and want to fork both sync and async 222The callback to invoke on events.
137children, then it is permissible to load both modules.
138 223
139=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') 224=item init => $initfunction (default: none)
140 225
141All arguments, result data and event data have to be serialised to be 226The function to call in the child, once before handling requests.
142transferred between the processes. For this, they have to be frozen and
143thawed in both parent and child processes.
144 227
145By default, only octet strings can be passed between the processes, which 228=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
146is reasonably fast and efficient.
147 229
148For more complicated use cases, you can provide your own freeze and thaw 230The serialiser to use.
149functions, by specifying a string with perl source code. It's supposed to
150return two code references when evaluated: the first receives a list of
151perl values and must return an octet string. The second receives the octet
152string and must return the original list of values.
153
154If you need an external module for serialisation, then you can either
155pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
156or C<require> statement into the serialiser string. Or both.
157 231
158=back 232=back
159 233
160See the examples section earlier in this document for some actual 234=back
161examples.
162 235
163=cut 236=cut
164 237
165sub new { 238sub new {
166 my ($self, $function, %arg) = @_; 239 my ($class, $function, %arg) = @_;
167 240
168 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; 241 my $self = bless {
169 my $on_event = delete $arg{on_event}; 242 min => 0,
170 my $on_error = delete $arg{on_error}; 243 max => 4,
171 my $on_destroy = delete $arg{on_destroy}; 244 max_queue => 2,
245 min_delay => 0,
246 max_idle => 1,
247 idle_time => 1,
248 on_event => sub { },
249 %arg,
250 pool => [],
251 queue => [],
252 }, $class;
253
254 $self->{function} = $function;
255
256 ($self->{template} ||= new AnyEvent::Fork)
257 ->require ("AnyEvent::Fork::RPC::" . ($self->{async} ? "Async" : "Sync"))
258 ->require (@{ delete $self->{require} })
259 ->eval ('
260 my ($magic0, $magic2) = @_;
261 sub AnyEvent::Fork::Pool::quit() {
262 AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic2;
263 }
264 ', $magic0, $magic2)
265 ->eval (delete $self->{eval});
266
267 $self->start
268 while @{ $self->{pool} } < $self->{min};
269
270 $self
271}
272
273sub start {
274 my ($self) = @_;
172 275
173 # default for on_error is to on_event, if specified 276 warn "start\n";#d#
174 $on_error ||= $on_event
175 ? sub { $on_event->(error => shift) }
176 : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };
177 277
178 # default for on_event is to raise an error 278 Scalar::Util::weaken $self;
179 $on_event ||= sub { $on_error->("event received, but no on_event handler") };
180 279
181 my ($f, $t) = eval $serialiser; die $@ if $@; 280 my $proc = [0, undef, undef];
182 281
183 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww); 282 $proc->[1] = $self->{template}
184 my ($rlen, $rbuf, $rw) = 512 - 16; 283 ->fork
284 ->AnyEvent::Fork::RPC::run ($self->{function},
285 async => $self->{async},
286 init => $self->{init},
287 serialiser => $self->{serialiser},
288 on_error => $self->{on_error},
289 on_event => sub {
290 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic2) {
291 if ($_[1] eq "quit") {
292 my $pool = $self->{pool};
293 for (0 .. $#$pool) {
294 if ($pool->[$_] == $proc) {
295 Array::Heap::splice_heap @$pool, $_;
296 return;
297 }
298 }
299 die;
300 }
301 return;
302 }
185 303
186 my $wcb = sub { 304 &{ $self->{on_event} };
187 my $len = syswrite $fh, $wbuf;
188
189 unless (defined $len) {
190 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
191 undef $rw; undef $ww; # it ends here
192 $on_error->("$!");
193 } 305 },
194 } 306 )
195
196 substr $wbuf, 0, $len, "";
197
198 unless (length $wbuf) {
199 undef $ww;
200 $shutdown and shutdown $fh, 1;
201 }
202 }; 307 ;
203 308
204 my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"); 309 ++$self->{idle};
310 Array::Heap::push_heap @{ $self->{pool} }, $proc;
311}
205 312
206 $self->require ($module) 313=item $pool->call (..., $cb->(...))
207 ->send_arg ($function, $arg{init}, $serialiser) 314
208 ->run ("$module\::run", sub { 315Call the RPC function of a worker with the given arguments, and when the
316worker is done, call the C<$cb> with the results, like just calling the
317L<AnyEvent::Fork::RPC> object directly.
318
319If there is no free worker, the call will be queued.
320
321Note that there can be considerable time between calling this method and
322the call actually being executed. During this time, the parameters passed
323to this function are effectively read-only - modifying them after the call
324and before the callback is invoked causes undefined behaviour.
325
326=cut
327
328sub scheduler {
209 $fh = shift; 329 my $self = shift;
210 330
211 my ($id, $len); 331 my $pool = $self->{pool};
212 $rw = AE::io $fh, 0, sub { 332 my $queue = $self->{queue};
213 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
214 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
215 333
216 if ($len) { 334 $self->start
217 while (8 <= length $rbuf) { 335 unless @$pool;
218 ($id, $len) = unpack "LL", $rbuf; 336
219 8 + $len <= length $rbuf 337 while (@$queue) {
338 my $proc = $pool->[0];
339
340 if ($proc->[0] < $self->{max_queue}) {
341 warn "free $proc $proc->[0]\n";#d#
342 # found free worker
343 --$self->{idle}
344 unless $proc->[0]++;
345
346 undef $proc->[2];
347
348 Array::Heap::adjust_heap @$pool, 0;
349
350 my $job = shift @$queue;
351 my $ocb = pop @$job;
352
353 $proc->[1]->(@$job, sub {
354 for (0 .. $#$pool) {
355 if ($pool->[$_] == $proc) {
356 # reduce queue counter
357 unless (--$pool->[$_][0]) {
358 # worker becomes idle
359 my $to = ++$self->{idle} > $self->{max_idle}
360 ? 0
361 : $self->{idle_time};
362
363 $proc->[2] = AE::timer $to, 0, sub {
364 undef $proc->[2];
365
366 warn "destroy $proc afzer $to\n";#d#
367
368 for (0 .. $#$pool) {
369 if ($pool->[$_] == $proc) {
370 Array::Heap::splice_heap @$pool, $_;
371 --$self->{idle};
220 or last; 372 last;
221 373 }
222 my @r = $t->(substr $rbuf, 8, $len); 374 }
223 substr $rbuf, 0, 8 + $len, "";
224
225 if ($id) {
226 if (@rcb) {
227 (shift @rcb)->(@r);
228 } elsif (my $cb = delete $rcb{$id}) {
229 $cb->(@r); 375 };
230 } else {
231 undef $rw; undef $ww;
232 $on_error->("unexpected data from child");
233 } 376 }
234 } else { 377
235 $on_event->(@r); 378 Array::Heap::adjust_heap @$pool, $_;
379 last;
236 } 380 }
237 } 381 }
238 } elsif (defined $len) { 382 &$ocb;
239 undef $rw; undef $ww; # it ends here 383 });
240
241 if (@rcb || %rcb) {
242 $on_error->("unexpected eof");
243 } else { 384 } else {
244 $on_destroy->(); 385 warn "busy $proc->[0]\n";#d#
386 # all busy, delay
387
388 $self->{min_delay_w} ||= AE::timer $self->{min_delay}, 0, sub {
389 delete $self->{min_delay_w};
390
391 if (@{ $self->{queue} }) {
392 $self->start;
393 $self->scheduler;
245 } 394 }
246 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
247 undef $rw; undef $ww; # it ends here
248 $on_error->("read: $!");
249 } 395 };
396 last;
250 }; 397 }
251
252 $ww ||= AE::io $fh, 1, $wcb;
253 });
254
255 my $guard = Guard::guard {
256 $shutdown = 1;
257 $ww ||= $fh && AE::io $fh, 1, $wcb;
258 }; 398 }
259 399 warn "last\n";#d#
260 my $id;
261
262 $arg{async}
263 ? sub {
264 $id = ($id == 0xffffffff ? 0 : $id) + 1;
265 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
266
267 $rcb{$id} = pop;
268
269 $guard; # keep it alive
270
271 $wbuf .= pack "LL/a*", $id, &$f;
272 $ww ||= $fh && AE::io $fh, 1, $wcb;
273 }
274 : sub {
275 push @rcb, pop;
276
277 $guard; # keep it alive
278
279 $wbuf .= pack "L/a*", &$f;
280 $ww ||= $fh && AE::io $fh, 1, $wcb;
281 }
282} 400}
283 401
284=item $pool->call (..., $cb->(...)) 402sub call {
403 my $self = shift;
404
405 push @{ $self->{queue} }, [@_];
406 $self->scheduler;
407}
408
409sub DESTROY {
410 $_[0]{on_destroy}->();
411}
285 412
286=back 413=back
287 414
288=head1 SEE ALSO 415=head1 SEE ALSO
289 416

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines