ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
(Generate patch)

Comparing Coro/Coro.pm (file contents):
Revision 1.125 by root, Fri Apr 27 19:35:58 2007 UTC vs.
Revision 1.130 by root, Thu Sep 20 12:02:25 2007 UTC

50 50
51our $idle; # idle handler 51our $idle; # idle handler
52our $main; # main coroutine 52our $main; # main coroutine
53our $current; # current coroutine 53our $current; # current coroutine
54 54
55our $VERSION = '3.62'; 55our $VERSION = '3.7';
56 56
57our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub); 57our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
58our %EXPORT_TAGS = ( 58our %EXPORT_TAGS = (
59 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)], 59 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
60); 60);
159# cannot destroy itself. 159# cannot destroy itself.
160my @destroy; 160my @destroy;
161my $manager; 161my $manager;
162 162
163$manager = new Coro sub { 163$manager = new Coro sub {
164 $current->desc ("[coro manager]");
165
164 while () { 166 while () {
165 (shift @destroy)->_cancel 167 (shift @destroy)->_cancel
166 while @destroy; 168 while @destroy;
167 169
168 &schedule; 170 &schedule;
228terminate }> once per second or so to slowly replenish the pool. 230terminate }> once per second or so to slowly replenish the pool.
229 231
230=cut 232=cut
231 233
232our $POOL_SIZE = 8; 234our $POOL_SIZE = 8;
235our $MAX_POOL_RSS = 64 * 1024;
233our @pool; 236our @pool;
234 237
235sub pool_handler { 238sub pool_handler {
236 while () { 239 while () {
240 $current->{desc} = "[async_pool]";
241
237 eval { 242 eval {
238 my ($cb, @arg) = @{ delete $current->{_invoke} or return }; 243 my ($cb, @arg) = @{ delete $current->{_invoke} or return };
239 $cb->(@arg); 244 $cb->(@arg);
240 }; 245 };
241 warn $@ if $@; 246 warn $@ if $@;
242 247
243 last if @pool >= $POOL_SIZE; 248 last if @pool >= $POOL_SIZE || $current->rss >= $MAX_POOL_RSS;
249
244 push @pool, $current; 250 push @pool, $current;
245 251 $current->{desc} = "[async_pool idle]";
246 $current->save (Coro::State::SAVE_DEF); 252 $current->save (Coro::State::SAVE_DEF);
247 $current->prio (0); 253 $current->prio (0);
248 schedule; 254 schedule;
249 } 255 }
250} 256}
251 257
252sub async_pool(&@) { 258sub async_pool(&@) {
253 # this is also inlined into the unlock_scheduler 259 # this is also inlined into the unlock_scheduler
254 my $coro = (pop @pool or new Coro \&pool_handler); 260 my $coro = (pop @pool) || new Coro \&pool_handler;;
255 261
256 $coro->{_invoke} = [@_]; 262 $coro->{_invoke} = [@_];
257 $coro->ready; 263 $coro->ready;
258 264
259 $coro 265 $coro
526# we create a special coro because we want to cede, 532# we create a special coro because we want to cede,
527# to reduce pressure on the coro pool (because most callbacks 533# to reduce pressure on the coro pool (because most callbacks
528# return immediately and can be reused) and because we cannot cede 534# return immediately and can be reused) and because we cannot cede
529# inside an event callback. 535# inside an event callback.
530our $unblock_scheduler = async { 536our $unblock_scheduler = async {
537 $current->desc ("[unblock_sub scheduler]");
531 while () { 538 while () {
532 while (my $cb = pop @unblock_queue) { 539 while (my $cb = pop @unblock_queue) {
533 # this is an inlined copy of async_pool 540 # this is an inlined copy of async_pool
534 my $coro = (pop @pool or new Coro \&pool_handler); 541 my $coro = (pop @pool or new Coro \&pool_handler);
535 542

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines