ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.6 by root, Tue Jul 17 15:42:28 2001 UTC vs.
Revision 1.346 by root, Tue Jun 9 18:56:45 2009 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "patchlevel.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
24#endif
25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
9#endif 35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
10 48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#define PERL_VERSION_ATLEAST(a,b,c) \
57 (PERL_REVISION > (a) \
58 || (PERL_REVISION == (a) \
59 && (PERL_VERSION > (b) \
60 || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))
61
62#if !PERL_VERSION_ATLEAST (5,6,0)
63# ifndef PL_ppaddr
64# define PL_ppaddr ppaddr
65# endif
66# ifndef call_sv
67# define call_sv perl_call_sv
68# endif
69# ifndef get_sv
70# define get_sv perl_get_sv
71# endif
72# ifndef get_cv
73# define get_cv perl_get_cv
74# endif
75# ifndef IS_PADGV
76# define IS_PADGV(v) 0
77# endif
78# ifndef IS_PADCONST
79# define IS_PADCONST(v) 0
80# endif
81#endif
82
83/* 5.11 */
84#ifndef CxHASARGS
85# define CxHASARGS(cx) (cx)->blk_sub.hasargs
86#endif
87
88/* 5.10.0 */
89#ifndef SvREFCNT_inc_NN
90# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
91#endif
92
93/* 5.8.8 */
94#ifndef GV_NOTQUAL
95# define GV_NOTQUAL 0
96#endif
97#ifndef newSV
98# define newSV(l) NEWSV(0,l)
99#endif
100#ifndef CvISXSUB_on
101# define CvISXSUB_on(cv) (void)cv
102#endif
103#ifndef CvISXSUB
104# define CvISXSUB(cv) (CvXSUB (cv) ? TRUE : FALSE)
105#endif
106#ifndef Newx
107# define Newx(ptr,nitems,type) New (0,ptr,nitems,type)
108#endif
109
110/* 5.8.7 */
111#ifndef SvRV_set
112# define SvRV_set(s,v) SvRV(s) = (v)
113#endif
114
115#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
116# undef CORO_STACKGUARD
117#endif
118
119#ifndef CORO_STACKGUARD
120# define CORO_STACKGUARD 0
121#endif
122
123/* prefer perl internal functions over our own? */
124#ifndef CORO_PREFER_PERL_FUNCTIONS
125# define CORO_PREFER_PERL_FUNCTIONS 0
126#endif
127
128/* The next macros try to return the current stack pointer, in an as
129 * portable way as possible. */
130#if __GNUC__ >= 4
131# define dSTACKLEVEL int stacklevel_dummy
132# define STACKLEVEL __builtin_frame_address (0)
133#else
134# define dSTACKLEVEL volatile void *stacklevel
135# define STACKLEVEL ((void *)&stacklevel)
136#endif
137
138#define IN_DESTRUCT PL_dirty
139
140#if __GNUC__ >= 3
141# define attribute(x) __attribute__(x)
142# define expect(expr,value) __builtin_expect ((expr),(value))
143# define INLINE static inline
144#else
145# define attribute(x)
146# define expect(expr,value) (expr)
147# define INLINE static
148#endif
149
150#define expect_false(expr) expect ((expr) != 0, 0)
151#define expect_true(expr) expect ((expr) != 0, 1)
152
153#define NOINLINE attribute ((noinline))
154
155#include "CoroAPI.h"
156#define GCoroAPI (&coroapi) /* very sneaky */
157
158#ifdef USE_ITHREADS
159# if CORO_PTHREAD
160static void *coro_thx;
161# endif
162#endif
163
164static double (*nvtime)(); /* so why doesn't it take void? */
165
166/* we hijack an hopefully unused CV flag for our purposes */
167#define CVf_SLF 0x4000
168static OP *pp_slf (pTHX);
169
170static U32 cctx_gen;
171static size_t cctx_stacksize = CORO_STACKSIZE;
172static struct CoroAPI coroapi;
173static AV *main_mainstack; /* used to differentiate between $main and others */
174static JMPENV *main_top_env;
175static HV *coro_state_stash, *coro_stash;
176static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
177
178static AV *av_destroy; /* destruction queue */
179static SV *sv_manager; /* the manager coro */
180static SV *sv_idle; /* $Coro::idle */
181
182static GV *irsgv; /* $/ */
183static GV *stdoutgv; /* *STDOUT */
184static SV *rv_diehook;
185static SV *rv_warnhook;
186static HV *hv_sig; /* %SIG */
187
188/* async_pool helper stuff */
189static SV *sv_pool_rss;
190static SV *sv_pool_size;
191static SV *sv_async_pool_idle; /* description string */
192static AV *av_async_pool; /* idle pool */
193static SV *sv_Coro; /* class string */
194static CV *cv_pool_handler;
195static CV *cv_coro_state_new;
196
197/* Coro::AnyEvent */
198static SV *sv_activity;
199
200static struct coro_cctx *cctx_first;
201static int cctx_count, cctx_idle;
202
203enum {
204 CC_MAPPED = 0x01,
205 CC_NOREUSE = 0x02, /* throw this away after tracing */
206 CC_TRACE = 0x04,
207 CC_TRACE_SUB = 0x08, /* trace sub calls */
208 CC_TRACE_LINE = 0x10, /* trace each statement */
209 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
210};
211
212/* this is a structure representing a c-level coroutine */
213typedef struct coro_cctx
214{
215 struct coro_cctx *next;
216
217 /* the stack */
218 void *sptr;
219 size_t ssize;
220
221 /* cpu state */
222 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
223 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
224 JMPENV *top_env;
225 coro_context cctx;
226
227 U32 gen;
228#if CORO_USE_VALGRIND
229 int valgrind_id;
230#endif
231 unsigned char flags;
232} coro_cctx;
233
234coro_cctx *cctx_current; /* the currently running cctx */
235
236/*****************************************************************************/
237
238enum {
239 CF_RUNNING = 0x0001, /* coroutine is running */
240 CF_READY = 0x0002, /* coroutine is ready */
241 CF_NEW = 0x0004, /* has never been switched to */
242 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
243 CF_SUSPENDED = 0x0010, /* coroutine can't be scheduled */
244};
245
246/* the structure where most of the perl state is stored, overlaid on the cxstack */
247typedef struct
248{
249 SV *defsv;
250 AV *defav;
251 SV *errsv;
252 SV *irsgv;
253 HV *hinthv;
254#define VAR(name,type) type name;
255# include "state.h"
256#undef VAR
257} perl_slots;
258
259#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
260
261/* this is a structure representing a perl-level coroutine */
11struct coro { 262struct coro {
12 U8 dowarn; 263 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 264 coro_cctx *cctx;
14 265
15 PERL_SI *curstackinfo; 266 /* state data */
16 AV *curstack; 267 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 268 AV *mainstack;
18 SV **stack_sp; 269 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 270
41 AV *args; 271 CV *startcv; /* the CV to execute */
272 AV *args; /* data associated with this coroutine (initial args) */
273 int refcnt; /* coroutines are refcounted, yes */
274 int flags; /* CF_ flags */
275 HV *hv; /* the perl hash associated with this coro, if any */
276 void (*on_destroy)(pTHX_ struct coro *coro);
277
278 /* statistics */
279 int usecount; /* number of transfers to this coro */
280
281 /* coro process data */
282 int prio;
283 SV *except; /* exception to be thrown */
284 SV *rouse_cb;
285
286 /* async_pool */
287 SV *saved_deffh;
288 SV *invoke_cb;
289 AV *invoke_av;
290
291 /* on_enter/on_leave */
292 AV *on_enter;
293 AV *on_leave;
294
295 /* linked list */
296 struct coro *next, *prev;
42}; 297};
43 298
44typedef struct coro *Coro__State; 299typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 300typedef struct coro *Coro__State_or_hashref;
46 301
47static HV *padlist_cache; 302/* the following variables are effectively part of the perl context */
303/* and get copied between struct coro and these variables */
304/* the mainr easonw e don't support windows process emulation */
305static struct CoroSLF slf_frame; /* the current slf frame */
48 306
49/* mostly copied from op.c:cv_clone2 */ 307/** Coro ********************************************************************/
50STATIC AV * 308
51clone_padlist (AV *protopadlist) 309#define PRIO_MAX 3
310#define PRIO_HIGH 1
311#define PRIO_NORMAL 0
312#define PRIO_LOW -1
313#define PRIO_IDLE -3
314#define PRIO_MIN -4
315
316/* for Coro.pm */
317static SV *coro_current;
318static SV *coro_readyhook;
319static AV *coro_ready [PRIO_MAX - PRIO_MIN + 1];
320static CV *cv_coro_run, *cv_coro_terminate;
321static struct coro *coro_first;
322#define coro_nready coroapi.nready
323
324/** lowlevel stuff **********************************************************/
325
326static SV *
327coro_get_sv (pTHX_ const char *name, int create)
52{ 328{
53 AV *av; 329#if PERL_VERSION_ATLEAST (5,10,0)
54 I32 ix; 330 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 331 get_sv (name, create);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 332#endif
57 SV **pname = AvARRAY (protopad_name); 333 return get_sv (name, create);
58 SV **ppad = AvARRAY (protopad); 334}
59 I32 fname = AvFILLp (protopad_name); 335
60 I32 fpad = AvFILLp (protopad); 336static AV *
337coro_get_av (pTHX_ const char *name, int create)
338{
339#if PERL_VERSION_ATLEAST (5,10,0)
340 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
341 get_av (name, create);
342#endif
343 return get_av (name, create);
344}
345
346static HV *
347coro_get_hv (pTHX_ const char *name, int create)
348{
349#if PERL_VERSION_ATLEAST (5,10,0)
350 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
351 get_hv (name, create);
352#endif
353 return get_hv (name, create);
354}
355
356/* may croak */
357INLINE CV *
358coro_sv_2cv (pTHX_ SV *sv)
359{
360 HV *st;
361 GV *gvp;
362 CV *cv = sv_2cv (sv, &st, &gvp, 0);
363
364 if (!cv)
365 croak ("code reference expected");
366
367 return cv;
368}
369
370/*****************************************************************************/
371/* magic glue */
372
373#define CORO_MAGIC_type_cv 26
374#define CORO_MAGIC_type_state PERL_MAGIC_ext
375
376#define CORO_MAGIC_NN(sv, type) \
377 (expect_true (SvMAGIC (sv)->mg_type == type) \
378 ? SvMAGIC (sv) \
379 : mg_find (sv, type))
380
381#define CORO_MAGIC(sv, type) \
382 (expect_true (SvMAGIC (sv)) \
383 ? CORO_MAGIC_NN (sv, type) \
384 : 0)
385
386#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
387#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
388
389INLINE struct coro *
390SvSTATE_ (pTHX_ SV *coro)
391{
392 HV *stash;
393 MAGIC *mg;
394
395 if (SvROK (coro))
396 coro = SvRV (coro);
397
398 if (expect_false (SvTYPE (coro) != SVt_PVHV))
399 croak ("Coro::State object required");
400
401 stash = SvSTASH (coro);
402 if (expect_false (stash != coro_stash && stash != coro_state_stash))
403 {
404 /* very slow, but rare, check */
405 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
406 croak ("Coro::State object required");
407 }
408
409 mg = CORO_MAGIC_state (coro);
410 return (struct coro *)mg->mg_ptr;
411}
412
413#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
414
415/* faster than SvSTATE, but expects a coroutine hv */
416#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
417#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
418
419/*****************************************************************************/
420/* padlist management and caching */
421
422static AV *
423coro_derive_padlist (pTHX_ CV *cv)
424{
425 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 426 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 427
72 newpadlist = newAV (); 428 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 429 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 430#if PERL_VERSION_ATLEAST (5,10,0)
431 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
432#else
433 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
434#endif
435 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
436 --AvFILLp (padlist);
437
438 av_store (newpadlist, 0, SvREFCNT_inc_NN (AvARRAY (padlist)[0]));
75 av_store (newpadlist, 1, (SV *) newpad); 439 av_store (newpadlist, 1, (SV *)newpad);
76 440
77 av = newAV (); /* will be @_ */ 441 return newpadlist;
78 av_extend (av, 0); 442}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 443
82 for (ix = fpad; ix > 0; ix--) 444static void
445free_padlist (pTHX_ AV *padlist)
446{
447 /* may be during global destruction */
448 if (!IN_DESTRUCT)
83 { 449 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 450 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 451
452 while (i > 0) /* special-case index 0 */
86 { 453 {
87 char *name = SvPVX (namesv); /* XXX */ 454 /* we try to be extra-careful here */
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 455 AV *av = (AV *)AvARRAY (padlist)[i--];
89 { /* lexical from outside? */ 456 I32 j = AvFILLp (av);
90 npad[ix] = SvREFCNT_inc (ppad[ix]); 457
91 } 458 while (j >= 0)
92 else 459 SvREFCNT_dec (AvARRAY (av)[j--]);
93 { /* our own lexical */ 460
94 SV *sv; 461 AvFILLp (av) = -1;
95 if (*name == '&') 462 SvREFCNT_dec (av);
96 sv = SvREFCNT_inc (ppad[ix]);
97 else if (*name == '@')
98 sv = (SV *) newAV ();
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 }
107 } 463 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix]))
109 {
110 npad[ix] = SvREFCNT_inc (ppad[ix]);
111 }
112 else
113 {
114 SV *sv = NEWSV (0, 0);
115 SvPADTMP_on (sv);
116 npad[ix] = sv;
117 }
118 }
119 464
120#if 0 /* NONOTUNDERSTOOD */ 465 SvREFCNT_dec (AvARRAY (padlist)[0]);
121 /* Now that vars are all in place, clone nested closures. */
122 466
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif
139
140 return newpadlist;
141}
142
143STATIC AV *
144free_padlist (AV *padlist)
145{
146 /* may be during global destruction */
147 if (SvREFCNT(padlist))
148 {
149 I32 i = AvFILLp(padlist); 467 AvFILLp (padlist) = -1;
150 while (i >= 0)
151 {
152 SV **svp = av_fetch(padlist, i--, FALSE);
153 SV *sv = svp ? *svp : Nullsv;
154 if (sv)
155 SvREFCNT_dec(sv);
156 }
157
158 SvREFCNT_dec((SV*)padlist); 468 SvREFCNT_dec ((SV*)padlist);
469 }
470}
471
472static int
473coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
474{
475 AV *padlist;
476 AV *av = (AV *)mg->mg_obj;
477
478 /* casting is fun. */
479 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
480 free_padlist (aTHX_ padlist);
481
482 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
483
484 return 0;
485}
486
487static MGVTBL coro_cv_vtbl = {
488 0, 0, 0, 0,
489 coro_cv_free
490};
491
492/* the next two functions merely cache the padlists */
493static void
494get_padlist (pTHX_ CV *cv)
495{
496 MAGIC *mg = CORO_MAGIC_cv (cv);
497 AV *av;
498
499 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
500 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
501 else
502 {
503#if CORO_PREFER_PERL_FUNCTIONS
504 /* this is probably cleaner? but also slower! */
505 /* in practise, it seems to be less stable */
506 CV *cp = Perl_cv_clone (aTHX_ cv);
507 CvPADLIST (cv) = CvPADLIST (cp);
508 CvPADLIST (cp) = 0;
509 SvREFCNT_dec (cp);
510#else
511 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
512#endif
513 }
514}
515
516static void
517put_padlist (pTHX_ CV *cv)
518{
519 MAGIC *mg = CORO_MAGIC_cv (cv);
520 AV *av;
521
522 if (expect_false (!mg))
523 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
524
525 av = (AV *)mg->mg_obj;
526
527 if (expect_false (AvFILLp (av) >= AvMAX (av)))
528 av_extend (av, AvFILLp (av) + 1);
529
530 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
531}
532
533/** load & save, init *******************************************************/
534
535static void
536on_enterleave_call (pTHX_ SV *cb);
537
538static void
539load_perl (pTHX_ Coro__State c)
540{
541 perl_slots *slot = c->slot;
542 c->slot = 0;
543
544 PL_mainstack = c->mainstack;
545
546 GvSV (PL_defgv) = slot->defsv;
547 GvAV (PL_defgv) = slot->defav;
548 GvSV (PL_errgv) = slot->errsv;
549 GvSV (irsgv) = slot->irsgv;
550 GvHV (PL_hintgv) = slot->hinthv;
551
552 #define VAR(name,type) PL_ ## name = slot->name;
553 # include "state.h"
554 #undef VAR
555
556 {
557 dSP;
558
559 CV *cv;
560
561 /* now do the ugly restore mess */
562 while (expect_true (cv = (CV *)POPs))
563 {
564 put_padlist (aTHX_ cv); /* mark this padlist as available */
565 CvDEPTH (cv) = PTR2IV (POPs);
566 CvPADLIST (cv) = (AV *)POPs;
567 }
568
569 PUTBACK;
159 } 570 }
160}
161 571
162/* the next tow functions merely cache the padlists */ 572 slf_frame = c->slf_frame;
163STATIC void 573 CORO_THROW = c->except;
164get_padlist (CV *cv)
165{
166 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 0);
167 574
168 if (he && AvFILLp ((AV *)*he) >= 0) 575 if (expect_false (c->on_enter))
169 CvPADLIST (cv) = (AV *)av_pop ((AV *)*he);
170 else
171 CvPADLIST (cv) = clone_padlist (CvPADLIST (cv));
172}
173
174STATIC void
175put_padlist (CV *cv)
176{
177 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 1);
178
179 if (SvTYPE (*he) != SVt_PVAV)
180 {
181 SvREFCNT_dec (*he);
182 *he = (SV *)newAV ();
183 } 576 {
577 int i;
184 578
185 av_push ((AV *)*he, (SV *)CvPADLIST (cv)); 579 for (i = 0; i <= AvFILLp (c->on_enter); ++i)
580 on_enterleave_call (aTHX_ AvARRAY (c->on_enter)[i]);
581 }
186} 582}
187 583
188static void 584static void
189save_state(pTHX_ Coro__State c) 585save_perl (pTHX_ Coro__State c)
190{ 586{
587 if (expect_false (c->on_leave))
588 {
589 int i;
590
591 for (i = AvFILLp (c->on_leave); i >= 0; --i)
592 on_enterleave_call (aTHX_ AvARRAY (c->on_leave)[i]);
593 }
594
595 c->except = CORO_THROW;
596 c->slf_frame = slf_frame;
597
191 { 598 {
192 dSP; 599 dSP;
193 I32 cxix = cxstack_ix; 600 I32 cxix = cxstack_ix;
601 PERL_CONTEXT *ccstk = cxstack;
194 PERL_SI *top_si = PL_curstackinfo; 602 PERL_SI *top_si = PL_curstackinfo;
195 PERL_CONTEXT *ccstk = cxstack;
196 603
197 /* 604 /*
198 * the worst thing you can imagine happens first - we have to save 605 * the worst thing you can imagine happens first - we have to save
199 * (and reinitialize) all cv's in the whole callchain :( 606 * (and reinitialize) all cv's in the whole callchain :(
200 */ 607 */
201 608
202 PUSHs (Nullsv); 609 XPUSHs (Nullsv);
203 /* this loop was inspired by pp_caller */ 610 /* this loop was inspired by pp_caller */
204 for (;;) 611 for (;;)
205 { 612 {
206 while (cxix >= 0) 613 while (expect_true (cxix >= 0))
207 { 614 {
208 PERL_CONTEXT *cx = &ccstk[cxix--]; 615 PERL_CONTEXT *cx = &ccstk[cxix--];
209 616
210 if (CxTYPE(cx) == CXt_SUB) 617 if (expect_true (CxTYPE (cx) == CXt_SUB || CxTYPE (cx) == CXt_FORMAT))
211 { 618 {
212 CV *cv = cx->blk_sub.cv; 619 CV *cv = cx->blk_sub.cv;
620
213 if (CvDEPTH(cv)) 621 if (expect_true (CvDEPTH (cv)))
214 { 622 {
215#ifdef USE_THREADS
216 XPUSHs ((SV *)CvOWNER(cv));
217#endif
218 EXTEND (SP, 3); 623 EXTEND (SP, 3);
219 PUSHs ((SV *)CvDEPTH(cv));
220 PUSHs ((SV *)CvPADLIST(cv)); 624 PUSHs ((SV *)CvPADLIST (cv));
625 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
221 PUSHs ((SV *)cv); 626 PUSHs ((SV *)cv);
222 627
223 get_padlist (cv);
224
225 CvDEPTH(cv) = 0; 628 CvDEPTH (cv) = 0;
226#ifdef USE_THREADS 629 get_padlist (aTHX_ cv);
227 CvOWNER(cv) = 0;
228 error must unlock this cv etc.. etc...
229 if you are here wondering about this error message then
230 the reason is that it will not work as advertised yet
231#endif
232 } 630 }
233 } 631 }
234 else if (CxTYPE(cx) == CXt_FORMAT) 632 }
633
634 if (expect_true (top_si->si_type == PERLSI_MAIN))
635 break;
636
637 top_si = top_si->si_prev;
638 ccstk = top_si->si_cxstack;
639 cxix = top_si->si_cxix;
640 }
641
642 PUTBACK;
643 }
644
645 /* allocate some space on the context stack for our purposes */
646 /* we manually unroll here, as usually 2 slots is enough */
647 if (SLOT_COUNT >= 1) CXINC;
648 if (SLOT_COUNT >= 2) CXINC;
649 if (SLOT_COUNT >= 3) CXINC;
650 {
651 int i;
652 for (i = 3; i < SLOT_COUNT; ++i)
653 CXINC;
654 }
655 cxstack_ix -= SLOT_COUNT; /* undo allocation */
656
657 c->mainstack = PL_mainstack;
658
659 {
660 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
661
662 slot->defav = GvAV (PL_defgv);
663 slot->defsv = DEFSV;
664 slot->errsv = ERRSV;
665 slot->irsgv = GvSV (irsgv);
666 slot->hinthv = GvHV (PL_hintgv);
667
668 #define VAR(name,type) slot->name = PL_ ## name;
669 # include "state.h"
670 #undef VAR
671 }
672}
673
674/*
675 * allocate various perl stacks. This is almost an exact copy
676 * of perl.c:init_stacks, except that it uses less memory
677 * on the (sometimes correct) assumption that coroutines do
678 * not usually need a lot of stackspace.
679 */
680#if CORO_PREFER_PERL_FUNCTIONS
681# define coro_init_stacks(thx) init_stacks ()
682#else
683static void
684coro_init_stacks (pTHX)
685{
686 PL_curstackinfo = new_stackinfo(32, 8);
687 PL_curstackinfo->si_type = PERLSI_MAIN;
688 PL_curstack = PL_curstackinfo->si_stack;
689 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
690
691 PL_stack_base = AvARRAY(PL_curstack);
692 PL_stack_sp = PL_stack_base;
693 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
694
695 New(50,PL_tmps_stack,32,SV*);
696 PL_tmps_floor = -1;
697 PL_tmps_ix = -1;
698 PL_tmps_max = 32;
699
700 New(54,PL_markstack,16,I32);
701 PL_markstack_ptr = PL_markstack;
702 PL_markstack_max = PL_markstack + 16;
703
704#ifdef SET_MARK_OFFSET
705 SET_MARK_OFFSET;
706#endif
707
708 New(54,PL_scopestack,8,I32);
709 PL_scopestack_ix = 0;
710 PL_scopestack_max = 8;
711
712 New(54,PL_savestack,24,ANY);
713 PL_savestack_ix = 0;
714 PL_savestack_max = 24;
715
716#if !PERL_VERSION_ATLEAST (5,10,0)
717 New(54,PL_retstack,4,OP*);
718 PL_retstack_ix = 0;
719 PL_retstack_max = 4;
720#endif
721}
722#endif
723
724/*
725 * destroy the stacks, the callchain etc...
726 */
727static void
728coro_destruct_stacks (pTHX)
729{
730 while (PL_curstackinfo->si_next)
731 PL_curstackinfo = PL_curstackinfo->si_next;
732
733 while (PL_curstackinfo)
734 {
735 PERL_SI *p = PL_curstackinfo->si_prev;
736
737 if (!IN_DESTRUCT)
738 SvREFCNT_dec (PL_curstackinfo->si_stack);
739
740 Safefree (PL_curstackinfo->si_cxstack);
741 Safefree (PL_curstackinfo);
742 PL_curstackinfo = p;
743 }
744
745 Safefree (PL_tmps_stack);
746 Safefree (PL_markstack);
747 Safefree (PL_scopestack);
748 Safefree (PL_savestack);
749#if !PERL_VERSION_ATLEAST (5,10,0)
750 Safefree (PL_retstack);
751#endif
752}
753
754#define CORO_RSS \
755 rss += sizeof (SYM (curstackinfo)); \
756 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
757 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
758 rss += SYM (tmps_max) * sizeof (SV *); \
759 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
760 rss += SYM (scopestack_max) * sizeof (I32); \
761 rss += SYM (savestack_max) * sizeof (ANY);
762
763static size_t
764coro_rss (pTHX_ struct coro *coro)
765{
766 size_t rss = sizeof (*coro);
767
768 if (coro->mainstack)
769 {
770 if (coro->flags & CF_RUNNING)
771 {
772 #define SYM(sym) PL_ ## sym
773 CORO_RSS;
774 #undef SYM
775 }
776 else
777 {
778 #define SYM(sym) coro->slot->sym
779 CORO_RSS;
780 #undef SYM
781 }
782 }
783
784 return rss;
785}
786
787/** coroutine stack handling ************************************************/
788
789static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
790static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
791static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
792
793/* apparently < 5.8.8 */
794#ifndef MgPV_nolen_const
795#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
796 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
797 (const char*)(mg)->mg_ptr)
798#endif
799
800/*
801 * This overrides the default magic get method of %SIG elements.
802 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
803 * and instead of trying to save and restore the hash elements, we just provide
804 * readback here.
805 */
806static int
807coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
808{
809 const char *s = MgPV_nolen_const (mg);
810
811 if (*s == '_')
812 {
813 SV **svp = 0;
814
815 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
816 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
817
818 if (svp)
819 {
820 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
821 return 0;
822 }
823 }
824
825 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
826}
827
828static int
829coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
830{
831 const char *s = MgPV_nolen_const (mg);
832
833 if (*s == '_')
834 {
835 SV **svp = 0;
836
837 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
838 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
839
840 if (svp)
841 {
842 SV *old = *svp;
843 *svp = 0;
844 SvREFCNT_dec (old);
845 return 0;
846 }
847 }
848
849 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
850}
851
852static int
853coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
854{
855 const char *s = MgPV_nolen_const (mg);
856
857 if (*s == '_')
858 {
859 SV **svp = 0;
860
861 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
862 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
863
864 if (svp)
865 {
866 SV *old = *svp;
867 *svp = SvOK (sv) ? newSVsv (sv) : 0;
868 SvREFCNT_dec (old);
869 return 0;
870 }
871 }
872
873 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
874}
875
876static void
877prepare_nop (pTHX_ struct coro_transfer_args *ta)
878{
879 /* kind of mega-hacky, but works */
880 ta->next = ta->prev = (struct coro *)ta;
881}
882
883static int
884slf_check_nop (pTHX_ struct CoroSLF *frame)
885{
886 return 0;
887}
888
889static int
890slf_check_repeat (pTHX_ struct CoroSLF *frame)
891{
892 return 1;
893}
894
895static UNOP coro_setup_op;
896
897static void NOINLINE /* noinline to keep it out of the transfer fast path */
898coro_setup (pTHX_ struct coro *coro)
899{
900 /*
901 * emulate part of the perl startup here.
902 */
903 coro_init_stacks (aTHX);
904
905 PL_runops = RUNOPS_DEFAULT;
906 PL_curcop = &PL_compiling;
907 PL_in_eval = EVAL_NULL;
908 PL_comppad = 0;
909 PL_comppad_name = 0;
910 PL_comppad_name_fill = 0;
911 PL_comppad_name_floor = 0;
912 PL_curpm = 0;
913 PL_curpad = 0;
914 PL_localizing = 0;
915 PL_dirty = 0;
916 PL_restartop = 0;
917#if PERL_VERSION_ATLEAST (5,10,0)
918 PL_parser = 0;
919#endif
920 PL_hints = 0;
921
922 /* recreate the die/warn hooks */
923 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
924 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
925
926 GvSV (PL_defgv) = newSV (0);
927 GvAV (PL_defgv) = coro->args; coro->args = 0;
928 GvSV (PL_errgv) = newSV (0);
929 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
930 GvHV (PL_hintgv) = 0;
931 PL_rs = newSVsv (GvSV (irsgv));
932 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
933
934 {
935 dSP;
936 UNOP myop;
937
938 Zero (&myop, 1, UNOP);
939 myop.op_next = Nullop;
940 myop.op_type = OP_ENTERSUB;
941 myop.op_flags = OPf_WANT_VOID;
942
943 PUSHMARK (SP);
944 PUSHs ((SV *)coro->startcv);
945 PUTBACK;
946 PL_op = (OP *)&myop;
947 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
948 }
949
950 /* this newly created coroutine might be run on an existing cctx which most
951 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
952 */
953 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
954 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
955
956 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
957 coro_setup_op.op_next = PL_op;
958 coro_setup_op.op_type = OP_ENTERSUB;
959 coro_setup_op.op_ppaddr = pp_slf;
960 /* no flags etc. required, as an init function won't be called */
961
962 PL_op = (OP *)&coro_setup_op;
963
964 /* copy throw, in case it was set before coro_setup */
965 CORO_THROW = coro->except;
966}
967
968static void
969coro_unwind_stacks (pTHX)
970{
971 if (!IN_DESTRUCT)
972 {
973 /* restore all saved variables and stuff */
974 LEAVE_SCOPE (0);
975 assert (PL_tmps_floor == -1);
976
977 /* free all temporaries */
978 FREETMPS;
979 assert (PL_tmps_ix == -1);
980
981 /* unwind all extra stacks */
982 POPSTACK_TO (PL_mainstack);
983
984 /* unwind main stack */
985 dounwind (-1);
986 }
987}
988
989static void
990coro_destruct_perl (pTHX_ struct coro *coro)
991{
992 coro_unwind_stacks (aTHX);
993
994 SvREFCNT_dec (GvSV (PL_defgv));
995 SvREFCNT_dec (GvAV (PL_defgv));
996 SvREFCNT_dec (GvSV (PL_errgv));
997 SvREFCNT_dec (PL_defoutgv);
998 SvREFCNT_dec (PL_rs);
999 SvREFCNT_dec (GvSV (irsgv));
1000 SvREFCNT_dec (GvHV (PL_hintgv));
1001
1002 SvREFCNT_dec (PL_diehook);
1003 SvREFCNT_dec (PL_warnhook);
1004
1005 SvREFCNT_dec (coro->saved_deffh);
1006 SvREFCNT_dec (coro->rouse_cb);
1007 SvREFCNT_dec (coro->invoke_cb);
1008 SvREFCNT_dec (coro->invoke_av);
1009
1010 coro_destruct_stacks (aTHX);
1011}
1012
1013INLINE void
1014free_coro_mortal (pTHX)
1015{
1016 if (expect_true (coro_mortal))
1017 {
1018 SvREFCNT_dec (coro_mortal);
1019 coro_mortal = 0;
1020 }
1021}
1022
1023static int
1024runops_trace (pTHX)
1025{
1026 COP *oldcop = 0;
1027 int oldcxix = -2;
1028
1029 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
1030 {
1031 PERL_ASYNC_CHECK ();
1032
1033 if (cctx_current->flags & CC_TRACE_ALL)
1034 {
1035 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
1036 {
1037 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1038 SV **bot, **top;
1039 AV *av = newAV (); /* return values */
1040 SV **cb;
1041 dSP;
1042
1043 GV *gv = CvGV (cx->blk_sub.cv);
1044 SV *fullname = sv_2mortal (newSV (0));
1045 if (isGV (gv))
1046 gv_efullname3 (fullname, gv, 0);
1047
1048 bot = PL_stack_base + cx->blk_oldsp + 1;
1049 top = cx->blk_gimme == G_ARRAY ? SP + 1
1050 : cx->blk_gimme == G_SCALAR ? bot + 1
1051 : bot;
1052
1053 av_extend (av, top - bot);
1054 while (bot < top)
1055 av_push (av, SvREFCNT_inc_NN (*bot++));
1056
1057 PL_runops = RUNOPS_DEFAULT;
1058 ENTER;
1059 SAVETMPS;
1060 EXTEND (SP, 3);
1061 PUSHMARK (SP);
1062 PUSHs (&PL_sv_no);
1063 PUSHs (fullname);
1064 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1065 PUTBACK;
1066 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1067 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1068 SPAGAIN;
1069 FREETMPS;
1070 LEAVE;
1071 PL_runops = runops_trace;
1072 }
1073
1074 if (oldcop != PL_curcop)
1075 {
1076 oldcop = PL_curcop;
1077
1078 if (PL_curcop != &PL_compiling)
1079 {
1080 SV **cb;
1081
1082 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1083 {
1084 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1085
1086 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1087 {
1088 dSP;
1089 GV *gv = CvGV (cx->blk_sub.cv);
1090 SV *fullname = sv_2mortal (newSV (0));
1091
1092 if (isGV (gv))
1093 gv_efullname3 (fullname, gv, 0);
1094
1095 PL_runops = RUNOPS_DEFAULT;
1096 ENTER;
1097 SAVETMPS;
1098 EXTEND (SP, 3);
1099 PUSHMARK (SP);
1100 PUSHs (&PL_sv_yes);
1101 PUSHs (fullname);
1102 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1103 PUTBACK;
1104 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1105 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1106 SPAGAIN;
1107 FREETMPS;
1108 LEAVE;
1109 PL_runops = runops_trace;
1110 }
1111
1112 oldcxix = cxstack_ix;
1113 }
1114
1115 if (cctx_current->flags & CC_TRACE_LINE)
1116 {
1117 dSP;
1118
1119 PL_runops = RUNOPS_DEFAULT;
1120 ENTER;
1121 SAVETMPS;
1122 EXTEND (SP, 3);
1123 PL_runops = RUNOPS_DEFAULT;
1124 PUSHMARK (SP);
1125 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1126 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1127 PUTBACK;
1128 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1129 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1130 SPAGAIN;
1131 FREETMPS;
1132 LEAVE;
1133 PL_runops = runops_trace;
1134 }
1135 }
1136 }
1137 }
1138 }
1139
1140 TAINT_NOT;
1141 return 0;
1142}
1143
1144static struct CoroSLF cctx_ssl_frame;
1145
1146static void
1147slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1148{
1149 ta->prev = 0;
1150}
1151
1152static int
1153slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1154{
1155 *frame = cctx_ssl_frame;
1156
1157 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1158}
1159
1160/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1161static void NOINLINE
1162cctx_prepare (pTHX)
1163{
1164 PL_top_env = &PL_start_env;
1165
1166 if (cctx_current->flags & CC_TRACE)
1167 PL_runops = runops_trace;
1168
1169 /* we already must be executing an SLF op, there is no other valid way
1170 * that can lead to creation of a new cctx */
1171 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1172 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1173
1174 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1175 cctx_ssl_frame = slf_frame;
1176
1177 slf_frame.prepare = slf_prepare_set_stacklevel;
1178 slf_frame.check = slf_check_set_stacklevel;
1179}
1180
1181/* the tail of transfer: execute stuff we can only do after a transfer */
1182INLINE void
1183transfer_tail (pTHX)
1184{
1185 free_coro_mortal (aTHX);
1186}
1187
1188/*
1189 * this is a _very_ stripped down perl interpreter ;)
1190 */
1191static void
1192cctx_run (void *arg)
1193{
1194#ifdef USE_ITHREADS
1195# if CORO_PTHREAD
1196 PERL_SET_CONTEXT (coro_thx);
1197# endif
1198#endif
1199 {
1200 dTHX;
1201
1202 /* normally we would need to skip the entersub here */
1203 /* not doing so will re-execute it, which is exactly what we want */
1204 /* PL_nop = PL_nop->op_next */
1205
1206 /* inject a fake subroutine call to cctx_init */
1207 cctx_prepare (aTHX);
1208
1209 /* cctx_run is the alternative tail of transfer() */
1210 transfer_tail (aTHX);
1211
1212 /* somebody or something will hit me for both perl_run and PL_restartop */
1213 PL_restartop = PL_op;
1214 perl_run (PL_curinterp);
1215 /*
1216 * Unfortunately, there is no way to get at the return values of the
1217 * coro body here, as perl_run destroys these
1218 */
1219
1220 /*
1221 * If perl-run returns we assume exit() was being called or the coro
1222 * fell off the end, which seems to be the only valid (non-bug)
1223 * reason for perl_run to return. We try to exit by jumping to the
1224 * bootstrap-time "top" top_env, as we cannot restore the "main"
1225 * coroutine as Coro has no such concept.
1226 * This actually isn't valid with the pthread backend, but OSes requiring
1227 * that backend are too broken to do it in a standards-compliant way.
1228 */
1229 PL_top_env = main_top_env;
1230 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1231 }
1232}
1233
1234static coro_cctx *
1235cctx_new ()
1236{
1237 coro_cctx *cctx;
1238
1239 ++cctx_count;
1240 New (0, cctx, 1, coro_cctx);
1241
1242 cctx->gen = cctx_gen;
1243 cctx->flags = 0;
1244 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1245
1246 return cctx;
1247}
1248
1249/* create a new cctx only suitable as source */
1250static coro_cctx *
1251cctx_new_empty ()
1252{
1253 coro_cctx *cctx = cctx_new ();
1254
1255 cctx->sptr = 0;
1256 coro_create (&cctx->cctx, 0, 0, 0, 0);
1257
1258 return cctx;
1259}
1260
1261/* create a new cctx suitable as destination/running a perl interpreter */
1262static coro_cctx *
1263cctx_new_run ()
1264{
1265 coro_cctx *cctx = cctx_new ();
1266 void *stack_start;
1267 size_t stack_size;
1268
1269#if HAVE_MMAP
1270 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1271 /* mmap supposedly does allocate-on-write for us */
1272 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1273
1274 if (cctx->sptr != (void *)-1)
1275 {
1276 #if CORO_STACKGUARD
1277 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1278 #endif
1279 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1280 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1281 cctx->flags |= CC_MAPPED;
1282 }
1283 else
1284#endif
1285 {
1286 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1287 New (0, cctx->sptr, cctx_stacksize, long);
1288
1289 if (!cctx->sptr)
1290 {
1291 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1292 _exit (EXIT_FAILURE);
1293 }
1294
1295 stack_start = cctx->sptr;
1296 stack_size = cctx->ssize;
1297 }
1298
1299 #if CORO_USE_VALGRIND
1300 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1301 #endif
1302
1303 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1304
1305 return cctx;
1306}
1307
1308static void
1309cctx_destroy (coro_cctx *cctx)
1310{
1311 if (!cctx)
1312 return;
1313
1314 assert (("FATAL: tried to destroy current cctx", cctx != cctx_current));//D temporary?
1315
1316 --cctx_count;
1317 coro_destroy (&cctx->cctx);
1318
1319 /* coro_transfer creates new, empty cctx's */
1320 if (cctx->sptr)
1321 {
1322 #if CORO_USE_VALGRIND
1323 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1324 #endif
1325
1326#if HAVE_MMAP
1327 if (cctx->flags & CC_MAPPED)
1328 munmap (cctx->sptr, cctx->ssize);
1329 else
1330#endif
1331 Safefree (cctx->sptr);
1332 }
1333
1334 Safefree (cctx);
1335}
1336
1337/* wether this cctx should be destructed */
1338#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1339
1340static coro_cctx *
1341cctx_get (pTHX)
1342{
1343 while (expect_true (cctx_first))
1344 {
1345 coro_cctx *cctx = cctx_first;
1346 cctx_first = cctx->next;
1347 --cctx_idle;
1348
1349 if (expect_true (!CCTX_EXPIRED (cctx)))
1350 return cctx;
1351
1352 cctx_destroy (cctx);
1353 }
1354
1355 return cctx_new_run ();
1356}
1357
1358static void
1359cctx_put (coro_cctx *cctx)
1360{
1361 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1362
1363 /* free another cctx if overlimit */
1364 if (expect_false (cctx_idle >= cctx_max_idle))
1365 {
1366 coro_cctx *first = cctx_first;
1367 cctx_first = first->next;
1368 --cctx_idle;
1369
1370 cctx_destroy (first);
1371 }
1372
1373 ++cctx_idle;
1374 cctx->next = cctx_first;
1375 cctx_first = cctx;
1376}
1377
1378/** coroutine switching *****************************************************/
1379
1380static void
1381transfer_check (pTHX_ struct coro *prev, struct coro *next)
1382{
1383 /* TODO: throwing up here is considered harmful */
1384
1385 if (expect_true (prev != next))
1386 {
1387 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1388 croak ("Coro::State::transfer called with a blocked prev Coro::State, but can only transfer from running or new states,");
1389
1390 if (expect_false (next->flags & (CF_RUNNING | CF_DESTROYED | CF_SUSPENDED)))
1391 croak ("Coro::State::transfer called with running, destroyed or suspended next Coro::State, but can only transfer to inactive states,");
1392
1393#if !PERL_VERSION_ATLEAST (5,10,0)
1394 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1395 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1396#endif
1397 }
1398}
1399
1400/* always use the TRANSFER macro */
1401static void NOINLINE /* noinline so we have a fixed stackframe */
1402transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1403{
1404 dSTACKLEVEL;
1405
1406 /* sometimes transfer is only called to set idle_sp */
1407 if (expect_false (!prev))
1408 {
1409 cctx_current->idle_sp = STACKLEVEL;
1410 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1411 }
1412 else if (expect_true (prev != next))
1413 {
1414 coro_cctx *cctx_prev;
1415
1416 if (expect_false (prev->flags & CF_NEW))
1417 {
1418 /* create a new empty/source context */
1419 prev->flags &= ~CF_NEW;
1420 prev->flags |= CF_RUNNING;
1421 }
1422
1423 prev->flags &= ~CF_RUNNING;
1424 next->flags |= CF_RUNNING;
1425
1426 /* first get rid of the old state */
1427 save_perl (aTHX_ prev);
1428
1429 if (expect_false (next->flags & CF_NEW))
1430 {
1431 /* need to start coroutine */
1432 next->flags &= ~CF_NEW;
1433 /* setup coroutine call */
1434 coro_setup (aTHX_ next);
1435 }
1436 else
1437 load_perl (aTHX_ next);
1438
1439 /* possibly untie and reuse the cctx */
1440 if (expect_true (
1441 cctx_current->idle_sp == STACKLEVEL
1442 && !(cctx_current->flags & CC_TRACE)
1443 && !force_cctx
1444 ))
1445 {
1446 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1447 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1448
1449 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get. */
1450 /* without this the next cctx_get might destroy the running cctx while still in use */
1451 if (expect_false (CCTX_EXPIRED (cctx_current)))
1452 if (expect_true (!next->cctx))
1453 next->cctx = cctx_get (aTHX);
1454
1455 cctx_put (cctx_current);
1456 }
1457 else
1458 prev->cctx = cctx_current;
1459
1460 ++next->usecount;
1461
1462 cctx_prev = cctx_current;
1463 cctx_current = expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1464
1465 next->cctx = 0;
1466
1467 if (expect_false (cctx_prev != cctx_current))
1468 {
1469 cctx_prev->top_env = PL_top_env;
1470 PL_top_env = cctx_current->top_env;
1471 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1472 }
1473
1474 transfer_tail (aTHX);
1475 }
1476}
1477
1478#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1479#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1480
1481/** high level stuff ********************************************************/
1482
1483static int
1484coro_state_destroy (pTHX_ struct coro *coro)
1485{
1486 if (coro->flags & CF_DESTROYED)
1487 return 0;
1488
1489 if (coro->on_destroy && !PL_dirty)
1490 coro->on_destroy (aTHX_ coro);
1491
1492 coro->flags |= CF_DESTROYED;
1493
1494 if (coro->flags & CF_READY)
1495 {
1496 /* reduce nready, as destroying a ready coro effectively unreadies it */
1497 /* alternative: look through all ready queues and remove the coro */
1498 --coro_nready;
1499 }
1500 else
1501 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1502
1503 if (coro->mainstack
1504 && coro->mainstack != main_mainstack
1505 && coro->slot
1506 && !PL_dirty)
1507 {
1508 struct coro *current = SvSTATE_current;
1509
1510 assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack));
1511
1512 save_perl (aTHX_ current);
1513 load_perl (aTHX_ coro);
1514
1515 coro_destruct_perl (aTHX_ coro);
1516
1517 load_perl (aTHX_ current);
1518
1519 coro->slot = 0;
1520 }
1521
1522 cctx_destroy (coro->cctx);
1523 SvREFCNT_dec (coro->startcv);
1524 SvREFCNT_dec (coro->args);
1525 SvREFCNT_dec (CORO_THROW);
1526
1527 if (coro->next) coro->next->prev = coro->prev;
1528 if (coro->prev) coro->prev->next = coro->next;
1529 if (coro == coro_first) coro_first = coro->next;
1530
1531 return 1;
1532}
1533
1534static int
1535coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1536{
1537 struct coro *coro = (struct coro *)mg->mg_ptr;
1538 mg->mg_ptr = 0;
1539
1540 coro->hv = 0;
1541
1542 if (--coro->refcnt < 0)
1543 {
1544 coro_state_destroy (aTHX_ coro);
1545 Safefree (coro);
1546 }
1547
1548 return 0;
1549}
1550
1551static int
1552coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1553{
1554 struct coro *coro = (struct coro *)mg->mg_ptr;
1555
1556 ++coro->refcnt;
1557
1558 return 0;
1559}
1560
1561static MGVTBL coro_state_vtbl = {
1562 0, 0, 0, 0,
1563 coro_state_free,
1564 0,
1565#ifdef MGf_DUP
1566 coro_state_dup,
1567#else
1568# define MGf_DUP 0
1569#endif
1570};
1571
1572static void
1573prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1574{
1575 ta->prev = SvSTATE (prev_sv);
1576 ta->next = SvSTATE (next_sv);
1577 TRANSFER_CHECK (*ta);
1578}
1579
1580static void
1581api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1582{
1583 struct coro_transfer_args ta;
1584
1585 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1586 TRANSFER (ta, 1);
1587}
1588
1589/*****************************************************************************/
1590/* gensub: simple closure generation utility */
1591
1592#define GENSUB_ARG CvXSUBANY (cv).any_ptr
1593
1594/* create a closure from XS, returns a code reference */
1595/* the arg can be accessed via GENSUB_ARG from the callback */
1596/* the callback must use dXSARGS/XSRETURN */
1597static SV *
1598gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg)
1599{
1600 CV *cv = (CV *)newSV (0);
1601
1602 sv_upgrade ((SV *)cv, SVt_PVCV);
1603
1604 CvANON_on (cv);
1605 CvISXSUB_on (cv);
1606 CvXSUB (cv) = xsub;
1607 GENSUB_ARG = arg;
1608
1609 return newRV_noinc ((SV *)cv);
1610}
1611
1612/** Coro ********************************************************************/
1613
1614INLINE void
1615coro_enq (pTHX_ struct coro *coro)
1616{
1617 av_push (coro_ready [coro->prio - PRIO_MIN], SvREFCNT_inc_NN (coro->hv));
1618}
1619
1620INLINE SV *
1621coro_deq (pTHX)
1622{
1623 int prio;
1624
1625 for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= 0; )
1626 if (AvFILLp (coro_ready [prio]) >= 0)
1627 return av_shift (coro_ready [prio]);
1628
1629 return 0;
1630}
1631
1632static int
1633api_ready (pTHX_ SV *coro_sv)
1634{
1635 struct coro *coro;
1636 SV *sv_hook;
1637 void (*xs_hook)(void);
1638
1639 coro = SvSTATE (coro_sv);
1640
1641 if (coro->flags & CF_READY)
1642 return 0;
1643
1644 coro->flags |= CF_READY;
1645
1646 sv_hook = coro_nready ? 0 : coro_readyhook;
1647 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1648
1649 coro_enq (aTHX_ coro);
1650 ++coro_nready;
1651
1652 if (sv_hook)
1653 {
1654 dSP;
1655
1656 ENTER;
1657 SAVETMPS;
1658
1659 PUSHMARK (SP);
1660 PUTBACK;
1661 call_sv (sv_hook, G_VOID | G_DISCARD);
1662
1663 FREETMPS;
1664 LEAVE;
1665 }
1666
1667 if (xs_hook)
1668 xs_hook ();
1669
1670 return 1;
1671}
1672
1673static int
1674api_is_ready (pTHX_ SV *coro_sv)
1675{
1676 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1677}
1678
1679/* expects to own a reference to next->hv */
1680INLINE void
1681prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1682{
1683 SV *prev_sv = SvRV (coro_current);
1684
1685 ta->prev = SvSTATE_hv (prev_sv);
1686 ta->next = next;
1687
1688 TRANSFER_CHECK (*ta);
1689
1690 SvRV_set (coro_current, (SV *)next->hv);
1691
1692 free_coro_mortal (aTHX);
1693 coro_mortal = prev_sv;
1694}
1695
1696static void
1697prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1698{
1699 for (;;)
1700 {
1701 SV *next_sv = coro_deq (aTHX);
1702
1703 if (expect_true (next_sv))
1704 {
1705 struct coro *next = SvSTATE_hv (next_sv);
1706
1707 /* cannot transfer to destroyed coros, skip and look for next */
1708 if (expect_false (next->flags & (CF_DESTROYED | CF_SUSPENDED)))
1709 SvREFCNT_dec (next_sv); /* coro_nready has already been taken care of by destroy */
1710 else
1711 {
1712 next->flags &= ~CF_READY;
1713 --coro_nready;
1714
1715 prepare_schedule_to (aTHX_ ta, next);
1716 break;
1717 }
1718 }
1719 else
1720 {
1721 /* nothing to schedule: call the idle handler */
1722 if (SvROK (sv_idle)
1723 && SvOBJECT (SvRV (sv_idle)))
1724 {
1725 ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */
1726 api_ready (aTHX_ SvRV (sv_idle));
1727 --coro_nready;
1728 }
1729 else
1730 {
1731 dSP;
1732
1733 ENTER;
1734 SAVETMPS;
1735
1736 PUSHMARK (SP);
1737 PUTBACK;
1738 call_sv (sv_idle, G_VOID | G_DISCARD);
1739
1740 FREETMPS;
1741 LEAVE;
1742 }
1743 }
1744 }
1745}
1746
1747INLINE void
1748prepare_cede (pTHX_ struct coro_transfer_args *ta)
1749{
1750 api_ready (aTHX_ coro_current);
1751 prepare_schedule (aTHX_ ta);
1752}
1753
1754INLINE void
1755prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1756{
1757 SV *prev = SvRV (coro_current);
1758
1759 if (coro_nready)
1760 {
1761 prepare_schedule (aTHX_ ta);
1762 api_ready (aTHX_ prev);
1763 }
1764 else
1765 prepare_nop (aTHX_ ta);
1766}
1767
1768static void
1769api_schedule (pTHX)
1770{
1771 struct coro_transfer_args ta;
1772
1773 prepare_schedule (aTHX_ &ta);
1774 TRANSFER (ta, 1);
1775}
1776
1777static void
1778api_schedule_to (pTHX_ SV *coro_sv)
1779{
1780 struct coro_transfer_args ta;
1781 struct coro *next = SvSTATE (coro_sv);
1782
1783 SvREFCNT_inc_NN (coro_sv);
1784 prepare_schedule_to (aTHX_ &ta, next);
1785}
1786
1787static int
1788api_cede (pTHX)
1789{
1790 struct coro_transfer_args ta;
1791
1792 prepare_cede (aTHX_ &ta);
1793
1794 if (expect_true (ta.prev != ta.next))
1795 {
1796 TRANSFER (ta, 1);
1797 return 1;
1798 }
1799 else
1800 return 0;
1801}
1802
1803static int
1804api_cede_notself (pTHX)
1805{
1806 if (coro_nready)
1807 {
1808 struct coro_transfer_args ta;
1809
1810 prepare_cede_notself (aTHX_ &ta);
1811 TRANSFER (ta, 1);
1812 return 1;
1813 }
1814 else
1815 return 0;
1816}
1817
1818static void
1819api_trace (pTHX_ SV *coro_sv, int flags)
1820{
1821 struct coro *coro = SvSTATE (coro_sv);
1822
1823 if (coro->flags & CF_RUNNING)
1824 croak ("cannot enable tracing on a running coroutine, caught");
1825
1826 if (flags & CC_TRACE)
1827 {
1828 if (!coro->cctx)
1829 coro->cctx = cctx_new_run ();
1830 else if (!(coro->cctx->flags & CC_TRACE))
1831 croak ("cannot enable tracing on coroutine with custom stack, caught");
1832
1833 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1834 }
1835 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1836 {
1837 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1838
1839 if (coro->flags & CF_RUNNING)
1840 PL_runops = RUNOPS_DEFAULT;
1841 else
1842 coro->slot->runops = RUNOPS_DEFAULT;
1843 }
1844}
1845
1846static void
1847coro_call_on_destroy (pTHX_ struct coro *coro)
1848{
1849 SV **on_destroyp = hv_fetch (coro->hv, "_on_destroy", sizeof ("_on_destroy") - 1, 0);
1850 SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0);
1851
1852 if (on_destroyp)
1853 {
1854 AV *on_destroy = (AV *)SvRV (*on_destroyp);
1855
1856 while (AvFILLp (on_destroy) >= 0)
1857 {
1858 dSP; /* don't disturb outer sp */
1859 SV *cb = av_pop (on_destroy);
1860
1861 PUSHMARK (SP);
1862
1863 if (statusp)
1864 {
1865 int i;
1866 AV *status = (AV *)SvRV (*statusp);
1867 EXTEND (SP, AvFILLp (status) + 1);
1868
1869 for (i = 0; i <= AvFILLp (status); ++i)
1870 PUSHs (AvARRAY (status)[i]);
1871 }
1872
1873 PUTBACK;
1874 call_sv (sv_2mortal (cb), G_VOID | G_DISCARD);
1875 }
1876 }
1877}
1878
1879static void
1880slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1881{
1882 int i;
1883 HV *hv = (HV *)SvRV (coro_current);
1884 AV *av = newAV ();
1885
1886 av_extend (av, items - 1);
1887 for (i = 0; i < items; ++i)
1888 av_push (av, SvREFCNT_inc_NN (arg [i]));
1889
1890 hv_store (hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0);
1891
1892 av_push (av_destroy, (SV *)newRV_inc ((SV *)hv)); /* RVinc for perl */
1893 api_ready (aTHX_ sv_manager);
1894
1895 frame->prepare = prepare_schedule;
1896 frame->check = slf_check_repeat;
1897
1898 /* as a minor optimisation, we could unwind all stacks here */
1899 /* but that puts extra pressure on pp_slf, and is not worth much */
1900 /*coro_unwind_stacks (aTHX);*/
1901}
1902
1903/*****************************************************************************/
1904/* async pool handler */
1905
1906static int
1907slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
1908{
1909 HV *hv = (HV *)SvRV (coro_current);
1910 struct coro *coro = (struct coro *)frame->data;
1911
1912 if (!coro->invoke_cb)
1913 return 1; /* loop till we have invoke */
1914 else
1915 {
1916 hv_store (hv, "desc", sizeof ("desc") - 1,
1917 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
1918
1919 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
1920
1921 {
1922 dSP;
1923 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
1924 PUTBACK;
1925 }
1926
1927 SvREFCNT_dec (GvAV (PL_defgv));
1928 GvAV (PL_defgv) = coro->invoke_av;
1929 coro->invoke_av = 0;
1930
1931 return 0;
1932 }
1933}
1934
1935static void
1936slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1937{
1938 HV *hv = (HV *)SvRV (coro_current);
1939 struct coro *coro = SvSTATE_hv ((SV *)hv);
1940
1941 if (expect_true (coro->saved_deffh))
1942 {
1943 /* subsequent iteration */
1944 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
1945 coro->saved_deffh = 0;
1946
1947 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
1948 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
1949 {
1950 coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate);
1951 coro->invoke_av = newAV ();
1952
1953 frame->prepare = prepare_nop;
1954 }
1955 else
1956 {
1957 av_clear (GvAV (PL_defgv));
1958 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
1959
1960 coro->prio = 0;
1961
1962 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
1963 api_trace (aTHX_ coro_current, 0);
1964
1965 frame->prepare = prepare_schedule;
1966 av_push (av_async_pool, SvREFCNT_inc (hv));
1967 }
1968 }
1969 else
1970 {
1971 /* first iteration, simply fall through */
1972 frame->prepare = prepare_nop;
1973 }
1974
1975 frame->check = slf_check_pool_handler;
1976 frame->data = (void *)coro;
1977}
1978
1979/*****************************************************************************/
1980/* rouse callback */
1981
1982#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
1983
1984static void
1985coro_rouse_callback (pTHX_ CV *cv)
1986{
1987 dXSARGS;
1988 SV *data = (SV *)GENSUB_ARG;
1989
1990 if (SvTYPE (SvRV (data)) != SVt_PVAV)
1991 {
1992 /* first call, set args */
1993 SV *coro = SvRV (data);
1994 AV *av = newAV ();
1995
1996 SvRV_set (data, (SV *)av);
1997
1998 /* better take a full copy of the arguments */
1999 while (items--)
2000 av_store (av, items, newSVsv (ST (items)));
2001
2002 api_ready (aTHX_ coro);
2003 SvREFCNT_dec (coro);
2004 }
2005
2006 XSRETURN_EMPTY;
2007}
2008
2009static int
2010slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
2011{
2012 SV *data = (SV *)frame->data;
2013
2014 if (CORO_THROW)
2015 return 0;
2016
2017 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2018 return 1;
2019
2020 /* now push all results on the stack */
2021 {
2022 dSP;
2023 AV *av = (AV *)SvRV (data);
2024 int i;
2025
2026 EXTEND (SP, AvFILLp (av) + 1);
2027 for (i = 0; i <= AvFILLp (av); ++i)
2028 PUSHs (sv_2mortal (AvARRAY (av)[i]));
2029
2030 /* we have stolen the elements, so set length to zero and free */
2031 AvFILLp (av) = -1;
2032 av_undef (av);
2033
2034 PUTBACK;
2035 }
2036
2037 return 0;
2038}
2039
2040static void
2041slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2042{
2043 SV *cb;
2044
2045 if (items)
2046 cb = arg [0];
2047 else
2048 {
2049 struct coro *coro = SvSTATE_current;
2050
2051 if (!coro->rouse_cb)
2052 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
2053
2054 cb = sv_2mortal (coro->rouse_cb);
2055 coro->rouse_cb = 0;
2056 }
2057
2058 if (!SvROK (cb)
2059 || SvTYPE (SvRV (cb)) != SVt_PVCV
2060 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
2061 croak ("Coro::rouse_wait called with illegal callback argument,");
2062
2063 {
2064 CV *cv = (CV *)SvRV (cb); /* for GENSUB_ARG */
2065 SV *data = (SV *)GENSUB_ARG;
2066
2067 frame->data = (void *)data;
2068 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2069 frame->check = slf_check_rouse_wait;
2070 }
2071}
2072
2073static SV *
2074coro_new_rouse_cb (pTHX)
2075{
2076 HV *hv = (HV *)SvRV (coro_current);
2077 struct coro *coro = SvSTATE_hv (hv);
2078 SV *data = newRV_inc ((SV *)hv);
2079 SV *cb = gensub (aTHX_ coro_rouse_callback, (void *)data);
2080
2081 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2082 SvREFCNT_dec (data); /* magicext increases the refcount */
2083
2084 SvREFCNT_dec (coro->rouse_cb);
2085 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2086
2087 return cb;
2088}
2089
2090/*****************************************************************************/
2091/* schedule-like-function opcode (SLF) */
2092
2093static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2094static const CV *slf_cv;
2095static SV **slf_argv;
2096static int slf_argc, slf_arga; /* count, allocated */
2097static I32 slf_ax; /* top of stack, for restore */
2098
2099/* this restores the stack in the case we patched the entersub, to */
2100/* recreate the stack frame as perl will on following calls */
2101/* since entersub cleared the stack */
2102static OP *
2103pp_restore (pTHX)
2104{
2105 int i;
2106 SV **SP = PL_stack_base + slf_ax;
2107
2108 PUSHMARK (SP);
2109
2110 EXTEND (SP, slf_argc + 1);
2111
2112 for (i = 0; i < slf_argc; ++i)
2113 PUSHs (sv_2mortal (slf_argv [i]));
2114
2115 PUSHs ((SV *)CvGV (slf_cv));
2116
2117 RETURNOP (slf_restore.op_first);
2118}
2119
2120static void
2121slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2122{
2123 SV **arg = (SV **)slf_frame.data;
2124
2125 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2126}
2127
2128static void
2129slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2130{
2131 if (items != 2)
2132 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2133
2134 frame->prepare = slf_prepare_transfer;
2135 frame->check = slf_check_nop;
2136 frame->data = (void *)arg; /* let's hope it will stay valid */
2137}
2138
2139static void
2140slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2141{
2142 frame->prepare = prepare_schedule;
2143 frame->check = slf_check_nop;
2144}
2145
2146static void
2147slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2148{
2149 struct coro *next = (struct coro *)slf_frame.data;
2150
2151 SvREFCNT_inc_NN (next->hv);
2152 prepare_schedule_to (aTHX_ ta, next);
2153}
2154
2155static void
2156slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2157{
2158 if (!items)
2159 croak ("Coro::schedule_to expects a coroutine argument, caught");
2160
2161 frame->data = (void *)SvSTATE (arg [0]);
2162 frame->prepare = slf_prepare_schedule_to;
2163 frame->check = slf_check_nop;
2164}
2165
2166static void
2167slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2168{
2169 api_ready (aTHX_ SvRV (coro_current));
2170
2171 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2172}
2173
2174static void
2175slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2176{
2177 frame->prepare = prepare_cede;
2178 frame->check = slf_check_nop;
2179}
2180
2181static void
2182slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2183{
2184 frame->prepare = prepare_cede_notself;
2185 frame->check = slf_check_nop;
2186}
2187
2188/*
2189 * these not obviously related functions are all rolled into one
2190 * function to increase chances that they all will call transfer with the same
2191 * stack offset
2192 * SLF stands for "schedule-like-function".
2193 */
2194static OP *
2195pp_slf (pTHX)
2196{
2197 I32 checkmark; /* mark SP to see how many elements check has pushed */
2198
2199 /* set up the slf frame, unless it has already been set-up */
2200 /* the latter happens when a new coro has been started */
2201 /* or when a new cctx was attached to an existing coroutine */
2202 if (expect_true (!slf_frame.prepare))
2203 {
2204 /* first iteration */
2205 dSP;
2206 SV **arg = PL_stack_base + TOPMARK + 1;
2207 int items = SP - arg; /* args without function object */
2208 SV *gv = *sp;
2209
2210 /* do a quick consistency check on the "function" object, and if it isn't */
2211 /* for us, divert to the real entersub */
2212 if (SvTYPE (gv) != SVt_PVGV
2213 || !GvCV (gv)
2214 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2215 return PL_ppaddr[OP_ENTERSUB](aTHX);
2216
2217 if (!(PL_op->op_flags & OPf_STACKED))
2218 {
2219 /* ampersand-form of call, use @_ instead of stack */
2220 AV *av = GvAV (PL_defgv);
2221 arg = AvARRAY (av);
2222 items = AvFILLp (av) + 1;
2223 }
2224
2225 /* now call the init function, which needs to set up slf_frame */
2226 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2227 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2228
2229 /* pop args */
2230 SP = PL_stack_base + POPMARK;
2231
2232 PUTBACK;
2233 }
2234
2235 /* now that we have a slf_frame, interpret it! */
2236 /* we use a callback system not to make the code needlessly */
2237 /* complicated, but so we can run multiple perl coros from one cctx */
2238
2239 do
2240 {
2241 struct coro_transfer_args ta;
2242
2243 slf_frame.prepare (aTHX_ &ta);
2244 TRANSFER (ta, 0);
2245
2246 checkmark = PL_stack_sp - PL_stack_base;
2247 }
2248 while (slf_frame.check (aTHX_ &slf_frame));
2249
2250 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2251
2252 /* exception handling */
2253 if (expect_false (CORO_THROW))
2254 {
2255 SV *exception = sv_2mortal (CORO_THROW);
2256
2257 CORO_THROW = 0;
2258 sv_setsv (ERRSV, exception);
2259 croak (0);
2260 }
2261
2262 /* return value handling - mostly like entersub */
2263 /* make sure we put something on the stack in scalar context */
2264 if (GIMME_V == G_SCALAR)
2265 {
2266 dSP;
2267 SV **bot = PL_stack_base + checkmark;
2268
2269 if (sp == bot) /* too few, push undef */
2270 bot [1] = &PL_sv_undef;
2271 else if (sp != bot + 1) /* too many, take last one */
2272 bot [1] = *sp;
2273
2274 SP = bot + 1;
2275
2276 PUTBACK;
2277 }
2278
2279 return NORMAL;
2280}
2281
2282static void
2283api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2284{
2285 int i;
2286 SV **arg = PL_stack_base + ax;
2287 int items = PL_stack_sp - arg + 1;
2288
2289 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2290
2291 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2292 && PL_op->op_ppaddr != pp_slf)
2293 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2294
2295 CvFLAGS (cv) |= CVf_SLF;
2296 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2297 slf_cv = cv;
2298
2299 /* we patch the op, and then re-run the whole call */
2300 /* we have to put the same argument on the stack for this to work */
2301 /* and this will be done by pp_restore */
2302 slf_restore.op_next = (OP *)&slf_restore;
2303 slf_restore.op_type = OP_CUSTOM;
2304 slf_restore.op_ppaddr = pp_restore;
2305 slf_restore.op_first = PL_op;
2306
2307 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2308
2309 if (PL_op->op_flags & OPf_STACKED)
2310 {
2311 if (items > slf_arga)
2312 {
2313 slf_arga = items;
2314 free (slf_argv);
2315 slf_argv = malloc (slf_arga * sizeof (SV *));
2316 }
2317
2318 slf_argc = items;
2319
2320 for (i = 0; i < items; ++i)
2321 slf_argv [i] = SvREFCNT_inc (arg [i]);
2322 }
2323 else
2324 slf_argc = 0;
2325
2326 PL_op->op_ppaddr = pp_slf;
2327 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2328
2329 PL_op = (OP *)&slf_restore;
2330}
2331
2332/*****************************************************************************/
2333/* dynamic wind */
2334
2335static void
2336on_enterleave_call (pTHX_ SV *cb)
2337{
2338 dSP;
2339
2340 PUSHSTACK;
2341
2342 PUSHMARK (SP);
2343 PUTBACK;
2344 call_sv (cb, G_VOID | G_DISCARD);
2345 SPAGAIN;
2346
2347 POPSTACK;
2348}
2349
2350static SV *
2351coro_avp_pop_and_free (pTHX_ AV **avp)
2352{
2353 AV *av = *avp;
2354 SV *res = av_pop (av);
2355
2356 if (AvFILLp (av) < 0)
2357 {
2358 *avp = 0;
2359 SvREFCNT_dec (av);
2360 }
2361
2362 return res;
2363}
2364
2365static void
2366coro_pop_on_enter (pTHX_ void *coro)
2367{
2368 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_enter);
2369 SvREFCNT_dec (cb);
2370}
2371
2372static void
2373coro_pop_on_leave (pTHX_ void *coro)
2374{
2375 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_leave);
2376 on_enterleave_call (aTHX_ sv_2mortal (cb));
2377}
2378
2379/*****************************************************************************/
2380/* PerlIO::cede */
2381
2382typedef struct
2383{
2384 PerlIOBuf base;
2385 NV next, every;
2386} PerlIOCede;
2387
2388static IV
2389PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2390{
2391 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2392
2393 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2394 self->next = nvtime () + self->every;
2395
2396 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2397}
2398
2399static SV *
2400PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2401{
2402 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2403
2404 return newSVnv (self->every);
2405}
2406
2407static IV
2408PerlIOCede_flush (pTHX_ PerlIO *f)
2409{
2410 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2411 double now = nvtime ();
2412
2413 if (now >= self->next)
2414 {
2415 api_cede (aTHX);
2416 self->next = now + self->every;
2417 }
2418
2419 return PerlIOBuf_flush (aTHX_ f);
2420}
2421
2422static PerlIO_funcs PerlIO_cede =
2423{
2424 sizeof(PerlIO_funcs),
2425 "cede",
2426 sizeof(PerlIOCede),
2427 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2428 PerlIOCede_pushed,
2429 PerlIOBuf_popped,
2430 PerlIOBuf_open,
2431 PerlIOBase_binmode,
2432 PerlIOCede_getarg,
2433 PerlIOBase_fileno,
2434 PerlIOBuf_dup,
2435 PerlIOBuf_read,
2436 PerlIOBuf_unread,
2437 PerlIOBuf_write,
2438 PerlIOBuf_seek,
2439 PerlIOBuf_tell,
2440 PerlIOBuf_close,
2441 PerlIOCede_flush,
2442 PerlIOBuf_fill,
2443 PerlIOBase_eof,
2444 PerlIOBase_error,
2445 PerlIOBase_clearerr,
2446 PerlIOBase_setlinebuf,
2447 PerlIOBuf_get_base,
2448 PerlIOBuf_bufsiz,
2449 PerlIOBuf_get_ptr,
2450 PerlIOBuf_get_cnt,
2451 PerlIOBuf_set_ptrcnt,
2452};
2453
2454/*****************************************************************************/
2455/* Coro::Semaphore & Coro::Signal */
2456
2457static SV *
2458coro_waitarray_new (pTHX_ int count)
2459{
2460 /* a semaphore contains a counter IV in $sem->[0] and any waiters after that */
2461 AV *av = newAV ();
2462 SV **ary;
2463
2464 /* unfortunately, building manually saves memory */
2465 Newx (ary, 2, SV *);
2466 AvALLOC (av) = ary;
2467#if PERL_VERSION_ATLEAST (5,10,0)
2468 AvARRAY (av) = ary;
2469#else
2470 /* 5.8.8 needs this syntax instead of AvARRAY = ary, yet */
2471 /* -DDEBUGGING flags this as a bug, despite it perfectly working */
2472 SvPVX ((SV *)av) = (char *)ary;
2473#endif
2474 AvMAX (av) = 1;
2475 AvFILLp (av) = 0;
2476 ary [0] = newSViv (count);
2477
2478 return newRV_noinc ((SV *)av);
2479}
2480
2481/* semaphore */
2482
2483static void
2484coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2485{
2486 SV *count_sv = AvARRAY (av)[0];
2487 IV count = SvIVX (count_sv);
2488
2489 count += adjust;
2490 SvIVX (count_sv) = count;
2491
2492 /* now wake up as many waiters as are expected to lock */
2493 while (count > 0 && AvFILLp (av) > 0)
2494 {
2495 SV *cb;
2496
2497 /* swap first two elements so we can shift a waiter */
2498 AvARRAY (av)[0] = AvARRAY (av)[1];
2499 AvARRAY (av)[1] = count_sv;
2500 cb = av_shift (av);
2501
2502 if (SvOBJECT (cb))
2503 {
2504 api_ready (aTHX_ cb);
2505 --count;
2506 }
2507 else if (SvTYPE (cb) == SVt_PVCV)
2508 {
2509 dSP;
2510 PUSHMARK (SP);
2511 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2512 PUTBACK;
2513 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2514 }
2515
2516 SvREFCNT_dec (cb);
2517 }
2518}
2519
2520static void
2521coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2522{
2523 /* call $sem->adjust (0) to possibly wake up some other waiters */
2524 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2525}
2526
2527static int
2528slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2529{
2530 AV *av = (AV *)frame->data;
2531 SV *count_sv = AvARRAY (av)[0];
2532
2533 /* if we are about to throw, don't actually acquire the lock, just throw */
2534 if (CORO_THROW)
2535 return 0;
2536 else if (SvIVX (count_sv) > 0)
2537 {
2538 SvSTATE_current->on_destroy = 0;
2539
2540 if (acquire)
2541 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2542 else
2543 coro_semaphore_adjust (aTHX_ av, 0);
2544
2545 return 0;
2546 }
2547 else
2548 {
2549 int i;
2550 /* if we were woken up but can't down, we look through the whole */
2551 /* waiters list and only add us if we aren't in there already */
2552 /* this avoids some degenerate memory usage cases */
2553
2554 for (i = 1; i <= AvFILLp (av); ++i)
2555 if (AvARRAY (av)[i] == SvRV (coro_current))
2556 return 1;
2557
2558 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2559 return 1;
2560 }
2561}
2562
2563static int
2564slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2565{
2566 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2567}
2568
2569static int
2570slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2571{
2572 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2573}
2574
2575static void
2576slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2577{
2578 AV *av = (AV *)SvRV (arg [0]);
2579
2580 if (SvIVX (AvARRAY (av)[0]) > 0)
2581 {
2582 frame->data = (void *)av;
2583 frame->prepare = prepare_nop;
2584 }
2585 else
2586 {
2587 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2588
2589 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2590 frame->prepare = prepare_schedule;
2591
2592 /* to avoid race conditions when a woken-up coro gets terminated */
2593 /* we arrange for a temporary on_destroy that calls adjust (0) */
2594 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2595 }
2596}
2597
2598static void
2599slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2600{
2601 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2602 frame->check = slf_check_semaphore_down;
2603}
2604
2605static void
2606slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2607{
2608 if (items >= 2)
2609 {
2610 /* callback form */
2611 AV *av = (AV *)SvRV (arg [0]);
2612 CV *cb_cv = coro_sv_2cv (aTHX_ arg [1]);
2613
2614 av_push (av, (SV *)SvREFCNT_inc_NN (cb_cv));
2615
2616 if (SvIVX (AvARRAY (av)[0]) > 0)
2617 coro_semaphore_adjust (aTHX_ av, 0);
2618
2619 frame->prepare = prepare_nop;
2620 frame->check = slf_check_nop;
2621 }
2622 else
2623 {
2624 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2625 frame->check = slf_check_semaphore_wait;
2626 }
2627}
2628
2629/* signal */
2630
2631static void
2632coro_signal_wake (pTHX_ AV *av, int count)
2633{
2634 SvIVX (AvARRAY (av)[0]) = 0;
2635
2636 /* now signal count waiters */
2637 while (count > 0 && AvFILLp (av) > 0)
2638 {
2639 SV *cb;
2640
2641 /* swap first two elements so we can shift a waiter */
2642 cb = AvARRAY (av)[0];
2643 AvARRAY (av)[0] = AvARRAY (av)[1];
2644 AvARRAY (av)[1] = cb;
2645
2646 cb = av_shift (av);
2647
2648 api_ready (aTHX_ cb);
2649 sv_setiv (cb, 0); /* signal waiter */
2650 SvREFCNT_dec (cb);
2651
2652 --count;
2653 }
2654}
2655
2656static int
2657slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2658{
2659 /* if we are about to throw, also stop waiting */
2660 return SvROK ((SV *)frame->data) && !CORO_THROW;
2661}
2662
2663static void
2664slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2665{
2666 AV *av = (AV *)SvRV (arg [0]);
2667
2668 if (SvIVX (AvARRAY (av)[0]))
2669 {
2670 SvIVX (AvARRAY (av)[0]) = 0;
2671 frame->prepare = prepare_nop;
2672 frame->check = slf_check_nop;
2673 }
2674 else
2675 {
2676 SV *waiter = newRV_inc (SvRV (coro_current)); /* owned by signal av */
2677
2678 av_push (av, waiter);
2679
2680 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2681 frame->prepare = prepare_schedule;
2682 frame->check = slf_check_signal_wait;
2683 }
2684}
2685
2686/*****************************************************************************/
2687/* Coro::AIO */
2688
2689#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2690
2691/* helper storage struct */
2692struct io_state
2693{
2694 int errorno;
2695 I32 laststype; /* U16 in 5.10.0 */
2696 int laststatval;
2697 Stat_t statcache;
2698};
2699
2700static void
2701coro_aio_callback (pTHX_ CV *cv)
2702{
2703 dXSARGS;
2704 AV *state = (AV *)GENSUB_ARG;
2705 SV *coro = av_pop (state);
2706 SV *data_sv = newSV (sizeof (struct io_state));
2707
2708 av_extend (state, items - 1);
2709
2710 sv_upgrade (data_sv, SVt_PV);
2711 SvCUR_set (data_sv, sizeof (struct io_state));
2712 SvPOK_only (data_sv);
2713
2714 {
2715 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2716
2717 data->errorno = errno;
2718 data->laststype = PL_laststype;
2719 data->laststatval = PL_laststatval;
2720 data->statcache = PL_statcache;
2721 }
2722
2723 /* now build the result vector out of all the parameters and the data_sv */
2724 {
2725 int i;
2726
2727 for (i = 0; i < items; ++i)
2728 av_push (state, SvREFCNT_inc_NN (ST (i)));
2729 }
2730
2731 av_push (state, data_sv);
2732
2733 api_ready (aTHX_ coro);
2734 SvREFCNT_dec (coro);
2735 SvREFCNT_dec ((AV *)state);
2736}
2737
2738static int
2739slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2740{
2741 AV *state = (AV *)frame->data;
2742
2743 /* if we are about to throw, return early */
2744 /* this does not cancel the aio request, but at least */
2745 /* it quickly returns */
2746 if (CORO_THROW)
2747 return 0;
2748
2749 /* one element that is an RV? repeat! */
2750 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2751 return 1;
2752
2753 /* restore status */
2754 {
2755 SV *data_sv = av_pop (state);
2756 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2757
2758 errno = data->errorno;
2759 PL_laststype = data->laststype;
2760 PL_laststatval = data->laststatval;
2761 PL_statcache = data->statcache;
2762
2763 SvREFCNT_dec (data_sv);
2764 }
2765
2766 /* push result values */
2767 {
2768 dSP;
2769 int i;
2770
2771 EXTEND (SP, AvFILLp (state) + 1);
2772 for (i = 0; i <= AvFILLp (state); ++i)
2773 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2774
2775 PUTBACK;
2776 }
2777
2778 return 0;
2779}
2780
2781static void
2782slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2783{
2784 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2785 SV *coro_hv = SvRV (coro_current);
2786 struct coro *coro = SvSTATE_hv (coro_hv);
2787
2788 /* put our coroutine id on the state arg */
2789 av_push (state, SvREFCNT_inc_NN (coro_hv));
2790
2791 /* first see whether we have a non-zero priority and set it as AIO prio */
2792 if (coro->prio)
2793 {
2794 dSP;
2795
2796 static SV *prio_cv;
2797 static SV *prio_sv;
2798
2799 if (expect_false (!prio_cv))
2800 {
2801 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2802 prio_sv = newSViv (0);
2803 }
2804
2805 PUSHMARK (SP);
2806 sv_setiv (prio_sv, coro->prio);
2807 XPUSHs (prio_sv);
2808
2809 PUTBACK;
2810 call_sv (prio_cv, G_VOID | G_DISCARD);
2811 }
2812
2813 /* now call the original request */
2814 {
2815 dSP;
2816 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2817 int i;
2818
2819 PUSHMARK (SP);
2820
2821 /* first push all args to the stack */
2822 EXTEND (SP, items + 1);
2823
2824 for (i = 0; i < items; ++i)
2825 PUSHs (arg [i]);
2826
2827 /* now push the callback closure */
2828 PUSHs (sv_2mortal (gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2829
2830 /* now call the AIO function - we assume our request is uncancelable */
2831 PUTBACK;
2832 call_sv ((SV *)req, G_VOID | G_DISCARD);
2833 }
2834
2835 /* now that the requets is going, we loop toll we have a result */
2836 frame->data = (void *)state;
2837 frame->prepare = prepare_schedule;
2838 frame->check = slf_check_aio_req;
2839}
2840
2841static void
2842coro_aio_req_xs (pTHX_ CV *cv)
2843{
2844 dXSARGS;
2845
2846 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2847
2848 XSRETURN_EMPTY;
2849}
2850
2851/*****************************************************************************/
2852
2853#if CORO_CLONE
2854# include "clone.c"
2855#endif
2856
2857MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2858
2859PROTOTYPES: DISABLE
2860
2861BOOT:
2862{
2863#ifdef USE_ITHREADS
2864# if CORO_PTHREAD
2865 coro_thx = PERL_GET_CONTEXT;
2866# endif
2867#endif
2868 BOOT_PAGESIZE;
2869
2870 cctx_current = cctx_new_empty ();
2871
2872 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2873 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2874
2875 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2876 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2877 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
2878
2879 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
2880 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
2881 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
2882
2883 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
2884
2885 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
2886 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
2887 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
2888 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
2889
2890 main_mainstack = PL_mainstack;
2891 main_top_env = PL_top_env;
2892
2893 while (main_top_env->je_prev)
2894 main_top_env = main_top_env->je_prev;
2895
2896 {
2897 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2898
2899 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2900 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
2901
2902 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2903 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
2904 }
2905
2906 coroapi.ver = CORO_API_VERSION;
2907 coroapi.rev = CORO_API_REVISION;
2908
2909 coroapi.transfer = api_transfer;
2910
2911 coroapi.sv_state = SvSTATE_;
2912 coroapi.execute_slf = api_execute_slf;
2913 coroapi.prepare_nop = prepare_nop;
2914 coroapi.prepare_schedule = prepare_schedule;
2915 coroapi.prepare_cede = prepare_cede;
2916 coroapi.prepare_cede_notself = prepare_cede_notself;
2917
2918 {
2919 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
2920
2921 if (!svp) croak ("Time::HiRes is required");
2922 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
2923
2924 nvtime = INT2PTR (double (*)(), SvIV (*svp));
2925 }
2926
2927 assert (("PRIO_NORMAL must be 0", !PRIO_NORMAL));
2928}
2929
2930SV *
2931new (char *klass, ...)
2932 ALIAS:
2933 Coro::new = 1
2934 CODE:
2935{
2936 struct coro *coro;
2937 MAGIC *mg;
2938 HV *hv;
2939 CV *cb;
2940 int i;
2941
2942 if (items > 1)
2943 {
2944 cb = coro_sv_2cv (aTHX_ ST (1));
2945
2946 if (!ix)
235 { 2947 {
236 /* I never used formats, so how should I know how these are implemented? */ 2948 if (CvISXSUB (cb))
237 /* my bold guess is as a simple, plain sub... */ 2949 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
238 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 2950
2951 if (!CvROOT (cb))
2952 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
239 } 2953 }
240 } 2954 }
241 2955
242 if (top_si->si_type == PERLSI_MAIN)
243 break;
244
245 top_si = top_si->si_prev;
246 ccstk = top_si->si_cxstack;
247 cxix = top_si->si_cxix;
248 }
249
250 PUTBACK;
251 }
252
253 c->dowarn = PL_dowarn;
254 c->defav = GvAV (PL_defgv);
255 c->curstackinfo = PL_curstackinfo;
256 c->curstack = PL_curstack;
257 c->mainstack = PL_mainstack;
258 c->stack_sp = PL_stack_sp;
259 c->op = PL_op;
260 c->curpad = PL_curpad;
261 c->stack_base = PL_stack_base;
262 c->stack_max = PL_stack_max;
263 c->tmps_stack = PL_tmps_stack;
264 c->tmps_floor = PL_tmps_floor;
265 c->tmps_ix = PL_tmps_ix;
266 c->tmps_max = PL_tmps_max;
267 c->markstack = PL_markstack;
268 c->markstack_ptr = PL_markstack_ptr;
269 c->markstack_max = PL_markstack_max;
270 c->scopestack = PL_scopestack;
271 c->scopestack_ix = PL_scopestack_ix;
272 c->scopestack_max = PL_scopestack_max;
273 c->savestack = PL_savestack;
274 c->savestack_ix = PL_savestack_ix;
275 c->savestack_max = PL_savestack_max;
276 c->retstack = PL_retstack;
277 c->retstack_ix = PL_retstack_ix;
278 c->retstack_max = PL_retstack_max;
279 c->curcop = PL_curcop;
280}
281
282#define LOAD(state) do { load_state(aTHX_ state); SPAGAIN; } while (0)
283#define SAVE(state) do { PUTBACK; save_state(aTHX_ state); } while (0)
284
285static void
286load_state(pTHX_ Coro__State c)
287{
288 PL_dowarn = c->dowarn;
289 GvAV (PL_defgv) = c->defav;
290 PL_curstackinfo = c->curstackinfo;
291 PL_curstack = c->curstack;
292 PL_mainstack = c->mainstack;
293 PL_stack_sp = c->stack_sp;
294 PL_op = c->op;
295 PL_curpad = c->curpad;
296 PL_stack_base = c->stack_base;
297 PL_stack_max = c->stack_max;
298 PL_tmps_stack = c->tmps_stack;
299 PL_tmps_floor = c->tmps_floor;
300 PL_tmps_ix = c->tmps_ix;
301 PL_tmps_max = c->tmps_max;
302 PL_markstack = c->markstack;
303 PL_markstack_ptr = c->markstack_ptr;
304 PL_markstack_max = c->markstack_max;
305 PL_scopestack = c->scopestack;
306 PL_scopestack_ix = c->scopestack_ix;
307 PL_scopestack_max = c->scopestack_max;
308 PL_savestack = c->savestack;
309 PL_savestack_ix = c->savestack_ix;
310 PL_savestack_max = c->savestack_max;
311 PL_retstack = c->retstack;
312 PL_retstack_ix = c->retstack_ix;
313 PL_retstack_max = c->retstack_max;
314 PL_curcop = c->curcop;
315
316 {
317 dSP;
318 CV *cv;
319
320 /* now do the ugly restore mess */
321 while ((cv = (CV *)POPs))
322 {
323 AV *padlist = (AV *)POPs;
324
325 put_padlist (cv);
326 CvPADLIST(cv) = padlist;
327 CvDEPTH(cv) = (I32)POPs;
328
329#ifdef USE_THREADS
330 CvOWNER(cv) = (struct perl_thread *)POPs;
331 error does not work either
332#endif
333 }
334
335 PUTBACK;
336 }
337}
338
339/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
340STATIC void
341destroy_stacks(pTHX)
342{
343 /* die does this while calling POPSTACK, but I just don't see why. */
344 /* OTOH, die does not have a memleak, but we do... */
345 dounwind(-1);
346
347 /* is this ugly, I ask? */
348 while (PL_scopestack_ix)
349 LEAVE;
350
351 while (PL_curstackinfo->si_next)
352 PL_curstackinfo = PL_curstackinfo->si_next;
353
354 while (PL_curstackinfo)
355 {
356 PERL_SI *p = PL_curstackinfo->si_prev;
357
358 SvREFCNT_dec(PL_curstackinfo->si_stack);
359 Safefree(PL_curstackinfo->si_cxstack);
360 Safefree(PL_curstackinfo);
361 PL_curstackinfo = p;
362 }
363
364 if (PL_scopestack_ix != 0)
365 Perl_warner(aTHX_ WARN_INTERNAL,
366 "Unbalanced scopes: %ld more ENTERs than LEAVEs\n",
367 (long)PL_scopestack_ix);
368 if (PL_savestack_ix != 0)
369 Perl_warner(aTHX_ WARN_INTERNAL,
370 "Unbalanced saves: %ld more saves than restores\n",
371 (long)PL_savestack_ix);
372 if (PL_tmps_floor != -1)
373 Perl_warner(aTHX_ WARN_INTERNAL,"Unbalanced tmps: %ld more allocs than frees\n",
374 (long)PL_tmps_floor + 1);
375 /*
376 */
377 Safefree(PL_tmps_stack);
378 Safefree(PL_markstack);
379 Safefree(PL_scopestack);
380 Safefree(PL_savestack);
381 Safefree(PL_retstack);
382}
383
384#define SUB_INIT "Coro::State::_newcoro"
385
386MODULE = Coro::State PACKAGE = Coro::State
387
388PROTOTYPES: ENABLE
389
390BOOT:
391 if (!padlist_cache)
392 padlist_cache = newHV ();
393
394Coro::State
395_newprocess(args)
396 SV * args
397 PROTOTYPE: $
398 CODE:
399 Coro__State coro;
400
401 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV)
402 croak ("Coro::State::newprocess expects an arrayref");
403
404 New (0, coro, 1, struct coro); 2956 Newz (0, coro, 1, struct coro);
2957 coro->args = newAV ();
2958 coro->flags = CF_NEW;
405 2959
406 coro->mainstack = 0; /* actual work is done inside transfer */ 2960 if (coro_first) coro_first->prev = coro;
407 coro->args = (AV *)SvREFCNT_inc (SvRV (args)); 2961 coro->next = coro_first;
2962 coro_first = coro;
408 2963
409 RETVAL = coro; 2964 coro->hv = hv = newHV ();
2965 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
2966 mg->mg_flags |= MGf_DUP;
2967 RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1));
2968
2969 if (items > 1)
2970 {
2971 av_extend (coro->args, items - 1 + ix - 1);
2972
2973 if (ix)
2974 {
2975 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
2976 cb = cv_coro_run;
2977 }
2978
2979 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
2980
2981 for (i = 2; i < items; i++)
2982 av_push (coro->args, newSVsv (ST (i)));
2983 }
2984}
410 OUTPUT: 2985 OUTPUT:
411 RETVAL 2986 RETVAL
412 2987
413void 2988void
414transfer(prev,next) 2989transfer (...)
415 Coro::State_or_hashref prev 2990 PROTOTYPE: $$
416 Coro::State_or_hashref next 2991 CODE:
417 CODE: 2992 CORO_EXECUTE_SLF_XS (slf_init_transfer);
418 2993
419 if (prev != next) 2994bool
2995_destroy (SV *coro_sv)
2996 CODE:
2997 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
2998 OUTPUT:
2999 RETVAL
3000
3001void
3002_exit (int code)
3003 PROTOTYPE: $
3004 CODE:
3005 _exit (code);
3006
3007SV *
3008clone (Coro::State coro)
3009 CODE:
3010{
3011#if CORO_CLONE
3012 struct coro *ncoro = coro_clone (aTHX_ coro);
3013 MAGIC *mg;
3014 /* TODO: too much duplication */
3015 ncoro->hv = newHV ();
3016 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
3017 mg->mg_flags |= MGf_DUP;
3018 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
3019#else
3020 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
3021#endif
3022}
3023 OUTPUT:
3024 RETVAL
3025
3026int
3027cctx_stacksize (int new_stacksize = 0)
3028 PROTOTYPE: ;$
3029 CODE:
3030 RETVAL = cctx_stacksize;
3031 if (new_stacksize)
420 { 3032 {
3033 cctx_stacksize = new_stacksize;
3034 ++cctx_gen;
421 /* 3035 }
422 * this could be done in newprocess which would lead to 3036 OUTPUT:
423 * extremely elegant and fast (just SAVE/LOAD) 3037 RETVAL
424 * code here, but lazy allocation of stacks has also 3038
425 * some virtues and the overhead of the if() is nil. 3039int
3040cctx_max_idle (int max_idle = 0)
3041 PROTOTYPE: ;$
3042 CODE:
3043 RETVAL = cctx_max_idle;
3044 if (max_idle > 1)
3045 cctx_max_idle = max_idle;
3046 OUTPUT:
3047 RETVAL
3048
3049int
3050cctx_count ()
3051 PROTOTYPE:
3052 CODE:
3053 RETVAL = cctx_count;
3054 OUTPUT:
3055 RETVAL
3056
3057int
3058cctx_idle ()
3059 PROTOTYPE:
3060 CODE:
3061 RETVAL = cctx_idle;
3062 OUTPUT:
3063 RETVAL
3064
3065void
3066list ()
3067 PROTOTYPE:
3068 PPCODE:
3069{
3070 struct coro *coro;
3071 for (coro = coro_first; coro; coro = coro->next)
3072 if (coro->hv)
3073 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
3074}
3075
3076void
3077call (Coro::State coro, SV *coderef)
3078 ALIAS:
3079 eval = 1
3080 CODE:
3081{
3082 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
426 */ 3083 {
427 if (next->mainstack) 3084 struct coro *current = SvSTATE_current;
3085
3086 if (current != coro)
428 { 3087 {
429 SAVE (prev); 3088 PUTBACK;
430 LOAD (next); 3089 save_perl (aTHX_ current);
431 /* mark this state as in-use */ 3090 load_perl (aTHX_ coro);
432 next->mainstack = 0; 3091 SPAGAIN;
433 next->tmps_ix = -2;
434 } 3092 }
435 else if (next->tmps_ix == -2) 3093
3094 PUSHSTACK;
3095
3096 PUSHMARK (SP);
3097 PUTBACK;
3098
3099 if (ix)
3100 eval_sv (coderef, 0);
3101 else
3102 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3103
3104 POPSTACK;
3105 SPAGAIN;
3106
3107 if (current != coro)
436 { 3108 {
437 croak ("tried to transfer to running coroutine");
438 }
439 else
440 {
441 SAVE (prev);
442
443 /* 3109 PUTBACK;
444 * emulate part of the perl startup here. 3110 save_perl (aTHX_ coro);
445 */ 3111 load_perl (aTHX_ current);
446 UNOP myop;
447
448 init_stacks (); /* from perl.c */
449 PL_op = (OP *)&myop;
450 /*PL_curcop = 0;*/
451 GvAV (PL_defgv) = (AV *)SvREFCNT_inc ((SV *)next->args);
452
453 SPAGAIN; 3112 SPAGAIN;
454 Zero(&myop, 1, UNOP);
455 myop.op_next = Nullop;
456 myop.op_flags = OPf_WANT_VOID;
457
458 PUSHMARK(SP);
459 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
460 PUTBACK;
461 /*
462 * the next line is slightly wrong, as PL_op->op_next
463 * is actually being executed so we skip the first op.
464 * that doesn't matter, though, since it is only
465 * pp_nextstate and we never return...
466 */
467 PL_op = Perl_pp_entersub(aTHX);
468 SPAGAIN;
469
470 ENTER;
471 } 3113 }
472 } 3114 }
3115}
3116
3117SV *
3118is_ready (Coro::State coro)
3119 PROTOTYPE: $
3120 ALIAS:
3121 is_ready = CF_READY
3122 is_running = CF_RUNNING
3123 is_new = CF_NEW
3124 is_destroyed = CF_DESTROYED
3125 is_suspended = CF_SUSPENDED
3126 CODE:
3127 RETVAL = boolSV (coro->flags & ix);
3128 OUTPUT:
3129 RETVAL
473 3130
474void 3131void
475DESTROY(coro) 3132throw (Coro::State self, SV *throw = &PL_sv_undef)
476 Coro::State coro 3133 PROTOTYPE: $;$
477 CODE: 3134 CODE:
3135{
3136 struct coro *current = SvSTATE_current;
3137 SV **throwp = self == current ? &CORO_THROW : &self->except;
3138 SvREFCNT_dec (*throwp);
3139 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
3140}
478 3141
479 if (coro->mainstack) 3142void
3143api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3144 PROTOTYPE: $;$
3145 C_ARGS: aTHX_ coro, flags
3146
3147SV *
3148has_cctx (Coro::State coro)
3149 PROTOTYPE: $
3150 CODE:
3151 /* maybe manage the running flag differently */
3152 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3153 OUTPUT:
3154 RETVAL
3155
3156int
3157is_traced (Coro::State coro)
3158 PROTOTYPE: $
3159 CODE:
3160 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3161 OUTPUT:
3162 RETVAL
3163
3164UV
3165rss (Coro::State coro)
3166 PROTOTYPE: $
3167 ALIAS:
3168 usecount = 1
3169 CODE:
3170 switch (ix)
3171 {
3172 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3173 case 1: RETVAL = coro->usecount; break;
3174 }
3175 OUTPUT:
3176 RETVAL
3177
3178void
3179force_cctx ()
3180 PROTOTYPE:
3181 CODE:
3182 cctx_current->idle_sp = 0;
3183
3184void
3185swap_defsv (Coro::State self)
3186 PROTOTYPE: $
3187 ALIAS:
3188 swap_defav = 1
3189 CODE:
3190 if (!self->slot)
3191 croak ("cannot swap state with coroutine that has no saved state,");
3192 else
480 { 3193 {
481 struct coro temp; 3194 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3195 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
482 3196
483 SAVE(aTHX_ (&temp)); 3197 SV *tmp = *src; *src = *dst; *dst = tmp;
484 LOAD(aTHX_ coro);
485
486 destroy_stacks ();
487 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
488
489 LOAD((&temp));
490 } 3198 }
491 3199
3200void
3201cancel (Coro::State self)
3202 CODE:
3203 coro_state_destroy (aTHX_ self);
3204 coro_call_on_destroy (aTHX_ self); /* actually only for Coro objects */
3205
3206
3207MODULE = Coro::State PACKAGE = Coro
3208
3209BOOT:
3210{
3211 int i;
3212
3213 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3214 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3215 cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD);
3216 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD);
3217 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3218 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3219 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3220 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3221 sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE);
3222
3223 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3224 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3225 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3226 cv_coro_state_new = get_cv ("Coro::State::new", 0); SvREADONLY_on (cv_coro_state_new);
3227
3228 coro_stash = gv_stashpv ("Coro", TRUE);
3229
3230 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (PRIO_MAX));
3231 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (PRIO_HIGH));
3232 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (PRIO_NORMAL));
3233 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (PRIO_LOW));
3234 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (PRIO_IDLE));
3235 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (PRIO_MIN));
3236
3237 for (i = PRIO_MAX - PRIO_MIN + 1; i--; )
3238 coro_ready[i] = newAV ();
3239
3240 {
3241 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3242
3243 coroapi.schedule = api_schedule;
3244 coroapi.schedule_to = api_schedule_to;
3245 coroapi.cede = api_cede;
3246 coroapi.cede_notself = api_cede_notself;
3247 coroapi.ready = api_ready;
3248 coroapi.is_ready = api_is_ready;
3249 coroapi.nready = coro_nready;
3250 coroapi.current = coro_current;
3251
3252 /*GCoroAPI = &coroapi;*/
3253 sv_setiv (sv, (IV)&coroapi);
3254 SvREADONLY_on (sv);
3255 }
3256}
3257
3258void
3259terminate (...)
3260 CODE:
3261 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3262
3263void
3264schedule (...)
3265 CODE:
3266 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3267
3268void
3269schedule_to (...)
3270 CODE:
3271 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3272
3273void
3274cede_to (...)
3275 CODE:
3276 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3277
3278void
3279cede (...)
3280 CODE:
3281 CORO_EXECUTE_SLF_XS (slf_init_cede);
3282
3283void
3284cede_notself (...)
3285 CODE:
3286 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3287
3288void
3289_set_current (SV *current)
3290 PROTOTYPE: $
3291 CODE:
3292 SvREFCNT_dec (SvRV (coro_current));
3293 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3294
3295void
3296_set_readyhook (SV *hook)
3297 PROTOTYPE: $
3298 CODE:
492 SvREFCNT_dec (coro->args); 3299 SvREFCNT_dec (coro_readyhook);
493 Safefree (coro); 3300 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
494 3301
3302int
3303prio (Coro::State coro, int newprio = 0)
3304 PROTOTYPE: $;$
3305 ALIAS:
3306 nice = 1
3307 CODE:
3308{
3309 RETVAL = coro->prio;
495 3310
3311 if (items > 1)
3312 {
3313 if (ix)
3314 newprio = coro->prio - newprio;
3315
3316 if (newprio < PRIO_MIN) newprio = PRIO_MIN;
3317 if (newprio > PRIO_MAX) newprio = PRIO_MAX;
3318
3319 coro->prio = newprio;
3320 }
3321}
3322 OUTPUT:
3323 RETVAL
3324
3325SV *
3326ready (SV *self)
3327 PROTOTYPE: $
3328 CODE:
3329 RETVAL = boolSV (api_ready (aTHX_ self));
3330 OUTPUT:
3331 RETVAL
3332
3333int
3334nready (...)
3335 PROTOTYPE:
3336 CODE:
3337 RETVAL = coro_nready;
3338 OUTPUT:
3339 RETVAL
3340
3341void
3342suspend (Coro::State self)
3343 PROTOTYPE: $
3344 CODE:
3345 self->flags |= CF_SUSPENDED;
3346
3347void
3348resume (Coro::State self)
3349 PROTOTYPE: $
3350 CODE:
3351 self->flags &= ~CF_SUSPENDED;
3352
3353void
3354_pool_handler (...)
3355 CODE:
3356 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3357
3358void
3359async_pool (SV *cv, ...)
3360 PROTOTYPE: &@
3361 PPCODE:
3362{
3363 HV *hv = (HV *)av_pop (av_async_pool);
3364 AV *av = newAV ();
3365 SV *cb = ST (0);
3366 int i;
3367
3368 av_extend (av, items - 2);
3369 for (i = 1; i < items; ++i)
3370 av_push (av, SvREFCNT_inc_NN (ST (i)));
3371
3372 if ((SV *)hv == &PL_sv_undef)
3373 {
3374 PUSHMARK (SP);
3375 EXTEND (SP, 2);
3376 PUSHs (sv_Coro);
3377 PUSHs ((SV *)cv_pool_handler);
3378 PUTBACK;
3379 call_sv ((SV *)cv_coro_state_new, G_SCALAR);
3380 SPAGAIN;
3381
3382 hv = (HV *)SvREFCNT_inc_NN (SvRV (POPs));
3383 }
3384
3385 {
3386 struct coro *coro = SvSTATE_hv (hv);
3387
3388 assert (!coro->invoke_cb);
3389 assert (!coro->invoke_av);
3390 coro->invoke_cb = SvREFCNT_inc (cb);
3391 coro->invoke_av = av;
3392 }
3393
3394 api_ready (aTHX_ (SV *)hv);
3395
3396 if (GIMME_V != G_VOID)
3397 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
3398 else
3399 SvREFCNT_dec (hv);
3400}
3401
3402SV *
3403rouse_cb ()
3404 PROTOTYPE:
3405 CODE:
3406 RETVAL = coro_new_rouse_cb (aTHX);
3407 OUTPUT:
3408 RETVAL
3409
3410void
3411rouse_wait (...)
3412 PROTOTYPE: ;$
3413 PPCODE:
3414 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
3415
3416void
3417on_enter (SV *block)
3418 ALIAS:
3419 on_leave = 1
3420 PROTOTYPE: &
3421 CODE:
3422{
3423 struct coro *coro = SvSTATE_current;
3424 AV **avp = ix ? &coro->on_leave : &coro->on_enter;
3425
3426 block = (SV *)coro_sv_2cv (aTHX_ block);
3427
3428 if (!*avp)
3429 *avp = newAV ();
3430
3431 av_push (*avp, SvREFCNT_inc (block));
3432
3433 if (!ix)
3434 on_enterleave_call (aTHX_ block);
3435
3436 LEAVE; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3437 SAVEDESTRUCTOR_X (ix ? coro_pop_on_leave : coro_pop_on_enter, (void *)coro);
3438 ENTER; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3439}
3440
3441
3442MODULE = Coro::State PACKAGE = PerlIO::cede
3443
3444BOOT:
3445 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3446
3447
3448MODULE = Coro::State PACKAGE = Coro::Semaphore
3449
3450SV *
3451new (SV *klass, SV *count = 0)
3452 CODE:
3453 RETVAL = sv_bless (
3454 coro_waitarray_new (aTHX_ count && SvOK (count) ? SvIV (count) : 1),
3455 GvSTASH (CvGV (cv))
3456 );
3457 OUTPUT:
3458 RETVAL
3459
3460# helper for Coro::Channel and others
3461SV *
3462_alloc (int count)
3463 CODE:
3464 RETVAL = coro_waitarray_new (aTHX_ count);
3465 OUTPUT:
3466 RETVAL
3467
3468SV *
3469count (SV *self)
3470 CODE:
3471 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3472 OUTPUT:
3473 RETVAL
3474
3475void
3476up (SV *self, int adjust = 1)
3477 ALIAS:
3478 adjust = 1
3479 CODE:
3480 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3481
3482void
3483down (...)
3484 CODE:
3485 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3486
3487void
3488wait (...)
3489 CODE:
3490 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3491
3492void
3493try (SV *self)
3494 PPCODE:
3495{
3496 AV *av = (AV *)SvRV (self);
3497 SV *count_sv = AvARRAY (av)[0];
3498 IV count = SvIVX (count_sv);
3499
3500 if (count > 0)
3501 {
3502 --count;
3503 SvIVX (count_sv) = count;
3504 XSRETURN_YES;
3505 }
3506 else
3507 XSRETURN_NO;
3508}
3509
3510void
3511waiters (SV *self)
3512 PPCODE:
3513{
3514 AV *av = (AV *)SvRV (self);
3515 int wcount = AvFILLp (av) + 1 - 1;
3516
3517 if (GIMME_V == G_SCALAR)
3518 XPUSHs (sv_2mortal (newSViv (wcount)));
3519 else
3520 {
3521 int i;
3522 EXTEND (SP, wcount);
3523 for (i = 1; i <= wcount; ++i)
3524 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3525 }
3526}
3527
3528MODULE = Coro::State PACKAGE = Coro::SemaphoreSet
3529
3530void
3531_may_delete (SV *sem, int count, int extra_refs)
3532 PPCODE:
3533{
3534 AV *av = (AV *)SvRV (sem);
3535
3536 if (SvREFCNT ((SV *)av) == 1 + extra_refs
3537 && AvFILLp (av) == 0 /* no waiters, just count */
3538 && SvIV (AvARRAY (av)[0]) == count)
3539 XSRETURN_YES;
3540
3541 XSRETURN_NO;
3542}
3543
3544MODULE = Coro::State PACKAGE = Coro::Signal
3545
3546SV *
3547new (SV *klass)
3548 CODE:
3549 RETVAL = sv_bless (
3550 coro_waitarray_new (aTHX_ 0),
3551 GvSTASH (CvGV (cv))
3552 );
3553 OUTPUT:
3554 RETVAL
3555
3556void
3557wait (...)
3558 CODE:
3559 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
3560
3561void
3562broadcast (SV *self)
3563 CODE:
3564{
3565 AV *av = (AV *)SvRV (self);
3566 coro_signal_wake (aTHX_ av, AvFILLp (av));
3567}
3568
3569void
3570send (SV *self)
3571 CODE:
3572{
3573 AV *av = (AV *)SvRV (self);
3574
3575 if (AvFILLp (av))
3576 coro_signal_wake (aTHX_ av, 1);
3577 else
3578 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3579}
3580
3581IV
3582awaited (SV *self)
3583 CODE:
3584 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3585 OUTPUT:
3586 RETVAL
3587
3588
3589MODULE = Coro::State PACKAGE = Coro::AnyEvent
3590
3591BOOT:
3592 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3593
3594void
3595_schedule (...)
3596 CODE:
3597{
3598 static int incede;
3599
3600 api_cede_notself (aTHX);
3601
3602 ++incede;
3603 while (coro_nready >= incede && api_cede (aTHX))
3604 ;
3605
3606 sv_setsv (sv_activity, &PL_sv_undef);
3607 if (coro_nready >= incede)
3608 {
3609 PUSHMARK (SP);
3610 PUTBACK;
3611 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3612 }
3613
3614 --incede;
3615}
3616
3617
3618MODULE = Coro::State PACKAGE = Coro::AIO
3619
3620void
3621_register (char *target, char *proto, SV *req)
3622 CODE:
3623{
3624 CV *req_cv = coro_sv_2cv (aTHX_ req);
3625 /* newXSproto doesn't return the CV on 5.8 */
3626 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3627 sv_setpv ((SV *)slf_cv, proto);
3628 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3629}
3630

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines