ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.92
Committed: Fri Dec 1 03:47:55 2006 UTC (17 years, 5 months ago) by root
Branch: MAIN
Changes since 1.91: +99 -38 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.92 # alternatively create an async coroutine like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.14 This module collection manages coroutines. Coroutines are similar to
24 root 1.42 threads but don't run in parallel.
25 root 1.14
26 root 1.20 In this module, coroutines are defined as "callchain + lexical variables
27 root 1.23 + @_ + $_ + $@ + $^W + C stack), that is, a coroutine has it's own
28     callchain, it's own set of lexicals and it's own set of perl's most
29     important global variables.
30 root 1.22
31 root 1.8 =cut
32    
33     package Coro;
34    
35 root 1.71 use strict;
36     no warnings "uninitialized";
37 root 1.36
38 root 1.8 use Coro::State;
39    
40 root 1.83 use base qw(Coro::State Exporter);
41 pcg 1.55
42 root 1.83 our $idle; # idle handler
43 root 1.71 our $main; # main coroutine
44     our $current; # current coroutine
45 root 1.8
46 root 1.88 our $VERSION = '3.0';
47 root 1.8
48 root 1.92 our @EXPORT = qw(async cede schedule terminate current unblock_sub);
49 root 1.71 our %EXPORT_TAGS = (
50 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
51     );
52 root 1.71 our @EXPORT_OK = @{$EXPORT_TAGS{prio}};
53 root 1.8
54     {
55     my @async;
56 root 1.26 my $init;
57 root 1.8
58     # this way of handling attributes simply is NOT scalable ;()
59     sub import {
60 root 1.71 no strict 'refs';
61    
62 root 1.8 Coro->export_to_level(1, @_);
63 root 1.71
64 root 1.8 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE};
65     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub {
66     my ($package, $ref) = (shift, shift);
67     my @attrs;
68     for (@_) {
69     if ($_ eq "Coro") {
70     push @async, $ref;
71 root 1.26 unless ($init++) {
72     eval q{
73     sub INIT {
74     &async(pop @async) while @async;
75     }
76     };
77     }
78 root 1.8 } else {
79 root 1.17 push @attrs, $_;
80 root 1.8 }
81     }
82 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs;
83 root 1.8 };
84     }
85    
86     }
87    
88 root 1.43 =over 4
89    
90 root 1.8 =item $main
91 root 1.2
92 root 1.8 This coroutine represents the main program.
93 root 1.1
94     =cut
95    
96 pcg 1.55 $main = new Coro;
97 root 1.8
98 root 1.19 =item $current (or as function: current)
99 root 1.1
100 root 1.83 The current coroutine (the last coroutine switched to). The initial value
101     is C<$main> (of course).
102    
103     This variable is B<strictly> I<read-only>. It is provided for performance
104     reasons. If performance is not essentiel you are encouraged to use the
105     C<Coro::current> function instead.
106 root 1.1
107 root 1.8 =cut
108    
109     # maybe some other module used Coro::Specific before...
110     if ($current) {
111     $main->{specific} = $current->{specific};
112 root 1.1 }
113    
114 pcg 1.55 $current = $main;
115 root 1.19
116     sub current() { $current }
117 root 1.9
118     =item $idle
119    
120 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
121     to run. The default implementation prints "FATAL: deadlock detected" and
122 root 1.91 exits, because the program has no other way to continue.
123 root 1.83
124     This hook is overwritten by modules such as C<Coro::Timer> and
125 root 1.91 C<Coro::Event> to wait on an external event that hopefully wake up a
126     coroutine so the scheduler can run it.
127    
128     Please note that if your callback recursively invokes perl (e.g. for event
129     handlers), then it must be prepared to be called recursively.
130 root 1.9
131     =cut
132    
133 root 1.83 $idle = sub {
134 root 1.9 print STDERR "FATAL: deadlock detected\n";
135 root 1.83 exit (51);
136 root 1.9 };
137 root 1.8
138 root 1.24 # this coroutine is necessary because a coroutine
139     # cannot destroy itself.
140     my @destroy;
141 root 1.86 my $manager; $manager = new Coro sub {
142 pcg 1.57 while () {
143 root 1.37 # by overwriting the state object with the manager we destroy it
144     # while still being able to schedule this coroutine (in case it has
145     # been readied multiple times. this is harmless since the manager
146     # can be called as many times as neccessary and will always
147     # remove itself from the runqueue
148 root 1.40 while (@destroy) {
149     my $coro = pop @destroy;
150     $coro->{status} ||= [];
151     $_->ready for @{delete $coro->{join} || []};
152 pcg 1.59
153 root 1.83 # the next line destroys the coro state, but keeps the
154 root 1.92 # coroutine itself intact (we basically make it a zombie
155     # coroutine that always runs the manager thread, so it's possible
156     # to transfer() to this coroutine).
157 root 1.83 $coro->_clone_state_from ($manager);
158 root 1.40 }
159 root 1.24 &schedule;
160     }
161     };
162    
163 root 1.8 # static methods. not really.
164 root 1.43
165     =back
166 root 1.8
167     =head2 STATIC METHODS
168    
169 root 1.92 Static methods are actually functions that operate on the current coroutine only.
170 root 1.8
171     =over 4
172    
173 root 1.13 =item async { ... } [@args...]
174 root 1.8
175 root 1.92 Create a new asynchronous coroutine and return it's coroutine object
176     (usually unused). When the sub returns the new coroutine is automatically
177 root 1.8 terminated.
178    
179 root 1.89 Calling C<exit> in a coroutine will not work correctly, so do not do that.
180    
181 root 1.79 When the coroutine dies, the program will exit, just as in the main
182     program.
183    
184 root 1.13 # create a new coroutine that just prints its arguments
185     async {
186     print "@_\n";
187     } 1,2,3,4;
188    
189 root 1.8 =cut
190    
191 root 1.13 sub async(&@) {
192     my $pid = new Coro @_;
193 root 1.11 $pid->ready;
194 root 1.85 $pid
195 root 1.8 }
196 root 1.1
197 root 1.8 =item schedule
198 root 1.6
199 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
200 root 1.8 into the ready queue, so calling this function usually means you will
201 root 1.91 never be called again unless something else (e.g. an event handler) calls
202     ready.
203    
204     The canonical way to wait on external events is this:
205    
206     {
207 root 1.92 # remember current coroutine
208 root 1.91 my $current = $Coro::current;
209    
210     # register a hypothetical event handler
211     on_event_invoke sub {
212     # wake up sleeping coroutine
213     $current->ready;
214     undef $current;
215     };
216    
217     # call schedule until event occured.
218     # in case we are woken up for other reasons
219     # (current still defined), loop.
220     Coro::schedule while $current;
221     }
222 root 1.1
223 root 1.22 =item cede
224 root 1.1
225 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
226 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
227     current "timeslice" to other coroutines of the same or higher priority.
228 root 1.7
229 root 1.40 =item terminate [arg...]
230 root 1.7
231 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
232 root 1.13
233 root 1.1 =cut
234    
235 root 1.8 sub terminate {
236 pcg 1.59 $current->cancel (@_);
237 root 1.1 }
238 root 1.6
239 root 1.8 =back
240    
241     # dynamic methods
242    
243 root 1.92 =head2 COROUTINE METHODS
244 root 1.8
245 root 1.92 These are the methods you can call on coroutine objects.
246 root 1.6
247 root 1.8 =over 4
248    
249 root 1.13 =item new Coro \&sub [, @args...]
250 root 1.8
251 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
252 root 1.40 automatically terminates as if C<terminate> with the returned values were
253 root 1.92 called. To make the coroutine run you must first put it into the ready queue
254 root 1.41 by calling the ready method.
255 root 1.13
256 root 1.89 Calling C<exit> in a coroutine will not work correctly, so do not do that.
257    
258 root 1.6 =cut
259    
260 root 1.84 sub _new_coro {
261 root 1.13 terminate &{+shift};
262     }
263    
264 root 1.8 sub new {
265     my $class = shift;
266 root 1.83
267 root 1.84 $class->SUPER::new (\&_new_coro, @_)
268 root 1.8 }
269 root 1.6
270 root 1.92 =item $success = $coroutine->ready
271 root 1.1
272 root 1.92 Put the given coroutine into the ready queue (according to it's priority)
273     and return true. If the coroutine is already in the ready queue, do nothing
274 root 1.90 and return false.
275 root 1.1
276 root 1.92 =item $is_ready = $coroutine->is_ready
277 root 1.90
278 root 1.92 Return wether the coroutine is currently the ready queue or not,
279 root 1.28
280 root 1.92 =item $coroutine->cancel (arg...)
281 root 1.28
282 root 1.92 Terminates the given coroutine and makes it return the given arguments as
283 pcg 1.59 status (default: the empty list).
284 root 1.28
285     =cut
286    
287     sub cancel {
288 pcg 1.59 my $self = shift;
289     $self->{status} = [@_];
290     push @destroy, $self;
291 root 1.28 $manager->ready;
292 pcg 1.59 &schedule if $current == $self;
293 root 1.40 }
294    
295 root 1.92 =item $coroutine->join
296 root 1.40
297     Wait until the coroutine terminates and return any values given to the
298 pcg 1.59 C<terminate> or C<cancel> functions. C<join> can be called multiple times
299 root 1.92 from multiple coroutine.
300 root 1.40
301     =cut
302    
303     sub join {
304     my $self = shift;
305     unless ($self->{status}) {
306     push @{$self->{join}}, $current;
307     &schedule;
308     }
309     wantarray ? @{$self->{status}} : $self->{status}[0];
310 root 1.31 }
311    
312 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
313 root 1.31
314 root 1.41 Sets (or gets, if the argument is missing) the priority of the
315 root 1.92 coroutine. Higher priority coroutines get run before lower priority
316     coroutines. Priorities are small signed integers (currently -4 .. +3),
317 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
318     to get then):
319 root 1.31
320     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
321     3 > 1 > 0 > -1 > -3 > -4
322    
323     # set priority to HIGH
324     current->prio(PRIO_HIGH);
325    
326     The idle coroutine ($Coro::idle) always has a lower priority than any
327     existing coroutine.
328    
329 root 1.92 Changing the priority of the current coroutine will take effect immediately,
330     but changing the priority of coroutines in the ready queue (but not
331 root 1.31 running) will only take effect after the next schedule (of that
332 root 1.92 coroutine). This is a bug that will be fixed in some future version.
333 root 1.31
334 root 1.92 =item $newprio = $coroutine->nice ($change)
335 root 1.31
336     Similar to C<prio>, but subtract the given value from the priority (i.e.
337     higher values mean lower priority, just as in unix).
338    
339 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
340 root 1.41
341     Sets (or gets in case the argument is missing) the description for this
342 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
343 root 1.41
344     =cut
345    
346     sub desc {
347     my $old = $_[0]{desc};
348     $_[0]{desc} = $_[1] if @_ > 1;
349     $old;
350 root 1.8 }
351 root 1.1
352 root 1.8 =back
353 root 1.2
354 root 1.92 =head2 UTILITY FUNCTIONS
355    
356     =over 4
357    
358     =item unblock_sub { ... }
359    
360     This utility function takes a BLOCK or code reference and "unblocks" it,
361     returning the new coderef. This means that the new coderef will return
362     immediately without blocking, returning nothing, while the original code
363     ref will be called (with parameters) from within its own coroutine.
364    
365     The reason this fucntion exists is that many event libraries (such as the
366     venerable L<Event|Event> module) are not coroutine-safe (a weaker form
367     of thread-safety). This means you must not block within event callbacks,
368     otherwise you might suffer from crashes or worse.
369    
370     This function allows your callbacks to block by executing them in another
371     coroutine where it is safe to block. One example where blocking is handy
372     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
373     disk.
374    
375     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
376     creating event callbacks that want to block.
377    
378     =cut
379    
380     our @unblock_pool;
381     our @unblock_queue;
382     our $UNBLOCK_POOL_SIZE = 2;
383    
384     sub unblock_handler_ {
385     while () {
386     my ($cb, @arg) = @{ delete $Coro::current->{arg} };
387     $cb->(@arg);
388    
389     last if @unblock_pool >= $UNBLOCK_POOL_SIZE;
390     push @unblock_pool, $Coro::current;
391     schedule;
392     }
393     }
394    
395     our $unblock_scheduler = async {
396     while () {
397     while (my $cb = pop @unblock_queue) {
398     my $handler = (pop @unblock_pool or new Coro \&unblock_handler_);
399     $handler->{arg} = $cb;
400     $handler->ready;
401     cede;
402     }
403    
404     schedule;
405     }
406     };
407    
408     sub unblock_sub(&) {
409     my $cb = shift;
410    
411     sub {
412     push @unblock_queue, [$cb, @_];
413     $unblock_scheduler->ready;
414     }
415     }
416    
417     =back
418    
419 root 1.8 =cut
420 root 1.2
421 root 1.8 1;
422 root 1.14
423 root 1.17 =head1 BUGS/LIMITATIONS
424 root 1.14
425 root 1.52 - you must make very sure that no coro is still active on global
426 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
427 root 1.52
428     - this module is not thread-safe. You should only ever use this module
429     from the same thread (this requirement might be losened in the future
430     to allow per-thread schedulers, but Coro::State does not yet allow
431     this).
432 root 1.9
433     =head1 SEE ALSO
434    
435 root 1.67 Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
436    
437     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
438    
439     Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
440    
441     Embedding: L<Coro:MakeMaker>
442 root 1.1
443     =head1 AUTHOR
444    
445 root 1.66 Marc Lehmann <schmorp@schmorp.de>
446 root 1.64 http://home.schmorp.de/
447 root 1.1
448     =cut
449