ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.118
Committed: Mon Mar 19 15:50:48 2007 UTC (17 years, 2 months ago) by root
Branch: MAIN
CVS Tags: rel-3_55
Changes since 1.117: +2 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.92 # alternatively create an async coroutine like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.98 This module collection manages coroutines. Coroutines are similar
24     to threads but don't run in parallel at the same time even on SMP
25     machines. The specific flavor of coroutine used in this module also
26     guarantees you that it will not switch between coroutines unless
27     necessary, at easily-identified points in your program, so locking and
28     parallel access are rarely an issue, making coroutine programming much
29     safer than threads programming.
30    
31     (Perl, however, does not natively support real threads but instead does a
32     very slow and memory-intensive emulation of processes using threads. This
33     is a performance win on Windows machines, and a loss everywhere else).
34    
35     In this module, coroutines are defined as "callchain + lexical variables +
36     @_ + $_ + $@ + $/ + C stack", that is, a coroutine has its own callchain,
37     its own set of lexicals and its own set of perl's most important global
38     variables.
39 root 1.22
40 root 1.8 =cut
41    
42     package Coro;
43    
44 root 1.71 use strict;
45     no warnings "uninitialized";
46 root 1.36
47 root 1.8 use Coro::State;
48    
49 root 1.83 use base qw(Coro::State Exporter);
50 pcg 1.55
51 root 1.83 our $idle; # idle handler
52 root 1.71 our $main; # main coroutine
53     our $current; # current coroutine
54 root 1.8
55 root 1.118 our $VERSION = '3.55';
56 root 1.8
57 root 1.105 our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
58 root 1.71 our %EXPORT_TAGS = (
59 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
60     );
61 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready));
62 root 1.8
{
    # Code refs of subs declared with the ":Coro" attribute that have not
    # been turned into coroutines yet.
    my @async;

    # Counts ":Coro" declarations; while it is still zero the INIT hook
    # below has not been compiled.
    my $init;

    # this way of handling attributes simply is NOT scalable ;()
    #
    # Exports the requested symbols into the caller and wraps the caller's
    # MODIFY_CODE_ATTRIBUTES handler so that "sub foo : Coro { ... }" is
    # recognised.  Attributes other than "Coro" are handed on to the
    # previously installed handler, if there was one.
    sub import {
        no strict 'refs';

        Coro->export_to_level (1, @_);

        my $hook = (caller)[0] . "::MODIFY_CODE_ATTRIBUTES";
        my $old  = *{$hook}{CODE};

        *{$hook} = sub {
            my ($package, $ref, @requested) = @_;
            my @attrs;

            for my $attr (@requested) {
                if ($attr ne "Coro") {
                    # not ours, leave it for the next handler
                    push @attrs, $attr;
                    next;
                }

                push @async, $ref;

                # only the very first ":Coro" sub installs the INIT hook
                next if $init++;

                eval q{
                    sub INIT {
                        &async(pop @async) while @async;
                    }
                };
            }

            return $old ? $old->($package, $ref, @attrs) : @attrs;
        };
    }

}
96    
97 root 1.43 =over 4
98    
99 root 1.8 =item $main
100 root 1.2
101 root 1.8 This coroutine represents the main program.
102 root 1.1
103     =cut
104    
# The coroutine object that stands for the main program.
$main = Coro->new;
106 root 1.8
107 root 1.19 =item $current (or as function: current)
108 root 1.1
109 root 1.83 The current coroutine (the last coroutine switched to). The initial value
110     is C<$main> (of course).
111    
112     This variable is B<strictly> I<read-only>. It is provided for performance
113     reasons. If performance is not essential you are encouraged to use the
114     C<Coro::current> function instead.
115 root 1.1
116 root 1.8 =cut
117    
# Maybe some other module used Coro::Specific before this point; if so,
# carry the data it stored on the then-current coroutine over to $main.
if ($current) {
    $main->{specific} = $current->{specific};
}

# From here on the main program is the current coroutine.
_set_current ($main);

