ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.71
Committed: Tue Nov 29 12:36:18 2005 UTC (18 years, 6 months ago) by root
Branch: MAIN
CVS Tags: rel-1_5
Changes since 1.70: +13 -7 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.8 # alternatively create an async process like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.14 This module collection manages coroutines. Coroutines are similar to
24 root 1.42 threads but don't run in parallel.
25 root 1.14
26 root 1.20 In this module, coroutines are defined as "callchain + lexical variables
27 root 1.23 + @_ + $_ + $@ + $^W + C stack", that is, a coroutine has its own
28     callchain, its own set of lexicals and its own set of perl's most
29     important global variables.
30 root 1.22
31 root 1.8 =cut
32    
33     package Coro;
34    
35 root 1.71 use strict;
36     no warnings "uninitialized";
37 root 1.36
38 root 1.8 use Coro::State;
39    
40 root 1.71 use base Exporter::;
41 pcg 1.55
42 root 1.71 our $idle; # idle coroutine: switched to when nothing else is runnable (assigned further down)
43     our $main; # main coroutine: represents the main program
44     our $current; # current coroutine: the last one switched to
45 root 1.8
46 root 1.71 our $VERSION = 1.5;
47 root 1.8
48 root 1.71 our @EXPORT = qw(async cede schedule terminate current); # exported by default
49     our %EXPORT_TAGS = (
50 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)], # names behind the :prio import tag
51     );
52 root 1.71 our @EXPORT_OK = @{$EXPORT_TAGS{prio}}; # priority constants are exported on request only
53 root 1.8
54     {
55     my @async; # code refs of subs declared with the :Coro attribute that have not been started yet
56 root 1.26 my $init; # non-zero once the INIT sub below has been compiled
57 root 1.8
58     # this way of handling attributes simply is NOT scalable ;()
59     sub import { # export the requested symbols, then hook :Coro attribute handling into the caller
60 root 1.71 no strict 'refs'; # the glob lookups below go through package names built as strings
61    
62 root 1.8 Coro->export_to_level(1, @_); # install the exports one level up, i.e. into our caller
63 root 1.71
64 root 1.8 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE}; # attribute handler the caller already had, if any
65     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub { # replacement handler, chained in front of $old
66     my ($package, $ref) = (shift, shift); # what is left in @_ is the attribute list
67     my @attrs; # attributes we do not handle ourselves
68     for (@_) {
69     if ($_ eq "Coro") {
70     push @async, $ref; # queue the sub; it is started from INIT
71 root 1.26 unless ($init++) { # first :Coro sub seen: compile the INIT sub exactly once
72     eval q{
73     sub INIT {
74     &async(pop @async) while @async;
75     }
76     }; # INIT drains @async, turning every queued sub into an async coroutine
77     } # NOTE(review): a :Coro sub compiled after perl's INIT phase would presumably never be started - confirm
78 root 1.8 } else {
79 root 1.17 push @attrs, $_;
80 root 1.8 }
81     }
82 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs; # pass the leftovers to the previous handler, or return them as-is
83 root 1.8 };
84     }
85    
86     }
87    
88 root 1.43 =over 4
89    
90 root 1.8 =item $main
91 root 1.2
92 root 1.8 This coroutine represents the main program.
93 root 1.1
94     =cut
95    
96 pcg 1.55 $main = new Coro; # created without a sub: this object stands for the already-running main program
97 root 1.8
98 root 1.19 =item $current (or as function: current)
99 root 1.1
100 root 1.8 The current coroutine (the last coroutine switched to). The initial value is C<$main> (of course).
101 root 1.1
102 root 1.8 =cut
103    
104     # maybe some other module used Coro::Specific before...
105     if ($current) { # $current was already set by the time this file is loaded
106     $main->{specific} = $current->{specific}; # carry the existing {specific} slot over to $main
107 root 1.1 }
108    
109 pcg 1.55 $current = $main; # the main program is what is running at load time
110 root 1.19
111     sub current() { $current } # function form of $current; the empty prototype means it takes no arguments
112 root 1.9
113     =item $idle
114    
115     The coroutine to switch to when no other coroutine is running. The default
116     implementation prints "FATAL: deadlock detected" and exits.
117    
118     =cut
119    
120     # should be done using priorities :(
121 pcg 1.55 $idle = new Coro sub { # default idle coroutine: report the deadlock and end the program
122 root 1.9 print STDERR "FATAL: deadlock detected\n";
123     exit(51); # exit status used for this condition
124     };
125 root 1.8
126 root 1.24 # this coroutine is necessary because a coroutine
127     # cannot destroy itself.
128     my @destroy; # coroutines queued by cancel() that still have to be torn down
129 root 1.38 my $manager; # declared before the assignment so the sub below can refer to $manager itself
130     $manager = new Coro sub {
131 pcg 1.57 while () { # empty condition: loop forever, the manager never returns
132 root 1.37 # by overwriting the state object with the manager we destroy it
133     # while still being able to schedule this coroutine (in case it has
134     # been readied multiple times). this is harmless since the manager
135     # can be called as many times as necessary and will always
136     # remove itself from the runqueue
137 root 1.40 while (@destroy) {
138     my $coro = pop @destroy;
139     $coro->{status} ||= []; # guarantee join() finds a status list, even an empty one
140     $_->ready for @{delete $coro->{join} || []}; # wake every coroutine blocked in join() on this one
141 pcg 1.59
142     # the next line destroys the _coro_state, but keeps the
143     # process itself intact (we basically make it a zombie
144     # process that always runs the manager thread, so it's possible
145     # to transfer() to this process).
146 root 1.40 $coro->{_coro_state} = $manager->{_coro_state};
147     }
148 root 1.24 &schedule; # nothing left to reap: switch away until somebody readies the manager again
149     }
150     };
151    
152 root 1.8 # static methods. not really.
153 root 1.43
154     =back
155 root 1.8
156     =head2 STATIC METHODS
157    
158     Static methods are actually functions that operate on the current process only.
159    
160     =over 4
161    
162 root 1.13 =item async { ... } [@args...]
163 root 1.8
164     Create a new asynchronous process and return its process object
165     (usually unused). When the sub returns the new process is automatically
166     terminated.
167    
168 root 1.13 # create a new coroutine that just prints its arguments
169     async {
170     print "@_\n";
171     } 1,2,3,4;
172    
173 root 1.8 =cut
174    
175 root 1.13 sub async(&@) { # the (&@) prototype lets callers write: async { ... } @args
176     my $pid = new Coro @_; # the block and its arguments go straight to the constructor
177 root 1.24 $manager->ready; # this ensures that the stack is cloned from the manager
178 root 1.11 $pid->ready; # queue the new coroutine so the scheduler can pick it up
179     $pid; # return value: the new process object
180 root 1.8 }
181 root 1.1
182 root 1.8 =item schedule
183 root 1.6
184 root 1.8 Calls the scheduler. Please note that the current process will not be put
185     into the ready queue, so calling this function usually means you will
186     never be called again unless something else puts you back into the ready queue (see C<ready>).
187 root 1.1
188     =cut
189    
190 root 1.22 =item cede
191 root 1.1
192 root 1.22 "Cede" to other processes. This function puts the current process into the
193     ready queue and calls C<schedule>, which has the effect of giving up the
194     current "timeslice" to other coroutines of the same or higher priority.
195 root 1.7
196 root 1.8 =cut
197    
198 root 1.40 =item terminate [arg...]
199 root 1.7
200 pcg 1.59 Terminates the current process with the given status values (see L<cancel>).
201 root 1.13
202 root 1.1 =cut
203    
204 root 1.8 sub terminate { # end the coroutine that is running right now
205 pcg 1.59 $current->cancel (@_); # the arguments become the status list; cancel() switches away because the target is $current
206 root 1.1 }
207 root 1.6
208 root 1.8 =back
209    
210     # dynamic methods
211    
212     =head2 PROCESS METHODS
213    
214     These are the methods you can call on process objects.
215 root 1.6
216 root 1.8 =over 4
217    
218 root 1.13 =item new Coro \&sub [, @args...]
219 root 1.8
220     Create a new process and return it. When the sub returns the process
221 root 1.40 automatically terminates as if C<terminate> with the returned values were
222 root 1.41 called. To make the process run you must first put it into the ready queue
223     by calling the ready method.
224 root 1.13
225 root 1.6 =cut
226    
227 root 1.13 sub _newcoro { # entry point new() installs for every coroutine that was given a sub
228     terminate &{+shift}; # take the sub off @_, call it with the remaining @_, then terminate with whatever it returned
229     }
230    
231 root 1.8 sub new { # constructor: a blessed hash wrapping a Coro::State object
232     my $class = shift; # after this, @_ is (sub, args...) or empty
233     bless {
234 root 1.13 _coro_state => (new Coro::State $_[0] && \&_newcoro, @_), # with a sub: Coro::State gets (\&_newcoro, sub, args...); without one the false first argument is passed through
235 root 1.8 }, $class;
236     }
237 root 1.6
238 root 1.8 =item $process->ready
239 root 1.1
240 root 1.39 Put the given process into the ready queue.
241 root 1.1
242 root 1.8 =cut
243 root 1.28
244 pcg 1.59 =item $process->cancel (arg...)
245 root 1.28
246 pcg 1.59 Terminates the given process and makes it return the given arguments as
247     status (default: the empty list).
248 root 1.28
249     =cut
250    
251     sub cancel { # record the exit status and hand the coroutine to the manager for teardown
252 pcg 1.59 my $self = shift;
253     $self->{status} = [@_]; # the remaining arguments are what join() will return
254     push @destroy, $self; # the manager coroutine performs the actual destruction
255 root 1.28 $manager->ready; # make sure the manager gets to run
256 pcg 1.59 &schedule if $current == $self; # cancelling ourselves: switch away now without being put in the ready queue
257 root 1.40 }
258    
259     =item $process->join
260    
261     Wait until the coroutine terminates and return any values given to the
262 pcg 1.59 C<terminate> or C<cancel> functions. C<join> can be called multiple times
263     from multiple processes.
264 root 1.40
265     =cut
266    
267     sub join { # block until this coroutine has terminated, then return its status values
268     my $self = shift;
269     unless ($self->{status}) { # not finished yet: cancel() has not stored a status
270     push @{$self->{join}}, $current; # register once; the manager readies everybody on this list when the target is reaped
271     &schedule until $self->{status}; # go back to sleep if something else readied us before the target finished
272     }
273     wantarray ? @{$self->{status}} : $self->{status}[0]; # list context: all status values; scalar context: the first one
274 root 1.31 }
275    
276     =item $oldprio = $process->prio($newprio)
277    
278 root 1.41 Sets (or gets, if the argument is missing) the priority of the
279     process. Higher priority processes get run before lower priority
280 root 1.52 processes. Priorities are small signed integers (currently -4 .. +3),
281 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
282     to get them):
283 root 1.31
284     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
285     3 > 1 > 0 > -1 > -3 > -4
286    
287     # set priority to HIGH
288     current->prio(PRIO_HIGH);
289    
290     The idle coroutine ($Coro::idle) always has a lower priority than any
291     existing coroutine.
292    
293     Changing the priority of the current process will take effect immediately,
294     but changing the priority of processes in the ready queue (but not
295     running) will only take effect after the next schedule (of that
296     process). This is a bug that will be fixed in some future version.
297    
298     =cut
299    
300     sub prio { # get the priority, and set a new one when an argument is passed
301     my $old = $_[0]{prio}; # value in effect before this call
302     $_[0]{prio} = $_[1] if @_ > 1; # setter form only; a plain $process->prio call leaves it untouched
303     $old; # always returns the previous priority
304     }
305    
306     =item $newprio = $process->nice($change)
307    
308     Similar to C<prio>, but subtract the given value from the priority (i.e.
309     higher values mean lower priority, just as in unix).
310    
311     =cut
312    
313     sub nice { # lower the priority by the given amount
314     $_[0]{prio} -= $_[1]; # last expression, so the resulting priority is the return value
315 root 1.41 }
316    
317     =item $olddesc = $process->desc($newdesc)
318    
319     Sets (or gets in case the argument is missing) the description for this
320     process. This is just a free-form string you can associate with a process.
321    
322     =cut
323    
324     sub desc { # get the description, and set a new one when an argument is passed
325     my $old = $_[0]{desc}; # value in effect before this call
326     $_[0]{desc} = $_[1] if @_ > 1; # setter form only
327     $old; # always returns the previous description
328 root 1.8 }
329 root 1.1
330 root 1.8 =back
331 root 1.2
332 root 1.8 =cut
333 root 1.2
334 root 1.8 1;
335 root 1.14
336 root 1.17 =head1 BUGS/LIMITATIONS
337 root 1.14
338 root 1.52 - you must make very sure that no coro is still active on global
339 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
340 root 1.52
341     - this module is not thread-safe. You should only ever use this module
342     from the same thread (this requirement might be loosened in the future
343     to allow per-thread schedulers, but Coro::State does not yet allow
344     this).
345 root 1.9
346     =head1 SEE ALSO
347    
348 root 1.67 Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
349    
350     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
351    
352     Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
353    
354     Embedding: L<Coro::MakeMaker>
355 root 1.1
356     =head1 AUTHOR
357    
358 root 1.66 Marc Lehmann <schmorp@schmorp.de>
359 root 1.64 http://home.schmorp.de/
360 root 1.1
361     =cut
362