ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.131
Committed: Thu Sep 20 12:24:42 2007 UTC (16 years, 8 months ago) by root
Branch: MAIN
Changes since 1.130: +2 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.92 # alternatively create an async coroutine like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.98 This module collection manages coroutines. Coroutines are similar
24     to threads but don't run in parallel at the same time even on SMP
25 root 1.124 machines. The specific flavor of coroutine used in this module also
26     guarantees you that it will not switch between coroutines unless
27 root 1.98 necessary, at easily-identified points in your program, so locking and
28     parallel access are rarely an issue, making coroutine programming much
29     safer than threads programming.
30    
31     (Perl, however, does not natively support real threads but instead does a
32     very slow and memory-intensive emulation of processes using threads. This
33     is a performance win on Windows machines, and a loss everywhere else).
34    
35     In this module, coroutines are defined as "callchain + lexical variables +
36     @_ + $_ + $@ + $/ + C stack", that is, a coroutine has its own callchain,
37     its own set of lexicals and its own set of perl's most important global
38     variables.
39 root 1.22
40 root 1.8 =cut
41    
42     package Coro;
43    
44 root 1.71 use strict;
45     no warnings "uninitialized";
46 root 1.36
47 root 1.8 use Coro::State;
48    
49 root 1.83 use base qw(Coro::State Exporter);
50 pcg 1.55
51 root 1.83 our $idle; # idle handler
52 root 1.71 our $main; # main coroutine
53     our $current; # current coroutine
54 root 1.8
55 root 1.128 our $VERSION = '3.7';
56 root 1.8
57 root 1.105 our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
58 root 1.71 our %EXPORT_TAGS = (
59 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
60     );
61 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready));
62 root 1.8
# Lexical scope holding the state shared between import and the attribute
# handler it installs: @async collects subs declared with the "Coro"
# attribute, $init records whether the INIT block has been defined yet.
63     {
64     my @async;
65 root 1.26 my $init;
66 root 1.8
67     # this way of handling attributes simply is NOT scalable ;()
# import: exports the requested symbols to the caller (one level up), then
# installs a MODIFY_CODE_ATTRIBUTES handler into the calling package.
68     sub import {
69 root 1.71 no strict 'refs';
70    
71 root 1.93 Coro->export_to_level (1, @_);
72 root 1.71
# Remember any handler the caller already had so it can be chained below.
73 root 1.8 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE};
74     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub {
75     my ($package, $ref) = (shift, shift);
76     my @attrs;
77     for (@_) {
78     if ($_ eq "Coro") {
# A sub tagged ": Coro" is queued; the first time this happens an INIT
# block is defined that starts every queued sub via async.
79     push @async, $ref;
80 root 1.26 unless ($init++) {
81     eval q{
82     sub INIT {
83     &async(pop @async) while @async;
84     }
85     };
86     }
87 root 1.8 } else {
# Any other attribute is left for the previous handler (or the caller).
88 root 1.17 push @attrs, $_;
89 root 1.8 }
90     }
# Hand the remaining attributes to the previously installed handler if
# there was one; otherwise return them as unhandled.
91 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs;
92 root 1.8 };
93     }
94    
95     }
96    
97 root 1.43 =over 4
98    
99 root 1.8 =item $main
100 root 1.2
101 root 1.8 This coroutine represents the main program.
102 root 1.1
103     =cut
104    
# Create the coroutine object for the main program; no sub is passed.
105 pcg 1.55 $main = new Coro;
106 root 1.8
107 root 1.19 =item $current (or as function: current)
108 root 1.1
109 root 1.83 The current coroutine (the last coroutine switched to). The initial value
110     is C<$main> (of course).
111    
112     This variable is B<strictly> I<read-only>. It is provided for performance
113 root 1.124 reasons. If performance is not essential you are encouraged to use the
114 root 1.83 C<Coro::current> function instead.
115 root 1.1
116 root 1.8 =cut
117    
# Give the main coroutine a description string.
118 root 1.131 $main->{desc} = "[main::]";
119    
120 root 1.8 # maybe some other module used Coro::Specific before...
# If $current is already set, carry its {specific} slot over to $main.
121 root 1.93 $main->{specific} = $current->{specific}
122     if $current;
123 root 1.1
# _set_current is not defined in this file (presumably provided by
# Coro::State - confirm); it is called here with $main.
124 root 1.93 _set_current $main;
125 root 1.19
# Function form of $current: returns the current coroutine object.
126     sub current() { $current }
127 root 1.9
128     =item $idle
129    
130 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
131     to run. The default implementation prints "FATAL: deadlock detected" and
132 root 1.91 exits, because the program has no other way to continue.
133 root 1.83
134     This hook is overwritten by modules such as C<Coro::Timer> and
135 root 1.91 C<Coro::Event> to wait on an external event that hopefully wakes up a
136     coroutine so the scheduler can run it.
137    
138     Please note that if your callback recursively invokes perl (e.g. for event
139     handlers), then it must be prepared to be called recursively.
140 root 1.9
141     =cut
142    
# Default idle handler: loads Carp on demand and croaks with a deadlock
# message. Other modules may replace $idle with their own callback.
143 root 1.83 $idle = sub {
144 root 1.96 require Carp;
145     Carp::croak ("FATAL: deadlock detected");
146 root 1.9 };
147 root 1.8
# _cancel: destroys the given coroutine and runs its destroy callbacks.
# If _destroy returns false, nothing further is done and the callbacks
# are not invoked.
148 root 1.103 sub _cancel {
149     my ($self) = @_;
150    
151     # free coroutine data and mark as destructed
152     $self->_destroy
153     or return;
154    
155     # call all destruction callbacks
# The callback list is removed from the object before being iterated;
# each callback receives the stored status values as its arguments.
156     $_->(@{$self->{status}})
157     for @{(delete $self->{destroy_cb}) || []};
158     }
159    
160 root 1.24 # this coroutine is necessary because a coroutine
161     # cannot destroy itself.
# @destroy is the queue of coroutines waiting to be cancelled; cancel()
# pushes onto it and the manager coroutine below drains it.
162     my @destroy;
163 root 1.103 my $manager;
164    
165     $manager = new Coro sub {
166 root 1.129 $current->desc ("[coro manager]");
167    
# Loop forever: cancel everything queued, then call schedule, which does
# not re-queue this coroutine; cancel() readies it again when needed.
168 pcg 1.57 while () {
169 root 1.103 (shift @destroy)->_cancel
170     while @destroy;
171    
172 root 1.24 &schedule;
173     }
174     };
175    
# Give the manager the PRIO_MAX priority (constant not defined in this
# file; presumably supplied by Coro::State - confirm).
176 root 1.103 $manager->prio (PRIO_MAX);
177    
178 root 1.8 # static methods. not really.
179 root 1.43
180     =back
181 root 1.8
182     =head2 STATIC METHODS
183    
184 root 1.92 Static methods are actually functions that operate on the current coroutine only.
185 root 1.8
186     =over 4
187    
188 root 1.13 =item async { ... } [@args...]
189 root 1.8
190 root 1.92 Create a new asynchronous coroutine and return its coroutine object
191     (usually unused). When the sub returns the new coroutine is automatically
192 root 1.8 terminated.
193    
194 root 1.122 Calling C<exit> in a coroutine will do the same as calling exit outside
195     the coroutine. Likewise, when the coroutine dies, the program will exit,
196     just as it would in the main program.
197 root 1.79
198 root 1.13 # create a new coroutine that just prints its arguments
199     async {
200     print "@_\n";
201     } 1,2,3,4;
202    
203 root 1.8 =cut
204    
# async BLOCK, ARGS: creates a new Coro from the block and arguments,
# puts it into the ready queue and returns the coroutine object.
205 root 1.13 sub async(&@) {
206 root 1.104 my $coro = new Coro @_;
207     $coro->ready;
208     $coro
209 root 1.8 }
210 root 1.1
211 root 1.105 =item async_pool { ... } [@args...]
212    
213     Similar to C<async>, but uses a coroutine pool, so you should not call
214     terminate or join (although you are allowed to), and you get a coroutine
215     that might have executed other code already (which can be good or bad :).
216    
217     Also, the block is executed in an C<eval> context and a warning will be
218 root 1.108 issued in case of an exception instead of terminating the program, as
219     C<async> does. As the coroutine is being reused, stuff like C<on_destroy>
220     will not work in the expected way, unless you call terminate or cancel,
221     which somehow defeats the purpose of pooling.
222 root 1.105
223     The priority will be reset to C<0> after each job, otherwise the coroutine
224     will be re-used "as-is".
225    
226     The pool size is limited to 8 idle coroutines (this can be adjusted by
227     changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
228     required.
229    
230     If you are concerned about pooled coroutines growing a lot because a
231     single C<async_pool> used a lot of stackspace you can e.g. C<async_pool {
232     terminate }> once per second or so to slowly replenish the pool.
233    
234     =cut
235    
# Upper bound on the number of idle coroutines kept in @pool.
236     our $POOL_SIZE = 8;
# Threshold compared against $current->rss in pool_handler; a coroutine
# at or above it is not returned to the pool. Units are not shown in
# this file - TODO confirm against Coro::State.
237 root 1.130 our $MAX_POOL_RSS = 64 * 1024;
# Idle pooled coroutines available for reuse by async_pool.
238 root 1.105 our @pool;
239    
# pool_handler: body of every pooled coroutine. Each iteration runs one
# job taken from $current->{_invoke}, then either parks the coroutine in
# @pool for reuse or leaves the loop, which ends the handler.
240     sub pool_handler {
241     while () {
242 root 1.129 $current->{desc} = "[async_pool]";
243    
# Run the job inside eval so an exception only produces a warning.
# If no job is stored, the "return" leaves the eval block early.
244 root 1.105 eval {
245 root 1.110 my ($cb, @arg) = @{ delete $current->{_invoke} or return };
246 root 1.105 $cb->(@arg);
247     };
248     warn $@ if $@;
249    
# Do not park this coroutine if the pool is already full or its rss
# value has reached the configured limit.
250 root 1.130 last if @pool >= $POOL_SIZE || $current->rss >= $MAX_POOL_RSS;
251 root 1.129
# Park: put the coroutine into the pool, mark it idle, reset its save
# flags and priority, then call schedule until it is made ready again.
252 root 1.105 push @pool, $current;
253 root 1.129 $current->{desc} = "[async_pool idle]";
254 root 1.118 $current->save (Coro::State::SAVE_DEF);
255 root 1.105 $current->prio (0);
256     schedule;
257 root 1.106 }
258     }
259 root 1.105
# async_pool BLOCK, ARGS: like async, but reuses an idle coroutine from
# @pool when one is available, otherwise creates a new one that runs
# pool_handler. Returns the coroutine object.
260     sub async_pool(&@) {
261     # this is also inlined into the unblock_scheduler
262 root 1.129 my $coro = (pop @pool) || new Coro \&pool_handler;;
263 root 1.105
# Store the job (callback followed by its arguments) where pool_handler
# picks it up, then make the coroutine ready.
264     $coro->{_invoke} = [@_];
265     $coro->ready;
266    
267     $coro
268     }
269    
270 root 1.8 =item schedule
271 root 1.6
272 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
273 root 1.8 into the ready queue, so calling this function usually means you will
274 root 1.91 never be called again unless something else (e.g. an event handler) calls
275     ready.
276    
277     The canonical way to wait on external events is this:
278    
279     {
280 root 1.92 # remember current coroutine
281 root 1.91 my $current = $Coro::current;
282    
283     # register a hypothetical event handler
284     on_event_invoke sub {
285     # wake up sleeping coroutine
286     $current->ready;
287     undef $current;
288     };
289    
290 root 1.124 # call schedule until event occurred.
291 root 1.91 # in case we are woken up for other reasons
292     # (current still defined), loop.
293     Coro::schedule while $current;
294     }
295 root 1.1
296 root 1.22 =item cede
297 root 1.1
298 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
299 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
300     current "timeslice" to other coroutines of the same or higher priority.
301 root 1.7
302 root 1.107 Returns true if at least one coroutine switch has happened.
303    
304 root 1.102 =item Coro::cede_notself
305    
306     Works like cede, but is not exported by default and will cede to any
307     coroutine, regardless of priority, once.
308    
309 root 1.107 Returns true if at least one coroutine switch has happened.
310    
311 root 1.40 =item terminate [arg...]
312 root 1.7
313 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
314 root 1.13
315 root 1.1 =cut
316    
# terminate ARGS: cancels the current coroutine, passing the arguments
# on to cancel as the status values.
317 root 1.8 sub terminate {
318 pcg 1.59 $current->cancel (@_);
319 root 1.1 }
320 root 1.6
321 root 1.8 =back
322    
323     # dynamic methods
324    
325 root 1.92 =head2 COROUTINE METHODS
326 root 1.8
327 root 1.92 These are the methods you can call on coroutine objects.
328 root 1.6
329 root 1.8 =over 4
330    
331 root 1.13 =item new Coro \&sub [, @args...]
332 root 1.8
333 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
334 root 1.40 automatically terminates as if C<terminate> with the returned values were
335 root 1.92 called. To make the coroutine run you must first put it into the ready queue
336 root 1.41 by calling the ready method.
337 root 1.13
338 root 1.121 See C<async> for additional discussion.
339 root 1.89
340 root 1.6 =cut
341    
# _run_coro CODE, ARGS: shifts the code reference off @_, calls it with
# the remaining arguments (the &{...} form without parentheses passes
# the current @_), and hands whatever it returns to terminate.
342 root 1.94 sub _run_coro {
343 root 1.13 terminate &{+shift};
344     }
345    
# new CLASS, CODE, ARGS: constructs the coroutine through the parent
# class constructor, with _run_coro as the entry point and the caller's
# code reference and arguments passed along after it.
346 root 1.8 sub new {
347     my $class = shift;
348 root 1.83
349 root 1.94 $class->SUPER::new (\&_run_coro, @_)
350 root 1.8 }
351 root 1.6
352 root 1.92 =item $success = $coroutine->ready
353 root 1.1
354 root 1.92 Put the given coroutine into the ready queue (according to its priority)
355     and return true. If the coroutine is already in the ready queue, do nothing
356 root 1.90 and return false.
357 root 1.1
358 root 1.92 =item $is_ready = $coroutine->is_ready
359 root 1.90
360 root 1.92 Return whether the coroutine is currently in the ready queue or not.
361 root 1.28
362 root 1.92 =item $coroutine->cancel (arg...)
363 root 1.28
364 root 1.92 Terminates the given coroutine and makes it return the given arguments as
365 root 1.103 status (default: the empty list). Never returns if the coroutine is the
366     current coroutine.
367 root 1.28
368     =cut
369    
# cancel ARGS: records the arguments as the coroutine's status and then
# destroys it.
370     sub cancel {
371 pcg 1.59 my $self = shift;
372     $self->{status} = [@_];
373 root 1.103
# The current coroutine is not cancelled directly: it is queued on
# @destroy, the manager coroutine is made ready, and this coroutine then
# calls schedule in an endless loop, so this branch never returns.
374     if ($current == $self) {
375     push @destroy, $self;
376     $manager->ready;
377     &schedule while 1;
378     } else {
# Any other coroutine is destroyed immediately.
379     $self->_cancel;
380     }
381 root 1.40 }
382    
383 root 1.92 =item $coroutine->join
384 root 1.40
385     Wait until the coroutine terminates and return any values given to the
386 pcg 1.59 C<terminate> or C<cancel> functions. C<join> can be called multiple times
387 root 1.92 from multiple coroutines.
388 root 1.40
389     =cut
390    
# join: waits until the coroutine has a status (set by cancel) and
# returns it - the whole list in list context, the first value otherwise.
391     sub join {
392     my $self = shift;
393 root 1.103
394 root 1.40 unless ($self->{status}) {
# Lexical copy of the calling coroutine, shared with the callback below.
395 root 1.103 my $current = $current;
396    
# The destroy callback wakes the waiting coroutine and clears the
# lexical so the loop below can finish.
397     push @{$self->{destroy_cb}}, sub {
398     $current->ready;
399     undef $current;
400     };
401    
# Keep calling schedule until the callback has cleared $current, so a
# wake-up from any other source just loops again.
402     &schedule while $current;
403 root 1.40 }
404 root 1.103
405 root 1.40 wantarray ? @{$self->{status}} : $self->{status}[0];
406 root 1.31 }
407    
408 root 1.101 =item $coroutine->on_destroy (\&cb)
409    
410     Registers a callback that is called when this coroutine gets destroyed,
411     but before it is joined. The callback gets passed the terminate arguments,
412     if any.
413    
414     =cut
415    
# on_destroy CB: appends the callback to this coroutine's destroy_cb
# list, which _cancel runs with the status values as arguments.
416     sub on_destroy {
417     my ($self, $cb) = @_;
418    
419     push @{ $self->{destroy_cb} }, $cb;
420     }
421    
422 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
423 root 1.31
424 root 1.41 Sets (or gets, if the argument is missing) the priority of the
425 root 1.92 coroutine. Higher priority coroutines get run before lower priority
426     coroutines. Priorities are small signed integers (currently -4 .. +3),
427 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
428     to get them):
429 root 1.31
430     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
431     3 > 1 > 0 > -1 > -3 > -4
432    
433     # set priority to HIGH
434     current->prio(PRIO_HIGH);
435    
436     The idle coroutine ($Coro::idle) always has a lower priority than any
437     existing coroutine.
438    
439 root 1.92 Changing the priority of the current coroutine will take effect immediately,
440     but changing the priority of coroutines in the ready queue (but not
441 root 1.31 running) will only take effect after the next schedule (of that
442 root 1.92 coroutine). This is a bug that will be fixed in some future version.
443 root 1.31
444 root 1.92 =item $newprio = $coroutine->nice ($change)
445 root 1.31
446     Similar to C<prio>, but subtract the given value from the priority (i.e.
447     higher values mean lower priority, just as in unix).
448    
449 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
450 root 1.41
451     Sets (or gets in case the argument is missing) the description for this
452 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
453 root 1.41
454     =cut
455    
# desc [NEWDESC]: returns the previous description; when an argument is
# supplied (even undef), it is stored as the new description.
456     sub desc {
457     my $old = $_[0]{desc};
458     $_[0]{desc} = $_[1] if @_ > 1;
459     $old;
460 root 1.8 }
461 root 1.1
462 root 1.8 =back
463 root 1.2
464 root 1.97 =head2 GLOBAL FUNCTIONS
465 root 1.92
466     =over 4
467    
468 root 1.97 =item Coro::nready
469    
470     Returns the number of coroutines that are currently in the ready state,
471 root 1.124 i.e. that can be switched to. The value C<0> means that the only runnable
472 root 1.97 coroutine is the currently running one, so C<cede> would have no effect,
473     and C<schedule> would cause a deadlock unless there is an idle handler
474     that wakes up some coroutines.
475    
476 root 1.103 =item my $guard = Coro::guard { ... }
477    
478 root 1.119 This creates and returns a guard object. Nothing happens until the object
479 root 1.103 gets destroyed, in which case the codeblock given as argument will be
480     executed. This is useful to free locks or other resources in case of a
481     runtime error or when the coroutine gets canceled, as in both cases the
482     guard block will be executed. The guard object supports only one method,
483     C<< ->cancel >>, which will keep the codeblock from being executed.
484    
485     Example: set some flag and clear it again when the coroutine gets canceled
486     or the function returns:
487    
488     sub do_something {
489     my $guard = Coro::guard { $busy = 0 };
490     $busy = 1;
491    
492     # do something that requires $busy to be true
493     }
494    
495     =cut
496    
# guard BLOCK: returns a Coro::guard object, which is a blessed reference
# to a scalar holding the code reference.
497     sub guard(&) {
498     bless \(my $cb = $_[0]), "Coro::guard"
499     }
500    
# cancel: replaces the stored code reference with an empty sub, so that
# DESTROY no longer runs the original block.
501     sub Coro::guard::cancel {
502     ${$_[0]} = sub { };
503     }
504    
# DESTROY: calls the stored code reference with no arguments when the
# guard object is destroyed.
505     sub Coro::guard::DESTROY {
506     ${$_[0]}->();
507     }
508    
509    
510 root 1.92 =item unblock_sub { ... }
511    
512     This utility function takes a BLOCK or code reference and "unblocks" it,
513     returning the new coderef. This means that the new coderef will return
514     immediately without blocking, returning nothing, while the original code
515     ref will be called (with parameters) from within its own coroutine.
516    
517 root 1.124 The reason this function exists is that many event libraries (such as the
518 root 1.92 venerable L<Event|Event> module) are not coroutine-safe (a weaker form
519     of thread-safety). This means you must not block within event callbacks,
520     otherwise you might suffer from crashes or worse.
521    
522     This function allows your callbacks to block by executing them in another
523     coroutine where it is safe to block. One example where blocking is handy
524     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
525     disk.
526    
527     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
528     creating event callbacks that want to block.
529    
530     =cut
531    
# Queue of pending [callback, args...] entries. unblock_sub adds entries
# with unshift and the scheduler removes them with pop, so they are
# handled in the order they were queued.
532     our @unblock_queue;
533    
534 root 1.105 # we create a special coro because we want to cede,
535     # to reduce pressure on the coro pool (because most callbacks
536     # return immediately and can be reused) and because we cannot cede
537     # inside an event callback.
538 root 1.92 our $unblock_scheduler = async {
539 root 1.129 $current->desc ("[unblock_sub scheduler]");
540 root 1.92 while () {
541     while (my $cb = pop @unblock_queue) {
542 root 1.105 # this is an inlined copy of async_pool
543     my $coro = (pop @pool or new Coro \&pool_handler);
544    
# $cb is already the [callback, args...] array reference queued by
# unblock_sub, so it is stored as-is as the job for pool_handler.
545     $coro->{_invoke} = $cb;
546     $coro->ready;
547     cede; # for short-lived callbacks, this reduces pressure on the coro pool
548 root 1.92 }
# Queue drained: call schedule; unblock_sub readies this coroutine again.
549 root 1.105 schedule; # sleep well
550 root 1.92 }
551     };
552    
# unblock_sub BLOCK: returns a new code reference that, when called, only
# queues the original callback together with its arguments and makes the
# unblock scheduler coroutine ready; the original callback is not run by
# the wrapper itself.
553     sub unblock_sub(&) {
554     my $cb = shift;
555    
556     sub {
557 root 1.105 unshift @unblock_queue, [$cb, @_];
558 root 1.92 $unblock_scheduler->ready;
559     }
560     }
561    
562     =back
563    
564 root 1.8 =cut
565 root 1.2
566 root 1.8 1;
567 root 1.14
568 root 1.17 =head1 BUGS/LIMITATIONS
569 root 1.14
570 root 1.52 - you must make very sure that no coro is still active on global
571 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
572 root 1.52
573     - this module is not thread-safe. You should only ever use this module
574 root 1.124 from the same thread (this requirement might be loosened in the future
575 root 1.52 to allow per-thread schedulers, but Coro::State does not yet allow
576     this).
577 root 1.9
578     =head1 SEE ALSO
579    
580 root 1.67 Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
581    
582     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
583    
584     Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
585    
586     Embedding: L<Coro::MakeMaker>
587 root 1.1
588     =head1 AUTHOR
589    
590 root 1.66 Marc Lehmann <schmorp@schmorp.de>
591 root 1.64 http://home.schmorp.de/
592 root 1.1
593     =cut
594