ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.101
Committed: Fri Dec 29 08:36:34 2006 UTC (17 years, 5 months ago) by root
Branch: MAIN
Changes since 1.100: +19 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.92 # alternatively create an async coroutine like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.98 This module collection manages coroutines. Coroutines are similar
24     to threads but don't run in parallel at the same time even on SMP
25     machines. The specific flavor of coroutine used in this module also
26     guarantees you that it will not switch between coroutines unless
27     necessary, at easily-identified points in your program, so locking and
28     parallel access are rarely an issue, making coroutine programming much
29     safer than threads programming.
30    
31     (Perl, however, does not natively support real threads but instead does a
32     very slow and memory-intensive emulation of processes using threads. This
33     is a performance win on Windows machines, and a loss everywhere else).
34    
35     In this module, coroutines are defined as "callchain + lexical variables +
36     @_ + $_ + $@ + $/ + C stack", that is, a coroutine has its own callchain,
37     its own set of lexicals and its own set of Perl's most important global
38     variables.
39 root 1.22
40 root 1.8 =cut
41    
42     package Coro;
43    
44 root 1.71 use strict;
45     no warnings "uninitialized";
46 root 1.36
47 root 1.8 use Coro::State;
48    
49 root 1.83 use base qw(Coro::State Exporter);
50 pcg 1.55
51 root 1.83 our $idle; # idle handler
52 root 1.71 our $main; # main coroutine
53     our $current; # current coroutine
54 root 1.8
55 root 1.101 our $VERSION = '3.3';
56 root 1.8
57 root 1.92 our @EXPORT = qw(async cede schedule terminate current unblock_sub); # exported by default
58 root 1.71 our %EXPORT_TAGS = (
59 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)], # priority constants, requested with the :prio tag
60     );
61 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready)); # exported only when asked for explicitly
62 root 1.8
63     { # private scope: the two lexicals below are visible only to import and its closures
64     my @async; # code refs of subs declared with the :Coro attribute that have not been started yet
65 root 1.26 my $init; # non-zero once the INIT block below has been installed
66 root 1.8
67     # this way of handling attributes simply is NOT scalable ;()
68     sub import {
69 root 1.71 no strict 'refs'; # the caller's glob is accessed by name below
70    
71 root 1.93 Coro->export_to_level (1, @_); # regular Exporter processing, one call level up
72 root 1.71
73 root 1.8 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE}; # keep any handler the caller already had
74     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub { # install our handler in the caller's package
75     my ($package, $ref) = (shift, shift); # remaining @_ is the list of attribute names
76     my @attrs; # attributes we do not handle ourselves
77     for (@_) {
78     if ($_ eq "Coro") {
79     push @async, $ref; # remember the sub; it is started from INIT
80 root 1.26 unless ($init++) { # first :Coro sub seen: define an INIT block that starts every collected sub via async
81     eval q{
82     sub INIT {
83     &async(pop @async) while @async;
84     }
85     };
86     }
87 root 1.8 } else {
88 root 1.17 push @attrs, $_;
89 root 1.8 }
90     }
91 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs; # delegate leftovers to the previous handler, else return them as unhandled
92 root 1.8 };
93     }
94    
95     }
96    
97 root 1.43 =over 4
98    
99 root 1.8 =item $main
100 root 1.2
101 root 1.8 This coroutine represents the main program.
102 root 1.1
103     =cut
104    
105 pcg 1.55 $main = new Coro;
106 root 1.8
107 root 1.19 =item $current (or as function: current)
108 root 1.1
109 root 1.83 The current coroutine (the last coroutine switched to). The initial value
110     is C<$main> (of course).
111    
112     This variable is B<strictly> I<read-only>. It is provided for performance
113     reasons. If performance is not essential you are encouraged to use the
114     C<Coro::current> function instead.
115 root 1.1
116 root 1.8 =cut
117    
118     # maybe some other module used Coro::Specific before...
119 root 1.93 $main->{specific} = $current->{specific} # take over data already attached to a pre-existing $current
120     if $current;
121 root 1.1
122 root 1.93 _set_current $main; # NOTE(review): presumably makes $main the current coroutine; helper is defined outside this file - confirm
123 root 1.19
124     sub current() { $current } # function form of $Coro::current
125 root 1.9
126     =item $idle
127    
128 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
129     to run. The default implementation croaks with "FATAL: deadlock detected"
130 root 1.91 (which normally ends the program), because the program has no other way to continue.
131 root 1.83
132     This hook is overwritten by modules such as C<Coro::Timer> and
133 root 1.91 C<Coro::Event> to wait on an external event that will hopefully wake up a
134     coroutine so the scheduler can run it.
135    
136     Please note that if your callback recursively invokes perl (e.g. for event
137     handlers), then it must be prepared to be called recursively.
138 root 1.9
139     =cut
140    
141 root 1.83 $idle = sub { # default idle handler: nothing is runnable, so report a deadlock
142 root 1.96 require Carp; # loaded lazily, only when a deadlock actually happens
143     Carp::croak ("FATAL: deadlock detected"); # parentheses are required: Carp::croak is not known at compile time
144 root 1.9 };
145 root 1.8
146 root 1.24 # this coroutine is necessary because a coroutine
147     # cannot destroy itself.
148     my @destroy; # coroutines queued for destruction by cancel
149 root 1.86 my $manager; $manager = new Coro sub { # declared first so the closure can refer to $manager itself
150 pcg 1.57 while () {
151 root 1.37 # by overwriting the state object with the manager we destroy it
152     # while still being able to schedule this coroutine (in case it has
153     # been readied multiple times). this is harmless since the manager
154     # can be called as many times as necessary and will always
155     # remove itself from the runqueue
156 root 1.40 while (@destroy) {
157     my $coro = pop @destroy;
158 root 1.101
159 root 1.40 $coro->{status} ||= []; # make sure join always finds an array ref
160 root 1.101
161     $_->ready for @{(delete $coro->{join} ) || []}; # wake every coroutine waiting in join
162     $_->(@{$coro->{status}}) for @{(delete $coro->{destroy_cb}) || []}; # run on_destroy callbacks with the status values
163 pcg 1.59
164 root 1.83 # the next line destroys the coro state, but keeps the
165 root 1.92 # coroutine itself intact (we basically make it a zombie
166     # coroutine that always runs the manager thread, so it's possible
167     # to transfer() to this coroutine).
168 root 1.83 $coro->_clone_state_from ($manager);
169 root 1.40 }
170 root 1.24 &schedule; # queue is empty: sleep until cancel readies the manager again
171     }
172     };
173    
174 root 1.8 # static methods. not really.
175 root 1.43
176     =back
177 root 1.8
178     =head2 STATIC METHODS
179    
180 root 1.92 Static methods are actually functions that operate on the current coroutine only.
181 root 1.8
182     =over 4
183    
184 root 1.13 =item async { ... } [@args...]
185 root 1.8
186 root 1.92 Create a new asynchronous coroutine and return its coroutine object
187     (usually unused). When the sub returns the new coroutine is automatically
188 root 1.8 terminated.
189    
190 root 1.89 Calling C<exit> in a coroutine will not work correctly, so do not do that.
191    
192 root 1.79 When the coroutine dies, the program will exit, just as in the main
193     program.
194    
195 root 1.13 # create a new coroutine that just prints its arguments
196     async {
197     print "@_\n";
198     } 1,2,3,4;
199    
200 root 1.8 =cut
201    
202 root 1.13 sub async(&@) {
203     my $coro = Coro->new (@_); # block plus optional arguments become a new coroutine
204 root 1.11 $coro->ready; # put it into the ready queue right away
205 root 1.85 return $coro;
206 root 1.8 }
207 root 1.1
208 root 1.8 =item schedule
209 root 1.6
210 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
211 root 1.8 into the ready queue, so calling this function usually means you will
212 root 1.91 never be called again unless something else (e.g. an event handler) calls
213     ready.
214    
215     The canonical way to wait on external events is this:
216    
217     {
218 root 1.92 # remember current coroutine
219 root 1.91 my $current = $Coro::current;
220    
221     # register a hypothetical event handler
222     on_event_invoke sub {
223     # wake up sleeping coroutine
224     $current->ready;
225     undef $current;
226     };
227    
228     # call schedule until the event has occurred.
229     # in case we are woken up for other reasons
230     # (current still defined), loop.
231     Coro::schedule while $current;
232     }
233 root 1.1
234 root 1.22 =item cede
235 root 1.1
236 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
237 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
238     current "timeslice" to other coroutines of the same or higher priority.
239 root 1.7
240 root 1.40 =item terminate [arg...]
241 root 1.7
242 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
243 root 1.13
244 root 1.1 =cut
245    
246 root 1.8 sub terminate {
247 pcg 1.59 $current->cancel (@_); # cancel the running coroutine; cancel schedules away because $current is the target
248 root 1.1 }
249 root 1.6
250 root 1.8 =back
251    
252     # dynamic methods
253    
254 root 1.92 =head2 COROUTINE METHODS
255 root 1.8
256 root 1.92 These are the methods you can call on coroutine objects.
257 root 1.6
258 root 1.8 =over 4
259    
260 root 1.13 =item new Coro \&sub [, @args...]
261 root 1.8
262 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
263 root 1.40 automatically terminates as if C<terminate> with the returned values were
264 root 1.92 called. To make the coroutine run you must first put it into the ready queue
265 root 1.41 by calling the ready method.
266 root 1.13
267 root 1.89 Calling C<exit> in a coroutine will not work correctly, so do not do that.
268    
269 root 1.6 =cut
270    
271 root 1.94 sub _run_coro {
272 root 1.13 terminate &{+shift}; # shift off the code ref, call it with the remaining @_, then terminate with its return values
273     }
274    
275 root 1.8 sub new {
276     my $class = shift @_; # invocant; what remains in @_ is the code ref and its arguments
277 root 1.83
278 root 1.94 return $class->SUPER::new (\&_run_coro, @_); # _run_coro runs the user's sub and terminates with its results
279 root 1.8 }
280 root 1.6
281 root 1.92 =item $success = $coroutine->ready
282 root 1.1
283 root 1.92 Put the given coroutine into the ready queue (according to its priority)
284     and return true. If the coroutine is already in the ready queue, do nothing
285 root 1.90 and return false.
286 root 1.1
287 root 1.92 =item $is_ready = $coroutine->is_ready
288 root 1.90
289 root 1.92 Return whether the coroutine is currently in the ready queue or not.
290 root 1.28
291 root 1.92 =item $coroutine->cancel (arg...)
292 root 1.28
293 root 1.92 Terminates the given coroutine and makes it return the given arguments as
294 pcg 1.59 status (default: the empty list).
295 root 1.28
296     =cut
297    
298     sub cancel {
299 pcg 1.59 my $self = shift;
300     $self->{status} = [@_]; # remaining arguments become the status that join returns
301     push @destroy, $self; # hand the coroutine to the manager for destruction
302 root 1.28 $manager->ready; # wake the manager so it processes @destroy
303 pcg 1.59 &schedule if $current == $self; # cancelling ourselves: switch away so the manager can run
304 root 1.40 }
305    
306 root 1.92 =item $coroutine->join
307 root 1.40
308     Wait until the coroutine terminates and return any values given to the
309 pcg 1.59 C<terminate> or C<cancel> functions. C<join> can be called multiple times
310 root 1.92 from multiple coroutines.
311 root 1.40
312     =cut
313    
314     sub join {
315     my $self = shift;
316     unless ($self->{status}) { # target has not been cancelled/terminated yet
317     push @{$self->{join}}, $current; # the manager readies everybody on this list when it destroys the target
318     &schedule until $self->{status}; # loop: we may be readied for unrelated reasons before the target has a status
319     }
320     wantarray ? @{$self->{status}} : $self->{status}[0]; # all status values in list context, the first one otherwise
321 root 1.31 }
322    
323 root 1.101 =item $coroutine->on_destroy (\&cb)
324    
325     Registers a callback that is called when this coroutine gets destroyed,
326     but before it is joined. The callback gets passed the terminate arguments,
327     if any.
328    
329     =cut
330    
331     sub on_destroy {
332     my $self = shift;
333     my $cb = shift; # code ref; the manager later calls it with the status values
334     push @{ $self->{destroy_cb} }, $cb;
335     }
336    
337 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
338 root 1.31
339 root 1.41 Sets (or gets, if the argument is missing) the priority of the
340 root 1.92 coroutine. Higher priority coroutines get run before lower priority
341     coroutines. Priorities are small signed integers (currently -4 .. +3),
342 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
343     to get them):
344 root 1.31
345     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
346     3 > 1 > 0 > -1 > -3 > -4
347    
348     # set priority to HIGH
349     current->prio(PRIO_HIGH);
350    
351     The idle coroutine ($Coro::idle) always has a lower priority than any
352     existing coroutine.
353    
354 root 1.92 Changing the priority of the current coroutine will take effect immediately,
355     but changing the priority of coroutines in the ready queue (but not
356 root 1.31 running) will only take effect after the next schedule (of that
357 root 1.92 coroutine). This is a bug that will be fixed in some future version.
358 root 1.31
359 root 1.92 =item $newprio = $coroutine->nice ($change)
360 root 1.31
361     Similar to C<prio>, but subtract the given value from the priority (i.e.
362     higher values mean lower priority, just as in unix).
363    
364 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
365 root 1.41
366     Sets (or gets in case the argument is missing) the description for this
367 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
368 root 1.41
369     =cut
370    
371     sub desc {
372     my $previous = $_[0]->{desc}; # remembered so a set still returns the old description
373     $_[0]->{desc} = $_[1] unless @_ < 2; # assign only when a new description was passed
374     return $previous;
375 root 1.8 }
376 root 1.1
377 root 1.8 =back
378 root 1.2
379 root 1.97 =head2 GLOBAL FUNCTIONS
380 root 1.92
381     =over 4
382    
383 root 1.97 =item Coro::nready
384    
385     Returns the number of coroutines that are currently in the ready state,
386     i.e. that can be switched to. The value C<0> means that the only runnable
387     coroutine is the currently running one, so C<cede> would have no effect,
388     and C<schedule> would cause a deadlock unless there is an idle handler
389     that wakes up some coroutines.
390    
391 root 1.92 =item unblock_sub { ... }
392    
393     This utility function takes a BLOCK or code reference and "unblocks" it,
394     returning the new coderef. This means that the new coderef will return
395     immediately without blocking, returning nothing, while the original code
396     ref will be called (with parameters) from within its own coroutine.
397    
398     The reason this function exists is that many event libraries (such as the
399     venerable L<Event|Event> module) are not coroutine-safe (a weaker form
400     of thread-safety). This means you must not block within event callbacks,
401     otherwise you might suffer from crashes or worse.
402    
403     This function allows your callbacks to block by executing them in another
404     coroutine where it is safe to block. One example where blocking is handy
405     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
406     disk.
407    
408     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
409     creating event callbacks that want to block.
410    
411     =cut
412    
413     our @unblock_pool; # idle handler coroutines waiting to be reused
414     our @unblock_queue; # pending [callback, args...] entries added by unblock_sub wrappers
415     our $UNBLOCK_POOL_SIZE = 2; # upper bound on the number of idle handlers kept in @unblock_pool
416    
417     sub unblock_handler_ {
418     while () {
419     my ($cb, @arg) = @{ delete $Coro::current->{arg} }; # job stored on this coroutine by the unblock scheduler
420     $cb->(@arg);
421    
422     last if @unblock_pool >= $UNBLOCK_POOL_SIZE; # pool is full: leave the loop so this handler ends
423     push @unblock_pool, $Coro::current; # otherwise park this handler for reuse ...
424     schedule; # ... and sleep until it is handed the next job
425     }
426     }
427    
428     our $unblock_scheduler = async {
429     while () {
430     while (my $cb = pop @unblock_queue) { # $cb is one [callback, args...] entry
431     my $handler = (pop @unblock_pool or new Coro \&unblock_handler_); # reuse an idle handler or create a new one
432     $handler->{arg} = $cb; # picked up by unblock_handler_
433     $handler->ready;
434     cede; # let the handler run before dispatching the next entry
435     }
436    
437     schedule; # queue drained: sleep until an unblock_sub wrapper readies us
438     }
439     };
440    
441     sub unblock_sub(&) {
442     my $cb = shift; # the original, possibly blocking, callback
443    
444     sub { # non-blocking wrapper: only queues the call and wakes the scheduler
445     unshift @unblock_queue, [$cb, @_]; # the scheduler pops from the end, so add at the front to keep call order
446     $unblock_scheduler->ready;
447     }
448     }
449    
450     =back
451    
452 root 1.8 =cut
453 root 1.2
454 root 1.8 1;
455 root 1.14
456 root 1.17 =head1 BUGS/LIMITATIONS
457 root 1.14
458 root 1.52 - you must make very sure that no coro is still active on global
459 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
460 root 1.52
461     - this module is not thread-safe. You should only ever use this module
462     from the same thread (this requirement might be loosened in the future
463     to allow per-thread schedulers, but Coro::State does not yet allow
464     this).
465 root 1.9
466     =head1 SEE ALSO
467    
468 root 1.67 Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
469    
470     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
471    
472     Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
473    
474     Embedding: L<Coro::MakeMaker>
475 root 1.1
476     =head1 AUTHOR
477    
478 root 1.66 Marc Lehmann <schmorp@schmorp.de>
479 root 1.64 http://home.schmorp.de/
480 root 1.1
481     =cut
482