ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
(Generate patch)

Comparing Coro/Coro.pm (file contents):
Revision 1.128 by root, Wed Sep 19 21:39:15 2007 UTC vs.
Revision 1.131 by root, Thu Sep 20 12:24:42 2007 UTC

113reasons. If performance is not essential you are encouraged to use the 113reasons. If performance is not essential you are encouraged to use the
114C<Coro::current> function instead. 114C<Coro::current> function instead.
115 115
116=cut 116=cut
117 117
118$main->{desc} = "[main::]";
119
118# maybe some other module used Coro::Specific before... 120# maybe some other module used Coro::Specific before...
119$main->{specific} = $current->{specific} 121$main->{specific} = $current->{specific}
120 if $current; 122 if $current;
121 123
122_set_current $main; 124_set_current $main;
159# cannot destroy itself. 161# cannot destroy itself.
160my @destroy; 162my @destroy;
161my $manager; 163my $manager;
162 164
163$manager = new Coro sub { 165$manager = new Coro sub {
166 $current->desc ("[coro manager]");
167
164 while () { 168 while () {
165 (shift @destroy)->_cancel 169 (shift @destroy)->_cancel
166 while @destroy; 170 while @destroy;
167 171
168 &schedule; 172 &schedule;
228terminate }> once per second or so to slowly replenish the pool. 232terminate }> once per second or so to slowly replenish the pool.
229 233
230=cut 234=cut
231 235
232our $POOL_SIZE = 8; 236our $POOL_SIZE = 8;
237our $MAX_POOL_RSS = 64 * 1024;
233our @pool; 238our @pool;
234 239
235sub pool_handler { 240sub pool_handler {
236 while () { 241 while () {
242 $current->{desc} = "[async_pool]";
243
237 eval { 244 eval {
238 my ($cb, @arg) = @{ delete $current->{_invoke} or return }; 245 my ($cb, @arg) = @{ delete $current->{_invoke} or return };
239 $cb->(@arg); 246 $cb->(@arg);
240 }; 247 };
241 warn $@ if $@; 248 warn $@ if $@;
242 249
243 last if @pool >= $POOL_SIZE; 250 last if @pool >= $POOL_SIZE || $current->rss >= $MAX_POOL_RSS;
251
244 push @pool, $current; 252 push @pool, $current;
245 253 $current->{desc} = "[async_pool idle]";
246 $current->save (Coro::State::SAVE_DEF); 254 $current->save (Coro::State::SAVE_DEF);
247 $current->prio (0); 255 $current->prio (0);
248 schedule; 256 schedule;
249 } 257 }
250} 258}
251 259
252sub async_pool(&@) { 260sub async_pool(&@) {
253 # this is also inlined into the unlock_scheduler 261 # this is also inlined into the unlock_scheduler
254 my $coro = (pop @pool) || do {
255 my $coro = new Coro \&pool_handler; 262 my $coro = (pop @pool) || new Coro \&pool_handler;;
256 $coro->{desc} = "async_pool";
257 $coro
258 };
259 263
260 $coro->{_invoke} = [@_]; 264 $coro->{_invoke} = [@_];
261 $coro->ready; 265 $coro->ready;
262 266
263 $coro 267 $coro
530# we create a special coro because we want to cede, 534# we create a special coro because we want to cede,
531# to reduce pressure on the coro pool (because most callbacks 535# to reduce pressure on the coro pool (because most callbacks
532# return immediately and can be reused) and because we cannot cede 536# return immediately and can be reused) and because we cannot cede
533# inside an event callback. 537# inside an event callback.
534our $unblock_scheduler = async { 538our $unblock_scheduler = async {
539 $current->desc ("[unblock_sub scheduler]");
535 while () { 540 while () {
536 while (my $cb = pop @unblock_queue) { 541 while (my $cb = pop @unblock_queue) {
537 # this is an inlined copy of async_pool 542 # this is an inlined copy of async_pool
538 my $coro = (pop @pool or new Coro \&pool_handler); 543 my $coro = (pop @pool or new Coro \&pool_handler);
539 544

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines