ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.99
Committed: Tue Dec 5 12:50:04 2006 UTC (17 years, 5 months ago) by root
Branch: MAIN
CVS Tags: rel-3_11
Changes since 1.98: +1 -1 lines
Log Message:
3.11

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.92 # alternatively create an async coroutine like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.98 This module collection manages coroutines. Coroutines are similar
24     to threads but don't run in parallel at the same time even on SMP
25     machines. The specific flavor of coroutine used in this module also
26     guarantees you that it will not switch between coroutines unless
27     necessary, at easily-identified points in your program, so locking and
28     parallel access are rarely an issue, making coroutine programming much
29     safer than threads programming.
30    
31     (Perl, however, does not natively support real threads but instead does a
32     very slow and memory-intensive emulation of processes using threads. This
33     is a performance win on Windows machines, and a loss everywhere else).
34    
35     In this module, coroutines are defined as "callchain + lexical variables +
36     @_ + $_ + $@ + $/ + C stack", that is, a coroutine has its own callchain,
37     its own set of lexicals and its own set of perl's most important global
38     variables.
39 root 1.22
40 root 1.8 =cut
41    
42     package Coro;
43    
44 root 1.71 use strict;
45     no warnings "uninitialized";
46 root 1.36
47 root 1.8 use Coro::State;
48    
49 root 1.83 use base qw(Coro::State Exporter);
50 pcg 1.55
51 root 1.83 our $idle; # idle handler
52 root 1.71 our $main; # main coroutine
53     our $current; # current coroutine
54 root 1.8
55 root 1.99 our $VERSION = '3.11';
56 root 1.8
57 root 1.92 our @EXPORT = qw(async cede schedule terminate current unblock_sub);
58 root 1.71 our %EXPORT_TAGS = (
59 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
60     );
61 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready));
62 root 1.8
63     {
64     my @async;
65 root 1.26 my $init;
66 root 1.8
67     # this way of handling attributes simply is NOT scalable ;()
68     sub import {
69 root 1.71 no strict 'refs';
70    
71 root 1.93 Coro->export_to_level (1, @_);
72 root 1.71
73 root 1.8 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE};
74     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub {
75     my ($package, $ref) = (shift, shift);
76     my @attrs;
77     for (@_) {
78     if ($_ eq "Coro") {
79     push @async, $ref;
80 root 1.26 unless ($init++) {
81     eval q{
82     sub INIT {
83     &async(pop @async) while @async;
84     }
85     };
86     }
87 root 1.8 } else {
88 root 1.17 push @attrs, $_;
89 root 1.8 }
90     }
91 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs;
92 root 1.8 };
93     }
94    
95     }
96    
97 root 1.43 =over 4
98    
99 root 1.8 =item $main
100 root 1.2
101 root 1.8 This coroutine represents the main program.
102 root 1.1
103     =cut
104    
105 pcg 1.55 $main = new Coro;
106 root 1.8
107 root 1.19 =item $current (or as function: current)
108 root 1.1
109 root 1.83 The current coroutine (the last coroutine switched to). The initial value
110     is C<$main> (of course).
111    
112     This variable is B<strictly> I<read-only>. It is provided for performance
113     reasons. If performance is not essential you are encouraged to use the
114     C<Coro::current> function instead.
115 root 1.1
116 root 1.8 =cut
117    
118     # maybe some other module used Coro::Specific before...
119 root 1.93 $main->{specific} = $current->{specific}
120     if $current;
121 root 1.1
122 root 1.93 _set_current $main;
123 root 1.19
124     sub current() { $current }
125 root 1.9
126     =item $idle
127    
128 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
129     to run. The default implementation prints "FATAL: deadlock detected" and
130 root 1.91 exits, because the program has no other way to continue.
131 root 1.83
132     This hook is overwritten by modules such as C<Coro::Timer> and
133 root 1.91 C<Coro::Event> to wait on an external event that hopefully wakes up a
134     coroutine so the scheduler can run it.
135    
136     Please note that if your callback recursively invokes perl (e.g. for event
137     handlers), then it must be prepared to be called recursively.
138 root 1.9
139     =cut
140    
141 root 1.83 $idle = sub {
142 root 1.96 require Carp;
143     Carp::croak ("FATAL: deadlock detected");
144 root 1.9 };
145 root 1.8
146 root 1.24 # this coroutine is necessary because a coroutine
147     # cannot destroy itself.
148     my @destroy;
149 root 1.86 my $manager; $manager = new Coro sub {
150 pcg 1.57 while () {
151 root 1.37 # by overwriting the state object with the manager we destroy it
152     # while still being able to schedule this coroutine (in case it has
153     # been readied multiple times). this is harmless since the manager
154     # can be called as many times as necessary and will always
155     # remove itself from the runqueue
156 root 1.40 while (@destroy) {
157     my $coro = pop @destroy;
158     $coro->{status} ||= [];
159     $_->ready for @{delete $coro->{join} || []};
160 pcg 1.59
161 root 1.83 # the next line destroys the coro state, but keeps the
162 root 1.92 # coroutine itself intact (we basically make it a zombie
163     # coroutine that always runs the manager thread, so it's possible
164     # to transfer() to this coroutine).
165 root 1.83 $coro->_clone_state_from ($manager);
166 root 1.40 }
167 root 1.24 &schedule;
168     }
169     };
170    
171 root 1.8 # static methods. not really.
172 root 1.43
173     =back
174 root 1.8
175     =head2 STATIC METHODS
176    
177 root 1.92 Static methods are actually functions that operate on the current coroutine only.
178 root 1.8
179     =over 4
180    
181 root 1.13 =item async { ... } [@args...]
182 root 1.8
183 root 1.92 Create a new asynchronous coroutine and return its coroutine object
184     (usually unused). When the sub returns the new coroutine is automatically
185 root 1.8 terminated.
186    
187 root 1.89 Calling C<exit> in a coroutine will not work correctly, so do not do that.
188    
189 root 1.79 When the coroutine dies, the program will exit, just as in the main
190     program.
191    
192 root 1.13 # create a new coroutine that just prints its arguments
193     async {
194     print "@_\n";
195     } 1,2,3,4;
196    
197 root 1.8 =cut
198    
199 root 1.13 sub async(&@) {
200     my $pid = new Coro @_;
201 root 1.11 $pid->ready;
202 root 1.85 $pid
203 root 1.8 }
204 root 1.1
205 root 1.8 =item schedule
206 root 1.6
207 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
208 root 1.8 into the ready queue, so calling this function usually means you will
209 root 1.91 never be called again unless something else (e.g. an event handler) calls
210     ready.
211    
212     The canonical way to wait on external events is this:
213    
214     {
215 root 1.92 # remember current coroutine
216 root 1.91 my $current = $Coro::current;
217    
218     # register a hypothetical event handler
219     on_event_invoke sub {
220     # wake up sleeping coroutine
221     $current->ready;
222     undef $current;
223     };
224    
225     # call schedule until event occurred.
226     # in case we are woken up for other reasons
227     # (current still defined), loop.
228     Coro::schedule while $current;
229     }
230 root 1.1
231 root 1.22 =item cede
232 root 1.1
233 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
234 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
235     current "timeslice" to other coroutines of the same or higher priority.
236 root 1.7
237 root 1.40 =item terminate [arg...]
238 root 1.7
239 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
240 root 1.13
241 root 1.1 =cut
242    
243 root 1.8 sub terminate {
244 pcg 1.59 $current->cancel (@_);
245 root 1.1 }
246 root 1.6
247 root 1.8 =back
248    
249     # dynamic methods
250    
251 root 1.92 =head2 COROUTINE METHODS
252 root 1.8
253 root 1.92 These are the methods you can call on coroutine objects.
254 root 1.6
255 root 1.8 =over 4
256    
257 root 1.13 =item new Coro \&sub [, @args...]
258 root 1.8
259 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
260 root 1.40 automatically terminates as if C<terminate> with the returned values were
261 root 1.92 called. To make the coroutine run you must first put it into the ready queue
262 root 1.41 by calling the ready method.
263 root 1.13
264 root 1.89 Calling C<exit> in a coroutine will not work correctly, so do not do that.
265    
266 root 1.6 =cut
267    
268 root 1.94 sub _run_coro {
269 root 1.13 terminate &{+shift};
270     }
271    
272 root 1.8 sub new {
273     my $class = shift;
274 root 1.83
275 root 1.94 $class->SUPER::new (\&_run_coro, @_)
276 root 1.8 }
277 root 1.6
278 root 1.92 =item $success = $coroutine->ready
279 root 1.1
280 root 1.92 Put the given coroutine into the ready queue (according to its priority)
281     and return true. If the coroutine is already in the ready queue, do nothing
282 root 1.90 and return false.
283 root 1.1
284 root 1.92 =item $is_ready = $coroutine->is_ready
285 root 1.90
286 root 1.92 Return whether the coroutine is currently in the ready queue or not.
287 root 1.28
288 root 1.92 =item $coroutine->cancel (arg...)
289 root 1.28
290 root 1.92 Terminates the given coroutine and makes it return the given arguments as
291 pcg 1.59 status (default: the empty list).
292 root 1.28
293     =cut
294    
295     sub cancel {
296 pcg 1.59 my $self = shift;
297     $self->{status} = [@_];
298     push @destroy, $self;
299 root 1.28 $manager->ready;
300 pcg 1.59 &schedule if $current == $self;
301 root 1.40 }
302    
303 root 1.92 =item $coroutine->join
304 root 1.40
305     Wait until the coroutine terminates and return any values given to the
306 pcg 1.59 C<terminate> or C<cancel> functions. C<join> can be called multiple times
307 root 1.92 from multiple coroutines.
308 root 1.40
309     =cut
310    
311     sub join {
312     my $self = shift;
313     unless ($self->{status}) {
314     push @{$self->{join}}, $current;
315     &schedule;
316     }
317     wantarray ? @{$self->{status}} : $self->{status}[0];
318 root 1.31 }
319    
320 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
321 root 1.31
322 root 1.41 Sets (or gets, if the argument is missing) the priority of the
323 root 1.92 coroutine. Higher priority coroutines get run before lower priority
324     coroutines. Priorities are small signed integers (currently -4 .. +3),
325 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
326     to get them):
327 root 1.31
328     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
329     3 > 1 > 0 > -1 > -3 > -4
330    
331     # set priority to HIGH
332     current->prio(PRIO_HIGH);
333    
334     The idle coroutine ($Coro::idle) always has a lower priority than any
335     existing coroutine.
336    
337 root 1.92 Changing the priority of the current coroutine will take effect immediately,
338     but changing the priority of coroutines in the ready queue (but not
339 root 1.31 running) will only take effect after the next schedule (of that
340 root 1.92 coroutine). This is a bug that will be fixed in some future version.
341 root 1.31
342 root 1.92 =item $newprio = $coroutine->nice ($change)
343 root 1.31
344     Similar to C<prio>, but subtract the given value from the priority (i.e.
345     higher values mean lower priority, just as in unix).
346    
347 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
348 root 1.41
349     Sets (or gets in case the argument is missing) the description for this
350 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
351 root 1.41
352     =cut
353    
354     sub desc {
355     my $old = $_[0]{desc};
356     $_[0]{desc} = $_[1] if @_ > 1;
357     $old;
358 root 1.8 }
359 root 1.1
360 root 1.8 =back
361 root 1.2
362 root 1.97 =head2 GLOBAL FUNCTIONS
363 root 1.92
364     =over 4
365    
366 root 1.97 =item Coro::nready
367    
368     Returns the number of coroutines that are currently in the ready state,
369     i.e. that can be switched to. The value C<0> means that the only runnable
370     coroutine is the currently running one, so C<cede> would have no effect,
371     and C<schedule> would cause a deadlock unless there is an idle handler
372     that wakes up some coroutines.
373    
374 root 1.92 =item unblock_sub { ... }
375    
376     This utility function takes a BLOCK or code reference and "unblocks" it,
377     returning the new coderef. This means that the new coderef will return
378     immediately without blocking, returning nothing, while the original code
379     ref will be called (with parameters) from within its own coroutine.
380    
381     The reason this function exists is that many event libraries (such as the
382     venerable L<Event|Event> module) are not coroutine-safe (a weaker form
383     of thread-safety). This means you must not block within event callbacks,
384     otherwise you might suffer from crashes or worse.
385    
386     This function allows your callbacks to block by executing them in another
387     coroutine where it is safe to block. One example where blocking is handy
388     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
389     disk.
390    
391     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
392     creating event callbacks that want to block.
393    
394     =cut
395    
396     our @unblock_pool;
397     our @unblock_queue;
398     our $UNBLOCK_POOL_SIZE = 2;
399    
400     sub unblock_handler_ {
401     while () {
402     my ($cb, @arg) = @{ delete $Coro::current->{arg} };
403     $cb->(@arg);
404    
405     last if @unblock_pool >= $UNBLOCK_POOL_SIZE;
406     push @unblock_pool, $Coro::current;
407     schedule;
408     }
409     }
410    
411     our $unblock_scheduler = async {
412     while () {
413     while (my $cb = pop @unblock_queue) {
414     my $handler = (pop @unblock_pool or new Coro \&unblock_handler_);
415     $handler->{arg} = $cb;
416     $handler->ready;
417     cede;
418     }
419    
420     schedule;
421     }
422     };
423    
424     sub unblock_sub(&) {
425     my $cb = shift;
426    
427     sub {
428     push @unblock_queue, [$cb, @_];
429     $unblock_scheduler->ready;
430     }
431     }
432    
433     =back
434    
435 root 1.8 =cut
436 root 1.2
437 root 1.8 1;
438 root 1.14
439 root 1.17 =head1 BUGS/LIMITATIONS
440 root 1.14
441 root 1.52 - you must make very sure that no coro is still active on global
442 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
443 root 1.52
444     - this module is not thread-safe. You should only ever use this module
445     from the same thread (this requirement might be loosened in the future
446     to allow per-thread schedulers, but Coro::State does not yet allow
447     this).
448 root 1.9
449     =head1 SEE ALSO
450    
451 root 1.67 Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
452    
453     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
454    
455     Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
456    
457     Embedding: L<Coro::MakeMaker>
458 root 1.1
459     =head1 AUTHOR
460    
461 root 1.66 Marc Lehmann <schmorp@schmorp.de>
462 root 1.64 http://home.schmorp.de/
463 root 1.1
464     =cut
465