ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
(Generate patch)

Comparing Coro/Coro.pm (file contents):
Revision 1.54 by pcg, Sun Sep 28 09:00:48 2003 UTC vs.
Revision 1.78 by root, Wed Nov 1 01:21:21 2006 UTC

30 30
31=cut 31=cut
32 32
33package Coro; 33package Coro;
34 34
35use strict;
35no warnings qw(uninitialized); 36no warnings "uninitialized";
36 37
37use Coro::State; 38use Coro::State;
38 39
39use base Exporter; 40use base Exporter::;
40 41
41$VERSION = 0.7; 42our $idle; # idle coroutine
43our $main; # main coroutine
44our $current; # current coroutine
42 45
46our $VERSION = '2.1';
47
43@EXPORT = qw(async cede schedule terminate current); 48our @EXPORT = qw(async cede schedule terminate current);
44%EXPORT_TAGS = ( 49our %EXPORT_TAGS = (
45 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)], 50 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
46); 51);
47@EXPORT_OK = @{$EXPORT_TAGS{prio}}; 52our @EXPORT_OK = @{$EXPORT_TAGS{prio}};
48 53
49{ 54{
50 my @async; 55 my @async;
51 my $init; 56 my $init;
52 57
53 # this way of handling attributes simply is NOT scalable ;() 58 # this way of handling attributes simply is NOT scalable ;()
54 sub import { 59 sub import {
60 no strict 'refs';
61
55 Coro->export_to_level(1, @_); 62 Coro->export_to_level(1, @_);
63
56 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE}; 64 my $old = *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"}{CODE};
57 *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub { 65 *{(caller)[0]."::MODIFY_CODE_ATTRIBUTES"} = sub {
58 my ($package, $ref) = (shift, shift); 66 my ($package, $ref) = (shift, shift);
59 my @attrs; 67 my @attrs;
60 for (@_) { 68 for (@_) {
83 91
84This coroutine represents the main program. 92This coroutine represents the main program.
85 93
86=cut 94=cut
87 95
88our $main = new Coro; 96$main = new Coro;
89 97
90=item $current (or as function: current) 98=item $current (or as function: current)
91 99
92The current coroutine (the last coroutine switched to). The initial value is C<$main> (of course). 100The current coroutine (the last coroutine switched to). The initial value is C<$main> (of course).
93 101
96# maybe some other module used Coro::Specific before... 104# maybe some other module used Coro::Specific before...
97if ($current) { 105if ($current) {
98 $main->{specific} = $current->{specific}; 106 $main->{specific} = $current->{specific};
99} 107}
100 108
101our $current = $main; 109$current = $main;
102 110
103sub current() { $current } 111sub current() { $current }
104 112
105=item $idle 113=item $idle
106 114
108implementation prints "FATAL: deadlock detected" and exits. 116implementation prints "FATAL: deadlock detected" and exits.
109 117
110=cut 118=cut
111 119
112# should be done using priorities :( 120# should be done using priorities :(
113our $idle = new Coro sub { 121$idle = new Coro sub {
114 print STDERR "FATAL: deadlock detected\n"; 122 print STDERR "FATAL: deadlock detected\n";
115 exit(51); 123 exit(51);
116}; 124};
117 125
118# this coroutine is necessary because a coroutine 126# this coroutine is necessary because a coroutine
119# cannot destroy itself. 127# cannot destroy itself.
120my @destroy; 128my @destroy;
121my $manager; 129my $manager;
122$manager = new Coro sub { 130$manager = new Coro sub {
123 while() { 131 while () {
124 # by overwriting the state object with the manager we destroy it 132 # by overwriting the state object with the manager we destroy it
125 # while still being able to schedule this coroutine (in case it has 133 # while still being able to schedule this coroutine (in case it has
126 # been readied multiple times. this is harmless since the manager 134 # been readied multiple times. this is harmless since the manager
127 # can be called as many times as neccessary and will always 135 # can be called as many times as neccessary and will always
128 # remove itself from the runqueue 136 # remove itself from the runqueue
129 while (@destroy) { 137 while (@destroy) {
130 my $coro = pop @destroy; 138 my $coro = pop @destroy;
131 $coro->{status} ||= []; 139 $coro->{status} ||= [];
132 $_->ready for @{delete $coro->{join} || []}; 140 $_->ready for @{delete $coro->{join} || []};
141
142 # the next line destroys the _coro_state, but keeps the
143 # process itself intact (we basically make it a zombie
144 # process that always runs the manager thread, so it's possible
145 # to transfer() to this process).
133 $coro->{_coro_state} = $manager->{_coro_state}; 146 $coro->{_coro_state} = $manager->{_coro_state};
134 } 147 }
135 &schedule; 148 &schedule;
136 } 149 }
137}; 150};
155 # create a new coroutine that just prints its arguments 168 # create a new coroutine that just prints its arguments
156 async { 169 async {
157 print "@_\n"; 170 print "@_\n";
158 } 1,2,3,4; 171 } 1,2,3,4;
159 172
160The coderef you submit MUST NOT be a closure that refers to variables
161in an outer scope. This does NOT work. Pass arguments into it instead.
162
163=cut 173=cut
164 174
165sub async(&@) { 175sub async(&@) {
166 my $pid = new Coro @_; 176 my $pid = new Coro @_;
167 $manager->ready; # this ensures that the stack is cloned from the manager 177 $manager->ready; # this ensures that the stack is cloned from the manager
185 195
186=cut 196=cut
187 197
188=item terminate [arg...] 198=item terminate [arg...]
189 199
190Terminates the current process. 200Terminates the current process with the given status values (see L<cancel>).
191
192Future versions of this function will allow result arguments.
193 201
194=cut 202=cut
195 203
196sub terminate { 204sub terminate {
197 $current->{status} = [@_];
198 $current->cancel; 205 $current->cancel (@_);
199 &schedule;
200 die; # NORETURN
201} 206}
202 207
203=back 208=back
204 209
205# dynamic methods 210# dynamic methods
234 239
235Put the given process into the ready queue. 240Put the given process into the ready queue.
236 241
237=cut 242=cut
238 243
239=item $process->cancel 244=item $process->cancel (arg...)
240 245
241Like C<terminate>, but terminates the specified process instead. 246Temrinates the given process and makes it return the given arguments as
247status (default: the empty list).
242 248
243=cut 249=cut
244 250
245sub cancel { 251sub cancel {
252 my $self = shift;
253 $self->{status} = [@_];
246 push @destroy, $_[0]; 254 push @destroy, $self;
247 $manager->ready; 255 $manager->ready;
248 &schedule if $current == $_[0]; 256 &schedule if $current == $self;
249} 257}
250 258
251=item $process->join 259=item $process->join
252 260
253Wait until the coroutine terminates and return any values given to the 261Wait until the coroutine terminates and return any values given to the
254C<terminate> function. C<join> can be called multiple times from multiple 262C<terminate> or C<cancel> functions. C<join> can be called multiple times
255processes. 263from multiple processes.
256 264
257=cut 265=cut
258 266
259sub join { 267sub join {
260 my $self = shift; 268 my $self = shift;
335 to allow per-thread schedulers, but Coro::State does not yet allow 343 to allow per-thread schedulers, but Coro::State does not yet allow
336 this). 344 this).
337 345
338=head1 SEE ALSO 346=head1 SEE ALSO
339 347
340L<Coro::Channel>, L<Coro::Cont>, L<Coro::Specific>, L<Coro::Semaphore>, 348Support/Utility: L<Coro::Cont>, L<Coro::Specific>, L<Coro::State>, L<Coro::Util>.
341L<Coro::Signal>, L<Coro::State>, L<Coro::Timer>, L<Coro::Event>, 349
342L<Coro::L<Coro::RWLock>, Handle>, L<Coro::Socket>. 350Locking/IPC: L<Coro::Signal>, L<Coro::Channel>, L<Coro::Semaphore>, L<Coro::SemaphoreSet>, L<Coro::RWLock>.
351
352Event/IO: L<Coro::Timer>, L<Coro::Event>, L<Coro::Handle>, L<Coro::Socket>, L<Coro::Select>.
353
354Embedding: L<Coro:MakeMaker>
343 355
344=head1 AUTHOR 356=head1 AUTHOR
345 357
346 Marc Lehmann <pcg@goof.com> 358 Marc Lehmann <schmorp@schmorp.de>
347 http://www.goof.com/pcg/marc/ 359 http://home.schmorp.de/
348 360
349=cut 361=cut
350 362

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines