ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.107
Committed: Fri Jan 5 18:25:51 2007 UTC (17 years, 5 months ago) by root
Branch: MAIN
Changes since 1.106: +4 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.92 # alternatively create an async coroutine like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.98 This module collection manages coroutines. Coroutines are similar
24     to threads but don't run in parallel at the same time even on SMP
25     machines. The specific flavor of coroutine used in this module also
26     guarantees you that it will not switch between coroutines unless
27     necessary, at easily-identified points in your program, so locking and
28     parallel access are rarely an issue, making coroutine programming much
29     safer than threads programming.
30    
31     (Perl, however, does not natively support real threads but instead does a
32     very slow and memory-intensive emulation of processes using threads. This
33     is a performance win on Windows machines, and a loss everywhere else).
34    
35     In this module, coroutines are defined as "callchain + lexical variables +
36     @_ + $_ + $@ + $/ + C stack", that is, a coroutine has its own callchain,
37     its own set of lexicals and its own set of perl's most important global
38     variables.
39 root 1.22
40 root 1.8 =cut
41    
42     package Coro;
43    
44 root 1.71 use strict;
45     no warnings "uninitialized";
46 root 1.36
47 root 1.8 use Coro::State;
48    
49 root 1.83 use base qw(Coro::State Exporter);
50 pcg 1.55
51 root 1.83 our $idle; # idle handler
52 root 1.71 our $main; # main coroutine
53     our $current; # current coroutine
54 root 1.8
55 root 1.101 our $VERSION = '3.3';
56 root 1.8
57 root 1.105 our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
58 root 1.71 our %EXPORT_TAGS = (
59 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
60     );
61 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready));
62 root 1.8
63     {
64     my @async;
65 root 1.26 my $init;
66 root 1.8
67     # this way of handling attributes simply is NOT scalable ;()
68     sub import {
69 root 1.71 no strict 'refs';
70    
71 root 1.93 Coro->export_to_level (1, @_);
72 root 1.71
73 root 1.8 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE};
74     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub {
75     my ($package, $ref) = (shift, shift);
76     my @attrs;
77     for (@_) {
78     if ($_ eq "Coro") {
79     push @async, $ref;
80 root 1.26 unless ($init++) {
81     eval q{
82     sub INIT {
83     &async(pop @async) while @async;
84     }
85     };
86     }
87 root 1.8 } else {
88 root 1.17 push @attrs, $_;
89 root 1.8 }
90     }
91 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs;
92 root 1.8 };
93     }
94    
95     }
96    
97 root 1.43 =over 4
98    
99 root 1.8 =item $main
100 root 1.2
101 root 1.8 This coroutine represents the main program.
102 root 1.1
103     =cut
104    
105 pcg 1.55 $main = new Coro;
106 root 1.8
107 root 1.19 =item $current (or as function: current)
108 root 1.1
109 root 1.83 The current coroutine (the last coroutine switched to). The initial value
110     is C<$main> (of course).
111    
112     This variable is B<strictly> I<read-only>. It is provided for performance
113     reasons. If performance is not essential you are encouraged to use the
114     C<Coro::current> function instead.
115 root 1.1
116 root 1.8 =cut
117    
118     # maybe some other module used Coro::Specific before...
119 root 1.93 $main->{specific} = $current->{specific}
120     if $current;
121 root 1.1
122 root 1.93 _set_current $main;
123 root 1.19
124     sub current() { $current }
125 root 1.9
126     =item $idle
127    
128 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
129     to run. The default implementation prints "FATAL: deadlock detected" and
130 root 1.91 exits, because the program has no other way to continue.
131 root 1.83
132     This hook is overwritten by modules such as C<Coro::Timer> and
133 root 1.91 C<Coro::Event> to wait on an external event that hopefully wakes up a
134     coroutine so the scheduler can run it.
135    
136     Please note that if your callback recursively invokes perl (e.g. for event
137     handlers), then it must be prepared to be called recursively.
138 root 1.9
139     =cut
140    
141 root 1.83 $idle = sub {
142 root 1.96 require Carp;
143     Carp::croak ("FATAL: deadlock detected");
144 root 1.9 };
145 root 1.8
146 root 1.103 sub _cancel {
147     my ($self) = @_;
148    
149     # free coroutine data and mark as destructed
150     $self->_destroy
151     or return;
152    
153     # call all destruction callbacks
154     $_->(@{$self->{status}})
155     for @{(delete $self->{destroy_cb}) || []};
156     }
157    
158 root 1.24 # this coroutine is necessary because a coroutine
159     # cannot destroy itself.
160     my @destroy;
161 root 1.103 my $manager;
162    
163     $manager = new Coro sub {
164 pcg 1.57 while () {
165 root 1.103 (shift @destroy)->_cancel
166     while @destroy;
167    
168 root 1.24 &schedule;
169     }
170     };
171    
172 root 1.103 $manager->prio (PRIO_MAX);
173    
174 root 1.8 # static methods. not really.
175 root 1.43
176     =back
177 root 1.8
178     =head2 STATIC METHODS
179    
180 root 1.92 Static methods are actually functions that operate on the current coroutine only.
181 root 1.8
182     =over 4
183    
184 root 1.13 =item async { ... } [@args...]
185 root 1.8
186 root 1.92 Create a new asynchronous coroutine and return its coroutine object
187     (usually unused). When the sub returns the new coroutine is automatically
188 root 1.8 terminated.
189    
190 root 1.89 Calling C<exit> in a coroutine will not work correctly, so do not do that.
191    
192 root 1.79 When the coroutine dies, the program will exit, just as in the main
193     program.
194    
195 root 1.13 # create a new coroutine that just prints its arguments
196     async {
197     print "@_\n";
198     } 1,2,3,4;
199    
200 root 1.8 =cut
201    
202 root 1.13 sub async(&@) {
203 root 1.104 my $coro = new Coro @_;
204     $coro->ready;
205     $coro
206 root 1.8 }
207 root 1.1
208 root 1.105 =item async_pool { ... } [@args...]
209    
210     Similar to C<async>, but uses a coroutine pool, so you should not call
211     terminate or join (although you are allowed to), and you get a coroutine
212     that might have executed other code already (which can be good or bad :).
213    
214     Also, the block is executed in an C<eval> context and a warning will be
215     issued in case of an exception instead of terminating the program, as C<async> does.
216    
217     The priority will be reset to C<0> after each job, otherwise the coroutine
218     will be re-used "as-is".
219    
220     The pool size is limited to 8 idle coroutines (this can be adjusted by
221     changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
222     required.
223    
224     If you are concerned about pooled coroutines growing a lot because a
225     single C<async_pool> used a lot of stackspace you can e.g. C<async_pool {
226     terminate }> once per second or so to slowly replenish the pool.
227    
228     =cut
229    
230     our $POOL_SIZE = 8;
231     our @pool;
232    
233     sub pool_handler {
234     while () {
235     my ($cb, @arg) = @{ delete $current->{_invoke} };
236    
237     eval {
238     $cb->(@arg);
239     };
240     warn $@ if $@;
241    
242     last if @pool >= $POOL_SIZE;
243     push @pool, $current;
244    
245     $current->prio (0);
246     schedule;
247 root 1.106 }
248     }
249 root 1.105
250     sub async_pool(&@) {
251     # this is also inlined into the unblock_scheduler
252     my $coro = (pop @pool or new Coro \&pool_handler);
253    
254     $coro->{_invoke} = [@_];
255     $coro->ready;
256    
257     $coro
258     }
259    
260 root 1.8 =item schedule
261 root 1.6
262 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
263 root 1.8 into the ready queue, so calling this function usually means you will
264 root 1.91 never be called again unless something else (e.g. an event handler) calls
265     ready.
266    
267     The canonical way to wait on external events is this:
268    
269     {
270 root 1.92 # remember current coroutine
271 root 1.91 my $current = $Coro::current;
272    
273     # register a hypothetical event handler
274     on_event_invoke sub {
275     # wake up sleeping coroutine
276     $current->ready;
277     undef $current;
278     };
279    
280     # call schedule until event occurred.
281     # in case we are woken up for other reasons
282     # (current still defined), loop.
283     Coro::schedule while $current;
284     }
285 root 1.1
286 root 1.22 =item cede
287 root 1.1
288 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
289 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
290     current "timeslice" to other coroutines of the same or higher priority.
291 root 1.7
292 root 1.107 Returns true if at least one coroutine switch has happened.
293    
294 root 1.102 =item Coro::cede_notself
295    
296     Works like cede, but is not exported by default and will cede to any
297     coroutine, regardless of priority, once.
298    
299 root 1.107 Returns true if at least one coroutine switch has happened.
300    
301 root 1.40 =item terminate [arg...]
302 root 1.7
303 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
304 root 1.13
305 root 1.1 =cut
306    
307 root 1.8 sub terminate {
308 pcg 1.59 $current->cancel (@_);
309 root 1.1 }
310 root 1.6
311 root 1.8 =back
312    
313     # dynamic methods
314    
315 root 1.92 =head2 COROUTINE METHODS
316 root 1.8
317 root 1.92 These are the methods you can call on coroutine objects.
318 root 1.6
319 root 1.8 =over 4
320    
321 root 1.13 =item new Coro \&sub [, @args...]
322 root 1.8
323 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
324 root 1.40 automatically terminates as if C<terminate> with the returned values were
325 root 1.92 called. To make the coroutine run you must first put it into the ready queue
326 root 1.41 by calling the ready method.
327 root 1.13
328 root 1.89 Calling C<exit> in a coroutine will not work correctly, so do not do that.
329    
330 root 1.6 =cut
331    
332 root 1.94 sub _run_coro {
333 root 1.13 terminate &{+shift};
334     }
335    
336 root 1.8 sub new {
337     my $class = shift;
338 root 1.83
339 root 1.94 $class->SUPER::new (\&_run_coro, @_)
340 root 1.8 }
341 root 1.6
342 root 1.92 =item $success = $coroutine->ready
343 root 1.1
344 root 1.92 Put the given coroutine into the ready queue (according to its priority)
345     and return true. If the coroutine is already in the ready queue, do nothing
346 root 1.90 and return false.
347 root 1.1
348 root 1.92 =item $is_ready = $coroutine->is_ready
349 root 1.90
350 root 1.92 Return whether the coroutine is currently in the ready queue or not.
351 root 1.28
352 root 1.92 =item $coroutine->cancel (arg...)
353 root 1.28
354 root 1.92 Terminates the given coroutine and makes it return the given arguments as
355 root 1.103 status (default: the empty list). Never returns if the coroutine is the
356     current coroutine.
357 root 1.28
358     =cut
359    
360     sub cancel {
361 pcg 1.59 my $self = shift;
362     $self->{status} = [@_];
363 root 1.103
364     if ($current == $self) {
365     push @destroy, $self;
366     $manager->ready;
367     &schedule while 1;
368     } else {
369     $self->_cancel;
370     }
371 root 1.40 }
372    
373 root 1.92 =item $coroutine->join
374 root 1.40
375     Wait until the coroutine terminates and return any values given to the
376 pcg 1.59 C<terminate> or C<cancel> functions. C<join> can be called multiple times
377 root 1.92 from multiple coroutines.
378 root 1.40
379     =cut
380    
381     sub join {
382     my $self = shift;
383 root 1.103
384 root 1.40 unless ($self->{status}) {
385 root 1.103 my $current = $current;
386    
387     push @{$self->{destroy_cb}}, sub {
388     $current->ready;
389     undef $current;
390     };
391    
392     &schedule while $current;
393 root 1.40 }
394 root 1.103
395 root 1.40 wantarray ? @{$self->{status}} : $self->{status}[0];
396 root 1.31 }
397    
398 root 1.101 =item $coroutine->on_destroy (\&cb)
399    
400     Registers a callback that is called when this coroutine gets destroyed,
401     but before it is joined. The callback gets passed the terminate arguments,
402     if any.
403    
404     =cut
405    
406     sub on_destroy {
407     my ($self, $cb) = @_;
408    
409     push @{ $self->{destroy_cb} }, $cb;
410     }
411    
412 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
413 root 1.31
414 root 1.41 Sets (or gets, if the argument is missing) the priority of the
415 root 1.92 coroutine. Higher priority coroutines get run before lower priority
416     coroutines. Priorities are small signed integers (currently -4 .. +3),
417 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
418     to get them):
419 root 1.31
420     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
421     3 > 1 > 0 > -1 > -3 > -4
422    
423     # set priority to HIGH
424     current->prio(PRIO_HIGH);
425    
426     The idle coroutine ($Coro::idle) always has a lower priority than any
427     existing coroutine.
428    
429 root 1.92 Changing the priority of the current coroutine will take effect immediately,
430     but changing the priority of coroutines in the ready queue (but not
431 root 1.31 running) will only take effect after the next schedule (of that
432 root 1.92 coroutine). This is a bug that will be fixed in some future version.
433 root 1.31
434 root 1.92 =item $newprio = $coroutine->nice ($change)
435 root 1.31
436     Similar to C<prio>, but subtract the given value from the priority (i.e.
437     higher values mean lower priority, just as in unix).
438    
439 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
440 root 1.41
441     Sets (or gets in case the argument is missing) the description for this
442 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
443 root 1.41
444     =cut
445    
446     sub desc {
447     my $old = $_[0]{desc};
448     $_[0]{desc} = $_[1] if @_ > 1;
449     $old;
450 root 1.8 }
451 root 1.1
452 root 1.8 =back
453 root 1.2
454 root 1.97 =head2 GLOBAL FUNCTIONS
455 root 1.92
456     =over 4
457    
458 root 1.97 =item Coro::nready
459    
460     Returns the number of coroutines that are currently in the ready state,
461     i.e. that can be switched to. The value C<0> means that the only runnable
462     coroutine is the currently running one, so C<cede> would have no effect,
463     and C<schedule> would cause a deadlock unless there is an idle handler
464     that wakes up some coroutines.
465    
466 root 1.103 =item my $guard = Coro::guard { ... }
467    
468     This creates and returns a guard object. Nothing happens until the object
469     gets destroyed, in which case the codeblock given as argument will be
470     executed. This is useful to free locks or other resources in case of a
471     runtime error or when the coroutine gets canceled, as in both cases the
472     guard block will be executed. The guard object supports only one method,
473     C<< ->cancel >>, which will keep the codeblock from being executed.
474    
475     Example: set some flag and clear it again when the coroutine gets canceled
476     or the function returns:
477    
478     sub do_something {
479     my $guard = Coro::guard { $busy = 0 };
480     $busy = 1;
481    
482     # do something that requires $busy to be true
483     }
484    
485     =cut
486    
487     sub guard(&) {
488     bless \(my $cb = $_[0]), "Coro::guard"
489     }
490    
491     sub Coro::guard::cancel {
492     ${$_[0]} = sub { };
493     }
494    
495     sub Coro::guard::DESTROY {
496     ${$_[0]}->();
497     }
498    
499    
500 root 1.92 =item unblock_sub { ... }
501    
502     This utility function takes a BLOCK or code reference and "unblocks" it,
503     returning the new coderef. This means that the new coderef will return
504     immediately without blocking, returning nothing, while the original code
505     ref will be called (with parameters) from within its own coroutine.
506    
507     The reason this function exists is that many event libraries (such as the
508     venerable L<Event|Event> module) are not coroutine-safe (a weaker form
509     of thread-safety). This means you must not block within event callbacks,
510     otherwise you might suffer from crashes or worse.
511    
512     This function allows your callbacks to block by executing them in another
513     coroutine where it is safe to block. One example where blocking is handy
514     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
515     disk.
516    
517     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
518     creating event callbacks that want to block.
519    
520     =cut
521    
522     our @unblock_queue;
523    
524 root 1.105 # we create a special coro because we want to cede,
525     # to reduce pressure on the coro pool (because most callbacks
526     # return immediately and can be reused) and because we cannot cede
527     # inside an event callback.
528 root 1.92 our $unblock_scheduler = async {
529     while () {
530     while (my $cb = pop @unblock_queue) {
531 root 1.105 # this is an inlined copy of async_pool
532     my $coro = (pop @pool or new Coro \&pool_handler);
533    
534     $coro->{_invoke} = $cb;
535     $coro->ready;
536     cede; # for short-lived callbacks, this reduces pressure on the coro pool
537 root 1.92 }
538 root 1.105 schedule; # sleep well
539 root 1.92 }
540     };
541    
542     sub unblock_sub(&) {
543     my $cb = shift;
544    
545     sub {
546 root 1.105 unshift @unblock_queue, [$cb, @_];
547 root 1.92 $unblock_scheduler->ready;
548     }
549     }
550    
551     =back
552    
553 root 1.8 =cut
554 root 1.2
555 root 1.8 1;
556 root 1.14
557 root 1.17 =head1 BUGS/LIMITATIONS
558 root 1.14
559 root 1.52 - you must make very sure that no coro is still active on global
560 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
561 root 1.52
562     - this module is not thread-safe. You should only ever use this module
563     from the same thread (this requirement might be loosened in the future
564     to allow per-thread schedulers, but Coro::State does not yet allow
565     this).
566 root 1.9
567     =head1 SEE ALSO
568    
569 root 1.67 Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
570    
571     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
572    
573     Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
574    
575     Embedding: L<Coro::MakeMaker>
576 root 1.1
577     =head1 AUTHOR
578    
579 root 1.66 Marc Lehmann <schmorp@schmorp.de>
580 root 1.64 http://home.schmorp.de/
581 root 1.1
582     =cut
583