ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
Revision: 1.128
Committed: Wed Sep 19 21:39:15 2007 UTC (16 years, 8 months ago) by root
Branch: MAIN
Changes since 1.127: +6 -2 lines
Log Message:
yeah

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.8 Coro - coroutine process abstraction
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8    
9 root 1.8 async {
10     # some asynchronous thread of execution
11 root 1.2 };
12    
13 root 1.92 # alternatively create an async coroutine like this:
14 root 1.6
15 root 1.8 sub some_func : Coro {
16     # some more async code
17     }
18    
19 root 1.22 cede;
20 root 1.2
21 root 1.1 =head1 DESCRIPTION
22    
23 root 1.98 This module collection manages coroutines. Coroutines are similar
24     to threads but don't run in parallel at the same time even on SMP
25 root 1.124 machines. The specific flavor of coroutine used in this module also
26     guarantees you that it will not switch between coroutines unless
27 root 1.98 necessary, at easily-identified points in your program, so locking and
28     parallel access are rarely an issue, making coroutine programming much
29     safer than threads programming.
30    
31     (Perl, however, does not natively support real threads but instead does a
32     very slow and memory-intensive emulation of processes using threads. This
33     is a performance win on Windows machines, and a loss everywhere else).
34    
35     In this module, coroutines are defined as "callchain + lexical variables +
36     @_ + $_ + $@ + $/ + C stack", that is, a coroutine has its own callchain,
37     its own set of lexicals and its own set of perl's most important global
38     variables.
39 root 1.22
40 root 1.8 =cut
41    
42     package Coro;
43    
44 root 1.71 use strict;
45     no warnings "uninitialized";
46 root 1.36
47 root 1.8 use Coro::State;
48    
49 root 1.83 use base qw(Coro::State Exporter);
50 pcg 1.55
# Package globals used by the scheduler code below, the module version, and
# the Exporter configuration that import() applies via export_to_level.
51 root 1.83 our $idle; # idle handler
52 root 1.71 our $main; # main coroutine
53     our $current; # current coroutine
54 root 1.8
55 root 1.128 our $VERSION = '3.7';
56 root 1.8
        # names exported into the caller's package by default
57 root 1.105 our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
        # the "prio" tag groups the priority constants
58 root 1.71 our %EXPORT_TAGS = (
59 root 1.31 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
60     );
        # exported only on request: the priority constants plus nready
61 root 1.97 our @EXPORT_OK = (@{$EXPORT_TAGS{prio}}, qw(nready));
62 root 1.8
# Bare block that scopes the state shared between import() and the
# MODIFY_CODE_ATTRIBUTES hook it installs in the importing package:
#   @async - code refs that were declared with the "Coro" attribute
#   $init  - counter used to compile the INIT block only once
63     {
64     my @async;
65 root 1.26 my $init;
66 root 1.8
67     # this way of handling attributes simply is NOT scalable ;()
68     sub import {
        # symbolic glob access below needs strict refs disabled
69 root 1.71 no strict 'refs';
70    
        # perform the regular Exporter work for our caller (one level up)
71 root 1.93 Coro->export_to_level (1, @_);
72 root 1.71
        # remember an attribute handler the caller already has so it can be chained
73 root 1.8 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE};
        # install our handler in the caller's package
74     *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub {
75     my ($package, $ref) = (shift, shift);
        # attributes that are not ours
76     my @attrs;
77     for (@_) {
78     if ($_ eq "Coro") {
        # queue the sub; it is started later from INIT
79     push @async, $ref;
        # first "Coro" attribute seen: compile an INIT block (once) that
        # passes every queued code ref to async
80 root 1.26 unless ($init++) {
81     eval q{
82     sub INIT {
83     &async(pop @async) while @async;
84     }
85     };
86     }
87 root 1.8 } else {
88 root 1.17 push @attrs, $_;
89 root 1.8 }
90     }
        # give the remaining attributes to the previous handler if there was
        # one, otherwise return them to the caller of this hook
91 root 1.17 return $old ? $old->($package, $ref, @attrs) : @attrs;
92 root 1.8 };
93     }
94    
95     }
96    
97 root 1.43 =over 4
98    
99 root 1.8 =item $main
100 root 1.2
101 root 1.8 This coroutine represents the main program.
102 root 1.1
103     =cut
104    
# create the coroutine object for the main program; no code ref is passed to new
105 pcg 1.55 $main = new Coro;
106 root 1.8
107 root 1.19 =item $current (or as function: current)
108 root 1.1
109 root 1.83 The current coroutine (the last coroutine switched to). The initial value
110     is C<$main> (of course).
111    
112     This variable is B<strictly> I<read-only>. It is provided for performance
113 root 1.124 reasons. If performance is not essential you are encouraged to use the
114 root 1.83 C<Coro::current> function instead.
115 root 1.1
116 root 1.8 =cut
117    
118     # maybe some other module used Coro::Specific before...
        # ...in which case $current already exists: carry its {specific} slot over to $main
119 root 1.93 $main->{specific} = $current->{specific}
120     if $current;
121 root 1.1
        # _set_current is not defined in this file (presumably provided by the
        # Coro::State XS code - verify); called here with $main
122 root 1.93 _set_current $main;
123 root 1.19
        # function form of $current (exported by default); the empty prototype
        # declares that it takes no arguments
124     sub current() { $current }
125 root 1.9
126     =item $idle
127    
128 root 1.83 A callback that is called whenever the scheduler finds no ready coroutines
129     to run. The default implementation croaks with "FATAL: deadlock detected",
130 root 1.91 because the program has no other way to continue.
131 root 1.83
132     This hook is overwritten by modules such as C<Coro::Timer> and
133 root 1.91 C<Coro::Event> to wait on an external event that hopefully wakes up a
134     coroutine so the scheduler can run it.
135    
136     Please note that if your callback recursively invokes perl (e.g. for event
137     handlers), then it must be prepared to be called recursively.
138 root 1.9
139     =cut
140    
# Default idle callback: loads Carp on demand and croaks with a deadlock
# message. Being stored in the package variable $idle, it can be replaced.
141 root 1.83 $idle = sub {
142 root 1.96 require Carp;
143     Carp::croak ("FATAL: deadlock detected");
144 root 1.9 };
145 root 1.8
# Internal helper: destroy the coroutine $self and then invoke each of its
# registered destroy callbacks with the values stored in $self->{status}.
# Returns early, without running any callback, when _destroy returns false.
146 root 1.103 sub _cancel {
147     my ($self) = @_;
148    
149     # free coroutine data and mark as destructed
150     $self->_destroy
151     or return;
152    
153     # call all destruction callbacks
        # the list is deleted from the object as it is fetched, so a later
        # call finds no callbacks left; a missing list is treated as empty
154     $_->(@{$self->{status}})
155     for @{(delete $self->{destroy_cb}) || []};
156     }
157    
158 root 1.24 # this coroutine is necessary because a coroutine
159     # cannot destroy itself.
        # @destroy: coroutines waiting to be destroyed (cancel() pushes onto it)
        # $manager: the coroutine that performs the destruction on their behalf
160     my @destroy;
161 root 1.103 my $manager;
162    
163     $manager = new Coro sub {
        # endless loop: the manager never returns
164 pcg 1.57 while () {
        # destroy everything queued so far, oldest first
165 root 1.103 (shift @destroy)->_cancel
166     while @destroy;
167    
        # give up the CPU; cancel() calls $manager->ready to wake us again
168 root 1.24 &schedule;
169     }
170     };
171    
        # run the manager at the maximum priority
172 root 1.103 $manager->prio (PRIO_MAX);
173    
174 root 1.8 # static methods. not really.
175 root 1.43
176     =back
177 root 1.8
178     =head2 STATIC METHODS
179    
180 root 1.92 Static methods are actually functions that operate on the current coroutine only.
181 root 1.8
182     =over 4
183    
184 root 1.13 =item async { ... } [@args...]
185 root 1.8
186 root 1.92 Create a new asynchronous coroutine and return its coroutine object
187     (usually unused). When the sub returns the new coroutine is automatically
188 root 1.8 terminated.
189    
190 root 1.122 Calling C<exit> in a coroutine will do the same as calling exit outside
191     the coroutine. Likewise, when the coroutine dies, the program will exit,
192     just as it would in the main program.
193 root 1.79
194 root 1.13 # create a new coroutine that just prints its arguments
195     async {
196     print "@_\n";
197     } 1,2,3,4;
198    
199 root 1.8 =cut
200    
# async BLOCK [, @args]: create a coroutine from the code ref and arguments
# in @_, put it into the ready queue and return the coroutine object.
201 root 1.13 sub async(&@) {
202 root 1.104 my $coro = new Coro @_;
203     $coro->ready;
        # the coroutine object is the return value
204     $coro
205 root 1.8 }
206 root 1.1
207 root 1.105 =item async_pool { ... } [@args...]
208    
209     Similar to C<async>, but uses a coroutine pool, so you should not call
210     terminate or join (although you are allowed to), and you get a coroutine
211     that might have executed other code already (which can be good or bad :).
212    
213     Also, the block is executed in an C<eval> context and a warning will be
214 root 1.108 issued in case of an exception instead of terminating the program, as
215     C<async> does. As the coroutine is being reused, stuff like C<on_destroy>
216     will not work in the expected way, unless you call terminate or cancel,
217     which somehow defeats the purpose of pooling.
218 root 1.105
219     The priority will be reset to C<0> after each job, otherwise the coroutine
220     will be re-used "as-is".
221    
222     The pool size is limited to 8 idle coroutines (this can be adjusted by
223     changing $Coro::POOL_SIZE), and there can be as many non-idle coros as
224     required.
225    
226     If you are concerned about pooled coroutines growing a lot because a
227     single C<async_pool> used a lot of stackspace you can e.g. C<async_pool {
228     terminate }> once per second or so to slowly replenish the pool.
229    
230     =cut
231    
# $POOL_SIZE: upper bound on the number of idle coroutines kept in @pool.
# @pool: idle pooled coroutines waiting to be reused.
232     our $POOL_SIZE = 8;
233     our @pool;
234    
        # Body of every pooled coroutine. Each iteration runs the job found in
        # $current->{_invoke} (an array ref: code ref followed by its arguments),
        # warns about an exception, parks the coroutine in @pool and sleeps in
        # schedule. The loop, and with it the sub, ends when @pool already holds
        # $POOL_SIZE coroutines.
235     sub pool_handler {
236     while () {
237     eval {
        # fetch and clear the pending job; with no job pending the
        # "return" only leaves this eval block
238 root 1.110 my ($cb, @arg) = @{ delete $current->{_invoke} or return };
239 root 1.105 $cb->(@arg);
240     };
        # an exception raised by the job is reported as a warning
241     warn $@ if $@;
242    
        # pool is full: leave the loop instead of parking this coroutine
243     last if @pool >= $POOL_SIZE;
244     push @pool, $current;
245    
        # NOTE(review): presumably restores the default save flags - confirm
        # against Coro::State
246 root 1.118 $current->save (Coro::State::SAVE_DEF);
        # reset the priority to 0 before going to sleep
247 root 1.105 $current->prio (0);
248     schedule;
249 root 1.106 }
250     }
251 root 1.105
# async_pool BLOCK [, @args]: run the block in a coroutine taken from @pool,
# or in a newly created pool_handler coroutine when the pool is empty.
# Returns the coroutine object.
252     sub async_pool(&@) {
253     # this is also inlined into the $unblock_scheduler
        # NOTE(review): that inlined copy does not set {desc} on new coroutines
254 root 1.128 my $coro = (pop @pool) || do {
255     my $coro = new Coro \&pool_handler;
256     $coro->{desc} = "async_pool";
257     $coro
258     };
259 root 1.105
        # store the job (code ref plus arguments) where pool_handler looks for it
260     $coro->{_invoke} = [@_];
261     $coro->ready;
262    
263     $coro
264     }
265    
266 root 1.8 =item schedule
267 root 1.6
268 root 1.92 Calls the scheduler. Please note that the current coroutine will not be put
269 root 1.8 into the ready queue, so calling this function usually means you will
270 root 1.91 never be called again unless something else (e.g. an event handler) calls
271     ready.
272    
273     The canonical way to wait on external events is this:
274    
275     {
276 root 1.92 # remember current coroutine
277 root 1.91 my $current = $Coro::current;
278    
279     # register a hypothetical event handler
280     on_event_invoke sub {
281     # wake up sleeping coroutine
282     $current->ready;
283     undef $current;
284     };
285    
286 root 1.124 # call schedule until event occurred.
287 root 1.91 # in case we are woken up for other reasons
288     # (current still defined), loop.
289     Coro::schedule while $current;
290     }
291 root 1.1
292 root 1.22 =item cede
293 root 1.1
294 root 1.92 "Cede" to other coroutines. This function puts the current coroutine into the
295 root 1.22 ready queue and calls C<schedule>, which has the effect of giving up the
296     current "timeslice" to other coroutines of the same or higher priority.
297 root 1.7
298 root 1.107 Returns true if at least one coroutine switch has happened.
299    
300 root 1.102 =item Coro::cede_notself
301    
302     Works like cede, but is not exported by default and will cede to any
303     coroutine, regardless of priority, once.
304    
305 root 1.107 Returns true if at least one coroutine switch has happened.
306    
307 root 1.40 =item terminate [arg...]
308 root 1.7
309 root 1.92 Terminates the current coroutine with the given status values (see L<cancel>).
310 root 1.13
311 root 1.1 =cut
312    
# terminate [@status]: cancel the current coroutine, passing the arguments
# on to cancel as its status values.
313 root 1.8 sub terminate {
314 pcg 1.59 $current->cancel (@_);
315 root 1.1 }
316 root 1.6
317 root 1.8 =back
318    
319     # dynamic methods
320    
321 root 1.92 =head2 COROUTINE METHODS
322 root 1.8
323 root 1.92 These are the methods you can call on coroutine objects.
324 root 1.6
325 root 1.8 =over 4
326    
327 root 1.13 =item new Coro \&sub [, @args...]
328 root 1.8
329 root 1.92 Create a new coroutine and return it. When the sub returns the coroutine
330 root 1.40 automatically terminates as if C<terminate> with the returned values were
331 root 1.92 called. To make the coroutine run you must first put it into the ready queue
332 root 1.41 by calling the ready method.
333 root 1.13
334 root 1.121 See C<async> for additional discussion.
335 root 1.89
336 root 1.6 =cut
337    
# Entry point passed to the parent constructor by new(): removes the first
# argument (a code ref) from @_, calls it with the remaining @_ (the
# parenless &-call form passes the current @_ along) and hands whatever it
# returns to terminate.
338 root 1.94 sub _run_coro {
339 root 1.13 terminate &{+shift};
340     }
341    
# Constructor: delegates to the parent class constructor, passing _run_coro
# first and then the caller's arguments (code ref and its arguments).
# Returns whatever SUPER::new returns.
342 root 1.8 sub new {
343     my $class = shift;
344 root 1.83
345 root 1.94 $class->SUPER::new (\&_run_coro, @_)
346 root 1.8 }
347 root 1.6
348 root 1.92 =item $success = $coroutine->ready
349 root 1.1
350 root 1.92 Put the given coroutine into the ready queue (according to its priority)
351     and return true. If the coroutine is already in the ready queue, do nothing
352 root 1.90 and return false.
353 root 1.1
354 root 1.92 =item $is_ready = $coroutine->is_ready
355 root 1.90
356 root 1.92 Return whether the coroutine is currently in the ready queue or not.
357 root 1.28
358 root 1.92 =item $coroutine->cancel (arg...)
359 root 1.28
360 root 1.92 Terminates the given coroutine and makes it return the given arguments as
361 root 1.103 status (default: the empty list). Never returns if the coroutine is the
362     current coroutine.
363 root 1.28
364     =cut
365    
# $coro->cancel (@status): record @status as the coroutine's status and
# destroy it. A coroutine other than the current one is destroyed directly
# via _cancel; the current one is queued for the manager coroutine instead.
366     sub cancel {
367 pcg 1.59 my $self = shift;
        # stored as an array ref; read by join and passed to destroy callbacks
368     $self->{status} = [@_];
369 root 1.103
370     if ($current == $self) {
        # cancelling ourselves: queue for the manager and wake it up
371     push @destroy, $self;
372     $manager->ready;
        # keep yielding forever; this branch never returns to the caller
373     &schedule while 1;
374     } else {
375     $self->_cancel;
376     }
377 root 1.40 }
378    
379 root 1.92 =item $coroutine->join
380 root 1.40
381     Wait until the coroutine terminates and return any values given to the
382 pcg 1.59 C<terminate> or C<cancel> functions. C<join> can be called multiple times
383 root 1.92 from multiple coroutines.
384 root 1.40
385     =cut
386    
# $coro->join: block the calling coroutine until $coro has a status, then
# return it - all status values in list context, the first one in scalar
# context.
387     sub join {
388     my $self = shift;
389 root 1.103
        # {status} is set by cancel; while it is unset we have to wait
390 root 1.40 unless ($self->{status}) {
        # lexical copy of the calling coroutine; the callback below clears it
        # to signal that the destroy callbacks have run
391 root 1.103 my $current = $current;
392    
393     push @{$self->{destroy_cb}}, sub {
394     $current->ready;
395     undef $current;
396     };
397    
        # sleep until the callback has fired; wake-ups for any other reason
        # leave the lexical set, so we go back to sleep
398     &schedule while $current;
399 root 1.40 }
400 root 1.103
401 root 1.40 wantarray ? @{$self->{status}} : $self->{status}[0];
402 root 1.31 }
403    
404 root 1.101 =item $coroutine->on_destroy (\&cb)
405    
406     Registers a callback that is called when this coroutine gets destroyed,
407     but before it is joined. The callback gets passed the terminate arguments,
408     if any.
409    
410     =cut
411    
# $coro->on_destroy ($cb): append $cb to the coroutine's list of destroy
# callbacks, which _cancel invokes with the status values.
412     sub on_destroy {
413     my ($self, $cb) = @_;
414    
        # the list is created on first use (autovivification)
415     push @{ $self->{destroy_cb} }, $cb;
416     }
417    
418 root 1.92 =item $oldprio = $coroutine->prio ($newprio)
419 root 1.31
420 root 1.41 Sets (or gets, if the argument is missing) the priority of the
421 root 1.92 coroutine. Higher priority coroutines get run before lower priority
422     coroutines. Priorities are small signed integers (currently -4 .. +3),
423 root 1.41 that you can refer to using PRIO_xxx constants (use the import tag :prio
424     to get them):
425 root 1.31
426     PRIO_MAX > PRIO_HIGH > PRIO_NORMAL > PRIO_LOW > PRIO_IDLE > PRIO_MIN
427     3 > 1 > 0 > -1 > -3 > -4
428    
429     # set priority to HIGH
430     current->prio(PRIO_HIGH);
431    
432     The idle coroutine ($Coro::idle) always has a lower priority than any
433     existing coroutine.
434    
435 root 1.92 Changing the priority of the current coroutine will take effect immediately,
436     but changing the priority of coroutines in the ready queue (but not
437 root 1.31 running) will only take effect after the next schedule (of that
438 root 1.92 coroutine). This is a bug that will be fixed in some future version.
439 root 1.31
440 root 1.92 =item $newprio = $coroutine->nice ($change)
441 root 1.31
442     Similar to C<prio>, but subtract the given value from the priority (i.e.
443     higher values mean lower priority, just as in unix).
444    
445 root 1.92 =item $olddesc = $coroutine->desc ($newdesc)
446 root 1.41
447     Sets (or gets in case the argument is missing) the description for this
448 root 1.92 coroutine. This is just a free-form string you can associate with a coroutine.
449 root 1.41
450     =cut
451    
# $coro->desc ([$newdesc]): return the current description; when an argument
# is supplied, store it as the new description first (an explicit undef
# counts as supplied). The returned value is always the previous one.
452     sub desc {
453     my $old = $_[0]{desc};
454     $_[0]{desc} = $_[1] if @_ > 1;
455     $old;
456 root 1.8 }
457 root 1.1
458 root 1.8 =back
459 root 1.2
460 root 1.97 =head2 GLOBAL FUNCTIONS
461 root 1.92
462     =over 4
463    
464 root 1.97 =item Coro::nready
465    
466     Returns the number of coroutines that are currently in the ready state,
467 root 1.124 i.e. that can be switched to. The value C<0> means that the only runnable
468 root 1.97 coroutine is the currently running one, so C<cede> would have no effect,
469     and C<schedule> would cause a deadlock unless there is an idle handler
470     that wakes up some coroutines.
471    
472 root 1.103 =item my $guard = Coro::guard { ... }
473    
474 root 1.119 This creates and returns a guard object. Nothing happens until the object
475 root 1.103 gets destroyed, in which case the codeblock given as argument will be
476     executed. This is useful to free locks or other resources in case of a
477     runtime error or when the coroutine gets canceled, as in both cases the
478     guard block will be executed. The guard object supports only one method,
479     C<< ->cancel >>, which will keep the codeblock from being executed.
480    
481     Example: set some flag and clear it again when the coroutine gets canceled
482     or the function returns:
483    
484     sub do_something {
485     my $guard = Coro::guard { $busy = 0 };
486     $busy = 1;
487    
488     # do something that requires $busy to be true
489     }
490    
491     =cut
492    
# guard BLOCK: return a Coro::guard object, implemented as a blessed
# reference to a fresh scalar that holds the given code ref.
493     sub guard(&) {
494     bless \(my $cb = $_[0]), "Coro::guard"
495     }
496    
# $guard->cancel: replace the stored code ref with an empty sub, so that
# DESTROY no longer runs the original block.
497     sub Coro::guard::cancel {
498     ${$_[0]} = sub { };
499     }
500    
# Destructor: call the code ref stored in the guard, without arguments.
501     sub Coro::guard::DESTROY {
502     ${$_[0]}->();
503     }
504    
505    
506 root 1.92 =item unblock_sub { ... }
507    
508     This utility function takes a BLOCK or code reference and "unblocks" it,
509     returning the new coderef. This means that the new coderef will return
510     immediately without blocking, returning nothing, while the original code
511     ref will be called (with parameters) from within its own coroutine.
512    
513 root 1.124 The reason this function exists is that many event libraries (such as the
514 root 1.92 venerable L<Event|Event> module) are not coroutine-safe (a weaker form
515     of thread-safety). This means you must not block within event callbacks,
516     otherwise you might suffer from crashes or worse.
517    
518     This function allows your callbacks to block by executing them in another
519     coroutine where it is safe to block. One example where blocking is handy
520     is when you use the L<Coro::AIO|Coro::AIO> functions to save results to
521     disk.
522    
523     In short: simply use C<unblock_sub { ... }> instead of C<sub { ... }> when
524     creating event callbacks that want to block.
525    
526     =cut
527    
# Pending unblocked calls. Each entry is an array ref [$cb, @args];
# unblock_sub unshifts entries and the scheduler below pops them, so they
# are started in the order they were queued.
528     our @unblock_queue;
529    
530 root 1.105 # we create a special coro because we want to cede,
531     # to reduce pressure on the coro pool (because most callbacks
532     # return immediately and can be reused) and because we cannot cede
533     # inside an event callback.
534 root 1.92 our $unblock_scheduler = async {
        # endless loop: this coroutine never returns
535     while () {
536     while (my $cb = pop @unblock_queue) {
537 root 1.105 # this is an inlined copy of async_pool
538     my $coro = (pop @pool or new Coro \&pool_handler);
539    
        # $cb is already the [code ref, args...] array ref that
        # pool_handler expects to find in {_invoke}
540     $coro->{_invoke} = $cb;
541     $coro->ready;
542     cede; # for short-lived callbacks, this reduces pressure on the coro pool
543 root 1.92 }
544 root 1.105 schedule; # sleep well
545 root 1.92 }
546     };
547    
# unblock_sub BLOCK: return a wrapper code ref. Calling the wrapper does not
# run the block; it queues the block together with the call's arguments and
# readies $unblock_scheduler, which later starts it in a pooled coroutine.
548     sub unblock_sub(&) {
549     my $cb = shift;
550    
551     sub {
        # queue the job in the [code ref, args...] form the scheduler expects
552 root 1.105 unshift @unblock_queue, [$cb, @_];
        # the wrapper's return value is whatever ->ready returns
553 root 1.92 $unblock_scheduler->ready;
554     }
555     }
556    
557     =back
558    
559 root 1.8 =cut
560 root 1.2
561 root 1.8 1;
562 root 1.14
563 root 1.17 =head1 BUGS/LIMITATIONS
564 root 1.14
565 root 1.52 - you must make very sure that no coro is still active on global
566 root 1.53 destruction. very bad things might happen otherwise (usually segfaults).
567 root 1.52
568     - this module is not thread-safe. You should only ever use this module
569 root 1.124 from the same thread (this requirement might be loosened in the future
570 root 1.52 to allow per-thread schedulers, but Coro::State does not yet allow
571     this).
572 root 1.9
573     =head1 SEE ALSO
574    
575 root 1.67 Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
576    
577     Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
578    
579     Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
580    
581     Embedding: L<Coro::MakeMaker>
582 root 1.1
583     =head1 AUTHOR
584    
585 root 1.66 Marc Lehmann <schmorp@schmorp.de>
586 root 1.64 http://home.schmorp.de/
587 root 1.1
588     =cut
589