ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
(Generate patch)

Comparing Coro/Coro.pm (file contents):
Revision 1.207 by root, Mon Nov 3 16:05:37 2008 UTC vs.
Revision 1.226 by root, Wed Nov 19 16:01:32 2008 UTC

56 56
57=cut 57=cut
58 58
59package Coro; 59package Coro;
60 60
61use strict; 61use strict qw(vars subs);
62no warnings "uninitialized"; 62no warnings "uninitialized";
63 63
64use Coro::State; 64use Coro::State;
65 65
66use base qw(Coro::State Exporter); 66use base qw(Coro::State Exporter);
67 67
68our $idle; # idle handler 68our $idle; # idle handler
69our $main; # main coroutine 69our $main; # main coroutine
70our $current; # current coroutine 70our $current; # current coroutine
71 71
72our $VERSION = 4.803; 72our $VERSION = 5.0;
73 73
74our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub); 74our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
75our %EXPORT_TAGS = ( 75our %EXPORT_TAGS = (
76 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)], 76 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
77); 77);
86coroutines, it is mainly useful to compare again C<$Coro::current>, to see 86coroutines, it is mainly useful to compare again C<$Coro::current>, to see
87whether you are running in the main program or not. 87whether you are running in the main program or not.
88 88
89=cut 89=cut
90 90
91$main = new Coro; 91# $main is now being initialised by Coro::State
92 92
93=item $Coro::current 93=item $Coro::current
94 94
95The coroutine object representing the current coroutine (the last 95The coroutine object representing the current coroutine (the last
96coroutine that the Coro scheduler switched to). The initial value is 96coroutine that the Coro scheduler switched to). The initial value is
97C<$main> (of course). 97C<$Coro::main> (of course).
98 98
99This variable is B<strictly> I<read-only>. You can take copies of the 99This variable is B<strictly> I<read-only>. You can take copies of the
100value stored in it and use it as any other coroutine object, but you must 100value stored in it and use it as any other coroutine object, but you must
101not otherwise modify the variable itself. 101not otherwise modify the variable itself.
102 102
103=cut 103=cut
104
105$main->{desc} = "[main::]";
106
107# maybe some other module used Coro::Specific before...
108$main->{_specific} = $current->{_specific}
109 if $current;
110
111_set_current $main;
112 104
113sub current() { $current } # [DEPRECATED] 105sub current() { $current } # [DEPRECATED]
114 106
115=item $Coro::idle 107=item $Coro::idle
116 108
152 $self->_destroy 144 $self->_destroy
153 or return; 145 or return;
154 146
155 # call all destruction callbacks 147 # call all destruction callbacks
156 $_->(@{$self->{_status}}) 148 $_->(@{$self->{_status}})
157 for @{(delete $self->{_on_destroy}) || []}; 149 for @{ delete $self->{_on_destroy} || [] };
158} 150}
159 151
160# this coroutine is necessary because a coroutine 152# this coroutine is necessary because a coroutine
161# cannot destroy itself. 153# cannot destroy itself.
162my @destroy; 154our @destroy;
163my $manager; 155our $manager;
164 156
165$manager = new Coro sub { 157$manager = new Coro sub {
166 while () { 158 while () {
167 (shift @destroy)->_cancel 159 (shift @destroy)->_cancel
168 while @destroy; 160 while @destroy;
169 161
170 &schedule; 162 &schedule;
171 } 163 }
172}; 164};
173$manager->desc ("[coro manager]"); 165$manager->{desc} = "[coro manager]";
174$manager->prio (PRIO_MAX); 166$manager->prio (PRIO_MAX);
175 167
176=back 168=back
177 169
178=head2 SIMPLE COROUTINE CREATION 170=head2 SIMPLE COROUTINE CREATION
275 } 267 }
276 } 268 }
277} 269}
278 270
279sub async_pool(&@) { 271sub async_pool(&@) {
280 # this is also inlined into the unlock_scheduler 272 # this is also inlined into the unblock_scheduler
281 my $coro = (pop @async_pool) || new Coro \&pool_handler; 273 my $coro = (pop @async_pool) || new Coro \&pool_handler;
282 274
283 $coro->{_invoke} = [@_]; 275 $coro->{_invoke} = [@_];
284 $coro->ready; 276 $coro->ready;
285 277
313>> on that once some event happens, and last you call C<schedule> to put 305>> on that once some event happens, and last you call C<schedule> to put
314yourself to sleep. Note that a lot of things can wake your coroutine up, 306yourself to sleep. Note that a lot of things can wake your coroutine up,
315so you need to check whether the event indeed happened, e.g. by storing the 307so you need to check whether the event indeed happened, e.g. by storing the
316status in a variable. 308status in a variable.
317 309
318The canonical way to wait on external events is this: 310See B<HOW TO WAIT FOR A CALLBACK>, below, for some ways to wait for callbacks.
319
320 {
321 # remember current coroutine
322 my $current = $Coro::current;
323
324 # register a hypothetical event handler
325 on_event_invoke sub {
326 # wake up sleeping coroutine
327 $current->ready;
328 undef $current;
329 };
330
331 # call schedule until event occurred.
332 # in case we are woken up for other reasons
333 # (current still defined), loop.
334 Coro::schedule while $current;
335 }
336 311
337=item cede 312=item cede
338 313
339"Cede" to other coroutines. This function puts the current coroutine into 314"Cede" to other coroutines. This function puts the current coroutine into
340the ready queue and calls C<schedule>, which has the effect of giving 315the ready queue and calls C<schedule>, which has the effect of giving
365program calls this function, there will be some one-time resource leak. 340program calls this function, there will be some one-time resource leak.
366 341
367=cut 342=cut
368 343
369sub terminate { 344sub terminate {
370 $current->cancel (@_); 345 $current->{_status} = [@_];
346 push @destroy, $current;
347 $manager->ready;
348 do { &schedule } while 1;
371} 349}
372 350
373sub killall { 351sub killall {
374 for (Coro::State::list) { 352 for (Coro::State::list) {
375 $_->cancel 353 $_->cancel
396See C<async> and C<Coro::State::new> for additional info about the 374See C<async> and C<Coro::State::new> for additional info about the
397coroutine environment. 375coroutine environment.
398 376
399=cut 377=cut
400 378
401sub _run_coro { 379sub _terminate {
402 terminate &{+shift}; 380 terminate &{+shift};
403}
404
405sub new {
406 my $class = shift;
407
408 $class->SUPER::new (\&_run_coro, @_)
409} 381}
410 382
411=item $success = $coroutine->ready 383=item $success = $coroutine->ready
412 384
413Put the given coroutine into the end of its ready queue (there is one 385Put the given coroutine into the end of its ready queue (there is one
430 402
431=cut 403=cut
432 404
433sub cancel { 405sub cancel {
434 my $self = shift; 406 my $self = shift;
435 $self->{_status} = [@_];
436 407
437 if ($current == $self) { 408 if ($current == $self) {
438 push @destroy, $self; 409 terminate @_;
439 $manager->ready;
440 &schedule while 1;
441 } else { 410 } else {
411 $self->{_status} = [@_];
442 $self->_cancel; 412 $self->_cancel;
443 } 413 }
444} 414}
415
416=item $coroutine->throw ([$scalar])
417
418If C<$throw> is specified and defined, it will be thrown as an exception
419inside the coroutine at the next convenient point in time. Otherwise
420clears the exception object.
421
422Coro will check for the exception each time a schedule-like-function
423returns, i.e. after each C<schedule>, C<cede>, C<< Coro::Semaphore->down
424>>, C<< Coro::Handle->readable >> and so on. Most of these functions
425detect this case and return early in case an exception is pending.
426
427The exception object will be thrown "as is" with the specified scalar in
428C<$@>, i.e. if it is a string, no line number or newline will be appended
429(unlike with C<die>).
430
431This can be used as a softer means than C<cancel> to ask a coroutine to
432end itself, although there is no guarantee that the exception will lead to
433termination, and if the exception isn't caught it might well end the whole
434program.
435
436You might also think of C<throw> as being the moral equivalent of
437C<kill>ing a coroutine with a signal (in this case, a scalar).
445 438
446=item $coroutine->join 439=item $coroutine->join
447 440
448Wait until the coroutine terminates and return any values given to the 441Wait until the coroutine terminates and return any values given to the
449C<terminate> or C<cancel> functions. C<join> can be called concurrently 442C<terminate> or C<cancel> functions. C<join> can be called concurrently
511higher values mean lower priority, just as in unix). 504higher values mean lower priority, just as in unix).
512 505
513=item $olddesc = $coroutine->desc ($newdesc) 506=item $olddesc = $coroutine->desc ($newdesc)
514 507
515Sets (or gets in case the argument is missing) the description for this 508Sets (or gets in case the argument is missing) the description for this
516coroutine. This is just a free-form string you can associate with a coroutine. 509coroutine. This is just a free-form string you can associate with a
510coroutine.
517 511
518This method simply sets the C<< $coroutine->{desc} >> member to the given string. You 512This method simply sets the C<< $coroutine->{desc} >> member to the given
519can modify this member directly if you wish. 513string. You can modify this member directly if you wish.
520
521=item $coroutine->throw ([$scalar])
522
523If C<$throw> is specified and defined, it will be thrown as an exception
524inside the coroutine at the next convinient point in time (usually after
525it gains control at the next schedule/transfer/cede). Otherwise clears the
526exception object.
527
528The exception object will be thrown "as is" with the specified scalar in
529C<$@>, i.e. if it is a string, no line number or newline will be appended
530(unlike with C<die>).
531
532This can be used as a softer means than C<cancel> to ask a coroutine to
533end itself, although there is no guarentee that the exception will lead to
534termination, and if the exception isn't caught it might well end the whole
535program.
536 514
537=cut 515=cut
538 516
539sub desc { 517sub desc {
540 my $old = $_[0]{desc}; 518 my $old = $_[0]{desc};
642 cede; # for short-lived callbacks, this reduces pressure on the coro pool 620 cede; # for short-lived callbacks, this reduces pressure on the coro pool
643 } 621 }
644 schedule; # sleep well 622 schedule; # sleep well
645 } 623 }
646}; 624};
647$unblock_scheduler->desc ("[unblock_sub scheduler]"); 625$unblock_scheduler->{desc} = "[unblock_sub scheduler]";
648 626
649sub unblock_sub(&) { 627sub unblock_sub(&) {
650 my $cb = shift; 628 my $cb = shift;
651 629
652 sub { 630 sub {
653 unshift @unblock_queue, [$cb, @_]; 631 unshift @unblock_queue, [$cb, @_];
654 $unblock_scheduler->ready; 632 $unblock_scheduler->ready;
655 } 633 }
656} 634}
657 635
636=item $cb = Coro::rouse_cb
637
638Create and return a "rouse callback". That's a code reference that, when
639called, will save its arguments and notify the owner coroutine of the
640callback.
641
642See the next function.
643
644=item @args = Coro::rouse_wait [$cb]
645
646Wait for the specified rouse callback (or the last one tht was created in
647this coroutine).
648
649As soon as the callback is invoked (or when the calback was invoked before
650C<rouse_wait>), it will return a copy of the arguments originally passed
651to the rouse callback.
652
653See the section B<HOW TO WAIT FOR A CALLBACK> for an actual usage example.
654
658=back 655=back
659 656
660=cut 657=cut
661 658
6621; 6591;
663 660
661=head1 HOW TO WAIT FOR A CALLBACK
662
663It is very common for a coroutine to wait for some callback to be
664called. This occurs naturally when you use coroutines in an otherwise
665event-based program, or when you use event-based libraries.
666
667These typically register a callback for some event, and call that callback
668when the event occured. In a coroutine, however, you typically want to
669just wait for the event, simplyifying things.
670
671For example C<< AnyEvent->child >> registers a callback to be called when
672a specific child has exited:
673
674 my $child_watcher = AnyEvent->child (pid => $pid, cb => sub { ... });
675
676But from withina coroutine, you often just want to write this:
677
678 my $status = wait_for_child $pid;
679
680Coro offers two functions specifically designed to make this easy,
681C<Coro::rouse_cb> and C<Coro::rouse_wait>.
682
683The first function, C<rouse_cb>, generates and returns a callback that,
684when invoked, will save it's arguments and notify the coroutine that
685created the callback.
686
687The second function, C<rouse_wait>, waits for the callback to be called
688(by calling C<schedule> to go to sleep) and returns the arguments
689originally passed to the callback.
690
691Using these functions, it becomes easy to write the C<wait_for_child>
692function mentioned above:
693
694 sub wait_for_child($) {
695 my ($pid) = @_;
696
697 my $watcher = AnyEvent->child (pid => $pid, cb => Coro::rouse_cb);
698
699 my ($rpid, $rstatus) = Coro::rouse_wait;
700 $rstatus
701 }
702
703In the case where C<rouse_cb> and C<rouse_wait> are not flexible enough,
704you can roll your own, using C<schedule>:
705
706 sub wait_for_child($) {
707 my ($pid) = @_;
708
709 # store the current coroutine in $current,
710 # and provide result variables for the closure passed to ->child
711 my $current = $Coro::current;
712 my ($done, $rstatus);
713
714 # pass a closure to ->child
715 my $watcher = AnyEvent->child (pid => $pid, cb => sub {
716 $rstatus = $_[1]; # remember rstatus
717 $done = 1; # mark $rstatus as valud
718 });
719
720 # wait until the closure has been called
721 schedule while !$done;
722
723 $rstatus
724 }
725
726
664=head1 BUGS/LIMITATIONS 727=head1 BUGS/LIMITATIONS
728
729=over 4
730
731=item fork with pthread backend
732
733When Coro is compiled using the pthread backend (which isn't recommended
734but required on many BSDs as their libcs are completely broken), then
735coroutines will not survive a fork. There is no known workaround except to
736fix your libc and use a saner backend.
737
738=item perl process emulation ("threads")
665 739
666This module is not perl-pseudo-thread-safe. You should only ever use this 740This module is not perl-pseudo-thread-safe. You should only ever use this
667module from the same thread (this requirement might be removed in the 741module from the same thread (this requirement might be removed in the
668future to allow per-thread schedulers, but Coro::State does not yet allow 742future to allow per-thread schedulers, but Coro::State does not yet allow
669this). I recommend disabling thread support and using processes, as this 743this). I recommend disabling thread support and using processes, as having
670is much faster and uses less memory. 744the windows process emulation enabled under unix roughly halves perl
745performance, even when not used.
746
747=item coroutine switching not signal safe
748
749You must not switch to another coroutine from within a signal handler
750(only relevant with %SIG - most event libraries provide safe signals).
751
752That means you I<MUST NOT> call any function that might "block" the
753current coroutine - C<cede>, C<schedule> C<< Coro::Semaphore->down >> or
754anything that calls those. Everything else, including calling C<ready>,
755works.
756
757=back
758
671 759
672=head1 SEE ALSO 760=head1 SEE ALSO
673 761
674Event-Loop integration: L<Coro::AnyEvent>, L<Coro::EV>, L<Coro::Event>. 762Event-Loop integration: L<Coro::AnyEvent>, L<Coro::EV>, L<Coro::Event>.
675 763

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines