ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.137
Committed: Wed Sep 26 19:26:48 2007 UTC (16 years, 7 months ago) by root
Branch: MAIN
Changes since 1.136: +4 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.92 # alternatively create an async coroutine like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.98 This module collection manages coroutines. Coroutines are similar
24     to threads but don't run in parallel at the same time even on SMP
25 root 1.124 machines. The specific flavor of coroutine used in this module also
26     guarantees you that it will not switch between coroutines unless
27 root 1.98 necessary, at easily-identified points in your program, so locking and
28     parallel access are rarely an issue, making coroutine programming much
29     safer than threads programming.
30    
31     (Perl, however, does not natively support real threads but instead does a
32     very slow and memory-intensive emulation of processes using threads. This
33     is a performance win on Windows machines, and a loss everywhere else).
34    
35     In this module, coroutines are defined as "callchain + lexical variables +
36     @_ + $_ + $@ + $/ + C stack", that is, a coroutine has its own callchain,
37     its own set of lexicals and its own set of perl's most important global
38     variables.
39 root 1.22
40 root 1.8 =cut
41    
42     package Coro;
43    
44 root 1.71 use strict;
45     no warnings "uninitialized";
46 root 1.36
47 root 1.8 use Coro::State;
48    
49 root 1.83 use base qw(Coro::State Exporter);
50 pcg 1.55
51 root 1.83 our $idle; # idle handler
52 root 1.71 our $main; # main coroutine
53     our $current; # current coroutine
54 root 1.8
55 root 1.128 our $VERSION = '3.7';
56 root 1.8
57 root 1.105 our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
58 root 1.71 our %EXPORT_TAGS = (
59 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
60     );
61 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready));
62 root 1.8
{
   # Code refs that were declared with the ":Coro" attribute and have not
   # been turned into coroutines yet.
   my @async;
   # Non-zero once the INIT hook below has been compiled.
   my $init;

   # this way of handling attributes simply is NOT scalable ;()
   #
   # Exports the requested symbols into the caller and installs a
   # MODIFY_CODE_ATTRIBUTES handler there that recognises the "Coro"
   # attribute. Any previously installed handler is chained to with the
   # attributes that were not consumed here.
   sub import {
      no strict 'refs';

      Coro->export_to_level (1, @_);

      my $hook = (caller)[0] . "::MODIFY_CODE_ATTRIBUTES";
      my $old  = *{$hook}{CODE};

      *{$hook} = sub {
         my $package = shift;
         my $ref     = shift;
         my @attrs;

         for my $attr (@_) {
            # anything but "Coro" is handed on unchanged
            if ($attr ne "Coro") {
               push @attrs, $attr;
               next;
            }

            push @async, $ref;

            # compile the INIT hook only the first time it is needed
            next if $init++;

            eval q{
               sub INIT {
                  &async(pop @async) while @async;
               }
            };
         }

         return $old ? $old->($package, $ref, @attrs) : @attrs;
      };
   }

}
96    
97 root 1.43 =over 4
98    
99 root 1.8 =item $main
100 root 1.2
101 root 1.8 This coroutine represents the main program.
102 root 1.1
103     =cut
104    
# The coroutine object that stands for the main program.
$main = Coro->new;
106 root 1.8
107 root 1.19 =item $current (or as function: current)
108 root 1.1
109 root 1.83 The current coroutine (the last coroutine switched to). The initial value
110     is C<$main> (of course).
111    
112     This variable is B<strictly> I<read-only>. It is provided for performance
113 root 1.124 reasons. If performance is not essential you are encouraged to use the
114 root 1.83 C<Coro::current> function instead.
115 root 1.1
116 root 1.8 =cut
117    
# Give the main coroutine a recognisable description.
$main->{desc} = "[main::]";

# maybe some other module used Coro::Specific before...
if ($current) {
   $main->{specific} = $current->{specific};
}

_set_current ($main);

# Function form of $current: returns the current coroutine object.
sub current() { return $current }
127 root 1.9
128     =item $idle
129    
130 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
131     to run. The default implementation prints "FATAL: deadlock detected" and
132 root 1.91 exits, because the program has no other way to continue.
133 root 1.83
134     This hook is overwritten by modules such as C<Coro::Timer> and
135 root 1.91 C<Coro::Event> to wait on an external event that hopefully wakes up a
136     coroutine so the scheduler can run it.
137    
138     Please note that if your callback recursively invokes perl (e.g. for event
139     handlers), then it must be prepared to be called recursively.
140 root 1.9
141     =cut
142    
# Default idle handler: there is nothing left to run, so report a deadlock.
# Carp is only loaded when the handler actually fires.
$idle = sub {
   require Carp;

   Carp::croak ("FATAL: deadlock detected");
};
147 root 1.8
# Destroys the given coroutine and runs its destruction callbacks.
#
# Does nothing further if ->_destroy returns false. Otherwise the
# "destroy_cb" list is removed from the object and every callback in it is
# invoked with the contents of the "status" array.
sub _cancel {
   my $coro = shift;

   # free coroutine data and mark as destructed
   return unless $coro->_destroy;

   # call all destruction callbacks (the list is detached first)
   my $callbacks = delete $coro->{destroy_cb};

   $_->(@{ $coro->{status} })
      for @{ $callbacks || [] };
}
159    
# Invokes the "_trace_cb" callback stored in the current coroutine,
# passing no arguments.
sub _do_trace {
   my $trace_cb = $current->{_trace_cb};

   $trace_cb->();
}
163    
# this coroutine is necessary because a coroutine
# cannot destroy itself.
#
# @destroy holds the coroutines that are waiting to be cancelled; the
# manager cancels them one by one and then goes back to sleep.
my @destroy;

my $manager = Coro->new (sub {
   while () {
      while (@destroy) {
         my $victim = shift @destroy;
         $victim->_cancel;
      }

      &schedule;
   }
});

$manager->desc ("[coro manager]");
$manager->prio (PRIO_MAX);
179    
180 root 1.8 # static methods. not really.
181 root 1.43
182     =back
183 root 1.8
184     =head2 STATIC METHODS
185    
186 root 1.92 Static methods are actually functions that operate on the current coroutine only.
187 root 1.8
188     =over 4
189    
190 root 1.13 =item async { ... } [@args...]
191 root 1.8
192 root 1.92 Create a new asynchronous coroutine and return its coroutine object
193     (usually unused). When the sub returns the new coroutine is automatically
194 root 1.8 terminated.
195    
196 root 1.122 Calling C<exit> in a coroutine will do the same as calling exit outside
197     the coroutine. Likewise, when the coroutine dies, the program will exit,
198     just as it would in the main program.
199 root 1.79
200 root 1.13 # create a new coroutine that just prints its arguments
201     async {
202     print "@_\n";
203     } 1,2,3,4;
204    
205 root 1.8 =cut
206    
# Creates a coroutine from the given block and arguments, puts it into the
# ready queue and returns the coroutine object.
sub async(&@) {
   my $coro = Coro->new (@_);

   $coro->ready;

   return $coro;
}
212 root 1.1
213 root 1.105 =item async_pool { ... } [@args...]
214    
215     Similar to C<async>, but uses a coroutine pool, so you should not call
216     terminate or join (although you are allowed to), and you get a coroutine
217     that might have executed other code already (which can be good or bad :).
218    
219     Also, the block is executed in an C<eval> context and a warning will be
220 root 1.108 issued in case of an exception instead of terminating the program, as
221     C<async> does. As the coroutine is being reused, stuff like C<on_destroy>
222     will not work in the expected way, unless you call terminate or cancel,
223     which somehow defeats the purpose of pooling.
224 root 1.105
225     The priority will be reset to C<0> after each job, otherwise the coroutine
226     will be re-used "as-is".
227    
228     The pool size is limited to 8 idle coroutines (this can be adjusted by
229     changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
230     required.
231    
232     If you are concerned about pooled coroutines growing a lot because a
233 root 1.133 single C<async_pool> used a lot of stackspace you can e.g. C<async_pool
234     { terminate }> once per second or so to slowly replenish the pool. In
235     addition to that, when the stacks used by a handler grow larger than 16kb
236 root 1.134 (adjustable with $Coro::POOL_RSS) it will also exit.
237 root 1.105
238     =cut
239    
our $POOL_SIZE = 8;          # documented above as the idle-coroutine limit
our $POOL_RSS  = 16 * 1024;  # documented above as the stack-size limit
our @async_pool;             # idle pool coroutines, reused by async_pool

# Body of every pooled coroutine.
#
# Runs jobs in an endless loop inside an eval, so an exception thrown by a
# job is reported with warn instead of propagating. The loop (and with it
# the coroutine) ends only when the exception is exactly the string
# "\3terminate\2\n".
#
# NOTE(review): _pool_1 and _pool_2 are not defined in this file section;
# presumably they fetch the next job into $cb/@_ and hand the coroutine
# back to the pool - confirm against their definition.
sub pool_handler {
   my $cb;

   for (;;) {
      eval {
         for (;;) {
            _pool_1 $cb;
            &$cb;          # called without parens: shares the current @_
            _pool_2 $cb;
            &schedule;
         }
      };

      last if $@ eq "\3terminate\2\n";
      warn $@ if $@;
   }
}
261 root 1.105
# Like async, but takes an idle coroutine from @async_pool when one is
# available and only creates a new pool_handler coroutine otherwise. The
# block and its arguments are stored in the "_invoke" slot, the coroutine
# is readied and returned.
sub async_pool(&@) {
   # this is also inlined into the unblock_scheduler
   my $coro = pop @async_pool;
   $coro ||= Coro->new (\&pool_handler);

   $coro->{_invoke} = [@_];
   $coro->ready;

   return $coro;
}
271    
272 root 1.8 =item schedule
273 root 1.6
274 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
275 root 1.8 into the ready queue, so calling this function usually means you will
276 root 1.91 never be called again unless something else (e.g. an event handler) calls
277     ready.
278    
279     The canonical way to wait on external events is this:
280    
281     {
282 root 1.92 # remember current coroutine
283 root 1.91 my $current = $Coro::current;
284    
285     # register a hypothetical event handler
286     on_event_invoke sub {
287     # wake up sleeping coroutine
288     $current->ready;
289     undef $current;
290     };
291    
292 root 1.124 # call schedule until event occurred.
293 root 1.91 # in case we are woken up for other reasons
294     # (current still defined), loop.
295     Coro::schedule while $current;
296     }
297 root 1.1
298 root 1.22 =item cede
299 root 1.1
300 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
301 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
302     current "timeslice" to other coroutines of the same or higher priority.
303 root 1.7
304 root 1.107 Returns true if at least one coroutine switch has happened.
305    
306 root 1.102 =item Coro::cede_notself
307    
308     Works like cede, but is not exported by default and will cede to any
309     coroutine, regardless of priority, once.
310    
311 root 1.107 Returns true if at least one coroutine switch has happened.
312    
313 root 1.40 =item terminate [arg...]
314 root 1.7
315 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
316 root 1.13
317 root 1.1 =cut
318    
# Cancels the current coroutine, using the arguments as its status values.
sub terminate {
   return $current->cancel (@_);
}
322 root 1.6
323 root 1.8 =back
324    
325     # dynamic methods
326    
327 root 1.92 =head2 COROUTINE METHODS
328 root 1.8
329 root 1.92 These are the methods you can call on coroutine objects.
330 root 1.6
331 root 1.8 =over 4
332    
333 root 1.13 =item new Coro \&sub [, @args...]
334 root 1.8
335 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
336 root 1.40 automatically terminates as if C<terminate> with the returned values were
337 root 1.92 called. To make the coroutine run you must first put it into the ready queue
338 root 1.41 by calling the ready method.
339 root 1.13
340 root 1.121 See C<async> for additional discussion.
341 root 1.89
342 root 1.6 =cut
343    
# Entry point of every coroutine created by new: takes the code reference
# off the front of @_, calls it with the remaining arguments and passes
# whatever it returns to terminate.
sub _run_coro {
   my $code = shift;

   # "&$code" without parens hands the current @_ to the callee
   terminate (&$code);
}
347    
# Constructor: delegates to the parent class, with _run_coro placed in
# front of the caller's code reference and arguments.
sub new {
   my $class = shift;

   return $class->SUPER::new (\&_run_coro, @_);
}
353 root 1.6
354 root 1.92 =item $success = $coroutine->ready
355 root 1.1
356 root 1.92 Put the given coroutine into the ready queue (according to its priority)
357     and return true. If the coroutine is already in the ready queue, do nothing
358 root 1.90 and return false.
359 root 1.1
360 root 1.92 =item $is_ready = $coroutine->is_ready
361 root 1.90
362 root 1.92 Return whether the coroutine is currently in the ready queue or not.
363 root 1.28
364 root 1.92 =item $coroutine->cancel (arg...)
365 root 1.28
366 root 1.92 Terminates the given coroutine and makes it return the given arguments as
367 root 1.103 status (default: the empty list). Never returns if the coroutine is the
368     current coroutine.
369 root 1.28
370     =cut
371    
# Stores the arguments as the coroutine's status and cancels it.
#
# Any coroutine other than the current one is cancelled directly. The
# current coroutine is instead queued on @destroy, the manager coroutine is
# readied, and this sub then calls the scheduler forever.
sub cancel {
   my $self = shift;

   $self->{status} = [@_];

   return $self->_cancel
      unless $current == $self;

   push @destroy, $self;
   $manager->ready;

   &schedule while 1;
}
384    
385 root 1.92 =item $coroutine->join
386 root 1.40
387     Wait until the coroutine terminates and return any values given to the
388 pcg 1.59 C<terminate> or C<cancel> functions. C<join> can be called multiple times
389 root 1.92 from multiple coroutines.
390 root 1.40
391     =cut
392    
# Waits until the coroutine has a status and returns it: the whole status
# list in list context, its first element otherwise.
#
# While no status is set, a destruction callback is registered that
# readies the waiting coroutine and clears the local reference to it; the
# scheduler is called until that reference has been cleared.
sub join {
   my $self = shift;

   if (!$self->{status}) {
      my $waiter = $current;

      push @{ $self->{destroy_cb} }, sub {
         $waiter->ready;
         undef $waiter;
      };

      &schedule while $waiter;
   }

   return @{ $self->{status} } if wantarray;
   return $self->{status}[0];
}
409    
410 root 1.101 =item $coroutine->on_destroy (\&cb)
411    
412     Registers a callback that is called when this coroutine gets destroyed,
413     but before it is joined. The callback gets passed the terminate arguments,
414     if any.
415    
416     =cut
417    
# Appends $cb to the coroutine's list of destruction callbacks
# (the "destroy_cb" slot), creating the list if necessary.
sub on_destroy {
   my $self     = shift;
   my $callback = shift;

   my $queue = $self->{destroy_cb} ||= [];

   push @$queue, $callback;
}
423    
424 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
425 root 1.31
426 root 1.41 Sets (or gets, if the argument is missing) the priority of the
427 root 1.92 coroutine. Higher priority coroutines get run before lower priority
428     coroutines. Priorities are small signed integers (currently -4 .. +3),
429 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
430     to get them):
431 root 1.31
432     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
433     3 > 1 > 0 > -1 > -3 > -4
434    
435     # set priority to HIGH
436     current->prio(PRIO_HIGH);
437    
438     The idle coroutine ($Coro::idle) always has a lower priority than any
439     existing coroutine.
440    
441 root 1.92 Changing the priority of the current coroutine will take effect immediately,
442     but changing the priority of coroutines in the ready queue (but not
443 root 1.31 running) will only take effect after the next schedule (of that
444 root 1.92 coroutine). This is a bug that will be fixed in some future version.
445 root 1.31
446 root 1.92 =item $newprio = $coroutine->nice ($change)
447 root 1.31
448     Similar to C<prio>, but subtract the given value from the priority (i.e.
449     higher values mean lower priority, just as in unix).
450    
451 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
452 root 1.41
453     Sets (or gets in case the argument is missing) the description for this
454 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
455 root 1.41
456     =cut
457    
# Accessor for the free-form description: always returns the previous
# value, and stores a new one when an argument is supplied.
sub desc {
   my ($self, @new) = @_;

   my $previous = $self->{desc};
   $self->{desc} = $new[0] if @new;

   return $previous;
}
463 root 1.1
464 root 1.8 =back
465 root 1.2
466 root 1.97 =head2 GLOBAL FUNCTIONS
467 root 1.92
468     =over 4
469    
470 root 1.97 =item Coro::nready
471    
472     Returns the number of coroutines that are currently in the ready state,
473 root 1.124 i.e. that can be switched to. The value C<0> means that the only runnable
474 root 1.97 coroutine is the currently running one, so C<cede> would have no effect,
475     and C<schedule> would cause a deadlock unless there is an idle handler
476     that wakes up some coroutines.
477    
478 root 1.103 =item my $guard = Coro::guard { ... }
479    
480 root 1.119 This creates and returns a guard object. Nothing happens until the object
481 root 1.103 gets destroyed, in which case the codeblock given as argument will be
482     executed. This is useful to free locks or other resources in case of a
483     runtime error or when the coroutine gets canceled, as in both cases the
484     guard block will be executed. The guard object supports only one method,
485     C<< ->cancel >>, which will keep the codeblock from being executed.
486    
487     Example: set some flag and clear it again when the coroutine gets canceled
488     or the function returns:
489    
490     sub do_something {
491     my $guard = Coro::guard { $busy = 0 };
492     $busy = 1;
493    
494     # do something that requires $busy to be true
495     }
496    
497     =cut
498    
# Wraps the given code block in a Coro::guard object: a blessed reference
# to a scalar that holds the code reference.
sub guard(&) {
   my $block = shift;

   return bless \$block, "Coro::guard";
}
502    
# Disarms the guard by replacing the stored block with an empty sub.
sub Coro::guard::cancel {
   my $guard = shift;

   $$guard = sub { };
}
506    
# Runs the guarded block when the guard object is destroyed.
#
# A destructor can fire at any point - including while an exception is
# unwinding - and the guard block is arbitrary user code, so the global
# status variables it might touch are localized here. Without that, an
# eval, system call or child process inside the block could silently
# change $@, $! or $? as seen by the code that let the guard go out of
# scope.
sub Coro::guard::DESTROY {
   local ($@, $!, $?);

   ${$_[0]}->();
}
510    
511    
512 root 1.92 =item unblock_sub { ... }
513    
514     This utility function takes a BLOCK or code reference and "unblocks" it,
515     returning the new coderef. This means that the new coderef will return
516     immediately without blocking, returning nothing, while the original code
517     ref will be called (with parameters) from within its own coroutine.
518    
519 root 1.124 The reason this function exists is that many event libraries (such as the
520 root 1.92 venerable L<Event|Event> module) are not coroutine-safe (a weaker form
521     of thread-safety). This means you must not block within event callbacks,
522     otherwise you might suffer from crashes or worse.
523    
524     This function allows your callbacks to block by executing them in another
525     coroutine where it is safe to block. One example where blocking is handy
526     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
527     disk.
528    
529     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
530     creating event callbacks that want to block.
531    
532     =cut
533    
# Jobs queued by unblock_sub closures; each entry is [$code, @args].
our @unblock_queue;

