ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.177
Committed: Mon Apr 14 11:28:59 2008 UTC (16 years, 1 month ago) by root
Branch: MAIN
CVS Tags: rel-4_51
Changes since 1.176: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.152 print "2\n";
12     cede; # yield back to main
13     print "4\n";
14 root 1.2 };
15 root 1.152 print "1\n";
16     cede; # yield to coroutine
17     print "3\n";
18     cede; # and again
19    
20     # use locking
21     my $lock = new Coro::Semaphore;
22     my $locked;
23    
24     $lock->down;
25     $locked = 1;
26     $lock->up;
27 root 1.2
28 root 1.1 =head1 DESCRIPTION
29    
30 root 1.98 This module collection manages coroutines. Coroutines are similar
31     to threads but don't run in parallel at the same time even on SMP
32 root 1.124 machines. The specific flavor of coroutine used in this module also
33     guarantees you that it will not switch between coroutines unless
34 root 1.98 necessary, at easily-identified points in your program, so locking and
35     parallel access are rarely an issue, making coroutine programming much
36     safer than threads programming.
37    
38     (Perl, however, does not natively support real threads but instead does a
39     very slow and memory-intensive emulation of processes using threads. This
40     is a performance win on Windows machines, and a loss everywhere else).
41    
42     In this module, coroutines are defined as "callchain + lexical variables +
43     @_ + $_ + $@ + $/ + C stack), that is, a coroutine has its own callchain,
44     its own set of lexicals and its own set of perls most important global
45 root 1.152 variables (see L<Coro::State> for more configuration).
46 root 1.22
47 root 1.8 =cut
48    
49     package Coro;
50    
51 root 1.71 use strict;
52     no warnings "uninitialized";
53 root 1.36
54 root 1.8 use Coro::State;
55    
56 root 1.83 use base qw(Coro::State Exporter);
57 pcg 1.55
58 root 1.83 our $idle; # idle handler
59 root 1.71 our $main; # main coroutine
60     our $current; # current coroutine
61 root 1.8
62 root 1.177 our $VERSION = '4.51';
63 root 1.8
64 root 1.105 our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
65 root 1.71 our %EXPORT_TAGS = (
66 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
67     );
68 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready));
69 root 1.8
70     {
71     my @async;
72 root 1.26 my $init;
73 root 1.8
74     # this way of handling attributes simply is NOT scalable ;()
75     sub import {
76 root 1.71 no strict 'refs';
77    
78 root 1.93 Coro->export_to_level (1, @_);
79 root 1.71
80 root 1.8 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE};
81     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub {
82     my ($package, $ref) = (shift, shift);
83     my @attrs;
84     for (@_) {
85     if ($_ eq "Coro") {
86     push @async, $ref;
87 root 1.26 unless ($init++) {
88     eval q{
89     sub INIT {
90     &async(pop @async) while @async;
91     }
92     };
93     }
94 root 1.8 } else {
95 root 1.17 push @attrs, $_;
96 root 1.8 }
97     }
98 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs;
99 root 1.8 };
100     }
101    
102     }
103    
104 root 1.43 =over 4
105    
106 root 1.8 =item $main
107 root 1.2
108 root 1.8 This coroutine represents the main program.
109 root 1.1
110     =cut
111    
112 pcg 1.55 $main = new Coro;
113 root 1.8
114 root 1.19 =item $current (or as function: current)
115 root 1.1
116 root 1.83 The current coroutine (the last coroutine switched to). The initial value
117     is C<$main> (of course).
118    
119     This variable is B<strictly> I<read-only>. It is provided for performance
120 root 1.124 reasons. If performance is not essential you are encouraged to use the
121 root 1.83 C<Coro::current> function instead.
122 root 1.1
123 root 1.8 =cut
124    
125 root 1.131 $main->{desc} = "[main::]";
126    
127 root 1.8 # maybe some other module used Coro::Specific before...
128 root 1.142 $main->{_specific} = $current->{_specific}
129 root 1.93 if $current;
130 root 1.1
131 root 1.93 _set_current $main;
132 root 1.19
133     sub current() { $current }
134 root 1.9
135     =item $idle
136    
137 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
138     to run. The default implementation prints "FATAL: deadlock detected" and
139 root 1.91 exits, because the program has no other way to continue.
140 root 1.83
141     This hook is overwritten by modules such as C<Coro::Timer> and
142 root 1.91 C<Coro::Event> to wait on an external event that hopefully wake up a
143     coroutine so the scheduler can run it.
144    
145     Please note that if your callback recursively invokes perl (e.g. for event
146 root 1.152 handlers), then it must be prepared to be called recursively itself.
147 root 1.9
148     =cut
149    
150 root 1.83 $idle = sub {
151 root 1.96 require Carp;
152     Carp::croak ("FATAL: deadlock detected");
153 root 1.9 };
154 root 1.8
155 root 1.103 sub _cancel {
156     my ($self) = @_;
157    
158     # free coroutine data and mark as destructed
159     $self->_destroy
160     or return;
161    
162     # call all destruction callbacks
163 root 1.142 $_->(@{$self->{_status}})
164     for @{(delete $self->{_on_destroy}) || []};
165 root 1.103 }
166    
167 root 1.24 # this coroutine is necessary because a coroutine
168     # cannot destroy itself.
169     my @destroy;
170 root 1.103 my $manager;
171    
172     $manager = new Coro sub {
173 pcg 1.57 while () {
174 root 1.103 (shift @destroy)->_cancel
175     while @destroy;
176    
177 root 1.24 &schedule;
178     }
179     };
180 root 1.132 $manager->desc ("[coro manager]");
181 root 1.103 $manager->prio (PRIO_MAX);
182    
183 root 1.8 # static methods. not really.
184 root 1.43
185     =back
186 root 1.8
187     =head2 STATIC METHODS
188    
189 root 1.92 Static methods are actually functions that operate on the current coroutine only.
190 root 1.8
191     =over 4
192    
193 root 1.13 =item async { ... } [@args...]
194 root 1.8
195 root 1.92 Create a new asynchronous coroutine and return it's coroutine object
196     (usually unused). When the sub returns the new coroutine is automatically
197 root 1.8 terminated.
198    
199 root 1.145 See the C<Coro::State::new> constructor for info about the coroutine
200 root 1.152 environment in which coroutines run.
201 root 1.145
202 root 1.122 Calling C<exit> in a coroutine will do the same as calling exit outside
203     the coroutine. Likewise, when the coroutine dies, the program will exit,
204     just as it would in the main program.
205 root 1.79
206 root 1.13 # create a new coroutine that just prints its arguments
207     async {
208     print "@_\n";
209     } 1,2,3,4;
210    
211 root 1.8 =cut
212    
213 root 1.13 sub async(&@) {
214 root 1.104 my $coro = new Coro @_;
215     $coro->ready;
216     $coro
217 root 1.8 }
218 root 1.1
219 root 1.105 =item async_pool { ... } [@args...]
220    
221     Similar to C<async>, but uses a coroutine pool, so you should not call
222     terminate or join (although you are allowed to), and you get a coroutine
223     that might have executed other code already (which can be good or bad :).
224    
225     Also, the block is executed in an C<eval> context and a warning will be
226 root 1.108 issued in case of an exception instead of terminating the program, as
227     C<async> does. As the coroutine is being reused, stuff like C<on_destroy>
228     will not work in the expected way, unless you call terminate or cancel,
229     which somehow defeats the purpose of pooling.
230 root 1.105
231 root 1.146 The priority will be reset to C<0> after each job, tracing will be
232     disabled, the description will be reset and the default output filehandle
233     gets restored, so you can change alkl these. Otherwise the coroutine will
234     be re-used "as-is": most notably if you change other per-coroutine global
235     stuff such as C<$/> you need to revert that change, which is most simply
236     done by using local as in C< local $/ >.
237 root 1.105
238     The pool size is limited to 8 idle coroutines (this can be adjusted by
239     changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
240     required.
241    
242     If you are concerned about pooled coroutines growing a lot because a
243 root 1.133 single C<async_pool> used a lot of stackspace you can e.g. C<async_pool
244     { terminate }> once per second or so to slowly replenish the pool. In
245     addition to that, when the stacks used by a handler grows larger than 16kb
246 root 1.134 (adjustable with $Coro::POOL_RSS) it will also exit.
247 root 1.105
248     =cut
249    
250     our $POOL_SIZE = 8;
251 root 1.134 our $POOL_RSS = 16 * 1024;
252     our @async_pool;
253 root 1.105
254     sub pool_handler {
255 root 1.134 my $cb;
256    
257 root 1.105 while () {
258 root 1.134 eval {
259     while () {
260 root 1.136 _pool_1 $cb;
261 root 1.134 &$cb;
262 root 1.136 _pool_2 $cb;
263 root 1.135 &schedule;
264 root 1.134 }
265 root 1.105 };
266 root 1.134
267 root 1.151 last if $@ eq "\3async_pool terminate\2\n";
268 root 1.105 warn $@ if $@;
269 root 1.106 }
270     }
271 root 1.105
272     sub async_pool(&@) {
273     # this is also inlined into the unlock_scheduler
274 root 1.135 my $coro = (pop @async_pool) || new Coro \&pool_handler;
275 root 1.105
276     $coro->{_invoke} = [@_];
277     $coro->ready;
278    
279     $coro
280     }
281    
282 root 1.8 =item schedule
283 root 1.6
284 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
285 root 1.8 into the ready queue, so calling this function usually means you will
286 root 1.91 never be called again unless something else (e.g. an event handler) calls
287     ready.
288    
289     The canonical way to wait on external events is this:
290    
291     {
292 root 1.92 # remember current coroutine
293 root 1.91 my $current = $Coro::current;
294    
295     # register a hypothetical event handler
296     on_event_invoke sub {
297     # wake up sleeping coroutine
298     $current->ready;
299     undef $current;
300     };
301    
302 root 1.124 # call schedule until event occurred.
303 root 1.91 # in case we are woken up for other reasons
304     # (current still defined), loop.
305     Coro::schedule while $current;
306     }
307 root 1.1
308 root 1.22 =item cede
309 root 1.1
310 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
311 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
312     current "timeslice" to other coroutines of the same or higher priority.
313 root 1.7
314 root 1.102 =item Coro::cede_notself
315    
316     Works like cede, but is not exported by default and will cede to any
317     coroutine, regardless of priority, once.
318    
319 root 1.40 =item terminate [arg...]
320 root 1.7
321 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
322 root 1.13
323 root 1.141 =item killall
324    
325     Kills/terminates/cancels all coroutines except the currently running
326     one. This is useful after a fork, either in the child or the parent, as
327     usually only one of them should inherit the running coroutines.
328    
329 root 1.1 =cut
330    
331 root 1.8 sub terminate {
332 pcg 1.59 $current->cancel (@_);
333 root 1.1 }
334 root 1.6
335 root 1.141 sub killall {
336     for (Coro::State::list) {
337     $_->cancel
338     if $_ != $current && UNIVERSAL::isa $_, "Coro";
339     }
340     }
341    
342 root 1.8 =back
343    
344     # dynamic methods
345    
346 root 1.92 =head2 COROUTINE METHODS
347 root 1.8
348 root 1.92 These are the methods you can call on coroutine objects.
349 root 1.6
350 root 1.8 =over 4
351    
352 root 1.13 =item new Coro \&sub [, @args...]
353 root 1.8
354 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
355 root 1.40 automatically terminates as if C<terminate> with the returned values were
356 root 1.92 called. To make the coroutine run you must first put it into the ready queue
357 root 1.41 by calling the ready method.
358 root 1.13
359 root 1.145 See C<async> and C<Coro::State::new> for additional info about the
360     coroutine environment.
361 root 1.89
362 root 1.6 =cut
363    
364 root 1.94 sub _run_coro {
365 root 1.13 terminate &{+shift};
366     }
367    
368 root 1.8 sub new {
369     my $class = shift;
370 root 1.83
371 root 1.94 $class->SUPER::new (\&_run_coro, @_)
372 root 1.8 }
373 root 1.6
374 root 1.92 =item $success = $coroutine->ready
375 root 1.1
376 root 1.92 Put the given coroutine into the ready queue (according to it's priority)
377     and return true. If the coroutine is already in the ready queue, do nothing
378 root 1.90 and return false.
379 root 1.1
380 root 1.92 =item $is_ready = $coroutine->is_ready
381 root 1.90
382 root 1.92 Return wether the coroutine is currently the ready queue or not,
383 root 1.28
384 root 1.92 =item $coroutine->cancel (arg...)
385 root 1.28
386 root 1.92 Terminates the given coroutine and makes it return the given arguments as
387 root 1.103 status (default: the empty list). Never returns if the coroutine is the
388     current coroutine.
389 root 1.28
390     =cut
391    
392     sub cancel {
393 pcg 1.59 my $self = shift;
394 root 1.142 $self->{_status} = [@_];
395 root 1.103
396     if ($current == $self) {
397     push @destroy, $self;
398     $manager->ready;
399     &schedule while 1;
400     } else {
401     $self->_cancel;
402     }
403 root 1.40 }
404    
405 root 1.92 =item $coroutine->join
406 root 1.40
407     Wait until the coroutine terminates and return any values given to the
408 root 1.143 C<terminate> or C<cancel> functions. C<join> can be called concurrently
409     from multiple coroutines.
410 root 1.40
411     =cut
412    
413     sub join {
414     my $self = shift;
415 root 1.103
416 root 1.142 unless ($self->{_status}) {
417 root 1.103 my $current = $current;
418    
419 root 1.142 push @{$self->{_on_destroy}}, sub {
420 root 1.103 $current->ready;
421     undef $current;
422     };
423    
424     &schedule while $current;
425 root 1.40 }
426 root 1.103
427 root 1.142 wantarray ? @{$self->{_status}} : $self->{_status}[0];
428 root 1.31 }
429    
430 root 1.101 =item $coroutine->on_destroy (\&cb)
431    
432     Registers a callback that is called when this coroutine gets destroyed,
433     but before it is joined. The callback gets passed the terminate arguments,
434     if any.
435    
436     =cut
437    
438     sub on_destroy {
439     my ($self, $cb) = @_;
440    
441 root 1.142 push @{ $self->{_on_destroy} }, $cb;
442 root 1.101 }
443    
444 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
445 root 1.31
446 root 1.41 Sets (or gets, if the argument is missing) the priority of the
447 root 1.92 coroutine. Higher priority coroutines get run before lower priority
448     coroutines. Priorities are small signed integers (currently -4 .. +3),
449 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
450     to get then):
451 root 1.31
452     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
453     3 > 1 > 0 > -1 > -3 > -4
454    
455     # set priority to HIGH
456     current->prio(PRIO_HIGH);
457    
458     The idle coroutine ($Coro::idle) always has a lower priority than any
459     existing coroutine.
460    
461 root 1.92 Changing the priority of the current coroutine will take effect immediately,
462     but changing the priority of coroutines in the ready queue (but not
463 root 1.31 running) will only take effect after the next schedule (of that
464 root 1.92 coroutine). This is a bug that will be fixed in some future version.
465 root 1.31
466 root 1.92 =item $newprio = $coroutine->nice ($change)
467 root 1.31
468     Similar to C<prio>, but subtract the given value from the priority (i.e.
469     higher values mean lower priority, just as in unix).
470    
471 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
472 root 1.41
473     Sets (or gets in case the argument is missing) the description for this
474 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
475 root 1.41
476 root 1.142 This method simply sets the C<< $coroutine->{desc} >> member to the given string. You
477     can modify this member directly if you wish.
478    
479 root 1.150 =item $coroutine->throw ([$scalar])
480    
481     If C<$throw> is specified and defined, it will be thrown as an exception
482     inside the coroutine at the next convinient point in time (usually after
483     it gains control at the next schedule/transfer/cede). Otherwise clears the
484     exception object.
485    
486     The exception object will be thrown "as is" with the specified scalar in
487     C<$@>, i.e. if it is a string, no line number or newline will be appended
488     (unlike with C<die>).
489    
490     This can be used as a softer means than C<cancel> to ask a coroutine to
491     end itself, although there is no guarentee that the exception will lead to
492     termination, and if the exception isn't caught it might well end the whole
493     program.
494    
495 root 1.41 =cut
496    
497     sub desc {
498     my $old = $_[0]{desc};
499     $_[0]{desc} = $_[1] if @_ > 1;
500     $old;
501 root 1.8 }
502 root 1.1
503 root 1.8 =back
504 root 1.2
505 root 1.97 =head2 GLOBAL FUNCTIONS
506 root 1.92
507     =over 4
508    
509 root 1.97 =item Coro::nready
510    
511     Returns the number of coroutines that are currently in the ready state,
512 root 1.124 i.e. that can be switched to. The value C<0> means that the only runnable
513 root 1.97 coroutine is the currently running one, so C<cede> would have no effect,
514     and C<schedule> would cause a deadlock unless there is an idle handler
515     that wakes up some coroutines.
516    
517 root 1.103 =item my $guard = Coro::guard { ... }
518    
519 root 1.119 This creates and returns a guard object. Nothing happens until the object
520 root 1.103 gets destroyed, in which case the codeblock given as argument will be
521     executed. This is useful to free locks or other resources in case of a
522     runtime error or when the coroutine gets canceled, as in both cases the
523     guard block will be executed. The guard object supports only one method,
524     C<< ->cancel >>, which will keep the codeblock from being executed.
525    
526     Example: set some flag and clear it again when the coroutine gets canceled
527     or the function returns:
528    
529     sub do_something {
530     my $guard = Coro::guard { $busy = 0 };
531     $busy = 1;
532    
533     # do something that requires $busy to be true
534     }
535    
536     =cut
537    
538     sub guard(&) {
539     bless \(my $cb = $_[0]), "Coro::guard"
540     }
541    
542     sub Coro::guard::cancel {
543     ${$_[0]} = sub { };
544     }
545    
546     sub Coro::guard::DESTROY {
547     ${$_[0]}->();
548     }
549    
550    
551 root 1.92 =item unblock_sub { ... }
552    
553     This utility function takes a BLOCK or code reference and "unblocks" it,
554     returning the new coderef. This means that the new coderef will return
555     immediately without blocking, returning nothing, while the original code
556     ref will be called (with parameters) from within its own coroutine.
557    
558 root 1.124 The reason this function exists is that many event libraries (such as the
559 root 1.92 venerable L<Event|Event> module) are not coroutine-safe (a weaker form
560     of thread-safety). This means you must not block within event callbacks,
561     otherwise you might suffer from crashes or worse.
562    
563     This function allows your callbacks to block by executing them in another
564     coroutine where it is safe to block. One example where blocking is handy
565     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
566     disk.
567    
568     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
569     creating event callbacks that want to block.
570    
571     =cut
572    
573     our @unblock_queue;
574    
575 root 1.105 # we create a special coro because we want to cede,
576     # to reduce pressure on the coro pool (because most callbacks
577     # return immediately and can be reused) and because we cannot cede
578     # inside an event callback.
579 root 1.132 our $unblock_scheduler = new Coro sub {
580 root 1.92 while () {
581     while (my $cb = pop @unblock_queue) {
582 root 1.105 # this is an inlined copy of async_pool
583 root 1.134 my $coro = (pop @async_pool) || new Coro \&pool_handler;
584 root 1.105
585     $coro->{_invoke} = $cb;
586     $coro->ready;
587     cede; # for short-lived callbacks, this reduces pressure on the coro pool
588 root 1.92 }
589 root 1.105 schedule; # sleep well
590 root 1.92 }
591     };
592 root 1.132 $unblock_scheduler->desc ("[unblock_sub scheduler]");
593 root 1.92
594     sub unblock_sub(&) {
595     my $cb = shift;
596    
597     sub {
598 root 1.105 unshift @unblock_queue, [$cb, @_];
599 root 1.92 $unblock_scheduler->ready;
600     }
601     }
602    
603     =back
604    
605 root 1.8 =cut
606 root 1.2
607 root 1.8 1;
608 root 1.14
609 root 1.17 =head1 BUGS/LIMITATIONS
610 root 1.14
611 root 1.52 - you must make very sure that no coro is still active on global
612 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
613 root 1.52
614     - this module is not thread-safe. You should only ever use this module
615 root 1.124 from the same thread (this requirement might be loosened in the future
616 root 1.52 to allow per-thread schedulers, but Coro::State does not yet allow
617     this).
618 root 1.9
619     =head1 SEE ALSO
620    
621 root 1.152 Lower level Configuration, Coroutine Environment: L<Coro::State>.
622    
623     Debugging: L<Coro::Debug>.
624    
625     Support/Utility: L<Coro::Specific>, L<Coro::Util>.
626 root 1.67
627     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
628    
629 root 1.152 Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>.
630    
631     Compatibility: L<Coro::LWP>, L<Coro::Storable>, L<Coro::Select>.
632 root 1.67
633 root 1.162 Embedding: L<Coro::MakeMaker>.
634 root 1.1
635     =head1 AUTHOR
636    
637 root 1.66 Marc Lehmann <schmorp@schmorp.de>
638 root 1.64 http://home.schmorp.de/
639 root 1.1
640     =cut
641