ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/Coro/Coro.pm
(Generate patch)

Comparing cvsroot/Coro/Coro.pm (file contents):
Revision 1.131 by root, Thu Sep 20 12:24:42 2007 UTC vs.
Revision 1.135 by root, Sat Sep 22 22:39:15 2007 UTC

161# cannot destroy itself. 161# cannot destroy itself.
162my @destroy; 162my @destroy;
163my $manager; 163my $manager;
164 164
165$manager = new Coro sub { 165$manager = new Coro sub {
166 $current->desc ("[coro manager]");
167
168 while () { 166 while () {
169 (shift @destroy)->_cancel 167 (shift @destroy)->_cancel
170 while @destroy; 168 while @destroy;
171 169
172 &schedule; 170 &schedule;
173 } 171 }
174}; 172};
175 173$manager->desc ("[coro manager]");
176$manager->prio (PRIO_MAX); 174$manager->prio (PRIO_MAX);
177 175
178# static methods. not really. 176# static methods. not really.
179 177
180=back 178=back
226The pool size is limited to 8 idle coroutines (this can be adjusted by 224The pool size is limited to 8 idle coroutines (this can be adjusted by
227changing $Coro::POOL_SIZE), and there can be as many non-idle coros as 225changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
228required. 226required.
229 227
230If you are concerned about pooled coroutines growing a lot because a 228If you are concerned about pooled coroutines growing a lot because a
231single C<async_pool> used a lot of stackspace you can e.g. C<async_pool { 229single C<async_pool> used a lot of stackspace you can e.g. C<async_pool
232terminate }> once per second or so to slowly replenish the pool. 230{ terminate }> once per second or so to slowly replenish the pool. In
231addition to that, when the stacks used by a handler grows larger than 16kb
232(adjustable with $Coro::POOL_RSS) it will also exit.
233 233
234=cut 234=cut
235 235
236our $POOL_SIZE = 8; 236our $POOL_SIZE = 8;
237our $MAX_POOL_RSS = 64 * 1024; 237our $POOL_RSS = 16 * 1024;
238our @pool; 238our @async_pool;
239 239
240sub pool_handler { 240sub pool_handler {
241 my $cb;
242
241 while () { 243 while () {
242 $current->{desc} = "[async_pool]";
243
244 eval { 244 eval {
245 my ($cb, @arg) = @{ delete $current->{_invoke} or return }; 245 while () {
246 $cb->(@arg); 246# &{&_pool_1 or &terminate}; # crashes, would be ~5% faster
247 $cb = &_pool_1
248 or &terminate;
249 &$cb;
250 undef $cb;
251 &terminate if &_pool_2;
252 &schedule;
253 }
247 }; 254 };
255
248 warn $@ if $@; 256 warn $@ if $@;
249
250 last if @pool >= $POOL_SIZE || $current->rss >= $MAX_POOL_RSS;
251
252 push @pool, $current;
253 $current->{desc} = "[async_pool idle]";
254 $current->save (Coro::State::SAVE_DEF);
255 $current->prio (0);
256 schedule;
257 } 257 }
258} 258}
259 259
260sub async_pool(&@) { 260sub async_pool(&@) {
261 # this is also inlined into the unlock_scheduler 261 # this is also inlined into the unlock_scheduler
262 my $coro = (pop @pool) || new Coro \&pool_handler;; 262 my $coro = (pop @async_pool) || new Coro \&pool_handler;
263 263
264 $coro->{_invoke} = [@_]; 264 $coro->{_invoke} = [@_];
265 $coro->ready; 265 $coro->ready;
266 266
267 $coro 267 $coro
533 533
534# we create a special coro because we want to cede, 534# we create a special coro because we want to cede,
535# to reduce pressure on the coro pool (because most callbacks 535# to reduce pressure on the coro pool (because most callbacks
536# return immediately and can be reused) and because we cannot cede 536# return immediately and can be reused) and because we cannot cede
537# inside an event callback. 537# inside an event callback.
538our $unblock_scheduler = async { 538our $unblock_scheduler = new Coro sub {
539 $current->desc ("[unblock_sub scheduler]");
540 while () { 539 while () {
541 while (my $cb = pop @unblock_queue) { 540 while (my $cb = pop @unblock_queue) {
542 # this is an inlined copy of async_pool 541 # this is an inlined copy of async_pool
543 my $coro = (pop @pool or new Coro \&pool_handler); 542 my $coro = (pop @async_pool) || new Coro \&pool_handler;
544 543
545 $coro->{_invoke} = $cb; 544 $coro->{_invoke} = $cb;
546 $coro->ready; 545 $coro->ready;
547 cede; # for short-lived callbacks, this reduces pressure on the coro pool 546 cede; # for short-lived callbacks, this reduces pressure on the coro pool
548 } 547 }
549 schedule; # sleep well 548 schedule; # sleep well
550 } 549 }
551}; 550};
551$unblock_scheduler->desc ("[unblock_sub scheduler]");
552 552
553sub unblock_sub(&) { 553sub unblock_sub(&) {
554 my $cb = shift; 554 my $cb = shift;
555 555
556 sub { 556 sub {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines