ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.121
Committed: Fri Apr 13 12:56:55 2007 UTC (17 years, 2 months ago) by root
Branch: MAIN
Changes since 1.120: +4 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.92 # alternatively create an async coroutine like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.98 This module collection manages coroutines. Coroutines are similar
24     to threads but don't run in parallel at the same time even on SMP
25     machines. The specific flavor of coroutine used in this module also
26     guarantees you that it will not switch between coroutines unless
27     necessary, at easily-identified points in your program, so locking and
28     parallel access are rarely an issue, making coroutine programming much
29     safer than threads programming.
30    
31     (Perl, however, does not natively support real threads but instead does a
32     very slow and memory-intensive emulation of processes using threads. This
33     is a performance win on Windows machines, and a loss everywhere else).
34    
35     In this module, coroutines are defined as "callchain + lexical variables +
36     @_ + $_ + $@ + $/ + C stack", that is, a coroutine has its own callchain,
37     its own set of lexicals and its own set of perl's most important global
38     variables.
39 root 1.22
40 root 1.8 =cut
41    
42     package Coro;
43    
44 root 1.71 use strict;
45     no warnings "uninitialized";
46 root 1.36
47 root 1.8 use Coro::State;
48    
49 root 1.83 use base qw(Coro::State Exporter);
50 pcg 1.55
51 root 1.83 our $idle; # idle handler
52 root 1.71 our $main; # main coroutine
53     our $current; # current coroutine
54 root 1.8
55 root 1.120 our $VERSION = '3.56';
56 root 1.8
57 root 1.105 our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
58 root 1.71 our %EXPORT_TAGS = (
59 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
60     );
61 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready));
62 root 1.8
{
    # Code refs that were tagged with the ":Coro" attribute and still have
    # to be started as coroutines.
    my @async;

    # Becomes true once the INIT hook that drains @async has been compiled.
    my $init;

    # this way of handling attributes simply is NOT scalable ;()
    #
    # Exports the requested symbols into the caller and installs a
    # MODIFY_CODE_ATTRIBUTES handler in the calling package. The handler
    # consumes the "Coro" attribute and hands every other attribute to the
    # handler that was installed before (if any).
    sub import {
        no strict 'refs';

        Coro->export_to_level (1, @_);

        my $handler_name = (caller)[0] . "::MODIFY_CODE_ATTRIBUTES";
        my $old          = *{$handler_name}{CODE};

        *{$handler_name} = sub {
            my $package = shift;
            my $ref     = shift;
            my @attrs;

            for my $attr (@_) {
                # anything that is not ours is passed on untouched
                if ($attr ne "Coro") {
                    push @attrs, $attr;
                    next;
                }

                push @async, $ref;

                # compile the INIT hook only the first time around
                next if $init++;

                eval q{
                    sub INIT {
                        &async(pop @async) while @async;
                    }
                };
            }

            return $old ? $old->($package, $ref, @attrs) : @attrs;
        };
    }

}
96    
97 root 1.43 =over 4
98    
99 root 1.8 =item $main
100 root 1.2
101 root 1.8 This coroutine represents the main program.
102 root 1.1
103     =cut
104    
# the coroutine object standing in for the main program
$main = Coro->new;
106 root 1.8
107 root 1.19 =item $current (or as function: current)
108 root 1.1
109 root 1.83 The current coroutine (the last coroutine switched to). The initial value
110     is C<$main> (of course).
111    
112     This variable is B<strictly> I<read-only>. It is provided for performance
113     reasons. If performance is not essential you are encouraged to use the
114     C<Coro::current> function instead.
115 root 1.1
116 root 1.8 =cut
117    
# Some other module may have used Coro::Specific before we were loaded;
# in that case carry its per-coroutine data over to the main coroutine.
if ($current) {
    $main->{specific} = $current->{specific};
}

_set_current $main;

