ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.273 by root, Fri Nov 14 23:48:10 2008 UTC vs.
Revision 1.282 by root, Sun Nov 16 10:33:08 2008 UTC

142#define NOINLINE attribute ((noinline)) 142#define NOINLINE attribute ((noinline))
143 143
144#include "CoroAPI.h" 144#include "CoroAPI.h"
145 145
146#ifdef USE_ITHREADS 146#ifdef USE_ITHREADS
147
148static perl_mutex coro_lock;
149# define LOCK do { MUTEX_LOCK (&coro_lock); } while (0)
150# define UNLOCK do { MUTEX_UNLOCK (&coro_lock); } while (0)
151# if CORO_PTHREAD 147# if CORO_PTHREAD
152static void *coro_thx; 148static void *coro_thx;
153# endif 149# endif
154
155#else
156
157# define LOCK (void)0
158# define UNLOCK (void)0
159
160#endif 150#endif
161
162# undef LOCK
163# define LOCK (void)0
164# undef UNLOCK
165# define UNLOCK (void)0
166 151
167/* helper storage struct for Coro::AIO */ 152/* helper storage struct for Coro::AIO */
168struct io_state 153struct io_state
169{ 154{
170 AV *res; 155 AV *res;
266 251
267 AV *args; /* data associated with this coroutine (initial args) */ 252 AV *args; /* data associated with this coroutine (initial args) */
268 int refcnt; /* coroutines are refcounted, yes */ 253 int refcnt; /* coroutines are refcounted, yes */
269 int flags; /* CF_ flags */ 254 int flags; /* CF_ flags */
270 HV *hv; /* the perl hash associated with this coro, if any */ 255 HV *hv; /* the perl hash associated with this coro, if any */
256 void (*on_destroy)(pTHX_ struct coro *coro);
271 257
272 /* statistics */ 258 /* statistics */
273 int usecount; /* number of transfers to this coro */ 259 int usecount; /* number of transfers to this coro */
274 260
275 /* coro process data */ 261 /* coro process data */
403static MGVTBL coro_cv_vtbl = { 389static MGVTBL coro_cv_vtbl = {
404 0, 0, 0, 0, 390 0, 0, 0, 0,
405 coro_cv_free 391 coro_cv_free
406}; 392};
407 393
408#define CORO_MAGIC(sv, type) \ 394#define CORO_MAGIC(sv, type) \
409 SvMAGIC (sv) \ 395 expect_true (SvMAGIC (sv)) \
410 ? SvMAGIC (sv)->mg_type == type \ 396 ? expect_true (SvMAGIC (sv)->mg_type == type) \
411 ? SvMAGIC (sv) \ 397 ? SvMAGIC (sv) \
412 : mg_find (sv, type) \ 398 : mg_find (sv, type) \
413 : 0 399 : 0
414 400
415#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv) 401#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
416#define CORO_MAGIC_state(sv) CORO_MAGIC (((SV *)(sv)), CORO_MAGIC_type_state) 402#define CORO_MAGIC_state(sv) CORO_MAGIC (((SV *)(sv)), CORO_MAGIC_type_state)
417 403
438 mg = CORO_MAGIC_state (coro); 424 mg = CORO_MAGIC_state (coro);
439 return (struct coro *)mg->mg_ptr; 425 return (struct coro *)mg->mg_ptr;
440} 426}
441 427
442#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv)) 428#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
429
430/* fastert than SvSTATE, but expects a coroutine hv */
431INLINE struct coro *
432SvSTATE_hv (SV *sv)
433{
434 MAGIC *mg = expect_true (SvMAGIC (sv)->mg_type == CORO_MAGIC_type_state)
435 ? SvMAGIC (sv)
436 : mg_find (sv, CORO_MAGIC_type_state);
437
438 return (struct coro *)mg->mg_ptr;
439}
440
441#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
443 442
444/* the next two functions merely cache the padlists */ 443/* the next two functions merely cache the padlists */
445static void 444static void
446get_padlist (pTHX_ CV *cv) 445get_padlist (pTHX_ CV *cv)
447{ 446{
811 810
812 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0; 811 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
813} 812}
814 813
815static void 814static void
816prepare_nop (aTHX_ struct coro_transfer_args *ta) 815prepare_nop (pTHX_ struct coro_transfer_args *ta)
817{ 816{
818 /* kind of mega-hacky, but works */ 817 /* kind of mega-hacky, but works */
819 ta->next = ta->prev = (struct coro *)ta; 818 ta->next = ta->prev = (struct coro *)ta;
820} 819}
821 820
822static int 821static int
823slf_check_nop (aTHX) 822slf_check_nop (pTHX_ struct CoroSLF *frame)
824{ 823{
825 return 0; 824 return 0;
826} 825}
827 826
828static void 827static void
929static int 928static int
930runops_trace (pTHX) 929runops_trace (pTHX)
931{ 930{
932 COP *oldcop = 0; 931 COP *oldcop = 0;
933 int oldcxix = -2; 932 int oldcxix = -2;
934 struct coro *coro = SvSTATE (coro_current); /* trace cctx is tied to specific coro */ 933 struct coro *coro = SvSTATE_current; /* trace cctx is tied to specific coro */
935 coro_cctx *cctx = coro->cctx; 934 coro_cctx *cctx = coro->cctx;
936 935
937 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX))) 936 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
938 { 937 {
939 PERL_ASYNC_CHECK (); 938 PERL_ASYNC_CHECK ();
1093 struct coro *next = (struct coro *)transfer_next; 1092 struct coro *next = (struct coro *)transfer_next;
1094 assert (!(transfer_next = 0)); /* just used for the side effect when asserts are enabled */ 1093 assert (!(transfer_next = 0)); /* just used for the side effect when asserts are enabled */
1095 assert (("FATAL: next coroutine was zero in transfer_tail (please report)", next)); 1094 assert (("FATAL: next coroutine was zero in transfer_tail (please report)", next));
1096 1095
1097 free_coro_mortal (aTHX); 1096 free_coro_mortal (aTHX);
1098 UNLOCK;
1099 1097
1100 if (expect_false (next->throw)) 1098 if (expect_false (next->throw))
1101 { 1099 {
1102 SV *exception = sv_2mortal (next->throw); 1100 SV *exception = sv_2mortal (next->throw);
1103 1101
1320 dSTACKLEVEL; 1318 dSTACKLEVEL;
1321 1319
1322 /* sometimes transfer is only called to set idle_sp */ 1320 /* sometimes transfer is only called to set idle_sp */
1323 if (expect_false (!next)) 1321 if (expect_false (!next))
1324 { 1322 {
1325 ((coro_cctx *)prev)->idle_sp = stacklevel; 1323 ((coro_cctx *)prev)->idle_sp = (void *)stacklevel;
1326 assert (((coro_cctx *)prev)->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */ 1324 assert (((coro_cctx *)prev)->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1327 } 1325 }
1328 else if (expect_true (prev != next)) 1326 else if (expect_true (prev != next))
1329 { 1327 {
1330 coro_cctx *prev__cctx; 1328 coro_cctx *prev__cctx;
1337 prev->flags |= CF_RUNNING; 1335 prev->flags |= CF_RUNNING;
1338 } 1336 }
1339 1337
1340 prev->flags &= ~CF_RUNNING; 1338 prev->flags &= ~CF_RUNNING;
1341 next->flags |= CF_RUNNING; 1339 next->flags |= CF_RUNNING;
1342
1343 LOCK;
1344 1340
1345 /* first get rid of the old state */ 1341 /* first get rid of the old state */
1346 save_perl (aTHX_ prev); 1342 save_perl (aTHX_ prev);
1347 1343
1348 if (expect_false (next->flags & CF_NEW)) 1344 if (expect_false (next->flags & CF_NEW))
1357 1353
1358 prev__cctx = prev->cctx; 1354 prev__cctx = prev->cctx;
1359 1355
1360 /* possibly untie and reuse the cctx */ 1356 /* possibly untie and reuse the cctx */
1361 if (expect_true ( 1357 if (expect_true (
1362 prev__cctx->idle_sp == stacklevel 1358 prev__cctx->idle_sp == (void *)stacklevel
1363 && !(prev__cctx->flags & CC_TRACE) 1359 && !(prev__cctx->flags & CC_TRACE)
1364 && !force_cctx 1360 && !force_cctx
1365 )) 1361 ))
1366 { 1362 {
1367 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */ 1363 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1406coro_state_destroy (pTHX_ struct coro *coro) 1402coro_state_destroy (pTHX_ struct coro *coro)
1407{ 1403{
1408 if (coro->flags & CF_DESTROYED) 1404 if (coro->flags & CF_DESTROYED)
1409 return 0; 1405 return 0;
1410 1406
1407 if (coro->on_destroy)
1408 coro->on_destroy (aTHX_ coro);
1409
1411 coro->flags |= CF_DESTROYED; 1410 coro->flags |= CF_DESTROYED;
1412 1411
1413 if (coro->flags & CF_READY) 1412 if (coro->flags & CF_READY)
1414 { 1413 {
1415 /* reduce nready, as destroying a ready coro effectively unreadies it */ 1414 /* reduce nready, as destroying a ready coro effectively unreadies it */
1416 /* alternative: look through all ready queues and remove the coro */ 1415 /* alternative: look through all ready queues and remove the coro */
1417 LOCK;
1418 --coro_nready; 1416 --coro_nready;
1419 UNLOCK;
1420 } 1417 }
1421 else 1418 else
1422 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */ 1419 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1423 1420
1424 if (coro->mainstack && coro->mainstack != main_mainstack) 1421 if (coro->mainstack && coro->mainstack != main_mainstack)
1502 TRANSFER (ta, 1); 1499 TRANSFER (ta, 1);
1503} 1500}
1504 1501
1505/** Coro ********************************************************************/ 1502/** Coro ********************************************************************/
1506 1503
1507static void 1504INLINE void
1508coro_enq (pTHX_ SV *coro_sv) 1505coro_enq (pTHX_ struct coro *coro)
1509{ 1506{
1510 av_push (coro_ready [SvSTATE (coro_sv)->prio - PRIO_MIN], coro_sv); 1507 av_push (coro_ready [coro->prio - PRIO_MIN], SvREFCNT_inc_NN (coro->hv));
1511} 1508}
1512 1509
1513static SV * 1510INLINE SV *
1514coro_deq (pTHX) 1511coro_deq (pTHX)
1515{ 1512{
1516 int prio; 1513 int prio;
1517 1514
1518 for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= 0; ) 1515 for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= 0; )
1537 if (coro->flags & CF_READY) 1534 if (coro->flags & CF_READY)
1538 return 0; 1535 return 0;
1539 1536
1540 coro->flags |= CF_READY; 1537 coro->flags |= CF_READY;
1541 1538
1542 LOCK;
1543
1544 sv_hook = coro_nready ? 0 : coro_readyhook; 1539 sv_hook = coro_nready ? 0 : coro_readyhook;
1545 xs_hook = coro_nready ? 0 : coroapi.readyhook; 1540 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1546 1541
1547 coro_enq (aTHX_ SvREFCNT_inc_NN (coro_sv)); 1542 coro_enq (aTHX_ coro);
1548 ++coro_nready; 1543 ++coro_nready;
1549 1544
1550 UNLOCK;
1551
1552 if (sv_hook) 1545 if (sv_hook)
1553 { 1546 {
1554 dSP; 1547 dSP;
1555 1548
1556 ENTER; 1549 ENTER;
1582{ 1575{
1583 SV *prev_sv, *next_sv; 1576 SV *prev_sv, *next_sv;
1584 1577
1585 for (;;) 1578 for (;;)
1586 { 1579 {
1587 LOCK;
1588 next_sv = coro_deq (aTHX); 1580 next_sv = coro_deq (aTHX);
1589 1581
1590 /* nothing to schedule: call the idle handler */ 1582 /* nothing to schedule: call the idle handler */
1591 if (expect_false (!next_sv)) 1583 if (expect_false (!next_sv))
1592 { 1584 {
1593 dSP; 1585 dSP;
1594 UNLOCK;
1595 1586
1596 ENTER; 1587 ENTER;
1597 SAVETMPS; 1588 SAVETMPS;
1598 1589
1599 PUSHMARK (SP); 1590 PUSHMARK (SP);
1604 FREETMPS; 1595 FREETMPS;
1605 LEAVE; 1596 LEAVE;
1606 continue; 1597 continue;
1607 } 1598 }
1608 1599
1609 ta->next = SvSTATE (next_sv); 1600 ta->next = SvSTATE_hv (next_sv);
1610 1601
1611 /* cannot transfer to destroyed coros, skip and look for next */ 1602 /* cannot transfer to destroyed coros, skip and look for next */
1612 if (expect_false (ta->next->flags & CF_DESTROYED)) 1603 if (expect_false (ta->next->flags & CF_DESTROYED))
1613 { 1604 {
1614 UNLOCK;
1615 SvREFCNT_dec (next_sv); 1605 SvREFCNT_dec (next_sv);
1616 /* coro_nready has already been taken care of by destroy */ 1606 /* coro_nready has already been taken care of by destroy */
1617 continue; 1607 continue;
1618 } 1608 }
1619 1609
1620 --coro_nready; 1610 --coro_nready;
1621 UNLOCK;
1622 break; 1611 break;
1623 } 1612 }
1624 1613
1625 /* free this only after the transfer */ 1614 /* free this only after the transfer */
1626 prev_sv = SvRV (coro_current); 1615 prev_sv = SvRV (coro_current);
1627 ta->prev = SvSTATE (prev_sv); 1616 ta->prev = SvSTATE_hv (prev_sv);
1628 TRANSFER_CHECK (*ta); 1617 TRANSFER_CHECK (*ta);
1629 assert (("FATAL: next coroutine isn't marked as ready in Coro (please report)", ta->next->flags & CF_READY)); 1618 assert (("FATAL: next coroutine isn't marked as ready in Coro (please report)", ta->next->flags & CF_READY));
1630 ta->next->flags &= ~CF_READY; 1619 ta->next->flags &= ~CF_READY;
1631 SvRV_set (coro_current, next_sv); 1620 SvRV_set (coro_current, next_sv);
1632 1621
1633 LOCK;
1634 free_coro_mortal (aTHX); 1622 free_coro_mortal (aTHX);
1635 coro_mortal = prev_sv; 1623 coro_mortal = prev_sv;
1636 UNLOCK;
1637} 1624}
1638 1625
1639INLINE void 1626INLINE void
1640prepare_cede (pTHX_ struct coro_transfer_args *ta) 1627prepare_cede (pTHX_ struct coro_transfer_args *ta)
1641{ 1628{
1719 PL_runops = RUNOPS_DEFAULT; 1706 PL_runops = RUNOPS_DEFAULT;
1720 else 1707 else
1721 coro->slot->runops = RUNOPS_DEFAULT; 1708 coro->slot->runops = RUNOPS_DEFAULT;
1722 } 1709 }
1723} 1710}
1724
1725#if 0
1726static int
1727coro_gensub_free (pTHX_ SV *sv, MAGIC *mg)
1728{
1729 AV *padlist;
1730 AV *av = (AV *)mg->mg_obj;
1731
1732 abort ();
1733
1734 return 0;
1735}
1736
1737static MGVTBL coro_gensub_vtbl = {
1738 0, 0, 0, 0,
1739 coro_gensub_free
1740};
1741#endif
1742 1711
1743/*****************************************************************************/ 1712/*****************************************************************************/
1744/* PerlIO::cede */ 1713/* PerlIO::cede */
1745 1714
1746typedef struct 1715typedef struct
1848{ 1817{
1849 prepare_set_stacklevel (ta, (struct coro_cctx *)slf_frame.data); 1818 prepare_set_stacklevel (ta, (struct coro_cctx *)slf_frame.data);
1850} 1819}
1851 1820
1852static void 1821static void
1853slf_init_set_stacklevel (pTHX_ struct CoroSLF *frame, SV **arg, int items) 1822slf_init_set_stacklevel (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1854{ 1823{
1855 assert (("FATAL: set_stacklevel needs the coro cctx as sole argument", items == 1)); 1824 assert (("FATAL: set_stacklevel needs the coro cctx as sole argument", items == 1));
1856 1825
1857 frame->prepare = slf_prepare_set_stacklevel; 1826 frame->prepare = slf_prepare_set_stacklevel;
1858 frame->check = slf_check_nop; 1827 frame->check = slf_check_nop;
1862static void 1831static void
1863slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta) 1832slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
1864{ 1833{
1865 SV **arg = (SV **)slf_frame.data; 1834 SV **arg = (SV **)slf_frame.data;
1866 1835
1867 prepare_transfer (ta, arg [0], arg [1]); 1836 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
1868} 1837}
1869 1838
1870static void 1839static void
1871slf_init_transfer (pTHX_ struct CoroSLF *frame, SV **arg, int items) 1840slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1872{ 1841{
1873 if (items != 2) 1842 if (items != 2)
1874 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items); 1843 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
1875 1844
1876 frame->prepare = slf_prepare_transfer; 1845 frame->prepare = slf_prepare_transfer;
1877 frame->check = slf_check_nop; 1846 frame->check = slf_check_nop;
1878 frame->data = (void *)arg; /* let's hope it will stay valid */ 1847 frame->data = (void *)arg; /* let's hope it will stay valid */
1879} 1848}
1880 1849
1881static void 1850static void
1882slf_init_schedule (pTHX_ struct CoroSLF *frame, SV **arg, int items) 1851slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1883{ 1852{
1884 frame->prepare = prepare_schedule; 1853 frame->prepare = prepare_schedule;
1885 frame->check = slf_check_nop; 1854 frame->check = slf_check_nop;
1886} 1855}
1887 1856
1888static void 1857static void
1889slf_init_cede (pTHX_ struct CoroSLF *frame, SV **arg, int items) 1858slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1890{ 1859{
1891 frame->prepare = prepare_cede; 1860 frame->prepare = prepare_cede;
1892 frame->check = slf_check_nop; 1861 frame->check = slf_check_nop;
1893} 1862}
1894 1863
1895static void 1864static void
1896slf_init_cede_notself (pTHX_ struct CoroSLF *frame, SV **arg, int items) 1865slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1897{ 1866{
1898 frame->prepare = prepare_cede_notself; 1867 frame->prepare = prepare_cede_notself;
1899 frame->check = slf_check_nop; 1868 frame->check = slf_check_nop;
1900} 1869}
1901 1870
1940 items = AvFILLp (av) + 1; 1909 items = AvFILLp (av) + 1;
1941 } 1910 }
1942 1911
1943 PUTBACK; 1912 PUTBACK;
1944 1913
1914 /* now call the init function, which needs to set up slf_frame */
1945 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr) (aTHX_ &slf_frame, arg, items); 1915 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
1916 (aTHX_ &slf_frame, GvCV (gv), arg, items);
1946 } 1917 }
1947 1918
1948 /* now interpret the slf_frame */ 1919 /* now that we have a slf_frame, interpret it! */
1949 /* we use a callback system not to make the code needlessly */ 1920 /* we use a callback system not to make the code needlessly */
1950 /* complicated, but so we can run multiple perl coros from one cctx */ 1921 /* complicated, but so we can run multiple perl coros from one cctx */
1951 1922
1952 do 1923 do
1953 { 1924 {
1963 { 1934 {
1964 dSP; 1935 dSP;
1965 SV **bot = PL_stack_base + checkmark; 1936 SV **bot = PL_stack_base + checkmark;
1966 int gimme = GIMME_V; 1937 int gimme = GIMME_V;
1967 1938
1968 slf_frame.prepare = 0; /* signal pp_slf that we need a new frame */ 1939 slf_frame.prepare = 0; /* invalidate the frame, so it gets initialised again next time */
1969 1940
1970 /* make sure we put something on the stack in scalar context */ 1941 /* make sure we put something on the stack in scalar context */
1971 if (gimme == G_SCALAR) 1942 if (gimme == G_SCALAR)
1972 { 1943 {
1973 if (sp == bot) 1944 if (sp == bot)
1987{ 1958{
1988 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv))); 1959 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
1989 1960
1990 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB] 1961 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
1991 && PL_op->op_ppaddr != pp_slf) 1962 && PL_op->op_ppaddr != pp_slf)
1992 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or other means, caught"); 1963 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
1993 1964
1994 if (items > 3) 1965 if (items > 3)
1995 croak ("Coro only supports up to three arguments to SLF functions currently, caught"); 1966 croak ("Coro only supports up to three arguments to SLF functions currently (not %d), caught", items);
1996 1967
1997 CvFLAGS (cv) |= CVf_SLF; 1968 CvFLAGS (cv) |= CVf_SLF;
1998 CvXSUBANY (cv).any_ptr = (void *)init_cb; 1969 CvXSUBANY (cv).any_ptr = (void *)init_cb;
1999 slf_cv = cv; 1970 slf_cv = cv;
2000 1971
2001 /* we patch the op, and then re-run the whole call */ 1972 /* we patch the op, and then re-run the whole call */
2002 /* we have to put the same argument on the stack for this to work */ 1973 /* we have to put the same argument on the stack for this to work */
2003 /* and this will be done by pp_restore */ 1974 /* and this will be done by pp_restore */
2004 slf_restore.op_next = (OP *)&slf_restore; 1975 slf_restore.op_next = (OP *)&slf_restore;
2005 slf_restore.op_type = OP_NULL; 1976 slf_restore.op_type = OP_CUSTOM;
2006 slf_restore.op_ppaddr = pp_restore; 1977 slf_restore.op_ppaddr = pp_restore;
2007 slf_restore.op_first = PL_op; 1978 slf_restore.op_first = PL_op;
2008 1979
2009 slf_arg0 = items > 0 ? SvREFCNT_inc (arg [0]) : 0; 1980 slf_arg0 = items > 0 ? SvREFCNT_inc (arg [0]) : 0;
2010 slf_arg1 = items > 1 ? SvREFCNT_inc (arg [1]) : 0; 1981 slf_arg1 = items > 1 ? SvREFCNT_inc (arg [1]) : 0;
2013 PL_op->op_ppaddr = pp_slf; 1984 PL_op->op_ppaddr = pp_slf;
2014 1985
2015 PL_op = (OP *)&slf_restore; 1986 PL_op = (OP *)&slf_restore;
2016} 1987}
2017 1988
1989/*****************************************************************************/
1990
1991static void
1992coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
1993{
1994 SV *count_sv = AvARRAY (av)[0];
1995 IV count = SvIVX (count_sv);
1996
1997 count += adjust;
1998 SvIVX (count_sv) = count;
1999
2000 /* now wake up as many waiters as are expected to lock */
2001 while (count > 0 && AvFILLp (av) > 0)
2002 {
2003 SV *cb;
2004
2005 /* swap first two elements so we can shift a waiter */
2006 AvARRAY (av)[0] = AvARRAY (av)[1];
2007 AvARRAY (av)[1] = count_sv;
2008 cb = av_shift (av);
2009
2010 if (SvOBJECT (cb))
2011 api_ready (aTHX_ cb);
2012 else
2013 croak ("callbacks not yet supported");
2014
2015 SvREFCNT_dec (cb);
2016
2017 --count;
2018 }
2019}
2020
2021static void
2022coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2023{
2024 /* call $sem->adjust (0) to possibly wake up some other waiters */
2025 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2026}
2027
2028static int
2029slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2030{
2031 AV *av = (AV *)frame->data;
2032 SV *count_sv = AvARRAY (av)[0];
2033
2034 if (SvIVX (count_sv) > 0)
2035 {
2036 SvSTATE_current->on_destroy = 0;
2037 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2038 return 0;
2039 }
2040 else
2041 {
2042 int i;
2043 /* if we were woken up but can't down, we look through the whole */
2044 /* waiters list and only add us if we aren't in there already */
2045 /* this avoids some degenerate memory usage cases */
2046
2047 for (i = 1; i <= AvFILLp (av); ++i)
2048 if (AvARRAY (av)[i] == SvRV (coro_current))
2049 return 1;
2050
2051 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2052 return 1;
2053 }
2054}
2055
2056static void
2057slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2058{
2059 AV *av = (AV *)SvRV (arg [0]);
2060
2061 if (SvIVX (AvARRAY (av)[0]) > 0)
2062 {
2063 frame->data = (void *)av;
2064 frame->prepare = prepare_nop;
2065 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2066 }
2067 else
2068 {
2069 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2070
2071 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2072 frame->prepare = prepare_schedule;
2073
2074 /* to avoid race conditions when a woken-up coro gets terminated */
2075 /* we arrange for a temporary on_destroy that calls adjust (0) */
2076 assert (!SvSTATE_current->on_destroy);//D
2077 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2078 }
2079
2080 frame->check = slf_check_semaphore_down;
2081
2082}
2083
2084/*****************************************************************************/
2085
2086#define GENSUB_ARG CvXSUBANY (cv).any_ptr
2087
2088/* create a closure from XS, returns a code reference */
2089/* the arg can be accessed via GENSUB_ARG from the callback */
2090/* the callback must use dXSARGS/XSRETURN */
2091static SV *
2092gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg)
2093{
2094 CV *cv = (CV *)NEWSV (0, 0);
2095
2096 sv_upgrade ((SV *)cv, SVt_PVCV);
2097
2098 CvANON_on (cv);
2099 CvISXSUB_on (cv);
2100 CvXSUB (cv) = xsub;
2101 GENSUB_ARG = arg;
2102
2103 return newRV_noinc ((SV *)cv);
2104}
2105
2106/*****************************************************************************/
2107
2018MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_ 2108MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2019 2109
2020PROTOTYPES: DISABLE 2110PROTOTYPES: DISABLE
2021 2111
2022BOOT: 2112BOOT:
2023{ 2113{
2024#ifdef USE_ITHREADS 2114#ifdef USE_ITHREADS
2025 MUTEX_INIT (&coro_lock);
2026# if CORO_PTHREAD 2115# if CORO_PTHREAD
2027 coro_thx = PERL_GET_CONTEXT; 2116 coro_thx = PERL_GET_CONTEXT;
2028# endif 2117# endif
2029#endif 2118#endif
2030 BOOT_PAGESIZE; 2119 BOOT_PAGESIZE;
2050 main_mainstack = PL_mainstack; 2139 main_mainstack = PL_mainstack;
2051 main_top_env = PL_top_env; 2140 main_top_env = PL_top_env;
2052 2141
2053 while (main_top_env->je_prev) 2142 while (main_top_env->je_prev)
2054 main_top_env = main_top_env->je_prev; 2143 main_top_env = main_top_env->je_prev;
2144
2145 {
2146 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2147
2148 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2149 hv_store_ent (PL_custom_op_names, slf,
2150 newSVpv ("coro_slf", 0), 0);
2151
2152 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2153 hv_store_ent (PL_custom_op_descs, slf,
2154 newSVpv ("coro schedule like function", 0), 0);
2155 }
2055 2156
2056 coroapi.ver = CORO_API_VERSION; 2157 coroapi.ver = CORO_API_VERSION;
2057 coroapi.rev = CORO_API_REVISION; 2158 coroapi.rev = CORO_API_REVISION;
2058 2159
2059 coroapi.transfer = api_transfer; 2160 coroapi.transfer = api_transfer;
2111 CODE: 2212 CODE:
2112 api_execute_slf (aTHX_ cv, slf_init_set_stacklevel, &ST (0), items); 2213 api_execute_slf (aTHX_ cv, slf_init_set_stacklevel, &ST (0), items);
2113 2214
2114void 2215void
2115transfer (...) 2216transfer (...)
2217 PROTOTYPE: $$
2116 CODE: 2218 CODE:
2117 api_execute_slf (aTHX_ cv, slf_init_transfer, &ST (0), items); 2219 api_execute_slf (aTHX_ cv, slf_init_transfer, &ST (0), items);
2118 2220
2119bool 2221bool
2120_destroy (SV *coro_sv) 2222_destroy (SV *coro_sv)
2129 CODE: 2231 CODE:
2130 _exit (code); 2232 _exit (code);
2131 2233
2132int 2234int
2133cctx_stacksize (int new_stacksize = 0) 2235cctx_stacksize (int new_stacksize = 0)
2236 PROTOTYPE: ;$
2134 CODE: 2237 CODE:
2135 RETVAL = cctx_stacksize; 2238 RETVAL = cctx_stacksize;
2136 if (new_stacksize) 2239 if (new_stacksize)
2137 { 2240 {
2138 cctx_stacksize = new_stacksize; 2241 cctx_stacksize = new_stacksize;
2141 OUTPUT: 2244 OUTPUT:
2142 RETVAL 2245 RETVAL
2143 2246
2144int 2247int
2145cctx_max_idle (int max_idle = 0) 2248cctx_max_idle (int max_idle = 0)
2249 PROTOTYPE: ;$
2146 CODE: 2250 CODE:
2147 RETVAL = cctx_max_idle; 2251 RETVAL = cctx_max_idle;
2148 if (max_idle > 1) 2252 if (max_idle > 1)
2149 cctx_max_idle = max_idle; 2253 cctx_max_idle = max_idle;
2150 OUTPUT: 2254 OUTPUT:
2151 RETVAL 2255 RETVAL
2152 2256
2153int 2257int
2154cctx_count () 2258cctx_count ()
2259 PROTOTYPE:
2155 CODE: 2260 CODE:
2156 RETVAL = cctx_count; 2261 RETVAL = cctx_count;
2157 OUTPUT: 2262 OUTPUT:
2158 RETVAL 2263 RETVAL
2159 2264
2160int 2265int
2161cctx_idle () 2266cctx_idle ()
2267 PROTOTYPE:
2162 CODE: 2268 CODE:
2163 RETVAL = cctx_idle; 2269 RETVAL = cctx_idle;
2164 OUTPUT: 2270 OUTPUT:
2165 RETVAL 2271 RETVAL
2166 2272
2167void 2273void
2168list () 2274list ()
2275 PROTOTYPE:
2169 PPCODE: 2276 PPCODE:
2170{ 2277{
2171 struct coro *coro; 2278 struct coro *coro;
2172 for (coro = coro_first; coro; coro = coro->next) 2279 for (coro = coro_first; coro; coro = coro->next)
2173 if (coro->hv) 2280 if (coro->hv)
2240 SvREFCNT_dec (self->throw); 2347 SvREFCNT_dec (self->throw);
2241 self->throw = SvOK (throw) ? newSVsv (throw) : 0; 2348 self->throw = SvOK (throw) ? newSVsv (throw) : 0;
2242 2349
2243void 2350void
2244api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB) 2351api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
2352 PROTOTYPE: $;$
2245 C_ARGS: aTHX_ coro, flags 2353 C_ARGS: aTHX_ coro, flags
2246 2354
2247SV * 2355SV *
2248has_cctx (Coro::State coro) 2356has_cctx (Coro::State coro)
2249 PROTOTYPE: $ 2357 PROTOTYPE: $
2274 OUTPUT: 2382 OUTPUT:
2275 RETVAL 2383 RETVAL
2276 2384
2277void 2385void
2278force_cctx () 2386force_cctx ()
2387 PROTOTYPE:
2279 CODE: 2388 CODE:
2280 struct coro *coro = SvSTATE (coro_current);
2281 coro->cctx->idle_sp = 0; 2389 SvSTATE_current->cctx->idle_sp = 0;
2282 2390
2283void 2391void
2284swap_defsv (Coro::State self) 2392swap_defsv (Coro::State self)
2285 PROTOTYPE: $ 2393 PROTOTYPE: $
2286 ALIAS: 2394 ALIAS:
2339} 2447}
2340 2448
2341void 2449void
2342schedule (...) 2450schedule (...)
2343 CODE: 2451 CODE:
2344 api_execute_slf (aTHX_ cv, slf_init_schedule, &ST (0), items); 2452 api_execute_slf (aTHX_ cv, slf_init_schedule, &ST (0), 0);
2345 2453
2346void 2454void
2347cede (...) 2455cede (...)
2348 CODE: 2456 CODE:
2349 api_execute_slf (aTHX_ cv, slf_init_cede, &ST (0), items); 2457 api_execute_slf (aTHX_ cv, slf_init_cede, &ST (0), 0);
2350 2458
2351void 2459void
2352cede_notself (...) 2460cede_notself (...)
2353 CODE: 2461 CODE:
2354 api_execute_slf (aTHX_ cv, slf_init_cede_notself, &ST (0), items); 2462 api_execute_slf (aTHX_ cv, slf_init_cede_notself, &ST (0), 0);
2355 2463
2356void 2464void
2357_set_current (SV *current) 2465_set_current (SV *current)
2358 PROTOTYPE: $ 2466 PROTOTYPE: $
2359 CODE: 2467 CODE:
2362 2470
2363void 2471void
2364_set_readyhook (SV *hook) 2472_set_readyhook (SV *hook)
2365 PROTOTYPE: $ 2473 PROTOTYPE: $
2366 CODE: 2474 CODE:
2367 LOCK;
2368 SvREFCNT_dec (coro_readyhook); 2475 SvREFCNT_dec (coro_readyhook);
2369 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0; 2476 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
2370 UNLOCK;
2371 2477
2372int 2478int
2373prio (Coro::State coro, int newprio = 0) 2479prio (Coro::State coro, int newprio = 0)
2480 PROTOTYPE: $;$
2374 ALIAS: 2481 ALIAS:
2375 nice = 1 2482 nice = 1
2376 CODE: 2483 CODE:
2377{ 2484{
2378 RETVAL = coro->prio; 2485 RETVAL = coro->prio;
2410# for async_pool speedup 2517# for async_pool speedup
2411void 2518void
2412_pool_1 (SV *cb) 2519_pool_1 (SV *cb)
2413 CODE: 2520 CODE:
2414{ 2521{
2415 struct coro *coro = SvSTATE (coro_current);
2416 HV *hv = (HV *)SvRV (coro_current); 2522 HV *hv = (HV *)SvRV (coro_current);
2523 struct coro *coro = SvSTATE_hv ((SV *)hv);
2417 AV *defav = GvAV (PL_defgv); 2524 AV *defav = GvAV (PL_defgv);
2418 SV *invoke = hv_delete (hv, "_invoke", sizeof ("_invoke") - 1, 0); 2525 SV *invoke = hv_delete (hv, "_invoke", sizeof ("_invoke") - 1, 0);
2419 AV *invoke_av; 2526 AV *invoke_av;
2420 int i, len; 2527 int i, len;
2421 2528
2442 { 2549 {
2443 av_fill (defav, len - 1); 2550 av_fill (defav, len - 1);
2444 for (i = 0; i < len; ++i) 2551 for (i = 0; i < len; ++i)
2445 av_store (defav, i, SvREFCNT_inc_NN (AvARRAY (invoke_av)[i + 1])); 2552 av_store (defav, i, SvREFCNT_inc_NN (AvARRAY (invoke_av)[i + 1]));
2446 } 2553 }
2447
2448 SvREFCNT_dec (invoke);
2449} 2554}
2450 2555
2451void 2556void
2452_pool_2 (SV *cb) 2557_pool_2 (SV *cb)
2453 CODE: 2558 CODE:
2454{ 2559{
2455 struct coro *coro = SvSTATE (coro_current); 2560 struct coro *coro = SvSTATE_current;
2456 2561
2457 sv_setsv (cb, &PL_sv_undef); 2562 sv_setsv (cb, &PL_sv_undef);
2458 2563
2459 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh; 2564 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
2460 coro->saved_deffh = 0; 2565 coro->saved_deffh = 0;
2478 api_trace (aTHX_ coro_current, 0); 2583 api_trace (aTHX_ coro_current, 0);
2479 2584
2480 av_push (av_async_pool, newSVsv (coro_current)); 2585 av_push (av_async_pool, newSVsv (coro_current));
2481} 2586}
2482 2587
2483#if 0
2484
2485void
2486_generator_call (...)
2487 PROTOTYPE: @
2488 PPCODE:
2489 fprintf (stderr, "call %p\n", CvXSUBANY(cv).any_ptr);
2490 xxxx
2491 abort ();
2492
2493SV *
2494gensub (SV *sub, ...)
2495 PROTOTYPE: &;@
2496 CODE:
2497{
2498 struct coro *coro;
2499 MAGIC *mg;
2500 CV *xcv;
2501 CV *ncv = (CV *)newSV_type (SVt_PVCV);
2502 int i;
2503
2504 CvGV (ncv) = CvGV (cv);
2505 CvFILE (ncv) = CvFILE (cv);
2506
2507 Newz (0, coro, 1, struct coro);
2508 coro->args = newAV ();
2509 coro->flags = CF_NEW;
2510
2511 av_extend (coro->args, items - 1);
2512 for (i = 1; i < items; i++)
2513 av_push (coro->args, newSVsv (ST (i)));
2514
2515 CvISXSUB_on (ncv);
2516 CvXSUBANY (ncv).any_ptr = (void *)coro;
2517
2518 xcv = GvCV (gv_fetchpv ("Coro::_generator_call", 0, SVt_PVCV));
2519
2520 CvXSUB (ncv) = CvXSUB (xcv);
2521 CvANON_on (ncv);
2522
2523 mg = sv_magicext ((SV *)ncv, 0, CORO_MAGIC_type_state, &coro_gensub_vtbl, (char *)coro, 0);
2524 RETVAL = newRV_noinc ((SV *)ncv);
2525}
2526 OUTPUT:
2527 RETVAL
2528
2529#endif
2530
2531 2588
2532MODULE = Coro::State PACKAGE = Coro::AIO 2589MODULE = Coro::State PACKAGE = Coro::AIO
2533 2590
2534void 2591void
2535_get_state (SV *self) 2592_get_state (SV *self)
2593 PROTOTYPE: $
2536 PPCODE: 2594 PPCODE:
2537{ 2595{
2538 AV *defav = GvAV (PL_defgv); 2596 AV *defav = GvAV (PL_defgv);
2539 AV *av = newAV (); 2597 AV *av = newAV ();
2540 int i; 2598 int i;
2583MODULE = Coro::State PACKAGE = Coro::AnyEvent 2641MODULE = Coro::State PACKAGE = Coro::AnyEvent
2584 2642
2585BOOT: 2643BOOT:
2586 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE); 2644 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
2587 2645
2588SV * 2646void
2589_schedule (...) 2647_schedule (...)
2590 PROTOTYPE: @
2591 CODE: 2648 CODE:
2592{ 2649{
2593 static int incede; 2650 static int incede;
2594 2651
2595 api_cede_notself (aTHX); 2652 api_cede_notself (aTHX);
2614MODULE = Coro::State PACKAGE = PerlIO::cede 2671MODULE = Coro::State PACKAGE = PerlIO::cede
2615 2672
2616BOOT: 2673BOOT:
2617 PerlIO_define_layer (aTHX_ &PerlIO_cede); 2674 PerlIO_define_layer (aTHX_ &PerlIO_cede);
2618 2675
2676MODULE = Coro::State PACKAGE = Coro::Semaphore
2677
2678SV *
2679new (SV *klass, SV *count_ = 0)
2680 CODE:
2681{
2682 /* a semaphore contains a counter IV in $sem->[0] and any waiters after that */
2683 AV *av = newAV ();
2684 av_push (av, newSViv (count_ && SvOK (count_) ? SvIV (count_) : 1));
2685 RETVAL = sv_bless (newRV_noinc ((SV *)av), GvSTASH (CvGV (cv)));
2686}
2687 OUTPUT:
2688 RETVAL
2689
2690SV *
2691count (SV *self)
2692 CODE:
2693 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
2694 OUTPUT:
2695 RETVAL
2696
2697void
2698up (SV *self, int adjust = 1)
2699 ALIAS:
2700 adjust = 1
2701 CODE:
2702 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
2703
2704void
2705down (SV *self)
2706 CODE:
2707 api_execute_slf (aTHX_ cv, slf_init_semaphore_down, &ST (0), 1);
2708
2709void
2710try (SV *self)
2711 PPCODE:
2712{
2713 AV *av = (AV *)SvRV (self);
2714 SV *count_sv = AvARRAY (av)[0];
2715 IV count = SvIVX (count_sv);
2716
2717 if (count > 0)
2718 {
2719 --count;
2720 SvIVX (count_sv) = count;
2721 XSRETURN_YES;
2722 }
2723 else
2724 XSRETURN_NO;
2725}
2726
2727void
2728waiters (SV *self)
2729 CODE:
2730{
2731 AV *av = (AV *)SvRV (self);
2732
2733 if (GIMME_V == G_SCALAR)
2734 XPUSHs (sv_2mortal (newSVsv (AvARRAY (av)[0])));
2735 else
2736 {
2737 int i;
2738 EXTEND (SP, AvFILLp (av) + 1 - 1);
2739 for (i = 1; i <= AvFILLp (av); ++i)
2740 PUSHs (newSVsv (AvARRAY (av)[i]));
2741 }
2742}
2743

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines