ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.129
Committed: Wed Sep 19 22:33:08 2007 UTC (16 years, 8 months ago) by root
Branch: MAIN
Changes since 1.128: +8 -6 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.92 # alternatively create an async coroutine like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.98 This module collection manages coroutines. Coroutines are similar
24     to threads but don't run in parallel at the same time even on SMP
25 root 1.124 machines. The specific flavor of coroutine used in this module also
26     guarantees you that it will not switch between coroutines unless
27 root 1.98 necessary, at easily-identified points in your program, so locking and
28     parallel access are rarely an issue, making coroutine programming much
29     safer than threads programming.
30    
31     (Perl, however, does not natively support real threads but instead does a
32     very slow and memory-intensive emulation of processes using threads. This
33     is a performance win on Windows machines, and a loss everywhere else).
34    
35     In this module, coroutines are defined as "callchain + lexical variables +
36     @_ + $_ + $@ + $/ + C stack), that is, a coroutine has its own callchain,
37     its own set of lexicals and its own set of perls most important global
38     variables.
39 root 1.22
40 root 1.8 =cut
41    
42     package Coro;
43    
44 root 1.71 use strict;
45     no warnings "uninitialized";
46 root 1.36
47 root 1.8 use Coro::State;
48    
49 root 1.83 use base qw(Coro::State Exporter);
50 pcg 1.55
51 root 1.83 our $idle; # idle handler
52 root 1.71 our $main; # main coroutine
53     our $current; # current coroutine
54 root 1.8
55 root 1.128 our $VERSION = '3.7';
56 root 1.8
57 root 1.105 our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
58 root 1.71 our %EXPORT_TAGS = (
59 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
60     );
61 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready));
62 root 1.8
63     {
64     my @async;
65 root 1.26 my $init;
66 root 1.8
67     # this way of handling attributes simply is NOT scalable ;()
68     sub import {
69 root 1.71 no strict 'refs';
70    
71 root 1.93 Coro->export_to_level (1, @_);
72 root 1.71
73 root 1.8 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE};
74     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub {
75     my ($package, $ref) = (shift, shift);
76     my @attrs;
77     for (@_) {
78     if ($_ eq "Coro") {
79     push @async, $ref;
80 root 1.26 unless ($init++) {
81     eval q{
82     sub INIT {
83     &async(pop @async) while @async;
84     }
85     };
86     }
87 root 1.8 } else {
88 root 1.17 push @attrs, $_;
89 root 1.8 }
90     }
91 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs;
92 root 1.8 };
93     }
94    
95     }
96    
97 root 1.43 =over 4
98    
99 root 1.8 =item $main
100 root 1.2
101 root 1.8 This coroutine represents the main program.
102 root 1.1
103     =cut
104    
105 pcg 1.55 $main = new Coro;
106 root 1.8
107 root 1.19 =item $current (or as function: current)
108 root 1.1
109 root 1.83 The current coroutine (the last coroutine switched to). The initial value
110     is C<$main> (of course).
111    
112     This variable is B<strictly> I<read-only>. It is provided for performance
113 root 1.124 reasons. If performance is not essential you are encouraged to use the
114 root 1.83 C<Coro::current> function instead.
115 root 1.1
116 root 1.8 =cut
117    
118     # maybe some other module used Coro::Specific before...
119 root 1.93 $main->{specific} = $current->{specific}
120     if $current;
121 root 1.1
122 root 1.93 _set_current $main;
123 root 1.19
124     sub current() { $current }
125 root 1.9
126     =item $idle
127    
128 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
129     to run. The default implementation prints "FATAL: deadlock detected" and
130 root 1.91 exits, because the program has no other way to continue.
131 root 1.83
132     This hook is overwritten by modules such as C<Coro::Timer> and
133 root 1.91 C<Coro::Event> to wait on an external event that hopefully wake up a
134     coroutine so the scheduler can run it.
135    
136     Please note that if your callback recursively invokes perl (e.g. for event
137     handlers), then it must be prepared to be called recursively.
138 root 1.9
139     =cut
140    
141 root 1.83 $idle = sub {
142 root 1.96 require Carp;
143     Carp::croak ("FATAL: deadlock detected");
144 root 1.9 };
145 root 1.8
146 root 1.103 sub _cancel {
147     my ($self) = @_;
148    
149     # free coroutine data and mark as destructed
150     $self->_destroy
151     or return;
152    
153     # call all destruction callbacks
154     $_->(@{$self->{status}})
155     for @{(delete $self->{destroy_cb}) || []};
156     }
157    
158 root 1.24 # this coroutine is necessary because a coroutine
159     # cannot destroy itself.
160     my @destroy;
161 root 1.103 my $manager;
162    
163     $manager = new Coro sub {
164 root 1.129 $current->desc ("[coro manager]");
165    
166 pcg 1.57 while () {
167 root 1.103 (shift @destroy)->_cancel
168     while @destroy;
169    
170 root 1.24 &schedule;
171     }
172     };
173    
174 root 1.103 $manager->prio (PRIO_MAX);
175    
176 root 1.8 # static methods. not really.
177 root 1.43
178     =back
179 root 1.8
180     =head2 STATIC METHODS
181    
182 root 1.92 Static methods are actually functions that operate on the current coroutine only.
183 root 1.8
184     =over 4
185    
186 root 1.13 =item async { ... } [@args...]
187 root 1.8
188 root 1.92 Create a new asynchronous coroutine and return it's coroutine object
189     (usually unused). When the sub returns the new coroutine is automatically
190 root 1.8 terminated.
191    
192 root 1.122 Calling C<exit> in a coroutine will do the same as calling exit outside
193     the coroutine. Likewise, when the coroutine dies, the program will exit,
194     just as it would in the main program.
195 root 1.79
196 root 1.13 # create a new coroutine that just prints its arguments
197     async {
198     print "@_\n";
199     } 1,2,3,4;
200    
201 root 1.8 =cut
202    
203 root 1.13 sub async(&@) {
204 root 1.104 my $coro = new Coro @_;
205     $coro->ready;
206     $coro
207 root 1.8 }
208 root 1.1
209 root 1.105 =item async_pool { ... } [@args...]
210    
211     Similar to C<async>, but uses a coroutine pool, so you should not call
212     terminate or join (although you are allowed to), and you get a coroutine
213     that might have executed other code already (which can be good or bad :).
214    
215     Also, the block is executed in an C<eval> context and a warning will be
216 root 1.108 issued in case of an exception instead of terminating the program, as
217     C<async> does. As the coroutine is being reused, stuff like C<on_destroy>
218     will not work in the expected way, unless you call terminate or cancel,
219     which somehow defeats the purpose of pooling.
220 root 1.105
221     The priority will be reset to C<0> after each job, otherwise the coroutine
222     will be re-used "as-is".
223    
224     The pool size is limited to 8 idle coroutines (this can be adjusted by
225     changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
226     required.
227    
228     If you are concerned about pooled coroutines growing a lot because a
229     single C<async_pool> used a lot of stackspace you can e.g. C<async_pool {
230     terminate }> once per second or so to slowly replenish the pool.
231    
232     =cut
233    
234     our $POOL_SIZE = 8;
235     our @pool;
236    
237     sub pool_handler {
238     while () {
239 root 1.129 $current->{desc} = "[async_pool]";
240    
241 root 1.105 eval {
242 root 1.110 my ($cb, @arg) = @{ delete $current->{_invoke} or return };
243 root 1.105 $cb->(@arg);
244     };
245     warn $@ if $@;
246    
247     last if @pool >= $POOL_SIZE;
248 root 1.129
249 root 1.105 push @pool, $current;
250 root 1.129 $current->{desc} = "[async_pool idle]";
251 root 1.118 $current->save (Coro::State::SAVE_DEF);
252 root 1.105 $current->prio (0);
253     schedule;
254 root 1.106 }
255     }
256 root 1.105
257     sub async_pool(&@) {
258     # this is also inlined into the unlock_scheduler
259 root 1.129 my $coro = (pop @pool) || new Coro \&pool_handler;;
260 root 1.105
261     $coro->{_invoke} = [@_];
262     $coro->ready;
263    
264     $coro
265     }
266    
267 root 1.8 =item schedule
268 root 1.6
269 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
270 root 1.8 into the ready queue, so calling this function usually means you will
271 root 1.91 never be called again unless something else (e.g. an event handler) calls
272     ready.
273    
274     The canonical way to wait on external events is this:
275    
276     {
277 root 1.92 # remember current coroutine
278 root 1.91 my $current = $Coro::current;
279    
280     # register a hypothetical event handler
281     on_event_invoke sub {
282     # wake up sleeping coroutine
283     $current->ready;
284     undef $current;
285     };
286    
287 root 1.124 # call schedule until event occurred.
288 root 1.91 # in case we are woken up for other reasons
289     # (current still defined), loop.
290     Coro::schedule while $current;
291     }
292 root 1.1
293 root 1.22 =item cede
294 root 1.1
295 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
296 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
297     current "timeslice" to other coroutines of the same or higher priority.
298 root 1.7
299 root 1.107 Returns true if at least one coroutine switch has happened.
300    
301 root 1.102 =item Coro::cede_notself
302    
303     Works like cede, but is not exported by default and will cede to any
304     coroutine, regardless of priority, once.
305    
306 root 1.107 Returns true if at least one coroutine switch has happened.
307    
308 root 1.40 =item terminate [arg...]
309 root 1.7
310 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
311 root 1.13
312 root 1.1 =cut
313    
314 root 1.8 sub terminate {
315 pcg 1.59 $current->cancel (@_);
316 root 1.1 }
317 root 1.6
318 root 1.8 =back
319    
320     # dynamic methods
321    
322 root 1.92 =head2 COROUTINE METHODS
323 root 1.8
324 root 1.92 These are the methods you can call on coroutine objects.
325 root 1.6
326 root 1.8 =over 4
327    
328 root 1.13 =item new Coro \&sub [, @args...]
329 root 1.8
330 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
331 root 1.40 automatically terminates as if C<terminate> with the returned values were
332 root 1.92 called. To make the coroutine run you must first put it into the ready queue
333 root 1.41 by calling the ready method.
334 root 1.13
335 root 1.121 See C<async> for additional discussion.
336 root 1.89
337 root 1.6 =cut
338    
339 root 1.94 sub _run_coro {
340 root 1.13 terminate &{+shift};
341     }
342    
343 root 1.8 sub new {
344     my $class = shift;
345 root 1.83
346 root 1.94 $class->SUPER::new (\&_run_coro, @_)
347 root 1.8 }
348 root 1.6
349 root 1.92 =item $success = $coroutine->ready
350 root 1.1
351 root 1.92 Put the given coroutine into the ready queue (according to it's priority)
352     and return true. If the coroutine is already in the ready queue, do nothing
353 root 1.90 and return false.
354 root 1.1
355 root 1.92 =item $is_ready = $coroutine->is_ready
356 root 1.90
357 root 1.92 Return wether the coroutine is currently the ready queue or not,
358 root 1.28
359 root 1.92 =item $coroutine->cancel (arg...)
360 root 1.28
361 root 1.92 Terminates the given coroutine and makes it return the given arguments as
362 root 1.103 status (default: the empty list). Never returns if the coroutine is the
363     current coroutine.
364 root 1.28
365     =cut
366    
367     sub cancel {
368 pcg 1.59 my $self = shift;
369     $self->{status} = [@_];
370 root 1.103
371     if ($current == $self) {
372     push @destroy, $self;
373     $manager->ready;
374     &schedule while 1;
375     } else {
376     $self->_cancel;
377     }
378 root 1.40 }
379    
380 root 1.92 =item $coroutine->join
381 root 1.40
382     Wait until the coroutine terminates and return any values given to the
383 pcg 1.59 C<terminate> or C<cancel> functions. C<join> can be called multiple times
384 root 1.92 from multiple coroutine.
385 root 1.40
386     =cut
387    
388     sub join {
389     my $self = shift;
390 root 1.103
391 root 1.40 unless ($self->{status}) {
392 root 1.103 my $current = $current;
393    
394     push @{$self->{destroy_cb}}, sub {
395     $current->ready;
396     undef $current;
397     };
398    
399     &schedule while $current;
400 root 1.40 }
401 root 1.103
402 root 1.40 wantarray ? @{$self->{status}} : $self->{status}[0];
403 root 1.31 }
404    
405 root 1.101 =item $coroutine->on_destroy (\&cb)
406    
407     Registers a callback that is called when this coroutine gets destroyed,
408     but before it is joined. The callback gets passed the terminate arguments,
409     if any.
410    
411     =cut
412    
413     sub on_destroy {
414     my ($self, $cb) = @_;
415    
416     push @{ $self->{destroy_cb} }, $cb;
417     }
418    
419 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
420 root 1.31
421 root 1.41 Sets (or gets, if the argument is missing) the priority of the
422 root 1.92 coroutine. Higher priority coroutines get run before lower priority
423     coroutines. Priorities are small signed integers (currently -4 .. +3),
424 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
425     to get then):
426 root 1.31
427     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
428     3 > 1 > 0 > -1 > -3 > -4
429    
430     # set priority to HIGH
431     current->prio(PRIO_HIGH);
432    
433     The idle coroutine ($Coro::idle) always has a lower priority than any
434     existing coroutine.
435    
436 root 1.92 Changing the priority of the current coroutine will take effect immediately,
437     but changing the priority of coroutines in the ready queue (but not
438 root 1.31 running) will only take effect after the next schedule (of that
439 root 1.92 coroutine). This is a bug that will be fixed in some future version.
440 root 1.31
441 root 1.92 =item $newprio = $coroutine->nice ($change)
442 root 1.31
443     Similar to C<prio>, but subtract the given value from the priority (i.e.
444     higher values mean lower priority, just as in unix).
445    
446 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
447 root 1.41
448     Sets (or gets in case the argument is missing) the description for this
449 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
450 root 1.41
451     =cut
452    
453     sub desc {
454     my $old = $_[0]{desc};
455     $_[0]{desc} = $_[1] if @_ > 1;
456     $old;
457 root 1.8 }
458 root 1.1
459 root 1.8 =back
460 root 1.2
461 root 1.97 =head2 GLOBAL FUNCTIONS
462 root 1.92
463     =over 4
464    
465 root 1.97 =item Coro::nready
466    
467     Returns the number of coroutines that are currently in the ready state,
468 root 1.124 i.e. that can be switched to. The value C<0> means that the only runnable
469 root 1.97 coroutine is the currently running one, so C<cede> would have no effect,
470     and C<schedule> would cause a deadlock unless there is an idle handler
471     that wakes up some coroutines.
472    
473 root 1.103 =item my $guard = Coro::guard { ... }
474    
475 root 1.119 This creates and returns a guard object. Nothing happens until the object
476 root 1.103 gets destroyed, in which case the codeblock given as argument will be
477     executed. This is useful to free locks or other resources in case of a
478     runtime error or when the coroutine gets canceled, as in both cases the
479     guard block will be executed. The guard object supports only one method,
480     C<< ->cancel >>, which will keep the codeblock from being executed.
481    
482     Example: set some flag and clear it again when the coroutine gets canceled
483     or the function returns:
484    
485     sub do_something {
486     my $guard = Coro::guard { $busy = 0 };
487     $busy = 1;
488    
489     # do something that requires $busy to be true
490     }
491    
492     =cut
493    
494     sub guard(&) {
495     bless \(my $cb = $_[0]), "Coro::guard"
496     }
497    
498     sub Coro::guard::cancel {
499     ${$_[0]} = sub { };
500     }
501    
502     sub Coro::guard::DESTROY {
503     ${$_[0]}->();
504     }
505    
506    
507 root 1.92 =item unblock_sub { ... }
508    
509     This utility function takes a BLOCK or code reference and "unblocks" it,
510     returning the new coderef. This means that the new coderef will return
511     immediately without blocking, returning nothing, while the original code
512     ref will be called (with parameters) from within its own coroutine.
513    
514 root 1.124 The reason this function exists is that many event libraries (such as the
515 root 1.92 venerable L<Event|Event> module) are not coroutine-safe (a weaker form
516     of thread-safety). This means you must not block within event callbacks,
517     otherwise you might suffer from crashes or worse.
518    
519     This function allows your callbacks to block by executing them in another
520     coroutine where it is safe to block. One example where blocking is handy
521     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
522     disk.
523    
524     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
525     creating event callbacks that want to block.
526    
527     =cut
528    
529     our @unblock_queue;
530    
531 root 1.105 # we create a special coro because we want to cede,
532     # to reduce pressure on the coro pool (because most callbacks
533     # return immediately and can be reused) and because we cannot cede
534     # inside an event callback.
535 root 1.92 our $unblock_scheduler = async {
536 root 1.129 $current->desc ("[unblock_sub scheduler]");
537 root 1.92 while () {
538     while (my $cb = pop @unblock_queue) {
539 root 1.105 # this is an inlined copy of async_pool
540     my $coro = (pop @pool or new Coro \&pool_handler);
541    
542     $coro->{_invoke} = $cb;
543     $coro->ready;
544     cede; # for short-lived callbacks, this reduces pressure on the coro pool
545 root 1.92 }
546 root 1.105 schedule; # sleep well
547 root 1.92 }
548     };
549    
550     sub unblock_sub(&) {
551     my $cb = shift;
552    
553     sub {
554 root 1.105 unshift @unblock_queue, [$cb, @_];
555 root 1.92 $unblock_scheduler->ready;
556     }
557     }
558    
559     =back
560    
561 root 1.8 =cut
562 root 1.2
563 root 1.8 1;
564 root 1.14
565 root 1.17 =head1 BUGS/LIMITATIONS
566 root 1.14
567 root 1.52 - you must make very sure that no coro is still active on global
568 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
569 root 1.52
570     - this module is not thread-safe. You should only ever use this module
571 root 1.124 from the same thread (this requirement might be loosened in the future
572 root 1.52 to allow per-thread schedulers, but Coro::State does not yet allow
573     this).
574 root 1.9
575     =head1 SEE ALSO
576    
577 root 1.67 Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
578    
579     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
580    
581     Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
582    
583     Embedding: L<Coro:MakeMaker>
584 root 1.1
585     =head1 AUTHOR
586    
587 root 1.66 Marc Lehmann <schmorp@schmorp.de>
588 root 1.64 http://home.schmorp.de/
589 root 1.1
590     =cut
591