--- Coro/Coro/State.xs 2001/07/23 23:48:05 1.14 +++ Coro/Coro/State.xs 2003/11/23 03:34:13 1.49 @@ -2,30 +2,83 @@ #include "perl.h" #include "XSUB.h" +#include "patchlevel.h" + +#if PATCHLEVEL < 6 +# ifndef PL_ppaddr +# define PL_ppaddr ppaddr +# endif +# ifndef call_sv +# define call_sv perl_call_sv +# endif +# ifndef get_sv +# define get_sv perl_get_sv +# endif +# ifndef get_cv +# define get_cv perl_get_cv +# endif +# ifndef IS_PADGV +# define IS_PADGV(v) 0 +# endif +# ifndef IS_PADCONST +# define IS_PADCONST(v) 0 +# endif +#endif + #include "libcoro/coro.c" +#include + #ifdef HAVE_MMAP # include # include +# ifndef MAP_ANONYMOUS +# ifdef MAP_ANON +# define MAP_ANONYMOUS MAP_ANON +# else +# undef HAVE_MMAP +# endif +# endif #endif -#define MAY_FLUSH /* increases codesize */ - -#define TRANSFER_SAVE_DEFAV 0x00000001 -#define TRANSFER_SAVE_DEFSV 0x00000002 -#define TRANSFER_SAVE_ERRSV 0x00000004 -#define TRANSFER_SAVE_CCTXT 0x00000008 - -#define TRANSFER_SAVE_ALL -1 +#define MAY_FLUSH /* increases codesize and is rarely used */ #define SUB_INIT "Coro::State::initialize" #define UCORO_STATE "_coro_state" -struct coro { - /* the optional C context */ +/* The next macro should declare a variable stacklevel that contains and approximation + * to the current C stack pointer. Its property is that it changes with each call + * and should be unique. */ +#define dSTACKLEVEL void *stacklevel = &stacklevel + +#define IN_DESTRUCT (PL_main_cv == Nullcv) + +#define labs(l) ((l) >= 0 ? (l) : -(l)) + +#include "CoroAPI.h" + +static struct CoroAPI coroapi; + +/* this is actually not only the c stack but also c registers etc... */ +typedef struct { + int refcnt; /* pointer reference counter */ + int usecnt; /* shared by how many coroutines */ + int gencnt; /* generation counter */ + coro_context cctx; + void *sptr; - long ssize; + long ssize; /* positive == mmap, otherwise malloc */ +} coro_stack; + +struct coro { + /* the top-level JMPENV for each coroutine, needed to catch dies. */ + JMPENV start_env; + + /* the optional C context */ + coro_stack *stack; + void *cursp; + int gencnt; /* optionally saved, might be zero */ AV *defav; @@ -34,6 +87,7 @@ /* saved global state not related to stacks */ U8 dowarn; + I32 in_eval; /* the stacks and related info (callchain etc..) */ PERL_SI *curstackinfo; @@ -42,6 +96,7 @@ SV **stack_sp; OP *op; SV **curpad; + AV *comppad; SV **stack_base; SV **stack_max; SV **tmps_stack; @@ -61,7 +116,6 @@ I32 retstack_ix; I32 retstack_max; COP *curcop; - JMPENV start_env; JMPENV *top_env; /* data associated with this coroutine (initial args) */ @@ -76,6 +130,7 @@ static SV *ucoro_state_sv; static U32 ucoro_state_hash; static HV *padlist_cache; +static SV *coro_mortal; /* will be freed after next transfer */ /* mostly copied from op.c:cv_clone2 */ STATIC AV * @@ -131,7 +186,9 @@ sv = (SV *) newHV (); else sv = NEWSV (0, 0); +#ifdef SvPADBUSY if (!SvPADBUSY (sv)) +#endif SvPADMY_on (sv); npad[ix] = sv; } @@ -172,7 +229,7 @@ } #ifdef MAY_FLUSH -STATIC AV * +STATIC void free_padlist (AV *padlist) { /* may be during global destruction */ @@ -247,15 +304,16 @@ #define SB do { #define SE } while (0) -#define LOAD(state) SB load_state(aTHX_ (state)); SPAGAIN; SE -#define SAVE(state,flags) SB PUTBACK; save_state(aTHX_ (state),(flags)); SE +#define LOAD(state) load_state(aTHX_ (state)); +#define SAVE(state,flags) save_state(aTHX_ (state),(flags)); -#define REPLACE_SV(sv,val) SB SvREFCNT_dec(sv); (sv) = (val); SE +#define REPLACE_SV(sv,val) SB SvREFCNT_dec(sv); (sv) = (val); (val) = 0; SE static void load_state(pTHX_ Coro__State c) { PL_dowarn = c->dowarn; + PL_in_eval = c->in_eval; PL_curstackinfo = c->curstackinfo; PL_curstack = c->curstack; @@ -263,6 +321,7 @@ PL_stack_sp = c->stack_sp; PL_op = c->op; PL_curpad = c->curpad; + PL_comppad = c->comppad; PL_stack_base = c->stack_base; PL_stack_max = c->stack_max; PL_tmps_stack = c->tmps_stack; @@ -282,7 +341,6 @@ PL_retstack_ix = c->retstack_ix; PL_retstack_max = c->retstack_max; PL_curcop = c->curcop; - PL_start_env = c->start_env; PL_top_env = c->top_env; if (c->defav) REPLACE_SV (GvAV (PL_defgv), c->defav); @@ -332,7 +390,7 @@ /* this loop was inspired by pp_caller */ for (;;) { - do + while (cxix >= 0) { PERL_CONTEXT *cx = &ccstk[cxix--]; @@ -361,14 +419,15 @@ get_padlist (cv); /* this is a monster */ } } +#ifdef CXt_FORMAT else if (CxTYPE(cx) == CXt_FORMAT) { /* I never used formats, so how should I know how these are implemented? */ /* my bold guess is as a simple, plain sub... */ croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); } +#endif } - while (cxix >= 0); if (top_si->si_type == PERLSI_MAIN) break; @@ -385,12 +444,8 @@ c->defsv = flags & TRANSFER_SAVE_DEFSV ? SvREFCNT_inc (DEFSV) : 0; c->errsv = flags & TRANSFER_SAVE_ERRSV ? SvREFCNT_inc (ERRSV) : 0; - /* I have not the slightest idea of why av_reify is necessary */ - /* but if it's missing the defav contents magically get replaced sometimes */ - if (c->defav) - av_reify (c->defav); - c->dowarn = PL_dowarn; + c->in_eval = PL_in_eval; c->curstackinfo = PL_curstackinfo; c->curstack = PL_curstack; @@ -398,6 +453,7 @@ c->stack_sp = PL_stack_sp; c->op = PL_op; c->curpad = PL_curpad; + c->comppad = PL_comppad; c->stack_base = PL_stack_base; c->stack_max = PL_stack_max; c->tmps_stack = PL_tmps_stack; @@ -417,7 +473,6 @@ c->retstack_ix = PL_retstack_ix; c->retstack_max = PL_retstack_max; c->curcop = PL_curcop; - c->start_env = PL_start_env; c->top_env = PL_top_env; } @@ -439,24 +494,26 @@ PL_stack_sp = PL_stack_base; PL_stack_max = PL_stack_base + AvMAX(PL_curstack); - New(50,PL_tmps_stack,64,SV*); + New(50,PL_tmps_stack,96,SV*); PL_tmps_floor = -1; PL_tmps_ix = -1; - PL_tmps_max = 64; + PL_tmps_max = 96; - New(54,PL_markstack,12,I32); + New(54,PL_markstack,16,I32); PL_markstack_ptr = PL_markstack; - PL_markstack_max = PL_markstack + 12; + PL_markstack_max = PL_markstack + 16; +#ifdef SET_MARK_OFFSET SET_MARK_OFFSET; +#endif - New(54,PL_scopestack,12,I32); + New(54,PL_scopestack,16,I32); PL_scopestack_ix = 0; - PL_scopestack_max = 12; + PL_scopestack_max = 16; - New(54,PL_savestack,64,ANY); + New(54,PL_savestack,96,ANY); PL_savestack_ix = 0; - PL_savestack_max = 64; + PL_savestack_max = 96; New(54,PL_retstack,8,OP*); PL_retstack_ix = 0; @@ -465,18 +522,20 @@ /* * destroy the stacks, the callchain etc... - * still there is a memleak of 128 bytes... */ STATIC void destroy_stacks(pTHX) { - /* is this ugly, I ask? */ - while (PL_scopestack_ix) - LEAVE; - - /* sure it is, but more important: is it correct?? :/ */ - while (PL_tmps_ix > PL_tmps_floor) /* should only ever be one iteration */ - FREETMPS; + if (!IN_DESTRUCT) + { + /* is this ugly, I ask? */ + while (PL_scopestack_ix) + LEAVE; + + /* sure it is, but more important: is it correct?? :/ */ + while (PL_tmps_ix > PL_tmps_floor) /* should only ever be one iteration */ + FREETMPS; + } while (PL_curstackinfo->si_next) PL_curstackinfo = PL_curstackinfo->si_next; @@ -491,9 +550,12 @@ PUTBACK; /* possibly superfluous */ } - dounwind(-1); + if (!IN_DESTRUCT) + { + dounwind(-1); + SvREFCNT_dec(PL_curstackinfo->si_stack); + } - SvREFCNT_dec(PL_curstackinfo->si_stack); Safefree(PL_curstackinfo->si_cxstack); Safefree(PL_curstackinfo); PL_curstackinfo = p; @@ -507,31 +569,59 @@ } static void -allocate_stack (Coro__State ctx) +allocate_stack (Coro__State ctx, int alloc) { -#ifdef HAVE_MMAP - ctx->ssize = 128 * 1024 * sizeof (long); /* mmap should do allocate-on-use */ - ctx->sptr = mmap (0, ctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, 0, 0); - if (ctx->sptr == (void *)-1) -#endif + coro_stack *stack; + + New (0, stack, 1, coro_stack); + + stack->refcnt = 1; + stack->usecnt = 1; + stack->gencnt = ctx->gencnt = 0; + if (alloc) { - /*FIXME*//*D*//* reasonable stack size! */ - ctx->ssize = 4096 * sizeof (long); - New (0, ctx->sptr, 4096, long); +#if HAVE_MMAP + stack->ssize = 128 * 1024 * sizeof (long); /* mmap should do allocate-on-write for us */ + stack->sptr = mmap (0, stack->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0); + if (stack->sptr == (void *)-1) +#endif + { + /*FIXME*//*D*//* reasonable stack size! */ + stack->ssize = -4096 * sizeof (long); + New (0, stack->sptr, 4096, long); + } } + else + stack->sptr = 0; + + ctx->stack = stack; } static void deallocate_stack (Coro__State ctx) { + coro_stack *stack = ctx->stack; + + ctx->stack = 0; + + if (stack) + { + if (!--stack->refcnt) + { #ifdef HAVE_MMAP - munmap (ctx->sptr, ctx->ssize); -#else - Safefree (ctx->sptr); + if (stack->ssize > 0 && stack->sptr) + munmap (stack->sptr, stack->ssize); + else #endif + Safefree (stack->sptr); + + Safefree (stack); + } + else if (ctx->gencnt == stack->gencnt) + --stack->usecnt; + } } -/* might go away together with optional SAVE_CCTXT */ static void setup_coro (void *arg) { @@ -543,19 +633,25 @@ SV *sub_init = (SV*)get_cv(SUB_INIT, FALSE); coro_init_stacks (aTHX); - JMPENV_BOOTSTRAP; - SPAGAIN; - /*PL_curcop = 0;*/ + /*PL_in_eval = PL_in_eval;*/ /* inherit */ SvREFCNT_dec (GvAV (PL_defgv)); - GvAV (PL_defgv) = ctx->args; + GvAV (PL_defgv) = ctx->args; ctx->args = 0; + + SPAGAIN; - if (ctx->sptr) + if (ctx->stack) { + ctx->cursp = 0; + PUSHMARK(SP); PUTBACK; - (void) call_sv (sub_init, G_VOID|G_NOARGS); - croak ("FATAL: CCTXT coroutine returned!"); + (void) call_sv (sub_init, G_VOID|G_NOARGS|G_EVAL); + + if (SvTRUE (ERRSV)) + croak (NULL); + else + croak ("FATAL: CCTXT coroutine returned!"); } else { @@ -577,26 +673,41 @@ * ah yes, and I don't care anyways ;) */ PUTBACK; - PL_op = pp_entersub(); + PL_op = PL_ppaddr[OP_ENTERSUB](aTHX); SPAGAIN; ENTER; /* necessary e.g. for dounwind */ } } +static void +continue_coro (void *arg) +{ + /* + * this is a _very_ stripped down perl interpreter ;) + */ + Coro__State ctx = (Coro__State)arg; + JMPENV coro_start_env; + + PL_top_env = &ctx->start_env; + + ctx->cursp = 0; + PL_op = PL_op->op_next; + CALLRUNOPS(aTHX); + + abort (); +} + STATIC void transfer(pTHX_ struct coro *prev, struct coro *next, int flags) { - dSP; + dSTACKLEVEL; + static struct coro *xnext; if (prev != next) { - /* - * this could be done in newprocess which would lead to - * extremely elegant and fast (basically just SAVE/LOAD) - * code here, but lazy allocation of stacks has also - * some virtues and the overhead of the if() is nil. - */ + xnext = next; + if (next->mainstack) { SAVE (prev, flags); @@ -606,15 +717,30 @@ next->mainstack = 0; next->tmps_ix = -2; + /* stacklevel changed? if yes, grab the stack for us! */ if (flags & TRANSFER_SAVE_CCTXT) { - if (!next->ssize) - croak ("destination coroutine has no CCTXT (%p, %d)", next->sptr, next->ssize); + if (!prev->stack) + allocate_stack (prev, 0); + else if (prev->cursp != stacklevel + && prev->stack->usecnt > 1) + { + prev->gencnt = ++prev->stack->gencnt; + prev->stack->usecnt = 1; + } - if (!prev->ssize) - prev->ssize = 1; /* mark cctx as valid ;) */ + /* has our stack been invalidated? */ + if (next->stack && next->stack->gencnt != next->gencnt) + { + deallocate_stack (next); + allocate_stack (next, 1); + coro_create (&(next->stack->cctx), + continue_coro, (void *)next, + next->stack->sptr, labs (next->stack->ssize)); + } - coro_transfer (&(prev->cctx), &(next->cctx)); + coro_transfer (&(prev->stack->cctx), &(next->stack->cctx)); + /* don't add any code here */ } } @@ -626,23 +752,174 @@ if (flags & TRANSFER_SAVE_CCTXT) { - if (!next->ssize) + if (!prev->stack) + allocate_stack (prev, 0); + + if (prev->stack->sptr && flags & TRANSFER_LAZY_STACK) { - allocate_stack (next); - coro_create (&(next->cctx), - setup_coro, (void *)next, - next->sptr, next->ssize); - } + PL_top_env = &next->start_env; - if (!prev->ssize) - prev->ssize = 1; /* mark cctx as valid ;) */ + setup_coro (next); - coro_transfer (&(prev->cctx), &(next->cctx)); + prev->stack->refcnt++; + prev->stack->usecnt++; + next->stack = prev->stack; + next->gencnt = prev->gencnt; + } + else + { + assert (!next->stack); + allocate_stack (next, 1); + coro_create (&(next->stack->cctx), + setup_coro, (void *)next, + next->stack->sptr, labs (next->stack->ssize)); + coro_transfer (&(prev->stack->cctx), &(next->stack->cctx)); + /* don't add any code here */ + } } else setup_coro (next); } + + /* + * xnext is now either prev or next, depending on wether + * we switched the c stack or not. that's why I use a global + * variable, that should become thread-specific at one point. + */ + xnext->cursp = stacklevel; } + + if (coro_mortal) + { + SvREFCNT_dec (coro_mortal); + coro_mortal = 0; + } +} + +#define SV_CORO(sv,func) \ + do { \ + if (SvROK (sv)) \ + sv = SvRV (sv); \ + \ + if (SvTYPE(sv) == SVt_PVHV) \ + { \ + HE *he = hv_fetch_ent((HV *)sv, ucoro_state_sv, 0, ucoro_state_hash); \ + \ + if (!he) \ + croak ("%s() -- %s is a hashref but lacks the " UCORO_STATE " key", func, # sv); \ + \ + (sv) = SvRV (HeVAL(he)); \ + } \ + \ + /* must also be changed inside Coro::Cont::yield */ \ + if (!SvOBJECT(sv) || SvSTASH(sv) != coro_state_stash) \ + croak ("%s() -- %s is not (and contains not) a Coro::State object", func, # sv); \ + \ + } while(0) + +#define SvSTATE(sv) (struct coro *)SvIV (sv) + +static void +api_transfer(pTHX_ SV *prev, SV *next, int flags) +{ + SV_CORO (prev, "Coro::transfer"); + SV_CORO (next, "Coro::transfer"); + + transfer(aTHX_ SvSTATE(prev), SvSTATE(next), flags); +} + +/** Coro ********************************************************************/ + +#define PRIO_MAX 3 +#define PRIO_HIGH 1 +#define PRIO_NORMAL 0 +#define PRIO_LOW -1 +#define PRIO_IDLE -3 +#define PRIO_MIN -4 + +/* for Coro.pm */ +static GV *coro_current, *coro_idle; +static AV *coro_ready[PRIO_MAX-PRIO_MIN+1]; +static int coro_nready; + +static void +coro_enq (SV *sv) +{ + if (SvTYPE (sv) == SVt_PVHV) + { + SV **xprio = hv_fetch ((HV *)sv, "prio", 4, 0); + int prio = xprio ? SvIV (*xprio) : PRIO_NORMAL; + + prio = prio > PRIO_MAX ? PRIO_MAX + : prio < PRIO_MIN ? PRIO_MIN + : prio; + + av_push (coro_ready [prio - PRIO_MIN], sv); + coro_nready++; + + return; + } + + croak ("Coro::ready tried to enqueue something that is not a coroutine"); +} + +static SV * +coro_deq (int min_prio) +{ + int prio = PRIO_MAX - PRIO_MIN; + + min_prio -= PRIO_MIN; + if (min_prio < 0) + min_prio = 0; + + for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= min_prio; ) + if (av_len (coro_ready[prio]) >= 0) + { + coro_nready--; + return av_shift (coro_ready[prio]); + } + + return 0; +} + +static void +api_ready (SV *coro) +{ + if (SvROK (coro)) + coro = SvRV (coro); + + coro_enq (SvREFCNT_inc (coro)); +} + +static void +api_schedule (void) +{ + SV *prev, *next; + + prev = SvRV (GvSV (coro_current)); + next = coro_deq (PRIO_MIN); + + if (!next) + next = SvREFCNT_inc (SvRV (GvSV (coro_idle))); + + /* free this only after the transfer */ + coro_mortal = prev; + SV_CORO (prev, "Coro::schedule"); + + SvRV (GvSV (coro_current)) = next; + + SV_CORO (next, "Coro::schedule"); + + transfer (aTHX_ SvSTATE (prev), SvSTATE (next), + TRANSFER_SAVE_ALL | TRANSFER_LAZY_STACK); +} + +static void +api_cede (void) +{ + coro_enq (SvREFCNT_inc (SvRV (GvSV (coro_current)))); + + api_schedule (); } MODULE = Coro::State PACKAGE = Coro::State @@ -664,6 +941,9 @@ padlist_cache = newHV (); main_mainstack = PL_mainstack; + + coroapi.ver = CORO_API_VERSION; + coroapi.transfer = api_transfer; } Coro::State @@ -676,25 +956,34 @@ if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV) croak ("Coro::State::_newprocess expects an arrayref"); - New (0, coro, 1, struct coro); + Newz (0, coro, 1, struct coro); coro->args = (AV *)SvREFCNT_inc (SvRV (args)); coro->mainstack = 0; /* actual work is done inside transfer */ - coro->sptr = 0; - coro->ssize = 0; + coro->stack = 0; + + /* same as JMPENV_BOOTSTRAP */ + /* we might be able to recycle start_env, but safe is safe */ + //Zero(&coro->start_env, 1, JMPENV); + coro->start_env.je_ret = -1; + coro->start_env.je_mustcatch = TRUE; RETVAL = coro; OUTPUT: RETVAL void -transfer(prev, next, flags = TRANSFER_SAVE_ALL) - Coro::State_or_hashref prev - Coro::State_or_hashref next - int flags +transfer(prev, next, flags) + SV *prev + SV *next + int flags PROTOTYPE: @ CODE: - transfer (aTHX_ prev, next, flags); + PUTBACK; + SV_CORO (next, "Coro::transfer"); + SV_CORO (prev, "Coro::transfer"); + transfer (aTHX_ SvSTATE (prev), SvSTATE (next), flags); + SPAGAIN; void DESTROY(coro) @@ -705,22 +994,21 @@ { struct coro temp; + PUTBACK; SAVE(aTHX_ (&temp), TRANSFER_SAVE_ALL); LOAD(aTHX_ coro); + SPAGAIN; destroy_stacks (aTHX); LOAD((&temp)); /* this will get rid of defsv etc.. */ + SPAGAIN; coro->mainstack = 0; } - if (coro->sptr) - { - deallocate_stack (coro); - coro->sptr = 0; - } - + deallocate_stack (coro); + SvREFCNT_dec (coro->args); Safefree (coro); void @@ -730,9 +1018,22 @@ flush_padlist_cache (); #endif +void +_exit(code) + int code + PROTOTYPE: $ + CODE: +#if defined(__GLIBC__) || _POSIX_C_SOURCE + _exit (code); +#else + signal (SIGTERM, SIG_DFL); + raise (SIGTERM); + exit (code); +#endif + MODULE = Coro::State PACKAGE = Coro::Cont -# this is slightly dirty +# this is slightly dirty (should expose a c-level api) void yield(...) @@ -744,7 +1045,7 @@ struct coro *prev, *next; if (!returnstk) - returnstk = SvRV (get_sv ("Coro::Cont::return", FALSE)); + returnstk = SvRV ((SV *)get_sv ("Coro::Cont::return", FALSE)); /* set up @_ -- ugly */ av_clear (defav); @@ -760,3 +1061,67 @@ transfer(aTHX_ prev, next, 0); +MODULE = Coro::State PACKAGE = Coro + +# this is slightly dirty (should expose a c-level api) + +BOOT: +{ + int i; + HV *stash = gv_stashpv ("Coro", TRUE); + + newCONSTSUB (stash, "PRIO_MAX", newSViv (PRIO_MAX)); + newCONSTSUB (stash, "PRIO_HIGH", newSViv (PRIO_HIGH)); + newCONSTSUB (stash, "PRIO_NORMAL", newSViv (PRIO_NORMAL)); + newCONSTSUB (stash, "PRIO_LOW", newSViv (PRIO_LOW)); + newCONSTSUB (stash, "PRIO_IDLE", newSViv (PRIO_IDLE)); + newCONSTSUB (stash, "PRIO_MIN", newSViv (PRIO_MIN)); + + coro_current = gv_fetchpv ("Coro::current", TRUE, SVt_PV); + coro_idle = gv_fetchpv ("Coro::idle" , TRUE, SVt_PV); + + for (i = PRIO_MAX - PRIO_MIN + 1; i--; ) + coro_ready[i] = newAV (); + + { + SV *sv = perl_get_sv("Coro::API", 1); + + coroapi.schedule = api_schedule; + coroapi.cede = api_cede; + coroapi.ready = api_ready; + coroapi.nready = &coro_nready; + coroapi.current = coro_current; + + GCoroAPI = &coroapi; + sv_setiv(sv, (IV)&coroapi); + SvREADONLY_on(sv); + } +} + +void +ready(self) + SV * self + PROTOTYPE: $ + CODE: + api_ready (self); + +int +nready(...) + PROTOTYPE: + CODE: + RETVAL = coro_nready; + OUTPUT: + RETVAL + +void +schedule(...) + PROTOTYPE: + CODE: + api_schedule (); + +void +cede(...) + PROTOTYPE: + CODE: + api_cede (); +