… | |
… | |
159 | # cannot destroy itself. |
159 | # cannot destroy itself. |
160 | my @destroy; |
160 | my @destroy; |
161 | my $manager; |
161 | my $manager; |
162 | |
162 | |
163 | $manager = new Coro sub { |
163 | $manager = new Coro sub { |
|
|
164 | $current->desc ("[coro manager]"); |
|
|
165 | |
164 | while () { |
166 | while () { |
165 | (shift @destroy)->_cancel |
167 | (shift @destroy)->_cancel |
166 | while @destroy; |
168 | while @destroy; |
167 | |
169 | |
168 | &schedule; |
170 | &schedule; |
… | |
… | |
232 | our $POOL_SIZE = 8; |
234 | our $POOL_SIZE = 8; |
233 | our @pool; |
235 | our @pool; |
234 | |
236 | |
235 | sub pool_handler { |
237 | sub pool_handler { |
236 | while () { |
238 | while () { |
|
|
239 | $current->{desc} = "[async_pool]"; |
|
|
240 | |
237 | eval { |
241 | eval { |
238 | my ($cb, @arg) = @{ delete $current->{_invoke} or return }; |
242 | my ($cb, @arg) = @{ delete $current->{_invoke} or return }; |
239 | $cb->(@arg); |
243 | $cb->(@arg); |
240 | }; |
244 | }; |
241 | warn $@ if $@; |
245 | warn $@ if $@; |
242 | |
246 | |
243 | last if @pool >= $POOL_SIZE; |
247 | last if @pool >= $POOL_SIZE; |
|
|
248 | |
244 | push @pool, $current; |
249 | push @pool, $current; |
245 | |
250 | $current->{desc} = "[async_pool idle]"; |
246 | $current->save (Coro::State::SAVE_DEF); |
251 | $current->save (Coro::State::SAVE_DEF); |
247 | $current->prio (0); |
252 | $current->prio (0); |
248 | schedule; |
253 | schedule; |
249 | } |
254 | } |
250 | } |
255 | } |
251 | |
256 | |
252 | sub async_pool(&@) { |
257 | sub async_pool(&@) { |
253 | # this is also inlined into the unlock_scheduler |
258 | # this is also inlined into the unlock_scheduler |
254 | my $coro = (pop @pool) || do { |
|
|
255 | my $coro = new Coro \&pool_handler; |
259 | my $coro = (pop @pool) || new Coro \&pool_handler;; |
256 | $coro->{desc} = "async_pool"; |
|
|
257 | $coro |
|
|
258 | }; |
|
|
259 | |
260 | |
260 | $coro->{_invoke} = [@_]; |
261 | $coro->{_invoke} = [@_]; |
261 | $coro->ready; |
262 | $coro->ready; |
262 | |
263 | |
263 | $coro |
264 | $coro |
… | |
… | |
530 | # we create a special coro because we want to cede, |
531 | # we create a special coro because we want to cede, |
531 | # to reduce pressure on the coro pool (because most callbacks |
532 | # to reduce pressure on the coro pool (because most callbacks |
532 | # return immediately and can be reused) and because we cannot cede |
533 | # return immediately and can be reused) and because we cannot cede |
533 | # inside an event callback. |
534 | # inside an event callback. |
534 | our $unblock_scheduler = async { |
535 | our $unblock_scheduler = async { |
|
|
536 | $current->desc ("[unblock_sub scheduler]"); |
535 | while () { |
537 | while () { |
536 | while (my $cb = pop @unblock_queue) { |
538 | while (my $cb = pop @unblock_queue) { |
537 | # this is an inlined copy of async_pool |
539 | # this is an inlined copy of async_pool |
538 | my $coro = (pop @pool or new Coro \&pool_handler); |
540 | my $coro = (pop @pool or new Coro \&pool_handler); |
539 | |
541 | |