/cvs/AnyEvent-Fork-Pool/Pool.pm

Comparing AnyEvent-Fork-Pool/Pool.pm (file contents):
Revision 1.5 by root, Sat Apr 20 19:33:23 2013 UTC vs.
Revision 1.6 by root, Sun Apr 21 11:17:02 2013 UTC

6 6
7 use AnyEvent; 7 use AnyEvent;
8 use AnyEvent::Fork::Pool; 8 use AnyEvent::Fork::Pool;
9 # use AnyEvent::Fork is not needed 9 # use AnyEvent::Fork is not needed
10 10
11 # all parameters with default values 11 # all possible parameters shown, with default values
12 my $pool = AnyEvent::Fork 12 my $pool = AnyEvent::Fork
13 ->new 13 ->new
14 ->require ("MyWorker") 14 ->require ("MyWorker")
15 ->AnyEvent::Fork::Pool::run ( 15 ->AnyEvent::Fork::Pool::run (
16 "MyWorker::run", # the worker function 16 "MyWorker::run", # the worker function
17 17
18 # pool management 18 # pool management
19 max => 4, # absolute maximum # of processes 19 max => 4, # absolute maximum # of processes
20 idle => 2, # minimum # of idle processes 20 idle => 0, # minimum # of idle processes
21 load => 2, # queue at most this number of jobs per process 21 load => 2, # queue at most this number of jobs per process
22 start => 0.1, # wait this many seconds before starting a new process 22 start => 0.1, # wait this many seconds before starting a new process
23 stop => 1, # wait this many seconds before stopping an idle process 23 stop => 10, # wait this many seconds before stopping an idle process
24 on_destroy => (my $finish = AE::cv), # called when object is destroyed 24 on_destroy => (my $finish = AE::cv), # called when object is destroyed
25 25
26 # parameters passed to AnyEvent::Fork::RPC 26 # parameters passed to AnyEvent::Fork::RPC
27 async => 0, 27 async => 0,
28 on_error => sub { die "FATAL: $_[0]\n" }, 28 on_error => sub { die "FATAL: $_[0]\n" },
48pool of processes that handles jobs. 48pool of processes that handles jobs.
49 49
50Understanding of L<AnyEvent::Fork> is helpful but not critical to be able 50Understanding of L<AnyEvent::Fork> is helpful but not critical to be able
51to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC> 51to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC>
52is, as it defines the actual API that needs to be implemented in the 52is, as it defines the actual API that needs to be implemented in the
53children. 53worker processes.
54 54
55=head1 EXAMPLES 55=head1 EXAMPLES
56 56
57=head1 PARENT USAGE 57=head1 PARENT USAGE
58
59To create a pool, you first have to create a L<AnyEvent::Fork> object -
60this object becomes your template process. Whenever a new worker process
61is needed, it is forked from this template process. Then you need to
62"hand off" this template process to the C<AnyEvent::Fork::Pool> module by
63calling its run method on it:
64
65 my $template = AnyEvent::Fork
66 ->new
67 ->require ("SomeModule", "MyWorkerModule");
68
69 my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");
70
71The pool "object" is not a regular Perl object, but a code reference that
72you can call and that works roughly like calling the worker function
73directly, except that it returns nothing - instead, you need to specify a
74callback to be invoked once the results are in:
75
76 $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });
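The call above can be sketched end-to-end with a condvar when you want to block until the result arrives - a minimal sketch, assuming the hypothetical module and function names from the example above:

```perl
use AnyEvent;
use AnyEvent::Fork;
use AnyEvent::Fork::Pool;

my $pool = AnyEvent::Fork
   ->new
   ->require ("MyWorkerModule")
   ->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");

# queue a job and wait for its result via a condvar
my $cv = AE::cv;
$pool->(1, 2, 3, sub { $cv->send (@_) });
my @results = $cv->recv;   # blocks the main program until the callback fires
```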
58 77
59=over 4 78=over 4
60 79
61=cut 80=cut
62 81
81 100
82our $VERSION = 0.1; 101our $VERSION = 0.1;
83 102
84=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...] 103=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
85 104
86The traditional way to call it. But it is way cooler to call it in the 105The traditional way to call the pool creation function. But it is way
87following way: 106cooler to call it in the following way:
88 107
89=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...]) 108=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
90 109
91Creates a new pool object with the specified C<$function> as function 110Creates a new pool object with the specified C<$function> as function
92(name) to call for each request. The pool uses the C<$fork> object as the 111(name) to call for each request. The pool uses the C<$fork> object as the
105 124
106The pool consists of a certain number of worker processes. These options 125The pool consists of a certain number of worker processes. These options
107decide how many of these processes exist and when they are started and 126decide how many of these processes exist and when they are started and
108stopped. 127stopped.
109 128
129The worker pool is dynamically resized, according to (perceived :)
130load. The minimum size is given by the C<idle> parameter and the maximum
131size is given by the C<max> parameter. A new worker is started every
132C<start> seconds at most, and an idle worker is stopped at most every
133C<stop> seconds.
134
135You can specify the number of jobs sent to a worker concurrently using the
136C<load> parameter.
137
110=over 4 138=over 4
111 139
112=item idle => $count (default: 0) 140=item idle => $count (default: 0)
113 141
114The minimum amount of idle processes in the pool - when there are fewer 142The minimum amount of idle processes in the pool - when there are fewer
115than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new 143than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
116ones, subject to C<max> and C<start>. 144ones, subject to the limits set by C<max> and C<start>.
117 145
118This is also the initial/minimum amount of workers in the pool. The 146This is also the initial amount of workers in the pool. The default of
119default of zero means that the pool starts empty and can shrink back to 147zero means that the pool starts empty and can shrink back to zero workers
120zero workers over time. 148over time.
121 149
122=item max => $count (default: 4) 150=item max => $count (default: 4)
123 151
124The maximum number of processes in the pool, in addition to the template 152The maximum number of processes in the pool, in addition to the template
125process. C<AnyEvent::Fork::Pool> will never create more than this number 153process. C<AnyEvent::Fork::Pool> will never have more than this number of
126of worker processes, although there can be more temporarily when a worker 154worker processes, although there can be more temporarily when a worker is
127is shut down and hasn't exited yet. 155shut down and hasn't exited yet.
128 156
129=item load => $count (default: 2) 157=item load => $count (default: 2)
130 158
131The maximum number of concurrent jobs sent to a single worker 159The maximum number of concurrent jobs sent to a single worker process.
132process. Worker processes that handle this number of jobs already are
133called "busy".
134 160
135Jobs that cannot be sent to a worker immediately (because all workers are 161Jobs that cannot be sent to a worker immediately (because all workers are
136busy) will be queued until a worker is available. 162busy) will be queued until a worker is available.
137 163
164Setting this low improves latency. For example, at C<1>, every job
165is sent to a completely idle worker that doesn't run
166any other jobs. The downside is that throughput is reduced - a worker that
167finishes a job needs to wait for a new job from the parent.
168
169The default of C<2> is usually a good compromise.
170
138=item start => $seconds (default: 0.1) 171=item start => $seconds (default: 0.1)
139 172
140When a job is queued and all workers are busy, a timer is started. If the 173When there are fewer than C<idle> workers (or all workers are completely
141timer elapses and there are still jobs that cannot be queued to a worker, 174busy), then a timer is started. If the timer elapses and there are still
142a new worker is started. 175jobs that cannot be queued to a worker, a new worker is started.
143 176
144This configurs the time that all workers must be busy before a new worker 177This sets the minimum time that all workers must be busy before a new
145is started. Or, put differently, the minimum delay betwene starting new 178worker is started. Or, put differently, the minimum delay between starting
146workers. 179new workers.
147 180
148The delay is zero by default, which means new workers will be started 181The delay is small by default, which means new workers will be started
149without delay. 182relatively quickly. A delay of C<0> is possible, and ensures that the pool
183will grow as quickly as possible under load.
150 184
185Non-zero values are useful to avoid "exploding" a pool when a lot of
186jobs are queued in an instant.
187
188Higher values are often useful to improve efficiency at the cost of
189latency - when fewer processes can do the job over time, starting more and
190more is not necessarily going to help.
191
151=item stop => $seconds (default: 1) 192=item stop => $seconds (default: 10)
152 193
153When a worker has no jobs to execute it becomes idle. An idle worker that 194When a worker has no jobs to execute it becomes idle. An idle worker that
154hasn't executed a job within this amount of time will be stopped, unless 195hasn't executed a job within this amount of time will be stopped, unless
155the other parameters say otherwise. 196the other parameters say otherwise.
156 197
198Setting this to a very high value means that workers stay around longer,
199even when they have nothing to do, which can be good as they don't have to
200be started again on the next load spike.
201
202Setting this to a lower value can be useful to avoid memory or simply
203process table wastage.
204
205Usually, setting this to a time longer than the time between load spikes
206is best - if you expect a lot of requests every minute and little work
207in between, setting this to longer than a minute avoids having to stop
208and start workers. On the other hand, you have to ask yourself if letting
209workers run idle is a good use of your resources. Try to find a good
210balance between resource usage of your workers and the time to start new
211workers - L<AnyEvent::Fork> itself is fast at creating workers while not
212using much memory for them, so most of the
213overhead is likely from your own code.
214
157=item on_destroy => $callback->() (default: none) 215=item on_destroy => $callback->() (default: none)
158 216
159When a pool object goes out of scope, it will still handle all outstanding 217When a pool object goes out of scope, the outstanding requests are still
160jobs. After that, it will destroy all workers (and also the template 218handled till completion. Only after handling all jobs will the workers
161process if it isn't referenced otherwise). 219be destroyed (and also the template process if it isn't referenced
220otherwise).
221
222To find out when a pool I<really> has finished its work, you can set this
223callback, which will be called when the pool has been destroyed.
162 224
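A minimal sketch of draining a pool with this callback, assuming a template process and the hypothetical worker function from the PARENT USAGE section:

```perl
my $finish = AE::cv;

my $pool = $template->AnyEvent::Fork::Pool::run (
   "MyWorkerModule::myfunction",   # hypothetical worker function
   on_destroy => $finish,
);

$pool->($_, sub { }) for 1 .. 10;   # queue some jobs
undef $pool;    # drop the last reference - outstanding jobs still run
$finish->recv;  # returns only after all jobs are done and the workers are gone
```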
163=back 225=back
164 226
165=item Template Process 227=item AnyEvent::Fork::RPC Parameters
166 228
167The worker processes are all forked from a single template 229These parameters are all passed more or less directly to
168process. Ideally, all modules and all code used by the worker, as well as 230L<AnyEvent::Fork::RPC>. They are only briefly mentioned here, for
169any shared data structures should be loaded into the template process, to 231their full documentation please refer to the L<AnyEvent::Fork::RPC>
170take advantage of data sharing via fork. 232documentation. Also, the default values mentioned here are only documented
171 233as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.
172You can create your own template process by creating a L<AnyEvent::Fork>
173object yourself and passing it as the C<template> parameter, but
174C<AnyEvent::Fork::Pool> can create one for you, including some standard
175options.
176 234
177=over 4 235=over 4
178 236
179=item template => $fork (default: C<< AnyEvent::Fork->new >>)
180
181The template process to use, if you want to create your own.
182
183=item require => \@modules (default: C<[]>)
184
185The modules in this list will be loaded into the template process.
186
187=item eval => "# perl code to execute in template" (default: none)
188
189This is a perl string that is evaluated after creating the template
190process and after requiring the modules. It can do whatever it wants to
191configure the process, but it must not do anything that would keep a later
192fork from working (so must not create event handlers or (real) threads for
193example).
194
195=back
196
197=item AnyEvent::Fork::RPC Parameters
198
199These parameters are all passed directly to L<AnyEvent::Fork::RPC>. They
200are only briefly mentioned here, for their full documentation
201please refer to the L<AnyEvent::Fork::RPC> documentation. Also, the
202default values mentioned here are only documented as a best effort -
203L<AnyEvent::Fork::RPC> documentation is binding.
204
205=over 4
206
207=item async => $boolean (default: 0) 237=item async => $boolean (default: 0)
208 238
209Whether to sue the synchronous or asynchronous RPC backend. 239Whether to use the synchronous or asynchronous RPC backend.
210 240
211=item on_error => $callback->($message) (default: die with message) 241=item on_error => $callback->($message) (default: die with message)
212 242
213The callback to call on any (fatal) errors. 243The callback to call on any (fatal) errors.
214 244
235 265
236 my $max = $arg{max} || 4; 266 my $max = $arg{max} || 4;
237 my $idle = $arg{idle} || 0, 267 my $idle = $arg{idle} || 0,
238 my $load = $arg{load} || 2, 268 my $load = $arg{load} || 2,
239 my $start = $arg{start} || 0.1, 269 my $start = $arg{start} || 0.1,
240 my $stop = $arg{stop} || 1, 270 my $stop = $arg{stop} || 10,
241 my $on_event = $arg{on_event} || sub { }, 271 my $on_event = $arg{on_event} || sub { },
242 my $on_destroy = $arg{on_destroy}; 272 my $on_destroy = $arg{on_destroy};
243 273
244 my @rpc = ( 274 my @rpc = (
245 async => $arg{async}, 275 async => $arg{async},
258 288
259 $template 289 $template
260 ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync")) 290 ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
261 ->eval (' 291 ->eval ('
262 my ($magic0, $magic1) = @_; 292 my ($magic0, $magic1) = @_;
263 sub AnyEvent::Fork::Pool::quit() { 293 sub AnyEvent::Fork::Pool::retire() {
264 AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic1; 294 AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
265 } 295 }
266 ', $magic0, $magic1) 296 ', $magic0, $magic1)
267 ->eval ($arg{eval}); 297 ;
268 298
269 $start_worker = sub { 299 $start_worker = sub {
270 my $proc = [0, 0, undef]; # load, index, rpc 300 my $proc = [0, 0, undef]; # load, index, rpc
271 301
272 $proc->[2] = $template 302 $proc->[2] = $template
397 427
398=item $pool->(..., $cb->(...)) 428=item $pool->(..., $cb->(...))
399 429
400Call the RPC function of a worker with the given arguments, and when the 430Call the RPC function of a worker with the given arguments, and when the
401worker is done, call the C<$cb> with the results, just like calling the 431worker is done, call the C<$cb> with the results, just like calling the
402L<AnyEvent::Fork::RPC> object directly. 432RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
433details on the RPC API.
403 434
404If there is no free worker, the call will be queued. 435If there is no free worker, the call will be queued until a worker becomes
436available.
405 437
406Note that there can be considerable time between calling this method and 438Note that there can be considerable time between calling this method and
407the call actually being executed. During this time, the parameters passed 439the call actually being executed. During this time, the parameters passed
408to this function are effectively read-only - modifying them after the call 440to this function are effectively read-only - modifying them after the call
409and before the callback is invoked causes undefined behaviour. 441and before the callback is invoked causes undefined behaviour.
410 442
411=cut 443=cut
412 444
413=back 445=back
414 446
447=head1 CHILD USAGE
448
449In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
450more child-side function:
451
452=over 4
453
454=item AnyEvent::Fork::Pool::retire ()
455
456This function sends an event to the parent process to request retirement:
457the worker is removed from the pool and no new jobs will be sent to it,
458but it has to handle the jobs that are already queued.
459
460The parentheses are part of the syntax: the function usually isn't defined
461when you compile your code (because that happens I<before> handing the
462template process over to C<AnyEvent::Fork::Pool::run>), so you need the
463empty parentheses to tell Perl that the function is indeed a function.
464
465Retiring a worker can be useful to gracefully shut it down when the worker
466deems this useful. For example, after executing a job, one could check
467the process size or the number of jobs handled so far, and if either is
468too high, the worker could ask to get retired, to keep memory leaks from
469accumulating.
470
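A worker-side sketch of this pattern - the job counter, the limit of C<1000> and the C<do_the_real_work> helper are all illustrative assumptions, not part of the module:

```perl
package MyWorkerModule;

my $jobs_handled = 0;

sub myfunction {
   my (@args) = @_;

   my $result = do_the_real_work (@args);   # hypothetical helper

   # ask the parent to retire this worker after 1000 jobs, to keep
   # slow memory leaks in check - note the empty parentheses
   AnyEvent::Fork::Pool::retire ()
      if ++$jobs_handled >= 1000;

   $result
}
```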
471=back
472
473=head1 POOL PARAMETERS RECIPES
474
475This section describes some recipes for pool parameters. These are mostly
476meant for the synchronous RPC backend, as the asynchronous RPC backend
477changes the rules considerably, making workers themselves responsible for
478their scheduling.
479
480=over 4
481
482=item low latency - set load = 1
483
484If you need a deterministic low latency, you should set the C<load>
485parameter to C<1>. This ensures that no more than one job is ever sent to
486each worker. This avoids having to wait for a previous job to finish.
487
488This makes most sense with the synchronous (default) backend, as the
489asynchronous backend can handle multiple requests concurrently.
490
491=item lowest latency - set load = 1 and idle = max
492
493To achieve the lowest latency, you additionally should disable any dynamic
494resizing of the pool by setting C<idle> to the same value as C<max>.
495
496=item high throughput, cpu bound jobs - set load >= 2, max = #cpus
497
498To get high throughput with cpu-bound jobs, you should set the maximum
499pool size to the number of cpus in your system, and C<load> to at least
500C<2>, to make sure there can be another job waiting for the worker when it
501has finished one.
502
503The value of C<2> for C<load> is the minimum value that I<can> achieve
504100% throughput, but if your parent process itself is sometimes busy, you
505might need higher values. Also there is a limit on the amount of data that
506can be "in flight" to the worker, so if you send big blobs of data to your
507worker, C<load> might have much less of an effect.
508
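The cpu-bound recipe above might look like this as code - the worker function name is hypothetical, and the cpu count of C<4> is an assumption that you would normally determine at runtime:

```perl
my $pool = AnyEvent::Fork
   ->new
   ->require ("MyWorkerModule")
   ->AnyEvent::Fork::Pool::run (
      "MyWorkerModule::crunch",   # hypothetical cpu-bound worker function
      max  => 4,                  # == number of cpus in this example
      load => 2,                  # keep one job queued behind each running one
   );
```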
509=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high
510
511When your jobs are I/O bound, using more workers usually translates into
512higher throughput, depending very much on your actual workload - sometimes
513having only one worker is best, for example, when you read or write big
514files at maximum speed, as a second worker will increase seek times.
515
516=back
517
415=head1 SEE ALSO 518=head1 SEE ALSO
416 519
417L<AnyEvent::Fork>, to create the processes in the first place. 520L<AnyEvent::Fork>, to create the processes in the first place.
418 521
419L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API. 522L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.