# we create a special coro because we want to cede,
# to reduce pressure on the coro pool (because most callbacks
# return immediately and can be reused) and because we cannot cede
# inside an event callback.
our $unblock_scheduler = Coro->new (sub {
   while () {
      while (my $job = pop @unblock_queue) {
         # this is an inlined copy of async_pool
         my $coro = pop @async_pool;
         $coro ||= Coro->new (\&pool_handler);

         $coro->{_invoke} = $job;
         $coro->ready;
         cede; # for short-lived callbacks, this reduces pressure on the coro pool
      }

      schedule; # sleep well
   }
});

$unblock_scheduler->desc ("[unblock_sub scheduler]");
554 root 1.92
# Returns a wrapper around the given block. Calling the wrapper does not
# run the block itself: it puts [block, arguments] at the front of
# @unblock_queue and readies the unblock scheduler coroutine.
sub unblock_sub(&) {
   my ($code) = @_;

   return sub {
      my $job = [$code, @_];

      unshift @unblock_queue, $job;
      $unblock_scheduler->ready;
   };
}
563    
564     =back
565    
566 root 1.8 =cut
567 root 1.2
568 root 1.8 1;
569 root 1.14
570 root 1.17 =head1 BUGS/LIMITATIONS
571 root 1.14
572 root 1.52 - you must make very sure that no coro is still active on global
573 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
574 root 1.52
575     - this module is not thread-safe. You should only ever use this module
576 root 1.124 from the same thread (this requirement might be loosened in the future
577 root 1.52 to allow per-thread schedulers, but Coro::State does not yet allow
578     this).
579 root 1.9
580     =head1 SEE ALSO
581    
582 root 1.67 Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
583    
584     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
585    
586     Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
587    
588     Embedding: L<Coro::MakeMaker>
589 root 1.1
590     =head1 AUTHOR
591    
592 root 1.66 Marc Lehmann <schmorp@schmorp.de>
593 root 1.64 http://home.schmorp.de/
594 root 1.1
595     =cut
596