--- Coro/Coro/State.xs 2007/10/08 02:50:23 1.202 +++ Coro/Coro/State.xs 2008/02/13 15:46:00 1.223 @@ -12,6 +12,7 @@ #include #include #include +#include /* portable stdint.h */ #ifdef HAVE_MMAP # include @@ -73,11 +74,6 @@ # endif #endif -/* 5.8.7 */ -#ifndef SvRV_set -# define SvRV_set(s,v) SvRV(s) = (v) -#endif - /* 5.8.8 */ #ifndef GV_NOTQUAL # define GV_NOTQUAL 0 @@ -86,6 +82,16 @@ # define newSV(l) NEWSV(0,l) #endif +/* 5.11 */ +#ifndef CxHASARGS +# define CxHASARGS(cx) (cx)->blk_sub.hasargs +#endif + +/* 5.8.7 */ +#ifndef SvRV_set +# define SvRV_set(s,v) SvRV(s) = (v) +#endif + #if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64 # undef CORO_STACKGUARD #endif @@ -132,8 +138,6 @@ # define UNLOCK (void)0 #endif -#define strpair(const) const, sizeof (const) - 1 - /* helper storage struct for Coro::AIO */ struct io_state { @@ -148,14 +152,13 @@ static AV *main_mainstack; /* used to differentiate between $main and others */ static JMPENV *main_top_env; static HV *coro_state_stash, *coro_stash; -static SV *coro_mortal; /* will be freed after next transfer */ +static volatile SV *coro_mortal; /* will be freed after next transfer */ static GV *irsgv; /* $/ */ static GV *stdoutgv; /* *STDOUT */ - +static SV *rv_diehook; +static SV *rv_warnhook; static HV *hv_sig; /* %SIG */ -static SV *sv_diehook; -static SV *sv_warnhook; /* async_pool helper stuff */ static SV *sv_pool_rss; @@ -223,24 +226,23 @@ AV *mainstack; perl_slots *slot; /* basically the saved sp */ - /* data associated with this coroutine (initial args) */ - AV *args; - int refcnt; - int flags; /* CF_ flags */ + AV *args; /* data associated with this coroutine (initial args) */ + int refcnt; /* coroutines are refcounted, yes */ + int flags; /* CF_ flags */ + HV *hv; /* the perl hash associated with this coro, if any */ /* statistics */ int usecount; /* number of transfers to this coro */ /* coro process data */ int prio; - SV *throw; + SV *throw; /* exception to be thrown */ /* async_pool */ SV *saved_deffh; /* linked list */ struct coro *next, *prev; - HV *hv; /* the perl hash associated with this coro, if any */ }; typedef struct coro *Coro__State; @@ -264,7 +266,7 @@ /** lowlevel stuff **********************************************************/ static SV * -coro_get_sv (const char *name, int create) +coro_get_sv (pTHX_ const char *name, int create) { #if PERL_VERSION_ATLEAST (5,9,0) /* silence stupid and wrong 5.10 warning that I am unable to switch off */ @@ -274,7 +276,7 @@ } static AV * -coro_get_av (const char *name, int create) +coro_get_av (pTHX_ const char *name, int create) { #if PERL_VERSION_ATLEAST (5,9,0) /* silence stupid and wrong 5.10 warning that I am unable to switch off */ @@ -284,7 +286,7 @@ } static HV * -coro_get_hv (const char *name, int create) +coro_get_hv (pTHX_ const char *name, int create) { #if PERL_VERSION_ATLEAST (5,9,0) /* silence stupid and wrong 5.10 warning that I am unable to switch off */ @@ -349,22 +351,27 @@ while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av))) free_padlist (aTHX_ padlist); - SvREFCNT_dec (av); - return 0; } -#define PERL_MAGIC_coro PERL_MAGIC_ext +#define CORO_MAGIC_type_cv PERL_MAGIC_ext +#define CORO_MAGIC_type_state PERL_MAGIC_ext -static MGVTBL vtbl_coro = {0, 0, 0, 0, coro_cv_free}; +static MGVTBL coro_cv_vtbl = { + 0, 0, 0, 0, + coro_cv_free +}; -#define CORO_MAGIC(cv) \ - SvMAGIC (cv) \ - ? SvMAGIC (cv)->mg_type == PERL_MAGIC_coro \ - ? SvMAGIC (cv) \ - : mg_find ((SV *)cv, PERL_MAGIC_coro) \ +#define CORO_MAGIC(sv,type) \ + SvMAGIC (sv) \ + ? SvMAGIC (sv)->mg_type == type \ + ? SvMAGIC (sv) \ + : mg_find (sv, type) \ : 0 +#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv) +#define CORO_MAGIC_state(sv) CORO_MAGIC (((SV *)(sv)), CORO_MAGIC_type_state) + static struct coro * SvSTATE_ (pTHX_ SV *coro) { @@ -385,7 +392,7 @@ croak ("Coro::State object required"); } - mg = CORO_MAGIC (coro); + mg = CORO_MAGIC_state (coro); return (struct coro *)mg->mg_ptr; } @@ -395,7 +402,7 @@ static void get_padlist (pTHX_ CV *cv) { - MAGIC *mg = CORO_MAGIC (cv); + MAGIC *mg = CORO_MAGIC_cv (cv); AV *av; if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0)) @@ -417,16 +424,11 @@ static void put_padlist (pTHX_ CV *cv) { - MAGIC *mg = CORO_MAGIC (cv); + MAGIC *mg = CORO_MAGIC_cv (cv); AV *av; if (expect_false (!mg)) - { - sv_magic ((SV *)cv, 0, PERL_MAGIC_coro, 0, 0); - mg = mg_find ((SV *)cv, PERL_MAGIC_coro); - mg->mg_virtual = &vtbl_coro; - mg->mg_obj = (SV *)newAV (); - } + mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0); av = (AV *)mg->mg_obj; @@ -455,9 +457,6 @@ # include "state.h" #undef VAR - /*hv_store (hv_sig, strpair ("__DIE__" ), SvREFCNT_inc (sv_diehook ), 0);*/ - /*hv_store (hv_sig, strpair ("__WARN__"), SvREFCNT_inc (sv_warnhook), 0);*/ - { dSP; @@ -559,7 +558,7 @@ * on the (sometimes correct) assumption that coroutines do * not usually need a lot of stackspace. */ -#if 1 +#if CORO_PREFER_PERL_FUNCTIONS # define coro_init_stacks init_stacks #else static void @@ -672,6 +671,56 @@ /** coroutine stack handling ************************************************/ +static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg); +static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg); + +/* + * This overrides the default magic get method of %SIG elements. + * The original one doesn't provide for reading back of PL_diehook/PL_warnhook + * and instead of tryign to save and restore the hash elements, we just provide + * readback here. + * We only do this when the hook is != 0, as they are often set to 0 temporarily, + * not expecting this to actually change the hook. This is a potential problem + * when a schedule happens then, but we ignore this. + */ +static int +coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg) +{ + const char *s = MgPV_nolen_const (mg); + + if (*s == '_') + { + if (strEQ (s, "__DIE__" ) && PL_diehook ) return sv_setsv (sv, PL_diehook ), 0; + if (strEQ (s, "__WARN__") && PL_warnhook) return sv_setsv (sv, PL_warnhook), 0; + } + + return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0; +} + +static int +coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg) +{ + const char *s = MgPV_nolen_const (mg); + + if (*s == '_') + { + SV **svp = 0; + + if (strEQ (s, "__DIE__" )) svp = &PL_diehook; + if (strEQ (s, "__WARN__")) svp = &PL_warnhook; + + if (svp) + { + SV *old = *svp; + *svp = newSVsv (sv); + SvREFCNT_dec (old); + return; + } + } + + return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0; +} + static void coro_setup (pTHX_ struct coro *coro) { @@ -689,8 +738,10 @@ PL_localizing = 0; PL_dirty = 0; PL_restartop = 0; - PL_diehook = 0; hv_store (hv_sig, strpair ("__DIE__" ), SvREFCNT_inc (sv_diehook ), 0); - PL_warnhook = 0; hv_store (hv_sig, strpair ("__WARN__"), SvREFCNT_inc (sv_warnhook), 0); + + /* recreate the die/warn hooks */ + PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook ); + PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook); GvSV (PL_defgv) = newSV (0); GvAV (PL_defgv) = coro->args; coro->args = 0; @@ -715,7 +766,12 @@ SPAGAIN; } - ENTER; /* necessary e.g. for dounwind and to balance the xsub-entersub */ + /* this newly created coroutine might be run on an existing cctx which most + * likely was suspended in set_stacklevel, called from entersub. + * set_stacklevl doesn't do anything on return, but entersub does LEAVE, + * so we ENTER here for symmetry + */ + ENTER; } static void @@ -809,7 +865,7 @@ PUSHs (fullname); PUSHs (sv_2mortal (newRV_noinc ((SV *)av))); PUTBACK; - cb = hv_fetch ((HV *)SvRV (coro_current), strpair ("_trace_sub_cb"), 0); + cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0); if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD); SPAGAIN; FREETMPS; @@ -846,9 +902,9 @@ PUSHMARK (SP); PUSHs (&PL_sv_yes); PUSHs (fullname); - PUSHs (cx->blk_sub.hasargs ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef); + PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef); PUTBACK; - cb = hv_fetch ((HV *)SvRV (coro_current), strpair ("_trace_sub_cb"), 0); + cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0); if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD); SPAGAIN; FREETMPS; @@ -872,7 +928,7 @@ PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0))); PUSHs (sv_2mortal (newSViv (CopLINE (oldcop)))); PUTBACK; - cb = hv_fetch ((HV *)SvRV (coro_current), strpair ("_trace_line_cb"), 0); + cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0); if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD); SPAGAIN; FREETMPS; @@ -1074,7 +1130,7 @@ if ( #if PERL_VERSION_ATLEAST (5,9,0) - expect_false (PL_parser) + expect_false (PL_parser && PL_parser->lex_state != LEX_NOTPARSING) #else expect_false (PL_lex_state != LEX_NOTPARSING) #endif @@ -1088,6 +1144,7 @@ transfer (pTHX_ struct coro *prev, struct coro *next) { dSTACKLEVEL; + static volatile int has_throw; /* sometimes transfer is only called to set idle_sp */ if (expect_false (!next)) @@ -1112,21 +1169,18 @@ LOCK; + /* first get rid of the old state */ + save_perl (aTHX_ prev); + if (expect_false (next->flags & CF_NEW)) { /* need to start coroutine */ next->flags &= ~CF_NEW; - /* first get rid of the old state */ - save_perl (aTHX_ prev); /* setup coroutine call */ coro_setup (aTHX_ next); } else - { - /* coroutine already started */ - save_perl (aTHX_ prev); - load_perl (aTHX_ next); - } + load_perl (aTHX_ next); prev__cctx = prev->cctx; @@ -1152,6 +1206,8 @@ if (expect_true (!next->cctx)) next->cctx = cctx_get (aTHX); + has_throw = !!next->throw; + if (expect_false (prev__cctx != next->cctx)) { prev__cctx->top_env = PL_top_env; @@ -1162,7 +1218,7 @@ free_coro_mortal (aTHX); UNLOCK; - if (expect_false (prev->throw || next->throw)) + if (expect_false (has_throw)) { struct coro *coro = SvSTATE (coro_current); @@ -1298,15 +1354,11 @@ } static SV * -coro_deq (pTHX_ int min_prio) +coro_deq (pTHX) { - int prio = PRIO_MAX - PRIO_MIN; - - min_prio -= PRIO_MIN; - if (min_prio < 0) - min_prio = 0; + int prio; - for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= min_prio; ) + for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= 0; ) if (AvFILLp (coro_ready [prio]) >= 0) return av_shift (coro_ready [prio]); @@ -1352,7 +1404,7 @@ for (;;) { LOCK; - next_sv = coro_deq (aTHX_ PRIO_MIN); + next_sv = coro_deq (aTHX); /* nothing to schedule: call the idle handler */ if (expect_false (!next_sv)) @@ -1366,6 +1418,7 @@ PUSHMARK (SP); PUTBACK; call_sv (get_sv ("Coro::idle", FALSE), G_DISCARD); + SPAGAIN; FREETMPS; LEAVE; @@ -1505,12 +1558,14 @@ irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV); stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO); - hv_sig = coro_get_hv ("SIG", TRUE); - sv_diehook = coro_get_sv ("Coro::State::DIEHOOK" , TRUE); - sv_warnhook = coro_get_sv ("Coro::State::WARNHOOK", TRUE); - - if (!PL_diehook ) hv_store (hv_sig, strpair ("__DIE__" ), SvREFCNT_inc (sv_diehook ), 0); - if (!PL_warnhook) hv_store (hv_sig, strpair ("__WARN__"), SvREFCNT_inc (sv_warnhook), 0); + orig_sigelem_get = PL_vtbl_sigelem.svt_get; + PL_vtbl_sigelem.svt_get = coro_sigelem_get; + orig_sigelem_set = PL_vtbl_sigelem.svt_set; + PL_vtbl_sigelem.svt_set = coro_sigelem_set; + + hv_sig = coro_get_hv (aTHX_ "SIG", TRUE); + rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV)); + rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV)); coro_state_stash = gv_stashpv ("Coro::State", TRUE); @@ -1526,6 +1581,7 @@ main_top_env = main_top_env->je_prev; coroapi.ver = CORO_API_VERSION; + coroapi.rev = CORO_API_REVISION; coroapi.transfer = api_transfer; assert (("PRIO_NORMAL must be 0", !PRIO_NORMAL)); @@ -1536,6 +1592,7 @@ CODE: { struct coro *coro; + MAGIC *mg; HV *hv; int i; @@ -1548,7 +1605,8 @@ coro_first = coro; coro->hv = hv = newHV (); - sv_magicext ((SV *)hv, 0, PERL_MAGIC_ext, &coro_state_vtbl, (char *)coro, 0)->mg_flags |= MGf_DUP; + mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0); + mg->mg_flags |= MGf_DUP; RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1)); av_extend (coro->args, items - 1); @@ -1572,6 +1630,7 @@ { struct transfer_args ta; + PUTBACK; switch (ix) { case 0: @@ -1600,12 +1659,13 @@ break; } + SPAGAIN; BARRIER; + PUTBACK; TRANSFER (ta); - - if (expect_false (GIMME_V != G_VOID && ta.next != ta.prev)) - XSRETURN_YES; + SPAGAIN; /* might be the sp of a different coroutine now */ + /* be extra careful not to ever do anything after TRANSFER */ } bool @@ -1667,6 +1727,7 @@ if (!(coro->flags & CF_RUNNING)) { + PUTBACK; save_perl (aTHX_ &temp); load_perl (aTHX_ coro); } @@ -1675,14 +1736,16 @@ dSP; ENTER; SAVETMPS; - PUSHMARK (SP); PUTBACK; + PUSHSTACK; + PUSHMARK (SP); if (ix) eval_sv (coderef, 0); else call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD); + POPSTACK; SPAGAIN; FREETMPS; LEAVE; @@ -1693,6 +1756,7 @@ { save_perl (aTHX_ coro); load_perl (aTHX_ &temp); + SPAGAIN; } } } @@ -1750,11 +1814,11 @@ { int i; - sv_pool_rss = coro_get_sv ("Coro::POOL_RSS" , TRUE); - sv_pool_size = coro_get_sv ("Coro::POOL_SIZE" , TRUE); - av_async_pool = coro_get_av ("Coro::async_pool", TRUE); + sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE); + sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE); + av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE); - coro_current = coro_get_sv ("Coro::current", FALSE); + coro_current = coro_get_sv (aTHX_ "Coro::current", FALSE); SvREADONLY_on (coro_current); coro_stash = gv_stashpv ("Coro", TRUE); @@ -1847,18 +1911,21 @@ struct coro *coro = SvSTATE (coro_current); HV *hv = (HV *)SvRV (coro_current); AV *defav = GvAV (PL_defgv); - SV *invoke = hv_delete (hv, strpair ("_invoke"), 0); + SV *invoke = hv_delete (hv, "_invoke", sizeof ("_invoke") - 1, 0); AV *invoke_av; int i, len; if (!invoke) - croak ("\3async_pool terminate\2\n"); + { + SvREFCNT_dec (PL_diehook); PL_diehook = 0; + croak ("\3async_pool terminate\2\n"); + } SvREFCNT_dec (coro->saved_deffh); coro->saved_deffh = SvREFCNT_inc ((SV *)PL_defoutgv); hv_store (hv, "desc", sizeof ("desc") - 1, - newSVpvn (strpair ("[async_pool]")), 0); + newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0); invoke_av = (AV *)SvRV (invoke); len = av_len (invoke_av); @@ -1888,11 +1955,14 @@ if (coro_rss (aTHX_ coro) > SvIV (sv_pool_rss) || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size)) - croak ("\3async_pool terminate\2\n"); + { + SvREFCNT_dec (PL_diehook); PL_diehook = 0; + croak ("\3async_pool terminate\2\n"); + } av_clear (GvAV (PL_defgv)); - hv_store ((HV *)SvRV (coro_current), strpair ("desc"), - newSVpvn (strpair ("[async_pool idle]")), 0); + hv_store ((HV *)SvRV (coro_current), "desc", sizeof ("desc") - 1, + newSVpvn ("[async_pool idle]", sizeof ("[async_pool idle]") - 1), 0); coro->prio = 0;