… | |
… | |
126 | static void (*u2time)(pTHX_ UV ret[2]); |
126 | static void (*u2time)(pTHX_ UV ret[2]); |
127 | |
127 | |
128 | /* we hijack an hopefully unused CV flag for our purposes */ |
128 | /* we hijack an hopefully unused CV flag for our purposes */ |
129 | #define CVf_SLF 0x4000 |
129 | #define CVf_SLF 0x4000 |
130 | static OP *pp_slf (pTHX); |
130 | static OP *pp_slf (pTHX); |
|
|
131 | static void slf_destroy (pTHX_ struct coro *coro); |
131 | |
132 | |
132 | static U32 cctx_gen; |
133 | static U32 cctx_gen; |
133 | static size_t cctx_stacksize = CORO_STACKSIZE; |
134 | static size_t cctx_stacksize = CORO_STACKSIZE; |
134 | static struct CoroAPI coroapi; |
135 | static struct CoroAPI coroapi; |
135 | static AV *main_mainstack; /* used to differentiate between $main and others */ |
136 | static AV *main_mainstack; /* used to differentiate between $main and others */ |
… | |
… | |
242 | CV *startcv; /* the CV to execute */ |
243 | CV *startcv; /* the CV to execute */ |
243 | AV *args; /* data associated with this coroutine (initial args) */ |
244 | AV *args; /* data associated with this coroutine (initial args) */ |
244 | int refcnt; /* coroutines are refcounted, yes */ |
245 | int refcnt; /* coroutines are refcounted, yes */ |
245 | int flags; /* CF_ flags */ |
246 | int flags; /* CF_ flags */ |
246 | HV *hv; /* the perl hash associated with this coro, if any */ |
247 | HV *hv; /* the perl hash associated with this coro, if any */ |
247 | void (*on_destroy)(pTHX_ struct coro *coro); /* for temporary use by xs in critical sections */ |
|
|
248 | |
248 | |
249 | /* statistics */ |
249 | /* statistics */ |
250 | int usecount; /* number of transfers to this coro */ |
250 | int usecount; /* number of transfers to this coro */ |
251 | |
251 | |
252 | /* coro process data */ |
252 | /* coro process data */ |
… | |
… | |
292 | |
292 | |
293 | /* for Coro.pm */ |
293 | /* for Coro.pm */ |
294 | static SV *coro_current; |
294 | static SV *coro_current; |
295 | static SV *coro_readyhook; |
295 | static SV *coro_readyhook; |
296 | static struct coro *coro_ready [CORO_PRIO_MAX - CORO_PRIO_MIN + 1][2]; /* head|tail */ |
296 | static struct coro *coro_ready [CORO_PRIO_MAX - CORO_PRIO_MIN + 1][2]; /* head|tail */ |
297 | static CV *cv_coro_run, *cv_coro_terminate; |
297 | static CV *cv_coro_run; |
298 | static struct coro *coro_first; |
298 | static struct coro *coro_first; |
299 | #define coro_nready coroapi.nready |
299 | #define coro_nready coroapi.nready |
300 | |
300 | |
301 | /** Coro::Select ************************************************************/ |
301 | /** Coro::Select ************************************************************/ |
302 | |
302 | |
… | |
… | |
1154 | destroy_perl (pTHX_ struct coro *coro) |
1154 | destroy_perl (pTHX_ struct coro *coro) |
1155 | { |
1155 | { |
1156 | SV *svf [9]; |
1156 | SV *svf [9]; |
1157 | |
1157 | |
1158 | { |
1158 | { |
|
|
1159 | SV *old_current = SvRV (coro_current); |
1159 | struct coro *current = SvSTATE_current; |
1160 | struct coro *current = SvSTATE (old_current); |
1160 | |
1161 | |
1161 | assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack)); |
1162 | assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack)); |
1162 | |
1163 | |
1163 | save_perl (aTHX_ current); |
1164 | save_perl (aTHX_ current); |
1164 | SvRV_set (coro_current, (SV *)coro->hv); /* this will cause acroak in transfer_check */ |
1165 | |
|
|
1166 | /* this will cause transfer_check to croak on block*/ |
|
|
1167 | SvRV_set (coro_current, (SV *)coro->hv); |
|
|
1168 | |
1165 | load_perl (aTHX_ coro); |
1169 | load_perl (aTHX_ coro); |
1166 | |
1170 | |
1167 | coro_unwind_stacks (aTHX); |
1171 | coro_unwind_stacks (aTHX); |
1168 | coro_destruct_stacks (aTHX); |
1172 | coro_destruct_stacks (aTHX); |
1169 | |
1173 | |
… | |
… | |
1180 | svf [6] = (SV *)GvHV (PL_hintgv); |
1184 | svf [6] = (SV *)GvHV (PL_hintgv); |
1181 | svf [7] = PL_diehook; |
1185 | svf [7] = PL_diehook; |
1182 | svf [8] = PL_warnhook; |
1186 | svf [8] = PL_warnhook; |
1183 | assert (9 == sizeof (svf) / sizeof (*svf)); |
1187 | assert (9 == sizeof (svf) / sizeof (*svf)); |
1184 | |
1188 | |
1185 | SvRV_set (coro_current, (SV *)current->hv); |
1189 | SvRV_set (coro_current, old_current); |
|
|
1190 | |
1186 | load_perl (aTHX_ current); |
1191 | load_perl (aTHX_ current); |
1187 | } |
1192 | } |
1188 | |
1193 | |
1189 | { |
1194 | { |
1190 | unsigned int i; |
1195 | unsigned int i; |
… | |
… | |
1674 | coro_state_destroy (pTHX_ struct coro *coro) |
1679 | coro_state_destroy (pTHX_ struct coro *coro) |
1675 | { |
1680 | { |
1676 | if (coro->flags & CF_DESTROYED) |
1681 | if (coro->flags & CF_DESTROYED) |
1677 | return; |
1682 | return; |
1678 | |
1683 | |
1679 | /* this callback is reserved for slf functions needing to do cleanup */ |
|
|
1680 | if (coro->on_destroy && !PL_dirty) |
|
|
1681 | coro->on_destroy (aTHX_ coro); |
1684 | slf_destroy (aTHX_ coro); |
1682 | |
|
|
1683 | /* |
|
|
1684 | * The on_destroy above most likely is from an SLF call. |
|
|
1685 | * Since by definition the SLF call will not finish when we destroy |
|
|
1686 | * the coro, we will have to force-finish it here, otherwise |
|
|
1687 | * cleanup functions cannot call SLF functions. |
|
|
1688 | */ |
|
|
1689 | coro->slf_frame.prepare = 0; |
|
|
1690 | |
1685 | |
1691 | coro->flags |= CF_DESTROYED; |
1686 | coro->flags |= CF_DESTROYED; |
1692 | |
1687 | |
1693 | if (coro->flags & CF_READY) |
1688 | if (coro->flags & CF_READY) |
1694 | { |
1689 | { |
… | |
… | |
2147 | frame->check = slf_check_nop; |
2142 | frame->check = slf_check_nop; |
2148 | } |
2143 | } |
2149 | } |
2144 | } |
2150 | } |
2145 | } |
2151 | |
2146 | |
|
|
2147 | static int |
|
|
2148 | slf_check_safe_cancel (pTHX_ struct CoroSLF *frame) |
|
|
2149 | { |
|
|
2150 | frame->prepare = 0; |
|
|
2151 | coro_unwind_stacks (); |
|
|
2152 | |
|
|
2153 | slf_init_terminate_cancel_common (aTHX_ frame, (HV *)SvRV (coro_current)); |
|
|
2154 | |
|
|
2155 | return 1; |
|
|
2156 | } |
|
|
2157 | |
|
|
2158 | static int |
|
|
2159 | safe_cancel (pTHX_ struct coro *coro, SV **arg, int items) |
|
|
2160 | { |
|
|
2161 | if (coro->cctx) |
|
|
2162 | croak ("coro inside C callback, unable to cancel at this time, caught"); |
|
|
2163 | |
|
|
2164 | if (coro->flags & CF_NEW) |
|
|
2165 | { |
|
|
2166 | coro_set_status (coro->hv, arg, items); |
|
|
2167 | coro_state_destroy (aTHX_ coro); |
|
|
2168 | coro_call_on_destroy (aTHX_ coro); |
|
|
2169 | } |
|
|
2170 | else |
|
|
2171 | { |
|
|
2172 | if (!coro->slf_frame.prepare) |
|
|
2173 | croak ("coro outside an SLF function, unable to cancel at this time, caught"); |
|
|
2174 | |
|
|
2175 | slf_destroy (coro); |
|
|
2176 | |
|
|
2177 | coro_set_status (coro->hv, arg, items); |
|
|
2178 | coro->slf_frame.prepare = prepare_nop; |
|
|
2179 | coro->slf_frame.check = slf_check_safe_cancel; |
|
|
2180 | |
|
|
2181 | api_ready (aTHX_ coro->hv); |
|
|
2182 | } |
|
|
2183 | |
|
|
2184 | return 1; |
|
|
2185 | } |
|
|
2186 | |
2152 | /*****************************************************************************/ |
2187 | /*****************************************************************************/ |
2153 | /* async pool handler */ |
2188 | /* async pool handler */ |
2154 | |
2189 | |
2155 | static int |
2190 | static int |
2156 | slf_check_pool_handler (pTHX_ struct CoroSLF *frame) |
2191 | slf_check_pool_handler (pTHX_ struct CoroSLF *frame) |
… | |
… | |
2194 | coro->saved_deffh = 0; |
2229 | coro->saved_deffh = 0; |
2195 | |
2230 | |
2196 | if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss) |
2231 | if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss) |
2197 | || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size)) |
2232 | || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size)) |
2198 | { |
2233 | { |
2199 | coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate); |
2234 | slf_init_terminate_cancel_common (aTHX_ frame, hv); |
2200 | coro->invoke_av = newAV (); |
2235 | return; |
2201 | |
|
|
2202 | frame->prepare = prepare_nop; |
|
|
2203 | } |
2236 | } |
2204 | else |
2237 | else |
2205 | { |
2238 | { |
2206 | av_clear (GvAV (PL_defgv)); |
2239 | av_clear (GvAV (PL_defgv)); |
2207 | hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0); |
2240 | hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0); |
… | |
… | |
2430 | static void |
2463 | static void |
2431 | slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items) |
2464 | slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items) |
2432 | { |
2465 | { |
2433 | frame->prepare = prepare_cede_notself; |
2466 | frame->prepare = prepare_cede_notself; |
2434 | frame->check = slf_check_nop; |
2467 | frame->check = slf_check_nop; |
|
|
2468 | } |
|
|
2469 | |
|
|
2470 | /* "undo"/cancel a running slf call - used when cancelling a coro, mainly */ |
|
|
2471 | static void |
|
|
2472 | slf_destroy (pTHX_ struct coro *coro) |
|
|
2473 | { |
|
|
2474 | /* this callback is reserved for slf functions needing to do cleanup */ |
|
|
2475 | if (coro->slf_frame.destroy && coro->slf_frame.prepare && !PL_dirty) |
|
|
2476 | coro->slf_frame.destroy (aTHX_ coro); |
|
|
2477 | |
|
|
2478 | /* |
|
|
2479 | * The on_destroy above most likely is from an SLF call. |
|
|
2480 | * Since by definition the SLF call will not finish when we destroy |
|
|
2481 | * the coro, we will have to force-finish it here, otherwise |
|
|
2482 | * cleanup functions cannot call SLF functions. |
|
|
2483 | */ |
|
|
2484 | coro->slf_frame.prepare = 0; |
2435 | } |
2485 | } |
2436 | |
2486 | |
2437 | /* |
2487 | /* |
2438 | * these not obviously related functions are all rolled into one |
2488 | * these not obviously related functions are all rolled into one |
2439 | * function to increase chances that they all will call transfer with the same |
2489 | * function to increase chances that they all will call transfer with the same |
… | |
… | |
2765 | SvREFCNT_dec (cb); |
2815 | SvREFCNT_dec (cb); |
2766 | } |
2816 | } |
2767 | } |
2817 | } |
2768 | |
2818 | |
2769 | static void |
2819 | static void |
2770 | coro_semaphore_on_destroy (pTHX_ struct coro *coro) |
2820 | coro_semaphore_destroy (pTHX_ struct coro *coro) |
2771 | { |
2821 | { |
2772 | /* call $sem->adjust (0) to possibly wake up some other waiters */ |
2822 | /* call $sem->adjust (0) to possibly wake up some other waiters */ |
2773 | coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0); |
2823 | coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0); |
2774 | } |
2824 | } |
2775 | |
2825 | |
… | |
… | |
2782 | /* if we are about to throw, don't actually acquire the lock, just throw */ |
2832 | /* if we are about to throw, don't actually acquire the lock, just throw */ |
2783 | if (CORO_THROW) |
2833 | if (CORO_THROW) |
2784 | return 0; |
2834 | return 0; |
2785 | else if (SvIVX (count_sv) > 0) |
2835 | else if (SvIVX (count_sv) > 0) |
2786 | { |
2836 | { |
2787 | SvSTATE_current->on_destroy = 0; |
2837 | frame->destroy = 0; |
2788 | |
2838 | |
2789 | if (acquire) |
2839 | if (acquire) |
2790 | SvIVX (count_sv) = SvIVX (count_sv) - 1; |
2840 | SvIVX (count_sv) = SvIVX (count_sv) - 1; |
2791 | else |
2841 | else |
2792 | coro_semaphore_adjust (aTHX_ av, 0); |
2842 | coro_semaphore_adjust (aTHX_ av, 0); |
… | |
… | |
2835 | { |
2885 | { |
2836 | av_push (av, SvREFCNT_inc (SvRV (coro_current))); |
2886 | av_push (av, SvREFCNT_inc (SvRV (coro_current))); |
2837 | |
2887 | |
2838 | frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av)); |
2888 | frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av)); |
2839 | frame->prepare = prepare_schedule; |
2889 | frame->prepare = prepare_schedule; |
2840 | |
|
|
2841 | /* to avoid race conditions when a woken-up coro gets terminated */ |
2890 | /* to avoid race conditions when a woken-up coro gets terminated */ |
2842 | /* we arrange for a temporary on_destroy that calls adjust (0) */ |
2891 | /* we arrange for a temporary on_destroy that calls adjust (0) */ |
2843 | SvSTATE_current->on_destroy = coro_semaphore_on_destroy; |
2892 | frame->destroy = coro_semaphore_destroy; |
2844 | } |
2893 | } |
2845 | } |
2894 | } |
2846 | |
2895 | |
2847 | static void |
2896 | static void |
2848 | slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items) |
2897 | slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items) |
… | |
… | |
3543 | BOOT: |
3592 | BOOT: |
3544 | { |
3593 | { |
3545 | sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE); |
3594 | sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE); |
3546 | sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE); |
3595 | sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE); |
3547 | cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD); |
3596 | cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD); |
3548 | cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD); |
|
|
3549 | coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current); |
3597 | coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current); |
3550 | av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE); |
3598 | av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE); |
3551 | av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE); |
3599 | av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE); |
3552 | sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE); |
3600 | sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE); |
3553 | sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE); |
3601 | sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE); |
… | |
… | |
3607 | |
3655 | |
3608 | void |
3656 | void |
3609 | cancel (...) |
3657 | cancel (...) |
3610 | CODE: |
3658 | CODE: |
3611 | CORO_EXECUTE_SLF_XS (slf_init_cancel); |
3659 | CORO_EXECUTE_SLF_XS (slf_init_cancel); |
|
|
3660 | |
|
|
3661 | int |
|
|
3662 | safe_cancel (Coro::State self, ...) |
|
|
3663 | C_ARGS: aTHX_ self, &ST (1), items - 1 |
3612 | |
3664 | |
3613 | void |
3665 | void |
3614 | schedule (...) |
3666 | schedule (...) |
3615 | CODE: |
3667 | CODE: |
3616 | CORO_EXECUTE_SLF_XS (slf_init_schedule); |
3668 | CORO_EXECUTE_SLF_XS (slf_init_schedule); |