ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
(Generate patch)

Comparing Coro/Coro.pm (file contents):
Revision 1.220 by root, Sun Nov 16 11:12:57 2008 UTC vs.
Revision 1.229 by root, Thu Nov 20 06:32:55 2008 UTC

149 for @{ delete $self->{_on_destroy} || [] }; 149 for @{ delete $self->{_on_destroy} || [] };
150} 150}
151 151
152# this coroutine is necessary because a coroutine 152# this coroutine is necessary because a coroutine
153# cannot destroy itself. 153# cannot destroy itself.
154my @destroy; 154our @destroy;
155my $manager; 155our $manager;
156 156
157$manager = new Coro sub { 157$manager = new Coro sub {
158 while () { 158 while () {
159 (shift @destroy)->_cancel 159 (shift @destroy)->_cancel
160 while @destroy; 160 while @destroy;
212Similar to C<async>, but uses a coroutine pool, so you should not call 212Similar to C<async>, but uses a coroutine pool, so you should not call
213terminate or join on it (although you are allowed to), and you get a 213terminate or join on it (although you are allowed to), and you get a
214coroutine that might have executed other code already (which can be good 214coroutine that might have executed other code already (which can be good
215or bad :). 215or bad :).
216 216
217On the plus side, this function is faster than creating (and destroying) 217On the plus side, this function is about twice as fast as creating (and
218a completly new coroutine, so if you need a lot of generic coroutines in 218destroying) a completely new coroutine, so if you need a lot of generic
219quick successsion, use C<async_pool>, not C<async>. 219coroutines in quick successsion, use C<async_pool>, not C<async>.
220 220
221The code block is executed in an C<eval> context and a warning will be 221The code block is executed in an C<eval> context and a warning will be
222issued in case of an exception instead of terminating the program, as 222issued in case of an exception instead of terminating the program, as
223C<async> does. As the coroutine is being reused, stuff like C<on_destroy> 223C<async> does. As the coroutine is being reused, stuff like C<on_destroy>
224will not work in the expected way, unless you call terminate or cancel, 224will not work in the expected way, unless you call terminate or cancel,
247our $POOL_SIZE = 8; 247our $POOL_SIZE = 8;
248our $POOL_RSS = 16 * 1024; 248our $POOL_RSS = 16 * 1024;
249our @async_pool; 249our @async_pool;
250 250
251sub pool_handler { 251sub pool_handler {
252 my $cb;
253
254 while () { 252 while () {
255 eval { 253 eval {
256 while () { 254 &{&_pool_handler} while 1;
257 _pool_1 $cb;
258 &$cb;
259 _pool_2 $cb;
260 &schedule;
261 }
262 }; 255 };
263 256
264 if ($@) {
265 last if $@ eq "\3async_pool terminate\2\n";
266 warn $@; 257 warn $@ if $@;
267 }
268 } 258 }
269}
270
271sub async_pool(&@) {
272 # this is also inlined into the unblock_scheduler
273 my $coro = (pop @async_pool) || new Coro \&pool_handler;
274
275 $coro->{_invoke} = [@_];
276 $coro->ready;
277
278 $coro
279} 259}
280 260
281=back 261=back
282 262
283=head2 STATIC METHODS 263=head2 STATIC METHODS
305>> on that once some event happens, and last you call C<schedule> to put 285>> on that once some event happens, and last you call C<schedule> to put
306yourself to sleep. Note that a lot of things can wake your coroutine up, 286yourself to sleep. Note that a lot of things can wake your coroutine up,
307so you need to check whether the event indeed happened, e.g. by storing the 287so you need to check whether the event indeed happened, e.g. by storing the
308status in a variable. 288status in a variable.
309 289
310The canonical way to wait on external events is this: 290See B<HOW TO WAIT FOR A CALLBACK>, below, for some ways to wait for callbacks.
311
312 {
313 # remember current coroutine
314 my $current = $Coro::current;
315
316 # register a hypothetical event handler
317 on_event_invoke sub {
318 # wake up sleeping coroutine
319 $current->ready;
320 undef $current;
321 };
322
323 # call schedule until event occurred.
324 # in case we are woken up for other reasons
325 # (current still defined), loop.
326 Coro::schedule while $current;
327 }
328 291
329=item cede 292=item cede
330 293
331"Cede" to other coroutines. This function puts the current coroutine into 294"Cede" to other coroutines. This function puts the current coroutine into
332the ready queue and calls C<schedule>, which has the effect of giving 295the ready queue and calls C<schedule>, which has the effect of giving
357program calls this function, there will be some one-time resource leak. 320program calls this function, there will be some one-time resource leak.
358 321
359=cut 322=cut
360 323
361sub terminate { 324sub terminate {
362 $current->cancel (@_); 325 $current->{_status} = [@_];
326 push @destroy, $current;
327 $manager->ready;
328 do { &schedule } while 1;
363} 329}
364 330
365sub killall { 331sub killall {
366 for (Coro::State::list) { 332 for (Coro::State::list) {
367 $_->cancel 333 $_->cancel
388See C<async> and C<Coro::State::new> for additional info about the 354See C<async> and C<Coro::State::new> for additional info about the
389coroutine environment. 355coroutine environment.
390 356
391=cut 357=cut
392 358
393sub _run_coro { 359sub _terminate {
394 terminate &{+shift}; 360 terminate &{+shift};
395}
396
397sub new {
398 my $class = shift;
399
400 $class->SUPER::new (\&_run_coro, @_)
401} 361}
402 362
403=item $success = $coroutine->ready 363=item $success = $coroutine->ready
404 364
405Put the given coroutine into the end of its ready queue (there is one 365Put the given coroutine into the end of its ready queue (there is one
422 382
423=cut 383=cut
424 384
425sub cancel { 385sub cancel {
426 my $self = shift; 386 my $self = shift;
427 $self->{_status} = [@_];
428 387
429 if ($current == $self) { 388 if ($current == $self) {
430 push @destroy, $self; 389 terminate @_;
431 $manager->ready;
432 &schedule while 1;
433 } else { 390 } else {
391 $self->{_status} = [@_];
434 $self->_cancel; 392 $self->_cancel;
435 } 393 }
436} 394}
437 395
396=item $coroutine->schedule_to
397
398Puts the current coroutine to sleep (like C<Coro::schedule>), but instead
399of continuing with the next coro from the ready queue, always switch to
400the given coroutine object (regardless of priority etc.). The readyness
401state of that coroutine isn't changed.
402
403This is an advanced method for special cases - I'd love to hear about any
404uses for this one.
405
406=item $coroutine->cede_to
407
408Like C<schedule_to>, but puts the current coroutine into the ready
409queue. This has the effect of temporarily switching to the given
410coroutine, and continuing some time later.
411
412This is an advanced method for special cases - I'd love to hear about any
413uses for this one.
414
438=item $coroutine->throw ([$scalar]) 415=item $coroutine->throw ([$scalar])
439 416
440If C<$throw> is specified and defined, it will be thrown as an exception 417If C<$throw> is specified and defined, it will be thrown as an exception
441inside the coroutine at the next convenient point in time (usually after 418inside the coroutine at the next convenient point in time. Otherwise
442it gains control at the next schedule/transfer/cede). Otherwise clears the
443exception object. 419clears the exception object.
420
421Coro will check for the exception each time a schedule-like-function
422returns, i.e. after each C<schedule>, C<cede>, C<< Coro::Semaphore->down
423>>, C<< Coro::Handle->readable >> and so on. Most of these functions
424detect this case and return early in case an exception is pending.
444 425
445The exception object will be thrown "as is" with the specified scalar in 426The exception object will be thrown "as is" with the specified scalar in
446C<$@>, i.e. if it is a string, no line number or newline will be appended 427C<$@>, i.e. if it is a string, no line number or newline will be appended
447(unlike with C<die>). 428(unlike with C<die>).
448 429
628# return immediately and can be reused) and because we cannot cede 609# return immediately and can be reused) and because we cannot cede
629# inside an event callback. 610# inside an event callback.
630our $unblock_scheduler = new Coro sub { 611our $unblock_scheduler = new Coro sub {
631 while () { 612 while () {
632 while (my $cb = pop @unblock_queue) { 613 while (my $cb = pop @unblock_queue) {
633 # this is an inlined copy of async_pool 614 &async_pool (@$cb);
634 my $coro = (pop @async_pool) || new Coro \&pool_handler;
635 615
636 $coro->{_invoke} = $cb;
637 $coro->ready;
638 cede; # for short-lived callbacks, this reduces pressure on the coro pool 616 # for short-lived callbacks, this reduces pressure on the coro pool
617 # as the chance is very high that the async_poll coro will be back
618 # in the idle state when cede returns
619 cede;
639 } 620 }
640 schedule; # sleep well 621 schedule; # sleep well
641 } 622 }
642}; 623};
643$unblock_scheduler->{desc} = "[unblock_sub scheduler]"; 624$unblock_scheduler->{desc} = "[unblock_sub scheduler]";
649 unshift @unblock_queue, [$cb, @_]; 630 unshift @unblock_queue, [$cb, @_];
650 $unblock_scheduler->ready; 631 $unblock_scheduler->ready;
651 } 632 }
652} 633}
653 634
635=item $cb = Coro::rouse_cb
636
637Create and return a "rouse callback". That's a code reference that, when
638called, will save its arguments and notify the owner coroutine of the
639callback.
640
641See the next function.
642
643=item @args = Coro::rouse_wait [$cb]
644
645Wait for the specified rouse callback (or the last one tht was created in
646this coroutine).
647
648As soon as the callback is invoked (or when the calback was invoked before
649C<rouse_wait>), it will return a copy of the arguments originally passed
650to the rouse callback.
651
652See the section B<HOW TO WAIT FOR A CALLBACK> for an actual usage example.
653
654=back 654=back
655 655
656=cut 656=cut
657 657
6581; 6581;
659
660=head1 HOW TO WAIT FOR A CALLBACK
661
662It is very common for a coroutine to wait for some callback to be
663called. This occurs naturally when you use coroutines in an otherwise
664event-based program, or when you use event-based libraries.
665
666These typically register a callback for some event, and call that callback
667when the event occured. In a coroutine, however, you typically want to
668just wait for the event, simplyifying things.
669
670For example C<< AnyEvent->child >> registers a callback to be called when
671a specific child has exited:
672
673 my $child_watcher = AnyEvent->child (pid => $pid, cb => sub { ... });
674
675But from withina coroutine, you often just want to write this:
676
677 my $status = wait_for_child $pid;
678
679Coro offers two functions specifically designed to make this easy,
680C<Coro::rouse_cb> and C<Coro::rouse_wait>.
681
682The first function, C<rouse_cb>, generates and returns a callback that,
683when invoked, will save it's arguments and notify the coroutine that
684created the callback.
685
686The second function, C<rouse_wait>, waits for the callback to be called
687(by calling C<schedule> to go to sleep) and returns the arguments
688originally passed to the callback.
689
690Using these functions, it becomes easy to write the C<wait_for_child>
691function mentioned above:
692
693 sub wait_for_child($) {
694 my ($pid) = @_;
695
696 my $watcher = AnyEvent->child (pid => $pid, cb => Coro::rouse_cb);
697
698 my ($rpid, $rstatus) = Coro::rouse_wait;
699 $rstatus
700 }
701
702In the case where C<rouse_cb> and C<rouse_wait> are not flexible enough,
703you can roll your own, using C<schedule>:
704
705 sub wait_for_child($) {
706 my ($pid) = @_;
707
708 # store the current coroutine in $current,
709 # and provide result variables for the closure passed to ->child
710 my $current = $Coro::current;
711 my ($done, $rstatus);
712
713 # pass a closure to ->child
714 my $watcher = AnyEvent->child (pid => $pid, cb => sub {
715 $rstatus = $_[1]; # remember rstatus
716 $done = 1; # mark $rstatus as valud
717 });
718
719 # wait until the closure has been called
720 schedule while !$done;
721
722 $rstatus
723 }
724
659 725
660=head1 BUGS/LIMITATIONS 726=head1 BUGS/LIMITATIONS
661 727
662=over 4 728=over 4
663 729
680=item coroutine switching not signal safe 746=item coroutine switching not signal safe
681 747
682You must not switch to another coroutine from within a signal handler 748You must not switch to another coroutine from within a signal handler
683(only relevant with %SIG - most event libraries provide safe signals). 749(only relevant with %SIG - most event libraries provide safe signals).
684 750
685That means you I<MUST NOT> call any fucntion that might "block" the 751That means you I<MUST NOT> call any function that might "block" the
686current coroutine - C<cede>, C<schedule> C<< Coro::Semaphore->down >> or 752current coroutine - C<cede>, C<schedule> C<< Coro::Semaphore->down >> or
687anything that calls those. Everything else, including calling C<ready>, 753anything that calls those. Everything else, including calling C<ready>,
688works. 754works.
689 755
690=back 756=back

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines