ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
(Generate patch)

Comparing Coro/Coro.pm (file contents):
Revision 1.222 by root, Tue Nov 18 08:59:46 2008 UTC vs.
Revision 1.227 by root, Thu Nov 20 03:10:30 2008 UTC

149 for @{ delete $self->{_on_destroy} || [] }; 149 for @{ delete $self->{_on_destroy} || [] };
150} 150}
151 151
152# this coroutine is necessary because a coroutine 152# this coroutine is necessary because a coroutine
153# cannot destroy itself. 153# cannot destroy itself.
154my @destroy; 154our @destroy;
155my $manager; 155our $manager;
156 156
157$manager = new Coro sub { 157$manager = new Coro sub {
158 while () { 158 while () {
159 (shift @destroy)->_cancel 159 (shift @destroy)->_cancel
160 while @destroy; 160 while @destroy;
247our $POOL_SIZE = 8; 247our $POOL_SIZE = 8;
248our $POOL_RSS = 16 * 1024; 248our $POOL_RSS = 16 * 1024;
249our @async_pool; 249our @async_pool;
250 250
251sub pool_handler { 251sub pool_handler {
252 my $cb;
253
254 while () { 252 while () {
255 eval { 253 eval {
256 while () { 254 &{&_pool_handler} while 1;
257 _pool_1 $cb;
258 &$cb;
259 _pool_2 $cb;
260 &schedule;
261 }
262 }; 255 };
263 256
264 if ($@) {
265 last if $@ eq "\3async_pool terminate\2\n";
266 warn $@; 257 warn $@ if $@;
267 }
268 } 258 }
269}
270
271sub async_pool(&@) {
272 # this is also inlined into the unblock_scheduler
273 my $coro = (pop @async_pool) || new Coro \&pool_handler;
274
275 $coro->{_invoke} = [@_];
276 $coro->ready;
277
278 $coro
279} 259}
280 260
281=back 261=back
282 262
283=head2 STATIC METHODS 263=head2 STATIC METHODS
305>> on that once some event happens, and last you call C<schedule> to put 285>> on that once some event happens, and last you call C<schedule> to put
306yourself to sleep. Note that a lot of things can wake your coroutine up, 286yourself to sleep. Note that a lot of things can wake your coroutine up,
307so you need to check whether the event indeed happened, e.g. by storing the 287so you need to check whether the event indeed happened, e.g. by storing the
308status in a variable. 288status in a variable.
309 289
310The canonical way to wait on external events is this: 290See B<HOW TO WAIT FOR A CALLBACK>, below, for some ways to wait for callbacks.
311
312 {
313 # remember current coroutine
314 my $current = $Coro::current;
315
316 # register a hypothetical event handler
317 on_event_invoke sub {
318 # wake up sleeping coroutine
319 $current->ready;
320 undef $current;
321 };
322
323 # call schedule until event occurred.
324 # in case we are woken up for other reasons
325 # (current still defined), loop.
326 Coro::schedule while $current;
327 }
328 291
329=item cede 292=item cede
330 293
331"Cede" to other coroutines. This function puts the current coroutine into 294"Cede" to other coroutines. This function puts the current coroutine into
332the ready queue and calls C<schedule>, which has the effect of giving 295the ready queue and calls C<schedule>, which has the effect of giving
357program calls this function, there will be some one-time resource leak. 320program calls this function, there will be some one-time resource leak.
358 321
359=cut 322=cut
360 323
361sub terminate { 324sub terminate {
362 $current->cancel (@_); 325 $current->{_status} = [@_];
326 push @destroy, $current;
327 $manager->ready;
328 do { &schedule } while 1;
363} 329}
364 330
365sub killall { 331sub killall {
366 for (Coro::State::list) { 332 for (Coro::State::list) {
367 $_->cancel 333 $_->cancel
388See C<async> and C<Coro::State::new> for additional info about the 354See C<async> and C<Coro::State::new> for additional info about the
389coroutine environment. 355coroutine environment.
390 356
391=cut 357=cut
392 358
393sub _run_coro { 359sub _terminate {
394 terminate &{+shift}; 360 terminate &{+shift};
395}
396
397sub new {
398 my $class = shift;
399
400 $class->SUPER::new (\&_run_coro, @_)
401} 361}
402 362
403=item $success = $coroutine->ready 363=item $success = $coroutine->ready
404 364
405Put the given coroutine into the end of its ready queue (there is one 365Put the given coroutine into the end of its ready queue (there is one
422 382
423=cut 383=cut
424 384
425sub cancel { 385sub cancel {
426 my $self = shift; 386 my $self = shift;
427 $self->{_status} = [@_];
428 387
429 if ($current == $self) { 388 if ($current == $self) {
430 push @destroy, $self; 389 terminate @_;
431 $manager->ready;
432 &schedule while 1;
433 } else { 390 } else {
391 $self->{_status} = [@_];
434 $self->_cancel; 392 $self->_cancel;
435 } 393 }
436} 394}
437 395
438=item $coroutine->throw ([$scalar]) 396=item $coroutine->throw ([$scalar])
441inside the coroutine at the next convenient point in time. Otherwise 399inside the coroutine at the next convenient point in time. Otherwise
442clears the exception object. 400clears the exception object.
443 401
444Coro will check for the exception each time a schedule-like-function 402Coro will check for the exception each time a schedule-like-function
445returns, i.e. after each C<schedule>, C<cede>, C<< Coro::Semaphore->down 403returns, i.e. after each C<schedule>, C<cede>, C<< Coro::Semaphore->down
446>>, C<< Coro::Handle->readable >> and so on. Note that this means that 404>>, C<< Coro::Handle->readable >> and so on. Most of these functions
447when a coroutine is acquiring a lock, it might only throw after it has 405detect this case and return early in case an exception is pending.
448sucessfully acquired it.
449 406
450The exception object will be thrown "as is" with the specified scalar in 407The exception object will be thrown "as is" with the specified scalar in
451C<$@>, i.e. if it is a string, no line number or newline will be appended 408C<$@>, i.e. if it is a string, no line number or newline will be appended
452(unlike with C<die>). 409(unlike with C<die>).
453 410
633# return immediately and can be reused) and because we cannot cede 590# return immediately and can be reused) and because we cannot cede
634# inside an event callback. 591# inside an event callback.
635our $unblock_scheduler = new Coro sub { 592our $unblock_scheduler = new Coro sub {
636 while () { 593 while () {
637 while (my $cb = pop @unblock_queue) { 594 while (my $cb = pop @unblock_queue) {
638 # this is an inlined copy of async_pool 595 &async_pool (@$cb);
639 my $coro = (pop @async_pool) || new Coro \&pool_handler;
640 596
641 $coro->{_invoke} = $cb;
642 $coro->ready;
643 cede; # for short-lived callbacks, this reduces pressure on the coro pool 597 # for short-lived callbacks, this reduces pressure on the coro pool
598 # as the chance is very high that the async_poll coro will be back
599 # in the idle state when cede returns
600 cede;
644 } 601 }
645 schedule; # sleep well 602 schedule; # sleep well
646 } 603 }
647}; 604};
648$unblock_scheduler->{desc} = "[unblock_sub scheduler]"; 605$unblock_scheduler->{desc} = "[unblock_sub scheduler]";
654 unshift @unblock_queue, [$cb, @_]; 611 unshift @unblock_queue, [$cb, @_];
655 $unblock_scheduler->ready; 612 $unblock_scheduler->ready;
656 } 613 }
657} 614}
658 615
616=item $cb = Coro::rouse_cb
617
618Create and return a "rouse callback". That's a code reference that, when
619called, will save its arguments and notify the owner coroutine of the
620callback.
621
622See the next function.
623
624=item @args = Coro::rouse_wait [$cb]
625
626Wait for the specified rouse callback (or the last one tht was created in
627this coroutine).
628
629As soon as the callback is invoked (or when the calback was invoked before
630C<rouse_wait>), it will return a copy of the arguments originally passed
631to the rouse callback.
632
633See the section B<HOW TO WAIT FOR A CALLBACK> for an actual usage example.
634
659=back 635=back
660 636
661=cut 637=cut
662 638
6631; 6391;
640
641=head1 HOW TO WAIT FOR A CALLBACK
642
643It is very common for a coroutine to wait for some callback to be
644called. This occurs naturally when you use coroutines in an otherwise
645event-based program, or when you use event-based libraries.
646
647These typically register a callback for some event, and call that callback
648when the event occured. In a coroutine, however, you typically want to
649just wait for the event, simplyifying things.
650
651For example C<< AnyEvent->child >> registers a callback to be called when
652a specific child has exited:
653
654 my $child_watcher = AnyEvent->child (pid => $pid, cb => sub { ... });
655
656But from withina coroutine, you often just want to write this:
657
658 my $status = wait_for_child $pid;
659
660Coro offers two functions specifically designed to make this easy,
661C<Coro::rouse_cb> and C<Coro::rouse_wait>.
662
663The first function, C<rouse_cb>, generates and returns a callback that,
664when invoked, will save it's arguments and notify the coroutine that
665created the callback.
666
667The second function, C<rouse_wait>, waits for the callback to be called
668(by calling C<schedule> to go to sleep) and returns the arguments
669originally passed to the callback.
670
671Using these functions, it becomes easy to write the C<wait_for_child>
672function mentioned above:
673
674 sub wait_for_child($) {
675 my ($pid) = @_;
676
677 my $watcher = AnyEvent->child (pid => $pid, cb => Coro::rouse_cb);
678
679 my ($rpid, $rstatus) = Coro::rouse_wait;
680 $rstatus
681 }
682
683In the case where C<rouse_cb> and C<rouse_wait> are not flexible enough,
684you can roll your own, using C<schedule>:
685
686 sub wait_for_child($) {
687 my ($pid) = @_;
688
689 # store the current coroutine in $current,
690 # and provide result variables for the closure passed to ->child
691 my $current = $Coro::current;
692 my ($done, $rstatus);
693
694 # pass a closure to ->child
695 my $watcher = AnyEvent->child (pid => $pid, cb => sub {
696 $rstatus = $_[1]; # remember rstatus
697 $done = 1; # mark $rstatus as valud
698 });
699
700 # wait until the closure has been called
701 schedule while !$done;
702
703 $rstatus
704 }
705
664 706
665=head1 BUGS/LIMITATIONS 707=head1 BUGS/LIMITATIONS
666 708
667=over 4 709=over 4
668 710

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines