ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro.pm
(Generate patch)

Comparing Coro/Coro.pm (file contents):
Revision 1.222 by root, Tue Nov 18 08:59:46 2008 UTC vs.
Revision 1.233 by root, Fri Nov 21 06:02:07 2008 UTC

67 67
68our $idle; # idle handler 68our $idle; # idle handler
69our $main; # main coroutine 69our $main; # main coroutine
70our $current; # current coroutine 70our $current; # current coroutine
71 71
72our $VERSION = 5.0; 72our $VERSION = "5.0";
73 73
74our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub); 74our @EXPORT = qw(async async_pool cede schedule terminate current unblock_sub);
75our %EXPORT_TAGS = ( 75our %EXPORT_TAGS = (
76 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)], 76 prio => [qw(PRIO_MAX PRIO_HIGH PRIO_NORMAL PRIO_LOW PRIO_IDLE PRIO_MIN)],
77); 77);
135$idle = sub { 135$idle = sub {
136 require Carp; 136 require Carp;
137 Carp::croak ("FATAL: deadlock detected"); 137 Carp::croak ("FATAL: deadlock detected");
138}; 138};
139 139
140sub _cancel {
141 my ($self) = @_;
142
143 # free coroutine data and mark as destructed
144 $self->_destroy
145 or return;
146
147 # call all destruction callbacks
148 $_->(@{$self->{_status}})
149 for @{ delete $self->{_on_destroy} || [] };
150}
151
152# this coroutine is necessary because a coroutine 140# this coroutine is necessary because a coroutine
153# cannot destroy itself. 141# cannot destroy itself.
154my @destroy; 142our @destroy;
155my $manager; 143our $manager;
156 144
157$manager = new Coro sub { 145$manager = new Coro sub {
158 while () { 146 while () {
159 (shift @destroy)->_cancel 147 Coro::_cancel shift @destroy
160 while @destroy; 148 while @destroy;
161 149
162 &schedule; 150 &schedule;
163 } 151 }
164}; 152};
212Similar to C<async>, but uses a coroutine pool, so you should not call 200Similar to C<async>, but uses a coroutine pool, so you should not call
213terminate or join on it (although you are allowed to), and you get a 201terminate or join on it (although you are allowed to), and you get a
214coroutine that might have executed other code already (which can be good 202coroutine that might have executed other code already (which can be good
215or bad :). 203or bad :).
216 204
217On the plus side, this function is faster than creating (and destroying) 205On the plus side, this function is about twice as fast as creating (and
218a completly new coroutine, so if you need a lot of generic coroutines in 206destroying) a completely new coroutine, so if you need a lot of generic
219quick successsion, use C<async_pool>, not C<async>. 207coroutines in quick successsion, use C<async_pool>, not C<async>.
220 208
221The code block is executed in an C<eval> context and a warning will be 209The code block is executed in an C<eval> context and a warning will be
222issued in case of an exception instead of terminating the program, as 210issued in case of an exception instead of terminating the program, as
223C<async> does. As the coroutine is being reused, stuff like C<on_destroy> 211C<async> does. As the coroutine is being reused, stuff like C<on_destroy>
224will not work in the expected way, unless you call terminate or cancel, 212will not work in the expected way, unless you call terminate or cancel,
237coros as required. 225coros as required.
238 226
239If you are concerned about pooled coroutines growing a lot because a 227If you are concerned about pooled coroutines growing a lot because a
240single C<async_pool> used a lot of stackspace you can e.g. C<async_pool 228single C<async_pool> used a lot of stackspace you can e.g. C<async_pool
241{ terminate }> once per second or so to slowly replenish the pool. In 229{ terminate }> once per second or so to slowly replenish the pool. In
242addition to that, when the stacks used by a handler grows larger than 16kb 230addition to that, when the stacks used by a handler grows larger than 32kb
243(adjustable via $Coro::POOL_RSS) it will also be destroyed. 231(adjustable via $Coro::POOL_RSS) it will also be destroyed.
244 232
245=cut 233=cut
246 234
247our $POOL_SIZE = 8; 235our $POOL_SIZE = 8;
248our $POOL_RSS = 16 * 1024; 236our $POOL_RSS = 32 * 1024;
249our @async_pool; 237our @async_pool;
250 238
251sub pool_handler { 239sub pool_handler {
252 my $cb;
253
254 while () { 240 while () {
255 eval { 241 eval {
256 while () { 242 &{&_pool_handler} while 1;
257 _pool_1 $cb;
258 &$cb;
259 _pool_2 $cb;
260 &schedule;
261 }
262 }; 243 };
263 244
264 if ($@) {
265 last if $@ eq "\3async_pool terminate\2\n";
266 warn $@; 245 warn $@ if $@;
267 }
268 } 246 }
269}
270
271sub async_pool(&@) {
272 # this is also inlined into the unblock_scheduler
273 my $coro = (pop @async_pool) || new Coro \&pool_handler;
274
275 $coro->{_invoke} = [@_];
276 $coro->ready;
277
278 $coro
279} 247}
280 248
281=back 249=back
282 250
283=head2 STATIC METHODS 251=head2 STATIC METHODS
305>> on that once some event happens, and last you call C<schedule> to put 273>> on that once some event happens, and last you call C<schedule> to put
306yourself to sleep. Note that a lot of things can wake your coroutine up, 274yourself to sleep. Note that a lot of things can wake your coroutine up,
307so you need to check whether the event indeed happened, e.g. by storing the 275so you need to check whether the event indeed happened, e.g. by storing the
308status in a variable. 276status in a variable.
309 277
310The canonical way to wait on external events is this: 278See B<HOW TO WAIT FOR A CALLBACK>, below, for some ways to wait for callbacks.
311
312 {
313 # remember current coroutine
314 my $current = $Coro::current;
315
316 # register a hypothetical event handler
317 on_event_invoke sub {
318 # wake up sleeping coroutine
319 $current->ready;
320 undef $current;
321 };
322
323 # call schedule until event occurred.
324 # in case we are woken up for other reasons
325 # (current still defined), loop.
326 Coro::schedule while $current;
327 }
328 279
329=item cede 280=item cede
330 281
331"Cede" to other coroutines. This function puts the current coroutine into 282"Cede" to other coroutines. This function puts the current coroutine into
332the ready queue and calls C<schedule>, which has the effect of giving 283the ready queue and calls C<schedule>, which has the effect of giving
356you cannot free all of them, so if a coroutine that is not the main 307you cannot free all of them, so if a coroutine that is not the main
357program calls this function, there will be some one-time resource leak. 308program calls this function, there will be some one-time resource leak.
358 309
359=cut 310=cut
360 311
361sub terminate {
362 $current->cancel (@_);
363}
364
365sub killall { 312sub killall {
366 for (Coro::State::list) { 313 for (Coro::State::list) {
367 $_->cancel 314 $_->cancel
368 if $_ != $current && UNIVERSAL::isa $_, "Coro"; 315 if $_ != $current && UNIVERSAL::isa $_, "Coro";
369 } 316 }
388See C<async> and C<Coro::State::new> for additional info about the 335See C<async> and C<Coro::State::new> for additional info about the
389coroutine environment. 336coroutine environment.
390 337
391=cut 338=cut
392 339
393sub _run_coro { 340sub _terminate {
394 terminate &{+shift}; 341 terminate &{+shift};
395}
396
397sub new {
398 my $class = shift;
399
400 $class->SUPER::new (\&_run_coro, @_)
401} 342}
402 343
403=item $success = $coroutine->ready 344=item $success = $coroutine->ready
404 345
405Put the given coroutine into the end of its ready queue (there is one 346Put the given coroutine into the end of its ready queue (there is one
422 363
423=cut 364=cut
424 365
425sub cancel { 366sub cancel {
426 my $self = shift; 367 my $self = shift;
427 $self->{_status} = [@_];
428 368
429 if ($current == $self) { 369 if ($current == $self) {
430 push @destroy, $self; 370 terminate @_;
431 $manager->ready;
432 &schedule while 1;
433 } else { 371 } else {
372 $self->{_status} = [@_];
434 $self->_cancel; 373 $self->_cancel;
435 } 374 }
436} 375}
376
377=item $coroutine->schedule_to
378
379Puts the current coroutine to sleep (like C<Coro::schedule>), but instead
380of continuing with the next coro from the ready queue, always switch to
381the given coroutine object (regardless of priority etc.). The readyness
382state of that coroutine isn't changed.
383
384This is an advanced method for special cases - I'd love to hear about any
385uses for this one.
386
387=item $coroutine->cede_to
388
389Like C<schedule_to>, but puts the current coroutine into the ready
390queue. This has the effect of temporarily switching to the given
391coroutine, and continuing some time later.
392
393This is an advanced method for special cases - I'd love to hear about any
394uses for this one.
437 395
438=item $coroutine->throw ([$scalar]) 396=item $coroutine->throw ([$scalar])
439 397
440If C<$throw> is specified and defined, it will be thrown as an exception 398If C<$throw> is specified and defined, it will be thrown as an exception
441inside the coroutine at the next convenient point in time. Otherwise 399inside the coroutine at the next convenient point in time. Otherwise
442clears the exception object. 400clears the exception object.
443 401
444Coro will check for the exception each time a schedule-like-function 402Coro will check for the exception each time a schedule-like-function
445returns, i.e. after each C<schedule>, C<cede>, C<< Coro::Semaphore->down 403returns, i.e. after each C<schedule>, C<cede>, C<< Coro::Semaphore->down
446>>, C<< Coro::Handle->readable >> and so on. Note that this means that 404>>, C<< Coro::Handle->readable >> and so on. Most of these functions
447when a coroutine is acquiring a lock, it might only throw after it has 405detect this case and return early in case an exception is pending.
448sucessfully acquired it.
449 406
450The exception object will be thrown "as is" with the specified scalar in 407The exception object will be thrown "as is" with the specified scalar in
451C<$@>, i.e. if it is a string, no line number or newline will be appended 408C<$@>, i.e. if it is a string, no line number or newline will be appended
452(unlike with C<die>). 409(unlike with C<die>).
453 410
539 496
540sub desc { 497sub desc {
541 my $old = $_[0]{desc}; 498 my $old = $_[0]{desc};
542 $_[0]{desc} = $_[1] if @_ > 1; 499 $_[0]{desc} = $_[1] if @_ > 1;
543 $old; 500 $old;
501}
502
503sub transfer {
504 require Carp;
505 Carp::croak ("You must not call ->transfer on Coro objects. Use Coro::State objects or the ->schedule_to method. Caught");
544} 506}
545 507
546=back 508=back
547 509
548=head2 GLOBAL FUNCTIONS 510=head2 GLOBAL FUNCTIONS
633# return immediately and can be reused) and because we cannot cede 595# return immediately and can be reused) and because we cannot cede
634# inside an event callback. 596# inside an event callback.
635our $unblock_scheduler = new Coro sub { 597our $unblock_scheduler = new Coro sub {
636 while () { 598 while () {
637 while (my $cb = pop @unblock_queue) { 599 while (my $cb = pop @unblock_queue) {
638 # this is an inlined copy of async_pool 600 &async_pool (@$cb);
639 my $coro = (pop @async_pool) || new Coro \&pool_handler;
640 601
641 $coro->{_invoke} = $cb;
642 $coro->ready;
643 cede; # for short-lived callbacks, this reduces pressure on the coro pool 602 # for short-lived callbacks, this reduces pressure on the coro pool
603 # as the chance is very high that the async_poll coro will be back
604 # in the idle state when cede returns
605 cede;
644 } 606 }
645 schedule; # sleep well 607 schedule; # sleep well
646 } 608 }
647}; 609};
648$unblock_scheduler->{desc} = "[unblock_sub scheduler]"; 610$unblock_scheduler->{desc} = "[unblock_sub scheduler]";
654 unshift @unblock_queue, [$cb, @_]; 616 unshift @unblock_queue, [$cb, @_];
655 $unblock_scheduler->ready; 617 $unblock_scheduler->ready;
656 } 618 }
657} 619}
658 620
621=item $cb = Coro::rouse_cb
622
623Create and return a "rouse callback". That's a code reference that, when
624called, will save its arguments and notify the owner coroutine of the
625callback.
626
627See the next function.
628
629=item @args = Coro::rouse_wait [$cb]
630
631Wait for the specified rouse callback (or the last one tht was created in
632this coroutine).
633
634As soon as the callback is invoked (or when the calback was invoked before
635C<rouse_wait>), it will return a copy of the arguments originally passed
636to the rouse callback.
637
638See the section B<HOW TO WAIT FOR A CALLBACK> for an actual usage example.
639
659=back 640=back
660 641
661=cut 642=cut
662 643
6631; 6441;
645
646=head1 HOW TO WAIT FOR A CALLBACK
647
648It is very common for a coroutine to wait for some callback to be
649called. This occurs naturally when you use coroutines in an otherwise
650event-based program, or when you use event-based libraries.
651
652These typically register a callback for some event, and call that callback
653when the event occured. In a coroutine, however, you typically want to
654just wait for the event, simplyifying things.
655
656For example C<< AnyEvent->child >> registers a callback to be called when
657a specific child has exited:
658
659 my $child_watcher = AnyEvent->child (pid => $pid, cb => sub { ... });
660
661But from withina coroutine, you often just want to write this:
662
663 my $status = wait_for_child $pid;
664
665Coro offers two functions specifically designed to make this easy,
666C<Coro::rouse_cb> and C<Coro::rouse_wait>.
667
668The first function, C<rouse_cb>, generates and returns a callback that,
669when invoked, will save it's arguments and notify the coroutine that
670created the callback.
671
672The second function, C<rouse_wait>, waits for the callback to be called
673(by calling C<schedule> to go to sleep) and returns the arguments
674originally passed to the callback.
675
676Using these functions, it becomes easy to write the C<wait_for_child>
677function mentioned above:
678
679 sub wait_for_child($) {
680 my ($pid) = @_;
681
682 my $watcher = AnyEvent->child (pid => $pid, cb => Coro::rouse_cb);
683
684 my ($rpid, $rstatus) = Coro::rouse_wait;
685 $rstatus
686 }
687
688In the case where C<rouse_cb> and C<rouse_wait> are not flexible enough,
689you can roll your own, using C<schedule>:
690
691 sub wait_for_child($) {
692 my ($pid) = @_;
693
694 # store the current coroutine in $current,
695 # and provide result variables for the closure passed to ->child
696 my $current = $Coro::current;
697 my ($done, $rstatus);
698
699 # pass a closure to ->child
700 my $watcher = AnyEvent->child (pid => $pid, cb => sub {
701 $rstatus = $_[1]; # remember rstatus
702 $done = 1; # mark $rstatus as valud
703 });
704
705 # wait until the closure has been called
706 schedule while !$done;
707
708 $rstatus
709 }
710
664 711
665=head1 BUGS/LIMITATIONS 712=head1 BUGS/LIMITATIONS
666 713
667=over 4 714=over 4
668 715

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines