ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.347 by root, Wed Jun 10 00:00:02 2009 UTC vs.
Revision 1.358 by root, Mon Jun 29 04:30:25 2009 UTC

137 137
138#define IN_DESTRUCT PL_dirty 138#define IN_DESTRUCT PL_dirty
139 139
140#if __GNUC__ >= 3 140#if __GNUC__ >= 3
141# define attribute(x) __attribute__(x) 141# define attribute(x) __attribute__(x)
142# define expect(expr,value) __builtin_expect ((expr),(value)) 142# define expect(expr,value) __builtin_expect ((expr), (value))
143# define INLINE static inline 143# define INLINE static inline
144#else 144#else
145# define attribute(x) 145# define attribute(x)
146# define expect(expr,value) (expr) 146# define expect(expr,value) (expr)
147# define INLINE static 147# define INLINE static
156#define GCoroAPI (&coroapi) /* very sneaky */ 156#define GCoroAPI (&coroapi) /* very sneaky */
157 157
158#ifdef USE_ITHREADS 158#ifdef USE_ITHREADS
159# if CORO_PTHREAD 159# if CORO_PTHREAD
160static void *coro_thx; 160static void *coro_thx;
161# endif
162#endif
163
164#ifdef __linux
165# include <time.h> /* for timespec */
166# include <syscall.h> /* for SYS_* */
167# ifdef SYS_clock_gettime
168# define coro_clock_gettime(id, ts) syscall (SYS_clock_gettime, (id), (ts))
169# define CORO_CLOCK_MONOTONIC 1
170# define CORO_CLOCK_THREAD_CPUTIME_ID 3
161# endif 171# endif
162#endif 172#endif
163 173
164static double (*nvtime)(); /* so why doesn't it take void? */ 174static double (*nvtime)(); /* so why doesn't it take void? */
165 175
307/* the mainr easonw e don't support windows process emulation */ 317/* the mainr easonw e don't support windows process emulation */
308static struct CoroSLF slf_frame; /* the current slf frame */ 318static struct CoroSLF slf_frame; /* the current slf frame */
309 319
310/** Coro ********************************************************************/ 320/** Coro ********************************************************************/
311 321
312#define PRIO_MAX 3 322#define CORO_PRIO_MAX 3
313#define PRIO_HIGH 1 323#define CORO_PRIO_HIGH 1
314#define PRIO_NORMAL 0 324#define CORO_PRIO_NORMAL 0
315#define PRIO_LOW -1 325#define CORO_PRIO_LOW -1
316#define PRIO_IDLE -3 326#define CORO_PRIO_IDLE -3
317#define PRIO_MIN -4 327#define CORO_PRIO_MIN -4
318 328
319/* for Coro.pm */ 329/* for Coro.pm */
320static SV *coro_current; 330static SV *coro_current;
321static SV *coro_readyhook; 331static SV *coro_readyhook;
322static struct coro *coro_ready [PRIO_MAX - PRIO_MIN + 1][2]; /* head|tail */ 332static struct coro *coro_ready [CORO_PRIO_MAX - CORO_PRIO_MIN + 1][2]; /* head|tail */
323static CV *cv_coro_run, *cv_coro_terminate; 333static CV *cv_coro_run, *cv_coro_terminate;
324static struct coro *coro_first; 334static struct coro *coro_first;
325#define coro_nready coroapi.nready 335#define coro_nready coroapi.nready
326 336
327/** lowlevel stuff **********************************************************/ 337/** lowlevel stuff **********************************************************/
475static int 485static int
476coro_cv_free (pTHX_ SV *sv, MAGIC *mg) 486coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
477{ 487{
478 AV *padlist; 488 AV *padlist;
479 AV *av = (AV *)mg->mg_obj; 489 AV *av = (AV *)mg->mg_obj;
490
491 /* perl manages to free our internal AV and _then_ call us */
492 if (IN_DESTRUCT)
493 return;
480 494
481 /* casting is fun. */ 495 /* casting is fun. */
482 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av))) 496 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
483 free_padlist (aTHX_ padlist); 497 free_padlist (aTHX_ padlist);
484 498
615 { 629 {
616 while (expect_true (cxix >= 0)) 630 while (expect_true (cxix >= 0))
617 { 631 {
618 PERL_CONTEXT *cx = &ccstk[cxix--]; 632 PERL_CONTEXT *cx = &ccstk[cxix--];
619 633
620 if (expect_true (CxTYPE (cx) == CXt_SUB || CxTYPE (cx) == CXt_FORMAT)) 634 if (expect_true (CxTYPE (cx) == CXt_SUB) || expect_false (CxTYPE (cx) == CXt_FORMAT))
621 { 635 {
622 CV *cv = cx->blk_sub.cv; 636 CV *cv = cx->blk_sub.cv;
623 637
624 if (expect_true (CvDEPTH (cv))) 638 if (expect_true (CvDEPTH (cv)))
625 { 639 {
990} 1004}
991 1005
992static void 1006static void
993coro_destruct_perl (pTHX_ struct coro *coro) 1007coro_destruct_perl (pTHX_ struct coro *coro)
994{ 1008{
1009 SV *svf [9];
1010
1011 {
1012 struct coro *current = SvSTATE_current;
1013
1014 assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack));
1015
1016 save_perl (aTHX_ current);
1017 load_perl (aTHX_ coro);
1018
995 coro_unwind_stacks (aTHX); 1019 coro_unwind_stacks (aTHX);
1020 coro_destruct_stacks (aTHX);
996 1021
997 SvREFCNT_dec (GvSV (PL_defgv)); 1022 // now save some sv's to be free'd later
998 SvREFCNT_dec (GvAV (PL_defgv)); 1023 svf [0] = GvSV (PL_defgv);
999 SvREFCNT_dec (GvSV (PL_errgv)); 1024 svf [1] = (SV *)GvAV (PL_defgv);
1000 SvREFCNT_dec (PL_defoutgv); 1025 svf [2] = GvSV (PL_errgv);
1001 SvREFCNT_dec (PL_rs); 1026 svf [3] = (SV *)PL_defoutgv;
1002 SvREFCNT_dec (GvSV (irsgv)); 1027 svf [4] = PL_rs;
1003 SvREFCNT_dec (GvHV (PL_hintgv)); 1028 svf [5] = GvSV (irsgv);
1029 svf [6] = (SV *)GvHV (PL_hintgv);
1030 svf [7] = PL_diehook;
1031 svf [8] = PL_warnhook;
1032 assert (9 == sizeof (svf) / sizeof (*svf));
1004 1033
1005 SvREFCNT_dec (PL_diehook); 1034 load_perl (aTHX_ current);
1006 SvREFCNT_dec (PL_warnhook);
1007 1035 }
1036
1037 {
1038 int i;
1039
1040 for (i = 0; i < sizeof (svf) / sizeof (*svf); ++i)
1041 SvREFCNT_dec (svf [i]);
1042
1008 SvREFCNT_dec (coro->saved_deffh); 1043 SvREFCNT_dec (coro->saved_deffh);
1009 SvREFCNT_dec (coro->rouse_cb); 1044 SvREFCNT_dec (coro->rouse_cb);
1010 SvREFCNT_dec (coro->invoke_cb); 1045 SvREFCNT_dec (coro->invoke_cb);
1011 SvREFCNT_dec (coro->invoke_av); 1046 SvREFCNT_dec (coro->invoke_av);
1012 1047 }
1013 coro_destruct_stacks (aTHX);
1014} 1048}
1015 1049
1016INLINE void 1050INLINE void
1017free_coro_mortal (pTHX) 1051free_coro_mortal (pTHX)
1018{ 1052{
1505 1539
1506 if (coro->mainstack 1540 if (coro->mainstack
1507 && coro->mainstack != main_mainstack 1541 && coro->mainstack != main_mainstack
1508 && coro->slot 1542 && coro->slot
1509 && !PL_dirty) 1543 && !PL_dirty)
1510 {
1511 struct coro *current = SvSTATE_current;
1512
1513 assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack));
1514
1515 save_perl (aTHX_ current);
1516 load_perl (aTHX_ coro);
1517
1518 coro_destruct_perl (aTHX_ coro); 1544 coro_destruct_perl (aTHX_ coro);
1519
1520 load_perl (aTHX_ current);
1521
1522 coro->slot = 0;
1523 }
1524 1545
1525 cctx_destroy (coro->cctx); 1546 cctx_destroy (coro->cctx);
1526 SvREFCNT_dec (coro->startcv); 1547 SvREFCNT_dec (coro->startcv);
1527 SvREFCNT_dec (coro->args); 1548 SvREFCNT_dec (coro->args);
1528 SvREFCNT_dec (CORO_THROW); 1549 SvREFCNT_dec (CORO_THROW);
1615/** Coro ********************************************************************/ 1636/** Coro ********************************************************************/
1616 1637
1617INLINE void 1638INLINE void
1618coro_enq (pTHX_ struct coro *coro) 1639coro_enq (pTHX_ struct coro *coro)
1619{ 1640{
1620 struct coro **ready = coro_ready [coro->prio - PRIO_MIN]; 1641 struct coro **ready = coro_ready [coro->prio - CORO_PRIO_MIN];
1621 1642
1622 SvREFCNT_inc_NN (coro->hv); 1643 SvREFCNT_inc_NN (coro->hv);
1623 1644
1624 coro->next_ready = 0; 1645 coro->next_ready = 0;
1625 *(ready [0] ? &ready [1]->next_ready : &ready [0]) = coro; 1646 *(ready [0] ? &ready [1]->next_ready : &ready [0]) = coro;
1629INLINE struct coro * 1650INLINE struct coro *
1630coro_deq (pTHX) 1651coro_deq (pTHX)
1631{ 1652{
1632 int prio; 1653 int prio;
1633 1654
1634 for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= 0; ) 1655 for (prio = CORO_PRIO_MAX - CORO_PRIO_MIN + 1; --prio >= 0; )
1635 { 1656 {
1636 struct coro **ready = coro_ready [prio]; 1657 struct coro **ready = coro_ready [prio];
1637 1658
1638 if (ready [0]) 1659 if (ready [0])
1639 { 1660 {
1640 struct coro *coro = ready [0]; 1661 struct coro *coro = ready [0];
1641 ready [0] = coro->next_ready; 1662 ready [0] = coro->next_ready;
1642 return coro; 1663 return coro;
1643 } 1664 }
1644 } 1665 }
1666
1667 return 0;
1645} 1668}
1646 1669
1647static int 1670static int
1648api_ready (pTHX_ SV *coro_sv) 1671api_ready (pTHX_ SV *coro_sv)
1649{ 1672{
2468/* Coro::Semaphore & Coro::Signal */ 2491/* Coro::Semaphore & Coro::Signal */
2469 2492
2470static SV * 2493static SV *
2471coro_waitarray_new (pTHX_ int count) 2494coro_waitarray_new (pTHX_ int count)
2472{ 2495{
2473 /* a semaphore contains a counter IV in $sem->[0] and any waiters after that */ 2496 /* a waitarray=semaphore contains a counter IV in $sem->[0] and any waiters after that */
2474 AV *av = newAV (); 2497 AV *av = newAV ();
2475 SV **ary; 2498 SV **ary;
2476 2499
2477 /* unfortunately, building manually saves memory */ 2500 /* unfortunately, building manually saves memory */
2478 Newx (ary, 2, SV *); 2501 Newx (ary, 2, SV *);
2622 { 2645 {
2623 /* callback form */ 2646 /* callback form */
2624 AV *av = (AV *)SvRV (arg [0]); 2647 AV *av = (AV *)SvRV (arg [0]);
2625 CV *cb_cv = coro_sv_2cv (aTHX_ arg [1]); 2648 CV *cb_cv = coro_sv_2cv (aTHX_ arg [1]);
2626 2649
2627 av_push (av, (SV *)SvREFCNT_inc_NN (cb_cv)); 2650 av_push (av, SvREFCNT_inc_NN (cb_cv));
2628 2651
2629 if (SvIVX (AvARRAY (av)[0]) > 0) 2652 if (SvIVX (AvARRAY (av)[0]) > 0)
2630 coro_semaphore_adjust (aTHX_ av, 0); 2653 coro_semaphore_adjust (aTHX_ av, 0);
2631 2654
2632 frame->prepare = prepare_nop; 2655 frame->prepare = prepare_nop;
2656 AvARRAY (av)[0] = AvARRAY (av)[1]; 2679 AvARRAY (av)[0] = AvARRAY (av)[1];
2657 AvARRAY (av)[1] = cb; 2680 AvARRAY (av)[1] = cb;
2658 2681
2659 cb = av_shift (av); 2682 cb = av_shift (av);
2660 2683
2684 if (SvTYPE (cb) == SVt_PVCV)
2685 {
2686 dSP;
2687 PUSHMARK (SP);
2688 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2689 PUTBACK;
2690 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2691 }
2692 else
2693 {
2661 api_ready (aTHX_ cb); 2694 api_ready (aTHX_ cb);
2662 sv_setiv (cb, 0); /* signal waiter */ 2695 sv_setiv (cb, 0); /* signal waiter */
2696 }
2697
2663 SvREFCNT_dec (cb); 2698 SvREFCNT_dec (cb);
2664 2699
2665 --count; 2700 --count;
2666 } 2701 }
2667} 2702}
2676static void 2711static void
2677slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items) 2712slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2678{ 2713{
2679 AV *av = (AV *)SvRV (arg [0]); 2714 AV *av = (AV *)SvRV (arg [0]);
2680 2715
2716 if (items >= 2)
2717 {
2718 CV *cb_cv = coro_sv_2cv (aTHX_ arg [1]);
2719 av_push (av, SvREFCNT_inc_NN (cb_cv));
2720
2681 if (SvIVX (AvARRAY (av)[0])) 2721 if (SvIVX (AvARRAY (av)[0]))
2722 coro_signal_wake (aTHX_ av, 1); /* ust be the only waiter */
2723
2724 frame->prepare = prepare_nop;
2725 frame->check = slf_check_nop;
2726 }
2727 else if (SvIVX (AvARRAY (av)[0]))
2682 { 2728 {
2683 SvIVX (AvARRAY (av)[0]) = 0; 2729 SvIVX (AvARRAY (av)[0]) = 0;
2684 frame->prepare = prepare_nop; 2730 frame->prepare = prepare_nop;
2685 frame->check = slf_check_nop; 2731 frame->check = slf_check_nop;
2686 } 2732 }
2687 else 2733 else
2688 { 2734 {
2689 SV *waiter = newRV_inc (SvRV (coro_current)); /* owned by signal av */ 2735 SV *waiter = newSVsv (coro_current); /* owned by signal av */
2690 2736
2691 av_push (av, waiter); 2737 av_push (av, waiter);
2692 2738
2693 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */ 2739 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2694 frame->prepare = prepare_schedule; 2740 frame->prepare = prepare_schedule;
2935 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer"); 2981 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
2936 2982
2937 nvtime = INT2PTR (double (*)(), SvIV (*svp)); 2983 nvtime = INT2PTR (double (*)(), SvIV (*svp));
2938 } 2984 }
2939 2985
2940 assert (("PRIO_NORMAL must be 0", !PRIO_NORMAL)); 2986 assert (("PRIO_NORMAL must be 0", !CORO_PRIO_NORMAL));
2941} 2987}
2942 2988
2943SV * 2989SV *
2944new (char *klass, ...) 2990new (char *klass, ...)
2945 ALIAS: 2991 ALIAS:
3147 CODE: 3193 CODE:
3148{ 3194{
3149 struct coro *current = SvSTATE_current; 3195 struct coro *current = SvSTATE_current;
3150 SV **throwp = self == current ? &CORO_THROW : &self->except; 3196 SV **throwp = self == current ? &CORO_THROW : &self->except;
3151 SvREFCNT_dec (*throwp); 3197 SvREFCNT_dec (*throwp);
3198 SvGETMAGIC (throw);
3152 *throwp = SvOK (throw) ? newSVsv (throw) : 0; 3199 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
3153} 3200}
3154 3201
3155void 3202void
3156api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB) 3203api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3219 3266
3220MODULE = Coro::State PACKAGE = Coro 3267MODULE = Coro::State PACKAGE = Coro
3221 3268
3222BOOT: 3269BOOT:
3223{ 3270{
3224 int i;
3225
3226 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE); 3271 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3227 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE); 3272 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3228 cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD); 3273 cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD);
3229 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD); 3274 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD);
3230 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current); 3275 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3238 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler); 3283 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3239 cv_coro_state_new = get_cv ("Coro::State::new", 0); SvREADONLY_on (cv_coro_state_new); 3284 cv_coro_state_new = get_cv ("Coro::State::new", 0); SvREADONLY_on (cv_coro_state_new);
3240 3285
3241 coro_stash = gv_stashpv ("Coro", TRUE); 3286 coro_stash = gv_stashpv ("Coro", TRUE);
3242 3287
3243 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (PRIO_MAX)); 3288 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (CORO_PRIO_MAX));
3244 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (PRIO_HIGH)); 3289 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (CORO_PRIO_HIGH));
3245 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (PRIO_NORMAL)); 3290 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (CORO_PRIO_NORMAL));
3246 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (PRIO_LOW)); 3291 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (CORO_PRIO_LOW));
3247 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (PRIO_IDLE)); 3292 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (CORO_PRIO_IDLE));
3248 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (PRIO_MIN)); 3293 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (CORO_PRIO_MIN));
3249 3294
3250 { 3295 {
3251 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE); 3296 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3252 3297
3253 coroapi.schedule = api_schedule; 3298 coroapi.schedule = api_schedule;
3305void 3350void
3306_set_readyhook (SV *hook) 3351_set_readyhook (SV *hook)
3307 PROTOTYPE: $ 3352 PROTOTYPE: $
3308 CODE: 3353 CODE:
3309 SvREFCNT_dec (coro_readyhook); 3354 SvREFCNT_dec (coro_readyhook);
3355 SvGETMAGIC (hook);
3310 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0; 3356 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
3311 3357
3312int 3358int
3313prio (Coro::State coro, int newprio = 0) 3359prio (Coro::State coro, int newprio = 0)
3314 PROTOTYPE: $;$ 3360 PROTOTYPE: $;$
3458MODULE = Coro::State PACKAGE = Coro::Semaphore 3504MODULE = Coro::State PACKAGE = Coro::Semaphore
3459 3505
3460SV * 3506SV *
3461new (SV *klass, SV *count = 0) 3507new (SV *klass, SV *count = 0)
3462 CODE: 3508 CODE:
3509{
3510 int semcnt = 1;
3511
3512 if (count)
3513 {
3514 SvGETMAGIC (count);
3515
3516 if (SvOK (count))
3517 semcnt = SvIV (count);
3518 }
3519
3463 RETVAL = sv_bless ( 3520 RETVAL = sv_bless (
3464 coro_waitarray_new (aTHX_ count && SvOK (count) ? SvIV (count) : 1), 3521 coro_waitarray_new (aTHX_ semcnt),
3465 GvSTASH (CvGV (cv)) 3522 GvSTASH (CvGV (cv))
3466 ); 3523 );
3524}
3467 OUTPUT: 3525 OUTPUT:
3468 RETVAL 3526 RETVAL
3469 3527
3470# helper for Coro::Channel and others 3528# helper for Coro::Channel and others
3471SV * 3529SV *

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines