ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.178
Committed: Thu Apr 17 22:33:10 2008 UTC (16 years, 1 month ago) by root
Branch: MAIN
Changes since 1.177: +0 -4 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.152 print "2\n";
12     cede; # yield back to main
13     print "4\n";
14 root 1.2 };
15 root 1.152 print "1\n";
16     cede; # yield to coroutine
17     print "3\n";
18     cede; # and again
19    
20     # use locking
21     my $lock = new Coro::Semaphore;
22     my $locked;
23    
24     $lock->down;
25     $locked = 1;
26     $lock->up;
27 root 1.2
28 root 1.1 =head1 DESCRIPTION
29    
30 root 1.98 This module collection manages coroutines. Coroutines are similar
31     to threads but don't run in parallel at the same time even on SMP
32 root 1.124 machines. The specific flavor of coroutine used in this module also
33     guarantees you that it will not switch between coroutines unless
34 root 1.98 necessary, at easily-identified points in your program, so locking and
35     parallel access are rarely an issue, making coroutine programming much
36     safer than threads programming.
37    
38     (Perl, however, does not natively support real threads but instead does a
39     very slow and memory-intensive emulation of processes using threads. This
40     is a performance win on Windows machines, and a loss everywhere else).
41    
42     In this module, coroutines are defined as "callchain + lexical variables +
43     @_ + $_ + $@ + $/ + C stack", that is, a coroutine has its own callchain,
44     its own set of lexicals and its own set of perl's most important global
45 root 1.152 variables (see L<Coro::State> for more configuration).
46 root 1.22
47 root 1.8 =cut
48    
49     package Coro;
50    
51 root 1.71 use strict;
52     no warnings "uninitialized";
53 root 1.36
54 root 1.8 use Coro::State;
55    
56 root 1.83 use base qw(Coro::State Exporter);
57 pcg 1.55
58 root 1.83 our $idle; # idle handler: callback run when the scheduler finds no ready coroutine
59 root 1.71 our $main; # main coroutine: represents the main program
60     our $current; # current coroutine: the last coroutine switched to (treat as read-only)
61 root 1.8
62 root 1.177 our $VERSION = '4.51';
63 root 1.8
64 root 1.105 our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub); # exported by default
65 root 1.71 our %EXPORT_TAGS = (
66 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)], # import tag :prio
67     );
68 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready)); # available on request: the priority constants and nready
69 root 1.8
70     { # private scope: @async and $init are shared only by import and the INIT block it compiles
71     my @async; # code refs of subs declared with the "Coro" attribute that have not been started yet
72 root 1.26 my $init; # incremented for every "Coro" sub seen; the INIT block is compiled only the first time
73 root 1.8
74     # this way of handling attributes simply is NOT scalable ;()
75     sub import { # export our functions, then hook code-attribute handling in the caller's package
76 root 1.71 no strict 'refs'; # the glob names below are built from strings
77    
78 root 1.93 Coro->export_to_level (1, @_);
79 root 1.71
80 root 1.8 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE}; # remember a handler the caller may already have
81     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub { # install our handler in the caller's package
82     my ($package, $ref) = (shift, shift); # the remaining @_ holds the attribute names
83     my @attrs; # attributes that are not ours
84     for (@_) {
85     if ($_ eq "Coro") { # our attribute: remember the sub so it can be started as a coroutine
86     push @async, $ref;
87 root 1.26 unless ($init++) { # first "Coro" sub: compile an INIT block that starts every collected sub via async
88     eval q{
89     sub INIT {
90     &async(pop @async) while @async;
91     }
92     };
93     }
94 root 1.8 } else {
95 root 1.17 push @attrs, $_;
96 root 1.8 }
97     }
98 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs; # pass the rest to the previous handler, else return it as-is
99 root 1.8 };
100     }
101    
102     }
103    
104 root 1.43 =over 4
105    
106 root 1.8 =item $main
107 root 1.2
108 root 1.8 This coroutine represents the main program.
109 root 1.1
110     =cut
111    
112 pcg 1.55 $main = new Coro; # created without a code ref: this object represents the main program
113 root 1.8
114 root 1.19 =item $current (or as function: current)
115 root 1.1
116 root 1.83 The current coroutine (the last coroutine switched to). The initial value
117     is C<$main> (of course).
118    
119     This variable is B<strictly> I<read-only>. It is provided for performance
120 root 1.124 reasons. If performance is not essential you are encouraged to use the
121 root 1.83 C<Coro::current> function instead.
122 root 1.1
123 root 1.8 =cut
124    
125 root 1.131 $main->{desc} = "[main::]"; # description string of the main coroutine (see the desc method)
126    
127 root 1.8 # maybe some other module used Coro::Specific before...
128 root 1.142 $main->{_specific} = $current->{_specific} # ...so carry over whatever is stored on the existing $current
129 root 1.93 if $current;
130 root 1.1
131 root 1.93 _set_current $main; # helper not defined in this file; presumably makes $main the value of $current -- TODO confirm
132 root 1.19
133     sub current() { $current } # function form of $current: returns the current coroutine object
134 root 1.9
135     =item $idle
136    
137 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
138     to run. The default implementation prints "FATAL: deadlock detected" and
139 root 1.91 exits, because the program has no other way to continue.
140 root 1.83
141     This hook is overwritten by modules such as C<Coro::Timer> and
142 root 1.91 C<Coro::Event> to wait on an external event that hopefully wakes up a
143     coroutine so the scheduler can run it.
144    
145     Please note that if your callback recursively invokes perl (e.g. for event
146 root 1.152 handlers), then it must be prepared to be called recursively itself.
147 root 1.9
148     =cut
149    
150 root 1.83 $idle = sub { # default idle handler: nothing is ready to run, so report a deadlock
151 root 1.96 require Carp; # loaded at run time, only when the handler actually fires
152     Carp::croak ("FATAL: deadlock detected");
153 root 1.9 };
154 root 1.8
155 root 1.103 sub _cancel { # destroy the coroutine $self and run its registered on_destroy callbacks
156     my ($self) = @_;
157    
158     # free coroutine data and mark as destructed
159     $self->_destroy # _destroy is not defined in this file; a false result skips the callbacks
160     or return;
161    
162     # call all destruction callbacks
163 root 1.142 $_->(@{$self->{_status}}) # every callback receives the status list stored by cancel
164     for @{(delete $self->{_on_destroy}) || []}; # the list is removed from the object before the callbacks run
165 root 1.103 }
166    
167 root 1.24 # this coroutine is necessary because a coroutine
168     # cannot destroy itself.
169     my @destroy; # coroutines waiting to be destroyed; filled by cancel when a coroutine cancels itself
170 root 1.103 my $manager; # the coroutine that destroys the entries of @destroy
171    
172     $manager = new Coro sub {
173 pcg 1.57 while () { # empty condition: loop forever
174 root 1.103 (shift @destroy)->_cancel # destroy queued coroutines, oldest first
175     while @destroy;
176    
177 root 1.24 &schedule; # queue is empty: give up the CPU until cancel marks the manager ready again
178     }
179     };
180 root 1.132 $manager->desc ("[coro manager]");
181 root 1.103 $manager->prio (PRIO_MAX); # highest priority, so it runs before other ready coroutines
182    
183 root 1.43 =back
184 root 1.8
185     =head2 STATIC METHODS
186    
187 root 1.92 Static methods are actually functions that operate on the current coroutine only.
188 root 1.8
189     =over 4
190    
191 root 1.13 =item async { ... } [@args...]
192 root 1.8
193 root 1.92 Create a new asynchronous coroutine and return its coroutine object
194     (usually unused). When the sub returns the new coroutine is automatically
195 root 1.8 terminated.
196    
197 root 1.145 See the C<Coro::State::new> constructor for info about the coroutine
198 root 1.152 environment in which coroutines run.
199 root 1.145
200 root 1.122 Calling C<exit> in a coroutine will do the same as calling exit outside
201     the coroutine. Likewise, when the coroutine dies, the program will exit,
202     just as it would in the main program.
203 root 1.79
204 root 1.13 # create a new coroutine that just prints its arguments
205     async {
206     print "@_\n";
207     } 1,2,3,4;
208    
209 root 1.8 =cut
210    
211 root 1.13 sub async(&@) { # create a coroutine from (block, args...), put it into the ready queue and return it
212 root 1.104 my $coro = new Coro @_; # @_ is the code block followed by its arguments
213     $coro->ready; # without this the new coroutine would never be scheduled
214     $coro # return value: the coroutine object
215 root 1.8 }
216 root 1.1
217 root 1.105 =item async_pool { ... } [@args...]
218    
219     Similar to C<async>, but uses a coroutine pool, so you should not call
220     terminate or join (although you are allowed to), and you get a coroutine
221     that might have executed other code already (which can be good or bad :).
222    
223     Also, the block is executed in an C<eval> context and a warning will be
224 root 1.108 issued in case of an exception instead of terminating the program, as
225     C<async> does. As the coroutine is being reused, stuff like C<on_destroy>
226     will not work in the expected way, unless you call terminate or cancel,
227     which somehow defeats the purpose of pooling.
228 root 1.105
229 root 1.146 The priority will be reset to C<0> after each job, tracing will be
230     disabled, the description will be reset and the default output filehandle
231     gets restored, so you can change all these. Otherwise the coroutine will
232     be re-used "as-is": most notably if you change other per-coroutine global
233     stuff such as C<$/> you need to revert that change, which is most simply
234     done by using local as in C< local $/ >.
235 root 1.105
236     The pool size is limited to 8 idle coroutines (this can be adjusted by
237     changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
238     required.
239    
240     If you are concerned about pooled coroutines growing a lot because a
241 root 1.133 single C<async_pool> used a lot of stackspace you can e.g. C<async_pool
242     { terminate }> once per second or so to slowly replenish the pool. In
243     addition to that, when the stacks used by a handler grow larger than 16kb
244 root 1.134 (adjustable with $Coro::POOL_RSS) it will also exit.
245 root 1.105
246     =cut
247    
248     our $POOL_SIZE = 8; # documented limit on the number of idle pooled coroutines
249 root 1.134 our $POOL_RSS = 16 * 1024; # documented stack size (16kb) above which a pool handler exits
250     our @async_pool; # idle pool coroutines; async_pool and the unblock scheduler pop from it (refilled outside this file -- TODO confirm where)
251 root 1.105
252     sub pool_handler { # body of every pooled coroutine: run one job after another until told to terminate
253 root 1.134 my $cb; # the job currently being run
254    
255 root 1.105 while () { # outer loop: re-enter the eval after a job died
256 root 1.134 eval {
257     while () { # inner loop: one iteration per job
258 root 1.136 _pool_1 $cb; # not defined in this file; presumably loads the next job into $cb -- TODO confirm
259 root 1.134 &$cb; # run the job; this call form passes the current @_ along
260 root 1.136 _pool_2 $cb; # not defined in this file; presumably resets the coroutine and returns it to the pool -- TODO confirm
261 root 1.135 &schedule; # wait until this coroutine is marked ready again
262 root 1.134 }
263 root 1.105 };
264 root 1.134
265 root 1.151 last if $@ eq "\3async_pool terminate\2\n"; # magic exception text: leave the handler for good
266 root 1.105 warn $@ if $@; # any other exception is only reported; the handler keeps serving jobs
267 root 1.106 }
268     }
269 root 1.105
270     sub async_pool(&@) { # like async, but runs the block on a reusable pool coroutine
271     # this is also inlined into the unblock_scheduler
272 root 1.135 my $coro = (pop @async_pool) || new Coro \&pool_handler; # reuse an idle pool coroutine, or create a new one
273 root 1.105
274     $coro->{_invoke} = [@_]; # job description: the code block followed by its arguments
275     $coro->ready;
276    
277     $coro # return value: the coroutine that will run the job
278     }
279    
280 root 1.8 =item schedule
281 root 1.6
282 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
283 root 1.8 into the ready queue, so calling this function usually means you will
284 root 1.91 never be called again unless something else (e.g. an event handler) calls
285     ready.
286    
287     The canonical way to wait on external events is this:
288    
289     {
290 root 1.92 # remember current coroutine
291 root 1.91 my $current = $Coro::current;
292    
293     # register a hypothetical event handler
294     on_event_invoke sub {
295     # wake up sleeping coroutine
296     $current->ready;
297     undef $current;
298     };
299    
300 root 1.124 # call schedule until event occurred.
301 root 1.91 # in case we are woken up for other reasons
302     # (current still defined), loop.
303     Coro::schedule while $current;
304     }
305 root 1.1
306 root 1.22 =item cede
307 root 1.1
308 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
309 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
310     current "timeslice" to other coroutines of the same or higher priority.
311 root 1.7
312 root 1.102 =item Coro::cede_notself
313    
314     Works like cede, but is not exported by default and will cede to any
315     coroutine, regardless of priority, once.
316    
317 root 1.40 =item terminate [arg...]
318 root 1.7
319 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
320 root 1.13
321 root 1.141 =item killall
322    
323     Kills/terminates/cancels all coroutines except the currently running
324     one. This is useful after a fork, either in the child or the parent, as
325     usually only one of them should inherit the running coroutines.
326    
327 root 1.1 =cut
328    
329 root 1.8 sub terminate { # cancel the current coroutine, using the arguments as its status values
330 pcg 1.59 $current->cancel (@_);
331 root 1.1 }
332 root 1.6
333 root 1.141 sub killall { # cancel every coroutine except the one that is currently running
334     for (Coro::State::list) { # Coro::State::list is not defined in this file; presumably yields all existing coroutine objects
335     $_->cancel
336     if $_ != $current && UNIVERSAL::isa $_, "Coro"; # skip ourselves and objects that are not Coro instances
337     }
338     }
339    
340 root 1.8 =back
341    
342 root 1.92 =head2 COROUTINE METHODS
343 root 1.8
344 root 1.92 These are the methods you can call on coroutine objects.
345 root 1.6
346 root 1.8 =over 4
347    
348 root 1.13 =item new Coro \&sub [, @args...]
349 root 1.8
350 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
351 root 1.40 automatically terminates as if C<terminate> with the returned values were
352 root 1.92 called. To make the coroutine run you must first put it into the ready queue
353 root 1.41 by calling the ready method.
354 root 1.13
355 root 1.145 See C<async> and C<Coro::State::new> for additional info about the
356     coroutine environment.
357 root 1.89
358 root 1.6 =cut
359    
360 root 1.94 sub _run_coro { # entry point of every Coro: run the user's sub, then terminate with its return values
361 root 1.13 terminate &{+shift}; # shift removes the code ref; this call form hands it the remaining @_ as arguments
362     }
363    
364 root 1.8 sub new { # constructor: wraps the given sub and arguments so that the coroutine terminates when the sub returns
365     my $class = shift;
366 root 1.83
367 root 1.94 $class->SUPER::new (\&_run_coro, @_) # _run_coro receives the user's code ref and arguments in @_
368 root 1.8 }
369 root 1.6
370 root 1.92 =item $success = $coroutine->ready
371 root 1.1
372 root 1.92 Put the given coroutine into the ready queue (according to its priority)
373     and return true. If the coroutine is already in the ready queue, do nothing
374 root 1.90 and return false.
375 root 1.1
376 root 1.92 =item $is_ready = $coroutine->is_ready
377 root 1.90
378 root 1.92 Return whether the coroutine is currently in the ready queue or not.
379 root 1.28
380 root 1.92 =item $coroutine->cancel (arg...)
381 root 1.28
382 root 1.92 Terminates the given coroutine and makes it return the given arguments as
383 root 1.103 status (default: the empty list). Never returns if the coroutine is the
384     current coroutine.
385 root 1.28
386     =cut
387    
388     sub cancel { # terminate the coroutine $self; the arguments become its status values
389 pcg 1.59 my $self = shift;
390 root 1.142 $self->{_status} = [@_]; # stored for join and for the on_destroy callbacks
391 root 1.103
392     if ($current == $self) { # cancelling ourselves: a coroutine cannot destroy itself
393     push @destroy, $self; # queue ourselves for the manager coroutine
394     $manager->ready; # wake the manager so that it processes the queue
395     &schedule while 1; # never returns: keep giving up the CPU until we have been destroyed
396     } else {
397     $self->_cancel; # some other coroutine: destroy it right away
398     }
399 root 1.40 }
400    
401 root 1.92 =item $coroutine->join
402 root 1.40
403     Wait until the coroutine terminates and return any values given to the
404 root 1.143 C<terminate> or C<cancel> functions. C<join> can be called concurrently
405     from multiple coroutines.
406 root 1.40
407     =cut
408    
409     sub join { # wait until the coroutine $self has terminated, then return its status values
410     my $self = shift;
411 root 1.103
412 root 1.142 unless ($self->{_status}) { # no status yet: cancel has not been called on it, so we have to wait
413 root 1.103 my $current = $current; # lexical copy of the calling coroutine, captured by the callback below
414    
415 root 1.142 push @{$self->{_on_destroy}}, sub { # runs when $self gets destroyed
416 root 1.103 $current->ready; # wake the waiting coroutine
417     undef $current; # tells the loop below that the wait is over
418     };
419    
420     &schedule while $current; # sleep; repeat if we were woken up for any other reason
421 root 1.40 }
422 root 1.103
423 root 1.142 wantarray ? @{$self->{_status}} : $self->{_status}[0]; # list context: all status values, otherwise only the first
424 root 1.31 }
425    
426 root 1.101 =item $coroutine->on_destroy (\&cb)
427    
428     Registers a callback that is called when this coroutine gets destroyed,
429     but before it is joined. The callback gets passed the terminate arguments,
430     if any.
431    
432     =cut
433    
434     sub on_destroy { # register the callback $cb to be run when this coroutine gets destroyed
435     my ($self, $cb) = @_;
436    
437 root 1.142 push @{ $self->{_on_destroy} }, $cb; # appended, so _cancel invokes callbacks in registration order
438 root 1.101 }
439    
440 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
441 root 1.31
442 root 1.41 Sets (or gets, if the argument is missing) the priority of the
443 root 1.92 coroutine. Higher priority coroutines get run before lower priority
444     coroutines. Priorities are small signed integers (currently -4 .. +3),
445 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
446     to get them):
447 root 1.31
448     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
449     3 > 1 > 0 > -1 > -3 > -4
450    
451     # set priority to HIGH
452     current->prio(PRIO_HIGH);
453    
454     The idle coroutine ($Coro::idle) always has a lower priority than any
455     existing coroutine.
456    
457 root 1.92 Changing the priority of the current coroutine will take effect immediately,
458     but changing the priority of coroutines in the ready queue (but not
459 root 1.31 running) will only take effect after the next schedule (of that
460 root 1.92 coroutine). This is a bug that will be fixed in some future version.
461 root 1.31
462 root 1.92 =item $newprio = $coroutine->nice ($change)
463 root 1.31
464     Similar to C<prio>, but subtract the given value from the priority (i.e.
465     higher values mean lower priority, just as in unix).
466    
467 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
468 root 1.41
469     Sets (or gets in case the argument is missing) the description for this
470 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
471 root 1.41
472 root 1.142 This method simply sets the C<< $coroutine->{desc} >> member to the given string. You
473     can modify this member directly if you wish.
474    
475 root 1.150 =item $coroutine->throw ([$scalar])
476    
477     If C<$scalar> is specified and defined, it will be thrown as an exception
478     inside the coroutine at the next convenient point in time (usually after
479     it gains control at the next schedule/transfer/cede). Otherwise clears the
480     exception object.
481    
482     The exception object will be thrown "as is" with the specified scalar in
483     C<$@>, i.e. if it is a string, no line number or newline will be appended
484     (unlike with C<die>).
485    
486     This can be used as a softer means than C<cancel> to ask a coroutine to
487     end itself, although there is no guarantee that the exception will lead to
488     termination, and if the exception isn't caught it might well end the whole
489     program.
490    
491 root 1.41 =cut
492    
493     sub desc { # get the description of a coroutine and, if an argument is given, replace it
494     my $old = $_[0]{desc}; # $_[0] is the coroutine object
495     $_[0]{desc} = $_[1] if @_ > 1; # only set when a new description was actually passed (undef counts)
496     $old; # return value: the previous description
497 root 1.8 }
498 root 1.1
499 root 1.8 =back
500 root 1.2
501 root 1.97 =head2 GLOBAL FUNCTIONS
502 root 1.92
503     =over 4
504    
505 root 1.97 =item Coro::nready
506    
507     Returns the number of coroutines that are currently in the ready state,
508 root 1.124 i.e. that can be switched to. The value C<0> means that the only runnable
509 root 1.97 coroutine is the currently running one, so C<cede> would have no effect,
510     and C<schedule> would cause a deadlock unless there is an idle handler
511     that wakes up some coroutines.
512    
513 root 1.103 =item my $guard = Coro::guard { ... }
514    
515 root 1.119 This creates and returns a guard object. Nothing happens until the object
516 root 1.103 gets destroyed, in which case the codeblock given as argument will be
517     executed. This is useful to free locks or other resources in case of a
518     runtime error or when the coroutine gets canceled, as in both cases the
519     guard block will be executed. The guard object supports only one method,
520     C<< ->cancel >>, which will keep the codeblock from being executed.
521    
522     Example: set some flag and clear it again when the coroutine gets canceled
523     or the function returns:
524    
525     sub do_something {
526     my $guard = Coro::guard { $busy = 0 };
527     $busy = 1;
528    
529     # do something that requires $busy to be true
530     }
531    
532     =cut
533    
534     sub guard(&) { # create a guard object that runs the given block when the object is destroyed
535     bless \(my $cb = $_[0]), "Coro::guard" # the object is a blessed reference to a scalar holding the code ref
536     }
537    
538     sub Coro::guard::cancel { # disarm the guard
539     ${$_[0]} = sub { }; # replace the stored code ref with an empty sub, so DESTROY has no effect
540     }
541    
542     sub Coro::guard::DESTROY { # called by perl when the guard object is freed
543     ${$_[0]}->(); # run whatever code ref the guard currently holds, without arguments
544     }
545    
546    
547 root 1.92 =item unblock_sub { ... }
548    
549     This utility function takes a BLOCK or code reference and "unblocks" it,
550     returning the new coderef. This means that the new coderef will return
551     immediately without blocking, returning nothing, while the original code
552     ref will be called (with parameters) from within its own coroutine.
553    
554 root 1.124 The reason this function exists is that many event libraries (such as the
555 root 1.92 venerable L<Event|Event> module) are not coroutine-safe (a weaker form
556     of thread-safety). This means you must not block within event callbacks,
557     otherwise you might suffer from crashes or worse.
558    
559     This function allows your callbacks to block by executing them in another
560     coroutine where it is safe to block. One example where blocking is handy
561     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
562     disk.
563    
564     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
565     creating event callbacks that want to block.
566    
567     =cut
568    
569     our @unblock_queue; # pending calls, each an array ref [code ref, args...] added by unblock_sub
570    
571 root 1.105 # we create a special coro because we want to cede,
572     # to reduce pressure on the coro pool (because most callbacks
573     # return immediately and can be reused) and because we cannot cede
574     # inside an event callback.
575 root 1.132 our $unblock_scheduler = new Coro sub {
576 root 1.92 while () { # empty condition: loop forever
577     while (my $cb = pop @unblock_queue) { # taken from the end; unblock_sub adds at the front, so oldest first
578 root 1.105 # this is an inlined copy of async_pool
579 root 1.134 my $coro = (pop @async_pool) || new Coro \&pool_handler; # reuse an idle pool coroutine, or create a new one
580 root 1.105
581     $coro->{_invoke} = $cb; # $cb already has the [code ref, args...] form that async_pool stores
582     $coro->ready;
583     cede; # for short-lived callbacks, this reduces pressure on the coro pool
584 root 1.92 }
585 root 1.105 schedule; # sleep well
586 root 1.92 }
587     };
588 root 1.132 $unblock_scheduler->desc ("[unblock_sub scheduler]");
589 root 1.92
590     sub unblock_sub(&) { # wrap a code ref so that calling the wrapper only queues the real call
591     my $cb = shift;
592    
593     sub { # the wrapper that is handed out to event libraries
594 root 1.105 unshift @unblock_queue, [$cb, @_]; # queue the code ref together with this call's arguments
595 root 1.92 $unblock_scheduler->ready; # wake the scheduler coroutine, which runs the call on a pool coroutine
596     }
597     }
598    
599     =back
600    
601 root 1.8 =cut
602 root 1.2
603 root 1.8 1;
604 root 1.14
605 root 1.17 =head1 BUGS/LIMITATIONS
606 root 1.14
607 root 1.52 - you must make very sure that no coro is still active on global
608 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
609 root 1.52
610     - this module is not thread-safe. You should only ever use this module
611 root 1.124 from the same thread (this requirement might be loosened in the future
612 root 1.52 to allow per-thread schedulers, but Coro::State does not yet allow
613     this).
614 root 1.9
615     =head1 SEE ALSO
616    
617 root 1.152 Lower level Configuration, Coroutine Environment: L<Coro::State>.
618    
619     Debugging: L<Coro::Debug>.
620    
621     Support/Utility: L<Coro::Specific>, L<Coro::Util>.
622 root 1.67
623     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
624    
625 root 1.152 Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>.
626    
627     Compatibility: L<Coro::LWP>, L<Coro::Storable>, L<Coro::Select>.
628 root 1.67
629 root 1.162 Embedding: L<Coro::MakeMaker>.
630 root 1.1
631     =head1 AUTHOR
632    
633 root 1.66 Marc Lehmann <schmorp@schmorp.de>
634 root 1.64 http://home.schmorp.de/
635 root 1.1
636     =cut
637