--- Coro/Coro/State.xs 2008/11/08 04:52:01 1.255 +++ Coro/Coro/State.xs 2008/11/14 06:29:52 1.267 @@ -48,9 +48,6 @@ #if CORO_USE_VALGRIND # include -# define REGISTER_STACK(cctx,start,end) (cctx)->valgrind_id = VALGRIND_STACK_REGISTER ((start), (end)) -#else -# define REGISTER_STACK(cctx,start,end) #endif /* the maximum number of idle cctx that will be pooled */ @@ -128,12 +125,12 @@ #if __GNUC__ >= 3 # define attribute(x) __attribute__(x) -# define BARRIER __asm__ __volatile__ ("" : : : "memory") # define expect(expr,value) __builtin_expect ((expr),(value)) +# define INLINE static inline #else # define attribute(x) -# define BARRIER # define expect(expr,value) (expr) +# define INLINE static #endif #define expect_false(expr) expect ((expr) != 0, 0) @@ -144,14 +141,26 @@ #include "CoroAPI.h" #ifdef USE_ITHREADS + static perl_mutex coro_lock; # define LOCK do { MUTEX_LOCK (&coro_lock); } while (0) # define UNLOCK do { MUTEX_UNLOCK (&coro_lock); } while (0) +# if CORO_PTHREAD +static void *coro_thx; +# endif + #else + # define LOCK (void)0 # define UNLOCK (void)0 + #endif +# undef LOCK +# define LOCK (void)0 +# undef UNLOCK +# define UNLOCK (void)0 + /* helper storage struct for Coro::AIO */ struct io_state { @@ -171,6 +180,12 @@ static JMPENV *main_top_env; static HV *coro_state_stash, *coro_stash; static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */ +static volatile struct coro *transfer_next; + +struct transfer_args +{ + struct coro *prev, *next; +}; static GV *irsgv; /* $/ */ static GV *stdoutgv; /* *STDOUT */ @@ -282,7 +297,7 @@ /* for Coro.pm */ static SV *coro_current; static SV *coro_readyhook; -static AV *coro_ready [PRIO_MAX-PRIO_MIN+1]; +static AV *coro_ready [PRIO_MAX - PRIO_MIN + 1]; static int coro_nready; static struct coro *coro_first; @@ -397,7 +412,7 @@ #define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv) #define CORO_MAGIC_state(sv) CORO_MAGIC (((SV *)(sv)), CORO_MAGIC_type_state) -static struct coro * +INLINE struct coro * SvSTATE_ (pTHX_ SV *coro) { HV *stash; @@ -698,6 +713,33 @@ return rss; } +/** set stacklevel support **************************************************/ + +/* we sometimes need to create the effect of pp_set_stacklevel calling us */ +#define SSL_HEAD (void)0 +/* we sometimes need to create the effect of leaving via pp_set_stacklevel */ +#define SSL_TAIL set_stacklevel_tail (aTHX) + +INLINE void +set_stacklevel_tail (pTHX) +{ + dSP; + SV **bot = SP; + + int gimme = GIMME_V; + + /* make sure we put something on the stack in scalar context */ + if (gimme == G_SCALAR) + { + if (sp == bot) + XPUSHs (&PL_sv_undef); + + SP = bot + 1; + } + + PUTBACK; +} + /** coroutine stack handling ************************************************/ static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg); @@ -824,9 +866,9 @@ { dSP; - LOGOP myop; + UNOP myop; - Zero (&myop, 1, LOGOP); + Zero (&myop, 1, UNOP); myop.op_next = Nullop; myop.op_flags = OPf_WANT_VOID; @@ -839,11 +881,10 @@ } /* this newly created coroutine might be run on an existing cctx which most - * likely was suspended in set_stacklevel, called from entersub. - * set_stacklevl doesn't do anything on return, but entersub does LEAVE, - * so we ENTER here for symmetry + * likely was suspended in set_stacklevel, called from pp_set_stacklevel, + * so we have to emulate entering pp_set_stacklevel here. */ - ENTER; + SSL_HEAD; } static void @@ -882,7 +923,7 @@ coro_destruct_stacks (aTHX); } -static void +INLINE void free_coro_mortal (pTHX) { if (expect_true (coro_mortal)) @@ -1016,27 +1057,35 @@ return 0; } +static void +prepare_set_stacklevel (struct transfer_args *ta, struct coro_cctx *cctx) +{ + ta->prev = (struct coro *)cctx; + ta->next = 0; +} + /* inject a fake call to Coro::State::_cctx_init into the execution */ /* _cctx_init should be careful, as it could be called at almost any time */ /* during execution of a perl program */ +/* also initialises PL_top_env */ static void NOINLINE cctx_prepare (pTHX_ coro_cctx *cctx) { dSP; - LOGOP myop; + UNOP myop; PL_top_env = &PL_start_env; if (cctx->flags & CC_TRACE) PL_runops = runops_trace; - Zero (&myop, 1, LOGOP); - myop.op_next = PL_op; + Zero (&myop, 1, UNOP); + myop.op_next = PL_op; myop.op_flags = OPf_WANT_VOID | OPf_STACKED; PUSHMARK (SP); EXTEND (SP, 2); - PUSHs (sv_2mortal (newSViv (PTR2IV (cctx)))); + PUSHs (sv_2mortal (newSViv ((IV)cctx))); PUSHs ((SV *)get_cv ("Coro::State::_cctx_init", FALSE)); PUTBACK; PL_op = (OP *)&myop; @@ -1044,49 +1093,104 @@ SPAGAIN; } +/* the tail of transfer: execute stuff we can only do after a transfer */ +INLINE void +transfer_tail (pTHX) +{ + struct coro *next = (struct coro *)transfer_next; + assert (!(transfer_next = 0)); /* just used for the side effect when asserts are enabled */ + assert (("FATAL: next coroutine was zero in transfer_tail (please report)", next)); + + free_coro_mortal (aTHX); + UNLOCK; + + if (expect_false (next->throw)) + { + SV *exception = sv_2mortal (next->throw); + + next->throw = 0; + sv_setsv (ERRSV, exception); + croak (0); + } +} + /* * this is a _very_ stripped down perl interpreter ;) */ static void cctx_run (void *arg) { - dTHX; +#ifdef USE_ITHREADS +# if CORO_PTHREAD + PERL_SET_CONTEXT (coro_thx); +# endif +#endif + { + dTHX; - /* cctx_run is the alternative tail of transfer(), so unlock here. */ - UNLOCK; + /* we are the alternative tail to pp_set_stacklevel */ + /* so do the same things here */ + SSL_TAIL; - /* we now skip the entersub that lead to transfer() */ - PL_op = PL_op->op_next; + /* we now skip the op that did lead to transfer() */ + PL_op = PL_op->op_next; - /* inject a fake subroutine call to cctx_init */ - cctx_prepare (aTHX_ (coro_cctx *)arg); + /* inject a fake subroutine call to cctx_init */ + cctx_prepare (aTHX_ (coro_cctx *)arg); - /* somebody or something will hit me for both perl_run and PL_restartop */ - PL_restartop = PL_op; - perl_run (PL_curinterp); + /* cctx_run is the alternative tail of transfer() */ + transfer_tail (aTHX); - /* - * If perl-run returns we assume exit() was being called or the coro - * fell off the end, which seems to be the only valid (non-bug) - * reason for perl_run to return. We try to exit by jumping to the - * bootstrap-time "top" top_env, as we cannot restore the "main" - * coroutine as Coro has no such concept - */ - PL_top_env = main_top_env; - JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */ + /* somebody or something will hit me for both perl_run and PL_restartop */ + PL_restartop = PL_op; + perl_run (PL_curinterp); + + /* + * If perl-run returns we assume exit() was being called or the coro + * fell off the end, which seems to be the only valid (non-bug) + * reason for perl_run to return. We try to exit by jumping to the + * bootstrap-time "top" top_env, as we cannot restore the "main" + * coroutine as Coro has no such concept + */ + PL_top_env = main_top_env; + JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */ + } } static coro_cctx * cctx_new () { coro_cctx *cctx; - void *stack_start; - size_t stack_size; ++cctx_count; - Newz (0, cctx, 1, coro_cctx); + New (0, cctx, 1, coro_cctx); + + cctx->gen = cctx_gen; + cctx->flags = 0; + cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */ + + return cctx; +} + +/* create a new cctx only suitable as source */ +static coro_cctx * +cctx_new_empty () +{ + coro_cctx *cctx = cctx_new (); + + cctx->sptr = 0; + coro_create (&cctx->cctx, 0, 0, 0, 0); - cctx->gen = cctx_gen; + return cctx; +} + +/* create a new cctx suitable as destination/running a perl interpreter */ +static coro_cctx * +cctx_new_run () +{ + coro_cctx *cctx = cctx_new (); + void *stack_start; + size_t stack_size; #if HAVE_MMAP cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE; @@ -1095,11 +1199,11 @@ if (cctx->sptr != (void *)-1) { -# if CORO_STACKGUARD - mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE); -# endif - stack_start = CORO_STACKGUARD * PAGESIZE + (char *)cctx->sptr; - stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE; + #if CORO_STACKGUARD + mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE); + #endif + stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE; + stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE; cctx->flags |= CC_MAPPED; } else @@ -1110,7 +1214,7 @@ if (!cctx->sptr) { - perror ("FATAL: unable to allocate stack for coroutine"); + perror ("FATAL: unable to allocate stack for coroutine, exiting."); _exit (EXIT_FAILURE); } @@ -1118,7 +1222,10 @@ stack_size = cctx->ssize; } - REGISTER_STACK (cctx, (char *)stack_start, (char *)stack_start + stack_size); + #if CORO_USE_VALGRIND + cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size); + #endif + coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size); return cctx; @@ -1136,9 +1243,9 @@ /* coro_transfer creates new, empty cctx's */ if (cctx->sptr) { -#if CORO_USE_VALGRIND - VALGRIND_STACK_DEREGISTER (cctx->valgrind_id); -#endif + #if CORO_USE_VALGRIND + VALGRIND_STACK_DEREGISTER (cctx->valgrind_id); + #endif #if HAVE_MMAP if (cctx->flags & CC_MAPPED) @@ -1169,13 +1276,13 @@ cctx_destroy (cctx); } - return cctx_new (); + return cctx_new_run (); } static void cctx_put (coro_cctx *cctx) { - assert (("cctx_put called on non-initialised cctx", cctx->sptr)); + assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr)); /* free another cctx if overlimit */ if (expect_false (cctx_idle >= cctx_max_idle)) @@ -1229,17 +1336,12 @@ } else if (expect_true (prev != next)) { - static volatile int has_throw; coro_cctx *prev__cctx; if (expect_false (prev->flags & CF_NEW)) { /* create a new empty/source context */ - ++cctx_count; - New (0, prev->cctx, 1, coro_cctx); - prev->cctx->sptr = 0; - coro_create (&prev->cctx->cctx, 0, 0, 0, 0); - + prev->cctx = cctx_new_empty (); prev->flags &= ~CF_NEW; prev->flags |= CF_RUNNING; } @@ -1264,7 +1366,7 @@ prev__cctx = prev->cctx; - /* possibly "free" the cctx */ + /* possibly untie and reuse the cctx */ if (expect_true ( prev__cctx->idle_sp == STACKLEVEL && !(prev__cctx->flags & CC_TRACE) @@ -1272,7 +1374,7 @@ )) { /* I assume that STACKLEVEL is a stronger indicator than PL_top_env changes */ - assert (("ERROR: current top_env must equal previous top_env", PL_top_env == prev__cctx->idle_te)); + assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == prev__cctx->idle_te)); prev->cctx = 0; @@ -1290,7 +1392,8 @@ if (expect_true (!next->cctx)) next->cctx = cctx_get (aTHX); - has_throw = !!next->throw; + assert (("FATAL: transfer_next already nonzero in Coro (please report)", !transfer_next)); + transfer_next = next; if (expect_false (prev__cctx != next->cctx)) { @@ -1299,29 +1402,10 @@ coro_transfer (&prev__cctx->cctx, &next->cctx->cctx); } - free_coro_mortal (aTHX); - UNLOCK; - - if (expect_false (has_throw)) - { - struct coro *coro = SvSTATE (coro_current); - - if (coro->throw) - { - SV *exception = coro->throw; - coro->throw = 0; - sv_setsv (ERRSV, exception); - croak (0); - } - } + transfer_tail (aTHX); } } -struct transfer_args -{ - struct coro *prev, *next; -}; - #define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx)) #define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next) @@ -1503,10 +1587,11 @@ api_is_ready (SV *coro_sv) { dTHX; + return !!(SvSTATE (coro_sv)->flags & CF_READY); } -static void +INLINE void prepare_schedule (pTHX_ struct transfer_args *ta) { SV *prev_sv, *next_sv; @@ -1542,7 +1627,7 @@ { UNLOCK; SvREFCNT_dec (next_sv); - /* coro_nready is already taken care of by destroy */ + /* coro_nready has already been taken care of by destroy */ continue; } @@ -1555,7 +1640,7 @@ prev_sv = SvRV (coro_current); ta->prev = SvSTATE (prev_sv); TRANSFER_CHECK (*ta); - assert (ta->next->flags & CF_READY); + assert (("FATAL: next coroutine isn't marked as ready in Coro (please report)", ta->next->flags & CF_READY)); ta->next->flags &= ~CF_READY; SvRV_set (coro_current, next_sv); @@ -1565,25 +1650,25 @@ UNLOCK; } -static void +INLINE void prepare_cede (pTHX_ struct transfer_args *ta) { api_ready (coro_current); prepare_schedule (aTHX_ ta); } -static int +static void prepare_cede_notself (pTHX_ struct transfer_args *ta) { + SV *prev = SvRV (coro_current); + if (coro_nready) { - SV *prev = SvRV (coro_current); prepare_schedule (aTHX_ ta); api_ready (prev); - return 1; } else - return 0; + ta->prev = ta->next = SvSTATE (prev); } static void @@ -1616,11 +1701,12 @@ static int api_cede_notself (void) { - dTHX; - struct transfer_args ta; - - if (prepare_cede_notself (aTHX_ &ta)) + if (coro_nready) { + dTHX; + struct transfer_args ta; + + prepare_cede_notself (aTHX_ &ta); TRANSFER (ta, 1); return 1; } @@ -1637,7 +1723,7 @@ if (flags & CC_TRACE) { if (!coro->cctx) - coro->cctx = cctx_new (); + coro->cctx = cctx_new_run (); else if (!(coro->cctx->flags & CC_TRACE)) croak ("cannot enable tracing on coroutine with custom stack"); @@ -1747,6 +1833,126 @@ PerlIOBuf_set_ptrcnt, }; +/*****************************************************************************/ + +static const CV *ssl_cv; /* for quick consistency check */ + +static UNOP ssl_restore; /* restore stack as entersub did, for first-re-run */ +static SV *ssl_arg0; +static SV *ssl_arg1; + +/* this restores the stack in the case we patched the entersub, to */ +/* recreate the stack frame as perl will on following calls */ +/* since entersub cleared the stack */ +static OP * +pp_restore (pTHX) +{ + dSP; + + PUSHMARK (SP); + + EXTEND (SP, 3); + if (ssl_arg0) PUSHs (sv_2mortal (ssl_arg0)), ssl_arg0 = 0; + if (ssl_arg1) PUSHs (sv_2mortal (ssl_arg1)), ssl_arg1 = 0; + PUSHs ((SV *)CvGV (ssl_cv)); + + RETURNOP (ssl_restore.op_first); +} + +#define OPpENTERSUB_SSL 15 /* the part of op_private entersub hopefully doesn't use */ + +/* declare prototype */ +XS(XS_Coro__State__set_stacklevel); + +/* + * these not obviously related functions are all rolled into one + * function to increase chances that they all will call transfer with the same + * stack offset + */ +static OP * +pp_set_stacklevel (pTHX) +{ + dSP; + struct transfer_args ta; + SV **arg = PL_stack_base + TOPMARK + 1; + int items = SP - arg; /* args without function object */ + + /* do a quick consistency check on the "function" object, and if it isn't */ + /* for us, divert to the real entersub */ + if (SvTYPE (*sp) != SVt_PVGV || CvXSUB (GvCV (*sp)) != XS_Coro__State__set_stacklevel) + return PL_ppaddr[OP_ENTERSUB](aTHX); + + /* pop args */ + SP = PL_stack_base + POPMARK; + + if (!(PL_op->op_flags & OPf_STACKED)) + { + /* ampersand-form of call, use @_ instead of stack */ + AV *av = GvAV (PL_defgv); + arg = AvARRAY (av); + items = AvFILLp (av) + 1; + } + + PUTBACK; + switch (PL_op->op_private & OPpENTERSUB_SSL) + { + case 0: + prepare_set_stacklevel (&ta, (struct coro_cctx *)SvIV (arg [0])); + break; + + case 1: + if (items != 2) + croak ("Coro::State::transfer (prev, next) expects two arguments, not %d.", items); + + prepare_transfer (aTHX_ &ta, arg [0], arg [1]); + break; + + case 2: + prepare_schedule (aTHX_ &ta); + break; + + case 3: + prepare_cede (aTHX_ &ta); + break; + + case 4: + prepare_cede_notself (aTHX_ &ta); + break; + } + + TRANSFER (ta, 0); + SPAGAIN; + +skip: + PUTBACK; + SSL_TAIL; + SPAGAIN; + RETURN; +} + +static void +coro_ssl_patch (pTHX_ CV *cv, int ix, SV **args, int items) +{ + assert (("FATAL: ssl call recursion in Coro module (please report)", PL_op->op_ppaddr != pp_set_stacklevel)); + + assert (("FATAL: ssl call with illegal CV value", CvGV (cv))); + ssl_cv = cv; + + /* we patch the op, and then re-run the whole call */ + /* we have to put some dummy argument on the stack for this to work */ + ssl_restore.op_next = (OP *)&ssl_restore; + ssl_restore.op_type = OP_NULL; + ssl_restore.op_ppaddr = pp_restore; + ssl_restore.op_first = PL_op; + + ssl_arg0 = items > 0 ? SvREFCNT_inc (args [0]) : 0; + ssl_arg1 = items > 1 ? SvREFCNT_inc (args [1]) : 0; + + PL_op->op_ppaddr = pp_set_stacklevel; + PL_op->op_private = PL_op->op_private & ~OPpENTERSUB_SSL | ix; /* we potentially share our private flags with entersub */ + + PL_op = (OP *)&ssl_restore; +} MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_ @@ -1756,6 +1962,9 @@ { #ifdef USE_ITHREADS MUTEX_INIT (&coro_lock); +# if CORO_PTHREAD + coro_thx = PERL_GET_CONTEXT; +# endif #endif BOOT_PAGESIZE; @@ -1828,9 +2037,6 @@ OUTPUT: RETVAL -# these not obviously related functions are all rolled into the same xs -# function to increase chances that they all will call transfer with the same -# stack offset void _set_stacklevel (...) ALIAS: @@ -1839,46 +2045,7 @@ Coro::cede = 3 Coro::cede_notself = 4 CODE: -{ - struct transfer_args ta; - - PUTBACK; - switch (ix) - { - case 0: - ta.prev = (struct coro *)INT2PTR (coro_cctx *, SvIV (ST (0))); - ta.next = 0; - break; - - case 1: - if (items != 2) - croak ("Coro::State::transfer (prev, next) expects two arguments, not %d", items); - - prepare_transfer (aTHX_ &ta, ST (0), ST (1)); - break; - - case 2: - prepare_schedule (aTHX_ &ta); - break; - - case 3: - prepare_cede (aTHX_ &ta); - break; - - case 4: - if (!prepare_cede_notself (aTHX_ &ta)) - XSRETURN_EMPTY; - - break; - } - SPAGAIN; - - BARRIER; - PUTBACK; - TRANSFER (ta, 0); - SPAGAIN; /* might be the sp of a different coroutine now */ - /* be extra careful not to ever do anything after TRANSFER */ -} + coro_ssl_patch (aTHX_ cv, ix, &ST (0), items); bool _destroy (SV *coro_sv) @@ -1998,6 +2165,13 @@ RETVAL void +throw (Coro::State self, SV *throw = &PL_sv_undef) + PROTOTYPE: $;$ + CODE: + SvREFCNT_dec (self->throw); + self->throw = SvOK (throw) ? newSVsv (throw) : 0; + +void api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB) SV * @@ -2078,8 +2252,7 @@ coro_ready[i] = newAV (); { - SV *sv = perl_get_sv ("Coro::API", TRUE); - perl_get_sv ("Coro::API", TRUE); /* silence 5.10 warning */ + SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE); coroapi.schedule = api_schedule; coroapi.cede = api_cede; @@ -2149,13 +2322,6 @@ OUTPUT: RETVAL -void -throw (Coro::State self, SV *throw = &PL_sv_undef) - PROTOTYPE: $;$ - CODE: - SvREFCNT_dec (self->throw); - self->throw = SvOK (throw) ? newSVsv (throw) : 0; - # for async_pool speedup void _pool_1 (SV *cb) @@ -2364,3 +2530,4 @@ BOOT: PerlIO_define_layer (aTHX_ &PerlIO_cede); +