ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.97
Committed: Mon Dec 4 13:47:56 2006 UTC (17 years, 6 months ago) by root
Branch: MAIN
Changes since 1.96: +10 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.92 # alternatively create an async coroutine like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.14 This module collection manages coroutines. Coroutines are similar to
24 root 1.42 threads but don't run in parallel.
25 root 1.14
26 root 1.20 In this module, coroutines are defined as "callchain + lexical variables
27 root 1.23 + @_ + $_ + $@ + $^W + C stack), that is, a coroutine has it's own
28     callchain, it's own set of lexicals and it's own set of perl's most
29     important global variables.
30 root 1.22
31 root 1.8 =cut
32    
33     package Coro;
34    
35 root 1.71 use strict;
36     no warnings "uninitialized";
37 root 1.36
38 root 1.8 use Coro::State;
39    
40 root 1.83 use base qw(Coro::State Exporter);
41 pcg 1.55
42 root 1.83 our $idle; # idle handler
43 root 1.71 our $main; # main coroutine
44     our $current; # current coroutine
45 root 1.8
46 root 1.95 our $VERSION = '3.01';
47 root 1.8
48 root 1.92 our @EXPORT = qw(async cede schedule terminate current unblock_sub);
49 root 1.71 our %EXPORT_TAGS = (
50 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
51     );
52 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready));
53 root 1.8
54     {
55     my @async;
56 root 1.26 my $init;
57 root 1.8
58     # this way of handling attributes simply is NOT scalable ;()
59     sub import {
60 root 1.71 no strict 'refs';
61    
62 root 1.93 Coro->export_to_level (1, @_);
63 root 1.71
64 root 1.8 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE};
65     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub {
66     my ($package, $ref) = (shift, shift);
67     my @attrs;
68     for (@_) {
69     if ($_ eq "Coro") {
70     push @async, $ref;
71 root 1.26 unless ($init++) {
72     eval q{
73     sub INIT {
74     &async(pop @async) while @async;
75     }
76     };
77     }
78 root 1.8 } else {
79 root 1.17 push @attrs, $_;
80 root 1.8 }
81     }
82 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs;
83 root 1.8 };
84     }
85    
86     }
87    
88 root 1.43 =over 4
89    
90 root 1.8 =item $main
91 root 1.2
92 root 1.8 This coroutine represents the main program.
93 root 1.1
94     =cut
95    
96 pcg 1.55 $main = new Coro;
97 root 1.8
98 root 1.19 =item $current (or as function: current)
99 root 1.1
100 root 1.83 The current coroutine (the last coroutine switched to). The initial value
101     is C<$main> (of course).
102    
103     This variable is B<strictly> I<read-only>. It is provided for performance
104     reasons. If performance is not essentiel you are encouraged to use the
105     C<Coro::current> function instead.
106 root 1.1
107 root 1.8 =cut
108    
109     # maybe some other module used Coro::Specific before...
110 root 1.93 $main->{specific} = $current->{specific}
111     if $current;
112 root 1.1
113 root 1.93 _set_current $main;
114 root 1.19
115     sub current() { $current }
116 root 1.9
117     =item $idle
118    
119 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
120     to run. The default implementation prints "FATAL: deadlock detected" and
121 root 1.91 exits, because the program has no other way to continue.
122 root 1.83
123     This hook is overwritten by modules such as C<Coro::Timer> and
124 root 1.91 C<Coro::Event> to wait on an external event that hopefully wake up a
125     coroutine so the scheduler can run it.
126    
127     Please note that if your callback recursively invokes perl (e.g. for event
128     handlers), then it must be prepared to be called recursively.
129 root 1.9
130     =cut
131    
132 root 1.83 $idle = sub {
133 root 1.96 require Carp;
134     Carp::croak ("FATAL: deadlock detected");
135 root 1.9 };
136 root 1.8
137 root 1.24 # this coroutine is necessary because a coroutine
138     # cannot destroy itself.
139     my @destroy;
140 root 1.86 my $manager; $manager = new Coro sub {
141 pcg 1.57 while () {
142 root 1.37 # by overwriting the state object with the manager we destroy it
143     # while still being able to schedule this coroutine (in case it has
144     # been readied multiple times. this is harmless since the manager
145     # can be called as many times as neccessary and will always
146     # remove itself from the runqueue
147 root 1.40 while (@destroy) {
148     my $coro = pop @destroy;
149     $coro->{status} ||= [];
150     $_->ready for @{delete $coro->{join} || []};
151 pcg 1.59
152 root 1.83 # the next line destroys the coro state, but keeps the
153 root 1.92 # coroutine itself intact (we basically make it a zombie
154     # coroutine that always runs the manager thread, so it's possible
155     # to transfer() to this coroutine).
156 root 1.83 $coro->_clone_state_from ($manager);
157 root 1.40 }
158 root 1.24 &schedule;
159     }
160     };
161    
162 root 1.8 # static methods. not really.
163 root 1.43
164     =back
165 root 1.8
166     =head2 STATIC METHODS
167    
168 root 1.92 Static methods are actually functions that operate on the current coroutine only.
169 root 1.8
170     =over 4
171    
172 root 1.13 =item async { ... } [@args...]
173 root 1.8
174 root 1.92 Create a new asynchronous coroutine and return it's coroutine object
175     (usually unused). When the sub returns the new coroutine is automatically
176 root 1.8 terminated.
177    
178 root 1.89 Calling C<exit> in a coroutine will not work correctly, so do not do that.
179    
180 root 1.79 When the coroutine dies, the program will exit, just as in the main
181     program.
182    
183 root 1.13 # create a new coroutine that just prints its arguments
184     async {
185     print "@_\n";
186     } 1,2,3,4;
187    
188 root 1.8 =cut
189    
190 root 1.13 sub async(&@) {
191     my $pid = new Coro @_;
192 root 1.11 $pid->ready;
193 root 1.85 $pid
194 root 1.8 }
195 root 1.1
196 root 1.8 =item schedule
197 root 1.6
198 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
199 root 1.8 into the ready queue, so calling this function usually means you will
200 root 1.91 never be called again unless something else (e.g. an event handler) calls
201     ready.
202    
203     The canonical way to wait on external events is this:
204    
205     {
206 root 1.92 # remember current coroutine
207 root 1.91 my $current = $Coro::current;
208    
209     # register a hypothetical event handler
210     on_event_invoke sub {
211     # wake up sleeping coroutine
212     $current->ready;
213     undef $current;
214     };
215    
216     # call schedule until event occured.
217     # in case we are woken up for other reasons
218     # (current still defined), loop.
219     Coro::schedule while $current;
220     }
221 root 1.1
222 root 1.22 =item cede
223 root 1.1
224 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
225 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
226     current "timeslice" to other coroutines of the same or higher priority.
227 root 1.7
228 root 1.40 =item terminate [arg...]
229 root 1.7
230 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
231 root 1.13
232 root 1.1 =cut
233    
234 root 1.8 sub terminate {
235 pcg 1.59 $current->cancel (@_);
236 root 1.1 }
237 root 1.6
238 root 1.8 =back
239    
240     # dynamic methods
241    
242 root 1.92 =head2 COROUTINE METHODS
243 root 1.8
244 root 1.92 These are the methods you can call on coroutine objects.
245 root 1.6
246 root 1.8 =over 4
247    
248 root 1.13 =item new Coro \&sub [, @args...]
249 root 1.8
250 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
251 root 1.40 automatically terminates as if C<terminate> with the returned values were
252 root 1.92 called. To make the coroutine run you must first put it into the ready queue
253 root 1.41 by calling the ready method.
254 root 1.13
255 root 1.89 Calling C<exit> in a coroutine will not work correctly, so do not do that.
256    
257 root 1.6 =cut
258    
259 root 1.94 sub _run_coro {
260 root 1.13 terminate &{+shift};
261     }
262    
263 root 1.8 sub new {
264     my $class = shift;
265 root 1.83
266 root 1.94 $class->SUPER::new (\&_run_coro, @_)
267 root 1.8 }
268 root 1.6
269 root 1.92 =item $success = $coroutine->ready
270 root 1.1
271 root 1.92 Put the given coroutine into the ready queue (according to it's priority)
272     and return true. If the coroutine is already in the ready queue, do nothing
273 root 1.90 and return false.
274 root 1.1
275 root 1.92 =item $is_ready = $coroutine->is_ready
276 root 1.90
277 root 1.92 Return wether the coroutine is currently the ready queue or not,
278 root 1.28
279 root 1.92 =item $coroutine->cancel (arg...)
280 root 1.28
281 root 1.92 Terminates the given coroutine and makes it return the given arguments as
282 pcg 1.59 status (default: the empty list).
283 root 1.28
284     =cut
285    
286     sub cancel {
287 pcg 1.59 my $self = shift;
288     $self->{status} = [@_];
289     push @destroy, $self;
290 root 1.28 $manager->ready;
291 pcg 1.59 &schedule if $current == $self;
292 root 1.40 }
293    
294 root 1.92 =item $coroutine->join
295 root 1.40
296     Wait until the coroutine terminates and return any values given to the
297 pcg 1.59 C<terminate> or C<cancel> functions. C<join> can be called multiple times
298 root 1.92 from multiple coroutine.
299 root 1.40
300     =cut
301    
302     sub join {
303     my $self = shift;
304     unless ($self->{status}) {
305     push @{$self->{join}}, $current;
306     &schedule;
307     }
308     wantarray ? @{$self->{status}} : $self->{status}[0];
309 root 1.31 }
310    
311 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
312 root 1.31
313 root 1.41 Sets (or gets, if the argument is missing) the priority of the
314 root 1.92 coroutine. Higher priority coroutines get run before lower priority
315     coroutines. Priorities are small signed integers (currently -4 .. +3),
316 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
317     to get then):
318 root 1.31
319     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
320     3 > 1 > 0 > -1 > -3 > -4
321    
322     # set priority to HIGH
323     current->prio(PRIO_HIGH);
324    
325     The idle coroutine ($Coro::idle) always has a lower priority than any
326     existing coroutine.
327    
328 root 1.92 Changing the priority of the current coroutine will take effect immediately,
329     but changing the priority of coroutines in the ready queue (but not
330 root 1.31 running) will only take effect after the next schedule (of that
331 root 1.92 coroutine). This is a bug that will be fixed in some future version.
332 root 1.31
333 root 1.92 =item $newprio = $coroutine->nice ($change)
334 root 1.31
335     Similar to C<prio>, but subtract the given value from the priority (i.e.
336     higher values mean lower priority, just as in unix).
337    
338 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
339 root 1.41
340     Sets (or gets in case the argument is missing) the description for this
341 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
342 root 1.41
343     =cut
344    
345     sub desc {
346     my $old = $_[0]{desc};
347     $_[0]{desc} = $_[1] if @_ > 1;
348     $old;
349 root 1.8 }
350 root 1.1
351 root 1.8 =back
352 root 1.2
353 root 1.97 =head2 GLOBAL FUNCTIONS
354 root 1.92
355     =over 4
356    
357 root 1.97 =item Coro::nready
358    
359     Returns the number of coroutines that are currently in the ready state,
360     i.e. that can be swicthed to. The value C<0> means that the only runnable
361     coroutine is the currently running one, so C<cede> would have no effect,
362     and C<schedule> would cause a deadlock unless there is an idle handler
363     that wakes up some coroutines.
364    
365 root 1.92 =item unblock_sub { ... }
366    
367     This utility function takes a BLOCK or code reference and "unblocks" it,
368     returning the new coderef. This means that the new coderef will return
369     immediately without blocking, returning nothing, while the original code
370     ref will be called (with parameters) from within its own coroutine.
371    
372     The reason this fucntion exists is that many event libraries (such as the
373     venerable L<Event|Event> module) are not coroutine-safe (a weaker form
374     of thread-safety). This means you must not block within event callbacks,
375     otherwise you might suffer from crashes or worse.
376    
377     This function allows your callbacks to block by executing them in another
378     coroutine where it is safe to block. One example where blocking is handy
379     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
380     disk.
381    
382     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
383     creating event callbacks that want to block.
384    
385     =cut
386    
387     our @unblock_pool;
388     our @unblock_queue;
389     our $UNBLOCK_POOL_SIZE = 2;
390    
391     sub unblock_handler_ {
392     while () {
393     my ($cb, @arg) = @{ delete $Coro::current->{arg} };
394     $cb->(@arg);
395    
396     last if @unblock_pool >= $UNBLOCK_POOL_SIZE;
397     push @unblock_pool, $Coro::current;
398     schedule;
399     }
400     }
401    
402     our $unblock_scheduler = async {
403     while () {
404     while (my $cb = pop @unblock_queue) {
405     my $handler = (pop @unblock_pool or new Coro \&unblock_handler_);
406     $handler->{arg} = $cb;
407     $handler->ready;
408     cede;
409     }
410    
411     schedule;
412     }
413     };
414    
415     sub unblock_sub(&) {
416     my $cb = shift;
417    
418     sub {
419     push @unblock_queue, [$cb, @_];
420     $unblock_scheduler->ready;
421     }
422     }
423    
424     =back
425    
426 root 1.8 =cut
427 root 1.2
428 root 1.8 1;
429 root 1.14
430 root 1.17 =head1 BUGS/LIMITATIONS
431 root 1.14
432 root 1.52 - you must make very sure that no coro is still active on global
433 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
434 root 1.52
435     - this module is not thread-safe. You should only ever use this module
436     from the same thread (this requirement might be losened in the future
437     to allow per-thread schedulers, but Coro::State does not yet allow
438     this).
439 root 1.9
440     =head1 SEE ALSO
441    
442 root 1.67 Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
443    
444     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
445    
446     Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
447    
448     Embedding: L<Coro:MakeMaker>
449 root 1.1
450     =head1 AUTHOR
451    
452 root 1.66 Marc Lehmann <schmorp@schmorp.de>
453 root 1.64 http://home.schmorp.de/
454 root 1.1
455     =cut
456