--- Coro/Coro/State.xs 2008/11/09 15:35:44 1.257 +++ Coro/Coro/State.xs 2008/11/16 08:59:16 1.279 @@ -48,9 +48,6 @@ #if CORO_USE_VALGRIND # include -# define REGISTER_STACK(cctx,start,end) (cctx)->valgrind_id = VALGRIND_STACK_REGISTER ((start), (end)) -#else -# define REGISTER_STACK(cctx,start,end) #endif /* the maximum number of idle cctx that will be pooled */ @@ -121,19 +118,22 @@ /* The next macros try to return the current stack pointer, in an as * portable way as possible. */ -#define dSTACKLEVEL volatile char stacklevel -#define STACKLEVEL ((void *)&stacklevel) +#if __GNUC__ >= 4 +# define dSTACKLEVEL void *stacklevel = __builtin_frame_address (0) +#else +# define dSTACKLEVEL volatile void *stacklevel = (volatile void *)&stacklevel +#endif #define IN_DESTRUCT (PL_main_cv == Nullcv) #if __GNUC__ >= 3 # define attribute(x) __attribute__(x) -# define BARRIER __asm__ __volatile__ ("" : : : "memory") # define expect(expr,value) __builtin_expect ((expr),(value)) +# define INLINE static inline #else # define attribute(x) -# define BARRIER # define expect(expr,value) (expr) +# define INLINE static #endif #define expect_false(expr) expect ((expr) != 0, 0) @@ -183,6 +183,7 @@ static JMPENV *main_top_env; static HV *coro_state_stash, *coro_stash; static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */ +static volatile struct coro *transfer_next; static GV *irsgv; /* $/ */ static GV *stdoutgv; /* *STDOUT */ @@ -211,7 +212,8 @@ }; /* this is a structure representing a c-level coroutine */ -typedef struct coro_cctx { +typedef struct coro_cctx +{ struct coro_cctx *next; /* the stack */ @@ -239,7 +241,8 @@ }; /* the structure where most of the perl state is stored, overlaid on the cxstack */ -typedef struct { +typedef struct +{ SV *defsv; AV *defav; SV *errsv; @@ -253,10 +256,11 @@ /* this is a structure representing a perl-level coroutine */ struct coro { - /* the c coroutine allocated to this perl coroutine, if any */ + /* the C coroutine allocated to this perl coroutine, if any */ coro_cctx *cctx; /* process data */ + struct CoroSLF slf_frame; /* saved slf frame */ AV *mainstack; perl_slots *slot; /* basically the saved sp */ @@ -282,6 +286,8 @@ typedef struct coro *Coro__State; typedef struct coro *Coro__State_or_hashref; +static struct CoroSLF slf_frame; /* the current slf frame */ + /** Coro ********************************************************************/ #define PRIO_MAX 3 @@ -295,8 +301,8 @@ static SV *coro_current; static SV *coro_readyhook; static AV *coro_ready [PRIO_MAX - PRIO_MIN + 1]; -static int coro_nready; static struct coro *coro_first; +#define coro_nready coroapi.nready /** lowlevel stuff **********************************************************/ @@ -409,7 +415,7 @@ #define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv) #define CORO_MAGIC_state(sv) CORO_MAGIC (((SV *)(sv)), CORO_MAGIC_type_state) -static struct coro * +INLINE struct coro * SvSTATE_ (pTHX_ SV *coro) { HV *stash; @@ -510,11 +516,15 @@ PUTBACK; } + + slf_frame = c->slf_frame; } static void save_perl (pTHX_ Coro__State c) { + c->slf_frame = slf_frame; + { dSP; I32 cxix = cxstack_ix; @@ -591,7 +601,7 @@ } /* - * allocate various perl stacks. This is an exact copy + * allocate various perl stacks. This is almost an exact copy * of perl.c:init_stacks, except that it uses less memory * on the (sometimes correct) assumption that coroutines do * not usually need a lot of stackspace. @@ -803,6 +813,19 @@ } static void +prepare_nop (pTHX_ struct coro_transfer_args *ta) +{ + /* kind of mega-hacky, but works */ + ta->next = ta->prev = (struct coro *)ta; +} + +static int +slf_check_nop (pTHX_ struct CoroSLF *frame) +{ + return 0; +} + +static void coro_setup (pTHX_ struct coro *coro) { /* @@ -836,9 +859,9 @@ { dSP; - LOGOP myop; + UNOP myop; - Zero (&myop, 1, LOGOP); + Zero (&myop, 1, UNOP); myop.op_next = Nullop; myop.op_flags = OPf_WANT_VOID; @@ -851,11 +874,10 @@ } /* this newly created coroutine might be run on an existing cctx which most - * likely was suspended in set_stacklevel, called from entersub. - * set_stacklevl doesn't do anything on return, but entersub does LEAVE, - * so we ENTER here for symmetry + * likely was suspended in pp_slf, so we have to emulate entering pp_slf here. */ - ENTER; + slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */ + slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */ } static void @@ -894,7 +916,7 @@ coro_destruct_stacks (aTHX); } -static void +INLINE void free_coro_mortal (pTHX) { if (expect_true (coro_mortal)) @@ -1028,27 +1050,35 @@ return 0; } +static void +prepare_set_stacklevel (struct coro_transfer_args *ta, struct coro_cctx *cctx) +{ + ta->prev = (struct coro *)cctx; + ta->next = 0; +} + /* inject a fake call to Coro::State::_cctx_init into the execution */ /* _cctx_init should be careful, as it could be called at almost any time */ /* during execution of a perl program */ +/* also initialises PL_top_env */ static void NOINLINE cctx_prepare (pTHX_ coro_cctx *cctx) { dSP; - LOGOP myop; + UNOP myop; PL_top_env = &PL_start_env; if (cctx->flags & CC_TRACE) PL_runops = runops_trace; - Zero (&myop, 1, LOGOP); - myop.op_next = PL_op; + Zero (&myop, 1, UNOP); + myop.op_next = PL_op; myop.op_flags = OPf_WANT_VOID | OPf_STACKED; PUSHMARK (SP); EXTEND (SP, 2); - PUSHs (sv_2mortal (newSViv (PTR2IV (cctx)))); + PUSHs (sv_2mortal (newSViv ((IV)cctx))); PUSHs ((SV *)get_cv ("Coro::State::_cctx_init", FALSE)); PUTBACK; PL_op = (OP *)&myop; @@ -1056,6 +1086,27 @@ SPAGAIN; } +/* the tail of transfer: execute stuff we can only do after a transfer */ +INLINE void +transfer_tail (pTHX) +{ + struct coro *next = (struct coro *)transfer_next; + assert (!(transfer_next = 0)); /* just used for the side effect when asserts are enabled */ + assert (("FATAL: next coroutine was zero in transfer_tail (please report)", next)); + + free_coro_mortal (aTHX); + UNLOCK; + + if (expect_false (next->throw)) + { + SV *exception = sv_2mortal (next->throw); + + next->throw = 0; + sv_setsv (ERRSV, exception); + croak (0); + } +} + /* * this is a _very_ stripped down perl interpreter ;) */ @@ -1070,15 +1121,17 @@ { dTHX; - /* cctx_run is the alternative tail of transfer(), so unlock here. */ - UNLOCK; - - /* we now skip the entersub that lead to transfer() */ - PL_op = PL_op->op_next; + /* normally we would need to skip the entersub here */ + /* not doing so will re-execute it, which is exactly what we want */ + /* PL_nop = PL_nop->op_next */ /* inject a fake subroutine call to cctx_init */ cctx_prepare (aTHX_ (coro_cctx *)arg); + /* cctx_run is the alternative tail of transfer() */ + /* TODO: throwing an exception here might be deadly, VERIFY */ + transfer_tail (aTHX); + /* somebody or something will hit me for both perl_run and PL_restartop */ PL_restartop = PL_op; perl_run (PL_curinterp); @@ -1099,13 +1152,36 @@ cctx_new () { coro_cctx *cctx; - void *stack_start; - size_t stack_size; ++cctx_count; - Newz (0, cctx, 1, coro_cctx); + New (0, cctx, 1, coro_cctx); + + cctx->gen = cctx_gen; + cctx->flags = 0; + cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */ + + return cctx; +} + +/* create a new cctx only suitable as source */ +static coro_cctx * +cctx_new_empty () +{ + coro_cctx *cctx = cctx_new (); - cctx->gen = cctx_gen; + cctx->sptr = 0; + coro_create (&cctx->cctx, 0, 0, 0, 0); + + return cctx; +} + +/* create a new cctx suitable as destination/running a perl interpreter */ +static coro_cctx * +cctx_new_run () +{ + coro_cctx *cctx = cctx_new (); + void *stack_start; + size_t stack_size; #if HAVE_MMAP cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE; @@ -1114,11 +1190,11 @@ if (cctx->sptr != (void *)-1) { -# if CORO_STACKGUARD - mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE); -# endif - stack_start = CORO_STACKGUARD * PAGESIZE + (char *)cctx->sptr; - stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE; + #if CORO_STACKGUARD + mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE); + #endif + stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE; + stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE; cctx->flags |= CC_MAPPED; } else @@ -1129,7 +1205,7 @@ if (!cctx->sptr) { - perror ("FATAL: unable to allocate stack for coroutine"); + perror ("FATAL: unable to allocate stack for coroutine, exiting."); _exit (EXIT_FAILURE); } @@ -1137,7 +1213,10 @@ stack_size = cctx->ssize; } - REGISTER_STACK (cctx, (char *)stack_start, (char *)stack_start + stack_size); + #if CORO_USE_VALGRIND + cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size); + #endif + coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size); return cctx; @@ -1155,9 +1234,9 @@ /* coro_transfer creates new, empty cctx's */ if (cctx->sptr) { -#if CORO_USE_VALGRIND - VALGRIND_STACK_DEREGISTER (cctx->valgrind_id); -#endif + #if CORO_USE_VALGRIND + VALGRIND_STACK_DEREGISTER (cctx->valgrind_id); + #endif #if HAVE_MMAP if (cctx->flags & CC_MAPPED) @@ -1188,13 +1267,13 @@ cctx_destroy (cctx); } - return cctx_new (); + return cctx_new_run (); } static void cctx_put (coro_cctx *cctx) { - assert (("cctx_put called on non-initialised cctx", cctx->sptr)); + assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr)); /* free another cctx if overlimit */ if (expect_false (cctx_idle >= cctx_max_idle)) @@ -1219,17 +1298,17 @@ if (expect_true (prev != next)) { if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW)))) - croak ("Coro::State::transfer called with non-running/new prev Coro::State, but can only transfer from running or new states"); + croak ("Coro::State::transfer called with non-running/new prev Coro::State, but can only transfer from running or new states,"); if (expect_false (next->flags & CF_RUNNING)) - croak ("Coro::State::transfer called with running next Coro::State, but can only transfer to inactive states"); + croak ("Coro::State::transfer called with running next Coro::State, but can only transfer to inactive states,"); if (expect_false (next->flags & CF_DESTROYED)) - croak ("Coro::State::transfer called with destroyed next Coro::State, but can only transfer to inactive states"); + croak ("Coro::State::transfer called with destroyed next Coro::State, but can only transfer to inactive states,"); #if !PERL_VERSION_ATLEAST (5,10,0) if (expect_false (PL_lex_state != LEX_NOTPARSING)) - croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version"); + croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,"); #endif } } @@ -1243,22 +1322,17 @@ /* sometimes transfer is only called to set idle_sp */ if (expect_false (!next)) { - ((coro_cctx *)prev)->idle_sp = STACKLEVEL; + ((coro_cctx *)prev)->idle_sp = (void *)stacklevel; assert (((coro_cctx *)prev)->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */ } else if (expect_true (prev != next)) { - static volatile int has_throw; coro_cctx *prev__cctx; if (expect_false (prev->flags & CF_NEW)) { /* create a new empty/source context */ - ++cctx_count; - New (0, prev->cctx, 1, coro_cctx); - prev->cctx->sptr = 0; - coro_create (&prev->cctx->cctx, 0, 0, 0, 0); - + prev->cctx = cctx_new_empty (); prev->flags &= ~CF_NEW; prev->flags |= CF_RUNNING; } @@ -1283,15 +1357,15 @@ prev__cctx = prev->cctx; - /* possibly "free" the cctx */ + /* possibly untie and reuse the cctx */ if (expect_true ( - prev__cctx->idle_sp == STACKLEVEL + prev__cctx->idle_sp == (void *)stacklevel && !(prev__cctx->flags & CC_TRACE) && !force_cctx )) { - /* I assume that STACKLEVEL is a stronger indicator than PL_top_env changes */ - assert (("ERROR: current top_env must equal previous top_env", PL_top_env == prev__cctx->idle_te)); + /* I assume that stacklevel is a stronger indicator than PL_top_env changes */ + assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == prev__cctx->idle_te)); prev->cctx = 0; @@ -1309,7 +1383,8 @@ if (expect_true (!next->cctx)) next->cctx = cctx_get (aTHX); - has_throw = !!next->throw; + assert (("FATAL: transfer_next already nonzero in Coro (please report)", !transfer_next)); + transfer_next = next; if (expect_false (prev__cctx != next->cctx)) { @@ -1318,29 +1393,10 @@ coro_transfer (&prev__cctx->cctx, &next->cctx->cctx); } - free_coro_mortal (aTHX); - UNLOCK; - - if (expect_false (has_throw)) - { - struct coro *coro = SvSTATE (coro_current); - - if (coro->throw) - { - SV *exception = coro->throw; - coro->throw = 0; - sv_setsv (ERRSV, exception); - croak (0); - } - } + transfer_tail (aTHX); } } -struct transfer_args -{ - struct coro *prev, *next; -}; - #define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx)) #define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next) @@ -1369,8 +1425,7 @@ { struct coro temp; - if (coro->flags & CF_RUNNING) - croak ("FATAL: tried to destroy currently running coroutine"); + assert (("FATAL: tried to destroy currently running coroutine (please report)", !(coro->flags & CF_RUNNING))); save_perl (aTHX_ &temp); load_perl (aTHX_ coro); @@ -1431,7 +1486,7 @@ }; static void -prepare_transfer (pTHX_ struct transfer_args *ta, SV *prev_sv, SV *next_sv) +prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv) { ta->prev = SvSTATE (prev_sv); ta->next = SvSTATE (next_sv); @@ -1439,10 +1494,9 @@ } static void -api_transfer (SV *prev_sv, SV *next_sv) +api_transfer (pTHX_ SV *prev_sv, SV *next_sv) { - dTHX; - struct transfer_args ta; + struct coro_transfer_args ta; prepare_transfer (aTHX_ &ta, prev_sv, next_sv); TRANSFER (ta, 1); @@ -1469,9 +1523,8 @@ } static int -api_ready (SV *coro_sv) +api_ready (pTHX_ SV *coro_sv) { - dTHX; struct coro *coro; SV *sv_hook; void (*xs_hook)(void); @@ -1519,14 +1572,13 @@ } static int -api_is_ready (SV *coro_sv) +api_is_ready (pTHX_ SV *coro_sv) { - dTHX; return !!(SvSTATE (coro_sv)->flags & CF_READY); } -static void -prepare_schedule (pTHX_ struct transfer_args *ta) +INLINE void +prepare_schedule (pTHX_ struct coro_transfer_args *ta) { SV *prev_sv, *next_sv; @@ -1561,7 +1613,7 @@ { UNLOCK; SvREFCNT_dec (next_sv); - /* coro_nready is already taken care of by destroy */ + /* coro_nready has already been taken care of by destroy */ continue; } @@ -1574,7 +1626,7 @@ prev_sv = SvRV (coro_current); ta->prev = SvSTATE (prev_sv); TRANSFER_CHECK (*ta); - assert (ta->next->flags & CF_READY); + assert (("FATAL: next coroutine isn't marked as ready in Coro (please report)", ta->next->flags & CF_READY)); ta->next->flags &= ~CF_READY; SvRV_set (coro_current, next_sv); @@ -1584,42 +1636,40 @@ UNLOCK; } -static void -prepare_cede (pTHX_ struct transfer_args *ta) +INLINE void +prepare_cede (pTHX_ struct coro_transfer_args *ta) { - api_ready (coro_current); + api_ready (aTHX_ coro_current); prepare_schedule (aTHX_ ta); } -static int -prepare_cede_notself (pTHX_ struct transfer_args *ta) +INLINE void +prepare_cede_notself (pTHX_ struct coro_transfer_args *ta) { + SV *prev = SvRV (coro_current); + if (coro_nready) { - SV *prev = SvRV (coro_current); prepare_schedule (aTHX_ ta); - api_ready (prev); - return 1; + api_ready (aTHX_ prev); } else - return 0; + prepare_nop (aTHX_ ta); } static void -api_schedule (void) +api_schedule (pTHX) { - dTHX; - struct transfer_args ta; + struct coro_transfer_args ta; prepare_schedule (aTHX_ &ta); TRANSFER (ta, 1); } static int -api_cede (void) +api_cede (pTHX) { - dTHX; - struct transfer_args ta; + struct coro_transfer_args ta; prepare_cede (aTHX_ &ta); @@ -1633,13 +1683,13 @@ } static int -api_cede_notself (void) +api_cede_notself (pTHX) { - dTHX; - struct transfer_args ta; - - if (prepare_cede_notself (aTHX_ &ta)) + if (coro_nready) { + struct coro_transfer_args ta; + + prepare_cede_notself (aTHX_ &ta); TRANSFER (ta, 1); return 1; } @@ -1648,17 +1698,16 @@ } static void -api_trace (SV *coro_sv, int flags) +api_trace (pTHX_ SV *coro_sv, int flags) { - dTHX; struct coro *coro = SvSTATE (coro_sv); if (flags & CC_TRACE) { if (!coro->cctx) - coro->cctx = cctx_new (); + coro->cctx = cctx_new_run (); else if (!(coro->cctx->flags & CC_TRACE)) - croak ("cannot enable tracing on coroutine with custom stack"); + croak ("cannot enable tracing on coroutine with custom stack,"); coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL)); } @@ -1673,24 +1722,6 @@ } } -#if 0 -static int -coro_gensub_free (pTHX_ SV *sv, MAGIC *mg) -{ - AV *padlist; - AV *av = (AV *)mg->mg_obj; - - abort (); - - return 0; -} - -static MGVTBL coro_gensub_vtbl = { - 0, 0, 0, 0, - coro_gensub_free -}; -#endif - /*****************************************************************************/ /* PerlIO::cede */ @@ -1727,7 +1758,7 @@ if (now >= self->next) { - api_cede (); + api_cede (aTHX); self->next = now + self->every; } @@ -1766,6 +1797,280 @@ PerlIOBuf_set_ptrcnt, }; +/*****************************************************************************/ + +static const CV *slf_cv; /* for quick consistency check */ + +static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */ +static SV *slf_arg0; +static SV *slf_arg1; +static SV *slf_arg2; + +/* this restores the stack in the case we patched the entersub, to */ +/* recreate the stack frame as perl will on following calls */ +/* since entersub cleared the stack */ +static OP * +pp_restore (pTHX) +{ + dSP; + + PUSHMARK (SP); + + EXTEND (SP, 3); + if (slf_arg0) PUSHs (sv_2mortal (slf_arg0)); + if (slf_arg1) PUSHs (sv_2mortal (slf_arg1)); + if (slf_arg2) PUSHs (sv_2mortal (slf_arg2)); + PUSHs ((SV *)CvGV (slf_cv)); + + RETURNOP (slf_restore.op_first); +} + +static void +slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta) +{ + prepare_set_stacklevel (ta, (struct coro_cctx *)slf_frame.data); +} + +static void +slf_init_set_stacklevel (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items) +{ + assert (("FATAL: set_stacklevel needs the coro cctx as sole argument", items == 1)); + + frame->prepare = slf_prepare_set_stacklevel; + frame->check = slf_check_nop; + frame->data = (void *)SvIV (arg [0]); +} + +static void +slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta) +{ + SV **arg = (SV **)slf_frame.data; + + prepare_transfer (aTHX_ ta, arg [0], arg [1]); +} + +static void +slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items) +{ + if (items != 2) + croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items); + + frame->prepare = slf_prepare_transfer; + frame->check = slf_check_nop; + frame->data = (void *)arg; /* let's hope it will stay valid */ +} + +static void +slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items) +{ + frame->prepare = prepare_schedule; + frame->check = slf_check_nop; +} + +static void +slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items) +{ + frame->prepare = prepare_cede; + frame->check = slf_check_nop; +} + +static void +slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items) +{ + frame->prepare = prepare_cede_notself; + frame->check = slf_check_nop; +} + +/* we hijack an hopefully unused CV flag for our purposes */ +#define CVf_SLF 0x4000 + +/* + * these not obviously related functions are all rolled into one + * function to increase chances that they all will call transfer with the same + * stack offset + * SLF stands for "schedule-like-function". + */ +static OP * +pp_slf (pTHX) +{ + I32 checkmark; /* mark SP to see how many elements check has pushed */ + + /* set up the slf frame, unless it has already been set-up */ + /* the latter happens when a new coro has been started */ + /* or when a new cctx was attached to an existing coroutine */ + if (expect_true (!slf_frame.prepare)) + { + /* first iteration */ + dSP; + SV **arg = PL_stack_base + TOPMARK + 1; + int items = SP - arg; /* args without function object */ + SV *gv = *sp; + + /* do a quick consistency check on the "function" object, and if it isn't */ + /* for us, divert to the real entersub */ + if (SvTYPE (gv) != SVt_PVGV || !(CvFLAGS (GvCV (gv)) & CVf_SLF)) + return PL_ppaddr[OP_ENTERSUB](aTHX); + + /* pop args */ + SP = PL_stack_base + POPMARK; + + if (!(PL_op->op_flags & OPf_STACKED)) + { + /* ampersand-form of call, use @_ instead of stack */ + AV *av = GvAV (PL_defgv); + arg = AvARRAY (av); + items = AvFILLp (av) + 1; + } + + PUTBACK; + + ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr) (aTHX_ &slf_frame, GvCV (gv), arg, items); + } + + /* now interpret the slf_frame */ + /* we use a callback system not to make the code needlessly */ + /* complicated, but so we can run multiple perl coros from one cctx */ + + do + { + struct coro_transfer_args ta; + + slf_frame.prepare (aTHX_ &ta); + TRANSFER (ta, 0); + + checkmark = PL_stack_sp - PL_stack_base; + } + while (slf_frame.check (aTHX_ &slf_frame)); + + { + dSP; + SV **bot = PL_stack_base + checkmark; + int gimme = GIMME_V; + + slf_frame.prepare = 0; /* signal pp_slf that we need a new frame */ + + /* make sure we put something on the stack in scalar context */ + if (gimme == G_SCALAR) + { + if (sp == bot) + XPUSHs (&PL_sv_undef); + + SP = bot + 1; + } + + PUTBACK; + } + + return NORMAL; +} + +static void +api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, SV **arg, int items) +{ + assert (("FATAL: SLF call with illegal CV value", !CvANON (cv))); + + if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB] + && PL_op->op_ppaddr != pp_slf) + croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught"); + + if (items > 3) + croak ("Coro only supports up to three arguments to SLF functions currently (not %d), caught", items); + + CvFLAGS (cv) |= CVf_SLF; + CvXSUBANY (cv).any_ptr = (void *)init_cb; + slf_cv = cv; + + /* we patch the op, and then re-run the whole call */ + /* we have to put the same argument on the stack for this to work */ + /* and this will be done by pp_restore */ + slf_restore.op_next = (OP *)&slf_restore; + slf_restore.op_type = OP_NULL; + slf_restore.op_ppaddr = pp_restore; + slf_restore.op_first = PL_op; + + slf_arg0 = items > 0 ? SvREFCNT_inc (arg [0]) : 0; + slf_arg1 = items > 1 ? SvREFCNT_inc (arg [1]) : 0; + slf_arg2 = items > 2 ? SvREFCNT_inc (arg [2]) : 0; + + PL_op->op_ppaddr = pp_slf; + + PL_op = (OP *)&slf_restore; +} + +/*****************************************************************************/ + +static int +slf_check_semaphore_down (pTHX_ struct CoroSLF *frame) +{ + AV *av = (AV *)frame->data; + SV *count_sv = AvARRAY (av)[0]; + + if (SvIVX (count_sv) > 0) + { + SvIVX (count_sv) = SvIVX (count_sv) - 1; + return 0; + } + else + { + int i; + /* if we were woken up but can't down, we look through the whole */ + /* waiters list and only add us if we aren't in there already */ + /* this avoids some degenerate memory usage cases */ + + for (i = 1; i <= AvFILLp (av); ++i) + if (AvARRAY (av)[i] == SvRV (coro_current)) + return 1; + + av_push (av, SvREFCNT_inc (SvRV (coro_current))); + return 1; + } +} + +static void +slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items) +{ + AV *av = (AV *)SvRV (arg [0]); + + if (SvIVX (AvARRAY (av)[0]) > 0) + { + frame->data = (void *)av; + frame->prepare = prepare_nop; + } + else + { + av_push (av, SvREFCNT_inc (SvRV (coro_current))); + + frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av)); + frame->prepare = prepare_schedule; + } + + frame->check = slf_check_semaphore_down; + +} + +/*****************************************************************************/ + +#define GENSUB_ARG CvXSUBANY (cv).any_ptr + +/* create a closure from XS, returns a code reference */ +/* the arg can be accessed via GENSUB_ARG from the callback */ +/* the callback must use dXSARGS/XSRETURN */ +static SV * +gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg) +{ + CV *cv = (CV *)NEWSV (0, 0); + + sv_upgrade ((SV *)cv, SVt_PVCV); + + CvANON_on (cv); + CvISXSUB_on (cv); + CvXSUB (cv) = xsub; + GENSUB_ARG = arg; + + return newRV_noinc ((SV *)cv); +} + +/*****************************************************************************/ MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_ @@ -1805,9 +2110,17 @@ while (main_top_env->je_prev) main_top_env = main_top_env->je_prev; - coroapi.ver = CORO_API_VERSION; - coroapi.rev = CORO_API_REVISION; - coroapi.transfer = api_transfer; + coroapi.ver = CORO_API_VERSION; + coroapi.rev = CORO_API_REVISION; + + coroapi.transfer = api_transfer; + + coroapi.sv_state = SvSTATE_; + coroapi.execute_slf = api_execute_slf; + coroapi.prepare_nop = prepare_nop; + coroapi.prepare_schedule = prepare_schedule; + coroapi.prepare_cede = prepare_cede; + coroapi.prepare_cede_notself = prepare_cede_notself; { SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0); @@ -1850,57 +2163,16 @@ OUTPUT: RETVAL -# these not obviously related functions are all rolled into the same xs -# function to increase chances that they all will call transfer with the same -# stack offset void _set_stacklevel (...) - ALIAS: - Coro::State::transfer = 1 - Coro::schedule = 2 - Coro::cede = 3 - Coro::cede_notself = 4 - CODE: -{ - struct transfer_args ta; - - PUTBACK; - switch (ix) - { - case 0: - ta.prev = (struct coro *)INT2PTR (coro_cctx *, SvIV (ST (0))); - ta.next = 0; - break; - - case 1: - if (items != 2) - croak ("Coro::State::transfer (prev, next) expects two arguments, not %d", items); - - prepare_transfer (aTHX_ &ta, ST (0), ST (1)); - break; - - case 2: - prepare_schedule (aTHX_ &ta); - break; - - case 3: - prepare_cede (aTHX_ &ta); - break; - - case 4: - if (!prepare_cede_notself (aTHX_ &ta)) - XSRETURN_EMPTY; - - break; - } - SPAGAIN; + CODE: + api_execute_slf (aTHX_ cv, slf_init_set_stacklevel, &ST (0), items); - BARRIER; - PUTBACK; - TRANSFER (ta, 0); - SPAGAIN; /* might be the sp of a different coroutine now */ - /* be extra careful not to ever do anything after TRANSFER */ -} +void +transfer (...) + PROTOTYPE: $$ + CODE: + api_execute_slf (aTHX_ cv, slf_init_transfer, &ST (0), items); bool _destroy (SV *coro_sv) @@ -1917,6 +2189,7 @@ int cctx_stacksize (int new_stacksize = 0) + PROTOTYPE: ;$ CODE: RETVAL = cctx_stacksize; if (new_stacksize) @@ -1929,6 +2202,7 @@ int cctx_max_idle (int max_idle = 0) + PROTOTYPE: ;$ CODE: RETVAL = cctx_max_idle; if (max_idle > 1) @@ -1938,6 +2212,7 @@ int cctx_count () + PROTOTYPE: CODE: RETVAL = cctx_count; OUTPUT: @@ -1945,6 +2220,7 @@ int cctx_idle () + PROTOTYPE: CODE: RETVAL = cctx_idle; OUTPUT: @@ -1952,6 +2228,7 @@ void list () + PROTOTYPE: PPCODE: { struct coro *coro; @@ -2020,7 +2297,16 @@ RETVAL void +throw (Coro::State self, SV *throw = &PL_sv_undef) + PROTOTYPE: $;$ + CODE: + SvREFCNT_dec (self->throw); + self->throw = SvOK (throw) ? newSVsv (throw) : 0; + +void api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB) + PROTOTYPE: $;$ + C_ARGS: aTHX_ coro, flags SV * has_cctx (Coro::State coro) @@ -2054,6 +2340,7 @@ void force_cctx () + PROTOTYPE: CODE: struct coro *coro = SvSTATE (coro_current); coro->cctx->idle_sp = 0; @@ -2065,7 +2352,7 @@ swap_defav = 1 CODE: if (!self->slot) - croak ("cannot swap state with coroutine that has no saved state"); + croak ("cannot swap state with coroutine that has no saved state,"); else { SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv); @@ -2100,15 +2387,14 @@ coro_ready[i] = newAV (); { - SV *sv = perl_get_sv ("Coro::API", TRUE); - perl_get_sv ("Coro::API", TRUE); /* silence 5.10 warning */ + SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE); coroapi.schedule = api_schedule; coroapi.cede = api_cede; coroapi.cede_notself = api_cede_notself; coroapi.ready = api_ready; coroapi.is_ready = api_is_ready; - coroapi.nready = &coro_nready; + coroapi.nready = coro_nready; coroapi.current = coro_current; GCoroAPI = &coroapi; @@ -2118,6 +2404,21 @@ } void +schedule (...) + CODE: + api_execute_slf (aTHX_ cv, slf_init_schedule, &ST (0), 0); + +void +cede (...) + CODE: + api_execute_slf (aTHX_ cv, slf_init_cede, &ST (0), 0); + +void +cede_notself (...) + CODE: + api_execute_slf (aTHX_ cv, slf_init_cede_notself, &ST (0), 0); + +void _set_current (SV *current) PROTOTYPE: $ CODE: @@ -2135,6 +2436,7 @@ int prio (Coro::State coro, int newprio = 0) + PROTOTYPE: $;$ ALIAS: nice = 1 CODE: @@ -2159,7 +2461,7 @@ ready (SV *self) PROTOTYPE: $ CODE: - RETVAL = boolSV (api_ready (self)); + RETVAL = boolSV (api_ready (aTHX_ self)); OUTPUT: RETVAL @@ -2171,13 +2473,6 @@ OUTPUT: RETVAL -void -throw (Coro::State self, SV *throw = &PL_sv_undef) - PROTOTYPE: $;$ - CODE: - SvREFCNT_dec (self->throw); - self->throw = SvOK (throw) ? newSVsv (throw) : 0; - # for async_pool speedup void _pool_1 (SV *cb) @@ -2215,8 +2510,6 @@ for (i = 0; i < len; ++i) av_store (defav, i, SvREFCNT_inc_NN (AvARRAY (invoke_av)[i + 1])); } - - SvREFCNT_dec (invoke); } void @@ -2246,64 +2539,17 @@ coro->prio = 0; if (coro->cctx && (coro->cctx->flags & CC_TRACE)) - api_trace (coro_current, 0); + api_trace (aTHX_ coro_current, 0); av_push (av_async_pool, newSVsv (coro_current)); } -#if 0 - -void -_generator_call (...) - PROTOTYPE: @ - PPCODE: - fprintf (stderr, "call %p\n", CvXSUBANY(cv).any_ptr); - xxxx - abort (); - -SV * -gensub (SV *sub, ...) - PROTOTYPE: &;@ - CODE: -{ - struct coro *coro; - MAGIC *mg; - CV *xcv; - CV *ncv = (CV *)newSV_type (SVt_PVCV); - int i; - - CvGV (ncv) = CvGV (cv); - CvFILE (ncv) = CvFILE (cv); - - Newz (0, coro, 1, struct coro); - coro->args = newAV (); - coro->flags = CF_NEW; - - av_extend (coro->args, items - 1); - for (i = 1; i < items; i++) - av_push (coro->args, newSVsv (ST (i))); - - CvISXSUB_on (ncv); - CvXSUBANY (ncv).any_ptr = (void *)coro; - - xcv = GvCV (gv_fetchpv ("Coro::_generator_call", 0, SVt_PVCV)); - - CvXSUB (ncv) = CvXSUB (xcv); - CvANON_on (ncv); - - mg = sv_magicext ((SV *)ncv, 0, CORO_MAGIC_type_state, &coro_gensub_vtbl, (char *)coro, 0); - RETVAL = newRV_noinc ((SV *)ncv); -} - OUTPUT: - RETVAL - -#endif - MODULE = Coro::State PACKAGE = Coro::AIO void _get_state (SV *self) + PROTOTYPE: $ PPCODE: { AV *defav = GvAV (PL_defgv); @@ -2328,7 +2574,7 @@ XPUSHs (sv_2mortal (newRV_noinc ((SV *)av))); - api_ready (self); + api_ready (aTHX_ self); } void @@ -2356,17 +2602,16 @@ BOOT: sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE); -SV * +void _schedule (...) - PROTOTYPE: @ CODE: { static int incede; - api_cede_notself (); + api_cede_notself (aTHX); ++incede; - while (coro_nready >= incede && api_cede ()) + while (coro_nready >= incede && api_cede (aTHX)) ; sv_setsv (sv_activity, &PL_sv_undef); @@ -2386,3 +2631,97 @@ BOOT: PerlIO_define_layer (aTHX_ &PerlIO_cede); + +MODULE = Coro::State PACKAGE = Coro::Semaphore + +SV * +new (SV *klass, SV *count_ = 0) + CODE: +{ + /* a semaphore contains a counter IV in $sem->[0] and any waiters after that */ + AV *av = newAV (); + av_push (av, newSViv (count_ && SvOK (count_) ? SvIV (count_) : 1)); + RETVAL = sv_bless (newRV_noinc ((SV *)av), GvSTASH (CvGV (cv))); +} + OUTPUT: + RETVAL + +SV * +count (SV *self) + CODE: + RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]); + OUTPUT: + RETVAL + +void +up (SV *self, int adjust = 1) + ALIAS: + adjust = 1 + CODE: +{ + AV *av = (AV *)SvRV (self); + SV *count_sv = AvARRAY (av)[0]; + IV count = SvIVX (count_sv); + + count += ix ? adjust : 1; + SvIVX (count_sv) = count; + + /* now wake up as many waiters as possible */ + while (count > 0 && AvFILLp (av) >= count) + { + SV *cb; + + /* swap first two elements so we can shift a waiter */ + AvARRAY (av)[0] = AvARRAY (av)[1]; + AvARRAY (av)[1] = count_sv; + cb = av_shift (av); + + if (SvOBJECT (cb)) + api_ready (aTHX_ cb); + else + croak ("callbacks not yet supported"); + + SvREFCNT_dec (cb); + } +} + +void +down (SV *self) + CODE: + api_execute_slf (aTHX_ cv, slf_init_semaphore_down, &ST (0), 1); + +void +try (SV *self) + PPCODE: +{ + AV *av = (AV *)SvRV (self); + SV *count_sv = AvARRAY (av)[0]; + IV count = SvIVX (count_sv); + + if (count > 0) + { + --count; + SvIVX (count_sv) = count; + XSRETURN_YES; + } + else + XSRETURN_NO; +} + +void +waiters (SV *self) + CODE: +{ + AV *av = (AV *)SvRV (self); + + if (GIMME_V == G_SCALAR) + XPUSHs (sv_2mortal (newSVsv (AvARRAY (av)[0]))); + else + { + int i; + EXTEND (SP, AvFILLp (av) + 1 - 1); + for (i = 1; i <= AvFILLp (av); ++i) + PUSHs (newSVsv (AvARRAY (av)[i])); + } +} +