ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.180
Committed: Fri Apr 25 04:28:50 2008 UTC (16 years ago) by root
Branch: MAIN
Changes since 1.179: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.179 use Coro;
8    
9     async {
10     # some asynchronous thread of execution
11     print "2\n";
12     cede; # yield back to main
13     print "4\n";
14     };
15     print "1\n";
16     cede; # yield to coroutine
17     print "3\n";
18     cede; # and again
19    
20     # use locking
21     my $lock = new Coro::Semaphore;
22     my $locked;
23    
24     $lock->down;
25     $locked = 1;
26     $lock->up;
27 root 1.2
28 root 1.1 =head1 DESCRIPTION
29    
30 root 1.98 This module collection manages coroutines. Coroutines are similar
31     to threads but don't run in parallel at the same time even on SMP
32 root 1.124 machines. The specific flavor of coroutine used in this module also
33     guarantees you that it will not switch between coroutines unless
34 root 1.98 necessary, at easily-identified points in your program, so locking and
35     parallel access are rarely an issue, making coroutine programming much
36     safer than threads programming.
37    
38     (Perl, however, does not natively support real threads but instead does a
39     very slow and memory-intensive emulation of processes using threads. This
40     is a performance win on Windows machines, and a loss everywhere else).
41    
42     In this module, coroutines are defined as "callchain + lexical variables +
43     @_ + $_ + $@ + $/ + C stack), that is, a coroutine has its own callchain,
44     its own set of lexicals and its own set of perls most important global
45 root 1.152 variables (see L<Coro::State> for more configuration).
46 root 1.22
47 root 1.8 =cut
48    
49     package Coro;
50    
51 root 1.71 use strict;
52     no warnings "uninitialized";
53 root 1.36
54 root 1.8 use Coro::State;
55    
56 root 1.83 use base qw(Coro::State Exporter);
57 pcg 1.55
58 root 1.83 our $idle; # idle handler
59 root 1.71 our $main; # main coroutine
60     our $current; # current coroutine
61 root 1.8
62 root 1.180 our $VERSION = 4.6;
63 root 1.8
64 root 1.105 our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
65 root 1.71 our %EXPORT_TAGS = (
66 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
67     );
68 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready));
69 root 1.8
70 root 1.43 =over 4
71    
72 root 1.8 =item $main
73 root 1.2
74 root 1.8 This coroutine represents the main program.
75 root 1.1
76     =cut
77    
78 pcg 1.55 $main = new Coro;
79 root 1.8
80 root 1.19 =item $current (or as function: current)
81 root 1.1
82 root 1.83 The current coroutine (the last coroutine switched to). The initial value
83     is C<$main> (of course).
84    
85     This variable is B<strictly> I<read-only>. It is provided for performance
86 root 1.124 reasons. If performance is not essential you are encouraged to use the
87 root 1.83 C<Coro::current> function instead.
88 root 1.1
89 root 1.8 =cut
90    
91 root 1.131 $main->{desc} = "[main::]";
92    
93 root 1.8 # maybe some other module used Coro::Specific before...
94 root 1.142 $main->{_specific} = $current->{_specific}
95 root 1.93 if $current;
96 root 1.1
97 root 1.93 _set_current $main;
98 root 1.19
99     sub current() { $current }
100 root 1.9
101     =item $idle
102    
103 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
104     to run. The default implementation prints "FATAL: deadlock detected" and
105 root 1.91 exits, because the program has no other way to continue.
106 root 1.83
107     This hook is overwritten by modules such as C<Coro::Timer> and
108 root 1.91 C<Coro::Event> to wait on an external event that hopefully wake up a
109     coroutine so the scheduler can run it.
110    
111     Please note that if your callback recursively invokes perl (e.g. for event
112 root 1.152 handlers), then it must be prepared to be called recursively itself.
113 root 1.9
114     =cut
115    
116 root 1.83 $idle = sub {
117 root 1.96 require Carp;
118     Carp::croak ("FATAL: deadlock detected");
119 root 1.9 };
120 root 1.8
121 root 1.103 sub _cancel {
122     my ($self) = @_;
123    
124     # free coroutine data and mark as destructed
125     $self->_destroy
126     or return;
127    
128     # call all destruction callbacks
129 root 1.142 $_->(@{$self->{_status}})
130     for @{(delete $self->{_on_destroy}) || []};
131 root 1.103 }
132    
133 root 1.24 # this coroutine is necessary because a coroutine
134     # cannot destroy itself.
135     my @destroy;
136 root 1.103 my $manager;
137    
138     $manager = new Coro sub {
139 pcg 1.57 while () {
140 root 1.103 (shift @destroy)->_cancel
141     while @destroy;
142    
143 root 1.24 &schedule;
144     }
145     };
146 root 1.132 $manager->desc ("[coro manager]");
147 root 1.103 $manager->prio (PRIO_MAX);
148    
149 root 1.43 =back
150 root 1.8
151     =head2 STATIC METHODS
152    
153 root 1.92 Static methods are actually functions that operate on the current coroutine only.
154 root 1.8
155     =over 4
156    
157 root 1.13 =item async { ... } [@args...]
158 root 1.8
159 root 1.92 Create a new asynchronous coroutine and return it's coroutine object
160     (usually unused). When the sub returns the new coroutine is automatically
161 root 1.8 terminated.
162    
163 root 1.145 See the C<Coro::State::new> constructor for info about the coroutine
164 root 1.152 environment in which coroutines run.
165 root 1.145
166 root 1.122 Calling C<exit> in a coroutine will do the same as calling exit outside
167     the coroutine. Likewise, when the coroutine dies, the program will exit,
168     just as it would in the main program.
169 root 1.79
170 root 1.13 # create a new coroutine that just prints its arguments
171     async {
172     print "@_\n";
173     } 1,2,3,4;
174    
175 root 1.8 =cut
176    
177 root 1.13 sub async(&@) {
178 root 1.104 my $coro = new Coro @_;
179     $coro->ready;
180     $coro
181 root 1.8 }
182 root 1.1
183 root 1.105 =item async_pool { ... } [@args...]
184    
185     Similar to C<async>, but uses a coroutine pool, so you should not call
186     terminate or join (although you are allowed to), and you get a coroutine
187     that might have executed other code already (which can be good or bad :).
188    
189     Also, the block is executed in an C<eval> context and a warning will be
190 root 1.108 issued in case of an exception instead of terminating the program, as
191     C<async> does. As the coroutine is being reused, stuff like C<on_destroy>
192     will not work in the expected way, unless you call terminate or cancel,
193     which somehow defeats the purpose of pooling.
194 root 1.105
195 root 1.146 The priority will be reset to C<0> after each job, tracing will be
196     disabled, the description will be reset and the default output filehandle
197     gets restored, so you can change alkl these. Otherwise the coroutine will
198     be re-used "as-is": most notably if you change other per-coroutine global
199     stuff such as C<$/> you need to revert that change, which is most simply
200     done by using local as in C< local $/ >.
201 root 1.105
202     The pool size is limited to 8 idle coroutines (this can be adjusted by
203     changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
204     required.
205    
206     If you are concerned about pooled coroutines growing a lot because a
207 root 1.133 single C<async_pool> used a lot of stackspace you can e.g. C<async_pool
208     { terminate }> once per second or so to slowly replenish the pool. In
209     addition to that, when the stacks used by a handler grows larger than 16kb
210 root 1.134 (adjustable with $Coro::POOL_RSS) it will also exit.
211 root 1.105
212     =cut
213    
214     our $POOL_SIZE = 8;
215 root 1.134 our $POOL_RSS = 16 * 1024;
216     our @async_pool;
217 root 1.105
218     sub pool_handler {
219 root 1.134 my $cb;
220    
221 root 1.105 while () {
222 root 1.134 eval {
223     while () {
224 root 1.136 _pool_1 $cb;
225 root 1.134 &$cb;
226 root 1.136 _pool_2 $cb;
227 root 1.135 &schedule;
228 root 1.134 }
229 root 1.105 };
230 root 1.134
231 root 1.151 last if $@ eq "\3async_pool terminate\2\n";
232 root 1.105 warn $@ if $@;
233 root 1.106 }
234     }
235 root 1.105
236     sub async_pool(&@) {
237     # this is also inlined into the unlock_scheduler
238 root 1.135 my $coro = (pop @async_pool) || new Coro \&pool_handler;
239 root 1.105
240     $coro->{_invoke} = [@_];
241     $coro->ready;
242    
243     $coro
244     }
245    
246 root 1.8 =item schedule
247 root 1.6
248 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
249 root 1.8 into the ready queue, so calling this function usually means you will
250 root 1.91 never be called again unless something else (e.g. an event handler) calls
251     ready.
252    
253     The canonical way to wait on external events is this:
254    
255     {
256 root 1.92 # remember current coroutine
257 root 1.91 my $current = $Coro::current;
258    
259     # register a hypothetical event handler
260     on_event_invoke sub {
261     # wake up sleeping coroutine
262     $current->ready;
263     undef $current;
264     };
265    
266 root 1.124 # call schedule until event occurred.
267 root 1.91 # in case we are woken up for other reasons
268     # (current still defined), loop.
269     Coro::schedule while $current;
270     }
271 root 1.1
272 root 1.22 =item cede
273 root 1.1
274 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
275 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
276     current "timeslice" to other coroutines of the same or higher priority.
277 root 1.7
278 root 1.102 =item Coro::cede_notself
279    
280     Works like cede, but is not exported by default and will cede to any
281     coroutine, regardless of priority, once.
282    
283 root 1.40 =item terminate [arg...]
284 root 1.7
285 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
286 root 1.13
287 root 1.141 =item killall
288    
289     Kills/terminates/cancels all coroutines except the currently running
290     one. This is useful after a fork, either in the child or the parent, as
291     usually only one of them should inherit the running coroutines.
292    
293 root 1.1 =cut
294    
295 root 1.8 sub terminate {
296 pcg 1.59 $current->cancel (@_);
297 root 1.1 }
298 root 1.6
299 root 1.141 sub killall {
300     for (Coro::State::list) {
301     $_->cancel
302     if $_ != $current && UNIVERSAL::isa $_, "Coro";
303     }
304     }
305    
306 root 1.8 =back
307    
308 root 1.92 =head2 COROUTINE METHODS
309 root 1.8
310 root 1.92 These are the methods you can call on coroutine objects.
311 root 1.6
312 root 1.8 =over 4
313    
314 root 1.13 =item new Coro \&sub [, @args...]
315 root 1.8
316 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
317 root 1.40 automatically terminates as if C<terminate> with the returned values were
318 root 1.92 called. To make the coroutine run you must first put it into the ready queue
319 root 1.41 by calling the ready method.
320 root 1.13
321 root 1.145 See C<async> and C<Coro::State::new> for additional info about the
322     coroutine environment.
323 root 1.89
324 root 1.6 =cut
325    
326 root 1.94 sub _run_coro {
327 root 1.13 terminate &{+shift};
328     }
329    
330 root 1.8 sub new {
331     my $class = shift;
332 root 1.83
333 root 1.94 $class->SUPER::new (\&_run_coro, @_)
334 root 1.8 }
335 root 1.6
336 root 1.92 =item $success = $coroutine->ready
337 root 1.1
338 root 1.92 Put the given coroutine into the ready queue (according to it's priority)
339     and return true. If the coroutine is already in the ready queue, do nothing
340 root 1.90 and return false.
341 root 1.1
342 root 1.92 =item $is_ready = $coroutine->is_ready
343 root 1.90
344 root 1.92 Return wether the coroutine is currently the ready queue or not,
345 root 1.28
346 root 1.92 =item $coroutine->cancel (arg...)
347 root 1.28
348 root 1.92 Terminates the given coroutine and makes it return the given arguments as
349 root 1.103 status (default: the empty list). Never returns if the coroutine is the
350     current coroutine.
351 root 1.28
352     =cut
353    
354     sub cancel {
355 pcg 1.59 my $self = shift;
356 root 1.142 $self->{_status} = [@_];
357 root 1.103
358     if ($current == $self) {
359     push @destroy, $self;
360     $manager->ready;
361     &schedule while 1;
362     } else {
363     $self->_cancel;
364     }
365 root 1.40 }
366    
367 root 1.92 =item $coroutine->join
368 root 1.40
369     Wait until the coroutine terminates and return any values given to the
370 root 1.143 C<terminate> or C<cancel> functions. C<join> can be called concurrently
371     from multiple coroutines.
372 root 1.40
373     =cut
374    
375     sub join {
376     my $self = shift;
377 root 1.103
378 root 1.142 unless ($self->{_status}) {
379 root 1.103 my $current = $current;
380    
381 root 1.142 push @{$self->{_on_destroy}}, sub {
382 root 1.103 $current->ready;
383     undef $current;
384     };
385    
386     &schedule while $current;
387 root 1.40 }
388 root 1.103
389 root 1.142 wantarray ? @{$self->{_status}} : $self->{_status}[0];
390 root 1.31 }
391    
392 root 1.101 =item $coroutine->on_destroy (\&cb)
393    
394     Registers a callback that is called when this coroutine gets destroyed,
395     but before it is joined. The callback gets passed the terminate arguments,
396     if any.
397    
398     =cut
399    
400     sub on_destroy {
401     my ($self, $cb) = @_;
402    
403 root 1.142 push @{ $self->{_on_destroy} }, $cb;
404 root 1.101 }
405    
406 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
407 root 1.31
408 root 1.41 Sets (or gets, if the argument is missing) the priority of the
409 root 1.92 coroutine. Higher priority coroutines get run before lower priority
410     coroutines. Priorities are small signed integers (currently -4 .. +3),
411 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
412     to get then):
413 root 1.31
414     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
415     3 > 1 > 0 > -1 > -3 > -4
416    
417     # set priority to HIGH
418     current->prio(PRIO_HIGH);
419    
420     The idle coroutine ($Coro::idle) always has a lower priority than any
421     existing coroutine.
422    
423 root 1.92 Changing the priority of the current coroutine will take effect immediately,
424     but changing the priority of coroutines in the ready queue (but not
425 root 1.31 running) will only take effect after the next schedule (of that
426 root 1.92 coroutine). This is a bug that will be fixed in some future version.
427 root 1.31
428 root 1.92 =item $newprio = $coroutine->nice ($change)
429 root 1.31
430     Similar to C<prio>, but subtract the given value from the priority (i.e.
431     higher values mean lower priority, just as in unix).
432    
433 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
434 root 1.41
435     Sets (or gets in case the argument is missing) the description for this
436 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
437 root 1.41
438 root 1.142 This method simply sets the C<< $coroutine->{desc} >> member to the given string. You
439     can modify this member directly if you wish.
440    
441 root 1.150 =item $coroutine->throw ([$scalar])
442    
443     If C<$throw> is specified and defined, it will be thrown as an exception
444     inside the coroutine at the next convinient point in time (usually after
445     it gains control at the next schedule/transfer/cede). Otherwise clears the
446     exception object.
447    
448     The exception object will be thrown "as is" with the specified scalar in
449     C<$@>, i.e. if it is a string, no line number or newline will be appended
450     (unlike with C<die>).
451    
452     This can be used as a softer means than C<cancel> to ask a coroutine to
453     end itself, although there is no guarentee that the exception will lead to
454     termination, and if the exception isn't caught it might well end the whole
455     program.
456    
457 root 1.41 =cut
458    
459     sub desc {
460     my $old = $_[0]{desc};
461     $_[0]{desc} = $_[1] if @_ > 1;
462     $old;
463 root 1.8 }
464 root 1.1
465 root 1.8 =back
466 root 1.2
467 root 1.97 =head2 GLOBAL FUNCTIONS
468 root 1.92
469     =over 4
470    
471 root 1.97 =item Coro::nready
472    
473     Returns the number of coroutines that are currently in the ready state,
474 root 1.124 i.e. that can be switched to. The value C<0> means that the only runnable
475 root 1.97 coroutine is the currently running one, so C<cede> would have no effect,
476     and C<schedule> would cause a deadlock unless there is an idle handler
477     that wakes up some coroutines.
478    
479 root 1.103 =item my $guard = Coro::guard { ... }
480    
481 root 1.119 This creates and returns a guard object. Nothing happens until the object
482 root 1.103 gets destroyed, in which case the codeblock given as argument will be
483     executed. This is useful to free locks or other resources in case of a
484     runtime error or when the coroutine gets canceled, as in both cases the
485     guard block will be executed. The guard object supports only one method,
486     C<< ->cancel >>, which will keep the codeblock from being executed.
487    
488     Example: set some flag and clear it again when the coroutine gets canceled
489     or the function returns:
490    
491     sub do_something {
492     my $guard = Coro::guard { $busy = 0 };
493     $busy = 1;
494    
495     # do something that requires $busy to be true
496     }
497    
498     =cut
499    
500     sub guard(&) {
501     bless \(my $cb = $_[0]), "Coro::guard"
502     }
503    
504     sub Coro::guard::cancel {
505     ${$_[0]} = sub { };
506     }
507    
508     sub Coro::guard::DESTROY {
509     ${$_[0]}->();
510     }
511    
512    
513 root 1.92 =item unblock_sub { ... }
514    
515     This utility function takes a BLOCK or code reference and "unblocks" it,
516     returning the new coderef. This means that the new coderef will return
517     immediately without blocking, returning nothing, while the original code
518     ref will be called (with parameters) from within its own coroutine.
519    
520 root 1.124 The reason this function exists is that many event libraries (such as the
521 root 1.92 venerable L<Event|Event> module) are not coroutine-safe (a weaker form
522     of thread-safety). This means you must not block within event callbacks,
523     otherwise you might suffer from crashes or worse.
524    
525     This function allows your callbacks to block by executing them in another
526     coroutine where it is safe to block. One example where blocking is handy
527     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
528     disk.
529    
530     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
531     creating event callbacks that want to block.
532    
533     =cut
534    
535     our @unblock_queue;
536    
537 root 1.105 # we create a special coro because we want to cede,
538     # to reduce pressure on the coro pool (because most callbacks
539     # return immediately and can be reused) and because we cannot cede
540     # inside an event callback.
541 root 1.132 our $unblock_scheduler = new Coro sub {
542 root 1.92 while () {
543     while (my $cb = pop @unblock_queue) {
544 root 1.105 # this is an inlined copy of async_pool
545 root 1.134 my $coro = (pop @async_pool) || new Coro \&pool_handler;
546 root 1.105
547     $coro->{_invoke} = $cb;
548     $coro->ready;
549     cede; # for short-lived callbacks, this reduces pressure on the coro pool
550 root 1.92 }
551 root 1.105 schedule; # sleep well
552 root 1.92 }
553     };
554 root 1.132 $unblock_scheduler->desc ("[unblock_sub scheduler]");
555 root 1.92
556     sub unblock_sub(&) {
557     my $cb = shift;
558    
559     sub {
560 root 1.105 unshift @unblock_queue, [$cb, @_];
561 root 1.92 $unblock_scheduler->ready;
562     }
563     }
564    
565     =back
566    
567 root 1.8 =cut
568 root 1.2
569 root 1.8 1;
570 root 1.14
571 root 1.17 =head1 BUGS/LIMITATIONS
572 root 1.14
573 root 1.52 - you must make very sure that no coro is still active on global
574 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
575 root 1.52
576     - this module is not thread-safe. You should only ever use this module
577 root 1.124 from the same thread (this requirement might be loosened in the future
578 root 1.52 to allow per-thread schedulers, but Coro::State does not yet allow
579     this).
580 root 1.9
581     =head1 SEE ALSO
582    
583 root 1.152 Lower level Configuration, Coroutine Environment: L<Coro::State>.
584    
585     Debugging: L<Coro::Debug>.
586    
587     Support/Utility: L<Coro::Specific>, L<Coro::Util>.
588 root 1.67
589     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
590    
591 root 1.152 Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>.
592    
593     Compatibility: L<Coro::LWP>, L<Coro::Storable>, L<Coro::Select>.
594 root 1.67
595 root 1.162 Embedding: L<Coro::MakeMaker>.
596 root 1.1
597     =head1 AUTHOR
598    
599 root 1.66 Marc Lehmann <schmorp@schmorp.de>
600 root 1.64 http://home.schmorp.de/
601 root 1.1
602     =cut
603