ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.276 by root, Sat Nov 15 07:49:48 2008 UTC vs.
Revision 1.280 by root, Sun Nov 16 09:43:18 2008 UTC

142#define NOINLINE attribute ((noinline)) 142#define NOINLINE attribute ((noinline))
143 143
144#include "CoroAPI.h" 144#include "CoroAPI.h"
145 145
146#ifdef USE_ITHREADS 146#ifdef USE_ITHREADS
147
148static perl_mutex coro_lock;
149# define LOCK do { MUTEX_LOCK (&coro_lock); } while (0)
150# define UNLOCK do { MUTEX_UNLOCK (&coro_lock); } while (0)
151# if CORO_PTHREAD 147# if CORO_PTHREAD
152static void *coro_thx; 148static void *coro_thx;
153# endif 149# endif
154
155#else
156
157# define LOCK (void)0
158# define UNLOCK (void)0
159
160#endif 150#endif
161
162# undef LOCK
163# define LOCK (void)0
164# undef UNLOCK
165# define UNLOCK (void)0
166 151
167/* helper storage struct for Coro::AIO */ 152/* helper storage struct for Coro::AIO */
168struct io_state 153struct io_state
169{ 154{
170 AV *res; 155 AV *res;
266 251
267 AV *args; /* data associated with this coroutine (initial args) */ 252 AV *args; /* data associated with this coroutine (initial args) */
268 int refcnt; /* coroutines are refcounted, yes */ 253 int refcnt; /* coroutines are refcounted, yes */
269 int flags; /* CF_ flags */ 254 int flags; /* CF_ flags */
270 HV *hv; /* the perl hash associated with this coro, if any */ 255 HV *hv; /* the perl hash associated with this coro, if any */
256 void (*on_destroy)(pTHX_ struct coro *coro);
271 257
272 /* statistics */ 258 /* statistics */
273 int usecount; /* number of transfers to this coro */ 259 int usecount; /* number of transfers to this coro */
274 260
275 /* coro process data */ 261 /* coro process data */
811 797
812 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0; 798 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
813} 799}
814 800
815static void 801static void
816prepare_nop (aTHX_ struct coro_transfer_args *ta) 802prepare_nop (pTHX_ struct coro_transfer_args *ta)
817{ 803{
818 /* kind of mega-hacky, but works */ 804 /* kind of mega-hacky, but works */
819 ta->next = ta->prev = (struct coro *)ta; 805 ta->next = ta->prev = (struct coro *)ta;
820} 806}
821 807
822static int 808static int
823slf_check_nop (aTHX) 809slf_check_nop (pTHX_ struct CoroSLF *frame)
824{ 810{
825 return 0; 811 return 0;
826} 812}
827 813
828static void 814static void
1093 struct coro *next = (struct coro *)transfer_next; 1079 struct coro *next = (struct coro *)transfer_next;
1094 assert (!(transfer_next = 0)); /* just used for the side effect when asserts are enabled */ 1080 assert (!(transfer_next = 0)); /* just used for the side effect when asserts are enabled */
1095 assert (("FATAL: next coroutine was zero in transfer_tail (please report)", next)); 1081 assert (("FATAL: next coroutine was zero in transfer_tail (please report)", next));
1096 1082
1097 free_coro_mortal (aTHX); 1083 free_coro_mortal (aTHX);
1098 UNLOCK;
1099 1084
1100 if (expect_false (next->throw)) 1085 if (expect_false (next->throw))
1101 { 1086 {
1102 SV *exception = sv_2mortal (next->throw); 1087 SV *exception = sv_2mortal (next->throw);
1103 1088
1320 dSTACKLEVEL; 1305 dSTACKLEVEL;
1321 1306
1322 /* sometimes transfer is only called to set idle_sp */ 1307 /* sometimes transfer is only called to set idle_sp */
1323 if (expect_false (!next)) 1308 if (expect_false (!next))
1324 { 1309 {
1325 ((coro_cctx *)prev)->idle_sp = stacklevel; 1310 ((coro_cctx *)prev)->idle_sp = (void *)stacklevel;
1326 assert (((coro_cctx *)prev)->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */ 1311 assert (((coro_cctx *)prev)->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1327 } 1312 }
1328 else if (expect_true (prev != next)) 1313 else if (expect_true (prev != next))
1329 { 1314 {
1330 coro_cctx *prev__cctx; 1315 coro_cctx *prev__cctx;
1337 prev->flags |= CF_RUNNING; 1322 prev->flags |= CF_RUNNING;
1338 } 1323 }
1339 1324
1340 prev->flags &= ~CF_RUNNING; 1325 prev->flags &= ~CF_RUNNING;
1341 next->flags |= CF_RUNNING; 1326 next->flags |= CF_RUNNING;
1342
1343 LOCK;
1344 1327
1345 /* first get rid of the old state */ 1328 /* first get rid of the old state */
1346 save_perl (aTHX_ prev); 1329 save_perl (aTHX_ prev);
1347 1330
1348 if (expect_false (next->flags & CF_NEW)) 1331 if (expect_false (next->flags & CF_NEW))
1357 1340
1358 prev__cctx = prev->cctx; 1341 prev__cctx = prev->cctx;
1359 1342
1360 /* possibly untie and reuse the cctx */ 1343 /* possibly untie and reuse the cctx */
1361 if (expect_true ( 1344 if (expect_true (
1362 prev__cctx->idle_sp == stacklevel 1345 prev__cctx->idle_sp == (void *)stacklevel
1363 && !(prev__cctx->flags & CC_TRACE) 1346 && !(prev__cctx->flags & CC_TRACE)
1364 && !force_cctx 1347 && !force_cctx
1365 )) 1348 ))
1366 { 1349 {
1367 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */ 1350 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1406coro_state_destroy (pTHX_ struct coro *coro) 1389coro_state_destroy (pTHX_ struct coro *coro)
1407{ 1390{
1408 if (coro->flags & CF_DESTROYED) 1391 if (coro->flags & CF_DESTROYED)
1409 return 0; 1392 return 0;
1410 1393
1394 if (coro->on_destroy)
1395 coro->on_destroy (aTHX_ coro);
1396
1411 coro->flags |= CF_DESTROYED; 1397 coro->flags |= CF_DESTROYED;
1412 1398
1413 if (coro->flags & CF_READY) 1399 if (coro->flags & CF_READY)
1414 { 1400 {
1415 /* reduce nready, as destroying a ready coro effectively unreadies it */ 1401 /* reduce nready, as destroying a ready coro effectively unreadies it */
1416 /* alternative: look through all ready queues and remove the coro */ 1402 /* alternative: look through all ready queues and remove the coro */
1417 LOCK;
1418 --coro_nready; 1403 --coro_nready;
1419 UNLOCK;
1420 } 1404 }
1421 else 1405 else
1422 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */ 1406 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1423 1407
1424 if (coro->mainstack && coro->mainstack != main_mainstack) 1408 if (coro->mainstack && coro->mainstack != main_mainstack)
1537 if (coro->flags & CF_READY) 1521 if (coro->flags & CF_READY)
1538 return 0; 1522 return 0;
1539 1523
1540 coro->flags |= CF_READY; 1524 coro->flags |= CF_READY;
1541 1525
1542 LOCK;
1543
1544 sv_hook = coro_nready ? 0 : coro_readyhook; 1526 sv_hook = coro_nready ? 0 : coro_readyhook;
1545 xs_hook = coro_nready ? 0 : coroapi.readyhook; 1527 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1546 1528
1547 coro_enq (aTHX_ SvREFCNT_inc_NN (coro_sv)); 1529 coro_enq (aTHX_ SvREFCNT_inc_NN (coro_sv));
1548 ++coro_nready; 1530 ++coro_nready;
1549 1531
1550 UNLOCK;
1551
1552 if (sv_hook) 1532 if (sv_hook)
1553 { 1533 {
1554 dSP; 1534 dSP;
1555 1535
1556 ENTER; 1536 ENTER;
1582{ 1562{
1583 SV *prev_sv, *next_sv; 1563 SV *prev_sv, *next_sv;
1584 1564
1585 for (;;) 1565 for (;;)
1586 { 1566 {
1587 LOCK;
1588 next_sv = coro_deq (aTHX); 1567 next_sv = coro_deq (aTHX);
1589 1568
1590 /* nothing to schedule: call the idle handler */ 1569 /* nothing to schedule: call the idle handler */
1591 if (expect_false (!next_sv)) 1570 if (expect_false (!next_sv))
1592 { 1571 {
1593 dSP; 1572 dSP;
1594 UNLOCK;
1595 1573
1596 ENTER; 1574 ENTER;
1597 SAVETMPS; 1575 SAVETMPS;
1598 1576
1599 PUSHMARK (SP); 1577 PUSHMARK (SP);
1609 ta->next = SvSTATE (next_sv); 1587 ta->next = SvSTATE (next_sv);
1610 1588
1611 /* cannot transfer to destroyed coros, skip and look for next */ 1589 /* cannot transfer to destroyed coros, skip and look for next */
1612 if (expect_false (ta->next->flags & CF_DESTROYED)) 1590 if (expect_false (ta->next->flags & CF_DESTROYED))
1613 { 1591 {
1614 UNLOCK;
1615 SvREFCNT_dec (next_sv); 1592 SvREFCNT_dec (next_sv);
1616 /* coro_nready has already been taken care of by destroy */ 1593 /* coro_nready has already been taken care of by destroy */
1617 continue; 1594 continue;
1618 } 1595 }
1619 1596
1620 --coro_nready; 1597 --coro_nready;
1621 UNLOCK;
1622 break; 1598 break;
1623 } 1599 }
1624 1600
1625 /* free this only after the transfer */ 1601 /* free this only after the transfer */
1626 prev_sv = SvRV (coro_current); 1602 prev_sv = SvRV (coro_current);
1628 TRANSFER_CHECK (*ta); 1604 TRANSFER_CHECK (*ta);
1629 assert (("FATAL: next coroutine isn't marked as ready in Coro (please report)", ta->next->flags & CF_READY)); 1605 assert (("FATAL: next coroutine isn't marked as ready in Coro (please report)", ta->next->flags & CF_READY));
1630 ta->next->flags &= ~CF_READY; 1606 ta->next->flags &= ~CF_READY;
1631 SvRV_set (coro_current, next_sv); 1607 SvRV_set (coro_current, next_sv);
1632 1608
1633 LOCK;
1634 free_coro_mortal (aTHX); 1609 free_coro_mortal (aTHX);
1635 coro_mortal = prev_sv; 1610 coro_mortal = prev_sv;
1636 UNLOCK;
1637} 1611}
1638 1612
1639INLINE void 1613INLINE void
1640prepare_cede (pTHX_ struct coro_transfer_args *ta) 1614prepare_cede (pTHX_ struct coro_transfer_args *ta)
1641{ 1615{
1719 PL_runops = RUNOPS_DEFAULT; 1693 PL_runops = RUNOPS_DEFAULT;
1720 else 1694 else
1721 coro->slot->runops = RUNOPS_DEFAULT; 1695 coro->slot->runops = RUNOPS_DEFAULT;
1722 } 1696 }
1723} 1697}
1724
1725#if 0
1726static int
1727coro_gensub_free (pTHX_ SV *sv, MAGIC *mg)
1728{
1729 AV *padlist;
1730 AV *av = (AV *)mg->mg_obj;
1731
1732 abort ();
1733
1734 return 0;
1735}
1736
1737static MGVTBL coro_gensub_vtbl = {
1738 0, 0, 0, 0,
1739 coro_gensub_free
1740};
1741#endif
1742 1698
1743/*****************************************************************************/ 1699/*****************************************************************************/
1744/* PerlIO::cede */ 1700/* PerlIO::cede */
1745 1701
1746typedef struct 1702typedef struct
1848{ 1804{
1849 prepare_set_stacklevel (ta, (struct coro_cctx *)slf_frame.data); 1805 prepare_set_stacklevel (ta, (struct coro_cctx *)slf_frame.data);
1850} 1806}
1851 1807
1852static void 1808static void
1853slf_init_set_stacklevel (pTHX_ struct CoroSLF *frame, SV **arg, int items) 1809slf_init_set_stacklevel (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1854{ 1810{
1855 assert (("FATAL: set_stacklevel needs the coro cctx as sole argument", items == 1)); 1811 assert (("FATAL: set_stacklevel needs the coro cctx as sole argument", items == 1));
1856 1812
1857 frame->prepare = slf_prepare_set_stacklevel; 1813 frame->prepare = slf_prepare_set_stacklevel;
1858 frame->check = slf_check_nop; 1814 frame->check = slf_check_nop;
1862static void 1818static void
1863slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta) 1819slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
1864{ 1820{
1865 SV **arg = (SV **)slf_frame.data; 1821 SV **arg = (SV **)slf_frame.data;
1866 1822
1867 prepare_transfer (ta, arg [0], arg [1]); 1823 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
1868} 1824}
1869 1825
1870static void 1826static void
1871slf_init_transfer (pTHX_ struct CoroSLF *frame, SV **arg, int items) 1827slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1872{ 1828{
1873 if (items != 2) 1829 if (items != 2)
1874 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items); 1830 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
1875 1831
1876 frame->prepare = slf_prepare_transfer; 1832 frame->prepare = slf_prepare_transfer;
1877 frame->check = slf_check_nop; 1833 frame->check = slf_check_nop;
1878 frame->data = (void *)arg; /* let's hope it will stay valid */ 1834 frame->data = (void *)arg; /* let's hope it will stay valid */
1879} 1835}
1880 1836
1881static void 1837static void
1882slf_init_schedule (pTHX_ struct CoroSLF *frame, SV **arg, int items) 1838slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1883{ 1839{
1884 frame->prepare = prepare_schedule; 1840 frame->prepare = prepare_schedule;
1885 frame->check = slf_check_nop; 1841 frame->check = slf_check_nop;
1886} 1842}
1887 1843
1888static void 1844static void
1889slf_init_cede (pTHX_ struct CoroSLF *frame, SV **arg, int items) 1845slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1890{ 1846{
1891 frame->prepare = prepare_cede; 1847 frame->prepare = prepare_cede;
1892 frame->check = slf_check_nop; 1848 frame->check = slf_check_nop;
1893} 1849}
1894 1850
1895static void 1851static void
1896slf_init_cede_notself (pTHX_ struct CoroSLF *frame, SV **arg, int items) 1852slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1897{ 1853{
1898 frame->prepare = prepare_cede_notself; 1854 frame->prepare = prepare_cede_notself;
1899 frame->check = slf_check_nop; 1855 frame->check = slf_check_nop;
1900} 1856}
1901 1857
1940 items = AvFILLp (av) + 1; 1896 items = AvFILLp (av) + 1;
1941 } 1897 }
1942 1898
1943 PUTBACK; 1899 PUTBACK;
1944 1900
1901 /* now call the init function, which needs to set up slf_frame */
1945 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr) (aTHX_ &slf_frame, arg, items); 1902 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
1903 (aTHX_ &slf_frame, GvCV (gv), arg, items);
1946 } 1904 }
1947 1905
1948 /* now interpret the slf_frame */ 1906 /* now that we have a slf_frame, interpret it! */
1949 /* we use a callback system not to make the code needlessly */ 1907 /* we use a callback system not to make the code needlessly */
1950 /* complicated, but so we can run multiple perl coros from one cctx */ 1908 /* complicated, but so we can run multiple perl coros from one cctx */
1951 1909
1952 do 1910 do
1953 { 1911 {
1963 { 1921 {
1964 dSP; 1922 dSP;
1965 SV **bot = PL_stack_base + checkmark; 1923 SV **bot = PL_stack_base + checkmark;
1966 int gimme = GIMME_V; 1924 int gimme = GIMME_V;
1967 1925
1968 slf_frame.prepare = 0; /* signal pp_slf that we need a new frame */ 1926 slf_frame.prepare = 0; /* invalidate the frame, so it gets initialised again next time */
1969 1927
1970 /* make sure we put something on the stack in scalar context */ 1928 /* make sure we put something on the stack in scalar context */
1971 if (gimme == G_SCALAR) 1929 if (gimme == G_SCALAR)
1972 { 1930 {
1973 if (sp == bot) 1931 if (sp == bot)
2000 1958
2001 /* we patch the op, and then re-run the whole call */ 1959 /* we patch the op, and then re-run the whole call */
2002 /* we have to put the same argument on the stack for this to work */ 1960 /* we have to put the same argument on the stack for this to work */
2003 /* and this will be done by pp_restore */ 1961 /* and this will be done by pp_restore */
2004 slf_restore.op_next = (OP *)&slf_restore; 1962 slf_restore.op_next = (OP *)&slf_restore;
2005 slf_restore.op_type = OP_NULL; 1963 slf_restore.op_type = OP_CUSTOM;
2006 slf_restore.op_ppaddr = pp_restore; 1964 slf_restore.op_ppaddr = pp_restore;
2007 slf_restore.op_first = PL_op; 1965 slf_restore.op_first = PL_op;
2008 1966
2009 slf_arg0 = items > 0 ? SvREFCNT_inc (arg [0]) : 0; 1967 slf_arg0 = items > 0 ? SvREFCNT_inc (arg [0]) : 0;
2010 slf_arg1 = items > 1 ? SvREFCNT_inc (arg [1]) : 0; 1968 slf_arg1 = items > 1 ? SvREFCNT_inc (arg [1]) : 0;
2015 PL_op = (OP *)&slf_restore; 1973 PL_op = (OP *)&slf_restore;
2016} 1974}
2017 1975
2018/*****************************************************************************/ 1976/*****************************************************************************/
2019 1977
1978static void
1979coro_semaphore_adjust (AV *av, int adjust)
1980{
1981 SV *count_sv = AvARRAY (av)[0];
1982 IV count = SvIVX (count_sv);
1983
1984 count += adjust;
1985 SvIVX (count_sv) = count;
1986
1987 /* now wake up as many waiters as possible */
1988 while (count > 0 && AvFILLp (av) >= count)
1989 {
1990 SV *cb;
1991
1992 /* swap first two elements so we can shift a waiter */
1993 AvARRAY (av)[0] = AvARRAY (av)[1];
1994 AvARRAY (av)[1] = count_sv;
1995 cb = av_shift (av);
1996
1997 if (SvOBJECT (cb))
1998 api_ready (aTHX_ cb);
1999 else
2000 croak ("callbacks not yet supported");
2001
2002 SvREFCNT_dec (cb);
2003
2004 --count;
2005 }
2006}
2007
2008static void
2009coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2010{
2011 /* call $sem->adjust (0) to possibly wake up some waiters */
2012 coro_semaphore_adjust ((AV *)coro->slf_frame.data, 0);
2013}
2014
2020static int 2015static int
2021slf_check_semaphore_down (pTHX_ struct CoroSLF *frame) 2016slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2022{ 2017{
2023 AV *av = (AV *)frame->data; 2018 AV *av = (AV *)frame->data;
2024 SV *count_sv = AvARRAY (av)[0]; 2019 SV *count_sv = AvARRAY (av)[0];
2025 2020
2026 if (SvIVX (count_sv) > 0) 2021 if (SvIVX (count_sv) > 0)
2027 { 2022 {
2023 SvSTATE (coro_current)->on_destroy = 0;
2028 SvIVX (count_sv) = SvIVX (count_sv) - 1; 2024 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2029 return 0; 2025 return 0;
2030 } 2026 }
2031 else 2027 else
2032 { 2028 {
2043 return 1; 2039 return 1;
2044 } 2040 }
2045} 2041}
2046 2042
2047static void 2043static void
2048slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, SV **arg, int items) 2044slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2049{ 2045{
2050 AV *av = (AV *)SvRV (arg [0]); 2046 AV *av = (AV *)SvRV (arg [0]);
2051 2047
2052 if (SvIVX (AvARRAY (av)[0]) > 0) 2048 if (SvIVX (AvARRAY (av)[0]) > 0)
2053 { 2049 {
2058 { 2054 {
2059 av_push (av, SvREFCNT_inc (SvRV (coro_current))); 2055 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2060 2056
2061 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av)); 2057 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2062 frame->prepare = prepare_schedule; 2058 frame->prepare = prepare_schedule;
2059
2060 /* to avoid race conditions when a woken-up coro gets terminated */
2061 /* we arrange for a temporary on_destroy that calls adjust (0) */
2062 SvSTATE (coro_current)->on_destroy = coro_semaphore_on_destroy;
2063 } 2063 }
2064 2064
2065 frame->check = slf_check_semaphore_down; 2065 frame->check = slf_check_semaphore_down;
2066 2066
2067} 2067}
2068
2069/*****************************************************************************/
2070
2071#define GENSUB_ARG CvXSUBANY (cv).any_ptr
2072
2073/* create a closure from XS, returns a code reference */
2074/* the arg can be accessed via GENSUB_ARG from the callback */
2075/* the callback must use dXSARGS/XSRETURN */
2076static SV *
2077gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg)
2078{
2079 CV *cv = (CV *)NEWSV (0, 0);
2080
2081 sv_upgrade ((SV *)cv, SVt_PVCV);
2082
2083 CvANON_on (cv);
2084 CvISXSUB_on (cv);
2085 CvXSUB (cv) = xsub;
2086 GENSUB_ARG = arg;
2087
2088 return newRV_noinc ((SV *)cv);
2089}
2090
2091/*****************************************************************************/
2068 2092
2069MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_ 2093MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2070 2094
2071PROTOTYPES: DISABLE 2095PROTOTYPES: DISABLE
2072 2096
2101 main_mainstack = PL_mainstack; 2125 main_mainstack = PL_mainstack;
2102 main_top_env = PL_top_env; 2126 main_top_env = PL_top_env;
2103 2127
2104 while (main_top_env->je_prev) 2128 while (main_top_env->je_prev)
2105 main_top_env = main_top_env->je_prev; 2129 main_top_env = main_top_env->je_prev;
2130
2131 {
2132 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2133
2134 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2135 hv_store_ent (PL_custom_op_names, slf,
2136 newSVpv ("coro_slf", 0), 0);
2137
2138 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2139 hv_store_ent (PL_custom_op_descs, slf,
2140 newSVpv ("coro schedule like function", 0), 0);
2141 }
2106 2142
2107 coroapi.ver = CORO_API_VERSION; 2143 coroapi.ver = CORO_API_VERSION;
2108 coroapi.rev = CORO_API_REVISION; 2144 coroapi.rev = CORO_API_REVISION;
2109 2145
2110 coroapi.transfer = api_transfer; 2146 coroapi.transfer = api_transfer;
2421 2457
2422void 2458void
2423_set_readyhook (SV *hook) 2459_set_readyhook (SV *hook)
2424 PROTOTYPE: $ 2460 PROTOTYPE: $
2425 CODE: 2461 CODE:
2426 LOCK;
2427 SvREFCNT_dec (coro_readyhook); 2462 SvREFCNT_dec (coro_readyhook);
2428 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0; 2463 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
2429 UNLOCK;
2430 2464
2431int 2465int
2432prio (Coro::State coro, int newprio = 0) 2466prio (Coro::State coro, int newprio = 0)
2433 PROTOTYPE: $;$ 2467 PROTOTYPE: $;$
2434 ALIAS: 2468 ALIAS:
2536 api_trace (aTHX_ coro_current, 0); 2570 api_trace (aTHX_ coro_current, 0);
2537 2571
2538 av_push (av_async_pool, newSVsv (coro_current)); 2572 av_push (av_async_pool, newSVsv (coro_current));
2539} 2573}
2540 2574
2541#if 0
2542
2543void
2544_generator_call (...)
2545 PROTOTYPE: @
2546 PPCODE:
2547 fprintf (stderr, "call %p\n", CvXSUBANY(cv).any_ptr);
2548 xxxx
2549 abort ();
2550
2551SV *
2552gensub (SV *sub, ...)
2553 PROTOTYPE: &;@
2554 CODE:
2555{
2556 struct coro *coro;
2557 MAGIC *mg;
2558 CV *xcv;
2559 CV *ncv = (CV *)newSV_type (SVt_PVCV);
2560 int i;
2561
2562 CvGV (ncv) = CvGV (cv);
2563 CvFILE (ncv) = CvFILE (cv);
2564
2565 Newz (0, coro, 1, struct coro);
2566 coro->args = newAV ();
2567 coro->flags = CF_NEW;
2568
2569 av_extend (coro->args, items - 1);
2570 for (i = 1; i < items; i++)
2571 av_push (coro->args, newSVsv (ST (i)));
2572
2573 CvISXSUB_on (ncv);
2574 CvXSUBANY (ncv).any_ptr = (void *)coro;
2575
2576 xcv = GvCV (gv_fetchpv ("Coro::_generator_call", 0, SVt_PVCV));
2577
2578 CvXSUB (ncv) = CvXSUB (xcv);
2579 CvANON_on (ncv);
2580
2581 mg = sv_magicext ((SV *)ncv, 0, CORO_MAGIC_type_state, &coro_gensub_vtbl, (char *)coro, 0);
2582 RETVAL = newRV_noinc ((SV *)ncv);
2583}
2584 OUTPUT:
2585 RETVAL
2586
2587#endif
2588
2589 2575
2590MODULE = Coro::State PACKAGE = Coro::AIO 2576MODULE = Coro::State PACKAGE = Coro::AIO
2591 2577
2592void 2578void
2593_get_state (SV *self) 2579_get_state (SV *self)
2642MODULE = Coro::State PACKAGE = Coro::AnyEvent 2628MODULE = Coro::State PACKAGE = Coro::AnyEvent
2643 2629
2644BOOT: 2630BOOT:
2645 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE); 2631 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
2646 2632
2647SV * 2633void
2648_schedule (...) 2634_schedule (...)
2649 CODE: 2635 CODE:
2650{ 2636{
2651 static int incede; 2637 static int incede;
2652 2638
2698void 2684void
2699up (SV *self, int adjust = 1) 2685up (SV *self, int adjust = 1)
2700 ALIAS: 2686 ALIAS:
2701 adjust = 1 2687 adjust = 1
2702 CODE: 2688 CODE:
2703{ 2689 coro_semaphore_adjust ((AV *)SvRV (self), ix ? adjust : 1);
2704 AV *av = (AV *)SvRV (self);
2705 SV *count_sv = AvARRAY (av)[0];
2706 IV count = SvIVX (count_sv);
2707
2708 count += ix ? adjust : 1;
2709 SvIVX (count_sv) = count;
2710
2711 /* now wake up as many waiters as possible */
2712 while (count > 0 && AvFILLp (av) >= count)
2713 {
2714 SV *cb;
2715
2716 /* swap first two elements so we can shift a waiter */
2717 AvARRAY (av)[0] = AvARRAY (av)[1];
2718 AvARRAY (av)[1] = count_sv;
2719 cb = av_shift (av);
2720
2721 if (SvOBJECT (cb))
2722 api_ready (cb);
2723 else
2724 croak ("callbacks not yet supported");
2725
2726 SvREFCNT_dec (cb);
2727 }
2728}
2729 2690
2730void 2691void
2731down (SV *self) 2692down (SV *self)
2732 CODE: 2693 CODE:
2733 api_execute_slf (aTHX_ cv, slf_init_semaphore_down, &ST (0), 1); 2694 api_execute_slf (aTHX_ cv, slf_init_semaphore_down, &ST (0), 1);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines