… | |
… | |
149 | for @{ delete $self->{_on_destroy} || [] }; |
149 | for @{ delete $self->{_on_destroy} || [] }; |
150 | } |
150 | } |
151 | |
151 | |
152 | # this coroutine is necessary because a coroutine |
152 | # this coroutine is necessary because a coroutine |
153 | # cannot destroy itself. |
153 | # cannot destroy itself. |
154 | my @destroy; |
154 | our @destroy; |
155 | my $manager; |
155 | our $manager; |
156 | |
156 | |
157 | $manager = new Coro sub { |
157 | $manager = new Coro sub { |
158 | while () { |
158 | while () { |
159 | (shift @destroy)->_cancel |
159 | (shift @destroy)->_cancel |
160 | while @destroy; |
160 | while @destroy; |
… | |
… | |
247 | our $POOL_SIZE = 8; |
247 | our $POOL_SIZE = 8; |
248 | our $POOL_RSS = 16 * 1024; |
248 | our $POOL_RSS = 16 * 1024; |
249 | our @async_pool; |
249 | our @async_pool; |
250 | |
250 | |
251 | sub pool_handler { |
251 | sub pool_handler { |
252 | my $cb; |
|
|
253 | |
|
|
254 | while () { |
252 | while () { |
255 | eval { |
253 | eval { |
256 | while () { |
254 | &{&_pool_handler} while 1; |
257 | _pool_1 $cb; |
|
|
258 | &$cb; |
|
|
259 | _pool_2 $cb; |
|
|
260 | &schedule; |
|
|
261 | } |
|
|
262 | }; |
255 | }; |
263 | |
256 | |
264 | if ($@) { |
|
|
265 | last if $@ eq "\3async_pool terminate\2\n"; |
|
|
266 | warn $@; |
257 | warn $@ if $@; |
267 | } |
|
|
268 | } |
258 | } |
269 | } |
|
|
270 | |
|
|
271 | sub async_pool(&@) { |
|
|
272 | # this is also inlined into the unblock_scheduler |
|
|
273 | my $coro = (pop @async_pool) || new Coro \&pool_handler; |
|
|
274 | |
|
|
275 | $coro->{_invoke} = [@_]; |
|
|
276 | $coro->ready; |
|
|
277 | |
|
|
278 | $coro |
|
|
279 | } |
259 | } |
280 | |
260 | |
281 | =back |
261 | =back |
282 | |
262 | |
283 | =head2 STATIC METHODS |
263 | =head2 STATIC METHODS |
… | |
… | |
340 | program calls this function, there will be some one-time resource leak. |
320 | program calls this function, there will be some one-time resource leak. |
341 | |
321 | |
342 | =cut |
322 | =cut |
343 | |
323 | |
344 | sub terminate { |
324 | sub terminate { |
345 | $current->cancel (@_); |
325 | $current->{_status} = [@_]; |
|
|
326 | push @destroy, $current; |
|
|
327 | $manager->ready; |
|
|
328 | do { &schedule } while 1; |
346 | } |
329 | } |
347 | |
330 | |
348 | sub killall { |
331 | sub killall { |
349 | for (Coro::State::list) { |
332 | for (Coro::State::list) { |
350 | $_->cancel |
333 | $_->cancel |
… | |
… | |
371 | See C<async> and C<Coro::State::new> for additional info about the |
354 | See C<async> and C<Coro::State::new> for additional info about the |
372 | coroutine environment. |
355 | coroutine environment. |
373 | |
356 | |
374 | =cut |
357 | =cut |
375 | |
358 | |
376 | sub _run_coro { |
359 | sub _terminate { |
377 | terminate &{+shift}; |
360 | terminate &{+shift}; |
378 | } |
|
|
379 | |
|
|
380 | sub new { |
|
|
381 | my $class = shift; |
|
|
382 | |
|
|
383 | $class->SUPER::new (\&_run_coro, @_) |
|
|
384 | } |
361 | } |
385 | |
362 | |
386 | =item $success = $coroutine->ready |
363 | =item $success = $coroutine->ready |
387 | |
364 | |
388 | Put the given coroutine into the end of its ready queue (there is one |
365 | Put the given coroutine into the end of its ready queue (there is one |
… | |
… | |
405 | |
382 | |
406 | =cut |
383 | =cut |
407 | |
384 | |
408 | sub cancel { |
385 | sub cancel { |
409 | my $self = shift; |
386 | my $self = shift; |
410 | $self->{_status} = [@_]; |
|
|
411 | |
387 | |
412 | if ($current == $self) { |
388 | if ($current == $self) { |
413 | push @destroy, $self; |
389 | terminate @_; |
414 | $manager->ready; |
|
|
415 | &schedule while 1; |
|
|
416 | } else { |
390 | } else { |
|
|
391 | $self->{_status} = [@_]; |
417 | $self->_cancel; |
392 | $self->_cancel; |
418 | } |
393 | } |
419 | } |
394 | } |
420 | |
395 | |
421 | =item $coroutine->throw ([$scalar]) |
396 | =item $coroutine->throw ([$scalar]) |
… | |
… | |
615 | # return immediately and can be reused) and because we cannot cede |
590 | # return immediately and can be reused) and because we cannot cede |
616 | # inside an event callback. |
591 | # inside an event callback. |
617 | our $unblock_scheduler = new Coro sub { |
592 | our $unblock_scheduler = new Coro sub { |
618 | while () { |
593 | while () { |
619 | while (my $cb = pop @unblock_queue) { |
594 | while (my $cb = pop @unblock_queue) { |
620 | # this is an inlined copy of async_pool |
595 | &async_pool (@$cb); |
621 | my $coro = (pop @async_pool) || new Coro \&pool_handler; |
|
|
622 | |
596 | |
623 | $coro->{_invoke} = $cb; |
|
|
624 | $coro->ready; |
|
|
625 | cede; # for short-lived callbacks, this reduces pressure on the coro pool |
597 | # for short-lived callbacks, this reduces pressure on the coro pool |
|
|
598 | # as the chance is very high that the async_poll coro will be back |
|
|
599 | # in the idle state when cede returns |
|
|
600 | cede; |
626 | } |
601 | } |
627 | schedule; # sleep well |
602 | schedule; # sleep well |
628 | } |
603 | } |
629 | }; |
604 | }; |
630 | $unblock_scheduler->{desc} = "[unblock_sub scheduler]"; |
605 | $unblock_scheduler->{desc} = "[unblock_sub scheduler]"; |