ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
(Generate patch)

Comparing Coro/Coro.pm (file contents):
Revision 1.128 by root, Wed Sep 19 21:39:15 2007 UTC vs.
Revision 1.141 by root, Tue Oct 2 10:38:17 2007 UTC

50 50
51our $idle; # idle handler 51our $idle; # idle handler
52our $main; # main coroutine 52our $main; # main coroutine
53our $current; # current coroutine 53our $current; # current coroutine
54 54
55our $VERSION = '3.7'; 55our $VERSION = '3.8';
56 56
57our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub); 57our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
58our %EXPORT_TAGS = ( 58our %EXPORT_TAGS = (
59 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)], 59 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
60); 60);
113reasons. If performance is not essential you are encouraged to use the 113reasons. If performance is not essential you are encouraged to use the
114C<Coro::current> function instead. 114C<Coro::current> function instead.
115 115
116=cut 116=cut
117 117
118$main->{desc} = "[main::]";
119
118# maybe some other module used Coro::Specific before... 120# maybe some other module used Coro::Specific before...
119$main->{specific} = $current->{specific} 121$main->{specific} = $current->{specific}
120 if $current; 122 if $current;
121 123
122_set_current $main; 124_set_current $main;
166 while @destroy; 168 while @destroy;
167 169
168 &schedule; 170 &schedule;
169 } 171 }
170}; 172};
171 173$manager->desc ("[coro manager]");
172$manager->prio (PRIO_MAX); 174$manager->prio (PRIO_MAX);
173 175
174# static methods. not really. 176# static methods. not really.
175 177
176=back 178=back
222The pool size is limited to 8 idle coroutines (this can be adjusted by 224The pool size is limited to 8 idle coroutines (this can be adjusted by
223changing $Coro::POOL_SIZE), and there can be as many non-idle coros as 225changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
224required. 226required.
225 227
226If you are concerned about pooled coroutines growing a lot because a 228If you are concerned about pooled coroutines growing a lot because a
227single C<async_pool> used a lot of stackspace you can e.g. C<async_pool { 229single C<async_pool> used a lot of stackspace you can e.g. C<async_pool
228terminate }> once per second or so to slowly replenish the pool. 230{ terminate }> once per second or so to slowly replenish the pool. In
231addition to that, when the stacks used by a handler grows larger than 16kb
232(adjustable with $Coro::POOL_RSS) it will also exit.
229 233
230=cut 234=cut
231 235
232our $POOL_SIZE = 8; 236our $POOL_SIZE = 8;
237our $POOL_RSS = 16 * 1024;
233our @pool; 238our @async_pool;
234 239
235sub pool_handler { 240sub pool_handler {
241 my $cb;
242
236 while () { 243 while () {
237 eval { 244 eval {
238 my ($cb, @arg) = @{ delete $current->{_invoke} or return }; 245 while () {
239 $cb->(@arg); 246 _pool_1 $cb;
247 &$cb;
248 _pool_2 $cb;
249 &schedule;
250 }
240 }; 251 };
252
253 last if $@ eq "\3terminate\2\n";
241 warn $@ if $@; 254 warn $@ if $@;
242
243 last if @pool >= $POOL_SIZE;
244 push @pool, $current;
245
246 $current->save (Coro::State::SAVE_DEF);
247 $current->prio (0);
248 schedule;
249 } 255 }
250} 256}
251 257
252sub async_pool(&@) { 258sub async_pool(&@) {
253 # this is also inlined into the unlock_scheduler 259 # this is also inlined into the unlock_scheduler
254 my $coro = (pop @pool) || do {
255 my $coro = new Coro \&pool_handler; 260 my $coro = (pop @async_pool) || new Coro \&pool_handler;
256 $coro->{desc} = "async_pool";
257 $coro
258 };
259 261
260 $coro->{_invoke} = [@_]; 262 $coro->{_invoke} = [@_];
261 $coro->ready; 263 $coro->ready;
262 264
263 $coro 265 $coro
306 308
307=item terminate [arg...] 309=item terminate [arg...]
308 310
309Terminates the current coroutine with the given status values (see L<cancel>). 311Terminates the current coroutine with the given status values (see L<cancel>).
310 312
313=item killall
314
315Kills/terminates/cancels all coroutines except the currently running
316one. This is useful after a fork, either in the child or the parent, as
317usually only one of them should inherit the running coroutines.
318
311=cut 319=cut
312 320
313sub terminate { 321sub terminate {
314 $current->cancel (@_); 322 $current->cancel (@_);
323}
324
325sub killall {
326 for (Coro::State::list) {
327 $_->cancel
328 if $_ != $current && UNIVERSAL::isa $_, "Coro";
329 }
315} 330}
316 331
317=back 332=back
318 333
319# dynamic methods 334# dynamic methods
529 544
530# we create a special coro because we want to cede, 545# we create a special coro because we want to cede,
531# to reduce pressure on the coro pool (because most callbacks 546# to reduce pressure on the coro pool (because most callbacks
532# return immediately and can be reused) and because we cannot cede 547# return immediately and can be reused) and because we cannot cede
533# inside an event callback. 548# inside an event callback.
534our $unblock_scheduler = async { 549our $unblock_scheduler = new Coro sub {
535 while () { 550 while () {
536 while (my $cb = pop @unblock_queue) { 551 while (my $cb = pop @unblock_queue) {
537 # this is an inlined copy of async_pool 552 # this is an inlined copy of async_pool
538 my $coro = (pop @pool or new Coro \&pool_handler); 553 my $coro = (pop @async_pool) || new Coro \&pool_handler;
539 554
540 $coro->{_invoke} = $cb; 555 $coro->{_invoke} = $cb;
541 $coro->ready; 556 $coro->ready;
542 cede; # for short-lived callbacks, this reduces pressure on the coro pool 557 cede; # for short-lived callbacks, this reduces pressure on the coro pool
543 } 558 }
544 schedule; # sleep well 559 schedule; # sleep well
545 } 560 }
546}; 561};
562$unblock_scheduler->desc ("[unblock_sub scheduler]");
547 563
548sub unblock_sub(&) { 564sub unblock_sub(&) {
549 my $cb = shift; 565 my $cb = shift;
550 566
551 sub { 567 sub {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines