ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.65
Committed: Tue Feb 22 19:51:58 2005 UTC (19 years, 2 months ago) by root
Branch: MAIN
CVS Tags: rel-1_1
Changes since 1.64: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.8 # alternatively create an async process like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.14 This module collection manages coroutines. Coroutines are similar to
24 root 1.42 threads but don't run in parallel.
25 root 1.14
26 root 1.20 In this module, coroutines are defined as "callchain + lexical variables
27 root 1.23 + @_ + $_ + $@ + $^W + C stack", that is, a coroutine has its own
28     callchain, its own set of lexicals and its own set of perl's most
29     important global variables.
30 root 1.22
31 root 1.8 =cut
32    
33     package Coro;
34    
35 pcg 1.55 BEGIN { eval { require warnings } && warnings->unimport ("uninitialized") }
36 root 1.36
37 root 1.8 use Coro::State;
38    
39 pcg 1.55 use vars qw($idle $main $current);
40    
41 root 1.8 use base Exporter;
42    
43 root 1.65 $VERSION = 1.1;
44 root 1.8
45 root 1.22 @EXPORT = qw(async cede schedule terminate current);
46 root 1.31 %EXPORT_TAGS = (
47     prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
48     );
49     @EXPORT_OK = @{$EXPORT_TAGS{prio}};
50 root 1.8
{
    # Code refs that carried the ":Coro" attribute and still have to be
    # started as coroutines.
    my @async;

    # Non-zero once the INIT hook below has been compiled.
    my $init;

    # import: export the requested symbols into the calling package and
    # install a MODIFY_CODE_ATTRIBUTES handler there.  The handler
    # consumes the "Coro" attribute and passes every other attribute on to
    # the handler that was installed before (if any); without a previous
    # handler the remaining attributes are returned unchanged.
    sub import {
        Coro->export_to_level(1, @_);

        my $hook = (caller)[0] . "::MODIFY_CODE_ATTRIBUTES";
        my $old  = *{$hook}{CODE};

        *{$hook} = sub {
            my $package = shift;
            my $ref     = shift;
            my @attrs;

            for my $attr (@_) {
                if ($attr ne "Coro") {
                    push @attrs, $attr;
                    next;
                }

                push @async, $ref;

                # the INIT hook that starts the queued subs is compiled
                # only once, on the first ":Coro" attribute seen
                next if $init++;

                eval q{
                    sub INIT {
                        &async(pop @async) while @async;
                    }
                };
            }

            return $old->($package, $ref, @attrs) if $old;
            return @attrs;
        };
    }

}
81    
82 root 1.43 =over 4
83    
84 root 1.8 =item $main
85 root 1.2
86 root 1.8 This coroutine represents the main program.
87 root 1.1
88     =cut
89    
# the coroutine object standing for the main program (created without a sub)
$main = Coro->new;
91 root 1.8
92 root 1.19 =item $current (or as function: current)
93 root 1.1
94 root 1.8 The current coroutine (the last coroutine switched to). The initial value is C<$main> (of course).
95 root 1.1
96 root 1.8 =cut
97    
# Some other module may already have used Coro::Specific before this point;
# in that case take over the "specific" slot of whatever $current was then.
$main->{specific} = $current->{specific}
    if $current;

# the main program is the coroutine that is running initially
$current = $main;

