ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Pool/Pool.pm
(Generate patch)

Comparing AnyEvent-Fork-Pool/Pool.pm (file contents):
Revision 1.3 by root, Sat Apr 20 16:38:10 2013 UTC vs.
Revision 1.5 by root, Sat Apr 20 19:33:23 2013 UTC

79my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P'; 79my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
80my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*'; 80my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
81 81
82our $VERSION = 0.1; 82our $VERSION = 0.1;
83 83
84=item my $rpc = AnyEvent::Fork::Pool::run $fork, $function, [key => value...] 84=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
85 85
86The traditional way to call it. But it is way cooler to call it in the 86The traditional way to call it. But it is way cooler to call it in the
87following way: 87following way:
88 88
89=item my $rpc = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...]) 89=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
90 90
91Creates a new pool object with the specified C<$function> as function 91Creates a new pool object with the specified C<$function> as function
92(name) to call for each request. The pool uses the C<$fork> object as the 92(name) to call for each request. The pool uses the C<$fork> object as the
93template when creating worker processes. 93template when creating worker processes.
94 94
103 103
104=item Pool Management 104=item Pool Management
105 105
106The pool consists of a certain number of worker processes. These options 106The pool consists of a certain number of worker processes. These options
107decide how many of these processes exist and when they are started and 107decide how many of these processes exist and when they are started and
108stopp.ed 108stopped.
109 109
110=over 4 110=over 4
111 111
112=item idle => $count (default: 0) 112=item idle => $count (default: 0)
113 113
240 my $stop = $arg{stop} || 1, 240 my $stop = $arg{stop} || 1,
241 my $on_event = $arg{on_event} || sub { }, 241 my $on_event = $arg{on_event} || sub { },
242 my $on_destroy = $arg{on_destroy}; 242 my $on_destroy = $arg{on_destroy};
243 243
244 my @rpc = ( 244 my @rpc = (
245 async => $arg{async}, 245 async => $arg{async},
246 init => $arg{init}, 246 init => $arg{init},
247 serialiser => $arg{serialiser}, 247 serialiser => delete $arg{serialiser},
248 on_error => $arg{on_error}, 248 on_error => $arg{on_error},
249 ); 249 );
250 250
251 my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown); 251 my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
252 my ($start, $stop, $want_start, $want_stop, $scheduler); 252 my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);
253 253
254 my $destroy_guard = Guard::guard { 254 my $destroy_guard = Guard::guard {
255 $on_destroy->() 255 $on_destroy->()
256 if $on_destroy; 256 if $on_destroy;
257 }; 257 };
258
259 my $busy;#d#
260 258
261 $template 259 $template
262 ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync")) 260 ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
263 ->eval (' 261 ->eval ('
264 my ($magic0, $magic1) = @_; 262 my ($magic0, $magic1) = @_;
265 sub AnyEvent::Fork::Pool::quit() { 263 sub AnyEvent::Fork::Pool::quit() {
266 AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic1; 264 AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic1;
267 } 265 }
268 ', $magic0, $magic1) 266 ', $magic0, $magic1)
269 ->eval (delete $arg{eval}); 267 ->eval ($arg{eval});
270 268
271 $start = sub { 269 $start_worker = sub {
272 my $proc = [0, 0, undef]; # load, index, rpc 270 my $proc = [0, 0, undef]; # load, index, rpc
273
274 warn "start a worker\n";#d#
275 271
276 $proc->[2] = $template 272 $proc->[2] = $template
277 ->fork 273 ->fork
278 ->AnyEvent::Fork::RPC::run ($function, 274 ->AnyEvent::Fork::RPC::run ($function,
279 @rpc, 275 @rpc,
280 on_event => sub { 276 on_event => sub {
281 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) { 277 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
282 $destroy_guard if 0; # keep it alive 278 $destroy_guard if 0; # keep it alive
283 279
284 $_[1] eq "quit" and $stop->($proc); 280 $_[1] eq "quit" and $stop_worker->($proc);
285 return; 281 return;
286 } 282 }
287 283
288 &$on_event; 284 &$on_event;
289 }, 285 },
290 ) 286 )
291 ; 287 ;
292 288
293 ++$nidle; 289 ++$nidle;
294 Array::Heap::push_heap @pool, $proc; 290 Array::Heap::push_heap_idx @pool, $proc;
295 291
296 Scalar::Util::weaken $proc; 292 Scalar::Util::weaken $proc;
297 }; 293 };
298 294
299 $stop = sub { 295 $stop_worker = sub {
300 my $proc = shift; 296 my $proc = shift;
301 297
302 $proc->[0] 298 $proc->[0]
303 or --$nidle; 299 or --$nidle;
304 300
307 }; 303 };
308 304
309 $want_start = sub { 305 $want_start = sub {
310 undef $stop_w; 306 undef $stop_w;
311 307
312 $start_w ||= AE::timer $start, 0, sub { 308 $start_w ||= AE::timer $start, $start, sub {
313 undef $start_w; 309 if (($nidle < $idle || @queue) && @pool < $max) {
314
315 if (@queue) {
316 $start->(); 310 $start_worker->();
317 $scheduler->(); 311 $scheduler->();
312 } else {
313 undef $start_w;
318 } 314 }
319 }; 315 };
320 }; 316 };
321 317
322 $want_stop = sub { 318 $want_stop = sub {
323 $stop_w ||= AE::timer $stop, 0, sub { 319 $stop_w ||= AE::timer $stop, $stop, sub {
324 undef $stop_w;
325
326 $stop->($pool[0]) 320 $stop_worker->($pool[0])
327 if $nidle; 321 if $nidle;
322
323 undef $stop_w
324 if $nidle <= $idle;
328 }; 325 };
329 }; 326 };
330 327
331 $scheduler = sub { 328 $scheduler = sub {
332 if (@queue) { 329 if (@queue) {
333 while (@queue) { 330 while (@queue) {
334 my $proc = $pool[0]; 331 my $proc = $pool[0];
335 332
336 if ($proc->[0] < $load) { 333 if ($proc->[0] < $load) {
337 warn "free $proc $proc->[0]\n";#d#
338 # found free worker 334 # found free worker, increase load
339 $proc->[0]++ 335 unless ($proc->[0]++) {
336 # worker became busy
340 or --$nidle >= $idle 337 --$nidle
338 or undef $stop_w;
339
341 or $want_start->(); 340 $want_start->()
341 if $nidle < $idle && @pool < $max;
342 }
342 343
343 Array::Heap::adjust_heap @pool, 0; 344 Array::Heap::adjust_heap_idx @pool, 0;
344 345
345 my $job = shift @queue; 346 my $job = shift @queue;
346 my $ocb = pop @$job; 347 my $ocb = pop @$job;
347 348
348 $proc->[2]->(@$job, sub { 349 $proc->[2]->(@$job, sub {
349 --$busy; warn "busy now $busy\n";#d#
350 # reduce queue counter 350 # reduce load
351 --$pool[$_][0] 351 --$proc->[0] # worker still busy?
352 or ++$nidle > $idle 352 or ++$nidle > $idle # not too many idle processes?
353 or $want_stop->(); 353 or $want_stop->();
354 354
355 Array::Heap::adjust_heap @pool, $_; 355 Array::Heap::adjust_heap_idx @pool, $proc->[1]
356 if defined $proc->[1];
356 357
357 $scheduler->(); 358 $scheduler->();
358 359
359 &$ocb; 360 &$ocb;
360 }); 361 });
361 } else { 362 } else {
362 warn "busy $proc->[0]\n";#d#
363 # all busy, delay
364
365 $want_start->(); 363 $want_start->()
364 unless @pool >= $max;
365
366 last; 366 last;
367 } 367 }
368 } 368 }
369 } elsif ($shutdown) { 369 } elsif ($shutdown) {
370 @pool = (); 370 @pool = ();
371 undef $start_w; 371 undef $start_w;
372 undef $start; # frees $destroy_guard reference 372 undef $start_worker; # frees $destroy_guard reference
373 373
374 $stop->($pool[0]) 374 $stop_worker->($pool[0])
375 while $nidle; 375 while $nidle;
376 } 376 }
377 }; 377 };
378 378
379 my $shutdown_guard = Guard::guard { 379 my $shutdown_guard = Guard::guard {
380 $shutdown = 1; 380 $shutdown = 1;
381 $scheduler->(); 381 $scheduler->();
382 }; 382 };
383 383
384 $start->() 384 $start_worker->()
385 while @pool < $idle; 385 while @pool < $idle;
386 386
387 sub { 387 sub {
388 $shutdown_guard if 0; # keep it alive 388 $shutdown_guard if 0; # keep it alive
389 389
390 ++$busy;#d#
391
392 $start->() 390 $start_worker->()
393 unless @pool; 391 unless @pool;
394 392
395 push @queue, [@_]; 393 push @queue, [@_];
396 $scheduler->(); 394 $scheduler->();
397 } 395 }
398} 396}
399 397
400=item $pool->call (..., $cb->(...)) 398=item $pool->(..., $cb->(...))
401 399
402Call the RPC function of a worker with the given arguments, and when the 400Call the RPC function of a worker with the given arguments, and when the
403worker is done, call the C<$cb> with the results, like just calling the 401worker is done, call the C<$cb> with the results, just like calling the
404L<AnyEvent::Fork::RPC> object directly. 402L<AnyEvent::Fork::RPC> object directly.
405 403
406If there is no free worker, the call will be queued. 404If there is no free worker, the call will be queued.
407 405
408Note that there can be considerable time between calling this method and 406Note that there can be considerable time between calling this method and

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines