ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/Coro/Coro.pm
(Generate patch)

Comparing cvsroot/Coro/Coro.pm (file contents):
Revision 1.130 by root, Thu Sep 20 12:02:25 2007 UTC vs.
Revision 1.139 by root, Thu Sep 27 15:52:30 2007 UTC

50 50
51our $idle; # idle handler 51our $idle; # idle handler
52our $main; # main coroutine 52our $main; # main coroutine
53our $current; # current coroutine 53our $current; # current coroutine
54 54
55our $VERSION = '3.7'; 55our $VERSION = '3.8';
56 56
57our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub); 57our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
58our %EXPORT_TAGS = ( 58our %EXPORT_TAGS = (
59 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)], 59 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
60); 60);
113reasons. If performance is not essential you are encouraged to use the 113reasons. If performance is not essential you are encouraged to use the
114C<Coro::current> function instead. 114C<Coro::current> function instead.
115 115
116=cut 116=cut
117 117
118$main->{desc} = "[main::]";
119
118# maybe some other module used Coro::Specific before... 120# maybe some other module used Coro::Specific before...
119$main->{specific} = $current->{specific} 121$main->{specific} = $current->{specific}
120 if $current; 122 if $current;
121 123
122_set_current $main; 124_set_current $main;
153 # call all destruction callbacks 155 # call all destruction callbacks
154 $_->(@{$self->{status}}) 156 $_->(@{$self->{status}})
155 for @{(delete $self->{destroy_cb}) || []}; 157 for @{(delete $self->{destroy_cb}) || []};
156} 158}
157 159
160sub _do_trace_sub {
161 &{$current->{_trace_sub_cb}}
162}
163
164sub _do_trace_line {
165 &{$current->{_trace_line_cb}}
166}
167
158# this coroutine is necessary because a coroutine 168# this coroutine is necessary because a coroutine
159# cannot destroy itself. 169# cannot destroy itself.
160my @destroy; 170my @destroy;
161my $manager; 171my $manager;
162 172
163$manager = new Coro sub { 173$manager = new Coro sub {
164 $current->desc ("[coro manager]");
165
166 while () { 174 while () {
167 (shift @destroy)->_cancel 175 (shift @destroy)->_cancel
168 while @destroy; 176 while @destroy;
169 177
170 &schedule; 178 &schedule;
171 } 179 }
172}; 180};
173 181$manager->desc ("[coro manager]");
174$manager->prio (PRIO_MAX); 182$manager->prio (PRIO_MAX);
175 183
176# static methods. not really. 184# static methods. not really.
177 185
178=back 186=back
224The pool size is limited to 8 idle coroutines (this can be adjusted by 232The pool size is limited to 8 idle coroutines (this can be adjusted by
225changing $Coro::POOL_SIZE), and there can be as many non-idle coros as 233changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
226required. 234required.
227 235
228If you are concerned about pooled coroutines growing a lot because a 236If you are concerned about pooled coroutines growing a lot because a
229single C<async_pool> used a lot of stackspace you can e.g. C<async_pool { 237single C<async_pool> used a lot of stackspace you can e.g. C<async_pool
230terminate }> once per second or so to slowly replenish the pool. 238{ terminate }> once per second or so to slowly replenish the pool. In
239addition to that, when the stacks used by a handler grows larger than 16kb
240(adjustable with $Coro::POOL_RSS) it will also exit.
231 241
232=cut 242=cut
233 243
234our $POOL_SIZE = 8; 244our $POOL_SIZE = 8;
235our $MAX_POOL_RSS = 64 * 1024; 245our $POOL_RSS = 16 * 1024;
236our @pool; 246our @async_pool;
237 247
238sub pool_handler { 248sub pool_handler {
249 my $cb;
250
239 while () { 251 while () {
240 $current->{desc} = "[async_pool]";
241
242 eval { 252 eval {
243 my ($cb, @arg) = @{ delete $current->{_invoke} or return }; 253 while () {
244 $cb->(@arg); 254 _pool_1 $cb;
255 &$cb;
256 _pool_2 $cb;
257 &schedule;
258 }
245 }; 259 };
260
261 last if $@ eq "\3terminate\2\n";
246 warn $@ if $@; 262 warn $@ if $@;
247
248 last if @pool >= $POOL_SIZE || $current->rss >= $MAX_POOL_RSS;
249
250 push @pool, $current;
251 $current->{desc} = "[async_pool idle]";
252 $current->save (Coro::State::SAVE_DEF);
253 $current->prio (0);
254 schedule;
255 } 263 }
256} 264}
257 265
258sub async_pool(&@) { 266sub async_pool(&@) {
259 # this is also inlined into the unlock_scheduler 267 # this is also inlined into the unlock_scheduler
260 my $coro = (pop @pool) || new Coro \&pool_handler;; 268 my $coro = (pop @async_pool) || new Coro \&pool_handler;
261 269
262 $coro->{_invoke} = [@_]; 270 $coro->{_invoke} = [@_];
263 $coro->ready; 271 $coro->ready;
264 272
265 $coro 273 $coro
531 539
532# we create a special coro because we want to cede, 540# we create a special coro because we want to cede,
533# to reduce pressure on the coro pool (because most callbacks 541# to reduce pressure on the coro pool (because most callbacks
534# return immediately and can be reused) and because we cannot cede 542# return immediately and can be reused) and because we cannot cede
535# inside an event callback. 543# inside an event callback.
536our $unblock_scheduler = async { 544our $unblock_scheduler = new Coro sub {
537 $current->desc ("[unblock_sub scheduler]");
538 while () { 545 while () {
539 while (my $cb = pop @unblock_queue) { 546 while (my $cb = pop @unblock_queue) {
540 # this is an inlined copy of async_pool 547 # this is an inlined copy of async_pool
541 my $coro = (pop @pool or new Coro \&pool_handler); 548 my $coro = (pop @async_pool) || new Coro \&pool_handler;
542 549
543 $coro->{_invoke} = $cb; 550 $coro->{_invoke} = $cb;
544 $coro->ready; 551 $coro->ready;
545 cede; # for short-lived callbacks, this reduces pressure on the coro pool 552 cede; # for short-lived callbacks, this reduces pressure on the coro pool
546 } 553 }
547 schedule; # sleep well 554 schedule; # sleep well
548 } 555 }
549}; 556};
557$unblock_scheduler->desc ("[unblock_sub scheduler]");
550 558
551sub unblock_sub(&) { 559sub unblock_sub(&) {
552 my $cb = shift; 560 my $cb = shift;
553 561
554 sub { 562 sub {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines