ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.123
Committed: Mon Apr 16 13:26:43 2007 UTC (17 years, 1 month ago) by root
Branch: MAIN
Changes since 1.122: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.92 # alternatively create an async coroutine like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.98 This module collection manages coroutines. Coroutines are similar
24     to threads but don't run in parallel at the same time even on SMP
25     machines. The specific flavor of coroutine used in this module also
26     guarantees you that it will not switch between coroutines unless
27     necessary, at easily-identified points in your program, so locking and
28     parallel access are rarely an issue, making coroutine programming much
29     safer than threads programming.
30    
31     (Perl, however, does not natively support real threads but instead does a
32     very slow and memory-intensive emulation of processes using threads. This
33     is a performance win on Windows machines, and a loss everywhere else).
34    
35     In this module, coroutines are defined as "callchain + lexical variables +
36     @_ + $_ + $@ + $/ + C stack", that is, a coroutine has its own callchain,
37     its own set of lexicals and its own set of Perl's most important global
38     variables.
39 root 1.22
40 root 1.8 =cut
41    
42     package Coro;
43    
44 root 1.71 use strict;
45     no warnings "uninitialized";
46 root 1.36
47 root 1.8 use Coro::State;
48    
49 root 1.83 use base qw(Coro::State Exporter);
50 pcg 1.55
51 root 1.83 our $idle; # idle handler
52 root 1.71 our $main; # main coroutine
53     our $current; # current coroutine
54 root 1.8
55 root 1.123 our $VERSION = '3.61';
56 root 1.8
57 root 1.105 our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
58 root 1.71 our %EXPORT_TAGS = (
59 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
60     );
61 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready));
62 root 1.8
63     {
64     my @async;
65 root 1.26 my $init;
66 root 1.8
67     # this way of handling attributes simply is NOT scalable ;()
68     sub import {
69 root 1.71 no strict 'refs';
70    
71 root 1.93 Coro->export_to_level (1, @_);
72 root 1.71
73 root 1.8 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE};
74     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub {
75     my ($package, $ref) = (shift, shift);
76     my @attrs;
77     for (@_) {
78     if ($_ eq "Coro") {
79     push @async, $ref;
80 root 1.26 unless ($init++) {
81     eval q{
82     sub INIT {
83     &async(pop @async) while @async;
84     }
85     };
86     }
87 root 1.8 } else {
88 root 1.17 push @attrs, $_;
89 root 1.8 }
90     }
91 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs;
92 root 1.8 };
93     }
94    
95     }
96    
97 root 1.43 =over 4
98    
99 root 1.8 =item $main
100 root 1.2
101 root 1.8 This coroutine represents the main program.
102 root 1.1
103     =cut
104    
105 pcg 1.55 $main = new Coro;
106 root 1.8
107 root 1.19 =item $current (or as function: current)
108 root 1.1
109 root 1.83 The current coroutine (the last coroutine switched to). The initial value
110     is C<$main> (of course).
111    
112     This variable is B<strictly> I<read-only>. It is provided for performance
113     reasons. If performance is not essential you are encouraged to use the
114     C<Coro::current> function instead.
115 root 1.1
116 root 1.8 =cut
117    
118     # maybe some other module used Coro::Specific before...
119 root 1.93 $main->{specific} = $current->{specific}
120     if $current;
121 root 1.1
122 root 1.93 _set_current $main;
123 root 1.19
124     sub current() { $current }
125 root 1.9
126     =item $idle
127    
128 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
129     to run. The default implementation prints "FATAL: deadlock detected" and
130 root 1.91 exits, because the program has no other way to continue.
131 root 1.83
132     This hook is overwritten by modules such as C<Coro::Timer> and
133 root 1.91 C<Coro::Event> to wait on an external event that hopefully wakes up a
134     coroutine so the scheduler can run it.
135    
136     Please note that if your callback recursively invokes perl (e.g. for event
137     handlers), then it must be prepared to be called recursively.
138 root 1.9
139     =cut
140    
141 root 1.83 $idle = sub {
142 root 1.96 require Carp;
143     Carp::croak ("FATAL: deadlock detected");
144 root 1.9 };
145 root 1.8
146 root 1.103 sub _cancel {
147     my ($self) = @_;
148    
149     # free coroutine data and mark as destructed
150     $self->_destroy
151     or return;
152    
153     # call all destruction callbacks
154     $_->(@{$self->{status}})
155     for @{(delete $self->{destroy_cb}) || []};
156     }
157    
158 root 1.24 # this coroutine is necessary because a coroutine
159     # cannot destroy itself.
160     my @destroy;
161 root 1.103 my $manager;
162    
163     $manager = new Coro sub {
164 pcg 1.57 while () {
165 root 1.103 (shift @destroy)->_cancel
166     while @destroy;
167    
168 root 1.24 &schedule;
169     }
170     };
171    
172 root 1.103 $manager->prio (PRIO_MAX);
173    
174 root 1.8 # static methods. not really.
175 root 1.43
176     =back
177 root 1.8
178     =head2 STATIC METHODS
179    
180 root 1.92 Static methods are actually functions that operate on the current coroutine only.
181 root 1.8
182     =over 4
183    
184 root 1.13 =item async { ... } [@args...]
185 root 1.8
186 root 1.92 Create a new asynchronous coroutine and return its coroutine object
187     (usually unused). When the sub returns the new coroutine is automatically
188 root 1.8 terminated.
189    
190 root 1.122 Calling C<exit> in a coroutine will do the same as calling exit outside
191     the coroutine. Likewise, when the coroutine dies, the program will exit,
192     just as it would in the main program.
193 root 1.79
194 root 1.13 # create a new coroutine that just prints its arguments
195     async {
196     print "@_\n";
197     } 1,2,3,4;
198    
199 root 1.8 =cut
200    
201 root 1.13 sub async(&@) {
202 root 1.104 my $coro = new Coro @_;
203     $coro->ready;
204     $coro
205 root 1.8 }
206 root 1.1
207 root 1.105 =item async_pool { ... } [@args...]
208    
209     Similar to C<async>, but uses a coroutine pool, so you should not call
210     terminate or join (although you are allowed to), and you get a coroutine
211     that might have executed other code already (which can be good or bad :).
212    
213     Also, the block is executed in an C<eval> context and a warning will be
214 root 1.108 issued in case of an exception instead of terminating the program, as
215     C<async> does. As the coroutine is being reused, stuff like C<on_destroy>
216     will not work in the expected way, unless you call terminate or cancel,
217     which somehow defeats the purpose of pooling.
218 root 1.105
219     The priority will be reset to C<0> after each job, otherwise the coroutine
220     will be re-used "as-is".
221    
222     The pool size is limited to 8 idle coroutines (this can be adjusted by
223     changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
224     required.
225    
226     If you are concerned about pooled coroutines growing a lot because a
227     single C<async_pool> used a lot of stackspace you can e.g. C<async_pool {
228     terminate }> once per second or so to slowly replenish the pool.
229    
230     =cut
231    
232     our $POOL_SIZE = 8;
233     our @pool;
234    
235     sub pool_handler {
236     while () {
237     eval {
238 root 1.110 my ($cb, @arg) = @{ delete $current->{_invoke} or return };
239 root 1.105 $cb->(@arg);
240     };
241     warn $@ if $@;
242    
243     last if @pool >= $POOL_SIZE;
244     push @pool, $current;
245    
246 root 1.118 $current->save (Coro::State::SAVE_DEF);
247 root 1.105 $current->prio (0);
248     schedule;
249 root 1.106 }
250     }
251 root 1.105
252     sub async_pool(&@) {
253     # this is also inlined into the unblock_scheduler
254     my $coro = (pop @pool or new Coro \&pool_handler);
255    
256     $coro->{_invoke} = [@_];
257     $coro->ready;
258    
259     $coro
260     }
261    
262 root 1.8 =item schedule
263 root 1.6
264 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
265 root 1.8 into the ready queue, so calling this function usually means you will
266 root 1.91 never be called again unless something else (e.g. an event handler) calls
267     ready.
268    
269     The canonical way to wait on external events is this:
270    
271     {
272 root 1.92 # remember current coroutine
273 root 1.91 my $current = $Coro::current;
274    
275     # register a hypothetical event handler
276     on_event_invoke sub {
277     # wake up sleeping coroutine
278     $current->ready;
279     undef $current;
280     };
281    
282     # call schedule until event occurred.
283     # in case we are woken up for other reasons
284     # (current still defined), loop.
285     Coro::schedule while $current;
286     }
287 root 1.1
288 root 1.22 =item cede
289 root 1.1
290 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
291 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
292     current "timeslice" to other coroutines of the same or higher priority.
293 root 1.7
294 root 1.107 Returns true if at least one coroutine switch has happened.
295    
296 root 1.102 =item Coro::cede_notself
297    
298     Works like cede, but is not exported by default and will cede to any
299     coroutine, regardless of priority, once.
300    
301 root 1.107 Returns true if at least one coroutine switch has happened.
302    
303 root 1.40 =item terminate [arg...]
304 root 1.7
305 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
306 root 1.13
307 root 1.1 =cut
308    
309 root 1.8 sub terminate {
310 pcg 1.59 $current->cancel (@_);
311 root 1.1 }
312 root 1.6
313 root 1.8 =back
314    
315     # dynamic methods
316    
317 root 1.92 =head2 COROUTINE METHODS
318 root 1.8
319 root 1.92 These are the methods you can call on coroutine objects.
320 root 1.6
321 root 1.8 =over 4
322    
323 root 1.13 =item new Coro \&sub [, @args...]
324 root 1.8
325 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
326 root 1.40 automatically terminates as if C<terminate> with the returned values were
327 root 1.92 called. To make the coroutine run you must first put it into the ready queue
328 root 1.41 by calling the ready method.
329 root 1.13
330 root 1.121 See C<async> for additional discussion.
331 root 1.89
332 root 1.6 =cut
333    
334 root 1.94 sub _run_coro {
335 root 1.13 terminate &{+shift};
336     }
337    
338 root 1.8 sub new {
339     my $class = shift;
340 root 1.83
341 root 1.94 $class->SUPER::new (\&_run_coro, @_)
342 root 1.8 }
343 root 1.6
344 root 1.92 =item $success = $coroutine->ready
345 root 1.1
346 root 1.92 Put the given coroutine into the ready queue (according to its priority)
347     and return true. If the coroutine is already in the ready queue, do nothing
348 root 1.90 and return false.
349 root 1.1
350 root 1.92 =item $is_ready = $coroutine->is_ready
351 root 1.90
352 root 1.92 Return whether the coroutine is currently in the ready queue or not.
353 root 1.28
354 root 1.92 =item $coroutine->cancel (arg...)
355 root 1.28
356 root 1.92 Terminates the given coroutine and makes it return the given arguments as
357 root 1.103 status (default: the empty list). Never returns if the coroutine is the
358     current coroutine.
359 root 1.28
360     =cut
361    
362     sub cancel {
363 pcg 1.59 my $self = shift;
364     $self->{status} = [@_];
365 root 1.103
366     if ($current == $self) {
367     push @destroy, $self;
368     $manager->ready;
369     &schedule while 1;
370     } else {
371     $self->_cancel;
372     }
373 root 1.40 }
374    
375 root 1.92 =item $coroutine->join
376 root 1.40
377     Wait until the coroutine terminates and return any values given to the
378 pcg 1.59 C<terminate> or C<cancel> functions. C<join> can be called multiple times
379 root 1.92 from multiple coroutines.
380 root 1.40
381     =cut
382    
383     sub join {
384     my $self = shift;
385 root 1.103
386 root 1.40 unless ($self->{status}) {
387 root 1.103 my $current = $current;
388    
389     push @{$self->{destroy_cb}}, sub {
390     $current->ready;
391     undef $current;
392     };
393    
394     &schedule while $current;
395 root 1.40 }
396 root 1.103
397 root 1.40 wantarray ? @{$self->{status}} : $self->{status}[0];
398 root 1.31 }
399    
400 root 1.101 =item $coroutine->on_destroy (\&cb)
401    
402     Registers a callback that is called when this coroutine gets destroyed,
403     but before it is joined. The callback gets passed the terminate arguments,
404     if any.
405    
406     =cut
407    
408     sub on_destroy {
409     my ($self, $cb) = @_;
410    
411     push @{ $self->{destroy_cb} }, $cb;
412     }
413    
414 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
415 root 1.31
416 root 1.41 Sets (or gets, if the argument is missing) the priority of the
417 root 1.92 coroutine. Higher priority coroutines get run before lower priority
418     coroutines. Priorities are small signed integers (currently -4 .. +3),
419 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
420     to get them):
421 root 1.31
422     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
423     3 > 1 > 0 > -1 > -3 > -4
424    
425     # set priority to HIGH
426     current->prio(PRIO_HIGH);
427    
428     The idle coroutine ($Coro::idle) always has a lower priority than any
429     existing coroutine.
430    
431 root 1.92 Changing the priority of the current coroutine will take effect immediately,
432     but changing the priority of coroutines in the ready queue (but not
433 root 1.31 running) will only take effect after the next schedule (of that
434 root 1.92 coroutine). This is a bug that will be fixed in some future version.
435 root 1.31
436 root 1.92 =item $newprio = $coroutine->nice ($change)
437 root 1.31
438     Similar to C<prio>, but subtract the given value from the priority (i.e.
439     higher values mean lower priority, just as in unix).
440    
441 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
442 root 1.41
443     Sets (or gets in case the argument is missing) the description for this
444 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
445 root 1.41
446     =cut
447    
448     sub desc {
449     my $old = $_[0]{desc};
450     $_[0]{desc} = $_[1] if @_ > 1;
451     $old;
452 root 1.8 }
453 root 1.1
454 root 1.8 =back
455 root 1.2
456 root 1.97 =head2 GLOBAL FUNCTIONS
457 root 1.92
458     =over 4
459    
460 root 1.97 =item Coro::nready
461    
462     Returns the number of coroutines that are currently in the ready state,
463     i.e. that can be switched to. The value C<0> means that the only runnable
464     coroutine is the currently running one, so C<cede> would have no effect,
465     and C<schedule> would cause a deadlock unless there is an idle handler
466     that wakes up some coroutines.
467    
468 root 1.103 =item my $guard = Coro::guard { ... }
469    
470 root 1.119 This creates and returns a guard object. Nothing happens until the object
471 root 1.103 gets destroyed, in which case the codeblock given as argument will be
472     executed. This is useful to free locks or other resources in case of a
473     runtime error or when the coroutine gets canceled, as in both cases the
474     guard block will be executed. The guard object supports only one method,
475     C<< ->cancel >>, which will keep the codeblock from being executed.
476    
477     Example: set some flag and clear it again when the coroutine gets canceled
478     or the function returns:
479    
480     sub do_something {
481     my $guard = Coro::guard { $busy = 0 };
482     $busy = 1;
483    
484     # do something that requires $busy to be true
485     }
486    
487     =cut
488    
489     sub guard(&) {
490     bless \(my $cb = $_[0]), "Coro::guard"
491     }
492    
493     sub Coro::guard::cancel {
494     ${$_[0]} = sub { };
495     }
496    
497     sub Coro::guard::DESTROY {
498     ${$_[0]}->();
499     }
500    
501    
502 root 1.92 =item unblock_sub { ... }
503    
504     This utility function takes a BLOCK or code reference and "unblocks" it,
505     returning the new coderef. This means that the new coderef will return
506     immediately without blocking, returning nothing, while the original code
507     ref will be called (with parameters) from within its own coroutine.
508    
509     The reason this function exists is that many event libraries (such as the
510     venerable L<Event|Event> module) are not coroutine-safe (a weaker form
511     of thread-safety). This means you must not block within event callbacks,
512     otherwise you might suffer from crashes or worse.
513    
514     This function allows your callbacks to block by executing them in another
515     coroutine where it is safe to block. One example where blocking is handy
516     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
517     disk.
518    
519     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
520     creating event callbacks that want to block.
521    
522     =cut
523    
524     our @unblock_queue;
525    
526 root 1.105 # we create a special coro because we want to cede,
527     # to reduce pressure on the coro pool (because most callbacks
528     # return immediately and can be reused) and because we cannot cede
529     # inside an event callback.
530 root 1.92 our $unblock_scheduler = async {
531     while () {
532     while (my $cb = pop @unblock_queue) {
533 root 1.105 # this is an inlined copy of async_pool
534     my $coro = (pop @pool or new Coro \&pool_handler);
535    
536     $coro->{_invoke} = $cb;
537     $coro->ready;
538     cede; # for short-lived callbacks, this reduces pressure on the coro pool
539 root 1.92 }
540 root 1.105 schedule; # sleep well
541 root 1.92 }
542     };
543    
544     sub unblock_sub(&) {
545     my $cb = shift;
546    
547     sub {
548 root 1.105 unshift @unblock_queue, [$cb, @_];
549 root 1.92 $unblock_scheduler->ready;
550     }
551     }
552    
553     =back
554    
555 root 1.8 =cut
556 root 1.2
557 root 1.8 1;
558 root 1.14
559 root 1.17 =head1 BUGS/LIMITATIONS
560 root 1.14
561 root 1.52 - you must make very sure that no coro is still active on global
562 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
563 root 1.52
564     - this module is not thread-safe. You should only ever use this module
565     from the same thread (this requirement might be loosened in the future
566     to allow per-thread schedulers, but Coro::State does not yet allow
567     this).
568 root 1.9
569     =head1 SEE ALSO
570    
571 root 1.67 Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
572    
573     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
574    
575     Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
576    
577     Embedding: L<Coro::MakeMaker>
578 root 1.1
579     =head1 AUTHOR
580    
581 root 1.66 Marc Lehmann <schmorp@schmorp.de>
582 root 1.64 http://home.schmorp.de/
583 root 1.1
584     =cut
585