ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.152
Committed: Sun Oct 7 13:53:37 2007 UTC (16 years, 8 months ago) by root
Branch: MAIN
Changes since 1.151: +28 -15 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.152 print "2\n";
12     cede; # yield back to main
13     print "4\n";
14 root 1.2 };
15 root 1.152 print "1\n";
16     cede; # yield to coroutine
17     print "3\n";
18     cede; # and again
19    
20     # use locking
21     my $lock = new Coro::Semaphore;
22     my $locked;
23    
24     $lock->down;
25     $locked = 1;
26     $lock->up;
27 root 1.2
28 root 1.1 =head1 DESCRIPTION
29    
30 root 1.98 This module collection manages coroutines. Coroutines are similar
31     to threads but don't run in parallel at the same time even on SMP
32 root 1.124 machines. The specific flavor of coroutine used in this module also
33     guarantees you that it will not switch between coroutines unless
34 root 1.98 necessary, at easily-identified points in your program, so locking and
35     parallel access are rarely an issue, making coroutine programming much
36     safer than threads programming.
37    
38     (Perl, however, does not natively support real threads but instead does a
39     very slow and memory-intensive emulation of processes using threads. This
40     is a performance win on Windows machines, and a loss everywhere else).
41    
42     In this module, coroutines are defined as "callchain + lexical variables +
43     @_ + $_ + $@ + $/ + C stack), that is, a coroutine has its own callchain,
44     its own set of lexicals and its own set of perls most important global
45 root 1.152 variables (see L<Coro::State> for more configuration).
46 root 1.22
47 root 1.8 =cut
48    
49     package Coro;
50    
51 root 1.71 use strict;
52     no warnings "uninitialized";
53 root 1.36
54 root 1.8 use Coro::State;
55    
56 root 1.83 use base qw(Coro::State Exporter);
57 pcg 1.55
58 root 1.83 our $idle; # idle handler
59 root 1.71 our $main; # main coroutine
60     our $current; # current coroutine
61 root 1.8
62 root 1.152 our $VERSION = '4.1';
63 root 1.8
64 root 1.105 our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
65 root 1.71 our %EXPORT_TAGS = (
66 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
67     );
68 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready));
69 root 1.8
70     {
71     my @async;
72 root 1.26 my $init;
73 root 1.8
74     # this way of handling attributes simply is NOT scalable ;()
75     sub import {
76 root 1.71 no strict 'refs';
77    
78 root 1.93 Coro->export_to_level (1, @_);
79 root 1.71
80 root 1.8 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE};
81     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub {
82     my ($package, $ref) = (shift, shift);
83     my @attrs;
84     for (@_) {
85     if ($_ eq "Coro") {
86     push @async, $ref;
87 root 1.26 unless ($init++) {
88     eval q{
89     sub INIT {
90     &async(pop @async) while @async;
91     }
92     };
93     }
94 root 1.8 } else {
95 root 1.17 push @attrs, $_;
96 root 1.8 }
97     }
98 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs;
99 root 1.8 };
100     }
101    
102     }
103    
104 root 1.43 =over 4
105    
106 root 1.8 =item $main
107 root 1.2
108 root 1.8 This coroutine represents the main program.
109 root 1.1
110     =cut
111    
112 pcg 1.55 $main = new Coro;
113 root 1.8
114 root 1.19 =item $current (or as function: current)
115 root 1.1
116 root 1.83 The current coroutine (the last coroutine switched to). The initial value
117     is C<$main> (of course).
118    
119     This variable is B<strictly> I<read-only>. It is provided for performance
120 root 1.124 reasons. If performance is not essential you are encouraged to use the
121 root 1.83 C<Coro::current> function instead.
122 root 1.1
123 root 1.8 =cut
124    
125 root 1.131 $main->{desc} = "[main::]";
126    
127 root 1.8 # maybe some other module used Coro::Specific before...
128 root 1.142 $main->{_specific} = $current->{_specific}
129 root 1.93 if $current;
130 root 1.1
131 root 1.93 _set_current $main;
132 root 1.19
133     sub current() { $current }
134 root 1.9
135     =item $idle
136    
137 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
138     to run. The default implementation prints "FATAL: deadlock detected" and
139 root 1.91 exits, because the program has no other way to continue.
140 root 1.83
141     This hook is overwritten by modules such as C<Coro::Timer> and
142 root 1.91 C<Coro::Event> to wait on an external event that hopefully wake up a
143     coroutine so the scheduler can run it.
144    
145     Please note that if your callback recursively invokes perl (e.g. for event
146 root 1.152 handlers), then it must be prepared to be called recursively itself.
147 root 1.9
148     =cut
149    
150 root 1.83 $idle = sub {
151 root 1.96 require Carp;
152     Carp::croak ("FATAL: deadlock detected");
153 root 1.9 };
154 root 1.8
155 root 1.103 sub _cancel {
156     my ($self) = @_;
157    
158     # free coroutine data and mark as destructed
159     $self->_destroy
160     or return;
161    
162     # call all destruction callbacks
163 root 1.142 $_->(@{$self->{_status}})
164     for @{(delete $self->{_on_destroy}) || []};
165 root 1.103 }
166    
167 root 1.24 # this coroutine is necessary because a coroutine
168     # cannot destroy itself.
169     my @destroy;
170 root 1.103 my $manager;
171    
172     $manager = new Coro sub {
173 pcg 1.57 while () {
174 root 1.103 (shift @destroy)->_cancel
175     while @destroy;
176    
177 root 1.24 &schedule;
178     }
179     };
180 root 1.132 $manager->desc ("[coro manager]");
181 root 1.103 $manager->prio (PRIO_MAX);
182    
183 root 1.8 # static methods. not really.
184 root 1.43
185     =back
186 root 1.8
187     =head2 STATIC METHODS
188    
189 root 1.92 Static methods are actually functions that operate on the current coroutine only.
190 root 1.8
191     =over 4
192    
193 root 1.13 =item async { ... } [@args...]
194 root 1.8
195 root 1.92 Create a new asynchronous coroutine and return it's coroutine object
196     (usually unused). When the sub returns the new coroutine is automatically
197 root 1.8 terminated.
198    
199 root 1.145 See the C<Coro::State::new> constructor for info about the coroutine
200 root 1.152 environment in which coroutines run.
201 root 1.145
202 root 1.122 Calling C<exit> in a coroutine will do the same as calling exit outside
203     the coroutine. Likewise, when the coroutine dies, the program will exit,
204     just as it would in the main program.
205 root 1.79
206 root 1.13 # create a new coroutine that just prints its arguments
207     async {
208     print "@_\n";
209     } 1,2,3,4;
210    
211 root 1.8 =cut
212    
213 root 1.13 sub async(&@) {
214 root 1.104 my $coro = new Coro @_;
215     $coro->ready;
216     $coro
217 root 1.8 }
218 root 1.1
219 root 1.105 =item async_pool { ... } [@args...]
220    
221     Similar to C<async>, but uses a coroutine pool, so you should not call
222     terminate or join (although you are allowed to), and you get a coroutine
223     that might have executed other code already (which can be good or bad :).
224    
225     Also, the block is executed in an C<eval> context and a warning will be
226 root 1.108 issued in case of an exception instead of terminating the program, as
227     C<async> does. As the coroutine is being reused, stuff like C<on_destroy>
228     will not work in the expected way, unless you call terminate or cancel,
229     which somehow defeats the purpose of pooling.
230 root 1.105
231 root 1.146 The priority will be reset to C<0> after each job, tracing will be
232     disabled, the description will be reset and the default output filehandle
233     gets restored, so you can change alkl these. Otherwise the coroutine will
234     be re-used "as-is": most notably if you change other per-coroutine global
235     stuff such as C<$/> you need to revert that change, which is most simply
236     done by using local as in C< local $/ >.
237 root 1.105
238     The pool size is limited to 8 idle coroutines (this can be adjusted by
239     changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
240     required.
241    
242     If you are concerned about pooled coroutines growing a lot because a
243 root 1.133 single C<async_pool> used a lot of stackspace you can e.g. C<async_pool
244     { terminate }> once per second or so to slowly replenish the pool. In
245     addition to that, when the stacks used by a handler grows larger than 16kb
246 root 1.134 (adjustable with $Coro::POOL_RSS) it will also exit.
247 root 1.105
248     =cut
249    
250     our $POOL_SIZE = 8;
251 root 1.134 our $POOL_RSS = 16 * 1024;
252     our @async_pool;
253 root 1.105
254     sub pool_handler {
255 root 1.134 my $cb;
256    
257 root 1.105 while () {
258 root 1.134 eval {
259     while () {
260 root 1.136 _pool_1 $cb;
261 root 1.134 &$cb;
262 root 1.136 _pool_2 $cb;
263 root 1.135 &schedule;
264 root 1.134 }
265 root 1.105 };
266 root 1.134
267 root 1.151 last if $@ eq "\3async_pool terminate\2\n";
268 root 1.105 warn $@ if $@;
269 root 1.106 }
270     }
271 root 1.105
272     sub async_pool(&@) {
273     # this is also inlined into the unlock_scheduler
274 root 1.135 my $coro = (pop @async_pool) || new Coro \&pool_handler;
275 root 1.105
276     $coro->{_invoke} = [@_];
277     $coro->ready;
278    
279     $coro
280     }
281    
282 root 1.8 =item schedule
283 root 1.6
284 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
285 root 1.8 into the ready queue, so calling this function usually means you will
286 root 1.91 never be called again unless something else (e.g. an event handler) calls
287     ready.
288    
289     The canonical way to wait on external events is this:
290    
291     {
292 root 1.92 # remember current coroutine
293 root 1.91 my $current = $Coro::current;
294    
295     # register a hypothetical event handler
296     on_event_invoke sub {
297     # wake up sleeping coroutine
298     $current->ready;
299     undef $current;
300     };
301    
302 root 1.124 # call schedule until event occurred.
303 root 1.91 # in case we are woken up for other reasons
304     # (current still defined), loop.
305     Coro::schedule while $current;
306     }
307 root 1.1
308 root 1.22 =item cede
309 root 1.1
310 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
311 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
312     current "timeslice" to other coroutines of the same or higher priority.
313 root 1.7
314 root 1.107 Returns true if at least one coroutine switch has happened.
315    
316 root 1.102 =item Coro::cede_notself
317    
318     Works like cede, but is not exported by default and will cede to any
319     coroutine, regardless of priority, once.
320    
321 root 1.107 Returns true if at least one coroutine switch has happened.
322    
323 root 1.40 =item terminate [arg...]
324 root 1.7
325 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
326 root 1.13
327 root 1.141 =item killall
328    
329     Kills/terminates/cancels all coroutines except the currently running
330     one. This is useful after a fork, either in the child or the parent, as
331     usually only one of them should inherit the running coroutines.
332    
333 root 1.1 =cut
334    
335 root 1.8 sub terminate {
336 pcg 1.59 $current->cancel (@_);
337 root 1.1 }
338 root 1.6
339 root 1.141 sub killall {
340     for (Coro::State::list) {
341     $_->cancel
342     if $_ != $current && UNIVERSAL::isa $_, "Coro";
343     }
344     }
345    
346 root 1.8 =back
347    
348     # dynamic methods
349    
350 root 1.92 =head2 COROUTINE METHODS
351 root 1.8
352 root 1.92 These are the methods you can call on coroutine objects.
353 root 1.6
354 root 1.8 =over 4
355    
356 root 1.13 =item new Coro \&sub [, @args...]
357 root 1.8
358 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
359 root 1.40 automatically terminates as if C<terminate> with the returned values were
360 root 1.92 called. To make the coroutine run you must first put it into the ready queue
361 root 1.41 by calling the ready method.
362 root 1.13
363 root 1.145 See C<async> and C<Coro::State::new> for additional info about the
364     coroutine environment.
365 root 1.89
366 root 1.6 =cut
367    
368 root 1.94 sub _run_coro {
369 root 1.13 terminate &{+shift};
370     }
371    
372 root 1.8 sub new {
373     my $class = shift;
374 root 1.83
375 root 1.94 $class->SUPER::new (\&_run_coro, @_)
376 root 1.8 }
377 root 1.6
378 root 1.92 =item $success = $coroutine->ready
379 root 1.1
380 root 1.92 Put the given coroutine into the ready queue (according to it's priority)
381     and return true. If the coroutine is already in the ready queue, do nothing
382 root 1.90 and return false.
383 root 1.1
384 root 1.92 =item $is_ready = $coroutine->is_ready
385 root 1.90
386 root 1.92 Return wether the coroutine is currently the ready queue or not,
387 root 1.28
388 root 1.92 =item $coroutine->cancel (arg...)
389 root 1.28
390 root 1.92 Terminates the given coroutine and makes it return the given arguments as
391 root 1.103 status (default: the empty list). Never returns if the coroutine is the
392     current coroutine.
393 root 1.28
394     =cut
395    
396     sub cancel {
397 pcg 1.59 my $self = shift;
398 root 1.142 $self->{_status} = [@_];
399 root 1.103
400     if ($current == $self) {
401     push @destroy, $self;
402     $manager->ready;
403     &schedule while 1;
404     } else {
405     $self->_cancel;
406     }
407 root 1.40 }
408    
409 root 1.92 =item $coroutine->join
410 root 1.40
411     Wait until the coroutine terminates and return any values given to the
412 root 1.143 C<terminate> or C<cancel> functions. C<join> can be called concurrently
413     from multiple coroutines.
414 root 1.40
415     =cut
416    
417     sub join {
418     my $self = shift;
419 root 1.103
420 root 1.142 unless ($self->{_status}) {
421 root 1.103 my $current = $current;
422    
423 root 1.142 push @{$self->{_on_destroy}}, sub {
424 root 1.103 $current->ready;
425     undef $current;
426     };
427    
428     &schedule while $current;
429 root 1.40 }
430 root 1.103
431 root 1.142 wantarray ? @{$self->{_status}} : $self->{_status}[0];
432 root 1.31 }
433    
434 root 1.101 =item $coroutine->on_destroy (\&cb)
435    
436     Registers a callback that is called when this coroutine gets destroyed,
437     but before it is joined. The callback gets passed the terminate arguments,
438     if any.
439    
440     =cut
441    
442     sub on_destroy {
443     my ($self, $cb) = @_;
444    
445 root 1.142 push @{ $self->{_on_destroy} }, $cb;
446 root 1.101 }
447    
448 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
449 root 1.31
450 root 1.41 Sets (or gets, if the argument is missing) the priority of the
451 root 1.92 coroutine. Higher priority coroutines get run before lower priority
452     coroutines. Priorities are small signed integers (currently -4 .. +3),
453 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
454     to get then):
455 root 1.31
456     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
457     3 > 1 > 0 > -1 > -3 > -4
458    
459     # set priority to HIGH
460     current->prio(PRIO_HIGH);
461    
462     The idle coroutine ($Coro::idle) always has a lower priority than any
463     existing coroutine.
464    
465 root 1.92 Changing the priority of the current coroutine will take effect immediately,
466     but changing the priority of coroutines in the ready queue (but not
467 root 1.31 running) will only take effect after the next schedule (of that
468 root 1.92 coroutine). This is a bug that will be fixed in some future version.
469 root 1.31
470 root 1.92 =item $newprio = $coroutine->nice ($change)
471 root 1.31
472     Similar to C<prio>, but subtract the given value from the priority (i.e.
473     higher values mean lower priority, just as in unix).
474    
475 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
476 root 1.41
477     Sets (or gets in case the argument is missing) the description for this
478 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
479 root 1.41
480 root 1.142 This method simply sets the C<< $coroutine->{desc} >> member to the given string. You
481     can modify this member directly if you wish.
482    
483 root 1.150 =item $coroutine->throw ([$scalar])
484    
485     If C<$throw> is specified and defined, it will be thrown as an exception
486     inside the coroutine at the next convinient point in time (usually after
487     it gains control at the next schedule/transfer/cede). Otherwise clears the
488     exception object.
489    
490     The exception object will be thrown "as is" with the specified scalar in
491     C<$@>, i.e. if it is a string, no line number or newline will be appended
492     (unlike with C<die>).
493    
494     This can be used as a softer means than C<cancel> to ask a coroutine to
495     end itself, although there is no guarentee that the exception will lead to
496     termination, and if the exception isn't caught it might well end the whole
497     program.
498    
499 root 1.41 =cut
500    
501     sub desc {
502     my $old = $_[0]{desc};
503     $_[0]{desc} = $_[1] if @_ > 1;
504     $old;
505 root 1.8 }
506 root 1.1
507 root 1.8 =back
508 root 1.2
509 root 1.97 =head2 GLOBAL FUNCTIONS
510 root 1.92
511     =over 4
512    
513 root 1.97 =item Coro::nready
514    
515     Returns the number of coroutines that are currently in the ready state,
516 root 1.124 i.e. that can be switched to. The value C<0> means that the only runnable
517 root 1.97 coroutine is the currently running one, so C<cede> would have no effect,
518     and C<schedule> would cause a deadlock unless there is an idle handler
519     that wakes up some coroutines.
520    
521 root 1.103 =item my $guard = Coro::guard { ... }
522    
523 root 1.119 This creates and returns a guard object. Nothing happens until the object
524 root 1.103 gets destroyed, in which case the codeblock given as argument will be
525     executed. This is useful to free locks or other resources in case of a
526     runtime error or when the coroutine gets canceled, as in both cases the
527     guard block will be executed. The guard object supports only one method,
528     C<< ->cancel >>, which will keep the codeblock from being executed.
529    
530     Example: set some flag and clear it again when the coroutine gets canceled
531     or the function returns:
532    
533     sub do_something {
534     my $guard = Coro::guard { $busy = 0 };
535     $busy = 1;
536    
537     # do something that requires $busy to be true
538     }
539    
540     =cut
541    
542     sub guard(&) {
543     bless \(my $cb = $_[0]), "Coro::guard"
544     }
545    
546     sub Coro::guard::cancel {
547     ${$_[0]} = sub { };
548     }
549    
550     sub Coro::guard::DESTROY {
551     ${$_[0]}->();
552     }
553    
554    
555 root 1.92 =item unblock_sub { ... }
556    
557     This utility function takes a BLOCK or code reference and "unblocks" it,
558     returning the new coderef. This means that the new coderef will return
559     immediately without blocking, returning nothing, while the original code
560     ref will be called (with parameters) from within its own coroutine.
561    
562 root 1.124 The reason this function exists is that many event libraries (such as the
563 root 1.92 venerable L<Event|Event> module) are not coroutine-safe (a weaker form
564     of thread-safety). This means you must not block within event callbacks,
565     otherwise you might suffer from crashes or worse.
566    
567     This function allows your callbacks to block by executing them in another
568     coroutine where it is safe to block. One example where blocking is handy
569     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
570     disk.
571    
572     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
573     creating event callbacks that want to block.
574    
575     =cut
576    
577     our @unblock_queue;
578    
579 root 1.105 # we create a special coro because we want to cede,
580     # to reduce pressure on the coro pool (because most callbacks
581     # return immediately and can be reused) and because we cannot cede
582     # inside an event callback.
583 root 1.132 our $unblock_scheduler = new Coro sub {
584 root 1.92 while () {
585     while (my $cb = pop @unblock_queue) {
586 root 1.105 # this is an inlined copy of async_pool
587 root 1.134 my $coro = (pop @async_pool) || new Coro \&pool_handler;
588 root 1.105
589     $coro->{_invoke} = $cb;
590     $coro->ready;
591     cede; # for short-lived callbacks, this reduces pressure on the coro pool
592 root 1.92 }
593 root 1.105 schedule; # sleep well
594 root 1.92 }
595     };
596 root 1.132 $unblock_scheduler->desc ("[unblock_sub scheduler]");
597 root 1.92
598     sub unblock_sub(&) {
599     my $cb = shift;
600    
601     sub {
602 root 1.105 unshift @unblock_queue, [$cb, @_];
603 root 1.92 $unblock_scheduler->ready;
604     }
605     }
606    
607     =back
608    
609 root 1.8 =cut
610 root 1.2
611 root 1.8 1;
612 root 1.14
613 root 1.17 =head1 BUGS/LIMITATIONS
614 root 1.14
615 root 1.52 - you must make very sure that no coro is still active on global
616 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
617 root 1.52
618     - this module is not thread-safe. You should only ever use this module
619 root 1.124 from the same thread (this requirement might be loosened in the future
620 root 1.52 to allow per-thread schedulers, but Coro::State does not yet allow
621     this).
622 root 1.9
623     =head1 SEE ALSO
624    
625 root 1.152 Lower level Configuration, Coroutine Environment: L<Coro::State>.
626    
627     Debugging: L<Coro::Debug>.
628    
629     Support/Utility: L<Coro::Specific>, L<Coro::Util>.
630 root 1.67
631     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
632    
633 root 1.152 Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>.
634    
635     Compatibility: L<Coro::LWP>, L<Coro::Storable>, L<Coro::Select>.
636 root 1.67
637 root 1.152 Embedding: L<Coro:MakeMaker>.
638 root 1.1
639     =head1 AUTHOR
640    
641 root 1.66 Marc Lehmann <schmorp@schmorp.de>
642 root 1.64 http://home.schmorp.de/
643 root 1.1
644     =cut
645