ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.103
Committed: Thu Jan 4 20:14:19 2007 UTC (17 years, 5 months ago) by root
Branch: MAIN
Changes since 1.102: +74 -26 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.92 # alternatively create an async coroutine like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.98 This module collection manages coroutines. Coroutines are similar
24     to threads but don't run in parallel at the same time even on SMP
25     machines. The specific flavor of coroutine use din this module also
26     guarentees you that it will not switch between coroutines unless
27     necessary, at easily-identified points in your program, so locking and
28     parallel access are rarely an issue, making coroutine programming much
29     safer than threads programming.
30    
31     (Perl, however, does not natively support real threads but instead does a
32     very slow and memory-intensive emulation of processes using threads. This
33     is a performance win on Windows machines, and a loss everywhere else).
34    
35     In this module, coroutines are defined as "callchain + lexical variables +
36     @_ + $_ + $@ + $/ + C stack), that is, a coroutine has its own callchain,
37     its own set of lexicals and its own set of perls most important global
38     variables.
39 root 1.22
40 root 1.8 =cut
41    
42     package Coro;
43    
44 root 1.71 use strict;
45     no warnings "uninitialized";
46 root 1.36
47 root 1.8 use Coro::State;
48    
49 root 1.83 use base qw(Coro::State Exporter);
50 pcg 1.55
51 root 1.83 our $idle; # idle handler
52 root 1.71 our $main; # main coroutine
53     our $current; # current coroutine
54 root 1.8
55 root 1.101 our $VERSION = '3.3';
56 root 1.8
57 root 1.92 our @EXPORT = qw(async cede schedule terminate current unblock_sub);
58 root 1.71 our %EXPORT_TAGS = (
59 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
60     );
61 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready));
62 root 1.8
63     {
64     my @async;
65 root 1.26 my $init;
66 root 1.8
67     # this way of handling attributes simply is NOT scalable ;()
68     sub import {
69 root 1.71 no strict 'refs';
70    
71 root 1.93 Coro->export_to_level (1, @_);
72 root 1.71
73 root 1.8 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE};
74     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub {
75     my ($package, $ref) = (shift, shift);
76     my @attrs;
77     for (@_) {
78     if ($_ eq "Coro") {
79     push @async, $ref;
80 root 1.26 unless ($init++) {
81     eval q{
82     sub INIT {
83     &async(pop @async) while @async;
84     }
85     };
86     }
87 root 1.8 } else {
88 root 1.17 push @attrs, $_;
89 root 1.8 }
90     }
91 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs;
92 root 1.8 };
93     }
94    
95     }
96    
97 root 1.43 =over 4
98    
99 root 1.8 =item $main
100 root 1.2
101 root 1.8 This coroutine represents the main program.
102 root 1.1
103     =cut
104    
105 pcg 1.55 $main = new Coro;
106 root 1.8
107 root 1.19 =item $current (or as function: current)
108 root 1.1
109 root 1.83 The current coroutine (the last coroutine switched to). The initial value
110     is C<$main> (of course).
111    
112     This variable is B<strictly> I<read-only>. It is provided for performance
113     reasons. If performance is not essentiel you are encouraged to use the
114     C<Coro::current> function instead.
115 root 1.1
116 root 1.8 =cut
117    
118     # maybe some other module used Coro::Specific before...
119 root 1.93 $main->{specific} = $current->{specific}
120     if $current;
121 root 1.1
122 root 1.93 _set_current $main;
123 root 1.19
124     sub current() { $current }
125 root 1.9
126     =item $idle
127    
128 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
129     to run. The default implementation prints "FATAL: deadlock detected" and
130 root 1.91 exits, because the program has no other way to continue.
131 root 1.83
132     This hook is overwritten by modules such as C<Coro::Timer> and
133 root 1.91 C<Coro::Event> to wait on an external event that hopefully wake up a
134     coroutine so the scheduler can run it.
135    
136     Please note that if your callback recursively invokes perl (e.g. for event
137     handlers), then it must be prepared to be called recursively.
138 root 1.9
139     =cut
140    
141 root 1.83 $idle = sub {
142 root 1.96 require Carp;
143     Carp::croak ("FATAL: deadlock detected");
144 root 1.9 };
145 root 1.8
146 root 1.103 sub _cancel {
147     my ($self) = @_;
148    
149     # free coroutine data and mark as destructed
150     $self->_destroy
151     or return;
152    
153     # call all destruction callbacks
154     $_->(@{$self->{status}})
155     for @{(delete $self->{destroy_cb}) || []};
156     }
157    
158 root 1.24 # this coroutine is necessary because a coroutine
159     # cannot destroy itself.
160     my @destroy;
161 root 1.103 my $manager;
162    
163     $manager = new Coro sub {
164 pcg 1.57 while () {
165 root 1.103 (shift @destroy)->_cancel
166     while @destroy;
167    
168 root 1.24 &schedule;
169     }
170     };
171    
172 root 1.103 $manager->prio (PRIO_MAX);
173    
174 root 1.8 # static methods. not really.
175 root 1.43
176     =back
177 root 1.8
178     =head2 STATIC METHODS
179    
180 root 1.92 Static methods are actually functions that operate on the current coroutine only.
181 root 1.8
182     =over 4
183    
184 root 1.13 =item async { ... } [@args...]
185 root 1.8
186 root 1.92 Create a new asynchronous coroutine and return it's coroutine object
187     (usually unused). When the sub returns the new coroutine is automatically
188 root 1.8 terminated.
189    
190 root 1.89 Calling C<exit> in a coroutine will not work correctly, so do not do that.
191    
192 root 1.79 When the coroutine dies, the program will exit, just as in the main
193     program.
194    
195 root 1.13 # create a new coroutine that just prints its arguments
196     async {
197     print "@_\n";
198     } 1,2,3,4;
199    
200 root 1.8 =cut
201    
202 root 1.13 sub async(&@) {
203     my $pid = new Coro @_;
204 root 1.11 $pid->ready;
205 root 1.85 $pid
206 root 1.8 }
207 root 1.1
208 root 1.8 =item schedule
209 root 1.6
210 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
211 root 1.8 into the ready queue, so calling this function usually means you will
212 root 1.91 never be called again unless something else (e.g. an event handler) calls
213     ready.
214    
215     The canonical way to wait on external events is this:
216    
217     {
218 root 1.92 # remember current coroutine
219 root 1.91 my $current = $Coro::current;
220    
221     # register a hypothetical event handler
222     on_event_invoke sub {
223     # wake up sleeping coroutine
224     $current->ready;
225     undef $current;
226     };
227    
228     # call schedule until event occured.
229     # in case we are woken up for other reasons
230     # (current still defined), loop.
231     Coro::schedule while $current;
232     }
233 root 1.1
234 root 1.22 =item cede
235 root 1.1
236 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
237 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
238     current "timeslice" to other coroutines of the same or higher priority.
239 root 1.7
240 root 1.102 =item Coro::cede_notself
241    
242     Works like cede, but is not exported by default and will cede to any
243     coroutine, regardless of priority, once.
244    
245 root 1.40 =item terminate [arg...]
246 root 1.7
247 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
248 root 1.13
249 root 1.1 =cut
250    
251 root 1.8 sub terminate {
252 pcg 1.59 $current->cancel (@_);
253 root 1.1 }
254 root 1.6
255 root 1.8 =back
256    
257     # dynamic methods
258    
259 root 1.92 =head2 COROUTINE METHODS
260 root 1.8
261 root 1.92 These are the methods you can call on coroutine objects.
262 root 1.6
263 root 1.8 =over 4
264    
265 root 1.13 =item new Coro \&sub [, @args...]
266 root 1.8
267 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
268 root 1.40 automatically terminates as if C<terminate> with the returned values were
269 root 1.92 called. To make the coroutine run you must first put it into the ready queue
270 root 1.41 by calling the ready method.
271 root 1.13
272 root 1.89 Calling C<exit> in a coroutine will not work correctly, so do not do that.
273    
274 root 1.6 =cut
275    
276 root 1.94 sub _run_coro {
277 root 1.13 terminate &{+shift};
278     }
279    
280 root 1.8 sub new {
281     my $class = shift;
282 root 1.83
283 root 1.94 $class->SUPER::new (\&_run_coro, @_)
284 root 1.8 }
285 root 1.6
286 root 1.92 =item $success = $coroutine->ready
287 root 1.1
288 root 1.92 Put the given coroutine into the ready queue (according to it's priority)
289     and return true. If the coroutine is already in the ready queue, do nothing
290 root 1.90 and return false.
291 root 1.1
292 root 1.92 =item $is_ready = $coroutine->is_ready
293 root 1.90
294 root 1.92 Return wether the coroutine is currently the ready queue or not,
295 root 1.28
296 root 1.92 =item $coroutine->cancel (arg...)
297 root 1.28
298 root 1.92 Terminates the given coroutine and makes it return the given arguments as
299 root 1.103 status (default: the empty list). Never returns if the coroutine is the
300     current coroutine.
301 root 1.28
302     =cut
303    
304     sub cancel {
305 pcg 1.59 my $self = shift;
306     $self->{status} = [@_];
307 root 1.103
308     if ($current == $self) {
309     push @destroy, $self;
310     $manager->ready;
311     &schedule while 1;
312     } else {
313     $self->_cancel;
314     }
315 root 1.40 }
316    
317 root 1.92 =item $coroutine->join
318 root 1.40
319     Wait until the coroutine terminates and return any values given to the
320 pcg 1.59 C<terminate> or C<cancel> functions. C<join> can be called multiple times
321 root 1.92 from multiple coroutine.
322 root 1.40
323     =cut
324    
325     sub join {
326     my $self = shift;
327 root 1.103
328 root 1.40 unless ($self->{status}) {
329 root 1.103 my $current = $current;
330    
331     push @{$self->{destroy_cb}}, sub {
332     $current->ready;
333     undef $current;
334     };
335    
336     &schedule while $current;
337 root 1.40 }
338 root 1.103
339 root 1.40 wantarray ? @{$self->{status}} : $self->{status}[0];
340 root 1.31 }
341    
342 root 1.101 =item $coroutine->on_destroy (\&cb)
343    
344     Registers a callback that is called when this coroutine gets destroyed,
345     but before it is joined. The callback gets passed the terminate arguments,
346     if any.
347    
348     =cut
349    
350     sub on_destroy {
351     my ($self, $cb) = @_;
352    
353     push @{ $self->{destroy_cb} }, $cb;
354     }
355    
356 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
357 root 1.31
358 root 1.41 Sets (or gets, if the argument is missing) the priority of the
359 root 1.92 coroutine. Higher priority coroutines get run before lower priority
360     coroutines. Priorities are small signed integers (currently -4 .. +3),
361 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
362     to get then):
363 root 1.31
364     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
365     3 > 1 > 0 > -1 > -3 > -4
366    
367     # set priority to HIGH
368     current->prio(PRIO_HIGH);
369    
370     The idle coroutine ($Coro::idle) always has a lower priority than any
371     existing coroutine.
372    
373 root 1.92 Changing the priority of the current coroutine will take effect immediately,
374     but changing the priority of coroutines in the ready queue (but not
375 root 1.31 running) will only take effect after the next schedule (of that
376 root 1.92 coroutine). This is a bug that will be fixed in some future version.
377 root 1.31
378 root 1.92 =item $newprio = $coroutine->nice ($change)
379 root 1.31
380     Similar to C<prio>, but subtract the given value from the priority (i.e.
381     higher values mean lower priority, just as in unix).
382    
383 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
384 root 1.41
385     Sets (or gets in case the argument is missing) the description for this
386 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
387 root 1.41
388     =cut
389    
390     sub desc {
391     my $old = $_[0]{desc};
392     $_[0]{desc} = $_[1] if @_ > 1;
393     $old;
394 root 1.8 }
395 root 1.1
396 root 1.8 =back
397 root 1.2
398 root 1.97 =head2 GLOBAL FUNCTIONS
399 root 1.92
400     =over 4
401    
402 root 1.97 =item Coro::nready
403    
404     Returns the number of coroutines that are currently in the ready state,
405     i.e. that can be swicthed to. The value C<0> means that the only runnable
406     coroutine is the currently running one, so C<cede> would have no effect,
407     and C<schedule> would cause a deadlock unless there is an idle handler
408     that wakes up some coroutines.
409    
410 root 1.103 =item my $guard = Coro::guard { ... }
411    
412     This creates and returns a guard object. Nothing happens until the objetc
413     gets destroyed, in which case the codeblock given as argument will be
414     executed. This is useful to free locks or other resources in case of a
415     runtime error or when the coroutine gets canceled, as in both cases the
416     guard block will be executed. The guard object supports only one method,
417     C<< ->cancel >>, which will keep the codeblock from being executed.
418    
419     Example: set some flag and clear it again when the coroutine gets canceled
420     or the function returns:
421    
422     sub do_something {
423     my $guard = Coro::guard { $busy = 0 };
424     $busy = 1;
425    
426     # do something that requires $busy to be true
427     }
428    
429     =cut
430    
431     sub guard(&) {
432     bless \(my $cb = $_[0]), "Coro::guard"
433     }
434    
435     sub Coro::guard::cancel {
436     ${$_[0]} = sub { };
437     }
438    
439     sub Coro::guard::DESTROY {
440     ${$_[0]}->();
441     }
442    
443    
444 root 1.92 =item unblock_sub { ... }
445    
446     This utility function takes a BLOCK or code reference and "unblocks" it,
447     returning the new coderef. This means that the new coderef will return
448     immediately without blocking, returning nothing, while the original code
449     ref will be called (with parameters) from within its own coroutine.
450    
451     The reason this fucntion exists is that many event libraries (such as the
452     venerable L<Event|Event> module) are not coroutine-safe (a weaker form
453     of thread-safety). This means you must not block within event callbacks,
454     otherwise you might suffer from crashes or worse.
455    
456     This function allows your callbacks to block by executing them in another
457     coroutine where it is safe to block. One example where blocking is handy
458     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
459     disk.
460    
461     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
462     creating event callbacks that want to block.
463    
464     =cut
465    
466     our @unblock_pool;
467     our @unblock_queue;
468     our $UNBLOCK_POOL_SIZE = 2;
469    
470     sub unblock_handler_ {
471     while () {
472     my ($cb, @arg) = @{ delete $Coro::current->{arg} };
473     $cb->(@arg);
474    
475     last if @unblock_pool >= $UNBLOCK_POOL_SIZE;
476     push @unblock_pool, $Coro::current;
477     schedule;
478     }
479     }
480    
481     our $unblock_scheduler = async {
482     while () {
483     while (my $cb = pop @unblock_queue) {
484     my $handler = (pop @unblock_pool or new Coro \&unblock_handler_);
485     $handler->{arg} = $cb;
486     $handler->ready;
487     cede;
488     }
489    
490     schedule;
491     }
492     };
493    
494     sub unblock_sub(&) {
495     my $cb = shift;
496    
497     sub {
498     push @unblock_queue, [$cb, @_];
499     $unblock_scheduler->ready;
500     }
501     }
502    
503     =back
504    
505 root 1.8 =cut
506 root 1.2
507 root 1.8 1;
508 root 1.14
509 root 1.17 =head1 BUGS/LIMITATIONS
510 root 1.14
511 root 1.52 - you must make very sure that no coro is still active on global
512 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
513 root 1.52
514     - this module is not thread-safe. You should only ever use this module
515     from the same thread (this requirement might be losened in the future
516     to allow per-thread schedulers, but Coro::State does not yet allow
517     this).
518 root 1.9
519     =head1 SEE ALSO
520    
521 root 1.67 Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
522    
523     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
524    
525     Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
526    
527     Embedding: L<Coro:MakeMaker>
528 root 1.1
529     =head1 AUTHOR
530    
531 root 1.66 Marc Lehmann <schmorp@schmorp.de>
532 root 1.64 http://home.schmorp.de/
533 root 1.1
534     =cut
535