# function form of $current
sub current() { return $current }
106 root 1.9
107     =item $idle
108    
109     The coroutine to switch to when no other coroutine is running. The default
110     implementation prints "FATAL: deadlock detected" and exits.
111    
112     =cut
113    
# Default idle coroutine: report the deadlock on STDERR and exit with
# status 51.  (Original author's note: should be done using priorities.)
$idle = Coro->new(sub {
    print STDERR "FATAL: deadlock detected\n";
    exit 51;
});
119 root 1.8
# A coroutine cannot destroy itself, so cancelled coroutines are queued in
# @destroy and torn down by this dedicated manager coroutine.
my @destroy;
my $manager;
$manager = Coro->new(sub {
    while (1) {
        # Overwriting a coroutine's state object with the manager's state
        # destroys the old state while the coroutine can still be scheduled
        # (in case it has been readied multiple times).  This is harmless:
        # the manager can be called as many times as necessary and will
        # always remove itself from the run queue.
        while (@destroy) {
            my $victim = pop @destroy;

            # make sure a status exists, so join() sees the coroutine as done
            $victim->{status} ||= [];

            # wake up every coroutine that is waiting in join()
            my $waiters = delete $victim->{join} || [];
            $_->ready for @$waiters;

            # This destroys the _coro_state but keeps the process object
            # itself intact: it becomes a zombie that always runs the
            # manager thread, so transfer()ing to it remains possible.
            $victim->{_coro_state} = $manager->{_coro_state};
        }

        &schedule;
    }
});
145    
146 root 1.8 # static methods. not really.
147 root 1.43
148     =back
149 root 1.8
150     =head2 STATIC METHODS
151    
152     Static methods are actually functions that operate on the current process only.
153    
154     =over 4
155    
156 root 1.13 =item async { ... } [@args...]
157 root 1.8
158     Create a new asynchronous process and return its process object
159     (usually unused). When the sub returns the new process is automatically
160     terminated.
161    
162 root 1.13 # create a new coroutine that just prints its arguments
163     async {
164     print "@_\n";
165     } 1,2,3,4;
166    
167 root 1.8 =cut
168    
# async BLOCK, ARGS...: create a coroutine from the block and its arguments,
# put it into the ready queue and return the new process object.
sub async(&@) {
    my $coro = Coro->new(@_);

    # the manager is readied first; per the original author this ensures
    # that the stack is cloned from the manager
    $manager->ready;
    $coro->ready;

    return $coro;
}
175 root 1.1
176 root 1.8 =item schedule
177 root 1.6
178 root 1.8 Calls the scheduler. Please note that the current process will not be put
179     into the ready queue, so calling this function usually means you will
180     never be called again.
181 root 1.1
182     =cut
183    
184 root 1.22 =item cede
185 root 1.1
186 root 1.22 "Cede" to other processes. This function puts the current process into the
187     ready queue and calls C<schedule>, which has the effect of giving up the
188     current "timeslice" to other coroutines of the same or higher priority.
189 root 1.7
190 root 1.8 =cut
191    
192 root 1.40 =item terminate [arg...]
193 root 1.7
194 pcg 1.59 Terminates the current process with the given status values (see L<cancel>).
195 root 1.13
196 root 1.1 =cut
197    
# terminate STATUS...: cancel the current coroutine, handing the given
# values to cancel() as its status.
sub terminate {
    return $current->cancel(@_);
}
201 root 1.6
202 root 1.8 =back
203    
204     # dynamic methods
205    
206     =head2 PROCESS METHODS
207    
208     These are the methods you can call on process objects.
209 root 1.6
210 root 1.8 =over 4
211    
212 root 1.13 =item new Coro \&sub [, @args...]
213 root 1.8
214     Create a new process and return it. When the sub returns the process
215 root 1.40 automatically terminates as if C<terminate> with the returned values were
216 root 1.41 called. To make the process run you must first put it into the ready queue
217     by calling the ready method.
218 root 1.13
219 root 1.6 =cut
220    
# Entry point used for coroutines created by new(): the first argument is
# the user's code ref, the rest are its arguments.  Whatever the code
# returns is passed on to terminate().
sub _newcoro {
    my $code = shift;

    # "&$code" without parentheses hands the remaining @_ to the callee
    terminate(&$code);
}
224    
# new CLASS [, \&sub [, @args...]]: build a process object wrapping a fresh
# Coro::State.  When a sub is given, the state is started in _newcoro,
# which receives the sub and its arguments; otherwise the (false) first
# argument is passed through as it is.
sub new {
    my $class = shift;

    my $self = {
        _coro_state => Coro::State->new($_[0] && \&_newcoro, @_),
    };

    return bless $self, $class;
}
231 root 1.6
232 root 1.8 =item $process->ready
233 root 1.1
234 root 1.39 Put the given process into the ready queue.
235 root 1.1
236 root 1.8 =cut
237 root 1.28
238 pcg 1.59 =item $process->cancel (arg...)
239 root 1.28
240 pcg 1.59 Terminates the given process and makes it return the given arguments as
241     status (default: the empty list).
242 root 1.28
243     =cut
244    
# $process->cancel (STATUS...): record the status values, queue the process
# for destruction by the manager coroutine and ready the manager.  When
# the process cancels itself, control is given up right away.
sub cancel {
    my $self   = shift;
    my @status = @_;

    $self->{status} = \@status;
    push @destroy, $self;
    $manager->ready;

    &schedule if $current == $self;
}
252    
253     =item $process->join
254    
255     Wait until the coroutine terminates and return any values given to the
256 pcg 1.59 C<terminate> or C<cancel> functions. C<join> can be called multiple times
257     from multiple processes.
258 root 1.40
259     =cut
260    
# $process->join: if the process has no status yet, register the current
# coroutine as a waiter and call the scheduler.  Afterwards return the
# status values: all of them in list context, the first one otherwise.
sub join {
    my $self = shift;

    if (!$self->{status}) {
        push @{ $self->{join} }, $current;
        &schedule;
    }

    return wantarray
        ? @{ $self->{status} }
        : $self->{status}[0];
}
269    
270     =item $oldprio = $process->prio($newprio)
271    
272 root 1.41 Sets (or gets, if the argument is missing) the priority of the
273     process. Higher priority processes get run before lower priority
274 root 1.52 processes. Priorities are small signed integers (currently -4 .. +3),
275 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
276     to get them):
277 root 1.31
278     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
279     3 > 1 > 0 > -1 > -3 > -4
280    
281     # set priority to HIGH
282     current->prio(PRIO_HIGH);
283    
284     The idle coroutine ($Coro::idle) always has a lower priority than any
285     existing coroutine.
286    
287     Changing the priority of the current process will take effect immediately,
288     but changing the priority of processes in the ready queue (but not
289     running) will only take effect after the next schedule (of that
290     process). This is a bug that will be fixed in some future version.
291    
292     =cut
293    
# $process->prio ([NEWPRIO]): return the priority stored in the process
# object; when an argument is supplied, store it as the new priority.
# The value returned is always the one from before the call.
sub prio {
    my ($self, @new) = @_;

    my $previous = $self->{prio};
    $self->{prio} = $new[0] if @new;

    return $previous;
}
299    
300     =item $newprio = $process->nice($change)
301    
302     Similar to C<prio>, but subtract the given value from the priority (i.e.
303     higher values mean lower priority, just as in unix).
304    
305     =cut
306    
# $process->nice (CHANGE): subtract CHANGE from the stored priority and
# return the resulting priority.
sub nice {
    my ($self, $change) = @_;

    return $self->{prio} -= $change;
}
310    
311     =item $olddesc = $process->desc($newdesc)
312    
313     Sets (or gets in case the argument is missing) the description for this
314     process. This is just a free-form string you can associate with a process.
315    
316     =cut
317    
# $process->desc ([NEWDESC]): return the description stored in the process
# object; when an argument is supplied, store it as the new description.
# The value returned is always the one from before the call.
sub desc {
    my ($self, @new) = @_;

    my $previous = $self->{desc};
    $self->{desc} = $new[0] if @new;

    return $previous;
}
323 root 1.1
324 root 1.8 =back
325 root 1.2
326 root 1.8 =cut
327 root 1.2
328 root 1.8 1;
329 root 1.14
330 root 1.17 =head1 BUGS/LIMITATIONS
331 root 1.14
332 root 1.52 - you must make very sure that no coro is still active on global
333 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
334 root 1.52
335     - this module is not thread-safe. You should only ever use this module
336     from the same thread (this requirement might be loosened in the future
337     to allow per-thread schedulers, but Coro::State does not yet allow
338     this).
339 root 1.9
340     =head1 SEE ALSO
341    
342     L<Coro::Channel>, L<Coro::Cont>, L<Coro::Specific>, L<Coro::Semaphore>,
343 pcg 1.54 L<Coro::Signal>, L<Coro::State>, L<Coro::Timer>, L<Coro::Event>,
344 root 1.62 L<Coro::Handle>, L<Coro::RWLock>, L<Coro::Socket>.
345 root 1.1
346     =head1 AUTHOR
347    
348     Marc Lehmann <pcg@goof.com>
349 root 1.64 http://home.schmorp.de/
350 root 1.1
351     =cut
352