ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.142
Committed: Tue Oct 2 23:16:24 2007 UTC (16 years, 8 months ago) by root
Branch: MAIN
Changes since 1.141: +11 -8 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.92 # alternatively create an async coroutine like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.98 This module collection manages coroutines. Coroutines are similar
24     to threads but don't run in parallel at the same time even on SMP
25 root 1.124 machines. The specific flavor of coroutine used in this module also
26     guarantees you that it will not switch between coroutines unless
27 root 1.98 necessary, at easily-identified points in your program, so locking and
28     parallel access are rarely an issue, making coroutine programming much
29     safer than threads programming.
30    
31     (Perl, however, does not natively support real threads but instead does a
32     very slow and memory-intensive emulation of processes using threads. This
33     is a performance win on Windows machines, and a loss everywhere else).
34    
35     In this module, coroutines are defined as "callchain + lexical variables +
36     @_ + $_ + $@ + $/ + C stack), that is, a coroutine has its own callchain,
37     its own set of lexicals and its own set of perls most important global
38     variables.
39 root 1.22
40 root 1.8 =cut
41    
42     package Coro;
43    
44 root 1.71 use strict;
45     no warnings "uninitialized";
46 root 1.36
47 root 1.8 use Coro::State;
48    
49 root 1.83 use base qw(Coro::State Exporter);
50 pcg 1.55
51 root 1.83 our $idle; # idle handler
52 root 1.71 our $main; # main coroutine
53     our $current; # current coroutine
54 root 1.8
55 root 1.138 our $VERSION = '3.8';
56 root 1.8
57 root 1.105 our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
58 root 1.71 our %EXPORT_TAGS = (
59 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
60     );
61 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready));
62 root 1.8
63     {
64     my @async;
65 root 1.26 my $init;
66 root 1.8
67     # this way of handling attributes simply is NOT scalable ;()
68     sub import {
69 root 1.71 no strict 'refs';
70    
71 root 1.93 Coro->export_to_level (1, @_);
72 root 1.71
73 root 1.8 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE};
74     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub {
75     my ($package, $ref) = (shift, shift);
76     my @attrs;
77     for (@_) {
78     if ($_ eq "Coro") {
79     push @async, $ref;
80 root 1.26 unless ($init++) {
81     eval q{
82     sub INIT {
83     &async(pop @async) while @async;
84     }
85     };
86     }
87 root 1.8 } else {
88 root 1.17 push @attrs, $_;
89 root 1.8 }
90     }
91 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs;
92 root 1.8 };
93     }
94    
95     }
96    
97 root 1.43 =over 4
98    
99 root 1.8 =item $main
100 root 1.2
101 root 1.8 This coroutine represents the main program.
102 root 1.1
103     =cut
104    
105 pcg 1.55 $main = new Coro;
106 root 1.8
107 root 1.19 =item $current (or as function: current)
108 root 1.1
109 root 1.83 The current coroutine (the last coroutine switched to). The initial value
110     is C<$main> (of course).
111    
112     This variable is B<strictly> I<read-only>. It is provided for performance
113 root 1.124 reasons. If performance is not essential you are encouraged to use the
114 root 1.83 C<Coro::current> function instead.
115 root 1.1
116 root 1.8 =cut
117    
118 root 1.131 $main->{desc} = "[main::]";
119    
120 root 1.8 # maybe some other module used Coro::Specific before...
121 root 1.142 $main->{_specific} = $current->{_specific}
122 root 1.93 if $current;
123 root 1.1
124 root 1.93 _set_current $main;
125 root 1.19
126     sub current() { $current }
127 root 1.9
128     =item $idle
129    
130 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
131     to run. The default implementation prints "FATAL: deadlock detected" and
132 root 1.91 exits, because the program has no other way to continue.
133 root 1.83
134     This hook is overwritten by modules such as C<Coro::Timer> and
135 root 1.91 C<Coro::Event> to wait on an external event that hopefully wake up a
136     coroutine so the scheduler can run it.
137    
138     Please note that if your callback recursively invokes perl (e.g. for event
139     handlers), then it must be prepared to be called recursively.
140 root 1.9
141     =cut
142    
143 root 1.83 $idle = sub {
144 root 1.96 require Carp;
145     Carp::croak ("FATAL: deadlock detected");
146 root 1.9 };
147 root 1.8
148 root 1.103 sub _cancel {
149     my ($self) = @_;
150    
151     # free coroutine data and mark as destructed
152     $self->_destroy
153     or return;
154    
155     # call all destruction callbacks
156 root 1.142 $_->(@{$self->{_status}})
157     for @{(delete $self->{_on_destroy}) || []};
158 root 1.103 }
159    
160 root 1.24 # this coroutine is necessary because a coroutine
161     # cannot destroy itself.
162     my @destroy;
163 root 1.103 my $manager;
164    
165     $manager = new Coro sub {
166 pcg 1.57 while () {
167 root 1.103 (shift @destroy)->_cancel
168     while @destroy;
169    
170 root 1.24 &schedule;
171     }
172     };
173 root 1.132 $manager->desc ("[coro manager]");
174 root 1.103 $manager->prio (PRIO_MAX);
175    
176 root 1.8 # static methods. not really.
177 root 1.43
178     =back
179 root 1.8
180     =head2 STATIC METHODS
181    
182 root 1.92 Static methods are actually functions that operate on the current coroutine only.
183 root 1.8
184     =over 4
185    
186 root 1.13 =item async { ... } [@args...]
187 root 1.8
188 root 1.92 Create a new asynchronous coroutine and return it's coroutine object
189     (usually unused). When the sub returns the new coroutine is automatically
190 root 1.8 terminated.
191    
192 root 1.122 Calling C<exit> in a coroutine will do the same as calling exit outside
193     the coroutine. Likewise, when the coroutine dies, the program will exit,
194     just as it would in the main program.
195 root 1.79
196 root 1.13 # create a new coroutine that just prints its arguments
197     async {
198     print "@_\n";
199     } 1,2,3,4;
200    
201 root 1.8 =cut
202    
203 root 1.13 sub async(&@) {
204 root 1.104 my $coro = new Coro @_;
205     $coro->ready;
206     $coro
207 root 1.8 }
208 root 1.1
209 root 1.105 =item async_pool { ... } [@args...]
210    
211     Similar to C<async>, but uses a coroutine pool, so you should not call
212     terminate or join (although you are allowed to), and you get a coroutine
213     that might have executed other code already (which can be good or bad :).
214    
215     Also, the block is executed in an C<eval> context and a warning will be
216 root 1.108 issued in case of an exception instead of terminating the program, as
217     C<async> does. As the coroutine is being reused, stuff like C<on_destroy>
218     will not work in the expected way, unless you call terminate or cancel,
219     which somehow defeats the purpose of pooling.
220 root 1.105
221     The priority will be reset to C<0> after each job, otherwise the coroutine
222     will be re-used "as-is".
223    
224     The pool size is limited to 8 idle coroutines (this can be adjusted by
225     changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
226     required.
227    
228     If you are concerned about pooled coroutines growing a lot because a
229 root 1.133 single C<async_pool> used a lot of stackspace you can e.g. C<async_pool
230     { terminate }> once per second or so to slowly replenish the pool. In
231     addition to that, when the stacks used by a handler grows larger than 16kb
232 root 1.134 (adjustable with $Coro::POOL_RSS) it will also exit.
233 root 1.105
234     =cut
235    
236     our $POOL_SIZE = 8;
237 root 1.134 our $POOL_RSS = 16 * 1024;
238     our @async_pool;
239 root 1.105
240     sub pool_handler {
241 root 1.134 my $cb;
242    
243 root 1.105 while () {
244 root 1.134 eval {
245     while () {
246 root 1.136 _pool_1 $cb;
247 root 1.134 &$cb;
248 root 1.136 _pool_2 $cb;
249 root 1.135 &schedule;
250 root 1.134 }
251 root 1.105 };
252 root 1.134
253 root 1.136 last if $@ eq "\3terminate\2\n";
254 root 1.105 warn $@ if $@;
255 root 1.106 }
256     }
257 root 1.105
258     sub async_pool(&@) {
259     # this is also inlined into the unlock_scheduler
260 root 1.135 my $coro = (pop @async_pool) || new Coro \&pool_handler;
261 root 1.105
262     $coro->{_invoke} = [@_];
263     $coro->ready;
264    
265     $coro
266     }
267    
268 root 1.8 =item schedule
269 root 1.6
270 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
271 root 1.8 into the ready queue, so calling this function usually means you will
272 root 1.91 never be called again unless something else (e.g. an event handler) calls
273     ready.
274    
275     The canonical way to wait on external events is this:
276    
277     {
278 root 1.92 # remember current coroutine
279 root 1.91 my $current = $Coro::current;
280    
281     # register a hypothetical event handler
282     on_event_invoke sub {
283     # wake up sleeping coroutine
284     $current->ready;
285     undef $current;
286     };
287    
288 root 1.124 # call schedule until event occurred.
289 root 1.91 # in case we are woken up for other reasons
290     # (current still defined), loop.
291     Coro::schedule while $current;
292     }
293 root 1.1
294 root 1.22 =item cede
295 root 1.1
296 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
297 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
298     current "timeslice" to other coroutines of the same or higher priority.
299 root 1.7
300 root 1.107 Returns true if at least one coroutine switch has happened.
301    
302 root 1.102 =item Coro::cede_notself
303    
304     Works like cede, but is not exported by default and will cede to any
305     coroutine, regardless of priority, once.
306    
307 root 1.107 Returns true if at least one coroutine switch has happened.
308    
309 root 1.40 =item terminate [arg...]
310 root 1.7
311 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
312 root 1.13
313 root 1.141 =item killall
314    
315     Kills/terminates/cancels all coroutines except the currently running
316     one. This is useful after a fork, either in the child or the parent, as
317     usually only one of them should inherit the running coroutines.
318    
319 root 1.1 =cut
320    
321 root 1.8 sub terminate {
322 pcg 1.59 $current->cancel (@_);
323 root 1.1 }
324 root 1.6
325 root 1.141 sub killall {
326     for (Coro::State::list) {
327     $_->cancel
328     if $_ != $current && UNIVERSAL::isa $_, "Coro";
329     }
330     }
331    
332 root 1.8 =back
333    
334     # dynamic methods
335    
336 root 1.92 =head2 COROUTINE METHODS
337 root 1.8
338 root 1.92 These are the methods you can call on coroutine objects.
339 root 1.6
340 root 1.8 =over 4
341    
342 root 1.13 =item new Coro \&sub [, @args...]
343 root 1.8
344 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
345 root 1.40 automatically terminates as if C<terminate> with the returned values were
346 root 1.92 called. To make the coroutine run you must first put it into the ready queue
347 root 1.41 by calling the ready method.
348 root 1.13
349 root 1.121 See C<async> for additional discussion.
350 root 1.89
351 root 1.6 =cut
352    
353 root 1.94 sub _run_coro {
354 root 1.13 terminate &{+shift};
355     }
356    
357 root 1.8 sub new {
358     my $class = shift;
359 root 1.83
360 root 1.94 $class->SUPER::new (\&_run_coro, @_)
361 root 1.8 }
362 root 1.6
363 root 1.92 =item $success = $coroutine->ready
364 root 1.1
365 root 1.92 Put the given coroutine into the ready queue (according to it's priority)
366     and return true. If the coroutine is already in the ready queue, do nothing
367 root 1.90 and return false.
368 root 1.1
369 root 1.92 =item $is_ready = $coroutine->is_ready
370 root 1.90
371 root 1.92 Return wether the coroutine is currently the ready queue or not,
372 root 1.28
373 root 1.92 =item $coroutine->cancel (arg...)
374 root 1.28
375 root 1.92 Terminates the given coroutine and makes it return the given arguments as
376 root 1.103 status (default: the empty list). Never returns if the coroutine is the
377     current coroutine.
378 root 1.28
379     =cut
380    
381     sub cancel {
382 pcg 1.59 my $self = shift;
383 root 1.142 $self->{_status} = [@_];
384 root 1.103
385     if ($current == $self) {
386     push @destroy, $self;
387     $manager->ready;
388     &schedule while 1;
389     } else {
390     $self->_cancel;
391     }
392 root 1.40 }
393    
394 root 1.92 =item $coroutine->join
395 root 1.40
396     Wait until the coroutine terminates and return any values given to the
397 pcg 1.59 C<terminate> or C<cancel> functions. C<join> can be called multiple times
398 root 1.92 from multiple coroutine.
399 root 1.40
400     =cut
401    
402     sub join {
403     my $self = shift;
404 root 1.103
405 root 1.142 unless ($self->{_status}) {
406 root 1.103 my $current = $current;
407    
408 root 1.142 push @{$self->{_on_destroy}}, sub {
409 root 1.103 $current->ready;
410     undef $current;
411     };
412    
413     &schedule while $current;
414 root 1.40 }
415 root 1.103
416 root 1.142 wantarray ? @{$self->{_status}} : $self->{_status}[0];
417 root 1.31 }
418    
419 root 1.101 =item $coroutine->on_destroy (\&cb)
420    
421     Registers a callback that is called when this coroutine gets destroyed,
422     but before it is joined. The callback gets passed the terminate arguments,
423     if any.
424    
425     =cut
426    
427     sub on_destroy {
428     my ($self, $cb) = @_;
429    
430 root 1.142 push @{ $self->{_on_destroy} }, $cb;
431 root 1.101 }
432    
433 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
434 root 1.31
435 root 1.41 Sets (or gets, if the argument is missing) the priority of the
436 root 1.92 coroutine. Higher priority coroutines get run before lower priority
437     coroutines. Priorities are small signed integers (currently -4 .. +3),
438 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
439     to get then):
440 root 1.31
441     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
442     3 > 1 > 0 > -1 > -3 > -4
443    
444     # set priority to HIGH
445     current->prio(PRIO_HIGH);
446    
447     The idle coroutine ($Coro::idle) always has a lower priority than any
448     existing coroutine.
449    
450 root 1.92 Changing the priority of the current coroutine will take effect immediately,
451     but changing the priority of coroutines in the ready queue (but not
452 root 1.31 running) will only take effect after the next schedule (of that
453 root 1.92 coroutine). This is a bug that will be fixed in some future version.
454 root 1.31
455 root 1.92 =item $newprio = $coroutine->nice ($change)
456 root 1.31
457     Similar to C<prio>, but subtract the given value from the priority (i.e.
458     higher values mean lower priority, just as in unix).
459    
460 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
461 root 1.41
462     Sets (or gets in case the argument is missing) the description for this
463 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
464 root 1.41
465 root 1.142 This method simply sets the C<< $coroutine->{desc} >> member to the given string. You
466     can modify this member directly if you wish.
467    
468 root 1.41 =cut
469    
470     sub desc {
471     my $old = $_[0]{desc};
472     $_[0]{desc} = $_[1] if @_ > 1;
473     $old;
474 root 1.8 }
475 root 1.1
476 root 1.8 =back
477 root 1.2
478 root 1.97 =head2 GLOBAL FUNCTIONS
479 root 1.92
480     =over 4
481    
482 root 1.97 =item Coro::nready
483    
484     Returns the number of coroutines that are currently in the ready state,
485 root 1.124 i.e. that can be switched to. The value C<0> means that the only runnable
486 root 1.97 coroutine is the currently running one, so C<cede> would have no effect,
487     and C<schedule> would cause a deadlock unless there is an idle handler
488     that wakes up some coroutines.
489    
490 root 1.103 =item my $guard = Coro::guard { ... }
491    
492 root 1.119 This creates and returns a guard object. Nothing happens until the object
493 root 1.103 gets destroyed, in which case the codeblock given as argument will be
494     executed. This is useful to free locks or other resources in case of a
495     runtime error or when the coroutine gets canceled, as in both cases the
496     guard block will be executed. The guard object supports only one method,
497     C<< ->cancel >>, which will keep the codeblock from being executed.
498    
499     Example: set some flag and clear it again when the coroutine gets canceled
500     or the function returns:
501    
502     sub do_something {
503     my $guard = Coro::guard { $busy = 0 };
504     $busy = 1;
505    
506     # do something that requires $busy to be true
507     }
508    
509     =cut
510    
511     sub guard(&) {
512     bless \(my $cb = $_[0]), "Coro::guard"
513     }
514    
515     sub Coro::guard::cancel {
516     ${$_[0]} = sub { };
517     }
518    
519     sub Coro::guard::DESTROY {
520     ${$_[0]}->();
521     }
522    
523    
524 root 1.92 =item unblock_sub { ... }
525    
526     This utility function takes a BLOCK or code reference and "unblocks" it,
527     returning the new coderef. This means that the new coderef will return
528     immediately without blocking, returning nothing, while the original code
529     ref will be called (with parameters) from within its own coroutine.
530    
531 root 1.124 The reason this function exists is that many event libraries (such as the
532 root 1.92 venerable L<Event|Event> module) are not coroutine-safe (a weaker form
533     of thread-safety). This means you must not block within event callbacks,
534     otherwise you might suffer from crashes or worse.
535    
536     This function allows your callbacks to block by executing them in another
537     coroutine where it is safe to block. One example where blocking is handy
538     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
539     disk.
540    
541     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
542     creating event callbacks that want to block.
543    
544     =cut
545    
546     our @unblock_queue;
547    
548 root 1.105 # we create a special coro because we want to cede,
549     # to reduce pressure on the coro pool (because most callbacks
550     # return immediately and can be reused) and because we cannot cede
551     # inside an event callback.
552 root 1.132 our $unblock_scheduler = new Coro sub {
553 root 1.92 while () {
554     while (my $cb = pop @unblock_queue) {
555 root 1.105 # this is an inlined copy of async_pool
556 root 1.134 my $coro = (pop @async_pool) || new Coro \&pool_handler;
557 root 1.105
558     $coro->{_invoke} = $cb;
559     $coro->ready;
560     cede; # for short-lived callbacks, this reduces pressure on the coro pool
561 root 1.92 }
562 root 1.105 schedule; # sleep well
563 root 1.92 }
564     };
565 root 1.132 $unblock_scheduler->desc ("[unblock_sub scheduler]");
566 root 1.92
567     sub unblock_sub(&) {
568     my $cb = shift;
569    
570     sub {
571 root 1.105 unshift @unblock_queue, [$cb, @_];
572 root 1.92 $unblock_scheduler->ready;
573     }
574     }
575    
576     =back
577    
578 root 1.8 =cut
579 root 1.2
580 root 1.8 1;
581 root 1.14
582 root 1.17 =head1 BUGS/LIMITATIONS
583 root 1.14
584 root 1.52 - you must make very sure that no coro is still active on global
585 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
586 root 1.52
587     - this module is not thread-safe. You should only ever use this module
588 root 1.124 from the same thread (this requirement might be loosened in the future
589 root 1.52 to allow per-thread schedulers, but Coro::State does not yet allow
590     this).
591 root 1.9
592     =head1 SEE ALSO
593    
594 root 1.67 Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
595    
596     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
597    
598     Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
599    
600     Embedding: L<Coro:MakeMaker>
601 root 1.1
602     =head1 AUTHOR
603    
604 root 1.66 Marc Lehmann <schmorp@schmorp.de>
605 root 1.64 http://home.schmorp.de/
606 root 1.1
607     =cut
608