ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.6 by root, Tue Jul 17 15:42:28 2001 UTC vs.
Revision 1.349 by root, Wed Jun 10 00:47:24 2009 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "patchlevel.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
24#endif
25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
9#endif 35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
10 48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#define PERL_VERSION_ATLEAST(a,b,c) \
57 (PERL_REVISION > (a) \
58 || (PERL_REVISION == (a) \
59 && (PERL_VERSION > (b) \
60 || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))
61
62#if !PERL_VERSION_ATLEAST (5,6,0)
63# ifndef PL_ppaddr
64# define PL_ppaddr ppaddr
65# endif
66# ifndef call_sv
67# define call_sv perl_call_sv
68# endif
69# ifndef get_sv
70# define get_sv perl_get_sv
71# endif
72# ifndef get_cv
73# define get_cv perl_get_cv
74# endif
75# ifndef IS_PADGV
76# define IS_PADGV(v) 0
77# endif
78# ifndef IS_PADCONST
79# define IS_PADCONST(v) 0
80# endif
81#endif
82
83/* 5.11 */
84#ifndef CxHASARGS
85# define CxHASARGS(cx) (cx)->blk_sub.hasargs
86#endif
87
88/* 5.10.0 */
89#ifndef SvREFCNT_inc_NN
90# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
91#endif
92
93/* 5.8.8 */
94#ifndef GV_NOTQUAL
95# define GV_NOTQUAL 0
96#endif
97#ifndef newSV
98# define newSV(l) NEWSV(0,l)
99#endif
100#ifndef CvISXSUB_on
101# define CvISXSUB_on(cv) (void)cv
102#endif
103#ifndef CvISXSUB
104# define CvISXSUB(cv) (CvXSUB (cv) ? TRUE : FALSE)
105#endif
106#ifndef Newx
107# define Newx(ptr,nitems,type) New (0,ptr,nitems,type)
108#endif
109
110/* 5.8.7 */
111#ifndef SvRV_set
112# define SvRV_set(s,v) SvRV(s) = (v)
113#endif
114
115#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
116# undef CORO_STACKGUARD
117#endif
118
119#ifndef CORO_STACKGUARD
120# define CORO_STACKGUARD 0
121#endif
122
123/* prefer perl internal functions over our own? */
124#ifndef CORO_PREFER_PERL_FUNCTIONS
125# define CORO_PREFER_PERL_FUNCTIONS 0
126#endif
127
128/* The next macros try to return the current stack pointer, in an as
129 * portable way as possible. */
130#if __GNUC__ >= 4
131# define dSTACKLEVEL int stacklevel_dummy
132# define STACKLEVEL __builtin_frame_address (0)
133#else
134# define dSTACKLEVEL volatile void *stacklevel
135# define STACKLEVEL ((void *)&stacklevel)
136#endif
137
138#define IN_DESTRUCT PL_dirty
139
140#if __GNUC__ >= 3
141# define attribute(x) __attribute__(x)
142# define expect(expr,value) __builtin_expect ((expr),(value))
143# define INLINE static inline
144#else
145# define attribute(x)
146# define expect(expr,value) (expr)
147# define INLINE static
148#endif
149
150#define expect_false(expr) expect ((expr) != 0, 0)
151#define expect_true(expr) expect ((expr) != 0, 1)
152
153#define NOINLINE attribute ((noinline))
154
155#include "CoroAPI.h"
156#define GCoroAPI (&coroapi) /* very sneaky */
157
158#ifdef USE_ITHREADS
159# if CORO_PTHREAD
160static void *coro_thx;
161# endif
162#endif
163
164static double (*nvtime)(); /* so why doesn't it take void? */
165
166/* we hijack an hopefully unused CV flag for our purposes */
167#define CVf_SLF 0x4000
168static OP *pp_slf (pTHX);
169
170static U32 cctx_gen;
171static size_t cctx_stacksize = CORO_STACKSIZE;
172static struct CoroAPI coroapi;
173static AV *main_mainstack; /* used to differentiate between $main and others */
174static JMPENV *main_top_env;
175static HV *coro_state_stash, *coro_stash;
176static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
177
178static AV *av_destroy; /* destruction queue */
179static SV *sv_manager; /* the manager coro */
180static SV *sv_idle; /* $Coro::idle */
181
182static GV *irsgv; /* $/ */
183static GV *stdoutgv; /* *STDOUT */
184static SV *rv_diehook;
185static SV *rv_warnhook;
186static HV *hv_sig; /* %SIG */
187
188/* async_pool helper stuff */
189static SV *sv_pool_rss;
190static SV *sv_pool_size;
191static SV *sv_async_pool_idle; /* description string */
192static AV *av_async_pool; /* idle pool */
193static SV *sv_Coro; /* class string */
194static CV *cv_pool_handler;
195static CV *cv_coro_state_new;
196
197/* Coro::AnyEvent */
198static SV *sv_activity;
199
200static struct coro_cctx *cctx_first;
201static int cctx_count, cctx_idle;
202
203enum {
204 CC_MAPPED = 0x01,
205 CC_NOREUSE = 0x02, /* throw this away after tracing */
206 CC_TRACE = 0x04,
207 CC_TRACE_SUB = 0x08, /* trace sub calls */
208 CC_TRACE_LINE = 0x10, /* trace each statement */
209 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
210};
211
212/* this is a structure representing a c-level coroutine */
213typedef struct coro_cctx
214{
215 struct coro_cctx *next;
216
217 /* the stack */
218 void *sptr;
219 size_t ssize;
220
221 /* cpu state */
222 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
223 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
224 JMPENV *top_env;
225 coro_context cctx;
226
227 U32 gen;
228#if CORO_USE_VALGRIND
229 int valgrind_id;
230#endif
231 unsigned char flags;
232} coro_cctx;
233
234coro_cctx *cctx_current; /* the currently running cctx */
235
236/*****************************************************************************/
237
238enum {
239 CF_RUNNING = 0x0001, /* coroutine is running */
240 CF_READY = 0x0002, /* coroutine is ready */
241 CF_NEW = 0x0004, /* has never been switched to */
242 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
243 CF_SUSPENDED = 0x0010, /* coroutine can't be scheduled */
244};
245
246/* the structure where most of the perl state is stored, overlaid on the cxstack */
247typedef struct
248{
249 SV *defsv;
250 AV *defav;
251 SV *errsv;
252 SV *irsgv;
253 HV *hinthv;
254#define VAR(name,type) type name;
255# include "state.h"
256#undef VAR
257} perl_slots;
258
259#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
260
261/* this is a structure representing a perl-level coroutine */
11struct coro { 262struct coro {
12 U8 dowarn; 263 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 264 coro_cctx *cctx;
14 265
15 PERL_SI *curstackinfo; 266 /* ready queue */
16 AV *curstack; 267 struct coro *next_ready;
268
269 /* state data */
270 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 271 AV *mainstack;
18 SV **stack_sp; 272 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 273
41 AV *args; 274 CV *startcv; /* the CV to execute */
275 AV *args; /* data associated with this coroutine (initial args) */
276 int refcnt; /* coroutines are refcounted, yes */
277 int flags; /* CF_ flags */
278 HV *hv; /* the perl hash associated with this coro, if any */
279 void (*on_destroy)(pTHX_ struct coro *coro);
280
281 /* statistics */
282 int usecount; /* number of transfers to this coro */
283
284 /* coro process data */
285 int prio;
286 SV *except; /* exception to be thrown */
287 SV *rouse_cb;
288
289 /* async_pool */
290 SV *saved_deffh;
291 SV *invoke_cb;
292 AV *invoke_av;
293
294 /* on_enter/on_leave */
295 AV *on_enter;
296 AV *on_leave;
297
298 /* linked list */
299 struct coro *next, *prev;
42}; 300};
43 301
44typedef struct coro *Coro__State; 302typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 303typedef struct coro *Coro__State_or_hashref;
46 304
47static HV *padlist_cache; 305/* the following variables are effectively part of the perl context */
306/* and get copied between struct coro and these variables */
307/* the mainr easonw e don't support windows process emulation */
308static struct CoroSLF slf_frame; /* the current slf frame */
48 309
49/* mostly copied from op.c:cv_clone2 */ 310/** Coro ********************************************************************/
50STATIC AV * 311
51clone_padlist (AV *protopadlist) 312#define PRIO_MAX 3
313#define PRIO_HIGH 1
314#define PRIO_NORMAL 0
315#define PRIO_LOW -1
316#define PRIO_IDLE -3
317#define PRIO_MIN -4
318
319/* for Coro.pm */
320static SV *coro_current;
321static SV *coro_readyhook;
322static struct coro *coro_ready [PRIO_MAX - PRIO_MIN + 1][2]; /* head|tail */
323static CV *cv_coro_run, *cv_coro_terminate;
324static struct coro *coro_first;
325#define coro_nready coroapi.nready
326
327/** lowlevel stuff **********************************************************/
328
329static SV *
330coro_get_sv (pTHX_ const char *name, int create)
52{ 331{
53 AV *av; 332#if PERL_VERSION_ATLEAST (5,10,0)
54 I32 ix; 333 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 334 get_sv (name, create);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 335#endif
57 SV **pname = AvARRAY (protopad_name); 336 return get_sv (name, create);
58 SV **ppad = AvARRAY (protopad); 337}
59 I32 fname = AvFILLp (protopad_name); 338
60 I32 fpad = AvFILLp (protopad); 339static AV *
340coro_get_av (pTHX_ const char *name, int create)
341{
342#if PERL_VERSION_ATLEAST (5,10,0)
343 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
344 get_av (name, create);
345#endif
346 return get_av (name, create);
347}
348
349static HV *
350coro_get_hv (pTHX_ const char *name, int create)
351{
352#if PERL_VERSION_ATLEAST (5,10,0)
353 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
354 get_hv (name, create);
355#endif
356 return get_hv (name, create);
357}
358
359/* may croak */
360INLINE CV *
361coro_sv_2cv (pTHX_ SV *sv)
362{
363 HV *st;
364 GV *gvp;
365 CV *cv = sv_2cv (sv, &st, &gvp, 0);
366
367 if (!cv)
368 croak ("code reference expected");
369
370 return cv;
371}
372
373/*****************************************************************************/
374/* magic glue */
375
376#define CORO_MAGIC_type_cv 26
377#define CORO_MAGIC_type_state PERL_MAGIC_ext
378
379#define CORO_MAGIC_NN(sv, type) \
380 (expect_true (SvMAGIC (sv)->mg_type == type) \
381 ? SvMAGIC (sv) \
382 : mg_find (sv, type))
383
384#define CORO_MAGIC(sv, type) \
385 (expect_true (SvMAGIC (sv)) \
386 ? CORO_MAGIC_NN (sv, type) \
387 : 0)
388
389#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
390#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
391
392INLINE struct coro *
393SvSTATE_ (pTHX_ SV *coro)
394{
395 HV *stash;
396 MAGIC *mg;
397
398 if (SvROK (coro))
399 coro = SvRV (coro);
400
401 if (expect_false (SvTYPE (coro) != SVt_PVHV))
402 croak ("Coro::State object required");
403
404 stash = SvSTASH (coro);
405 if (expect_false (stash != coro_stash && stash != coro_state_stash))
406 {
407 /* very slow, but rare, check */
408 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
409 croak ("Coro::State object required");
410 }
411
412 mg = CORO_MAGIC_state (coro);
413 return (struct coro *)mg->mg_ptr;
414}
415
416#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
417
418/* faster than SvSTATE, but expects a coroutine hv */
419#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
420#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
421
422/*****************************************************************************/
423/* padlist management and caching */
424
425static AV *
426coro_derive_padlist (pTHX_ CV *cv)
427{
428 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 429 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 430
72 newpadlist = newAV (); 431 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 432 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 433#if PERL_VERSION_ATLEAST (5,10,0)
434 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
435#else
436 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
437#endif
438 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
439 --AvFILLp (padlist);
440
441 av_store (newpadlist, 0, SvREFCNT_inc_NN (AvARRAY (padlist)[0]));
75 av_store (newpadlist, 1, (SV *) newpad); 442 av_store (newpadlist, 1, (SV *)newpad);
76 443
77 av = newAV (); /* will be @_ */ 444 return newpadlist;
78 av_extend (av, 0); 445}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 446
82 for (ix = fpad; ix > 0; ix--) 447static void
448free_padlist (pTHX_ AV *padlist)
449{
450 /* may be during global destruction */
451 if (!IN_DESTRUCT)
83 { 452 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 453 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 454
455 while (i > 0) /* special-case index 0 */
86 { 456 {
87 char *name = SvPVX (namesv); /* XXX */ 457 /* we try to be extra-careful here */
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 458 AV *av = (AV *)AvARRAY (padlist)[i--];
89 { /* lexical from outside? */ 459 I32 j = AvFILLp (av);
90 npad[ix] = SvREFCNT_inc (ppad[ix]); 460
91 } 461 while (j >= 0)
92 else 462 SvREFCNT_dec (AvARRAY (av)[j--]);
93 { /* our own lexical */ 463
94 SV *sv; 464 AvFILLp (av) = -1;
95 if (*name == '&') 465 SvREFCNT_dec (av);
96 sv = SvREFCNT_inc (ppad[ix]);
97 else if (*name == '@')
98 sv = (SV *) newAV ();
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 }
107 } 466 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix]))
109 {
110 npad[ix] = SvREFCNT_inc (ppad[ix]);
111 }
112 else
113 {
114 SV *sv = NEWSV (0, 0);
115 SvPADTMP_on (sv);
116 npad[ix] = sv;
117 }
118 }
119 467
120#if 0 /* NONOTUNDERSTOOD */ 468 SvREFCNT_dec (AvARRAY (padlist)[0]);
121 /* Now that vars are all in place, clone nested closures. */
122 469
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif
139
140 return newpadlist;
141}
142
143STATIC AV *
144free_padlist (AV *padlist)
145{
146 /* may be during global destruction */
147 if (SvREFCNT(padlist))
148 {
149 I32 i = AvFILLp(padlist); 470 AvFILLp (padlist) = -1;
150 while (i >= 0)
151 {
152 SV **svp = av_fetch(padlist, i--, FALSE);
153 SV *sv = svp ? *svp : Nullsv;
154 if (sv)
155 SvREFCNT_dec(sv);
156 }
157
158 SvREFCNT_dec((SV*)padlist); 471 SvREFCNT_dec ((SV*)padlist);
472 }
473}
474
475static int
476coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
477{
478 AV *padlist;
479 AV *av = (AV *)mg->mg_obj;
480
481 /* casting is fun. */
482 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
483 free_padlist (aTHX_ padlist);
484
485 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
486
487 return 0;
488}
489
490static MGVTBL coro_cv_vtbl = {
491 0, 0, 0, 0,
492 coro_cv_free
493};
494
495/* the next two functions merely cache the padlists */
496static void
497get_padlist (pTHX_ CV *cv)
498{
499 MAGIC *mg = CORO_MAGIC_cv (cv);
500 AV *av;
501
502 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
503 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
504 else
505 {
506#if CORO_PREFER_PERL_FUNCTIONS
507 /* this is probably cleaner? but also slower! */
508 /* in practise, it seems to be less stable */
509 CV *cp = Perl_cv_clone (aTHX_ cv);
510 CvPADLIST (cv) = CvPADLIST (cp);
511 CvPADLIST (cp) = 0;
512 SvREFCNT_dec (cp);
513#else
514 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
515#endif
516 }
517}
518
519static void
520put_padlist (pTHX_ CV *cv)
521{
522 MAGIC *mg = CORO_MAGIC_cv (cv);
523 AV *av;
524
525 if (expect_false (!mg))
526 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
527
528 av = (AV *)mg->mg_obj;
529
530 if (expect_false (AvFILLp (av) >= AvMAX (av)))
531 av_extend (av, AvFILLp (av) + 1);
532
533 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
534}
535
536/** load & save, init *******************************************************/
537
538static void
539on_enterleave_call (pTHX_ SV *cb);
540
541static void
542load_perl (pTHX_ Coro__State c)
543{
544 perl_slots *slot = c->slot;
545 c->slot = 0;
546
547 PL_mainstack = c->mainstack;
548
549 GvSV (PL_defgv) = slot->defsv;
550 GvAV (PL_defgv) = slot->defav;
551 GvSV (PL_errgv) = slot->errsv;
552 GvSV (irsgv) = slot->irsgv;
553 GvHV (PL_hintgv) = slot->hinthv;
554
555 #define VAR(name,type) PL_ ## name = slot->name;
556 # include "state.h"
557 #undef VAR
558
559 {
560 dSP;
561
562 CV *cv;
563
564 /* now do the ugly restore mess */
565 while (expect_true (cv = (CV *)POPs))
566 {
567 put_padlist (aTHX_ cv); /* mark this padlist as available */
568 CvDEPTH (cv) = PTR2IV (POPs);
569 CvPADLIST (cv) = (AV *)POPs;
570 }
571
572 PUTBACK;
159 } 573 }
160}
161 574
162/* the next tow functions merely cache the padlists */ 575 slf_frame = c->slf_frame;
163STATIC void 576 CORO_THROW = c->except;
164get_padlist (CV *cv)
165{
166 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 0);
167 577
168 if (he && AvFILLp ((AV *)*he) >= 0) 578 if (expect_false (c->on_enter))
169 CvPADLIST (cv) = (AV *)av_pop ((AV *)*he);
170 else
171 CvPADLIST (cv) = clone_padlist (CvPADLIST (cv));
172}
173
174STATIC void
175put_padlist (CV *cv)
176{
177 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 1);
178
179 if (SvTYPE (*he) != SVt_PVAV)
180 {
181 SvREFCNT_dec (*he);
182 *he = (SV *)newAV ();
183 } 579 {
580 int i;
184 581
185 av_push ((AV *)*he, (SV *)CvPADLIST (cv)); 582 for (i = 0; i <= AvFILLp (c->on_enter); ++i)
583 on_enterleave_call (aTHX_ AvARRAY (c->on_enter)[i]);
584 }
186} 585}
187 586
188static void 587static void
189save_state(pTHX_ Coro__State c) 588save_perl (pTHX_ Coro__State c)
190{ 589{
590 if (expect_false (c->on_leave))
591 {
592 int i;
593
594 for (i = AvFILLp (c->on_leave); i >= 0; --i)
595 on_enterleave_call (aTHX_ AvARRAY (c->on_leave)[i]);
596 }
597
598 c->except = CORO_THROW;
599 c->slf_frame = slf_frame;
600
191 { 601 {
192 dSP; 602 dSP;
193 I32 cxix = cxstack_ix; 603 I32 cxix = cxstack_ix;
604 PERL_CONTEXT *ccstk = cxstack;
194 PERL_SI *top_si = PL_curstackinfo; 605 PERL_SI *top_si = PL_curstackinfo;
195 PERL_CONTEXT *ccstk = cxstack;
196 606
197 /* 607 /*
198 * the worst thing you can imagine happens first - we have to save 608 * the worst thing you can imagine happens first - we have to save
199 * (and reinitialize) all cv's in the whole callchain :( 609 * (and reinitialize) all cv's in the whole callchain :(
200 */ 610 */
201 611
202 PUSHs (Nullsv); 612 XPUSHs (Nullsv);
203 /* this loop was inspired by pp_caller */ 613 /* this loop was inspired by pp_caller */
204 for (;;) 614 for (;;)
205 { 615 {
206 while (cxix >= 0) 616 while (expect_true (cxix >= 0))
207 { 617 {
208 PERL_CONTEXT *cx = &ccstk[cxix--]; 618 PERL_CONTEXT *cx = &ccstk[cxix--];
209 619
210 if (CxTYPE(cx) == CXt_SUB) 620 if (expect_true (CxTYPE (cx) == CXt_SUB || CxTYPE (cx) == CXt_FORMAT))
211 { 621 {
212 CV *cv = cx->blk_sub.cv; 622 CV *cv = cx->blk_sub.cv;
623
213 if (CvDEPTH(cv)) 624 if (expect_true (CvDEPTH (cv)))
214 { 625 {
215#ifdef USE_THREADS
216 XPUSHs ((SV *)CvOWNER(cv));
217#endif
218 EXTEND (SP, 3); 626 EXTEND (SP, 3);
219 PUSHs ((SV *)CvDEPTH(cv));
220 PUSHs ((SV *)CvPADLIST(cv)); 627 PUSHs ((SV *)CvPADLIST (cv));
628 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
221 PUSHs ((SV *)cv); 629 PUSHs ((SV *)cv);
222 630
223 get_padlist (cv);
224
225 CvDEPTH(cv) = 0; 631 CvDEPTH (cv) = 0;
226#ifdef USE_THREADS 632 get_padlist (aTHX_ cv);
227 CvOWNER(cv) = 0;
228 error must unlock this cv etc.. etc...
229 if you are here wondering about this error message then
230 the reason is that it will not work as advertised yet
231#endif
232 } 633 }
233 } 634 }
234 else if (CxTYPE(cx) == CXt_FORMAT) 635 }
636
637 if (expect_true (top_si->si_type == PERLSI_MAIN))
638 break;
639
640 top_si = top_si->si_prev;
641 ccstk = top_si->si_cxstack;
642 cxix = top_si->si_cxix;
643 }
644
645 PUTBACK;
646 }
647
648 /* allocate some space on the context stack for our purposes */
649 /* we manually unroll here, as usually 2 slots is enough */
650 if (SLOT_COUNT >= 1) CXINC;
651 if (SLOT_COUNT >= 2) CXINC;
652 if (SLOT_COUNT >= 3) CXINC;
653 {
654 int i;
655 for (i = 3; i < SLOT_COUNT; ++i)
656 CXINC;
657 }
658 cxstack_ix -= SLOT_COUNT; /* undo allocation */
659
660 c->mainstack = PL_mainstack;
661
662 {
663 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
664
665 slot->defav = GvAV (PL_defgv);
666 slot->defsv = DEFSV;
667 slot->errsv = ERRSV;
668 slot->irsgv = GvSV (irsgv);
669 slot->hinthv = GvHV (PL_hintgv);
670
671 #define VAR(name,type) slot->name = PL_ ## name;
672 # include "state.h"
673 #undef VAR
674 }
675}
676
677/*
678 * allocate various perl stacks. This is almost an exact copy
679 * of perl.c:init_stacks, except that it uses less memory
680 * on the (sometimes correct) assumption that coroutines do
681 * not usually need a lot of stackspace.
682 */
683#if CORO_PREFER_PERL_FUNCTIONS
684# define coro_init_stacks(thx) init_stacks ()
685#else
686static void
687coro_init_stacks (pTHX)
688{
689 PL_curstackinfo = new_stackinfo(32, 8);
690 PL_curstackinfo->si_type = PERLSI_MAIN;
691 PL_curstack = PL_curstackinfo->si_stack;
692 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
693
694 PL_stack_base = AvARRAY(PL_curstack);
695 PL_stack_sp = PL_stack_base;
696 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
697
698 New(50,PL_tmps_stack,32,SV*);
699 PL_tmps_floor = -1;
700 PL_tmps_ix = -1;
701 PL_tmps_max = 32;
702
703 New(54,PL_markstack,16,I32);
704 PL_markstack_ptr = PL_markstack;
705 PL_markstack_max = PL_markstack + 16;
706
707#ifdef SET_MARK_OFFSET
708 SET_MARK_OFFSET;
709#endif
710
711 New(54,PL_scopestack,8,I32);
712 PL_scopestack_ix = 0;
713 PL_scopestack_max = 8;
714
715 New(54,PL_savestack,24,ANY);
716 PL_savestack_ix = 0;
717 PL_savestack_max = 24;
718
719#if !PERL_VERSION_ATLEAST (5,10,0)
720 New(54,PL_retstack,4,OP*);
721 PL_retstack_ix = 0;
722 PL_retstack_max = 4;
723#endif
724}
725#endif
726
727/*
728 * destroy the stacks, the callchain etc...
729 */
730static void
731coro_destruct_stacks (pTHX)
732{
733 while (PL_curstackinfo->si_next)
734 PL_curstackinfo = PL_curstackinfo->si_next;
735
736 while (PL_curstackinfo)
737 {
738 PERL_SI *p = PL_curstackinfo->si_prev;
739
740 if (!IN_DESTRUCT)
741 SvREFCNT_dec (PL_curstackinfo->si_stack);
742
743 Safefree (PL_curstackinfo->si_cxstack);
744 Safefree (PL_curstackinfo);
745 PL_curstackinfo = p;
746 }
747
748 Safefree (PL_tmps_stack);
749 Safefree (PL_markstack);
750 Safefree (PL_scopestack);
751 Safefree (PL_savestack);
752#if !PERL_VERSION_ATLEAST (5,10,0)
753 Safefree (PL_retstack);
754#endif
755}
756
757#define CORO_RSS \
758 rss += sizeof (SYM (curstackinfo)); \
759 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
760 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
761 rss += SYM (tmps_max) * sizeof (SV *); \
762 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
763 rss += SYM (scopestack_max) * sizeof (I32); \
764 rss += SYM (savestack_max) * sizeof (ANY);
765
766static size_t
767coro_rss (pTHX_ struct coro *coro)
768{
769 size_t rss = sizeof (*coro);
770
771 if (coro->mainstack)
772 {
773 if (coro->flags & CF_RUNNING)
774 {
775 #define SYM(sym) PL_ ## sym
776 CORO_RSS;
777 #undef SYM
778 }
779 else
780 {
781 #define SYM(sym) coro->slot->sym
782 CORO_RSS;
783 #undef SYM
784 }
785 }
786
787 return rss;
788}
789
790/** coroutine stack handling ************************************************/
791
792static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
793static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
794static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
795
796/* apparently < 5.8.8 */
797#ifndef MgPV_nolen_const
798#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
799 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
800 (const char*)(mg)->mg_ptr)
801#endif
802
803/*
804 * This overrides the default magic get method of %SIG elements.
805 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
806 * and instead of trying to save and restore the hash elements, we just provide
807 * readback here.
808 */
809static int
810coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
811{
812 const char *s = MgPV_nolen_const (mg);
813
814 if (*s == '_')
815 {
816 SV **svp = 0;
817
818 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
819 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
820
821 if (svp)
822 {
823 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
824 return 0;
825 }
826 }
827
828 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
829}
830
831static int
832coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
833{
834 const char *s = MgPV_nolen_const (mg);
835
836 if (*s == '_')
837 {
838 SV **svp = 0;
839
840 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
841 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
842
843 if (svp)
844 {
845 SV *old = *svp;
846 *svp = 0;
847 SvREFCNT_dec (old);
848 return 0;
849 }
850 }
851
852 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
853}
854
855static int
856coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
857{
858 const char *s = MgPV_nolen_const (mg);
859
860 if (*s == '_')
861 {
862 SV **svp = 0;
863
864 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
865 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
866
867 if (svp)
868 {
869 SV *old = *svp;
870 *svp = SvOK (sv) ? newSVsv (sv) : 0;
871 SvREFCNT_dec (old);
872 return 0;
873 }
874 }
875
876 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
877}
878
879static void
880prepare_nop (pTHX_ struct coro_transfer_args *ta)
881{
882 /* kind of mega-hacky, but works */
883 ta->next = ta->prev = (struct coro *)ta;
884}
885
886static int
887slf_check_nop (pTHX_ struct CoroSLF *frame)
888{
889 return 0;
890}
891
892static int
893slf_check_repeat (pTHX_ struct CoroSLF *frame)
894{
895 return 1;
896}
897
898static UNOP coro_setup_op;
899
900static void NOINLINE /* noinline to keep it out of the transfer fast path */
901coro_setup (pTHX_ struct coro *coro)
902{
903 /*
904 * emulate part of the perl startup here.
905 */
906 coro_init_stacks (aTHX);
907
908 PL_runops = RUNOPS_DEFAULT;
909 PL_curcop = &PL_compiling;
910 PL_in_eval = EVAL_NULL;
911 PL_comppad = 0;
912 PL_comppad_name = 0;
913 PL_comppad_name_fill = 0;
914 PL_comppad_name_floor = 0;
915 PL_curpm = 0;
916 PL_curpad = 0;
917 PL_localizing = 0;
918 PL_dirty = 0;
919 PL_restartop = 0;
920#if PERL_VERSION_ATLEAST (5,10,0)
921 PL_parser = 0;
922#endif
923 PL_hints = 0;
924
925 /* recreate the die/warn hooks */
926 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
927 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
928
929 GvSV (PL_defgv) = newSV (0);
930 GvAV (PL_defgv) = coro->args; coro->args = 0;
931 GvSV (PL_errgv) = newSV (0);
932 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
933 GvHV (PL_hintgv) = 0;
934 PL_rs = newSVsv (GvSV (irsgv));
935 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
936
937 {
938 dSP;
939 UNOP myop;
940
941 Zero (&myop, 1, UNOP);
942 myop.op_next = Nullop;
943 myop.op_type = OP_ENTERSUB;
944 myop.op_flags = OPf_WANT_VOID;
945
946 PUSHMARK (SP);
947 PUSHs ((SV *)coro->startcv);
948 PUTBACK;
949 PL_op = (OP *)&myop;
950 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
951 }
952
953 /* this newly created coroutine might be run on an existing cctx which most
954 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
955 */
956 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
957 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
958
959 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
960 coro_setup_op.op_next = PL_op;
961 coro_setup_op.op_type = OP_ENTERSUB;
962 coro_setup_op.op_ppaddr = pp_slf;
963 /* no flags etc. required, as an init function won't be called */
964
965 PL_op = (OP *)&coro_setup_op;
966
967 /* copy throw, in case it was set before coro_setup */
968 CORO_THROW = coro->except;
969}
970
971static void
972coro_unwind_stacks (pTHX)
973{
974 if (!IN_DESTRUCT)
975 {
976 /* restore all saved variables and stuff */
977 LEAVE_SCOPE (0);
978 assert (PL_tmps_floor == -1);
979
980 /* free all temporaries */
981 FREETMPS;
982 assert (PL_tmps_ix == -1);
983
984 /* unwind all extra stacks */
985 POPSTACK_TO (PL_mainstack);
986
987 /* unwind main stack */
988 dounwind (-1);
989 }
990}
991
992static void
993coro_destruct_perl (pTHX_ struct coro *coro)
994{
995 coro_unwind_stacks (aTHX);
996
997 SvREFCNT_dec (GvSV (PL_defgv));
998 SvREFCNT_dec (GvAV (PL_defgv));
999 SvREFCNT_dec (GvSV (PL_errgv));
1000 SvREFCNT_dec (PL_defoutgv);
1001 SvREFCNT_dec (PL_rs);
1002 SvREFCNT_dec (GvSV (irsgv));
1003 SvREFCNT_dec (GvHV (PL_hintgv));
1004
1005 SvREFCNT_dec (PL_diehook);
1006 SvREFCNT_dec (PL_warnhook);
1007
1008 SvREFCNT_dec (coro->saved_deffh);
1009 SvREFCNT_dec (coro->rouse_cb);
1010 SvREFCNT_dec (coro->invoke_cb);
1011 SvREFCNT_dec (coro->invoke_av);
1012
1013 coro_destruct_stacks (aTHX);
1014}
1015
1016INLINE void
1017free_coro_mortal (pTHX)
1018{
1019 if (expect_true (coro_mortal))
1020 {
1021 SvREFCNT_dec (coro_mortal);
1022 coro_mortal = 0;
1023 }
1024}
1025
1026static int
1027runops_trace (pTHX)
1028{
1029 COP *oldcop = 0;
1030 int oldcxix = -2;
1031
1032 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
1033 {
1034 PERL_ASYNC_CHECK ();
1035
1036 if (cctx_current->flags & CC_TRACE_ALL)
1037 {
1038 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
1039 {
1040 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1041 SV **bot, **top;
1042 AV *av = newAV (); /* return values */
1043 SV **cb;
1044 dSP;
1045
1046 GV *gv = CvGV (cx->blk_sub.cv);
1047 SV *fullname = sv_2mortal (newSV (0));
1048 if (isGV (gv))
1049 gv_efullname3 (fullname, gv, 0);
1050
1051 bot = PL_stack_base + cx->blk_oldsp + 1;
1052 top = cx->blk_gimme == G_ARRAY ? SP + 1
1053 : cx->blk_gimme == G_SCALAR ? bot + 1
1054 : bot;
1055
1056 av_extend (av, top - bot);
1057 while (bot < top)
1058 av_push (av, SvREFCNT_inc_NN (*bot++));
1059
1060 PL_runops = RUNOPS_DEFAULT;
1061 ENTER;
1062 SAVETMPS;
1063 EXTEND (SP, 3);
1064 PUSHMARK (SP);
1065 PUSHs (&PL_sv_no);
1066 PUSHs (fullname);
1067 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1068 PUTBACK;
1069 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1070 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1071 SPAGAIN;
1072 FREETMPS;
1073 LEAVE;
1074 PL_runops = runops_trace;
1075 }
1076
1077 if (oldcop != PL_curcop)
1078 {
1079 oldcop = PL_curcop;
1080
1081 if (PL_curcop != &PL_compiling)
1082 {
1083 SV **cb;
1084
1085 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1086 {
1087 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1088
1089 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1090 {
1091 dSP;
1092 GV *gv = CvGV (cx->blk_sub.cv);
1093 SV *fullname = sv_2mortal (newSV (0));
1094
1095 if (isGV (gv))
1096 gv_efullname3 (fullname, gv, 0);
1097
1098 PL_runops = RUNOPS_DEFAULT;
1099 ENTER;
1100 SAVETMPS;
1101 EXTEND (SP, 3);
1102 PUSHMARK (SP);
1103 PUSHs (&PL_sv_yes);
1104 PUSHs (fullname);
1105 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1106 PUTBACK;
1107 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1108 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1109 SPAGAIN;
1110 FREETMPS;
1111 LEAVE;
1112 PL_runops = runops_trace;
1113 }
1114
1115 oldcxix = cxstack_ix;
1116 }
1117
1118 if (cctx_current->flags & CC_TRACE_LINE)
1119 {
1120 dSP;
1121
1122 PL_runops = RUNOPS_DEFAULT;
1123 ENTER;
1124 SAVETMPS;
1125 EXTEND (SP, 3);
1126 PL_runops = RUNOPS_DEFAULT;
1127 PUSHMARK (SP);
1128 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1129 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1130 PUTBACK;
1131 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1132 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1133 SPAGAIN;
1134 FREETMPS;
1135 LEAVE;
1136 PL_runops = runops_trace;
1137 }
1138 }
1139 }
1140 }
1141 }
1142
1143 TAINT_NOT;
1144 return 0;
1145}
1146
1147static struct CoroSLF cctx_ssl_frame;
1148
1149static void
1150slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1151{
1152 ta->prev = 0;
1153}
1154
1155static int
1156slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1157{
1158 *frame = cctx_ssl_frame;
1159
1160 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1161}
1162
1163/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1164static void NOINLINE
1165cctx_prepare (pTHX)
1166{
1167 PL_top_env = &PL_start_env;
1168
1169 if (cctx_current->flags & CC_TRACE)
1170 PL_runops = runops_trace;
1171
1172 /* we already must be executing an SLF op, there is no other valid way
1173 * that can lead to creation of a new cctx */
1174 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1175 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1176
1177 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1178 cctx_ssl_frame = slf_frame;
1179
1180 slf_frame.prepare = slf_prepare_set_stacklevel;
1181 slf_frame.check = slf_check_set_stacklevel;
1182}
1183
1184/* the tail of transfer: execute stuff we can only do after a transfer */
1185INLINE void
1186transfer_tail (pTHX)
1187{
1188 free_coro_mortal (aTHX);
1189}
1190
1191/*
1192 * this is a _very_ stripped down perl interpreter ;)
1193 */
1194static void
1195cctx_run (void *arg)
1196{
1197#ifdef USE_ITHREADS
1198# if CORO_PTHREAD
1199 PERL_SET_CONTEXT (coro_thx);
1200# endif
1201#endif
1202 {
1203 dTHX;
1204
1205 /* normally we would need to skip the entersub here */
1206 /* not doing so will re-execute it, which is exactly what we want */
1207 /* PL_nop = PL_nop->op_next */
1208
1209 /* inject a fake subroutine call to cctx_init */
1210 cctx_prepare (aTHX);
1211
1212 /* cctx_run is the alternative tail of transfer() */
1213 transfer_tail (aTHX);
1214
1215 /* somebody or something will hit me for both perl_run and PL_restartop */
1216 PL_restartop = PL_op;
1217 perl_run (PL_curinterp);
1218 /*
1219 * Unfortunately, there is no way to get at the return values of the
1220 * coro body here, as perl_run destroys these
1221 */
1222
1223 /*
1224 * If perl-run returns we assume exit() was being called or the coro
1225 * fell off the end, which seems to be the only valid (non-bug)
1226 * reason for perl_run to return. We try to exit by jumping to the
1227 * bootstrap-time "top" top_env, as we cannot restore the "main"
1228 * coroutine as Coro has no such concept.
1229 * This actually isn't valid with the pthread backend, but OSes requiring
1230 * that backend are too broken to do it in a standards-compliant way.
1231 */
1232 PL_top_env = main_top_env;
1233 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1234 }
1235}
1236
1237static coro_cctx *
1238cctx_new ()
1239{
1240 coro_cctx *cctx;
1241
1242 ++cctx_count;
1243 New (0, cctx, 1, coro_cctx);
1244
1245 cctx->gen = cctx_gen;
1246 cctx->flags = 0;
1247 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1248
1249 return cctx;
1250}
1251
1252/* create a new cctx only suitable as source */
1253static coro_cctx *
1254cctx_new_empty ()
1255{
1256 coro_cctx *cctx = cctx_new ();
1257
1258 cctx->sptr = 0;
1259 coro_create (&cctx->cctx, 0, 0, 0, 0);
1260
1261 return cctx;
1262}
1263
1264/* create a new cctx suitable as destination/running a perl interpreter */
1265static coro_cctx *
1266cctx_new_run ()
1267{
1268 coro_cctx *cctx = cctx_new ();
1269 void *stack_start;
1270 size_t stack_size;
1271
1272#if HAVE_MMAP
1273 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1274 /* mmap supposedly does allocate-on-write for us */
1275 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1276
1277 if (cctx->sptr != (void *)-1)
1278 {
1279 #if CORO_STACKGUARD
1280 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1281 #endif
1282 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1283 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1284 cctx->flags |= CC_MAPPED;
1285 }
1286 else
1287#endif
1288 {
1289 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1290 New (0, cctx->sptr, cctx_stacksize, long);
1291
1292 if (!cctx->sptr)
1293 {
1294 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1295 _exit (EXIT_FAILURE);
1296 }
1297
1298 stack_start = cctx->sptr;
1299 stack_size = cctx->ssize;
1300 }
1301
1302 #if CORO_USE_VALGRIND
1303 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1304 #endif
1305
1306 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1307
1308 return cctx;
1309}
1310
1311static void
1312cctx_destroy (coro_cctx *cctx)
1313{
1314 if (!cctx)
1315 return;
1316
1317 assert (("FATAL: tried to destroy current cctx", cctx != cctx_current));//D temporary?
1318
1319 --cctx_count;
1320 coro_destroy (&cctx->cctx);
1321
1322 /* coro_transfer creates new, empty cctx's */
1323 if (cctx->sptr)
1324 {
1325 #if CORO_USE_VALGRIND
1326 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1327 #endif
1328
1329#if HAVE_MMAP
1330 if (cctx->flags & CC_MAPPED)
1331 munmap (cctx->sptr, cctx->ssize);
1332 else
1333#endif
1334 Safefree (cctx->sptr);
1335 }
1336
1337 Safefree (cctx);
1338}
1339
1340/* wether this cctx should be destructed */
1341#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1342
1343static coro_cctx *
1344cctx_get (pTHX)
1345{
1346 while (expect_true (cctx_first))
1347 {
1348 coro_cctx *cctx = cctx_first;
1349 cctx_first = cctx->next;
1350 --cctx_idle;
1351
1352 if (expect_true (!CCTX_EXPIRED (cctx)))
1353 return cctx;
1354
1355 cctx_destroy (cctx);
1356 }
1357
1358 return cctx_new_run ();
1359}
1360
1361static void
1362cctx_put (coro_cctx *cctx)
1363{
1364 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1365
1366 /* free another cctx if overlimit */
1367 if (expect_false (cctx_idle >= cctx_max_idle))
1368 {
1369 coro_cctx *first = cctx_first;
1370 cctx_first = first->next;
1371 --cctx_idle;
1372
1373 cctx_destroy (first);
1374 }
1375
1376 ++cctx_idle;
1377 cctx->next = cctx_first;
1378 cctx_first = cctx;
1379}
1380
1381/** coroutine switching *****************************************************/
1382
1383static void
1384transfer_check (pTHX_ struct coro *prev, struct coro *next)
1385{
1386 /* TODO: throwing up here is considered harmful */
1387
1388 if (expect_true (prev != next))
1389 {
1390 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1391 croak ("Coro::State::transfer called with a blocked prev Coro::State, but can only transfer from running or new states,");
1392
1393 if (expect_false (next->flags & (CF_RUNNING | CF_DESTROYED | CF_SUSPENDED)))
1394 croak ("Coro::State::transfer called with running, destroyed or suspended next Coro::State, but can only transfer to inactive states,");
1395
1396#if !PERL_VERSION_ATLEAST (5,10,0)
1397 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1398 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1399#endif
1400 }
1401}
1402
1403/* always use the TRANSFER macro */
1404static void NOINLINE /* noinline so we have a fixed stackframe */
1405transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1406{
1407 dSTACKLEVEL;
1408
1409 /* sometimes transfer is only called to set idle_sp */
1410 if (expect_false (!prev))
1411 {
1412 cctx_current->idle_sp = STACKLEVEL;
1413 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1414 }
1415 else if (expect_true (prev != next))
1416 {
1417 coro_cctx *cctx_prev;
1418
1419 if (expect_false (prev->flags & CF_NEW))
1420 {
1421 /* create a new empty/source context */
1422 prev->flags &= ~CF_NEW;
1423 prev->flags |= CF_RUNNING;
1424 }
1425
1426 prev->flags &= ~CF_RUNNING;
1427 next->flags |= CF_RUNNING;
1428
1429 /* first get rid of the old state */
1430 save_perl (aTHX_ prev);
1431
1432 if (expect_false (next->flags & CF_NEW))
1433 {
1434 /* need to start coroutine */
1435 next->flags &= ~CF_NEW;
1436 /* setup coroutine call */
1437 coro_setup (aTHX_ next);
1438 }
1439 else
1440 load_perl (aTHX_ next);
1441
1442 /* possibly untie and reuse the cctx */
1443 if (expect_true (
1444 cctx_current->idle_sp == STACKLEVEL
1445 && !(cctx_current->flags & CC_TRACE)
1446 && !force_cctx
1447 ))
1448 {
1449 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1450 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1451
1452 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get. */
1453 /* without this the next cctx_get might destroy the running cctx while still in use */
1454 if (expect_false (CCTX_EXPIRED (cctx_current)))
1455 if (expect_true (!next->cctx))
1456 next->cctx = cctx_get (aTHX);
1457
1458 cctx_put (cctx_current);
1459 }
1460 else
1461 prev->cctx = cctx_current;
1462
1463 ++next->usecount;
1464
1465 cctx_prev = cctx_current;
1466 cctx_current = expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1467
1468 next->cctx = 0;
1469
1470 if (expect_false (cctx_prev != cctx_current))
1471 {
1472 cctx_prev->top_env = PL_top_env;
1473 PL_top_env = cctx_current->top_env;
1474 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1475 }
1476
1477 transfer_tail (aTHX);
1478 }
1479}
1480
1481#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1482#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1483
1484/** high level stuff ********************************************************/
1485
1486static int
1487coro_state_destroy (pTHX_ struct coro *coro)
1488{
1489 if (coro->flags & CF_DESTROYED)
1490 return 0;
1491
1492 if (coro->on_destroy && !PL_dirty)
1493 coro->on_destroy (aTHX_ coro);
1494
1495 coro->flags |= CF_DESTROYED;
1496
1497 if (coro->flags & CF_READY)
1498 {
1499 /* reduce nready, as destroying a ready coro effectively unreadies it */
1500 /* alternative: look through all ready queues and remove the coro */
1501 --coro_nready;
1502 }
1503 else
1504 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1505
1506 if (coro->mainstack
1507 && coro->mainstack != main_mainstack
1508 && coro->slot
1509 && !PL_dirty)
1510 {
1511 struct coro *current = SvSTATE_current;
1512
1513 assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack));
1514
1515 save_perl (aTHX_ current);
1516 load_perl (aTHX_ coro);
1517
1518 coro_destruct_perl (aTHX_ coro);
1519
1520 load_perl (aTHX_ current);
1521
1522 coro->slot = 0;
1523 }
1524
1525 cctx_destroy (coro->cctx);
1526 SvREFCNT_dec (coro->startcv);
1527 SvREFCNT_dec (coro->args);
1528 SvREFCNT_dec (CORO_THROW);
1529
1530 if (coro->next) coro->next->prev = coro->prev;
1531 if (coro->prev) coro->prev->next = coro->next;
1532 if (coro == coro_first) coro_first = coro->next;
1533
1534 return 1;
1535}
1536
1537static int
1538coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1539{
1540 struct coro *coro = (struct coro *)mg->mg_ptr;
1541 mg->mg_ptr = 0;
1542
1543 coro->hv = 0;
1544
1545 if (--coro->refcnt < 0)
1546 {
1547 coro_state_destroy (aTHX_ coro);
1548 Safefree (coro);
1549 }
1550
1551 return 0;
1552}
1553
1554static int
1555coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1556{
1557 struct coro *coro = (struct coro *)mg->mg_ptr;
1558
1559 ++coro->refcnt;
1560
1561 return 0;
1562}
1563
1564static MGVTBL coro_state_vtbl = {
1565 0, 0, 0, 0,
1566 coro_state_free,
1567 0,
1568#ifdef MGf_DUP
1569 coro_state_dup,
1570#else
1571# define MGf_DUP 0
1572#endif
1573};
1574
1575static void
1576prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1577{
1578 ta->prev = SvSTATE (prev_sv);
1579 ta->next = SvSTATE (next_sv);
1580 TRANSFER_CHECK (*ta);
1581}
1582
1583static void
1584api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1585{
1586 struct coro_transfer_args ta;
1587
1588 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1589 TRANSFER (ta, 1);
1590}
1591
1592/*****************************************************************************/
1593/* gensub: simple closure generation utility */
1594
1595#define GENSUB_ARG CvXSUBANY (cv).any_ptr
1596
1597/* create a closure from XS, returns a code reference */
1598/* the arg can be accessed via GENSUB_ARG from the callback */
1599/* the callback must use dXSARGS/XSRETURN */
1600static SV *
1601gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg)
1602{
1603 CV *cv = (CV *)newSV (0);
1604
1605 sv_upgrade ((SV *)cv, SVt_PVCV);
1606
1607 CvANON_on (cv);
1608 CvISXSUB_on (cv);
1609 CvXSUB (cv) = xsub;
1610 GENSUB_ARG = arg;
1611
1612 return newRV_noinc ((SV *)cv);
1613}
1614
1615/** Coro ********************************************************************/
1616
1617INLINE void
1618coro_enq (pTHX_ struct coro *coro)
1619{
1620 struct coro **ready = coro_ready [coro->prio - PRIO_MIN];
1621
1622 SvREFCNT_inc_NN (coro->hv);
1623
1624 coro->next_ready = 0;
1625 *(ready [0] ? &ready [1]->next_ready : &ready [0]) = coro;
1626 ready [1] = coro;
1627}
1628
1629INLINE struct coro *
1630coro_deq (pTHX)
1631{
1632 int prio;
1633
1634 for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= 0; )
1635 {
1636 struct coro **ready = coro_ready [prio];
1637
1638 if (ready [0])
1639 {
1640 struct coro *coro = ready [0];
1641 ready [0] = coro->next_ready;
1642 return coro;
1643 }
1644 }
1645
1646 return 0;
1647}
1648
1649static int
1650api_ready (pTHX_ SV *coro_sv)
1651{
1652 struct coro *coro;
1653 SV *sv_hook;
1654 void (*xs_hook)(void);
1655
1656 coro = SvSTATE (coro_sv);
1657
1658 if (coro->flags & CF_READY)
1659 return 0;
1660
1661 coro->flags |= CF_READY;
1662
1663 sv_hook = coro_nready ? 0 : coro_readyhook;
1664 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1665
1666 coro_enq (aTHX_ coro);
1667 ++coro_nready;
1668
1669 if (sv_hook)
1670 {
1671 dSP;
1672
1673 ENTER;
1674 SAVETMPS;
1675
1676 PUSHMARK (SP);
1677 PUTBACK;
1678 call_sv (sv_hook, G_VOID | G_DISCARD);
1679
1680 FREETMPS;
1681 LEAVE;
1682 }
1683
1684 if (xs_hook)
1685 xs_hook ();
1686
1687 return 1;
1688}
1689
1690static int
1691api_is_ready (pTHX_ SV *coro_sv)
1692{
1693 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1694}
1695
1696/* expects to own a reference to next->hv */
1697INLINE void
1698prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1699{
1700 SV *prev_sv = SvRV (coro_current);
1701
1702 ta->prev = SvSTATE_hv (prev_sv);
1703 ta->next = next;
1704
1705 TRANSFER_CHECK (*ta);
1706
1707 SvRV_set (coro_current, (SV *)next->hv);
1708
1709 free_coro_mortal (aTHX);
1710 coro_mortal = prev_sv;
1711}
1712
1713static void
1714prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1715{
1716 for (;;)
1717 {
1718 struct coro *next = coro_deq (aTHX);
1719
1720 if (expect_true (next))
1721 {
1722 /* cannot transfer to destroyed coros, skip and look for next */
1723 if (expect_false (next->flags & (CF_DESTROYED | CF_SUSPENDED)))
1724 SvREFCNT_dec (next->hv); /* coro_nready has already been taken care of by destroy */
1725 else
1726 {
1727 next->flags &= ~CF_READY;
1728 --coro_nready;
1729
1730 prepare_schedule_to (aTHX_ ta, next);
1731 break;
1732 }
1733 }
1734 else
1735 {
1736 /* nothing to schedule: call the idle handler */
1737 if (SvROK (sv_idle)
1738 && SvOBJECT (SvRV (sv_idle)))
1739 {
1740 ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */
1741 api_ready (aTHX_ SvRV (sv_idle));
1742 --coro_nready;
1743 }
1744 else
1745 {
1746 dSP;
1747
1748 ENTER;
1749 SAVETMPS;
1750
1751 PUSHMARK (SP);
1752 PUTBACK;
1753 call_sv (sv_idle, G_VOID | G_DISCARD);
1754
1755 FREETMPS;
1756 LEAVE;
1757 }
1758 }
1759 }
1760}
1761
1762INLINE void
1763prepare_cede (pTHX_ struct coro_transfer_args *ta)
1764{
1765 api_ready (aTHX_ coro_current);
1766 prepare_schedule (aTHX_ ta);
1767}
1768
1769INLINE void
1770prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1771{
1772 SV *prev = SvRV (coro_current);
1773
1774 if (coro_nready)
1775 {
1776 prepare_schedule (aTHX_ ta);
1777 api_ready (aTHX_ prev);
1778 }
1779 else
1780 prepare_nop (aTHX_ ta);
1781}
1782
1783static void
1784api_schedule (pTHX)
1785{
1786 struct coro_transfer_args ta;
1787
1788 prepare_schedule (aTHX_ &ta);
1789 TRANSFER (ta, 1);
1790}
1791
1792static void
1793api_schedule_to (pTHX_ SV *coro_sv)
1794{
1795 struct coro_transfer_args ta;
1796 struct coro *next = SvSTATE (coro_sv);
1797
1798 SvREFCNT_inc_NN (coro_sv);
1799 prepare_schedule_to (aTHX_ &ta, next);
1800}
1801
1802static int
1803api_cede (pTHX)
1804{
1805 struct coro_transfer_args ta;
1806
1807 prepare_cede (aTHX_ &ta);
1808
1809 if (expect_true (ta.prev != ta.next))
1810 {
1811 TRANSFER (ta, 1);
1812 return 1;
1813 }
1814 else
1815 return 0;
1816}
1817
1818static int
1819api_cede_notself (pTHX)
1820{
1821 if (coro_nready)
1822 {
1823 struct coro_transfer_args ta;
1824
1825 prepare_cede_notself (aTHX_ &ta);
1826 TRANSFER (ta, 1);
1827 return 1;
1828 }
1829 else
1830 return 0;
1831}
1832
1833static void
1834api_trace (pTHX_ SV *coro_sv, int flags)
1835{
1836 struct coro *coro = SvSTATE (coro_sv);
1837
1838 if (coro->flags & CF_RUNNING)
1839 croak ("cannot enable tracing on a running coroutine, caught");
1840
1841 if (flags & CC_TRACE)
1842 {
1843 if (!coro->cctx)
1844 coro->cctx = cctx_new_run ();
1845 else if (!(coro->cctx->flags & CC_TRACE))
1846 croak ("cannot enable tracing on coroutine with custom stack, caught");
1847
1848 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1849 }
1850 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1851 {
1852 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1853
1854 if (coro->flags & CF_RUNNING)
1855 PL_runops = RUNOPS_DEFAULT;
1856 else
1857 coro->slot->runops = RUNOPS_DEFAULT;
1858 }
1859}
1860
1861static void
1862coro_call_on_destroy (pTHX_ struct coro *coro)
1863{
1864 SV **on_destroyp = hv_fetch (coro->hv, "_on_destroy", sizeof ("_on_destroy") - 1, 0);
1865 SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0);
1866
1867 if (on_destroyp)
1868 {
1869 AV *on_destroy = (AV *)SvRV (*on_destroyp);
1870
1871 while (AvFILLp (on_destroy) >= 0)
1872 {
1873 dSP; /* don't disturb outer sp */
1874 SV *cb = av_pop (on_destroy);
1875
1876 PUSHMARK (SP);
1877
1878 if (statusp)
1879 {
1880 int i;
1881 AV *status = (AV *)SvRV (*statusp);
1882 EXTEND (SP, AvFILLp (status) + 1);
1883
1884 for (i = 0; i <= AvFILLp (status); ++i)
1885 PUSHs (AvARRAY (status)[i]);
1886 }
1887
1888 PUTBACK;
1889 call_sv (sv_2mortal (cb), G_VOID | G_DISCARD);
1890 }
1891 }
1892}
1893
1894static void
1895slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1896{
1897 int i;
1898 HV *hv = (HV *)SvRV (coro_current);
1899 AV *av = newAV ();
1900
1901 av_extend (av, items - 1);
1902 for (i = 0; i < items; ++i)
1903 av_push (av, SvREFCNT_inc_NN (arg [i]));
1904
1905 hv_store (hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0);
1906
1907 av_push (av_destroy, (SV *)newRV_inc ((SV *)hv)); /* RVinc for perl */
1908 api_ready (aTHX_ sv_manager);
1909
1910 frame->prepare = prepare_schedule;
1911 frame->check = slf_check_repeat;
1912
1913 /* as a minor optimisation, we could unwind all stacks here */
1914 /* but that puts extra pressure on pp_slf, and is not worth much */
1915 /*coro_unwind_stacks (aTHX);*/
1916}
1917
1918/*****************************************************************************/
1919/* async pool handler */
1920
1921static int
1922slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
1923{
1924 HV *hv = (HV *)SvRV (coro_current);
1925 struct coro *coro = (struct coro *)frame->data;
1926
1927 if (!coro->invoke_cb)
1928 return 1; /* loop till we have invoke */
1929 else
1930 {
1931 hv_store (hv, "desc", sizeof ("desc") - 1,
1932 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
1933
1934 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
1935
1936 {
1937 dSP;
1938 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
1939 PUTBACK;
1940 }
1941
1942 SvREFCNT_dec (GvAV (PL_defgv));
1943 GvAV (PL_defgv) = coro->invoke_av;
1944 coro->invoke_av = 0;
1945
1946 return 0;
1947 }
1948}
1949
1950static void
1951slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1952{
1953 HV *hv = (HV *)SvRV (coro_current);
1954 struct coro *coro = SvSTATE_hv ((SV *)hv);
1955
1956 if (expect_true (coro->saved_deffh))
1957 {
1958 /* subsequent iteration */
1959 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
1960 coro->saved_deffh = 0;
1961
1962 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
1963 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
1964 {
1965 coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate);
1966 coro->invoke_av = newAV ();
1967
1968 frame->prepare = prepare_nop;
1969 }
1970 else
1971 {
1972 av_clear (GvAV (PL_defgv));
1973 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
1974
1975 coro->prio = 0;
1976
1977 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
1978 api_trace (aTHX_ coro_current, 0);
1979
1980 frame->prepare = prepare_schedule;
1981 av_push (av_async_pool, SvREFCNT_inc (hv));
1982 }
1983 }
1984 else
1985 {
1986 /* first iteration, simply fall through */
1987 frame->prepare = prepare_nop;
1988 }
1989
1990 frame->check = slf_check_pool_handler;
1991 frame->data = (void *)coro;
1992}
1993
1994/*****************************************************************************/
1995/* rouse callback */
1996
1997#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
1998
1999static void
2000coro_rouse_callback (pTHX_ CV *cv)
2001{
2002 dXSARGS;
2003 SV *data = (SV *)GENSUB_ARG;
2004
2005 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2006 {
2007 /* first call, set args */
2008 SV *coro = SvRV (data);
2009 AV *av = newAV ();
2010
2011 SvRV_set (data, (SV *)av);
2012
2013 /* better take a full copy of the arguments */
2014 while (items--)
2015 av_store (av, items, newSVsv (ST (items)));
2016
2017 api_ready (aTHX_ coro);
2018 SvREFCNT_dec (coro);
2019 }
2020
2021 XSRETURN_EMPTY;
2022}
2023
2024static int
2025slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
2026{
2027 SV *data = (SV *)frame->data;
2028
2029 if (CORO_THROW)
2030 return 0;
2031
2032 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2033 return 1;
2034
2035 /* now push all results on the stack */
2036 {
2037 dSP;
2038 AV *av = (AV *)SvRV (data);
2039 int i;
2040
2041 EXTEND (SP, AvFILLp (av) + 1);
2042 for (i = 0; i <= AvFILLp (av); ++i)
2043 PUSHs (sv_2mortal (AvARRAY (av)[i]));
2044
2045 /* we have stolen the elements, so set length to zero and free */
2046 AvFILLp (av) = -1;
2047 av_undef (av);
2048
2049 PUTBACK;
2050 }
2051
2052 return 0;
2053}
2054
2055static void
2056slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2057{
2058 SV *cb;
2059
2060 if (items)
2061 cb = arg [0];
2062 else
2063 {
2064 struct coro *coro = SvSTATE_current;
2065
2066 if (!coro->rouse_cb)
2067 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
2068
2069 cb = sv_2mortal (coro->rouse_cb);
2070 coro->rouse_cb = 0;
2071 }
2072
2073 if (!SvROK (cb)
2074 || SvTYPE (SvRV (cb)) != SVt_PVCV
2075 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
2076 croak ("Coro::rouse_wait called with illegal callback argument,");
2077
2078 {
2079 CV *cv = (CV *)SvRV (cb); /* for GENSUB_ARG */
2080 SV *data = (SV *)GENSUB_ARG;
2081
2082 frame->data = (void *)data;
2083 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2084 frame->check = slf_check_rouse_wait;
2085 }
2086}
2087
2088static SV *
2089coro_new_rouse_cb (pTHX)
2090{
2091 HV *hv = (HV *)SvRV (coro_current);
2092 struct coro *coro = SvSTATE_hv (hv);
2093 SV *data = newRV_inc ((SV *)hv);
2094 SV *cb = gensub (aTHX_ coro_rouse_callback, (void *)data);
2095
2096 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2097 SvREFCNT_dec (data); /* magicext increases the refcount */
2098
2099 SvREFCNT_dec (coro->rouse_cb);
2100 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2101
2102 return cb;
2103}
2104
2105/*****************************************************************************/
2106/* schedule-like-function opcode (SLF) */
2107
2108static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2109static const CV *slf_cv;
2110static SV **slf_argv;
2111static int slf_argc, slf_arga; /* count, allocated */
2112static I32 slf_ax; /* top of stack, for restore */
2113
2114/* this restores the stack in the case we patched the entersub, to */
2115/* recreate the stack frame as perl will on following calls */
2116/* since entersub cleared the stack */
2117static OP *
2118pp_restore (pTHX)
2119{
2120 int i;
2121 SV **SP = PL_stack_base + slf_ax;
2122
2123 PUSHMARK (SP);
2124
2125 EXTEND (SP, slf_argc + 1);
2126
2127 for (i = 0; i < slf_argc; ++i)
2128 PUSHs (sv_2mortal (slf_argv [i]));
2129
2130 PUSHs ((SV *)CvGV (slf_cv));
2131
2132 RETURNOP (slf_restore.op_first);
2133}
2134
2135static void
2136slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2137{
2138 SV **arg = (SV **)slf_frame.data;
2139
2140 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2141}
2142
2143static void
2144slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2145{
2146 if (items != 2)
2147 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2148
2149 frame->prepare = slf_prepare_transfer;
2150 frame->check = slf_check_nop;
2151 frame->data = (void *)arg; /* let's hope it will stay valid */
2152}
2153
2154static void
2155slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2156{
2157 frame->prepare = prepare_schedule;
2158 frame->check = slf_check_nop;
2159}
2160
2161static void
2162slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2163{
2164 struct coro *next = (struct coro *)slf_frame.data;
2165
2166 SvREFCNT_inc_NN (next->hv);
2167 prepare_schedule_to (aTHX_ ta, next);
2168}
2169
2170static void
2171slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2172{
2173 if (!items)
2174 croak ("Coro::schedule_to expects a coroutine argument, caught");
2175
2176 frame->data = (void *)SvSTATE (arg [0]);
2177 frame->prepare = slf_prepare_schedule_to;
2178 frame->check = slf_check_nop;
2179}
2180
2181static void
2182slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2183{
2184 api_ready (aTHX_ SvRV (coro_current));
2185
2186 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2187}
2188
2189static void
2190slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2191{
2192 frame->prepare = prepare_cede;
2193 frame->check = slf_check_nop;
2194}
2195
2196static void
2197slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2198{
2199 frame->prepare = prepare_cede_notself;
2200 frame->check = slf_check_nop;
2201}
2202
2203/*
2204 * these not obviously related functions are all rolled into one
2205 * function to increase chances that they all will call transfer with the same
2206 * stack offset
2207 * SLF stands for "schedule-like-function".
2208 */
2209static OP *
2210pp_slf (pTHX)
2211{
2212 I32 checkmark; /* mark SP to see how many elements check has pushed */
2213
2214 /* set up the slf frame, unless it has already been set-up */
2215 /* the latter happens when a new coro has been started */
2216 /* or when a new cctx was attached to an existing coroutine */
2217 if (expect_true (!slf_frame.prepare))
2218 {
2219 /* first iteration */
2220 dSP;
2221 SV **arg = PL_stack_base + TOPMARK + 1;
2222 int items = SP - arg; /* args without function object */
2223 SV *gv = *sp;
2224
2225 /* do a quick consistency check on the "function" object, and if it isn't */
2226 /* for us, divert to the real entersub */
2227 if (SvTYPE (gv) != SVt_PVGV
2228 || !GvCV (gv)
2229 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2230 return PL_ppaddr[OP_ENTERSUB](aTHX);
2231
2232 if (!(PL_op->op_flags & OPf_STACKED))
2233 {
2234 /* ampersand-form of call, use @_ instead of stack */
2235 AV *av = GvAV (PL_defgv);
2236 arg = AvARRAY (av);
2237 items = AvFILLp (av) + 1;
2238 }
2239
2240 /* now call the init function, which needs to set up slf_frame */
2241 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2242 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2243
2244 /* pop args */
2245 SP = PL_stack_base + POPMARK;
2246
2247 PUTBACK;
2248 }
2249
2250 /* now that we have a slf_frame, interpret it! */
2251 /* we use a callback system not to make the code needlessly */
2252 /* complicated, but so we can run multiple perl coros from one cctx */
2253
2254 do
2255 {
2256 struct coro_transfer_args ta;
2257
2258 slf_frame.prepare (aTHX_ &ta);
2259 TRANSFER (ta, 0);
2260
2261 checkmark = PL_stack_sp - PL_stack_base;
2262 }
2263 while (slf_frame.check (aTHX_ &slf_frame));
2264
2265 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2266
2267 /* exception handling */
2268 if (expect_false (CORO_THROW))
2269 {
2270 SV *exception = sv_2mortal (CORO_THROW);
2271
2272 CORO_THROW = 0;
2273 sv_setsv (ERRSV, exception);
2274 croak (0);
2275 }
2276
2277 /* return value handling - mostly like entersub */
2278 /* make sure we put something on the stack in scalar context */
2279 if (GIMME_V == G_SCALAR)
2280 {
2281 dSP;
2282 SV **bot = PL_stack_base + checkmark;
2283
2284 if (sp == bot) /* too few, push undef */
2285 bot [1] = &PL_sv_undef;
2286 else if (sp != bot + 1) /* too many, take last one */
2287 bot [1] = *sp;
2288
2289 SP = bot + 1;
2290
2291 PUTBACK;
2292 }
2293
2294 return NORMAL;
2295}
2296
2297static void
2298api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2299{
2300 int i;
2301 SV **arg = PL_stack_base + ax;
2302 int items = PL_stack_sp - arg + 1;
2303
2304 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2305
2306 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2307 && PL_op->op_ppaddr != pp_slf)
2308 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2309
2310 CvFLAGS (cv) |= CVf_SLF;
2311 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2312 slf_cv = cv;
2313
2314 /* we patch the op, and then re-run the whole call */
2315 /* we have to put the same argument on the stack for this to work */
2316 /* and this will be done by pp_restore */
2317 slf_restore.op_next = (OP *)&slf_restore;
2318 slf_restore.op_type = OP_CUSTOM;
2319 slf_restore.op_ppaddr = pp_restore;
2320 slf_restore.op_first = PL_op;
2321
2322 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2323
2324 if (PL_op->op_flags & OPf_STACKED)
2325 {
2326 if (items > slf_arga)
2327 {
2328 slf_arga = items;
2329 free (slf_argv);
2330 slf_argv = malloc (slf_arga * sizeof (SV *));
2331 }
2332
2333 slf_argc = items;
2334
2335 for (i = 0; i < items; ++i)
2336 slf_argv [i] = SvREFCNT_inc (arg [i]);
2337 }
2338 else
2339 slf_argc = 0;
2340
2341 PL_op->op_ppaddr = pp_slf;
2342 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2343
2344 PL_op = (OP *)&slf_restore;
2345}
2346
2347/*****************************************************************************/
2348/* dynamic wind */
2349
2350static void
2351on_enterleave_call (pTHX_ SV *cb)
2352{
2353 dSP;
2354
2355 PUSHSTACK;
2356
2357 PUSHMARK (SP);
2358 PUTBACK;
2359 call_sv (cb, G_VOID | G_DISCARD);
2360 SPAGAIN;
2361
2362 POPSTACK;
2363}
2364
2365static SV *
2366coro_avp_pop_and_free (pTHX_ AV **avp)
2367{
2368 AV *av = *avp;
2369 SV *res = av_pop (av);
2370
2371 if (AvFILLp (av) < 0)
2372 {
2373 *avp = 0;
2374 SvREFCNT_dec (av);
2375 }
2376
2377 return res;
2378}
2379
2380static void
2381coro_pop_on_enter (pTHX_ void *coro)
2382{
2383 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_enter);
2384 SvREFCNT_dec (cb);
2385}
2386
2387static void
2388coro_pop_on_leave (pTHX_ void *coro)
2389{
2390 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_leave);
2391 on_enterleave_call (aTHX_ sv_2mortal (cb));
2392}
2393
2394/*****************************************************************************/
2395/* PerlIO::cede */
2396
2397typedef struct
2398{
2399 PerlIOBuf base;
2400 NV next, every;
2401} PerlIOCede;
2402
2403static IV
2404PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2405{
2406 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2407
2408 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2409 self->next = nvtime () + self->every;
2410
2411 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2412}
2413
2414static SV *
2415PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2416{
2417 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2418
2419 return newSVnv (self->every);
2420}
2421
2422static IV
2423PerlIOCede_flush (pTHX_ PerlIO *f)
2424{
2425 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2426 double now = nvtime ();
2427
2428 if (now >= self->next)
2429 {
2430 api_cede (aTHX);
2431 self->next = now + self->every;
2432 }
2433
2434 return PerlIOBuf_flush (aTHX_ f);
2435}
2436
2437static PerlIO_funcs PerlIO_cede =
2438{
2439 sizeof(PerlIO_funcs),
2440 "cede",
2441 sizeof(PerlIOCede),
2442 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2443 PerlIOCede_pushed,
2444 PerlIOBuf_popped,
2445 PerlIOBuf_open,
2446 PerlIOBase_binmode,
2447 PerlIOCede_getarg,
2448 PerlIOBase_fileno,
2449 PerlIOBuf_dup,
2450 PerlIOBuf_read,
2451 PerlIOBuf_unread,
2452 PerlIOBuf_write,
2453 PerlIOBuf_seek,
2454 PerlIOBuf_tell,
2455 PerlIOBuf_close,
2456 PerlIOCede_flush,
2457 PerlIOBuf_fill,
2458 PerlIOBase_eof,
2459 PerlIOBase_error,
2460 PerlIOBase_clearerr,
2461 PerlIOBase_setlinebuf,
2462 PerlIOBuf_get_base,
2463 PerlIOBuf_bufsiz,
2464 PerlIOBuf_get_ptr,
2465 PerlIOBuf_get_cnt,
2466 PerlIOBuf_set_ptrcnt,
2467};
2468
2469/*****************************************************************************/
2470/* Coro::Semaphore & Coro::Signal */
2471
2472static SV *
2473coro_waitarray_new (pTHX_ int count)
2474{
2475 /* a semaphore contains a counter IV in $sem->[0] and any waiters after that */
2476 AV *av = newAV ();
2477 SV **ary;
2478
2479 /* unfortunately, building manually saves memory */
2480 Newx (ary, 2, SV *);
2481 AvALLOC (av) = ary;
2482#if PERL_VERSION_ATLEAST (5,10,0)
2483 AvARRAY (av) = ary;
2484#else
2485 /* 5.8.8 needs this syntax instead of AvARRAY = ary, yet */
2486 /* -DDEBUGGING flags this as a bug, despite it perfectly working */
2487 SvPVX ((SV *)av) = (char *)ary;
2488#endif
2489 AvMAX (av) = 1;
2490 AvFILLp (av) = 0;
2491 ary [0] = newSViv (count);
2492
2493 return newRV_noinc ((SV *)av);
2494}
2495
2496/* semaphore */
2497
2498static void
2499coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2500{
2501 SV *count_sv = AvARRAY (av)[0];
2502 IV count = SvIVX (count_sv);
2503
2504 count += adjust;
2505 SvIVX (count_sv) = count;
2506
2507 /* now wake up as many waiters as are expected to lock */
2508 while (count > 0 && AvFILLp (av) > 0)
2509 {
2510 SV *cb;
2511
2512 /* swap first two elements so we can shift a waiter */
2513 AvARRAY (av)[0] = AvARRAY (av)[1];
2514 AvARRAY (av)[1] = count_sv;
2515 cb = av_shift (av);
2516
2517 if (SvOBJECT (cb))
2518 {
2519 api_ready (aTHX_ cb);
2520 --count;
2521 }
2522 else if (SvTYPE (cb) == SVt_PVCV)
2523 {
2524 dSP;
2525 PUSHMARK (SP);
2526 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2527 PUTBACK;
2528 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2529 }
2530
2531 SvREFCNT_dec (cb);
2532 }
2533}
2534
2535static void
2536coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2537{
2538 /* call $sem->adjust (0) to possibly wake up some other waiters */
2539 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2540}
2541
2542static int
2543slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2544{
2545 AV *av = (AV *)frame->data;
2546 SV *count_sv = AvARRAY (av)[0];
2547
2548 /* if we are about to throw, don't actually acquire the lock, just throw */
2549 if (CORO_THROW)
2550 return 0;
2551 else if (SvIVX (count_sv) > 0)
2552 {
2553 SvSTATE_current->on_destroy = 0;
2554
2555 if (acquire)
2556 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2557 else
2558 coro_semaphore_adjust (aTHX_ av, 0);
2559
2560 return 0;
2561 }
2562 else
2563 {
2564 int i;
2565 /* if we were woken up but can't down, we look through the whole */
2566 /* waiters list and only add us if we aren't in there already */
2567 /* this avoids some degenerate memory usage cases */
2568
2569 for (i = 1; i <= AvFILLp (av); ++i)
2570 if (AvARRAY (av)[i] == SvRV (coro_current))
2571 return 1;
2572
2573 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2574 return 1;
2575 }
2576}
2577
2578static int
2579slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2580{
2581 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2582}
2583
2584static int
2585slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2586{
2587 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2588}
2589
2590static void
2591slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2592{
2593 AV *av = (AV *)SvRV (arg [0]);
2594
2595 if (SvIVX (AvARRAY (av)[0]) > 0)
2596 {
2597 frame->data = (void *)av;
2598 frame->prepare = prepare_nop;
2599 }
2600 else
2601 {
2602 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2603
2604 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2605 frame->prepare = prepare_schedule;
2606
2607 /* to avoid race conditions when a woken-up coro gets terminated */
2608 /* we arrange for a temporary on_destroy that calls adjust (0) */
2609 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2610 }
2611}
2612
2613static void
2614slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2615{
2616 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2617 frame->check = slf_check_semaphore_down;
2618}
2619
2620static void
2621slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2622{
2623 if (items >= 2)
2624 {
2625 /* callback form */
2626 AV *av = (AV *)SvRV (arg [0]);
2627 CV *cb_cv = coro_sv_2cv (aTHX_ arg [1]);
2628
2629 av_push (av, (SV *)SvREFCNT_inc_NN (cb_cv));
2630
2631 if (SvIVX (AvARRAY (av)[0]) > 0)
2632 coro_semaphore_adjust (aTHX_ av, 0);
2633
2634 frame->prepare = prepare_nop;
2635 frame->check = slf_check_nop;
2636 }
2637 else
2638 {
2639 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2640 frame->check = slf_check_semaphore_wait;
2641 }
2642}
2643
2644/* signal */
2645
2646static void
2647coro_signal_wake (pTHX_ AV *av, int count)
2648{
2649 SvIVX (AvARRAY (av)[0]) = 0;
2650
2651 /* now signal count waiters */
2652 while (count > 0 && AvFILLp (av) > 0)
2653 {
2654 SV *cb;
2655
2656 /* swap first two elements so we can shift a waiter */
2657 cb = AvARRAY (av)[0];
2658 AvARRAY (av)[0] = AvARRAY (av)[1];
2659 AvARRAY (av)[1] = cb;
2660
2661 cb = av_shift (av);
2662
2663 api_ready (aTHX_ cb);
2664 sv_setiv (cb, 0); /* signal waiter */
2665 SvREFCNT_dec (cb);
2666
2667 --count;
2668 }
2669}
2670
2671static int
2672slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2673{
2674 /* if we are about to throw, also stop waiting */
2675 return SvROK ((SV *)frame->data) && !CORO_THROW;
2676}
2677
2678static void
2679slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2680{
2681 AV *av = (AV *)SvRV (arg [0]);
2682
2683 if (SvIVX (AvARRAY (av)[0]))
2684 {
2685 SvIVX (AvARRAY (av)[0]) = 0;
2686 frame->prepare = prepare_nop;
2687 frame->check = slf_check_nop;
2688 }
2689 else
2690 {
2691 SV *waiter = newRV_inc (SvRV (coro_current)); /* owned by signal av */
2692
2693 av_push (av, waiter);
2694
2695 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2696 frame->prepare = prepare_schedule;
2697 frame->check = slf_check_signal_wait;
2698 }
2699}
2700
2701/*****************************************************************************/
2702/* Coro::AIO */
2703
2704#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2705
2706/* helper storage struct */
2707struct io_state
2708{
2709 int errorno;
2710 I32 laststype; /* U16 in 5.10.0 */
2711 int laststatval;
2712 Stat_t statcache;
2713};
2714
2715static void
2716coro_aio_callback (pTHX_ CV *cv)
2717{
2718 dXSARGS;
2719 AV *state = (AV *)GENSUB_ARG;
2720 SV *coro = av_pop (state);
2721 SV *data_sv = newSV (sizeof (struct io_state));
2722
2723 av_extend (state, items - 1);
2724
2725 sv_upgrade (data_sv, SVt_PV);
2726 SvCUR_set (data_sv, sizeof (struct io_state));
2727 SvPOK_only (data_sv);
2728
2729 {
2730 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2731
2732 data->errorno = errno;
2733 data->laststype = PL_laststype;
2734 data->laststatval = PL_laststatval;
2735 data->statcache = PL_statcache;
2736 }
2737
2738 /* now build the result vector out of all the parameters and the data_sv */
2739 {
2740 int i;
2741
2742 for (i = 0; i < items; ++i)
2743 av_push (state, SvREFCNT_inc_NN (ST (i)));
2744 }
2745
2746 av_push (state, data_sv);
2747
2748 api_ready (aTHX_ coro);
2749 SvREFCNT_dec (coro);
2750 SvREFCNT_dec ((AV *)state);
2751}
2752
2753static int
2754slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2755{
2756 AV *state = (AV *)frame->data;
2757
2758 /* if we are about to throw, return early */
2759 /* this does not cancel the aio request, but at least */
2760 /* it quickly returns */
2761 if (CORO_THROW)
2762 return 0;
2763
2764 /* one element that is an RV? repeat! */
2765 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2766 return 1;
2767
2768 /* restore status */
2769 {
2770 SV *data_sv = av_pop (state);
2771 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2772
2773 errno = data->errorno;
2774 PL_laststype = data->laststype;
2775 PL_laststatval = data->laststatval;
2776 PL_statcache = data->statcache;
2777
2778 SvREFCNT_dec (data_sv);
2779 }
2780
2781 /* push result values */
2782 {
2783 dSP;
2784 int i;
2785
2786 EXTEND (SP, AvFILLp (state) + 1);
2787 for (i = 0; i <= AvFILLp (state); ++i)
2788 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2789
2790 PUTBACK;
2791 }
2792
2793 return 0;
2794}
2795
2796static void
2797slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2798{
2799 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2800 SV *coro_hv = SvRV (coro_current);
2801 struct coro *coro = SvSTATE_hv (coro_hv);
2802
2803 /* put our coroutine id on the state arg */
2804 av_push (state, SvREFCNT_inc_NN (coro_hv));
2805
2806 /* first see whether we have a non-zero priority and set it as AIO prio */
2807 if (coro->prio)
2808 {
2809 dSP;
2810
2811 static SV *prio_cv;
2812 static SV *prio_sv;
2813
2814 if (expect_false (!prio_cv))
2815 {
2816 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2817 prio_sv = newSViv (0);
2818 }
2819
2820 PUSHMARK (SP);
2821 sv_setiv (prio_sv, coro->prio);
2822 XPUSHs (prio_sv);
2823
2824 PUTBACK;
2825 call_sv (prio_cv, G_VOID | G_DISCARD);
2826 }
2827
2828 /* now call the original request */
2829 {
2830 dSP;
2831 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2832 int i;
2833
2834 PUSHMARK (SP);
2835
2836 /* first push all args to the stack */
2837 EXTEND (SP, items + 1);
2838
2839 for (i = 0; i < items; ++i)
2840 PUSHs (arg [i]);
2841
2842 /* now push the callback closure */
2843 PUSHs (sv_2mortal (gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2844
2845 /* now call the AIO function - we assume our request is uncancelable */
2846 PUTBACK;
2847 call_sv ((SV *)req, G_VOID | G_DISCARD);
2848 }
2849
2850 /* now that the requets is going, we loop toll we have a result */
2851 frame->data = (void *)state;
2852 frame->prepare = prepare_schedule;
2853 frame->check = slf_check_aio_req;
2854}
2855
2856static void
2857coro_aio_req_xs (pTHX_ CV *cv)
2858{
2859 dXSARGS;
2860
2861 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2862
2863 XSRETURN_EMPTY;
2864}
2865
2866/*****************************************************************************/
2867
2868#if CORO_CLONE
2869# include "clone.c"
2870#endif
2871
2872MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2873
2874PROTOTYPES: DISABLE
2875
2876BOOT:
2877{
2878#ifdef USE_ITHREADS
2879# if CORO_PTHREAD
2880 coro_thx = PERL_GET_CONTEXT;
2881# endif
2882#endif
2883 BOOT_PAGESIZE;
2884
2885 cctx_current = cctx_new_empty ();
2886
2887 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2888 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2889
2890 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2891 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2892 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
2893
2894 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
2895 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
2896 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
2897
2898 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
2899
2900 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
2901 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
2902 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
2903 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
2904
2905 main_mainstack = PL_mainstack;
2906 main_top_env = PL_top_env;
2907
2908 while (main_top_env->je_prev)
2909 main_top_env = main_top_env->je_prev;
2910
2911 {
2912 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2913
2914 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2915 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
2916
2917 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2918 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
2919 }
2920
2921 coroapi.ver = CORO_API_VERSION;
2922 coroapi.rev = CORO_API_REVISION;
2923
2924 coroapi.transfer = api_transfer;
2925
2926 coroapi.sv_state = SvSTATE_;
2927 coroapi.execute_slf = api_execute_slf;
2928 coroapi.prepare_nop = prepare_nop;
2929 coroapi.prepare_schedule = prepare_schedule;
2930 coroapi.prepare_cede = prepare_cede;
2931 coroapi.prepare_cede_notself = prepare_cede_notself;
2932
2933 {
2934 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
2935
2936 if (!svp) croak ("Time::HiRes is required");
2937 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
2938
2939 nvtime = INT2PTR (double (*)(), SvIV (*svp));
2940 }
2941
2942 assert (("PRIO_NORMAL must be 0", !PRIO_NORMAL));
2943}
2944
2945SV *
2946new (char *klass, ...)
2947 ALIAS:
2948 Coro::new = 1
2949 CODE:
2950{
2951 struct coro *coro;
2952 MAGIC *mg;
2953 HV *hv;
2954 CV *cb;
2955 int i;
2956
2957 if (items > 1)
2958 {
2959 cb = coro_sv_2cv (aTHX_ ST (1));
2960
2961 if (!ix)
235 { 2962 {
236 /* I never used formats, so how should I know how these are implemented? */ 2963 if (CvISXSUB (cb))
237 /* my bold guess is as a simple, plain sub... */ 2964 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
238 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 2965
2966 if (!CvROOT (cb))
2967 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
239 } 2968 }
240 } 2969 }
241 2970
242 if (top_si->si_type == PERLSI_MAIN)
243 break;
244
245 top_si = top_si->si_prev;
246 ccstk = top_si->si_cxstack;
247 cxix = top_si->si_cxix;
248 }
249
250 PUTBACK;
251 }
252
253 c->dowarn = PL_dowarn;
254 c->defav = GvAV (PL_defgv);
255 c->curstackinfo = PL_curstackinfo;
256 c->curstack = PL_curstack;
257 c->mainstack = PL_mainstack;
258 c->stack_sp = PL_stack_sp;
259 c->op = PL_op;
260 c->curpad = PL_curpad;
261 c->stack_base = PL_stack_base;
262 c->stack_max = PL_stack_max;
263 c->tmps_stack = PL_tmps_stack;
264 c->tmps_floor = PL_tmps_floor;
265 c->tmps_ix = PL_tmps_ix;
266 c->tmps_max = PL_tmps_max;
267 c->markstack = PL_markstack;
268 c->markstack_ptr = PL_markstack_ptr;
269 c->markstack_max = PL_markstack_max;
270 c->scopestack = PL_scopestack;
271 c->scopestack_ix = PL_scopestack_ix;
272 c->scopestack_max = PL_scopestack_max;
273 c->savestack = PL_savestack;
274 c->savestack_ix = PL_savestack_ix;
275 c->savestack_max = PL_savestack_max;
276 c->retstack = PL_retstack;
277 c->retstack_ix = PL_retstack_ix;
278 c->retstack_max = PL_retstack_max;
279 c->curcop = PL_curcop;
280}
281
282#define LOAD(state) do { load_state(aTHX_ state); SPAGAIN; } while (0)
283#define SAVE(state) do { PUTBACK; save_state(aTHX_ state); } while (0)
284
285static void
286load_state(pTHX_ Coro__State c)
287{
288 PL_dowarn = c->dowarn;
289 GvAV (PL_defgv) = c->defav;
290 PL_curstackinfo = c->curstackinfo;
291 PL_curstack = c->curstack;
292 PL_mainstack = c->mainstack;
293 PL_stack_sp = c->stack_sp;
294 PL_op = c->op;
295 PL_curpad = c->curpad;
296 PL_stack_base = c->stack_base;
297 PL_stack_max = c->stack_max;
298 PL_tmps_stack = c->tmps_stack;
299 PL_tmps_floor = c->tmps_floor;
300 PL_tmps_ix = c->tmps_ix;
301 PL_tmps_max = c->tmps_max;
302 PL_markstack = c->markstack;
303 PL_markstack_ptr = c->markstack_ptr;
304 PL_markstack_max = c->markstack_max;
305 PL_scopestack = c->scopestack;
306 PL_scopestack_ix = c->scopestack_ix;
307 PL_scopestack_max = c->scopestack_max;
308 PL_savestack = c->savestack;
309 PL_savestack_ix = c->savestack_ix;
310 PL_savestack_max = c->savestack_max;
311 PL_retstack = c->retstack;
312 PL_retstack_ix = c->retstack_ix;
313 PL_retstack_max = c->retstack_max;
314 PL_curcop = c->curcop;
315
316 {
317 dSP;
318 CV *cv;
319
320 /* now do the ugly restore mess */
321 while ((cv = (CV *)POPs))
322 {
323 AV *padlist = (AV *)POPs;
324
325 put_padlist (cv);
326 CvPADLIST(cv) = padlist;
327 CvDEPTH(cv) = (I32)POPs;
328
329#ifdef USE_THREADS
330 CvOWNER(cv) = (struct perl_thread *)POPs;
331 error does not work either
332#endif
333 }
334
335 PUTBACK;
336 }
337}
338
339/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
340STATIC void
341destroy_stacks(pTHX)
342{
343 /* die does this while calling POPSTACK, but I just don't see why. */
344 /* OTOH, die does not have a memleak, but we do... */
345 dounwind(-1);
346
347 /* is this ugly, I ask? */
348 while (PL_scopestack_ix)
349 LEAVE;
350
351 while (PL_curstackinfo->si_next)
352 PL_curstackinfo = PL_curstackinfo->si_next;
353
354 while (PL_curstackinfo)
355 {
356 PERL_SI *p = PL_curstackinfo->si_prev;
357
358 SvREFCNT_dec(PL_curstackinfo->si_stack);
359 Safefree(PL_curstackinfo->si_cxstack);
360 Safefree(PL_curstackinfo);
361 PL_curstackinfo = p;
362 }
363
364 if (PL_scopestack_ix != 0)
365 Perl_warner(aTHX_ WARN_INTERNAL,
366 "Unbalanced scopes: %ld more ENTERs than LEAVEs\n",
367 (long)PL_scopestack_ix);
368 if (PL_savestack_ix != 0)
369 Perl_warner(aTHX_ WARN_INTERNAL,
370 "Unbalanced saves: %ld more saves than restores\n",
371 (long)PL_savestack_ix);
372 if (PL_tmps_floor != -1)
373 Perl_warner(aTHX_ WARN_INTERNAL,"Unbalanced tmps: %ld more allocs than frees\n",
374 (long)PL_tmps_floor + 1);
375 /*
376 */
377 Safefree(PL_tmps_stack);
378 Safefree(PL_markstack);
379 Safefree(PL_scopestack);
380 Safefree(PL_savestack);
381 Safefree(PL_retstack);
382}
383
384#define SUB_INIT "Coro::State::_newcoro"
385
386MODULE = Coro::State PACKAGE = Coro::State
387
388PROTOTYPES: ENABLE
389
390BOOT:
391 if (!padlist_cache)
392 padlist_cache = newHV ();
393
394Coro::State
395_newprocess(args)
396 SV * args
397 PROTOTYPE: $
398 CODE:
399 Coro__State coro;
400
401 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV)
402 croak ("Coro::State::newprocess expects an arrayref");
403
404 New (0, coro, 1, struct coro); 2971 Newz (0, coro, 1, struct coro);
2972 coro->args = newAV ();
2973 coro->flags = CF_NEW;
405 2974
406 coro->mainstack = 0; /* actual work is done inside transfer */ 2975 if (coro_first) coro_first->prev = coro;
407 coro->args = (AV *)SvREFCNT_inc (SvRV (args)); 2976 coro->next = coro_first;
2977 coro_first = coro;
408 2978
409 RETVAL = coro; 2979 coro->hv = hv = newHV ();
2980 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
2981 mg->mg_flags |= MGf_DUP;
2982 RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1));
2983
2984 if (items > 1)
2985 {
2986 av_extend (coro->args, items - 1 + ix - 1);
2987
2988 if (ix)
2989 {
2990 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
2991 cb = cv_coro_run;
2992 }
2993
2994 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
2995
2996 for (i = 2; i < items; i++)
2997 av_push (coro->args, newSVsv (ST (i)));
2998 }
2999}
410 OUTPUT: 3000 OUTPUT:
411 RETVAL 3001 RETVAL
412 3002
413void 3003void
414transfer(prev,next) 3004transfer (...)
415 Coro::State_or_hashref prev 3005 PROTOTYPE: $$
416 Coro::State_or_hashref next 3006 CODE:
417 CODE: 3007 CORO_EXECUTE_SLF_XS (slf_init_transfer);
418 3008
419 if (prev != next) 3009bool
3010_destroy (SV *coro_sv)
3011 CODE:
3012 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
3013 OUTPUT:
3014 RETVAL
3015
3016void
3017_exit (int code)
3018 PROTOTYPE: $
3019 CODE:
3020 _exit (code);
3021
3022SV *
3023clone (Coro::State coro)
3024 CODE:
3025{
3026#if CORO_CLONE
3027 struct coro *ncoro = coro_clone (aTHX_ coro);
3028 MAGIC *mg;
3029 /* TODO: too much duplication */
3030 ncoro->hv = newHV ();
3031 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
3032 mg->mg_flags |= MGf_DUP;
3033 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
3034#else
3035 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
3036#endif
3037}
3038 OUTPUT:
3039 RETVAL
3040
3041int
3042cctx_stacksize (int new_stacksize = 0)
3043 PROTOTYPE: ;$
3044 CODE:
3045 RETVAL = cctx_stacksize;
3046 if (new_stacksize)
420 { 3047 {
3048 cctx_stacksize = new_stacksize;
3049 ++cctx_gen;
421 /* 3050 }
422 * this could be done in newprocess which would lead to 3051 OUTPUT:
423 * extremely elegant and fast (just SAVE/LOAD) 3052 RETVAL
424 * code here, but lazy allocation of stacks has also 3053
425 * some virtues and the overhead of the if() is nil. 3054int
3055cctx_max_idle (int max_idle = 0)
3056 PROTOTYPE: ;$
3057 CODE:
3058 RETVAL = cctx_max_idle;
3059 if (max_idle > 1)
3060 cctx_max_idle = max_idle;
3061 OUTPUT:
3062 RETVAL
3063
3064int
3065cctx_count ()
3066 PROTOTYPE:
3067 CODE:
3068 RETVAL = cctx_count;
3069 OUTPUT:
3070 RETVAL
3071
3072int
3073cctx_idle ()
3074 PROTOTYPE:
3075 CODE:
3076 RETVAL = cctx_idle;
3077 OUTPUT:
3078 RETVAL
3079
3080void
3081list ()
3082 PROTOTYPE:
3083 PPCODE:
3084{
3085 struct coro *coro;
3086 for (coro = coro_first; coro; coro = coro->next)
3087 if (coro->hv)
3088 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
3089}
3090
3091void
3092call (Coro::State coro, SV *coderef)
3093 ALIAS:
3094 eval = 1
3095 CODE:
3096{
3097 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
426 */ 3098 {
427 if (next->mainstack) 3099 struct coro *current = SvSTATE_current;
3100
3101 if (current != coro)
428 { 3102 {
429 SAVE (prev); 3103 PUTBACK;
430 LOAD (next); 3104 save_perl (aTHX_ current);
431 /* mark this state as in-use */ 3105 load_perl (aTHX_ coro);
432 next->mainstack = 0; 3106 SPAGAIN;
433 next->tmps_ix = -2;
434 } 3107 }
435 else if (next->tmps_ix == -2) 3108
3109 PUSHSTACK;
3110
3111 PUSHMARK (SP);
3112 PUTBACK;
3113
3114 if (ix)
3115 eval_sv (coderef, 0);
3116 else
3117 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3118
3119 POPSTACK;
3120 SPAGAIN;
3121
3122 if (current != coro)
436 { 3123 {
437 croak ("tried to transfer to running coroutine");
438 }
439 else
440 {
441 SAVE (prev);
442
443 /* 3124 PUTBACK;
444 * emulate part of the perl startup here. 3125 save_perl (aTHX_ coro);
445 */ 3126 load_perl (aTHX_ current);
446 UNOP myop;
447
448 init_stacks (); /* from perl.c */
449 PL_op = (OP *)&myop;
450 /*PL_curcop = 0;*/
451 GvAV (PL_defgv) = (AV *)SvREFCNT_inc ((SV *)next->args);
452
453 SPAGAIN; 3127 SPAGAIN;
454 Zero(&myop, 1, UNOP);
455 myop.op_next = Nullop;
456 myop.op_flags = OPf_WANT_VOID;
457
458 PUSHMARK(SP);
459 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
460 PUTBACK;
461 /*
462 * the next line is slightly wrong, as PL_op->op_next
463 * is actually being executed so we skip the first op.
464 * that doesn't matter, though, since it is only
465 * pp_nextstate and we never return...
466 */
467 PL_op = Perl_pp_entersub(aTHX);
468 SPAGAIN;
469
470 ENTER;
471 } 3128 }
472 } 3129 }
3130}
3131
3132SV *
3133is_ready (Coro::State coro)
3134 PROTOTYPE: $
3135 ALIAS:
3136 is_ready = CF_READY
3137 is_running = CF_RUNNING
3138 is_new = CF_NEW
3139 is_destroyed = CF_DESTROYED
3140 is_suspended = CF_SUSPENDED
3141 CODE:
3142 RETVAL = boolSV (coro->flags & ix);
3143 OUTPUT:
3144 RETVAL
473 3145
474void 3146void
475DESTROY(coro) 3147throw (Coro::State self, SV *throw = &PL_sv_undef)
476 Coro::State coro 3148 PROTOTYPE: $;$
477 CODE: 3149 CODE:
3150{
3151 struct coro *current = SvSTATE_current;
3152 SV **throwp = self == current ? &CORO_THROW : &self->except;
3153 SvREFCNT_dec (*throwp);
3154 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
3155}
478 3156
479 if (coro->mainstack) 3157void
3158api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3159 PROTOTYPE: $;$
3160 C_ARGS: aTHX_ coro, flags
3161
3162SV *
3163has_cctx (Coro::State coro)
3164 PROTOTYPE: $
3165 CODE:
3166 /* maybe manage the running flag differently */
3167 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3168 OUTPUT:
3169 RETVAL
3170
3171int
3172is_traced (Coro::State coro)
3173 PROTOTYPE: $
3174 CODE:
3175 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3176 OUTPUT:
3177 RETVAL
3178
3179UV
3180rss (Coro::State coro)
3181 PROTOTYPE: $
3182 ALIAS:
3183 usecount = 1
3184 CODE:
3185 switch (ix)
3186 {
3187 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3188 case 1: RETVAL = coro->usecount; break;
3189 }
3190 OUTPUT:
3191 RETVAL
3192
3193void
3194force_cctx ()
3195 PROTOTYPE:
3196 CODE:
3197 cctx_current->idle_sp = 0;
3198
3199void
3200swap_defsv (Coro::State self)
3201 PROTOTYPE: $
3202 ALIAS:
3203 swap_defav = 1
3204 CODE:
3205 if (!self->slot)
3206 croak ("cannot swap state with coroutine that has no saved state,");
3207 else
480 { 3208 {
481 struct coro temp; 3209 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3210 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
482 3211
483 SAVE(aTHX_ (&temp)); 3212 SV *tmp = *src; *src = *dst; *dst = tmp;
484 LOAD(aTHX_ coro);
485
486 destroy_stacks ();
487 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
488
489 LOAD((&temp));
490 } 3213 }
491 3214
3215void
3216cancel (Coro::State self)
3217 CODE:
3218 coro_state_destroy (aTHX_ self);
3219 coro_call_on_destroy (aTHX_ self); /* actually only for Coro objects */
3220
3221
3222MODULE = Coro::State PACKAGE = Coro
3223
3224BOOT:
3225{
3226 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3227 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3228 cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD);
3229 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD);
3230 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3231 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3232 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3233 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3234 sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE);
3235
3236 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3237 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3238 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3239 cv_coro_state_new = get_cv ("Coro::State::new", 0); SvREADONLY_on (cv_coro_state_new);
3240
3241 coro_stash = gv_stashpv ("Coro", TRUE);
3242
3243 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (PRIO_MAX));
3244 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (PRIO_HIGH));
3245 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (PRIO_NORMAL));
3246 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (PRIO_LOW));
3247 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (PRIO_IDLE));
3248 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (PRIO_MIN));
3249
3250 {
3251 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3252
3253 coroapi.schedule = api_schedule;
3254 coroapi.schedule_to = api_schedule_to;
3255 coroapi.cede = api_cede;
3256 coroapi.cede_notself = api_cede_notself;
3257 coroapi.ready = api_ready;
3258 coroapi.is_ready = api_is_ready;
3259 coroapi.nready = coro_nready;
3260 coroapi.current = coro_current;
3261
3262 /*GCoroAPI = &coroapi;*/
3263 sv_setiv (sv, (IV)&coroapi);
3264 SvREADONLY_on (sv);
3265 }
3266}
3267
3268void
3269terminate (...)
3270 CODE:
3271 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3272
3273void
3274schedule (...)
3275 CODE:
3276 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3277
3278void
3279schedule_to (...)
3280 CODE:
3281 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3282
3283void
3284cede_to (...)
3285 CODE:
3286 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3287
3288void
3289cede (...)
3290 CODE:
3291 CORO_EXECUTE_SLF_XS (slf_init_cede);
3292
3293void
3294cede_notself (...)
3295 CODE:
3296 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3297
3298void
3299_set_current (SV *current)
3300 PROTOTYPE: $
3301 CODE:
3302 SvREFCNT_dec (SvRV (coro_current));
3303 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3304
3305void
3306_set_readyhook (SV *hook)
3307 PROTOTYPE: $
3308 CODE:
492 SvREFCNT_dec (coro->args); 3309 SvREFCNT_dec (coro_readyhook);
493 Safefree (coro); 3310 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
494 3311
3312int
3313prio (Coro::State coro, int newprio = 0)
3314 PROTOTYPE: $;$
3315 ALIAS:
3316 nice = 1
3317 CODE:
3318{
3319 RETVAL = coro->prio;
495 3320
3321 if (items > 1)
3322 {
3323 if (ix)
3324 newprio = coro->prio - newprio;
3325
3326 if (newprio < PRIO_MIN) newprio = PRIO_MIN;
3327 if (newprio > PRIO_MAX) newprio = PRIO_MAX;
3328
3329 coro->prio = newprio;
3330 }
3331}
3332 OUTPUT:
3333 RETVAL
3334
3335SV *
3336ready (SV *self)
3337 PROTOTYPE: $
3338 CODE:
3339 RETVAL = boolSV (api_ready (aTHX_ self));
3340 OUTPUT:
3341 RETVAL
3342
3343int
3344nready (...)
3345 PROTOTYPE:
3346 CODE:
3347 RETVAL = coro_nready;
3348 OUTPUT:
3349 RETVAL
3350
3351void
3352suspend (Coro::State self)
3353 PROTOTYPE: $
3354 CODE:
3355 self->flags |= CF_SUSPENDED;
3356
3357void
3358resume (Coro::State self)
3359 PROTOTYPE: $
3360 CODE:
3361 self->flags &= ~CF_SUSPENDED;
3362
3363void
3364_pool_handler (...)
3365 CODE:
3366 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3367
3368void
3369async_pool (SV *cv, ...)
3370 PROTOTYPE: &@
3371 PPCODE:
3372{
3373 HV *hv = (HV *)av_pop (av_async_pool);
3374 AV *av = newAV ();
3375 SV *cb = ST (0);
3376 int i;
3377
3378 av_extend (av, items - 2);
3379 for (i = 1; i < items; ++i)
3380 av_push (av, SvREFCNT_inc_NN (ST (i)));
3381
3382 if ((SV *)hv == &PL_sv_undef)
3383 {
3384 PUSHMARK (SP);
3385 EXTEND (SP, 2);
3386 PUSHs (sv_Coro);
3387 PUSHs ((SV *)cv_pool_handler);
3388 PUTBACK;
3389 call_sv ((SV *)cv_coro_state_new, G_SCALAR);
3390 SPAGAIN;
3391
3392 hv = (HV *)SvREFCNT_inc_NN (SvRV (POPs));
3393 }
3394
3395 {
3396 struct coro *coro = SvSTATE_hv (hv);
3397
3398 assert (!coro->invoke_cb);
3399 assert (!coro->invoke_av);
3400 coro->invoke_cb = SvREFCNT_inc (cb);
3401 coro->invoke_av = av;
3402 }
3403
3404 api_ready (aTHX_ (SV *)hv);
3405
3406 if (GIMME_V != G_VOID)
3407 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
3408 else
3409 SvREFCNT_dec (hv);
3410}
3411
3412SV *
3413rouse_cb ()
3414 PROTOTYPE:
3415 CODE:
3416 RETVAL = coro_new_rouse_cb (aTHX);
3417 OUTPUT:
3418 RETVAL
3419
3420void
3421rouse_wait (...)
3422 PROTOTYPE: ;$
3423 PPCODE:
3424 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
3425
3426void
3427on_enter (SV *block)
3428 ALIAS:
3429 on_leave = 1
3430 PROTOTYPE: &
3431 CODE:
3432{
3433 struct coro *coro = SvSTATE_current;
3434 AV **avp = ix ? &coro->on_leave : &coro->on_enter;
3435
3436 block = (SV *)coro_sv_2cv (aTHX_ block);
3437
3438 if (!*avp)
3439 *avp = newAV ();
3440
3441 av_push (*avp, SvREFCNT_inc (block));
3442
3443 if (!ix)
3444 on_enterleave_call (aTHX_ block);
3445
3446 LEAVE; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3447 SAVEDESTRUCTOR_X (ix ? coro_pop_on_leave : coro_pop_on_enter, (void *)coro);
3448 ENTER; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3449}
3450
3451
3452MODULE = Coro::State PACKAGE = PerlIO::cede
3453
3454BOOT:
3455 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3456
3457
3458MODULE = Coro::State PACKAGE = Coro::Semaphore
3459
3460SV *
3461new (SV *klass, SV *count = 0)
3462 CODE:
3463 RETVAL = sv_bless (
3464 coro_waitarray_new (aTHX_ count && SvOK (count) ? SvIV (count) : 1),
3465 GvSTASH (CvGV (cv))
3466 );
3467 OUTPUT:
3468 RETVAL
3469
3470# helper for Coro::Channel and others
3471SV *
3472_alloc (int count)
3473 CODE:
3474 RETVAL = coro_waitarray_new (aTHX_ count);
3475 OUTPUT:
3476 RETVAL
3477
3478SV *
3479count (SV *self)
3480 CODE:
3481 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3482 OUTPUT:
3483 RETVAL
3484
3485void
3486up (SV *self, int adjust = 1)
3487 ALIAS:
3488 adjust = 1
3489 CODE:
3490 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3491
3492void
3493down (...)
3494 CODE:
3495 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3496
3497void
3498wait (...)
3499 CODE:
3500 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3501
3502void
3503try (SV *self)
3504 PPCODE:
3505{
3506 AV *av = (AV *)SvRV (self);
3507 SV *count_sv = AvARRAY (av)[0];
3508 IV count = SvIVX (count_sv);
3509
3510 if (count > 0)
3511 {
3512 --count;
3513 SvIVX (count_sv) = count;
3514 XSRETURN_YES;
3515 }
3516 else
3517 XSRETURN_NO;
3518}
3519
3520void
3521waiters (SV *self)
3522 PPCODE:
3523{
3524 AV *av = (AV *)SvRV (self);
3525 int wcount = AvFILLp (av) + 1 - 1;
3526
3527 if (GIMME_V == G_SCALAR)
3528 XPUSHs (sv_2mortal (newSViv (wcount)));
3529 else
3530 {
3531 int i;
3532 EXTEND (SP, wcount);
3533 for (i = 1; i <= wcount; ++i)
3534 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3535 }
3536}
3537
3538MODULE = Coro::State PACKAGE = Coro::SemaphoreSet
3539
3540void
3541_may_delete (SV *sem, int count, int extra_refs)
3542 PPCODE:
3543{
3544 AV *av = (AV *)SvRV (sem);
3545
3546 if (SvREFCNT ((SV *)av) == 1 + extra_refs
3547 && AvFILLp (av) == 0 /* no waiters, just count */
3548 && SvIV (AvARRAY (av)[0]) == count)
3549 XSRETURN_YES;
3550
3551 XSRETURN_NO;
3552}
3553
3554MODULE = Coro::State PACKAGE = Coro::Signal
3555
3556SV *
3557new (SV *klass)
3558 CODE:
3559 RETVAL = sv_bless (
3560 coro_waitarray_new (aTHX_ 0),
3561 GvSTASH (CvGV (cv))
3562 );
3563 OUTPUT:
3564 RETVAL
3565
3566void
3567wait (...)
3568 CODE:
3569 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
3570
3571void
3572broadcast (SV *self)
3573 CODE:
3574{
3575 AV *av = (AV *)SvRV (self);
3576 coro_signal_wake (aTHX_ av, AvFILLp (av));
3577}
3578
3579void
3580send (SV *self)
3581 CODE:
3582{
3583 AV *av = (AV *)SvRV (self);
3584
3585 if (AvFILLp (av))
3586 coro_signal_wake (aTHX_ av, 1);
3587 else
3588 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3589}
3590
3591IV
3592awaited (SV *self)
3593 CODE:
3594 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3595 OUTPUT:
3596 RETVAL
3597
3598
3599MODULE = Coro::State PACKAGE = Coro::AnyEvent
3600
3601BOOT:
3602 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3603
3604void
3605_schedule (...)
3606 CODE:
3607{
3608 static int incede;
3609
3610 api_cede_notself (aTHX);
3611
3612 ++incede;
3613 while (coro_nready >= incede && api_cede (aTHX))
3614 ;
3615
3616 sv_setsv (sv_activity, &PL_sv_undef);
3617 if (coro_nready >= incede)
3618 {
3619 PUSHMARK (SP);
3620 PUTBACK;
3621 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3622 }
3623
3624 --incede;
3625}
3626
3627
3628MODULE = Coro::State PACKAGE = Coro::AIO
3629
3630void
3631_register (char *target, char *proto, SV *req)
3632 CODE:
3633{
3634 CV *req_cv = coro_sv_2cv (aTHX_ req);
3635 /* newXSproto doesn't return the CV on 5.8 */
3636 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3637 sv_setpv ((SV *)slf_cv, proto);
3638 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3639}
3640

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines