ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.3 by root, Tue Jul 17 00:24:15 2001 UTC vs.
Revision 1.338 by root, Sun Dec 7 15:33:21 2008 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "patchlevel.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
24#endif
25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
9#endif 35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
10 48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#define PERL_VERSION_ATLEAST(a,b,c) \
57 (PERL_REVISION > (a) \
58 || (PERL_REVISION == (a) \
59 && (PERL_VERSION > (b) \
60 || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))
61
62#if !PERL_VERSION_ATLEAST (5,6,0)
63# ifndef PL_ppaddr
64# define PL_ppaddr ppaddr
65# endif
66# ifndef call_sv
67# define call_sv perl_call_sv
68# endif
69# ifndef get_sv
70# define get_sv perl_get_sv
71# endif
72# ifndef get_cv
73# define get_cv perl_get_cv
74# endif
75# ifndef IS_PADGV
76# define IS_PADGV(v) 0
77# endif
78# ifndef IS_PADCONST
79# define IS_PADCONST(v) 0
80# endif
81#endif
82
83/* 5.11 */
84#ifndef CxHASARGS
85# define CxHASARGS(cx) (cx)->blk_sub.hasargs
86#endif
87
88/* 5.10.0 */
89#ifndef SvREFCNT_inc_NN
90# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
91#endif
92
93/* 5.8.8 */
94#ifndef GV_NOTQUAL
95# define GV_NOTQUAL 0
96#endif
97#ifndef newSV
98# define newSV(l) NEWSV(0,l)
99#endif
100#ifndef CvISXSUB_on
101# define CvISXSUB_on(cv) (void)cv
102#endif
103#ifndef CvISXSUB
104# define CvISXSUB(cv) (CvXSUB (cv) ? TRUE : FALSE)
105#endif
106#ifndef Newx
107# define Newx(ptr,nitems,type) New (0,ptr,nitems,type)
108#endif
109
110/* 5.8.7 */
111#ifndef SvRV_set
112# define SvRV_set(s,v) SvRV(s) = (v)
113#endif
114
115#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
116# undef CORO_STACKGUARD
117#endif
118
119#ifndef CORO_STACKGUARD
120# define CORO_STACKGUARD 0
121#endif
122
123/* prefer perl internal functions over our own? */
124#ifndef CORO_PREFER_PERL_FUNCTIONS
125# define CORO_PREFER_PERL_FUNCTIONS 0
126#endif
127
128/* The next macros try to return the current stack pointer, in an as
129 * portable way as possible. */
130#if __GNUC__ >= 4
131# define dSTACKLEVEL int stacklevel_dummy
132# define STACKLEVEL __builtin_frame_address (0)
133#else
134# define dSTACKLEVEL volatile void *stacklevel
135# define STACKLEVEL ((void *)&stacklevel)
136#endif
137
138#define IN_DESTRUCT PL_dirty
139
140#if __GNUC__ >= 3
141# define attribute(x) __attribute__(x)
142# define expect(expr,value) __builtin_expect ((expr),(value))
143# define INLINE static inline
144#else
145# define attribute(x)
146# define expect(expr,value) (expr)
147# define INLINE static
148#endif
149
150#define expect_false(expr) expect ((expr) != 0, 0)
151#define expect_true(expr) expect ((expr) != 0, 1)
152
153#define NOINLINE attribute ((noinline))
154
155#include "CoroAPI.h"
156#define GCoroAPI (&coroapi) /* very sneaky */
157
158#ifdef USE_ITHREADS
159# if CORO_PTHREAD
160static void *coro_thx;
161# endif
162#endif
163
164static double (*nvtime)(); /* so why doesn't it take void? */
165
166/* we hijack an hopefully unused CV flag for our purposes */
167#define CVf_SLF 0x4000
168static OP *pp_slf (pTHX);
169
170static U32 cctx_gen;
171static size_t cctx_stacksize = CORO_STACKSIZE;
172static struct CoroAPI coroapi;
173static AV *main_mainstack; /* used to differentiate between $main and others */
174static JMPENV *main_top_env;
175static HV *coro_state_stash, *coro_stash;
176static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
177
178static AV *av_destroy; /* destruction queue */
179static SV *sv_manager; /* the manager coro */
180static SV *sv_idle; /* $Coro::idle */
181
182static GV *irsgv; /* $/ */
183static GV *stdoutgv; /* *STDOUT */
184static SV *rv_diehook;
185static SV *rv_warnhook;
186static HV *hv_sig; /* %SIG */
187
188/* async_pool helper stuff */
189static SV *sv_pool_rss;
190static SV *sv_pool_size;
191static SV *sv_async_pool_idle; /* description string */
192static AV *av_async_pool; /* idle pool */
193static SV *sv_Coro; /* class string */
194static CV *cv_pool_handler;
195static CV *cv_coro_state_new;
196
197/* Coro::AnyEvent */
198static SV *sv_activity;
199
200static struct coro_cctx *cctx_first;
201static int cctx_count, cctx_idle;
202
203enum {
204 CC_MAPPED = 0x01,
205 CC_NOREUSE = 0x02, /* throw this away after tracing */
206 CC_TRACE = 0x04,
207 CC_TRACE_SUB = 0x08, /* trace sub calls */
208 CC_TRACE_LINE = 0x10, /* trace each statement */
209 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
210};
211
212/* this is a structure representing a c-level coroutine */
213typedef struct coro_cctx
214{
215 struct coro_cctx *next;
216
217 /* the stack */
218 void *sptr;
219 size_t ssize;
220
221 /* cpu state */
222 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
223 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
224 JMPENV *top_env;
225 coro_context cctx;
226
227 U32 gen;
228#if CORO_USE_VALGRIND
229 int valgrind_id;
230#endif
231 unsigned char flags;
232} coro_cctx;
233
234coro_cctx *cctx_current; /* the currently running cctx */
235
236/*****************************************************************************/
237
238enum {
239 CF_RUNNING = 0x0001, /* coroutine is running */
240 CF_READY = 0x0002, /* coroutine is ready */
241 CF_NEW = 0x0004, /* has never been switched to */
242 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
243};
244
245/* the structure where most of the perl state is stored, overlaid on the cxstack */
246typedef struct
247{
248 SV *defsv;
249 AV *defav;
250 SV *errsv;
251 SV *irsgv;
252 HV *hinthv;
253#define VAR(name,type) type name;
254# include "state.h"
255#undef VAR
256} perl_slots;
257
258#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
259
260/* this is a structure representing a perl-level coroutine */
11struct coro { 261struct coro {
12 U8 dowarn; 262 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 263 coro_cctx *cctx;
14 264
15 PERL_SI *curstackinfo; 265 /* state data */
16 AV *curstack; 266 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 267 AV *mainstack;
18 SV **stack_sp; 268 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 269
41 AV *args; 270 CV *startcv; /* the CV to execute */
271 AV *args; /* data associated with this coroutine (initial args) */
272 int refcnt; /* coroutines are refcounted, yes */
273 int flags; /* CF_ flags */
274 HV *hv; /* the perl hash associated with this coro, if any */
275 void (*on_destroy)(pTHX_ struct coro *coro);
276
277 /* statistics */
278 int usecount; /* number of transfers to this coro */
279
280 /* coro process data */
281 int prio;
282 SV *except; /* exception to be thrown */
283 SV *rouse_cb;
284
285 /* async_pool */
286 SV *saved_deffh;
287 SV *invoke_cb;
288 AV *invoke_av;
289
290 /* linked list */
291 struct coro *next, *prev;
42}; 292};
43 293
44typedef struct coro *Coro__State; 294typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 295typedef struct coro *Coro__State_or_hashref;
46 296
47static HV *padlist_cache; 297/* the following variables are effectively part of the perl context */
298/* and get copied between struct coro and these variables */
299/* the mainr easonw e don't support windows process emulation */
300static struct CoroSLF slf_frame; /* the current slf frame */
48 301
49/* mostly copied from op.c:cv_clone2 */ 302/** Coro ********************************************************************/
50STATIC AV * 303
51clone_padlist (AV *protopadlist) 304#define PRIO_MAX 3
305#define PRIO_HIGH 1
306#define PRIO_NORMAL 0
307#define PRIO_LOW -1
308#define PRIO_IDLE -3
309#define PRIO_MIN -4
310
311/* for Coro.pm */
312static SV *coro_current;
313static SV *coro_readyhook;
314static AV *coro_ready [PRIO_MAX - PRIO_MIN + 1];
315static CV *cv_coro_run, *cv_coro_terminate;
316static struct coro *coro_first;
317#define coro_nready coroapi.nready
318
319/** lowlevel stuff **********************************************************/
320
321static SV *
322coro_get_sv (pTHX_ const char *name, int create)
52{ 323{
53 AV *av; 324#if PERL_VERSION_ATLEAST (5,10,0)
54 I32 ix; 325 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 326 get_sv (name, create);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 327#endif
57 SV **pname = AvARRAY (protopad_name); 328 return get_sv (name, create);
58 SV **ppad = AvARRAY (protopad); 329}
59 I32 fname = AvFILLp (protopad_name); 330
60 I32 fpad = AvFILLp (protopad); 331static AV *
332coro_get_av (pTHX_ const char *name, int create)
333{
334#if PERL_VERSION_ATLEAST (5,10,0)
335 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
336 get_av (name, create);
337#endif
338 return get_av (name, create);
339}
340
341static HV *
342coro_get_hv (pTHX_ const char *name, int create)
343{
344#if PERL_VERSION_ATLEAST (5,10,0)
345 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
346 get_hv (name, create);
347#endif
348 return get_hv (name, create);
349}
350
351/* may croak */
352INLINE CV *
353coro_sv_2cv (pTHX_ SV *sv)
354{
355 HV *st;
356 GV *gvp;
357 return sv_2cv (sv, &st, &gvp, 0);
358}
359
360/*****************************************************************************/
361/* magic glue */
362
363#define CORO_MAGIC_type_cv 26
364#define CORO_MAGIC_type_state PERL_MAGIC_ext
365
366#define CORO_MAGIC_NN(sv, type) \
367 (expect_true (SvMAGIC (sv)->mg_type == type) \
368 ? SvMAGIC (sv) \
369 : mg_find (sv, type))
370
371#define CORO_MAGIC(sv, type) \
372 (expect_true (SvMAGIC (sv)) \
373 ? CORO_MAGIC_NN (sv, type) \
374 : 0)
375
376#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
377#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
378
379INLINE struct coro *
380SvSTATE_ (pTHX_ SV *coro)
381{
382 HV *stash;
383 MAGIC *mg;
384
385 if (SvROK (coro))
386 coro = SvRV (coro);
387
388 if (expect_false (SvTYPE (coro) != SVt_PVHV))
389 croak ("Coro::State object required");
390
391 stash = SvSTASH (coro);
392 if (expect_false (stash != coro_stash && stash != coro_state_stash))
393 {
394 /* very slow, but rare, check */
395 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
396 croak ("Coro::State object required");
397 }
398
399 mg = CORO_MAGIC_state (coro);
400 return (struct coro *)mg->mg_ptr;
401}
402
403#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
404
405/* faster than SvSTATE, but expects a coroutine hv */
406#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
407#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
408
409/*****************************************************************************/
410/* padlist management and caching */
411
412static AV *
413coro_derive_padlist (pTHX_ CV *cv)
414{
415 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 416 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 417
72 newpadlist = newAV (); 418 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 419 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 420#if PERL_VERSION_ATLEAST (5,10,0)
421 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
422#else
423 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
424#endif
425 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
426 --AvFILLp (padlist);
427
428 av_store (newpadlist, 0, SvREFCNT_inc_NN (AvARRAY (padlist)[0]));
75 av_store (newpadlist, 1, (SV *) newpad); 429 av_store (newpadlist, 1, (SV *)newpad);
76 430
77 av = newAV (); /* will be @_ */ 431 return newpadlist;
78 av_extend (av, 0); 432}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 433
82 for (ix = fpad; ix > 0; ix--) 434static void
435free_padlist (pTHX_ AV *padlist)
436{
437 /* may be during global destruction */
438 if (!IN_DESTRUCT)
83 { 439 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 440 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 441
442 while (i > 0) /* special-case index 0 */
86 { 443 {
87 char *name = SvPVX (namesv); /* XXX */ 444 /* we try to be extra-careful here */
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 445 AV *av = (AV *)AvARRAY (padlist)[i--];
89 { /* lexical from outside? */ 446 I32 j = AvFILLp (av);
90 npad[ix] = SvREFCNT_inc (ppad[ix]); 447
91 } 448 while (j >= 0)
92 else 449 SvREFCNT_dec (AvARRAY (av)[j--]);
93 { /* our own lexical */ 450
94 SV *sv; 451 AvFILLp (av) = -1;
95 if (*name == '&') 452 SvREFCNT_dec (av);
96 sv = SvREFCNT_inc (ppad[ix]);
97 else if (*name == '@')
98 sv = (SV *) newAV ();
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 }
107 } 453 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix]))
109 {
110 npad[ix] = SvREFCNT_inc (ppad[ix]);
111 }
112 else
113 {
114 SV *sv = NEWSV (0, 0);
115 SvPADTMP_on (sv);
116 npad[ix] = sv;
117 }
118 }
119 454
120#if 0 /* NONOTUNDERSTOOD */ 455 SvREFCNT_dec (AvARRAY (padlist)[0]);
121 /* Now that vars are all in place, clone nested closures. */
122 456
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif
139
140 return newpadlist;
141}
142
143STATIC AV *
144free_padlist (AV *padlist)
145{
146 /* may be during global destruction */
147 if (SvREFCNT(padlist))
148 {
149 I32 i = AvFILLp(padlist); 457 AvFILLp (padlist) = -1;
150 while (i >= 0)
151 {
152 SV **svp = av_fetch(padlist, i--, FALSE);
153 SV *sv = svp ? *svp : Nullsv;
154 if (sv)
155 SvREFCNT_dec(sv);
156 }
157
158 SvREFCNT_dec((SV*)padlist); 458 SvREFCNT_dec ((SV*)padlist);
459 }
460}
461
462static int
463coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
464{
465 AV *padlist;
466 AV *av = (AV *)mg->mg_obj;
467
468 /* casting is fun. */
469 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
470 free_padlist (aTHX_ padlist);
471
472 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
473
474 return 0;
475}
476
477static MGVTBL coro_cv_vtbl = {
478 0, 0, 0, 0,
479 coro_cv_free
480};
481
482/* the next two functions merely cache the padlists */
483static void
484get_padlist (pTHX_ CV *cv)
485{
486 MAGIC *mg = CORO_MAGIC_cv (cv);
487 AV *av;
488
489 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
490 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
491 else
492 {
493#if CORO_PREFER_PERL_FUNCTIONS
494 /* this is probably cleaner? but also slower! */
495 /* in practise, it seems to be less stable */
496 CV *cp = Perl_cv_clone (aTHX_ cv);
497 CvPADLIST (cv) = CvPADLIST (cp);
498 CvPADLIST (cp) = 0;
499 SvREFCNT_dec (cp);
500#else
501 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
502#endif
503 }
504}
505
506static void
507put_padlist (pTHX_ CV *cv)
508{
509 MAGIC *mg = CORO_MAGIC_cv (cv);
510 AV *av;
511
512 if (expect_false (!mg))
513 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
514
515 av = (AV *)mg->mg_obj;
516
517 if (expect_false (AvFILLp (av) >= AvMAX (av)))
518 av_extend (av, AvFILLp (av) + 1);
519
520 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
521}
522
523/** load & save, init *******************************************************/
524
525static void
526load_perl (pTHX_ Coro__State c)
527{
528 perl_slots *slot = c->slot;
529 c->slot = 0;
530
531 PL_mainstack = c->mainstack;
532
533 GvSV (PL_defgv) = slot->defsv;
534 GvAV (PL_defgv) = slot->defav;
535 GvSV (PL_errgv) = slot->errsv;
536 GvSV (irsgv) = slot->irsgv;
537 GvHV (PL_hintgv) = slot->hinthv;
538
539 #define VAR(name,type) PL_ ## name = slot->name;
540 # include "state.h"
541 #undef VAR
542
543 {
544 dSP;
545
546 CV *cv;
547
548 /* now do the ugly restore mess */
549 while (expect_true (cv = (CV *)POPs))
550 {
551 put_padlist (aTHX_ cv); /* mark this padlist as available */
552 CvDEPTH (cv) = PTR2IV (POPs);
553 CvPADLIST (cv) = (AV *)POPs;
554 }
555
556 PUTBACK;
159 } 557 }
160}
161 558
162STATIC AV * 559 slf_frame = c->slf_frame;
163unuse_padlist (AV *padlist) 560 CORO_THROW = c->except;
164{
165 free_padlist (padlist);
166} 561}
167 562
168static void 563static void
169SAVE(pTHX_ Coro__State c) 564save_perl (pTHX_ Coro__State c)
170{ 565{
566 c->except = CORO_THROW;
567 c->slf_frame = slf_frame;
568
171 { 569 {
172 dSP; 570 dSP;
173 I32 cxix = cxstack_ix; 571 I32 cxix = cxstack_ix;
572 PERL_CONTEXT *ccstk = cxstack;
174 PERL_SI *top_si = PL_curstackinfo; 573 PERL_SI *top_si = PL_curstackinfo;
175 PERL_CONTEXT *ccstk = cxstack;
176 574
177 /* 575 /*
178 * the worst thing you can imagine happens first - we have to save 576 * the worst thing you can imagine happens first - we have to save
179 * (and reinitialize) all cv's in the whole callchain :( 577 * (and reinitialize) all cv's in the whole callchain :(
180 */ 578 */
181 579
182 PUSHs (Nullsv); 580 XPUSHs (Nullsv);
183 /* this loop was inspired by pp_caller */ 581 /* this loop was inspired by pp_caller */
184 for (;;) 582 for (;;)
185 { 583 {
186 while (cxix >= 0) 584 while (expect_true (cxix >= 0))
187 { 585 {
188 PERL_CONTEXT *cx = &ccstk[--cxix]; 586 PERL_CONTEXT *cx = &ccstk[cxix--];
189 587
190 if (CxTYPE(cx) == CXt_SUB) 588 if (expect_true (CxTYPE (cx) == CXt_SUB || CxTYPE (cx) == CXt_FORMAT))
191 { 589 {
192 CV *cv = cx->blk_sub.cv; 590 CV *cv = cx->blk_sub.cv;
591
193 if (CvDEPTH(cv)) 592 if (expect_true (CvDEPTH (cv)))
194 { 593 {
195#ifdef USE_THREADS
196 XPUSHs ((SV *)CvOWNER(cv));
197#endif
198 EXTEND (SP, 3); 594 EXTEND (SP, 3);
199 PUSHs ((SV *)CvDEPTH(cv));
200 PUSHs ((SV *)CvPADLIST(cv)); 595 PUSHs ((SV *)CvPADLIST (cv));
596 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
201 PUSHs ((SV *)cv); 597 PUSHs ((SV *)cv);
202 598
203 CvPADLIST(cv) = clone_padlist (CvPADLIST(cv));
204
205 CvDEPTH(cv) = 0; 599 CvDEPTH (cv) = 0;
206#ifdef USE_THREADS 600 get_padlist (aTHX_ cv);
207 CvOWNER(cv) = 0;
208 error must unlock this cv etc.. etc...
209 if you are here wondering about this error message then
210 the reason is that it will not work as advertised yet
211#endif
212 } 601 }
213 } 602 }
214 else if (CxTYPE(cx) == CXt_FORMAT) 603 }
604
605 if (expect_true (top_si->si_type == PERLSI_MAIN))
606 break;
607
608 top_si = top_si->si_prev;
609 ccstk = top_si->si_cxstack;
610 cxix = top_si->si_cxix;
611 }
612
613 PUTBACK;
614 }
615
616 /* allocate some space on the context stack for our purposes */
617 /* we manually unroll here, as usually 2 slots is enough */
618 if (SLOT_COUNT >= 1) CXINC;
619 if (SLOT_COUNT >= 2) CXINC;
620 if (SLOT_COUNT >= 3) CXINC;
621 {
622 int i;
623 for (i = 3; i < SLOT_COUNT; ++i)
624 CXINC;
625 }
626 cxstack_ix -= SLOT_COUNT; /* undo allocation */
627
628 c->mainstack = PL_mainstack;
629
630 {
631 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
632
633 slot->defav = GvAV (PL_defgv);
634 slot->defsv = DEFSV;
635 slot->errsv = ERRSV;
636 slot->irsgv = GvSV (irsgv);
637 slot->hinthv = GvHV (PL_hintgv);
638
639 #define VAR(name,type) slot->name = PL_ ## name;
640 # include "state.h"
641 #undef VAR
642 }
643}
644
645/*
646 * allocate various perl stacks. This is almost an exact copy
647 * of perl.c:init_stacks, except that it uses less memory
648 * on the (sometimes correct) assumption that coroutines do
649 * not usually need a lot of stackspace.
650 */
651#if CORO_PREFER_PERL_FUNCTIONS
652# define coro_init_stacks(thx) init_stacks ()
653#else
654static void
655coro_init_stacks (pTHX)
656{
657 PL_curstackinfo = new_stackinfo(32, 8);
658 PL_curstackinfo->si_type = PERLSI_MAIN;
659 PL_curstack = PL_curstackinfo->si_stack;
660 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
661
662 PL_stack_base = AvARRAY(PL_curstack);
663 PL_stack_sp = PL_stack_base;
664 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
665
666 New(50,PL_tmps_stack,32,SV*);
667 PL_tmps_floor = -1;
668 PL_tmps_ix = -1;
669 PL_tmps_max = 32;
670
671 New(54,PL_markstack,16,I32);
672 PL_markstack_ptr = PL_markstack;
673 PL_markstack_max = PL_markstack + 16;
674
675#ifdef SET_MARK_OFFSET
676 SET_MARK_OFFSET;
677#endif
678
679 New(54,PL_scopestack,8,I32);
680 PL_scopestack_ix = 0;
681 PL_scopestack_max = 8;
682
683 New(54,PL_savestack,24,ANY);
684 PL_savestack_ix = 0;
685 PL_savestack_max = 24;
686
687#if !PERL_VERSION_ATLEAST (5,10,0)
688 New(54,PL_retstack,4,OP*);
689 PL_retstack_ix = 0;
690 PL_retstack_max = 4;
691#endif
692}
693#endif
694
695/*
696 * destroy the stacks, the callchain etc...
697 */
698static void
699coro_destruct_stacks (pTHX)
700{
701 while (PL_curstackinfo->si_next)
702 PL_curstackinfo = PL_curstackinfo->si_next;
703
704 while (PL_curstackinfo)
705 {
706 PERL_SI *p = PL_curstackinfo->si_prev;
707
708 if (!IN_DESTRUCT)
709 SvREFCNT_dec (PL_curstackinfo->si_stack);
710
711 Safefree (PL_curstackinfo->si_cxstack);
712 Safefree (PL_curstackinfo);
713 PL_curstackinfo = p;
714 }
715
716 Safefree (PL_tmps_stack);
717 Safefree (PL_markstack);
718 Safefree (PL_scopestack);
719 Safefree (PL_savestack);
720#if !PERL_VERSION_ATLEAST (5,10,0)
721 Safefree (PL_retstack);
722#endif
723}
724
725#define CORO_RSS \
726 rss += sizeof (SYM (curstackinfo)); \
727 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
728 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
729 rss += SYM (tmps_max) * sizeof (SV *); \
730 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
731 rss += SYM (scopestack_max) * sizeof (I32); \
732 rss += SYM (savestack_max) * sizeof (ANY);
733
734static size_t
735coro_rss (pTHX_ struct coro *coro)
736{
737 size_t rss = sizeof (*coro);
738
739 if (coro->mainstack)
740 {
741 if (coro->flags & CF_RUNNING)
742 {
743 #define SYM(sym) PL_ ## sym
744 CORO_RSS;
745 #undef SYM
746 }
747 else
748 {
749 #define SYM(sym) coro->slot->sym
750 CORO_RSS;
751 #undef SYM
752 }
753 }
754
755 return rss;
756}
757
758/** coroutine stack handling ************************************************/
759
760static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
761static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
762static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
763
764/* apparently < 5.8.8 */
765#ifndef MgPV_nolen_const
766#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
767 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
768 (const char*)(mg)->mg_ptr)
769#endif
770
771/*
772 * This overrides the default magic get method of %SIG elements.
773 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
774 * and instead of trying to save and restore the hash elements, we just provide
775 * readback here.
776 */
777static int
778coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
779{
780 const char *s = MgPV_nolen_const (mg);
781
782 if (*s == '_')
783 {
784 SV **svp = 0;
785
786 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
787 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
788
789 if (svp)
790 {
791 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
792 return 0;
793 }
794 }
795
796 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
797}
798
799static int
800coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
801{
802 const char *s = MgPV_nolen_const (mg);
803
804 if (*s == '_')
805 {
806 SV **svp = 0;
807
808 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
809 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
810
811 if (svp)
812 {
813 SV *old = *svp;
814 *svp = 0;
815 SvREFCNT_dec (old);
816 return 0;
817 }
818 }
819
820 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
821}
822
823static int
824coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
825{
826 const char *s = MgPV_nolen_const (mg);
827
828 if (*s == '_')
829 {
830 SV **svp = 0;
831
832 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
833 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
834
835 if (svp)
836 {
837 SV *old = *svp;
838 *svp = SvOK (sv) ? newSVsv (sv) : 0;
839 SvREFCNT_dec (old);
840 return 0;
841 }
842 }
843
844 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
845}
846
847static void
848prepare_nop (pTHX_ struct coro_transfer_args *ta)
849{
850 /* kind of mega-hacky, but works */
851 ta->next = ta->prev = (struct coro *)ta;
852}
853
854static int
855slf_check_nop (pTHX_ struct CoroSLF *frame)
856{
857 return 0;
858}
859
860static int
861slf_check_repeat (pTHX_ struct CoroSLF *frame)
862{
863 return 1;
864}
865
866static UNOP coro_setup_op;
867
868static void NOINLINE /* noinline to keep it out of the transfer fast path */
869coro_setup (pTHX_ struct coro *coro)
870{
871 /*
872 * emulate part of the perl startup here.
873 */
874 coro_init_stacks (aTHX);
875
876 PL_runops = RUNOPS_DEFAULT;
877 PL_curcop = &PL_compiling;
878 PL_in_eval = EVAL_NULL;
879 PL_comppad = 0;
880 PL_comppad_name = 0;
881 PL_comppad_name_fill = 0;
882 PL_comppad_name_floor = 0;
883 PL_curpm = 0;
884 PL_curpad = 0;
885 PL_localizing = 0;
886 PL_dirty = 0;
887 PL_restartop = 0;
888#if PERL_VERSION_ATLEAST (5,10,0)
889 PL_parser = 0;
890#endif
891 PL_hints = 0;
892
893 /* recreate the die/warn hooks */
894 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
895 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
896
897 GvSV (PL_defgv) = newSV (0);
898 GvAV (PL_defgv) = coro->args; coro->args = 0;
899 GvSV (PL_errgv) = newSV (0);
900 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
901 GvHV (PL_hintgv) = 0;
902 PL_rs = newSVsv (GvSV (irsgv));
903 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
904
905 {
906 dSP;
907 UNOP myop;
908
909 Zero (&myop, 1, UNOP);
910 myop.op_next = Nullop;
911 myop.op_type = OP_ENTERSUB;
912 myop.op_flags = OPf_WANT_VOID;
913
914 PUSHMARK (SP);
915 PUSHs ((SV *)coro->startcv);
916 PUTBACK;
917 PL_op = (OP *)&myop;
918 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
919 }
920
921 /* this newly created coroutine might be run on an existing cctx which most
922 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
923 */
924 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
925 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
926
927 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
928 coro_setup_op.op_next = PL_op;
929 coro_setup_op.op_type = OP_ENTERSUB;
930 coro_setup_op.op_ppaddr = pp_slf;
931 /* no flags etc. required, as an init function won't be called */
932
933 PL_op = (OP *)&coro_setup_op;
934
935 /* copy throw, in case it was set before coro_setup */
936 CORO_THROW = coro->except;
937}
938
939static void
940coro_unwind_stacks (pTHX)
941{
942 if (!IN_DESTRUCT)
943 {
944 /* restore all saved variables and stuff */
945 LEAVE_SCOPE (0);
946 assert (PL_tmps_floor == -1);
947
948 /* free all temporaries */
949 FREETMPS;
950 assert (PL_tmps_ix == -1);
951
952 /* unwind all extra stacks */
953 POPSTACK_TO (PL_mainstack);
954
955 /* unwind main stack */
956 dounwind (-1);
957 }
958}
959
960static void
961coro_destruct_perl (pTHX_ struct coro *coro)
962{
963 coro_unwind_stacks (aTHX);
964
965 SvREFCNT_dec (GvSV (PL_defgv));
966 SvREFCNT_dec (GvAV (PL_defgv));
967 SvREFCNT_dec (GvSV (PL_errgv));
968 SvREFCNT_dec (PL_defoutgv);
969 SvREFCNT_dec (PL_rs);
970 SvREFCNT_dec (GvSV (irsgv));
971 SvREFCNT_dec (GvHV (PL_hintgv));
972
973 SvREFCNT_dec (PL_diehook);
974 SvREFCNT_dec (PL_warnhook);
975
976 SvREFCNT_dec (coro->saved_deffh);
977 SvREFCNT_dec (coro->rouse_cb);
978 SvREFCNT_dec (coro->invoke_cb);
979 SvREFCNT_dec (coro->invoke_av);
980
981 coro_destruct_stacks (aTHX);
982}
983
984INLINE void
985free_coro_mortal (pTHX)
986{
987 if (expect_true (coro_mortal))
988 {
989 SvREFCNT_dec (coro_mortal);
990 coro_mortal = 0;
991 }
992}
993
994static int
995runops_trace (pTHX)
996{
997 COP *oldcop = 0;
998 int oldcxix = -2;
999
1000 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
1001 {
1002 PERL_ASYNC_CHECK ();
1003
1004 if (cctx_current->flags & CC_TRACE_ALL)
1005 {
1006 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
1007 {
1008 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1009 SV **bot, **top;
1010 AV *av = newAV (); /* return values */
1011 SV **cb;
1012 dSP;
1013
1014 GV *gv = CvGV (cx->blk_sub.cv);
1015 SV *fullname = sv_2mortal (newSV (0));
1016 if (isGV (gv))
1017 gv_efullname3 (fullname, gv, 0);
1018
1019 bot = PL_stack_base + cx->blk_oldsp + 1;
1020 top = cx->blk_gimme == G_ARRAY ? SP + 1
1021 : cx->blk_gimme == G_SCALAR ? bot + 1
1022 : bot;
1023
1024 av_extend (av, top - bot);
1025 while (bot < top)
1026 av_push (av, SvREFCNT_inc_NN (*bot++));
1027
1028 PL_runops = RUNOPS_DEFAULT;
1029 ENTER;
1030 SAVETMPS;
1031 EXTEND (SP, 3);
1032 PUSHMARK (SP);
1033 PUSHs (&PL_sv_no);
1034 PUSHs (fullname);
1035 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1036 PUTBACK;
1037 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1038 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1039 SPAGAIN;
1040 FREETMPS;
1041 LEAVE;
1042 PL_runops = runops_trace;
1043 }
1044
1045 if (oldcop != PL_curcop)
1046 {
1047 oldcop = PL_curcop;
1048
1049 if (PL_curcop != &PL_compiling)
1050 {
1051 SV **cb;
1052
1053 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1054 {
1055 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1056
1057 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1058 {
1059 dSP;
1060 GV *gv = CvGV (cx->blk_sub.cv);
1061 SV *fullname = sv_2mortal (newSV (0));
1062
1063 if (isGV (gv))
1064 gv_efullname3 (fullname, gv, 0);
1065
1066 PL_runops = RUNOPS_DEFAULT;
1067 ENTER;
1068 SAVETMPS;
1069 EXTEND (SP, 3);
1070 PUSHMARK (SP);
1071 PUSHs (&PL_sv_yes);
1072 PUSHs (fullname);
1073 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1074 PUTBACK;
1075 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1076 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1077 SPAGAIN;
1078 FREETMPS;
1079 LEAVE;
1080 PL_runops = runops_trace;
1081 }
1082
1083 oldcxix = cxstack_ix;
1084 }
1085
1086 if (cctx_current->flags & CC_TRACE_LINE)
1087 {
1088 dSP;
1089
1090 PL_runops = RUNOPS_DEFAULT;
1091 ENTER;
1092 SAVETMPS;
1093 EXTEND (SP, 3);
1094 PL_runops = RUNOPS_DEFAULT;
1095 PUSHMARK (SP);
1096 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1097 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1098 PUTBACK;
1099 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1100 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1101 SPAGAIN;
1102 FREETMPS;
1103 LEAVE;
1104 PL_runops = runops_trace;
1105 }
1106 }
1107 }
1108 }
1109 }
1110
1111 TAINT_NOT;
1112 return 0;
1113}
1114
1115static struct CoroSLF cctx_ssl_frame;
1116
1117static void
1118slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1119{
1120 ta->prev = 0;
1121}
1122
1123static int
1124slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1125{
1126 *frame = cctx_ssl_frame;
1127
1128 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1129}
1130
1131/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1132static void NOINLINE
1133cctx_prepare (pTHX)
1134{
1135 PL_top_env = &PL_start_env;
1136
1137 if (cctx_current->flags & CC_TRACE)
1138 PL_runops = runops_trace;
1139
1140 /* we already must be executing an SLF op, there is no other valid way
1141 * that can lead to creation of a new cctx */
1142 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1143 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1144
1145 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1146 cctx_ssl_frame = slf_frame;
1147
1148 slf_frame.prepare = slf_prepare_set_stacklevel;
1149 slf_frame.check = slf_check_set_stacklevel;
1150}
1151
1152/* the tail of transfer: execute stuff we can only do after a transfer */
1153INLINE void
1154transfer_tail (pTHX)
1155{
1156 free_coro_mortal (aTHX);
1157}
1158
1159/*
1160 * this is a _very_ stripped down perl interpreter ;)
1161 */
1162static void
1163cctx_run (void *arg)
1164{
1165#ifdef USE_ITHREADS
1166# if CORO_PTHREAD
1167 PERL_SET_CONTEXT (coro_thx);
1168# endif
1169#endif
1170 {
1171 dTHX;
1172
1173 /* normally we would need to skip the entersub here */
1174 /* not doing so will re-execute it, which is exactly what we want */
1175 /* PL_nop = PL_nop->op_next */
1176
1177 /* inject a fake subroutine call to cctx_init */
1178 cctx_prepare (aTHX);
1179
1180 /* cctx_run is the alternative tail of transfer() */
1181 transfer_tail (aTHX);
1182
1183 /* somebody or something will hit me for both perl_run and PL_restartop */
1184 PL_restartop = PL_op;
1185 perl_run (PL_curinterp);
1186 /*
1187 * Unfortunately, there is no way to get at the return values of the
1188 * coro body here, as perl_run destroys these
1189 */
1190
1191 /*
1192 * If perl-run returns we assume exit() was being called or the coro
1193 * fell off the end, which seems to be the only valid (non-bug)
1194 * reason for perl_run to return. We try to exit by jumping to the
1195 * bootstrap-time "top" top_env, as we cannot restore the "main"
1196 * coroutine as Coro has no such concept.
1197 * This actually isn't valid with the pthread backend, but OSes requiring
1198 * that backend are too broken to do it in a standards-compliant way.
1199 */
1200 PL_top_env = main_top_env;
1201 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1202 }
1203}
1204
1205static coro_cctx *
1206cctx_new ()
1207{
1208 coro_cctx *cctx;
1209
1210 ++cctx_count;
1211 New (0, cctx, 1, coro_cctx);
1212
1213 cctx->gen = cctx_gen;
1214 cctx->flags = 0;
1215 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1216
1217 return cctx;
1218}
1219
1220/* create a new cctx only suitable as source */
1221static coro_cctx *
1222cctx_new_empty ()
1223{
1224 coro_cctx *cctx = cctx_new ();
1225
1226 cctx->sptr = 0;
1227 coro_create (&cctx->cctx, 0, 0, 0, 0);
1228
1229 return cctx;
1230}
1231
1232/* create a new cctx suitable as destination/running a perl interpreter */
1233static coro_cctx *
1234cctx_new_run ()
1235{
1236 coro_cctx *cctx = cctx_new ();
1237 void *stack_start;
1238 size_t stack_size;
1239
1240#if HAVE_MMAP
1241 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1242 /* mmap supposedly does allocate-on-write for us */
1243 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1244
1245 if (cctx->sptr != (void *)-1)
1246 {
1247 #if CORO_STACKGUARD
1248 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1249 #endif
1250 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1251 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1252 cctx->flags |= CC_MAPPED;
1253 }
1254 else
1255#endif
1256 {
1257 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1258 New (0, cctx->sptr, cctx_stacksize, long);
1259
1260 if (!cctx->sptr)
1261 {
1262 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1263 _exit (EXIT_FAILURE);
1264 }
1265
1266 stack_start = cctx->sptr;
1267 stack_size = cctx->ssize;
1268 }
1269
1270 #if CORO_USE_VALGRIND
1271 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1272 #endif
1273
1274 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1275
1276 return cctx;
1277}
1278
1279static void
1280cctx_destroy (coro_cctx *cctx)
1281{
1282 if (!cctx)
1283 return;
1284
1285 assert (("FATAL: tried to destroy current cctx", cctx != cctx_current));//D temporary?
1286
1287 --cctx_count;
1288 coro_destroy (&cctx->cctx);
1289
1290 /* coro_transfer creates new, empty cctx's */
1291 if (cctx->sptr)
1292 {
1293 #if CORO_USE_VALGRIND
1294 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1295 #endif
1296
1297#if HAVE_MMAP
1298 if (cctx->flags & CC_MAPPED)
1299 munmap (cctx->sptr, cctx->ssize);
1300 else
1301#endif
1302 Safefree (cctx->sptr);
1303 }
1304
1305 Safefree (cctx);
1306}
1307
1308/* wether this cctx should be destructed */
1309#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1310
1311static coro_cctx *
1312cctx_get (pTHX)
1313{
1314 while (expect_true (cctx_first))
1315 {
1316 coro_cctx *cctx = cctx_first;
1317 cctx_first = cctx->next;
1318 --cctx_idle;
1319
1320 if (expect_true (!CCTX_EXPIRED (cctx)))
1321 return cctx;
1322
1323 cctx_destroy (cctx);
1324 }
1325
1326 return cctx_new_run ();
1327}
1328
1329static void
1330cctx_put (coro_cctx *cctx)
1331{
1332 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1333
1334 /* free another cctx if overlimit */
1335 if (expect_false (cctx_idle >= cctx_max_idle))
1336 {
1337 coro_cctx *first = cctx_first;
1338 cctx_first = first->next;
1339 --cctx_idle;
1340
1341 cctx_destroy (first);
1342 }
1343
1344 ++cctx_idle;
1345 cctx->next = cctx_first;
1346 cctx_first = cctx;
1347}
1348
1349/** coroutine switching *****************************************************/
1350
1351static void
1352transfer_check (pTHX_ struct coro *prev, struct coro *next)
1353{
1354 /* TODO: throwing up here is considered harmful */
1355
1356 if (expect_true (prev != next))
1357 {
1358 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1359 croak ("Coro::State::transfer called with a suspended prev Coro::State, but can only transfer from running or new states,");
1360
1361 if (expect_false (next->flags & CF_RUNNING))
1362 croak ("Coro::State::transfer called with running next Coro::State, but can only transfer to inactive states,");
1363
1364 if (expect_false (next->flags & CF_DESTROYED))
1365 croak ("Coro::State::transfer called with destroyed next Coro::State, but can only transfer to inactive states,");
1366
1367#if !PERL_VERSION_ATLEAST (5,10,0)
1368 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1369 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1370#endif
1371 }
1372}
1373
1374/* always use the TRANSFER macro */
1375static void NOINLINE /* noinline so we have a fixed stackframe */
1376transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1377{
1378 dSTACKLEVEL;
1379
1380 /* sometimes transfer is only called to set idle_sp */
1381 if (expect_false (!prev))
1382 {
1383 cctx_current->idle_sp = STACKLEVEL;
1384 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1385 }
1386 else if (expect_true (prev != next))
1387 {
1388 coro_cctx *cctx_prev;
1389
1390 if (expect_false (prev->flags & CF_NEW))
1391 {
1392 /* create a new empty/source context */
1393 prev->flags &= ~CF_NEW;
1394 prev->flags |= CF_RUNNING;
1395 }
1396
1397 prev->flags &= ~CF_RUNNING;
1398 next->flags |= CF_RUNNING;
1399
1400 /* first get rid of the old state */
1401 save_perl (aTHX_ prev);
1402
1403 if (expect_false (next->flags & CF_NEW))
1404 {
1405 /* need to start coroutine */
1406 next->flags &= ~CF_NEW;
1407 /* setup coroutine call */
1408 coro_setup (aTHX_ next);
1409 }
1410 else
1411 load_perl (aTHX_ next);
1412
1413 /* possibly untie and reuse the cctx */
1414 if (expect_true (
1415 cctx_current->idle_sp == STACKLEVEL
1416 && !(cctx_current->flags & CC_TRACE)
1417 && !force_cctx
1418 ))
1419 {
1420 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1421 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1422
1423 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get. */
1424 /* without this the next cctx_get might destroy the running cctx while still in use */
1425 if (expect_false (CCTX_EXPIRED (cctx_current)))
1426 if (expect_true (!next->cctx))
1427 next->cctx = cctx_get (aTHX);
1428
1429 cctx_put (cctx_current);
1430 }
1431 else
1432 prev->cctx = cctx_current;
1433
1434 ++next->usecount;
1435
1436 cctx_prev = cctx_current;
1437 cctx_current = expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1438
1439 next->cctx = 0;
1440
1441 if (expect_false (cctx_prev != cctx_current))
1442 {
1443 cctx_prev->top_env = PL_top_env;
1444 PL_top_env = cctx_current->top_env;
1445 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1446 }
1447
1448 transfer_tail (aTHX);
1449 }
1450}
1451
1452#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1453#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1454
1455/** high level stuff ********************************************************/
1456
1457static int
1458coro_state_destroy (pTHX_ struct coro *coro)
1459{
1460 if (coro->flags & CF_DESTROYED)
1461 return 0;
1462
1463 if (coro->on_destroy)
1464 coro->on_destroy (aTHX_ coro);
1465
1466 coro->flags |= CF_DESTROYED;
1467
1468 if (coro->flags & CF_READY)
1469 {
1470 /* reduce nready, as destroying a ready coro effectively unreadies it */
1471 /* alternative: look through all ready queues and remove the coro */
1472 --coro_nready;
1473 }
1474 else
1475 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1476
1477 if (coro->mainstack
1478 && coro->mainstack != main_mainstack
1479 && coro->slot
1480 && !PL_dirty)
1481 {
1482 struct coro temp;
1483
1484 assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack));
1485
1486 save_perl (aTHX_ &temp);
1487 load_perl (aTHX_ coro);
1488
1489 coro_destruct_perl (aTHX_ coro);
1490
1491 load_perl (aTHX_ &temp);
1492
1493 coro->slot = 0;
1494 }
1495
1496 cctx_destroy (coro->cctx);
1497 SvREFCNT_dec (coro->startcv);
1498 SvREFCNT_dec (coro->args);
1499 SvREFCNT_dec (CORO_THROW);
1500
1501 if (coro->next) coro->next->prev = coro->prev;
1502 if (coro->prev) coro->prev->next = coro->next;
1503 if (coro == coro_first) coro_first = coro->next;
1504
1505 return 1;
1506}
1507
1508static int
1509coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1510{
1511 struct coro *coro = (struct coro *)mg->mg_ptr;
1512 mg->mg_ptr = 0;
1513
1514 coro->hv = 0;
1515
1516 if (--coro->refcnt < 0)
1517 {
1518 coro_state_destroy (aTHX_ coro);
1519 Safefree (coro);
1520 }
1521
1522 return 0;
1523}
1524
1525static int
1526coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1527{
1528 struct coro *coro = (struct coro *)mg->mg_ptr;
1529
1530 ++coro->refcnt;
1531
1532 return 0;
1533}
1534
1535static MGVTBL coro_state_vtbl = {
1536 0, 0, 0, 0,
1537 coro_state_free,
1538 0,
1539#ifdef MGf_DUP
1540 coro_state_dup,
1541#else
1542# define MGf_DUP 0
1543#endif
1544};
1545
1546static void
1547prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1548{
1549 ta->prev = SvSTATE (prev_sv);
1550 ta->next = SvSTATE (next_sv);
1551 TRANSFER_CHECK (*ta);
1552}
1553
1554static void
1555api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1556{
1557 struct coro_transfer_args ta;
1558
1559 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1560 TRANSFER (ta, 1);
1561}
1562
1563/*****************************************************************************/
1564/* gensub: simple closure generation utility */
1565
1566#define GENSUB_ARG CvXSUBANY (cv).any_ptr
1567
1568/* create a closure from XS, returns a code reference */
1569/* the arg can be accessed via GENSUB_ARG from the callback */
1570/* the callback must use dXSARGS/XSRETURN */
1571static SV *
1572gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg)
1573{
1574 CV *cv = (CV *)newSV (0);
1575
1576 sv_upgrade ((SV *)cv, SVt_PVCV);
1577
1578 CvANON_on (cv);
1579 CvISXSUB_on (cv);
1580 CvXSUB (cv) = xsub;
1581 GENSUB_ARG = arg;
1582
1583 return newRV_noinc ((SV *)cv);
1584}
1585
1586/** Coro ********************************************************************/
1587
1588INLINE void
1589coro_enq (pTHX_ struct coro *coro)
1590{
1591 av_push (coro_ready [coro->prio - PRIO_MIN], SvREFCNT_inc_NN (coro->hv));
1592}
1593
1594INLINE SV *
1595coro_deq (pTHX)
1596{
1597 int prio;
1598
1599 for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= 0; )
1600 if (AvFILLp (coro_ready [prio]) >= 0)
1601 return av_shift (coro_ready [prio]);
1602
1603 return 0;
1604}
1605
1606static int
1607api_ready (pTHX_ SV *coro_sv)
1608{
1609 struct coro *coro;
1610 SV *sv_hook;
1611 void (*xs_hook)(void);
1612
1613 coro = SvSTATE (coro_sv);
1614
1615 if (coro->flags & CF_READY)
1616 return 0;
1617
1618 coro->flags |= CF_READY;
1619
1620 sv_hook = coro_nready ? 0 : coro_readyhook;
1621 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1622
1623 coro_enq (aTHX_ coro);
1624 ++coro_nready;
1625
1626 if (sv_hook)
1627 {
1628 dSP;
1629
1630 ENTER;
1631 SAVETMPS;
1632
1633 PUSHMARK (SP);
1634 PUTBACK;
1635 call_sv (sv_hook, G_VOID | G_DISCARD);
1636
1637 FREETMPS;
1638 LEAVE;
1639 }
1640
1641 if (xs_hook)
1642 xs_hook ();
1643
1644 return 1;
1645}
1646
1647static int
1648api_is_ready (pTHX_ SV *coro_sv)
1649{
1650 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1651}
1652
1653/* expects to own a reference to next->hv */
1654INLINE void
1655prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1656{
1657 SV *prev_sv = SvRV (coro_current);
1658
1659 ta->prev = SvSTATE_hv (prev_sv);
1660 ta->next = next;
1661
1662 TRANSFER_CHECK (*ta);
1663
1664 SvRV_set (coro_current, (SV *)next->hv);
1665
1666 free_coro_mortal (aTHX);
1667 coro_mortal = prev_sv;
1668}
1669
1670static void
1671prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1672{
1673 for (;;)
1674 {
1675 SV *next_sv = coro_deq (aTHX);
1676
1677 if (expect_true (next_sv))
1678 {
1679 struct coro *next = SvSTATE_hv (next_sv);
1680
1681 /* cannot transfer to destroyed coros, skip and look for next */
1682 if (expect_false (next->flags & CF_DESTROYED))
1683 SvREFCNT_dec (next_sv); /* coro_nready has already been taken care of by destroy */
1684 else
1685 {
1686 next->flags &= ~CF_READY;
1687 --coro_nready;
1688
1689 prepare_schedule_to (aTHX_ ta, next);
1690 break;
1691 }
1692 }
1693 else
1694 {
1695 /* nothing to schedule: call the idle handler */
1696 if (SvROK (sv_idle)
1697 && SvOBJECT (SvRV (sv_idle)))
1698 {
1699 ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */
1700 api_ready (aTHX_ SvRV (sv_idle));
1701 --coro_nready;
1702 }
1703 else
1704 {
1705 dSP;
1706
1707 ENTER;
1708 SAVETMPS;
1709
1710 PUSHMARK (SP);
1711 PUTBACK;
1712 call_sv (sv_idle, G_VOID | G_DISCARD);
1713
1714 FREETMPS;
1715 LEAVE;
1716 }
1717 }
1718 }
1719}
1720
1721INLINE void
1722prepare_cede (pTHX_ struct coro_transfer_args *ta)
1723{
1724 api_ready (aTHX_ coro_current);
1725 prepare_schedule (aTHX_ ta);
1726}
1727
1728INLINE void
1729prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1730{
1731 SV *prev = SvRV (coro_current);
1732
1733 if (coro_nready)
1734 {
1735 prepare_schedule (aTHX_ ta);
1736 api_ready (aTHX_ prev);
1737 }
1738 else
1739 prepare_nop (aTHX_ ta);
1740}
1741
1742static void
1743api_schedule (pTHX)
1744{
1745 struct coro_transfer_args ta;
1746
1747 prepare_schedule (aTHX_ &ta);
1748 TRANSFER (ta, 1);
1749}
1750
1751static void
1752api_schedule_to (pTHX_ SV *coro_sv)
1753{
1754 struct coro_transfer_args ta;
1755 struct coro *next = SvSTATE (coro_sv);
1756
1757 SvREFCNT_inc_NN (coro_sv);
1758 prepare_schedule_to (aTHX_ &ta, next);
1759}
1760
1761static int
1762api_cede (pTHX)
1763{
1764 struct coro_transfer_args ta;
1765
1766 prepare_cede (aTHX_ &ta);
1767
1768 if (expect_true (ta.prev != ta.next))
1769 {
1770 TRANSFER (ta, 1);
1771 return 1;
1772 }
1773 else
1774 return 0;
1775}
1776
1777static int
1778api_cede_notself (pTHX)
1779{
1780 if (coro_nready)
1781 {
1782 struct coro_transfer_args ta;
1783
1784 prepare_cede_notself (aTHX_ &ta);
1785 TRANSFER (ta, 1);
1786 return 1;
1787 }
1788 else
1789 return 0;
1790}
1791
1792static void
1793api_trace (pTHX_ SV *coro_sv, int flags)
1794{
1795 struct coro *coro = SvSTATE (coro_sv);
1796
1797 if (coro->flags & CF_RUNNING)
1798 croak ("cannot enable tracing on a running coroutine, caught");
1799
1800 if (flags & CC_TRACE)
1801 {
1802 if (!coro->cctx)
1803 coro->cctx = cctx_new_run ();
1804 else if (!(coro->cctx->flags & CC_TRACE))
1805 croak ("cannot enable tracing on coroutine with custom stack, caught");
1806
1807 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1808 }
1809 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1810 {
1811 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1812
1813 if (coro->flags & CF_RUNNING)
1814 PL_runops = RUNOPS_DEFAULT;
1815 else
1816 coro->slot->runops = RUNOPS_DEFAULT;
1817 }
1818}
1819
1820static void
1821coro_call_on_destroy (pTHX_ struct coro *coro)
1822{
1823 SV **on_destroyp = hv_fetch (coro->hv, "_on_destroy", sizeof ("_on_destroy") - 1, 0);
1824 SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0);
1825
1826 if (on_destroyp)
1827 {
1828 AV *on_destroy = (AV *)SvRV (*on_destroyp);
1829
1830 while (AvFILLp (on_destroy) >= 0)
1831 {
1832 dSP; /* don't disturb outer sp */
1833 SV *cb = av_pop (on_destroy);
1834
1835 PUSHMARK (SP);
1836
1837 if (statusp)
1838 {
1839 int i;
1840 AV *status = (AV *)SvRV (*statusp);
1841 EXTEND (SP, AvFILLp (status) + 1);
1842
1843 for (i = 0; i <= AvFILLp (status); ++i)
1844 PUSHs (AvARRAY (status)[i]);
1845 }
1846
1847 PUTBACK;
1848 call_sv (sv_2mortal (cb), G_VOID | G_DISCARD);
1849 }
1850 }
1851}
1852
1853static void
1854slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1855{
1856 int i;
1857 HV *hv = (HV *)SvRV (coro_current);
1858 AV *av = newAV ();
1859
1860 av_extend (av, items - 1);
1861 for (i = 0; i < items; ++i)
1862 av_push (av, SvREFCNT_inc_NN (arg [i]));
1863
1864 hv_store (hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0);
1865
1866 av_push (av_destroy, (SV *)newRV_inc ((SV *)hv)); /* RVinc for perl */
1867 api_ready (aTHX_ sv_manager);
1868
1869 frame->prepare = prepare_schedule;
1870 frame->check = slf_check_repeat;
1871
1872 /* as a minor optimisation, we could unwind all stacks here */
1873 /* but that puts extra pressure on pp_slf, and is not worth much */
1874 /*coro_unwind_stacks (aTHX);*/
1875}
1876
1877/*****************************************************************************/
1878/* async pool handler */
1879
1880static int
1881slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
1882{
1883 HV *hv = (HV *)SvRV (coro_current);
1884 struct coro *coro = (struct coro *)frame->data;
1885
1886 if (!coro->invoke_cb)
1887 return 1; /* loop till we have invoke */
1888 else
1889 {
1890 hv_store (hv, "desc", sizeof ("desc") - 1,
1891 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
1892
1893 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
1894
1895 {
1896 dSP;
1897 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
1898 PUTBACK;
1899 }
1900
1901 SvREFCNT_dec (GvAV (PL_defgv));
1902 GvAV (PL_defgv) = coro->invoke_av;
1903 coro->invoke_av = 0;
1904
1905 return 0;
1906 }
1907}
1908
1909static void
1910slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1911{
1912 HV *hv = (HV *)SvRV (coro_current);
1913 struct coro *coro = SvSTATE_hv ((SV *)hv);
1914
1915 if (expect_true (coro->saved_deffh))
1916 {
1917 /* subsequent iteration */
1918 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
1919 coro->saved_deffh = 0;
1920
1921 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
1922 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
1923 {
1924 coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate);
1925 coro->invoke_av = newAV ();
1926
1927 frame->prepare = prepare_nop;
1928 }
1929 else
1930 {
1931 av_clear (GvAV (PL_defgv));
1932 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
1933
1934 coro->prio = 0;
1935
1936 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
1937 api_trace (aTHX_ coro_current, 0);
1938
1939 frame->prepare = prepare_schedule;
1940 av_push (av_async_pool, SvREFCNT_inc (hv));
1941 }
1942 }
1943 else
1944 {
1945 /* first iteration, simply fall through */
1946 frame->prepare = prepare_nop;
1947 }
1948
1949 frame->check = slf_check_pool_handler;
1950 frame->data = (void *)coro;
1951}
1952
1953/*****************************************************************************/
1954/* rouse callback */
1955
1956#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
1957
1958static void
1959coro_rouse_callback (pTHX_ CV *cv)
1960{
1961 dXSARGS;
1962 SV *data = (SV *)GENSUB_ARG;
1963
1964 if (SvTYPE (SvRV (data)) != SVt_PVAV)
1965 {
1966 /* first call, set args */
1967 AV *av = newAV ();
1968 SV *coro = SvRV (data);
1969
1970 SvRV_set (data, (SV *)av);
1971 api_ready (aTHX_ coro);
1972 SvREFCNT_dec (coro);
1973
1974 /* better take a full copy of the arguments */
1975 while (items--)
1976 av_store (av, items, newSVsv (ST (items)));
1977 }
1978
1979 XSRETURN_EMPTY;
1980}
1981
1982static int
1983slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
1984{
1985 SV *data = (SV *)frame->data;
1986
1987 if (CORO_THROW)
1988 return 0;
1989
1990 if (SvTYPE (SvRV (data)) != SVt_PVAV)
1991 return 1;
1992
1993 /* now push all results on the stack */
1994 {
1995 dSP;
1996 AV *av = (AV *)SvRV (data);
1997 int i;
1998
1999 EXTEND (SP, AvFILLp (av) + 1);
2000 for (i = 0; i <= AvFILLp (av); ++i)
2001 PUSHs (sv_2mortal (AvARRAY (av)[i]));
2002
2003 /* we have stolen the elements, so ste length to zero and free */
2004 AvFILLp (av) = -1;
2005 av_undef (av);
2006
2007 PUTBACK;
2008 }
2009
2010 return 0;
2011}
2012
2013static void
2014slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2015{
2016 SV *cb;
2017
2018 if (items)
2019 cb = arg [0];
2020 else
2021 {
2022 struct coro *coro = SvSTATE_current;
2023
2024 if (!coro->rouse_cb)
2025 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
2026
2027 cb = sv_2mortal (coro->rouse_cb);
2028 coro->rouse_cb = 0;
2029 }
2030
2031 if (!SvROK (cb)
2032 || SvTYPE (SvRV (cb)) != SVt_PVCV
2033 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
2034 croak ("Coro::rouse_wait called with illegal callback argument,");
2035
2036 {
2037 CV *cv = (CV *)SvRV (cb); /* for GENSUB_ARG */
2038 SV *data = (SV *)GENSUB_ARG;
2039
2040 frame->data = (void *)data;
2041 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2042 frame->check = slf_check_rouse_wait;
2043 }
2044}
2045
2046static SV *
2047coro_new_rouse_cb (pTHX)
2048{
2049 HV *hv = (HV *)SvRV (coro_current);
2050 struct coro *coro = SvSTATE_hv (hv);
2051 SV *data = newRV_inc ((SV *)hv);
2052 SV *cb = gensub (aTHX_ coro_rouse_callback, (void *)data);
2053
2054 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2055 SvREFCNT_dec (data); /* magicext increases the refcount */
2056
2057 SvREFCNT_dec (coro->rouse_cb);
2058 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2059
2060 return cb;
2061}
2062
2063/*****************************************************************************/
2064/* schedule-like-function opcode (SLF) */
2065
2066static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2067static const CV *slf_cv;
2068static SV **slf_argv;
2069static int slf_argc, slf_arga; /* count, allocated */
2070static I32 slf_ax; /* top of stack, for restore */
2071
2072/* this restores the stack in the case we patched the entersub, to */
2073/* recreate the stack frame as perl will on following calls */
2074/* since entersub cleared the stack */
2075static OP *
2076pp_restore (pTHX)
2077{
2078 int i;
2079 SV **SP = PL_stack_base + slf_ax;
2080
2081 PUSHMARK (SP);
2082
2083 EXTEND (SP, slf_argc + 1);
2084
2085 for (i = 0; i < slf_argc; ++i)
2086 PUSHs (sv_2mortal (slf_argv [i]));
2087
2088 PUSHs ((SV *)CvGV (slf_cv));
2089
2090 RETURNOP (slf_restore.op_first);
2091}
2092
2093static void
2094slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2095{
2096 SV **arg = (SV **)slf_frame.data;
2097
2098 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2099}
2100
2101static void
2102slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2103{
2104 if (items != 2)
2105 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2106
2107 frame->prepare = slf_prepare_transfer;
2108 frame->check = slf_check_nop;
2109 frame->data = (void *)arg; /* let's hope it will stay valid */
2110}
2111
2112static void
2113slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2114{
2115 frame->prepare = prepare_schedule;
2116 frame->check = slf_check_nop;
2117}
2118
2119static void
2120slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2121{
2122 struct coro *next = (struct coro *)slf_frame.data;
2123
2124 SvREFCNT_inc_NN (next->hv);
2125 prepare_schedule_to (aTHX_ ta, next);
2126}
2127
2128static void
2129slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2130{
2131 if (!items)
2132 croak ("Coro::schedule_to expects a coroutine argument, caught");
2133
2134 frame->data = (void *)SvSTATE (arg [0]);
2135 frame->prepare = slf_prepare_schedule_to;
2136 frame->check = slf_check_nop;
2137}
2138
2139static void
2140slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2141{
2142 api_ready (aTHX_ SvRV (coro_current));
2143
2144 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2145}
2146
2147static void
2148slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2149{
2150 frame->prepare = prepare_cede;
2151 frame->check = slf_check_nop;
2152}
2153
2154static void
2155slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2156{
2157 frame->prepare = prepare_cede_notself;
2158 frame->check = slf_check_nop;
2159}
2160
2161/*
2162 * these not obviously related functions are all rolled into one
2163 * function to increase chances that they all will call transfer with the same
2164 * stack offset
2165 * SLF stands for "schedule-like-function".
2166 */
2167static OP *
2168pp_slf (pTHX)
2169{
2170 I32 checkmark; /* mark SP to see how many elements check has pushed */
2171
2172 /* set up the slf frame, unless it has already been set-up */
2173 /* the latter happens when a new coro has been started */
2174 /* or when a new cctx was attached to an existing coroutine */
2175 if (expect_true (!slf_frame.prepare))
2176 {
2177 /* first iteration */
2178 dSP;
2179 SV **arg = PL_stack_base + TOPMARK + 1;
2180 int items = SP - arg; /* args without function object */
2181 SV *gv = *sp;
2182
2183 /* do a quick consistency check on the "function" object, and if it isn't */
2184 /* for us, divert to the real entersub */
2185 if (SvTYPE (gv) != SVt_PVGV
2186 || !GvCV (gv)
2187 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2188 return PL_ppaddr[OP_ENTERSUB](aTHX);
2189
2190 if (!(PL_op->op_flags & OPf_STACKED))
2191 {
2192 /* ampersand-form of call, use @_ instead of stack */
2193 AV *av = GvAV (PL_defgv);
2194 arg = AvARRAY (av);
2195 items = AvFILLp (av) + 1;
2196 }
2197
2198 /* now call the init function, which needs to set up slf_frame */
2199 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2200 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2201
2202 /* pop args */
2203 SP = PL_stack_base + POPMARK;
2204
2205 PUTBACK;
2206 }
2207
2208 /* now that we have a slf_frame, interpret it! */
2209 /* we use a callback system not to make the code needlessly */
2210 /* complicated, but so we can run multiple perl coros from one cctx */
2211
2212 do
2213 {
2214 struct coro_transfer_args ta;
2215
2216 slf_frame.prepare (aTHX_ &ta);
2217 TRANSFER (ta, 0);
2218
2219 checkmark = PL_stack_sp - PL_stack_base;
2220 }
2221 while (slf_frame.check (aTHX_ &slf_frame));
2222
2223 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2224
2225 /* exception handling */
2226 if (expect_false (CORO_THROW))
2227 {
2228 SV *exception = sv_2mortal (CORO_THROW);
2229
2230 CORO_THROW = 0;
2231 sv_setsv (ERRSV, exception);
2232 croak (0);
2233 }
2234
2235 /* return value handling - mostly like entersub */
2236 /* make sure we put something on the stack in scalar context */
2237 if (GIMME_V == G_SCALAR)
2238 {
2239 dSP;
2240 SV **bot = PL_stack_base + checkmark;
2241
2242 if (sp == bot) /* too few, push undef */
2243 bot [1] = &PL_sv_undef;
2244 else if (sp != bot + 1) /* too many, take last one */
2245 bot [1] = *sp;
2246
2247 SP = bot + 1;
2248
2249 PUTBACK;
2250 }
2251
2252 return NORMAL;
2253}
2254
2255static void
2256api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2257{
2258 int i;
2259 SV **arg = PL_stack_base + ax;
2260 int items = PL_stack_sp - arg + 1;
2261
2262 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2263
2264 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2265 && PL_op->op_ppaddr != pp_slf)
2266 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2267
2268 CvFLAGS (cv) |= CVf_SLF;
2269 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2270 slf_cv = cv;
2271
2272 /* we patch the op, and then re-run the whole call */
2273 /* we have to put the same argument on the stack for this to work */
2274 /* and this will be done by pp_restore */
2275 slf_restore.op_next = (OP *)&slf_restore;
2276 slf_restore.op_type = OP_CUSTOM;
2277 slf_restore.op_ppaddr = pp_restore;
2278 slf_restore.op_first = PL_op;
2279
2280 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2281
2282 if (PL_op->op_flags & OPf_STACKED)
2283 {
2284 if (items > slf_arga)
2285 {
2286 slf_arga = items;
2287 free (slf_argv);
2288 slf_argv = malloc (slf_arga * sizeof (SV *));
2289 }
2290
2291 slf_argc = items;
2292
2293 for (i = 0; i < items; ++i)
2294 slf_argv [i] = SvREFCNT_inc (arg [i]);
2295 }
2296 else
2297 slf_argc = 0;
2298
2299 PL_op->op_ppaddr = pp_slf;
2300 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2301
2302 PL_op = (OP *)&slf_restore;
2303}
2304
2305/*****************************************************************************/
2306/* PerlIO::cede */
2307
2308typedef struct
2309{
2310 PerlIOBuf base;
2311 NV next, every;
2312} PerlIOCede;
2313
2314static IV
2315PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2316{
2317 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2318
2319 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2320 self->next = nvtime () + self->every;
2321
2322 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2323}
2324
2325static SV *
2326PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2327{
2328 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2329
2330 return newSVnv (self->every);
2331}
2332
2333static IV
2334PerlIOCede_flush (pTHX_ PerlIO *f)
2335{
2336 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2337 double now = nvtime ();
2338
2339 if (now >= self->next)
2340 {
2341 api_cede (aTHX);
2342 self->next = now + self->every;
2343 }
2344
2345 return PerlIOBuf_flush (aTHX_ f);
2346}
2347
2348static PerlIO_funcs PerlIO_cede =
2349{
2350 sizeof(PerlIO_funcs),
2351 "cede",
2352 sizeof(PerlIOCede),
2353 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2354 PerlIOCede_pushed,
2355 PerlIOBuf_popped,
2356 PerlIOBuf_open,
2357 PerlIOBase_binmode,
2358 PerlIOCede_getarg,
2359 PerlIOBase_fileno,
2360 PerlIOBuf_dup,
2361 PerlIOBuf_read,
2362 PerlIOBuf_unread,
2363 PerlIOBuf_write,
2364 PerlIOBuf_seek,
2365 PerlIOBuf_tell,
2366 PerlIOBuf_close,
2367 PerlIOCede_flush,
2368 PerlIOBuf_fill,
2369 PerlIOBase_eof,
2370 PerlIOBase_error,
2371 PerlIOBase_clearerr,
2372 PerlIOBase_setlinebuf,
2373 PerlIOBuf_get_base,
2374 PerlIOBuf_bufsiz,
2375 PerlIOBuf_get_ptr,
2376 PerlIOBuf_get_cnt,
2377 PerlIOBuf_set_ptrcnt,
2378};
2379
2380/*****************************************************************************/
2381/* Coro::Semaphore & Coro::Signal */
2382
2383static SV *
2384coro_waitarray_new (pTHX_ int count)
2385{
2386 /* a semaphore contains a counter IV in $sem->[0] and any waiters after that */
2387 AV *av = newAV ();
2388 SV **ary;
2389
2390 /* unfortunately, building manually saves memory */
2391 Newx (ary, 2, SV *);
2392 AvALLOC (av) = ary;
2393#if PERL_VERSION_ATLEAST (5,10,0)
2394 AvARRAY (av) = ary;
2395#else
2396 /* 5.8.8 needs this syntax instead of AvARRAY = ary, yet */
2397 /* -DDEBUGGING flags this as a bug, despite it perfectly working */
2398 SvPVX ((SV *)av) = (char *)ary;
2399#endif
2400 AvMAX (av) = 1;
2401 AvFILLp (av) = 0;
2402 ary [0] = newSViv (count);
2403
2404 return newRV_noinc ((SV *)av);
2405}
2406
2407/* semaphore */
2408
2409static void
2410coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2411{
2412 SV *count_sv = AvARRAY (av)[0];
2413 IV count = SvIVX (count_sv);
2414
2415 count += adjust;
2416 SvIVX (count_sv) = count;
2417
2418 /* now wake up as many waiters as are expected to lock */
2419 while (count > 0 && AvFILLp (av) > 0)
2420 {
2421 SV *cb;
2422
2423 /* swap first two elements so we can shift a waiter */
2424 AvARRAY (av)[0] = AvARRAY (av)[1];
2425 AvARRAY (av)[1] = count_sv;
2426 cb = av_shift (av);
2427
2428 if (SvOBJECT (cb))
2429 {
2430 api_ready (aTHX_ cb);
2431 --count;
2432 }
2433 else if (SvTYPE (cb) == SVt_PVCV)
2434 {
2435 dSP;
2436 PUSHMARK (SP);
2437 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2438 PUTBACK;
2439 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2440 }
2441
2442 SvREFCNT_dec (cb);
2443 }
2444}
2445
2446static void
2447coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2448{
2449 /* call $sem->adjust (0) to possibly wake up some other waiters */
2450 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2451}
2452
2453static int
2454slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2455{
2456 AV *av = (AV *)frame->data;
2457 SV *count_sv = AvARRAY (av)[0];
2458
2459 /* if we are about to throw, don't actually acquire the lock, just throw */
2460 if (CORO_THROW)
2461 return 0;
2462 else if (SvIVX (count_sv) > 0)
2463 {
2464 SvSTATE_current->on_destroy = 0;
2465
2466 if (acquire)
2467 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2468 else
2469 coro_semaphore_adjust (aTHX_ av, 0);
2470
2471 return 0;
2472 }
2473 else
2474 {
2475 int i;
2476 /* if we were woken up but can't down, we look through the whole */
2477 /* waiters list and only add us if we aren't in there already */
2478 /* this avoids some degenerate memory usage cases */
2479
2480 for (i = 1; i <= AvFILLp (av); ++i)
2481 if (AvARRAY (av)[i] == SvRV (coro_current))
2482 return 1;
2483
2484 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2485 return 1;
2486 }
2487}
2488
2489static int
2490slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2491{
2492 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2493}
2494
2495static int
2496slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2497{
2498 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2499}
2500
2501static void
2502slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2503{
2504 AV *av = (AV *)SvRV (arg [0]);
2505
2506 if (SvIVX (AvARRAY (av)[0]) > 0)
2507 {
2508 frame->data = (void *)av;
2509 frame->prepare = prepare_nop;
2510 }
2511 else
2512 {
2513 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2514
2515 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2516 frame->prepare = prepare_schedule;
2517
2518 /* to avoid race conditions when a woken-up coro gets terminated */
2519 /* we arrange for a temporary on_destroy that calls adjust (0) */
2520 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2521 }
2522}
2523
2524static void
2525slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2526{
2527 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2528 frame->check = slf_check_semaphore_down;
2529}
2530
2531static void
2532slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2533{
2534 if (items >= 2)
2535 {
2536 /* callback form */
2537 AV *av = (AV *)SvRV (arg [0]);
2538 CV *cb_cv = coro_sv_2cv (aTHX_ arg [1]);
2539
2540 av_push (av, (SV *)SvREFCNT_inc_NN (cb_cv));
2541
2542 if (SvIVX (AvARRAY (av)[0]) > 0)
2543 coro_semaphore_adjust (aTHX_ av, 0);
2544
2545 frame->prepare = prepare_nop;
2546 frame->check = slf_check_nop;
2547 }
2548 else
2549 {
2550 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2551 frame->check = slf_check_semaphore_wait;
2552 }
2553}
2554
2555/* signal */
2556
2557static void
2558coro_signal_wake (pTHX_ AV *av, int count)
2559{
2560 SvIVX (AvARRAY (av)[0]) = 0;
2561
2562 /* now signal count waiters */
2563 while (count > 0 && AvFILLp (av) > 0)
2564 {
2565 SV *cb;
2566
2567 /* swap first two elements so we can shift a waiter */
2568 cb = AvARRAY (av)[0];
2569 AvARRAY (av)[0] = AvARRAY (av)[1];
2570 AvARRAY (av)[1] = cb;
2571
2572 cb = av_shift (av);
2573
2574 api_ready (aTHX_ cb);
2575 sv_setiv (cb, 0); /* signal waiter */
2576 SvREFCNT_dec (cb);
2577
2578 --count;
2579 }
2580}
2581
2582static int
2583slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2584{
2585 /* if we are about to throw, also stop waiting */
2586 return SvROK ((SV *)frame->data) && !CORO_THROW;
2587}
2588
2589static void
2590slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2591{
2592 AV *av = (AV *)SvRV (arg [0]);
2593
2594 if (SvIVX (AvARRAY (av)[0]))
2595 {
2596 SvIVX (AvARRAY (av)[0]) = 0;
2597 frame->prepare = prepare_nop;
2598 frame->check = slf_check_nop;
2599 }
2600 else
2601 {
2602 SV *waiter = newRV_inc (SvRV (coro_current)); /* owned by signal av */
2603
2604 av_push (av, waiter);
2605
2606 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2607 frame->prepare = prepare_schedule;
2608 frame->check = slf_check_signal_wait;
2609 }
2610}
2611
2612/*****************************************************************************/
2613/* Coro::AIO */
2614
2615#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2616
2617/* helper storage struct */
2618struct io_state
2619{
2620 int errorno;
2621 I32 laststype; /* U16 in 5.10.0 */
2622 int laststatval;
2623 Stat_t statcache;
2624};
2625
2626static void
2627coro_aio_callback (pTHX_ CV *cv)
2628{
2629 dXSARGS;
2630 AV *state = (AV *)GENSUB_ARG;
2631 SV *coro = av_pop (state);
2632 SV *data_sv = newSV (sizeof (struct io_state));
2633
2634 av_extend (state, items - 1);
2635
2636 sv_upgrade (data_sv, SVt_PV);
2637 SvCUR_set (data_sv, sizeof (struct io_state));
2638 SvPOK_only (data_sv);
2639
2640 {
2641 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2642
2643 data->errorno = errno;
2644 data->laststype = PL_laststype;
2645 data->laststatval = PL_laststatval;
2646 data->statcache = PL_statcache;
2647 }
2648
2649 /* now build the result vector out of all the parameters and the data_sv */
2650 {
2651 int i;
2652
2653 for (i = 0; i < items; ++i)
2654 av_push (state, SvREFCNT_inc_NN (ST (i)));
2655 }
2656
2657 av_push (state, data_sv);
2658
2659 api_ready (aTHX_ coro);
2660 SvREFCNT_dec (coro);
2661 SvREFCNT_dec ((AV *)state);
2662}
2663
2664static int
2665slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2666{
2667 AV *state = (AV *)frame->data;
2668
2669 /* if we are about to throw, return early */
2670 /* this does not cancel the aio request, but at least */
2671 /* it quickly returns */
2672 if (CORO_THROW)
2673 return 0;
2674
2675 /* one element that is an RV? repeat! */
2676 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2677 return 1;
2678
2679 /* restore status */
2680 {
2681 SV *data_sv = av_pop (state);
2682 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2683
2684 errno = data->errorno;
2685 PL_laststype = data->laststype;
2686 PL_laststatval = data->laststatval;
2687 PL_statcache = data->statcache;
2688
2689 SvREFCNT_dec (data_sv);
2690 }
2691
2692 /* push result values */
2693 {
2694 dSP;
2695 int i;
2696
2697 EXTEND (SP, AvFILLp (state) + 1);
2698 for (i = 0; i <= AvFILLp (state); ++i)
2699 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2700
2701 PUTBACK;
2702 }
2703
2704 return 0;
2705}
2706
2707static void
2708slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2709{
2710 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2711 SV *coro_hv = SvRV (coro_current);
2712 struct coro *coro = SvSTATE_hv (coro_hv);
2713
2714 /* put our coroutine id on the state arg */
2715 av_push (state, SvREFCNT_inc_NN (coro_hv));
2716
2717 /* first see whether we have a non-zero priority and set it as AIO prio */
2718 if (coro->prio)
2719 {
2720 dSP;
2721
2722 static SV *prio_cv;
2723 static SV *prio_sv;
2724
2725 if (expect_false (!prio_cv))
2726 {
2727 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2728 prio_sv = newSViv (0);
2729 }
2730
2731 PUSHMARK (SP);
2732 sv_setiv (prio_sv, coro->prio);
2733 XPUSHs (prio_sv);
2734
2735 PUTBACK;
2736 call_sv (prio_cv, G_VOID | G_DISCARD);
2737 }
2738
2739 /* now call the original request */
2740 {
2741 dSP;
2742 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2743 int i;
2744
2745 PUSHMARK (SP);
2746
2747 /* first push all args to the stack */
2748 EXTEND (SP, items + 1);
2749
2750 for (i = 0; i < items; ++i)
2751 PUSHs (arg [i]);
2752
2753 /* now push the callback closure */
2754 PUSHs (sv_2mortal (gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2755
2756 /* now call the AIO function - we assume our request is uncancelable */
2757 PUTBACK;
2758 call_sv ((SV *)req, G_VOID | G_DISCARD);
2759 }
2760
2761 /* now that the requets is going, we loop toll we have a result */
2762 frame->data = (void *)state;
2763 frame->prepare = prepare_schedule;
2764 frame->check = slf_check_aio_req;
2765}
2766
2767static void
2768coro_aio_req_xs (pTHX_ CV *cv)
2769{
2770 dXSARGS;
2771
2772 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2773
2774 XSRETURN_EMPTY;
2775}
2776
2777/*****************************************************************************/
2778
2779#if CORO_CLONE
2780# include "clone.c"
2781#endif
2782
2783MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2784
2785PROTOTYPES: DISABLE
2786
2787BOOT:
2788{
2789#ifdef USE_ITHREADS
2790# if CORO_PTHREAD
2791 coro_thx = PERL_GET_CONTEXT;
2792# endif
2793#endif
2794 BOOT_PAGESIZE;
2795
2796 cctx_current = cctx_new_empty ();
2797
2798 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2799 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2800
2801 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2802 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2803 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
2804
2805 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
2806 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
2807 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
2808
2809 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
2810
2811 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
2812 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
2813 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
2814 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
2815
2816 main_mainstack = PL_mainstack;
2817 main_top_env = PL_top_env;
2818
2819 while (main_top_env->je_prev)
2820 main_top_env = main_top_env->je_prev;
2821
2822 {
2823 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2824
2825 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2826 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
2827
2828 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2829 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
2830 }
2831
2832 coroapi.ver = CORO_API_VERSION;
2833 coroapi.rev = CORO_API_REVISION;
2834
2835 coroapi.transfer = api_transfer;
2836
2837 coroapi.sv_state = SvSTATE_;
2838 coroapi.execute_slf = api_execute_slf;
2839 coroapi.prepare_nop = prepare_nop;
2840 coroapi.prepare_schedule = prepare_schedule;
2841 coroapi.prepare_cede = prepare_cede;
2842 coroapi.prepare_cede_notself = prepare_cede_notself;
2843
2844 {
2845 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
2846
2847 if (!svp) croak ("Time::HiRes is required");
2848 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
2849
2850 nvtime = INT2PTR (double (*)(), SvIV (*svp));
2851 }
2852
2853 assert (("PRIO_NORMAL must be 0", !PRIO_NORMAL));
2854}
2855
2856SV *
2857new (char *klass, ...)
2858 ALIAS:
2859 Coro::new = 1
2860 CODE:
2861{
2862 struct coro *coro;
2863 MAGIC *mg;
2864 HV *hv;
2865 CV *cb;
2866 int i;
2867
2868 if (items > 1)
2869 {
2870 cb = coro_sv_2cv (aTHX_ ST (1));
2871
2872 if (!ix)
215 { 2873 {
216 /* I never used formats, so how should I know how these are implemented? */ 2874 if (CvISXSUB (cb))
217 /* my bold guess is as a simple, plain sub... */ 2875 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
218 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 2876
2877 if (!CvROOT (cb))
2878 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
219 } 2879 }
220 } 2880 }
221 2881
222 if (top_si->si_type == PERLSI_MAIN)
223 break;
224
225 top_si = top_si->si_prev;
226 ccstk = top_si->si_cxstack;
227 cxix = top_si->si_cxix;
228 }
229
230 PUTBACK;
231 }
232
233 c->dowarn = PL_dowarn;
234 c->defav = GvAV (PL_defgv);
235 c->curstackinfo = PL_curstackinfo;
236 c->curstack = PL_curstack;
237 c->mainstack = PL_mainstack;
238 c->stack_sp = PL_stack_sp;
239 c->op = PL_op;
240 c->curpad = PL_curpad;
241 c->stack_base = PL_stack_base;
242 c->stack_max = PL_stack_max;
243 c->tmps_stack = PL_tmps_stack;
244 c->tmps_floor = PL_tmps_floor;
245 c->tmps_ix = PL_tmps_ix;
246 c->tmps_max = PL_tmps_max;
247 c->markstack = PL_markstack;
248 c->markstack_ptr = PL_markstack_ptr;
249 c->markstack_max = PL_markstack_max;
250 c->scopestack = PL_scopestack;
251 c->scopestack_ix = PL_scopestack_ix;
252 c->scopestack_max = PL_scopestack_max;
253 c->savestack = PL_savestack;
254 c->savestack_ix = PL_savestack_ix;
255 c->savestack_max = PL_savestack_max;
256 c->retstack = PL_retstack;
257 c->retstack_ix = PL_retstack_ix;
258 c->retstack_max = PL_retstack_max;
259 c->curcop = PL_curcop;
260}
261
262static void
263LOAD(pTHX_ Coro__State c)
264{
265 PL_dowarn = c->dowarn;
266 GvAV (PL_defgv) = c->defav;
267 PL_curstackinfo = c->curstackinfo;
268 PL_curstack = c->curstack;
269 PL_mainstack = c->mainstack;
270 PL_stack_sp = c->stack_sp;
271 PL_op = c->op;
272 PL_curpad = c->curpad;
273 PL_stack_base = c->stack_base;
274 PL_stack_max = c->stack_max;
275 PL_tmps_stack = c->tmps_stack;
276 PL_tmps_floor = c->tmps_floor;
277 PL_tmps_ix = c->tmps_ix;
278 PL_tmps_max = c->tmps_max;
279 PL_markstack = c->markstack;
280 PL_markstack_ptr = c->markstack_ptr;
281 PL_markstack_max = c->markstack_max;
282 PL_scopestack = c->scopestack;
283 PL_scopestack_ix = c->scopestack_ix;
284 PL_scopestack_max = c->scopestack_max;
285 PL_savestack = c->savestack;
286 PL_savestack_ix = c->savestack_ix;
287 PL_savestack_max = c->savestack_max;
288 PL_retstack = c->retstack;
289 PL_retstack_ix = c->retstack_ix;
290 PL_retstack_max = c->retstack_max;
291 PL_curcop = c->curcop;
292
293 {
294 dSP;
295 CV *cv;
296
297 /* now do the ugly restore mess */
298 while ((cv = (CV *)POPs))
299 {
300 AV *padlist = (AV *)POPs;
301
302 unuse_padlist (CvPADLIST(cv));
303 CvPADLIST(cv) = padlist;
304 CvDEPTH(cv) = (I32)POPs;
305
306#ifdef USE_THREADS
307 CvOWNER(cv) = (struct perl_thread *)POPs;
308 error does not work either
309#endif
310 }
311
312 PUTBACK;
313 }
314}
315
316/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
317STATIC void
318S_nuke_stacks(pTHX)
319{
320 while (PL_curstackinfo->si_next)
321 PL_curstackinfo = PL_curstackinfo->si_next;
322 while (PL_curstackinfo) {
323 PERL_SI *p = PL_curstackinfo->si_prev;
324 /* curstackinfo->si_stack got nuked by sv_free_arenas() */
325 Safefree(PL_curstackinfo->si_cxstack);
326 Safefree(PL_curstackinfo);
327 PL_curstackinfo = p;
328 }
329 Safefree(PL_tmps_stack);
330 Safefree(PL_markstack);
331 Safefree(PL_scopestack);
332 Safefree(PL_savestack);
333 Safefree(PL_retstack);
334}
335
336#define SUB_INIT "Coro::State::_newcoro"
337
338MODULE = Coro::State PACKAGE = Coro::State
339
340PROTOTYPES: ENABLE
341
342BOOT:
343 if (!padlist_cache)
344 padlist_cache = newHV ();
345
346Coro::State
347_newprocess(args)
348 SV * args
349 PROTOTYPE: $
350 CODE:
351 Coro__State coro;
352
353 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV)
354 croak ("Coro::State::newprocess expects an arrayref");
355
356 New (0, coro, 1, struct coro); 2882 Newz (0, coro, 1, struct coro);
2883 coro->args = newAV ();
2884 coro->flags = CF_NEW;
357 2885
358 coro->mainstack = 0; /* actual work is done inside transfer */ 2886 if (coro_first) coro_first->prev = coro;
359 coro->args = (AV *)SvREFCNT_inc (SvRV (args)); 2887 coro->next = coro_first;
2888 coro_first = coro;
360 2889
361 RETVAL = coro; 2890 coro->hv = hv = newHV ();
2891 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
2892 mg->mg_flags |= MGf_DUP;
2893 RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1));
2894
2895 if (items > 1)
2896 {
2897 av_extend (coro->args, items - 1 + ix - 1);
2898
2899 if (ix)
2900 {
2901 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
2902 cb = cv_coro_run;
2903 }
2904
2905 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
2906
2907 for (i = 2; i < items; i++)
2908 av_push (coro->args, newSVsv (ST (i)));
2909 }
2910}
362 OUTPUT: 2911 OUTPUT:
363 RETVAL 2912 RETVAL
364 2913
365void 2914void
366transfer(prev,next) 2915transfer (...)
367 Coro::State_or_hashref prev 2916 PROTOTYPE: $$
368 Coro::State_or_hashref next 2917 CODE:
369 CODE: 2918 CORO_EXECUTE_SLF_XS (slf_init_transfer);
370 2919
371 if (prev != next) 2920bool
2921_destroy (SV *coro_sv)
2922 CODE:
2923 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
2924 OUTPUT:
2925 RETVAL
2926
2927void
2928_exit (int code)
2929 PROTOTYPE: $
2930 CODE:
2931 _exit (code);
2932
2933SV *
2934clone (Coro::State coro)
2935 CODE:
2936{
2937#if CORO_CLONE
2938 struct coro *ncoro = coro_clone (aTHX_ coro);
2939 MAGIC *mg;
2940 /* TODO: too much duplication */
2941 ncoro->hv = newHV ();
2942 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
2943 mg->mg_flags |= MGf_DUP;
2944 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
2945#else
2946 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
2947#endif
2948}
2949 OUTPUT:
2950 RETVAL
2951
2952int
2953cctx_stacksize (int new_stacksize = 0)
2954 PROTOTYPE: ;$
2955 CODE:
2956 RETVAL = cctx_stacksize;
2957 if (new_stacksize)
372 { 2958 {
373 PUTBACK; 2959 cctx_stacksize = new_stacksize;
374 SAVE (aTHX_ prev); 2960 ++cctx_gen;
375
376 /* 2961 }
377 * this could be done in newprocess which would lead to 2962 OUTPUT:
378 * extremely elegant and fast (just PUTBACK/SAVE/LOAD/SPAGAIN) 2963 RETVAL
379 * code here, but lazy allocation of stacks has also 2964
380 * some virtues and the overhead of the if() is nil. 2965int
2966cctx_max_idle (int max_idle = 0)
2967 PROTOTYPE: ;$
2968 CODE:
2969 RETVAL = cctx_max_idle;
2970 if (max_idle > 1)
2971 cctx_max_idle = max_idle;
2972 OUTPUT:
2973 RETVAL
2974
2975int
2976cctx_count ()
2977 PROTOTYPE:
2978 CODE:
2979 RETVAL = cctx_count;
2980 OUTPUT:
2981 RETVAL
2982
2983int
2984cctx_idle ()
2985 PROTOTYPE:
2986 CODE:
2987 RETVAL = cctx_idle;
2988 OUTPUT:
2989 RETVAL
2990
2991void
2992list ()
2993 PROTOTYPE:
2994 PPCODE:
2995{
2996 struct coro *coro;
2997 for (coro = coro_first; coro; coro = coro->next)
2998 if (coro->hv)
2999 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
3000}
3001
3002void
3003call (Coro::State coro, SV *coderef)
3004 ALIAS:
3005 eval = 1
3006 CODE:
3007{
3008 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
381 */ 3009 {
382 if (next->mainstack) 3010 struct coro temp;
3011
3012 if (!(coro->flags & CF_RUNNING))
383 { 3013 {
384 LOAD (aTHX_ next); 3014 PUTBACK;
385 next->mainstack = 0; /* unnecessary but much cleaner */ 3015 save_perl (aTHX_ &temp);
3016 load_perl (aTHX_ coro);
3017 }
3018
3019 {
3020 dSP;
3021 ENTER;
3022 SAVETMPS;
3023 PUTBACK;
3024 PUSHSTACK;
3025 PUSHMARK (SP);
3026
3027 if (ix)
3028 eval_sv (coderef, 0);
3029 else
3030 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3031
3032 POPSTACK;
3033 SPAGAIN;
3034 FREETMPS;
3035 LEAVE;
3036 PUTBACK;
3037 }
3038
3039 if (!(coro->flags & CF_RUNNING))
3040 {
3041 save_perl (aTHX_ coro);
3042 load_perl (aTHX_ &temp);
386 SPAGAIN; 3043 SPAGAIN;
387 } 3044 }
388 else
389 {
390 /*
391 * emulate part of the perl startup here.
392 */
393 UNOP myop;
394
395 init_stacks ();
396 PL_op = (OP *)&myop;
397 /*PL_curcop = 0;*/
398 GvAV (PL_defgv) = (SV *)SvREFCNT_inc (next->args);
399
400 SPAGAIN;
401 Zero(&myop, 1, UNOP);
402 myop.op_next = Nullop;
403 myop.op_flags = OPf_WANT_VOID;
404
405 PUSHMARK(SP);
406 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
407 PUTBACK;
408 /*
409 * the next line is slightly wrong, as PL_op->op_next
410 * is actually being executed so we skip the first op.
411 * that doesn't matter, though, since it is only
412 * pp_nextstate and we never return...
413 */
414 PL_op = Perl_pp_entersub(aTHX);
415 SPAGAIN;
416
417 ENTER;
418 }
419 } 3045 }
3046}
3047
3048SV *
3049is_ready (Coro::State coro)
3050 PROTOTYPE: $
3051 ALIAS:
3052 is_ready = CF_READY
3053 is_running = CF_RUNNING
3054 is_new = CF_NEW
3055 is_destroyed = CF_DESTROYED
3056 CODE:
3057 RETVAL = boolSV (coro->flags & ix);
3058 OUTPUT:
3059 RETVAL
420 3060
421void 3061void
422DESTROY(coro) 3062throw (Coro::State self, SV *throw = &PL_sv_undef)
423 Coro::State coro 3063 PROTOTYPE: $;$
424 CODE: 3064 CODE:
3065{
3066 struct coro *current = SvSTATE_current;
3067 SV **throwp = self == current ? &CORO_THROW : &self->except;
3068 SvREFCNT_dec (*throwp);
3069 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
3070}
425 3071
426 if (coro->mainstack) 3072void
3073api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3074 PROTOTYPE: $;$
3075 C_ARGS: aTHX_ coro, flags
3076
3077SV *
3078has_cctx (Coro::State coro)
3079 PROTOTYPE: $
3080 CODE:
3081 /* maybe manage the running flag differently */
3082 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3083 OUTPUT:
3084 RETVAL
3085
3086int
3087is_traced (Coro::State coro)
3088 PROTOTYPE: $
3089 CODE:
3090 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3091 OUTPUT:
3092 RETVAL
3093
3094UV
3095rss (Coro::State coro)
3096 PROTOTYPE: $
3097 ALIAS:
3098 usecount = 1
3099 CODE:
3100 switch (ix)
3101 {
3102 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3103 case 1: RETVAL = coro->usecount; break;
3104 }
3105 OUTPUT:
3106 RETVAL
3107
3108void
3109force_cctx ()
3110 PROTOTYPE:
3111 CODE:
3112 cctx_current->idle_sp = 0;
3113
3114void
3115swap_defsv (Coro::State self)
3116 PROTOTYPE: $
3117 ALIAS:
3118 swap_defav = 1
3119 CODE:
3120 if (!self->slot)
3121 croak ("cannot swap state with coroutine that has no saved state,");
3122 else
427 { 3123 {
428 struct coro temp; 3124 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3125 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
429 3126
3127 SV *tmp = *src; *src = *dst; *dst = tmp;
3128 }
3129
3130
3131MODULE = Coro::State PACKAGE = Coro
3132
3133BOOT:
3134{
3135 int i;
3136
3137 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3138 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3139 cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD);
3140 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD);
3141 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3142 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3143 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3144 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3145 sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE);
3146
3147 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3148 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3149 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3150 cv_coro_state_new = get_cv ("Coro::State::new", 0); SvREADONLY_on (cv_coro_state_new);
3151
3152 coro_stash = gv_stashpv ("Coro", TRUE);
3153
3154 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (PRIO_MAX));
3155 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (PRIO_HIGH));
3156 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (PRIO_NORMAL));
3157 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (PRIO_LOW));
3158 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (PRIO_IDLE));
3159 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (PRIO_MIN));
3160
3161 for (i = PRIO_MAX - PRIO_MIN + 1; i--; )
3162 coro_ready[i] = newAV ();
3163
3164 {
3165 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3166
3167 coroapi.schedule = api_schedule;
3168 coroapi.schedule_to = api_schedule_to;
3169 coroapi.cede = api_cede;
3170 coroapi.cede_notself = api_cede_notself;
3171 coroapi.ready = api_ready;
3172 coroapi.is_ready = api_is_ready;
3173 coroapi.nready = coro_nready;
3174 coroapi.current = coro_current;
3175
3176 /*GCoroAPI = &coroapi;*/
3177 sv_setiv (sv, (IV)&coroapi);
3178 SvREADONLY_on (sv);
3179 }
3180}
3181
3182void
3183terminate (...)
3184 CODE:
3185 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3186
3187void
3188schedule (...)
3189 CODE:
3190 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3191
3192void
3193schedule_to (...)
3194 CODE:
3195 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3196
3197void
3198cede_to (...)
3199 CODE:
3200 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3201
3202void
3203cede (...)
3204 CODE:
3205 CORO_EXECUTE_SLF_XS (slf_init_cede);
3206
3207void
3208cede_notself (...)
3209 CODE:
3210 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3211
3212void
3213_cancel (Coro::State self)
3214 CODE:
3215 coro_state_destroy (aTHX_ self);
3216 coro_call_on_destroy (aTHX_ self);
3217
3218void
3219_set_current (SV *current)
3220 PROTOTYPE: $
3221 CODE:
3222 SvREFCNT_dec (SvRV (coro_current));
3223 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3224
3225void
3226_set_readyhook (SV *hook)
3227 PROTOTYPE: $
3228 CODE:
3229 SvREFCNT_dec (coro_readyhook);
3230 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
3231
3232int
3233prio (Coro::State coro, int newprio = 0)
3234 PROTOTYPE: $;$
3235 ALIAS:
3236 nice = 1
3237 CODE:
3238{
3239 RETVAL = coro->prio;
3240
3241 if (items > 1)
3242 {
3243 if (ix)
3244 newprio = coro->prio - newprio;
3245
3246 if (newprio < PRIO_MIN) newprio = PRIO_MIN;
3247 if (newprio > PRIO_MAX) newprio = PRIO_MAX;
3248
3249 coro->prio = newprio;
3250 }
3251}
3252 OUTPUT:
3253 RETVAL
3254
3255SV *
3256ready (SV *self)
3257 PROTOTYPE: $
3258 CODE:
3259 RETVAL = boolSV (api_ready (aTHX_ self));
3260 OUTPUT:
3261 RETVAL
3262
3263int
3264nready (...)
3265 PROTOTYPE:
3266 CODE:
3267 RETVAL = coro_nready;
3268 OUTPUT:
3269 RETVAL
3270
3271void
3272_pool_handler (...)
3273 CODE:
3274 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3275
3276void
3277async_pool (SV *cv, ...)
3278 PROTOTYPE: &@
3279 PPCODE:
3280{
3281 HV *hv = (HV *)av_pop (av_async_pool);
3282 AV *av = newAV ();
3283 SV *cb = ST (0);
3284 int i;
3285
3286 av_extend (av, items - 2);
3287 for (i = 1; i < items; ++i)
3288 av_push (av, SvREFCNT_inc_NN (ST (i)));
3289
3290 if ((SV *)hv == &PL_sv_undef)
3291 {
3292 PUSHMARK (SP);
3293 EXTEND (SP, 2);
3294 PUSHs (sv_Coro);
3295 PUSHs ((SV *)cv_pool_handler);
430 PUTBACK; 3296 PUTBACK;
431 SAVE(aTHX_ (&temp)); 3297 call_sv ((SV *)cv_coro_state_new, G_SCALAR);
432 LOAD(aTHX_ coro);
433
434 S_nuke_stacks ();
435 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
436
437 LOAD((&temp));
438 SPAGAIN; 3298 SPAGAIN;
3299
3300 hv = (HV *)SvREFCNT_inc_NN (SvRV (POPs));
439 } 3301 }
440 3302
3303 {
3304 struct coro *coro = SvSTATE_hv (hv);
3305
3306 assert (!coro->invoke_cb);
3307 assert (!coro->invoke_av);
3308 coro->invoke_cb = SvREFCNT_inc (cb);
3309 coro->invoke_av = av;
3310 }
3311
3312 api_ready (aTHX_ (SV *)hv);
3313
3314 if (GIMME_V != G_VOID)
3315 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
3316 else
441 SvREFCNT_dec (coro->args); 3317 SvREFCNT_dec (hv);
442 Safefree (coro); 3318}
443 3319
3320SV *
3321rouse_cb ()
3322 PROTOTYPE:
3323 CODE:
3324 RETVAL = coro_new_rouse_cb (aTHX);
3325 OUTPUT:
3326 RETVAL
444 3327
3328void
3329rouse_wait (...)
3330 PROTOTYPE: ;$
3331 PPCODE:
3332 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
3333
3334
3335MODULE = Coro::State PACKAGE = PerlIO::cede
3336
3337BOOT:
3338 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3339
3340
3341MODULE = Coro::State PACKAGE = Coro::Semaphore
3342
3343SV *
3344new (SV *klass, SV *count = 0)
3345 CODE:
3346 RETVAL = sv_bless (
3347 coro_waitarray_new (aTHX_ count && SvOK (count) ? SvIV (count) : 1),
3348 GvSTASH (CvGV (cv))
3349 );
3350 OUTPUT:
3351 RETVAL
3352
3353# helper for Coro::Channel
3354SV *
3355_alloc (int count)
3356 CODE:
3357 RETVAL = coro_waitarray_new (aTHX_ count);
3358 OUTPUT:
3359 RETVAL
3360
3361SV *
3362count (SV *self)
3363 CODE:
3364 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3365 OUTPUT:
3366 RETVAL
3367
3368void
3369up (SV *self, int adjust = 1)
3370 ALIAS:
3371 adjust = 1
3372 CODE:
3373 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3374
3375void
3376down (...)
3377 CODE:
3378 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3379
3380void
3381wait (...)
3382 CODE:
3383 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3384
3385void
3386try (SV *self)
3387 PPCODE:
3388{
3389 AV *av = (AV *)SvRV (self);
3390 SV *count_sv = AvARRAY (av)[0];
3391 IV count = SvIVX (count_sv);
3392
3393 if (count > 0)
3394 {
3395 --count;
3396 SvIVX (count_sv) = count;
3397 XSRETURN_YES;
3398 }
3399 else
3400 XSRETURN_NO;
3401}
3402
3403void
3404waiters (SV *self)
3405 PPCODE:
3406{
3407 AV *av = (AV *)SvRV (self);
3408 int wcount = AvFILLp (av) + 1 - 1;
3409
3410 if (GIMME_V == G_SCALAR)
3411 XPUSHs (sv_2mortal (newSViv (wcount)));
3412 else
3413 {
3414 int i;
3415 EXTEND (SP, wcount);
3416 for (i = 1; i <= wcount; ++i)
3417 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3418 }
3419}
3420
3421MODULE = Coro::State PACKAGE = Coro::Signal
3422
3423SV *
3424new (SV *klass)
3425 CODE:
3426 RETVAL = sv_bless (
3427 coro_waitarray_new (aTHX_ 0),
3428 GvSTASH (CvGV (cv))
3429 );
3430 OUTPUT:
3431 RETVAL
3432
3433void
3434wait (...)
3435 CODE:
3436 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
3437
3438void
3439broadcast (SV *self)
3440 CODE:
3441{
3442 AV *av = (AV *)SvRV (self);
3443 coro_signal_wake (aTHX_ av, AvFILLp (av));
3444}
3445
3446void
3447send (SV *self)
3448 CODE:
3449{
3450 AV *av = (AV *)SvRV (self);
3451
3452 if (AvFILLp (av))
3453 coro_signal_wake (aTHX_ av, 1);
3454 else
3455 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3456}
3457
3458IV
3459awaited (SV *self)
3460 CODE:
3461 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3462 OUTPUT:
3463 RETVAL
3464
3465
3466MODULE = Coro::State PACKAGE = Coro::AnyEvent
3467
3468BOOT:
3469 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3470
3471void
3472_schedule (...)
3473 CODE:
3474{
3475 static int incede;
3476
3477 api_cede_notself (aTHX);
3478
3479 ++incede;
3480 while (coro_nready >= incede && api_cede (aTHX))
3481 ;
3482
3483 sv_setsv (sv_activity, &PL_sv_undef);
3484 if (coro_nready >= incede)
3485 {
3486 PUSHMARK (SP);
3487 PUTBACK;
3488 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3489 }
3490
3491 --incede;
3492}
3493
3494
3495MODULE = Coro::State PACKAGE = Coro::AIO
3496
3497void
3498_register (char *target, char *proto, SV *req)
3499 CODE:
3500{
3501 CV *req_cv = coro_sv_2cv (aTHX_ req);
3502 /* newXSproto doesn't return the CV on 5.8 */
3503 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3504 sv_setpv ((SV *)slf_cv, proto);
3505 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3506}
3507

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines