… | |
… | |
283 | int usecount; /* number of transfers to this coro */ |
283 | int usecount; /* number of transfers to this coro */ |
284 | |
284 | |
285 | /* coro process data */ |
285 | /* coro process data */ |
286 | int prio; |
286 | int prio; |
287 | SV *except; /* exception to be thrown */ |
287 | SV *except; /* exception to be thrown */ |
288 | SV *rouse_cb; /* last rouse callback */ |
288 | SV *rouse_cb; /* most recently created rouse callback */ |
289 | AV *on_destroy; /* callbacks or coros to notify on destroy */ |
289 | AV *on_destroy; /* callbacks or coros to notify on destroy */ |
290 | AV *status; /* the exit status list */ |
290 | AV *status; /* the exit status list */ |
291 | |
291 | |
292 | /* async_pool */ |
292 | /* async_pool */ |
293 | SV *saved_deffh; |
293 | SV *saved_deffh; |
… | |
… | |
2504 | static void |
2504 | static void |
2505 | coro_rouse_callback (pTHX_ CV *cv) |
2505 | coro_rouse_callback (pTHX_ CV *cv) |
2506 | { |
2506 | { |
2507 | dXSARGS; |
2507 | dXSARGS; |
2508 | SV *data = (SV *)S_GENSUB_ARG; |
2508 | SV *data = (SV *)S_GENSUB_ARG; |
|
|
2509 | SV *coro = SvRV (data); |
2509 | |
2510 | |
2510 | /* data starts being the coro, and is replaced by the results when done */ |
2511 | /* data starts being either undef or a coro, and is replaced by the results when done */ |
2511 | if (SvTYPE (SvRV (data)) != SVt_PVAV) |
2512 | if (SvTYPE (coro) != SVt_PVAV) |
2512 | { |
2513 | { |
2513 | /* first call, set args */ |
2514 | /* first call, set args */ |
2514 | SV *coro = SvRV (data); |
|
|
2515 | AV *av = newAV (); |
|
|
2516 | |
2515 | |
2517 | SvRV_set (data, (SV *)av); |
2516 | assert (&ST (0) < &ST (1)); /* ensure the stack is in the order we expect it to be */ |
|
|
2517 | SvRV_set (data, (SV *)av_make (items, &ST (0))); /* av_make copies the SVs */ |
2518 | |
2518 | |
2519 | /* better take a full copy of the arguments */ |
2519 | if (coro != &PL_sv_undef) |
2520 | while (items--) |
2520 | { |
2521 | av_store (av, items, newSVsv (ST (items))); |
|
|
2522 | |
|
|
2523 | api_ready (aTHX_ coro); |
2521 | api_ready (aTHX_ coro); |
2524 | SvREFCNT_dec_NN (coro); |
2522 | SvREFCNT_dec_NN (coro); |
|
|
2523 | } |
2525 | } |
2524 | } |
2526 | |
2525 | |
2527 | XSRETURN_EMPTY; |
2526 | XSRETURN_EMPTY; |
2528 | } |
2527 | } |
2529 | |
2528 | |
… | |
… | |
2546 | |
2545 | |
2547 | EXTEND (SP, AvFILLp (av) + 1); |
2546 | EXTEND (SP, AvFILLp (av) + 1); |
2548 | for (i = 0; i <= AvFILLp (av); ++i) |
2547 | for (i = 0; i <= AvFILLp (av); ++i) |
2549 | PUSHs (sv_2mortal (AvARRAY (av)[i])); |
2548 | PUSHs (sv_2mortal (AvARRAY (av)[i])); |
2550 | |
2549 | |
2551 | /* we have stolen the elements, so set length to zero and free */ |
2550 | /* we have stolen the elements, make it unreal and free */ |
2552 | AvFILLp (av) = -1; |
2551 | AvREAL_off (av); |
2553 | av_undef (av); |
2552 | av_undef (av); |
2554 | |
2553 | |
2555 | PUTBACK; |
2554 | PUTBACK; |
2556 | } |
2555 | } |
2557 | |
2556 | |
… | |
… | |
2582 | croak ("Coro::rouse_wait called with illegal callback argument,"); |
2581 | croak ("Coro::rouse_wait called with illegal callback argument,"); |
2583 | |
2582 | |
2584 | { |
2583 | { |
2585 | CV *cv = (CV *)SvRV (cb); /* for S_GENSUB_ARG */ |
2584 | CV *cv = (CV *)SvRV (cb); /* for S_GENSUB_ARG */ |
2586 | SV *data = (SV *)S_GENSUB_ARG; |
2585 | SV *data = (SV *)S_GENSUB_ARG; |
|
|
2586 | int data_ready = SvTYPE (SvRV (data)) == SVt_PVAV; |
|
|
2587 | |
|
|
2588 | /* if there is no data, we need to store the current coro in the reference so we can be woken up */ |
|
|
2589 | if (!data_ready) |
|
|
2590 | if (SvRV (data) != &PL_sv_undef) |
|
|
2591 | croak ("Coro::rouse_wait was called on a calback that is already being waited for - only one thread can wait for a rouse callback, caught"); |
|
|
2592 | else |
|
|
2593 | SvRV_set (data, SvREFCNT_inc_NN (SvRV (coro_current))); |
2587 | |
2594 | |
2588 | frame->data = (void *)data; |
2595 | frame->data = (void *)data; |
2589 | frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule; |
2596 | frame->prepare = data_ready ? prepare_nop : prepare_schedule; |
2590 | frame->check = slf_check_rouse_wait; |
2597 | frame->check = slf_check_rouse_wait; |
2591 | } |
2598 | } |
2592 | } |
2599 | } |
2593 | |
2600 | |
2594 | static SV * |
2601 | static SV * |
2595 | coro_new_rouse_cb (pTHX) |
2602 | coro_new_rouse_cb (pTHX) |
2596 | { |
2603 | { |
2597 | HV *hv = (HV *)SvRV (coro_current); |
2604 | HV *hv = (HV *)SvRV (coro_current); |
2598 | struct coro *coro = SvSTATE_hv (hv); |
2605 | struct coro *coro = SvSTATE_hv (hv); |
2599 | SV *data = newRV_inc ((SV *)hv); |
2606 | SV *data = newRV_noinc (&PL_sv_undef); |
2600 | SV *cb = s_gensub (aTHX_ coro_rouse_callback, (void *)data); |
2607 | SV *cb = s_gensub (aTHX_ coro_rouse_callback, (void *)data); |
2601 | |
2608 | |
2602 | sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0); |
2609 | sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0); |
2603 | SvREFCNT_dec_NN (data); /* magicext increases the refcount */ |
2610 | SvREFCNT_dec_NN (data); /* magicext increases the refcount */ |
2604 | |
2611 | |