ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.4 by root, Tue Jul 17 02:21:56 2001 UTC vs.
Revision 1.305 by root, Wed Nov 19 10:44:41 2008 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "patchlevel.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
24#endif
25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
9#endif 35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
10 48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#define PERL_VERSION_ATLEAST(a,b,c) \
57 (PERL_REVISION > (a) \
58 || (PERL_REVISION == (a) \
59 && (PERL_VERSION > (b) \
60 || (PERL_VERSION == (b) && PERLSUBVERSION >= (c)))))
61
62#if !PERL_VERSION_ATLEAST (5,6,0)
63# ifndef PL_ppaddr
64# define PL_ppaddr ppaddr
65# endif
66# ifndef call_sv
67# define call_sv perl_call_sv
68# endif
69# ifndef get_sv
70# define get_sv perl_get_sv
71# endif
72# ifndef get_cv
73# define get_cv perl_get_cv
74# endif
75# ifndef IS_PADGV
76# define IS_PADGV(v) 0
77# endif
78# ifndef IS_PADCONST
79# define IS_PADCONST(v) 0
80# endif
81#endif
82
83/* 5.11 */
84#ifndef CxHASARGS
85# define CxHASARGS(cx) (cx)->blk_sub.hasargs
86#endif
87
88/* 5.10.0 */
89#ifndef SvREFCNT_inc_NN
90# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
91#endif
92
93/* 5.8.8 */
94#ifndef GV_NOTQUAL
95# define GV_NOTQUAL 0
96#endif
97#ifndef newSV
98# define newSV(l) NEWSV(0,l)
99#endif
100#ifndef CvISXSUB_on
101# define CvISXSUB_on(cv) (void)cv
102#endif
103
104/* 5.8.7 */
105#ifndef SvRV_set
106# define SvRV_set(s,v) SvRV(s) = (v)
107#endif
108
109#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
110# undef CORO_STACKGUARD
111#endif
112
113#ifndef CORO_STACKGUARD
114# define CORO_STACKGUARD 0
115#endif
116
117/* prefer perl internal functions over our own? */
118#ifndef CORO_PREFER_PERL_FUNCTIONS
119# define CORO_PREFER_PERL_FUNCTIONS 0
120#endif
121
122/* The next macros try to return the current stack pointer, in an as
123 * portable way as possible. */
124#if __GNUC__ >= 4
125# define dSTACKLEVEL int stacklevel_dummy
126# define STACKLEVEL __builtin_frame_address (0)
127#else
128# define dSTACKLEVEL volatile void *stacklevel
129# define STACKLEVEL ((void *)&stacklevel)
130#endif
131
132#define IN_DESTRUCT (PL_main_cv == Nullcv)
133
134#if __GNUC__ >= 3
135# define attribute(x) __attribute__(x)
136# define expect(expr,value) __builtin_expect ((expr),(value))
137# define INLINE static inline
138#else
139# define attribute(x)
140# define expect(expr,value) (expr)
141# define INLINE static
142#endif
143
144#define expect_false(expr) expect ((expr) != 0, 0)
145#define expect_true(expr) expect ((expr) != 0, 1)
146
147#define NOINLINE attribute ((noinline))
148
149#include "CoroAPI.h"
150#define GCoroAPI (&coroapi) /* very sneaky */
151
152#ifdef USE_ITHREADS
153# if CORO_PTHREAD
154static void *coro_thx;
155# endif
156#endif
157
158static double (*nvtime)(); /* so why doesn't it take void? */
159
160/* we hijack an hopefully unused CV flag for our purposes */
161#define CVf_SLF 0x4000
162static OP *pp_slf (pTHX);
163
164static U32 cctx_gen;
165static size_t cctx_stacksize = CORO_STACKSIZE;
166static struct CoroAPI coroapi;
167static AV *main_mainstack; /* used to differentiate between $main and others */
168static JMPENV *main_top_env;
169static HV *coro_state_stash, *coro_stash;
170static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
171
172static GV *irsgv; /* $/ */
173static GV *stdoutgv; /* *STDOUT */
174static SV *rv_diehook;
175static SV *rv_warnhook;
176static HV *hv_sig; /* %SIG */
177
178/* async_pool helper stuff */
179static SV *sv_pool_rss;
180static SV *sv_pool_size;
181static AV *av_async_pool;
182
183/* Coro::AnyEvent */
184static SV *sv_activity;
185
186static struct coro_cctx *cctx_first;
187static int cctx_count, cctx_idle;
188
189enum {
190 CC_MAPPED = 0x01,
191 CC_NOREUSE = 0x02, /* throw this away after tracing */
192 CC_TRACE = 0x04,
193 CC_TRACE_SUB = 0x08, /* trace sub calls */
194 CC_TRACE_LINE = 0x10, /* trace each statement */
195 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
196};
197
198/* this is a structure representing a c-level coroutine */
199typedef struct coro_cctx
200{
201 struct coro_cctx *next;
202
203 /* the stack */
204 void *sptr;
205 size_t ssize;
206
207 /* cpu state */
208 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
209 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
210 JMPENV *top_env;
211 coro_context cctx;
212
213 U32 gen;
214#if CORO_USE_VALGRIND
215 int valgrind_id;
216#endif
217 unsigned char flags;
218} coro_cctx;
219
220enum {
221 CF_RUNNING = 0x0001, /* coroutine is running */
222 CF_READY = 0x0002, /* coroutine is ready */
223 CF_NEW = 0x0004, /* has never been switched to */
224 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
225};
226
227/* the structure where most of the perl state is stored, overlaid on the cxstack */
228typedef struct
229{
230 SV *defsv;
231 AV *defav;
232 SV *errsv;
233 SV *irsgv;
234#define VAR(name,type) type name;
235# include "state.h"
236#undef VAR
237} perl_slots;
238
239#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
240
241/* this is a structure representing a perl-level coroutine */
11struct coro { 242struct coro {
12 U8 dowarn; 243 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 244 coro_cctx *cctx;
14 245
15 PERL_SI *curstackinfo; 246 /* state data */
16 AV *curstack; 247 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 248 AV *mainstack;
18 SV **stack_sp; 249 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 250
41 AV *args; 251 AV *args; /* data associated with this coroutine (initial args) */
252 int refcnt; /* coroutines are refcounted, yes */
253 int flags; /* CF_ flags */
254 HV *hv; /* the perl hash associated with this coro, if any */
255 void (*on_destroy)(pTHX_ struct coro *coro);
256
257 /* statistics */
258 int usecount; /* number of transfers to this coro */
259
260 /* coro process data */
261 int prio;
262 SV *except; /* exception to be thrown */
263 SV *rouse_cb;
264
265 /* async_pool */
266 SV *saved_deffh;
267
268 /* linked list */
269 struct coro *next, *prev;
42}; 270};
43 271
44typedef struct coro *Coro__State; 272typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 273typedef struct coro *Coro__State_or_hashref;
46 274
47static HV *padlist_cache; 275/* the following variables are effectively part of the perl context */
276/* and get copied between struct coro and these variables */
277/* the mainr easonw e don't support windows process emulation */
278static struct CoroSLF slf_frame; /* the current slf frame */
48 279
49/* mostly copied from op.c:cv_clone2 */ 280/** Coro ********************************************************************/
50STATIC AV * 281
51clone_padlist (AV *protopadlist) 282#define PRIO_MAX 3
283#define PRIO_HIGH 1
284#define PRIO_NORMAL 0
285#define PRIO_LOW -1
286#define PRIO_IDLE -3
287#define PRIO_MIN -4
288
289/* for Coro.pm */
290static SV *coro_current;
291static SV *coro_readyhook;
292static AV *coro_ready [PRIO_MAX - PRIO_MIN + 1];
293static struct coro *coro_first;
294#define coro_nready coroapi.nready
295
296/** lowlevel stuff **********************************************************/
297
298static SV *
299coro_get_sv (pTHX_ const char *name, int create)
52{ 300{
53 AV *av; 301#if PERL_VERSION_ATLEAST (5,10,0)
54 I32 ix; 302 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 303 get_sv (name, create);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 304#endif
57 SV **pname = AvARRAY (protopad_name); 305 return get_sv (name, create);
58 SV **ppad = AvARRAY (protopad); 306}
59 I32 fname = AvFILLp (protopad_name); 307
60 I32 fpad = AvFILLp (protopad); 308static AV *
309coro_get_av (pTHX_ const char *name, int create)
310{
311#if PERL_VERSION_ATLEAST (5,10,0)
312 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
313 get_av (name, create);
314#endif
315 return get_av (name, create);
316}
317
318static HV *
319coro_get_hv (pTHX_ const char *name, int create)
320{
321#if PERL_VERSION_ATLEAST (5,10,0)
322 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
323 get_hv (name, create);
324#endif
325 return get_hv (name, create);
326}
327
328static AV *
329coro_clone_padlist (pTHX_ CV *cv)
330{
331 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 332 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 333
72 newpadlist = newAV (); 334 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 335 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 336#if PERL_VERSION_ATLEAST (5,10,0)
337 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
338#else
339 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
340#endif
341 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
342 --AvFILLp (padlist);
343
344 av_store (newpadlist, 0, SvREFCNT_inc_NN (*av_fetch (padlist, 0, FALSE)));
75 av_store (newpadlist, 1, (SV *) newpad); 345 av_store (newpadlist, 1, (SV *)newpad);
76 346
77 av = newAV (); /* will be @_ */ 347 return newpadlist;
78 av_extend (av, 0); 348}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 349
82 for (ix = fpad; ix > 0; ix--) 350static void
351free_padlist (pTHX_ AV *padlist)
352{
353 /* may be during global destruction */
354 if (SvREFCNT (padlist))
83 { 355 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 356 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 357 while (i >= 0)
86 { 358 {
87 char *name = SvPVX (namesv); /* XXX */ 359 SV **svp = av_fetch (padlist, i--, FALSE);
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 360 if (svp)
89 { /* lexical from outside? */
90 npad[ix] = SvREFCNT_inc (ppad[ix]);
91 } 361 {
92 else
93 { /* our own lexical */
94 SV *sv; 362 SV *sv;
95 if (*name == '&') 363 while (&PL_sv_undef != (sv = av_pop ((AV *)*svp)))
96 sv = SvREFCNT_inc (ppad[ix]); 364 SvREFCNT_dec (sv);
97 else if (*name == '@') 365
98 sv = (SV *) newAV (); 366 SvREFCNT_dec (*svp);
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 } 367 }
107 } 368 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix]))
109 {
110 npad[ix] = SvREFCNT_inc (ppad[ix]);
111 }
112 else
113 {
114 SV *sv = NEWSV (0, 0);
115 SvPADTMP_on (sv);
116 npad[ix] = sv;
117 }
118 }
119 369
120#if 0 /* NONOTUNDERSTOOD */
121 /* Now that vars are all in place, clone nested closures. */
122
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif
139
140 return newpadlist;
141}
142
143STATIC AV *
144free_padlist (AV *padlist)
145{
146 /* may be during global destruction */
147 if (SvREFCNT(padlist))
148 {
149 I32 i = AvFILLp(padlist);
150 while (i >= 0)
151 {
152 SV **svp = av_fetch(padlist, i--, FALSE);
153 SV *sv = svp ? *svp : Nullsv;
154 if (sv)
155 SvREFCNT_dec(sv);
156 }
157
158 SvREFCNT_dec((SV*)padlist); 370 SvREFCNT_dec ((SV*)padlist);
371 }
372}
373
374static int
375coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
376{
377 AV *padlist;
378 AV *av = (AV *)mg->mg_obj;
379
380 /* casting is fun. */
381 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
382 free_padlist (aTHX_ padlist);
383
384 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
385
386 return 0;
387}
388
389#define CORO_MAGIC_type_cv 26
390#define CORO_MAGIC_type_state PERL_MAGIC_ext
391
392static MGVTBL coro_cv_vtbl = {
393 0, 0, 0, 0,
394 coro_cv_free
395};
396
397#define CORO_MAGIC_NN(sv, type) \
398 (expect_true (SvMAGIC (sv)->mg_type == type) \
399 ? SvMAGIC (sv) \
400 : mg_find (sv, type))
401
402#define CORO_MAGIC(sv, type) \
403 (expect_true (SvMAGIC (sv)) \
404 ? CORO_MAGIC_NN (sv, type) \
405 : 0)
406
407#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
408#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
409
410INLINE struct coro *
411SvSTATE_ (pTHX_ SV *coro)
412{
413 HV *stash;
414 MAGIC *mg;
415
416 if (SvROK (coro))
417 coro = SvRV (coro);
418
419 if (expect_false (SvTYPE (coro) != SVt_PVHV))
420 croak ("Coro::State object required");
421
422 stash = SvSTASH (coro);
423 if (expect_false (stash != coro_stash && stash != coro_state_stash))
424 {
425 /* very slow, but rare, check */
426 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
427 croak ("Coro::State object required");
428 }
429
430 mg = CORO_MAGIC_state (coro);
431 return (struct coro *)mg->mg_ptr;
432}
433
434#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
435
436/* faster than SvSTATE, but expects a coroutine hv */
437#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
438#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
439
440/* the next two functions merely cache the padlists */
441static void
442get_padlist (pTHX_ CV *cv)
443{
444 MAGIC *mg = CORO_MAGIC_cv (cv);
445 AV *av;
446
447 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
448 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
449 else
450 {
451#if CORO_PREFER_PERL_FUNCTIONS
452 /* this is probably cleaner? but also slower! */
453 /* in practise, it seems to be less stable */
454 CV *cp = Perl_cv_clone (aTHX_ cv);
455 CvPADLIST (cv) = CvPADLIST (cp);
456 CvPADLIST (cp) = 0;
457 SvREFCNT_dec (cp);
458#else
459 CvPADLIST (cv) = coro_clone_padlist (aTHX_ cv);
460#endif
461 }
462}
463
464static void
465put_padlist (pTHX_ CV *cv)
466{
467 MAGIC *mg = CORO_MAGIC_cv (cv);
468 AV *av;
469
470 if (expect_false (!mg))
471 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
472
473 av = (AV *)mg->mg_obj;
474
475 if (expect_false (AvFILLp (av) >= AvMAX (av)))
476 av_extend (av, AvMAX (av) + 1);
477
478 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
479}
480
481/** load & save, init *******************************************************/
482
483static void
484load_perl (pTHX_ Coro__State c)
485{
486 perl_slots *slot = c->slot;
487 c->slot = 0;
488
489 PL_mainstack = c->mainstack;
490
491 GvSV (PL_defgv) = slot->defsv;
492 GvAV (PL_defgv) = slot->defav;
493 GvSV (PL_errgv) = slot->errsv;
494 GvSV (irsgv) = slot->irsgv;
495
496 #define VAR(name,type) PL_ ## name = slot->name;
497 # include "state.h"
498 #undef VAR
499
500 {
501 dSP;
502
503 CV *cv;
504
505 /* now do the ugly restore mess */
506 while (expect_true (cv = (CV *)POPs))
507 {
508 put_padlist (aTHX_ cv); /* mark this padlist as available */
509 CvDEPTH (cv) = PTR2IV (POPs);
510 CvPADLIST (cv) = (AV *)POPs;
511 }
512
513 PUTBACK;
159 } 514 }
160}
161 515
162/* the next tow functions merely cache the padlists */ 516 slf_frame = c->slf_frame;
163STATIC void 517 CORO_THROW = c->except;
164get_padlist (CV *cv)
165{
166 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 0);
167
168 if (he && AvFILLp ((AV *)*he) >= 0)
169 CvPADLIST (cv) = (AV *)av_pop ((AV *)*he);
170 else
171 CvPADLIST (cv) = clone_padlist (CvPADLIST (cv));
172} 518}
173 519
174STATIC void
175put_padlist (CV *cv)
176{
177 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 1);
178
179 if (SvTYPE (*he) != SVt_PVAV)
180 {
181 SvREFCNT_dec (*he);
182 *he = (SV *)newAV ();
183 }
184
185 av_push ((AV *)*he, (SV *)CvPADLIST (cv));
186}
187
188static void 520static void
189SAVE(pTHX_ Coro__State c) 521save_perl (pTHX_ Coro__State c)
190{ 522{
523 c->except = CORO_THROW;
524 c->slf_frame = slf_frame;
525
191 { 526 {
192 dSP; 527 dSP;
193 I32 cxix = cxstack_ix; 528 I32 cxix = cxstack_ix;
529 PERL_CONTEXT *ccstk = cxstack;
194 PERL_SI *top_si = PL_curstackinfo; 530 PERL_SI *top_si = PL_curstackinfo;
195 PERL_CONTEXT *ccstk = cxstack;
196 531
197 /* 532 /*
198 * the worst thing you can imagine happens first - we have to save 533 * the worst thing you can imagine happens first - we have to save
199 * (and reinitialize) all cv's in the whole callchain :( 534 * (and reinitialize) all cv's in the whole callchain :(
200 */ 535 */
201 536
202 PUSHs (Nullsv); 537 XPUSHs (Nullsv);
203 /* this loop was inspired by pp_caller */ 538 /* this loop was inspired by pp_caller */
204 for (;;) 539 for (;;)
205 { 540 {
206 while (cxix >= 0) 541 while (expect_true (cxix >= 0))
207 { 542 {
208 PERL_CONTEXT *cx = &ccstk[cxix--]; 543 PERL_CONTEXT *cx = &ccstk[cxix--];
209 544
210 if (CxTYPE(cx) == CXt_SUB) 545 if (expect_true (CxTYPE (cx) == CXt_SUB || CxTYPE (cx) == CXt_FORMAT))
211 { 546 {
212 CV *cv = cx->blk_sub.cv; 547 CV *cv = cx->blk_sub.cv;
548
213 if (CvDEPTH(cv)) 549 if (expect_true (CvDEPTH (cv)))
214 { 550 {
215#ifdef USE_THREADS
216 XPUSHs ((SV *)CvOWNER(cv));
217#endif
218 EXTEND (SP, 3); 551 EXTEND (SP, 3);
219 PUSHs ((SV *)CvDEPTH(cv));
220 PUSHs ((SV *)CvPADLIST(cv)); 552 PUSHs ((SV *)CvPADLIST (cv));
553 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
221 PUSHs ((SV *)cv); 554 PUSHs ((SV *)cv);
222 555
223 get_padlist (cv);
224
225 CvDEPTH(cv) = 0; 556 CvDEPTH (cv) = 0;
226#ifdef USE_THREADS 557 get_padlist (aTHX_ cv);
227 CvOWNER(cv) = 0;
228 error must unlock this cv etc.. etc...
229 if you are here wondering about this error message then
230 the reason is that it will not work as advertised yet
231#endif
232 } 558 }
233 } 559 }
234 else if (CxTYPE(cx) == CXt_FORMAT) 560 }
561
562 if (expect_true (top_si->si_type == PERLSI_MAIN))
563 break;
564
565 top_si = top_si->si_prev;
566 ccstk = top_si->si_cxstack;
567 cxix = top_si->si_cxix;
568 }
569
570 PUTBACK;
571 }
572
573 /* allocate some space on the context stack for our purposes */
574 /* we manually unroll here, as usually 2 slots is enough */
575 if (SLOT_COUNT >= 1) CXINC;
576 if (SLOT_COUNT >= 2) CXINC;
577 if (SLOT_COUNT >= 3) CXINC;
578 {
579 int i;
580 for (i = 3; i < SLOT_COUNT; ++i)
581 CXINC;
582 }
583 cxstack_ix -= SLOT_COUNT; /* undo allocation */
584
585 c->mainstack = PL_mainstack;
586
587 {
588 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
589
590 slot->defav = GvAV (PL_defgv);
591 slot->defsv = DEFSV;
592 slot->errsv = ERRSV;
593 slot->irsgv = GvSV (irsgv);
594
595 #define VAR(name,type) slot->name = PL_ ## name;
596 # include "state.h"
597 #undef VAR
598 }
599}
600
601/*
602 * allocate various perl stacks. This is almost an exact copy
603 * of perl.c:init_stacks, except that it uses less memory
604 * on the (sometimes correct) assumption that coroutines do
605 * not usually need a lot of stackspace.
606 */
607#if CORO_PREFER_PERL_FUNCTIONS
608# define coro_init_stacks(thx) init_stacks ()
609#else
610static void
611coro_init_stacks (pTHX)
612{
613 PL_curstackinfo = new_stackinfo(32, 8);
614 PL_curstackinfo->si_type = PERLSI_MAIN;
615 PL_curstack = PL_curstackinfo->si_stack;
616 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
617
618 PL_stack_base = AvARRAY(PL_curstack);
619 PL_stack_sp = PL_stack_base;
620 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
621
622 New(50,PL_tmps_stack,32,SV*);
623 PL_tmps_floor = -1;
624 PL_tmps_ix = -1;
625 PL_tmps_max = 32;
626
627 New(54,PL_markstack,16,I32);
628 PL_markstack_ptr = PL_markstack;
629 PL_markstack_max = PL_markstack + 16;
630
631#ifdef SET_MARK_OFFSET
632 SET_MARK_OFFSET;
633#endif
634
635 New(54,PL_scopestack,8,I32);
636 PL_scopestack_ix = 0;
637 PL_scopestack_max = 8;
638
639 New(54,PL_savestack,24,ANY);
640 PL_savestack_ix = 0;
641 PL_savestack_max = 24;
642
643#if !PERL_VERSION_ATLEAST (5,10,0)
644 New(54,PL_retstack,4,OP*);
645 PL_retstack_ix = 0;
646 PL_retstack_max = 4;
647#endif
648}
649#endif
650
651/*
652 * destroy the stacks, the callchain etc...
653 */
654static void
655coro_destruct_stacks (pTHX)
656{
657 while (PL_curstackinfo->si_next)
658 PL_curstackinfo = PL_curstackinfo->si_next;
659
660 while (PL_curstackinfo)
661 {
662 PERL_SI *p = PL_curstackinfo->si_prev;
663
664 if (!IN_DESTRUCT)
665 SvREFCNT_dec (PL_curstackinfo->si_stack);
666
667 Safefree (PL_curstackinfo->si_cxstack);
668 Safefree (PL_curstackinfo);
669 PL_curstackinfo = p;
670 }
671
672 Safefree (PL_tmps_stack);
673 Safefree (PL_markstack);
674 Safefree (PL_scopestack);
675 Safefree (PL_savestack);
676#if !PERL_VERSION_ATLEAST (5,10,0)
677 Safefree (PL_retstack);
678#endif
679}
680
681static size_t
682coro_rss (pTHX_ struct coro *coro)
683{
684 size_t rss = sizeof (*coro);
685
686 if (coro->mainstack)
687 {
688 perl_slots tmp_slot;
689 perl_slots *slot;
690
691 if (coro->flags & CF_RUNNING)
692 {
693 slot = &tmp_slot;
694
695 #define VAR(name,type) slot->name = PL_ ## name;
696 # include "state.h"
697 #undef VAR
698 }
699 else
700 slot = coro->slot;
701
702 if (slot)
703 {
704 rss += sizeof (slot->curstackinfo);
705 rss += (slot->curstackinfo->si_cxmax + 1) * sizeof (PERL_CONTEXT);
706 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (slot->curstack)) * sizeof (SV *);
707 rss += slot->tmps_max * sizeof (SV *);
708 rss += (slot->markstack_max - slot->markstack_ptr) * sizeof (I32);
709 rss += slot->scopestack_max * sizeof (I32);
710 rss += slot->savestack_max * sizeof (ANY);
711
712#if !PERL_VERSION_ATLEAST (5,10,0)
713 rss += slot->retstack_max * sizeof (OP *);
714#endif
715 }
716 }
717
718 return rss;
719}
720
721/** coroutine stack handling ************************************************/
722
723static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
724static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
725static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
726
727/* apparently < 5.8.8 */
728#ifndef MgPV_nolen_const
729#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
730 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
731 (const char*)(mg)->mg_ptr)
732#endif
733
734/*
735 * This overrides the default magic get method of %SIG elements.
736 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
737 * and instead of tryign to save and restore the hash elements, we just provide
738 * readback here.
739 * We only do this when the hook is != 0, as they are often set to 0 temporarily,
740 * not expecting this to actually change the hook. This is a potential problem
741 * when a schedule happens then, but we ignore this.
742 */
743static int
744coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
745{
746 const char *s = MgPV_nolen_const (mg);
747
748 if (*s == '_')
749 {
750 SV **svp = 0;
751
752 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
753 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
754
755 if (svp)
756 {
757 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
758 return 0;
759 }
760 }
761
762 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
763}
764
765static int
766coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
767{
768 const char *s = MgPV_nolen_const (mg);
769
770 if (*s == '_')
771 {
772 SV **svp = 0;
773
774 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
775 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
776
777 if (svp)
778 {
779 SV *old = *svp;
780 *svp = 0;
781 SvREFCNT_dec (old);
782 return 0;
783 }
784 }
785
786 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
787}
788
789static int
790coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
791{
792 const char *s = MgPV_nolen_const (mg);
793
794 if (*s == '_')
795 {
796 SV **svp = 0;
797
798 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
799 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
800
801 if (svp)
802 {
803 SV *old = *svp;
804 *svp = newSVsv (sv);
805 SvREFCNT_dec (old);
806 return 0;
807 }
808 }
809
810 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
811}
812
813static void
814prepare_nop (pTHX_ struct coro_transfer_args *ta)
815{
816 /* kind of mega-hacky, but works */
817 ta->next = ta->prev = (struct coro *)ta;
818}
819
820static int
821slf_check_nop (pTHX_ struct CoroSLF *frame)
822{
823 return 0;
824}
825
826static UNOP coro_setup_op;
827
828static void NOINLINE /* noinline to keep it out of the transfer fast path */
829coro_setup (pTHX_ struct coro *coro)
830{
831 /*
832 * emulate part of the perl startup here.
833 */
834 coro_init_stacks (aTHX);
835
836 PL_runops = RUNOPS_DEFAULT;
837 PL_curcop = &PL_compiling;
838 PL_in_eval = EVAL_NULL;
839 PL_comppad = 0;
840 PL_curpm = 0;
841 PL_curpad = 0;
842 PL_localizing = 0;
843 PL_dirty = 0;
844 PL_restartop = 0;
845#if PERL_VERSION_ATLEAST (5,10,0)
846 PL_parser = 0;
847#endif
848
849 /* recreate the die/warn hooks */
850 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
851 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
852
853 GvSV (PL_defgv) = newSV (0);
854 GvAV (PL_defgv) = coro->args; coro->args = 0;
855 GvSV (PL_errgv) = newSV (0);
856 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
857 PL_rs = newSVsv (GvSV (irsgv));
858 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
859
860 {
861 dSP;
862 UNOP myop;
863
864 Zero (&myop, 1, UNOP);
865 myop.op_next = Nullop;
866 myop.op_flags = OPf_WANT_VOID;
867
868 PUSHMARK (SP);
869 XPUSHs (sv_2mortal (av_shift (GvAV (PL_defgv))));
870 PUTBACK;
871 PL_op = (OP *)&myop;
872 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
873 SPAGAIN;
874 }
875
876 /* this newly created coroutine might be run on an existing cctx which most
877 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
878 */
879 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
880 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
881
882 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
883 coro_setup_op.op_next = PL_op;
884 coro_setup_op.op_type = OP_CUSTOM;
885 coro_setup_op.op_ppaddr = pp_slf;
886 /* no flags required, as an init function won't be called */
887
888 PL_op = (OP *)&coro_setup_op;
889
890 /* copy throw, in case it was set before coro_setup */
891 CORO_THROW = coro->except;
892}
893
894static void
895coro_destruct (pTHX_ struct coro *coro)
896{
897 if (!IN_DESTRUCT)
898 {
899 /* restore all saved variables and stuff */
900 LEAVE_SCOPE (0);
901 assert (PL_tmps_floor == -1);
902
903 /* free all temporaries */
904 FREETMPS;
905 assert (PL_tmps_ix == -1);
906
907 /* unwind all extra stacks */
908 POPSTACK_TO (PL_mainstack);
909
910 /* unwind main stack */
911 dounwind (-1);
912 }
913
914 SvREFCNT_dec (GvSV (PL_defgv));
915 SvREFCNT_dec (GvAV (PL_defgv));
916 SvREFCNT_dec (GvSV (PL_errgv));
917 SvREFCNT_dec (PL_defoutgv);
918 SvREFCNT_dec (PL_rs);
919 SvREFCNT_dec (GvSV (irsgv));
920
921 SvREFCNT_dec (PL_diehook);
922 SvREFCNT_dec (PL_warnhook);
923
924 SvREFCNT_dec (CORO_THROW);
925 SvREFCNT_dec (coro->saved_deffh);
926 SvREFCNT_dec (coro->rouse_cb);
927
928 coro_destruct_stacks (aTHX);
929}
930
931INLINE void
932free_coro_mortal (pTHX)
933{
934 if (expect_true (coro_mortal))
935 {
936 SvREFCNT_dec (coro_mortal);
937 coro_mortal = 0;
938 }
939}
940
941static int
942runops_trace (pTHX)
943{
944 COP *oldcop = 0;
945 int oldcxix = -2;
946 struct coro *coro = SvSTATE_current; /* trace cctx is tied to specific coro */
947 coro_cctx *cctx = coro->cctx;
948
949 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
950 {
951 PERL_ASYNC_CHECK ();
952
953 if (cctx->flags & CC_TRACE_ALL)
954 {
955 if (PL_op->op_type == OP_LEAVESUB && cctx->flags & CC_TRACE_SUB)
956 {
957 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
958 SV **bot, **top;
959 AV *av = newAV (); /* return values */
960 SV **cb;
961 dSP;
962
963 GV *gv = CvGV (cx->blk_sub.cv);
964 SV *fullname = sv_2mortal (newSV (0));
965 if (isGV (gv))
966 gv_efullname3 (fullname, gv, 0);
967
968 bot = PL_stack_base + cx->blk_oldsp + 1;
969 top = cx->blk_gimme == G_ARRAY ? SP + 1
970 : cx->blk_gimme == G_SCALAR ? bot + 1
971 : bot;
972
973 av_extend (av, top - bot);
974 while (bot < top)
975 av_push (av, SvREFCNT_inc_NN (*bot++));
976
977 PL_runops = RUNOPS_DEFAULT;
978 ENTER;
979 SAVETMPS;
980 EXTEND (SP, 3);
981 PUSHMARK (SP);
982 PUSHs (&PL_sv_no);
983 PUSHs (fullname);
984 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
985 PUTBACK;
986 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
987 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
988 SPAGAIN;
989 FREETMPS;
990 LEAVE;
991 PL_runops = runops_trace;
992 }
993
994 if (oldcop != PL_curcop)
995 {
996 oldcop = PL_curcop;
997
998 if (PL_curcop != &PL_compiling)
999 {
1000 SV **cb;
1001
1002 if (oldcxix != cxstack_ix && cctx->flags & CC_TRACE_SUB)
1003 {
1004 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1005
1006 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1007 {
1008 runops_proc_t old_runops = PL_runops;
1009 dSP;
1010 GV *gv = CvGV (cx->blk_sub.cv);
1011 SV *fullname = sv_2mortal (newSV (0));
1012
1013 if (isGV (gv))
1014 gv_efullname3 (fullname, gv, 0);
1015
1016 PL_runops = RUNOPS_DEFAULT;
1017 ENTER;
1018 SAVETMPS;
1019 EXTEND (SP, 3);
1020 PUSHMARK (SP);
1021 PUSHs (&PL_sv_yes);
1022 PUSHs (fullname);
1023 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1024 PUTBACK;
1025 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1026 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1027 SPAGAIN;
1028 FREETMPS;
1029 LEAVE;
1030 PL_runops = runops_trace;
1031 }
1032
1033 oldcxix = cxstack_ix;
1034 }
1035
1036 if (cctx->flags & CC_TRACE_LINE)
1037 {
1038 dSP;
1039
1040 PL_runops = RUNOPS_DEFAULT;
1041 ENTER;
1042 SAVETMPS;
1043 EXTEND (SP, 3);
1044 PL_runops = RUNOPS_DEFAULT;
1045 PUSHMARK (SP);
1046 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1047 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1048 PUTBACK;
1049 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1050 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1051 SPAGAIN;
1052 FREETMPS;
1053 LEAVE;
1054 PL_runops = runops_trace;
1055 }
1056 }
1057 }
1058 }
1059 }
1060
1061 TAINT_NOT;
1062 return 0;
1063}
1064
1065static struct coro_cctx *cctx_ssl_cctx;
1066static struct CoroSLF cctx_ssl_frame;
1067
1068static void
1069slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1070{
1071 ta->prev = (struct coro *)cctx_ssl_cctx;
1072 ta->next = 0;
1073}
1074
1075static int
1076slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1077{
1078 *frame = cctx_ssl_frame;
1079
1080 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1081}
1082
1083/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1084static void NOINLINE
1085cctx_prepare (pTHX_ coro_cctx *cctx)
1086{
1087 PL_top_env = &PL_start_env;
1088
1089 if (cctx->flags & CC_TRACE)
1090 PL_runops = runops_trace;
1091
1092 /* we already must be executing an SLF op, there is no other valid way
1093 * that can lead to creation of a new cctx */
1094 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1095 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1096
1097 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1098 cctx_ssl_cctx = cctx;
1099 cctx_ssl_frame = slf_frame;
1100
1101 slf_frame.prepare = slf_prepare_set_stacklevel;
1102 slf_frame.check = slf_check_set_stacklevel;
1103}
1104
1105/* the tail of transfer: execute stuff we can only do after a transfer */
1106INLINE void
1107transfer_tail (pTHX)
1108{
1109 free_coro_mortal (aTHX);
1110}
1111
1112/*
1113 * this is a _very_ stripped down perl interpreter ;)
1114 */
1115static void
1116cctx_run (void *arg)
1117{
1118#ifdef USE_ITHREADS
1119# if CORO_PTHREAD
1120 PERL_SET_CONTEXT (coro_thx);
1121# endif
1122#endif
1123 {
1124 dTHX;
1125
1126 /* normally we would need to skip the entersub here */
1127 /* not doing so will re-execute it, which is exactly what we want */
1128 /* PL_nop = PL_nop->op_next */
1129
1130 /* inject a fake subroutine call to cctx_init */
1131 cctx_prepare (aTHX_ (coro_cctx *)arg);
1132
1133 /* cctx_run is the alternative tail of transfer() */
1134 transfer_tail (aTHX);
1135
1136 /* somebody or something will hit me for both perl_run and PL_restartop */
1137 PL_restartop = PL_op;
1138 perl_run (PL_curinterp);
1139
1140 /*
1141 * If perl-run returns we assume exit() was being called or the coro
1142 * fell off the end, which seems to be the only valid (non-bug)
1143 * reason for perl_run to return. We try to exit by jumping to the
1144 * bootstrap-time "top" top_env, as we cannot restore the "main"
1145 * coroutine as Coro has no such concept
1146 */
1147 PL_top_env = main_top_env;
1148 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1149 }
1150}
1151
1152static coro_cctx *
1153cctx_new ()
1154{
1155 coro_cctx *cctx;
1156
1157 ++cctx_count;
1158 New (0, cctx, 1, coro_cctx);
1159
1160 cctx->gen = cctx_gen;
1161 cctx->flags = 0;
1162 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1163
1164 return cctx;
1165}
1166
1167/* create a new cctx only suitable as source */
1168static coro_cctx *
1169cctx_new_empty ()
1170{
1171 coro_cctx *cctx = cctx_new ();
1172
1173 cctx->sptr = 0;
1174 coro_create (&cctx->cctx, 0, 0, 0, 0);
1175
1176 return cctx;
1177}
1178
1179/* create a new cctx suitable as destination/running a perl interpreter */
1180static coro_cctx *
1181cctx_new_run ()
1182{
1183 coro_cctx *cctx = cctx_new ();
1184 void *stack_start;
1185 size_t stack_size;
1186
1187#if HAVE_MMAP
1188 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1189 /* mmap supposedly does allocate-on-write for us */
1190 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1191
1192 if (cctx->sptr != (void *)-1)
1193 {
1194 #if CORO_STACKGUARD
1195 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1196 #endif
1197 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1198 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1199 cctx->flags |= CC_MAPPED;
1200 }
1201 else
1202#endif
1203 {
1204 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1205 New (0, cctx->sptr, cctx_stacksize, long);
1206
1207 if (!cctx->sptr)
1208 {
1209 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1210 _exit (EXIT_FAILURE);
1211 }
1212
1213 stack_start = cctx->sptr;
1214 stack_size = cctx->ssize;
1215 }
1216
1217 #if CORO_USE_VALGRIND
1218 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1219 #endif
1220
1221 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1222
1223 return cctx;
1224}
1225
1226static void
1227cctx_destroy (coro_cctx *cctx)
1228{
1229 if (!cctx)
1230 return;
1231
1232 --cctx_count;
1233 coro_destroy (&cctx->cctx);
1234
1235 /* coro_transfer creates new, empty cctx's */
1236 if (cctx->sptr)
1237 {
1238 #if CORO_USE_VALGRIND
1239 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1240 #endif
1241
1242#if HAVE_MMAP
1243 if (cctx->flags & CC_MAPPED)
1244 munmap (cctx->sptr, cctx->ssize);
1245 else
1246#endif
1247 Safefree (cctx->sptr);
1248 }
1249
1250 Safefree (cctx);
1251}
1252
1253/* wether this cctx should be destructed */
1254#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1255
1256static coro_cctx *
1257cctx_get (pTHX)
1258{
1259 while (expect_true (cctx_first))
1260 {
1261 coro_cctx *cctx = cctx_first;
1262 cctx_first = cctx->next;
1263 --cctx_idle;
1264
1265 if (expect_true (!CCTX_EXPIRED (cctx)))
1266 return cctx;
1267
1268 cctx_destroy (cctx);
1269 }
1270
1271 return cctx_new_run ();
1272}
1273
1274static void
1275cctx_put (coro_cctx *cctx)
1276{
1277 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1278
1279 /* free another cctx if overlimit */
1280 if (expect_false (cctx_idle >= cctx_max_idle))
1281 {
1282 coro_cctx *first = cctx_first;
1283 cctx_first = first->next;
1284 --cctx_idle;
1285
1286 cctx_destroy (first);
1287 }
1288
1289 ++cctx_idle;
1290 cctx->next = cctx_first;
1291 cctx_first = cctx;
1292}
1293
1294/** coroutine switching *****************************************************/
1295
1296static void
1297transfer_check (pTHX_ struct coro *prev, struct coro *next)
1298{
1299 /* TODO: throwing up here is considered harmful */
1300
1301 if (expect_true (prev != next))
1302 {
1303 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1304 croak ("Coro::State::transfer called with non-running/new prev Coro::State, but can only transfer from running or new states,");
1305
1306 if (expect_false (next->flags & CF_RUNNING))
1307 croak ("Coro::State::transfer called with running next Coro::State, but can only transfer to inactive states,");
1308
1309 if (expect_false (next->flags & CF_DESTROYED))
1310 croak ("Coro::State::transfer called with destroyed next Coro::State, but can only transfer to inactive states,");
1311
1312#if !PERL_VERSION_ATLEAST (5,10,0)
1313 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1314 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1315#endif
1316 }
1317}
1318
1319/* always use the TRANSFER macro */
1320static void NOINLINE /* noinline so we have a fixed stackframe */
1321transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1322{
1323 dSTACKLEVEL;
1324
1325 /* sometimes transfer is only called to set idle_sp */
1326 if (expect_false (!next))
1327 {
1328 ((coro_cctx *)prev)->idle_sp = STACKLEVEL;
1329 assert (((coro_cctx *)prev)->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1330 }
1331 else if (expect_true (prev != next))
1332 {
1333 coro_cctx *prev__cctx;
1334
1335 if (expect_false (prev->flags & CF_NEW))
1336 {
1337 /* create a new empty/source context */
1338 prev->cctx = cctx_new_empty ();
1339 prev->flags &= ~CF_NEW;
1340 prev->flags |= CF_RUNNING;
1341 }
1342
1343 prev->flags &= ~CF_RUNNING;
1344 next->flags |= CF_RUNNING;
1345
1346 /* first get rid of the old state */
1347 save_perl (aTHX_ prev);
1348
1349 if (expect_false (next->flags & CF_NEW))
1350 {
1351 /* need to start coroutine */
1352 next->flags &= ~CF_NEW;
1353 /* setup coroutine call */
1354 coro_setup (aTHX_ next);
1355 }
1356 else
1357 load_perl (aTHX_ next);
1358
1359 prev__cctx = prev->cctx;
1360
1361 /* possibly untie and reuse the cctx */
1362 if (expect_true (
1363 prev__cctx->idle_sp == STACKLEVEL
1364 && !(prev__cctx->flags & CC_TRACE)
1365 && !force_cctx
1366 ))
1367 {
1368 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1369 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == prev__cctx->idle_te));
1370
1371 prev->cctx = 0;
1372
1373 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get */
1374 /* without this the next cctx_get might destroy the prev__cctx while still in use */
1375 if (expect_false (CCTX_EXPIRED (prev__cctx)))
1376 if (!next->cctx)
1377 next->cctx = cctx_get (aTHX);
1378
1379 cctx_put (prev__cctx);
1380 }
1381
1382 ++next->usecount;
1383
1384 if (expect_true (!next->cctx))
1385 next->cctx = cctx_get (aTHX);
1386
1387 if (expect_false (prev__cctx != next->cctx))
1388 {
1389 prev__cctx->top_env = PL_top_env;
1390 PL_top_env = next->cctx->top_env;
1391 coro_transfer (&prev__cctx->cctx, &next->cctx->cctx);
1392 }
1393
1394 transfer_tail (aTHX);
1395 }
1396}
1397
1398#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1399#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1400
1401/** high level stuff ********************************************************/
1402
1403static int
1404coro_state_destroy (pTHX_ struct coro *coro)
1405{
1406 if (coro->flags & CF_DESTROYED)
1407 return 0;
1408
1409 if (coro->on_destroy)
1410 coro->on_destroy (aTHX_ coro);
1411
1412 coro->flags |= CF_DESTROYED;
1413
1414 if (coro->flags & CF_READY)
1415 {
1416 /* reduce nready, as destroying a ready coro effectively unreadies it */
1417 /* alternative: look through all ready queues and remove the coro */
1418 --coro_nready;
1419 }
1420 else
1421 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1422
1423 if (coro->mainstack && coro->mainstack != main_mainstack)
1424 {
1425 struct coro temp;
1426
1427 assert (("FATAL: tried to destroy currently running coroutine (please report)", !(coro->flags & CF_RUNNING)));
1428
1429 save_perl (aTHX_ &temp);
1430 load_perl (aTHX_ coro);
1431
1432 coro_destruct (aTHX_ coro);
1433
1434 load_perl (aTHX_ &temp);
1435
1436 coro->slot = 0;
1437 }
1438
1439 cctx_destroy (coro->cctx);
1440 SvREFCNT_dec (coro->args);
1441
1442 if (coro->next) coro->next->prev = coro->prev;
1443 if (coro->prev) coro->prev->next = coro->next;
1444 if (coro == coro_first) coro_first = coro->next;
1445
1446 return 1;
1447}
1448
1449static int
1450coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1451{
1452 struct coro *coro = (struct coro *)mg->mg_ptr;
1453 mg->mg_ptr = 0;
1454
1455 coro->hv = 0;
1456
1457 if (--coro->refcnt < 0)
1458 {
1459 coro_state_destroy (aTHX_ coro);
1460 Safefree (coro);
1461 }
1462
1463 return 0;
1464}
1465
1466static int
1467coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1468{
1469 struct coro *coro = (struct coro *)mg->mg_ptr;
1470
1471 ++coro->refcnt;
1472
1473 return 0;
1474}
1475
1476static MGVTBL coro_state_vtbl = {
1477 0, 0, 0, 0,
1478 coro_state_free,
1479 0,
1480#ifdef MGf_DUP
1481 coro_state_dup,
1482#else
1483# define MGf_DUP 0
1484#endif
1485};
1486
1487static void
1488prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1489{
1490 ta->prev = SvSTATE (prev_sv);
1491 ta->next = SvSTATE (next_sv);
1492 TRANSFER_CHECK (*ta);
1493}
1494
1495static void
1496api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1497{
1498 struct coro_transfer_args ta;
1499
1500 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1501 TRANSFER (ta, 1);
1502}
1503
1504/*****************************************************************************/
1505/* gensub: simple closure generation utility */
1506
1507#define GENSUB_ARG CvXSUBANY (cv).any_ptr
1508
1509/* create a closure from XS, returns a code reference */
1510/* the arg can be accessed via GENSUB_ARG from the callback */
1511/* the callback must use dXSARGS/XSRETURN */
1512static SV *
1513gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg)
1514{
1515 CV *cv = (CV *)newSV (0);
1516
1517 sv_upgrade ((SV *)cv, SVt_PVCV);
1518
1519 CvANON_on (cv);
1520 CvISXSUB_on (cv);
1521 CvXSUB (cv) = xsub;
1522 GENSUB_ARG = arg;
1523
1524 return newRV_noinc ((SV *)cv);
1525}
1526
1527/** Coro ********************************************************************/
1528
1529INLINE void
1530coro_enq (pTHX_ struct coro *coro)
1531{
1532 av_push (coro_ready [coro->prio - PRIO_MIN], SvREFCNT_inc_NN (coro->hv));
1533}
1534
1535INLINE SV *
1536coro_deq (pTHX)
1537{
1538 int prio;
1539
1540 for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= 0; )
1541 if (AvFILLp (coro_ready [prio]) >= 0)
1542 return av_shift (coro_ready [prio]);
1543
1544 return 0;
1545}
1546
1547static int
1548api_ready (pTHX_ SV *coro_sv)
1549{
1550 struct coro *coro;
1551 SV *sv_hook;
1552 void (*xs_hook)(void);
1553
1554 if (SvROK (coro_sv))
1555 coro_sv = SvRV (coro_sv);
1556
1557 coro = SvSTATE (coro_sv);
1558
1559 if (coro->flags & CF_READY)
1560 return 0;
1561
1562 coro->flags |= CF_READY;
1563
1564 sv_hook = coro_nready ? 0 : coro_readyhook;
1565 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1566
1567 coro_enq (aTHX_ coro);
1568 ++coro_nready;
1569
1570 if (sv_hook)
1571 {
1572 dSP;
1573
1574 ENTER;
1575 SAVETMPS;
1576
1577 PUSHMARK (SP);
1578 PUTBACK;
1579 call_sv (sv_hook, G_VOID | G_DISCARD);
1580
1581 FREETMPS;
1582 LEAVE;
1583 }
1584
1585 if (xs_hook)
1586 xs_hook ();
1587
1588 return 1;
1589}
1590
1591static int
1592api_is_ready (pTHX_ SV *coro_sv)
1593{
1594 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1595}
1596
1597INLINE void
1598prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1599{
1600 SV *prev_sv, *next_sv;
1601
1602 for (;;)
1603 {
1604 next_sv = coro_deq (aTHX);
1605
1606 /* nothing to schedule: call the idle handler */
1607 if (expect_false (!next_sv))
1608 {
1609 dSP;
1610
1611 ENTER;
1612 SAVETMPS;
1613
1614 PUSHMARK (SP);
1615 PUTBACK;
1616 call_sv (get_sv ("Coro::idle", FALSE), G_VOID | G_DISCARD);
1617
1618 FREETMPS;
1619 LEAVE;
1620 continue;
1621 }
1622
1623 ta->next = SvSTATE_hv (next_sv);
1624
1625 /* cannot transfer to destroyed coros, skip and look for next */
1626 if (expect_false (ta->next->flags & CF_DESTROYED))
1627 {
1628 SvREFCNT_dec (next_sv);
1629 /* coro_nready has already been taken care of by destroy */
1630 continue;
1631 }
1632
1633 --coro_nready;
1634 break;
1635 }
1636
1637 /* free this only after the transfer */
1638 prev_sv = SvRV (coro_current);
1639 ta->prev = SvSTATE_hv (prev_sv);
1640 TRANSFER_CHECK (*ta);
1641 assert (("FATAL: next coroutine isn't marked as ready in Coro (please report)", ta->next->flags & CF_READY));
1642 ta->next->flags &= ~CF_READY;
1643 SvRV_set (coro_current, next_sv);
1644
1645 free_coro_mortal (aTHX);
1646 coro_mortal = prev_sv;
1647}
1648
1649INLINE void
1650prepare_cede (pTHX_ struct coro_transfer_args *ta)
1651{
1652 api_ready (aTHX_ coro_current);
1653 prepare_schedule (aTHX_ ta);
1654}
1655
1656INLINE void
1657prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1658{
1659 SV *prev = SvRV (coro_current);
1660
1661 if (coro_nready)
1662 {
1663 prepare_schedule (aTHX_ ta);
1664 api_ready (aTHX_ prev);
1665 }
1666 else
1667 prepare_nop (aTHX_ ta);
1668}
1669
1670static void
1671api_schedule (pTHX)
1672{
1673 struct coro_transfer_args ta;
1674
1675 prepare_schedule (aTHX_ &ta);
1676 TRANSFER (ta, 1);
1677}
1678
1679static int
1680api_cede (pTHX)
1681{
1682 struct coro_transfer_args ta;
1683
1684 prepare_cede (aTHX_ &ta);
1685
1686 if (expect_true (ta.prev != ta.next))
1687 {
1688 TRANSFER (ta, 1);
1689 return 1;
1690 }
1691 else
1692 return 0;
1693}
1694
1695static int
1696api_cede_notself (pTHX)
1697{
1698 if (coro_nready)
1699 {
1700 struct coro_transfer_args ta;
1701
1702 prepare_cede_notself (aTHX_ &ta);
1703 TRANSFER (ta, 1);
1704 return 1;
1705 }
1706 else
1707 return 0;
1708}
1709
1710static void
1711api_trace (pTHX_ SV *coro_sv, int flags)
1712{
1713 struct coro *coro = SvSTATE (coro_sv);
1714
1715 if (flags & CC_TRACE)
1716 {
1717 if (!coro->cctx)
1718 coro->cctx = cctx_new_run ();
1719 else if (!(coro->cctx->flags & CC_TRACE))
1720 croak ("cannot enable tracing on coroutine with custom stack,");
1721
1722 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1723 }
1724 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1725 {
1726 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1727
1728 if (coro->flags & CF_RUNNING)
1729 PL_runops = RUNOPS_DEFAULT;
1730 else
1731 coro->slot->runops = RUNOPS_DEFAULT;
1732 }
1733}
1734
1735/*****************************************************************************/
1736/* rouse callback */
1737
1738#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
1739
1740static void
1741coro_rouse_callback (pTHX_ CV *cv)
1742{
1743 dXSARGS;
1744 SV *data = (SV *)GENSUB_ARG;
1745
1746 if (SvTYPE (SvRV (data)) != SVt_PVAV)
1747 {
1748 /* first call, set args */
1749 int i;
1750 AV *av = newAV ();
1751 SV *coro = SvRV (data);
1752
1753 SvRV_set (data, (SV *)av);
1754 api_ready (aTHX_ coro);
1755 SvREFCNT_dec (coro);
1756
1757 /* better take a full copy of the arguments */
1758 while (items--)
1759 av_store (av, items, newSVsv (ST (items)));
1760 }
1761
1762 XSRETURN_EMPTY;
1763}
1764
1765static int
1766slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
1767{
1768 SV *data = (SV *)frame->data;
1769
1770 if (CORO_THROW)
1771 return 0;
1772
1773 if (SvTYPE (SvRV (data)) != SVt_PVAV)
1774 return 1;
1775
1776 /* now push all results on the stack */
1777 {
1778 dSP;
1779 AV *av = (AV *)SvRV (data);
1780 int i;
1781
1782 EXTEND (SP, AvFILLp (av) + 1);
1783 for (i = 0; i <= AvFILLp (av); ++i)
1784 PUSHs (sv_2mortal (AvARRAY (av)[i]));
1785
1786 /* we have stolen the elements, so ste length to zero and free */
1787 AvFILLp (av) = -1;
1788 av_undef (av);
1789
1790 PUTBACK;
1791 }
1792
1793 return 0;
1794}
1795
1796static void
1797slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1798{
1799 SV *cb;
1800
1801 if (items)
1802 cb = arg [0];
1803 else
1804 {
1805 struct coro *coro = SvSTATE_current;
1806
1807 if (!coro->rouse_cb)
1808 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
1809
1810 cb = sv_2mortal (coro->rouse_cb);
1811 coro->rouse_cb = 0;
1812 }
1813
1814 if (!SvROK (cb)
1815 || SvTYPE (SvRV (cb)) != SVt_PVCV
1816 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
1817 croak ("Coro::rouse_wait called with illegal callback argument,");
1818
1819 {
1820 CV *cv = (CV *)SvRV (cb); /* for GENSUB_ARG */
1821 SV *data = (SV *)GENSUB_ARG;
1822
1823 frame->data = (void *)data;
1824 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
1825 frame->check = slf_check_rouse_wait;
1826 }
1827}
1828
1829static SV *
1830coro_new_rouse_cb (pTHX)
1831{
1832 HV *hv = (HV *)SvRV (coro_current);
1833 struct coro *coro = SvSTATE_hv (hv);
1834 SV *data = newRV_inc ((SV *)hv);
1835 SV *cb = gensub (aTHX_ coro_rouse_callback, (void *)data);
1836
1837 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
1838 SvREFCNT_dec (data); /* magicext increases the refcount */
1839
1840 SvREFCNT_dec (coro->rouse_cb);
1841 coro->rouse_cb = SvREFCNT_inc_NN (cb);
1842
1843 return cb;
1844}
1845
1846/*****************************************************************************/
1847/* schedule-like-function opcode (SLF) */
1848
1849static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
1850static const CV *slf_cv;
1851static SV **slf_argv;
1852static int slf_argc, slf_arga; /* count, allocated */
1853static I32 slf_ax; /* top of stack, for restore */
1854
1855/* this restores the stack in the case we patched the entersub, to */
1856/* recreate the stack frame as perl will on following calls */
1857/* since entersub cleared the stack */
1858static OP *
1859pp_restore (pTHX)
1860{
1861 int i;
1862 SV **SP = PL_stack_base + slf_ax;
1863
1864 PUSHMARK (SP);
1865
1866 EXTEND (SP, slf_argc + 1);
1867
1868 for (i = 0; i < slf_argc; ++i)
1869 PUSHs (sv_2mortal (slf_argv [i]));
1870
1871 PUSHs ((SV *)CvGV (slf_cv));
1872
1873 RETURNOP (slf_restore.op_first);
1874}
1875
1876static void
1877slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
1878{
1879 SV **arg = (SV **)slf_frame.data;
1880
1881 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
1882}
1883
1884static void
1885slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1886{
1887 if (items != 2)
1888 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
1889
1890 frame->prepare = slf_prepare_transfer;
1891 frame->check = slf_check_nop;
1892 frame->data = (void *)arg; /* let's hope it will stay valid */
1893}
1894
1895static void
1896slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1897{
1898 frame->prepare = prepare_schedule;
1899 frame->check = slf_check_nop;
1900}
1901
1902static void
1903slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1904{
1905 frame->prepare = prepare_cede;
1906 frame->check = slf_check_nop;
1907}
1908
1909static void
1910slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1911{
1912 frame->prepare = prepare_cede_notself;
1913 frame->check = slf_check_nop;
1914}
1915
1916/*
1917 * these not obviously related functions are all rolled into one
1918 * function to increase chances that they all will call transfer with the same
1919 * stack offset
1920 * SLF stands for "schedule-like-function".
1921 */
1922static OP *
1923pp_slf (pTHX)
1924{
1925 I32 checkmark; /* mark SP to see how many elements check has pushed */
1926
1927 /* set up the slf frame, unless it has already been set-up */
1928 /* the latter happens when a new coro has been started */
1929 /* or when a new cctx was attached to an existing coroutine */
1930 if (expect_true (!slf_frame.prepare))
1931 {
1932 /* first iteration */
1933 dSP;
1934 SV **arg = PL_stack_base + TOPMARK + 1;
1935 int items = SP - arg; /* args without function object */
1936 SV *gv = *sp;
1937
1938 /* do a quick consistency check on the "function" object, and if it isn't */
1939 /* for us, divert to the real entersub */
1940 if (SvTYPE (gv) != SVt_PVGV
1941 || !GvCV (gv)
1942 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
1943 return PL_ppaddr[OP_ENTERSUB](aTHX);
1944
1945 if (!(PL_op->op_flags & OPf_STACKED))
1946 {
1947 /* ampersand-form of call, use @_ instead of stack */
1948 AV *av = GvAV (PL_defgv);
1949 arg = AvARRAY (av);
1950 items = AvFILLp (av) + 1;
1951 }
1952
1953 /* now call the init function, which needs to set up slf_frame */
1954 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
1955 (aTHX_ &slf_frame, GvCV (gv), arg, items);
1956
1957 /* pop args */
1958 SP = PL_stack_base + POPMARK;
1959
1960 PUTBACK;
1961 }
1962
1963 /* now that we have a slf_frame, interpret it! */
1964 /* we use a callback system not to make the code needlessly */
1965 /* complicated, but so we can run multiple perl coros from one cctx */
1966
1967 do
1968 {
1969 struct coro_transfer_args ta;
1970
1971 slf_frame.prepare (aTHX_ &ta);
1972 TRANSFER (ta, 0);
1973
1974 checkmark = PL_stack_sp - PL_stack_base;
1975 }
1976 while (slf_frame.check (aTHX_ &slf_frame));
1977
1978 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
1979
1980 /* exception handling */
1981 if (expect_false (CORO_THROW))
1982 {
1983 SV *exception = sv_2mortal (CORO_THROW);
1984
1985 CORO_THROW = 0;
1986 sv_setsv (ERRSV, exception);
1987 croak (0);
1988 }
1989
1990 /* return value handling - mostly like entersub */
1991 /* make sure we put something on the stack in scalar context */
1992 if (GIMME_V == G_SCALAR)
1993 {
1994 dSP;
1995 SV **bot = PL_stack_base + checkmark;
1996
1997 if (sp == bot) /* too few, push undef */
1998 bot [1] = &PL_sv_undef;
1999 else if (sp != bot + 1) /* too many, take last one */
2000 bot [1] = *sp;
2001
2002 SP = bot + 1;
2003
2004 PUTBACK;
2005 }
2006
2007 return NORMAL;
2008}
2009
2010static void
2011api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2012{
2013 int i;
2014 SV **arg = PL_stack_base + ax;
2015 int items = PL_stack_sp - arg + 1;
2016
2017 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2018
2019 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2020 && PL_op->op_ppaddr != pp_slf)
2021 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2022
2023 CvFLAGS (cv) |= CVf_SLF;
2024 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2025 slf_cv = cv;
2026
2027 /* we patch the op, and then re-run the whole call */
2028 /* we have to put the same argument on the stack for this to work */
2029 /* and this will be done by pp_restore */
2030 slf_restore.op_next = (OP *)&slf_restore;
2031 slf_restore.op_type = OP_CUSTOM;
2032 slf_restore.op_ppaddr = pp_restore;
2033 slf_restore.op_first = PL_op;
2034
2035 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2036
2037 if (PL_op->op_flags & OPf_STACKED)
2038 {
2039 if (items > slf_arga)
2040 {
2041 slf_arga = items;
2042 free (slf_argv);
2043 slf_argv = malloc (slf_arga * sizeof (SV *));
2044 }
2045
2046 slf_argc = items;
2047
2048 for (i = 0; i < items; ++i)
2049 slf_argv [i] = SvREFCNT_inc (arg [i]);
2050 }
2051 else
2052 slf_argc = 0;
2053
2054 PL_op->op_ppaddr = pp_slf;
2055 PL_op->op_type = OP_CUSTOM; /* maybe we should leave it at entersub? */
2056
2057 PL_op = (OP *)&slf_restore;
2058}
2059
2060/*****************************************************************************/
2061/* PerlIO::cede */
2062
2063typedef struct
2064{
2065 PerlIOBuf base;
2066 NV next, every;
2067} PerlIOCede;
2068
2069static IV
2070PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2071{
2072 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2073
2074 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2075 self->next = nvtime () + self->every;
2076
2077 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2078}
2079
2080static SV *
2081PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2082{
2083 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2084
2085 return newSVnv (self->every);
2086}
2087
2088static IV
2089PerlIOCede_flush (pTHX_ PerlIO *f)
2090{
2091 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2092 double now = nvtime ();
2093
2094 if (now >= self->next)
2095 {
2096 api_cede (aTHX);
2097 self->next = now + self->every;
2098 }
2099
2100 return PerlIOBuf_flush (aTHX_ f);
2101}
2102
2103static PerlIO_funcs PerlIO_cede =
2104{
2105 sizeof(PerlIO_funcs),
2106 "cede",
2107 sizeof(PerlIOCede),
2108 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2109 PerlIOCede_pushed,
2110 PerlIOBuf_popped,
2111 PerlIOBuf_open,
2112 PerlIOBase_binmode,
2113 PerlIOCede_getarg,
2114 PerlIOBase_fileno,
2115 PerlIOBuf_dup,
2116 PerlIOBuf_read,
2117 PerlIOBuf_unread,
2118 PerlIOBuf_write,
2119 PerlIOBuf_seek,
2120 PerlIOBuf_tell,
2121 PerlIOBuf_close,
2122 PerlIOCede_flush,
2123 PerlIOBuf_fill,
2124 PerlIOBase_eof,
2125 PerlIOBase_error,
2126 PerlIOBase_clearerr,
2127 PerlIOBase_setlinebuf,
2128 PerlIOBuf_get_base,
2129 PerlIOBuf_bufsiz,
2130 PerlIOBuf_get_ptr,
2131 PerlIOBuf_get_cnt,
2132 PerlIOBuf_set_ptrcnt,
2133};
2134
2135/*****************************************************************************/
2136/* Coro::Semaphore & Coro::Signal */
2137
2138static SV *
2139coro_waitarray_new (pTHX_ int count)
2140{
2141 /* a semaphore contains a counter IV in $sem->[0] and any waiters after that */
2142 AV *av = newAV ();
2143 SV **ary;
2144
2145 /* unfortunately, building manually saves memory */
2146 Newx (ary, 2, SV *);
2147 AvALLOC (av) = ary;
2148 /*AvARRAY (av) = ary;*/
2149 SvPVX ((SV *)av) = (char *)ary; /* 5.8.8 needs this syntax instead of AvARRAY = ary */
2150 AvMAX (av) = 1;
2151 AvFILLp (av) = 0;
2152 ary [0] = newSViv (count);
2153
2154 return newRV_noinc ((SV *)av);
2155}
2156
2157/* semaphore */
2158
2159static void
2160coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2161{
2162 SV *count_sv = AvARRAY (av)[0];
2163 IV count = SvIVX (count_sv);
2164
2165 count += adjust;
2166 SvIVX (count_sv) = count;
2167
2168 /* now wake up as many waiters as are expected to lock */
2169 while (count > 0 && AvFILLp (av) > 0)
2170 {
2171 SV *cb;
2172
2173 /* swap first two elements so we can shift a waiter */
2174 AvARRAY (av)[0] = AvARRAY (av)[1];
2175 AvARRAY (av)[1] = count_sv;
2176 cb = av_shift (av);
2177
2178 if (SvOBJECT (cb))
2179 api_ready (aTHX_ cb);
2180 else
2181 croak ("callbacks not yet supported");
2182
2183 SvREFCNT_dec (cb);
2184
2185 --count;
2186 }
2187}
2188
2189static void
2190coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2191{
2192 /* call $sem->adjust (0) to possibly wake up some other waiters */
2193 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2194}
2195
2196static int
2197slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2198{
2199 AV *av = (AV *)frame->data;
2200 SV *count_sv = AvARRAY (av)[0];
2201
2202 /* if we are about to throw, don't actually acquire the lock, just throw */
2203 if (CORO_THROW)
2204 return 0;
2205 else if (SvIVX (count_sv) > 0)
2206 {
2207 SvSTATE_current->on_destroy = 0;
2208
2209 if (acquire)
2210 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2211 else
2212 coro_semaphore_adjust (aTHX_ av, 0);
2213
2214 return 0;
2215 }
2216 else
2217 {
2218 int i;
2219 /* if we were woken up but can't down, we look through the whole */
2220 /* waiters list and only add us if we aren't in there already */
2221 /* this avoids some degenerate memory usage cases */
2222
2223 for (i = 1; i <= AvFILLp (av); ++i)
2224 if (AvARRAY (av)[i] == SvRV (coro_current))
2225 return 1;
2226
2227 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2228 return 1;
2229 }
2230}
2231
2232static int
2233slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2234{
2235 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2236}
2237
2238static int
2239slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2240{
2241 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2242}
2243
2244static void
2245slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2246{
2247 AV *av = (AV *)SvRV (arg [0]);
2248
2249 if (SvIVX (AvARRAY (av)[0]) > 0)
2250 {
2251 frame->data = (void *)av;
2252 frame->prepare = prepare_nop;
2253 }
2254 else
2255 {
2256 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2257
2258 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2259 frame->prepare = prepare_schedule;
2260
2261 /* to avoid race conditions when a woken-up coro gets terminated */
2262 /* we arrange for a temporary on_destroy that calls adjust (0) */
2263 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2264 }
2265}
2266
2267static void
2268slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2269{
2270 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2271 frame->check = slf_check_semaphore_down;
2272}
2273
2274static void
2275slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2276{
2277 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2278 frame->check = slf_check_semaphore_wait;
2279}
2280
2281/* signal */
2282
2283static void
2284coro_signal_wake (pTHX_ AV *av, int count)
2285{
2286 SvIVX (AvARRAY (av)[0]) = 0;
2287
2288 /* now signal count waiters */
2289 while (count > 0 && AvFILLp (av) > 0)
2290 {
2291 SV *cb;
2292
2293 /* swap first two elements so we can shift a waiter */
2294 cb = AvARRAY (av)[0];
2295 AvARRAY (av)[0] = AvARRAY (av)[1];
2296 AvARRAY (av)[1] = cb;
2297
2298 cb = av_shift (av);
2299
2300 api_ready (aTHX_ cb);
2301 sv_setiv (cb, 0); /* signal waiter */
2302 SvREFCNT_dec (cb);
2303
2304 --count;
2305 }
2306}
2307
2308static int
2309slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2310{
2311 /* if we are about to throw, also stop waiting */
2312 return SvROK ((SV *)frame->data) && !CORO_THROW;
2313}
2314
2315static void
2316slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2317{
2318 AV *av = (AV *)SvRV (arg [0]);
2319
2320 if (SvIVX (AvARRAY (av)[0]))
2321 {
2322 SvIVX (AvARRAY (av)[0]) = 0;
2323 frame->prepare = prepare_nop;
2324 frame->check = slf_check_nop;
2325 }
2326 else
2327 {
2328 SV *waiter = newRV_inc (SvRV (coro_current)); /* owned by signal av */
2329
2330 av_push (av, waiter);
2331
2332 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2333 frame->prepare = prepare_schedule;
2334 frame->check = slf_check_signal_wait;
2335 }
2336}
2337
2338/*****************************************************************************/
2339/* Coro::AIO */
2340
2341#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2342
2343/* helper storage struct */
2344struct io_state
2345{
2346 int errorno;
2347 I32 laststype; /* U16 in 5.10.0 */
2348 int laststatval;
2349 Stat_t statcache;
2350};
2351
2352static void
2353coro_aio_callback (pTHX_ CV *cv)
2354{
2355 dXSARGS;
2356 AV *state = (AV *)GENSUB_ARG;
2357 SV *coro = av_pop (state);
2358 SV *data_sv = newSV (sizeof (struct io_state));
2359
2360 av_extend (state, items);
2361
2362 sv_upgrade (data_sv, SVt_PV);
2363 SvCUR_set (data_sv, sizeof (struct io_state));
2364 SvPOK_only (data_sv);
2365
2366 {
2367 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2368
2369 data->errorno = errno;
2370 data->laststype = PL_laststype;
2371 data->laststatval = PL_laststatval;
2372 data->statcache = PL_statcache;
2373 }
2374
2375 /* now build the result vector out of all the parameters and the data_sv */
2376 {
2377 int i;
2378
2379 for (i = 0; i < items; ++i)
2380 av_push (state, SvREFCNT_inc_NN (ST (i)));
2381 }
2382
2383 av_push (state, data_sv);
2384
2385 api_ready (aTHX_ coro);
2386 SvREFCNT_dec (coro);
2387 SvREFCNT_dec ((AV *)state);
2388}
2389
2390static int
2391slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2392{
2393 AV *state = (AV *)frame->data;
2394
2395 /* if we are about to throw, return early */
2396 /* this does not cancel the aio request, but at least */
2397 /* it quickly returns */
2398 if (CORO_THROW)
2399 return 0;
2400
2401 /* one element that is an RV? repeat! */
2402 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2403 return 1;
2404
2405 /* restore status */
2406 {
2407 SV *data_sv = av_pop (state);
2408 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2409
2410 errno = data->errorno;
2411 PL_laststype = data->laststype;
2412 PL_laststatval = data->laststatval;
2413 PL_statcache = data->statcache;
2414
2415 SvREFCNT_dec (data_sv);
2416 }
2417
2418 /* push result values */
2419 {
2420 dSP;
2421 int i;
2422
2423 EXTEND (SP, AvFILLp (state) + 1);
2424 for (i = 0; i <= AvFILLp (state); ++i)
2425 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2426
2427 PUTBACK;
2428 }
2429
2430 return 0;
2431}
2432
2433static void
2434slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2435{
2436 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2437 SV *coro_hv = SvRV (coro_current);
2438 struct coro *coro = SvSTATE_hv (coro_hv);
2439
2440 /* put our coroutine id on the state arg */
2441 av_push (state, SvREFCNT_inc_NN (coro_hv));
2442
2443 /* first see whether we have a non-zero priority and set it as AIO prio */
2444 if (coro->prio)
2445 {
2446 dSP;
2447
2448 static SV *prio_cv;
2449 static SV *prio_sv;
2450
2451 if (expect_false (!prio_cv))
2452 {
2453 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2454 prio_sv = newSViv (0);
2455 }
2456
2457 PUSHMARK (SP);
2458 sv_setiv (prio_sv, coro->prio);
2459 XPUSHs (prio_sv);
2460
2461 PUTBACK;
2462 call_sv (prio_cv, G_VOID | G_DISCARD);
2463 }
2464
2465 /* now call the original request */
2466 {
2467 dSP;
2468 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2469 int i;
2470
2471 PUSHMARK (SP);
2472
2473 /* first push all args to the stack */
2474 EXTEND (SP, items + 1);
2475
2476 for (i = 0; i < items; ++i)
2477 PUSHs (arg [i]);
2478
2479 /* now push the callback closure */
2480 PUSHs (sv_2mortal (gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2481
2482 /* now call the AIO function - we assume our request is uncancelable */
2483 PUTBACK;
2484 call_sv ((SV *)req, G_VOID | G_DISCARD);
2485 }
2486
2487 /* now that the requets is going, we loop toll we have a result */
2488 frame->data = (void *)state;
2489 frame->prepare = prepare_schedule;
2490 frame->check = slf_check_aio_req;
2491}
2492
2493static void
2494coro_aio_req_xs (pTHX_ CV *cv)
2495{
2496 dXSARGS;
2497
2498 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2499
2500 XSRETURN_EMPTY;
2501}
2502
2503/*****************************************************************************/
2504
2505MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2506
2507PROTOTYPES: DISABLE
2508
2509BOOT:
2510{
2511#ifdef USE_ITHREADS
2512# if CORO_PTHREAD
2513 coro_thx = PERL_GET_CONTEXT;
2514# endif
2515#endif
2516 BOOT_PAGESIZE;
2517
2518 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2519 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2520
2521 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2522 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2523 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
2524
2525 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
2526 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
2527 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
2528
2529 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
2530
2531 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
2532 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
2533 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
2534 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
2535
2536 main_mainstack = PL_mainstack;
2537 main_top_env = PL_top_env;
2538
2539 while (main_top_env->je_prev)
2540 main_top_env = main_top_env->je_prev;
2541
2542 {
2543 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2544
2545 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2546 hv_store_ent (PL_custom_op_names, slf,
2547 newSVpv ("coro_slf", 0), 0);
2548
2549 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2550 hv_store_ent (PL_custom_op_descs, slf,
2551 newSVpv ("coro schedule like function", 0), 0);
2552 }
2553
2554 coroapi.ver = CORO_API_VERSION;
2555 coroapi.rev = CORO_API_REVISION;
2556
2557 coroapi.transfer = api_transfer;
2558
2559 coroapi.sv_state = SvSTATE_;
2560 coroapi.execute_slf = api_execute_slf;
2561 coroapi.prepare_nop = prepare_nop;
2562 coroapi.prepare_schedule = prepare_schedule;
2563 coroapi.prepare_cede = prepare_cede;
2564 coroapi.prepare_cede_notself = prepare_cede_notself;
2565
2566 {
2567 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
2568
2569 if (!svp) croak ("Time::HiRes is required");
2570 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
2571
2572 nvtime = INT2PTR (double (*)(), SvIV (*svp));
2573 }
2574
2575 assert (("PRIO_NORMAL must be 0", !PRIO_NORMAL));
2576}
2577
2578SV *
2579new (char *klass, ...)
2580 CODE:
2581{
2582 struct coro *coro;
2583 MAGIC *mg;
2584 HV *hv;
2585 int i;
2586
2587 Newz (0, coro, 1, struct coro);
2588 coro->args = newAV ();
2589 coro->flags = CF_NEW;
2590
2591 if (coro_first) coro_first->prev = coro;
2592 coro->next = coro_first;
2593 coro_first = coro;
2594
2595 coro->hv = hv = newHV ();
2596 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
2597 mg->mg_flags |= MGf_DUP;
2598 RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1));
2599
2600 av_extend (coro->args, items - 1);
2601 for (i = 1; i < items; i++)
2602 av_push (coro->args, newSVsv (ST (i)));
2603}
2604 OUTPUT:
2605 RETVAL
2606
2607void
2608transfer (...)
2609 PROTOTYPE: $$
2610 CODE:
2611 CORO_EXECUTE_SLF_XS (slf_init_transfer);
2612
2613bool
2614_destroy (SV *coro_sv)
2615 CODE:
2616 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
2617 OUTPUT:
2618 RETVAL
2619
2620void
2621_exit (int code)
2622 PROTOTYPE: $
2623 CODE:
2624 _exit (code);
2625
2626int
2627cctx_stacksize (int new_stacksize = 0)
2628 PROTOTYPE: ;$
2629 CODE:
2630 RETVAL = cctx_stacksize;
2631 if (new_stacksize)
2632 {
2633 cctx_stacksize = new_stacksize;
2634 ++cctx_gen;
2635 }
2636 OUTPUT:
2637 RETVAL
2638
2639int
2640cctx_max_idle (int max_idle = 0)
2641 PROTOTYPE: ;$
2642 CODE:
2643 RETVAL = cctx_max_idle;
2644 if (max_idle > 1)
2645 cctx_max_idle = max_idle;
2646 OUTPUT:
2647 RETVAL
2648
2649int
2650cctx_count ()
2651 PROTOTYPE:
2652 CODE:
2653 RETVAL = cctx_count;
2654 OUTPUT:
2655 RETVAL
2656
2657int
2658cctx_idle ()
2659 PROTOTYPE:
2660 CODE:
2661 RETVAL = cctx_idle;
2662 OUTPUT:
2663 RETVAL
2664
2665void
2666list ()
2667 PROTOTYPE:
2668 PPCODE:
2669{
2670 struct coro *coro;
2671 for (coro = coro_first; coro; coro = coro->next)
2672 if (coro->hv)
2673 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
2674}
2675
2676void
2677call (Coro::State coro, SV *coderef)
2678 ALIAS:
2679 eval = 1
2680 CODE:
2681{
2682 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
2683 {
2684 struct coro temp;
2685
2686 if (!(coro->flags & CF_RUNNING))
235 { 2687 {
236 /* I never used formats, so how should I know how these are implemented? */ 2688 PUTBACK;
237 /* my bold guess is as a simple, plain sub... */ 2689 save_perl (aTHX_ &temp);
238 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 2690 load_perl (aTHX_ coro);
2691 }
2692
2693 {
2694 dSP;
2695 ENTER;
2696 SAVETMPS;
2697 PUTBACK;
2698 PUSHSTACK;
2699 PUSHMARK (SP);
2700
2701 if (ix)
2702 eval_sv (coderef, 0);
2703 else
2704 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
2705
2706 POPSTACK;
2707 SPAGAIN;
2708 FREETMPS;
2709 LEAVE;
2710 PUTBACK;
2711 }
2712
2713 if (!(coro->flags & CF_RUNNING))
2714 {
2715 save_perl (aTHX_ coro);
2716 load_perl (aTHX_ &temp);
2717 SPAGAIN;
239 } 2718 }
240 } 2719 }
241
242 if (top_si->si_type == PERLSI_MAIN)
243 break;
244
245 top_si = top_si->si_prev;
246 ccstk = top_si->si_cxstack;
247 cxix = top_si->si_cxix;
248 }
249
250 PUTBACK;
251 }
252
253 c->dowarn = PL_dowarn;
254 c->defav = GvAV (PL_defgv);
255 c->curstackinfo = PL_curstackinfo;
256 c->curstack = PL_curstack;
257 c->mainstack = PL_mainstack;
258 c->stack_sp = PL_stack_sp;
259 c->op = PL_op;
260 c->curpad = PL_curpad;
261 c->stack_base = PL_stack_base;
262 c->stack_max = PL_stack_max;
263 c->tmps_stack = PL_tmps_stack;
264 c->tmps_floor = PL_tmps_floor;
265 c->tmps_ix = PL_tmps_ix;
266 c->tmps_max = PL_tmps_max;
267 c->markstack = PL_markstack;
268 c->markstack_ptr = PL_markstack_ptr;
269 c->markstack_max = PL_markstack_max;
270 c->scopestack = PL_scopestack;
271 c->scopestack_ix = PL_scopestack_ix;
272 c->scopestack_max = PL_scopestack_max;
273 c->savestack = PL_savestack;
274 c->savestack_ix = PL_savestack_ix;
275 c->savestack_max = PL_savestack_max;
276 c->retstack = PL_retstack;
277 c->retstack_ix = PL_retstack_ix;
278 c->retstack_max = PL_retstack_max;
279 c->curcop = PL_curcop;
280} 2720}
281 2721
282static void 2722SV *
283LOAD(pTHX_ Coro__State c) 2723is_ready (Coro::State coro)
284{ 2724 PROTOTYPE: $
285 PL_dowarn = c->dowarn; 2725 ALIAS:
286 GvAV (PL_defgv) = c->defav; 2726 is_ready = CF_READY
287 PL_curstackinfo = c->curstackinfo; 2727 is_running = CF_RUNNING
288 PL_curstack = c->curstack; 2728 is_new = CF_NEW
289 PL_mainstack = c->mainstack; 2729 is_destroyed = CF_DESTROYED
290 PL_stack_sp = c->stack_sp; 2730 CODE:
291 PL_op = c->op; 2731 RETVAL = boolSV (coro->flags & ix);
292 PL_curpad = c->curpad; 2732 OUTPUT:
293 PL_stack_base = c->stack_base; 2733 RETVAL
294 PL_stack_max = c->stack_max;
295 PL_tmps_stack = c->tmps_stack;
296 PL_tmps_floor = c->tmps_floor;
297 PL_tmps_ix = c->tmps_ix;
298 PL_tmps_max = c->tmps_max;
299 PL_markstack = c->markstack;
300 PL_markstack_ptr = c->markstack_ptr;
301 PL_markstack_max = c->markstack_max;
302 PL_scopestack = c->scopestack;
303 PL_scopestack_ix = c->scopestack_ix;
304 PL_scopestack_max = c->scopestack_max;
305 PL_savestack = c->savestack;
306 PL_savestack_ix = c->savestack_ix;
307 PL_savestack_max = c->savestack_max;
308 PL_retstack = c->retstack;
309 PL_retstack_ix = c->retstack_ix;
310 PL_retstack_max = c->retstack_max;
311 PL_curcop = c->curcop;
312 2734
2735void
2736throw (Coro::State self, SV *throw = &PL_sv_undef)
2737 PROTOTYPE: $;$
2738 CODE:
2739{
2740 struct coro *current = SvSTATE_current;
2741 SV **throwp = self == current ? &CORO_THROW : &self->except;
2742 SvREFCNT_dec (*throwp);
2743 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
2744}
2745
2746void
2747api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
2748 PROTOTYPE: $;$
2749 C_ARGS: aTHX_ coro, flags
2750
2751SV *
2752has_cctx (Coro::State coro)
2753 PROTOTYPE: $
2754 CODE:
2755 RETVAL = boolSV (!!coro->cctx);
2756 OUTPUT:
2757 RETVAL
2758
2759int
2760is_traced (Coro::State coro)
2761 PROTOTYPE: $
2762 CODE:
2763 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
2764 OUTPUT:
2765 RETVAL
2766
2767UV
2768rss (Coro::State coro)
2769 PROTOTYPE: $
2770 ALIAS:
2771 usecount = 1
2772 CODE:
2773 switch (ix)
313 { 2774 {
314 dSP; 2775 case 0: RETVAL = coro_rss (aTHX_ coro); break;
315 CV *cv; 2776 case 1: RETVAL = coro->usecount; break;
316
317 /* now do the ugly restore mess */
318 while ((cv = (CV *)POPs))
319 {
320 AV *padlist = (AV *)POPs;
321
322 put_padlist (cv);
323 CvPADLIST(cv) = padlist;
324 CvDEPTH(cv) = (I32)POPs;
325
326#ifdef USE_THREADS
327 CvOWNER(cv) = (struct perl_thread *)POPs;
328 error does not work either
329#endif
330 } 2777 }
2778 OUTPUT:
2779 RETVAL
331 2780
332 PUTBACK; 2781void
333 } 2782force_cctx ()
334} 2783 PROTOTYPE:
2784 CODE:
2785 SvSTATE_current->cctx->idle_sp = 0;
335 2786
336/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */ 2787void
337STATIC void 2788swap_defsv (Coro::State self)
338destroy_stacks(pTHX) 2789 PROTOTYPE: $
339{ 2790 ALIAS:
340 dSP; 2791 swap_defav = 1
2792 CODE:
2793 if (!self->slot)
2794 croak ("cannot swap state with coroutine that has no saved state,");
2795 else
2796 {
2797 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
2798 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
341 2799
342 /* die does this while calling POPSTACK, but I just don't see why. */ 2800 SV *tmp = *src; *src = *dst; *dst = tmp;
343 dounwind(-1); 2801 }
344 2802
345 /* is this ugly, I ask? */
346 while (PL_scopestack_ix)
347 LEAVE;
348 2803
349 while (PL_curstackinfo->si_next)
350 PL_curstackinfo = PL_curstackinfo->si_next;
351
352 while (PL_curstackinfo)
353 {
354 PERL_SI *p = PL_curstackinfo->si_prev;
355
356 SvREFCNT_dec(PL_curstackinfo->si_stack);
357 Safefree(PL_curstackinfo->si_cxstack);
358 Safefree(PL_curstackinfo);
359 PL_curstackinfo = p;
360 }
361
362 if (PL_scopestack_ix != 0)
363 Perl_warner(aTHX_ WARN_INTERNAL,
364 "Unbalanced scopes: %ld more ENTERs than LEAVEs\n",
365 (long)PL_scopestack_ix);
366 if (PL_savestack_ix != 0)
367 Perl_warner(aTHX_ WARN_INTERNAL,
368 "Unbalanced saves: %ld more saves than restores\n",
369 (long)PL_savestack_ix);
370 if (PL_tmps_floor != -1)
371 Perl_warner(aTHX_ WARN_INTERNAL,"Unbalanced tmps: %ld more allocs than frees\n",
372 (long)PL_tmps_floor + 1);
373 /*
374 */
375 Safefree(PL_tmps_stack);
376 Safefree(PL_markstack);
377 Safefree(PL_scopestack);
378 Safefree(PL_savestack);
379 Safefree(PL_retstack);
380}
381
382#define SUB_INIT "Coro::State::_newcoro"
383
384MODULE = Coro::State PACKAGE = Coro::State 2804MODULE = Coro::State PACKAGE = Coro
385
386PROTOTYPES: ENABLE
387 2805
388BOOT: 2806BOOT:
389 if (!padlist_cache) 2807{
390 padlist_cache = newHV (); 2808 int i;
391 2809
392Coro::State 2810 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
393_newprocess(args) 2811 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
394 SV * args 2812 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
2813
2814 coro_current = coro_get_sv (aTHX_ "Coro::current", FALSE);
2815 SvREADONLY_on (coro_current);
2816
2817 coro_stash = gv_stashpv ("Coro", TRUE);
2818
2819 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (PRIO_MAX));
2820 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (PRIO_HIGH));
2821 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (PRIO_NORMAL));
2822 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (PRIO_LOW));
2823 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (PRIO_IDLE));
2824 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (PRIO_MIN));
2825
2826 for (i = PRIO_MAX - PRIO_MIN + 1; i--; )
2827 coro_ready[i] = newAV ();
2828
2829 {
2830 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
2831
2832 coroapi.schedule = api_schedule;
2833 coroapi.cede = api_cede;
2834 coroapi.cede_notself = api_cede_notself;
2835 coroapi.ready = api_ready;
2836 coroapi.is_ready = api_is_ready;
2837 coroapi.nready = coro_nready;
2838 coroapi.current = coro_current;
2839
2840 /*GCoroAPI = &coroapi;*/
2841 sv_setiv (sv, (IV)&coroapi);
2842 SvREADONLY_on (sv);
2843 }
2844}
2845
2846void
2847schedule (...)
2848 CODE:
2849 CORO_EXECUTE_SLF_XS (slf_init_schedule);
2850
2851void
2852cede (...)
2853 CODE:
2854 CORO_EXECUTE_SLF_XS (slf_init_cede);
2855
2856void
2857cede_notself (...)
2858 CODE:
2859 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
2860
2861void
2862_set_current (SV *current)
395 PROTOTYPE: $ 2863 PROTOTYPE: $
2864 CODE:
2865 SvREFCNT_dec (SvRV (coro_current));
2866 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
2867
2868void
2869_set_readyhook (SV *hook)
2870 PROTOTYPE: $
396 CODE: 2871 CODE:
397 Coro__State coro; 2872 SvREFCNT_dec (coro_readyhook);
2873 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
398 2874
399 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV) 2875int
400 croak ("Coro::State::newprocess expects an arrayref"); 2876prio (Coro::State coro, int newprio = 0)
2877 PROTOTYPE: $;$
2878 ALIAS:
2879 nice = 1
401 2880 CODE:
402 New (0, coro, 1, struct coro); 2881{
403
404 coro->mainstack = 0; /* actual work is done inside transfer */
405 coro->args = (AV *)SvREFCNT_inc (SvRV (args));
406
407 RETVAL = coro; 2882 RETVAL = coro->prio;
2883
2884 if (items > 1)
2885 {
2886 if (ix)
2887 newprio = coro->prio - newprio;
2888
2889 if (newprio < PRIO_MIN) newprio = PRIO_MIN;
2890 if (newprio > PRIO_MAX) newprio = PRIO_MAX;
2891
2892 coro->prio = newprio;
2893 }
2894}
2895 OUTPUT:
2896 RETVAL
2897
2898SV *
2899ready (SV *self)
2900 PROTOTYPE: $
2901 CODE:
2902 RETVAL = boolSV (api_ready (aTHX_ self));
2903 OUTPUT:
2904 RETVAL
2905
2906int
2907nready (...)
2908 PROTOTYPE:
2909 CODE:
2910 RETVAL = coro_nready;
2911 OUTPUT:
2912 RETVAL
2913
2914# for async_pool speedup
2915void
2916_pool_1 (SV *cb)
2917 CODE:
2918{
2919 HV *hv = (HV *)SvRV (coro_current);
2920 struct coro *coro = SvSTATE_hv ((SV *)hv);
2921 AV *defav = GvAV (PL_defgv);
2922 SV *invoke = hv_delete (hv, "_invoke", sizeof ("_invoke") - 1, 0);
2923 AV *invoke_av;
2924 int i, len;
2925
2926 if (!invoke)
2927 {
2928 SV *old = PL_diehook;
2929 PL_diehook = 0;
2930 SvREFCNT_dec (old);
2931 croak ("\3async_pool terminate\2\n");
2932 }
2933
2934 SvREFCNT_dec (coro->saved_deffh);
2935 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
2936
2937 hv_store (hv, "desc", sizeof ("desc") - 1,
2938 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
2939
2940 invoke_av = (AV *)SvRV (invoke);
2941 len = av_len (invoke_av);
2942
2943 sv_setsv (cb, AvARRAY (invoke_av)[0]);
2944
2945 if (len > 0)
2946 {
2947 av_fill (defav, len - 1);
2948 for (i = 0; i < len; ++i)
2949 av_store (defav, i, SvREFCNT_inc_NN (AvARRAY (invoke_av)[i + 1]));
2950 }
2951}
2952
2953void
2954_pool_2 (SV *cb)
2955 CODE:
2956{
2957 HV *hv = (HV *)SvRV (coro_current);
2958 struct coro *coro = SvSTATE_hv ((SV *)hv);
2959
2960 sv_setsv (cb, &PL_sv_undef);
2961
2962 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
2963 coro->saved_deffh = 0;
2964
2965 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
2966 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
2967 {
2968 SV *old = PL_diehook;
2969 PL_diehook = 0;
2970 SvREFCNT_dec (old);
2971 croak ("\3async_pool terminate\2\n");
2972 }
2973
2974 av_clear (GvAV (PL_defgv));
2975 hv_store (hv, "desc", sizeof ("desc") - 1,
2976 newSVpvn ("[async_pool idle]", sizeof ("[async_pool idle]") - 1), 0);
2977
2978 coro->prio = 0;
2979
2980 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
2981 api_trace (aTHX_ coro_current, 0);
2982
2983 av_push (av_async_pool, newSVsv (coro_current));
2984}
2985
2986SV *
2987rouse_cb ()
2988 PROTOTYPE:
2989 CODE:
2990 RETVAL = coro_new_rouse_cb (aTHX);
2991 OUTPUT:
2992 RETVAL
2993
2994void
2995rouse_wait (SV *cb = 0)
2996 PROTOTYPE: ;$
2997 PPCODE:
2998 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
2999
3000
3001MODULE = Coro::State PACKAGE = PerlIO::cede
3002
3003BOOT:
3004 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3005
3006
3007MODULE = Coro::State PACKAGE = Coro::Semaphore
3008
3009SV *
3010new (SV *klass, SV *count = 0)
3011 CODE:
3012 RETVAL = sv_bless (
3013 coro_waitarray_new (aTHX_ count && SvOK (count) ? SvIV (count) : 1),
3014 GvSTASH (CvGV (cv))
3015 );
3016 OUTPUT:
3017 RETVAL
3018
3019# helper for Coro::Channel
3020SV *
3021_alloc (int count)
3022 CODE:
3023 RETVAL = coro_waitarray_new (aTHX_ count);
3024 OUTPUT:
3025 RETVAL
3026
3027SV *
3028count (SV *self)
3029 CODE:
3030 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3031 OUTPUT:
3032 RETVAL
3033
3034void
3035up (SV *self, int adjust = 1)
3036 ALIAS:
3037 adjust = 1
3038 CODE:
3039 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3040
3041void
3042down (SV *self)
3043 CODE:
3044 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3045
3046void
3047wait (SV *self)
3048 CODE:
3049 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3050
3051void
3052try (SV *self)
3053 PPCODE:
3054{
3055 AV *av = (AV *)SvRV (self);
3056 SV *count_sv = AvARRAY (av)[0];
3057 IV count = SvIVX (count_sv);
3058
3059 if (count > 0)
3060 {
3061 --count;
3062 SvIVX (count_sv) = count;
3063 XSRETURN_YES;
3064 }
3065 else
3066 XSRETURN_NO;
3067}
3068
3069void
3070waiters (SV *self)
3071 PPCODE:
3072{
3073 AV *av = (AV *)SvRV (self);
3074 int wcount = AvFILLp (av) + 1 - 1;
3075
3076 if (GIMME_V == G_SCALAR)
3077 XPUSHs (sv_2mortal (newSViv (wcount)));
3078 else
3079 {
3080 int i;
3081 EXTEND (SP, wcount);
3082 for (i = 1; i <= wcount; ++i)
3083 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3084 }
3085}
3086
3087MODULE = Coro::State PACKAGE = Coro::Signal
3088
3089SV *
3090new (SV *klass)
3091 CODE:
3092 RETVAL = sv_bless (
3093 coro_waitarray_new (aTHX_ 0),
3094 GvSTASH (CvGV (cv))
3095 );
408 OUTPUT: 3096 OUTPUT:
409 RETVAL 3097 RETVAL
410 3098
411void 3099void
412transfer(prev,next) 3100wait (SV *self)
413 Coro::State_or_hashref prev
414 Coro::State_or_hashref next
415 CODE: 3101 CODE:
3102 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
416 3103
417 if (prev != next) 3104void
3105broadcast (SV *self)
3106 CODE:
3107{
3108 AV *av = (AV *)SvRV (self);
3109 coro_signal_wake (aTHX_ av, AvFILLp (av));
3110}
3111
3112void
3113send (SV *self)
3114 CODE:
3115{
3116 AV *av = (AV *)SvRV (self);
3117
3118 if (AvFILLp (av))
3119 coro_signal_wake (aTHX_ av, 1);
3120 else
3121 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3122}
3123
3124IV
3125awaited (SV *self)
3126 CODE:
3127 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3128 OUTPUT:
3129 RETVAL
3130
3131
3132MODULE = Coro::State PACKAGE = Coro::AnyEvent
3133
3134BOOT:
3135 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3136
3137void
3138_schedule (...)
3139 CODE:
3140{
3141 static int incede;
3142
3143 api_cede_notself (aTHX);
3144
3145 ++incede;
3146 while (coro_nready >= incede && api_cede (aTHX))
3147 ;
3148
3149 sv_setsv (sv_activity, &PL_sv_undef);
3150 if (coro_nready >= incede)
418 { 3151 {
3152 PUSHMARK (SP);
419 PUTBACK; 3153 PUTBACK;
420 SAVE (aTHX_ prev); 3154 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
421
422 /*
423 * this could be done in newprocess which would lead to
424 * extremely elegant and fast (just PUTBACK/SAVE/LOAD/SPAGAIN)
425 * code here, but lazy allocation of stacks has also
426 * some virtues and the overhead of the if() is nil.
427 */
428 if (next->mainstack)
429 {
430 LOAD (aTHX_ next);
431 next->mainstack = 0; /* unnecessary but much cleaner */
432 SPAGAIN;
433 }
434 else
435 {
436 /*
437 * emulate part of the perl startup here.
438 */
439 UNOP myop;
440
441 init_stacks (); /* from perl.c */
442 PL_op = (OP *)&myop;
443 /*PL_curcop = 0;*/
444 GvAV (PL_defgv) = (SV *)SvREFCNT_inc (next->args);
445
446 SPAGAIN;
447 Zero(&myop, 1, UNOP);
448 myop.op_next = Nullop;
449 myop.op_flags = OPf_WANT_VOID;
450
451 PUSHMARK(SP);
452 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
453 PUTBACK;
454 /*
455 * the next line is slightly wrong, as PL_op->op_next
456 * is actually being executed so we skip the first op.
457 * that doesn't matter, though, since it is only
458 * pp_nextstate and we never return...
459 */
460 PL_op = Perl_pp_entersub(aTHX);
461 SPAGAIN;
462
463 ENTER;
464 }
465 } 3155 }
466 3156
3157 --incede;
3158}
3159
3160
3161MODULE = Coro::State PACKAGE = Coro::AIO
3162
467void 3163void
468DESTROY(coro) 3164_register (char *target, char *proto, SV *req)
469 Coro::State coro 3165 CODE:
470 CODE: 3166{
3167 HV *st;
3168 GV *gvp;
3169 CV *req_cv = sv_2cv (req, &st, &gvp, 0);
3170 /* newXSproto doesn't return the CV on 5.8 */
3171 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3172 sv_setpv ((SV *)slf_cv, proto);
3173 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3174}
471 3175
472 if (coro->mainstack)
473 {
474 struct coro temp;
475
476 PUTBACK;
477 SAVE(aTHX_ (&temp));
478 LOAD(aTHX_ coro);
479
480 destroy_stacks ();
481 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
482
483 LOAD((&temp));
484 SPAGAIN;
485 }
486
487 SvREFCNT_dec (coro->args);
488 Safefree (coro);
489
490

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines