ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
(Generate patch)

Comparing Coro/Coro.pm (file contents):
Revision 1.225 by root, Wed Nov 19 15:29:57 2008 UTC vs.
Revision 1.227 by root, Thu Nov 20 03:10:30 2008 UTC

149 for @{ delete $self->{_on_destroy} || [] }; 149 for @{ delete $self->{_on_destroy} || [] };
150} 150}
151 151
152# this coroutine is necessary because a coroutine 152# this coroutine is necessary because a coroutine
153# cannot destroy itself. 153# cannot destroy itself.
154my @destroy; 154our @destroy;
155my $manager; 155our $manager;
156 156
157$manager = new Coro sub { 157$manager = new Coro sub {
158 while () { 158 while () {
159 (shift @destroy)->_cancel 159 (shift @destroy)->_cancel
160 while @destroy; 160 while @destroy;
247our $POOL_SIZE = 8; 247our $POOL_SIZE = 8;
248our $POOL_RSS = 16 * 1024; 248our $POOL_RSS = 16 * 1024;
249our @async_pool; 249our @async_pool;
250 250
251sub pool_handler { 251sub pool_handler {
252 my $cb;
253
254 while () { 252 while () {
255 eval { 253 eval {
256 while () { 254 &{&_pool_handler} while 1;
257 _pool_1 $cb;
258 &$cb;
259 _pool_2 $cb;
260 &schedule;
261 }
262 }; 255 };
263 256
264 if ($@) {
265 last if $@ eq "\3async_pool terminate\2\n";
266 warn $@; 257 warn $@ if $@;
267 }
268 } 258 }
269}
270
271sub async_pool(&@) {
272 # this is also inlined into the unblock_scheduler
273 my $coro = (pop @async_pool) || new Coro \&pool_handler;
274
275 $coro->{_invoke} = [@_];
276 $coro->ready;
277
278 $coro
279} 259}
280 260
281=back 261=back
282 262
283=head2 STATIC METHODS 263=head2 STATIC METHODS
340program calls this function, there will be some one-time resource leak. 320program calls this function, there will be some one-time resource leak.
341 321
342=cut 322=cut
343 323
344sub terminate { 324sub terminate {
345 $current->cancel (@_); 325 $current->{_status} = [@_];
326 push @destroy, $current;
327 $manager->ready;
328 do { &schedule } while 1;
346} 329}
347 330
348sub killall { 331sub killall {
349 for (Coro::State::list) { 332 for (Coro::State::list) {
350 $_->cancel 333 $_->cancel
399 382
400=cut 383=cut
401 384
402sub cancel { 385sub cancel {
403 my $self = shift; 386 my $self = shift;
404 $self->{_status} = [@_];
405 387
406 if ($current == $self) { 388 if ($current == $self) {
407 push @destroy, $self; 389 terminate @_;
408 $manager->ready;
409 &schedule while 1;
410 } else { 390 } else {
391 $self->{_status} = [@_];
411 $self->_cancel; 392 $self->_cancel;
412 } 393 }
413} 394}
414 395
415=item $coroutine->throw ([$scalar]) 396=item $coroutine->throw ([$scalar])
609# return immediately and can be reused) and because we cannot cede 590# return immediately and can be reused) and because we cannot cede
610# inside an event callback. 591# inside an event callback.
611our $unblock_scheduler = new Coro sub { 592our $unblock_scheduler = new Coro sub {
612 while () { 593 while () {
613 while (my $cb = pop @unblock_queue) { 594 while (my $cb = pop @unblock_queue) {
614 # this is an inlined copy of async_pool 595 &async_pool (@$cb);
615 my $coro = (pop @async_pool) || new Coro \&pool_handler;
616 596
617 $coro->{_invoke} = $cb;
618 $coro->ready;
619 cede; # for short-lived callbacks, this reduces pressure on the coro pool 597 # for short-lived callbacks, this reduces pressure on the coro pool
598 # as the chance is very high that the async_poll coro will be back
599 # in the idle state when cede returns
600 cede;
620 } 601 }
621 schedule; # sleep well 602 schedule; # sleep well
622 } 603 }
623}; 604};
624$unblock_scheduler->{desc} = "[unblock_sub scheduler]"; 605$unblock_scheduler->{desc} = "[unblock_sub scheduler]";

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines