ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
(Generate patch)

Comparing Coro/Coro.pm (file contents):
Revision 1.133 by root, Fri Sep 21 01:23:58 2007 UTC vs.
Revision 1.140 by root, Thu Sep 27 16:25:10 2007 UTC

50 50
51our $idle; # idle handler 51our $idle; # idle handler
52our $main; # main coroutine 52our $main; # main coroutine
53our $current; # current coroutine 53our $current; # current coroutine
54 54
55our $VERSION = '3.7'; 55our $VERSION = '3.8';
56 56
57our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub); 57our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
58our %EXPORT_TAGS = ( 58our %EXPORT_TAGS = (
59 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)], 59 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
60); 60);
227 227
228If you are concerned about pooled coroutines growing a lot because a 228If you are concerned about pooled coroutines growing a lot because a
229single C<async_pool> used a lot of stackspace you can e.g. C<async_pool 229single C<async_pool> used a lot of stackspace you can e.g. C<async_pool
230{ terminate }> once per second or so to slowly replenish the pool. In 230{ terminate }> once per second or so to slowly replenish the pool. In
231addition to that, when the stacks used by a handler grows larger than 16kb 231addition to that, when the stacks used by a handler grows larger than 16kb
232(adjustable with $Coro::MAX_POOL_RSS) it will also exit. 232(adjustable with $Coro::POOL_RSS) it will also exit.
233 233
234=cut 234=cut
235 235
236our $POOL_SIZE = 8; 236our $POOL_SIZE = 8;
237our $MAX_POOL_RSS = 16 * 1024; 237our $POOL_RSS = 16 * 1024;
238our @pool; 238our @async_pool;
239 239
240sub pool_handler { 240sub pool_handler {
241 my $cb;
242
241 while () { 243 while () {
242 $current->{desc} = "[async_pool]";
243
244 eval { 244 eval {
245 my ($cb, @arg) = @{ delete $current->{_invoke} or return }; 245 while () {
246 $cb->(@arg); 246 _pool_1 $cb;
247 &$cb;
248 _pool_2 $cb;
249 &schedule;
250 }
247 }; 251 };
252
253 last if $@ eq "\3terminate\2\n";
248 warn $@ if $@; 254 warn $@ if $@;
249
250 last if @pool >= $POOL_SIZE || $current->rss >= $MAX_POOL_RSS;
251
252 push @pool, $current;
253 $current->{desc} = "[async_pool idle]";
254 $current->save (Coro::State::SAVE_DEF);
255 $current->prio (0);
256 schedule;
257 } 255 }
258} 256}
259 257
260sub async_pool(&@) { 258sub async_pool(&@) {
261 # this is also inlined into the unlock_scheduler 259 # this is also inlined into the unlock_scheduler
262 my $coro = (pop @pool) || new Coro \&pool_handler;; 260 my $coro = (pop @async_pool) || new Coro \&pool_handler;
263 261
264 $coro->{_invoke} = [@_]; 262 $coro->{_invoke} = [@_];
265 $coro->ready; 263 $coro->ready;
266 264
267 $coro 265 $coro
537# inside an event callback. 535# inside an event callback.
538our $unblock_scheduler = new Coro sub { 536our $unblock_scheduler = new Coro sub {
539 while () { 537 while () {
540 while (my $cb = pop @unblock_queue) { 538 while (my $cb = pop @unblock_queue) {
541 # this is an inlined copy of async_pool 539 # this is an inlined copy of async_pool
542 my $coro = (pop @pool or new Coro \&pool_handler); 540 my $coro = (pop @async_pool) || new Coro \&pool_handler;
543 541
544 $coro->{_invoke} = $cb; 542 $coro->{_invoke} = $cb;
545 $coro->ready; 543 $coro->ready;
546 cede; # for short-lived callbacks, this reduces pressure on the coro pool 544 cede; # for short-lived callbacks, this reduces pressure on the coro pool
547 } 545 }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines