ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
(Generate patch)

Comparing Coro/Coro.pm (file contents):
Revision 1.131 by root, Thu Sep 20 12:24:42 2007 UTC vs.
Revision 1.145 by root, Wed Oct 3 16:03:17 2007 UTC

50 50
51our $idle; # idle handler 51our $idle; # idle handler
52our $main; # main coroutine 52our $main; # main coroutine
53our $current; # current coroutine 53our $current; # current coroutine
54 54
55our $VERSION = '3.7'; 55our $VERSION = '4.0';
56 56
57our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub); 57our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
58our %EXPORT_TAGS = ( 58our %EXPORT_TAGS = (
59 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)], 59 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
60); 60);
116=cut 116=cut
117 117
118$main->{desc} = "[main::]"; 118$main->{desc} = "[main::]";
119 119
120# maybe some other module used Coro::Specific before... 120# maybe some other module used Coro::Specific before...
121$main->{specific} = $current->{specific} 121$main->{_specific} = $current->{_specific}
122 if $current; 122 if $current;
123 123
124_set_current $main; 124_set_current $main;
125 125
126sub current() { $current } 126sub current() { $current }
151 # free coroutine data and mark as destructed 151 # free coroutine data and mark as destructed
152 $self->_destroy 152 $self->_destroy
153 or return; 153 or return;
154 154
155 # call all destruction callbacks 155 # call all destruction callbacks
156 $_->(@{$self->{status}}) 156 $_->(@{$self->{_status}})
157 for @{(delete $self->{destroy_cb}) || []}; 157 for @{(delete $self->{_on_destroy}) || []};
158} 158}
159 159
160# this coroutine is necessary because a coroutine 160# this coroutine is necessary because a coroutine
161# cannot destroy itself. 161# cannot destroy itself.
162my @destroy; 162my @destroy;
163my $manager; 163my $manager;
164 164
165$manager = new Coro sub { 165$manager = new Coro sub {
166 $current->desc ("[coro manager]");
167
168 while () { 166 while () {
169 (shift @destroy)->_cancel 167 (shift @destroy)->_cancel
170 while @destroy; 168 while @destroy;
171 169
172 &schedule; 170 &schedule;
173 } 171 }
174}; 172};
175 173$manager->desc ("[coro manager]");
176$manager->prio (PRIO_MAX); 174$manager->prio (PRIO_MAX);
177 175
178# static methods. not really. 176# static methods. not really.
179 177
180=back 178=back
188=item async { ... } [@args...] 186=item async { ... } [@args...]
189 187
190Create a new asynchronous coroutine and return it's coroutine object 188Create a new asynchronous coroutine and return it's coroutine object
191(usually unused). When the sub returns the new coroutine is automatically 189(usually unused). When the sub returns the new coroutine is automatically
192terminated. 190terminated.
191
192See the C<Coro::State::new> constructor for info about the coroutine
193environment.
193 194
194Calling C<exit> in a coroutine will do the same as calling exit outside 195Calling C<exit> in a coroutine will do the same as calling exit outside
195the coroutine. Likewise, when the coroutine dies, the program will exit, 196the coroutine. Likewise, when the coroutine dies, the program will exit,
196just as it would in the main program. 197just as it would in the main program.
197 198
226The pool size is limited to 8 idle coroutines (this can be adjusted by 227The pool size is limited to 8 idle coroutines (this can be adjusted by
227changing $Coro::POOL_SIZE), and there can be as many non-idle coros as 228changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
228required. 229required.
229 230
230If you are concerned about pooled coroutines growing a lot because a 231If you are concerned about pooled coroutines growing a lot because a
231single C<async_pool> used a lot of stackspace you can e.g. C<async_pool { 232single C<async_pool> used a lot of stackspace you can e.g. C<async_pool
232terminate }> once per second or so to slowly replenish the pool. 233{ terminate }> once per second or so to slowly replenish the pool. In
234addition to that, when the stacks used by a handler grows larger than 16kb
235(adjustable with $Coro::POOL_RSS) it will also exit.
233 236
234=cut 237=cut
235 238
236our $POOL_SIZE = 8; 239our $POOL_SIZE = 8;
237our $MAX_POOL_RSS = 64 * 1024; 240our $POOL_RSS = 16 * 1024;
238our @pool; 241our @async_pool;
239 242
240sub pool_handler { 243sub pool_handler {
244 my $cb;
245
241 while () { 246 while () {
242 $current->{desc} = "[async_pool]";
243
244 eval { 247 eval {
245 my ($cb, @arg) = @{ delete $current->{_invoke} or return }; 248 while () {
246 $cb->(@arg); 249 _pool_1 $cb;
250 &$cb;
251 _pool_2 $cb;
252 &schedule;
253 }
247 }; 254 };
255
256 last if $@ eq "\3terminate\2\n";
248 warn $@ if $@; 257 warn $@ if $@;
249
250 last if @pool >= $POOL_SIZE || $current->rss >= $MAX_POOL_RSS;
251
252 push @pool, $current;
253 $current->{desc} = "[async_pool idle]";
254 $current->save (Coro::State::SAVE_DEF);
255 $current->prio (0);
256 schedule;
257 } 258 }
258} 259}
259 260
260sub async_pool(&@) { 261sub async_pool(&@) {
261 # this is also inlined into the unlock_scheduler 262 # this is also inlined into the unlock_scheduler
262 my $coro = (pop @pool) || new Coro \&pool_handler;; 263 my $coro = (pop @async_pool) || new Coro \&pool_handler;
263 264
264 $coro->{_invoke} = [@_]; 265 $coro->{_invoke} = [@_];
265 $coro->ready; 266 $coro->ready;
266 267
267 $coro 268 $coro
310 311
311=item terminate [arg...] 312=item terminate [arg...]
312 313
313Terminates the current coroutine with the given status values (see L<cancel>). 314Terminates the current coroutine with the given status values (see L<cancel>).
314 315
316=item killall
317
318Kills/terminates/cancels all coroutines except the currently running
319one. This is useful after a fork, either in the child or the parent, as
320usually only one of them should inherit the running coroutines.
321
315=cut 322=cut
316 323
317sub terminate { 324sub terminate {
318 $current->cancel (@_); 325 $current->cancel (@_);
326}
327
328sub killall {
329 for (Coro::State::list) {
330 $_->cancel
331 if $_ != $current && UNIVERSAL::isa $_, "Coro";
332 }
319} 333}
320 334
321=back 335=back
322 336
323# dynamic methods 337# dynamic methods
333Create a new coroutine and return it. When the sub returns the coroutine 347Create a new coroutine and return it. When the sub returns the coroutine
334automatically terminates as if C<terminate> with the returned values were 348automatically terminates as if C<terminate> with the returned values were
335called. To make the coroutine run you must first put it into the ready queue 349called. To make the coroutine run you must first put it into the ready queue
336by calling the ready method. 350by calling the ready method.
337 351
338See C<async> for additional discussion. 352See C<async> and C<Coro::State::new> for additional info about the
353coroutine environment.
339 354
340=cut 355=cut
341 356
342sub _run_coro { 357sub _run_coro {
343 terminate &{+shift}; 358 terminate &{+shift};
367 382
368=cut 383=cut
369 384
370sub cancel { 385sub cancel {
371 my $self = shift; 386 my $self = shift;
372 $self->{status} = [@_]; 387 $self->{_status} = [@_];
373 388
374 if ($current == $self) { 389 if ($current == $self) {
375 push @destroy, $self; 390 push @destroy, $self;
376 $manager->ready; 391 $manager->ready;
377 &schedule while 1; 392 &schedule while 1;
381} 396}
382 397
383=item $coroutine->join 398=item $coroutine->join
384 399
385Wait until the coroutine terminates and return any values given to the 400Wait until the coroutine terminates and return any values given to the
386C<terminate> or C<cancel> functions. C<join> can be called multiple times 401C<terminate> or C<cancel> functions. C<join> can be called concurrently
387from multiple coroutine. 402from multiple coroutines.
388 403
389=cut 404=cut
390 405
391sub join { 406sub join {
392 my $self = shift; 407 my $self = shift;
393 408
394 unless ($self->{status}) { 409 unless ($self->{_status}) {
395 my $current = $current; 410 my $current = $current;
396 411
397 push @{$self->{destroy_cb}}, sub { 412 push @{$self->{_on_destroy}}, sub {
398 $current->ready; 413 $current->ready;
399 undef $current; 414 undef $current;
400 }; 415 };
401 416
402 &schedule while $current; 417 &schedule while $current;
403 } 418 }
404 419
405 wantarray ? @{$self->{status}} : $self->{status}[0]; 420 wantarray ? @{$self->{_status}} : $self->{_status}[0];
406} 421}
407 422
408=item $coroutine->on_destroy (\&cb) 423=item $coroutine->on_destroy (\&cb)
409 424
410Registers a callback that is called when this coroutine gets destroyed, 425Registers a callback that is called when this coroutine gets destroyed,
414=cut 429=cut
415 430
416sub on_destroy { 431sub on_destroy {
417 my ($self, $cb) = @_; 432 my ($self, $cb) = @_;
418 433
419 push @{ $self->{destroy_cb} }, $cb; 434 push @{ $self->{_on_destroy} }, $cb;
420} 435}
421 436
422=item $oldprio = $coroutine->prio ($newprio) 437=item $oldprio = $coroutine->prio ($newprio)
423 438
424Sets (or gets, if the argument is missing) the priority of the 439Sets (or gets, if the argument is missing) the priority of the
448 463
449=item $olddesc = $coroutine->desc ($newdesc) 464=item $olddesc = $coroutine->desc ($newdesc)
450 465
451Sets (or gets in case the argument is missing) the description for this 466Sets (or gets in case the argument is missing) the description for this
452coroutine. This is just a free-form string you can associate with a coroutine. 467coroutine. This is just a free-form string you can associate with a coroutine.
468
469This method simply sets the C<< $coroutine->{desc} >> member to the given string. You
470can modify this member directly if you wish.
453 471
454=cut 472=cut
455 473
456sub desc { 474sub desc {
457 my $old = $_[0]{desc}; 475 my $old = $_[0]{desc};
533 551
534# we create a special coro because we want to cede, 552# we create a special coro because we want to cede,
535# to reduce pressure on the coro pool (because most callbacks 553# to reduce pressure on the coro pool (because most callbacks
536# return immediately and can be reused) and because we cannot cede 554# return immediately and can be reused) and because we cannot cede
537# inside an event callback. 555# inside an event callback.
538our $unblock_scheduler = async { 556our $unblock_scheduler = new Coro sub {
539 $current->desc ("[unblock_sub scheduler]");
540 while () { 557 while () {
541 while (my $cb = pop @unblock_queue) { 558 while (my $cb = pop @unblock_queue) {
542 # this is an inlined copy of async_pool 559 # this is an inlined copy of async_pool
543 my $coro = (pop @pool or new Coro \&pool_handler); 560 my $coro = (pop @async_pool) || new Coro \&pool_handler;
544 561
545 $coro->{_invoke} = $cb; 562 $coro->{_invoke} = $cb;
546 $coro->ready; 563 $coro->ready;
547 cede; # for short-lived callbacks, this reduces pressure on the coro pool 564 cede; # for short-lived callbacks, this reduces pressure on the coro pool
548 } 565 }
549 schedule; # sleep well 566 schedule; # sleep well
550 } 567 }
551}; 568};
569$unblock_scheduler->desc ("[unblock_sub scheduler]");
552 570
553sub unblock_sub(&) { 571sub unblock_sub(&) {
554 my $cb = shift; 572 my $cb = shift;
555 573
556 sub { 574 sub {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines