ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.75
Committed: Wed Jan 25 21:43:58 2006 UTC (18 years, 4 months ago) by root
Branch: MAIN
Changes since 1.74: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 Coro - coroutine process abstraction
4
5 =head1 SYNOPSIS
6
7 use Coro;
8
9 async {
10 # some asynchronous thread of execution
11 };
12
13 # alternatively create an async process like this:
14
15 sub some_func : Coro {
16 # some more async code
17 }
18
19 cede;
20
21 =head1 DESCRIPTION
22
23 This module collection manages coroutines. Coroutines are similar to
24 threads but don't run in parallel.
25
26 In this module, coroutines are defined as "callchain + lexical variables
27 + @_ + $_ + $@ + $^W + C stack), that is, a coroutine has it's own
28 callchain, it's own set of lexicals and it's own set of perl's most
29 important global variables.
30
31 =cut
32
33 package Coro;
34
35 use strict;
36 no warnings "uninitialized";
37
38 use Coro::State;
39
40 use base Exporter::;
41
42 our $idle; # idle coroutine
43 our $main; # main coroutine
44 our $current; # current coroutine
45
46 our $VERSION = 1.8;
47
48 our @EXPORT = qw(async cede schedule terminate current);
49 our %EXPORT_TAGS = (
50 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
51 );
52 our @EXPORT_OK = @{$EXPORT_TAGS{prio}};
53
54 {
55 my @async;
56 my $init;
57
58 # this way of handling attributes simply is NOT scalable ;()
59 sub import {
60 no strict 'refs';
61
62 Coro->export_to_level(1, @_);
63
64 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE};
65 *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub {
66 my ($package, $ref) = (shift, shift);
67 my @attrs;
68 for (@_) {
69 if ($_ eq "Coro") {
70 push @async, $ref;
71 unless ($init++) {
72 eval q{
73 sub INIT {
74 &async(pop @async) while @async;
75 }
76 };
77 }
78 } else {
79 push @attrs, $_;
80 }
81 }
82 return $old ? $old->($package, $ref, @attrs) : @attrs;
83 };
84 }
85
86 }
87
88 =over 4
89
90 =item $main
91
92 This coroutine represents the main program.
93
94 =cut
95
96 $main = new Coro;
97
98 =item $current (or as function: current)
99
100 The current coroutine (the last coroutine switched to). The initial value is C<$main> (of course).
101
102 =cut
103
104 # maybe some other module used Coro::Specific before...
105 if ($current) {
106 $main->{specific} = $current->{specific};
107 }
108
109 $current = $main;
110
111 sub current() { $current }
112
113 =item $idle
114
115 The coroutine to switch to when no other coroutine is running. The default
116 implementation prints "FATAL: deadlock detected" and exits.
117
118 =cut
119
120 # should be done using priorities :(
121 $idle = new Coro sub {
122 print STDERR "FATAL: deadlock detected\n";
123 exit(51);
124 };
125
126 # this coroutine is necessary because a coroutine
127 # cannot destroy itself.
128 my @destroy;
129 my $manager;
130 $manager = new Coro sub {
131 while () {
132 # by overwriting the state object with the manager we destroy it
133 # while still being able to schedule this coroutine (in case it has
134 # been readied multiple times. this is harmless since the manager
135 # can be called as many times as neccessary and will always
136 # remove itself from the runqueue
137 while (@destroy) {
138 my $coro = pop @destroy;
139 $coro->{status} ||= [];
140 $_->ready for @{delete $coro->{join} || []};
141
142 # the next line destroys the _coro_state, but keeps the
143 # process itself intact (we basically make it a zombie
144 # process that always runs the manager thread, so it's possible
145 # to transfer() to this process).
146 $coro->{_coro_state} = $manager->{_coro_state};
147 }
148 &schedule;
149 }
150 };
151
152 # static methods. not really.
153
154 =back
155
156 =head2 STATIC METHODS
157
158 Static methods are actually functions that operate on the current process only.
159
160 =over 4
161
162 =item async { ... } [@args...]
163
164 Create a new asynchronous process and return it's process object
165 (usually unused). When the sub returns the new process is automatically
166 terminated.
167
168 # create a new coroutine that just prints its arguments
169 async {
170 print "@_\n";
171 } 1,2,3,4;
172
173 =cut
174
175 sub async(&@) {
176 my $pid = new Coro @_;
177 $manager->ready; # this ensures that the stack is cloned from the manager
178 $pid->ready;
179 $pid;
180 }
181
182 =item schedule
183
184 Calls the scheduler. Please note that the current process will not be put
185 into the ready queue, so calling this function usually means you will
186 never be called again.
187
188 =cut
189
190 =item cede
191
192 "Cede" to other processes. This function puts the current process into the
193 ready queue and calls C<schedule>, which has the effect of giving up the
194 current "timeslice" to other coroutines of the same or higher priority.
195
196 =cut
197
198 =item terminate [arg...]
199
200 Terminates the current process with the given status values (see L<cancel>).
201
202 =cut
203
204 sub terminate {
205 $current->cancel (@_);
206 }
207
208 =back
209
210 # dynamic methods
211
212 =head2 PROCESS METHODS
213
214 These are the methods you can call on process objects.
215
216 =over 4
217
218 =item new Coro \&sub [, @args...]
219
220 Create a new process and return it. When the sub returns the process
221 automatically terminates as if C<terminate> with the returned values were
222 called. To make the process run you must first put it into the ready queue
223 by calling the ready method.
224
225 =cut
226
227 sub _newcoro {
228 terminate &{+shift};
229 }
230
231 sub new {
232 my $class = shift;
233 bless {
234 _coro_state => (new Coro::State $_[0] && \&_newcoro, @_),
235 }, $class;
236 }
237
238 =item $process->ready
239
240 Put the given process into the ready queue.
241
242 =cut
243
244 =item $process->cancel (arg...)
245
246 Temrinates the given process and makes it return the given arguments as
247 status (default: the empty list).
248
249 =cut
250
251 sub cancel {
252 my $self = shift;
253 $self->{status} = [@_];
254 push @destroy, $self;
255 $manager->ready;
256 &schedule if $current == $self;
257 }
258
259 =item $process->join
260
261 Wait until the coroutine terminates and return any values given to the
262 C<terminate> or C<cancel> functions. C<join> can be called multiple times
263 from multiple processes.
264
265 =cut
266
267 sub join {
268 my $self = shift;
269 unless ($self->{status}) {
270 push @{$self->{join}}, $current;
271 &schedule;
272 }
273 wantarray ? @{$self->{status}} : $self->{status}[0];
274 }
275
276 =item $oldprio = $process->prio($newprio)
277
278 Sets (or gets, if the argument is missing) the priority of the
279 process. Higher priority processes get run before lower priority
280 processes. Priorities are small signed integers (currently -4 .. +3),
281 that you can refer to using PRIO_xxx constants (use the import tag :prio
282 to get then):
283
284 PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
285 3 > 1 > 0 > -1 > -3 > -4
286
287 # set priority to HIGH
288 current->prio(PRIO_HIGH);
289
290 The idle coroutine ($Coro::idle) always has a lower priority than any
291 existing coroutine.
292
293 Changing the priority of the current process will take effect immediately,
294 but changing the priority of processes in the ready queue (but not
295 running) will only take effect after the next schedule (of that
296 process). This is a bug that will be fixed in some future version.
297
298 =cut
299
300 sub prio {
301 my $old = $_[0]{prio};
302 $_[0]{prio} = $_[1] if @_ > 1;
303 $old;
304 }
305
306 =item $newprio = $process->nice($change)
307
308 Similar to C<prio>, but subtract the given value from the priority (i.e.
309 higher values mean lower priority, just as in unix).
310
311 =cut
312
313 sub nice {
314 $_[0]{prio} -= $_[1];
315 }
316
317 =item $olddesc = $process->desc($newdesc)
318
319 Sets (or gets in case the argument is missing) the description for this
320 process. This is just a free-form string you can associate with a process.
321
322 =cut
323
324 sub desc {
325 my $old = $_[0]{desc};
326 $_[0]{desc} = $_[1] if @_ > 1;
327 $old;
328 }
329
330 =back
331
332 =cut
333
334 1;
335
336 =head1 BUGS/LIMITATIONS
337
338 - you must make very sure that no coro is still active on global
339 destruction. very bad things might happen otherwise (usually segfaults).
340
341 - this module is not thread-safe. You should only ever use this module
342 from the same thread (this requirement might be losened in the future
343 to allow per-thread schedulers, but Coro::State does not yet allow
344 this).
345
346 =head1 SEE ALSO
347
348 Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
349
350 Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
351
352 Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
353
354 Embedding: L<Coro:MakeMaker>
355
356 =head1 AUTHOR
357
358 Marc Lehmann <schmorp@schmorp.de>
359 http://home.schmorp.de/
360
361 =cut
362