--- Coro/Coro/State.xs 2009/10/02 19:58:02 1.375 +++ Coro/Coro/State.xs 2011/05/06 21:15:17 1.397 @@ -1,3 +1,6 @@ +/* this works around a bug in mingw32 providing a non-working setjmp */ +#define USE_NO_MINGW_SETJMP_TWO_ARGS + #define NDEBUG 1 #include "libcoro/coro.c" @@ -20,7 +23,8 @@ # define SVs_PADSTALE 0 #endif -#ifdef WIN32 +#if defined(_WIN32) +# undef HAS_GETTIMEOFDAY # undef setjmp # undef longjmp # undef _exit @@ -124,6 +128,7 @@ /* we hijack an hopefully unused CV flag for our purposes */ #define CVf_SLF 0x4000 static OP *pp_slf (pTHX); +static void slf_destroy (pTHX_ struct coro *coro); static U32 cctx_gen; static size_t cctx_stacksize = CORO_STACKSIZE; @@ -194,7 +199,7 @@ unsigned char flags; } coro_cctx; -coro_cctx *cctx_current; /* the currently running cctx */ +static coro_cctx *cctx_current; /* the currently running cctx */ /*****************************************************************************/ @@ -204,6 +209,7 @@ CF_NEW = 0x0004, /* has never been switched to */ CF_DESTROYED = 0x0008, /* coroutine data has been freed */ CF_SUSPENDED = 0x0010, /* coroutine can't be scheduled */ + CF_NOCANCEL = 0x0020, /* cannot cancel, set slf_frame.data to 1 (hackish) */ }; /* the structure where most of the perl state is stored, overlaid on the cxstack */ @@ -239,7 +245,6 @@ int refcnt; /* coroutines are refcounted, yes */ int flags; /* CF_ flags */ HV *hv; /* the perl hash associated with this coro, if any */ - void (*on_destroy)(pTHX_ struct coro *coro); /* statistics */ int usecount; /* number of transfers to this coro */ @@ -273,7 +278,7 @@ /* the following variables are effectively part of the perl context */ /* and get copied between struct coro and these variables */ -/* the mainr easonw e don't support windows process emulation */ +/* the main reason we don't support windows process emulation */ static struct CoroSLF slf_frame; /* the current slf frame */ /** Coro ********************************************************************/ @@ -289,7 +294,7 @@ static SV *coro_current; static SV *coro_readyhook; static struct coro *coro_ready [CORO_PRIO_MAX - CORO_PRIO_MIN + 1][2]; /* head|tail */ -static CV *cv_coro_run, *cv_coro_terminate; +static CV *cv_coro_run; static struct coro *coro_first; #define coro_nready coroapi.nready @@ -313,6 +318,58 @@ return PL_ppaddr [OP_ENTERSUB](aTHX); } +/** time stuff **************************************************************/ + +#ifdef HAS_GETTIMEOFDAY + +static void +coro_u2time (pTHX_ UV ret[2]) +{ + struct timeval tv; + gettimeofday (&tv, 0); + + ret [0] = tv.tv_sec; + ret [1] = tv.tv_usec; +} + +static double +coro_nvtime () +{ + struct timeval tv; + gettimeofday (&tv, 0); + + return tv.tv_sec + tv.tv_usec * 1e-6; +} + +static void +time_init (pTHX) +{ + nvtime = coro_nvtime; + u2time = coro_u2time; +} + +#else + +static void +time_init (pTHX) +{ + SV **svp; + + require_pv ("Time/HiRes.pm"); + + svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0); + + if (!svp) croak ("Time::HiRes is required, but missing. Caught"); + if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer. Caught"); + + nvtime = INT2PTR (double (*)(), SvIV (*svp)); + + svp = hv_fetch (PL_modglobal, "Time::U2time", 12, 0); + u2time = INT2PTR (void (*)(pTHX_ UV ret[2]), SvIV (*svp)); +} + +#endif + /** lowlevel stuff **********************************************************/ static SV * @@ -585,7 +642,7 @@ #if PERL_VERSION_ATLEAST (5,10,0) /* perl 5.10 complicates this _quite_ a bit, but it also is - * is much faster, so no quarrels here. alternatively, we could + * much faster, so no quarrels here. alternatively, we could * sv_upgrade to avoid this. */ { @@ -720,6 +777,7 @@ if (expect_true (CvDEPTH (cv))) { + EXTEND (SP, 3); PUSHs ((SV *)CvPADLIST (cv)); PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv))); PUSHs ((SV *)cv); @@ -1011,7 +1069,6 @@ PL_curpm = 0; PL_curpad = 0; PL_localizing = 0; - PL_dirty = 0; PL_restartop = 0; #if PERL_VERSION_ATLEAST (5,10,0) PL_parser = 0; @@ -1099,11 +1156,16 @@ SV *svf [9]; { - struct coro *current = SvSTATE_current; + SV *old_current = SvRV (coro_current); + struct coro *current = SvSTATE (old_current); assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack)); save_perl (aTHX_ current); + + /* this will cause transfer_check to croak on block*/ + SvRV_set (coro_current, (SV *)coro->hv); + load_perl (aTHX_ coro); coro_unwind_stacks (aTHX); @@ -1124,6 +1186,8 @@ svf [8] = PL_warnhook; assert (9 == sizeof (svf) / sizeof (*svf)); + SvRV_set (coro_current, old_current); + load_perl (aTHX_ current); } @@ -1137,7 +1201,6 @@ SvREFCNT_dec (coro->rouse_cb); SvREFCNT_dec (coro->invoke_cb); SvREFCNT_dec (coro->invoke_av); - SvREFCNT_dec (coro->swap_sv); } } @@ -1443,7 +1506,7 @@ if (!cctx) return; - assert (("FATAL: tried to destroy current cctx", cctx != cctx_current));//D temporary? + assert (("FATAL: tried to destroy current cctx", cctx != cctx_current)); --cctx_count; coro_destroy (&cctx->cctx); @@ -1612,14 +1675,13 @@ /** high level stuff ********************************************************/ -static int +static void coro_state_destroy (pTHX_ struct coro *coro) { if (coro->flags & CF_DESTROYED) - return 0; + return; - if (coro->on_destroy && !PL_dirty) - coro->on_destroy (aTHX_ coro); + slf_destroy (aTHX_ coro); coro->flags |= CF_DESTROYED; @@ -1638,16 +1700,15 @@ && !PL_dirty) destroy_perl (aTHX_ coro); - cctx_destroy (coro->cctx); - SvREFCNT_dec (coro->startcv); - SvREFCNT_dec (coro->args); - SvREFCNT_dec (CORO_THROW); - if (coro->next) coro->next->prev = coro->prev; if (coro->prev) coro->prev->next = coro->next; if (coro == coro_first) coro_first = coro->next; - return 1; + cctx_destroy (coro->cctx); + SvREFCNT_dec (coro->startcv); + SvREFCNT_dec (coro->args); + SvREFCNT_dec (coro->swap_sv); + SvREFCNT_dec (CORO_THROW); } static int @@ -1825,6 +1886,9 @@ if (SvROK (sv_idle) && SvOBJECT (SvRV (sv_idle))) { + if (SvRV (sv_idle) == SvRV (coro_current)) + croak ("FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught"); + ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */ api_ready (aTHX_ SvRV (sv_idle)); --coro_nready; @@ -1951,11 +2015,12 @@ coro_call_on_destroy (pTHX_ struct coro *coro) { SV **on_destroyp = hv_fetch (coro->hv, "_on_destroy", sizeof ("_on_destroy") - 1, 0); - SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0); if (on_destroyp) { - AV *on_destroy = (AV *)SvRV (*on_destroyp); + SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0); + AV *on_destroy = sv_2mortal (SvREFCNT_inc ((AV *)SvRV (*on_destroyp))); + AV *status = statusp ? sv_2mortal (SvREFCNT_inc ((AV *)SvRV (*statusp))) : 0; while (AvFILLp (on_destroy) >= 0) { @@ -1967,7 +2032,6 @@ if (statusp) { int i; - AV *status = (AV *)SvRV (*statusp); EXTEND (SP, AvFILLp (status) + 1); for (i = 0; i <= AvFILLp (status); ++i) @@ -1981,24 +2045,28 @@ } static void -slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items) +coro_set_status (HV *coro_hv, SV **arg, int items) { - int i; - HV *hv = (HV *)SvRV (coro_current); AV *av = newAV (); /* items are actually not so common, so optimise for this case */ if (items) { + int i; + av_extend (av, items - 1); for (i = 0; i < items; ++i) av_push (av, SvREFCNT_inc_NN (arg [i])); } - hv_store (hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0); + hv_store (coro_hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0); +} - av_push (av_destroy, (SV *)newRV_inc ((SV *)hv)); /* RVinc for perl */ +static void +slf_init_terminate_cancel_common (pTHX_ struct CoroSLF *frame, HV *coro_hv) +{ + av_push (av_destroy, (SV *)newRV_inc ((SV *)coro_hv)); /* RVinc for perl */ api_ready (aTHX_ sv_manager); frame->prepare = prepare_schedule; @@ -2009,6 +2077,113 @@ /*coro_unwind_stacks (aTHX);*/ } +static void +slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items) +{ + HV *coro_hv = (HV *)SvRV (coro_current); + + coro_set_status (coro_hv, arg, items); + slf_init_terminate_cancel_common (frame, coro_hv); +} + +static void +slf_init_cancel (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items) +{ + HV *coro_hv; + struct coro *coro; + + if (items <= 0) + croak ("Coro::cancel called without coro object,"); + + coro = SvSTATE (arg [0]); + coro_hv = coro->hv; + + coro_set_status (coro_hv, arg + 1, items - 1); + + if (expect_false (coro->flags & CF_NOCANCEL)) + { + /* coro currently busy cancelling something, so just notify it */ + coro->slf_frame.data = (void *)coro; + + frame->prepare = prepare_nop; + frame->check = slf_check_nop; + } + else if (coro_hv == (HV *)SvRV (coro_current)) + { + /* cancelling the current coro is allowed, and equals terminate */ + slf_init_terminate_cancel_common (frame, coro_hv); + } + else + { + struct coro *self = SvSTATE_current; + + /* otherwise we cancel directly, purely for speed reasons + * unfortunately, this requires some magic trickery, as + * somebody else could cancel us, so we have to fight the cancellation. + * this is ugly, and hopefully fully worth the extra speed. + * besides, I can't get the slow-but-safe version working... + */ + slf_frame.data = 0; + self->flags |= CF_NOCANCEL; + + coro_state_destroy (aTHX_ coro); + coro_call_on_destroy (aTHX_ coro); + + self->flags &= ~CF_NOCANCEL; + + if (slf_frame.data) + { + /* while we were busy we have been cancelled, so terminate */ + slf_init_terminate_cancel_common (frame, self->hv); + } + else + { + frame->prepare = prepare_nop; + frame->check = slf_check_nop; + } + } +} + +static int +slf_check_safe_cancel (pTHX_ struct CoroSLF *frame) +{ + frame->prepare = 0; + coro_unwind_stacks (); + + slf_init_terminate_cancel_common (aTHX_ frame, (HV *)SvRV (coro_current)); + + return 1; +} + +static int +safe_cancel (pTHX_ struct coro *coro, SV **arg, int items) +{ + if (coro->cctx) + croak ("coro inside C callback, unable to cancel at this time, caught"); + + if (coro->flags & CF_NEW) + { + coro_set_status (coro->hv, arg, items); + coro_state_destroy (aTHX_ coro); + coro_call_on_destroy (aTHX_ coro); + } + else + { + if (!coro->slf_frame.prepare) + croak ("coro outside an SLF function, unable to cancel at this time, caught"); + + slf_destroy (coro); + + coro_set_status (coro->hv, arg, items); + coro->slf_frame.prepare = prepare_nop; + coro->slf_frame.check = slf_check_safe_cancel; + + api_ready (aTHX_ coro->hv); + } + + return 1; +} + /*****************************************************************************/ /* async pool handler */ @@ -2056,10 +2231,8 @@ if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss) || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size)) { - coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate); - coro->invoke_av = newAV (); - - frame->prepare = prepare_nop; + slf_init_terminate_cancel_common (aTHX_ frame, hv); + return; } else { @@ -2294,6 +2467,23 @@ frame->check = slf_check_nop; } +/* "undo"/cancel a running slf call - used when cancelling a coro, mainly */ +static void +slf_destroy (pTHX_ struct coro *coro) +{ + /* this callback is reserved for slf functions needing to do cleanup */ + if (coro->slf_frame.destroy && coro->slf_frame.prepare && !PL_dirty) + coro->slf_frame.destroy (aTHX_ coro); + + /* + * The on_destroy above most likely is from an SLF call. + * Since by definition the SLF call will not finish when we destroy + * the coro, we will have to force-finish it here, otherwise + * cleanup functions cannot call SLF functions. + */ + coro->slf_frame.prepare = 0; +} + /* * these not obviously related functions are all rolled into one * function to increase chances that they all will call transfer with the same @@ -2420,8 +2610,8 @@ if (items > slf_arga) { slf_arga = items; - free (slf_argv); - slf_argv = malloc (slf_arga * sizeof (SV *)); + Safefree (slf_argv); + New (0, slf_argv, slf_arga, SV *); } slf_argc = items; @@ -2627,7 +2817,7 @@ } static void -coro_semaphore_on_destroy (pTHX_ struct coro *coro) +coro_semaphore_destroy (pTHX_ struct coro *coro) { /* call $sem->adjust (0) to possibly wake up some other waiters */ coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0); @@ -2644,7 +2834,7 @@ return 0; else if (SvIVX (count_sv) > 0) { - SvSTATE_current->on_destroy = 0; + frame->destroy = 0; if (acquire) SvIVX (count_sv) = SvIVX (count_sv) - 1; @@ -2697,10 +2887,9 @@ frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av)); frame->prepare = prepare_schedule; - /* to avoid race conditions when a woken-up coro gets terminated */ /* we arrange for a temporary on_destroy that calls adjust (0) */ - SvSTATE_current->on_destroy = coro_semaphore_on_destroy; + frame->destroy = coro_semaphore_destroy; } } @@ -2792,7 +2981,7 @@ av_push (av, SvREFCNT_inc_NN (cb_cv)); if (SvIVX (AvARRAY (av)[0])) - coro_signal_wake (aTHX_ av, 1); /* ust be the only waiter */ + coro_signal_wake (aTHX_ av, 1); /* must be the only waiter */ frame->prepare = prepare_nop; frame->check = slf_check_nop; @@ -2964,7 +3153,7 @@ call_sv ((SV *)req, G_VOID | G_DISCARD); } - /* now that the requets is going, we loop toll we have a result */ + /* now that the request is going, we loop till we have a result */ frame->data = (void *)state; frame->prepare = prepare_schedule; frame->check = slf_check_aio_req; @@ -3105,17 +3294,7 @@ coroapi.prepare_cede = prepare_cede; coroapi.prepare_cede_notself = prepare_cede_notself; - { - SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0); - - if (!svp) croak ("Time::HiRes is required"); - if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer"); - - nvtime = INT2PTR (double (*)(), SvIV (*svp)); - - svp = hv_fetch (PL_modglobal, "Time::U2time", 12, 0); - u2time = INT2PTR (void (*)(pTHX_ UV ret[2]), SvIV (*svp)); - } + time_init (aTHX); assert (("PRIO_NORMAL must be 0", !CORO_PRIO_NORMAL)); } @@ -3135,13 +3314,6 @@ CODE: CORO_EXECUTE_SLF_XS (slf_init_transfer); -bool -_destroy (SV *coro_sv) - CODE: - RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv)); - OUTPUT: - RETVAL - void _exit (int code) PROTOTYPE: $ @@ -3226,12 +3398,22 @@ if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot)) { struct coro *current = SvSTATE_current; + struct CoroSLF slf_save; if (current != coro) { PUTBACK; save_perl (aTHX_ current); load_perl (aTHX_ coro); + /* the coro is most likely in an active SLF call. + * while not strictly required (the code we execute is + * not allowed to call any SLF functions), it's cleaner + * to reinitialise the slf_frame and restore it later. + * This might one day allow us to actually do SLF calls + * from code executed here. + */ + slf_save = slf_frame; + slf_frame.prepare = 0; SPAGAIN; } @@ -3251,6 +3433,7 @@ if (current != coro) { PUTBACK; + slf_frame = slf_save; save_perl (aTHX_ coro); load_perl (aTHX_ current); SPAGAIN; @@ -3273,15 +3456,15 @@ RETVAL void -throw (Coro::State self, SV *throw = &PL_sv_undef) +throw (Coro::State self, SV *exception = &PL_sv_undef) PROTOTYPE: $;$ CODE: { struct coro *current = SvSTATE_current; - SV **throwp = self == current ? &CORO_THROW : &self->except; - SvREFCNT_dec (*throwp); - SvGETMAGIC (throw); - *throwp = SvOK (throw) ? newSVsv (throw) : 0; + SV **exceptionp = self == current ? &CORO_THROW : &self->except; + SvREFCNT_dec (*exceptionp); + SvGETMAGIC (exception); + *exceptionp = SvOK (exception) ? newSVsv (exception) : 0; } void @@ -3346,8 +3529,6 @@ cancel (Coro::State self) CODE: coro_state_destroy (aTHX_ self); - coro_call_on_destroy (aTHX_ self); /* actually only for Coro objects */ - SV * enable_times (int enabled = enable_times) @@ -3413,7 +3594,6 @@ sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE); sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE); cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD); - cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD); coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current); av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE); av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE); @@ -3462,11 +3642,27 @@ RETVAL void +_destroy (Coro::State coro) + CODE: + /* used by the manager thread */ + coro_state_destroy (aTHX_ coro); + coro_call_on_destroy (aTHX_ coro); + +void terminate (...) CODE: CORO_EXECUTE_SLF_XS (slf_init_terminate); void +cancel (...) + CODE: + CORO_EXECUTE_SLF_XS (slf_init_cancel); + +int +safe_cancel (Coro::State self, ...) + C_ARGS: aTHX_ self, &ST (1), items - 1 + +void schedule (...) CODE: CORO_EXECUTE_SLF_XS (slf_init_schedule); @@ -3871,4 +4067,3 @@ coro_old_pp_sselect = 0; } -