# Function form of $Coro::current.
sub current() { $current }
125 root 1.9
126     =item $idle
127    
128 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
129     to run. The default implementation croaks with "FATAL: deadlock detected",
130 root 1.91 because the program has no other way to continue.
131 root 1.83
132     This hook is overwritten by modules such as C<Coro::Timer> and
133 root 1.91 C<Coro::Event> to wait on an external event that hopefully wakes up a
134     coroutine so the scheduler can run it.
135    
136     Please note that if your callback recursively invokes perl (e.g. for event
137     handlers), then it must be prepared to be called recursively.
138 root 1.9
139     =cut
140    
# Default idle handler.  Carp is only loaded once the handler actually
# runs; the deadlock is then reported with croak.
$idle = sub {
    require Carp;

    Carp::croak ("FATAL: deadlock detected");
};
145 root 1.8
# Destroys the coroutine and then runs its destruction callbacks.
#
# Nothing further happens when ->_destroy reports failure.  Otherwise the
# list stored under {destroy_cb} is removed from the object and every
# callback in it is invoked, in order, with the contents of {status} as
# arguments.
sub _cancel {
    my ($self) = @_;

    # free coroutine data and mark as destructed
    return unless $self->_destroy;

    # call all destruction callbacks
    my $callbacks = (delete $self->{destroy_cb}) || [];

    $_->(@{ $self->{status} })
        for @$callbacks;
}
157    
# A coroutine cannot destroy itself, so self-cancellation is delegated to
# this manager coroutine: ->cancel queues the coroutine in @destroy and
# readies $manager, which then performs the actual ->_cancel.
my @destroy;
my $manager;

$manager = Coro->new (sub {
    for (;;) {
        # work off everything queued so far ...
        while (@destroy) {
            my $victim = shift @destroy;
            $victim->_cancel;
        }

        # ... then go back to sleep until readied again
        &schedule;
    }
});

$manager->prio (PRIO_MAX);
173    
174 root 1.8 # static methods. not really.
175 root 1.43
176     =back
177 root 1.8
178     =head2 STATIC METHODS
179    
180 root 1.92 Static methods are actually functions that operate on the current coroutine only.
181 root 1.8
182     =over 4
183    
184 root 1.13 =item async { ... } [@args...]
185 root 1.8
186 root 1.92 Create a new asynchronous coroutine and return its coroutine object
187     (usually unused). When the sub returns the new coroutine is automatically
188 root 1.8 terminated.
189    
190 root 1.89 Calling C<exit> in a coroutine will not work correctly, so do not do that.
191    
192 root 1.79 When the coroutine dies, the program will exit, just as in the main
193     program.
194    
195 root 1.13 # create a new coroutine that just prints its arguments
196     async {
197     print "@_\n";
198     } 1,2,3,4;
199    
200 root 1.8 =cut
201    
# Creates a coroutine from the given block (plus optional arguments), puts
# it into the ready queue and returns the coroutine object.
sub async(&@) {
    my $coro = Coro->new (@_);

    $coro->ready;

    return $coro;
}
207 root 1.1
208 root 1.105 =item async_pool { ... } [@args...]
209    
210     Similar to C<async>, but uses a coroutine pool, so you should not call
211     terminate or join (although you are allowed to), and you get a coroutine
212     that might have executed other code already (which can be good or bad :).
213    
214     Also, the block is executed in an C<eval> context and a warning will be
215 root 1.108 issued in case of an exception instead of terminating the program, as
216     C<async> does. As the coroutine is being reused, stuff like C<on_destroy>
217     will not work in the expected way, unless you call terminate or cancel,
218     which somehow defeats the purpose of pooling.
219 root 1.105
220     The priority will be reset to C<0> after each job, otherwise the coroutine
221     will be re-used "as-is".
222    
223     The pool size is limited to 8 idle coroutines (this can be adjusted by
224     changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
225     required.
226    
227     If you are concerned about pooled coroutines growing a lot because a
228     single C<async_pool> used a lot of stackspace you can e.g. C<async_pool {
229     terminate }> once per second or so to slowly replenish the pool.
230    
231     =cut
232    
# Upper bound on the number of idle coroutines kept in @pool.
our $POOL_SIZE = 8;

# Idle pool coroutines waiting for their next job.
our @pool;

# Body of every pooled coroutine.  Each round takes the job stored in
# $current->{_invoke} (a [$cb, @args] array), runs it inside an eval and
# warns if it died.  The coroutine then parks itself in @pool and sleeps,
# unless the pool already holds $POOL_SIZE coroutines, in which case the
# loop ends and the sub returns.
sub pool_handler {
    while () {
        eval {
            # "return" only leaves the eval when no job is pending
            my $job = delete $current->{_invoke}
                or return;

            my ($cb, @arg) = @$job;
            $cb->(@arg);
        };
        warn $@ if $@;

        last if @pool >= $POOL_SIZE;
        push @pool, $current;

        # NOTE(review): presumably restores the default save flags; confirm
        # the meaning of SAVE_DEF in Coro::State.
        $current->save (Coro::State::SAVE_DEF);
        $current->prio (0);
        schedule;
    }
}
252 root 1.105
# Like async, but takes an idle coroutine from @pool when one is available
# and only creates a new pool_handler coroutine otherwise.  The block and
# its arguments are stored as the coroutine's next job; the coroutine is
# readied and returned.
sub async_pool(&@) {
    # NOTE: $unblock_scheduler contains an inlined copy of this code
    my $coro = pop @pool;
    $coro ||= Coro->new (\&pool_handler);

    $coro->{_invoke} = [@_];
    $coro->ready;

    return $coro;
}
262    
263 root 1.8 =item schedule
264 root 1.6
265 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
266 root 1.8 into the ready queue, so calling this function usually means you will
267 root 1.91 never be called again unless something else (e.g. an event handler) calls
268     ready.
269    
270     The canonical way to wait on external events is this:
271    
272     {
273 root 1.92 # remember current coroutine
274 root 1.91 my $current = $Coro::current;
275    
276     # register a hypothetical event handler
277     on_event_invoke sub {
278     # wake up sleeping coroutine
279     $current->ready;
280     undef $current;
281     };
282    
283     # call schedule until event occurred.
284     # in case we are woken up for other reasons
285     # (current still defined), loop.
286     Coro::schedule while $current;
287     }
288 root 1.1
289 root 1.22 =item cede
290 root 1.1
291 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
292 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
293     current "timeslice" to other coroutines of the same or higher priority.
294 root 1.7
295 root 1.107 Returns true if at least one coroutine switch has happened.
296    
297 root 1.102 =item Coro::cede_notself
298    
299     Works like cede, but is not exported by default and will cede to any
300     coroutine, regardless of priority, once.
301    
302 root 1.107 Returns true if at least one coroutine switch has happened.
303    
304 root 1.40 =item terminate [arg...]
305 root 1.7
306 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
307 root 1.13
308 root 1.1 =cut
309    
# Cancels the current coroutine, passing the arguments on as its status
# values.
sub terminate {
    return $current->cancel (@_);
}
313 root 1.6
314 root 1.8 =back
315    
316     # dynamic methods
317    
318 root 1.92 =head2 COROUTINE METHODS
319 root 1.8
320 root 1.92 These are the methods you can call on coroutine objects.
321 root 1.6
322 root 1.8 =over 4
323    
324 root 1.13 =item new Coro \&sub [, @args...]
325 root 1.8
326 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
327 root 1.40 automatically terminates as if C<terminate> with the returned values were
328 root 1.92 called. To make the coroutine run you must first put it into the ready queue
329 root 1.41 by calling the ready method.
330 root 1.13
331 root 1.89 Calling C<exit> in a coroutine will not work correctly, so do not do that.
332    
333 root 1.6 =cut
334    
# Entry point every coroutine created through Coro->new starts in: calls
# the user's code ref with the remaining arguments and terminates the
# coroutine with whatever that call returned.
sub _run_coro {
    my $body = shift;

    terminate ($body->(@_));
}

# Constructor: asks the parent class for a coroutine that runs _run_coro
# with the caller's code ref and arguments.
sub new {
    my $class = shift;

    return $class->SUPER::new (\&_run_coro, @_);
}
344 root 1.6
345 root 1.92 =item $success = $coroutine->ready
346 root 1.1
347 root 1.92 Put the given coroutine into the ready queue (according to its priority)
348     and return true. If the coroutine is already in the ready queue, do nothing
349 root 1.90 and return false.
350 root 1.1
351 root 1.92 =item $is_ready = $coroutine->is_ready
352 root 1.90
353 root 1.92 Return whether the coroutine is currently in the ready queue or not.
354 root 1.28
355 root 1.92 =item $coroutine->cancel (arg...)
356 root 1.28
357 root 1.92 Terminates the given coroutine and makes it return the given arguments as
358 root 1.103 status (default: the empty list). Never returns if the coroutine is the
359     current coroutine.
360 root 1.28
361     =cut
362    
# Records the given arguments as the coroutine's status and destroys it.
#
# Any coroutine other than the running one is destroyed right away.  The
# running coroutine instead queues itself for the manager coroutine and
# then calls the scheduler in an endless loop, so the call never returns.
sub cancel {
    my $self = shift;

    $self->{status} = [@_];

    unless ($current == $self) {
        return $self->_cancel;
    }

    # hand ourselves over to the manager
    push @destroy, $self;
    $manager->ready;
    &schedule while 1;
}
375    
376 root 1.92 =item $coroutine->join
377 root 1.40
378     Wait until the coroutine terminates and return any values given to the
379 pcg 1.59 C<terminate> or C<cancel> functions. C<join> can be called multiple times
380 root 1.92 from multiple coroutines.
381 root 1.40
382     =cut
383    
# Waits until the coroutine has a status and returns it: the whole status
# list in list context, its first element otherwise.
sub join {
    my $self = shift;

    if (!$self->{status}) {
        # remember who is waiting; the callback clears this once it has
        # readied us, which ends the loop below
        my $waiter = $current;

        push @{ $self->{destroy_cb} }, sub {
            $waiter->ready;
            undef $waiter;
        };

        # we may be woken up for other reasons, so keep sleeping until the
        # callback has really run
        &schedule while $waiter;
    }

    return wantarray ? @{ $self->{status} } : $self->{status}[0];
}
400    
401 root 1.101 =item $coroutine->on_destroy (\&cb)
402    
403     Registers a callback that is called when this coroutine gets destroyed,
404     but before it is joined. The callback gets passed the terminate arguments,
405     if any.
406    
407     =cut
408    
# Appends $cb to the coroutine's list of destruction callbacks and returns
# the new length of that list (the result of push).
sub on_destroy {
    my $self = shift;
    my $cb   = shift;

    return push @{ $self->{destroy_cb} }, $cb;
}
414    
415 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
416 root 1.31
417 root 1.41 Sets (or gets, if the argument is missing) the priority of the
418 root 1.92 coroutine. Higher priority coroutines get run before lower priority
419     coroutines. Priorities are small signed integers (currently -4 .. +3),
420 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
421     to get them):
422 root 1.31
423     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
424     3 > 1 > 0 > -1 > -3 > -4
425    
426     # set priority to HIGH
427     current->prio(PRIO_HIGH);
428    
429     The idle coroutine ($Coro::idle) always has a lower priority than any
430     existing coroutine.
431    
432 root 1.92 Changing the priority of the current coroutine will take effect immediately,
433     but changing the priority of coroutines in the ready queue (but not
434 root 1.31 running) will only take effect after the next schedule (of that
435 root 1.92 coroutine). This is a bug that will be fixed in some future version.
436 root 1.31
437 root 1.92 =item $newprio = $coroutine->nice ($change)
438 root 1.31
439     Similar to C<prio>, but subtract the given value from the priority (i.e.
440     higher values mean lower priority, just as in unix).
441    
442 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
443 root 1.41
444     Sets (or gets in case the argument is missing) the description for this
445 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
446 root 1.41
447     =cut
448    
# Combined getter/setter for the free-form description string.  Always
# returns the description that was set before the call; a new one is only
# stored when an argument was passed (an explicit undef counts).
sub desc {
    my $self     = shift;
    my $previous = $self->{desc};

    $self->{desc} = shift
        if @_;

    return $previous;
}
454 root 1.1
455 root 1.8 =back
456 root 1.2
457 root 1.97 =head2 GLOBAL FUNCTIONS
458 root 1.92
459     =over 4
460    
461 root 1.97 =item Coro::nready
462    
463     Returns the number of coroutines that are currently in the ready state,
464     i.e. that can be switched to. The value C<0> means that the only runnable
465     coroutine is the currently running one, so C<cede> would have no effect,
466     and C<schedule> would cause a deadlock unless there is an idle handler
467     that wakes up some coroutines.
468    
469 root 1.103 =item my $guard = Coro::guard { ... }
470    
471     This creates and returns a guard object. Nothing happens until the object
472     gets destroyed, in which case the codeblock given as argument will be
473     executed. This is useful to free locks or other resources in case of a
474     runtime error or when the coroutine gets canceled, as in both cases the
475     guard block will be executed. The guard object supports only one method,
476     C<< ->cancel >>, which will keep the codeblock from being executed.
477    
478     Example: set some flag and clear it again when the coroutine gets canceled
479     or the function returns:
480    
481     sub do_something {
482     my $guard = Coro::guard { $busy = 0 };
483     $busy = 1;
484    
485     # do something that requires $busy to be true
486     }
487    
488     =cut
489    
# Wraps the given code block in a guard object.  The object is a blessed
# reference to a scalar holding the code ref; the code runs when the object
# is destroyed.
sub guard(&) {
    bless \(my $cb = $_[0]), "Coro::guard"
}

# Disarms the guard by replacing the stored code ref with an empty sub, so
# that destruction becomes a no-op.
sub Coro::guard::cancel {
    ${$_[0]} = sub { };
}

# Runs the stored code ref when the guard goes away.
sub Coro::guard::DESTROY {
    # A guard typically fires at scope exit, in the middle of the caller's
    # own error handling.  Without localisation, an eval, system call or
    # failed I/O inside the guard block would silently overwrite the
    # caller's $@, $! or $?.
    local ($@, $!, $?);

    ${$_[0]}->();
}
501    
502    
503 root 1.92 =item unblock_sub { ... }
504    
505     This utility function takes a BLOCK or code reference and "unblocks" it,
506     returning the new coderef. This means that the new coderef will return
507     immediately without blocking, returning nothing, while the original code
508     ref will be called (with parameters) from within its own coroutine.
509    
510     The reason this function exists is that many event libraries (such as the
511     venerable L<Event|Event> module) are not coroutine-safe (a weaker form
512     of thread-safety). This means you must not block within event callbacks,
513     otherwise you might suffer from crashes or worse.
514    
515     This function allows your callbacks to block by executing them in another
516     coroutine where it is safe to block. One example where blocking is handy
517     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
518     disk.
519    
520     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
521     creating event callbacks that want to block.
522    
523     =cut
524    
# Jobs queued by the subs that unblock_sub creates; each entry is a
# [$cb, @args] array.  New jobs are unshifted and taken with pop, so they
# are started in the order they were queued.
our @unblock_queue;

# we create a special coro because we want to cede,
# to reduce pressure on the coro pool (because most callbacks
# return immediately and can be reused) and because we cannot cede
# inside an event callback.
our $unblock_scheduler = async {
    while () {
        while (my $job = pop @unblock_queue) {
            # this is an inlined copy of async_pool
            my $coro = (pop @pool) || Coro->new (\&pool_handler);

            $coro->{_invoke} = $job;
            $coro->ready;

            # for short-lived callbacks, this reduces pressure on the coro pool
            cede;
        }

        # sleep well
        schedule;
    }
};
544    
# Returns a wrapper around the given code.  Calling the wrapper does not
# run the code; it queues the code ref together with the call's arguments
# in @unblock_queue and readies $unblock_scheduler.
sub unblock_sub(&) {
    my ($code) = @_;

    return sub {
        unshift @unblock_queue, [$code, @_];
        $unblock_scheduler->ready;
    };
}
553    
554     =back
555    
556 root 1.8 =cut
557 root 1.2
558 root 1.8 1;
559 root 1.14
560 root 1.17 =head1 BUGS/LIMITATIONS
561 root 1.14
562 root 1.52 - you must make very sure that no coro is still active on global
563 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
564 root 1.52
565     - this module is not thread-safe. You should only ever use this module
566     from the same thread (this requirement might be loosened in the future
567     to allow per-thread schedulers, but Coro::State does not yet allow
568     this).
569 root 1.9
570     =head1 SEE ALSO
571    
572 root 1.67 Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
573    
574     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
575    
576     Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
577    
578     Embedding: L<Coro::MakeMaker>
579 root 1.1
580     =head1 AUTHOR
581    
582 root 1.66 Marc Lehmann <schmorp@schmorp.de>
583 root 1.64 http://home.schmorp.de/
584 root 1.1
585     =cut
586