--- Coro/Coro/State.xs 2008/11/19 02:41:31 1.301 +++ Coro/Coro/State.xs 2008/11/19 04:48:24 1.302 @@ -18,7 +18,7 @@ # undef setjmp # undef longjmp # undef _exit -# define setjmp _setjmp // deep magic, don't ask +# define setjmp _setjmp /* deep magic */ #else # include /* most portable stdint.h */ #endif @@ -260,6 +260,7 @@ /* coro process data */ int prio; SV *except; /* exception to be thrown */ + SV *rouse_cb; /* async_pool */ SV *saved_deffh; @@ -920,8 +921,9 @@ SvREFCNT_dec (PL_diehook); SvREFCNT_dec (PL_warnhook); - SvREFCNT_dec (coro->saved_deffh); SvREFCNT_dec (CORO_THROW); + SvREFCNT_dec (coro->saved_deffh); + SvREFCNT_dec (coro->rouse_cb); coro_destruct_stacks (aTHX); } @@ -1499,6 +1501,29 @@ TRANSFER (ta, 1); } +/*****************************************************************************/ +/* gensub: simple closure generation utility */ + +#define GENSUB_ARG CvXSUBANY (cv).any_ptr + +/* create a closure from XS, returns a code reference */ +/* the arg can be accessed via GENSUB_ARG from the callback */ +/* the callback must use dXSARGS/XSRETURN */ +static SV * +gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg) +{ + CV *cv = (CV *)newSV (0); + + sv_upgrade ((SV *)cv, SVt_PVCV); + + CvANON_on (cv); + CvISXSUB_on (cv); + CvXSUB (cv) = xsub; + GENSUB_ARG = arg; + + return newRV_noinc ((SV *)cv); +} + /** Coro ********************************************************************/ INLINE void @@ -1708,6 +1733,111 @@ } /*****************************************************************************/ +/* rouse callback */ + +#define CORO_MAGIC_type_rouse PERL_MAGIC_ext + +static void +coro_rouse_callback (pTHX_ CV *cv) +{ + dXSARGS; + SV *data = (SV *)GENSUB_ARG; + + if (SvTYPE (SvRV (data)) != SVt_PVAV) + { + /* first call, set args */ + int i; + AV *av = newAV (); + SV *coro = SvRV (data); + + SvRV_set (data, (SV *)av); + api_ready (aTHX_ coro); + SvREFCNT_dec (coro); + + while (items--) + av_store (av, items, SvREFCNT_inc_NN (ST (items))); + } + + XSRETURN_EMPTY; +} + +static int +slf_check_rouse_wait (pTHX_ struct CoroSLF *frame) +{ + SV *data = (SV *)frame->data; + + if (CORO_THROW) + return 0; + + if (SvTYPE (SvRV (data)) != SVt_PVAV) + return 1; + + /* now push all results on the stack */ + { + dSP; + AV *av = (AV *)SvRV (data); + int i; + + EXTEND (SP, AvFILLp (av) + 1); + for (i = 0; i <= AvFILLp (av); ++i) + PUSHs (AvARRAY (av)[i]); + + PUTBACK; + } + + return 0; +} + +static void +slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items) +{ + SV *cb; + + if (items) + cb = arg [0]; + else + { + struct coro *coro = SvSTATE_current; + + if (!coro->rouse_cb) + croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,"); + + cb = sv_2mortal (coro->rouse_cb); + coro->rouse_cb = 0; + } + + if (!SvROK (cb) + || SvTYPE (SvRV (cb)) != SVt_PVCV + || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback) + croak ("Coro::rouse_wait called with illegal callback argument,"); + + { + CV *cv = (CV *)SvRV (cb); /* for GENSUB_ARG */ + SV *data = (SV *)GENSUB_ARG; + + frame->data = (void *)data; + frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule; + frame->check = slf_check_rouse_wait; + } +} + +static SV * +coro_new_rouse_cb (pTHX) +{ + HV *hv = (HV *)SvRV (coro_current); + struct coro *coro = SvSTATE_hv (hv); + SV *data = newRV_inc ((SV *)hv); + SV *cb = gensub (aTHX_ coro_rouse_callback, (void *)data); + + sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0); + + SvREFCNT_dec (coro->rouse_cb); + coro->rouse_cb = SvREFCNT_inc_NN (cb); + + return cb; +} + +/*****************************************************************************/ /* schedule-like-function opcode (SLF) */ static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */ @@ -2199,29 +2329,6 @@ } /*****************************************************************************/ -/* gensub: simple closure generation utility */ - -#define GENSUB_ARG CvXSUBANY (cv).any_ptr - -/* create a closure from XS, returns a code reference */ -/* the arg can be accessed via GENSUB_ARG from the callback */ -/* the callback must use dXSARGS/XSRETURN */ -static SV * -gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg) -{ - CV *cv = (CV *)newSV (0); - - sv_upgrade ((SV *)cv, SVt_PVCV); - - CvANON_on (cv); - CvISXSUB_on (cv); - CvXSUB (cv) = xsub; - GENSUB_ARG = arg; - - return newRV_noinc ((SV *)cv); -} - -/*****************************************************************************/ /* Coro::AIO */ #define CORO_MAGIC_type_aio PERL_MAGIC_ext @@ -2869,6 +2976,20 @@ av_push (av_async_pool, newSVsv (coro_current)); } +SV * +rouse_cb () + PROTOTYPE: + CODE: + RETVAL = coro_new_rouse_cb (aTHX); + OUTPUT: + RETVAL + +void +rouse_wait (SV *cb = 0) + PROTOTYPE: ;$ + PPCODE: + CORO_EXECUTE_SLF_XS (slf_init_rouse_wait); + MODULE = Coro::State PACKAGE = PerlIO::cede