ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.4 by root, Tue Jul 17 02:21:56 2001 UTC vs.
Revision 1.357 by root, Sat Jun 27 14:09:28 2009 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "patchlevel.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
24#endif
25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
9#endif 35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
10 48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#define PERL_VERSION_ATLEAST(a,b,c) \
57 (PERL_REVISION > (a) \
58 || (PERL_REVISION == (a) \
59 && (PERL_VERSION > (b) \
60 || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))
61
62#if !PERL_VERSION_ATLEAST (5,6,0)
63# ifndef PL_ppaddr
64# define PL_ppaddr ppaddr
65# endif
66# ifndef call_sv
67# define call_sv perl_call_sv
68# endif
69# ifndef get_sv
70# define get_sv perl_get_sv
71# endif
72# ifndef get_cv
73# define get_cv perl_get_cv
74# endif
75# ifndef IS_PADGV
76# define IS_PADGV(v) 0
77# endif
78# ifndef IS_PADCONST
79# define IS_PADCONST(v) 0
80# endif
81#endif
82
83/* 5.11 */
84#ifndef CxHASARGS
85# define CxHASARGS(cx) (cx)->blk_sub.hasargs
86#endif
87
88/* 5.10.0 */
89#ifndef SvREFCNT_inc_NN
90# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
91#endif
92
93/* 5.8.8 */
94#ifndef GV_NOTQUAL
95# define GV_NOTQUAL 0
96#endif
97#ifndef newSV
98# define newSV(l) NEWSV(0,l)
99#endif
100#ifndef CvISXSUB_on
101# define CvISXSUB_on(cv) (void)cv
102#endif
103#ifndef CvISXSUB
104# define CvISXSUB(cv) (CvXSUB (cv) ? TRUE : FALSE)
105#endif
106#ifndef Newx
107# define Newx(ptr,nitems,type) New (0,ptr,nitems,type)
108#endif
109
110/* 5.8.7 */
111#ifndef SvRV_set
112# define SvRV_set(s,v) SvRV(s) = (v)
113#endif
114
115#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
116# undef CORO_STACKGUARD
117#endif
118
119#ifndef CORO_STACKGUARD
120# define CORO_STACKGUARD 0
121#endif
122
123/* prefer perl internal functions over our own? */
124#ifndef CORO_PREFER_PERL_FUNCTIONS
125# define CORO_PREFER_PERL_FUNCTIONS 0
126#endif
127
128/* The next macros try to return the current stack pointer, in an as
129 * portable way as possible. */
130#if __GNUC__ >= 4
131# define dSTACKLEVEL int stacklevel_dummy
132# define STACKLEVEL __builtin_frame_address (0)
133#else
134# define dSTACKLEVEL volatile void *stacklevel
135# define STACKLEVEL ((void *)&stacklevel)
136#endif
137
138#define IN_DESTRUCT PL_dirty
139
140#if __GNUC__ >= 3
141# define attribute(x) __attribute__(x)
142# define expect(expr,value) __builtin_expect ((expr), (value))
143# define INLINE static inline
144#else
145# define attribute(x)
146# define expect(expr,value) (expr)
147# define INLINE static
148#endif
149
150#define expect_false(expr) expect ((expr) != 0, 0)
151#define expect_true(expr) expect ((expr) != 0, 1)
152
153#define NOINLINE attribute ((noinline))
154
155#include "CoroAPI.h"
156#define GCoroAPI (&coroapi) /* very sneaky */
157
158#ifdef USE_ITHREADS
159# if CORO_PTHREAD
160static void *coro_thx;
161# endif
162#endif
163
164static double (*nvtime)(); /* so why doesn't it take void? */
165
166/* we hijack an hopefully unused CV flag for our purposes */
167#define CVf_SLF 0x4000
168static OP *pp_slf (pTHX);
169
170static U32 cctx_gen;
171static size_t cctx_stacksize = CORO_STACKSIZE;
172static struct CoroAPI coroapi;
173static AV *main_mainstack; /* used to differentiate between $main and others */
174static JMPENV *main_top_env;
175static HV *coro_state_stash, *coro_stash;
176static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
177
178static AV *av_destroy; /* destruction queue */
179static SV *sv_manager; /* the manager coro */
180static SV *sv_idle; /* $Coro::idle */
181
182static GV *irsgv; /* $/ */
183static GV *stdoutgv; /* *STDOUT */
184static SV *rv_diehook;
185static SV *rv_warnhook;
186static HV *hv_sig; /* %SIG */
187
188/* async_pool helper stuff */
189static SV *sv_pool_rss;
190static SV *sv_pool_size;
191static SV *sv_async_pool_idle; /* description string */
192static AV *av_async_pool; /* idle pool */
193static SV *sv_Coro; /* class string */
194static CV *cv_pool_handler;
195static CV *cv_coro_state_new;
196
197/* Coro::AnyEvent */
198static SV *sv_activity;
199
200static struct coro_cctx *cctx_first;
201static int cctx_count, cctx_idle;
202
203enum {
204 CC_MAPPED = 0x01,
205 CC_NOREUSE = 0x02, /* throw this away after tracing */
206 CC_TRACE = 0x04,
207 CC_TRACE_SUB = 0x08, /* trace sub calls */
208 CC_TRACE_LINE = 0x10, /* trace each statement */
209 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
210};
211
212/* this is a structure representing a c-level coroutine */
213typedef struct coro_cctx
214{
215 struct coro_cctx *next;
216
217 /* the stack */
218 void *sptr;
219 size_t ssize;
220
221 /* cpu state */
222 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
223 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
224 JMPENV *top_env;
225 coro_context cctx;
226
227 U32 gen;
228#if CORO_USE_VALGRIND
229 int valgrind_id;
230#endif
231 unsigned char flags;
232} coro_cctx;
233
234coro_cctx *cctx_current; /* the currently running cctx */
235
236/*****************************************************************************/
237
238enum {
239 CF_RUNNING = 0x0001, /* coroutine is running */
240 CF_READY = 0x0002, /* coroutine is ready */
241 CF_NEW = 0x0004, /* has never been switched to */
242 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
243 CF_SUSPENDED = 0x0010, /* coroutine can't be scheduled */
244};
245
246/* the structure where most of the perl state is stored, overlaid on the cxstack */
247typedef struct
248{
249 SV *defsv;
250 AV *defav;
251 SV *errsv;
252 SV *irsgv;
253 HV *hinthv;
254#define VAR(name,type) type name;
255# include "state.h"
256#undef VAR
257} perl_slots;
258
259#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
260
261/* this is a structure representing a perl-level coroutine */
11struct coro { 262struct coro {
12 U8 dowarn; 263 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 264 coro_cctx *cctx;
14 265
15 PERL_SI *curstackinfo; 266 /* ready queue */
16 AV *curstack; 267 struct coro *next_ready;
268
269 /* state data */
270 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 271 AV *mainstack;
18 SV **stack_sp; 272 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 273
41 AV *args; 274 CV *startcv; /* the CV to execute */
275 AV *args; /* data associated with this coroutine (initial args) */
276 int refcnt; /* coroutines are refcounted, yes */
277 int flags; /* CF_ flags */
278 HV *hv; /* the perl hash associated with this coro, if any */
279 void (*on_destroy)(pTHX_ struct coro *coro);
280
281 /* statistics */
282 int usecount; /* number of transfers to this coro */
283
284 /* coro process data */
285 int prio;
286 SV *except; /* exception to be thrown */
287 SV *rouse_cb;
288
289 /* async_pool */
290 SV *saved_deffh;
291 SV *invoke_cb;
292 AV *invoke_av;
293
294 /* on_enter/on_leave */
295 AV *on_enter;
296 AV *on_leave;
297
298 /* linked list */
299 struct coro *next, *prev;
42}; 300};
43 301
44typedef struct coro *Coro__State; 302typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 303typedef struct coro *Coro__State_or_hashref;
46 304
47static HV *padlist_cache; 305/* the following variables are effectively part of the perl context */
306/* and get copied between struct coro and these variables */
307/* the mainr easonw e don't support windows process emulation */
308static struct CoroSLF slf_frame; /* the current slf frame */
48 309
49/* mostly copied from op.c:cv_clone2 */ 310/** Coro ********************************************************************/
50STATIC AV * 311
51clone_padlist (AV *protopadlist) 312#define PRIO_MAX 3
313#define PRIO_HIGH 1
314#define PRIO_NORMAL 0
315#define PRIO_LOW -1
316#define PRIO_IDLE -3
317#define PRIO_MIN -4
318
319/* for Coro.pm */
320static SV *coro_current;
321static SV *coro_readyhook;
322static struct coro *coro_ready [PRIO_MAX - PRIO_MIN + 1][2]; /* head|tail */
323static CV *cv_coro_run, *cv_coro_terminate;
324static struct coro *coro_first;
325#define coro_nready coroapi.nready
326
327/** lowlevel stuff **********************************************************/
328
329static SV *
330coro_get_sv (pTHX_ const char *name, int create)
52{ 331{
53 AV *av; 332#if PERL_VERSION_ATLEAST (5,10,0)
54 I32 ix; 333 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 334 get_sv (name, create);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 335#endif
57 SV **pname = AvARRAY (protopad_name); 336 return get_sv (name, create);
58 SV **ppad = AvARRAY (protopad); 337}
59 I32 fname = AvFILLp (protopad_name); 338
60 I32 fpad = AvFILLp (protopad); 339static AV *
340coro_get_av (pTHX_ const char *name, int create)
341{
342#if PERL_VERSION_ATLEAST (5,10,0)
343 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
344 get_av (name, create);
345#endif
346 return get_av (name, create);
347}
348
349static HV *
350coro_get_hv (pTHX_ const char *name, int create)
351{
352#if PERL_VERSION_ATLEAST (5,10,0)
353 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
354 get_hv (name, create);
355#endif
356 return get_hv (name, create);
357}
358
359/* may croak */
360INLINE CV *
361coro_sv_2cv (pTHX_ SV *sv)
362{
363 HV *st;
364 GV *gvp;
365 CV *cv = sv_2cv (sv, &st, &gvp, 0);
366
367 if (!cv)
368 croak ("code reference expected");
369
370 return cv;
371}
372
373/*****************************************************************************/
374/* magic glue */
375
376#define CORO_MAGIC_type_cv 26
377#define CORO_MAGIC_type_state PERL_MAGIC_ext
378
379#define CORO_MAGIC_NN(sv, type) \
380 (expect_true (SvMAGIC (sv)->mg_type == type) \
381 ? SvMAGIC (sv) \
382 : mg_find (sv, type))
383
384#define CORO_MAGIC(sv, type) \
385 (expect_true (SvMAGIC (sv)) \
386 ? CORO_MAGIC_NN (sv, type) \
387 : 0)
388
389#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
390#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
391
392INLINE struct coro *
393SvSTATE_ (pTHX_ SV *coro)
394{
395 HV *stash;
396 MAGIC *mg;
397
398 if (SvROK (coro))
399 coro = SvRV (coro);
400
401 if (expect_false (SvTYPE (coro) != SVt_PVHV))
402 croak ("Coro::State object required");
403
404 stash = SvSTASH (coro);
405 if (expect_false (stash != coro_stash && stash != coro_state_stash))
406 {
407 /* very slow, but rare, check */
408 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
409 croak ("Coro::State object required");
410 }
411
412 mg = CORO_MAGIC_state (coro);
413 return (struct coro *)mg->mg_ptr;
414}
415
416#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
417
418/* faster than SvSTATE, but expects a coroutine hv */
419#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
420#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
421
422/*****************************************************************************/
423/* padlist management and caching */
424
425static AV *
426coro_derive_padlist (pTHX_ CV *cv)
427{
428 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 429 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 430
72 newpadlist = newAV (); 431 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 432 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 433#if PERL_VERSION_ATLEAST (5,10,0)
434 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
435#else
436 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
437#endif
438 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
439 --AvFILLp (padlist);
440
441 av_store (newpadlist, 0, SvREFCNT_inc_NN (AvARRAY (padlist)[0]));
75 av_store (newpadlist, 1, (SV *) newpad); 442 av_store (newpadlist, 1, (SV *)newpad);
76 443
77 av = newAV (); /* will be @_ */ 444 return newpadlist;
78 av_extend (av, 0); 445}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 446
82 for (ix = fpad; ix > 0; ix--) 447static void
448free_padlist (pTHX_ AV *padlist)
449{
450 /* may be during global destruction */
451 if (!IN_DESTRUCT)
83 { 452 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 453 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 454
455 while (i > 0) /* special-case index 0 */
86 { 456 {
87 char *name = SvPVX (namesv); /* XXX */ 457 /* we try to be extra-careful here */
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 458 AV *av = (AV *)AvARRAY (padlist)[i--];
89 { /* lexical from outside? */ 459 I32 j = AvFILLp (av);
90 npad[ix] = SvREFCNT_inc (ppad[ix]); 460
91 } 461 while (j >= 0)
92 else 462 SvREFCNT_dec (AvARRAY (av)[j--]);
93 { /* our own lexical */ 463
94 SV *sv; 464 AvFILLp (av) = -1;
95 if (*name == '&') 465 SvREFCNT_dec (av);
96 sv = SvREFCNT_inc (ppad[ix]);
97 else if (*name == '@')
98 sv = (SV *) newAV ();
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 }
107 } 466 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix]))
109 {
110 npad[ix] = SvREFCNT_inc (ppad[ix]);
111 }
112 else
113 {
114 SV *sv = NEWSV (0, 0);
115 SvPADTMP_on (sv);
116 npad[ix] = sv;
117 }
118 }
119 467
120#if 0 /* NONOTUNDERSTOOD */ 468 SvREFCNT_dec (AvARRAY (padlist)[0]);
121 /* Now that vars are all in place, clone nested closures. */
122 469
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif
139
140 return newpadlist;
141}
142
143STATIC AV *
144free_padlist (AV *padlist)
145{
146 /* may be during global destruction */
147 if (SvREFCNT(padlist))
148 {
149 I32 i = AvFILLp(padlist); 470 AvFILLp (padlist) = -1;
150 while (i >= 0)
151 {
152 SV **svp = av_fetch(padlist, i--, FALSE);
153 SV *sv = svp ? *svp : Nullsv;
154 if (sv)
155 SvREFCNT_dec(sv);
156 }
157
158 SvREFCNT_dec((SV*)padlist); 471 SvREFCNT_dec ((SV*)padlist);
472 }
473}
474
475static int
476coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
477{
478 AV *padlist;
479 AV *av = (AV *)mg->mg_obj;
480
481 /* casting is fun. */
482 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
483 free_padlist (aTHX_ padlist);
484
485 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
486
487 return 0;
488}
489
490static MGVTBL coro_cv_vtbl = {
491 0, 0, 0, 0,
492 coro_cv_free
493};
494
495/* the next two functions merely cache the padlists */
496static void
497get_padlist (pTHX_ CV *cv)
498{
499 MAGIC *mg = CORO_MAGIC_cv (cv);
500 AV *av;
501
502 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
503 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
504 else
505 {
506#if CORO_PREFER_PERL_FUNCTIONS
507 /* this is probably cleaner? but also slower! */
508 /* in practise, it seems to be less stable */
509 CV *cp = Perl_cv_clone (aTHX_ cv);
510 CvPADLIST (cv) = CvPADLIST (cp);
511 CvPADLIST (cp) = 0;
512 SvREFCNT_dec (cp);
513#else
514 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
515#endif
516 }
517}
518
519static void
520put_padlist (pTHX_ CV *cv)
521{
522 MAGIC *mg = CORO_MAGIC_cv (cv);
523 AV *av;
524
525 if (expect_false (!mg))
526 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
527
528 av = (AV *)mg->mg_obj;
529
530 if (expect_false (AvFILLp (av) >= AvMAX (av)))
531 av_extend (av, AvFILLp (av) + 1);
532
533 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
534}
535
536/** load & save, init *******************************************************/
537
538static void
539on_enterleave_call (pTHX_ SV *cb);
540
541static void
542load_perl (pTHX_ Coro__State c)
543{
544 perl_slots *slot = c->slot;
545 c->slot = 0;
546
547 PL_mainstack = c->mainstack;
548
549 GvSV (PL_defgv) = slot->defsv;
550 GvAV (PL_defgv) = slot->defav;
551 GvSV (PL_errgv) = slot->errsv;
552 GvSV (irsgv) = slot->irsgv;
553 GvHV (PL_hintgv) = slot->hinthv;
554
555 #define VAR(name,type) PL_ ## name = slot->name;
556 # include "state.h"
557 #undef VAR
558
559 {
560 dSP;
561
562 CV *cv;
563
564 /* now do the ugly restore mess */
565 while (expect_true (cv = (CV *)POPs))
566 {
567 put_padlist (aTHX_ cv); /* mark this padlist as available */
568 CvDEPTH (cv) = PTR2IV (POPs);
569 CvPADLIST (cv) = (AV *)POPs;
570 }
571
572 PUTBACK;
159 } 573 }
160}
161 574
162/* the next tow functions merely cache the padlists */ 575 slf_frame = c->slf_frame;
163STATIC void 576 CORO_THROW = c->except;
164get_padlist (CV *cv)
165{
166 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 0);
167 577
168 if (he && AvFILLp ((AV *)*he) >= 0) 578 if (expect_false (c->on_enter))
169 CvPADLIST (cv) = (AV *)av_pop ((AV *)*he);
170 else
171 CvPADLIST (cv) = clone_padlist (CvPADLIST (cv));
172}
173
174STATIC void
175put_padlist (CV *cv)
176{
177 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 1);
178
179 if (SvTYPE (*he) != SVt_PVAV)
180 {
181 SvREFCNT_dec (*he);
182 *he = (SV *)newAV ();
183 } 579 {
580 int i;
184 581
185 av_push ((AV *)*he, (SV *)CvPADLIST (cv)); 582 for (i = 0; i <= AvFILLp (c->on_enter); ++i)
583 on_enterleave_call (aTHX_ AvARRAY (c->on_enter)[i]);
584 }
186} 585}
187 586
188static void 587static void
189SAVE(pTHX_ Coro__State c) 588save_perl (pTHX_ Coro__State c)
190{ 589{
590 if (expect_false (c->on_leave))
591 {
592 int i;
593
594 for (i = AvFILLp (c->on_leave); i >= 0; --i)
595 on_enterleave_call (aTHX_ AvARRAY (c->on_leave)[i]);
596 }
597
598 c->except = CORO_THROW;
599 c->slf_frame = slf_frame;
600
191 { 601 {
192 dSP; 602 dSP;
193 I32 cxix = cxstack_ix; 603 I32 cxix = cxstack_ix;
604 PERL_CONTEXT *ccstk = cxstack;
194 PERL_SI *top_si = PL_curstackinfo; 605 PERL_SI *top_si = PL_curstackinfo;
195 PERL_CONTEXT *ccstk = cxstack;
196 606
197 /* 607 /*
198 * the worst thing you can imagine happens first - we have to save 608 * the worst thing you can imagine happens first - we have to save
199 * (and reinitialize) all cv's in the whole callchain :( 609 * (and reinitialize) all cv's in the whole callchain :(
200 */ 610 */
201 611
202 PUSHs (Nullsv); 612 XPUSHs (Nullsv);
203 /* this loop was inspired by pp_caller */ 613 /* this loop was inspired by pp_caller */
204 for (;;) 614 for (;;)
205 { 615 {
206 while (cxix >= 0) 616 while (expect_true (cxix >= 0))
207 { 617 {
208 PERL_CONTEXT *cx = &ccstk[cxix--]; 618 PERL_CONTEXT *cx = &ccstk[cxix--];
209 619
210 if (CxTYPE(cx) == CXt_SUB) 620 if (expect_true (CxTYPE (cx) == CXt_SUB) || expect_false (CxTYPE (cx) == CXt_FORMAT))
211 { 621 {
212 CV *cv = cx->blk_sub.cv; 622 CV *cv = cx->blk_sub.cv;
623
213 if (CvDEPTH(cv)) 624 if (expect_true (CvDEPTH (cv)))
214 { 625 {
215#ifdef USE_THREADS
216 XPUSHs ((SV *)CvOWNER(cv));
217#endif
218 EXTEND (SP, 3); 626 EXTEND (SP, 3);
219 PUSHs ((SV *)CvDEPTH(cv));
220 PUSHs ((SV *)CvPADLIST(cv)); 627 PUSHs ((SV *)CvPADLIST (cv));
628 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
221 PUSHs ((SV *)cv); 629 PUSHs ((SV *)cv);
222 630
223 get_padlist (cv);
224
225 CvDEPTH(cv) = 0; 631 CvDEPTH (cv) = 0;
226#ifdef USE_THREADS 632 get_padlist (aTHX_ cv);
227 CvOWNER(cv) = 0;
228 error must unlock this cv etc.. etc...
229 if you are here wondering about this error message then
230 the reason is that it will not work as advertised yet
231#endif
232 } 633 }
233 } 634 }
234 else if (CxTYPE(cx) == CXt_FORMAT) 635 }
636
637 if (expect_true (top_si->si_type == PERLSI_MAIN))
638 break;
639
640 top_si = top_si->si_prev;
641 ccstk = top_si->si_cxstack;
642 cxix = top_si->si_cxix;
643 }
644
645 PUTBACK;
646 }
647
648 /* allocate some space on the context stack for our purposes */
649 /* we manually unroll here, as usually 2 slots is enough */
650 if (SLOT_COUNT >= 1) CXINC;
651 if (SLOT_COUNT >= 2) CXINC;
652 if (SLOT_COUNT >= 3) CXINC;
653 {
654 int i;
655 for (i = 3; i < SLOT_COUNT; ++i)
656 CXINC;
657 }
658 cxstack_ix -= SLOT_COUNT; /* undo allocation */
659
660 c->mainstack = PL_mainstack;
661
662 {
663 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
664
665 slot->defav = GvAV (PL_defgv);
666 slot->defsv = DEFSV;
667 slot->errsv = ERRSV;
668 slot->irsgv = GvSV (irsgv);
669 slot->hinthv = GvHV (PL_hintgv);
670
671 #define VAR(name,type) slot->name = PL_ ## name;
672 # include "state.h"
673 #undef VAR
674 }
675}
676
677/*
678 * allocate various perl stacks. This is almost an exact copy
679 * of perl.c:init_stacks, except that it uses less memory
680 * on the (sometimes correct) assumption that coroutines do
681 * not usually need a lot of stackspace.
682 */
683#if CORO_PREFER_PERL_FUNCTIONS
684# define coro_init_stacks(thx) init_stacks ()
685#else
686static void
687coro_init_stacks (pTHX)
688{
689 PL_curstackinfo = new_stackinfo(32, 8);
690 PL_curstackinfo->si_type = PERLSI_MAIN;
691 PL_curstack = PL_curstackinfo->si_stack;
692 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
693
694 PL_stack_base = AvARRAY(PL_curstack);
695 PL_stack_sp = PL_stack_base;
696 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
697
698 New(50,PL_tmps_stack,32,SV*);
699 PL_tmps_floor = -1;
700 PL_tmps_ix = -1;
701 PL_tmps_max = 32;
702
703 New(54,PL_markstack,16,I32);
704 PL_markstack_ptr = PL_markstack;
705 PL_markstack_max = PL_markstack + 16;
706
707#ifdef SET_MARK_OFFSET
708 SET_MARK_OFFSET;
709#endif
710
711 New(54,PL_scopestack,8,I32);
712 PL_scopestack_ix = 0;
713 PL_scopestack_max = 8;
714
715 New(54,PL_savestack,24,ANY);
716 PL_savestack_ix = 0;
717 PL_savestack_max = 24;
718
719#if !PERL_VERSION_ATLEAST (5,10,0)
720 New(54,PL_retstack,4,OP*);
721 PL_retstack_ix = 0;
722 PL_retstack_max = 4;
723#endif
724}
725#endif
726
727/*
728 * destroy the stacks, the callchain etc...
729 */
730static void
731coro_destruct_stacks (pTHX)
732{
733 while (PL_curstackinfo->si_next)
734 PL_curstackinfo = PL_curstackinfo->si_next;
735
736 while (PL_curstackinfo)
737 {
738 PERL_SI *p = PL_curstackinfo->si_prev;
739
740 if (!IN_DESTRUCT)
741 SvREFCNT_dec (PL_curstackinfo->si_stack);
742
743 Safefree (PL_curstackinfo->si_cxstack);
744 Safefree (PL_curstackinfo);
745 PL_curstackinfo = p;
746 }
747
748 Safefree (PL_tmps_stack);
749 Safefree (PL_markstack);
750 Safefree (PL_scopestack);
751 Safefree (PL_savestack);
752#if !PERL_VERSION_ATLEAST (5,10,0)
753 Safefree (PL_retstack);
754#endif
755}
756
757#define CORO_RSS \
758 rss += sizeof (SYM (curstackinfo)); \
759 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
760 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
761 rss += SYM (tmps_max) * sizeof (SV *); \
762 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
763 rss += SYM (scopestack_max) * sizeof (I32); \
764 rss += SYM (savestack_max) * sizeof (ANY);
765
766static size_t
767coro_rss (pTHX_ struct coro *coro)
768{
769 size_t rss = sizeof (*coro);
770
771 if (coro->mainstack)
772 {
773 if (coro->flags & CF_RUNNING)
774 {
775 #define SYM(sym) PL_ ## sym
776 CORO_RSS;
777 #undef SYM
778 }
779 else
780 {
781 #define SYM(sym) coro->slot->sym
782 CORO_RSS;
783 #undef SYM
784 }
785 }
786
787 return rss;
788}
789
790/** coroutine stack handling ************************************************/
791
792static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
793static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
794static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
795
796/* apparently < 5.8.8 */
797#ifndef MgPV_nolen_const
798#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
799 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
800 (const char*)(mg)->mg_ptr)
801#endif
802
803/*
804 * This overrides the default magic get method of %SIG elements.
805 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
806 * and instead of trying to save and restore the hash elements, we just provide
807 * readback here.
808 */
809static int
810coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
811{
812 const char *s = MgPV_nolen_const (mg);
813
814 if (*s == '_')
815 {
816 SV **svp = 0;
817
818 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
819 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
820
821 if (svp)
822 {
823 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
824 return 0;
825 }
826 }
827
828 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
829}
830
831static int
832coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
833{
834 const char *s = MgPV_nolen_const (mg);
835
836 if (*s == '_')
837 {
838 SV **svp = 0;
839
840 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
841 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
842
843 if (svp)
844 {
845 SV *old = *svp;
846 *svp = 0;
847 SvREFCNT_dec (old);
848 return 0;
849 }
850 }
851
852 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
853}
854
855static int
856coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
857{
858 const char *s = MgPV_nolen_const (mg);
859
860 if (*s == '_')
861 {
862 SV **svp = 0;
863
864 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
865 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
866
867 if (svp)
868 {
869 SV *old = *svp;
870 *svp = SvOK (sv) ? newSVsv (sv) : 0;
871 SvREFCNT_dec (old);
872 return 0;
873 }
874 }
875
876 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
877}
878
879static void
880prepare_nop (pTHX_ struct coro_transfer_args *ta)
881{
882 /* kind of mega-hacky, but works */
883 ta->next = ta->prev = (struct coro *)ta;
884}
885
886static int
887slf_check_nop (pTHX_ struct CoroSLF *frame)
888{
889 return 0;
890}
891
892static int
893slf_check_repeat (pTHX_ struct CoroSLF *frame)
894{
895 return 1;
896}
897
898static UNOP coro_setup_op;
899
900static void NOINLINE /* noinline to keep it out of the transfer fast path */
901coro_setup (pTHX_ struct coro *coro)
902{
903 /*
904 * emulate part of the perl startup here.
905 */
906 coro_init_stacks (aTHX);
907
908 PL_runops = RUNOPS_DEFAULT;
909 PL_curcop = &PL_compiling;
910 PL_in_eval = EVAL_NULL;
911 PL_comppad = 0;
912 PL_comppad_name = 0;
913 PL_comppad_name_fill = 0;
914 PL_comppad_name_floor = 0;
915 PL_curpm = 0;
916 PL_curpad = 0;
917 PL_localizing = 0;
918 PL_dirty = 0;
919 PL_restartop = 0;
920#if PERL_VERSION_ATLEAST (5,10,0)
921 PL_parser = 0;
922#endif
923 PL_hints = 0;
924
925 /* recreate the die/warn hooks */
926 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
927 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
928
929 GvSV (PL_defgv) = newSV (0);
930 GvAV (PL_defgv) = coro->args; coro->args = 0;
931 GvSV (PL_errgv) = newSV (0);
932 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
933 GvHV (PL_hintgv) = 0;
934 PL_rs = newSVsv (GvSV (irsgv));
935 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
936
937 {
938 dSP;
939 UNOP myop;
940
941 Zero (&myop, 1, UNOP);
942 myop.op_next = Nullop;
943 myop.op_type = OP_ENTERSUB;
944 myop.op_flags = OPf_WANT_VOID;
945
946 PUSHMARK (SP);
947 PUSHs ((SV *)coro->startcv);
948 PUTBACK;
949 PL_op = (OP *)&myop;
950 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
951 }
952
953 /* this newly created coroutine might be run on an existing cctx which most
954 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
955 */
956 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
957 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
958
959 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
960 coro_setup_op.op_next = PL_op;
961 coro_setup_op.op_type = OP_ENTERSUB;
962 coro_setup_op.op_ppaddr = pp_slf;
963 /* no flags etc. required, as an init function won't be called */
964
965 PL_op = (OP *)&coro_setup_op;
966
967 /* copy throw, in case it was set before coro_setup */
968 CORO_THROW = coro->except;
969}
970
971static void
972coro_unwind_stacks (pTHX)
973{
974 if (!IN_DESTRUCT)
975 {
976 /* restore all saved variables and stuff */
977 LEAVE_SCOPE (0);
978 assert (PL_tmps_floor == -1);
979
980 /* free all temporaries */
981 FREETMPS;
982 assert (PL_tmps_ix == -1);
983
984 /* unwind all extra stacks */
985 POPSTACK_TO (PL_mainstack);
986
987 /* unwind main stack */
988 dounwind (-1);
989 }
990}
991
992static void
993coro_destruct_perl (pTHX_ struct coro *coro)
994{
995 SV *svf [9];
996
997 {
998 struct coro *current = SvSTATE_current;
999
1000 assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack));
1001
1002 save_perl (aTHX_ current);
1003 load_perl (aTHX_ coro);
1004
1005 coro_unwind_stacks (aTHX);
1006 coro_destruct_stacks (aTHX);
1007
1008 // now save some sv's to be free'd later
1009 svf [0] = GvSV (PL_defgv);
1010 svf [1] = (SV *)GvAV (PL_defgv);
1011 svf [2] = GvSV (PL_errgv);
1012 svf [3] = (SV *)PL_defoutgv;
1013 svf [4] = PL_rs;
1014 svf [5] = GvSV (irsgv);
1015 svf [6] = (SV *)GvHV (PL_hintgv);
1016 svf [7] = PL_diehook;
1017 svf [8] = PL_warnhook;
1018 assert (9 == sizeof (svf) / sizeof (*svf));
1019
1020 load_perl (aTHX_ current);
1021 }
1022
1023 {
1024 int i;
1025
1026 for (i = 0; i < sizeof (svf) / sizeof (*svf); ++i)
1027 SvREFCNT_dec (svf [i]);
1028
1029 SvREFCNT_dec (coro->saved_deffh);
1030 SvREFCNT_dec (coro->rouse_cb);
1031 SvREFCNT_dec (coro->invoke_cb);
1032 SvREFCNT_dec (coro->invoke_av);
1033 }
1034}
1035
1036INLINE void
1037free_coro_mortal (pTHX)
1038{
1039 if (expect_true (coro_mortal))
1040 {
1041 SvREFCNT_dec (coro_mortal);
1042 coro_mortal = 0;
1043 }
1044}
1045
1046static int
1047runops_trace (pTHX)
1048{
1049 COP *oldcop = 0;
1050 int oldcxix = -2;
1051
1052 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
1053 {
1054 PERL_ASYNC_CHECK ();
1055
1056 if (cctx_current->flags & CC_TRACE_ALL)
1057 {
1058 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
1059 {
1060 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1061 SV **bot, **top;
1062 AV *av = newAV (); /* return values */
1063 SV **cb;
1064 dSP;
1065
1066 GV *gv = CvGV (cx->blk_sub.cv);
1067 SV *fullname = sv_2mortal (newSV (0));
1068 if (isGV (gv))
1069 gv_efullname3 (fullname, gv, 0);
1070
1071 bot = PL_stack_base + cx->blk_oldsp + 1;
1072 top = cx->blk_gimme == G_ARRAY ? SP + 1
1073 : cx->blk_gimme == G_SCALAR ? bot + 1
1074 : bot;
1075
1076 av_extend (av, top - bot);
1077 while (bot < top)
1078 av_push (av, SvREFCNT_inc_NN (*bot++));
1079
1080 PL_runops = RUNOPS_DEFAULT;
1081 ENTER;
1082 SAVETMPS;
1083 EXTEND (SP, 3);
1084 PUSHMARK (SP);
1085 PUSHs (&PL_sv_no);
1086 PUSHs (fullname);
1087 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1088 PUTBACK;
1089 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1090 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1091 SPAGAIN;
1092 FREETMPS;
1093 LEAVE;
1094 PL_runops = runops_trace;
1095 }
1096
1097 if (oldcop != PL_curcop)
1098 {
1099 oldcop = PL_curcop;
1100
1101 if (PL_curcop != &PL_compiling)
1102 {
1103 SV **cb;
1104
1105 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1106 {
1107 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1108
1109 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1110 {
1111 dSP;
1112 GV *gv = CvGV (cx->blk_sub.cv);
1113 SV *fullname = sv_2mortal (newSV (0));
1114
1115 if (isGV (gv))
1116 gv_efullname3 (fullname, gv, 0);
1117
1118 PL_runops = RUNOPS_DEFAULT;
1119 ENTER;
1120 SAVETMPS;
1121 EXTEND (SP, 3);
1122 PUSHMARK (SP);
1123 PUSHs (&PL_sv_yes);
1124 PUSHs (fullname);
1125 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1126 PUTBACK;
1127 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1128 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1129 SPAGAIN;
1130 FREETMPS;
1131 LEAVE;
1132 PL_runops = runops_trace;
1133 }
1134
1135 oldcxix = cxstack_ix;
1136 }
1137
1138 if (cctx_current->flags & CC_TRACE_LINE)
1139 {
1140 dSP;
1141
1142 PL_runops = RUNOPS_DEFAULT;
1143 ENTER;
1144 SAVETMPS;
1145 EXTEND (SP, 3);
1146 PL_runops = RUNOPS_DEFAULT;
1147 PUSHMARK (SP);
1148 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1149 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1150 PUTBACK;
1151 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1152 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1153 SPAGAIN;
1154 FREETMPS;
1155 LEAVE;
1156 PL_runops = runops_trace;
1157 }
1158 }
1159 }
1160 }
1161 }
1162
1163 TAINT_NOT;
1164 return 0;
1165}
1166
1167static struct CoroSLF cctx_ssl_frame;
1168
1169static void
1170slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1171{
1172 ta->prev = 0;
1173}
1174
1175static int
1176slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1177{
1178 *frame = cctx_ssl_frame;
1179
1180 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1181}
1182
1183/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1184static void NOINLINE
1185cctx_prepare (pTHX)
1186{
1187 PL_top_env = &PL_start_env;
1188
1189 if (cctx_current->flags & CC_TRACE)
1190 PL_runops = runops_trace;
1191
1192 /* we already must be executing an SLF op, there is no other valid way
1193 * that can lead to creation of a new cctx */
1194 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1195 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1196
1197 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1198 cctx_ssl_frame = slf_frame;
1199
1200 slf_frame.prepare = slf_prepare_set_stacklevel;
1201 slf_frame.check = slf_check_set_stacklevel;
1202}
1203
1204/* the tail of transfer: execute stuff we can only do after a transfer */
1205INLINE void
1206transfer_tail (pTHX)
1207{
1208 free_coro_mortal (aTHX);
1209}
1210
1211/*
1212 * this is a _very_ stripped down perl interpreter ;)
1213 */
1214static void
1215cctx_run (void *arg)
1216{
1217#ifdef USE_ITHREADS
1218# if CORO_PTHREAD
1219 PERL_SET_CONTEXT (coro_thx);
1220# endif
1221#endif
1222 {
1223 dTHX;
1224
1225 /* normally we would need to skip the entersub here */
1226 /* not doing so will re-execute it, which is exactly what we want */
1227 /* PL_nop = PL_nop->op_next */
1228
1229 /* inject a fake subroutine call to cctx_init */
1230 cctx_prepare (aTHX);
1231
1232 /* cctx_run is the alternative tail of transfer() */
1233 transfer_tail (aTHX);
1234
1235 /* somebody or something will hit me for both perl_run and PL_restartop */
1236 PL_restartop = PL_op;
1237 perl_run (PL_curinterp);
1238 /*
1239 * Unfortunately, there is no way to get at the return values of the
1240 * coro body here, as perl_run destroys these
1241 */
1242
1243 /*
1244 * If perl-run returns we assume exit() was being called or the coro
1245 * fell off the end, which seems to be the only valid (non-bug)
1246 * reason for perl_run to return. We try to exit by jumping to the
1247 * bootstrap-time "top" top_env, as we cannot restore the "main"
1248 * coroutine as Coro has no such concept.
1249 * This actually isn't valid with the pthread backend, but OSes requiring
1250 * that backend are too broken to do it in a standards-compliant way.
1251 */
1252 PL_top_env = main_top_env;
1253 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1254 }
1255}
1256
1257static coro_cctx *
1258cctx_new ()
1259{
1260 coro_cctx *cctx;
1261
1262 ++cctx_count;
1263 New (0, cctx, 1, coro_cctx);
1264
1265 cctx->gen = cctx_gen;
1266 cctx->flags = 0;
1267 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1268
1269 return cctx;
1270}
1271
1272/* create a new cctx only suitable as source */
1273static coro_cctx *
1274cctx_new_empty ()
1275{
1276 coro_cctx *cctx = cctx_new ();
1277
1278 cctx->sptr = 0;
1279 coro_create (&cctx->cctx, 0, 0, 0, 0);
1280
1281 return cctx;
1282}
1283
1284/* create a new cctx suitable as destination/running a perl interpreter */
1285static coro_cctx *
1286cctx_new_run ()
1287{
1288 coro_cctx *cctx = cctx_new ();
1289 void *stack_start;
1290 size_t stack_size;
1291
1292#if HAVE_MMAP
1293 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1294 /* mmap supposedly does allocate-on-write for us */
1295 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1296
1297 if (cctx->sptr != (void *)-1)
1298 {
1299 #if CORO_STACKGUARD
1300 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1301 #endif
1302 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1303 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1304 cctx->flags |= CC_MAPPED;
1305 }
1306 else
1307#endif
1308 {
1309 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1310 New (0, cctx->sptr, cctx_stacksize, long);
1311
1312 if (!cctx->sptr)
1313 {
1314 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1315 _exit (EXIT_FAILURE);
1316 }
1317
1318 stack_start = cctx->sptr;
1319 stack_size = cctx->ssize;
1320 }
1321
1322 #if CORO_USE_VALGRIND
1323 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1324 #endif
1325
1326 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1327
1328 return cctx;
1329}
1330
1331static void
1332cctx_destroy (coro_cctx *cctx)
1333{
1334 if (!cctx)
1335 return;
1336
1337 assert (("FATAL: tried to destroy current cctx", cctx != cctx_current));//D temporary?
1338
1339 --cctx_count;
1340 coro_destroy (&cctx->cctx);
1341
1342 /* coro_transfer creates new, empty cctx's */
1343 if (cctx->sptr)
1344 {
1345 #if CORO_USE_VALGRIND
1346 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1347 #endif
1348
1349#if HAVE_MMAP
1350 if (cctx->flags & CC_MAPPED)
1351 munmap (cctx->sptr, cctx->ssize);
1352 else
1353#endif
1354 Safefree (cctx->sptr);
1355 }
1356
1357 Safefree (cctx);
1358}
1359
1360/* wether this cctx should be destructed */
1361#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1362
1363static coro_cctx *
1364cctx_get (pTHX)
1365{
1366 while (expect_true (cctx_first))
1367 {
1368 coro_cctx *cctx = cctx_first;
1369 cctx_first = cctx->next;
1370 --cctx_idle;
1371
1372 if (expect_true (!CCTX_EXPIRED (cctx)))
1373 return cctx;
1374
1375 cctx_destroy (cctx);
1376 }
1377
1378 return cctx_new_run ();
1379}
1380
1381static void
1382cctx_put (coro_cctx *cctx)
1383{
1384 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1385
1386 /* free another cctx if overlimit */
1387 if (expect_false (cctx_idle >= cctx_max_idle))
1388 {
1389 coro_cctx *first = cctx_first;
1390 cctx_first = first->next;
1391 --cctx_idle;
1392
1393 cctx_destroy (first);
1394 }
1395
1396 ++cctx_idle;
1397 cctx->next = cctx_first;
1398 cctx_first = cctx;
1399}
1400
1401/** coroutine switching *****************************************************/
1402
1403static void
1404transfer_check (pTHX_ struct coro *prev, struct coro *next)
1405{
1406 /* TODO: throwing up here is considered harmful */
1407
1408 if (expect_true (prev != next))
1409 {
1410 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1411 croak ("Coro::State::transfer called with a blocked prev Coro::State, but can only transfer from running or new states,");
1412
1413 if (expect_false (next->flags & (CF_RUNNING | CF_DESTROYED | CF_SUSPENDED)))
1414 croak ("Coro::State::transfer called with running, destroyed or suspended next Coro::State, but can only transfer to inactive states,");
1415
1416#if !PERL_VERSION_ATLEAST (5,10,0)
1417 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1418 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1419#endif
1420 }
1421}
1422
1423/* always use the TRANSFER macro */
1424static void NOINLINE /* noinline so we have a fixed stackframe */
1425transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1426{
1427 dSTACKLEVEL;
1428
1429 /* sometimes transfer is only called to set idle_sp */
1430 if (expect_false (!prev))
1431 {
1432 cctx_current->idle_sp = STACKLEVEL;
1433 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1434 }
1435 else if (expect_true (prev != next))
1436 {
1437 coro_cctx *cctx_prev;
1438
1439 if (expect_false (prev->flags & CF_NEW))
1440 {
1441 /* create a new empty/source context */
1442 prev->flags &= ~CF_NEW;
1443 prev->flags |= CF_RUNNING;
1444 }
1445
1446 prev->flags &= ~CF_RUNNING;
1447 next->flags |= CF_RUNNING;
1448
1449 /* first get rid of the old state */
1450 save_perl (aTHX_ prev);
1451
1452 if (expect_false (next->flags & CF_NEW))
1453 {
1454 /* need to start coroutine */
1455 next->flags &= ~CF_NEW;
1456 /* setup coroutine call */
1457 coro_setup (aTHX_ next);
1458 }
1459 else
1460 load_perl (aTHX_ next);
1461
1462 /* possibly untie and reuse the cctx */
1463 if (expect_true (
1464 cctx_current->idle_sp == STACKLEVEL
1465 && !(cctx_current->flags & CC_TRACE)
1466 && !force_cctx
1467 ))
1468 {
1469 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1470 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1471
1472 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get. */
1473 /* without this the next cctx_get might destroy the running cctx while still in use */
1474 if (expect_false (CCTX_EXPIRED (cctx_current)))
1475 if (expect_true (!next->cctx))
1476 next->cctx = cctx_get (aTHX);
1477
1478 cctx_put (cctx_current);
1479 }
1480 else
1481 prev->cctx = cctx_current;
1482
1483 ++next->usecount;
1484
1485 cctx_prev = cctx_current;
1486 cctx_current = expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1487
1488 next->cctx = 0;
1489
1490 if (expect_false (cctx_prev != cctx_current))
1491 {
1492 cctx_prev->top_env = PL_top_env;
1493 PL_top_env = cctx_current->top_env;
1494 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1495 }
1496
1497 transfer_tail (aTHX);
1498 }
1499}
1500
1501#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1502#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1503
1504/** high level stuff ********************************************************/
1505
1506static int
1507coro_state_destroy (pTHX_ struct coro *coro)
1508{
1509 if (coro->flags & CF_DESTROYED)
1510 return 0;
1511
1512 if (coro->on_destroy && !PL_dirty)
1513 coro->on_destroy (aTHX_ coro);
1514
1515 coro->flags |= CF_DESTROYED;
1516
1517 if (coro->flags & CF_READY)
1518 {
1519 /* reduce nready, as destroying a ready coro effectively unreadies it */
1520 /* alternative: look through all ready queues and remove the coro */
1521 --coro_nready;
1522 }
1523 else
1524 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1525
1526 if (coro->mainstack
1527 && coro->mainstack != main_mainstack
1528 && coro->slot
1529 && !PL_dirty)
1530 coro_destruct_perl (aTHX_ coro);
1531
1532 cctx_destroy (coro->cctx);
1533 SvREFCNT_dec (coro->startcv);
1534 SvREFCNT_dec (coro->args);
1535 SvREFCNT_dec (CORO_THROW);
1536
1537 if (coro->next) coro->next->prev = coro->prev;
1538 if (coro->prev) coro->prev->next = coro->next;
1539 if (coro == coro_first) coro_first = coro->next;
1540
1541 return 1;
1542}
1543
1544static int
1545coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1546{
1547 struct coro *coro = (struct coro *)mg->mg_ptr;
1548 mg->mg_ptr = 0;
1549
1550 coro->hv = 0;
1551
1552 if (--coro->refcnt < 0)
1553 {
1554 coro_state_destroy (aTHX_ coro);
1555 Safefree (coro);
1556 }
1557
1558 return 0;
1559}
1560
1561static int
1562coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1563{
1564 struct coro *coro = (struct coro *)mg->mg_ptr;
1565
1566 ++coro->refcnt;
1567
1568 return 0;
1569}
1570
1571static MGVTBL coro_state_vtbl = {
1572 0, 0, 0, 0,
1573 coro_state_free,
1574 0,
1575#ifdef MGf_DUP
1576 coro_state_dup,
1577#else
1578# define MGf_DUP 0
1579#endif
1580};
1581
1582static void
1583prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1584{
1585 ta->prev = SvSTATE (prev_sv);
1586 ta->next = SvSTATE (next_sv);
1587 TRANSFER_CHECK (*ta);
1588}
1589
1590static void
1591api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1592{
1593 struct coro_transfer_args ta;
1594
1595 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1596 TRANSFER (ta, 1);
1597}
1598
1599/*****************************************************************************/
1600/* gensub: simple closure generation utility */
1601
1602#define GENSUB_ARG CvXSUBANY (cv).any_ptr
1603
1604/* create a closure from XS, returns a code reference */
1605/* the arg can be accessed via GENSUB_ARG from the callback */
1606/* the callback must use dXSARGS/XSRETURN */
1607static SV *
1608gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg)
1609{
1610 CV *cv = (CV *)newSV (0);
1611
1612 sv_upgrade ((SV *)cv, SVt_PVCV);
1613
1614 CvANON_on (cv);
1615 CvISXSUB_on (cv);
1616 CvXSUB (cv) = xsub;
1617 GENSUB_ARG = arg;
1618
1619 return newRV_noinc ((SV *)cv);
1620}
1621
1622/** Coro ********************************************************************/
1623
1624INLINE void
1625coro_enq (pTHX_ struct coro *coro)
1626{
1627 struct coro **ready = coro_ready [coro->prio - PRIO_MIN];
1628
1629 SvREFCNT_inc_NN (coro->hv);
1630
1631 coro->next_ready = 0;
1632 *(ready [0] ? &ready [1]->next_ready : &ready [0]) = coro;
1633 ready [1] = coro;
1634}
1635
1636INLINE struct coro *
1637coro_deq (pTHX)
1638{
1639 int prio;
1640
1641 for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= 0; )
1642 {
1643 struct coro **ready = coro_ready [prio];
1644
1645 if (ready [0])
1646 {
1647 struct coro *coro = ready [0];
1648 ready [0] = coro->next_ready;
1649 return coro;
1650 }
1651 }
1652
1653 return 0;
1654}
1655
1656static int
1657api_ready (pTHX_ SV *coro_sv)
1658{
1659 struct coro *coro;
1660 SV *sv_hook;
1661 void (*xs_hook)(void);
1662
1663 coro = SvSTATE (coro_sv);
1664
1665 if (coro->flags & CF_READY)
1666 return 0;
1667
1668 coro->flags |= CF_READY;
1669
1670 sv_hook = coro_nready ? 0 : coro_readyhook;
1671 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1672
1673 coro_enq (aTHX_ coro);
1674 ++coro_nready;
1675
1676 if (sv_hook)
1677 {
1678 dSP;
1679
1680 ENTER;
1681 SAVETMPS;
1682
1683 PUSHMARK (SP);
1684 PUTBACK;
1685 call_sv (sv_hook, G_VOID | G_DISCARD);
1686
1687 FREETMPS;
1688 LEAVE;
1689 }
1690
1691 if (xs_hook)
1692 xs_hook ();
1693
1694 return 1;
1695}
1696
1697static int
1698api_is_ready (pTHX_ SV *coro_sv)
1699{
1700 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1701}
1702
1703/* expects to own a reference to next->hv */
1704INLINE void
1705prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1706{
1707 SV *prev_sv = SvRV (coro_current);
1708
1709 ta->prev = SvSTATE_hv (prev_sv);
1710 ta->next = next;
1711
1712 TRANSFER_CHECK (*ta);
1713
1714 SvRV_set (coro_current, (SV *)next->hv);
1715
1716 free_coro_mortal (aTHX);
1717 coro_mortal = prev_sv;
1718}
1719
1720static void
1721prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1722{
1723 for (;;)
1724 {
1725 struct coro *next = coro_deq (aTHX);
1726
1727 if (expect_true (next))
1728 {
1729 /* cannot transfer to destroyed coros, skip and look for next */
1730 if (expect_false (next->flags & (CF_DESTROYED | CF_SUSPENDED)))
1731 SvREFCNT_dec (next->hv); /* coro_nready has already been taken care of by destroy */
1732 else
1733 {
1734 next->flags &= ~CF_READY;
1735 --coro_nready;
1736
1737 prepare_schedule_to (aTHX_ ta, next);
1738 break;
1739 }
1740 }
1741 else
1742 {
1743 /* nothing to schedule: call the idle handler */
1744 if (SvROK (sv_idle)
1745 && SvOBJECT (SvRV (sv_idle)))
1746 {
1747 ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */
1748 api_ready (aTHX_ SvRV (sv_idle));
1749 --coro_nready;
1750 }
1751 else
1752 {
1753 dSP;
1754
1755 ENTER;
1756 SAVETMPS;
1757
1758 PUSHMARK (SP);
1759 PUTBACK;
1760 call_sv (sv_idle, G_VOID | G_DISCARD);
1761
1762 FREETMPS;
1763 LEAVE;
1764 }
1765 }
1766 }
1767}
1768
1769INLINE void
1770prepare_cede (pTHX_ struct coro_transfer_args *ta)
1771{
1772 api_ready (aTHX_ coro_current);
1773 prepare_schedule (aTHX_ ta);
1774}
1775
1776INLINE void
1777prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1778{
1779 SV *prev = SvRV (coro_current);
1780
1781 if (coro_nready)
1782 {
1783 prepare_schedule (aTHX_ ta);
1784 api_ready (aTHX_ prev);
1785 }
1786 else
1787 prepare_nop (aTHX_ ta);
1788}
1789
1790static void
1791api_schedule (pTHX)
1792{
1793 struct coro_transfer_args ta;
1794
1795 prepare_schedule (aTHX_ &ta);
1796 TRANSFER (ta, 1);
1797}
1798
1799static void
1800api_schedule_to (pTHX_ SV *coro_sv)
1801{
1802 struct coro_transfer_args ta;
1803 struct coro *next = SvSTATE (coro_sv);
1804
1805 SvREFCNT_inc_NN (coro_sv);
1806 prepare_schedule_to (aTHX_ &ta, next);
1807}
1808
1809static int
1810api_cede (pTHX)
1811{
1812 struct coro_transfer_args ta;
1813
1814 prepare_cede (aTHX_ &ta);
1815
1816 if (expect_true (ta.prev != ta.next))
1817 {
1818 TRANSFER (ta, 1);
1819 return 1;
1820 }
1821 else
1822 return 0;
1823}
1824
1825static int
1826api_cede_notself (pTHX)
1827{
1828 if (coro_nready)
1829 {
1830 struct coro_transfer_args ta;
1831
1832 prepare_cede_notself (aTHX_ &ta);
1833 TRANSFER (ta, 1);
1834 return 1;
1835 }
1836 else
1837 return 0;
1838}
1839
1840static void
1841api_trace (pTHX_ SV *coro_sv, int flags)
1842{
1843 struct coro *coro = SvSTATE (coro_sv);
1844
1845 if (coro->flags & CF_RUNNING)
1846 croak ("cannot enable tracing on a running coroutine, caught");
1847
1848 if (flags & CC_TRACE)
1849 {
1850 if (!coro->cctx)
1851 coro->cctx = cctx_new_run ();
1852 else if (!(coro->cctx->flags & CC_TRACE))
1853 croak ("cannot enable tracing on coroutine with custom stack, caught");
1854
1855 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1856 }
1857 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1858 {
1859 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1860
1861 if (coro->flags & CF_RUNNING)
1862 PL_runops = RUNOPS_DEFAULT;
1863 else
1864 coro->slot->runops = RUNOPS_DEFAULT;
1865 }
1866}
1867
1868static void
1869coro_call_on_destroy (pTHX_ struct coro *coro)
1870{
1871 SV **on_destroyp = hv_fetch (coro->hv, "_on_destroy", sizeof ("_on_destroy") - 1, 0);
1872 SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0);
1873
1874 if (on_destroyp)
1875 {
1876 AV *on_destroy = (AV *)SvRV (*on_destroyp);
1877
1878 while (AvFILLp (on_destroy) >= 0)
1879 {
1880 dSP; /* don't disturb outer sp */
1881 SV *cb = av_pop (on_destroy);
1882
1883 PUSHMARK (SP);
1884
1885 if (statusp)
1886 {
1887 int i;
1888 AV *status = (AV *)SvRV (*statusp);
1889 EXTEND (SP, AvFILLp (status) + 1);
1890
1891 for (i = 0; i <= AvFILLp (status); ++i)
1892 PUSHs (AvARRAY (status)[i]);
1893 }
1894
1895 PUTBACK;
1896 call_sv (sv_2mortal (cb), G_VOID | G_DISCARD);
1897 }
1898 }
1899}
1900
1901static void
1902slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1903{
1904 int i;
1905 HV *hv = (HV *)SvRV (coro_current);
1906 AV *av = newAV ();
1907
1908 av_extend (av, items - 1);
1909 for (i = 0; i < items; ++i)
1910 av_push (av, SvREFCNT_inc_NN (arg [i]));
1911
1912 hv_store (hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0);
1913
1914 av_push (av_destroy, (SV *)newRV_inc ((SV *)hv)); /* RVinc for perl */
1915 api_ready (aTHX_ sv_manager);
1916
1917 frame->prepare = prepare_schedule;
1918 frame->check = slf_check_repeat;
1919
1920 /* as a minor optimisation, we could unwind all stacks here */
1921 /* but that puts extra pressure on pp_slf, and is not worth much */
1922 /*coro_unwind_stacks (aTHX);*/
1923}
1924
1925/*****************************************************************************/
1926/* async pool handler */
1927
1928static int
1929slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
1930{
1931 HV *hv = (HV *)SvRV (coro_current);
1932 struct coro *coro = (struct coro *)frame->data;
1933
1934 if (!coro->invoke_cb)
1935 return 1; /* loop till we have invoke */
1936 else
1937 {
1938 hv_store (hv, "desc", sizeof ("desc") - 1,
1939 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
1940
1941 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
1942
1943 {
1944 dSP;
1945 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
1946 PUTBACK;
1947 }
1948
1949 SvREFCNT_dec (GvAV (PL_defgv));
1950 GvAV (PL_defgv) = coro->invoke_av;
1951 coro->invoke_av = 0;
1952
1953 return 0;
1954 }
1955}
1956
1957static void
1958slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1959{
1960 HV *hv = (HV *)SvRV (coro_current);
1961 struct coro *coro = SvSTATE_hv ((SV *)hv);
1962
1963 if (expect_true (coro->saved_deffh))
1964 {
1965 /* subsequent iteration */
1966 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
1967 coro->saved_deffh = 0;
1968
1969 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
1970 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
1971 {
1972 coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate);
1973 coro->invoke_av = newAV ();
1974
1975 frame->prepare = prepare_nop;
1976 }
1977 else
1978 {
1979 av_clear (GvAV (PL_defgv));
1980 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
1981
1982 coro->prio = 0;
1983
1984 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
1985 api_trace (aTHX_ coro_current, 0);
1986
1987 frame->prepare = prepare_schedule;
1988 av_push (av_async_pool, SvREFCNT_inc (hv));
1989 }
1990 }
1991 else
1992 {
1993 /* first iteration, simply fall through */
1994 frame->prepare = prepare_nop;
1995 }
1996
1997 frame->check = slf_check_pool_handler;
1998 frame->data = (void *)coro;
1999}
2000
2001/*****************************************************************************/
2002/* rouse callback */
2003
2004#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
2005
2006static void
2007coro_rouse_callback (pTHX_ CV *cv)
2008{
2009 dXSARGS;
2010 SV *data = (SV *)GENSUB_ARG;
2011
2012 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2013 {
2014 /* first call, set args */
2015 SV *coro = SvRV (data);
2016 AV *av = newAV ();
2017
2018 SvRV_set (data, (SV *)av);
2019
2020 /* better take a full copy of the arguments */
2021 while (items--)
2022 av_store (av, items, newSVsv (ST (items)));
2023
2024 api_ready (aTHX_ coro);
2025 SvREFCNT_dec (coro);
2026 }
2027
2028 XSRETURN_EMPTY;
2029}
2030
2031static int
2032slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
2033{
2034 SV *data = (SV *)frame->data;
2035
2036 if (CORO_THROW)
2037 return 0;
2038
2039 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2040 return 1;
2041
2042 /* now push all results on the stack */
2043 {
2044 dSP;
2045 AV *av = (AV *)SvRV (data);
2046 int i;
2047
2048 EXTEND (SP, AvFILLp (av) + 1);
2049 for (i = 0; i <= AvFILLp (av); ++i)
2050 PUSHs (sv_2mortal (AvARRAY (av)[i]));
2051
2052 /* we have stolen the elements, so set length to zero and free */
2053 AvFILLp (av) = -1;
2054 av_undef (av);
2055
2056 PUTBACK;
2057 }
2058
2059 return 0;
2060}
2061
2062static void
2063slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2064{
2065 SV *cb;
2066
2067 if (items)
2068 cb = arg [0];
2069 else
2070 {
2071 struct coro *coro = SvSTATE_current;
2072
2073 if (!coro->rouse_cb)
2074 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
2075
2076 cb = sv_2mortal (coro->rouse_cb);
2077 coro->rouse_cb = 0;
2078 }
2079
2080 if (!SvROK (cb)
2081 || SvTYPE (SvRV (cb)) != SVt_PVCV
2082 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
2083 croak ("Coro::rouse_wait called with illegal callback argument,");
2084
2085 {
2086 CV *cv = (CV *)SvRV (cb); /* for GENSUB_ARG */
2087 SV *data = (SV *)GENSUB_ARG;
2088
2089 frame->data = (void *)data;
2090 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2091 frame->check = slf_check_rouse_wait;
2092 }
2093}
2094
2095static SV *
2096coro_new_rouse_cb (pTHX)
2097{
2098 HV *hv = (HV *)SvRV (coro_current);
2099 struct coro *coro = SvSTATE_hv (hv);
2100 SV *data = newRV_inc ((SV *)hv);
2101 SV *cb = gensub (aTHX_ coro_rouse_callback, (void *)data);
2102
2103 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2104 SvREFCNT_dec (data); /* magicext increases the refcount */
2105
2106 SvREFCNT_dec (coro->rouse_cb);
2107 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2108
2109 return cb;
2110}
2111
2112/*****************************************************************************/
2113/* schedule-like-function opcode (SLF) */
2114
2115static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2116static const CV *slf_cv;
2117static SV **slf_argv;
2118static int slf_argc, slf_arga; /* count, allocated */
2119static I32 slf_ax; /* top of stack, for restore */
2120
2121/* this restores the stack in the case we patched the entersub, to */
2122/* recreate the stack frame as perl will on following calls */
2123/* since entersub cleared the stack */
2124static OP *
2125pp_restore (pTHX)
2126{
2127 int i;
2128 SV **SP = PL_stack_base + slf_ax;
2129
2130 PUSHMARK (SP);
2131
2132 EXTEND (SP, slf_argc + 1);
2133
2134 for (i = 0; i < slf_argc; ++i)
2135 PUSHs (sv_2mortal (slf_argv [i]));
2136
2137 PUSHs ((SV *)CvGV (slf_cv));
2138
2139 RETURNOP (slf_restore.op_first);
2140}
2141
2142static void
2143slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2144{
2145 SV **arg = (SV **)slf_frame.data;
2146
2147 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2148}
2149
2150static void
2151slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2152{
2153 if (items != 2)
2154 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2155
2156 frame->prepare = slf_prepare_transfer;
2157 frame->check = slf_check_nop;
2158 frame->data = (void *)arg; /* let's hope it will stay valid */
2159}
2160
2161static void
2162slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2163{
2164 frame->prepare = prepare_schedule;
2165 frame->check = slf_check_nop;
2166}
2167
2168static void
2169slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2170{
2171 struct coro *next = (struct coro *)slf_frame.data;
2172
2173 SvREFCNT_inc_NN (next->hv);
2174 prepare_schedule_to (aTHX_ ta, next);
2175}
2176
2177static void
2178slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2179{
2180 if (!items)
2181 croak ("Coro::schedule_to expects a coroutine argument, caught");
2182
2183 frame->data = (void *)SvSTATE (arg [0]);
2184 frame->prepare = slf_prepare_schedule_to;
2185 frame->check = slf_check_nop;
2186}
2187
2188static void
2189slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2190{
2191 api_ready (aTHX_ SvRV (coro_current));
2192
2193 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2194}
2195
2196static void
2197slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2198{
2199 frame->prepare = prepare_cede;
2200 frame->check = slf_check_nop;
2201}
2202
2203static void
2204slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2205{
2206 frame->prepare = prepare_cede_notself;
2207 frame->check = slf_check_nop;
2208}
2209
2210/*
2211 * these not obviously related functions are all rolled into one
2212 * function to increase chances that they all will call transfer with the same
2213 * stack offset
2214 * SLF stands for "schedule-like-function".
2215 */
2216static OP *
2217pp_slf (pTHX)
2218{
2219 I32 checkmark; /* mark SP to see how many elements check has pushed */
2220
2221 /* set up the slf frame, unless it has already been set-up */
2222 /* the latter happens when a new coro has been started */
2223 /* or when a new cctx was attached to an existing coroutine */
2224 if (expect_true (!slf_frame.prepare))
2225 {
2226 /* first iteration */
2227 dSP;
2228 SV **arg = PL_stack_base + TOPMARK + 1;
2229 int items = SP - arg; /* args without function object */
2230 SV *gv = *sp;
2231
2232 /* do a quick consistency check on the "function" object, and if it isn't */
2233 /* for us, divert to the real entersub */
2234 if (SvTYPE (gv) != SVt_PVGV
2235 || !GvCV (gv)
2236 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2237 return PL_ppaddr[OP_ENTERSUB](aTHX);
2238
2239 if (!(PL_op->op_flags & OPf_STACKED))
2240 {
2241 /* ampersand-form of call, use @_ instead of stack */
2242 AV *av = GvAV (PL_defgv);
2243 arg = AvARRAY (av);
2244 items = AvFILLp (av) + 1;
2245 }
2246
2247 /* now call the init function, which needs to set up slf_frame */
2248 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2249 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2250
2251 /* pop args */
2252 SP = PL_stack_base + POPMARK;
2253
2254 PUTBACK;
2255 }
2256
2257 /* now that we have a slf_frame, interpret it! */
2258 /* we use a callback system not to make the code needlessly */
2259 /* complicated, but so we can run multiple perl coros from one cctx */
2260
2261 do
2262 {
2263 struct coro_transfer_args ta;
2264
2265 slf_frame.prepare (aTHX_ &ta);
2266 TRANSFER (ta, 0);
2267
2268 checkmark = PL_stack_sp - PL_stack_base;
2269 }
2270 while (slf_frame.check (aTHX_ &slf_frame));
2271
2272 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2273
2274 /* exception handling */
2275 if (expect_false (CORO_THROW))
2276 {
2277 SV *exception = sv_2mortal (CORO_THROW);
2278
2279 CORO_THROW = 0;
2280 sv_setsv (ERRSV, exception);
2281 croak (0);
2282 }
2283
2284 /* return value handling - mostly like entersub */
2285 /* make sure we put something on the stack in scalar context */
2286 if (GIMME_V == G_SCALAR)
2287 {
2288 dSP;
2289 SV **bot = PL_stack_base + checkmark;
2290
2291 if (sp == bot) /* too few, push undef */
2292 bot [1] = &PL_sv_undef;
2293 else if (sp != bot + 1) /* too many, take last one */
2294 bot [1] = *sp;
2295
2296 SP = bot + 1;
2297
2298 PUTBACK;
2299 }
2300
2301 return NORMAL;
2302}
2303
2304static void
2305api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2306{
2307 int i;
2308 SV **arg = PL_stack_base + ax;
2309 int items = PL_stack_sp - arg + 1;
2310
2311 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2312
2313 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2314 && PL_op->op_ppaddr != pp_slf)
2315 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2316
2317 CvFLAGS (cv) |= CVf_SLF;
2318 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2319 slf_cv = cv;
2320
2321 /* we patch the op, and then re-run the whole call */
2322 /* we have to put the same argument on the stack for this to work */
2323 /* and this will be done by pp_restore */
2324 slf_restore.op_next = (OP *)&slf_restore;
2325 slf_restore.op_type = OP_CUSTOM;
2326 slf_restore.op_ppaddr = pp_restore;
2327 slf_restore.op_first = PL_op;
2328
2329 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2330
2331 if (PL_op->op_flags & OPf_STACKED)
2332 {
2333 if (items > slf_arga)
2334 {
2335 slf_arga = items;
2336 free (slf_argv);
2337 slf_argv = malloc (slf_arga * sizeof (SV *));
2338 }
2339
2340 slf_argc = items;
2341
2342 for (i = 0; i < items; ++i)
2343 slf_argv [i] = SvREFCNT_inc (arg [i]);
2344 }
2345 else
2346 slf_argc = 0;
2347
2348 PL_op->op_ppaddr = pp_slf;
2349 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2350
2351 PL_op = (OP *)&slf_restore;
2352}
2353
2354/*****************************************************************************/
2355/* dynamic wind */
2356
2357static void
2358on_enterleave_call (pTHX_ SV *cb)
2359{
2360 dSP;
2361
2362 PUSHSTACK;
2363
2364 PUSHMARK (SP);
2365 PUTBACK;
2366 call_sv (cb, G_VOID | G_DISCARD);
2367 SPAGAIN;
2368
2369 POPSTACK;
2370}
2371
2372static SV *
2373coro_avp_pop_and_free (pTHX_ AV **avp)
2374{
2375 AV *av = *avp;
2376 SV *res = av_pop (av);
2377
2378 if (AvFILLp (av) < 0)
2379 {
2380 *avp = 0;
2381 SvREFCNT_dec (av);
2382 }
2383
2384 return res;
2385}
2386
2387static void
2388coro_pop_on_enter (pTHX_ void *coro)
2389{
2390 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_enter);
2391 SvREFCNT_dec (cb);
2392}
2393
2394static void
2395coro_pop_on_leave (pTHX_ void *coro)
2396{
2397 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_leave);
2398 on_enterleave_call (aTHX_ sv_2mortal (cb));
2399}
2400
2401/*****************************************************************************/
2402/* PerlIO::cede */
2403
2404typedef struct
2405{
2406 PerlIOBuf base;
2407 NV next, every;
2408} PerlIOCede;
2409
2410static IV
2411PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2412{
2413 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2414
2415 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2416 self->next = nvtime () + self->every;
2417
2418 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2419}
2420
2421static SV *
2422PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2423{
2424 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2425
2426 return newSVnv (self->every);
2427}
2428
2429static IV
2430PerlIOCede_flush (pTHX_ PerlIO *f)
2431{
2432 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2433 double now = nvtime ();
2434
2435 if (now >= self->next)
2436 {
2437 api_cede (aTHX);
2438 self->next = now + self->every;
2439 }
2440
2441 return PerlIOBuf_flush (aTHX_ f);
2442}
2443
2444static PerlIO_funcs PerlIO_cede =
2445{
2446 sizeof(PerlIO_funcs),
2447 "cede",
2448 sizeof(PerlIOCede),
2449 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2450 PerlIOCede_pushed,
2451 PerlIOBuf_popped,
2452 PerlIOBuf_open,
2453 PerlIOBase_binmode,
2454 PerlIOCede_getarg,
2455 PerlIOBase_fileno,
2456 PerlIOBuf_dup,
2457 PerlIOBuf_read,
2458 PerlIOBuf_unread,
2459 PerlIOBuf_write,
2460 PerlIOBuf_seek,
2461 PerlIOBuf_tell,
2462 PerlIOBuf_close,
2463 PerlIOCede_flush,
2464 PerlIOBuf_fill,
2465 PerlIOBase_eof,
2466 PerlIOBase_error,
2467 PerlIOBase_clearerr,
2468 PerlIOBase_setlinebuf,
2469 PerlIOBuf_get_base,
2470 PerlIOBuf_bufsiz,
2471 PerlIOBuf_get_ptr,
2472 PerlIOBuf_get_cnt,
2473 PerlIOBuf_set_ptrcnt,
2474};
2475
2476/*****************************************************************************/
2477/* Coro::Semaphore & Coro::Signal */
2478
2479static SV *
2480coro_waitarray_new (pTHX_ int count)
2481{
2482 /* a waitarray=semaphore contains a counter IV in $sem->[0] and any waiters after that */
2483 AV *av = newAV ();
2484 SV **ary;
2485
2486 /* unfortunately, building manually saves memory */
2487 Newx (ary, 2, SV *);
2488 AvALLOC (av) = ary;
2489#if PERL_VERSION_ATLEAST (5,10,0)
2490 AvARRAY (av) = ary;
2491#else
2492 /* 5.8.8 needs this syntax instead of AvARRAY = ary, yet */
2493 /* -DDEBUGGING flags this as a bug, despite it perfectly working */
2494 SvPVX ((SV *)av) = (char *)ary;
2495#endif
2496 AvMAX (av) = 1;
2497 AvFILLp (av) = 0;
2498 ary [0] = newSViv (count);
2499
2500 return newRV_noinc ((SV *)av);
2501}
2502
2503/* semaphore */
2504
2505static void
2506coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2507{
2508 SV *count_sv = AvARRAY (av)[0];
2509 IV count = SvIVX (count_sv);
2510
2511 count += adjust;
2512 SvIVX (count_sv) = count;
2513
2514 /* now wake up as many waiters as are expected to lock */
2515 while (count > 0 && AvFILLp (av) > 0)
2516 {
2517 SV *cb;
2518
2519 /* swap first two elements so we can shift a waiter */
2520 AvARRAY (av)[0] = AvARRAY (av)[1];
2521 AvARRAY (av)[1] = count_sv;
2522 cb = av_shift (av);
2523
2524 if (SvOBJECT (cb))
2525 {
2526 api_ready (aTHX_ cb);
2527 --count;
2528 }
2529 else if (SvTYPE (cb) == SVt_PVCV)
2530 {
2531 dSP;
2532 PUSHMARK (SP);
2533 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2534 PUTBACK;
2535 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2536 }
2537
2538 SvREFCNT_dec (cb);
2539 }
2540}
2541
2542static void
2543coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2544{
2545 /* call $sem->adjust (0) to possibly wake up some other waiters */
2546 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2547}
2548
2549static int
2550slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2551{
2552 AV *av = (AV *)frame->data;
2553 SV *count_sv = AvARRAY (av)[0];
2554
2555 /* if we are about to throw, don't actually acquire the lock, just throw */
2556 if (CORO_THROW)
2557 return 0;
2558 else if (SvIVX (count_sv) > 0)
2559 {
2560 SvSTATE_current->on_destroy = 0;
2561
2562 if (acquire)
2563 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2564 else
2565 coro_semaphore_adjust (aTHX_ av, 0);
2566
2567 return 0;
2568 }
2569 else
2570 {
2571 int i;
2572 /* if we were woken up but can't down, we look through the whole */
2573 /* waiters list and only add us if we aren't in there already */
2574 /* this avoids some degenerate memory usage cases */
2575
2576 for (i = 1; i <= AvFILLp (av); ++i)
2577 if (AvARRAY (av)[i] == SvRV (coro_current))
2578 return 1;
2579
2580 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2581 return 1;
2582 }
2583}
2584
2585static int
2586slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2587{
2588 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2589}
2590
2591static int
2592slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2593{
2594 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2595}
2596
2597static void
2598slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2599{
2600 AV *av = (AV *)SvRV (arg [0]);
2601
2602 if (SvIVX (AvARRAY (av)[0]) > 0)
2603 {
2604 frame->data = (void *)av;
2605 frame->prepare = prepare_nop;
2606 }
2607 else
2608 {
2609 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2610
2611 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2612 frame->prepare = prepare_schedule;
2613
2614 /* to avoid race conditions when a woken-up coro gets terminated */
2615 /* we arrange for a temporary on_destroy that calls adjust (0) */
2616 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2617 }
2618}
2619
2620static void
2621slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2622{
2623 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2624 frame->check = slf_check_semaphore_down;
2625}
2626
2627static void
2628slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2629{
2630 if (items >= 2)
2631 {
2632 /* callback form */
2633 AV *av = (AV *)SvRV (arg [0]);
2634 CV *cb_cv = coro_sv_2cv (aTHX_ arg [1]);
2635
2636 av_push (av, SvREFCNT_inc_NN (cb_cv));
2637
2638 if (SvIVX (AvARRAY (av)[0]) > 0)
2639 coro_semaphore_adjust (aTHX_ av, 0);
2640
2641 frame->prepare = prepare_nop;
2642 frame->check = slf_check_nop;
2643 }
2644 else
2645 {
2646 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2647 frame->check = slf_check_semaphore_wait;
2648 }
2649}
2650
2651/* signal */
2652
2653static void
2654coro_signal_wake (pTHX_ AV *av, int count)
2655{
2656 SvIVX (AvARRAY (av)[0]) = 0;
2657
2658 /* now signal count waiters */
2659 while (count > 0 && AvFILLp (av) > 0)
2660 {
2661 SV *cb;
2662
2663 /* swap first two elements so we can shift a waiter */
2664 cb = AvARRAY (av)[0];
2665 AvARRAY (av)[0] = AvARRAY (av)[1];
2666 AvARRAY (av)[1] = cb;
2667
2668 cb = av_shift (av);
2669
2670 if (SvTYPE (cb) == SVt_PVCV)
2671 {
2672 dSP;
2673 PUSHMARK (SP);
2674 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2675 PUTBACK;
2676 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2677 }
2678 else
2679 {
2680 api_ready (aTHX_ cb);
2681 sv_setiv (cb, 0); /* signal waiter */
2682 }
2683
2684 SvREFCNT_dec (cb);
2685
2686 --count;
2687 }
2688}
2689
2690static int
2691slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2692{
2693 /* if we are about to throw, also stop waiting */
2694 return SvROK ((SV *)frame->data) && !CORO_THROW;
2695}
2696
2697static void
2698slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2699{
2700 AV *av = (AV *)SvRV (arg [0]);
2701
2702 if (items >= 2)
2703 {
2704 CV *cb_cv = coro_sv_2cv (aTHX_ arg [1]);
2705 av_push (av, SvREFCNT_inc_NN (cb_cv));
2706
2707 if (SvIVX (AvARRAY (av)[0]))
2708 coro_signal_wake (aTHX_ av, 1); /* ust be the only waiter */
2709
2710 frame->prepare = prepare_nop;
2711 frame->check = slf_check_nop;
2712 }
2713 else if (SvIVX (AvARRAY (av)[0]))
2714 {
2715 SvIVX (AvARRAY (av)[0]) = 0;
2716 frame->prepare = prepare_nop;
2717 frame->check = slf_check_nop;
2718 }
2719 else
2720 {
2721 SV *waiter = newSVsv (coro_current); /* owned by signal av */
2722
2723 av_push (av, waiter);
2724
2725 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2726 frame->prepare = prepare_schedule;
2727 frame->check = slf_check_signal_wait;
2728 }
2729}
2730
2731/*****************************************************************************/
2732/* Coro::AIO */
2733
2734#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2735
2736/* helper storage struct */
2737struct io_state
2738{
2739 int errorno;
2740 I32 laststype; /* U16 in 5.10.0 */
2741 int laststatval;
2742 Stat_t statcache;
2743};
2744
2745static void
2746coro_aio_callback (pTHX_ CV *cv)
2747{
2748 dXSARGS;
2749 AV *state = (AV *)GENSUB_ARG;
2750 SV *coro = av_pop (state);
2751 SV *data_sv = newSV (sizeof (struct io_state));
2752
2753 av_extend (state, items - 1);
2754
2755 sv_upgrade (data_sv, SVt_PV);
2756 SvCUR_set (data_sv, sizeof (struct io_state));
2757 SvPOK_only (data_sv);
2758
2759 {
2760 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2761
2762 data->errorno = errno;
2763 data->laststype = PL_laststype;
2764 data->laststatval = PL_laststatval;
2765 data->statcache = PL_statcache;
2766 }
2767
2768 /* now build the result vector out of all the parameters and the data_sv */
2769 {
2770 int i;
2771
2772 for (i = 0; i < items; ++i)
2773 av_push (state, SvREFCNT_inc_NN (ST (i)));
2774 }
2775
2776 av_push (state, data_sv);
2777
2778 api_ready (aTHX_ coro);
2779 SvREFCNT_dec (coro);
2780 SvREFCNT_dec ((AV *)state);
2781}
2782
2783static int
2784slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2785{
2786 AV *state = (AV *)frame->data;
2787
2788 /* if we are about to throw, return early */
2789 /* this does not cancel the aio request, but at least */
2790 /* it quickly returns */
2791 if (CORO_THROW)
2792 return 0;
2793
2794 /* one element that is an RV? repeat! */
2795 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2796 return 1;
2797
2798 /* restore status */
2799 {
2800 SV *data_sv = av_pop (state);
2801 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2802
2803 errno = data->errorno;
2804 PL_laststype = data->laststype;
2805 PL_laststatval = data->laststatval;
2806 PL_statcache = data->statcache;
2807
2808 SvREFCNT_dec (data_sv);
2809 }
2810
2811 /* push result values */
2812 {
2813 dSP;
2814 int i;
2815
2816 EXTEND (SP, AvFILLp (state) + 1);
2817 for (i = 0; i <= AvFILLp (state); ++i)
2818 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2819
2820 PUTBACK;
2821 }
2822
2823 return 0;
2824}
2825
2826static void
2827slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2828{
2829 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2830 SV *coro_hv = SvRV (coro_current);
2831 struct coro *coro = SvSTATE_hv (coro_hv);
2832
2833 /* put our coroutine id on the state arg */
2834 av_push (state, SvREFCNT_inc_NN (coro_hv));
2835
2836 /* first see whether we have a non-zero priority and set it as AIO prio */
2837 if (coro->prio)
2838 {
2839 dSP;
2840
2841 static SV *prio_cv;
2842 static SV *prio_sv;
2843
2844 if (expect_false (!prio_cv))
2845 {
2846 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2847 prio_sv = newSViv (0);
2848 }
2849
2850 PUSHMARK (SP);
2851 sv_setiv (prio_sv, coro->prio);
2852 XPUSHs (prio_sv);
2853
2854 PUTBACK;
2855 call_sv (prio_cv, G_VOID | G_DISCARD);
2856 }
2857
2858 /* now call the original request */
2859 {
2860 dSP;
2861 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2862 int i;
2863
2864 PUSHMARK (SP);
2865
2866 /* first push all args to the stack */
2867 EXTEND (SP, items + 1);
2868
2869 for (i = 0; i < items; ++i)
2870 PUSHs (arg [i]);
2871
2872 /* now push the callback closure */
2873 PUSHs (sv_2mortal (gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2874
2875 /* now call the AIO function - we assume our request is uncancelable */
2876 PUTBACK;
2877 call_sv ((SV *)req, G_VOID | G_DISCARD);
2878 }
2879
2880 /* now that the requets is going, we loop toll we have a result */
2881 frame->data = (void *)state;
2882 frame->prepare = prepare_schedule;
2883 frame->check = slf_check_aio_req;
2884}
2885
2886static void
2887coro_aio_req_xs (pTHX_ CV *cv)
2888{
2889 dXSARGS;
2890
2891 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2892
2893 XSRETURN_EMPTY;
2894}
2895
2896/*****************************************************************************/
2897
2898#if CORO_CLONE
2899# include "clone.c"
2900#endif
2901
2902MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2903
2904PROTOTYPES: DISABLE
2905
2906BOOT:
2907{
2908#ifdef USE_ITHREADS
2909# if CORO_PTHREAD
2910 coro_thx = PERL_GET_CONTEXT;
2911# endif
2912#endif
2913 BOOT_PAGESIZE;
2914
2915 cctx_current = cctx_new_empty ();
2916
2917 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2918 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2919
2920 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2921 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2922 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
2923
2924 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
2925 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
2926 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
2927
2928 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
2929
2930 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
2931 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
2932 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
2933 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
2934
2935 main_mainstack = PL_mainstack;
2936 main_top_env = PL_top_env;
2937
2938 while (main_top_env->je_prev)
2939 main_top_env = main_top_env->je_prev;
2940
2941 {
2942 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2943
2944 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2945 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
2946
2947 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2948 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
2949 }
2950
2951 coroapi.ver = CORO_API_VERSION;
2952 coroapi.rev = CORO_API_REVISION;
2953
2954 coroapi.transfer = api_transfer;
2955
2956 coroapi.sv_state = SvSTATE_;
2957 coroapi.execute_slf = api_execute_slf;
2958 coroapi.prepare_nop = prepare_nop;
2959 coroapi.prepare_schedule = prepare_schedule;
2960 coroapi.prepare_cede = prepare_cede;
2961 coroapi.prepare_cede_notself = prepare_cede_notself;
2962
2963 {
2964 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
2965
2966 if (!svp) croak ("Time::HiRes is required");
2967 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
2968
2969 nvtime = INT2PTR (double (*)(), SvIV (*svp));
2970 }
2971
2972 assert (("PRIO_NORMAL must be 0", !PRIO_NORMAL));
2973}
2974
2975SV *
2976new (char *klass, ...)
2977 ALIAS:
2978 Coro::new = 1
2979 CODE:
2980{
2981 struct coro *coro;
2982 MAGIC *mg;
2983 HV *hv;
2984 CV *cb;
2985 int i;
2986
2987 if (items > 1)
2988 {
2989 cb = coro_sv_2cv (aTHX_ ST (1));
2990
2991 if (!ix)
235 { 2992 {
236 /* I never used formats, so how should I know how these are implemented? */ 2993 if (CvISXSUB (cb))
237 /* my bold guess is as a simple, plain sub... */ 2994 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
238 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 2995
2996 if (!CvROOT (cb))
2997 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
239 } 2998 }
240 } 2999 }
241 3000
242 if (top_si->si_type == PERLSI_MAIN)
243 break;
244
245 top_si = top_si->si_prev;
246 ccstk = top_si->si_cxstack;
247 cxix = top_si->si_cxix;
248 }
249
250 PUTBACK;
251 }
252
253 c->dowarn = PL_dowarn;
254 c->defav = GvAV (PL_defgv);
255 c->curstackinfo = PL_curstackinfo;
256 c->curstack = PL_curstack;
257 c->mainstack = PL_mainstack;
258 c->stack_sp = PL_stack_sp;
259 c->op = PL_op;
260 c->curpad = PL_curpad;
261 c->stack_base = PL_stack_base;
262 c->stack_max = PL_stack_max;
263 c->tmps_stack = PL_tmps_stack;
264 c->tmps_floor = PL_tmps_floor;
265 c->tmps_ix = PL_tmps_ix;
266 c->tmps_max = PL_tmps_max;
267 c->markstack = PL_markstack;
268 c->markstack_ptr = PL_markstack_ptr;
269 c->markstack_max = PL_markstack_max;
270 c->scopestack = PL_scopestack;
271 c->scopestack_ix = PL_scopestack_ix;
272 c->scopestack_max = PL_scopestack_max;
273 c->savestack = PL_savestack;
274 c->savestack_ix = PL_savestack_ix;
275 c->savestack_max = PL_savestack_max;
276 c->retstack = PL_retstack;
277 c->retstack_ix = PL_retstack_ix;
278 c->retstack_max = PL_retstack_max;
279 c->curcop = PL_curcop;
280}
281
282static void
283LOAD(pTHX_ Coro__State c)
284{
285 PL_dowarn = c->dowarn;
286 GvAV (PL_defgv) = c->defav;
287 PL_curstackinfo = c->curstackinfo;
288 PL_curstack = c->curstack;
289 PL_mainstack = c->mainstack;
290 PL_stack_sp = c->stack_sp;
291 PL_op = c->op;
292 PL_curpad = c->curpad;
293 PL_stack_base = c->stack_base;
294 PL_stack_max = c->stack_max;
295 PL_tmps_stack = c->tmps_stack;
296 PL_tmps_floor = c->tmps_floor;
297 PL_tmps_ix = c->tmps_ix;
298 PL_tmps_max = c->tmps_max;
299 PL_markstack = c->markstack;
300 PL_markstack_ptr = c->markstack_ptr;
301 PL_markstack_max = c->markstack_max;
302 PL_scopestack = c->scopestack;
303 PL_scopestack_ix = c->scopestack_ix;
304 PL_scopestack_max = c->scopestack_max;
305 PL_savestack = c->savestack;
306 PL_savestack_ix = c->savestack_ix;
307 PL_savestack_max = c->savestack_max;
308 PL_retstack = c->retstack;
309 PL_retstack_ix = c->retstack_ix;
310 PL_retstack_max = c->retstack_max;
311 PL_curcop = c->curcop;
312
313 {
314 dSP;
315 CV *cv;
316
317 /* now do the ugly restore mess */
318 while ((cv = (CV *)POPs))
319 {
320 AV *padlist = (AV *)POPs;
321
322 put_padlist (cv);
323 CvPADLIST(cv) = padlist;
324 CvDEPTH(cv) = (I32)POPs;
325
326#ifdef USE_THREADS
327 CvOWNER(cv) = (struct perl_thread *)POPs;
328 error does not work either
329#endif
330 }
331
332 PUTBACK;
333 }
334}
335
336/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
337STATIC void
338destroy_stacks(pTHX)
339{
340 dSP;
341
342 /* die does this while calling POPSTACK, but I just don't see why. */
343 dounwind(-1);
344
345 /* is this ugly, I ask? */
346 while (PL_scopestack_ix)
347 LEAVE;
348
349 while (PL_curstackinfo->si_next)
350 PL_curstackinfo = PL_curstackinfo->si_next;
351
352 while (PL_curstackinfo)
353 {
354 PERL_SI *p = PL_curstackinfo->si_prev;
355
356 SvREFCNT_dec(PL_curstackinfo->si_stack);
357 Safefree(PL_curstackinfo->si_cxstack);
358 Safefree(PL_curstackinfo);
359 PL_curstackinfo = p;
360 }
361
362 if (PL_scopestack_ix != 0)
363 Perl_warner(aTHX_ WARN_INTERNAL,
364 "Unbalanced scopes: %ld more ENTERs than LEAVEs\n",
365 (long)PL_scopestack_ix);
366 if (PL_savestack_ix != 0)
367 Perl_warner(aTHX_ WARN_INTERNAL,
368 "Unbalanced saves: %ld more saves than restores\n",
369 (long)PL_savestack_ix);
370 if (PL_tmps_floor != -1)
371 Perl_warner(aTHX_ WARN_INTERNAL,"Unbalanced tmps: %ld more allocs than frees\n",
372 (long)PL_tmps_floor + 1);
373 /*
374 */
375 Safefree(PL_tmps_stack);
376 Safefree(PL_markstack);
377 Safefree(PL_scopestack);
378 Safefree(PL_savestack);
379 Safefree(PL_retstack);
380}
381
382#define SUB_INIT "Coro::State::_newcoro"
383
384MODULE = Coro::State PACKAGE = Coro::State
385
386PROTOTYPES: ENABLE
387
388BOOT:
389 if (!padlist_cache)
390 padlist_cache = newHV ();
391
392Coro::State
393_newprocess(args)
394 SV * args
395 PROTOTYPE: $
396 CODE:
397 Coro__State coro;
398
399 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV)
400 croak ("Coro::State::newprocess expects an arrayref");
401
402 New (0, coro, 1, struct coro); 3001 Newz (0, coro, 1, struct coro);
3002 coro->args = newAV ();
3003 coro->flags = CF_NEW;
403 3004
404 coro->mainstack = 0; /* actual work is done inside transfer */ 3005 if (coro_first) coro_first->prev = coro;
405 coro->args = (AV *)SvREFCNT_inc (SvRV (args)); 3006 coro->next = coro_first;
3007 coro_first = coro;
406 3008
407 RETVAL = coro; 3009 coro->hv = hv = newHV ();
3010 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
3011 mg->mg_flags |= MGf_DUP;
3012 RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1));
3013
3014 if (items > 1)
3015 {
3016 av_extend (coro->args, items - 1 + ix - 1);
3017
3018 if (ix)
3019 {
3020 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
3021 cb = cv_coro_run;
3022 }
3023
3024 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
3025
3026 for (i = 2; i < items; i++)
3027 av_push (coro->args, newSVsv (ST (i)));
3028 }
3029}
408 OUTPUT: 3030 OUTPUT:
409 RETVAL 3031 RETVAL
410 3032
411void 3033void
412transfer(prev,next) 3034transfer (...)
413 Coro::State_or_hashref prev 3035 PROTOTYPE: $$
414 Coro::State_or_hashref next 3036 CODE:
415 CODE: 3037 CORO_EXECUTE_SLF_XS (slf_init_transfer);
416 3038
417 if (prev != next) 3039bool
3040_destroy (SV *coro_sv)
3041 CODE:
3042 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
3043 OUTPUT:
3044 RETVAL
3045
3046void
3047_exit (int code)
3048 PROTOTYPE: $
3049 CODE:
3050 _exit (code);
3051
3052SV *
3053clone (Coro::State coro)
3054 CODE:
3055{
3056#if CORO_CLONE
3057 struct coro *ncoro = coro_clone (aTHX_ coro);
3058 MAGIC *mg;
3059 /* TODO: too much duplication */
3060 ncoro->hv = newHV ();
3061 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
3062 mg->mg_flags |= MGf_DUP;
3063 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
3064#else
3065 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
3066#endif
3067}
3068 OUTPUT:
3069 RETVAL
3070
3071int
3072cctx_stacksize (int new_stacksize = 0)
3073 PROTOTYPE: ;$
3074 CODE:
3075 RETVAL = cctx_stacksize;
3076 if (new_stacksize)
418 { 3077 {
419 PUTBACK; 3078 cctx_stacksize = new_stacksize;
420 SAVE (aTHX_ prev); 3079 ++cctx_gen;
421
422 /* 3080 }
423 * this could be done in newprocess which would lead to 3081 OUTPUT:
424 * extremely elegant and fast (just PUTBACK/SAVE/LOAD/SPAGAIN) 3082 RETVAL
425 * code here, but lazy allocation of stacks has also 3083
426 * some virtues and the overhead of the if() is nil. 3084int
3085cctx_max_idle (int max_idle = 0)
3086 PROTOTYPE: ;$
3087 CODE:
3088 RETVAL = cctx_max_idle;
3089 if (max_idle > 1)
3090 cctx_max_idle = max_idle;
3091 OUTPUT:
3092 RETVAL
3093
3094int
3095cctx_count ()
3096 PROTOTYPE:
3097 CODE:
3098 RETVAL = cctx_count;
3099 OUTPUT:
3100 RETVAL
3101
3102int
3103cctx_idle ()
3104 PROTOTYPE:
3105 CODE:
3106 RETVAL = cctx_idle;
3107 OUTPUT:
3108 RETVAL
3109
3110void
3111list ()
3112 PROTOTYPE:
3113 PPCODE:
3114{
3115 struct coro *coro;
3116 for (coro = coro_first; coro; coro = coro->next)
3117 if (coro->hv)
3118 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
3119}
3120
3121void
3122call (Coro::State coro, SV *coderef)
3123 ALIAS:
3124 eval = 1
3125 CODE:
3126{
3127 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
427 */ 3128 {
428 if (next->mainstack) 3129 struct coro *current = SvSTATE_current;
3130
3131 if (current != coro)
429 { 3132 {
430 LOAD (aTHX_ next); 3133 PUTBACK;
431 next->mainstack = 0; /* unnecessary but much cleaner */ 3134 save_perl (aTHX_ current);
3135 load_perl (aTHX_ coro);
432 SPAGAIN; 3136 SPAGAIN;
433 } 3137 }
3138
3139 PUSHSTACK;
3140
3141 PUSHMARK (SP);
3142 PUTBACK;
3143
3144 if (ix)
3145 eval_sv (coderef, 0);
434 else 3146 else
3147 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3148
3149 POPSTACK;
3150 SPAGAIN;
3151
3152 if (current != coro)
435 { 3153 {
436 /* 3154 PUTBACK;
437 * emulate part of the perl startup here. 3155 save_perl (aTHX_ coro);
438 */ 3156 load_perl (aTHX_ current);
439 UNOP myop;
440
441 init_stacks (); /* from perl.c */
442 PL_op = (OP *)&myop;
443 /*PL_curcop = 0;*/
444 GvAV (PL_defgv) = (SV *)SvREFCNT_inc (next->args);
445
446 SPAGAIN; 3157 SPAGAIN;
447 Zero(&myop, 1, UNOP);
448 myop.op_next = Nullop;
449 myop.op_flags = OPf_WANT_VOID;
450
451 PUSHMARK(SP);
452 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
453 PUTBACK;
454 /*
455 * the next line is slightly wrong, as PL_op->op_next
456 * is actually being executed so we skip the first op.
457 * that doesn't matter, though, since it is only
458 * pp_nextstate and we never return...
459 */
460 PL_op = Perl_pp_entersub(aTHX);
461 SPAGAIN;
462
463 ENTER;
464 } 3158 }
465 } 3159 }
3160}
3161
3162SV *
3163is_ready (Coro::State coro)
3164 PROTOTYPE: $
3165 ALIAS:
3166 is_ready = CF_READY
3167 is_running = CF_RUNNING
3168 is_new = CF_NEW
3169 is_destroyed = CF_DESTROYED
3170 is_suspended = CF_SUSPENDED
3171 CODE:
3172 RETVAL = boolSV (coro->flags & ix);
3173 OUTPUT:
3174 RETVAL
466 3175
467void 3176void
468DESTROY(coro) 3177throw (Coro::State self, SV *throw = &PL_sv_undef)
469 Coro::State coro 3178 PROTOTYPE: $;$
470 CODE: 3179 CODE:
3180{
3181 struct coro *current = SvSTATE_current;
3182 SV **throwp = self == current ? &CORO_THROW : &self->except;
3183 SvREFCNT_dec (*throwp);
3184 SvGETMAGIC (throw);
3185 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
3186}
471 3187
472 if (coro->mainstack) 3188void
3189api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3190 PROTOTYPE: $;$
3191 C_ARGS: aTHX_ coro, flags
3192
3193SV *
3194has_cctx (Coro::State coro)
3195 PROTOTYPE: $
3196 CODE:
3197 /* maybe manage the running flag differently */
3198 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3199 OUTPUT:
3200 RETVAL
3201
3202int
3203is_traced (Coro::State coro)
3204 PROTOTYPE: $
3205 CODE:
3206 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3207 OUTPUT:
3208 RETVAL
3209
3210UV
3211rss (Coro::State coro)
3212 PROTOTYPE: $
3213 ALIAS:
3214 usecount = 1
3215 CODE:
3216 switch (ix)
3217 {
3218 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3219 case 1: RETVAL = coro->usecount; break;
3220 }
3221 OUTPUT:
3222 RETVAL
3223
3224void
3225force_cctx ()
3226 PROTOTYPE:
3227 CODE:
3228 cctx_current->idle_sp = 0;
3229
3230void
3231swap_defsv (Coro::State self)
3232 PROTOTYPE: $
3233 ALIAS:
3234 swap_defav = 1
3235 CODE:
3236 if (!self->slot)
3237 croak ("cannot swap state with coroutine that has no saved state,");
3238 else
473 { 3239 {
474 struct coro temp; 3240 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3241 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
475 3242
3243 SV *tmp = *src; *src = *dst; *dst = tmp;
3244 }
3245
3246void
3247cancel (Coro::State self)
3248 CODE:
3249 coro_state_destroy (aTHX_ self);
3250 coro_call_on_destroy (aTHX_ self); /* actually only for Coro objects */
3251
3252
3253MODULE = Coro::State PACKAGE = Coro
3254
3255BOOT:
3256{
3257 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3258 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3259 cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD);
3260 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD);
3261 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3262 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3263 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3264 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3265 sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE);
3266
3267 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3268 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3269 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3270 cv_coro_state_new = get_cv ("Coro::State::new", 0); SvREADONLY_on (cv_coro_state_new);
3271
3272 coro_stash = gv_stashpv ("Coro", TRUE);
3273
3274 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (PRIO_MAX));
3275 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (PRIO_HIGH));
3276 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (PRIO_NORMAL));
3277 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (PRIO_LOW));
3278 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (PRIO_IDLE));
3279 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (PRIO_MIN));
3280
3281 {
3282 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3283
3284 coroapi.schedule = api_schedule;
3285 coroapi.schedule_to = api_schedule_to;
3286 coroapi.cede = api_cede;
3287 coroapi.cede_notself = api_cede_notself;
3288 coroapi.ready = api_ready;
3289 coroapi.is_ready = api_is_ready;
3290 coroapi.nready = coro_nready;
3291 coroapi.current = coro_current;
3292
3293 /*GCoroAPI = &coroapi;*/
3294 sv_setiv (sv, (IV)&coroapi);
3295 SvREADONLY_on (sv);
3296 }
3297}
3298
3299void
3300terminate (...)
3301 CODE:
3302 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3303
3304void
3305schedule (...)
3306 CODE:
3307 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3308
3309void
3310schedule_to (...)
3311 CODE:
3312 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3313
3314void
3315cede_to (...)
3316 CODE:
3317 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3318
3319void
3320cede (...)
3321 CODE:
3322 CORO_EXECUTE_SLF_XS (slf_init_cede);
3323
3324void
3325cede_notself (...)
3326 CODE:
3327 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3328
3329void
3330_set_current (SV *current)
3331 PROTOTYPE: $
3332 CODE:
3333 SvREFCNT_dec (SvRV (coro_current));
3334 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3335
3336void
3337_set_readyhook (SV *hook)
3338 PROTOTYPE: $
3339 CODE:
3340 SvREFCNT_dec (coro_readyhook);
3341 SvGETMAGIC (hook);
3342 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
3343
3344int
3345prio (Coro::State coro, int newprio = 0)
3346 PROTOTYPE: $;$
3347 ALIAS:
3348 nice = 1
3349 CODE:
3350{
3351 RETVAL = coro->prio;
3352
3353 if (items > 1)
3354 {
3355 if (ix)
3356 newprio = coro->prio - newprio;
3357
3358 if (newprio < PRIO_MIN) newprio = PRIO_MIN;
3359 if (newprio > PRIO_MAX) newprio = PRIO_MAX;
3360
3361 coro->prio = newprio;
3362 }
3363}
3364 OUTPUT:
3365 RETVAL
3366
3367SV *
3368ready (SV *self)
3369 PROTOTYPE: $
3370 CODE:
3371 RETVAL = boolSV (api_ready (aTHX_ self));
3372 OUTPUT:
3373 RETVAL
3374
3375int
3376nready (...)
3377 PROTOTYPE:
3378 CODE:
3379 RETVAL = coro_nready;
3380 OUTPUT:
3381 RETVAL
3382
3383void
3384suspend (Coro::State self)
3385 PROTOTYPE: $
3386 CODE:
3387 self->flags |= CF_SUSPENDED;
3388
3389void
3390resume (Coro::State self)
3391 PROTOTYPE: $
3392 CODE:
3393 self->flags &= ~CF_SUSPENDED;
3394
3395void
3396_pool_handler (...)
3397 CODE:
3398 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3399
3400void
3401async_pool (SV *cv, ...)
3402 PROTOTYPE: &@
3403 PPCODE:
3404{
3405 HV *hv = (HV *)av_pop (av_async_pool);
3406 AV *av = newAV ();
3407 SV *cb = ST (0);
3408 int i;
3409
3410 av_extend (av, items - 2);
3411 for (i = 1; i < items; ++i)
3412 av_push (av, SvREFCNT_inc_NN (ST (i)));
3413
3414 if ((SV *)hv == &PL_sv_undef)
3415 {
3416 PUSHMARK (SP);
3417 EXTEND (SP, 2);
3418 PUSHs (sv_Coro);
3419 PUSHs ((SV *)cv_pool_handler);
476 PUTBACK; 3420 PUTBACK;
477 SAVE(aTHX_ (&temp)); 3421 call_sv ((SV *)cv_coro_state_new, G_SCALAR);
478 LOAD(aTHX_ coro);
479
480 destroy_stacks ();
481 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
482
483 LOAD((&temp));
484 SPAGAIN; 3422 SPAGAIN;
3423
3424 hv = (HV *)SvREFCNT_inc_NN (SvRV (POPs));
485 } 3425 }
486 3426
3427 {
3428 struct coro *coro = SvSTATE_hv (hv);
3429
3430 assert (!coro->invoke_cb);
3431 assert (!coro->invoke_av);
3432 coro->invoke_cb = SvREFCNT_inc (cb);
3433 coro->invoke_av = av;
3434 }
3435
3436 api_ready (aTHX_ (SV *)hv);
3437
3438 if (GIMME_V != G_VOID)
3439 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
3440 else
487 SvREFCNT_dec (coro->args); 3441 SvREFCNT_dec (hv);
488 Safefree (coro); 3442}
489 3443
3444SV *
3445rouse_cb ()
3446 PROTOTYPE:
3447 CODE:
3448 RETVAL = coro_new_rouse_cb (aTHX);
3449 OUTPUT:
3450 RETVAL
490 3451
3452void
3453rouse_wait (...)
3454 PROTOTYPE: ;$
3455 PPCODE:
3456 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
3457
3458void
3459on_enter (SV *block)
3460 ALIAS:
3461 on_leave = 1
3462 PROTOTYPE: &
3463 CODE:
3464{
3465 struct coro *coro = SvSTATE_current;
3466 AV **avp = ix ? &coro->on_leave : &coro->on_enter;
3467
3468 block = (SV *)coro_sv_2cv (aTHX_ block);
3469
3470 if (!*avp)
3471 *avp = newAV ();
3472
3473 av_push (*avp, SvREFCNT_inc (block));
3474
3475 if (!ix)
3476 on_enterleave_call (aTHX_ block);
3477
3478 LEAVE; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3479 SAVEDESTRUCTOR_X (ix ? coro_pop_on_leave : coro_pop_on_enter, (void *)coro);
3480 ENTER; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3481}
3482
3483
3484MODULE = Coro::State PACKAGE = PerlIO::cede
3485
3486BOOT:
3487 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3488
3489
3490MODULE = Coro::State PACKAGE = Coro::Semaphore
3491
3492SV *
3493new (SV *klass, SV *count = 0)
3494 CODE:
3495{
3496 int semcnt = 1;
3497
3498 if (count)
3499 {
3500 SvGETMAGIC (count);
3501
3502 if (SvOK (count))
3503 semcnt = SvIV (count);
3504 }
3505
3506 RETVAL = sv_bless (
3507 coro_waitarray_new (aTHX_ semcnt),
3508 GvSTASH (CvGV (cv))
3509 );
3510}
3511 OUTPUT:
3512 RETVAL
3513
3514# helper for Coro::Channel and others
3515SV *
3516_alloc (int count)
3517 CODE:
3518 RETVAL = coro_waitarray_new (aTHX_ count);
3519 OUTPUT:
3520 RETVAL
3521
3522SV *
3523count (SV *self)
3524 CODE:
3525 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3526 OUTPUT:
3527 RETVAL
3528
3529void
3530up (SV *self, int adjust = 1)
3531 ALIAS:
3532 adjust = 1
3533 CODE:
3534 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3535
3536void
3537down (...)
3538 CODE:
3539 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3540
3541void
3542wait (...)
3543 CODE:
3544 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3545
3546void
3547try (SV *self)
3548 PPCODE:
3549{
3550 AV *av = (AV *)SvRV (self);
3551 SV *count_sv = AvARRAY (av)[0];
3552 IV count = SvIVX (count_sv);
3553
3554 if (count > 0)
3555 {
3556 --count;
3557 SvIVX (count_sv) = count;
3558 XSRETURN_YES;
3559 }
3560 else
3561 XSRETURN_NO;
3562}
3563
3564void
3565waiters (SV *self)
3566 PPCODE:
3567{
3568 AV *av = (AV *)SvRV (self);
3569 int wcount = AvFILLp (av) + 1 - 1;
3570
3571 if (GIMME_V == G_SCALAR)
3572 XPUSHs (sv_2mortal (newSViv (wcount)));
3573 else
3574 {
3575 int i;
3576 EXTEND (SP, wcount);
3577 for (i = 1; i <= wcount; ++i)
3578 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3579 }
3580}
3581
3582MODULE = Coro::State PACKAGE = Coro::SemaphoreSet
3583
3584void
3585_may_delete (SV *sem, int count, int extra_refs)
3586 PPCODE:
3587{
3588 AV *av = (AV *)SvRV (sem);
3589
3590 if (SvREFCNT ((SV *)av) == 1 + extra_refs
3591 && AvFILLp (av) == 0 /* no waiters, just count */
3592 && SvIV (AvARRAY (av)[0]) == count)
3593 XSRETURN_YES;
3594
3595 XSRETURN_NO;
3596}
3597
3598MODULE = Coro::State PACKAGE = Coro::Signal
3599
3600SV *
3601new (SV *klass)
3602 CODE:
3603 RETVAL = sv_bless (
3604 coro_waitarray_new (aTHX_ 0),
3605 GvSTASH (CvGV (cv))
3606 );
3607 OUTPUT:
3608 RETVAL
3609
3610void
3611wait (...)
3612 CODE:
3613 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
3614
3615void
3616broadcast (SV *self)
3617 CODE:
3618{
3619 AV *av = (AV *)SvRV (self);
3620 coro_signal_wake (aTHX_ av, AvFILLp (av));
3621}
3622
3623void
3624send (SV *self)
3625 CODE:
3626{
3627 AV *av = (AV *)SvRV (self);
3628
3629 if (AvFILLp (av))
3630 coro_signal_wake (aTHX_ av, 1);
3631 else
3632 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3633}
3634
3635IV
3636awaited (SV *self)
3637 CODE:
3638 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3639 OUTPUT:
3640 RETVAL
3641
3642
3643MODULE = Coro::State PACKAGE = Coro::AnyEvent
3644
3645BOOT:
3646 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3647
3648void
3649_schedule (...)
3650 CODE:
3651{
3652 static int incede;
3653
3654 api_cede_notself (aTHX);
3655
3656 ++incede;
3657 while (coro_nready >= incede && api_cede (aTHX))
3658 ;
3659
3660 sv_setsv (sv_activity, &PL_sv_undef);
3661 if (coro_nready >= incede)
3662 {
3663 PUSHMARK (SP);
3664 PUTBACK;
3665 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3666 }
3667
3668 --incede;
3669}
3670
3671
3672MODULE = Coro::State PACKAGE = Coro::AIO
3673
3674void
3675_register (char *target, char *proto, SV *req)
3676 CODE:
3677{
3678 CV *req_cv = coro_sv_2cv (aTHX_ req);
3679 /* newXSproto doesn't return the CV on 5.8 */
3680 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3681 sv_setpv ((SV *)slf_cv, proto);
3682 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3683}
3684

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines