ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.149
Committed: Sat Oct 6 00:39:07 2007 UTC (16 years, 8 months ago) by root
Branch: MAIN
CVS Tags: rel-4_02
Changes since 1.148: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.92 # alternatively create an async coroutine like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.98 This module collection manages coroutines. Coroutines are similar
24     to threads but don't run in parallel at the same time even on SMP
25 root 1.124 machines. The specific flavor of coroutine used in this module also
26     guarantees you that it will not switch between coroutines unless
27 root 1.98 necessary, at easily-identified points in your program, so locking and
28     parallel access are rarely an issue, making coroutine programming much
29     safer than threads programming.
30    
31     (Perl, however, does not natively support real threads but instead does a
32     very slow and memory-intensive emulation of processes using threads. This
33     is a performance win on Windows machines, and a loss everywhere else).
34    
35     In this module, coroutines are defined as "callchain + lexical variables +
36     @_ + $_ + $@ + $/ + C stack", that is, a coroutine has its own callchain,
37     its own set of lexicals and its own set of perl's most important global
38     variables.
39 root 1.22
40 root 1.8 =cut
41    
42     package Coro;
43    
44 root 1.71 use strict;
45     no warnings "uninitialized";
46 root 1.36
47 root 1.8 use Coro::State;
48    
49 root 1.83 use base qw(Coro::State Exporter);
50 pcg 1.55
51 root 1.83 our $idle; # idle handler
52 root 1.71 our $main; # main coroutine
53     our $current; # current coroutine
54 root 1.8
55 root 1.149 our $VERSION = '4.02';
56 root 1.8
57 root 1.105 our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
58 root 1.71 our %EXPORT_TAGS = (
59 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
60     );
61 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready));
62 root 1.8
63     {
64     my @async; # code refs of subs declared with the "Coro" attribute, waiting to be started
65 root 1.26 my $init; # true once the INIT block below has been compiled
66 root 1.8
67     # this way of handling attributes simply is NOT scalable ;()
68     sub import {
69 root 1.71 no strict 'refs';
70    
71 root 1.93 Coro->export_to_level (1, @_); # export into the package that called import
72 root 1.71
73 root 1.8 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE}; # attribute handler the caller already had, if any
74     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub { # install our handler in the caller's package
75     my ($package, $ref) = (shift, shift);
76     my @attrs; # attributes we do not handle ourselves
77     for (@_) {
78     if ($_ eq "Coro") {
79     push @async, $ref;
80 root 1.26 unless ($init++) { # only the first "Coro" attribute compiles the INIT block
81     eval q{
82     sub INIT {
83     &async(pop @async) while @async;
84     }
85     };
86     }
87 root 1.8 } else {
88 root 1.17 push @attrs, $_;
89 root 1.8 }
90     }
91 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs; # chain to the previous handler, else return what we did not consume
92 root 1.8 };
93     }
94    
95     }
96    
97 root 1.43 =over 4
98    
99 root 1.8 =item $main
100 root 1.2
101 root 1.8 This coroutine represents the main program.
102 root 1.1
103     =cut
104    
105 pcg 1.55 $main = new Coro; # coroutine object standing for the main program
106 root 1.8
107 root 1.19 =item $current (or as function: current)
108 root 1.1
109 root 1.83 The current coroutine (the last coroutine switched to). The initial value
110     is C<$main> (of course).
111    
112     This variable is B<strictly> I<read-only>. It is provided for performance
113 root 1.124 reasons. If performance is not essential you are encouraged to use the
114 root 1.83 C<Coro::current> function instead.
115 root 1.1
116 root 1.8 =cut
117    
118 root 1.131 $main->{desc} = "[main::]";
119    
120 root 1.8 # maybe some other module used Coro::Specific before...
121 root 1.142 $main->{_specific} = $current->{_specific} # ...if so, carry the data already stored there over to $main
122 root 1.93 if $current;
123 root 1.1
124 root 1.93 _set_current $main; # NOTE(review): defined outside this file; presumably makes $main the value of $current - confirm
125 root 1.19
126     sub current() { $current } # function form of $Coro::current
127 root 1.9
128     =item $idle
129    
130 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
131     to run. The default implementation prints "FATAL: deadlock detected" and
132 root 1.91 exits, because the program has no other way to continue.
133 root 1.83
134     This hook is overwritten by modules such as C<Coro::Timer> and
135 root 1.91 C<Coro::Event> to wait on an external event that hopefully wakes up a
136     coroutine so the scheduler can run it.
137    
138     Please note that if your callback recursively invokes perl (e.g. for event
139     handlers), then it must be prepared to be called recursively.
140 root 1.9
141     =cut
142    
143 root 1.83 $idle = sub { # default idle handler: treats "no ready coroutine" as a fatal deadlock
144 root 1.96 require Carp; # loaded lazily, only needed on this error path
145     Carp::croak ("FATAL: deadlock detected");
146 root 1.9 };
147 root 1.8
148 root 1.103 sub _cancel { # destroy a coroutine and run its on_destroy callbacks
149     my ($self) = @_;
150    
151     # free coroutine data and mark as destructed
152     $self->_destroy
153     or return; # false result: skip the callbacks - NOTE(review): presumably means "already destroyed"; confirm in Coro::State
154    
155     # call all destruction callbacks
156 root 1.142 $_->(@{$self->{_status}}) # each callback receives the status values stored by cancel
157     for @{(delete $self->{_on_destroy}) || []}; # delete detaches the list from the object before it is run
158 root 1.103 }
159    
160 root 1.24 # this coroutine is necessary because a coroutine
161     # cannot destroy itself.
162     my @destroy; # coroutines that cancelled themselves, queued by cancel
163 root 1.103 my $manager;
164    
165     $manager = new Coro sub {
166 pcg 1.57 while () { # loop forever
167 root 1.103 (shift @destroy)->_cancel # destroy queued coroutines, oldest first
168     while @destroy;
169    
170 root 1.24 &schedule; # give up the CPU until cancel readies the manager again
171     }
172     };
173 root 1.132 $manager->desc ("[coro manager]");
174 root 1.103 $manager->prio (PRIO_MAX); # highest priority, so it is preferred over other ready coroutines
175    
176 root 1.8 # static methods. not really.
177 root 1.43
178     =back
179 root 1.8
180     =head2 STATIC METHODS
181    
182 root 1.92 Static methods are actually functions that operate on the current coroutine only.
183 root 1.8
184     =over 4
185    
186 root 1.13 =item async { ... } [@args...]
187 root 1.8
188 root 1.92 Create a new asynchronous coroutine and return its coroutine object
189     (usually unused). When the sub returns the new coroutine is automatically
190 root 1.8 terminated.
191    
192 root 1.145 See the C<Coro::State::new> constructor for info about the coroutine
193     environment.
194    
195 root 1.122 Calling C<exit> in a coroutine will do the same as calling exit outside
196     the coroutine. Likewise, when the coroutine dies, the program will exit,
197     just as it would in the main program.
198 root 1.79
199 root 1.13 # create a new coroutine that just prints its arguments
200     async {
201     print "@_\n";
202     } 1,2,3,4;
203    
204 root 1.8 =cut
205    
206 root 1.13 sub async(&@) { # build a coroutine from the block and its arguments, ready it, return it
207 root 1.104 my $coro = new Coro @_;
208     $coro->ready; # queue it for execution; it does not run until the scheduler switches to it
209     $coro
210 root 1.8 }
211 root 1.1
212 root 1.105 =item async_pool { ... } [@args...]
213    
214     Similar to C<async>, but uses a coroutine pool, so you should not call
215     terminate or join (although you are allowed to), and you get a coroutine
216     that might have executed other code already (which can be good or bad :).
217    
218     Also, the block is executed in an C<eval> context and a warning will be
219 root 1.108 issued in case of an exception instead of terminating the program, as
220     C<async> does. As the coroutine is being reused, stuff like C<on_destroy>
221     will not work in the expected way, unless you call terminate or cancel,
222     which somehow defeats the purpose of pooling.
223 root 1.105
224 root 1.146 The priority will be reset to C<0> after each job, tracing will be
225     disabled, the description will be reset and the default output filehandle
226     gets restored, so you can change all these. Otherwise the coroutine will
227     be re-used "as-is": most notably if you change other per-coroutine global
228     stuff such as C<$/> you need to revert that change, which is most simply
229     done by using local as in C< local $/ >.
230 root 1.105
231     The pool size is limited to 8 idle coroutines (this can be adjusted by
232     changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
233     required.
234    
235     If you are concerned about pooled coroutines growing a lot because a
236 root 1.133 single C<async_pool> used a lot of stackspace you can e.g. C<async_pool
237     { terminate }> once per second or so to slowly replenish the pool. In
238     addition to that, when the stack used by a handler grows larger than 16kb
239 root 1.134 (adjustable with $Coro::POOL_RSS) it will also exit.
240 root 1.105
241     =cut
242    
243     our $POOL_SIZE = 8; # documented above as the limit on idle pooled coroutines
244 root 1.134 our $POOL_RSS = 16 * 1024; # 16kb; documented above as the stack size beyond which a pooled coroutine exits
245     our @async_pool; # idle pooled coroutines; async_pool pops from here
246 root 1.105
247     sub pool_handler { # body of every pooled coroutine: run one job after another
248 root 1.134 my $cb;
249    
250 root 1.105 while () { # outer loop: restart the job loop after a job died
251 root 1.134 eval {
252     while () {
253 root 1.136 _pool_1 $cb; # NOTE(review): defined outside this file; presumably loads the next job into $cb - confirm
254 root 1.134 &$cb; # ampersand call without parens: the job sees the current @_
255 root 1.136 _pool_2 $cb; # NOTE(review): defined outside this file; presumably resets state and returns the coro to the pool - confirm
256 root 1.135 &schedule; # sleep until the next job readies this coroutine
257 root 1.134 }
258 root 1.105 };
259 root 1.134
260 root 1.136 last if $@ eq "\3terminate\2\n"; # magic exception text makes the handler exit - NOTE(review): presumably raised by _pool_2; confirm
261 root 1.105 warn $@ if $@; # any other exception is only warned about; the handler keeps running
262 root 1.106 }
263     }
264 root 1.105
265     sub async_pool(&@) {
266     # this is also inlined into the unblock_scheduler
267 root 1.135 my $coro = (pop @async_pool) || new Coro \&pool_handler; # reuse an idle pooled coroutine or create a new one
268 root 1.105
269     $coro->{_invoke} = [@_]; # the job: code ref followed by its arguments
270     $coro->ready;
271    
272     $coro
273     }
274    
275 root 1.8 =item schedule
276 root 1.6
277 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
278 root 1.8 into the ready queue, so calling this function usually means you will
279 root 1.91 never be called again unless something else (e.g. an event handler) calls
280     ready.
281    
282     The canonical way to wait on external events is this:
283    
284     {
285 root 1.92 # remember current coroutine
286 root 1.91 my $current = $Coro::current;
287    
288     # register a hypothetical event handler
289     on_event_invoke sub {
290     # wake up sleeping coroutine
291     $current->ready;
292     undef $current;
293     };
294    
295 root 1.124 # call schedule until event occurred.
296 root 1.91 # in case we are woken up for other reasons
297     # (current still defined), loop.
298     Coro::schedule while $current;
299     }
300 root 1.1
301 root 1.22 =item cede
302 root 1.1
303 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
304 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
305     current "timeslice" to other coroutines of the same or higher priority.
306 root 1.7
307 root 1.107 Returns true if at least one coroutine switch has happened.
308    
309 root 1.102 =item Coro::cede_notself
310    
311     Works like cede, but is not exported by default and will cede to any
312     coroutine, regardless of priority, once.
313    
314 root 1.107 Returns true if at least one coroutine switch has happened.
315    
316 root 1.40 =item terminate [arg...]
317 root 1.7
318 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
319 root 1.13
320 root 1.141 =item killall
321    
322     Kills/terminates/cancels all coroutines except the currently running
323     one. This is useful after a fork, either in the child or the parent, as
324     usually only one of them should inherit the running coroutines.
325    
326 root 1.1 =cut
327    
328 root 1.8 sub terminate { # cancel the current coroutine, passing @_ as its status values
329 pcg 1.59 $current->cancel (@_);
330 root 1.1 }
331 root 1.6
332 root 1.141 sub killall { # cancel every Coro object except the running one
333     for (Coro::State::list) {
334     $_->cancel
335     if $_ != $current && UNIVERSAL::isa $_, "Coro"; # skip ourselves and list entries that are not Coro objects
336     }
337     }
338    
339 root 1.8 =back
340    
341     # dynamic methods
342    
343 root 1.92 =head2 COROUTINE METHODS
344 root 1.8
345 root 1.92 These are the methods you can call on coroutine objects.
346 root 1.6
347 root 1.8 =over 4
348    
349 root 1.13 =item new Coro \&sub [, @args...]
350 root 1.8
351 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
352 root 1.40 automatically terminates as if C<terminate> with the returned values were
353 root 1.92 called. To make the coroutine run you must first put it into the ready queue
354 root 1.41 by calling the ready method.
355 root 1.13
356 root 1.145 See C<async> and C<Coro::State::new> for additional info about the
357     coroutine environment.
358 root 1.89
359 root 1.6 =cut
360    
361 root 1.94 sub _run_coro { # entry point installed by new: call the user sub, then terminate with what it returned
362 root 1.13 terminate &{+shift}; # shift takes the code ref off @_; the remaining @_ is passed on to it
363     }
364    
365 root 1.8 sub new { # new Coro \&sub, @args
366     my $class = shift;
367 root 1.83
368 root 1.94 $class->SUPER::new (\&_run_coro, @_) # wrap the user sub in _run_coro so that returning from it terminates the coroutine
369 root 1.8 }
370 root 1.6
371 root 1.92 =item $success = $coroutine->ready
372 root 1.1
373 root 1.92 Put the given coroutine into the ready queue (according to its priority)
374     and return true. If the coroutine is already in the ready queue, do nothing
375 root 1.90 and return false.
376 root 1.1
377 root 1.92 =item $is_ready = $coroutine->is_ready
378 root 1.90
379 root 1.92 Return whether the coroutine is currently in the ready queue or not.
380 root 1.28
381 root 1.92 =item $coroutine->cancel (arg...)
382 root 1.28
383 root 1.92 Terminates the given coroutine and makes it return the given arguments as
384 root 1.103 status (default: the empty list). Never returns if the coroutine is the
385     current coroutine.
386 root 1.28
387     =cut
388    
389     sub cancel {
390 pcg 1.59 my $self = shift;
391 root 1.142 $self->{_status} = [@_]; # status values, later returned by join and passed to on_destroy callbacks
392 root 1.103
393     if ($current == $self) { # cancelling ourselves: hand the destruction over to the manager coroutine
394     push @destroy, $self;
395     $manager->ready;
396     &schedule while 1; # endless loop: control never returns to the caller
397     } else {
398     $self->_cancel;
399     }
400 root 1.40 }
401    
402 root 1.92 =item $coroutine->join
403 root 1.40
404     Wait until the coroutine terminates and return any values given to the
405 root 1.143 C<terminate> or C<cancel> functions. C<join> can be called concurrently
406     from multiple coroutines.
407 root 1.40
408     =cut
409    
410     sub join {
411     my $self = shift;
412 root 1.103
413 root 1.142 unless ($self->{_status}) { # no status stored yet, i.e. cancel has not been called on $self
414 root 1.103 my $current = $current; # lexical copy of the joining coroutine, captured by the callback below
415    
416 root 1.142 push @{$self->{_on_destroy}}, sub {
417 root 1.103 $current->ready;
418     undef $current; # ends the wait loop below
419     };
420    
421     &schedule while $current; # wake-ups for other reasons leave $current set, so keep waiting
422 root 1.40 }
423 root 1.103
424 root 1.142 wantarray ? @{$self->{_status}} : $self->{_status}[0]; # list context: all status values, otherwise only the first
425 root 1.31 }
426    
427 root 1.101 =item $coroutine->on_destroy (\&cb)
428    
429     Registers a callback that is called when this coroutine gets destroyed,
430     but before it is joined. The callback gets passed the terminate arguments,
431     if any.
432    
433     =cut
434    
435     sub on_destroy {
436     my ($self, $cb) = @_;
437    
438 root 1.142 push @{ $self->{_on_destroy} }, $cb; # appended to the list that _cancel runs in order
439 root 1.101 }
440    
441 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
442 root 1.31
443 root 1.41 Sets (or gets, if the argument is missing) the priority of the
444 root 1.92 coroutine. Higher priority coroutines get run before lower priority
445     coroutines. Priorities are small signed integers (currently -4 .. +3),
446 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
447     to get them):
448 root 1.31
449     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
450     3 > 1 > 0 > -1 > -3 > -4
451    
452     # set priority to HIGH
453     current->prio(PRIO_HIGH);
454    
455     The idle coroutine ($Coro::idle) always has a lower priority than any
456     existing coroutine.
457    
458 root 1.92 Changing the priority of the current coroutine will take effect immediately,
459     but changing the priority of coroutines in the ready queue (but not
460 root 1.31 running) will only take effect after the next schedule (of that
461 root 1.92 coroutine). This is a bug that will be fixed in some future version.
462 root 1.31
463 root 1.92 =item $newprio = $coroutine->nice ($change)
464 root 1.31
465     Similar to C<prio>, but subtract the given value from the priority (i.e.
466     higher values mean lower priority, just as in unix).
467    
468 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
469 root 1.41
470     Sets (or gets in case the argument is missing) the description for this
471 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
472 root 1.41
473 root 1.142 This method simply sets the C<< $coroutine->{desc} >> member to the given string. You
474     can modify this member directly if you wish.
475    
476 root 1.41 =cut
477    
478     sub desc {
479     my $old = $_[0]{desc};
480     $_[0]{desc} = $_[1] if @_ > 1; # set only when an argument was passed (an explicit undef counts)
481     $old; # always returns the previous description
482 root 1.8 }
483 root 1.1
484 root 1.8 =back
485 root 1.2
486 root 1.97 =head2 GLOBAL FUNCTIONS
487 root 1.92
488     =over 4
489    
490 root 1.97 =item Coro::nready
491    
492     Returns the number of coroutines that are currently in the ready state,
493 root 1.124 i.e. that can be switched to. The value C<0> means that the only runnable
494 root 1.97 coroutine is the currently running one, so C<cede> would have no effect,
495     and C<schedule> would cause a deadlock unless there is an idle handler
496     that wakes up some coroutines.
497    
498 root 1.103 =item my $guard = Coro::guard { ... }
499    
500 root 1.119 This creates and returns a guard object. Nothing happens until the object
501 root 1.103 gets destroyed, in which case the codeblock given as argument will be
502     executed. This is useful to free locks or other resources in case of a
503     runtime error or when the coroutine gets canceled, as in both cases the
504     guard block will be executed. The guard object supports only one method,
505     C<< ->cancel >>, which will keep the codeblock from being executed.
506    
507     Example: set some flag and clear it again when the coroutine gets canceled
508     or the function returns:
509    
510     sub do_something {
511     my $guard = Coro::guard { $busy = 0 };
512     $busy = 1;
513    
514     # do something that requires $busy to be true
515     }
516    
517     =cut
518    
519     sub guard(&) {
520     bless \(my $cb = $_[0]), "Coro::guard" # the guard object is a blessed reference to a scalar holding the code ref
521     }
522    
523     sub Coro::guard::cancel {
524     ${$_[0]} = sub { }; # swap in a no-op so DESTROY has nothing left to do
525     }
526    
527     sub Coro::guard::DESTROY {
528     ${$_[0]}->(); # run the guarded block, or the no-op installed by cancel
529     }
530    
531    
532 root 1.92 =item unblock_sub { ... }
533    
534     This utility function takes a BLOCK or code reference and "unblocks" it,
535     returning the new coderef. This means that the new coderef will return
536     immediately without blocking, returning nothing, while the original code
537     ref will be called (with parameters) from within its own coroutine.
538    
539 root 1.124 The reason this function exists is that many event libraries (such as the
540 root 1.92 venerable L<Event|Event> module) are not coroutine-safe (a weaker form
541     of thread-safety). This means you must not block within event callbacks,
542     otherwise you might suffer from crashes or worse.
543    
544     This function allows your callbacks to block by executing them in another
545     coroutine where it is safe to block. One example where blocking is handy
546     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
547     disk.
548    
549     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
550     creating event callbacks that want to block.
551    
552     =cut
553    
554     our @unblock_queue; # pending calls as [code ref, args...]; filled by unblock_sub wrappers
555    
556 root 1.105 # we create a special coro because we want to cede,
557     # to reduce pressure on the coro pool (because most callbacks
558     # return immediately and can be reused) and because we cannot cede
559     # inside an event callback.
560 root 1.132 our $unblock_scheduler = new Coro sub {
561 root 1.92 while () {
562     while (my $cb = pop @unblock_queue) { # wrappers unshift, we pop: oldest call first
563 root 1.105 # this is an inlined copy of async_pool
564 root 1.134 my $coro = (pop @async_pool) || new Coro \&pool_handler;
565 root 1.105
566     $coro->{_invoke} = $cb; # already an array ref in the [code ref, args...] form async_pool builds
567     $coro->ready;
568     cede; # for short-lived callbacks, this reduces pressure on the coro pool
569 root 1.92 }
570 root 1.105 schedule; # sleep well
571 root 1.92 }
572     };
573 root 1.132 $unblock_scheduler->desc ("[unblock_sub scheduler]");
574 root 1.92
575     sub unblock_sub(&) {
576     my $cb = shift;
577    
578     sub { # the returned wrapper only queues the call; it does not run $cb itself
579 root 1.105 unshift @unblock_queue, [$cb, @_]; # queue the call together with its arguments
580 root 1.92 $unblock_scheduler->ready; # wake the scheduler coroutine, which dispatches the queue
581     }
582     }
583    
584     =back
585    
586 root 1.8 =cut
587 root 1.2
588 root 1.8 1;
589 root 1.14
590 root 1.17 =head1 BUGS/LIMITATIONS
591 root 1.14
592 root 1.52 - you must make very sure that no coro is still active on global
593 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
594 root 1.52
595     - this module is not thread-safe. You should only ever use this module
596 root 1.124 from the same thread (this requirement might be loosened in the future
597 root 1.52 to allow per-thread schedulers, but Coro::State does not yet allow
598     this).
599 root 1.9
600     =head1 SEE ALSO
601    
602 root 1.147 Support/Utility: L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
603 root 1.67
604     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
605    
606     Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
607    
608     Embedding: L<Coro::MakeMaker>
609 root 1.1
610     =head1 AUTHOR
611    
612 root 1.66 Marc Lehmann <schmorp@schmorp.de>
613 root 1.64 http://home.schmorp.de/
614 root 1.1
615     =cut
616