--- Coro/Coro.pm 2007/01/04 23:49:27 1.104 +++ Coro/Coro.pm 2007/03/04 11:46:51 1.116 @@ -52,9 +52,9 @@ our $main; # main coroutine our $current; # current coroutine -our $VERSION = '3.3'; +our $VERSION = '3.51'; -our @EXPORT = qw(async cede schedule terminate current unblock_sub); +our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub); our %EXPORT_TAGS = ( prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)], ); @@ -205,6 +205,60 @@ $coro } +=item async_pool { ... } [@args...] + +Similar to C, but uses a coroutine pool, so you should not call +terminate or join (although you are allowed to), and you get a coroutine +that might have executed other code already (which can be good or bad :). + +Also, the block is executed in an C context and a warning will be +issued in case of an exception instead of terminating the program, as +C does. As the coroutine is being reused, stuff like C +will not work in the expected way, unless you call terminate or cancel, +which somehow defeats the purpose of pooling. + +The priority will be reset to C<0> after each job, otherwise the coroutine +will be re-used "as-is". + +The pool size is limited to 8 idle coroutines (this can be adjusted by +changing $Coro::POOL_SIZE), and there can be as many non-idle coros as +required. + +If you are concerned about pooled coroutines growing a lot because a +single C used a lot of stackspace you can e.g. C once per second or so to slowly replenish the pool. + +=cut + +our $POOL_SIZE = 8; +our @pool; + +sub pool_handler { + while () { + eval { + my ($cb, @arg) = @{ delete $current->{_invoke} or return }; + $cb->(@arg); + }; + warn $@ if $@; + + last if @pool >= $POOL_SIZE; + push @pool, $current; + + $current->prio (0); + schedule; + } +} + +sub async_pool(&@) { + # this is also inlined into the unlock_scheduler + my $coro = (pop @pool or new Coro \&pool_handler); + + $coro->{_invoke} = [@_]; + $coro->ready; + + $coro +} + =item schedule Calls the scheduler. Please note that the current coroutine will not be put @@ -237,11 +291,15 @@ ready queue and calls C, which has the effect of giving up the current "timeslice" to other coroutines of the same or higher priority. +Returns true if at least one coroutine switch has happened. + =item Coro::cede_notself Works like cede, but is not exported by default and will cede to any coroutine, regardless of priority, once. +Returns true if at least one coroutine switch has happened. + =item terminate [arg...] Terminates the current coroutine with the given status values (see L). @@ -463,31 +521,23 @@ =cut -our @unblock_pool; our @unblock_queue; -our $UNBLOCK_POOL_SIZE = 2; - -sub unblock_handler_ { - while () { - my ($cb, @arg) = @{ delete $Coro::current->{arg} }; - $cb->(@arg); - - last if @unblock_pool >= $UNBLOCK_POOL_SIZE; - push @unblock_pool, $Coro::current; - schedule; - } -} +# we create a special coro because we want to cede, +# to reduce pressure on the coro pool (because most callbacks +# return immediately and can be reused) and because we cannot cede +# inside an event callback. our $unblock_scheduler = async { while () { while (my $cb = pop @unblock_queue) { - my $handler = (pop @unblock_pool or new Coro \&unblock_handler_); - $handler->{arg} = $cb; - $handler->ready; - cede; - } + # this is an inlined copy of async_pool + my $coro = (pop @pool or new Coro \&pool_handler); - schedule; + $coro->{_invoke} = $cb; + $coro->ready; + cede; # for short-lived callbacks, this reduces pressure on the coro pool + } + schedule; # sleep well } }; @@ -495,7 +545,7 @@ my $cb = shift; sub { - push @unblock_queue, [$cb, @_]; + unshift @unblock_queue, [$cb, @_]; $unblock_scheduler->ready; } }