--- Coro/Coro.pm 2006/12/29 08:36:34 1.101 +++ Coro/Coro.pm 2007/10/02 23:16:24 1.142 @@ -22,8 +22,8 @@ This module collection manages coroutines. Coroutines are similar to threads but don't run in parallel at the same time even on SMP -machines. The specific flavor of coroutine use din this module also -guarentees you that it will not switch between coroutines unless +machines. The specific flavor of coroutine used in this module also +guarantees you that it will not switch between coroutines unless necessary, at easily-identified points in your program, so locking and parallel access are rarely an issue, making coroutine programming much safer than threads programming. @@ -52,9 +52,9 @@ our $main; # main coroutine our $current; # current coroutine -our $VERSION = '3.3'; +our $VERSION = '3.8'; -our @EXPORT = qw(async cede schedule terminate current unblock_sub); +our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub); our %EXPORT_TAGS = ( prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)], ); @@ -110,13 +110,15 @@ is C<$main> (of course). This variable is B I. It is provided for performance -reasons. If performance is not essentiel you are encouraged to use the +reasons. If performance is not essential you are encouraged to use the C function instead. =cut +$main->{desc} = "[main::]"; + # maybe some other module used Coro::Specific before... -$main->{specific} = $current->{specific} +$main->{_specific} = $current->{_specific} if $current; _set_current $main; @@ -143,33 +145,33 @@ Carp::croak ("FATAL: deadlock detected"); }; +sub _cancel { + my ($self) = @_; + + # free coroutine data and mark as destructed + $self->_destroy + or return; + + # call all destruction callbacks + $_->(@{$self->{_status}}) + for @{(delete $self->{_on_destroy}) || []}; +} + # this coroutine is necessary because a coroutine # cannot destroy itself. my @destroy; -my $manager; $manager = new Coro sub { +my $manager; + +$manager = new Coro sub { while () { - # by overwriting the state object with the manager we destroy it - # while still being able to schedule this coroutine (in case it has - # been readied multiple times. this is harmless since the manager - # can be called as many times as neccessary and will always - # remove itself from the runqueue - while (@destroy) { - my $coro = pop @destroy; - - $coro->{status} ||= []; - - $_->ready for @{(delete $coro->{join} ) || []}; - $_->(@{$coro->{status}}) for @{(delete $coro->{destroy_cb}) || []}; - - # the next line destroys the coro state, but keeps the - # coroutine itself intact (we basically make it a zombie - # coroutine that always runs the manager thread, so it's possible - # to transfer() to this coroutine). - $coro->_clone_state_from ($manager); - } + (shift @destroy)->_cancel + while @destroy; + &schedule; } }; +$manager->desc ("[coro manager]"); +$manager->prio (PRIO_MAX); # static methods. not really. @@ -187,10 +189,9 @@ (usually unused). When the sub returns the new coroutine is automatically terminated. -Calling C in a coroutine will not work correctly, so do not do that. - -When the coroutine dies, the program will exit, just as in the main -program. +Calling C in a coroutine will do the same as calling exit outside +the coroutine. Likewise, when the coroutine dies, the program will exit, +just as it would in the main program. # create a new coroutine that just prints its arguments async { @@ -200,9 +201,68 @@ =cut sub async(&@) { - my $pid = new Coro @_; - $pid->ready; - $pid + my $coro = new Coro @_; + $coro->ready; + $coro +} + +=item async_pool { ... } [@args...] + +Similar to C, but uses a coroutine pool, so you should not call +terminate or join (although you are allowed to), and you get a coroutine +that might have executed other code already (which can be good or bad :). + +Also, the block is executed in an C context and a warning will be +issued in case of an exception instead of terminating the program, as +C does. As the coroutine is being reused, stuff like C +will not work in the expected way, unless you call terminate or cancel, +which somehow defeats the purpose of pooling. + +The priority will be reset to C<0> after each job, otherwise the coroutine +will be re-used "as-is". + +The pool size is limited to 8 idle coroutines (this can be adjusted by +changing $Coro::POOL_SIZE), and there can be as many non-idle coros as +required. + +If you are concerned about pooled coroutines growing a lot because a +single C used a lot of stackspace you can e.g. C once per second or so to slowly replenish the pool. In +addition to that, when the stacks used by a handler grows larger than 16kb +(adjustable with $Coro::POOL_RSS) it will also exit. + +=cut + +our $POOL_SIZE = 8; +our $POOL_RSS = 16 * 1024; +our @async_pool; + +sub pool_handler { + my $cb; + + while () { + eval { + while () { + _pool_1 $cb; + &$cb; + _pool_2 $cb; + &schedule; + } + }; + + last if $@ eq "\3terminate\2\n"; + warn $@ if $@; + } +} + +sub async_pool(&@) { + # this is also inlined into the unlock_scheduler + my $coro = (pop @async_pool) || new Coro \&pool_handler; + + $coro->{_invoke} = [@_]; + $coro->ready; + + $coro } =item schedule @@ -225,7 +285,7 @@ undef $current; }; - # call schedule until event occured. + # call schedule until event occurred. # in case we are woken up for other reasons # (current still defined), loop. Coro::schedule while $current; @@ -237,16 +297,38 @@ ready queue and calls C, which has the effect of giving up the current "timeslice" to other coroutines of the same or higher priority. +Returns true if at least one coroutine switch has happened. + +=item Coro::cede_notself + +Works like cede, but is not exported by default and will cede to any +coroutine, regardless of priority, once. + +Returns true if at least one coroutine switch has happened. + =item terminate [arg...] Terminates the current coroutine with the given status values (see L). +=item killall + +Kills/terminates/cancels all coroutines except the currently running +one. This is useful after a fork, either in the child or the parent, as +usually only one of them should inherit the running coroutines. + =cut sub terminate { $current->cancel (@_); } +sub killall { + for (Coro::State::list) { + $_->cancel + if $_ != $current && UNIVERSAL::isa $_, "Coro"; + } +} + =back # dynamic methods @@ -264,7 +346,7 @@ called. To make the coroutine run you must first put it into the ready queue by calling the ready method. -Calling C in a coroutine will not work correctly, so do not do that. +See C for additional discussion. =cut @@ -291,16 +373,22 @@ =item $coroutine->cancel (arg...) Terminates the given coroutine and makes it return the given arguments as -status (default: the empty list). +status (default: the empty list). Never returns if the coroutine is the +current coroutine. =cut sub cancel { my $self = shift; - $self->{status} = [@_]; - push @destroy, $self; - $manager->ready; - &schedule if $current == $self; + $self->{_status} = [@_]; + + if ($current == $self) { + push @destroy, $self; + $manager->ready; + &schedule while 1; + } else { + $self->_cancel; + } } =item $coroutine->join @@ -313,11 +401,19 @@ sub join { my $self = shift; - unless ($self->{status}) { - push @{$self->{join}}, $current; - &schedule; + + unless ($self->{_status}) { + my $current = $current; + + push @{$self->{_on_destroy}}, sub { + $current->ready; + undef $current; + }; + + &schedule while $current; } - wantarray ? @{$self->{status}} : $self->{status}[0]; + + wantarray ? @{$self->{_status}} : $self->{_status}[0]; } =item $coroutine->on_destroy (\&cb) @@ -331,7 +427,7 @@ sub on_destroy { my ($self, $cb) = @_; - push @{ $self->{destroy_cb} }, $cb; + push @{ $self->{_on_destroy} }, $cb; } =item $oldprio = $coroutine->prio ($newprio) @@ -366,6 +462,9 @@ Sets (or gets in case the argument is missing) the description for this coroutine. This is just a free-form string you can associate with a coroutine. +This method simply sets the C<< $coroutine->{desc} >> member to the given string. You +can modify this member directly if you wish. + =cut sub desc { @@ -383,11 +482,45 @@ =item Coro::nready Returns the number of coroutines that are currently in the ready state, -i.e. that can be swicthed to. The value C<0> means that the only runnable +i.e. that can be switched to. The value C<0> means that the only runnable coroutine is the currently running one, so C would have no effect, and C would cause a deadlock unless there is an idle handler that wakes up some coroutines. +=item my $guard = Coro::guard { ... } + +This creates and returns a guard object. Nothing happens until the object +gets destroyed, in which case the codeblock given as argument will be +executed. This is useful to free locks or other resources in case of a +runtime error or when the coroutine gets canceled, as in both cases the +guard block will be executed. The guard object supports only one method, +C<< ->cancel >>, which will keep the codeblock from being executed. + +Example: set some flag and clear it again when the coroutine gets canceled +or the function returns: + + sub do_something { + my $guard = Coro::guard { $busy = 0 }; + $busy = 1; + + # do something that requires $busy to be true + } + +=cut + +sub guard(&) { + bless \(my $cb = $_[0]), "Coro::guard" +} + +sub Coro::guard::cancel { + ${$_[0]} = sub { }; +} + +sub Coro::guard::DESTROY { + ${$_[0]}->(); +} + + =item unblock_sub { ... } This utility function takes a BLOCK or code reference and "unblocks" it, @@ -395,7 +528,7 @@ immediately without blocking, returning nothing, while the original code ref will be called (with parameters) from within its own coroutine. -The reason this fucntion exists is that many event libraries (such as the +The reason this function exists is that many event libraries (such as the venerable L module) are not coroutine-safe (a weaker form of thread-safety). This means you must not block within event callbacks, otherwise you might suffer from crashes or worse. @@ -410,39 +543,32 @@ =cut -our @unblock_pool; our @unblock_queue; -our $UNBLOCK_POOL_SIZE = 2; - -sub unblock_handler_ { - while () { - my ($cb, @arg) = @{ delete $Coro::current->{arg} }; - $cb->(@arg); - - last if @unblock_pool >= $UNBLOCK_POOL_SIZE; - push @unblock_pool, $Coro::current; - schedule; - } -} -our $unblock_scheduler = async { +# we create a special coro because we want to cede, +# to reduce pressure on the coro pool (because most callbacks +# return immediately and can be reused) and because we cannot cede +# inside an event callback. +our $unblock_scheduler = new Coro sub { while () { while (my $cb = pop @unblock_queue) { - my $handler = (pop @unblock_pool or new Coro \&unblock_handler_); - $handler->{arg} = $cb; - $handler->ready; - cede; - } + # this is an inlined copy of async_pool + my $coro = (pop @async_pool) || new Coro \&pool_handler; - schedule; + $coro->{_invoke} = $cb; + $coro->ready; + cede; # for short-lived callbacks, this reduces pressure on the coro pool + } + schedule; # sleep well } }; +$unblock_scheduler->desc ("[unblock_sub scheduler]"); sub unblock_sub(&) { my $cb = shift; sub { - push @unblock_queue, [$cb, @_]; + unshift @unblock_queue, [$cb, @_]; $unblock_scheduler->ready; } } @@ -459,7 +585,7 @@ destruction. very bad things might happen otherwise (usually segfaults). - this module is not thread-safe. You should only ever use this module - from the same thread (this requirement might be losened in the future + from the same thread (this requirement might be loosened in the future to allow per-thread schedulers, but Coro::State does not yet allow this).