ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.42
Committed: Tue Nov 6 20:37:20 2001 UTC (22 years, 6 months ago) by root
Branch: MAIN
Changes since 1.41: +2 -4 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.8 # alternatively create an async process like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.14 This module collection manages coroutines. Coroutines are similar to
24 root 1.42 threads but don't run in parallel.
25 root 1.14
26 root 1.20 In this module, coroutines are defined as "callchain + lexical variables
27 root 1.23 + @_ + $_ + $@ + $^W + C stack", that is, a coroutine has its own
28     callchain, its own set of lexicals and its own set of perl's most
29     important global variables.
30 root 1.22
31 root 1.8 =cut
32    
33     package Coro;
34    
35 root 1.36 no warnings qw(uninitialized);
36    
37 root 1.8 use Coro::State;
38    
39     use base Exporter;
40    
41 root 1.41 $VERSION = 0.52;
42 root 1.8
43 root 1.22 @EXPORT = qw(async cede schedule terminate current); # functions exported by default
44 root 1.31 %EXPORT_TAGS = ( # export tag groups; only ":prio" is defined here
45     prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
46     );
47     @EXPORT_OK = @{$EXPORT_TAGS{prio}}; # the priority constants are exported on request only
48 root 1.8
49     { # bare block: keeps @async and $init private to import and its closures
50     my @async; # code refs that carry the :Coro attribute and still have to be started
51 root 1.26 my $init; # counts :Coro attributes seen; zero means the INIT block is not compiled yet
52 root 1.8
53     # this way of handling attributes simply is NOT scalable ;()
54     sub import { # does the normal export, then installs a :Coro attribute handler in the caller
55     Coro->export_to_level(1, @_); # export into the package that called import
56     my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE}; # handler the caller already had, if any
57     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub { # replace it with a wrapper that claims "Coro"
58     my ($package, $ref) = (shift, shift); # $ref is the sub being attributed; the rest of @_ are attribute names
59     my @attrs; # attributes this wrapper does not handle itself
60     for (@_) {
61     if ($_ eq "Coro") {
62     push @async, $ref; # remember the sub so it can be started later
63 root 1.26 unless ($init++) { # only for the first :Coro sub: compile the INIT block below
64     eval q{
65     sub INIT {
66     &async(pop @async) while @async;
67     }
68     };
69     }
70 root 1.8 } else {
71 root 1.17 push @attrs, $_; # not ours, pass it along
72 root 1.8 }
73     }
74 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs; # chain to the previous handler, else return the leftovers
75 root 1.8 };
76     }
77    
78     }
79    
80     =item $main
81 root 1.2
82 root 1.8 This coroutine represents the main program.
83 root 1.1
84     =cut
85    
86 root 1.9 our $main = new Coro; # created without a sub, so new() passes no entry point on to Coro::State
87 root 1.8
88 root 1.19 =item $current (or as function: current)
89 root 1.1
90 root 1.8 The current coroutine (the last coroutine switched to). The initial value is C<$main> (of course).
91 root 1.1
92 root 1.8 =cut
93    
94     # maybe some other module used Coro::Specific before...
95     if ($current) { # this is the package variable, tested before the "our" declaration below
96     $main->{specific} = $current->{specific}; # keep the {specific} slot that already exists
97 root 1.1 }
98    
99 root 1.9 our $current = $main; # the coroutine running while this module loads is $main
100 root 1.19
101     sub current() { $current } # accessor for $current; the empty prototype allows a bare "current"
102 root 1.9
103     =item $idle
104    
105     The coroutine to switch to when no other coroutine is running. The default
106     implementation prints "FATAL: deadlock detected" and exits.
107    
108     =cut
109    
110     # should be done using priorities :(
111     our $idle = new Coro sub { # default idle coroutine; it is not put into the ready queue here
112     print STDERR "FATAL: deadlock detected\n";
113     exit(51); # ends the whole program with exit status 51
114     };
115 root 1.8
116 root 1.24 # this coroutine is necessary because a coroutine
117     # cannot destroy itself.
118     my @destroy; # coroutines queued by cancel() and waiting to be destroyed
119 root 1.38 my $manager; # declared before the assignment so the sub below can refer to it
120     $manager = new Coro sub {
121 root 1.24 while() { # empty condition: loop forever
122 root 1.37 # by overwriting the state object with the manager we destroy it
123     # while still being able to schedule this coroutine (in case it has
124     # been readied multiple times). this is harmless since the manager
125     # can be called as many times as necessary and will always
126     # remove itself from the runqueue
127 root 1.40 while (@destroy) {
128     my $coro = pop @destroy;
129     $coro->{status} ||= []; # join() tests {status}, so give it a true value if terminate() set none
130     $_->ready for @{delete $coro->{join} || []}; # ready every coroutine that join() queued and clear the list
131     $coro->{_coro_state} = $manager->{_coro_state}; # replace the state object, see the comment above
132     }
133 root 1.24 &schedule; # queue is empty: call the scheduler without readying the manager again
134     }
135     };
136    
137 root 1.8 # static methods. not really.
138    
139     =head2 STATIC METHODS
140    
141     Static methods are actually functions that operate on the current process only.
142    
143     =over 4
144    
145 root 1.13 =item async { ... } [@args...]
146 root 1.8
147     Create a new asynchronous process and return its process object
148     (usually unused). When the sub returns the new process is automatically
149     terminated.
150    
151 root 1.13 # create a new coroutine that just prints its arguments
152     async {
153     print "@_\n";
154     } 1,2,3,4;
155    
156     The coderef you submit MUST NOT be a closure that refers to variables
157     in an outer scope. This does NOT work. Pass arguments into it instead.
158    
159 root 1.8 =cut
160    
161 root 1.13 sub async(&@) { # prototype: a block or code ref, followed by the arguments for it
162     my $pid = new Coro @_; # $pid is the new Coro object, not an operating system process id
163 root 1.24 $manager->ready; # this ensures that the stack is cloned from the manager
164 root 1.11 $pid->ready; # put the new coroutine into the ready queue
165     $pid; # return value: the new coroutine object
166 root 1.8 }
167 root 1.1
168 root 1.8 =item schedule
169 root 1.6
170 root 1.8 Calls the scheduler. Please note that the current process will not be put
171     into the ready queue, so calling this function usually means you will
172     never be called again.
173 root 1.1
174     =cut
175    
176 root 1.22 =item cede
177 root 1.1
178 root 1.22 "Cede" to other processes. This function puts the current process into the
179     ready queue and calls C<schedule>, which has the effect of giving up the
180     current "timeslice" to other coroutines of the same or higher priority.
181 root 1.7
182 root 1.8 =cut
183    
184 root 1.40 =item terminate [arg...]
185 root 1.7
186 root 1.8 Terminates the current process.
187 root 1.1
188 root 1.13 Any arguments are stored as the process's status and are returned by C<join>.
189    
190 root 1.1 =cut
191    
192 root 1.8 sub terminate { # ends the current coroutine; the arguments become its result values
193 root 1.40 $current->{status} = [@_]; # copy of the arguments, later returned by join()
194 root 1.28 $current->cancel; # queues this coroutine for the manager; cancel() also schedules away when called on $current
195 root 1.23 &schedule; # if control ever comes back here, call the scheduler again
196 root 1.28 die; # NORETURN
197 root 1.1 }
198 root 1.6
199 root 1.8 =back
200    
201     # dynamic methods
202    
203     =head2 PROCESS METHODS
204    
205     These are the methods you can call on process objects.
206 root 1.6
207 root 1.8 =over 4
208    
209 root 1.13 =item new Coro \&sub [, @args...]
210 root 1.8
211     Create a new process and return it. When the sub returns the process
212 root 1.40 automatically terminates as if C<terminate> with the returned values were
213 root 1.41 called. To make the process run you must first put it into the ready queue
214     by calling the ready method.
215 root 1.13
216 root 1.6 =cut
217    
218 root 1.13 sub _newcoro { # entry point that new() hands to Coro::State in front of the user's code ref
219     terminate &{+shift}; # shift off the code ref, call it with the remaining @_, then terminate with what it returned
220     }
221    
222 root 1.8 sub new { # constructor: wraps a Coro::State object in a blessed hash
223     my $class = shift;
224     bless {
225 root 1.13 _coro_state => (new Coro::State $_[0] && \&_newcoro, @_), # with a code ref: _newcoro, the code ref, then its arguments; without one: just a false value
226 root 1.8 }, $class;
227     }
228 root 1.6
229 root 1.8 =item $process->ready
230 root 1.1
231 root 1.39 Put the given process into the ready queue.
232 root 1.1
233 root 1.8 =cut
234 root 1.28
235     =item $process->cancel
236    
237     Like C<terminate>, but terminates the specified process instead.
238    
239     =cut
240    
241     sub cancel { # queues the given coroutine for destruction by the manager
242     push @destroy, $_[0]; # the manager pops from @destroy and does the actual teardown
243     $manager->ready; # make sure the manager gets to run
244 root 1.35 &schedule if $current == $_[0]; # when cancelling ourselves, leave this coroutine right away
245 root 1.40 }
246    
247     =item $process->join
248    
249     Wait until the coroutine terminates and return any values given to the
250     C<terminate> function. C<join> can be called multiple times from multiple
251     processes.
252    
253     =cut
254    
255     sub join { # waits for the coroutine to finish and returns its status values
256     my $self = shift;
257     unless ($self->{status}) { # no status yet, so the coroutine has not been torn down
258     push @{$self->{join}}, $current; # register the caller; the manager readies everything in {join}
259     &schedule; # give up control without readying the caller
260     }
261     wantarray ? @{$self->{status}} : $self->{status}[0]; # list context: all values; scalar context: the first one
262 root 1.31 }
263    
264     =item $oldprio = $process->prio($newprio)
265    
266 root 1.41 Sets (or gets, if the argument is missing) the priority of the
267     process. Higher priority processes get run before lower priority
268     processes. Priorities are small signed integers (currently -4 .. +3)
269     that you can refer to using PRIO_xxx constants (use the import tag :prio
270     to get them):
271 root 1.31
272     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
273     3 > 1 > 0 > -1 > -3 > -4
274    
275     # set priority to HIGH
276     current->prio(PRIO_HIGH);
277    
278     The idle coroutine ($Coro::idle) always has a lower priority than any
279     existing coroutine.
280    
281     Changing the priority of the current process will take effect immediately,
282     but changing the priority of processes in the ready queue (but not
283     running) will only take effect after the next schedule (of that
284     process). This is a bug that will be fixed in some future version.
285    
286     =cut
287    
288     sub prio { # combined getter and setter for the {prio} slot
289     my $old = $_[0]{prio}; # remember the value from before any change
290     $_[0]{prio} = $_[1] if @_ > 1; # assign only when an argument was passed (an explicit undef counts)
291     $old; # always return the previous priority
292     }
293    
294     =item $newprio = $process->nice($change)
295    
296     Similar to C<prio>, but subtract the given value from the priority (i.e.
297     higher values mean lower priority, just as in unix).
298    
299     =cut
300    
301     sub nice { # lowers the {prio} slot by the given amount
302     $_[0]{prio} -= $_[1]; # the value of this assignment, the new priority, is the return value
303 root 1.41 }
304    
305     =item $olddesc = $process->desc($newdesc)
306    
307     Sets (or gets in case the argument is missing) the description for this
308     process. This is just a free-form string you can associate with a process.
309    
310     =cut
311    
312     sub desc { # combined getter and setter for the {desc} slot
313     my $old = $_[0]{desc}; # remember the value from before any change
314     $_[0]{desc} = $_[1] if @_ > 1; # assign only when an argument was passed
315     $old; # always return the previous description
316 root 1.8 }
317 root 1.1
318 root 1.8 =back
319 root 1.2
320 root 1.8 =cut
321 root 1.2
322 root 1.8 1;
323 root 1.14
324 root 1.17 =head1 BUGS/LIMITATIONS
325 root 1.14
326 root 1.33 - you must make very sure that no coro is still active on global destruction.
327     very bad things might happen otherwise (usually segfaults).
328 root 1.42 - this module is not thread-safe. You should only ever use this module from
329 root 1.17 the same thread (this requirement might be loosened in the future to
330 root 1.20 allow per-thread schedulers, but Coro::State does not yet allow this).
331 root 1.9
332     =head1 SEE ALSO
333    
334     L<Coro::Channel>, L<Coro::Cont>, L<Coro::Specific>, L<Coro::Semaphore>,
335 root 1.25 L<Coro::Signal>, L<Coro::State>, L<Coro::Event>, L<Coro::RWLock>,
336 root 1.26 L<Coro::Handle>, L<Coro::Socket>.
337 root 1.1
338     =head1 AUTHOR
339    
340     Marc Lehmann <pcg@goof.com>
341     http://www.goof.com/pcg/marc/
342    
343     =cut
344