ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.6 by root, Tue Jul 17 15:42:28 2001 UTC vs.
Revision 1.337 by root, Fri Dec 5 18:26:10 2008 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "patchlevel.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
24#endif
25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
9#endif 35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
10 48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#define PERL_VERSION_ATLEAST(a,b,c) \
57 (PERL_REVISION > (a) \
58 || (PERL_REVISION == (a) \
59 && (PERL_VERSION > (b) \
60 || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))
61
62#if !PERL_VERSION_ATLEAST (5,6,0)
63# ifndef PL_ppaddr
64# define PL_ppaddr ppaddr
65# endif
66# ifndef call_sv
67# define call_sv perl_call_sv
68# endif
69# ifndef get_sv
70# define get_sv perl_get_sv
71# endif
72# ifndef get_cv
73# define get_cv perl_get_cv
74# endif
75# ifndef IS_PADGV
76# define IS_PADGV(v) 0
77# endif
78# ifndef IS_PADCONST
79# define IS_PADCONST(v) 0
80# endif
81#endif
82
83/* 5.11 */
84#ifndef CxHASARGS
85# define CxHASARGS(cx) (cx)->blk_sub.hasargs
86#endif
87
88/* 5.10.0 */
89#ifndef SvREFCNT_inc_NN
90# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
91#endif
92
93/* 5.8.8 */
94#ifndef GV_NOTQUAL
95# define GV_NOTQUAL 0
96#endif
97#ifndef newSV
98# define newSV(l) NEWSV(0,l)
99#endif
100#ifndef CvISXSUB_on
101# define CvISXSUB_on(cv) (void)cv
102#endif
103#ifndef CvISXSUB
104# define CvISXSUB(cv) (CvXSUB (cv) ? TRUE : FALSE)
105#endif
106#ifndef Newx
107# define Newx(ptr,nitems,type) New (0,ptr,nitems,type)
108#endif
109
110/* 5.8.7 */
111#ifndef SvRV_set
112# define SvRV_set(s,v) SvRV(s) = (v)
113#endif
114
115#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
116# undef CORO_STACKGUARD
117#endif
118
119#ifndef CORO_STACKGUARD
120# define CORO_STACKGUARD 0
121#endif
122
123/* prefer perl internal functions over our own? */
124#ifndef CORO_PREFER_PERL_FUNCTIONS
125# define CORO_PREFER_PERL_FUNCTIONS 0
126#endif
127
128/* The next macros try to return the current stack pointer, in an as
129 * portable way as possible. */
130#if __GNUC__ >= 4
131# define dSTACKLEVEL int stacklevel_dummy
132# define STACKLEVEL __builtin_frame_address (0)
133#else
134# define dSTACKLEVEL volatile void *stacklevel
135# define STACKLEVEL ((void *)&stacklevel)
136#endif
137
138#define IN_DESTRUCT PL_dirty
139
140#if __GNUC__ >= 3
141# define attribute(x) __attribute__(x)
142# define expect(expr,value) __builtin_expect ((expr),(value))
143# define INLINE static inline
144#else
145# define attribute(x)
146# define expect(expr,value) (expr)
147# define INLINE static
148#endif
149
150#define expect_false(expr) expect ((expr) != 0, 0)
151#define expect_true(expr) expect ((expr) != 0, 1)
152
153#define NOINLINE attribute ((noinline))
154
155#include "CoroAPI.h"
156#define GCoroAPI (&coroapi) /* very sneaky */
157
158#ifdef USE_ITHREADS
159# if CORO_PTHREAD
160static void *coro_thx;
161# endif
162#endif
163
164static double (*nvtime)(); /* so why doesn't it take void? */
165
166/* we hijack an hopefully unused CV flag for our purposes */
167#define CVf_SLF 0x4000
168static OP *pp_slf (pTHX);
169
170static U32 cctx_gen;
171static size_t cctx_stacksize = CORO_STACKSIZE;
172static struct CoroAPI coroapi;
173static AV *main_mainstack; /* used to differentiate between $main and others */
174static JMPENV *main_top_env;
175static HV *coro_state_stash, *coro_stash;
176static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
177
178static AV *av_destroy; /* destruction queue */
179static SV *sv_manager; /* the manager coro */
180static SV *sv_idle; /* $Coro::idle */
181
182static GV *irsgv; /* $/ */
183static GV *stdoutgv; /* *STDOUT */
184static SV *rv_diehook;
185static SV *rv_warnhook;
186static HV *hv_sig; /* %SIG */
187
188/* async_pool helper stuff */
189static SV *sv_pool_rss;
190static SV *sv_pool_size;
191static SV *sv_async_pool_idle; /* description string */
192static AV *av_async_pool; /* idle pool */
193static SV *sv_Coro; /* class string */
194static CV *cv_pool_handler;
195static CV *cv_coro_state_new;
196
197/* Coro::AnyEvent */
198static SV *sv_activity;
199
200static struct coro_cctx *cctx_first;
201static int cctx_count, cctx_idle;
202
203enum {
204 CC_MAPPED = 0x01,
205 CC_NOREUSE = 0x02, /* throw this away after tracing */
206 CC_TRACE = 0x04,
207 CC_TRACE_SUB = 0x08, /* trace sub calls */
208 CC_TRACE_LINE = 0x10, /* trace each statement */
209 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
210};
211
212/* this is a structure representing a c-level coroutine */
213typedef struct coro_cctx
214{
215 struct coro_cctx *next;
216
217 /* the stack */
218 void *sptr;
219 size_t ssize;
220
221 /* cpu state */
222 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
223 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
224 JMPENV *top_env;
225 coro_context cctx;
226
227 U32 gen;
228#if CORO_USE_VALGRIND
229 int valgrind_id;
230#endif
231 unsigned char flags;
232} coro_cctx;
233
234coro_cctx *cctx_current; /* the currently running cctx */
235
236/*****************************************************************************/
237
238enum {
239 CF_RUNNING = 0x0001, /* coroutine is running */
240 CF_READY = 0x0002, /* coroutine is ready */
241 CF_NEW = 0x0004, /* has never been switched to */
242 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
243};
244
245/* the structure where most of the perl state is stored, overlaid on the cxstack */
246typedef struct
247{
248 SV *defsv;
249 AV *defav;
250 SV *errsv;
251 SV *irsgv;
252 HV *hinthv;
253#define VAR(name,type) type name;
254# include "state.h"
255#undef VAR
256} perl_slots;
257
258#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
259
260/* this is a structure representing a perl-level coroutine */
11struct coro { 261struct coro {
12 U8 dowarn; 262 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 263 coro_cctx *cctx;
14 264
15 PERL_SI *curstackinfo; 265 /* state data */
16 AV *curstack; 266 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 267 AV *mainstack;
18 SV **stack_sp; 268 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 269
41 AV *args; 270 CV *startcv; /* the CV to execute */
271 AV *args; /* data associated with this coroutine (initial args) */
272 int refcnt; /* coroutines are refcounted, yes */
273 int flags; /* CF_ flags */
274 HV *hv; /* the perl hash associated with this coro, if any */
275 void (*on_destroy)(pTHX_ struct coro *coro);
276
277 /* statistics */
278 int usecount; /* number of transfers to this coro */
279
280 /* coro process data */
281 int prio;
282 SV *except; /* exception to be thrown */
283 SV *rouse_cb;
284
285 /* async_pool */
286 SV *saved_deffh;
287 SV *invoke_cb;
288 AV *invoke_av;
289
290 /* linked list */
291 struct coro *next, *prev;
42}; 292};
43 293
44typedef struct coro *Coro__State; 294typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 295typedef struct coro *Coro__State_or_hashref;
46 296
47static HV *padlist_cache; 297/* the following variables are effectively part of the perl context */
298/* and get copied between struct coro and these variables */
299/* the mainr easonw e don't support windows process emulation */
300static struct CoroSLF slf_frame; /* the current slf frame */
48 301
49/* mostly copied from op.c:cv_clone2 */ 302/** Coro ********************************************************************/
50STATIC AV * 303
51clone_padlist (AV *protopadlist) 304#define PRIO_MAX 3
305#define PRIO_HIGH 1
306#define PRIO_NORMAL 0
307#define PRIO_LOW -1
308#define PRIO_IDLE -3
309#define PRIO_MIN -4
310
311/* for Coro.pm */
312static SV *coro_current;
313static SV *coro_readyhook;
314static AV *coro_ready [PRIO_MAX - PRIO_MIN + 1];
315static CV *cv_coro_run, *cv_coro_terminate;
316static struct coro *coro_first;
317#define coro_nready coroapi.nready
318
319/** lowlevel stuff **********************************************************/
320
321static SV *
322coro_get_sv (pTHX_ const char *name, int create)
52{ 323{
53 AV *av; 324#if PERL_VERSION_ATLEAST (5,10,0)
54 I32 ix; 325 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 326 get_sv (name, create);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 327#endif
57 SV **pname = AvARRAY (protopad_name); 328 return get_sv (name, create);
58 SV **ppad = AvARRAY (protopad); 329}
59 I32 fname = AvFILLp (protopad_name); 330
60 I32 fpad = AvFILLp (protopad); 331static AV *
332coro_get_av (pTHX_ const char *name, int create)
333{
334#if PERL_VERSION_ATLEAST (5,10,0)
335 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
336 get_av (name, create);
337#endif
338 return get_av (name, create);
339}
340
341static HV *
342coro_get_hv (pTHX_ const char *name, int create)
343{
344#if PERL_VERSION_ATLEAST (5,10,0)
345 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
346 get_hv (name, create);
347#endif
348 return get_hv (name, create);
349}
350
351/* may croak */
352INLINE CV *
353coro_sv_2cv (pTHX_ SV *sv)
354{
355 HV *st;
356 GV *gvp;
357 return sv_2cv (sv, &st, &gvp, 0);
358}
359
360/*****************************************************************************/
361/* magic glue */
362
363#define CORO_MAGIC_type_cv 26
364#define CORO_MAGIC_type_state PERL_MAGIC_ext
365
366#define CORO_MAGIC_NN(sv, type) \
367 (expect_true (SvMAGIC (sv)->mg_type == type) \
368 ? SvMAGIC (sv) \
369 : mg_find (sv, type))
370
371#define CORO_MAGIC(sv, type) \
372 (expect_true (SvMAGIC (sv)) \
373 ? CORO_MAGIC_NN (sv, type) \
374 : 0)
375
376#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
377#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
378
379INLINE struct coro *
380SvSTATE_ (pTHX_ SV *coro)
381{
382 HV *stash;
383 MAGIC *mg;
384
385 if (SvROK (coro))
386 coro = SvRV (coro);
387
388 if (expect_false (SvTYPE (coro) != SVt_PVHV))
389 croak ("Coro::State object required");
390
391 stash = SvSTASH (coro);
392 if (expect_false (stash != coro_stash && stash != coro_state_stash))
393 {
394 /* very slow, but rare, check */
395 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
396 croak ("Coro::State object required");
397 }
398
399 mg = CORO_MAGIC_state (coro);
400 return (struct coro *)mg->mg_ptr;
401}
402
403#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
404
405/* faster than SvSTATE, but expects a coroutine hv */
406#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
407#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
408
409/*****************************************************************************/
410/* padlist management and caching */
411
412static AV *
413coro_derive_padlist (pTHX_ CV *cv)
414{
415 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 416 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 417
72 newpadlist = newAV (); 418 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 419 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 420#if PERL_VERSION_ATLEAST (5,10,0)
421 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
422#else
423 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
424#endif
425 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
426 --AvFILLp (padlist);
427
428 av_store (newpadlist, 0, SvREFCNT_inc_NN (AvARRAY (padlist)[0]));
75 av_store (newpadlist, 1, (SV *) newpad); 429 av_store (newpadlist, 1, (SV *)newpad);
76 430
77 av = newAV (); /* will be @_ */ 431 return newpadlist;
78 av_extend (av, 0); 432}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 433
82 for (ix = fpad; ix > 0; ix--) 434static void
435free_padlist (pTHX_ AV *padlist)
436{
437 /* may be during global destruction */
438 if (!IN_DESTRUCT)
83 { 439 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 440 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 441
442 while (i > 0) /* special-case index 0 */
86 { 443 {
87 char *name = SvPVX (namesv); /* XXX */ 444 /* we try to be extra-careful here */
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 445 AV *av = (AV *)AvARRAY (padlist)[i--];
89 { /* lexical from outside? */ 446 I32 j = AvFILLp (av);
90 npad[ix] = SvREFCNT_inc (ppad[ix]); 447
91 } 448 while (j >= 0)
92 else 449 SvREFCNT_dec (AvARRAY (av)[j--]);
93 { /* our own lexical */ 450
94 SV *sv; 451 AvFILLp (av) = -1;
95 if (*name == '&') 452 SvREFCNT_dec (av);
96 sv = SvREFCNT_inc (ppad[ix]);
97 else if (*name == '@')
98 sv = (SV *) newAV ();
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 }
107 } 453 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix]))
109 {
110 npad[ix] = SvREFCNT_inc (ppad[ix]);
111 }
112 else
113 {
114 SV *sv = NEWSV (0, 0);
115 SvPADTMP_on (sv);
116 npad[ix] = sv;
117 }
118 }
119 454
120#if 0 /* NONOTUNDERSTOOD */ 455 SvREFCNT_dec (AvARRAY (padlist)[0]);
121 /* Now that vars are all in place, clone nested closures. */
122 456
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif
139
140 return newpadlist;
141}
142
143STATIC AV *
144free_padlist (AV *padlist)
145{
146 /* may be during global destruction */
147 if (SvREFCNT(padlist))
148 {
149 I32 i = AvFILLp(padlist); 457 AvFILLp (padlist) = -1;
150 while (i >= 0)
151 {
152 SV **svp = av_fetch(padlist, i--, FALSE);
153 SV *sv = svp ? *svp : Nullsv;
154 if (sv)
155 SvREFCNT_dec(sv);
156 }
157
158 SvREFCNT_dec((SV*)padlist); 458 SvREFCNT_dec ((SV*)padlist);
459 }
460}
461
462static int
463coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
464{
465 AV *padlist;
466 AV *av = (AV *)mg->mg_obj;
467
468 /* casting is fun. */
469 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
470 free_padlist (aTHX_ padlist);
471
472 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
473
474 return 0;
475}
476
477static MGVTBL coro_cv_vtbl = {
478 0, 0, 0, 0,
479 coro_cv_free
480};
481
482/* the next two functions merely cache the padlists */
483static void
484get_padlist (pTHX_ CV *cv)
485{
486 MAGIC *mg = CORO_MAGIC_cv (cv);
487 AV *av;
488
489 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
490 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
491 else
492 {
493#if CORO_PREFER_PERL_FUNCTIONS
494 /* this is probably cleaner? but also slower! */
495 /* in practise, it seems to be less stable */
496 CV *cp = Perl_cv_clone (aTHX_ cv);
497 CvPADLIST (cv) = CvPADLIST (cp);
498 CvPADLIST (cp) = 0;
499 SvREFCNT_dec (cp);
500#else
501 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
502#endif
503 }
504}
505
506static void
507put_padlist (pTHX_ CV *cv)
508{
509 MAGIC *mg = CORO_MAGIC_cv (cv);
510 AV *av;
511
512 if (expect_false (!mg))
513 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
514
515 av = (AV *)mg->mg_obj;
516
517 if (expect_false (AvFILLp (av) >= AvMAX (av)))
518 av_extend (av, AvFILLp (av) + 1);
519
520 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
521}
522
523/** load & save, init *******************************************************/
524
525static void
526load_perl (pTHX_ Coro__State c)
527{
528 perl_slots *slot = c->slot;
529 c->slot = 0;
530
531 PL_mainstack = c->mainstack;
532
533 GvSV (PL_defgv) = slot->defsv;
534 GvAV (PL_defgv) = slot->defav;
535 GvSV (PL_errgv) = slot->errsv;
536 GvSV (irsgv) = slot->irsgv;
537 GvHV (PL_hintgv) = slot->hinthv;
538
539 #define VAR(name,type) PL_ ## name = slot->name;
540 # include "state.h"
541 #undef VAR
542
543 {
544 dSP;
545
546 CV *cv;
547
548 /* now do the ugly restore mess */
549 while (expect_true (cv = (CV *)POPs))
550 {
551 put_padlist (aTHX_ cv); /* mark this padlist as available */
552 CvDEPTH (cv) = PTR2IV (POPs);
553 CvPADLIST (cv) = (AV *)POPs;
554 }
555
556 PUTBACK;
159 } 557 }
160}
161 558
162/* the next tow functions merely cache the padlists */ 559 slf_frame = c->slf_frame;
163STATIC void 560 CORO_THROW = c->except;
164get_padlist (CV *cv)
165{
166 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 0);
167
168 if (he && AvFILLp ((AV *)*he) >= 0)
169 CvPADLIST (cv) = (AV *)av_pop ((AV *)*he);
170 else
171 CvPADLIST (cv) = clone_padlist (CvPADLIST (cv));
172} 561}
173 562
174STATIC void
175put_padlist (CV *cv)
176{
177 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 1);
178
179 if (SvTYPE (*he) != SVt_PVAV)
180 {
181 SvREFCNT_dec (*he);
182 *he = (SV *)newAV ();
183 }
184
185 av_push ((AV *)*he, (SV *)CvPADLIST (cv));
186}
187
188static void 563static void
189save_state(pTHX_ Coro__State c) 564save_perl (pTHX_ Coro__State c)
190{ 565{
566 c->except = CORO_THROW;
567 c->slf_frame = slf_frame;
568
191 { 569 {
192 dSP; 570 dSP;
193 I32 cxix = cxstack_ix; 571 I32 cxix = cxstack_ix;
572 PERL_CONTEXT *ccstk = cxstack;
194 PERL_SI *top_si = PL_curstackinfo; 573 PERL_SI *top_si = PL_curstackinfo;
195 PERL_CONTEXT *ccstk = cxstack;
196 574
197 /* 575 /*
198 * the worst thing you can imagine happens first - we have to save 576 * the worst thing you can imagine happens first - we have to save
199 * (and reinitialize) all cv's in the whole callchain :( 577 * (and reinitialize) all cv's in the whole callchain :(
200 */ 578 */
201 579
202 PUSHs (Nullsv); 580 XPUSHs (Nullsv);
203 /* this loop was inspired by pp_caller */ 581 /* this loop was inspired by pp_caller */
204 for (;;) 582 for (;;)
205 { 583 {
206 while (cxix >= 0) 584 while (expect_true (cxix >= 0))
207 { 585 {
208 PERL_CONTEXT *cx = &ccstk[cxix--]; 586 PERL_CONTEXT *cx = &ccstk[cxix--];
209 587
210 if (CxTYPE(cx) == CXt_SUB) 588 if (expect_true (CxTYPE (cx) == CXt_SUB || CxTYPE (cx) == CXt_FORMAT))
211 { 589 {
212 CV *cv = cx->blk_sub.cv; 590 CV *cv = cx->blk_sub.cv;
591
213 if (CvDEPTH(cv)) 592 if (expect_true (CvDEPTH (cv)))
214 { 593 {
215#ifdef USE_THREADS
216 XPUSHs ((SV *)CvOWNER(cv));
217#endif
218 EXTEND (SP, 3); 594 EXTEND (SP, 3);
219 PUSHs ((SV *)CvDEPTH(cv));
220 PUSHs ((SV *)CvPADLIST(cv)); 595 PUSHs ((SV *)CvPADLIST (cv));
596 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
221 PUSHs ((SV *)cv); 597 PUSHs ((SV *)cv);
222 598
223 get_padlist (cv);
224
225 CvDEPTH(cv) = 0; 599 CvDEPTH (cv) = 0;
226#ifdef USE_THREADS 600 get_padlist (aTHX_ cv);
227 CvOWNER(cv) = 0;
228 error must unlock this cv etc.. etc...
229 if you are here wondering about this error message then
230 the reason is that it will not work as advertised yet
231#endif
232 } 601 }
233 } 602 }
234 else if (CxTYPE(cx) == CXt_FORMAT) 603 }
604
605 if (expect_true (top_si->si_type == PERLSI_MAIN))
606 break;
607
608 top_si = top_si->si_prev;
609 ccstk = top_si->si_cxstack;
610 cxix = top_si->si_cxix;
611 }
612
613 PUTBACK;
614 }
615
616 /* allocate some space on the context stack for our purposes */
617 /* we manually unroll here, as usually 2 slots is enough */
618 if (SLOT_COUNT >= 1) CXINC;
619 if (SLOT_COUNT >= 2) CXINC;
620 if (SLOT_COUNT >= 3) CXINC;
621 {
622 int i;
623 for (i = 3; i < SLOT_COUNT; ++i)
624 CXINC;
625 }
626 cxstack_ix -= SLOT_COUNT; /* undo allocation */
627
628 c->mainstack = PL_mainstack;
629
630 {
631 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
632
633 slot->defav = GvAV (PL_defgv);
634 slot->defsv = DEFSV;
635 slot->errsv = ERRSV;
636 slot->irsgv = GvSV (irsgv);
637 slot->hinthv = GvHV (PL_hintgv);
638
639 #define VAR(name,type) slot->name = PL_ ## name;
640 # include "state.h"
641 #undef VAR
642 }
643}
644
645/*
646 * allocate various perl stacks. This is almost an exact copy
647 * of perl.c:init_stacks, except that it uses less memory
648 * on the (sometimes correct) assumption that coroutines do
649 * not usually need a lot of stackspace.
650 */
651#if CORO_PREFER_PERL_FUNCTIONS
652# define coro_init_stacks(thx) init_stacks ()
653#else
654static void
655coro_init_stacks (pTHX)
656{
657 PL_curstackinfo = new_stackinfo(32, 8);
658 PL_curstackinfo->si_type = PERLSI_MAIN;
659 PL_curstack = PL_curstackinfo->si_stack;
660 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
661
662 PL_stack_base = AvARRAY(PL_curstack);
663 PL_stack_sp = PL_stack_base;
664 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
665
666 New(50,PL_tmps_stack,32,SV*);
667 PL_tmps_floor = -1;
668 PL_tmps_ix = -1;
669 PL_tmps_max = 32;
670
671 New(54,PL_markstack,16,I32);
672 PL_markstack_ptr = PL_markstack;
673 PL_markstack_max = PL_markstack + 16;
674
675#ifdef SET_MARK_OFFSET
676 SET_MARK_OFFSET;
677#endif
678
679 New(54,PL_scopestack,8,I32);
680 PL_scopestack_ix = 0;
681 PL_scopestack_max = 8;
682
683 New(54,PL_savestack,24,ANY);
684 PL_savestack_ix = 0;
685 PL_savestack_max = 24;
686
687#if !PERL_VERSION_ATLEAST (5,10,0)
688 New(54,PL_retstack,4,OP*);
689 PL_retstack_ix = 0;
690 PL_retstack_max = 4;
691#endif
692}
693#endif
694
695/*
696 * destroy the stacks, the callchain etc...
697 */
698static void
699coro_destruct_stacks (pTHX)
700{
701 while (PL_curstackinfo->si_next)
702 PL_curstackinfo = PL_curstackinfo->si_next;
703
704 while (PL_curstackinfo)
705 {
706 PERL_SI *p = PL_curstackinfo->si_prev;
707
708 if (!IN_DESTRUCT)
709 SvREFCNT_dec (PL_curstackinfo->si_stack);
710
711 Safefree (PL_curstackinfo->si_cxstack);
712 Safefree (PL_curstackinfo);
713 PL_curstackinfo = p;
714 }
715
716 Safefree (PL_tmps_stack);
717 Safefree (PL_markstack);
718 Safefree (PL_scopestack);
719 Safefree (PL_savestack);
720#if !PERL_VERSION_ATLEAST (5,10,0)
721 Safefree (PL_retstack);
722#endif
723}
724
725#define CORO_RSS \
726 rss += sizeof (SYM (curstackinfo)); \
727 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
728 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
729 rss += SYM (tmps_max) * sizeof (SV *); \
730 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
731 rss += SYM (scopestack_max) * sizeof (I32); \
732 rss += SYM (savestack_max) * sizeof (ANY);
733
734static size_t
735coro_rss (pTHX_ struct coro *coro)
736{
737 size_t rss = sizeof (*coro);
738
739 if (coro->mainstack)
740 {
741 if (coro->flags & CF_RUNNING)
742 {
743 #define SYM(sym) PL_ ## sym
744 CORO_RSS;
745 #undef SYM
746 }
747 else
748 {
749 #define SYM(sym) coro->slot->sym
750 CORO_RSS;
751 #undef SYM
752 }
753 }
754
755 return rss;
756}
757
758/** coroutine stack handling ************************************************/
759
760static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
761static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
762static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
763
764/* apparently < 5.8.8 */
765#ifndef MgPV_nolen_const
766#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
767 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
768 (const char*)(mg)->mg_ptr)
769#endif
770
771/*
772 * This overrides the default magic get method of %SIG elements.
773 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
774 * and instead of tryign to save and restore the hash elements, we just provide
775 * readback here.
776 * We only do this when the hook is != 0, as they are often set to 0 temporarily,
777 * not expecting this to actually change the hook. This is a potential problem
778 * when a schedule happens then, but we ignore this.
779 */
780static int
781coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
782{
783 const char *s = MgPV_nolen_const (mg);
784
785 if (*s == '_')
786 {
787 SV **svp = 0;
788
789 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
790 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
791
792 if (svp)
793 {
794 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
795 return 0;
796 }
797 }
798
799 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
800}
801
802static int
803coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
804{
805 const char *s = MgPV_nolen_const (mg);
806
807 if (*s == '_')
808 {
809 SV **svp = 0;
810
811 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
812 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
813
814 if (svp)
815 {
816 SV *old = *svp;
817 *svp = 0;
818 SvREFCNT_dec (old);
819 return 0;
820 }
821 }
822
823 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
824}
825
826static int
827coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
828{
829 const char *s = MgPV_nolen_const (mg);
830
831 if (*s == '_')
832 {
833 SV **svp = 0;
834
835 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
836 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
837
838 if (svp)
839 {
840 SV *old = *svp;
841 *svp = newSVsv (sv);
842 SvREFCNT_dec (old);
843 return 0;
844 }
845 }
846
847 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
848}
849
850static void
851prepare_nop (pTHX_ struct coro_transfer_args *ta)
852{
853 /* kind of mega-hacky, but works */
854 ta->next = ta->prev = (struct coro *)ta;
855}
856
857static int
858slf_check_nop (pTHX_ struct CoroSLF *frame)
859{
860 return 0;
861}
862
863static int
864slf_check_repeat (pTHX_ struct CoroSLF *frame)
865{
866 return 1;
867}
868
869static UNOP coro_setup_op;
870
871static void NOINLINE /* noinline to keep it out of the transfer fast path */
872coro_setup (pTHX_ struct coro *coro)
873{
874 /*
875 * emulate part of the perl startup here.
876 */
877 coro_init_stacks (aTHX);
878
879 PL_runops = RUNOPS_DEFAULT;
880 PL_curcop = &PL_compiling;
881 PL_in_eval = EVAL_NULL;
882 PL_comppad = 0;
883 PL_comppad_name = 0;
884 PL_comppad_name_fill = 0;
885 PL_comppad_name_floor = 0;
886 PL_curpm = 0;
887 PL_curpad = 0;
888 PL_localizing = 0;
889 PL_dirty = 0;
890 PL_restartop = 0;
891#if PERL_VERSION_ATLEAST (5,10,0)
892 PL_parser = 0;
893#endif
894 PL_hints = 0;
895
896 /* recreate the die/warn hooks */
897 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
898 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
899
900 GvSV (PL_defgv) = newSV (0);
901 GvAV (PL_defgv) = coro->args; coro->args = 0;
902 GvSV (PL_errgv) = newSV (0);
903 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
904 GvHV (PL_hintgv) = 0;
905 PL_rs = newSVsv (GvSV (irsgv));
906 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
907
908 {
909 dSP;
910 UNOP myop;
911
912 Zero (&myop, 1, UNOP);
913 myop.op_next = Nullop;
914 myop.op_type = OP_ENTERSUB;
915 myop.op_flags = OPf_WANT_VOID;
916
917 PUSHMARK (SP);
918 PUSHs ((SV *)coro->startcv);
919 PUTBACK;
920 PL_op = (OP *)&myop;
921 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
922 }
923
924 /* this newly created coroutine might be run on an existing cctx which most
925 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
926 */
927 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
928 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
929
930 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
931 coro_setup_op.op_next = PL_op;
932 coro_setup_op.op_type = OP_ENTERSUB;
933 coro_setup_op.op_ppaddr = pp_slf;
934 /* no flags etc. required, as an init function won't be called */
935
936 PL_op = (OP *)&coro_setup_op;
937
938 /* copy throw, in case it was set before coro_setup */
939 CORO_THROW = coro->except;
940}
941
942static void
943coro_unwind_stacks (pTHX)
944{
945 if (!IN_DESTRUCT)
946 {
947 /* restore all saved variables and stuff */
948 LEAVE_SCOPE (0);
949 assert (PL_tmps_floor == -1);
950
951 /* free all temporaries */
952 FREETMPS;
953 assert (PL_tmps_ix == -1);
954
955 /* unwind all extra stacks */
956 POPSTACK_TO (PL_mainstack);
957
958 /* unwind main stack */
959 dounwind (-1);
960 }
961}
962
963static void
964coro_destruct_perl (pTHX_ struct coro *coro)
965{
966 coro_unwind_stacks (aTHX);
967
968 SvREFCNT_dec (GvSV (PL_defgv));
969 SvREFCNT_dec (GvAV (PL_defgv));
970 SvREFCNT_dec (GvSV (PL_errgv));
971 SvREFCNT_dec (PL_defoutgv);
972 SvREFCNT_dec (PL_rs);
973 SvREFCNT_dec (GvSV (irsgv));
974 SvREFCNT_dec (GvHV (PL_hintgv));
975
976 SvREFCNT_dec (PL_diehook);
977 SvREFCNT_dec (PL_warnhook);
978
979 SvREFCNT_dec (coro->saved_deffh);
980 SvREFCNT_dec (coro->rouse_cb);
981 SvREFCNT_dec (coro->invoke_cb);
982 SvREFCNT_dec (coro->invoke_av);
983
984 coro_destruct_stacks (aTHX);
985}
986
987INLINE void
988free_coro_mortal (pTHX)
989{
990 if (expect_true (coro_mortal))
991 {
992 SvREFCNT_dec (coro_mortal);
993 coro_mortal = 0;
994 }
995}
996
997static int
998runops_trace (pTHX)
999{
1000 COP *oldcop = 0;
1001 int oldcxix = -2;
1002
1003 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
1004 {
1005 PERL_ASYNC_CHECK ();
1006
1007 if (cctx_current->flags & CC_TRACE_ALL)
1008 {
1009 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
1010 {
1011 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1012 SV **bot, **top;
1013 AV *av = newAV (); /* return values */
1014 SV **cb;
1015 dSP;
1016
1017 GV *gv = CvGV (cx->blk_sub.cv);
1018 SV *fullname = sv_2mortal (newSV (0));
1019 if (isGV (gv))
1020 gv_efullname3 (fullname, gv, 0);
1021
1022 bot = PL_stack_base + cx->blk_oldsp + 1;
1023 top = cx->blk_gimme == G_ARRAY ? SP + 1
1024 : cx->blk_gimme == G_SCALAR ? bot + 1
1025 : bot;
1026
1027 av_extend (av, top - bot);
1028 while (bot < top)
1029 av_push (av, SvREFCNT_inc_NN (*bot++));
1030
1031 PL_runops = RUNOPS_DEFAULT;
1032 ENTER;
1033 SAVETMPS;
1034 EXTEND (SP, 3);
1035 PUSHMARK (SP);
1036 PUSHs (&PL_sv_no);
1037 PUSHs (fullname);
1038 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1039 PUTBACK;
1040 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1041 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1042 SPAGAIN;
1043 FREETMPS;
1044 LEAVE;
1045 PL_runops = runops_trace;
1046 }
1047
1048 if (oldcop != PL_curcop)
1049 {
1050 oldcop = PL_curcop;
1051
1052 if (PL_curcop != &PL_compiling)
1053 {
1054 SV **cb;
1055
1056 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1057 {
1058 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1059
1060 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1061 {
1062 dSP;
1063 GV *gv = CvGV (cx->blk_sub.cv);
1064 SV *fullname = sv_2mortal (newSV (0));
1065
1066 if (isGV (gv))
1067 gv_efullname3 (fullname, gv, 0);
1068
1069 PL_runops = RUNOPS_DEFAULT;
1070 ENTER;
1071 SAVETMPS;
1072 EXTEND (SP, 3);
1073 PUSHMARK (SP);
1074 PUSHs (&PL_sv_yes);
1075 PUSHs (fullname);
1076 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1077 PUTBACK;
1078 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1079 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1080 SPAGAIN;
1081 FREETMPS;
1082 LEAVE;
1083 PL_runops = runops_trace;
1084 }
1085
1086 oldcxix = cxstack_ix;
1087 }
1088
1089 if (cctx_current->flags & CC_TRACE_LINE)
1090 {
1091 dSP;
1092
1093 PL_runops = RUNOPS_DEFAULT;
1094 ENTER;
1095 SAVETMPS;
1096 EXTEND (SP, 3);
1097 PL_runops = RUNOPS_DEFAULT;
1098 PUSHMARK (SP);
1099 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1100 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1101 PUTBACK;
1102 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1103 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1104 SPAGAIN;
1105 FREETMPS;
1106 LEAVE;
1107 PL_runops = runops_trace;
1108 }
1109 }
1110 }
1111 }
1112 }
1113
1114 TAINT_NOT;
1115 return 0;
1116}
1117
1118static struct CoroSLF cctx_ssl_frame;
1119
1120static void
1121slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1122{
1123 ta->prev = 0;
1124}
1125
1126static int
1127slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1128{
1129 *frame = cctx_ssl_frame;
1130
1131 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1132}
1133
1134/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1135static void NOINLINE
1136cctx_prepare (pTHX)
1137{
1138 PL_top_env = &PL_start_env;
1139
1140 if (cctx_current->flags & CC_TRACE)
1141 PL_runops = runops_trace;
1142
1143 /* we already must be executing an SLF op, there is no other valid way
1144 * that can lead to creation of a new cctx */
1145 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1146 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1147
1148 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1149 cctx_ssl_frame = slf_frame;
1150
1151 slf_frame.prepare = slf_prepare_set_stacklevel;
1152 slf_frame.check = slf_check_set_stacklevel;
1153}
1154
1155/* the tail of transfer: execute stuff we can only do after a transfer */
1156INLINE void
1157transfer_tail (pTHX)
1158{
1159 free_coro_mortal (aTHX);
1160}
1161
1162/*
1163 * this is a _very_ stripped down perl interpreter ;)
1164 */
1165static void
1166cctx_run (void *arg)
1167{
1168#ifdef USE_ITHREADS
1169# if CORO_PTHREAD
1170 PERL_SET_CONTEXT (coro_thx);
1171# endif
1172#endif
1173 {
1174 dTHX;
1175
1176 /* normally we would need to skip the entersub here */
1177 /* not doing so will re-execute it, which is exactly what we want */
1178 /* PL_nop = PL_nop->op_next */
1179
1180 /* inject a fake subroutine call to cctx_init */
1181 cctx_prepare (aTHX);
1182
1183 /* cctx_run is the alternative tail of transfer() */
1184 transfer_tail (aTHX);
1185
1186 /* somebody or something will hit me for both perl_run and PL_restartop */
1187 PL_restartop = PL_op;
1188 perl_run (PL_curinterp);
1189 /*
1190 * Unfortunately, there is no way to get at the return values of the
1191 * coro body here, as perl_run destroys these
1192 */
1193
1194 /*
1195 * If perl-run returns we assume exit() was being called or the coro
1196 * fell off the end, which seems to be the only valid (non-bug)
1197 * reason for perl_run to return. We try to exit by jumping to the
1198 * bootstrap-time "top" top_env, as we cannot restore the "main"
1199 * coroutine as Coro has no such concept.
1200 * This actually isn't valid with the pthread backend, but OSes requiring
1201 * that backend are too broken to do it in a standards-compliant way.
1202 */
1203 PL_top_env = main_top_env;
1204 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1205 }
1206}
1207
1208static coro_cctx *
1209cctx_new ()
1210{
1211 coro_cctx *cctx;
1212
1213 ++cctx_count;
1214 New (0, cctx, 1, coro_cctx);
1215
1216 cctx->gen = cctx_gen;
1217 cctx->flags = 0;
1218 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1219
1220 return cctx;
1221}
1222
1223/* create a new cctx only suitable as source */
1224static coro_cctx *
1225cctx_new_empty ()
1226{
1227 coro_cctx *cctx = cctx_new ();
1228
1229 cctx->sptr = 0;
1230 coro_create (&cctx->cctx, 0, 0, 0, 0);
1231
1232 return cctx;
1233}
1234
1235/* create a new cctx suitable as destination/running a perl interpreter */
1236static coro_cctx *
1237cctx_new_run ()
1238{
1239 coro_cctx *cctx = cctx_new ();
1240 void *stack_start;
1241 size_t stack_size;
1242
1243#if HAVE_MMAP
1244 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1245 /* mmap supposedly does allocate-on-write for us */
1246 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1247
1248 if (cctx->sptr != (void *)-1)
1249 {
1250 #if CORO_STACKGUARD
1251 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1252 #endif
1253 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1254 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1255 cctx->flags |= CC_MAPPED;
1256 }
1257 else
1258#endif
1259 {
1260 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1261 New (0, cctx->sptr, cctx_stacksize, long);
1262
1263 if (!cctx->sptr)
1264 {
1265 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1266 _exit (EXIT_FAILURE);
1267 }
1268
1269 stack_start = cctx->sptr;
1270 stack_size = cctx->ssize;
1271 }
1272
1273 #if CORO_USE_VALGRIND
1274 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1275 #endif
1276
1277 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1278
1279 return cctx;
1280}
1281
1282static void
1283cctx_destroy (coro_cctx *cctx)
1284{
1285 if (!cctx)
1286 return;
1287
1288 assert (("FATAL: tried to destroy current cctx", cctx != cctx_current));//D temporary?
1289
1290 --cctx_count;
1291 coro_destroy (&cctx->cctx);
1292
1293 /* coro_transfer creates new, empty cctx's */
1294 if (cctx->sptr)
1295 {
1296 #if CORO_USE_VALGRIND
1297 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1298 #endif
1299
1300#if HAVE_MMAP
1301 if (cctx->flags & CC_MAPPED)
1302 munmap (cctx->sptr, cctx->ssize);
1303 else
1304#endif
1305 Safefree (cctx->sptr);
1306 }
1307
1308 Safefree (cctx);
1309}
1310
1311/* wether this cctx should be destructed */
1312#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1313
1314static coro_cctx *
1315cctx_get (pTHX)
1316{
1317 while (expect_true (cctx_first))
1318 {
1319 coro_cctx *cctx = cctx_first;
1320 cctx_first = cctx->next;
1321 --cctx_idle;
1322
1323 if (expect_true (!CCTX_EXPIRED (cctx)))
1324 return cctx;
1325
1326 cctx_destroy (cctx);
1327 }
1328
1329 return cctx_new_run ();
1330}
1331
1332static void
1333cctx_put (coro_cctx *cctx)
1334{
1335 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1336
1337 /* free another cctx if overlimit */
1338 if (expect_false (cctx_idle >= cctx_max_idle))
1339 {
1340 coro_cctx *first = cctx_first;
1341 cctx_first = first->next;
1342 --cctx_idle;
1343
1344 cctx_destroy (first);
1345 }
1346
1347 ++cctx_idle;
1348 cctx->next = cctx_first;
1349 cctx_first = cctx;
1350}
1351
1352/** coroutine switching *****************************************************/
1353
1354static void
1355transfer_check (pTHX_ struct coro *prev, struct coro *next)
1356{
1357 /* TODO: throwing up here is considered harmful */
1358
1359 if (expect_true (prev != next))
1360 {
1361 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1362 croak ("Coro::State::transfer called with a suspended prev Coro::State, but can only transfer from running or new states,");
1363
1364 if (expect_false (next->flags & CF_RUNNING))
1365 croak ("Coro::State::transfer called with running next Coro::State, but can only transfer to inactive states,");
1366
1367 if (expect_false (next->flags & CF_DESTROYED))
1368 croak ("Coro::State::transfer called with destroyed next Coro::State, but can only transfer to inactive states,");
1369
1370#if !PERL_VERSION_ATLEAST (5,10,0)
1371 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1372 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1373#endif
1374 }
1375}
1376
1377/* always use the TRANSFER macro */
1378static void NOINLINE /* noinline so we have a fixed stackframe */
1379transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1380{
1381 dSTACKLEVEL;
1382
1383 /* sometimes transfer is only called to set idle_sp */
1384 if (expect_false (!prev))
1385 {
1386 cctx_current->idle_sp = STACKLEVEL;
1387 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1388 }
1389 else if (expect_true (prev != next))
1390 {
1391 coro_cctx *cctx_prev;
1392
1393 if (expect_false (prev->flags & CF_NEW))
1394 {
1395 /* create a new empty/source context */
1396 prev->flags &= ~CF_NEW;
1397 prev->flags |= CF_RUNNING;
1398 }
1399
1400 prev->flags &= ~CF_RUNNING;
1401 next->flags |= CF_RUNNING;
1402
1403 /* first get rid of the old state */
1404 save_perl (aTHX_ prev);
1405
1406 if (expect_false (next->flags & CF_NEW))
1407 {
1408 /* need to start coroutine */
1409 next->flags &= ~CF_NEW;
1410 /* setup coroutine call */
1411 coro_setup (aTHX_ next);
1412 }
1413 else
1414 load_perl (aTHX_ next);
1415
1416 /* possibly untie and reuse the cctx */
1417 if (expect_true (
1418 cctx_current->idle_sp == STACKLEVEL
1419 && !(cctx_current->flags & CC_TRACE)
1420 && !force_cctx
1421 ))
1422 {
1423 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1424 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1425
1426 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get. */
1427 /* without this the next cctx_get might destroy the running cctx while still in use */
1428 if (expect_false (CCTX_EXPIRED (cctx_current)))
1429 if (expect_true (!next->cctx))
1430 next->cctx = cctx_get (aTHX);
1431
1432 cctx_put (cctx_current);
1433 }
1434 else
1435 prev->cctx = cctx_current;
1436
1437 ++next->usecount;
1438
1439 cctx_prev = cctx_current;
1440 cctx_current = expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1441
1442 next->cctx = 0;
1443
1444 if (expect_false (cctx_prev != cctx_current))
1445 {
1446 cctx_prev->top_env = PL_top_env;
1447 PL_top_env = cctx_current->top_env;
1448 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1449 }
1450
1451 transfer_tail (aTHX);
1452 }
1453}
1454
1455#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1456#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1457
1458/** high level stuff ********************************************************/
1459
1460static int
1461coro_state_destroy (pTHX_ struct coro *coro)
1462{
1463 if (coro->flags & CF_DESTROYED)
1464 return 0;
1465
1466 if (coro->on_destroy)
1467 coro->on_destroy (aTHX_ coro);
1468
1469 coro->flags |= CF_DESTROYED;
1470
1471 if (coro->flags & CF_READY)
1472 {
1473 /* reduce nready, as destroying a ready coro effectively unreadies it */
1474 /* alternative: look through all ready queues and remove the coro */
1475 --coro_nready;
1476 }
1477 else
1478 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1479
1480 if (coro->mainstack
1481 && coro->mainstack != main_mainstack
1482 && coro->slot
1483 && !PL_dirty)
1484 {
1485 struct coro temp;
1486
1487 assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack));
1488
1489 save_perl (aTHX_ &temp);
1490 load_perl (aTHX_ coro);
1491
1492 coro_destruct_perl (aTHX_ coro);
1493
1494 load_perl (aTHX_ &temp);
1495
1496 coro->slot = 0;
1497 }
1498
1499 cctx_destroy (coro->cctx);
1500 SvREFCNT_dec (coro->startcv);
1501 SvREFCNT_dec (coro->args);
1502 SvREFCNT_dec (CORO_THROW);
1503
1504 if (coro->next) coro->next->prev = coro->prev;
1505 if (coro->prev) coro->prev->next = coro->next;
1506 if (coro == coro_first) coro_first = coro->next;
1507
1508 return 1;
1509}
1510
1511static int
1512coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1513{
1514 struct coro *coro = (struct coro *)mg->mg_ptr;
1515 mg->mg_ptr = 0;
1516
1517 coro->hv = 0;
1518
1519 if (--coro->refcnt < 0)
1520 {
1521 coro_state_destroy (aTHX_ coro);
1522 Safefree (coro);
1523 }
1524
1525 return 0;
1526}
1527
1528static int
1529coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1530{
1531 struct coro *coro = (struct coro *)mg->mg_ptr;
1532
1533 ++coro->refcnt;
1534
1535 return 0;
1536}
1537
1538static MGVTBL coro_state_vtbl = {
1539 0, 0, 0, 0,
1540 coro_state_free,
1541 0,
1542#ifdef MGf_DUP
1543 coro_state_dup,
1544#else
1545# define MGf_DUP 0
1546#endif
1547};
1548
1549static void
1550prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1551{
1552 ta->prev = SvSTATE (prev_sv);
1553 ta->next = SvSTATE (next_sv);
1554 TRANSFER_CHECK (*ta);
1555}
1556
1557static void
1558api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1559{
1560 struct coro_transfer_args ta;
1561
1562 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1563 TRANSFER (ta, 1);
1564}
1565
1566/*****************************************************************************/
1567/* gensub: simple closure generation utility */
1568
1569#define GENSUB_ARG CvXSUBANY (cv).any_ptr
1570
1571/* create a closure from XS, returns a code reference */
1572/* the arg can be accessed via GENSUB_ARG from the callback */
1573/* the callback must use dXSARGS/XSRETURN */
1574static SV *
1575gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg)
1576{
1577 CV *cv = (CV *)newSV (0);
1578
1579 sv_upgrade ((SV *)cv, SVt_PVCV);
1580
1581 CvANON_on (cv);
1582 CvISXSUB_on (cv);
1583 CvXSUB (cv) = xsub;
1584 GENSUB_ARG = arg;
1585
1586 return newRV_noinc ((SV *)cv);
1587}
1588
1589/** Coro ********************************************************************/
1590
1591INLINE void
1592coro_enq (pTHX_ struct coro *coro)
1593{
1594 av_push (coro_ready [coro->prio - PRIO_MIN], SvREFCNT_inc_NN (coro->hv));
1595}
1596
1597INLINE SV *
1598coro_deq (pTHX)
1599{
1600 int prio;
1601
1602 for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= 0; )
1603 if (AvFILLp (coro_ready [prio]) >= 0)
1604 return av_shift (coro_ready [prio]);
1605
1606 return 0;
1607}
1608
1609static int
1610api_ready (pTHX_ SV *coro_sv)
1611{
1612 struct coro *coro;
1613 SV *sv_hook;
1614 void (*xs_hook)(void);
1615
1616 coro = SvSTATE (coro_sv);
1617
1618 if (coro->flags & CF_READY)
1619 return 0;
1620
1621 coro->flags |= CF_READY;
1622
1623 sv_hook = coro_nready ? 0 : coro_readyhook;
1624 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1625
1626 coro_enq (aTHX_ coro);
1627 ++coro_nready;
1628
1629 if (sv_hook)
1630 {
1631 dSP;
1632
1633 ENTER;
1634 SAVETMPS;
1635
1636 PUSHMARK (SP);
1637 PUTBACK;
1638 call_sv (sv_hook, G_VOID | G_DISCARD);
1639
1640 FREETMPS;
1641 LEAVE;
1642 }
1643
1644 if (xs_hook)
1645 xs_hook ();
1646
1647 return 1;
1648}
1649
1650static int
1651api_is_ready (pTHX_ SV *coro_sv)
1652{
1653 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1654}
1655
1656/* expects to own a reference to next->hv */
1657INLINE void
1658prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1659{
1660 SV *prev_sv = SvRV (coro_current);
1661
1662 ta->prev = SvSTATE_hv (prev_sv);
1663 ta->next = next;
1664
1665 TRANSFER_CHECK (*ta);
1666
1667 SvRV_set (coro_current, (SV *)next->hv);
1668
1669 free_coro_mortal (aTHX);
1670 coro_mortal = prev_sv;
1671}
1672
1673static void
1674prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1675{
1676 for (;;)
1677 {
1678 SV *next_sv = coro_deq (aTHX);
1679
1680 if (expect_true (next_sv))
1681 {
1682 struct coro *next = SvSTATE_hv (next_sv);
1683
1684 /* cannot transfer to destroyed coros, skip and look for next */
1685 if (expect_false (next->flags & CF_DESTROYED))
1686 SvREFCNT_dec (next_sv); /* coro_nready has already been taken care of by destroy */
1687 else
1688 {
1689 next->flags &= ~CF_READY;
1690 --coro_nready;
1691
1692 prepare_schedule_to (aTHX_ ta, next);
1693 break;
1694 }
1695 }
1696 else
1697 {
1698 /* nothing to schedule: call the idle handler */
1699 if (SvROK (sv_idle)
1700 && SvOBJECT (SvRV (sv_idle)))
1701 {
1702 ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */
1703 api_ready (aTHX_ SvRV (sv_idle));
1704 --coro_nready;
1705 }
1706 else
1707 {
1708 dSP;
1709
1710 ENTER;
1711 SAVETMPS;
1712
1713 PUSHMARK (SP);
1714 PUTBACK;
1715 call_sv (sv_idle, G_VOID | G_DISCARD);
1716
1717 FREETMPS;
1718 LEAVE;
1719 }
1720 }
1721 }
1722}
1723
1724INLINE void
1725prepare_cede (pTHX_ struct coro_transfer_args *ta)
1726{
1727 api_ready (aTHX_ coro_current);
1728 prepare_schedule (aTHX_ ta);
1729}
1730
1731INLINE void
1732prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1733{
1734 SV *prev = SvRV (coro_current);
1735
1736 if (coro_nready)
1737 {
1738 prepare_schedule (aTHX_ ta);
1739 api_ready (aTHX_ prev);
1740 }
1741 else
1742 prepare_nop (aTHX_ ta);
1743}
1744
1745static void
1746api_schedule (pTHX)
1747{
1748 struct coro_transfer_args ta;
1749
1750 prepare_schedule (aTHX_ &ta);
1751 TRANSFER (ta, 1);
1752}
1753
1754static void
1755api_schedule_to (pTHX_ SV *coro_sv)
1756{
1757 struct coro_transfer_args ta;
1758 struct coro *next = SvSTATE (coro_sv);
1759
1760 SvREFCNT_inc_NN (coro_sv);
1761 prepare_schedule_to (aTHX_ &ta, next);
1762}
1763
1764static int
1765api_cede (pTHX)
1766{
1767 struct coro_transfer_args ta;
1768
1769 prepare_cede (aTHX_ &ta);
1770
1771 if (expect_true (ta.prev != ta.next))
1772 {
1773 TRANSFER (ta, 1);
1774 return 1;
1775 }
1776 else
1777 return 0;
1778}
1779
1780static int
1781api_cede_notself (pTHX)
1782{
1783 if (coro_nready)
1784 {
1785 struct coro_transfer_args ta;
1786
1787 prepare_cede_notself (aTHX_ &ta);
1788 TRANSFER (ta, 1);
1789 return 1;
1790 }
1791 else
1792 return 0;
1793}
1794
1795static void
1796api_trace (pTHX_ SV *coro_sv, int flags)
1797{
1798 struct coro *coro = SvSTATE (coro_sv);
1799
1800 if (coro->flags & CF_RUNNING)
1801 croak ("cannot enable tracing on a running coroutine, caught");
1802
1803 if (flags & CC_TRACE)
1804 {
1805 if (!coro->cctx)
1806 coro->cctx = cctx_new_run ();
1807 else if (!(coro->cctx->flags & CC_TRACE))
1808 croak ("cannot enable tracing on coroutine with custom stack, caught");
1809
1810 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1811 }
1812 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1813 {
1814 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1815
1816 if (coro->flags & CF_RUNNING)
1817 PL_runops = RUNOPS_DEFAULT;
1818 else
1819 coro->slot->runops = RUNOPS_DEFAULT;
1820 }
1821}
1822
1823static void
1824coro_call_on_destroy (pTHX_ struct coro *coro)
1825{
1826 SV **on_destroyp = hv_fetch (coro->hv, "_on_destroy", sizeof ("_on_destroy") - 1, 0);
1827 SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0);
1828
1829 if (on_destroyp)
1830 {
1831 AV *on_destroy = (AV *)SvRV (*on_destroyp);
1832
1833 while (AvFILLp (on_destroy) >= 0)
1834 {
1835 dSP; /* don't disturb outer sp */
1836 SV *cb = av_pop (on_destroy);
1837
1838 PUSHMARK (SP);
1839
1840 if (statusp)
1841 {
1842 int i;
1843 AV *status = (AV *)SvRV (*statusp);
1844 EXTEND (SP, AvFILLp (status) + 1);
1845
1846 for (i = 0; i <= AvFILLp (status); ++i)
1847 PUSHs (AvARRAY (status)[i]);
1848 }
1849
1850 PUTBACK;
1851 call_sv (sv_2mortal (cb), G_VOID | G_DISCARD);
1852 }
1853 }
1854}
1855
1856static void
1857slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1858{
1859 int i;
1860 HV *hv = (HV *)SvRV (coro_current);
1861 AV *av = newAV ();
1862
1863 av_extend (av, items - 1);
1864 for (i = 0; i < items; ++i)
1865 av_push (av, SvREFCNT_inc_NN (arg [i]));
1866
1867 hv_store (hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0);
1868
1869 av_push (av_destroy, (SV *)newRV_inc ((SV *)hv)); /* RVinc for perl */
1870 api_ready (aTHX_ sv_manager);
1871
1872 frame->prepare = prepare_schedule;
1873 frame->check = slf_check_repeat;
1874
1875 /* as a minor optimisation, we could unwind all stacks here */
1876 /* but that puts extra pressure on pp_slf, and is not worth much */
1877 /*coro_unwind_stacks (aTHX);*/
1878}
1879
1880/*****************************************************************************/
1881/* async pool handler */
1882
1883static int
1884slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
1885{
1886 HV *hv = (HV *)SvRV (coro_current);
1887 struct coro *coro = (struct coro *)frame->data;
1888
1889 if (!coro->invoke_cb)
1890 return 1; /* loop till we have invoke */
1891 else
1892 {
1893 hv_store (hv, "desc", sizeof ("desc") - 1,
1894 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
1895
1896 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
1897
1898 {
1899 dSP;
1900 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
1901 PUTBACK;
1902 }
1903
1904 SvREFCNT_dec (GvAV (PL_defgv));
1905 GvAV (PL_defgv) = coro->invoke_av;
1906 coro->invoke_av = 0;
1907
1908 return 0;
1909 }
1910}
1911
1912static void
1913slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1914{
1915 HV *hv = (HV *)SvRV (coro_current);
1916 struct coro *coro = SvSTATE_hv ((SV *)hv);
1917
1918 if (expect_true (coro->saved_deffh))
1919 {
1920 /* subsequent iteration */
1921 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
1922 coro->saved_deffh = 0;
1923
1924 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
1925 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
1926 {
1927 coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate);
1928 coro->invoke_av = newAV ();
1929
1930 frame->prepare = prepare_nop;
1931 }
1932 else
1933 {
1934 av_clear (GvAV (PL_defgv));
1935 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
1936
1937 coro->prio = 0;
1938
1939 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
1940 api_trace (aTHX_ coro_current, 0);
1941
1942 frame->prepare = prepare_schedule;
1943 av_push (av_async_pool, SvREFCNT_inc (hv));
1944 }
1945 }
1946 else
1947 {
1948 /* first iteration, simply fall through */
1949 frame->prepare = prepare_nop;
1950 }
1951
1952 frame->check = slf_check_pool_handler;
1953 frame->data = (void *)coro;
1954}
1955
1956/*****************************************************************************/
1957/* rouse callback */
1958
1959#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
1960
1961static void
1962coro_rouse_callback (pTHX_ CV *cv)
1963{
1964 dXSARGS;
1965 SV *data = (SV *)GENSUB_ARG;
1966
1967 if (SvTYPE (SvRV (data)) != SVt_PVAV)
1968 {
1969 /* first call, set args */
1970 AV *av = newAV ();
1971 SV *coro = SvRV (data);
1972
1973 SvRV_set (data, (SV *)av);
1974 api_ready (aTHX_ coro);
1975 SvREFCNT_dec (coro);
1976
1977 /* better take a full copy of the arguments */
1978 while (items--)
1979 av_store (av, items, newSVsv (ST (items)));
1980 }
1981
1982 XSRETURN_EMPTY;
1983}
1984
1985static int
1986slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
1987{
1988 SV *data = (SV *)frame->data;
1989
1990 if (CORO_THROW)
1991 return 0;
1992
1993 if (SvTYPE (SvRV (data)) != SVt_PVAV)
1994 return 1;
1995
1996 /* now push all results on the stack */
1997 {
1998 dSP;
1999 AV *av = (AV *)SvRV (data);
2000 int i;
2001
2002 EXTEND (SP, AvFILLp (av) + 1);
2003 for (i = 0; i <= AvFILLp (av); ++i)
2004 PUSHs (sv_2mortal (AvARRAY (av)[i]));
2005
2006 /* we have stolen the elements, so ste length to zero and free */
2007 AvFILLp (av) = -1;
2008 av_undef (av);
2009
2010 PUTBACK;
2011 }
2012
2013 return 0;
2014}
2015
2016static void
2017slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2018{
2019 SV *cb;
2020
2021 if (items)
2022 cb = arg [0];
2023 else
2024 {
2025 struct coro *coro = SvSTATE_current;
2026
2027 if (!coro->rouse_cb)
2028 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
2029
2030 cb = sv_2mortal (coro->rouse_cb);
2031 coro->rouse_cb = 0;
2032 }
2033
2034 if (!SvROK (cb)
2035 || SvTYPE (SvRV (cb)) != SVt_PVCV
2036 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
2037 croak ("Coro::rouse_wait called with illegal callback argument,");
2038
2039 {
2040 CV *cv = (CV *)SvRV (cb); /* for GENSUB_ARG */
2041 SV *data = (SV *)GENSUB_ARG;
2042
2043 frame->data = (void *)data;
2044 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2045 frame->check = slf_check_rouse_wait;
2046 }
2047}
2048
2049static SV *
2050coro_new_rouse_cb (pTHX)
2051{
2052 HV *hv = (HV *)SvRV (coro_current);
2053 struct coro *coro = SvSTATE_hv (hv);
2054 SV *data = newRV_inc ((SV *)hv);
2055 SV *cb = gensub (aTHX_ coro_rouse_callback, (void *)data);
2056
2057 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2058 SvREFCNT_dec (data); /* magicext increases the refcount */
2059
2060 SvREFCNT_dec (coro->rouse_cb);
2061 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2062
2063 return cb;
2064}
2065
2066/*****************************************************************************/
2067/* schedule-like-function opcode (SLF) */
2068
2069static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2070static const CV *slf_cv;
2071static SV **slf_argv;
2072static int slf_argc, slf_arga; /* count, allocated */
2073static I32 slf_ax; /* top of stack, for restore */
2074
2075/* this restores the stack in the case we patched the entersub, to */
2076/* recreate the stack frame as perl will on following calls */
2077/* since entersub cleared the stack */
2078static OP *
2079pp_restore (pTHX)
2080{
2081 int i;
2082 SV **SP = PL_stack_base + slf_ax;
2083
2084 PUSHMARK (SP);
2085
2086 EXTEND (SP, slf_argc + 1);
2087
2088 for (i = 0; i < slf_argc; ++i)
2089 PUSHs (sv_2mortal (slf_argv [i]));
2090
2091 PUSHs ((SV *)CvGV (slf_cv));
2092
2093 RETURNOP (slf_restore.op_first);
2094}
2095
2096static void
2097slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2098{
2099 SV **arg = (SV **)slf_frame.data;
2100
2101 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2102}
2103
2104static void
2105slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2106{
2107 if (items != 2)
2108 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2109
2110 frame->prepare = slf_prepare_transfer;
2111 frame->check = slf_check_nop;
2112 frame->data = (void *)arg; /* let's hope it will stay valid */
2113}
2114
2115static void
2116slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2117{
2118 frame->prepare = prepare_schedule;
2119 frame->check = slf_check_nop;
2120}
2121
2122static void
2123slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2124{
2125 struct coro *next = (struct coro *)slf_frame.data;
2126
2127 SvREFCNT_inc_NN (next->hv);
2128 prepare_schedule_to (aTHX_ ta, next);
2129}
2130
2131static void
2132slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2133{
2134 if (!items)
2135 croak ("Coro::schedule_to expects a coroutine argument, caught");
2136
2137 frame->data = (void *)SvSTATE (arg [0]);
2138 frame->prepare = slf_prepare_schedule_to;
2139 frame->check = slf_check_nop;
2140}
2141
2142static void
2143slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2144{
2145 api_ready (aTHX_ SvRV (coro_current));
2146
2147 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2148}
2149
2150static void
2151slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2152{
2153 frame->prepare = prepare_cede;
2154 frame->check = slf_check_nop;
2155}
2156
2157static void
2158slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2159{
2160 frame->prepare = prepare_cede_notself;
2161 frame->check = slf_check_nop;
2162}
2163
2164/*
2165 * these not obviously related functions are all rolled into one
2166 * function to increase chances that they all will call transfer with the same
2167 * stack offset
2168 * SLF stands for "schedule-like-function".
2169 */
2170static OP *
2171pp_slf (pTHX)
2172{
2173 I32 checkmark; /* mark SP to see how many elements check has pushed */
2174
2175 /* set up the slf frame, unless it has already been set-up */
2176 /* the latter happens when a new coro has been started */
2177 /* or when a new cctx was attached to an existing coroutine */
2178 if (expect_true (!slf_frame.prepare))
2179 {
2180 /* first iteration */
2181 dSP;
2182 SV **arg = PL_stack_base + TOPMARK + 1;
2183 int items = SP - arg; /* args without function object */
2184 SV *gv = *sp;
2185
2186 /* do a quick consistency check on the "function" object, and if it isn't */
2187 /* for us, divert to the real entersub */
2188 if (SvTYPE (gv) != SVt_PVGV
2189 || !GvCV (gv)
2190 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2191 return PL_ppaddr[OP_ENTERSUB](aTHX);
2192
2193 if (!(PL_op->op_flags & OPf_STACKED))
2194 {
2195 /* ampersand-form of call, use @_ instead of stack */
2196 AV *av = GvAV (PL_defgv);
2197 arg = AvARRAY (av);
2198 items = AvFILLp (av) + 1;
2199 }
2200
2201 /* now call the init function, which needs to set up slf_frame */
2202 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2203 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2204
2205 /* pop args */
2206 SP = PL_stack_base + POPMARK;
2207
2208 PUTBACK;
2209 }
2210
2211 /* now that we have a slf_frame, interpret it! */
2212 /* we use a callback system not to make the code needlessly */
2213 /* complicated, but so we can run multiple perl coros from one cctx */
2214
2215 do
2216 {
2217 struct coro_transfer_args ta;
2218
2219 slf_frame.prepare (aTHX_ &ta);
2220 TRANSFER (ta, 0);
2221
2222 checkmark = PL_stack_sp - PL_stack_base;
2223 }
2224 while (slf_frame.check (aTHX_ &slf_frame));
2225
2226 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2227
2228 /* exception handling */
2229 if (expect_false (CORO_THROW))
2230 {
2231 SV *exception = sv_2mortal (CORO_THROW);
2232
2233 CORO_THROW = 0;
2234 sv_setsv (ERRSV, exception);
2235 croak (0);
2236 }
2237
2238 /* return value handling - mostly like entersub */
2239 /* make sure we put something on the stack in scalar context */
2240 if (GIMME_V == G_SCALAR)
2241 {
2242 dSP;
2243 SV **bot = PL_stack_base + checkmark;
2244
2245 if (sp == bot) /* too few, push undef */
2246 bot [1] = &PL_sv_undef;
2247 else if (sp != bot + 1) /* too many, take last one */
2248 bot [1] = *sp;
2249
2250 SP = bot + 1;
2251
2252 PUTBACK;
2253 }
2254
2255 return NORMAL;
2256}
2257
2258static void
2259api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2260{
2261 int i;
2262 SV **arg = PL_stack_base + ax;
2263 int items = PL_stack_sp - arg + 1;
2264
2265 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2266
2267 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2268 && PL_op->op_ppaddr != pp_slf)
2269 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2270
2271 CvFLAGS (cv) |= CVf_SLF;
2272 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2273 slf_cv = cv;
2274
2275 /* we patch the op, and then re-run the whole call */
2276 /* we have to put the same argument on the stack for this to work */
2277 /* and this will be done by pp_restore */
2278 slf_restore.op_next = (OP *)&slf_restore;
2279 slf_restore.op_type = OP_CUSTOM;
2280 slf_restore.op_ppaddr = pp_restore;
2281 slf_restore.op_first = PL_op;
2282
2283 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2284
2285 if (PL_op->op_flags & OPf_STACKED)
2286 {
2287 if (items > slf_arga)
2288 {
2289 slf_arga = items;
2290 free (slf_argv);
2291 slf_argv = malloc (slf_arga * sizeof (SV *));
2292 }
2293
2294 slf_argc = items;
2295
2296 for (i = 0; i < items; ++i)
2297 slf_argv [i] = SvREFCNT_inc (arg [i]);
2298 }
2299 else
2300 slf_argc = 0;
2301
2302 PL_op->op_ppaddr = pp_slf;
2303 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2304
2305 PL_op = (OP *)&slf_restore;
2306}
2307
2308/*****************************************************************************/
2309/* PerlIO::cede */
2310
2311typedef struct
2312{
2313 PerlIOBuf base;
2314 NV next, every;
2315} PerlIOCede;
2316
2317static IV
2318PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2319{
2320 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2321
2322 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2323 self->next = nvtime () + self->every;
2324
2325 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2326}
2327
2328static SV *
2329PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2330{
2331 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2332
2333 return newSVnv (self->every);
2334}
2335
2336static IV
2337PerlIOCede_flush (pTHX_ PerlIO *f)
2338{
2339 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2340 double now = nvtime ();
2341
2342 if (now >= self->next)
2343 {
2344 api_cede (aTHX);
2345 self->next = now + self->every;
2346 }
2347
2348 return PerlIOBuf_flush (aTHX_ f);
2349}
2350
2351static PerlIO_funcs PerlIO_cede =
2352{
2353 sizeof(PerlIO_funcs),
2354 "cede",
2355 sizeof(PerlIOCede),
2356 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2357 PerlIOCede_pushed,
2358 PerlIOBuf_popped,
2359 PerlIOBuf_open,
2360 PerlIOBase_binmode,
2361 PerlIOCede_getarg,
2362 PerlIOBase_fileno,
2363 PerlIOBuf_dup,
2364 PerlIOBuf_read,
2365 PerlIOBuf_unread,
2366 PerlIOBuf_write,
2367 PerlIOBuf_seek,
2368 PerlIOBuf_tell,
2369 PerlIOBuf_close,
2370 PerlIOCede_flush,
2371 PerlIOBuf_fill,
2372 PerlIOBase_eof,
2373 PerlIOBase_error,
2374 PerlIOBase_clearerr,
2375 PerlIOBase_setlinebuf,
2376 PerlIOBuf_get_base,
2377 PerlIOBuf_bufsiz,
2378 PerlIOBuf_get_ptr,
2379 PerlIOBuf_get_cnt,
2380 PerlIOBuf_set_ptrcnt,
2381};
2382
2383/*****************************************************************************/
2384/* Coro::Semaphore & Coro::Signal */
2385
2386static SV *
2387coro_waitarray_new (pTHX_ int count)
2388{
2389 /* a semaphore contains a counter IV in $sem->[0] and any waiters after that */
2390 AV *av = newAV ();
2391 SV **ary;
2392
2393 /* unfortunately, building manually saves memory */
2394 Newx (ary, 2, SV *);
2395 AvALLOC (av) = ary;
2396#if PERL_VERSION_ATLEAST (5,10,0)
2397 AvARRAY (av) = ary;
2398#else
2399 /* 5.8.8 needs this syntax instead of AvARRAY = ary, yet */
2400 /* -DDEBUGGING flags this as a bug, despite it perfectly working */
2401 SvPVX ((SV *)av) = (char *)ary;
2402#endif
2403 AvMAX (av) = 1;
2404 AvFILLp (av) = 0;
2405 ary [0] = newSViv (count);
2406
2407 return newRV_noinc ((SV *)av);
2408}
2409
2410/* semaphore */
2411
2412static void
2413coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2414{
2415 SV *count_sv = AvARRAY (av)[0];
2416 IV count = SvIVX (count_sv);
2417
2418 count += adjust;
2419 SvIVX (count_sv) = count;
2420
2421 /* now wake up as many waiters as are expected to lock */
2422 while (count > 0 && AvFILLp (av) > 0)
2423 {
2424 SV *cb;
2425
2426 /* swap first two elements so we can shift a waiter */
2427 AvARRAY (av)[0] = AvARRAY (av)[1];
2428 AvARRAY (av)[1] = count_sv;
2429 cb = av_shift (av);
2430
2431 if (SvOBJECT (cb))
2432 {
2433 api_ready (aTHX_ cb);
2434 --count;
2435 }
2436 else if (SvTYPE (cb) == SVt_PVCV)
2437 {
2438 dSP;
2439 PUSHMARK (SP);
2440 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2441 PUTBACK;
2442 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2443 }
2444
2445 SvREFCNT_dec (cb);
2446 }
2447}
2448
2449static void
2450coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2451{
2452 /* call $sem->adjust (0) to possibly wake up some other waiters */
2453 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2454}
2455
2456static int
2457slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2458{
2459 AV *av = (AV *)frame->data;
2460 SV *count_sv = AvARRAY (av)[0];
2461
2462 /* if we are about to throw, don't actually acquire the lock, just throw */
2463 if (CORO_THROW)
2464 return 0;
2465 else if (SvIVX (count_sv) > 0)
2466 {
2467 SvSTATE_current->on_destroy = 0;
2468
2469 if (acquire)
2470 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2471 else
2472 coro_semaphore_adjust (aTHX_ av, 0);
2473
2474 return 0;
2475 }
2476 else
2477 {
2478 int i;
2479 /* if we were woken up but can't down, we look through the whole */
2480 /* waiters list and only add us if we aren't in there already */
2481 /* this avoids some degenerate memory usage cases */
2482
2483 for (i = 1; i <= AvFILLp (av); ++i)
2484 if (AvARRAY (av)[i] == SvRV (coro_current))
2485 return 1;
2486
2487 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2488 return 1;
2489 }
2490}
2491
2492static int
2493slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2494{
2495 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2496}
2497
2498static int
2499slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2500{
2501 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2502}
2503
2504static void
2505slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2506{
2507 AV *av = (AV *)SvRV (arg [0]);
2508
2509 if (SvIVX (AvARRAY (av)[0]) > 0)
2510 {
2511 frame->data = (void *)av;
2512 frame->prepare = prepare_nop;
2513 }
2514 else
2515 {
2516 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2517
2518 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2519 frame->prepare = prepare_schedule;
2520
2521 /* to avoid race conditions when a woken-up coro gets terminated */
2522 /* we arrange for a temporary on_destroy that calls adjust (0) */
2523 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2524 }
2525}
2526
2527static void
2528slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2529{
2530 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2531 frame->check = slf_check_semaphore_down;
2532}
2533
2534static void
2535slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2536{
2537 if (items >= 2)
2538 {
2539 /* callback form */
2540 AV *av = (AV *)SvRV (arg [0]);
2541 CV *cb_cv = coro_sv_2cv (aTHX_ arg [1]);
2542
2543 av_push (av, (SV *)SvREFCNT_inc_NN (cb_cv));
2544
2545 if (SvIVX (AvARRAY (av)[0]) > 0)
2546 coro_semaphore_adjust (aTHX_ av, 0);
2547
2548 frame->prepare = prepare_nop;
2549 frame->check = slf_check_nop;
2550 }
2551 else
2552 {
2553 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2554 frame->check = slf_check_semaphore_wait;
2555 }
2556}
2557
2558/* signal */
2559
2560static void
2561coro_signal_wake (pTHX_ AV *av, int count)
2562{
2563 SvIVX (AvARRAY (av)[0]) = 0;
2564
2565 /* now signal count waiters */
2566 while (count > 0 && AvFILLp (av) > 0)
2567 {
2568 SV *cb;
2569
2570 /* swap first two elements so we can shift a waiter */
2571 cb = AvARRAY (av)[0];
2572 AvARRAY (av)[0] = AvARRAY (av)[1];
2573 AvARRAY (av)[1] = cb;
2574
2575 cb = av_shift (av);
2576
2577 api_ready (aTHX_ cb);
2578 sv_setiv (cb, 0); /* signal waiter */
2579 SvREFCNT_dec (cb);
2580
2581 --count;
2582 }
2583}
2584
2585static int
2586slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2587{
2588 /* if we are about to throw, also stop waiting */
2589 return SvROK ((SV *)frame->data) && !CORO_THROW;
2590}
2591
2592static void
2593slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2594{
2595 AV *av = (AV *)SvRV (arg [0]);
2596
2597 if (SvIVX (AvARRAY (av)[0]))
2598 {
2599 SvIVX (AvARRAY (av)[0]) = 0;
2600 frame->prepare = prepare_nop;
2601 frame->check = slf_check_nop;
2602 }
2603 else
2604 {
2605 SV *waiter = newRV_inc (SvRV (coro_current)); /* owned by signal av */
2606
2607 av_push (av, waiter);
2608
2609 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2610 frame->prepare = prepare_schedule;
2611 frame->check = slf_check_signal_wait;
2612 }
2613}
2614
2615/*****************************************************************************/
2616/* Coro::AIO */
2617
2618#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2619
2620/* helper storage struct */
2621struct io_state
2622{
2623 int errorno;
2624 I32 laststype; /* U16 in 5.10.0 */
2625 int laststatval;
2626 Stat_t statcache;
2627};
2628
2629static void
2630coro_aio_callback (pTHX_ CV *cv)
2631{
2632 dXSARGS;
2633 AV *state = (AV *)GENSUB_ARG;
2634 SV *coro = av_pop (state);
2635 SV *data_sv = newSV (sizeof (struct io_state));
2636
2637 av_extend (state, items - 1);
2638
2639 sv_upgrade (data_sv, SVt_PV);
2640 SvCUR_set (data_sv, sizeof (struct io_state));
2641 SvPOK_only (data_sv);
2642
2643 {
2644 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2645
2646 data->errorno = errno;
2647 data->laststype = PL_laststype;
2648 data->laststatval = PL_laststatval;
2649 data->statcache = PL_statcache;
2650 }
2651
2652 /* now build the result vector out of all the parameters and the data_sv */
2653 {
2654 int i;
2655
2656 for (i = 0; i < items; ++i)
2657 av_push (state, SvREFCNT_inc_NN (ST (i)));
2658 }
2659
2660 av_push (state, data_sv);
2661
2662 api_ready (aTHX_ coro);
2663 SvREFCNT_dec (coro);
2664 SvREFCNT_dec ((AV *)state);
2665}
2666
2667static int
2668slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2669{
2670 AV *state = (AV *)frame->data;
2671
2672 /* if we are about to throw, return early */
2673 /* this does not cancel the aio request, but at least */
2674 /* it quickly returns */
2675 if (CORO_THROW)
2676 return 0;
2677
2678 /* one element that is an RV? repeat! */
2679 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2680 return 1;
2681
2682 /* restore status */
2683 {
2684 SV *data_sv = av_pop (state);
2685 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2686
2687 errno = data->errorno;
2688 PL_laststype = data->laststype;
2689 PL_laststatval = data->laststatval;
2690 PL_statcache = data->statcache;
2691
2692 SvREFCNT_dec (data_sv);
2693 }
2694
2695 /* push result values */
2696 {
2697 dSP;
2698 int i;
2699
2700 EXTEND (SP, AvFILLp (state) + 1);
2701 for (i = 0; i <= AvFILLp (state); ++i)
2702 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2703
2704 PUTBACK;
2705 }
2706
2707 return 0;
2708}
2709
2710static void
2711slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2712{
2713 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2714 SV *coro_hv = SvRV (coro_current);
2715 struct coro *coro = SvSTATE_hv (coro_hv);
2716
2717 /* put our coroutine id on the state arg */
2718 av_push (state, SvREFCNT_inc_NN (coro_hv));
2719
2720 /* first see whether we have a non-zero priority and set it as AIO prio */
2721 if (coro->prio)
2722 {
2723 dSP;
2724
2725 static SV *prio_cv;
2726 static SV *prio_sv;
2727
2728 if (expect_false (!prio_cv))
2729 {
2730 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2731 prio_sv = newSViv (0);
2732 }
2733
2734 PUSHMARK (SP);
2735 sv_setiv (prio_sv, coro->prio);
2736 XPUSHs (prio_sv);
2737
2738 PUTBACK;
2739 call_sv (prio_cv, G_VOID | G_DISCARD);
2740 }
2741
2742 /* now call the original request */
2743 {
2744 dSP;
2745 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2746 int i;
2747
2748 PUSHMARK (SP);
2749
2750 /* first push all args to the stack */
2751 EXTEND (SP, items + 1);
2752
2753 for (i = 0; i < items; ++i)
2754 PUSHs (arg [i]);
2755
2756 /* now push the callback closure */
2757 PUSHs (sv_2mortal (gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2758
2759 /* now call the AIO function - we assume our request is uncancelable */
2760 PUTBACK;
2761 call_sv ((SV *)req, G_VOID | G_DISCARD);
2762 }
2763
2764 /* now that the requets is going, we loop toll we have a result */
2765 frame->data = (void *)state;
2766 frame->prepare = prepare_schedule;
2767 frame->check = slf_check_aio_req;
2768}
2769
2770static void
2771coro_aio_req_xs (pTHX_ CV *cv)
2772{
2773 dXSARGS;
2774
2775 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2776
2777 XSRETURN_EMPTY;
2778}
2779
2780/*****************************************************************************/
2781
2782#if CORO_CLONE
2783# include "clone.c"
2784#endif
2785
2786MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2787
2788PROTOTYPES: DISABLE
2789
2790BOOT:
2791{
2792#ifdef USE_ITHREADS
2793# if CORO_PTHREAD
2794 coro_thx = PERL_GET_CONTEXT;
2795# endif
2796#endif
2797 BOOT_PAGESIZE;
2798
2799 cctx_current = cctx_new_empty ();
2800
2801 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2802 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2803
2804 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2805 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2806 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
2807
2808 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
2809 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
2810 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
2811
2812 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
2813
2814 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
2815 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
2816 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
2817 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
2818
2819 main_mainstack = PL_mainstack;
2820 main_top_env = PL_top_env;
2821
2822 while (main_top_env->je_prev)
2823 main_top_env = main_top_env->je_prev;
2824
2825 {
2826 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2827
2828 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2829 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
2830
2831 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2832 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
2833 }
2834
2835 coroapi.ver = CORO_API_VERSION;
2836 coroapi.rev = CORO_API_REVISION;
2837
2838 coroapi.transfer = api_transfer;
2839
2840 coroapi.sv_state = SvSTATE_;
2841 coroapi.execute_slf = api_execute_slf;
2842 coroapi.prepare_nop = prepare_nop;
2843 coroapi.prepare_schedule = prepare_schedule;
2844 coroapi.prepare_cede = prepare_cede;
2845 coroapi.prepare_cede_notself = prepare_cede_notself;
2846
2847 {
2848 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
2849
2850 if (!svp) croak ("Time::HiRes is required");
2851 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
2852
2853 nvtime = INT2PTR (double (*)(), SvIV (*svp));
2854 }
2855
2856 assert (("PRIO_NORMAL must be 0", !PRIO_NORMAL));
2857}
2858
2859SV *
2860new (char *klass, ...)
2861 ALIAS:
2862 Coro::new = 1
2863 CODE:
2864{
2865 struct coro *coro;
2866 MAGIC *mg;
2867 HV *hv;
2868 CV *cb;
2869 int i;
2870
2871 if (items > 1)
2872 {
2873 cb = coro_sv_2cv (aTHX_ ST (1));
2874
2875 if (!ix)
235 { 2876 {
236 /* I never used formats, so how should I know how these are implemented? */ 2877 if (CvISXSUB (cb))
237 /* my bold guess is as a simple, plain sub... */ 2878 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
238 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 2879
2880 if (!CvROOT (cb))
2881 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
239 } 2882 }
240 } 2883 }
241 2884
242 if (top_si->si_type == PERLSI_MAIN)
243 break;
244
245 top_si = top_si->si_prev;
246 ccstk = top_si->si_cxstack;
247 cxix = top_si->si_cxix;
248 }
249
250 PUTBACK;
251 }
252
253 c->dowarn = PL_dowarn;
254 c->defav = GvAV (PL_defgv);
255 c->curstackinfo = PL_curstackinfo;
256 c->curstack = PL_curstack;
257 c->mainstack = PL_mainstack;
258 c->stack_sp = PL_stack_sp;
259 c->op = PL_op;
260 c->curpad = PL_curpad;
261 c->stack_base = PL_stack_base;
262 c->stack_max = PL_stack_max;
263 c->tmps_stack = PL_tmps_stack;
264 c->tmps_floor = PL_tmps_floor;
265 c->tmps_ix = PL_tmps_ix;
266 c->tmps_max = PL_tmps_max;
267 c->markstack = PL_markstack;
268 c->markstack_ptr = PL_markstack_ptr;
269 c->markstack_max = PL_markstack_max;
270 c->scopestack = PL_scopestack;
271 c->scopestack_ix = PL_scopestack_ix;
272 c->scopestack_max = PL_scopestack_max;
273 c->savestack = PL_savestack;
274 c->savestack_ix = PL_savestack_ix;
275 c->savestack_max = PL_savestack_max;
276 c->retstack = PL_retstack;
277 c->retstack_ix = PL_retstack_ix;
278 c->retstack_max = PL_retstack_max;
279 c->curcop = PL_curcop;
280}
281
282#define LOAD(state) do { load_state(aTHX_ state); SPAGAIN; } while (0)
283#define SAVE(state) do { PUTBACK; save_state(aTHX_ state); } while (0)
284
285static void
286load_state(pTHX_ Coro__State c)
287{
288 PL_dowarn = c->dowarn;
289 GvAV (PL_defgv) = c->defav;
290 PL_curstackinfo = c->curstackinfo;
291 PL_curstack = c->curstack;
292 PL_mainstack = c->mainstack;
293 PL_stack_sp = c->stack_sp;
294 PL_op = c->op;
295 PL_curpad = c->curpad;
296 PL_stack_base = c->stack_base;
297 PL_stack_max = c->stack_max;
298 PL_tmps_stack = c->tmps_stack;
299 PL_tmps_floor = c->tmps_floor;
300 PL_tmps_ix = c->tmps_ix;
301 PL_tmps_max = c->tmps_max;
302 PL_markstack = c->markstack;
303 PL_markstack_ptr = c->markstack_ptr;
304 PL_markstack_max = c->markstack_max;
305 PL_scopestack = c->scopestack;
306 PL_scopestack_ix = c->scopestack_ix;
307 PL_scopestack_max = c->scopestack_max;
308 PL_savestack = c->savestack;
309 PL_savestack_ix = c->savestack_ix;
310 PL_savestack_max = c->savestack_max;
311 PL_retstack = c->retstack;
312 PL_retstack_ix = c->retstack_ix;
313 PL_retstack_max = c->retstack_max;
314 PL_curcop = c->curcop;
315
316 {
317 dSP;
318 CV *cv;
319
320 /* now do the ugly restore mess */
321 while ((cv = (CV *)POPs))
322 {
323 AV *padlist = (AV *)POPs;
324
325 put_padlist (cv);
326 CvPADLIST(cv) = padlist;
327 CvDEPTH(cv) = (I32)POPs;
328
329#ifdef USE_THREADS
330 CvOWNER(cv) = (struct perl_thread *)POPs;
331 error does not work either
332#endif
333 }
334
335 PUTBACK;
336 }
337}
338
339/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
340STATIC void
341destroy_stacks(pTHX)
342{
343 /* die does this while calling POPSTACK, but I just don't see why. */
344 /* OTOH, die does not have a memleak, but we do... */
345 dounwind(-1);
346
347 /* is this ugly, I ask? */
348 while (PL_scopestack_ix)
349 LEAVE;
350
351 while (PL_curstackinfo->si_next)
352 PL_curstackinfo = PL_curstackinfo->si_next;
353
354 while (PL_curstackinfo)
355 {
356 PERL_SI *p = PL_curstackinfo->si_prev;
357
358 SvREFCNT_dec(PL_curstackinfo->si_stack);
359 Safefree(PL_curstackinfo->si_cxstack);
360 Safefree(PL_curstackinfo);
361 PL_curstackinfo = p;
362 }
363
364 if (PL_scopestack_ix != 0)
365 Perl_warner(aTHX_ WARN_INTERNAL,
366 "Unbalanced scopes: %ld more ENTERs than LEAVEs\n",
367 (long)PL_scopestack_ix);
368 if (PL_savestack_ix != 0)
369 Perl_warner(aTHX_ WARN_INTERNAL,
370 "Unbalanced saves: %ld more saves than restores\n",
371 (long)PL_savestack_ix);
372 if (PL_tmps_floor != -1)
373 Perl_warner(aTHX_ WARN_INTERNAL,"Unbalanced tmps: %ld more allocs than frees\n",
374 (long)PL_tmps_floor + 1);
375 /*
376 */
377 Safefree(PL_tmps_stack);
378 Safefree(PL_markstack);
379 Safefree(PL_scopestack);
380 Safefree(PL_savestack);
381 Safefree(PL_retstack);
382}
383
384#define SUB_INIT "Coro::State::_newcoro"
385
386MODULE = Coro::State PACKAGE = Coro::State
387
388PROTOTYPES: ENABLE
389
390BOOT:
391 if (!padlist_cache)
392 padlist_cache = newHV ();
393
394Coro::State
395_newprocess(args)
396 SV * args
397 PROTOTYPE: $
398 CODE:
399 Coro__State coro;
400
401 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV)
402 croak ("Coro::State::newprocess expects an arrayref");
403
404 New (0, coro, 1, struct coro); 2885 Newz (0, coro, 1, struct coro);
2886 coro->args = newAV ();
2887 coro->flags = CF_NEW;
405 2888
406 coro->mainstack = 0; /* actual work is done inside transfer */ 2889 if (coro_first) coro_first->prev = coro;
407 coro->args = (AV *)SvREFCNT_inc (SvRV (args)); 2890 coro->next = coro_first;
2891 coro_first = coro;
408 2892
409 RETVAL = coro; 2893 coro->hv = hv = newHV ();
2894 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
2895 mg->mg_flags |= MGf_DUP;
2896 RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1));
2897
2898 if (items > 1)
2899 {
2900 av_extend (coro->args, items - 1 + ix - 1);
2901
2902 if (ix)
2903 {
2904 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
2905 cb = cv_coro_run;
2906 }
2907
2908 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
2909
2910 for (i = 2; i < items; i++)
2911 av_push (coro->args, newSVsv (ST (i)));
2912 }
2913}
410 OUTPUT: 2914 OUTPUT:
411 RETVAL 2915 RETVAL
412 2916
413void 2917void
414transfer(prev,next) 2918transfer (...)
415 Coro::State_or_hashref prev 2919 PROTOTYPE: $$
416 Coro::State_or_hashref next 2920 CODE:
417 CODE: 2921 CORO_EXECUTE_SLF_XS (slf_init_transfer);
418 2922
419 if (prev != next) 2923bool
2924_destroy (SV *coro_sv)
2925 CODE:
2926 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
2927 OUTPUT:
2928 RETVAL
2929
2930void
2931_exit (int code)
2932 PROTOTYPE: $
2933 CODE:
2934 _exit (code);
2935
2936SV *
2937clone (Coro::State coro)
2938 CODE:
2939{
2940#if CORO_CLONE
2941 struct coro *ncoro = coro_clone (aTHX_ coro);
2942 MAGIC *mg;
2943 /* TODO: too much duplication */
2944 ncoro->hv = newHV ();
2945 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
2946 mg->mg_flags |= MGf_DUP;
2947 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
2948#else
2949 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
2950#endif
2951}
2952 OUTPUT:
2953 RETVAL
2954
2955int
2956cctx_stacksize (int new_stacksize = 0)
2957 PROTOTYPE: ;$
2958 CODE:
2959 RETVAL = cctx_stacksize;
2960 if (new_stacksize)
420 { 2961 {
2962 cctx_stacksize = new_stacksize;
2963 ++cctx_gen;
421 /* 2964 }
422 * this could be done in newprocess which would lead to 2965 OUTPUT:
423 * extremely elegant and fast (just SAVE/LOAD) 2966 RETVAL
424 * code here, but lazy allocation of stacks has also 2967
425 * some virtues and the overhead of the if() is nil. 2968int
2969cctx_max_idle (int max_idle = 0)
2970 PROTOTYPE: ;$
2971 CODE:
2972 RETVAL = cctx_max_idle;
2973 if (max_idle > 1)
2974 cctx_max_idle = max_idle;
2975 OUTPUT:
2976 RETVAL
2977
2978int
2979cctx_count ()
2980 PROTOTYPE:
2981 CODE:
2982 RETVAL = cctx_count;
2983 OUTPUT:
2984 RETVAL
2985
2986int
2987cctx_idle ()
2988 PROTOTYPE:
2989 CODE:
2990 RETVAL = cctx_idle;
2991 OUTPUT:
2992 RETVAL
2993
2994void
2995list ()
2996 PROTOTYPE:
2997 PPCODE:
2998{
2999 struct coro *coro;
3000 for (coro = coro_first; coro; coro = coro->next)
3001 if (coro->hv)
3002 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
3003}
3004
3005void
3006call (Coro::State coro, SV *coderef)
3007 ALIAS:
3008 eval = 1
3009 CODE:
3010{
3011 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
426 */ 3012 {
427 if (next->mainstack) 3013 struct coro temp;
3014
3015 if (!(coro->flags & CF_RUNNING))
428 { 3016 {
429 SAVE (prev); 3017 PUTBACK;
430 LOAD (next); 3018 save_perl (aTHX_ &temp);
431 /* mark this state as in-use */ 3019 load_perl (aTHX_ coro);
432 next->mainstack = 0;
433 next->tmps_ix = -2;
434 } 3020 }
435 else if (next->tmps_ix == -2) 3021
3022 {
3023 dSP;
3024 ENTER;
3025 SAVETMPS;
3026 PUTBACK;
3027 PUSHSTACK;
3028 PUSHMARK (SP);
3029
3030 if (ix)
3031 eval_sv (coderef, 0);
3032 else
3033 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3034
3035 POPSTACK;
3036 SPAGAIN;
3037 FREETMPS;
3038 LEAVE;
3039 PUTBACK;
3040 }
3041
3042 if (!(coro->flags & CF_RUNNING))
436 { 3043 {
437 croak ("tried to transfer to running coroutine"); 3044 save_perl (aTHX_ coro);
438 } 3045 load_perl (aTHX_ &temp);
439 else
440 {
441 SAVE (prev);
442
443 /*
444 * emulate part of the perl startup here.
445 */
446 UNOP myop;
447
448 init_stacks (); /* from perl.c */
449 PL_op = (OP *)&myop;
450 /*PL_curcop = 0;*/
451 GvAV (PL_defgv) = (AV *)SvREFCNT_inc ((SV *)next->args);
452
453 SPAGAIN; 3046 SPAGAIN;
454 Zero(&myop, 1, UNOP);
455 myop.op_next = Nullop;
456 myop.op_flags = OPf_WANT_VOID;
457
458 PUSHMARK(SP);
459 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
460 PUTBACK;
461 /*
462 * the next line is slightly wrong, as PL_op->op_next
463 * is actually being executed so we skip the first op.
464 * that doesn't matter, though, since it is only
465 * pp_nextstate and we never return...
466 */
467 PL_op = Perl_pp_entersub(aTHX);
468 SPAGAIN;
469
470 ENTER;
471 } 3047 }
472 } 3048 }
3049}
3050
3051SV *
3052is_ready (Coro::State coro)
3053 PROTOTYPE: $
3054 ALIAS:
3055 is_ready = CF_READY
3056 is_running = CF_RUNNING
3057 is_new = CF_NEW
3058 is_destroyed = CF_DESTROYED
3059 CODE:
3060 RETVAL = boolSV (coro->flags & ix);
3061 OUTPUT:
3062 RETVAL
473 3063
474void 3064void
475DESTROY(coro) 3065throw (Coro::State self, SV *throw = &PL_sv_undef)
476 Coro::State coro 3066 PROTOTYPE: $;$
477 CODE: 3067 CODE:
3068{
3069 struct coro *current = SvSTATE_current;
3070 SV **throwp = self == current ? &CORO_THROW : &self->except;
3071 SvREFCNT_dec (*throwp);
3072 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
3073}
478 3074
479 if (coro->mainstack) 3075void
3076api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3077 PROTOTYPE: $;$
3078 C_ARGS: aTHX_ coro, flags
3079
3080SV *
3081has_cctx (Coro::State coro)
3082 PROTOTYPE: $
3083 CODE:
3084 /* maybe manage the running flag differently */
3085 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3086 OUTPUT:
3087 RETVAL
3088
3089int
3090is_traced (Coro::State coro)
3091 PROTOTYPE: $
3092 CODE:
3093 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3094 OUTPUT:
3095 RETVAL
3096
3097UV
3098rss (Coro::State coro)
3099 PROTOTYPE: $
3100 ALIAS:
3101 usecount = 1
3102 CODE:
3103 switch (ix)
3104 {
3105 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3106 case 1: RETVAL = coro->usecount; break;
3107 }
3108 OUTPUT:
3109 RETVAL
3110
3111void
3112force_cctx ()
3113 PROTOTYPE:
3114 CODE:
3115 cctx_current->idle_sp = 0;
3116
3117void
3118swap_defsv (Coro::State self)
3119 PROTOTYPE: $
3120 ALIAS:
3121 swap_defav = 1
3122 CODE:
3123 if (!self->slot)
3124 croak ("cannot swap state with coroutine that has no saved state,");
3125 else
480 { 3126 {
481 struct coro temp; 3127 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3128 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
482 3129
483 SAVE(aTHX_ (&temp)); 3130 SV *tmp = *src; *src = *dst; *dst = tmp;
484 LOAD(aTHX_ coro);
485
486 destroy_stacks ();
487 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
488
489 LOAD((&temp));
490 } 3131 }
491 3132
3133
3134MODULE = Coro::State PACKAGE = Coro
3135
3136BOOT:
3137{
3138 int i;
3139
3140 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3141 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3142 cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD);
3143 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD);
3144 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3145 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3146 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3147 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3148 sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE);
3149
3150 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3151 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3152 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3153 cv_coro_state_new = get_cv ("Coro::State::new", 0); SvREADONLY_on (cv_coro_state_new);
3154
3155 coro_stash = gv_stashpv ("Coro", TRUE);
3156
3157 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (PRIO_MAX));
3158 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (PRIO_HIGH));
3159 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (PRIO_NORMAL));
3160 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (PRIO_LOW));
3161 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (PRIO_IDLE));
3162 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (PRIO_MIN));
3163
3164 for (i = PRIO_MAX - PRIO_MIN + 1; i--; )
3165 coro_ready[i] = newAV ();
3166
3167 {
3168 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3169
3170 coroapi.schedule = api_schedule;
3171 coroapi.schedule_to = api_schedule_to;
3172 coroapi.cede = api_cede;
3173 coroapi.cede_notself = api_cede_notself;
3174 coroapi.ready = api_ready;
3175 coroapi.is_ready = api_is_ready;
3176 coroapi.nready = coro_nready;
3177 coroapi.current = coro_current;
3178
3179 /*GCoroAPI = &coroapi;*/
3180 sv_setiv (sv, (IV)&coroapi);
3181 SvREADONLY_on (sv);
3182 }
3183}
3184
3185void
3186terminate (...)
3187 CODE:
3188 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3189
3190void
3191schedule (...)
3192 CODE:
3193 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3194
3195void
3196schedule_to (...)
3197 CODE:
3198 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3199
3200void
3201cede_to (...)
3202 CODE:
3203 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3204
3205void
3206cede (...)
3207 CODE:
3208 CORO_EXECUTE_SLF_XS (slf_init_cede);
3209
3210void
3211cede_notself (...)
3212 CODE:
3213 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3214
3215void
3216_cancel (Coro::State self)
3217 CODE:
3218 coro_state_destroy (aTHX_ self);
3219 coro_call_on_destroy (aTHX_ self);
3220
3221void
3222_set_current (SV *current)
3223 PROTOTYPE: $
3224 CODE:
3225 SvREFCNT_dec (SvRV (coro_current));
3226 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3227
3228void
3229_set_readyhook (SV *hook)
3230 PROTOTYPE: $
3231 CODE:
492 SvREFCNT_dec (coro->args); 3232 SvREFCNT_dec (coro_readyhook);
493 Safefree (coro); 3233 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
494 3234
3235int
3236prio (Coro::State coro, int newprio = 0)
3237 PROTOTYPE: $;$
3238 ALIAS:
3239 nice = 1
3240 CODE:
3241{
3242 RETVAL = coro->prio;
495 3243
3244 if (items > 1)
3245 {
3246 if (ix)
3247 newprio = coro->prio - newprio;
3248
3249 if (newprio < PRIO_MIN) newprio = PRIO_MIN;
3250 if (newprio > PRIO_MAX) newprio = PRIO_MAX;
3251
3252 coro->prio = newprio;
3253 }
3254}
3255 OUTPUT:
3256 RETVAL
3257
3258SV *
3259ready (SV *self)
3260 PROTOTYPE: $
3261 CODE:
3262 RETVAL = boolSV (api_ready (aTHX_ self));
3263 OUTPUT:
3264 RETVAL
3265
3266int
3267nready (...)
3268 PROTOTYPE:
3269 CODE:
3270 RETVAL = coro_nready;
3271 OUTPUT:
3272 RETVAL
3273
3274void
3275_pool_handler (...)
3276 CODE:
3277 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3278
3279void
3280async_pool (SV *cv, ...)
3281 PROTOTYPE: &@
3282 PPCODE:
3283{
3284 HV *hv = (HV *)av_pop (av_async_pool);
3285 AV *av = newAV ();
3286 SV *cb = ST (0);
3287 int i;
3288
3289 av_extend (av, items - 2);
3290 for (i = 1; i < items; ++i)
3291 av_push (av, SvREFCNT_inc_NN (ST (i)));
3292
3293 if ((SV *)hv == &PL_sv_undef)
3294 {
3295 PUSHMARK (SP);
3296 EXTEND (SP, 2);
3297 PUSHs (sv_Coro);
3298 PUSHs ((SV *)cv_pool_handler);
3299 PUTBACK;
3300 call_sv ((SV *)cv_coro_state_new, G_SCALAR);
3301 SPAGAIN;
3302
3303 hv = (HV *)SvREFCNT_inc_NN (SvRV (POPs));
3304 }
3305
3306 {
3307 struct coro *coro = SvSTATE_hv (hv);
3308
3309 assert (!coro->invoke_cb);
3310 assert (!coro->invoke_av);
3311 coro->invoke_cb = SvREFCNT_inc (cb);
3312 coro->invoke_av = av;
3313 }
3314
3315 api_ready (aTHX_ (SV *)hv);
3316
3317 if (GIMME_V != G_VOID)
3318 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
3319 else
3320 SvREFCNT_dec (hv);
3321}
3322
3323SV *
3324rouse_cb ()
3325 PROTOTYPE:
3326 CODE:
3327 RETVAL = coro_new_rouse_cb (aTHX);
3328 OUTPUT:
3329 RETVAL
3330
3331void
3332rouse_wait (...)
3333 PROTOTYPE: ;$
3334 PPCODE:
3335 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
3336
3337
3338MODULE = Coro::State PACKAGE = PerlIO::cede
3339
3340BOOT:
3341 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3342
3343
3344MODULE = Coro::State PACKAGE = Coro::Semaphore
3345
3346SV *
3347new (SV *klass, SV *count = 0)
3348 CODE:
3349 RETVAL = sv_bless (
3350 coro_waitarray_new (aTHX_ count && SvOK (count) ? SvIV (count) : 1),
3351 GvSTASH (CvGV (cv))
3352 );
3353 OUTPUT:
3354 RETVAL
3355
3356# helper for Coro::Channel
3357SV *
3358_alloc (int count)
3359 CODE:
3360 RETVAL = coro_waitarray_new (aTHX_ count);
3361 OUTPUT:
3362 RETVAL
3363
3364SV *
3365count (SV *self)
3366 CODE:
3367 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3368 OUTPUT:
3369 RETVAL
3370
3371void
3372up (SV *self, int adjust = 1)
3373 ALIAS:
3374 adjust = 1
3375 CODE:
3376 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3377
3378void
3379down (...)
3380 CODE:
3381 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3382
3383void
3384wait (...)
3385 CODE:
3386 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3387
3388void
3389try (SV *self)
3390 PPCODE:
3391{
3392 AV *av = (AV *)SvRV (self);
3393 SV *count_sv = AvARRAY (av)[0];
3394 IV count = SvIVX (count_sv);
3395
3396 if (count > 0)
3397 {
3398 --count;
3399 SvIVX (count_sv) = count;
3400 XSRETURN_YES;
3401 }
3402 else
3403 XSRETURN_NO;
3404}
3405
3406void
3407waiters (SV *self)
3408 PPCODE:
3409{
3410 AV *av = (AV *)SvRV (self);
3411 int wcount = AvFILLp (av) + 1 - 1;
3412
3413 if (GIMME_V == G_SCALAR)
3414 XPUSHs (sv_2mortal (newSViv (wcount)));
3415 else
3416 {
3417 int i;
3418 EXTEND (SP, wcount);
3419 for (i = 1; i <= wcount; ++i)
3420 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3421 }
3422}
3423
3424MODULE = Coro::State PACKAGE = Coro::Signal
3425
3426SV *
3427new (SV *klass)
3428 CODE:
3429 RETVAL = sv_bless (
3430 coro_waitarray_new (aTHX_ 0),
3431 GvSTASH (CvGV (cv))
3432 );
3433 OUTPUT:
3434 RETVAL
3435
3436void
3437wait (...)
3438 CODE:
3439 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
3440
3441void
3442broadcast (SV *self)
3443 CODE:
3444{
3445 AV *av = (AV *)SvRV (self);
3446 coro_signal_wake (aTHX_ av, AvFILLp (av));
3447}
3448
3449void
3450send (SV *self)
3451 CODE:
3452{
3453 AV *av = (AV *)SvRV (self);
3454
3455 if (AvFILLp (av))
3456 coro_signal_wake (aTHX_ av, 1);
3457 else
3458 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3459}
3460
3461IV
3462awaited (SV *self)
3463 CODE:
3464 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3465 OUTPUT:
3466 RETVAL
3467
3468
3469MODULE = Coro::State PACKAGE = Coro::AnyEvent
3470
3471BOOT:
3472 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3473
3474void
3475_schedule (...)
3476 CODE:
3477{
3478 static int incede;
3479
3480 api_cede_notself (aTHX);
3481
3482 ++incede;
3483 while (coro_nready >= incede && api_cede (aTHX))
3484 ;
3485
3486 sv_setsv (sv_activity, &PL_sv_undef);
3487 if (coro_nready >= incede)
3488 {
3489 PUSHMARK (SP);
3490 PUTBACK;
3491 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3492 }
3493
3494 --incede;
3495}
3496
3497
3498MODULE = Coro::State PACKAGE = Coro::AIO
3499
3500void
3501_register (char *target, char *proto, SV *req)
3502 CODE:
3503{
3504 CV *req_cv = coro_sv_2cv (aTHX_ req);
3505 /* newXSproto doesn't return the CV on 5.8 */
3506 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3507 sv_setpv ((SV *)slf_cv, proto);
3508 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3509}
3510

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines