--- Coro/Coro/State.xs 2008/11/25 09:49:43 1.328 +++ Coro/Coro/State.xs 2009/06/09 18:56:45 1.346 @@ -135,7 +135,7 @@ # define STACKLEVEL ((void *)&stacklevel) #endif -#define IN_DESTRUCT (PL_main_cv == Nullcv) +#define IN_DESTRUCT PL_dirty #if __GNUC__ >= 3 # define attribute(x) __attribute__(x) @@ -240,6 +240,7 @@ CF_READY = 0x0002, /* coroutine is ready */ CF_NEW = 0x0004, /* has never been switched to */ CF_DESTROYED = 0x0008, /* coroutine data has been freed */ + CF_SUSPENDED = 0x0010, /* coroutine can't be scheduled */ }; /* the structure where most of the perl state is stored, overlaid on the cxstack */ @@ -287,6 +288,10 @@ SV *invoke_cb; AV *invoke_av; + /* on_enter/on_leave */ + AV *on_enter; + AV *on_leave; + /* linked list */ struct coro *next, *prev; }; @@ -354,9 +359,66 @@ { HV *st; GV *gvp; - return sv_2cv (sv, &st, &gvp, 0); + CV *cv = sv_2cv (sv, &st, &gvp, 0); + + if (!cv) + croak ("code reference expected"); + + return cv; +} + +/*****************************************************************************/ +/* magic glue */ + +#define CORO_MAGIC_type_cv 26 +#define CORO_MAGIC_type_state PERL_MAGIC_ext + +#define CORO_MAGIC_NN(sv, type) \ + (expect_true (SvMAGIC (sv)->mg_type == type) \ + ? SvMAGIC (sv) \ + : mg_find (sv, type)) + +#define CORO_MAGIC(sv, type) \ + (expect_true (SvMAGIC (sv)) \ + ? CORO_MAGIC_NN (sv, type) \ + : 0) + +#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv) +#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state) + +INLINE struct coro * +SvSTATE_ (pTHX_ SV *coro) +{ + HV *stash; + MAGIC *mg; + + if (SvROK (coro)) + coro = SvRV (coro); + + if (expect_false (SvTYPE (coro) != SVt_PVHV)) + croak ("Coro::State object required"); + + stash = SvSTASH (coro); + if (expect_false (stash != coro_stash && stash != coro_state_stash)) + { + /* very slow, but rare, check */ + if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State")) + croak ("Coro::State object required"); + } + + mg = CORO_MAGIC_state (coro); + return (struct coro *)mg->mg_ptr; } +#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv)) + +/* faster than SvSTATE, but expects a coroutine hv */ +#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr) +#define SvSTATE_current SvSTATE_hv (SvRV (coro_current)) + +/*****************************************************************************/ +/* padlist management and caching */ + static AV * coro_derive_padlist (pTHX_ CV *cv) { @@ -373,7 +435,7 @@ newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)]; --AvFILLp (padlist); - av_store (newpadlist, 0, SvREFCNT_inc_NN (*av_fetch (padlist, 0, FALSE))); + av_store (newpadlist, 0, SvREFCNT_inc_NN (AvARRAY (padlist)[0])); av_store (newpadlist, 1, (SV *)newpad); return newpadlist; @@ -383,22 +445,26 @@ free_padlist (pTHX_ AV *padlist) { /* may be during global destruction */ - if (SvREFCNT (padlist)) + if (!IN_DESTRUCT) { I32 i = AvFILLp (padlist); - while (i >= 0) + + while (i > 0) /* special-case index 0 */ { - SV **svp = av_fetch (padlist, i--, FALSE); - if (svp) - { - SV *sv; - while (&PL_sv_undef != (sv = av_pop ((AV *)*svp))) - SvREFCNT_dec (sv); + /* we try to be extra-careful here */ + AV *av = (AV *)AvARRAY (padlist)[i--]; + I32 j = AvFILLp (av); - SvREFCNT_dec (*svp); - } + while (j >= 0) + SvREFCNT_dec (AvARRAY (av)[j--]); + + AvFILLp (av) = -1; + SvREFCNT_dec (av); } + SvREFCNT_dec (AvARRAY (padlist)[0]); + + AvFILLp (padlist) = -1; SvREFCNT_dec ((SV*)padlist); } } @@ -418,57 +484,11 @@ return 0; } -#define CORO_MAGIC_type_cv 26 -#define CORO_MAGIC_type_state PERL_MAGIC_ext - static MGVTBL coro_cv_vtbl = { 0, 0, 0, 0, coro_cv_free }; -#define CORO_MAGIC_NN(sv, type) \ - (expect_true (SvMAGIC (sv)->mg_type == type) \ - ? SvMAGIC (sv) \ - : mg_find (sv, type)) - -#define CORO_MAGIC(sv, type) \ - (expect_true (SvMAGIC (sv)) \ - ? CORO_MAGIC_NN (sv, type) \ - : 0) - -#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv) -#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state) - -INLINE struct coro * -SvSTATE_ (pTHX_ SV *coro) -{ - HV *stash; - MAGIC *mg; - - if (SvROK (coro)) - coro = SvRV (coro); - - if (expect_false (SvTYPE (coro) != SVt_PVHV)) - croak ("Coro::State object required"); - - stash = SvSTASH (coro); - if (expect_false (stash != coro_stash && stash != coro_state_stash)) - { - /* very slow, but rare, check */ - if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State")) - croak ("Coro::State object required"); - } - - mg = CORO_MAGIC_state (coro); - return (struct coro *)mg->mg_ptr; -} - -#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv)) - -/* faster than SvSTATE, but expects a coroutine hv */ -#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr) -#define SvSTATE_current SvSTATE_hv (SvRV (coro_current)) - /* the next two functions merely cache the padlists */ static void get_padlist (pTHX_ CV *cv) @@ -505,7 +525,7 @@ av = (AV *)mg->mg_obj; if (expect_false (AvFILLp (av) >= AvMAX (av))) - av_extend (av, AvMAX (av) + 1); + av_extend (av, AvFILLp (av) + 1); AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv); } @@ -513,6 +533,9 @@ /** load & save, init *******************************************************/ static void +on_enterleave_call (pTHX_ SV *cb); + +static void load_perl (pTHX_ Coro__State c) { perl_slots *slot = c->slot; @@ -548,11 +571,27 @@ slf_frame = c->slf_frame; CORO_THROW = c->except; + + if (expect_false (c->on_enter)) + { + int i; + + for (i = 0; i <= AvFILLp (c->on_enter); ++i) + on_enterleave_call (aTHX_ AvARRAY (c->on_enter)[i]); + } } static void save_perl (pTHX_ Coro__State c) { + if (expect_false (c->on_leave)) + { + int i; + + for (i = AvFILLp (c->on_leave); i >= 0; --i) + on_enterleave_call (aTHX_ AvARRAY (c->on_leave)[i]); + } + c->except = CORO_THROW; c->slf_frame = slf_frame; @@ -761,11 +800,8 @@ /* * This overrides the default magic get method of %SIG elements. * The original one doesn't provide for reading back of PL_diehook/PL_warnhook - * and instead of tryign to save and restore the hash elements, we just provide + * and instead of trying to save and restore the hash elements, we just provide * readback here. - * We only do this when the hook is != 0, as they are often set to 0 temporarily, - * not expecting this to actually change the hook. This is a potential problem - * when a schedule happens then, but we ignore this. */ static int coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg) @@ -828,7 +864,7 @@ if (svp) { SV *old = *svp; - *svp = newSVsv (sv); + *svp = SvOK (sv) ? newSVsv (sv) : 0; SvREFCNT_dec (old); return 0; } @@ -930,7 +966,7 @@ } static void -coro_destruct (pTHX_ struct coro *coro) +coro_unwind_stacks (pTHX) { if (!IN_DESTRUCT) { @@ -948,6 +984,12 @@ /* unwind main stack */ dounwind (-1); } +} + +static void +coro_destruct_perl (pTHX_ struct coro *coro) +{ + coro_unwind_stacks (aTHX); SvREFCNT_dec (GvSV (PL_defgv)); SvREFCNT_dec (GvAV (PL_defgv)); @@ -1269,7 +1311,7 @@ if (!cctx) return; - assert (cctx != cctx_current);//D temporary + assert (("FATAL: tried to destroy current cctx", cctx != cctx_current));//D temporary? --cctx_count; coro_destroy (&cctx->cctx); @@ -1343,13 +1385,10 @@ if (expect_true (prev != next)) { if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW)))) - croak ("Coro::State::transfer called with a suspended prev Coro::State, but can only transfer from running or new states,"); + croak ("Coro::State::transfer called with a blocked prev Coro::State, but can only transfer from running or new states,"); - if (expect_false (next->flags & CF_RUNNING)) - croak ("Coro::State::transfer called with running next Coro::State, but can only transfer to inactive states,"); - - if (expect_false (next->flags & CF_DESTROYED)) - croak ("Coro::State::transfer called with destroyed next Coro::State, but can only transfer to inactive states,"); + if (expect_false (next->flags & (CF_RUNNING | CF_DESTROYED | CF_SUSPENDED))) + croak ("Coro::State::transfer called with running, destroyed or suspended next Coro::State, but can only transfer to inactive states,"); #if !PERL_VERSION_ATLEAST (5,10,0) if (expect_false (PL_lex_state != LEX_NOTPARSING)) @@ -1397,8 +1436,6 @@ else load_perl (aTHX_ next); - assert (!prev->cctx);//D temporary - /* possibly untie and reuse the cctx */ if (expect_true ( cctx_current->idle_sp == STACKLEVEL @@ -1449,7 +1486,7 @@ if (coro->flags & CF_DESTROYED) return 0; - if (coro->on_destroy) + if (coro->on_destroy && !PL_dirty) coro->on_destroy (aTHX_ coro); coro->flags |= CF_DESTROYED; @@ -1463,18 +1500,21 @@ else coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */ - if (coro->mainstack && coro->mainstack != main_mainstack) + if (coro->mainstack + && coro->mainstack != main_mainstack + && coro->slot + && !PL_dirty) { - struct coro temp; + struct coro *current = SvSTATE_current; - assert (("FATAL: tried to destroy currently running coroutine (please report)", !(coro->flags & CF_RUNNING))); + assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack)); - save_perl (aTHX_ &temp); + save_perl (aTHX_ current); load_perl (aTHX_ coro); - coro_destruct (aTHX_ coro); + coro_destruct_perl (aTHX_ coro); - load_perl (aTHX_ &temp); + load_perl (aTHX_ current); coro->slot = 0; } @@ -1665,7 +1705,7 @@ struct coro *next = SvSTATE_hv (next_sv); /* cannot transfer to destroyed coros, skip and look for next */ - if (expect_false (next->flags & CF_DESTROYED)) + if (expect_false (next->flags & (CF_DESTROYED | CF_SUSPENDED))) SvREFCNT_dec (next_sv); /* coro_nready has already been taken care of by destroy */ else { @@ -1854,6 +1894,10 @@ frame->prepare = prepare_schedule; frame->check = slf_check_repeat; + + /* as a minor optimisation, we could unwind all stacks here */ + /* but that puts extra pressure on pp_slf, and is not worth much */ + /*coro_unwind_stacks (aTHX);*/ } /*****************************************************************************/ @@ -1946,16 +1990,17 @@ if (SvTYPE (SvRV (data)) != SVt_PVAV) { /* first call, set args */ - AV *av = newAV (); SV *coro = SvRV (data); + AV *av = newAV (); SvRV_set (data, (SV *)av); - api_ready (aTHX_ coro); - SvREFCNT_dec (coro); /* better take a full copy of the arguments */ while (items--) av_store (av, items, newSVsv (ST (items))); + + api_ready (aTHX_ coro); + SvREFCNT_dec (coro); } XSRETURN_EMPTY; @@ -1982,7 +2027,7 @@ for (i = 0; i <= AvFILLp (av); ++i) PUSHs (sv_2mortal (AvARRAY (av)[i])); - /* we have stolen the elements, so ste length to zero and free */ + /* we have stolen the elements, so set length to zero and free */ AvFILLp (av) = -1; av_undef (av); @@ -2285,6 +2330,53 @@ } /*****************************************************************************/ +/* dynamic wind */ + +static void +on_enterleave_call (pTHX_ SV *cb) +{ + dSP; + + PUSHSTACK; + + PUSHMARK (SP); + PUTBACK; + call_sv (cb, G_VOID | G_DISCARD); + SPAGAIN; + + POPSTACK; +} + +static SV * +coro_avp_pop_and_free (pTHX_ AV **avp) +{ + AV *av = *avp; + SV *res = av_pop (av); + + if (AvFILLp (av) < 0) + { + *avp = 0; + SvREFCNT_dec (av); + } + + return res; +} + +static void +coro_pop_on_enter (pTHX_ void *coro) +{ + SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_enter); + SvREFCNT_dec (cb); +} + +static void +coro_pop_on_leave (pTHX_ void *coro) +{ + SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_leave); + on_enterleave_call (aTHX_ sv_2mortal (cb)); +} + +/*****************************************************************************/ /* PerlIO::cede */ typedef struct @@ -2372,8 +2464,13 @@ /* unfortunately, building manually saves memory */ Newx (ary, 2, SV *); AvALLOC (av) = ary; - /*AvARRAY (av) = ary;*/ - SvPVX ((SV *)av) = (char *)ary; /* 5.8.8 needs this syntax instead of AvARRAY = ary */ +#if PERL_VERSION_ATLEAST (5,10,0) + AvARRAY (av) = ary; +#else + /* 5.8.8 needs this syntax instead of AvARRAY = ary, yet */ + /* -DDEBUGGING flags this as a bug, despite it perfectly working */ + SvPVX ((SV *)av) = (char *)ary; +#endif AvMAX (av) = 1; AvFILLp (av) = 0; ary [0] = newSViv (count); @@ -2912,7 +3009,7 @@ CODE: { #if CORO_CLONE - struct coro *ncoro = coro_clone (coro); + struct coro *ncoro = coro_clone (aTHX_ coro); MAGIC *mg; /* TODO: too much duplication */ ncoro->hv = newHV (); @@ -2984,39 +3081,34 @@ { if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot)) { - struct coro temp; + struct coro *current = SvSTATE_current; - if (!(coro->flags & CF_RUNNING)) + if (current != coro) { PUTBACK; - save_perl (aTHX_ &temp); + save_perl (aTHX_ current); load_perl (aTHX_ coro); + SPAGAIN; } - { - dSP; - ENTER; - SAVETMPS; - PUTBACK; - PUSHSTACK; - PUSHMARK (SP); + PUSHSTACK; - if (ix) - eval_sv (coderef, 0); - else - call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD); + PUSHMARK (SP); + PUTBACK; - POPSTACK; - SPAGAIN; - FREETMPS; - LEAVE; - PUTBACK; - } + if (ix) + eval_sv (coderef, 0); + else + call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD); + + POPSTACK; + SPAGAIN; - if (!(coro->flags & CF_RUNNING)) + if (current != coro) { + PUTBACK; save_perl (aTHX_ coro); - load_perl (aTHX_ &temp); + load_perl (aTHX_ current); SPAGAIN; } } @@ -3030,6 +3122,7 @@ is_running = CF_RUNNING is_new = CF_NEW is_destroyed = CF_DESTROYED + is_suspended = CF_SUSPENDED CODE: RETVAL = boolSV (coro->flags & ix); OUTPUT: @@ -3104,6 +3197,12 @@ SV *tmp = *src; *src = *dst; *dst = tmp; } +void +cancel (Coro::State self) + CODE: + coro_state_destroy (aTHX_ self); + coro_call_on_destroy (aTHX_ self); /* actually only for Coro objects */ + MODULE = Coro::State PACKAGE = Coro @@ -3113,7 +3212,7 @@ sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE); sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE); - cv_coro_run = get_cv ( "Coro::_terminate", GV_ADD); + cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD); cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD); coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current); av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE); @@ -3187,12 +3286,6 @@ CORO_EXECUTE_SLF_XS (slf_init_cede_notself); void -_cancel (Coro::State self) - CODE: - coro_state_destroy (aTHX_ self); - coro_call_on_destroy (aTHX_ self); - -void _set_current (SV *current) PROTOTYPE: $ CODE: @@ -3246,6 +3339,18 @@ RETVAL void +suspend (Coro::State self) + PROTOTYPE: $ + CODE: + self->flags |= CF_SUSPENDED; + +void +resume (Coro::State self) + PROTOTYPE: $ + CODE: + self->flags &= ~CF_SUSPENDED; + +void _pool_handler (...) CODE: CORO_EXECUTE_SLF_XS (slf_init_pool_handler); @@ -3308,6 +3413,31 @@ PPCODE: CORO_EXECUTE_SLF_XS (slf_init_rouse_wait); +void +on_enter (SV *block) + ALIAS: + on_leave = 1 + PROTOTYPE: & + CODE: +{ + struct coro *coro = SvSTATE_current; + AV **avp = ix ? &coro->on_leave : &coro->on_enter; + + block = (SV *)coro_sv_2cv (aTHX_ block); + + if (!*avp) + *avp = newAV (); + + av_push (*avp, SvREFCNT_inc (block)); + + if (!ix) + on_enterleave_call (aTHX_ block); + + LEAVE; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */ + SAVEDESTRUCTOR_X (ix ? coro_pop_on_leave : coro_pop_on_enter, (void *)coro); + ENTER; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */ +} + MODULE = Coro::State PACKAGE = PerlIO::cede @@ -3327,7 +3457,7 @@ OUTPUT: RETVAL -# helper for Coro::Channel +# helper for Coro::Channel and others SV * _alloc (int count) CODE: @@ -3395,6 +3525,22 @@ } } +MODULE = Coro::State PACKAGE = Coro::SemaphoreSet + +void +_may_delete (SV *sem, int count, int extra_refs) + PPCODE: +{ + AV *av = (AV *)SvRV (sem); + + if (SvREFCNT ((SV *)av) == 1 + extra_refs + && AvFILLp (av) == 0 /* no waiters, just count */ + && SvIV (AvARRAY (av)[0]) == count) + XSRETURN_YES; + + XSRETURN_NO; +} + MODULE = Coro::State PACKAGE = Coro::Signal SV *