ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.82
Committed: Fri Nov 24 13:40:36 2006 UTC (17 years, 6 months ago) by root
Branch: MAIN
Changes since 1.81: +6 -8 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.8 # alternatively create an async process like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.14 This module collection manages coroutines. Coroutines are similar to
24 root 1.42 threads but don't run in parallel.
25 root 1.14
26 root 1.20 In this module, coroutines are defined as "callchain + lexical variables
27 root 1.23 + @_ + $_ + $@ + $^W + C stack", that is, a coroutine has its own
28     callchain, its own set of lexicals and its own set of perl's most
29     important global variables.
30 root 1.22
31 root 1.8 =cut
32    
33     package Coro;
34    
35 root 1.71 use strict;
36     no warnings "uninitialized";
37 root 1.36
38 root 1.8 use Coro::State;
39    
40 root 1.71 use base Exporter::;
41 pcg 1.55
42 root 1.71 our $idle; # idle coroutine
43     our $main; # main coroutine
44     our $current; # current coroutine
45 root 1.8
46 root 1.80 our $VERSION = '2.5';
47 root 1.8
48 root 1.71 our @EXPORT = qw(async cede schedule terminate current);
49     our %EXPORT_TAGS = (
50 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
51     );
52 root 1.71 our @EXPORT_OK = @{$EXPORT_TAGS{prio}};
53 root 1.8
# Enclosing block: keeps @async and $init private to import() and to the
# attribute handler it installs.
#   @async - code refs of subs declared with the ":Coro" attribute that
#            have not yet been turned into coroutines
#   $init  - counter used so the INIT block below is only eval'ed once
54     {
55     my @async;
56 root 1.26 my $init;
57 root 1.8
58     # this way of handling attributes simply is NOT scalable ;()
# import: exports the requested symbols into the caller (one level up)
# and then replaces the caller's MODIFY_CODE_ATTRIBUTES handler with a
# wrapper that recognises the "Coro" attribute.
59     sub import {
60 root 1.71 no strict 'refs';
61    
62 root 1.8 Coro->export_to_level(1, @_);
63 root 1.71
# remember any handler the calling package already had, so that
# attributes other than "Coro" can still be forwarded to it
64 root 1.8 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE};
65     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub {
66     my ($package, $ref) = (shift, shift);
# @attrs collects every attribute that is not "Coro"
67     my @attrs;
68     for (@_) {
69     if ($_ eq "Coro") {
70     push @async, $ref;
# on the first "Coro" attribute seen, define an INIT block that
# passes every queued sub to async()
71 root 1.26 unless ($init++) {
72     eval q{
73     sub INIT {
74     &async(pop @async) while @async;
75     }
76     };
77     }
78 root 1.8 } else {
79 root 1.17 push @attrs, $_;
80 root 1.8 }
81     }
# hand the remaining attributes to the previous handler if there was
# one, otherwise return them unchanged
82 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs;
83 root 1.8 };
84     }
85    
86     }
87    
88 root 1.43 =over 4
89    
90 root 1.8 =item $main
91 root 1.2
92 root 1.8 This coroutine represents the main program.
93 root 1.1
94     =cut
95    
# the coroutine object for the main program; created without a sub
96 pcg 1.55 $main = new Coro;
97 root 1.8
98 root 1.19 =item $current (or as function: current)
99 root 1.1
100 root 1.8 The current coroutine (the last coroutine switched to). The initial value is C<$main> (of course).
101 root 1.1
102 root 1.8 =cut
103    
104     # maybe some other module used Coro::Specific before...
# ...in which case $current is already set: carry its {specific} entry
# over to $main before $current is replaced below.
105     if ($current) {
106     $main->{specific} = $current->{specific};
107 root 1.1 }
108    
# the main program is the coroutine that is running at load time
109 pcg 1.55 $current = $main;
110 root 1.19
# function form of $current; the empty prototype means it takes no arguments
111     sub current() { $current }
112 root 1.9
113     =item $idle
114    
115     The coroutine to switch to when no other coroutine is running. The default
116     implementation prints "FATAL: deadlock detected" and exits.
117    
118     =cut
119    
# Default idle coroutine: writes "FATAL: deadlock detected" to STDERR
# and exits the program with status 51.
120     # should be done using priorities :(
121 pcg 1.55 $idle = new Coro sub {
122 root 1.9 print STDERR "FATAL: deadlock detected\n";
123     exit(51);
124     };
125 root 1.8
126 root 1.24 # this coroutine is necessary because a coroutine
127     # cannot destroy itself.
# @destroy is the queue of coroutines waiting to be torn down; cancel()
# pushes onto it and the manager coroutine below drains it.
128     my @destroy;
# $manager is declared before it is assigned because the sub body
# refers to $manager itself.
129 root 1.38 my $manager;
130     $manager = new Coro sub {
# endless loop: drain @destroy, then call the scheduler
131 pcg 1.57 while () {
132 root 1.37 # by overwriting the state object with the manager we destroy it
133     # while still being able to schedule this coroutine (in case it has
134     # been readied multiple times). this is harmless since the manager
135     # can be called as many times as necessary and will always
136     # remove itself from the runqueue
137 root 1.40 while (@destroy) {
138     my $coro = pop @destroy;
# make sure {status} is a true value (an array ref), since join()
# tests the truth of {status} to decide whether it has to wait
139     $coro->{status} ||= [];
# ready every coroutine that join() recorded in {join}, removing the
# list from the object at the same time
140     $_->ready for @{delete $coro->{join} || []};
141 pcg 1.59
142     # the next line destroys the _coro_state, but keeps the
143     # process itself intact (we basically make it a zombie
144     # process that always runs the manager thread, so it's possible
145     # to transfer() to this process).
146 root 1.82 $coro->{_coro_state}->_clone_state_from ($manager->{_coro_state});
147 root 1.40 }
# nothing left to destroy: call the scheduler (schedule is not defined
# in this listing; it is documented under "schedule" in this file)
148 root 1.24 &schedule;
149     }
150     };
151    
152 root 1.8 # static methods. not really.
153 root 1.43
154     =back
155 root 1.8
156     =head2 STATIC METHODS
157    
158     Static methods are actually functions that operate on the current process only.
159    
160     =over 4
161    
162 root 1.13 =item async { ... } [@args...]
163 root 1.8
164     Create a new asynchronous process and return its process object
165     (usually unused). When the sub returns the new process is automatically
166     terminated.
167    
168 root 1.79 When the coroutine dies, the program will exit, just as in the main
169     program.
170    
171 root 1.13 # create a new coroutine that just prints its arguments
172     async {
173     print "@_\n";
174     } 1,2,3,4;
175    
176 root 1.8 =cut
177    
# async BLOCK, ARGS...: creates a new Coro from the block and its
# arguments, readies the manager and then the new coroutine, and
# returns the new Coro object. The (&@) prototype allows the
# "async { ... } args" call syntax.
178 root 1.13 sub async(&@) {
179     my $pid = new Coro @_;
180 root 1.24 $manager->ready; # this ensures that the stack is cloned from the manager
181 root 1.11 $pid->ready;
# the new coroutine object is the return value
182     $pid;
183 root 1.8 }
184 root 1.1
185 root 1.8 =item schedule
186 root 1.6
187 root 1.8 Calls the scheduler. Please note that the current process will not be put
188     into the ready queue, so calling this function usually means you will
189     never be called again.
190 root 1.1
191     =cut
192    
193 root 1.22 =item cede
194 root 1.1
195 root 1.22 "Cede" to other processes. This function puts the current process into the
196     ready queue and calls C<schedule>, which has the effect of giving up the
197     current "timeslice" to other coroutines of the same or higher priority.
198 root 1.7
199 root 1.8 =cut
200    
201 root 1.40 =item terminate [arg...]
202 root 1.7
203 pcg 1.59 Terminates the current process with the given status values (see L<cancel>).
204 root 1.13
205 root 1.1 =cut
206    
# terminate ARGS...: cancels the current coroutine, passing the
# arguments on to cancel() as its status values.
207 root 1.8 sub terminate {
208 pcg 1.59 $current->cancel (@_);
209 root 1.1 }
210 root 1.6
211 root 1.8 =back
212    
213     # dynamic methods
214    
215     =head2 PROCESS METHODS
216    
217     These are the methods you can call on process objects.
218 root 1.6
219 root 1.8 =over 4
220    
221 root 1.13 =item new Coro \&sub [, @args...]
222 root 1.8
223     Create a new process and return it. When the sub returns the process
224 root 1.40 automatically terminates as if C<terminate> with the returned values were
225 root 1.41 called. To make the process run you must first put it into the ready queue
226     by calling the ready method.
227 root 1.13
228 root 1.6 =cut
229    
# _newcoro CODEREF, ARGS...: entry point handed to Coro::State by new().
# Shifts the code ref off @_ and calls it with the "&{...}" form without
# parentheses, so the callee sees the remaining @_ as its arguments;
# whatever it returns is passed to terminate().
230 root 1.13 sub _newcoro {
231     terminate &{+shift};
232     }
233    
# new CLASS, CODEREF, ARGS...: constructor. Returns a hash blessed into
# CLASS whose only initial key, _coro_state, holds a Coro::State created
# with _newcoro as its entry point and the remaining arguments.
234 root 1.8 sub new {
235     my $class = shift;
236     bless {
237 root 1.81 _coro_state => (new Coro::State \&_newcoro, @_),
238 root 1.8 }, $class;
239     }
240 root 1.6
241 root 1.8 =item $process->ready
242 root 1.1
243 root 1.39 Put the given process into the ready queue.
244 root 1.1
245 root 1.8 =cut
246 root 1.28
247 pcg 1.59 =item $process->cancel (arg...)
248 root 1.28
249 root 1.79 Terminates the given process and makes it return the given arguments as
250 pcg 1.59 status (default: the empty list).
251 root 1.28
252     =cut
253    
# $process->cancel (ARGS...): stores a copy of the arguments as the
# status of the process, queues it on @destroy and readies the manager
# coroutine that drains that queue.
254     sub cancel {
255 pcg 1.59 my $self = shift;
256     $self->{status} = [@_];
257     push @destroy, $self;
258 root 1.28 $manager->ready;
# when a coroutine cancels itself, call the scheduler right away
259 pcg 1.59 &schedule if $current == $self;
260 root 1.40 }
261    
262     =item $process->join
263    
264     Wait until the coroutine terminates and return any values given to the
265 pcg 1.59 C<terminate> or C<cancel> functions. C<join> can be called multiple times
266     from multiple processes.
267 root 1.40
268     =cut
269    
# $process->join: returns the status values of the process. If no
# status has been set yet, the current coroutine is added to the
# process's {join} list and the scheduler is called first.
270     sub join {
271     my $self = shift;
272     unless ($self->{status}) {
273     push @{$self->{join}}, $current;
274     &schedule;
275     }
# list context: all status values; scalar context: only the first one
276     wantarray ? @{$self->{status}} : $self->{status}[0];
277 root 1.31 }
278    
279 root 1.82 =item $oldprio = $process->prio ($newprio)
280 root 1.31
281 root 1.41 Sets (or gets, if the argument is missing) the priority of the
282     process. Higher priority processes get run before lower priority
283 root 1.52 processes. Priorities are small signed integers (currently -4 .. +3),
284 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
285     to get them):
286 root 1.31
287     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
288     3 > 1 > 0 > -1 > -3 > -4
289    
290     # set priority to HIGH
291     current->prio(PRIO_HIGH);
292    
293     The idle coroutine ($Coro::idle) always has a lower priority than any
294     existing coroutine.
295    
296     Changing the priority of the current process will take effect immediately,
297     but changing the priority of processes in the ready queue (but not
298     running) will only take effect after the next schedule (of that
299     process). This is a bug that will be fixed in some future version.
300    
301     =cut
302    
# $process->prio (ARGS...): forwards the call, with the remaining
# arguments, to the prio method of the underlying _coro_state object
# and returns its result.
303     sub prio {
304 root 1.82 shift->{_coro_state}->prio (@_)
305 root 1.31 }
306    
307 root 1.82 =item $newprio = $process->nice ($change)
308 root 1.31
309     Similar to C<prio>, but subtract the given value from the priority (i.e.
310     higher values mean lower priority, just as in unix).
311    
312     =cut
313    
# $process->nice (ARGS...): forwards the call, with the remaining
# arguments, to the nice method of the underlying _coro_state object
# and returns its result.
314     sub nice {
315 root 1.82 shift->{_coro_state}->nice (@_)
316 root 1.41 }
317    
318 root 1.82 =item $olddesc = $process->desc ($newdesc)
319 root 1.41
320     Sets (or gets in case the argument is missing) the description for this
321     process. This is just a free-form string you can associate with a process.
322    
323     =cut
324    
# $process->desc ([NEWDESC]): returns the description stored before the
# call. The {desc} entry is only overwritten when a second argument is
# actually passed, so an explicit undef also replaces it.
325     sub desc {
326     my $old = $_[0]{desc};
327     $_[0]{desc} = $_[1] if @_ > 1;
328     $old;
329 root 1.8 }
330 root 1.1
331 root 1.8 =back
332 root 1.2
333 root 1.8 =cut
334 root 1.2
335 root 1.8 1;
336 root 1.14
337 root 1.17 =head1 BUGS/LIMITATIONS
338 root 1.14
339 root 1.52 - you must make very sure that no coro is still active on global
340 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
341 root 1.52
342     - this module is not thread-safe. You should only ever use this module
343     from the same thread (this requirement might be loosened in the future
344     to allow per-thread schedulers, but Coro::State does not yet allow
345     this).
346 root 1.9
347     =head1 SEE ALSO
348    
349 root 1.67 Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
350    
351     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
352    
353     Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
354    
355     Embedding: L<Coro::MakeMaker>
356 root 1.1
357     =head1 AUTHOR
358    
359 root 1.66 Marc Lehmann <schmorp@schmorp.de>
360 root 1.64 http://home.schmorp.de/
361 root 1.1
362     =cut
363