ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/Coro/Coro.pm
(Generate patch)

Comparing cvsroot/Coro/Coro.pm (file contents):
Revision 1.224 by root, Wed Nov 19 05:52:42 2008 UTC vs.
Revision 1.227 by root, Thu Nov 20 03:10:30 2008 UTC

149 for @{ delete $self->{_on_destroy} || [] }; 149 for @{ delete $self->{_on_destroy} || [] };
150} 150}
151 151
152# this coroutine is necessary because a coroutine 152# this coroutine is necessary because a coroutine
153# cannot destroy itself. 153# cannot destroy itself.
154my @destroy; 154our @destroy;
155my $manager; 155our $manager;
156 156
157$manager = new Coro sub { 157$manager = new Coro sub {
158 while () { 158 while () {
159 (shift @destroy)->_cancel 159 (shift @destroy)->_cancel
160 while @destroy; 160 while @destroy;
247our $POOL_SIZE = 8; 247our $POOL_SIZE = 8;
248our $POOL_RSS = 16 * 1024; 248our $POOL_RSS = 16 * 1024;
249our @async_pool; 249our @async_pool;
250 250
251sub pool_handler { 251sub pool_handler {
252 my $cb;
253
254 while () { 252 while () {
255 eval { 253 eval {
256 while () { 254 &{&_pool_handler} while 1;
257 _pool_1 $cb;
258 &$cb;
259 _pool_2 $cb;
260 &schedule;
261 }
262 }; 255 };
263 256
264 if ($@) {
265 last if $@ eq "\3async_pool terminate\2\n";
266 warn $@; 257 warn $@ if $@;
267 }
268 } 258 }
269}
270
271sub async_pool(&@) {
272 # this is also inlined into the unblock_scheduler
273 my $coro = (pop @async_pool) || new Coro \&pool_handler;
274
275 $coro->{_invoke} = [@_];
276 $coro->ready;
277
278 $coro
279} 259}
280 260
281=back 261=back
282 262
283=head2 STATIC METHODS 263=head2 STATIC METHODS
340program calls this function, there will be some one-time resource leak. 320program calls this function, there will be some one-time resource leak.
341 321
342=cut 322=cut
343 323
344sub terminate { 324sub terminate {
345 $current->cancel (@_); 325 $current->{_status} = [@_];
326 push @destroy, $current;
327 $manager->ready;
328 do { &schedule } while 1;
346} 329}
347 330
348sub killall { 331sub killall {
349 for (Coro::State::list) { 332 for (Coro::State::list) {
350 $_->cancel 333 $_->cancel
371See C<async> and C<Coro::State::new> for additional info about the 354See C<async> and C<Coro::State::new> for additional info about the
372coroutine environment. 355coroutine environment.
373 356
374=cut 357=cut
375 358
376sub _run_coro { 359sub _terminate {
377 terminate &{+shift}; 360 terminate &{+shift};
378}
379
380sub new {
381 my $class = shift;
382
383 $class->SUPER::new (\&_run_coro, @_)
384} 361}
385 362
386=item $success = $coroutine->ready 363=item $success = $coroutine->ready
387 364
388Put the given coroutine into the end of its ready queue (there is one 365Put the given coroutine into the end of its ready queue (there is one
405 382
406=cut 383=cut
407 384
408sub cancel { 385sub cancel {
409 my $self = shift; 386 my $self = shift;
410 $self->{_status} = [@_];
411 387
412 if ($current == $self) { 388 if ($current == $self) {
413 push @destroy, $self; 389 terminate @_;
414 $manager->ready;
415 &schedule while 1;
416 } else { 390 } else {
391 $self->{_status} = [@_];
417 $self->_cancel; 392 $self->_cancel;
418 } 393 }
419} 394}
420 395
421=item $coroutine->throw ([$scalar]) 396=item $coroutine->throw ([$scalar])
615# return immediately and can be reused) and because we cannot cede 590# return immediately and can be reused) and because we cannot cede
616# inside an event callback. 591# inside an event callback.
617our $unblock_scheduler = new Coro sub { 592our $unblock_scheduler = new Coro sub {
618 while () { 593 while () {
619 while (my $cb = pop @unblock_queue) { 594 while (my $cb = pop @unblock_queue) {
620 # this is an inlined copy of async_pool 595 &async_pool (@$cb);
621 my $coro = (pop @async_pool) || new Coro \&pool_handler;
622 596
623 $coro->{_invoke} = $cb;
624 $coro->ready;
625 cede; # for short-lived callbacks, this reduces pressure on the coro pool 597 # for short-lived callbacks, this reduces pressure on the coro pool
598 # as the chance is very high that the async_poll coro will be back
599 # in the idle state when cede returns
600 cede;
626 } 601 }
627 schedule; # sleep well 602 schedule; # sleep well
628 } 603 }
629}; 604};
630$unblock_scheduler->{desc} = "[unblock_sub scheduler]"; 605$unblock_scheduler->{desc} = "[unblock_sub scheduler]";

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines