ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.6 by root, Tue Jul 17 15:42:28 2001 UTC vs.
Revision 1.303 by root, Wed Nov 19 05:52:42 2008 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "patchlevel.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
24#endif
25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
9#endif 35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
10 48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#define PERL_VERSION_ATLEAST(a,b,c) \
57 (PERL_REVISION > (a) \
58 || (PERL_REVISION == (a) \
59 && (PERL_VERSION > (b) \
60 || (PERL_VERSION == (b) && PERLSUBVERSION >= (c)))))
61
62#if !PERL_VERSION_ATLEAST (5,6,0)
63# ifndef PL_ppaddr
64# define PL_ppaddr ppaddr
65# endif
66# ifndef call_sv
67# define call_sv perl_call_sv
68# endif
69# ifndef get_sv
70# define get_sv perl_get_sv
71# endif
72# ifndef get_cv
73# define get_cv perl_get_cv
74# endif
75# ifndef IS_PADGV
76# define IS_PADGV(v) 0
77# endif
78# ifndef IS_PADCONST
79# define IS_PADCONST(v) 0
80# endif
81#endif
82
83/* 5.11 */
84#ifndef CxHASARGS
85# define CxHASARGS(cx) (cx)->blk_sub.hasargs
86#endif
87
88/* 5.10.0 */
89#ifndef SvREFCNT_inc_NN
90# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
91#endif
92
93/* 5.8.8 */
94#ifndef GV_NOTQUAL
95# define GV_NOTQUAL 0
96#endif
97#ifndef newSV
98# define newSV(l) NEWSV(0,l)
99#endif
100#ifndef CvISXSUB_on
101# define CvISXSUB_on(cv) (void)cv
102#endif
103
104/* 5.8.7 */
105#ifndef SvRV_set
106# define SvRV_set(s,v) SvRV(s) = (v)
107#endif
108
109#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
110# undef CORO_STACKGUARD
111#endif
112
113#ifndef CORO_STACKGUARD
114# define CORO_STACKGUARD 0
115#endif
116
117/* prefer perl internal functions over our own? */
118#ifndef CORO_PREFER_PERL_FUNCTIONS
119# define CORO_PREFER_PERL_FUNCTIONS 0
120#endif
121
122/* The next macros try to return the current stack pointer, in an as
123 * portable way as possible. */
124#if __GNUC__ >= 4
125# define dSTACKLEVEL int stacklevel_dummy
126# define STACKLEVEL __builtin_frame_address (0)
127#else
128# define dSTACKLEVEL volatile void *stacklevel
129# define STACKLEVEL ((void *)&stacklevel)
130#endif
131
132#define IN_DESTRUCT (PL_main_cv == Nullcv)
133
134#if __GNUC__ >= 3
135# define attribute(x) __attribute__(x)
136# define expect(expr,value) __builtin_expect ((expr),(value))
137# define INLINE static inline
138#else
139# define attribute(x)
140# define expect(expr,value) (expr)
141# define INLINE static
142#endif
143
144#define expect_false(expr) expect ((expr) != 0, 0)
145#define expect_true(expr) expect ((expr) != 0, 1)
146
147#define NOINLINE attribute ((noinline))
148
149#include "CoroAPI.h"
150#define GCoroAPI (&coroapi) /* very sneaky */
151
152#ifdef USE_ITHREADS
153# if CORO_PTHREAD
154static void *coro_thx;
155# endif
156#endif
157
158static double (*nvtime)(); /* so why doesn't it take void? */
159
160/* we hijack an hopefully unused CV flag for our purposes */
161#define CVf_SLF 0x4000
162static OP *pp_slf (pTHX);
163
164static U32 cctx_gen;
165static size_t cctx_stacksize = CORO_STACKSIZE;
166static struct CoroAPI coroapi;
167static AV *main_mainstack; /* used to differentiate between $main and others */
168static JMPENV *main_top_env;
169static HV *coro_state_stash, *coro_stash;
170static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
171
172static GV *irsgv; /* $/ */
173static GV *stdoutgv; /* *STDOUT */
174static SV *rv_diehook;
175static SV *rv_warnhook;
176static HV *hv_sig; /* %SIG */
177
178/* async_pool helper stuff */
179static SV *sv_pool_rss;
180static SV *sv_pool_size;
181static AV *av_async_pool;
182
183/* Coro::AnyEvent */
184static SV *sv_activity;
185
186static struct coro_cctx *cctx_first;
187static int cctx_count, cctx_idle;
188
189enum {
190 CC_MAPPED = 0x01,
191 CC_NOREUSE = 0x02, /* throw this away after tracing */
192 CC_TRACE = 0x04,
193 CC_TRACE_SUB = 0x08, /* trace sub calls */
194 CC_TRACE_LINE = 0x10, /* trace each statement */
195 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
196};
197
198/* this is a structure representing a c-level coroutine */
199typedef struct coro_cctx
200{
201 struct coro_cctx *next;
202
203 /* the stack */
204 void *sptr;
205 size_t ssize;
206
207 /* cpu state */
208 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
209 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
210 JMPENV *top_env;
211 coro_context cctx;
212
213 U32 gen;
214#if CORO_USE_VALGRIND
215 int valgrind_id;
216#endif
217 unsigned char flags;
218} coro_cctx;
219
220enum {
221 CF_RUNNING = 0x0001, /* coroutine is running */
222 CF_READY = 0x0002, /* coroutine is ready */
223 CF_NEW = 0x0004, /* has never been switched to */
224 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
225};
226
227/* the structure where most of the perl state is stored, overlaid on the cxstack */
228typedef struct
229{
230 SV *defsv;
231 AV *defav;
232 SV *errsv;
233 SV *irsgv;
234#define VAR(name,type) type name;
235# include "state.h"
236#undef VAR
237} perl_slots;
238
239#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
240
241/* this is a structure representing a perl-level coroutine */
11struct coro { 242struct coro {
12 U8 dowarn; 243 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 244 coro_cctx *cctx;
14 245
15 PERL_SI *curstackinfo; 246 /* state data */
16 AV *curstack; 247 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 248 AV *mainstack;
18 SV **stack_sp; 249 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 250
41 AV *args; 251 AV *args; /* data associated with this coroutine (initial args) */
252 int refcnt; /* coroutines are refcounted, yes */
253 int flags; /* CF_ flags */
254 HV *hv; /* the perl hash associated with this coro, if any */
255 void (*on_destroy)(pTHX_ struct coro *coro);
256
257 /* statistics */
258 int usecount; /* number of transfers to this coro */
259
260 /* coro process data */
261 int prio;
262 SV *except; /* exception to be thrown */
263 SV *rouse_cb;
264
265 /* async_pool */
266 SV *saved_deffh;
267
268 /* linked list */
269 struct coro *next, *prev;
42}; 270};
43 271
44typedef struct coro *Coro__State; 272typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 273typedef struct coro *Coro__State_or_hashref;
46 274
47static HV *padlist_cache; 275/* the following variables are effectively part of the perl context */
276/* and get copied between struct coro and these variables */
277/* the mainr easonw e don't support windows process emulation */
278static struct CoroSLF slf_frame; /* the current slf frame */
48 279
49/* mostly copied from op.c:cv_clone2 */ 280/** Coro ********************************************************************/
50STATIC AV * 281
51clone_padlist (AV *protopadlist) 282#define PRIO_MAX 3
283#define PRIO_HIGH 1
284#define PRIO_NORMAL 0
285#define PRIO_LOW -1
286#define PRIO_IDLE -3
287#define PRIO_MIN -4
288
289/* for Coro.pm */
290static SV *coro_current;
291static SV *coro_readyhook;
292static AV *coro_ready [PRIO_MAX - PRIO_MIN + 1];
293static struct coro *coro_first;
294#define coro_nready coroapi.nready
295
296/** lowlevel stuff **********************************************************/
297
298static SV *
299coro_get_sv (pTHX_ const char *name, int create)
52{ 300{
53 AV *av; 301#if PERL_VERSION_ATLEAST (5,10,0)
54 I32 ix; 302 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 303 get_sv (name, create);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 304#endif
57 SV **pname = AvARRAY (protopad_name); 305 return get_sv (name, create);
58 SV **ppad = AvARRAY (protopad); 306}
59 I32 fname = AvFILLp (protopad_name); 307
60 I32 fpad = AvFILLp (protopad); 308static AV *
309coro_get_av (pTHX_ const char *name, int create)
310{
311#if PERL_VERSION_ATLEAST (5,10,0)
312 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
313 get_av (name, create);
314#endif
315 return get_av (name, create);
316}
317
318static HV *
319coro_get_hv (pTHX_ const char *name, int create)
320{
321#if PERL_VERSION_ATLEAST (5,10,0)
322 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
323 get_hv (name, create);
324#endif
325 return get_hv (name, create);
326}
327
328static AV *
329coro_clone_padlist (pTHX_ CV *cv)
330{
331 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 332 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 333
72 newpadlist = newAV (); 334 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 335 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 336#if PERL_VERSION_ATLEAST (5,10,0)
337 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
338#else
339 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
340#endif
341 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
342 --AvFILLp (padlist);
343
344 av_store (newpadlist, 0, SvREFCNT_inc_NN (*av_fetch (padlist, 0, FALSE)));
75 av_store (newpadlist, 1, (SV *) newpad); 345 av_store (newpadlist, 1, (SV *)newpad);
76 346
77 av = newAV (); /* will be @_ */ 347 return newpadlist;
78 av_extend (av, 0); 348}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 349
82 for (ix = fpad; ix > 0; ix--) 350static void
351free_padlist (pTHX_ AV *padlist)
352{
353 /* may be during global destruction */
354 if (SvREFCNT (padlist))
83 { 355 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 356 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 357 while (i >= 0)
86 { 358 {
87 char *name = SvPVX (namesv); /* XXX */ 359 SV **svp = av_fetch (padlist, i--, FALSE);
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 360 if (svp)
89 { /* lexical from outside? */
90 npad[ix] = SvREFCNT_inc (ppad[ix]);
91 } 361 {
92 else
93 { /* our own lexical */
94 SV *sv; 362 SV *sv;
95 if (*name == '&') 363 while (&PL_sv_undef != (sv = av_pop ((AV *)*svp)))
96 sv = SvREFCNT_inc (ppad[ix]); 364 SvREFCNT_dec (sv);
97 else if (*name == '@') 365
98 sv = (SV *) newAV (); 366 SvREFCNT_dec (*svp);
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 } 367 }
107 } 368 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix]))
109 {
110 npad[ix] = SvREFCNT_inc (ppad[ix]);
111 }
112 else
113 {
114 SV *sv = NEWSV (0, 0);
115 SvPADTMP_on (sv);
116 npad[ix] = sv;
117 }
118 }
119 369
120#if 0 /* NONOTUNDERSTOOD */
121 /* Now that vars are all in place, clone nested closures. */
122
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif
139
140 return newpadlist;
141}
142
143STATIC AV *
144free_padlist (AV *padlist)
145{
146 /* may be during global destruction */
147 if (SvREFCNT(padlist))
148 {
149 I32 i = AvFILLp(padlist);
150 while (i >= 0)
151 {
152 SV **svp = av_fetch(padlist, i--, FALSE);
153 SV *sv = svp ? *svp : Nullsv;
154 if (sv)
155 SvREFCNT_dec(sv);
156 }
157
158 SvREFCNT_dec((SV*)padlist); 370 SvREFCNT_dec ((SV*)padlist);
371 }
372}
373
374static int
375coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
376{
377 AV *padlist;
378 AV *av = (AV *)mg->mg_obj;
379
380 /* casting is fun. */
381 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
382 free_padlist (aTHX_ padlist);
383
384 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
385
386 return 0;
387}
388
389#define CORO_MAGIC_type_cv 26
390#define CORO_MAGIC_type_state PERL_MAGIC_ext
391
392static MGVTBL coro_cv_vtbl = {
393 0, 0, 0, 0,
394 coro_cv_free
395};
396
397#define CORO_MAGIC_NN(sv, type) \
398 (expect_true (SvMAGIC (sv)->mg_type == type) \
399 ? SvMAGIC (sv) \
400 : mg_find (sv, type))
401
402#define CORO_MAGIC(sv, type) \
403 (expect_true (SvMAGIC (sv)) \
404 ? CORO_MAGIC_NN (sv, type) \
405 : 0)
406
407#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
408#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
409
410INLINE struct coro *
411SvSTATE_ (pTHX_ SV *coro)
412{
413 HV *stash;
414 MAGIC *mg;
415
416 if (SvROK (coro))
417 coro = SvRV (coro);
418
419 if (expect_false (SvTYPE (coro) != SVt_PVHV))
420 croak ("Coro::State object required");
421
422 stash = SvSTASH (coro);
423 if (expect_false (stash != coro_stash && stash != coro_state_stash))
424 {
425 /* very slow, but rare, check */
426 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
427 croak ("Coro::State object required");
428 }
429
430 mg = CORO_MAGIC_state (coro);
431 return (struct coro *)mg->mg_ptr;
432}
433
434#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
435
436/* faster than SvSTATE, but expects a coroutine hv */
437#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
438#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
439
440/* the next two functions merely cache the padlists */
441static void
442get_padlist (pTHX_ CV *cv)
443{
444 MAGIC *mg = CORO_MAGIC_cv (cv);
445 AV *av;
446
447 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
448 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
449 else
450 {
451#if CORO_PREFER_PERL_FUNCTIONS
452 /* this is probably cleaner? but also slower! */
453 /* in practise, it seems to be less stable */
454 CV *cp = Perl_cv_clone (cv);
455 CvPADLIST (cv) = CvPADLIST (cp);
456 CvPADLIST (cp) = 0;
457 SvREFCNT_dec (cp);
458#else
459 CvPADLIST (cv) = coro_clone_padlist (aTHX_ cv);
460#endif
461 }
462}
463
464static void
465put_padlist (pTHX_ CV *cv)
466{
467 MAGIC *mg = CORO_MAGIC_cv (cv);
468 AV *av;
469
470 if (expect_false (!mg))
471 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
472
473 av = (AV *)mg->mg_obj;
474
475 if (expect_false (AvFILLp (av) >= AvMAX (av)))
476 av_extend (av, AvMAX (av) + 1);
477
478 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
479}
480
481/** load & save, init *******************************************************/
482
483static void
484load_perl (pTHX_ Coro__State c)
485{
486 perl_slots *slot = c->slot;
487 c->slot = 0;
488
489 PL_mainstack = c->mainstack;
490
491 GvSV (PL_defgv) = slot->defsv;
492 GvAV (PL_defgv) = slot->defav;
493 GvSV (PL_errgv) = slot->errsv;
494 GvSV (irsgv) = slot->irsgv;
495
496 #define VAR(name,type) PL_ ## name = slot->name;
497 # include "state.h"
498 #undef VAR
499
500 {
501 dSP;
502
503 CV *cv;
504
505 /* now do the ugly restore mess */
506 while (expect_true (cv = (CV *)POPs))
507 {
508 put_padlist (aTHX_ cv); /* mark this padlist as available */
509 CvDEPTH (cv) = PTR2IV (POPs);
510 CvPADLIST (cv) = (AV *)POPs;
511 }
512
513 PUTBACK;
159 } 514 }
160}
161 515
162/* the next tow functions merely cache the padlists */ 516 slf_frame = c->slf_frame;
163STATIC void 517 CORO_THROW = c->except;
164get_padlist (CV *cv)
165{
166 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 0);
167
168 if (he && AvFILLp ((AV *)*he) >= 0)
169 CvPADLIST (cv) = (AV *)av_pop ((AV *)*he);
170 else
171 CvPADLIST (cv) = clone_padlist (CvPADLIST (cv));
172} 518}
173 519
174STATIC void
175put_padlist (CV *cv)
176{
177 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 1);
178
179 if (SvTYPE (*he) != SVt_PVAV)
180 {
181 SvREFCNT_dec (*he);
182 *he = (SV *)newAV ();
183 }
184
185 av_push ((AV *)*he, (SV *)CvPADLIST (cv));
186}
187
188static void 520static void
189save_state(pTHX_ Coro__State c) 521save_perl (pTHX_ Coro__State c)
190{ 522{
523 c->except = CORO_THROW;
524 c->slf_frame = slf_frame;
525
191 { 526 {
192 dSP; 527 dSP;
193 I32 cxix = cxstack_ix; 528 I32 cxix = cxstack_ix;
529 PERL_CONTEXT *ccstk = cxstack;
194 PERL_SI *top_si = PL_curstackinfo; 530 PERL_SI *top_si = PL_curstackinfo;
195 PERL_CONTEXT *ccstk = cxstack;
196 531
197 /* 532 /*
198 * the worst thing you can imagine happens first - we have to save 533 * the worst thing you can imagine happens first - we have to save
199 * (and reinitialize) all cv's in the whole callchain :( 534 * (and reinitialize) all cv's in the whole callchain :(
200 */ 535 */
201 536
202 PUSHs (Nullsv); 537 XPUSHs (Nullsv);
203 /* this loop was inspired by pp_caller */ 538 /* this loop was inspired by pp_caller */
204 for (;;) 539 for (;;)
205 { 540 {
206 while (cxix >= 0) 541 while (expect_true (cxix >= 0))
207 { 542 {
208 PERL_CONTEXT *cx = &ccstk[cxix--]; 543 PERL_CONTEXT *cx = &ccstk[cxix--];
209 544
210 if (CxTYPE(cx) == CXt_SUB) 545 if (expect_true (CxTYPE (cx) == CXt_SUB || CxTYPE (cx) == CXt_FORMAT))
211 { 546 {
212 CV *cv = cx->blk_sub.cv; 547 CV *cv = cx->blk_sub.cv;
548
213 if (CvDEPTH(cv)) 549 if (expect_true (CvDEPTH (cv)))
214 { 550 {
215#ifdef USE_THREADS
216 XPUSHs ((SV *)CvOWNER(cv));
217#endif
218 EXTEND (SP, 3); 551 EXTEND (SP, 3);
219 PUSHs ((SV *)CvDEPTH(cv));
220 PUSHs ((SV *)CvPADLIST(cv)); 552 PUSHs ((SV *)CvPADLIST (cv));
553 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
221 PUSHs ((SV *)cv); 554 PUSHs ((SV *)cv);
222 555
223 get_padlist (cv);
224
225 CvDEPTH(cv) = 0; 556 CvDEPTH (cv) = 0;
226#ifdef USE_THREADS 557 get_padlist (aTHX_ cv);
227 CvOWNER(cv) = 0;
228 error must unlock this cv etc.. etc...
229 if you are here wondering about this error message then
230 the reason is that it will not work as advertised yet
231#endif
232 } 558 }
233 } 559 }
234 else if (CxTYPE(cx) == CXt_FORMAT) 560 }
561
562 if (expect_true (top_si->si_type == PERLSI_MAIN))
563 break;
564
565 top_si = top_si->si_prev;
566 ccstk = top_si->si_cxstack;
567 cxix = top_si->si_cxix;
568 }
569
570 PUTBACK;
571 }
572
573 /* allocate some space on the context stack for our purposes */
574 /* we manually unroll here, as usually 2 slots is enough */
575 if (SLOT_COUNT >= 1) CXINC;
576 if (SLOT_COUNT >= 2) CXINC;
577 if (SLOT_COUNT >= 3) CXINC;
578 {
579 int i;
580 for (i = 3; i < SLOT_COUNT; ++i)
581 CXINC;
582 }
583 cxstack_ix -= SLOT_COUNT; /* undo allocation */
584
585 c->mainstack = PL_mainstack;
586
587 {
588 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
589
590 slot->defav = GvAV (PL_defgv);
591 slot->defsv = DEFSV;
592 slot->errsv = ERRSV;
593 slot->irsgv = GvSV (irsgv);
594
595 #define VAR(name,type) slot->name = PL_ ## name;
596 # include "state.h"
597 #undef VAR
598 }
599}
600
601/*
602 * allocate various perl stacks. This is almost an exact copy
603 * of perl.c:init_stacks, except that it uses less memory
604 * on the (sometimes correct) assumption that coroutines do
605 * not usually need a lot of stackspace.
606 */
607#if CORO_PREFER_PERL_FUNCTIONS
608# define coro_init_stacks init_stacks
609#else
610static void
611coro_init_stacks (pTHX)
612{
613 PL_curstackinfo = new_stackinfo(32, 8);
614 PL_curstackinfo->si_type = PERLSI_MAIN;
615 PL_curstack = PL_curstackinfo->si_stack;
616 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
617
618 PL_stack_base = AvARRAY(PL_curstack);
619 PL_stack_sp = PL_stack_base;
620 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
621
622 New(50,PL_tmps_stack,32,SV*);
623 PL_tmps_floor = -1;
624 PL_tmps_ix = -1;
625 PL_tmps_max = 32;
626
627 New(54,PL_markstack,16,I32);
628 PL_markstack_ptr = PL_markstack;
629 PL_markstack_max = PL_markstack + 16;
630
631#ifdef SET_MARK_OFFSET
632 SET_MARK_OFFSET;
633#endif
634
635 New(54,PL_scopestack,8,I32);
636 PL_scopestack_ix = 0;
637 PL_scopestack_max = 8;
638
639 New(54,PL_savestack,24,ANY);
640 PL_savestack_ix = 0;
641 PL_savestack_max = 24;
642
643#if !PERL_VERSION_ATLEAST (5,10,0)
644 New(54,PL_retstack,4,OP*);
645 PL_retstack_ix = 0;
646 PL_retstack_max = 4;
647#endif
648}
649#endif
650
651/*
652 * destroy the stacks, the callchain etc...
653 */
654static void
655coro_destruct_stacks (pTHX)
656{
657 while (PL_curstackinfo->si_next)
658 PL_curstackinfo = PL_curstackinfo->si_next;
659
660 while (PL_curstackinfo)
661 {
662 PERL_SI *p = PL_curstackinfo->si_prev;
663
664 if (!IN_DESTRUCT)
665 SvREFCNT_dec (PL_curstackinfo->si_stack);
666
667 Safefree (PL_curstackinfo->si_cxstack);
668 Safefree (PL_curstackinfo);
669 PL_curstackinfo = p;
670 }
671
672 Safefree (PL_tmps_stack);
673 Safefree (PL_markstack);
674 Safefree (PL_scopestack);
675 Safefree (PL_savestack);
676#if !PERL_VERSION_ATLEAST (5,10,0)
677 Safefree (PL_retstack);
678#endif
679}
680
681static size_t
682coro_rss (pTHX_ struct coro *coro)
683{
684 size_t rss = sizeof (*coro);
685
686 if (coro->mainstack)
687 {
688 perl_slots tmp_slot;
689 perl_slots *slot;
690
691 if (coro->flags & CF_RUNNING)
692 {
693 slot = &tmp_slot;
694
695 #define VAR(name,type) slot->name = PL_ ## name;
696 # include "state.h"
697 #undef VAR
698 }
699 else
700 slot = coro->slot;
701
702 if (slot)
703 {
704 rss += sizeof (slot->curstackinfo);
705 rss += (slot->curstackinfo->si_cxmax + 1) * sizeof (PERL_CONTEXT);
706 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (slot->curstack)) * sizeof (SV *);
707 rss += slot->tmps_max * sizeof (SV *);
708 rss += (slot->markstack_max - slot->markstack_ptr) * sizeof (I32);
709 rss += slot->scopestack_max * sizeof (I32);
710 rss += slot->savestack_max * sizeof (ANY);
711
712#if !PERL_VERSION_ATLEAST (5,10,0)
713 rss += slot->retstack_max * sizeof (OP *);
714#endif
715 }
716 }
717
718 return rss;
719}
720
721/** coroutine stack handling ************************************************/
722
723static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
724static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
725static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
726
727/* apparently < 5.8.8 */
728#ifndef MgPV_nolen_const
729#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
730 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
731 (const char*)(mg)->mg_ptr)
732#endif
733
734/*
735 * This overrides the default magic get method of %SIG elements.
736 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
737 * and instead of tryign to save and restore the hash elements, we just provide
738 * readback here.
739 * We only do this when the hook is != 0, as they are often set to 0 temporarily,
740 * not expecting this to actually change the hook. This is a potential problem
741 * when a schedule happens then, but we ignore this.
742 */
743static int
744coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
745{
746 const char *s = MgPV_nolen_const (mg);
747
748 if (*s == '_')
749 {
750 SV **svp = 0;
751
752 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
753 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
754
755 if (svp)
756 {
757 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
758 return 0;
759 }
760 }
761
762 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
763}
764
765static int
766coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
767{
768 const char *s = MgPV_nolen_const (mg);
769
770 if (*s == '_')
771 {
772 SV **svp = 0;
773
774 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
775 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
776
777 if (svp)
778 {
779 SV *old = *svp;
780 *svp = 0;
781 SvREFCNT_dec (old);
782 return 0;
783 }
784 }
785
786 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
787}
788
789static int
790coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
791{
792 const char *s = MgPV_nolen_const (mg);
793
794 if (*s == '_')
795 {
796 SV **svp = 0;
797
798 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
799 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
800
801 if (svp)
802 {
803 SV *old = *svp;
804 *svp = newSVsv (sv);
805 SvREFCNT_dec (old);
806 return 0;
807 }
808 }
809
810 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
811}
812
813static void
814prepare_nop (pTHX_ struct coro_transfer_args *ta)
815{
816 /* kind of mega-hacky, but works */
817 ta->next = ta->prev = (struct coro *)ta;
818}
819
820static int
821slf_check_nop (pTHX_ struct CoroSLF *frame)
822{
823 return 0;
824}
825
826static UNOP coro_setup_op;
827
828static void NOINLINE /* noinline to keep it out of the transfer fast path */
829coro_setup (pTHX_ struct coro *coro)
830{
831 /*
832 * emulate part of the perl startup here.
833 */
834 coro_init_stacks (aTHX);
835
836 PL_runops = RUNOPS_DEFAULT;
837 PL_curcop = &PL_compiling;
838 PL_in_eval = EVAL_NULL;
839 PL_comppad = 0;
840 PL_curpm = 0;
841 PL_curpad = 0;
842 PL_localizing = 0;
843 PL_dirty = 0;
844 PL_restartop = 0;
845#if PERL_VERSION_ATLEAST (5,10,0)
846 PL_parser = 0;
847#endif
848
849 /* recreate the die/warn hooks */
850 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
851 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
852
853 GvSV (PL_defgv) = newSV (0);
854 GvAV (PL_defgv) = coro->args; coro->args = 0;
855 GvSV (PL_errgv) = newSV (0);
856 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
857 PL_rs = newSVsv (GvSV (irsgv));
858 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
859
860 {
861 dSP;
862 UNOP myop;
863
864 Zero (&myop, 1, UNOP);
865 myop.op_next = Nullop;
866 myop.op_flags = OPf_WANT_VOID;
867
868 PUSHMARK (SP);
869 XPUSHs (sv_2mortal (av_shift (GvAV (PL_defgv))));
870 PUTBACK;
871 PL_op = (OP *)&myop;
872 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
873 SPAGAIN;
874 }
875
876 /* this newly created coroutine might be run on an existing cctx which most
877 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
878 */
879 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
880 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
881
882 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
883 coro_setup_op.op_next = PL_op;
884 coro_setup_op.op_type = OP_CUSTOM;
885 coro_setup_op.op_ppaddr = pp_slf;
886 /* no flags required, as an init function won't be called */
887
888 PL_op = (OP *)&coro_setup_op;
889
890 /* copy throw, in case it was set before coro_setup */
891 CORO_THROW = coro->except;
892}
893
894static void
895coro_destruct (pTHX_ struct coro *coro)
896{
897 if (!IN_DESTRUCT)
898 {
899 /* restore all saved variables and stuff */
900 LEAVE_SCOPE (0);
901 assert (PL_tmps_floor == -1);
902
903 /* free all temporaries */
904 FREETMPS;
905 assert (PL_tmps_ix == -1);
906
907 /* unwind all extra stacks */
908 POPSTACK_TO (PL_mainstack);
909
910 /* unwind main stack */
911 dounwind (-1);
912 }
913
914 SvREFCNT_dec (GvSV (PL_defgv));
915 SvREFCNT_dec (GvAV (PL_defgv));
916 SvREFCNT_dec (GvSV (PL_errgv));
917 SvREFCNT_dec (PL_defoutgv);
918 SvREFCNT_dec (PL_rs);
919 SvREFCNT_dec (GvSV (irsgv));
920
921 SvREFCNT_dec (PL_diehook);
922 SvREFCNT_dec (PL_warnhook);
923
924 SvREFCNT_dec (CORO_THROW);
925 SvREFCNT_dec (coro->saved_deffh);
926 SvREFCNT_dec (coro->rouse_cb);
927
928 coro_destruct_stacks (aTHX);
929}
930
931INLINE void
932free_coro_mortal (pTHX)
933{
934 if (expect_true (coro_mortal))
935 {
936 SvREFCNT_dec (coro_mortal);
937 coro_mortal = 0;
938 }
939}
940
941static int
942runops_trace (pTHX)
943{
944 COP *oldcop = 0;
945 int oldcxix = -2;
946 struct coro *coro = SvSTATE_current; /* trace cctx is tied to specific coro */
947 coro_cctx *cctx = coro->cctx;
948
949 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
950 {
951 PERL_ASYNC_CHECK ();
952
953 if (cctx->flags & CC_TRACE_ALL)
954 {
955 if (PL_op->op_type == OP_LEAVESUB && cctx->flags & CC_TRACE_SUB)
956 {
957 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
958 SV **bot, **top;
959 AV *av = newAV (); /* return values */
960 SV **cb;
961 dSP;
962
963 GV *gv = CvGV (cx->blk_sub.cv);
964 SV *fullname = sv_2mortal (newSV (0));
965 if (isGV (gv))
966 gv_efullname3 (fullname, gv, 0);
967
968 bot = PL_stack_base + cx->blk_oldsp + 1;
969 top = cx->blk_gimme == G_ARRAY ? SP + 1
970 : cx->blk_gimme == G_SCALAR ? bot + 1
971 : bot;
972
973 av_extend (av, top - bot);
974 while (bot < top)
975 av_push (av, SvREFCNT_inc_NN (*bot++));
976
977 PL_runops = RUNOPS_DEFAULT;
978 ENTER;
979 SAVETMPS;
980 EXTEND (SP, 3);
981 PUSHMARK (SP);
982 PUSHs (&PL_sv_no);
983 PUSHs (fullname);
984 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
985 PUTBACK;
986 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
987 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
988 SPAGAIN;
989 FREETMPS;
990 LEAVE;
991 PL_runops = runops_trace;
992 }
993
994 if (oldcop != PL_curcop)
995 {
996 oldcop = PL_curcop;
997
998 if (PL_curcop != &PL_compiling)
999 {
1000 SV **cb;
1001
1002 if (oldcxix != cxstack_ix && cctx->flags & CC_TRACE_SUB)
1003 {
1004 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1005
1006 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1007 {
1008 runops_proc_t old_runops = PL_runops;
1009 dSP;
1010 GV *gv = CvGV (cx->blk_sub.cv);
1011 SV *fullname = sv_2mortal (newSV (0));
1012
1013 if (isGV (gv))
1014 gv_efullname3 (fullname, gv, 0);
1015
1016 PL_runops = RUNOPS_DEFAULT;
1017 ENTER;
1018 SAVETMPS;
1019 EXTEND (SP, 3);
1020 PUSHMARK (SP);
1021 PUSHs (&PL_sv_yes);
1022 PUSHs (fullname);
1023 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1024 PUTBACK;
1025 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1026 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1027 SPAGAIN;
1028 FREETMPS;
1029 LEAVE;
1030 PL_runops = runops_trace;
1031 }
1032
1033 oldcxix = cxstack_ix;
1034 }
1035
1036 if (cctx->flags & CC_TRACE_LINE)
1037 {
1038 dSP;
1039
1040 PL_runops = RUNOPS_DEFAULT;
1041 ENTER;
1042 SAVETMPS;
1043 EXTEND (SP, 3);
1044 PL_runops = RUNOPS_DEFAULT;
1045 PUSHMARK (SP);
1046 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1047 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1048 PUTBACK;
1049 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1050 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1051 SPAGAIN;
1052 FREETMPS;
1053 LEAVE;
1054 PL_runops = runops_trace;
1055 }
1056 }
1057 }
1058 }
1059 }
1060
1061 TAINT_NOT;
1062 return 0;
1063}
1064
1065static struct coro_cctx *cctx_ssl_cctx;
1066static struct CoroSLF cctx_ssl_frame;
1067
1068static void
1069slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1070{
1071 ta->prev = (struct coro *)cctx_ssl_cctx;
1072 ta->next = 0;
1073}
1074
1075static int
1076slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1077{
1078 *frame = cctx_ssl_frame;
1079
1080 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1081}
1082
1083/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1084static void NOINLINE
1085cctx_prepare (pTHX_ coro_cctx *cctx)
1086{
1087 PL_top_env = &PL_start_env;
1088
1089 if (cctx->flags & CC_TRACE)
1090 PL_runops = runops_trace;
1091
1092 /* we already must be executing an SLF op, there is no other valid way
1093 * that can lead to creation of a new cctx */
1094 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1095 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1096
1097 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1098 cctx_ssl_cctx = cctx;
1099 cctx_ssl_frame = slf_frame;
1100
1101 slf_frame.prepare = slf_prepare_set_stacklevel;
1102 slf_frame.check = slf_check_set_stacklevel;
1103}
1104
1105/* the tail of transfer: execute stuff we can only do after a transfer */
1106INLINE void
1107transfer_tail (pTHX)
1108{
1109 free_coro_mortal (aTHX);
1110}
1111
1112/*
1113 * this is a _very_ stripped down perl interpreter ;)
1114 */
1115static void
1116cctx_run (void *arg)
1117{
1118#ifdef USE_ITHREADS
1119# if CORO_PTHREAD
1120 PERL_SET_CONTEXT (coro_thx);
1121# endif
1122#endif
1123 {
1124 dTHX;
1125
1126 /* normally we would need to skip the entersub here */
1127 /* not doing so will re-execute it, which is exactly what we want */
1128 /* PL_nop = PL_nop->op_next */
1129
1130 /* inject a fake subroutine call to cctx_init */
1131 cctx_prepare (aTHX_ (coro_cctx *)arg);
1132
1133 /* cctx_run is the alternative tail of transfer() */
1134 transfer_tail (aTHX);
1135
1136 /* somebody or something will hit me for both perl_run and PL_restartop */
1137 PL_restartop = PL_op;
1138 perl_run (PL_curinterp);
1139
1140 /*
1141 * If perl-run returns we assume exit() was being called or the coro
1142 * fell off the end, which seems to be the only valid (non-bug)
1143 * reason for perl_run to return. We try to exit by jumping to the
1144 * bootstrap-time "top" top_env, as we cannot restore the "main"
1145 * coroutine as Coro has no such concept
1146 */
1147 PL_top_env = main_top_env;
1148 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1149 }
1150}
1151
1152static coro_cctx *
1153cctx_new ()
1154{
1155 coro_cctx *cctx;
1156
1157 ++cctx_count;
1158 New (0, cctx, 1, coro_cctx);
1159
1160 cctx->gen = cctx_gen;
1161 cctx->flags = 0;
1162 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1163
1164 return cctx;
1165}
1166
1167/* create a new cctx only suitable as source */
1168static coro_cctx *
1169cctx_new_empty ()
1170{
1171 coro_cctx *cctx = cctx_new ();
1172
1173 cctx->sptr = 0;
1174 coro_create (&cctx->cctx, 0, 0, 0, 0);
1175
1176 return cctx;
1177}
1178
1179/* create a new cctx suitable as destination/running a perl interpreter */
1180static coro_cctx *
1181cctx_new_run ()
1182{
1183 coro_cctx *cctx = cctx_new ();
1184 void *stack_start;
1185 size_t stack_size;
1186
1187#if HAVE_MMAP
1188 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1189 /* mmap supposedly does allocate-on-write for us */
1190 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1191
1192 if (cctx->sptr != (void *)-1)
1193 {
1194 #if CORO_STACKGUARD
1195 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1196 #endif
1197 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1198 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1199 cctx->flags |= CC_MAPPED;
1200 }
1201 else
1202#endif
1203 {
1204 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1205 New (0, cctx->sptr, cctx_stacksize, long);
1206
1207 if (!cctx->sptr)
1208 {
1209 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1210 _exit (EXIT_FAILURE);
1211 }
1212
1213 stack_start = cctx->sptr;
1214 stack_size = cctx->ssize;
1215 }
1216
1217 #if CORO_USE_VALGRIND
1218 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1219 #endif
1220
1221 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1222
1223 return cctx;
1224}
1225
1226static void
1227cctx_destroy (coro_cctx *cctx)
1228{
1229 if (!cctx)
1230 return;
1231
1232 --cctx_count;
1233 coro_destroy (&cctx->cctx);
1234
1235 /* coro_transfer creates new, empty cctx's */
1236 if (cctx->sptr)
1237 {
1238 #if CORO_USE_VALGRIND
1239 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1240 #endif
1241
1242#if HAVE_MMAP
1243 if (cctx->flags & CC_MAPPED)
1244 munmap (cctx->sptr, cctx->ssize);
1245 else
1246#endif
1247 Safefree (cctx->sptr);
1248 }
1249
1250 Safefree (cctx);
1251}
1252
1253/* wether this cctx should be destructed */
1254#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1255
1256static coro_cctx *
1257cctx_get (pTHX)
1258{
1259 while (expect_true (cctx_first))
1260 {
1261 coro_cctx *cctx = cctx_first;
1262 cctx_first = cctx->next;
1263 --cctx_idle;
1264
1265 if (expect_true (!CCTX_EXPIRED (cctx)))
1266 return cctx;
1267
1268 cctx_destroy (cctx);
1269 }
1270
1271 return cctx_new_run ();
1272}
1273
1274static void
1275cctx_put (coro_cctx *cctx)
1276{
1277 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1278
1279 /* free another cctx if overlimit */
1280 if (expect_false (cctx_idle >= cctx_max_idle))
1281 {
1282 coro_cctx *first = cctx_first;
1283 cctx_first = first->next;
1284 --cctx_idle;
1285
1286 cctx_destroy (first);
1287 }
1288
1289 ++cctx_idle;
1290 cctx->next = cctx_first;
1291 cctx_first = cctx;
1292}
1293
1294/** coroutine switching *****************************************************/
1295
1296static void
1297transfer_check (pTHX_ struct coro *prev, struct coro *next)
1298{
1299 /* TODO: throwing up here is considered harmful */
1300
1301 if (expect_true (prev != next))
1302 {
1303 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1304 croak ("Coro::State::transfer called with non-running/new prev Coro::State, but can only transfer from running or new states,");
1305
1306 if (expect_false (next->flags & CF_RUNNING))
1307 croak ("Coro::State::transfer called with running next Coro::State, but can only transfer to inactive states,");
1308
1309 if (expect_false (next->flags & CF_DESTROYED))
1310 croak ("Coro::State::transfer called with destroyed next Coro::State, but can only transfer to inactive states,");
1311
1312#if !PERL_VERSION_ATLEAST (5,10,0)
1313 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1314 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1315#endif
1316 }
1317}
1318
1319/* always use the TRANSFER macro */
1320static void NOINLINE /* noinline so we have a fixed stackframe */
1321transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1322{
1323 dSTACKLEVEL;
1324
1325 /* sometimes transfer is only called to set idle_sp */
1326 if (expect_false (!next))
1327 {
1328 ((coro_cctx *)prev)->idle_sp = STACKLEVEL;
1329 assert (((coro_cctx *)prev)->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1330 }
1331 else if (expect_true (prev != next))
1332 {
1333 coro_cctx *prev__cctx;
1334
1335 if (expect_false (prev->flags & CF_NEW))
1336 {
1337 /* create a new empty/source context */
1338 prev->cctx = cctx_new_empty ();
1339 prev->flags &= ~CF_NEW;
1340 prev->flags |= CF_RUNNING;
1341 }
1342
1343 prev->flags &= ~CF_RUNNING;
1344 next->flags |= CF_RUNNING;
1345
1346 /* first get rid of the old state */
1347 save_perl (aTHX_ prev);
1348
1349 if (expect_false (next->flags & CF_NEW))
1350 {
1351 /* need to start coroutine */
1352 next->flags &= ~CF_NEW;
1353 /* setup coroutine call */
1354 coro_setup (aTHX_ next);
1355 }
1356 else
1357 load_perl (aTHX_ next);
1358
1359 prev__cctx = prev->cctx;
1360
1361 /* possibly untie and reuse the cctx */
1362 if (expect_true (
1363 prev__cctx->idle_sp == STACKLEVEL
1364 && !(prev__cctx->flags & CC_TRACE)
1365 && !force_cctx
1366 ))
1367 {
1368 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1369 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == prev__cctx->idle_te));
1370
1371 prev->cctx = 0;
1372
1373 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get */
1374 /* without this the next cctx_get might destroy the prev__cctx while still in use */
1375 if (expect_false (CCTX_EXPIRED (prev__cctx)))
1376 if (!next->cctx)
1377 next->cctx = cctx_get (aTHX);
1378
1379 cctx_put (prev__cctx);
1380 }
1381
1382 ++next->usecount;
1383
1384 if (expect_true (!next->cctx))
1385 next->cctx = cctx_get (aTHX);
1386
1387 if (expect_false (prev__cctx != next->cctx))
1388 {
1389 prev__cctx->top_env = PL_top_env;
1390 PL_top_env = next->cctx->top_env;
1391 coro_transfer (&prev__cctx->cctx, &next->cctx->cctx);
1392 }
1393
1394 transfer_tail (aTHX);
1395 }
1396}
1397
1398#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1399#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1400
1401/** high level stuff ********************************************************/
1402
1403static int
1404coro_state_destroy (pTHX_ struct coro *coro)
1405{
1406 if (coro->flags & CF_DESTROYED)
1407 return 0;
1408
1409 if (coro->on_destroy)
1410 coro->on_destroy (aTHX_ coro);
1411
1412 coro->flags |= CF_DESTROYED;
1413
1414 if (coro->flags & CF_READY)
1415 {
1416 /* reduce nready, as destroying a ready coro effectively unreadies it */
1417 /* alternative: look through all ready queues and remove the coro */
1418 --coro_nready;
1419 }
1420 else
1421 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1422
1423 if (coro->mainstack && coro->mainstack != main_mainstack)
1424 {
1425 struct coro temp;
1426
1427 assert (("FATAL: tried to destroy currently running coroutine (please report)", !(coro->flags & CF_RUNNING)));
1428
1429 save_perl (aTHX_ &temp);
1430 load_perl (aTHX_ coro);
1431
1432 coro_destruct (aTHX_ coro);
1433
1434 load_perl (aTHX_ &temp);
1435
1436 coro->slot = 0;
1437 }
1438
1439 cctx_destroy (coro->cctx);
1440 SvREFCNT_dec (coro->args);
1441
1442 if (coro->next) coro->next->prev = coro->prev;
1443 if (coro->prev) coro->prev->next = coro->next;
1444 if (coro == coro_first) coro_first = coro->next;
1445
1446 return 1;
1447}
1448
1449static int
1450coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1451{
1452 struct coro *coro = (struct coro *)mg->mg_ptr;
1453 mg->mg_ptr = 0;
1454
1455 coro->hv = 0;
1456
1457 if (--coro->refcnt < 0)
1458 {
1459 coro_state_destroy (aTHX_ coro);
1460 Safefree (coro);
1461 }
1462
1463 return 0;
1464}
1465
1466static int
1467coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1468{
1469 struct coro *coro = (struct coro *)mg->mg_ptr;
1470
1471 ++coro->refcnt;
1472
1473 return 0;
1474}
1475
1476static MGVTBL coro_state_vtbl = {
1477 0, 0, 0, 0,
1478 coro_state_free,
1479 0,
1480#ifdef MGf_DUP
1481 coro_state_dup,
1482#else
1483# define MGf_DUP 0
1484#endif
1485};
1486
1487static void
1488prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1489{
1490 ta->prev = SvSTATE (prev_sv);
1491 ta->next = SvSTATE (next_sv);
1492 TRANSFER_CHECK (*ta);
1493}
1494
1495static void
1496api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1497{
1498 struct coro_transfer_args ta;
1499
1500 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1501 TRANSFER (ta, 1);
1502}
1503
1504/*****************************************************************************/
1505/* gensub: simple closure generation utility */
1506
1507#define GENSUB_ARG CvXSUBANY (cv).any_ptr
1508
1509/* create a closure from XS, returns a code reference */
1510/* the arg can be accessed via GENSUB_ARG from the callback */
1511/* the callback must use dXSARGS/XSRETURN */
1512static SV *
1513gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg)
1514{
1515 CV *cv = (CV *)newSV (0);
1516
1517 sv_upgrade ((SV *)cv, SVt_PVCV);
1518
1519 CvANON_on (cv);
1520 CvISXSUB_on (cv);
1521 CvXSUB (cv) = xsub;
1522 GENSUB_ARG = arg;
1523
1524 return newRV_noinc ((SV *)cv);
1525}
1526
1527/** Coro ********************************************************************/
1528
1529INLINE void
1530coro_enq (pTHX_ struct coro *coro)
1531{
1532 av_push (coro_ready [coro->prio - PRIO_MIN], SvREFCNT_inc_NN (coro->hv));
1533}
1534
1535INLINE SV *
1536coro_deq (pTHX)
1537{
1538 int prio;
1539
1540 for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= 0; )
1541 if (AvFILLp (coro_ready [prio]) >= 0)
1542 return av_shift (coro_ready [prio]);
1543
1544 return 0;
1545}
1546
1547static int
1548api_ready (pTHX_ SV *coro_sv)
1549{
1550 struct coro *coro;
1551 SV *sv_hook;
1552 void (*xs_hook)(void);
1553
1554 if (SvROK (coro_sv))
1555 coro_sv = SvRV (coro_sv);
1556
1557 coro = SvSTATE (coro_sv);
1558
1559 if (coro->flags & CF_READY)
1560 return 0;
1561
1562 coro->flags |= CF_READY;
1563
1564 sv_hook = coro_nready ? 0 : coro_readyhook;
1565 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1566
1567 coro_enq (aTHX_ coro);
1568 ++coro_nready;
1569
1570 if (sv_hook)
1571 {
1572 dSP;
1573
1574 ENTER;
1575 SAVETMPS;
1576
1577 PUSHMARK (SP);
1578 PUTBACK;
1579 call_sv (sv_hook, G_VOID | G_DISCARD);
1580
1581 FREETMPS;
1582 LEAVE;
1583 }
1584
1585 if (xs_hook)
1586 xs_hook ();
1587
1588 return 1;
1589}
1590
1591static int
1592api_is_ready (pTHX_ SV *coro_sv)
1593{
1594 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1595}
1596
1597INLINE void
1598prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1599{
1600 SV *prev_sv, *next_sv;
1601
1602 for (;;)
1603 {
1604 next_sv = coro_deq (aTHX);
1605
1606 /* nothing to schedule: call the idle handler */
1607 if (expect_false (!next_sv))
1608 {
1609 dSP;
1610
1611 ENTER;
1612 SAVETMPS;
1613
1614 PUSHMARK (SP);
1615 PUTBACK;
1616 call_sv (get_sv ("Coro::idle", FALSE), G_VOID | G_DISCARD);
1617
1618 FREETMPS;
1619 LEAVE;
1620 continue;
1621 }
1622
1623 ta->next = SvSTATE_hv (next_sv);
1624
1625 /* cannot transfer to destroyed coros, skip and look for next */
1626 if (expect_false (ta->next->flags & CF_DESTROYED))
1627 {
1628 SvREFCNT_dec (next_sv);
1629 /* coro_nready has already been taken care of by destroy */
1630 continue;
1631 }
1632
1633 --coro_nready;
1634 break;
1635 }
1636
1637 /* free this only after the transfer */
1638 prev_sv = SvRV (coro_current);
1639 ta->prev = SvSTATE_hv (prev_sv);
1640 TRANSFER_CHECK (*ta);
1641 assert (("FATAL: next coroutine isn't marked as ready in Coro (please report)", ta->next->flags & CF_READY));
1642 ta->next->flags &= ~CF_READY;
1643 SvRV_set (coro_current, next_sv);
1644
1645 free_coro_mortal (aTHX);
1646 coro_mortal = prev_sv;
1647}
1648
1649INLINE void
1650prepare_cede (pTHX_ struct coro_transfer_args *ta)
1651{
1652 api_ready (aTHX_ coro_current);
1653 prepare_schedule (aTHX_ ta);
1654}
1655
1656INLINE void
1657prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1658{
1659 SV *prev = SvRV (coro_current);
1660
1661 if (coro_nready)
1662 {
1663 prepare_schedule (aTHX_ ta);
1664 api_ready (aTHX_ prev);
1665 }
1666 else
1667 prepare_nop (aTHX_ ta);
1668}
1669
1670static void
1671api_schedule (pTHX)
1672{
1673 struct coro_transfer_args ta;
1674
1675 prepare_schedule (aTHX_ &ta);
1676 TRANSFER (ta, 1);
1677}
1678
1679static int
1680api_cede (pTHX)
1681{
1682 struct coro_transfer_args ta;
1683
1684 prepare_cede (aTHX_ &ta);
1685
1686 if (expect_true (ta.prev != ta.next))
1687 {
1688 TRANSFER (ta, 1);
1689 return 1;
1690 }
1691 else
1692 return 0;
1693}
1694
1695static int
1696api_cede_notself (pTHX)
1697{
1698 if (coro_nready)
1699 {
1700 struct coro_transfer_args ta;
1701
1702 prepare_cede_notself (aTHX_ &ta);
1703 TRANSFER (ta, 1);
1704 return 1;
1705 }
1706 else
1707 return 0;
1708}
1709
1710static void
1711api_trace (pTHX_ SV *coro_sv, int flags)
1712{
1713 struct coro *coro = SvSTATE (coro_sv);
1714
1715 if (flags & CC_TRACE)
1716 {
1717 if (!coro->cctx)
1718 coro->cctx = cctx_new_run ();
1719 else if (!(coro->cctx->flags & CC_TRACE))
1720 croak ("cannot enable tracing on coroutine with custom stack,");
1721
1722 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1723 }
1724 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1725 {
1726 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1727
1728 if (coro->flags & CF_RUNNING)
1729 PL_runops = RUNOPS_DEFAULT;
1730 else
1731 coro->slot->runops = RUNOPS_DEFAULT;
1732 }
1733}
1734
1735/*****************************************************************************/
1736/* rouse callback */
1737
1738#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
1739
1740static void
1741coro_rouse_callback (pTHX_ CV *cv)
1742{
1743 dXSARGS;
1744 SV *data = (SV *)GENSUB_ARG;
1745
1746 if (SvTYPE (SvRV (data)) != SVt_PVAV)
1747 {
1748 /* first call, set args */
1749 int i;
1750 AV *av = newAV ();
1751 SV *coro = SvRV (data);
1752
1753 SvRV_set (data, (SV *)av);
1754 api_ready (aTHX_ coro);
1755 SvREFCNT_dec (coro);
1756
1757 /* better take a full copy of the arguments */
1758 while (items--)
1759 av_store (av, items, newSVsv (ST (items)));
1760 }
1761
1762 XSRETURN_EMPTY;
1763}
1764
1765static int
1766slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
1767{
1768 SV *data = (SV *)frame->data;
1769
1770 if (CORO_THROW)
1771 return 0;
1772
1773 if (SvTYPE (SvRV (data)) != SVt_PVAV)
1774 return 1;
1775
1776 /* now push all results on the stack */
1777 {
1778 dSP;
1779 AV *av = (AV *)SvRV (data);
1780 int i;
1781
1782 EXTEND (SP, AvFILLp (av) + 1);
1783 for (i = 0; i <= AvFILLp (av); ++i)
1784 PUSHs (sv_2mortal (AvARRAY (av)[i]));
1785
1786 /* we have stolen the elements, so ste length to zero and free */
1787 AvFILLp (av) = -1;
1788 av_undef (av);
1789
1790 PUTBACK;
1791 }
1792
1793 return 0;
1794}
1795
1796static void
1797slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1798{
1799 SV *cb;
1800
1801 if (items)
1802 cb = arg [0];
1803 else
1804 {
1805 struct coro *coro = SvSTATE_current;
1806
1807 if (!coro->rouse_cb)
1808 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
1809
1810 cb = sv_2mortal (coro->rouse_cb);
1811 coro->rouse_cb = 0;
1812 }
1813
1814 if (!SvROK (cb)
1815 || SvTYPE (SvRV (cb)) != SVt_PVCV
1816 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
1817 croak ("Coro::rouse_wait called with illegal callback argument,");
1818
1819 {
1820 CV *cv = (CV *)SvRV (cb); /* for GENSUB_ARG */
1821 SV *data = (SV *)GENSUB_ARG;
1822
1823 frame->data = (void *)data;
1824 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
1825 frame->check = slf_check_rouse_wait;
1826 }
1827}
1828
1829static SV *
1830coro_new_rouse_cb (pTHX)
1831{
1832 HV *hv = (HV *)SvRV (coro_current);
1833 struct coro *coro = SvSTATE_hv (hv);
1834 SV *data = newRV_inc ((SV *)hv);
1835 SV *cb = gensub (aTHX_ coro_rouse_callback, (void *)data);
1836
1837 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
1838 SvREFCNT_dec (data); /* magicext increases the refcount */
1839
1840 SvREFCNT_dec (coro->rouse_cb);
1841 coro->rouse_cb = SvREFCNT_inc_NN (cb);
1842
1843 return cb;
1844}
1845
1846/*****************************************************************************/
1847/* schedule-like-function opcode (SLF) */
1848
1849static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
1850static const CV *slf_cv;
1851static SV **slf_argv;
1852static int slf_argc, slf_arga; /* count, allocated */
1853static I32 slf_ax; /* top of stack, for restore */
1854
1855/* this restores the stack in the case we patched the entersub, to */
1856/* recreate the stack frame as perl will on following calls */
1857/* since entersub cleared the stack */
1858static OP *
1859pp_restore (pTHX)
1860{
1861 int i;
1862 SV **SP = PL_stack_base + slf_ax;
1863
1864 PUSHMARK (SP);
1865
1866 EXTEND (SP, slf_argc + 1);
1867
1868 for (i = 0; i < slf_argc; ++i)
1869 PUSHs (sv_2mortal (slf_argv [i]));
1870
1871 PUSHs ((SV *)CvGV (slf_cv));
1872
1873 RETURNOP (slf_restore.op_first);
1874}
1875
1876static void
1877slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
1878{
1879 SV **arg = (SV **)slf_frame.data;
1880
1881 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
1882}
1883
1884static void
1885slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1886{
1887 if (items != 2)
1888 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
1889
1890 frame->prepare = slf_prepare_transfer;
1891 frame->check = slf_check_nop;
1892 frame->data = (void *)arg; /* let's hope it will stay valid */
1893}
1894
1895static void
1896slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1897{
1898 frame->prepare = prepare_schedule;
1899 frame->check = slf_check_nop;
1900}
1901
1902static void
1903slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1904{
1905 frame->prepare = prepare_cede;
1906 frame->check = slf_check_nop;
1907}
1908
1909static void
1910slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1911{
1912 frame->prepare = prepare_cede_notself;
1913 frame->check = slf_check_nop;
1914}
1915
1916/*
1917 * these not obviously related functions are all rolled into one
1918 * function to increase chances that they all will call transfer with the same
1919 * stack offset
1920 * SLF stands for "schedule-like-function".
1921 */
1922static OP *
1923pp_slf (pTHX)
1924{
1925 I32 checkmark; /* mark SP to see how many elements check has pushed */
1926
1927 /* set up the slf frame, unless it has already been set-up */
1928 /* the latter happens when a new coro has been started */
1929 /* or when a new cctx was attached to an existing coroutine */
1930 if (expect_true (!slf_frame.prepare))
1931 {
1932 /* first iteration */
1933 dSP;
1934 SV **arg = PL_stack_base + TOPMARK + 1;
1935 int items = SP - arg; /* args without function object */
1936 SV *gv = *sp;
1937
1938 /* do a quick consistency check on the "function" object, and if it isn't */
1939 /* for us, divert to the real entersub */
1940 if (SvTYPE (gv) != SVt_PVGV
1941 || !GvCV (gv)
1942 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
1943 return PL_ppaddr[OP_ENTERSUB](aTHX);
1944
1945 if (!(PL_op->op_flags & OPf_STACKED))
1946 {
1947 /* ampersand-form of call, use @_ instead of stack */
1948 AV *av = GvAV (PL_defgv);
1949 arg = AvARRAY (av);
1950 items = AvFILLp (av) + 1;
1951 }
1952
1953 /* now call the init function, which needs to set up slf_frame */
1954 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
1955 (aTHX_ &slf_frame, GvCV (gv), arg, items);
1956
1957 /* pop args */
1958 SP = PL_stack_base + POPMARK;
1959
1960 PUTBACK;
1961 }
1962
1963 /* now that we have a slf_frame, interpret it! */
1964 /* we use a callback system not to make the code needlessly */
1965 /* complicated, but so we can run multiple perl coros from one cctx */
1966
1967 do
1968 {
1969 struct coro_transfer_args ta;
1970
1971 slf_frame.prepare (aTHX_ &ta);
1972 TRANSFER (ta, 0);
1973
1974 checkmark = PL_stack_sp - PL_stack_base;
1975 }
1976 while (slf_frame.check (aTHX_ &slf_frame));
1977
1978 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
1979
1980 /* exception handling */
1981 if (expect_false (CORO_THROW))
1982 {
1983 SV *exception = sv_2mortal (CORO_THROW);
1984
1985 CORO_THROW = 0;
1986 sv_setsv (ERRSV, exception);
1987 croak (0);
1988 }
1989
1990 /* return value handling - mostly like entersub */
1991 /* make sure we put something on the stack in scalar context */
1992 if (GIMME_V == G_SCALAR)
1993 {
1994 dSP;
1995 SV **bot = PL_stack_base + checkmark;
1996
1997 if (sp == bot) /* too few, push undef */
1998 bot [1] = &PL_sv_undef;
1999 else if (sp != bot + 1) /* too many, take last one */
2000 bot [1] = *sp;
2001
2002 SP = bot + 1;
2003
2004 PUTBACK;
2005 }
2006
2007 return NORMAL;
2008}
2009
2010static void
2011api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2012{
2013 int i;
2014 SV **arg = PL_stack_base + ax;
2015 int items = PL_stack_sp - arg + 1;
2016
2017 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2018
2019 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2020 && PL_op->op_ppaddr != pp_slf)
2021 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2022
2023 CvFLAGS (cv) |= CVf_SLF;
2024 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2025 slf_cv = cv;
2026
2027 /* we patch the op, and then re-run the whole call */
2028 /* we have to put the same argument on the stack for this to work */
2029 /* and this will be done by pp_restore */
2030 slf_restore.op_next = (OP *)&slf_restore;
2031 slf_restore.op_type = OP_CUSTOM;
2032 slf_restore.op_ppaddr = pp_restore;
2033 slf_restore.op_first = PL_op;
2034
2035 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2036
2037 if (PL_op->op_flags & OPf_STACKED)
2038 {
2039 if (items > slf_arga)
2040 {
2041 slf_arga = items;
2042 free (slf_argv);
2043 slf_argv = malloc (slf_arga * sizeof (SV *));
2044 }
2045
2046 slf_argc = items;
2047
2048 for (i = 0; i < items; ++i)
2049 slf_argv [i] = SvREFCNT_inc (arg [i]);
2050 }
2051 else
2052 slf_argc = 0;
2053
2054 PL_op->op_ppaddr = pp_slf;
2055 PL_op->op_type = OP_CUSTOM; /* maybe we should leave it at entersub? */
2056
2057 PL_op = (OP *)&slf_restore;
2058}
2059
2060/*****************************************************************************/
2061/* PerlIO::cede */
2062
2063typedef struct
2064{
2065 PerlIOBuf base;
2066 NV next, every;
2067} PerlIOCede;
2068
2069static IV
2070PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2071{
2072 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2073
2074 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2075 self->next = nvtime () + self->every;
2076
2077 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2078}
2079
2080static SV *
2081PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2082{
2083 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2084
2085 return newSVnv (self->every);
2086}
2087
2088static IV
2089PerlIOCede_flush (pTHX_ PerlIO *f)
2090{
2091 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2092 double now = nvtime ();
2093
2094 if (now >= self->next)
2095 {
2096 api_cede (aTHX);
2097 self->next = now + self->every;
2098 }
2099
2100 return PerlIOBuf_flush (aTHX_ f);
2101}
2102
2103static PerlIO_funcs PerlIO_cede =
2104{
2105 sizeof(PerlIO_funcs),
2106 "cede",
2107 sizeof(PerlIOCede),
2108 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2109 PerlIOCede_pushed,
2110 PerlIOBuf_popped,
2111 PerlIOBuf_open,
2112 PerlIOBase_binmode,
2113 PerlIOCede_getarg,
2114 PerlIOBase_fileno,
2115 PerlIOBuf_dup,
2116 PerlIOBuf_read,
2117 PerlIOBuf_unread,
2118 PerlIOBuf_write,
2119 PerlIOBuf_seek,
2120 PerlIOBuf_tell,
2121 PerlIOBuf_close,
2122 PerlIOCede_flush,
2123 PerlIOBuf_fill,
2124 PerlIOBase_eof,
2125 PerlIOBase_error,
2126 PerlIOBase_clearerr,
2127 PerlIOBase_setlinebuf,
2128 PerlIOBuf_get_base,
2129 PerlIOBuf_bufsiz,
2130 PerlIOBuf_get_ptr,
2131 PerlIOBuf_get_cnt,
2132 PerlIOBuf_set_ptrcnt,
2133};
2134
2135/*****************************************************************************/
2136/* Coro::Semaphore & Coro::Signal */
2137
2138static SV *
2139coro_waitarray_new (pTHX_ int count)
2140{
2141 /* a semaphore contains a counter IV in $sem->[0] and any waiters after that */
2142 AV *av = newAV ();
2143 SV **ary;
2144
2145 /* unfortunately, building manually saves memory */
2146 Newx (ary, 2, SV *);
2147 AvALLOC (av) = ary;
2148 AvARRAY (av) = ary;
2149 AvMAX (av) = 1;
2150 AvFILLp (av) = 0;
2151 ary [0] = newSViv (count);
2152
2153 return newRV_noinc ((SV *)av);
2154}
2155
2156/* semaphore */
2157
2158static void
2159coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2160{
2161 SV *count_sv = AvARRAY (av)[0];
2162 IV count = SvIVX (count_sv);
2163
2164 count += adjust;
2165 SvIVX (count_sv) = count;
2166
2167 /* now wake up as many waiters as are expected to lock */
2168 while (count > 0 && AvFILLp (av) > 0)
2169 {
2170 SV *cb;
2171
2172 /* swap first two elements so we can shift a waiter */
2173 AvARRAY (av)[0] = AvARRAY (av)[1];
2174 AvARRAY (av)[1] = count_sv;
2175 cb = av_shift (av);
2176
2177 if (SvOBJECT (cb))
2178 api_ready (aTHX_ cb);
2179 else
2180 croak ("callbacks not yet supported");
2181
2182 SvREFCNT_dec (cb);
2183
2184 --count;
2185 }
2186}
2187
2188static void
2189coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2190{
2191 /* call $sem->adjust (0) to possibly wake up some other waiters */
2192 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2193}
2194
2195static int
2196slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2197{
2198 AV *av = (AV *)frame->data;
2199 SV *count_sv = AvARRAY (av)[0];
2200
2201 /* if we are about to throw, don't actually acquire the lock, just throw */
2202 if (CORO_THROW)
2203 return 0;
2204 else if (SvIVX (count_sv) > 0)
2205 {
2206 SvSTATE_current->on_destroy = 0;
2207
2208 if (acquire)
2209 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2210 else
2211 coro_semaphore_adjust (aTHX_ av, 0);
2212
2213 return 0;
2214 }
2215 else
2216 {
2217 int i;
2218 /* if we were woken up but can't down, we look through the whole */
2219 /* waiters list and only add us if we aren't in there already */
2220 /* this avoids some degenerate memory usage cases */
2221
2222 for (i = 1; i <= AvFILLp (av); ++i)
2223 if (AvARRAY (av)[i] == SvRV (coro_current))
2224 return 1;
2225
2226 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2227 return 1;
2228 }
2229}
2230
2231static int
2232slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2233{
2234 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2235}
2236
2237static int
2238slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2239{
2240 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2241}
2242
2243static void
2244slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2245{
2246 AV *av = (AV *)SvRV (arg [0]);
2247
2248 if (SvIVX (AvARRAY (av)[0]) > 0)
2249 {
2250 frame->data = (void *)av;
2251 frame->prepare = prepare_nop;
2252 }
2253 else
2254 {
2255 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2256
2257 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2258 frame->prepare = prepare_schedule;
2259
2260 /* to avoid race conditions when a woken-up coro gets terminated */
2261 /* we arrange for a temporary on_destroy that calls adjust (0) */
2262 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2263 }
2264}
2265
2266static void
2267slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2268{
2269 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2270 frame->check = slf_check_semaphore_down;
2271}
2272
2273static void
2274slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2275{
2276 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2277 frame->check = slf_check_semaphore_wait;
2278}
2279
2280/* signal */
2281
2282static void
2283coro_signal_wake (pTHX_ AV *av, int count)
2284{
2285 SvIVX (AvARRAY (av)[0]) = 0;
2286
2287 /* now signal count waiters */
2288 while (count > 0 && AvFILLp (av) > 0)
2289 {
2290 SV *cb;
2291
2292 /* swap first two elements so we can shift a waiter */
2293 cb = AvARRAY (av)[0];
2294 AvARRAY (av)[0] = AvARRAY (av)[1];
2295 AvARRAY (av)[1] = cb;
2296
2297 cb = av_shift (av);
2298
2299 api_ready (aTHX_ cb);
2300 sv_setiv (cb, 0); /* signal waiter */
2301 SvREFCNT_dec (cb);
2302
2303 --count;
2304 }
2305}
2306
2307static int
2308slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2309{
2310 /* if we are about to throw, also stop waiting */
2311 return SvROK ((SV *)frame->data) && !CORO_THROW;
2312}
2313
2314static void
2315slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2316{
2317 AV *av = (AV *)SvRV (arg [0]);
2318
2319 if (SvIVX (AvARRAY (av)[0]))
2320 {
2321 SvIVX (AvARRAY (av)[0]) = 0;
2322 frame->prepare = prepare_nop;
2323 frame->check = slf_check_nop;
2324 }
2325 else
2326 {
2327 SV *waiter = newRV_inc (SvRV (coro_current)); /* owned by signal av */
2328
2329 av_push (av, waiter);
2330
2331 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2332 frame->prepare = prepare_schedule;
2333 frame->check = slf_check_signal_wait;
2334 }
2335}
2336
2337/*****************************************************************************/
2338/* Coro::AIO */
2339
2340#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2341
2342/* helper storage struct */
2343struct io_state
2344{
2345 int errorno;
2346 I32 laststype; /* U16 in 5.10.0 */
2347 int laststatval;
2348 Stat_t statcache;
2349};
2350
2351static void
2352coro_aio_callback (pTHX_ CV *cv)
2353{
2354 dXSARGS;
2355 AV *state = (AV *)GENSUB_ARG;
2356 SV *coro = av_pop (state);
2357 SV *data_sv = newSV (sizeof (struct io_state));
2358
2359 av_extend (state, items);
2360
2361 sv_upgrade (data_sv, SVt_PV);
2362 SvCUR_set (data_sv, sizeof (struct io_state));
2363 SvPOK_only (data_sv);
2364
2365 {
2366 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2367
2368 data->errorno = errno;
2369 data->laststype = PL_laststype;
2370 data->laststatval = PL_laststatval;
2371 data->statcache = PL_statcache;
2372 }
2373
2374 /* now build the result vector out of all the parameters and the data_sv */
2375 {
2376 int i;
2377
2378 for (i = 0; i < items; ++i)
2379 av_push (state, SvREFCNT_inc_NN (ST (i)));
2380 }
2381
2382 av_push (state, data_sv);
2383
2384 api_ready (aTHX_ coro);
2385 SvREFCNT_dec (coro);
2386 SvREFCNT_dec ((AV *)state);
2387}
2388
2389static int
2390slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2391{
2392 AV *state = (AV *)frame->data;
2393
2394 /* if we are about to throw, return early */
2395 /* this does not cancel the aio request, but at least */
2396 /* it quickly returns */
2397 if (CORO_THROW)
2398 return 0;
2399
2400 /* one element that is an RV? repeat! */
2401 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2402 return 1;
2403
2404 /* restore status */
2405 {
2406 SV *data_sv = av_pop (state);
2407 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2408
2409 errno = data->errorno;
2410 PL_laststype = data->laststype;
2411 PL_laststatval = data->laststatval;
2412 PL_statcache = data->statcache;
2413
2414 SvREFCNT_dec (data_sv);
2415 }
2416
2417 /* push result values */
2418 {
2419 dSP;
2420 int i;
2421
2422 EXTEND (SP, AvFILLp (state) + 1);
2423 for (i = 0; i <= AvFILLp (state); ++i)
2424 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2425
2426 PUTBACK;
2427 }
2428
2429 return 0;
2430}
2431
2432static void
2433slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2434{
2435 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2436 SV *coro_hv = SvRV (coro_current);
2437 struct coro *coro = SvSTATE_hv (coro_hv);
2438
2439 /* put our coroutine id on the state arg */
2440 av_push (state, SvREFCNT_inc_NN (coro_hv));
2441
2442 /* first see whether we have a non-zero priority and set it as AIO prio */
2443 if (coro->prio)
2444 {
2445 dSP;
2446
2447 static SV *prio_cv;
2448 static SV *prio_sv;
2449
2450 if (expect_false (!prio_cv))
2451 {
2452 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2453 prio_sv = newSViv (0);
2454 }
2455
2456 PUSHMARK (SP);
2457 sv_setiv (prio_sv, coro->prio);
2458 XPUSHs (prio_sv);
2459
2460 PUTBACK;
2461 call_sv (prio_cv, G_VOID | G_DISCARD);
2462 }
2463
2464 /* now call the original request */
2465 {
2466 dSP;
2467 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2468 int i;
2469
2470 PUSHMARK (SP);
2471
2472 /* first push all args to the stack */
2473 EXTEND (SP, items + 1);
2474
2475 for (i = 0; i < items; ++i)
2476 PUSHs (arg [i]);
2477
2478 /* now push the callback closure */
2479 PUSHs (sv_2mortal (gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2480
2481 /* now call the AIO function - we assume our request is uncancelable */
2482 PUTBACK;
2483 call_sv ((SV *)req, G_VOID | G_DISCARD);
2484 }
2485
2486 /* now that the requets is going, we loop toll we have a result */
2487 frame->data = (void *)state;
2488 frame->prepare = prepare_schedule;
2489 frame->check = slf_check_aio_req;
2490}
2491
2492static void
2493coro_aio_req_xs (pTHX_ CV *cv)
2494{
2495 dXSARGS;
2496
2497 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2498
2499 XSRETURN_EMPTY;
2500}
2501
2502/*****************************************************************************/
2503
2504MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2505
2506PROTOTYPES: DISABLE
2507
2508BOOT:
2509{
2510#ifdef USE_ITHREADS
2511# if CORO_PTHREAD
2512 coro_thx = PERL_GET_CONTEXT;
2513# endif
2514#endif
2515 BOOT_PAGESIZE;
2516
2517 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2518 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2519
2520 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2521 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2522 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
2523
2524 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
2525 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
2526 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
2527
2528 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
2529
2530 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
2531 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
2532 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
2533 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
2534
2535 main_mainstack = PL_mainstack;
2536 main_top_env = PL_top_env;
2537
2538 while (main_top_env->je_prev)
2539 main_top_env = main_top_env->je_prev;
2540
2541 {
2542 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2543
2544 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2545 hv_store_ent (PL_custom_op_names, slf,
2546 newSVpv ("coro_slf", 0), 0);
2547
2548 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2549 hv_store_ent (PL_custom_op_descs, slf,
2550 newSVpv ("coro schedule like function", 0), 0);
2551 }
2552
2553 coroapi.ver = CORO_API_VERSION;
2554 coroapi.rev = CORO_API_REVISION;
2555
2556 coroapi.transfer = api_transfer;
2557
2558 coroapi.sv_state = SvSTATE_;
2559 coroapi.execute_slf = api_execute_slf;
2560 coroapi.prepare_nop = prepare_nop;
2561 coroapi.prepare_schedule = prepare_schedule;
2562 coroapi.prepare_cede = prepare_cede;
2563 coroapi.prepare_cede_notself = prepare_cede_notself;
2564
2565 {
2566 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
2567
2568 if (!svp) croak ("Time::HiRes is required");
2569 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
2570
2571 nvtime = INT2PTR (double (*)(), SvIV (*svp));
2572 }
2573
2574 assert (("PRIO_NORMAL must be 0", !PRIO_NORMAL));
2575}
2576
2577SV *
2578new (char *klass, ...)
2579 CODE:
2580{
2581 struct coro *coro;
2582 MAGIC *mg;
2583 HV *hv;
2584 int i;
2585
2586 Newz (0, coro, 1, struct coro);
2587 coro->args = newAV ();
2588 coro->flags = CF_NEW;
2589
2590 if (coro_first) coro_first->prev = coro;
2591 coro->next = coro_first;
2592 coro_first = coro;
2593
2594 coro->hv = hv = newHV ();
2595 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
2596 mg->mg_flags |= MGf_DUP;
2597 RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1));
2598
2599 av_extend (coro->args, items - 1);
2600 for (i = 1; i < items; i++)
2601 av_push (coro->args, newSVsv (ST (i)));
2602}
2603 OUTPUT:
2604 RETVAL
2605
2606void
2607transfer (...)
2608 PROTOTYPE: $$
2609 CODE:
2610 CORO_EXECUTE_SLF_XS (slf_init_transfer);
2611
2612bool
2613_destroy (SV *coro_sv)
2614 CODE:
2615 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
2616 OUTPUT:
2617 RETVAL
2618
2619void
2620_exit (int code)
2621 PROTOTYPE: $
2622 CODE:
2623 _exit (code);
2624
2625int
2626cctx_stacksize (int new_stacksize = 0)
2627 PROTOTYPE: ;$
2628 CODE:
2629 RETVAL = cctx_stacksize;
2630 if (new_stacksize)
2631 {
2632 cctx_stacksize = new_stacksize;
2633 ++cctx_gen;
2634 }
2635 OUTPUT:
2636 RETVAL
2637
2638int
2639cctx_max_idle (int max_idle = 0)
2640 PROTOTYPE: ;$
2641 CODE:
2642 RETVAL = cctx_max_idle;
2643 if (max_idle > 1)
2644 cctx_max_idle = max_idle;
2645 OUTPUT:
2646 RETVAL
2647
2648int
2649cctx_count ()
2650 PROTOTYPE:
2651 CODE:
2652 RETVAL = cctx_count;
2653 OUTPUT:
2654 RETVAL
2655
2656int
2657cctx_idle ()
2658 PROTOTYPE:
2659 CODE:
2660 RETVAL = cctx_idle;
2661 OUTPUT:
2662 RETVAL
2663
2664void
2665list ()
2666 PROTOTYPE:
2667 PPCODE:
2668{
2669 struct coro *coro;
2670 for (coro = coro_first; coro; coro = coro->next)
2671 if (coro->hv)
2672 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
2673}
2674
2675void
2676call (Coro::State coro, SV *coderef)
2677 ALIAS:
2678 eval = 1
2679 CODE:
2680{
2681 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
2682 {
2683 struct coro temp;
2684
2685 if (!(coro->flags & CF_RUNNING))
235 { 2686 {
236 /* I never used formats, so how should I know how these are implemented? */ 2687 PUTBACK;
237 /* my bold guess is as a simple, plain sub... */ 2688 save_perl (aTHX_ &temp);
238 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 2689 load_perl (aTHX_ coro);
2690 }
2691
2692 {
2693 dSP;
2694 ENTER;
2695 SAVETMPS;
2696 PUTBACK;
2697 PUSHSTACK;
2698 PUSHMARK (SP);
2699
2700 if (ix)
2701 eval_sv (coderef, 0);
2702 else
2703 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
2704
2705 POPSTACK;
2706 SPAGAIN;
2707 FREETMPS;
2708 LEAVE;
2709 PUTBACK;
2710 }
2711
2712 if (!(coro->flags & CF_RUNNING))
2713 {
2714 save_perl (aTHX_ coro);
2715 load_perl (aTHX_ &temp);
2716 SPAGAIN;
239 } 2717 }
240 } 2718 }
241
242 if (top_si->si_type == PERLSI_MAIN)
243 break;
244
245 top_si = top_si->si_prev;
246 ccstk = top_si->si_cxstack;
247 cxix = top_si->si_cxix;
248 }
249
250 PUTBACK;
251 }
252
253 c->dowarn = PL_dowarn;
254 c->defav = GvAV (PL_defgv);
255 c->curstackinfo = PL_curstackinfo;
256 c->curstack = PL_curstack;
257 c->mainstack = PL_mainstack;
258 c->stack_sp = PL_stack_sp;
259 c->op = PL_op;
260 c->curpad = PL_curpad;
261 c->stack_base = PL_stack_base;
262 c->stack_max = PL_stack_max;
263 c->tmps_stack = PL_tmps_stack;
264 c->tmps_floor = PL_tmps_floor;
265 c->tmps_ix = PL_tmps_ix;
266 c->tmps_max = PL_tmps_max;
267 c->markstack = PL_markstack;
268 c->markstack_ptr = PL_markstack_ptr;
269 c->markstack_max = PL_markstack_max;
270 c->scopestack = PL_scopestack;
271 c->scopestack_ix = PL_scopestack_ix;
272 c->scopestack_max = PL_scopestack_max;
273 c->savestack = PL_savestack;
274 c->savestack_ix = PL_savestack_ix;
275 c->savestack_max = PL_savestack_max;
276 c->retstack = PL_retstack;
277 c->retstack_ix = PL_retstack_ix;
278 c->retstack_max = PL_retstack_max;
279 c->curcop = PL_curcop;
280} 2719}
281 2720
282#define LOAD(state) do { load_state(aTHX_ state); SPAGAIN; } while (0) 2721SV *
283#define SAVE(state) do { PUTBACK; save_state(aTHX_ state); } while (0) 2722is_ready (Coro::State coro)
2723 PROTOTYPE: $
2724 ALIAS:
2725 is_ready = CF_READY
2726 is_running = CF_RUNNING
2727 is_new = CF_NEW
2728 is_destroyed = CF_DESTROYED
2729 CODE:
2730 RETVAL = boolSV (coro->flags & ix);
2731 OUTPUT:
2732 RETVAL
284 2733
285static void 2734void
286load_state(pTHX_ Coro__State c) 2735throw (Coro::State self, SV *throw = &PL_sv_undef)
2736 PROTOTYPE: $;$
2737 CODE:
287{ 2738{
288 PL_dowarn = c->dowarn; 2739 struct coro *current = SvSTATE_current;
289 GvAV (PL_defgv) = c->defav; 2740 SV **throwp = self == current ? &CORO_THROW : &self->except;
290 PL_curstackinfo = c->curstackinfo; 2741 SvREFCNT_dec (*throwp);
291 PL_curstack = c->curstack; 2742 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
292 PL_mainstack = c->mainstack; 2743}
293 PL_stack_sp = c->stack_sp;
294 PL_op = c->op;
295 PL_curpad = c->curpad;
296 PL_stack_base = c->stack_base;
297 PL_stack_max = c->stack_max;
298 PL_tmps_stack = c->tmps_stack;
299 PL_tmps_floor = c->tmps_floor;
300 PL_tmps_ix = c->tmps_ix;
301 PL_tmps_max = c->tmps_max;
302 PL_markstack = c->markstack;
303 PL_markstack_ptr = c->markstack_ptr;
304 PL_markstack_max = c->markstack_max;
305 PL_scopestack = c->scopestack;
306 PL_scopestack_ix = c->scopestack_ix;
307 PL_scopestack_max = c->scopestack_max;
308 PL_savestack = c->savestack;
309 PL_savestack_ix = c->savestack_ix;
310 PL_savestack_max = c->savestack_max;
311 PL_retstack = c->retstack;
312 PL_retstack_ix = c->retstack_ix;
313 PL_retstack_max = c->retstack_max;
314 PL_curcop = c->curcop;
315 2744
2745void
2746api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
2747 PROTOTYPE: $;$
2748 C_ARGS: aTHX_ coro, flags
2749
2750SV *
2751has_cctx (Coro::State coro)
2752 PROTOTYPE: $
2753 CODE:
2754 RETVAL = boolSV (!!coro->cctx);
2755 OUTPUT:
2756 RETVAL
2757
2758int
2759is_traced (Coro::State coro)
2760 PROTOTYPE: $
2761 CODE:
2762 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
2763 OUTPUT:
2764 RETVAL
2765
2766UV
2767rss (Coro::State coro)
2768 PROTOTYPE: $
2769 ALIAS:
2770 usecount = 1
2771 CODE:
2772 switch (ix)
316 { 2773 {
317 dSP; 2774 case 0: RETVAL = coro_rss (aTHX_ coro); break;
318 CV *cv; 2775 case 1: RETVAL = coro->usecount; break;
319
320 /* now do the ugly restore mess */
321 while ((cv = (CV *)POPs))
322 {
323 AV *padlist = (AV *)POPs;
324
325 put_padlist (cv);
326 CvPADLIST(cv) = padlist;
327 CvDEPTH(cv) = (I32)POPs;
328
329#ifdef USE_THREADS
330 CvOWNER(cv) = (struct perl_thread *)POPs;
331 error does not work either
332#endif
333 } 2776 }
2777 OUTPUT:
2778 RETVAL
334 2779
335 PUTBACK; 2780void
336 } 2781force_cctx ()
337} 2782 PROTOTYPE:
2783 CODE:
2784 SvSTATE_current->cctx->idle_sp = 0;
338 2785
339/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */ 2786void
340STATIC void 2787swap_defsv (Coro::State self)
341destroy_stacks(pTHX) 2788 PROTOTYPE: $
342{ 2789 ALIAS:
343 /* die does this while calling POPSTACK, but I just don't see why. */ 2790 swap_defav = 1
344 /* OTOH, die does not have a memleak, but we do... */ 2791 CODE:
345 dounwind(-1); 2792 if (!self->slot)
2793 croak ("cannot swap state with coroutine that has no saved state,");
2794 else
2795 {
2796 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
2797 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
346 2798
347 /* is this ugly, I ask? */ 2799 SV *tmp = *src; *src = *dst; *dst = tmp;
348 while (PL_scopestack_ix) 2800 }
349 LEAVE;
350 2801
351 while (PL_curstackinfo->si_next)
352 PL_curstackinfo = PL_curstackinfo->si_next;
353 2802
354 while (PL_curstackinfo)
355 {
356 PERL_SI *p = PL_curstackinfo->si_prev;
357
358 SvREFCNT_dec(PL_curstackinfo->si_stack);
359 Safefree(PL_curstackinfo->si_cxstack);
360 Safefree(PL_curstackinfo);
361 PL_curstackinfo = p;
362 }
363
364 if (PL_scopestack_ix != 0)
365 Perl_warner(aTHX_ WARN_INTERNAL,
366 "Unbalanced scopes: %ld more ENTERs than LEAVEs\n",
367 (long)PL_scopestack_ix);
368 if (PL_savestack_ix != 0)
369 Perl_warner(aTHX_ WARN_INTERNAL,
370 "Unbalanced saves: %ld more saves than restores\n",
371 (long)PL_savestack_ix);
372 if (PL_tmps_floor != -1)
373 Perl_warner(aTHX_ WARN_INTERNAL,"Unbalanced tmps: %ld more allocs than frees\n",
374 (long)PL_tmps_floor + 1);
375 /*
376 */
377 Safefree(PL_tmps_stack);
378 Safefree(PL_markstack);
379 Safefree(PL_scopestack);
380 Safefree(PL_savestack);
381 Safefree(PL_retstack);
382}
383
384#define SUB_INIT "Coro::State::_newcoro"
385
386MODULE = Coro::State PACKAGE = Coro::State 2803MODULE = Coro::State PACKAGE = Coro
387
388PROTOTYPES: ENABLE
389 2804
390BOOT: 2805BOOT:
391 if (!padlist_cache) 2806{
392 padlist_cache = newHV (); 2807 int i;
393 2808
394Coro::State 2809 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
395_newprocess(args) 2810 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
396 SV * args 2811 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
2812
2813 coro_current = coro_get_sv (aTHX_ "Coro::current", FALSE);
2814 SvREADONLY_on (coro_current);
2815
2816 coro_stash = gv_stashpv ("Coro", TRUE);
2817
2818 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (PRIO_MAX));
2819 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (PRIO_HIGH));
2820 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (PRIO_NORMAL));
2821 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (PRIO_LOW));
2822 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (PRIO_IDLE));
2823 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (PRIO_MIN));
2824
2825 for (i = PRIO_MAX - PRIO_MIN + 1; i--; )
2826 coro_ready[i] = newAV ();
2827
2828 {
2829 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
2830
2831 coroapi.schedule = api_schedule;
2832 coroapi.cede = api_cede;
2833 coroapi.cede_notself = api_cede_notself;
2834 coroapi.ready = api_ready;
2835 coroapi.is_ready = api_is_ready;
2836 coroapi.nready = coro_nready;
2837 coroapi.current = coro_current;
2838
2839 /*GCoroAPI = &coroapi;*/
2840 sv_setiv (sv, (IV)&coroapi);
2841 SvREADONLY_on (sv);
2842 }
2843}
2844
2845void
2846schedule (...)
2847 CODE:
2848 CORO_EXECUTE_SLF_XS (slf_init_schedule);
2849
2850void
2851cede (...)
2852 CODE:
2853 CORO_EXECUTE_SLF_XS (slf_init_cede);
2854
2855void
2856cede_notself (...)
2857 CODE:
2858 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
2859
2860void
2861_set_current (SV *current)
397 PROTOTYPE: $ 2862 PROTOTYPE: $
2863 CODE:
2864 SvREFCNT_dec (SvRV (coro_current));
2865 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
2866
2867void
2868_set_readyhook (SV *hook)
2869 PROTOTYPE: $
398 CODE: 2870 CODE:
399 Coro__State coro; 2871 SvREFCNT_dec (coro_readyhook);
2872 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
400 2873
401 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV) 2874int
402 croak ("Coro::State::newprocess expects an arrayref"); 2875prio (Coro::State coro, int newprio = 0)
2876 PROTOTYPE: $;$
2877 ALIAS:
2878 nice = 1
403 2879 CODE:
404 New (0, coro, 1, struct coro); 2880{
405
406 coro->mainstack = 0; /* actual work is done inside transfer */
407 coro->args = (AV *)SvREFCNT_inc (SvRV (args));
408
409 RETVAL = coro; 2881 RETVAL = coro->prio;
2882
2883 if (items > 1)
2884 {
2885 if (ix)
2886 newprio = coro->prio - newprio;
2887
2888 if (newprio < PRIO_MIN) newprio = PRIO_MIN;
2889 if (newprio > PRIO_MAX) newprio = PRIO_MAX;
2890
2891 coro->prio = newprio;
2892 }
2893}
2894 OUTPUT:
2895 RETVAL
2896
2897SV *
2898ready (SV *self)
2899 PROTOTYPE: $
2900 CODE:
2901 RETVAL = boolSV (api_ready (aTHX_ self));
2902 OUTPUT:
2903 RETVAL
2904
2905int
2906nready (...)
2907 PROTOTYPE:
2908 CODE:
2909 RETVAL = coro_nready;
2910 OUTPUT:
2911 RETVAL
2912
2913# for async_pool speedup
2914void
2915_pool_1 (SV *cb)
2916 CODE:
2917{
2918 HV *hv = (HV *)SvRV (coro_current);
2919 struct coro *coro = SvSTATE_hv ((SV *)hv);
2920 AV *defav = GvAV (PL_defgv);
2921 SV *invoke = hv_delete (hv, "_invoke", sizeof ("_invoke") - 1, 0);
2922 AV *invoke_av;
2923 int i, len;
2924
2925 if (!invoke)
2926 {
2927 SV *old = PL_diehook;
2928 PL_diehook = 0;
2929 SvREFCNT_dec (old);
2930 croak ("\3async_pool terminate\2\n");
2931 }
2932
2933 SvREFCNT_dec (coro->saved_deffh);
2934 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
2935
2936 hv_store (hv, "desc", sizeof ("desc") - 1,
2937 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
2938
2939 invoke_av = (AV *)SvRV (invoke);
2940 len = av_len (invoke_av);
2941
2942 sv_setsv (cb, AvARRAY (invoke_av)[0]);
2943
2944 if (len > 0)
2945 {
2946 av_fill (defav, len - 1);
2947 for (i = 0; i < len; ++i)
2948 av_store (defav, i, SvREFCNT_inc_NN (AvARRAY (invoke_av)[i + 1]));
2949 }
2950}
2951
2952void
2953_pool_2 (SV *cb)
2954 CODE:
2955{
2956 HV *hv = (HV *)SvRV (coro_current);
2957 struct coro *coro = SvSTATE_hv ((SV *)hv);
2958
2959 sv_setsv (cb, &PL_sv_undef);
2960
2961 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
2962 coro->saved_deffh = 0;
2963
2964 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
2965 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
2966 {
2967 SV *old = PL_diehook;
2968 PL_diehook = 0;
2969 SvREFCNT_dec (old);
2970 croak ("\3async_pool terminate\2\n");
2971 }
2972
2973 av_clear (GvAV (PL_defgv));
2974 hv_store (hv, "desc", sizeof ("desc") - 1,
2975 newSVpvn ("[async_pool idle]", sizeof ("[async_pool idle]") - 1), 0);
2976
2977 coro->prio = 0;
2978
2979 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
2980 api_trace (aTHX_ coro_current, 0);
2981
2982 av_push (av_async_pool, newSVsv (coro_current));
2983}
2984
2985SV *
2986rouse_cb ()
2987 PROTOTYPE:
2988 CODE:
2989 RETVAL = coro_new_rouse_cb (aTHX);
2990 OUTPUT:
2991 RETVAL
2992
2993void
2994rouse_wait (SV *cb = 0)
2995 PROTOTYPE: ;$
2996 PPCODE:
2997 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
2998
2999
3000MODULE = Coro::State PACKAGE = PerlIO::cede
3001
3002BOOT:
3003 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3004
3005
3006MODULE = Coro::State PACKAGE = Coro::Semaphore
3007
3008SV *
3009new (SV *klass, SV *count = 0)
3010 CODE:
3011 RETVAL = sv_bless (
3012 coro_waitarray_new (aTHX_ count && SvOK (count) ? SvIV (count) : 1),
3013 GvSTASH (CvGV (cv))
3014 );
3015 OUTPUT:
3016 RETVAL
3017
3018# helper for Coro::Channel
3019SV *
3020_alloc (int count)
3021 CODE:
3022 RETVAL = coro_waitarray_new (aTHX_ count);
3023 OUTPUT:
3024 RETVAL
3025
3026SV *
3027count (SV *self)
3028 CODE:
3029 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3030 OUTPUT:
3031 RETVAL
3032
3033void
3034up (SV *self, int adjust = 1)
3035 ALIAS:
3036 adjust = 1
3037 CODE:
3038 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3039
3040void
3041down (SV *self)
3042 CODE:
3043 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3044
3045void
3046wait (SV *self)
3047 CODE:
3048 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3049
3050void
3051try (SV *self)
3052 PPCODE:
3053{
3054 AV *av = (AV *)SvRV (self);
3055 SV *count_sv = AvARRAY (av)[0];
3056 IV count = SvIVX (count_sv);
3057
3058 if (count > 0)
3059 {
3060 --count;
3061 SvIVX (count_sv) = count;
3062 XSRETURN_YES;
3063 }
3064 else
3065 XSRETURN_NO;
3066}
3067
3068void
3069waiters (SV *self)
3070 PPCODE:
3071{
3072 AV *av = (AV *)SvRV (self);
3073 int wcount = AvFILLp (av) + 1 - 1;
3074
3075 if (GIMME_V == G_SCALAR)
3076 XPUSHs (sv_2mortal (newSViv (wcount)));
3077 else
3078 {
3079 int i;
3080 EXTEND (SP, wcount);
3081 for (i = 1; i <= wcount; ++i)
3082 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3083 }
3084}
3085
3086MODULE = Coro::State PACKAGE = Coro::Signal
3087
3088SV *
3089new (SV *klass)
3090 CODE:
3091 RETVAL = sv_bless (
3092 coro_waitarray_new (aTHX_ 0),
3093 GvSTASH (CvGV (cv))
3094 );
410 OUTPUT: 3095 OUTPUT:
411 RETVAL 3096 RETVAL
412 3097
413void 3098void
414transfer(prev,next) 3099wait (SV *self)
415 Coro::State_or_hashref prev
416 Coro::State_or_hashref next
417 CODE: 3100 CODE:
3101 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
418 3102
419 if (prev != next) 3103void
3104broadcast (SV *self)
3105 CODE:
3106{
3107 AV *av = (AV *)SvRV (self);
3108 coro_signal_wake (aTHX_ av, AvFILLp (av));
3109}
3110
3111void
3112send (SV *self)
3113 CODE:
3114{
3115 AV *av = (AV *)SvRV (self);
3116
3117 if (AvFILLp (av))
3118 coro_signal_wake (aTHX_ av, 1);
3119 else
3120 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3121}
3122
3123IV
3124awaited (SV *self)
3125 CODE:
3126 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3127 OUTPUT:
3128 RETVAL
3129
3130
3131MODULE = Coro::State PACKAGE = Coro::AnyEvent
3132
3133BOOT:
3134 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3135
3136void
3137_schedule (...)
3138 CODE:
3139{
3140 static int incede;
3141
3142 api_cede_notself (aTHX);
3143
3144 ++incede;
3145 while (coro_nready >= incede && api_cede (aTHX))
3146 ;
3147
3148 sv_setsv (sv_activity, &PL_sv_undef);
3149 if (coro_nready >= incede)
420 { 3150 {
421 /*
422 * this could be done in newprocess which would lead to
423 * extremely elegant and fast (just SAVE/LOAD)
424 * code here, but lazy allocation of stacks has also
425 * some virtues and the overhead of the if() is nil.
426 */
427 if (next->mainstack)
428 {
429 SAVE (prev);
430 LOAD (next);
431 /* mark this state as in-use */
432 next->mainstack = 0;
433 next->tmps_ix = -2;
434 }
435 else if (next->tmps_ix == -2)
436 {
437 croak ("tried to transfer to running coroutine");
438 }
439 else
440 {
441 SAVE (prev);
442
443 /*
444 * emulate part of the perl startup here.
445 */
446 UNOP myop;
447
448 init_stacks (); /* from perl.c */
449 PL_op = (OP *)&myop;
450 /*PL_curcop = 0;*/
451 GvAV (PL_defgv) = (AV *)SvREFCNT_inc ((SV *)next->args);
452
453 SPAGAIN;
454 Zero(&myop, 1, UNOP);
455 myop.op_next = Nullop;
456 myop.op_flags = OPf_WANT_VOID;
457
458 PUSHMARK(SP); 3151 PUSHMARK (SP);
459 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
460 PUTBACK; 3152 PUTBACK;
461 /* 3153 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
462 * the next line is slightly wrong, as PL_op->op_next
463 * is actually being executed so we skip the first op.
464 * that doesn't matter, though, since it is only
465 * pp_nextstate and we never return...
466 */
467 PL_op = Perl_pp_entersub(aTHX);
468 SPAGAIN;
469
470 ENTER;
471 }
472 } 3154 }
473 3155
3156 --incede;
3157}
3158
3159
3160MODULE = Coro::State PACKAGE = Coro::AIO
3161
474void 3162void
475DESTROY(coro) 3163_register (char *target, char *proto, SV *req)
476 Coro::State coro 3164 CODE:
477 CODE: 3165{
3166 HV *st;
3167 GV *gvp;
3168 CV *req_cv = sv_2cv (req, &st, &gvp, 0);
3169 /* newXSproto doesn't return the CV on 5.8 */
3170 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3171 sv_setpv ((SV *)slf_cv, proto);
3172 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3173}
478 3174
479 if (coro->mainstack)
480 {
481 struct coro temp;
482
483 SAVE(aTHX_ (&temp));
484 LOAD(aTHX_ coro);
485
486 destroy_stacks ();
487 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
488
489 LOAD((&temp));
490 }
491
492 SvREFCNT_dec (coro->args);
493 Safefree (coro);
494
495

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines