ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Pool/Pool.pm
(Generate patch)

Comparing AnyEvent-Fork-Pool/Pool.pm (file contents):
Revision 1.4 by root, Sat Apr 20 16:39:14 2013 UTC vs.
Revision 1.5 by root, Sat Apr 20 19:33:23 2013 UTC

79my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P'; 79my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
80my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*'; 80my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
81 81
82our $VERSION = 0.1; 82our $VERSION = 0.1;
83 83
84=item my $rpc = AnyEvent::Fork::Pool::run $fork, $function, [key => value...] 84=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
85 85
86The traditional way to call it. But it is way cooler to call it in the 86The traditional way to call it. But it is way cooler to call it in the
87following way: 87following way:
88 88
89=item my $rpc = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...]) 89=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
90 90
91Creates a new pool object with the specified C<$function> as function 91Creates a new pool object with the specified C<$function> as function
92(name) to call for each request. The pool uses the C<$fork> object as the 92(name) to call for each request. The pool uses the C<$fork> object as the
93template when creating worker processes. 93template when creating worker processes.
94 94
103 103
104=item Pool Management 104=item Pool Management
105 105
106The pool consists of a certain number of worker processes. These options 106The pool consists of a certain number of worker processes. These options
107decide how many of these processes exist and when they are started and 107decide how many of these processes exist and when they are started and
108stopp.ed 108stopped.
109 109
110=over 4 110=over 4
111 111
112=item idle => $count (default: 0) 112=item idle => $count (default: 0)
113 113
240 my $stop = $arg{stop} || 1, 240 my $stop = $arg{stop} || 1,
241 my $on_event = $arg{on_event} || sub { }, 241 my $on_event = $arg{on_event} || sub { },
242 my $on_destroy = $arg{on_destroy}; 242 my $on_destroy = $arg{on_destroy};
243 243
244 my @rpc = ( 244 my @rpc = (
245 async => $arg{async}, 245 async => $arg{async},
246 init => $arg{init}, 246 init => $arg{init},
247 serialiser => $arg{serialiser}, 247 serialiser => delete $arg{serialiser},
248 on_error => $arg{on_error}, 248 on_error => $arg{on_error},
249 ); 249 );
250 250
251 my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown); 251 my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
252 my ($start, $stop, $want_start, $want_stop, $scheduler); 252 my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);
253 253
254 my $destroy_guard = Guard::guard { 254 my $destroy_guard = Guard::guard {
255 $on_destroy->() 255 $on_destroy->()
256 if $on_destroy; 256 if $on_destroy;
257 }; 257 };
262 my ($magic0, $magic1) = @_; 262 my ($magic0, $magic1) = @_;
263 sub AnyEvent::Fork::Pool::quit() { 263 sub AnyEvent::Fork::Pool::quit() {
264 AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic1; 264 AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic1;
265 } 265 }
266 ', $magic0, $magic1) 266 ', $magic0, $magic1)
267 ->eval (delete $arg{eval}); 267 ->eval ($arg{eval});
268 268
269 $start = sub { 269 $start_worker = sub {
270 my $proc = [0, 0, undef]; # load, index, rpc 270 my $proc = [0, 0, undef]; # load, index, rpc
271
272 warn "start a worker\n";#d#
273 271
274 $proc->[2] = $template 272 $proc->[2] = $template
275 ->fork 273 ->fork
276 ->AnyEvent::Fork::RPC::run ($function, 274 ->AnyEvent::Fork::RPC::run ($function,
277 @rpc, 275 @rpc,
278 on_event => sub { 276 on_event => sub {
279 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) { 277 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
280 $destroy_guard if 0; # keep it alive 278 $destroy_guard if 0; # keep it alive
281 279
282 $_[1] eq "quit" and $stop->($proc); 280 $_[1] eq "quit" and $stop_worker->($proc);
283 return; 281 return;
284 } 282 }
285 283
286 &$on_event; 284 &$on_event;
287 }, 285 },
288 ) 286 )
289 ; 287 ;
290 288
291 ++$nidle; 289 ++$nidle;
292 Array::Heap::push_heap @pool, $proc; 290 Array::Heap::push_heap_idx @pool, $proc;
293 291
294 Scalar::Util::weaken $proc; 292 Scalar::Util::weaken $proc;
295 }; 293 };
296 294
297 $stop = sub { 295 $stop_worker = sub {
298 my $proc = shift; 296 my $proc = shift;
299 297
300 $proc->[0] 298 $proc->[0]
301 or --$nidle; 299 or --$nidle;
302 300
305 }; 303 };
306 304
307 $want_start = sub { 305 $want_start = sub {
308 undef $stop_w; 306 undef $stop_w;
309 307
310 $start_w ||= AE::timer $start, 0, sub { 308 $start_w ||= AE::timer $start, $start, sub {
311 undef $start_w; 309 if (($nidle < $idle || @queue) && @pool < $max) {
312
313 if (@queue) {
314 $start->(); 310 $start_worker->();
315 $scheduler->(); 311 $scheduler->();
312 } else {
313 undef $start_w;
316 } 314 }
317 }; 315 };
318 }; 316 };
319 317
320 $want_stop = sub { 318 $want_stop = sub {
321 $stop_w ||= AE::timer $stop, 0, sub { 319 $stop_w ||= AE::timer $stop, $stop, sub {
322 undef $stop_w;
323
324 $stop->($pool[0]) 320 $stop_worker->($pool[0])
325 if $nidle; 321 if $nidle;
322
323 undef $stop_w
324 if $nidle <= $idle;
326 }; 325 };
327 }; 326 };
328 327
329 $scheduler = sub { 328 $scheduler = sub {
330 if (@queue) { 329 if (@queue) {
331 while (@queue) { 330 while (@queue) {
332 my $proc = $pool[0]; 331 my $proc = $pool[0];
333 332
334 if ($proc->[0] < $load) { 333 if ($proc->[0] < $load) {
335 warn "free $proc $proc->[0]\n";#d#
336 # found free worker 334 # found free worker, increase load
337 $proc->[0]++ 335 unless ($proc->[0]++) {
336 # worker became busy
338 or --$nidle >= $idle 337 --$nidle
338 or undef $stop_w;
339
339 or $want_start->(); 340 $want_start->()
341 if $nidle < $idle && @pool < $max;
342 }
340 343
341 Array::Heap::adjust_heap @pool, 0; 344 Array::Heap::adjust_heap_idx @pool, 0;
342 345
343 my $job = shift @queue; 346 my $job = shift @queue;
344 my $ocb = pop @$job; 347 my $ocb = pop @$job;
345 348
346 $proc->[2]->(@$job, sub { 349 $proc->[2]->(@$job, sub {
347 # reduce queue counter 350 # reduce load
348 --$pool[$_][0] 351 --$proc->[0] # worker still busy?
349 or ++$nidle > $idle 352 or ++$nidle > $idle # not too many idle processes?
350 or $want_stop->(); 353 or $want_stop->();
351 354
352 Array::Heap::adjust_heap @pool, $_; 355 Array::Heap::adjust_heap_idx @pool, $proc->[1]
356 if defined $proc->[1];
353 357
354 $scheduler->(); 358 $scheduler->();
355 359
356 &$ocb; 360 &$ocb;
357 }); 361 });
358 } else { 362 } else {
359 warn "busy $proc->[0]\n";#d#
360 # all busy, delay
361
362 $want_start->(); 363 $want_start->()
364 unless @pool >= $max;
365
363 last; 366 last;
364 } 367 }
365 } 368 }
366 } elsif ($shutdown) { 369 } elsif ($shutdown) {
367 @pool = (); 370 @pool = ();
368 undef $start_w; 371 undef $start_w;
369 undef $start; # frees $destroy_guard reference 372 undef $start_worker; # frees $destroy_guard reference
370 373
371 $stop->($pool[0]) 374 $stop_worker->($pool[0])
372 while $nidle; 375 while $nidle;
373 } 376 }
374 }; 377 };
375 378
376 my $shutdown_guard = Guard::guard { 379 my $shutdown_guard = Guard::guard {
377 $shutdown = 1; 380 $shutdown = 1;
378 $scheduler->(); 381 $scheduler->();
379 }; 382 };
380 383
381 $start->() 384 $start_worker->()
382 while @pool < $idle; 385 while @pool < $idle;
383 386
384 sub { 387 sub {
385 $shutdown_guard if 0; # keep it alive 388 $shutdown_guard if 0; # keep it alive
386 389
387 $start->() 390 $start_worker->()
388 unless @pool; 391 unless @pool;
389 392
390 push @queue, [@_]; 393 push @queue, [@_];
391 $scheduler->(); 394 $scheduler->();
392 } 395 }
393} 396}
394 397
395=item $pool->call (..., $cb->(...)) 398=item $pool->(..., $cb->(...))
396 399
397Call the RPC function of a worker with the given arguments, and when the 400Call the RPC function of a worker with the given arguments, and when the
398worker is done, call the C<$cb> with the results, like just calling the 401worker is done, call the C<$cb> with the results, just like calling the
399L<AnyEvent::Fork::RPC> object directly. 402L<AnyEvent::Fork::RPC> object directly.
400 403
401If there is no free worker, the call will be queued. 404If there is no free worker, the call will be queued.
402 405
403Note that there can be considerable time between calling this method and 406Note that there can be considerable time between calling this method and

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines