ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.279 by root, Sun Nov 16 08:59:16 2008 UTC vs.
Revision 1.280 by root, Sun Nov 16 09:43:18 2008 UTC

142#define NOINLINE attribute ((noinline)) 142#define NOINLINE attribute ((noinline))
143 143
144#include "CoroAPI.h" 144#include "CoroAPI.h"
145 145
146#ifdef USE_ITHREADS 146#ifdef USE_ITHREADS
147
148static perl_mutex coro_lock;
149# define LOCK do { MUTEX_LOCK (&coro_lock); } while (0)
150# define UNLOCK do { MUTEX_UNLOCK (&coro_lock); } while (0)
151# if CORO_PTHREAD 147# if CORO_PTHREAD
152static void *coro_thx; 148static void *coro_thx;
153# endif 149# endif
154
155#else
156
157# define LOCK (void)0
158# define UNLOCK (void)0
159
160#endif 150#endif
161
162# undef LOCK
163# define LOCK (void)0
164# undef UNLOCK
165# define UNLOCK (void)0
166 151
167/* helper storage struct for Coro::AIO */ 152/* helper storage struct for Coro::AIO */
168struct io_state 153struct io_state
169{ 154{
170 AV *res; 155 AV *res;
266 251
267 AV *args; /* data associated with this coroutine (initial args) */ 252 AV *args; /* data associated with this coroutine (initial args) */
268 int refcnt; /* coroutines are refcounted, yes */ 253 int refcnt; /* coroutines are refcounted, yes */
269 int flags; /* CF_ flags */ 254 int flags; /* CF_ flags */
270 HV *hv; /* the perl hash associated with this coro, if any */ 255 HV *hv; /* the perl hash associated with this coro, if any */
256 void (*on_destroy)(pTHX_ struct coro *coro);
271 257
272 /* statistics */ 258 /* statistics */
273 int usecount; /* number of transfers to this coro */ 259 int usecount; /* number of transfers to this coro */
274 260
275 /* coro process data */ 261 /* coro process data */
1093 struct coro *next = (struct coro *)transfer_next; 1079 struct coro *next = (struct coro *)transfer_next;
1094 assert (!(transfer_next = 0)); /* just used for the side effect when asserts are enabled */ 1080 assert (!(transfer_next = 0)); /* just used for the side effect when asserts are enabled */
1095 assert (("FATAL: next coroutine was zero in transfer_tail (please report)", next)); 1081 assert (("FATAL: next coroutine was zero in transfer_tail (please report)", next));
1096 1082
1097 free_coro_mortal (aTHX); 1083 free_coro_mortal (aTHX);
1098 UNLOCK;
1099 1084
1100 if (expect_false (next->throw)) 1085 if (expect_false (next->throw))
1101 { 1086 {
1102 SV *exception = sv_2mortal (next->throw); 1087 SV *exception = sv_2mortal (next->throw);
1103 1088
1338 } 1323 }
1339 1324
1340 prev->flags &= ~CF_RUNNING; 1325 prev->flags &= ~CF_RUNNING;
1341 next->flags |= CF_RUNNING; 1326 next->flags |= CF_RUNNING;
1342 1327
1343 LOCK;
1344
1345 /* first get rid of the old state */ 1328 /* first get rid of the old state */
1346 save_perl (aTHX_ prev); 1329 save_perl (aTHX_ prev);
1347 1330
1348 if (expect_false (next->flags & CF_NEW)) 1331 if (expect_false (next->flags & CF_NEW))
1349 { 1332 {
1406coro_state_destroy (pTHX_ struct coro *coro) 1389coro_state_destroy (pTHX_ struct coro *coro)
1407{ 1390{
1408 if (coro->flags & CF_DESTROYED) 1391 if (coro->flags & CF_DESTROYED)
1409 return 0; 1392 return 0;
1410 1393
1394 if (coro->on_destroy)
1395 coro->on_destroy (aTHX_ coro);
1396
1411 coro->flags |= CF_DESTROYED; 1397 coro->flags |= CF_DESTROYED;
1412 1398
1413 if (coro->flags & CF_READY) 1399 if (coro->flags & CF_READY)
1414 { 1400 {
1415 /* reduce nready, as destroying a ready coro effectively unreadies it */ 1401 /* reduce nready, as destroying a ready coro effectively unreadies it */
1416 /* alternative: look through all ready queues and remove the coro */ 1402 /* alternative: look through all ready queues and remove the coro */
1417 LOCK;
1418 --coro_nready; 1403 --coro_nready;
1419 UNLOCK;
1420 } 1404 }
1421 else 1405 else
1422 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */ 1406 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1423 1407
1424 if (coro->mainstack && coro->mainstack != main_mainstack) 1408 if (coro->mainstack && coro->mainstack != main_mainstack)
1537 if (coro->flags & CF_READY) 1521 if (coro->flags & CF_READY)
1538 return 0; 1522 return 0;
1539 1523
1540 coro->flags |= CF_READY; 1524 coro->flags |= CF_READY;
1541 1525
1542 LOCK;
1543
1544 sv_hook = coro_nready ? 0 : coro_readyhook; 1526 sv_hook = coro_nready ? 0 : coro_readyhook;
1545 xs_hook = coro_nready ? 0 : coroapi.readyhook; 1527 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1546 1528
1547 coro_enq (aTHX_ SvREFCNT_inc_NN (coro_sv)); 1529 coro_enq (aTHX_ SvREFCNT_inc_NN (coro_sv));
1548 ++coro_nready; 1530 ++coro_nready;
1549 1531
1550 UNLOCK;
1551
1552 if (sv_hook) 1532 if (sv_hook)
1553 { 1533 {
1554 dSP; 1534 dSP;
1555 1535
1556 ENTER; 1536 ENTER;
1582{ 1562{
1583 SV *prev_sv, *next_sv; 1563 SV *prev_sv, *next_sv;
1584 1564
1585 for (;;) 1565 for (;;)
1586 { 1566 {
1587 LOCK;
1588 next_sv = coro_deq (aTHX); 1567 next_sv = coro_deq (aTHX);
1589 1568
1590 /* nothing to schedule: call the idle handler */ 1569 /* nothing to schedule: call the idle handler */
1591 if (expect_false (!next_sv)) 1570 if (expect_false (!next_sv))
1592 { 1571 {
1593 dSP; 1572 dSP;
1594 UNLOCK;
1595 1573
1596 ENTER; 1574 ENTER;
1597 SAVETMPS; 1575 SAVETMPS;
1598 1576
1599 PUSHMARK (SP); 1577 PUSHMARK (SP);
1609 ta->next = SvSTATE (next_sv); 1587 ta->next = SvSTATE (next_sv);
1610 1588
1611 /* cannot transfer to destroyed coros, skip and look for next */ 1589 /* cannot transfer to destroyed coros, skip and look for next */
1612 if (expect_false (ta->next->flags & CF_DESTROYED)) 1590 if (expect_false (ta->next->flags & CF_DESTROYED))
1613 { 1591 {
1614 UNLOCK;
1615 SvREFCNT_dec (next_sv); 1592 SvREFCNT_dec (next_sv);
1616 /* coro_nready has already been taken care of by destroy */ 1593 /* coro_nready has already been taken care of by destroy */
1617 continue; 1594 continue;
1618 } 1595 }
1619 1596
1620 --coro_nready; 1597 --coro_nready;
1621 UNLOCK;
1622 break; 1598 break;
1623 } 1599 }
1624 1600
1625 /* free this only after the transfer */ 1601 /* free this only after the transfer */
1626 prev_sv = SvRV (coro_current); 1602 prev_sv = SvRV (coro_current);
1628 TRANSFER_CHECK (*ta); 1604 TRANSFER_CHECK (*ta);
1629 assert (("FATAL: next coroutine isn't marked as ready in Coro (please report)", ta->next->flags & CF_READY)); 1605 assert (("FATAL: next coroutine isn't marked as ready in Coro (please report)", ta->next->flags & CF_READY));
1630 ta->next->flags &= ~CF_READY; 1606 ta->next->flags &= ~CF_READY;
1631 SvRV_set (coro_current, next_sv); 1607 SvRV_set (coro_current, next_sv);
1632 1608
1633 LOCK;
1634 free_coro_mortal (aTHX); 1609 free_coro_mortal (aTHX);
1635 coro_mortal = prev_sv; 1610 coro_mortal = prev_sv;
1636 UNLOCK;
1637} 1611}
1638 1612
1639INLINE void 1613INLINE void
1640prepare_cede (pTHX_ struct coro_transfer_args *ta) 1614prepare_cede (pTHX_ struct coro_transfer_args *ta)
1641{ 1615{
1922 items = AvFILLp (av) + 1; 1896 items = AvFILLp (av) + 1;
1923 } 1897 }
1924 1898
1925 PUTBACK; 1899 PUTBACK;
1926 1900
1927 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr) (aTHX_ &slf_frame, GvCV (gv), arg, items); 1901 /* now call the init function, which needs to set up slf_frame */
1902 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
1903 (aTHX_ &slf_frame, GvCV (gv), arg, items);
1928 } 1904 }
1929 1905
1930 /* now interpret the slf_frame */ 1906 /* now that we have a slf_frame, interpret it! */
1931 /* we use a callback system not to make the code needlessly */ 1907 /* we use a callback system not to make the code needlessly */
1932 /* complicated, but so we can run multiple perl coros from one cctx */ 1908 /* complicated, but so we can run multiple perl coros from one cctx */
1933 1909
1934 do 1910 do
1935 { 1911 {
1945 { 1921 {
1946 dSP; 1922 dSP;
1947 SV **bot = PL_stack_base + checkmark; 1923 SV **bot = PL_stack_base + checkmark;
1948 int gimme = GIMME_V; 1924 int gimme = GIMME_V;
1949 1925
1950 slf_frame.prepare = 0; /* signal pp_slf that we need a new frame */ 1926 slf_frame.prepare = 0; /* invalidate the frame, so it gets initialised again next time */
1951 1927
1952 /* make sure we put something on the stack in scalar context */ 1928 /* make sure we put something on the stack in scalar context */
1953 if (gimme == G_SCALAR) 1929 if (gimme == G_SCALAR)
1954 { 1930 {
1955 if (sp == bot) 1931 if (sp == bot)
1982 1958
1983 /* we patch the op, and then re-run the whole call */ 1959 /* we patch the op, and then re-run the whole call */
1984 /* we have to put the same argument on the stack for this to work */ 1960 /* we have to put the same argument on the stack for this to work */
1985 /* and this will be done by pp_restore */ 1961 /* and this will be done by pp_restore */
1986 slf_restore.op_next = (OP *)&slf_restore; 1962 slf_restore.op_next = (OP *)&slf_restore;
1987 slf_restore.op_type = OP_NULL; 1963 slf_restore.op_type = OP_CUSTOM;
1988 slf_restore.op_ppaddr = pp_restore; 1964 slf_restore.op_ppaddr = pp_restore;
1989 slf_restore.op_first = PL_op; 1965 slf_restore.op_first = PL_op;
1990 1966
1991 slf_arg0 = items > 0 ? SvREFCNT_inc (arg [0]) : 0; 1967 slf_arg0 = items > 0 ? SvREFCNT_inc (arg [0]) : 0;
1992 slf_arg1 = items > 1 ? SvREFCNT_inc (arg [1]) : 0; 1968 slf_arg1 = items > 1 ? SvREFCNT_inc (arg [1]) : 0;
1997 PL_op = (OP *)&slf_restore; 1973 PL_op = (OP *)&slf_restore;
1998} 1974}
1999 1975
2000/*****************************************************************************/ 1976/*****************************************************************************/
2001 1977
1978static void
1979coro_semaphore_adjust (AV *av, int adjust)
1980{
1981 SV *count_sv = AvARRAY (av)[0];
1982 IV count = SvIVX (count_sv);
1983
1984 count += adjust;
1985 SvIVX (count_sv) = count;
1986
1987 /* now wake up as many waiters as possible */
1988 while (count > 0 && AvFILLp (av) >= count)
1989 {
1990 SV *cb;
1991
1992 /* swap first two elements so we can shift a waiter */
1993 AvARRAY (av)[0] = AvARRAY (av)[1];
1994 AvARRAY (av)[1] = count_sv;
1995 cb = av_shift (av);
1996
1997 if (SvOBJECT (cb))
1998 api_ready (aTHX_ cb);
1999 else
2000 croak ("callbacks not yet supported");
2001
2002 SvREFCNT_dec (cb);
2003
2004 --count;
2005 }
2006}
2007
2008static void
2009coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2010{
2011 /* call $sem->adjust (0) to possibly wake up some waiters */
2012 coro_semaphore_adjust ((AV *)coro->slf_frame.data, 0);
2013}
2014
2002static int 2015static int
2003slf_check_semaphore_down (pTHX_ struct CoroSLF *frame) 2016slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2004{ 2017{
2005 AV *av = (AV *)frame->data; 2018 AV *av = (AV *)frame->data;
2006 SV *count_sv = AvARRAY (av)[0]; 2019 SV *count_sv = AvARRAY (av)[0];
2007 2020
2008 if (SvIVX (count_sv) > 0) 2021 if (SvIVX (count_sv) > 0)
2009 { 2022 {
2023 SvSTATE (coro_current)->on_destroy = 0;
2010 SvIVX (count_sv) = SvIVX (count_sv) - 1; 2024 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2011 return 0; 2025 return 0;
2012 } 2026 }
2013 else 2027 else
2014 { 2028 {
2040 { 2054 {
2041 av_push (av, SvREFCNT_inc (SvRV (coro_current))); 2055 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2042 2056
2043 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av)); 2057 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2044 frame->prepare = prepare_schedule; 2058 frame->prepare = prepare_schedule;
2059
2060 /* to avoid race conditions when a woken-up coro gets terminated */
2061 /* we arrange for a temporary on_destroy that calls adjust (0) */
2062 SvSTATE (coro_current)->on_destroy = coro_semaphore_on_destroy;
2045 } 2063 }
2046 2064
2047 frame->check = slf_check_semaphore_down; 2065 frame->check = slf_check_semaphore_down;
2048 2066
2049} 2067}
2107 main_mainstack = PL_mainstack; 2125 main_mainstack = PL_mainstack;
2108 main_top_env = PL_top_env; 2126 main_top_env = PL_top_env;
2109 2127
2110 while (main_top_env->je_prev) 2128 while (main_top_env->je_prev)
2111 main_top_env = main_top_env->je_prev; 2129 main_top_env = main_top_env->je_prev;
2130
2131 {
2132 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2133
2134 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2135 hv_store_ent (PL_custom_op_names, slf,
2136 newSVpv ("coro_slf", 0), 0);
2137
2138 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2139 hv_store_ent (PL_custom_op_descs, slf,
2140 newSVpv ("coro schedule like function", 0), 0);
2141 }
2112 2142
2113 coroapi.ver = CORO_API_VERSION; 2143 coroapi.ver = CORO_API_VERSION;
2114 coroapi.rev = CORO_API_REVISION; 2144 coroapi.rev = CORO_API_REVISION;
2115 2145
2116 coroapi.transfer = api_transfer; 2146 coroapi.transfer = api_transfer;
2427 2457
2428void 2458void
2429_set_readyhook (SV *hook) 2459_set_readyhook (SV *hook)
2430 PROTOTYPE: $ 2460 PROTOTYPE: $
2431 CODE: 2461 CODE:
2432 LOCK;
2433 SvREFCNT_dec (coro_readyhook); 2462 SvREFCNT_dec (coro_readyhook);
2434 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0; 2463 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
2435 UNLOCK;
2436 2464
2437int 2465int
2438prio (Coro::State coro, int newprio = 0) 2466prio (Coro::State coro, int newprio = 0)
2439 PROTOTYPE: $;$ 2467 PROTOTYPE: $;$
2440 ALIAS: 2468 ALIAS:
2656void 2684void
2657up (SV *self, int adjust = 1) 2685up (SV *self, int adjust = 1)
2658 ALIAS: 2686 ALIAS:
2659 adjust = 1 2687 adjust = 1
2660 CODE: 2688 CODE:
2661{ 2689 coro_semaphore_adjust ((AV *)SvRV (self), ix ? adjust : 1);
2662 AV *av = (AV *)SvRV (self);
2663 SV *count_sv = AvARRAY (av)[0];
2664 IV count = SvIVX (count_sv);
2665
2666 count += ix ? adjust : 1;
2667 SvIVX (count_sv) = count;
2668
2669 /* now wake up as many waiters as possible */
2670 while (count > 0 && AvFILLp (av) >= count)
2671 {
2672 SV *cb;
2673
2674 /* swap first two elements so we can shift a waiter */
2675 AvARRAY (av)[0] = AvARRAY (av)[1];
2676 AvARRAY (av)[1] = count_sv;
2677 cb = av_shift (av);
2678
2679 if (SvOBJECT (cb))
2680 api_ready (aTHX_ cb);
2681 else
2682 croak ("callbacks not yet supported");
2683
2684 SvREFCNT_dec (cb);
2685 }
2686}
2687 2690
2688void 2691void
2689down (SV *self) 2692down (SV *self)
2690 CODE: 2693 CODE:
2691 api_execute_slf (aTHX_ cv, slf_init_semaphore_down, &ST (0), 1); 2694 api_execute_slf (aTHX_ cv, slf_init_semaphore_down, &ST (0), 1);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines