… | |
… | |
149 | for @{ delete $self->{_on_destroy} || [] }; |
149 | for @{ delete $self->{_on_destroy} || [] }; |
150 | } |
150 | } |
151 | |
151 | |
152 | # this coroutine is necessary because a coroutine |
152 | # this coroutine is necessary because a coroutine |
153 | # cannot destroy itself. |
153 | # cannot destroy itself. |
154 | my @destroy; |
154 | our @destroy; |
155 | my $manager; |
155 | our $manager; |
156 | |
156 | |
157 | $manager = new Coro sub { |
157 | $manager = new Coro sub { |
158 | while () { |
158 | while () { |
159 | (shift @destroy)->_cancel |
159 | (shift @destroy)->_cancel |
160 | while @destroy; |
160 | while @destroy; |
… | |
… | |
247 | our $POOL_SIZE = 8; |
247 | our $POOL_SIZE = 8; |
248 | our $POOL_RSS = 16 * 1024; |
248 | our $POOL_RSS = 16 * 1024; |
249 | our @async_pool; |
249 | our @async_pool; |
250 | |
250 | |
251 | sub pool_handler { |
251 | sub pool_handler { |
252 | my $cb; |
|
|
253 | |
|
|
254 | while () { |
252 | while () { |
255 | eval { |
253 | eval { |
256 | while () { |
254 | &{&_pool_handler} while 1; |
257 | _pool_1 $cb; |
|
|
258 | &$cb; |
|
|
259 | _pool_2 $cb; |
|
|
260 | &schedule; |
|
|
261 | } |
|
|
262 | }; |
255 | }; |
263 | |
256 | |
264 | if ($@) { |
|
|
265 | last if $@ eq "\3async_pool terminate\2\n"; |
|
|
266 | warn $@; |
257 | warn $@ if $@; |
267 | } |
|
|
268 | } |
258 | } |
269 | } |
|
|
270 | |
|
|
271 | sub async_pool(&@) { |
|
|
272 | # this is also inlined into the unblock_scheduler |
|
|
273 | my $coro = (pop @async_pool) || new Coro \&pool_handler; |
|
|
274 | |
|
|
275 | $coro->{_invoke} = [@_]; |
|
|
276 | $coro->ready; |
|
|
277 | |
|
|
278 | $coro |
|
|
279 | } |
259 | } |
280 | |
260 | |
281 | =back |
261 | =back |
282 | |
262 | |
283 | =head2 STATIC METHODS |
263 | =head2 STATIC METHODS |
… | |
… | |
340 | program calls this function, there will be some one-time resource leak. |
320 | program calls this function, there will be some one-time resource leak. |
341 | |
321 | |
342 | =cut |
322 | =cut |
343 | |
323 | |
344 | sub terminate { |
324 | sub terminate { |
345 | $current->cancel (@_); |
325 | $current->{_status} = [@_]; |
|
|
326 | push @destroy, $current; |
|
|
327 | $manager->ready; |
|
|
328 | do { &schedule } while 1; |
346 | } |
329 | } |
347 | |
330 | |
348 | sub killall { |
331 | sub killall { |
349 | for (Coro::State::list) { |
332 | for (Coro::State::list) { |
350 | $_->cancel |
333 | $_->cancel |
… | |
… | |
399 | |
382 | |
400 | =cut |
383 | =cut |
401 | |
384 | |
402 | sub cancel { |
385 | sub cancel { |
403 | my $self = shift; |
386 | my $self = shift; |
404 | $self->{_status} = [@_]; |
|
|
405 | |
387 | |
406 | if ($current == $self) { |
388 | if ($current == $self) { |
407 | push @destroy, $self; |
389 | terminate @_; |
408 | $manager->ready; |
|
|
409 | &schedule while 1; |
|
|
410 | } else { |
390 | } else { |
|
|
391 | $self->{_status} = [@_]; |
411 | $self->_cancel; |
392 | $self->_cancel; |
412 | } |
393 | } |
413 | } |
394 | } |
414 | |
395 | |
415 | =item $coroutine->throw ([$scalar]) |
396 | =item $coroutine->throw ([$scalar]) |
… | |
… | |
609 | # return immediately and can be reused) and because we cannot cede |
590 | # return immediately and can be reused) and because we cannot cede |
610 | # inside an event callback. |
591 | # inside an event callback. |
611 | our $unblock_scheduler = new Coro sub { |
592 | our $unblock_scheduler = new Coro sub { |
612 | while () { |
593 | while () { |
613 | while (my $cb = pop @unblock_queue) { |
594 | while (my $cb = pop @unblock_queue) { |
614 | # this is an inlined copy of async_pool |
595 | &async_pool (@$cb); |
615 | my $coro = (pop @async_pool) || new Coro \&pool_handler; |
|
|
616 | |
596 | |
617 | $coro->{_invoke} = $cb; |
|
|
618 | $coro->ready; |
|
|
619 | cede; # for short-lived callbacks, this reduces pressure on the coro pool |
597 | # for short-lived callbacks, this reduces pressure on the coro pool |
|
|
598 | # as the chance is very high that the async_poll coro will be back |
|
|
599 | # in the idle state when cede returns |
|
|
600 | cede; |
620 | } |
601 | } |
621 | schedule; # sleep well |
602 | schedule; # sleep well |
622 | } |
603 | } |
623 | }; |
604 | }; |
624 | $unblock_scheduler->{desc} = "[unblock_sub scheduler]"; |
605 | $unblock_scheduler->{desc} = "[unblock_sub scheduler]"; |