ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.139
Committed: Thu Sep 27 15:52:30 2007 UTC (16 years, 8 months ago) by root
Branch: MAIN
Changes since 1.138: +6 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.92 # alternatively create an async coroutine like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.98 This module collection manages coroutines. Coroutines are similar
24     to threads but don't run in parallel at the same time even on SMP
25 root 1.124 machines. The specific flavor of coroutine used in this module also
26     guarantees you that it will not switch between coroutines unless
27 root 1.98 necessary, at easily-identified points in your program, so locking and
28     parallel access are rarely an issue, making coroutine programming much
29     safer than threads programming.
30    
31     (Perl, however, does not natively support real threads but instead does a
32     very slow and memory-intensive emulation of processes using threads. This
33     is a performance win on Windows machines, and a loss everywhere else).
34    
35     In this module, coroutines are defined as "callchain + lexical variables +
36     @_ + $_ + $@ + $/ + C stack), that is, a coroutine has its own callchain,
37     its own set of lexicals and its own set of perls most important global
38     variables.
39 root 1.22
40 root 1.8 =cut
41    
42     package Coro;
43    
44 root 1.71 use strict;
45     no warnings "uninitialized";
46 root 1.36
47 root 1.8 use Coro::State;
48    
49 root 1.83 use base qw(Coro::State Exporter);
50 pcg 1.55
51 root 1.83 our $idle; # idle handler
52 root 1.71 our $main; # main coroutine
53     our $current; # current coroutine
54 root 1.8
55 root 1.138 our $VERSION = '3.8';
56 root 1.8
57 root 1.105 our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
58 root 1.71 our %EXPORT_TAGS = (
59 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
60     );
61 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready));
62 root 1.8
63     {
64     my @async;
65 root 1.26 my $init;
66 root 1.8
67     # this way of handling attributes simply is NOT scalable ;()
68     sub import {
69 root 1.71 no strict 'refs';
70    
71 root 1.93 Coro->export_to_level (1, @_);
72 root 1.71
73 root 1.8 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE};
74     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub {
75     my ($package, $ref) = (shift, shift);
76     my @attrs;
77     for (@_) {
78     if ($_ eq "Coro") {
79     push @async, $ref;
80 root 1.26 unless ($init++) {
81     eval q{
82     sub INIT {
83     &async(pop @async) while @async;
84     }
85     };
86     }
87 root 1.8 } else {
88 root 1.17 push @attrs, $_;
89 root 1.8 }
90     }
91 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs;
92 root 1.8 };
93     }
94    
95     }
96    
97 root 1.43 =over 4
98    
99 root 1.8 =item $main
100 root 1.2
101 root 1.8 This coroutine represents the main program.
102 root 1.1
103     =cut
104    
105 pcg 1.55 $main = new Coro;
106 root 1.8
107 root 1.19 =item $current (or as function: current)
108 root 1.1
109 root 1.83 The current coroutine (the last coroutine switched to). The initial value
110     is C<$main> (of course).
111    
112     This variable is B<strictly> I<read-only>. It is provided for performance
113 root 1.124 reasons. If performance is not essential you are encouraged to use the
114 root 1.83 C<Coro::current> function instead.
115 root 1.1
116 root 1.8 =cut
117    
118 root 1.131 $main->{desc} = "[main::]";
119    
120 root 1.8 # maybe some other module used Coro::Specific before...
121 root 1.93 $main->{specific} = $current->{specific}
122     if $current;
123 root 1.1
124 root 1.93 _set_current $main;
125 root 1.19
126     sub current() { $current }
127 root 1.9
128     =item $idle
129    
130 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
131     to run. The default implementation prints "FATAL: deadlock detected" and
132 root 1.91 exits, because the program has no other way to continue.
133 root 1.83
134     This hook is overwritten by modules such as C<Coro::Timer> and
135 root 1.91 C<Coro::Event> to wait on an external event that hopefully wake up a
136     coroutine so the scheduler can run it.
137    
138     Please note that if your callback recursively invokes perl (e.g. for event
139     handlers), then it must be prepared to be called recursively.
140 root 1.9
141     =cut
142    
143 root 1.83 $idle = sub {
144 root 1.96 require Carp;
145     Carp::croak ("FATAL: deadlock detected");
146 root 1.9 };
147 root 1.8
148 root 1.103 sub _cancel {
149     my ($self) = @_;
150    
151     # free coroutine data and mark as destructed
152     $self->_destroy
153     or return;
154    
155     # call all destruction callbacks
156     $_->(@{$self->{status}})
157     for @{(delete $self->{destroy_cb}) || []};
158     }
159    
160 root 1.139 sub _do_trace_sub {
161     &{$current->{_trace_sub_cb}}
162     }
163    
164     sub _do_trace_line {
165     &{$current->{_trace_line_cb}}
166 root 1.137 }
167    
168 root 1.24 # this coroutine is necessary because a coroutine
169     # cannot destroy itself.
170     my @destroy;
171 root 1.103 my $manager;
172    
173     $manager = new Coro sub {
174 pcg 1.57 while () {
175 root 1.103 (shift @destroy)->_cancel
176     while @destroy;
177    
178 root 1.24 &schedule;
179     }
180     };
181 root 1.132 $manager->desc ("[coro manager]");
182 root 1.103 $manager->prio (PRIO_MAX);
183    
184 root 1.8 # static methods. not really.
185 root 1.43
186     =back
187 root 1.8
188     =head2 STATIC METHODS
189    
190 root 1.92 Static methods are actually functions that operate on the current coroutine only.
191 root 1.8
192     =over 4
193    
194 root 1.13 =item async { ... } [@args...]
195 root 1.8
196 root 1.92 Create a new asynchronous coroutine and return it's coroutine object
197     (usually unused). When the sub returns the new coroutine is automatically
198 root 1.8 terminated.
199    
200 root 1.122 Calling C<exit> in a coroutine will do the same as calling exit outside
201     the coroutine. Likewise, when the coroutine dies, the program will exit,
202     just as it would in the main program.
203 root 1.79
204 root 1.13 # create a new coroutine that just prints its arguments
205     async {
206     print "@_\n";
207     } 1,2,3,4;
208    
209 root 1.8 =cut
210    
211 root 1.13 sub async(&@) {
212 root 1.104 my $coro = new Coro @_;
213     $coro->ready;
214     $coro
215 root 1.8 }
216 root 1.1
217 root 1.105 =item async_pool { ... } [@args...]
218    
219     Similar to C<async>, but uses a coroutine pool, so you should not call
220     terminate or join (although you are allowed to), and you get a coroutine
221     that might have executed other code already (which can be good or bad :).
222    
223     Also, the block is executed in an C<eval> context and a warning will be
224 root 1.108 issued in case of an exception instead of terminating the program, as
225     C<async> does. As the coroutine is being reused, stuff like C<on_destroy>
226     will not work in the expected way, unless you call terminate or cancel,
227     which somehow defeats the purpose of pooling.
228 root 1.105
229     The priority will be reset to C<0> after each job, otherwise the coroutine
230     will be re-used "as-is".
231    
232     The pool size is limited to 8 idle coroutines (this can be adjusted by
233     changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
234     required.
235    
236     If you are concerned about pooled coroutines growing a lot because a
237 root 1.133 single C<async_pool> used a lot of stackspace you can e.g. C<async_pool
238     { terminate }> once per second or so to slowly replenish the pool. In
239     addition to that, when the stacks used by a handler grows larger than 16kb
240 root 1.134 (adjustable with $Coro::POOL_RSS) it will also exit.
241 root 1.105
242     =cut
243    
244     our $POOL_SIZE = 8;
245 root 1.134 our $POOL_RSS = 16 * 1024;
246     our @async_pool;
247 root 1.105
248     sub pool_handler {
249 root 1.134 my $cb;
250    
251 root 1.105 while () {
252 root 1.134 eval {
253     while () {
254 root 1.136 _pool_1 $cb;
255 root 1.134 &$cb;
256 root 1.136 _pool_2 $cb;
257 root 1.135 &schedule;
258 root 1.134 }
259 root 1.105 };
260 root 1.134
261 root 1.136 last if $@ eq "\3terminate\2\n";
262 root 1.105 warn $@ if $@;
263 root 1.106 }
264     }
265 root 1.105
266     sub async_pool(&@) {
267     # this is also inlined into the unlock_scheduler
268 root 1.135 my $coro = (pop @async_pool) || new Coro \&pool_handler;
269 root 1.105
270     $coro->{_invoke} = [@_];
271     $coro->ready;
272    
273     $coro
274     }
275    
276 root 1.8 =item schedule
277 root 1.6
278 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
279 root 1.8 into the ready queue, so calling this function usually means you will
280 root 1.91 never be called again unless something else (e.g. an event handler) calls
281     ready.
282    
283     The canonical way to wait on external events is this:
284    
285     {
286 root 1.92 # remember current coroutine
287 root 1.91 my $current = $Coro::current;
288    
289     # register a hypothetical event handler
290     on_event_invoke sub {
291     # wake up sleeping coroutine
292     $current->ready;
293     undef $current;
294     };
295    
296 root 1.124 # call schedule until event occurred.
297 root 1.91 # in case we are woken up for other reasons
298     # (current still defined), loop.
299     Coro::schedule while $current;
300     }
301 root 1.1
302 root 1.22 =item cede
303 root 1.1
304 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
305 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
306     current "timeslice" to other coroutines of the same or higher priority.
307 root 1.7
308 root 1.107 Returns true if at least one coroutine switch has happened.
309    
310 root 1.102 =item Coro::cede_notself
311    
312     Works like cede, but is not exported by default and will cede to any
313     coroutine, regardless of priority, once.
314    
315 root 1.107 Returns true if at least one coroutine switch has happened.
316    
317 root 1.40 =item terminate [arg...]
318 root 1.7
319 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
320 root 1.13
321 root 1.1 =cut
322    
323 root 1.8 sub terminate {
324 pcg 1.59 $current->cancel (@_);
325 root 1.1 }
326 root 1.6
327 root 1.8 =back
328    
329     # dynamic methods
330    
331 root 1.92 =head2 COROUTINE METHODS
332 root 1.8
333 root 1.92 These are the methods you can call on coroutine objects.
334 root 1.6
335 root 1.8 =over 4
336    
337 root 1.13 =item new Coro \&sub [, @args...]
338 root 1.8
339 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
340 root 1.40 automatically terminates as if C<terminate> with the returned values were
341 root 1.92 called. To make the coroutine run you must first put it into the ready queue
342 root 1.41 by calling the ready method.
343 root 1.13
344 root 1.121 See C<async> for additional discussion.
345 root 1.89
346 root 1.6 =cut
347    
348 root 1.94 sub _run_coro {
349 root 1.13 terminate &{+shift};
350     }
351    
352 root 1.8 sub new {
353     my $class = shift;
354 root 1.83
355 root 1.94 $class->SUPER::new (\&_run_coro, @_)
356 root 1.8 }
357 root 1.6
358 root 1.92 =item $success = $coroutine->ready
359 root 1.1
360 root 1.92 Put the given coroutine into the ready queue (according to it's priority)
361     and return true. If the coroutine is already in the ready queue, do nothing
362 root 1.90 and return false.
363 root 1.1
364 root 1.92 =item $is_ready = $coroutine->is_ready
365 root 1.90
366 root 1.92 Return wether the coroutine is currently the ready queue or not,
367 root 1.28
368 root 1.92 =item $coroutine->cancel (arg...)
369 root 1.28
370 root 1.92 Terminates the given coroutine and makes it return the given arguments as
371 root 1.103 status (default: the empty list). Never returns if the coroutine is the
372     current coroutine.
373 root 1.28
374     =cut
375    
376     sub cancel {
377 pcg 1.59 my $self = shift;
378     $self->{status} = [@_];
379 root 1.103
380     if ($current == $self) {
381     push @destroy, $self;
382     $manager->ready;
383     &schedule while 1;
384     } else {
385     $self->_cancel;
386     }
387 root 1.40 }
388    
389 root 1.92 =item $coroutine->join
390 root 1.40
391     Wait until the coroutine terminates and return any values given to the
392 pcg 1.59 C<terminate> or C<cancel> functions. C<join> can be called multiple times
393 root 1.92 from multiple coroutine.
394 root 1.40
395     =cut
396    
397     sub join {
398     my $self = shift;
399 root 1.103
400 root 1.40 unless ($self->{status}) {
401 root 1.103 my $current = $current;
402    
403     push @{$self->{destroy_cb}}, sub {
404     $current->ready;
405     undef $current;
406     };
407    
408     &schedule while $current;
409 root 1.40 }
410 root 1.103
411 root 1.40 wantarray ? @{$self->{status}} : $self->{status}[0];
412 root 1.31 }
413    
414 root 1.101 =item $coroutine->on_destroy (\&cb)
415    
416     Registers a callback that is called when this coroutine gets destroyed,
417     but before it is joined. The callback gets passed the terminate arguments,
418     if any.
419    
420     =cut
421    
422     sub on_destroy {
423     my ($self, $cb) = @_;
424    
425     push @{ $self->{destroy_cb} }, $cb;
426     }
427    
428 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
429 root 1.31
430 root 1.41 Sets (or gets, if the argument is missing) the priority of the
431 root 1.92 coroutine. Higher priority coroutines get run before lower priority
432     coroutines. Priorities are small signed integers (currently -4 .. +3),
433 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
434     to get then):
435 root 1.31
436     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
437     3 > 1 > 0 > -1 > -3 > -4
438    
439     # set priority to HIGH
440     current->prio(PRIO_HIGH);
441    
442     The idle coroutine ($Coro::idle) always has a lower priority than any
443     existing coroutine.
444    
445 root 1.92 Changing the priority of the current coroutine will take effect immediately,
446     but changing the priority of coroutines in the ready queue (but not
447 root 1.31 running) will only take effect after the next schedule (of that
448 root 1.92 coroutine). This is a bug that will be fixed in some future version.
449 root 1.31
450 root 1.92 =item $newprio = $coroutine->nice ($change)
451 root 1.31
452     Similar to C<prio>, but subtract the given value from the priority (i.e.
453     higher values mean lower priority, just as in unix).
454    
455 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
456 root 1.41
457     Sets (or gets in case the argument is missing) the description for this
458 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
459 root 1.41
460     =cut
461    
462     sub desc {
463     my $old = $_[0]{desc};
464     $_[0]{desc} = $_[1] if @_ > 1;
465     $old;
466 root 1.8 }
467 root 1.1
468 root 1.8 =back
469 root 1.2
470 root 1.97 =head2 GLOBAL FUNCTIONS
471 root 1.92
472     =over 4
473    
474 root 1.97 =item Coro::nready
475    
476     Returns the number of coroutines that are currently in the ready state,
477 root 1.124 i.e. that can be switched to. The value C<0> means that the only runnable
478 root 1.97 coroutine is the currently running one, so C<cede> would have no effect,
479     and C<schedule> would cause a deadlock unless there is an idle handler
480     that wakes up some coroutines.
481    
482 root 1.103 =item my $guard = Coro::guard { ... }
483    
484 root 1.119 This creates and returns a guard object. Nothing happens until the object
485 root 1.103 gets destroyed, in which case the codeblock given as argument will be
486     executed. This is useful to free locks or other resources in case of a
487     runtime error or when the coroutine gets canceled, as in both cases the
488     guard block will be executed. The guard object supports only one method,
489     C<< ->cancel >>, which will keep the codeblock from being executed.
490    
491     Example: set some flag and clear it again when the coroutine gets canceled
492     or the function returns:
493    
494     sub do_something {
495     my $guard = Coro::guard { $busy = 0 };
496     $busy = 1;
497    
498     # do something that requires $busy to be true
499     }
500    
501     =cut
502    
503     sub guard(&) {
504     bless \(my $cb = $_[0]), "Coro::guard"
505     }
506    
507     sub Coro::guard::cancel {
508     ${$_[0]} = sub { };
509     }
510    
511     sub Coro::guard::DESTROY {
512     ${$_[0]}->();
513     }
514    
515    
516 root 1.92 =item unblock_sub { ... }
517    
518     This utility function takes a BLOCK or code reference and "unblocks" it,
519     returning the new coderef. This means that the new coderef will return
520     immediately without blocking, returning nothing, while the original code
521     ref will be called (with parameters) from within its own coroutine.
522    
523 root 1.124 The reason this function exists is that many event libraries (such as the
524 root 1.92 venerable L<Event|Event> module) are not coroutine-safe (a weaker form
525     of thread-safety). This means you must not block within event callbacks,
526     otherwise you might suffer from crashes or worse.
527    
528     This function allows your callbacks to block by executing them in another
529     coroutine where it is safe to block. One example where blocking is handy
530     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
531     disk.
532    
533     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
534     creating event callbacks that want to block.
535    
536     =cut
537    
538     our @unblock_queue;
539    
540 root 1.105 # we create a special coro because we want to cede,
541     # to reduce pressure on the coro pool (because most callbacks
542     # return immediately and can be reused) and because we cannot cede
543     # inside an event callback.
544 root 1.132 our $unblock_scheduler = new Coro sub {
545 root 1.92 while () {
546     while (my $cb = pop @unblock_queue) {
547 root 1.105 # this is an inlined copy of async_pool
548 root 1.134 my $coro = (pop @async_pool) || new Coro \&pool_handler;
549 root 1.105
550     $coro->{_invoke} = $cb;
551     $coro->ready;
552     cede; # for short-lived callbacks, this reduces pressure on the coro pool
553 root 1.92 }
554 root 1.105 schedule; # sleep well
555 root 1.92 }
556     };
557 root 1.132 $unblock_scheduler->desc ("[unblock_sub scheduler]");
558 root 1.92
559     sub unblock_sub(&) {
560     my $cb = shift;
561    
562     sub {
563 root 1.105 unshift @unblock_queue, [$cb, @_];
564 root 1.92 $unblock_scheduler->ready;
565     }
566     }
567    
568     =back
569    
570 root 1.8 =cut
571 root 1.2
572 root 1.8 1;
573 root 1.14
574 root 1.17 =head1 BUGS/LIMITATIONS
575 root 1.14
576 root 1.52 - you must make very sure that no coro is still active on global
577 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
578 root 1.52
579     - this module is not thread-safe. You should only ever use this module
580 root 1.124 from the same thread (this requirement might be loosened in the future
581 root 1.52 to allow per-thread schedulers, but Coro::State does not yet allow
582     this).
583 root 1.9
584     =head1 SEE ALSO
585    
586 root 1.67 Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
587    
588     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
589    
590     Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
591    
592     Embedding: L<Coro:MakeMaker>
593 root 1.1
594     =head1 AUTHOR
595    
596 root 1.66 Marc Lehmann <schmorp@schmorp.de>
597 root 1.64 http://home.schmorp.de/
598 root 1.1
599     =cut
600