--- Coro/Coro/State.xs 2008/04/10 07:45:33 1.229 +++ Coro/Coro/State.xs 2008/09/22 05:40:21 1.245 @@ -12,7 +12,15 @@ #include #include #include -#include /* portable stdint.h */ + +#ifdef WIN32 +# undef setjmp +# undef longjmp +# undef _exit +# define setjmp _setjmp // deep magic, don't ask +#else +# include /* most portable stdint.h */ +#endif #ifdef HAVE_MMAP # include @@ -165,6 +173,9 @@ static SV *sv_pool_size; static AV *av_async_pool; +/* Coro::AnyEvent */ +static SV *sv_activity; + static struct coro_cctx *cctx_first; static int cctx_count, cctx_idle; @@ -259,6 +270,7 @@ /* for Coro.pm */ static SV *coro_current; +static SV *coro_readyhook; static AV *coro_ready [PRIO_MAX-PRIO_MIN+1]; static int coro_nready; static struct coro *coro_first; @@ -351,6 +363,8 @@ while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av))) free_padlist (aTHX_ padlist); + SvREFCNT_dec (av); /* sv_magicext increased the refcount */ + return 0; } @@ -410,7 +424,8 @@ else { #if CORO_PREFER_PERL_FUNCTIONS - /* this is probably cleaner, but also slower? */ + /* this is probably cleaner? but also slower! */ + /* in practise, it seems to be less stable */ CV *cp = Perl_cv_clone (cv); CvPADLIST (cv) = CvPADLIST (cp); CvPADLIST (cp) = 0; @@ -504,7 +519,7 @@ { EXTEND (SP, 3); PUSHs ((SV *)CvPADLIST (cv)); - PUSHs (INT2PTR (SV *, CvDEPTH (cv))); + PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv))); PUSHs ((SV *)cv); CvDEPTH (cv) = 0; @@ -653,17 +668,20 @@ else slot = coro->slot; - rss += sizeof (slot->curstackinfo); - rss += (slot->curstackinfo->si_cxmax + 1) * sizeof (PERL_CONTEXT); - rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (slot->curstack)) * sizeof (SV *); - rss += slot->tmps_max * sizeof (SV *); - rss += (slot->markstack_max - slot->markstack_ptr) * sizeof (I32); - rss += slot->scopestack_max * sizeof (I32); - rss += slot->savestack_max * sizeof (ANY); + if (slot) + { + rss += sizeof (slot->curstackinfo); + rss += (slot->curstackinfo->si_cxmax + 1) * sizeof (PERL_CONTEXT); + rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (slot->curstack)) * sizeof (SV *); + rss += slot->tmps_max * sizeof (SV *); + rss += (slot->markstack_max - slot->markstack_ptr) * sizeof (I32); + rss += slot->scopestack_max * sizeof (I32); + rss += slot->savestack_max * sizeof (ANY); #if !PERL_VERSION_ATLEAST (5,10,0) - rss += slot->retstack_max * sizeof (OP *); + rss += slot->retstack_max * sizeof (OP *); #endif + } } return rss; @@ -673,6 +691,14 @@ static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg); static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg); +static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg); + +/* apparently < 5.8.8 */ +#ifndef MgPV_nolen_const +#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \ + SvPV_nolen((SV*)((mg)->mg_ptr)) : \ + (const char*)(mg)->mg_ptr) +#endif /* * This overrides the default magic get method of %SIG elements. @@ -690,14 +716,46 @@ if (*s == '_') { - if (strEQ (s, "__DIE__" ) && PL_diehook ) return sv_setsv (sv, PL_diehook ), 0; - if (strEQ (s, "__WARN__") && PL_warnhook) return sv_setsv (sv, PL_warnhook), 0; + SV **svp = 0; + + if (strEQ (s, "__DIE__" )) svp = &PL_diehook; + if (strEQ (s, "__WARN__")) svp = &PL_warnhook; + + if (svp) + { + sv_setsv (sv, *svp ? *svp : &PL_sv_undef); + return 0; + } } return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0; } static int +coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg) +{ + const char *s = MgPV_nolen_const (mg); + + if (*s == '_') + { + SV **svp = 0; + + if (strEQ (s, "__DIE__" )) svp = &PL_diehook; + if (strEQ (s, "__WARN__")) svp = &PL_warnhook; + + if (svp) + { + SV *old = *svp; + *svp = 0; + SvREFCNT_dec (old); + return 0; + } + } + + return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0; +} + +static int coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg) { const char *s = MgPV_nolen_const (mg); @@ -714,7 +772,7 @@ SV *old = *svp; *svp = newSVsv (sv); SvREFCNT_dec (old); - return; + return 0; } } @@ -1132,7 +1190,7 @@ croak ("Coro::State::transfer called with destroyed next Coro::State, but can only transfer to inactive states"); #if !PERL_VERSION_ATLEAST (5,10,0) - if (expect_false (PL_lex_state != LEX_NOTPARSING) + if (expect_false (PL_lex_state != LEX_NOTPARSING)) croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version"); #endif } @@ -1373,6 +1431,8 @@ { dTHX; struct coro *coro; + SV *sv_hook; + void (*xs_hook)(void); if (SvROK (coro_sv)) coro_sv = SvRV (coro_sv); @@ -1385,9 +1445,33 @@ coro->flags |= CF_READY; LOCK; + + sv_hook = coro_nready ? 0 : coro_readyhook; + xs_hook = coro_nready ? 0 : coroapi.readyhook; + coro_enq (aTHX_ SvREFCNT_inc (coro_sv)); ++coro_nready; + UNLOCK; + + if (sv_hook) + { + dSP; + + ENTER; + SAVETMPS; + + PUSHMARK (SP); + PUTBACK; + call_sv (sv_hook, G_DISCARD); + SPAGAIN; + + FREETMPS; + LEAVE; + } + + if (xs_hook) + xs_hook (); return 1; } @@ -1547,6 +1631,22 @@ } } +static int +coro_gensub_free (pTHX_ SV *sv, MAGIC *mg) +{ + AV *padlist; + AV *av = (AV *)mg->mg_obj; + + abort (); + + return 0; +} + +static MGVTBL coro_gensub_vtbl = { + 0, 0, 0, 0, + coro_gensub_free +}; + MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_ PROTOTYPES: DISABLE @@ -1561,10 +1661,9 @@ irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV); stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO); - orig_sigelem_get = PL_vtbl_sigelem.svt_get; - PL_vtbl_sigelem.svt_get = coro_sigelem_get; - orig_sigelem_set = PL_vtbl_sigelem.svt_set; - PL_vtbl_sigelem.svt_set = coro_sigelem_set; + orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get; + orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set; + orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr; hv_sig = coro_get_hv (aTHX_ "SIG", TRUE); rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV)); @@ -1583,9 +1682,9 @@ while (main_top_env->je_prev) main_top_env = main_top_env->je_prev; - coroapi.ver = CORO_API_VERSION; - coroapi.rev = CORO_API_REVISION; - coroapi.transfer = api_transfer; + coroapi.ver = CORO_API_VERSION; + coroapi.rev = CORO_API_REVISION; + coroapi.transfer = api_transfer; assert (("PRIO_NORMAL must be 0", !PRIO_NORMAL)); } @@ -1679,8 +1778,7 @@ RETVAL void -_exit (code) - int code +_exit (int code) PROTOTYPE: $ CODE: _exit (code); @@ -1724,7 +1822,7 @@ eval = 1 CODE: { - if (coro->mainstack) + if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot)) { struct coro temp; @@ -1781,7 +1879,7 @@ api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB) SV * -has_stack (Coro::State coro) +has_cctx (Coro::State coro) PROTOTYPE: $ CODE: RETVAL = boolSV (!!coro->cctx); @@ -1816,15 +1914,38 @@ struct coro *coro = SvSTATE (coro_current); coro->cctx->idle_sp = 0; +void +throw (Coro::State self, SV *throw = &PL_sv_undef) + PROTOTYPE: $;$ + CODE: + SvREFCNT_dec (self->throw); + self->throw = SvOK (throw) ? newSVsv (throw) : 0; + +void +swap_defsv (Coro::State self) + PROTOTYPE: $ + ALIAS: + swap_defav = 1 + CODE: + if (!self->slot) + croak ("cannot swap state with coroutine that has no saved state"); + else + { + SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv); + SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv; + + SV *tmp = *src; *src = *dst; *dst = tmp; + } + MODULE = Coro::State PACKAGE = Coro BOOT: { int i; + av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE); sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE); sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE); - av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE); coro_current = coro_get_sv (aTHX_ "Coro::current", FALSE); SvREADONLY_on (coro_current); @@ -1866,6 +1987,15 @@ SvREFCNT_dec (SvRV (coro_current)); SvRV_set (coro_current, SvREFCNT_inc (SvRV (current))); +void +_set_readyhook (SV *hook) + PROTOTYPE: $ + CODE: + LOCK; + SvREFCNT_dec (coro_readyhook); + coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0; + UNLOCK; + int prio (Coro::State coro, int newprio = 0) ALIAS: @@ -1904,29 +2034,6 @@ OUTPUT: RETVAL -void -throw (Coro::State self, SV *throw = &PL_sv_undef) - PROTOTYPE: $;$ - CODE: - SvREFCNT_dec (self->throw); - self->throw = SvOK (throw) ? newSVsv (throw) : 0; - -void -swap_defsv (Coro::State self) - PROTOTYPE: $ - ALIAS: - swap_defav = 1 - CODE: - if (!self->slot) - croak ("cannot swap state with coroutine that has no saved state"); - else - { - SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv); - SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv; - - SV *tmp = *src; *src = *dst; *dst = tmp; - } - # for async_pool speedup void _pool_1 (SV *cb) @@ -1941,7 +2048,9 @@ if (!invoke) { - SvREFCNT_dec (PL_diehook); PL_diehook = 0; + SV *old = PL_diehook; + PL_diehook = 0; + SvREFCNT_dec (old); croak ("\3async_pool terminate\2\n"); } @@ -1980,7 +2089,9 @@ if (coro_rss (aTHX_ coro) > SvIV (sv_pool_rss) || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size)) { - SvREFCNT_dec (PL_diehook); PL_diehook = 0; + SV *old = PL_diehook; + PL_diehook = 0; + SvREFCNT_dec (old); croak ("\3async_pool terminate\2\n"); } @@ -1996,6 +2107,54 @@ av_push (av_async_pool, newSVsv (coro_current)); } +#if 0 + +void +_generator_call (...) + PROTOTYPE: @ + PPCODE: + fprintf (stderr, "call %p\n", CvXSUBANY(cv).any_ptr); + xxxx + abort (); + +SV * +gensub (SV *sub, ...) + PROTOTYPE: &;@ + CODE: +{ + struct coro *coro; + MAGIC *mg; + CV *xcv; + CV *ncv = (CV *)newSV_type (SVt_PVCV); + int i; + + CvGV (ncv) = CvGV (cv); + CvFILE (ncv) = CvFILE (cv); + + Newz (0, coro, 1, struct coro); + coro->args = newAV (); + coro->flags = CF_NEW; + + av_extend (coro->args, items - 1); + for (i = 1; i < items; i++) + av_push (coro->args, newSVsv (ST (i))); + + CvISXSUB_on (ncv); + CvXSUBANY (ncv).any_ptr = (void *)coro; + + xcv = GvCV (gv_fetchpv ("Coro::_generator_call", 0, SVt_PVCV)); + + CvXSUB (ncv) = CvXSUB (xcv); + CvANON_on (ncv); + + mg = sv_magicext ((SV *)ncv, 0, CORO_MAGIC_type_state, &coro_gensub_vtbl, (char *)coro, 0); + RETVAL = newRV_noinc ((SV *)ncv); +} + OUTPUT: + RETVAL + +#endif + MODULE = Coro::State PACKAGE = Coro::AIO @@ -2031,3 +2190,34 @@ PL_statcache = data->statcache; } + +MODULE = Coro::State PACKAGE = Coro::AnyEvent + +BOOT: + sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE); + +SV * +_schedule (...) + PROTOTYPE: @ + CODE: +{ + static int incede; + + api_cede_notself (); + + ++incede; + while (coro_nready >= incede && api_cede ()) + ; + + sv_setsv (sv_activity, &PL_sv_undef); + if (coro_nready >= incede) + { + PUSHMARK (SP); + PUTBACK; + call_pv ("Coro::AnyEvent::_activity", G_DISCARD | G_EVAL); + SPAGAIN; + } + + --incede; +} +