# Function form of $Coro::current.
sub current() { return $current }
125 root 1.9
126     =item $idle
127    
128 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
129     to run. The default implementation prints "FATAL: deadlock detected" and
130 root 1.91 exits, because the program has no other way to continue.
131 root 1.83
132     This hook is overwritten by modules such as C<Coro::Timer> and
133 root 1.91 C<Coro::Event> to wait on an external event that hopefully wakes up a
134     coroutine so the scheduler can run it.
135    
136     Please note that if your callback recursively invokes perl (e.g. for event
137     handlers), then it must be prepared to be called recursively.
138 root 1.9
139     =cut
140    
# Default idle handler: the scheduler calls it when no coroutine is ready.
# Nothing can make progress at that point, so report a deadlock and die.
$idle = sub {
    require Carp;    # loaded lazily, only needed on this error path
    Carp::croak("FATAL: deadlock detected");
};
145 root 1.8
# Destroys the coroutine and then runs its destruction callbacks, passing
# each of them the stored status values. Does nothing further when
# _destroy returns false.
sub _cancel {
    my ($coro) = @_;

    # free coroutine data and mark as destructed
    return unless $coro->_destroy;

    # detach the callback list first, then call every callback
    my $callbacks = (delete $coro->{destroy_cb}) || [];

    $_->(@{ $coro->{status} })
        for @$callbacks;
}
157    
# A coroutine cannot destroy itself, so coroutines that cancel themselves
# queue up in @destroy and this manager coroutine destroys them instead.
my @destroy;
my $manager;

$manager = Coro->new (sub {
    while (1) {
        # destroy everything that has been queued since the last wakeup
        while (@destroy) {
            my $victim = shift @destroy;
            $victim->_cancel;
        }

        # sleep until somebody readies us again
        &schedule;
    }
});

$manager->prio (PRIO_MAX);
173    
174 root 1.8 # static methods. not really.
175 root 1.43
176     =back
177 root 1.8
178     =head2 STATIC METHODS
179    
180 root 1.92 Static methods are actually functions that operate on the current coroutine only.
181 root 1.8
182     =over 4
183    
184 root 1.13 =item async { ... } [@args...]
185 root 1.8
186 root 1.92 Create a new asynchronous coroutine and return its coroutine object
187     (usually unused). When the sub returns the new coroutine is automatically
188 root 1.8 terminated.
189    
190 root 1.121 Calling C<exit> in a coroutine will try to do the same as calling exit
191     outside the coroutine, but this is experimental. It is best not to rely on
192     exit doing any cleanups or even not crashing.
193 root 1.89
194 root 1.79 When the coroutine dies, the program will exit, just as in the main
195     program.
196    
197 root 1.13 # create a new coroutine that just prints its arguments
198     async {
199     print "@_\n";
200     } 1,2,3,4;
201    
202 root 1.8 =cut
203    
# Creates a coroutine from the given block and arguments, puts it into the
# ready queue and returns the coroutine object.
sub async(&@) {
    my $coro = Coro->new (@_);

    $coro->ready;

    return $coro;
}
209 root 1.1
210 root 1.105 =item async_pool { ... } [@args...]
211    
212     Similar to C<async>, but uses a coroutine pool, so you should not call
213     terminate or join (although you are allowed to), and you get a coroutine
214     that might have executed other code already (which can be good or bad :).
215    
216     Also, the block is executed in an C<eval> context and a warning will be
217 root 1.108 issued in case of an exception instead of terminating the program, as
218     C<async> does. As the coroutine is being reused, stuff like C<on_destroy>
219     will not work in the expected way, unless you call terminate or cancel,
220     which somehow defeats the purpose of pooling.
221 root 1.105
222     The priority will be reset to C<0> after each job, otherwise the coroutine
223     will be re-used "as-is".
224    
225     The pool size is limited to 8 idle coroutines (this can be adjusted by
226     changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
227     required.
228    
229     If you are concerned about pooled coroutines growing a lot because a
230     single C<async_pool> used a lot of stackspace you can e.g. C<async_pool {
231     terminate }> once per second or so to slowly replenish the pool.
232    
233     =cut
234    
# Upper bound on the number of idle coroutines kept for reuse.
our $POOL_SIZE = 8;

# Idle pooled coroutines waiting for their next job.
our @pool;

# Body of every pooled coroutine: run the job stored in
# $current->{_invoke}, warn about (rather than die on) exceptions, then
# put the coroutine back into the pool and sleep until it is reused.
sub pool_handler {
    while (1) {
        eval {
            # "return" only leaves the eval block when there is no job
            my $job = delete $current->{_invoke}
                or return;

            my ($code, @args) = @$job;
            $code->(@args);
        };
        warn $@ if $@;

        # pool already full: let this coroutine end instead
        if (@pool >= $POOL_SIZE) {
            last;
        }

        push @pool, $current;

        # reset save flags and priority before waiting for the next job
        $current->save (Coro::State::SAVE_DEF);
        $current->prio (0);
        schedule;
    }
}
254 root 1.105
# Like async, but runs the block on a coroutine taken from @pool, or on a
# freshly created pool coroutine when the pool is empty.
sub async_pool(&@) {
    # NOTE: an inlined copy of this logic lives in $unblock_scheduler
    my $coro = pop (@pool) || Coro->new (\&pool_handler);

    # hand the job (code ref plus arguments) over to pool_handler
    $coro->{_invoke} = [@_];
    $coro->ready;

    return $coro;
}
264    
265 root 1.8 =item schedule
266 root 1.6
267 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
268 root 1.8 into the ready queue, so calling this function usually means you will
269 root 1.91 never be called again unless something else (e.g. an event handler) calls
270     ready.
271    
272     The canonical way to wait on external events is this:
273    
274     {
275 root 1.92 # remember current coroutine
276 root 1.91 my $current = $Coro::current;
277    
278     # register a hypothetical event handler
279     on_event_invoke sub {
280     # wake up sleeping coroutine
281     $current->ready;
282     undef $current;
283     };
284    
285     # call schedule until event occurred.
286     # in case we are woken up for other reasons
287     # (current still defined), loop.
288     Coro::schedule while $current;
289     }
290 root 1.1
291 root 1.22 =item cede
292 root 1.1
293 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
294 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
295     current "timeslice" to other coroutines of the same or higher priority.
296 root 1.7
297 root 1.107 Returns true if at least one coroutine switch has happened.
298    
299 root 1.102 =item Coro::cede_notself
300    
301     Works like cede, but is not exported by default and will cede to any
302     coroutine, regardless of priority, once.
303    
304 root 1.107 Returns true if at least one coroutine switch has happened.
305    
306 root 1.40 =item terminate [arg...]
307 root 1.7
308 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
309 root 1.13
310 root 1.1 =cut
311    
# Cancels the current coroutine, using the arguments as its status values.
sub terminate {
    return $current->cancel (@_);
}
315 root 1.6
316 root 1.8 =back
317    
318     # dynamic methods
319    
320 root 1.92 =head2 COROUTINE METHODS
321 root 1.8
322 root 1.92 These are the methods you can call on coroutine objects.
323 root 1.6
324 root 1.8 =over 4
325    
326 root 1.13 =item new Coro \&sub [, @args...]
327 root 1.8
328 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
329 root 1.40 automatically terminates as if C<terminate> with the returned values were
330 root 1.92 called. To make the coroutine run you must first put it into the ready queue
331 root 1.41 by calling the ready method.
332 root 1.13
333 root 1.121 See C<async> for additional discussion.
334 root 1.89
335 root 1.6 =cut
336    
# Entry point of every coroutine created through "new": takes the user's
# code ref off the argument list, calls it with the remaining arguments
# and terminates the coroutine with whatever it returned.
sub _run_coro {
    my $body = shift;

    # "&$body" without parentheses passes the current @_ straight through
    terminate &$body;
}
340    
# Constructor: delegates to the parent class, wrapping the user's sub and
# arguments in _run_coro so the coroutine terminates when the sub returns.
sub new {
    my $class = shift;

    return $class->SUPER::new (\&_run_coro, @_);
}
346 root 1.6
347 root 1.92 =item $success = $coroutine->ready
348 root 1.1
349 root 1.92 Put the given coroutine into the ready queue (according to its priority)
350     and return true. If the coroutine is already in the ready queue, do nothing
351 root 1.90 and return false.
352 root 1.1
353 root 1.92 =item $is_ready = $coroutine->is_ready
354 root 1.90
355 root 1.92 Return whether the coroutine is currently in the ready queue or not.
356 root 1.28
357 root 1.92 =item $coroutine->cancel (arg...)
358 root 1.28
359 root 1.92 Terminates the given coroutine and makes it return the given arguments as
360 root 1.103 status (default: the empty list). Never returns if the coroutine is the
361     current coroutine.
362 root 1.28
363     =cut
364    
# Stores the arguments as the coroutine's status and destroys it. Any
# other coroutine is destroyed right away; the current coroutine hands
# itself to the manager coroutine and never returns.
sub cancel {
    my $coro = shift;

    $coro->{status} = [@_];

    # somebody else is being cancelled: do it directly
    return $coro->_cancel
        unless $current == $coro;

    # we cannot destroy ourselves, so let the manager do it
    push @destroy, $coro;
    $manager->ready;

    while (1) {
        &schedule;
    }
}
377    
378 root 1.92 =item $coroutine->join
379 root 1.40
380     Wait until the coroutine terminates and return any values given to the
381 pcg 1.59 C<terminate> or C<cancel> functions. C<join> can be called multiple times
382 root 1.92 from multiple coroutines.
383 root 1.40
384     =cut
385    
# Waits until the coroutine has a status (i.e. has been cancelled or has
# terminated) and returns it: all status values in list context, the
# first one in scalar context.
sub join {
    my $self = shift;

    if (!$self->{status}) {
        # remember who is waiting so the callback can wake us up
        my $waiter = $current;

        push @{ $self->{destroy_cb} }, sub {
            $waiter->ready;
            undef $waiter;
        };

        # loop in case we get woken up for some other reason
        &schedule while $waiter;
    }

    return wantarray ? @{ $self->{status} } : $self->{status}[0];
}
402    
403 root 1.101 =item $coroutine->on_destroy (\&cb)
404    
405     Registers a callback that is called when this coroutine gets destroyed,
406     but before it is joined. The callback gets passed the terminate arguments,
407     if any.
408    
409     =cut
410    
# Appends a callback to the coroutine's list of destruction callbacks.
sub on_destroy {
    my $self     = shift;
    my $callback = shift;

    return push @{ $self->{destroy_cb} }, $callback;
}
416    
417 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
418 root 1.31
419 root 1.41 Sets (or gets, if the argument is missing) the priority of the
420 root 1.92 coroutine. Higher priority coroutines get run before lower priority
421     coroutines. Priorities are small signed integers (currently -4 .. +3),
422 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
423     to get them):
424 root 1.31
425     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
426     3 > 1 > 0 > -1 > -3 > -4
427    
428     # set priority to HIGH
429     current->prio(PRIO_HIGH);
430    
431     The idle coroutine ($Coro::idle) always has a lower priority than any
432     existing coroutine.
433    
434 root 1.92 Changing the priority of the current coroutine will take effect immediately,
435     but changing the priority of coroutines in the ready queue (but not
436 root 1.31 running) will only take effect after the next schedule (of that
437 root 1.92 coroutine). This is a bug that will be fixed in some future version.
438 root 1.31
439 root 1.92 =item $newprio = $coroutine->nice ($change)
440 root 1.31
441     Similar to C<prio>, but subtract the given value from the priority (i.e.
442     higher values mean lower priority, just as in unix).
443    
444 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
445 root 1.41
446     Sets (or gets in case the argument is missing) the description for this
447 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
448 root 1.41
449     =cut
450    
# Combined getter/setter for the free-form description. Always returns
# the previous description; stores a new one only when an argument was
# passed (an explicit undef counts as an argument).
sub desc {
    my $self     = shift;
    my $previous = $self->{desc};

    $self->{desc} = shift
        if @_;

    return $previous;
}
456 root 1.1
457 root 1.8 =back
458 root 1.2
459 root 1.97 =head2 GLOBAL FUNCTIONS
460 root 1.92
461     =over 4
462    
463 root 1.97 =item Coro::nready
464    
465     Returns the number of coroutines that are currently in the ready state,
466     i.e. that can be switched to. The value C<0> means that the only runnable
467     coroutine is the currently running one, so C<cede> would have no effect,
468     and C<schedule> would cause a deadlock unless there is an idle handler
469     that wakes up some coroutines.
470    
471 root 1.103 =item my $guard = Coro::guard { ... }
472    
473 root 1.119 This creates and returns a guard object. Nothing happens until the object
474 root 1.103 gets destroyed, in which case the codeblock given as argument will be
475     executed. This is useful to free locks or other resources in case of a
476     runtime error or when the coroutine gets canceled, as in both cases the
477     guard block will be executed. The guard object supports only one method,
478     C<< ->cancel >>, which will keep the codeblock from being executed.
479    
480     Example: set some flag and clear it again when the coroutine gets canceled
481     or the function returns:
482    
483     sub do_something {
484     my $guard = Coro::guard { $busy = 0 };
485     $busy = 1;
486    
487     # do something that requires $busy to be true
488     }
489    
490     =cut
491    
# Wraps the block in a Coro::guard object; the block runs when that object
# is destroyed.
sub guard(&) {
    my $code = shift;

    return bless \$code, "Coro::guard";
}

# Disarms the guard by swapping its block for an empty sub.
sub Coro::guard::cancel {
    my $guard = shift;

    $$guard = sub { };
}

# Runs whatever block the guard currently holds.
sub Coro::guard::DESTROY {
    my $guard = shift;

    $$guard->();
}
503    
504    
505 root 1.92 =item unblock_sub { ... }
506    
507     This utility function takes a BLOCK or code reference and "unblocks" it,
508     returning the new coderef. This means that the new coderef will return
509     immediately without blocking, returning nothing, while the original code
510     ref will be called (with parameters) from within its own coroutine.
511    
512     The reason this function exists is that many event libraries (such as the
513     venerable L<Event|Event> module) are not coroutine-safe (a weaker form
514     of thread-safety). This means you must not block within event callbacks,
515     otherwise you might suffer from crashes or worse.
516    
517     This function allows your callbacks to block by executing them in another
518     coroutine where it is safe to block. One example where blocking is handy
519     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
520     disk.
521    
522     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
523     creating event callbacks that want to block.
524    
525     =cut
526    
# Jobs ([code ref, args...]) queued by unblock_sub wrappers.
our @unblock_queue;

# A dedicated coroutine dispatches the queued jobs because we cannot cede
# inside an event callback, and because ceding after each dispatch keeps
# pressure on the coro pool low (most callbacks return immediately, so
# their coroutine can be reused right away).
our $unblock_scheduler = async {
    while (1) {
        while (my $job = pop @unblock_queue) {
            # this is an inlined copy of async_pool
            my $worker = pop (@pool) || Coro->new (\&pool_handler);

            $worker->{_invoke} = $job;
            $worker->ready;

            # for short-lived callbacks, this reduces pressure on the coro pool
            cede;
        }

        # queue drained: sleep until unblock_sub readies us again
        schedule;
    }
};
546    
# Returns a wrapper around the given block. Calling the wrapper only
# queues the block together with the call's arguments and readies the
# unblock scheduler.
sub unblock_sub(&) {
    my $code = shift;

    return sub {
        unshift @unblock_queue, [$code, @_];
        $unblock_scheduler->ready;
    };
}
555    
556     =back
557    
558 root 1.8 =cut
559 root 1.2
560 root 1.8 1;
561 root 1.14
562 root 1.17 =head1 BUGS/LIMITATIONS
563 root 1.14
564 root 1.52 - you must make very sure that no coro is still active on global
565 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
566 root 1.52
567     - this module is not thread-safe. You should only ever use this module
568     from the same thread (this requirement might be loosened in the future
569     to allow per-thread schedulers, but Coro::State does not yet allow
570     this).
571 root 1.9
572     =head1 SEE ALSO
573    
574 root 1.67 Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
575    
576     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
577    
578     Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
579    
580     Embedding: L<Coro::MakeMaker>
581 root 1.1
582     =head1 AUTHOR
583    
584 root 1.66 Marc Lehmann <schmorp@schmorp.de>
585 root 1.64 http://home.schmorp.de/
586 root 1.1
587     =cut
588