ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.5 by root, Tue Jul 17 02:55:29 2001 UTC vs.
Revision 1.351 by root, Sat Jun 20 08:58:25 2009 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "patchlevel.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
24#endif
25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
9#endif 35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
10 48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#define PERL_VERSION_ATLEAST(a,b,c) \
57 (PERL_REVISION > (a) \
58 || (PERL_REVISION == (a) \
59 && (PERL_VERSION > (b) \
60 || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))
61
62#if !PERL_VERSION_ATLEAST (5,6,0)
63# ifndef PL_ppaddr
64# define PL_ppaddr ppaddr
65# endif
66# ifndef call_sv
67# define call_sv perl_call_sv
68# endif
69# ifndef get_sv
70# define get_sv perl_get_sv
71# endif
72# ifndef get_cv
73# define get_cv perl_get_cv
74# endif
75# ifndef IS_PADGV
76# define IS_PADGV(v) 0
77# endif
78# ifndef IS_PADCONST
79# define IS_PADCONST(v) 0
80# endif
81#endif
82
83/* 5.11 */
84#ifndef CxHASARGS
85# define CxHASARGS(cx) (cx)->blk_sub.hasargs
86#endif
87
88/* 5.10.0 */
89#ifndef SvREFCNT_inc_NN
90# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
91#endif
92
93/* 5.8.8 */
94#ifndef GV_NOTQUAL
95# define GV_NOTQUAL 0
96#endif
97#ifndef newSV
98# define newSV(l) NEWSV(0,l)
99#endif
100#ifndef CvISXSUB_on
101# define CvISXSUB_on(cv) (void)cv
102#endif
103#ifndef CvISXSUB
104# define CvISXSUB(cv) (CvXSUB (cv) ? TRUE : FALSE)
105#endif
106#ifndef Newx
107# define Newx(ptr,nitems,type) New (0,ptr,nitems,type)
108#endif
109
110/* 5.8.7 */
111#ifndef SvRV_set
112# define SvRV_set(s,v) SvRV(s) = (v)
113#endif
114
115#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
116# undef CORO_STACKGUARD
117#endif
118
119#ifndef CORO_STACKGUARD
120# define CORO_STACKGUARD 0
121#endif
122
123/* prefer perl internal functions over our own? */
124#ifndef CORO_PREFER_PERL_FUNCTIONS
125# define CORO_PREFER_PERL_FUNCTIONS 0
126#endif
127
128/* The next macros try to return the current stack pointer, in an as
129 * portable way as possible. */
130#if __GNUC__ >= 4
131# define dSTACKLEVEL int stacklevel_dummy
132# define STACKLEVEL __builtin_frame_address (0)
133#else
134# define dSTACKLEVEL volatile void *stacklevel
135# define STACKLEVEL ((void *)&stacklevel)
136#endif
137
138#define IN_DESTRUCT PL_dirty
139
140#if __GNUC__ >= 3
141# define attribute(x) __attribute__(x)
142# define expect(expr,value) __builtin_expect ((expr),(value))
143# define INLINE static inline
144#else
145# define attribute(x)
146# define expect(expr,value) (expr)
147# define INLINE static
148#endif
149
150#define expect_false(expr) expect ((expr) != 0, 0)
151#define expect_true(expr) expect ((expr) != 0, 1)
152
153#define NOINLINE attribute ((noinline))
154
155#include "CoroAPI.h"
156#define GCoroAPI (&coroapi) /* very sneaky */
157
158#ifdef USE_ITHREADS
159# if CORO_PTHREAD
160static void *coro_thx;
161# endif
162#endif
163
164static double (*nvtime)(); /* so why doesn't it take void? */
165
166/* we hijack an hopefully unused CV flag for our purposes */
167#define CVf_SLF 0x4000
168static OP *pp_slf (pTHX);
169
170static U32 cctx_gen;
171static size_t cctx_stacksize = CORO_STACKSIZE;
172static struct CoroAPI coroapi;
173static AV *main_mainstack; /* used to differentiate between $main and others */
174static JMPENV *main_top_env;
175static HV *coro_state_stash, *coro_stash;
176static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
177
178static AV *av_destroy; /* destruction queue */
179static SV *sv_manager; /* the manager coro */
180static SV *sv_idle; /* $Coro::idle */
181
182static GV *irsgv; /* $/ */
183static GV *stdoutgv; /* *STDOUT */
184static SV *rv_diehook;
185static SV *rv_warnhook;
186static HV *hv_sig; /* %SIG */
187
188/* async_pool helper stuff */
189static SV *sv_pool_rss;
190static SV *sv_pool_size;
191static SV *sv_async_pool_idle; /* description string */
192static AV *av_async_pool; /* idle pool */
193static SV *sv_Coro; /* class string */
194static CV *cv_pool_handler;
195static CV *cv_coro_state_new;
196
197/* Coro::AnyEvent */
198static SV *sv_activity;
199
200static struct coro_cctx *cctx_first;
201static int cctx_count, cctx_idle;
202
203enum {
204 CC_MAPPED = 0x01,
205 CC_NOREUSE = 0x02, /* throw this away after tracing */
206 CC_TRACE = 0x04,
207 CC_TRACE_SUB = 0x08, /* trace sub calls */
208 CC_TRACE_LINE = 0x10, /* trace each statement */
209 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
210};
211
212/* this is a structure representing a c-level coroutine */
213typedef struct coro_cctx
214{
215 struct coro_cctx *next;
216
217 /* the stack */
218 void *sptr;
219 size_t ssize;
220
221 /* cpu state */
222 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
223 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
224 JMPENV *top_env;
225 coro_context cctx;
226
227 U32 gen;
228#if CORO_USE_VALGRIND
229 int valgrind_id;
230#endif
231 unsigned char flags;
232} coro_cctx;
233
234coro_cctx *cctx_current; /* the currently running cctx */
235
236/*****************************************************************************/
237
238enum {
239 CF_RUNNING = 0x0001, /* coroutine is running */
240 CF_READY = 0x0002, /* coroutine is ready */
241 CF_NEW = 0x0004, /* has never been switched to */
242 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
243 CF_SUSPENDED = 0x0010, /* coroutine can't be scheduled */
244};
245
246/* the structure where most of the perl state is stored, overlaid on the cxstack */
247typedef struct
248{
249 SV *defsv;
250 AV *defav;
251 SV *errsv;
252 SV *irsgv;
253 HV *hinthv;
254#define VAR(name,type) type name;
255# include "state.h"
256#undef VAR
257} perl_slots;
258
259#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
260
261/* this is a structure representing a perl-level coroutine */
11struct coro { 262struct coro {
12 U8 dowarn; 263 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 264 coro_cctx *cctx;
14 265
15 PERL_SI *curstackinfo; 266 /* ready queue */
16 AV *curstack; 267 struct coro *next_ready;
268
269 /* state data */
270 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 271 AV *mainstack;
18 SV **stack_sp; 272 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 273
41 AV *args; 274 CV *startcv; /* the CV to execute */
275 AV *args; /* data associated with this coroutine (initial args) */
276 int refcnt; /* coroutines are refcounted, yes */
277 int flags; /* CF_ flags */
278 HV *hv; /* the perl hash associated with this coro, if any */
279 void (*on_destroy)(pTHX_ struct coro *coro);
280
281 /* statistics */
282 int usecount; /* number of transfers to this coro */
283
284 /* coro process data */
285 int prio;
286 SV *except; /* exception to be thrown */
287 SV *rouse_cb;
288
289 /* async_pool */
290 SV *saved_deffh;
291 SV *invoke_cb;
292 AV *invoke_av;
293
294 /* on_enter/on_leave */
295 AV *on_enter;
296 AV *on_leave;
297
298 /* linked list */
299 struct coro *next, *prev;
42}; 300};
43 301
44typedef struct coro *Coro__State; 302typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 303typedef struct coro *Coro__State_or_hashref;
46 304
47static HV *padlist_cache; 305/* the following variables are effectively part of the perl context */
306/* and get copied between struct coro and these variables */
307/* the mainr easonw e don't support windows process emulation */
308static struct CoroSLF slf_frame; /* the current slf frame */
48 309
49/* mostly copied from op.c:cv_clone2 */ 310/** Coro ********************************************************************/
50STATIC AV * 311
51clone_padlist (AV *protopadlist) 312#define PRIO_MAX 3
313#define PRIO_HIGH 1
314#define PRIO_NORMAL 0
315#define PRIO_LOW -1
316#define PRIO_IDLE -3
317#define PRIO_MIN -4
318
319/* for Coro.pm */
320static SV *coro_current;
321static SV *coro_readyhook;
322static struct coro *coro_ready [PRIO_MAX - PRIO_MIN + 1][2]; /* head|tail */
323static CV *cv_coro_run, *cv_coro_terminate;
324static struct coro *coro_first;
325#define coro_nready coroapi.nready
326
327/** lowlevel stuff **********************************************************/
328
329static SV *
330coro_get_sv (pTHX_ const char *name, int create)
52{ 331{
53 AV *av; 332#if PERL_VERSION_ATLEAST (5,10,0)
54 I32 ix; 333 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 334 get_sv (name, create);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 335#endif
57 SV **pname = AvARRAY (protopad_name); 336 return get_sv (name, create);
58 SV **ppad = AvARRAY (protopad); 337}
59 I32 fname = AvFILLp (protopad_name); 338
60 I32 fpad = AvFILLp (protopad); 339static AV *
340coro_get_av (pTHX_ const char *name, int create)
341{
342#if PERL_VERSION_ATLEAST (5,10,0)
343 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
344 get_av (name, create);
345#endif
346 return get_av (name, create);
347}
348
349static HV *
350coro_get_hv (pTHX_ const char *name, int create)
351{
352#if PERL_VERSION_ATLEAST (5,10,0)
353 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
354 get_hv (name, create);
355#endif
356 return get_hv (name, create);
357}
358
359/* may croak */
360INLINE CV *
361coro_sv_2cv (pTHX_ SV *sv)
362{
363 HV *st;
364 GV *gvp;
365 CV *cv = sv_2cv (sv, &st, &gvp, 0);
366
367 if (!cv)
368 croak ("code reference expected");
369
370 return cv;
371}
372
373/*****************************************************************************/
374/* magic glue */
375
376#define CORO_MAGIC_type_cv 26
377#define CORO_MAGIC_type_state PERL_MAGIC_ext
378
379#define CORO_MAGIC_NN(sv, type) \
380 (expect_true (SvMAGIC (sv)->mg_type == type) \
381 ? SvMAGIC (sv) \
382 : mg_find (sv, type))
383
384#define CORO_MAGIC(sv, type) \
385 (expect_true (SvMAGIC (sv)) \
386 ? CORO_MAGIC_NN (sv, type) \
387 : 0)
388
389#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
390#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
391
392INLINE struct coro *
393SvSTATE_ (pTHX_ SV *coro)
394{
395 HV *stash;
396 MAGIC *mg;
397
398 if (SvROK (coro))
399 coro = SvRV (coro);
400
401 if (expect_false (SvTYPE (coro) != SVt_PVHV))
402 croak ("Coro::State object required");
403
404 stash = SvSTASH (coro);
405 if (expect_false (stash != coro_stash && stash != coro_state_stash))
406 {
407 /* very slow, but rare, check */
408 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
409 croak ("Coro::State object required");
410 }
411
412 mg = CORO_MAGIC_state (coro);
413 return (struct coro *)mg->mg_ptr;
414}
415
416#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
417
418/* faster than SvSTATE, but expects a coroutine hv */
419#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
420#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
421
422/*****************************************************************************/
423/* padlist management and caching */
424
425static AV *
426coro_derive_padlist (pTHX_ CV *cv)
427{
428 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 429 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 430
72 newpadlist = newAV (); 431 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 432 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 433#if PERL_VERSION_ATLEAST (5,10,0)
434 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
435#else
436 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
437#endif
438 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
439 --AvFILLp (padlist);
440
441 av_store (newpadlist, 0, SvREFCNT_inc_NN (AvARRAY (padlist)[0]));
75 av_store (newpadlist, 1, (SV *) newpad); 442 av_store (newpadlist, 1, (SV *)newpad);
76 443
77 av = newAV (); /* will be @_ */ 444 return newpadlist;
78 av_extend (av, 0); 445}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 446
82 for (ix = fpad; ix > 0; ix--) 447static void
448free_padlist (pTHX_ AV *padlist)
449{
450 /* may be during global destruction */
451 if (!IN_DESTRUCT)
83 { 452 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 453 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 454
455 while (i > 0) /* special-case index 0 */
86 { 456 {
87 char *name = SvPVX (namesv); /* XXX */ 457 /* we try to be extra-careful here */
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 458 AV *av = (AV *)AvARRAY (padlist)[i--];
89 { /* lexical from outside? */ 459 I32 j = AvFILLp (av);
90 npad[ix] = SvREFCNT_inc (ppad[ix]); 460
91 } 461 while (j >= 0)
92 else 462 SvREFCNT_dec (AvARRAY (av)[j--]);
93 { /* our own lexical */ 463
94 SV *sv; 464 AvFILLp (av) = -1;
95 if (*name == '&') 465 SvREFCNT_dec (av);
96 sv = SvREFCNT_inc (ppad[ix]);
97 else if (*name == '@')
98 sv = (SV *) newAV ();
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 }
107 } 466 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix]))
109 {
110 npad[ix] = SvREFCNT_inc (ppad[ix]);
111 }
112 else
113 {
114 SV *sv = NEWSV (0, 0);
115 SvPADTMP_on (sv);
116 npad[ix] = sv;
117 }
118 }
119 467
120#if 0 /* NONOTUNDERSTOOD */ 468 SvREFCNT_dec (AvARRAY (padlist)[0]);
121 /* Now that vars are all in place, clone nested closures. */
122 469
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif
139
140 return newpadlist;
141}
142
143STATIC AV *
144free_padlist (AV *padlist)
145{
146 /* may be during global destruction */
147 if (SvREFCNT(padlist))
148 {
149 I32 i = AvFILLp(padlist); 470 AvFILLp (padlist) = -1;
150 while (i >= 0)
151 {
152 SV **svp = av_fetch(padlist, i--, FALSE);
153 SV *sv = svp ? *svp : Nullsv;
154 if (sv)
155 SvREFCNT_dec(sv);
156 }
157
158 SvREFCNT_dec((SV*)padlist); 471 SvREFCNT_dec ((SV*)padlist);
472 }
473}
474
475static int
476coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
477{
478 AV *padlist;
479 AV *av = (AV *)mg->mg_obj;
480
481 /* casting is fun. */
482 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
483 free_padlist (aTHX_ padlist);
484
485 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
486
487 return 0;
488}
489
490static MGVTBL coro_cv_vtbl = {
491 0, 0, 0, 0,
492 coro_cv_free
493};
494
495/* the next two functions merely cache the padlists */
496static void
497get_padlist (pTHX_ CV *cv)
498{
499 MAGIC *mg = CORO_MAGIC_cv (cv);
500 AV *av;
501
502 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
503 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
504 else
505 {
506#if CORO_PREFER_PERL_FUNCTIONS
507 /* this is probably cleaner? but also slower! */
508 /* in practise, it seems to be less stable */
509 CV *cp = Perl_cv_clone (aTHX_ cv);
510 CvPADLIST (cv) = CvPADLIST (cp);
511 CvPADLIST (cp) = 0;
512 SvREFCNT_dec (cp);
513#else
514 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
515#endif
516 }
517}
518
519static void
520put_padlist (pTHX_ CV *cv)
521{
522 MAGIC *mg = CORO_MAGIC_cv (cv);
523 AV *av;
524
525 if (expect_false (!mg))
526 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
527
528 av = (AV *)mg->mg_obj;
529
530 if (expect_false (AvFILLp (av) >= AvMAX (av)))
531 av_extend (av, AvFILLp (av) + 1);
532
533 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
534}
535
536/** load & save, init *******************************************************/
537
538static void
539on_enterleave_call (pTHX_ SV *cb);
540
541static void
542load_perl (pTHX_ Coro__State c)
543{
544 perl_slots *slot = c->slot;
545 c->slot = 0;
546
547 PL_mainstack = c->mainstack;
548
549 GvSV (PL_defgv) = slot->defsv;
550 GvAV (PL_defgv) = slot->defav;
551 GvSV (PL_errgv) = slot->errsv;
552 GvSV (irsgv) = slot->irsgv;
553 GvHV (PL_hintgv) = slot->hinthv;
554
555 #define VAR(name,type) PL_ ## name = slot->name;
556 # include "state.h"
557 #undef VAR
558
559 {
560 dSP;
561
562 CV *cv;
563
564 /* now do the ugly restore mess */
565 while (expect_true (cv = (CV *)POPs))
566 {
567 put_padlist (aTHX_ cv); /* mark this padlist as available */
568 CvDEPTH (cv) = PTR2IV (POPs);
569 CvPADLIST (cv) = (AV *)POPs;
570 }
571
572 PUTBACK;
159 } 573 }
160}
161 574
162/* the next tow functions merely cache the padlists */ 575 slf_frame = c->slf_frame;
163STATIC void 576 CORO_THROW = c->except;
164get_padlist (CV *cv)
165{
166 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 0);
167 577
168 if (he && AvFILLp ((AV *)*he) >= 0) 578 if (expect_false (c->on_enter))
169 CvPADLIST (cv) = (AV *)av_pop ((AV *)*he);
170 else
171 CvPADLIST (cv) = clone_padlist (CvPADLIST (cv));
172}
173
174STATIC void
175put_padlist (CV *cv)
176{
177 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 1);
178
179 if (SvTYPE (*he) != SVt_PVAV)
180 {
181 SvREFCNT_dec (*he);
182 *he = (SV *)newAV ();
183 } 579 {
580 int i;
184 581
185 av_push ((AV *)*he, (SV *)CvPADLIST (cv)); 582 for (i = 0; i <= AvFILLp (c->on_enter); ++i)
583 on_enterleave_call (aTHX_ AvARRAY (c->on_enter)[i]);
584 }
186} 585}
187 586
188static void 587static void
189SAVE(pTHX_ Coro__State c) 588save_perl (pTHX_ Coro__State c)
190{ 589{
590 if (expect_false (c->on_leave))
591 {
592 int i;
593
594 for (i = AvFILLp (c->on_leave); i >= 0; --i)
595 on_enterleave_call (aTHX_ AvARRAY (c->on_leave)[i]);
596 }
597
598 c->except = CORO_THROW;
599 c->slf_frame = slf_frame;
600
191 { 601 {
192 dSP; 602 dSP;
193 I32 cxix = cxstack_ix; 603 I32 cxix = cxstack_ix;
604 PERL_CONTEXT *ccstk = cxstack;
194 PERL_SI *top_si = PL_curstackinfo; 605 PERL_SI *top_si = PL_curstackinfo;
195 PERL_CONTEXT *ccstk = cxstack;
196 606
197 /* 607 /*
198 * the worst thing you can imagine happens first - we have to save 608 * the worst thing you can imagine happens first - we have to save
199 * (and reinitialize) all cv's in the whole callchain :( 609 * (and reinitialize) all cv's in the whole callchain :(
200 */ 610 */
201 611
202 PUSHs (Nullsv); 612 XPUSHs (Nullsv);
203 /* this loop was inspired by pp_caller */ 613 /* this loop was inspired by pp_caller */
204 for (;;) 614 for (;;)
205 { 615 {
206 while (cxix >= 0) 616 while (expect_true (cxix >= 0))
207 { 617 {
208 PERL_CONTEXT *cx = &ccstk[cxix--]; 618 PERL_CONTEXT *cx = &ccstk[cxix--];
209 619
210 if (CxTYPE(cx) == CXt_SUB) 620 if (expect_true (CxTYPE (cx) == CXt_SUB || CxTYPE (cx) == CXt_FORMAT))
211 { 621 {
212 CV *cv = cx->blk_sub.cv; 622 CV *cv = cx->blk_sub.cv;
623
213 if (CvDEPTH(cv)) 624 if (expect_true (CvDEPTH (cv)))
214 { 625 {
215#ifdef USE_THREADS
216 XPUSHs ((SV *)CvOWNER(cv));
217#endif
218 EXTEND (SP, 3); 626 EXTEND (SP, 3);
219 PUSHs ((SV *)CvDEPTH(cv));
220 PUSHs ((SV *)CvPADLIST(cv)); 627 PUSHs ((SV *)CvPADLIST (cv));
628 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
221 PUSHs ((SV *)cv); 629 PUSHs ((SV *)cv);
222 630
223 get_padlist (cv);
224
225 CvDEPTH(cv) = 0; 631 CvDEPTH (cv) = 0;
226#ifdef USE_THREADS 632 get_padlist (aTHX_ cv);
227 CvOWNER(cv) = 0;
228 error must unlock this cv etc.. etc...
229 if you are here wondering about this error message then
230 the reason is that it will not work as advertised yet
231#endif
232 } 633 }
233 } 634 }
234 else if (CxTYPE(cx) == CXt_FORMAT) 635 }
636
637 if (expect_true (top_si->si_type == PERLSI_MAIN))
638 break;
639
640 top_si = top_si->si_prev;
641 ccstk = top_si->si_cxstack;
642 cxix = top_si->si_cxix;
643 }
644
645 PUTBACK;
646 }
647
648 /* allocate some space on the context stack for our purposes */
649 /* we manually unroll here, as usually 2 slots is enough */
650 if (SLOT_COUNT >= 1) CXINC;
651 if (SLOT_COUNT >= 2) CXINC;
652 if (SLOT_COUNT >= 3) CXINC;
653 {
654 int i;
655 for (i = 3; i < SLOT_COUNT; ++i)
656 CXINC;
657 }
658 cxstack_ix -= SLOT_COUNT; /* undo allocation */
659
660 c->mainstack = PL_mainstack;
661
662 {
663 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
664
665 slot->defav = GvAV (PL_defgv);
666 slot->defsv = DEFSV;
667 slot->errsv = ERRSV;
668 slot->irsgv = GvSV (irsgv);
669 slot->hinthv = GvHV (PL_hintgv);
670
671 #define VAR(name,type) slot->name = PL_ ## name;
672 # include "state.h"
673 #undef VAR
674 }
675}
676
677/*
678 * allocate various perl stacks. This is almost an exact copy
679 * of perl.c:init_stacks, except that it uses less memory
680 * on the (sometimes correct) assumption that coroutines do
681 * not usually need a lot of stackspace.
682 */
683#if CORO_PREFER_PERL_FUNCTIONS
684# define coro_init_stacks(thx) init_stacks ()
685#else
686static void
687coro_init_stacks (pTHX)
688{
689 PL_curstackinfo = new_stackinfo(32, 8);
690 PL_curstackinfo->si_type = PERLSI_MAIN;
691 PL_curstack = PL_curstackinfo->si_stack;
692 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
693
694 PL_stack_base = AvARRAY(PL_curstack);
695 PL_stack_sp = PL_stack_base;
696 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
697
698 New(50,PL_tmps_stack,32,SV*);
699 PL_tmps_floor = -1;
700 PL_tmps_ix = -1;
701 PL_tmps_max = 32;
702
703 New(54,PL_markstack,16,I32);
704 PL_markstack_ptr = PL_markstack;
705 PL_markstack_max = PL_markstack + 16;
706
707#ifdef SET_MARK_OFFSET
708 SET_MARK_OFFSET;
709#endif
710
711 New(54,PL_scopestack,8,I32);
712 PL_scopestack_ix = 0;
713 PL_scopestack_max = 8;
714
715 New(54,PL_savestack,24,ANY);
716 PL_savestack_ix = 0;
717 PL_savestack_max = 24;
718
719#if !PERL_VERSION_ATLEAST (5,10,0)
720 New(54,PL_retstack,4,OP*);
721 PL_retstack_ix = 0;
722 PL_retstack_max = 4;
723#endif
724}
725#endif
726
727/*
728 * destroy the stacks, the callchain etc...
729 */
730static void
731coro_destruct_stacks (pTHX)
732{
733 while (PL_curstackinfo->si_next)
734 PL_curstackinfo = PL_curstackinfo->si_next;
735
736 while (PL_curstackinfo)
737 {
738 PERL_SI *p = PL_curstackinfo->si_prev;
739
740 if (!IN_DESTRUCT)
741 SvREFCNT_dec (PL_curstackinfo->si_stack);
742
743 Safefree (PL_curstackinfo->si_cxstack);
744 Safefree (PL_curstackinfo);
745 PL_curstackinfo = p;
746 }
747
748 Safefree (PL_tmps_stack);
749 Safefree (PL_markstack);
750 Safefree (PL_scopestack);
751 Safefree (PL_savestack);
752#if !PERL_VERSION_ATLEAST (5,10,0)
753 Safefree (PL_retstack);
754#endif
755}
756
757#define CORO_RSS \
758 rss += sizeof (SYM (curstackinfo)); \
759 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
760 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
761 rss += SYM (tmps_max) * sizeof (SV *); \
762 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
763 rss += SYM (scopestack_max) * sizeof (I32); \
764 rss += SYM (savestack_max) * sizeof (ANY);
765
766static size_t
767coro_rss (pTHX_ struct coro *coro)
768{
769 size_t rss = sizeof (*coro);
770
771 if (coro->mainstack)
772 {
773 if (coro->flags & CF_RUNNING)
774 {
775 #define SYM(sym) PL_ ## sym
776 CORO_RSS;
777 #undef SYM
778 }
779 else
780 {
781 #define SYM(sym) coro->slot->sym
782 CORO_RSS;
783 #undef SYM
784 }
785 }
786
787 return rss;
788}
789
790/** coroutine stack handling ************************************************/
791
792static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
793static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
794static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
795
796/* apparently < 5.8.8 */
797#ifndef MgPV_nolen_const
798#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
799 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
800 (const char*)(mg)->mg_ptr)
801#endif
802
803/*
804 * This overrides the default magic get method of %SIG elements.
805 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
806 * and instead of trying to save and restore the hash elements, we just provide
807 * readback here.
808 */
809static int
810coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
811{
812 const char *s = MgPV_nolen_const (mg);
813
814 if (*s == '_')
815 {
816 SV **svp = 0;
817
818 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
819 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
820
821 if (svp)
822 {
823 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
824 return 0;
825 }
826 }
827
828 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
829}
830
831static int
832coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
833{
834 const char *s = MgPV_nolen_const (mg);
835
836 if (*s == '_')
837 {
838 SV **svp = 0;
839
840 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
841 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
842
843 if (svp)
844 {
845 SV *old = *svp;
846 *svp = 0;
847 SvREFCNT_dec (old);
848 return 0;
849 }
850 }
851
852 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
853}
854
855static int
856coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
857{
858 const char *s = MgPV_nolen_const (mg);
859
860 if (*s == '_')
861 {
862 SV **svp = 0;
863
864 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
865 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
866
867 if (svp)
868 {
869 SV *old = *svp;
870 *svp = SvOK (sv) ? newSVsv (sv) : 0;
871 SvREFCNT_dec (old);
872 return 0;
873 }
874 }
875
876 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
877}
878
879static void
880prepare_nop (pTHX_ struct coro_transfer_args *ta)
881{
882 /* kind of mega-hacky, but works */
883 ta->next = ta->prev = (struct coro *)ta;
884}
885
886static int
887slf_check_nop (pTHX_ struct CoroSLF *frame)
888{
889 return 0;
890}
891
892static int
893slf_check_repeat (pTHX_ struct CoroSLF *frame)
894{
895 return 1;
896}
897
898static UNOP coro_setup_op;
899
900static void NOINLINE /* noinline to keep it out of the transfer fast path */
901coro_setup (pTHX_ struct coro *coro)
902{
903 /*
904 * emulate part of the perl startup here.
905 */
906 coro_init_stacks (aTHX);
907
908 PL_runops = RUNOPS_DEFAULT;
909 PL_curcop = &PL_compiling;
910 PL_in_eval = EVAL_NULL;
911 PL_comppad = 0;
912 PL_comppad_name = 0;
913 PL_comppad_name_fill = 0;
914 PL_comppad_name_floor = 0;
915 PL_curpm = 0;
916 PL_curpad = 0;
917 PL_localizing = 0;
918 PL_dirty = 0;
919 PL_restartop = 0;
920#if PERL_VERSION_ATLEAST (5,10,0)
921 PL_parser = 0;
922#endif
923 PL_hints = 0;
924
925 /* recreate the die/warn hooks */
926 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
927 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
928
929 GvSV (PL_defgv) = newSV (0);
930 GvAV (PL_defgv) = coro->args; coro->args = 0;
931 GvSV (PL_errgv) = newSV (0);
932 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
933 GvHV (PL_hintgv) = 0;
934 PL_rs = newSVsv (GvSV (irsgv));
935 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
936
937 {
938 dSP;
939 UNOP myop;
940
941 Zero (&myop, 1, UNOP);
942 myop.op_next = Nullop;
943 myop.op_type = OP_ENTERSUB;
944 myop.op_flags = OPf_WANT_VOID;
945
946 PUSHMARK (SP);
947 PUSHs ((SV *)coro->startcv);
948 PUTBACK;
949 PL_op = (OP *)&myop;
950 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
951 }
952
953 /* this newly created coroutine might be run on an existing cctx which most
954 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
955 */
956 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
957 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
958
959 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
960 coro_setup_op.op_next = PL_op;
961 coro_setup_op.op_type = OP_ENTERSUB;
962 coro_setup_op.op_ppaddr = pp_slf;
963 /* no flags etc. required, as an init function won't be called */
964
965 PL_op = (OP *)&coro_setup_op;
966
967 /* copy throw, in case it was set before coro_setup */
968 CORO_THROW = coro->except;
969}
970
971static void
972coro_unwind_stacks (pTHX)
973{
974 if (!IN_DESTRUCT)
975 {
976 /* restore all saved variables and stuff */
977 LEAVE_SCOPE (0);
978 assert (PL_tmps_floor == -1);
979
980 /* free all temporaries */
981 FREETMPS;
982 assert (PL_tmps_ix == -1);
983
984 /* unwind all extra stacks */
985 POPSTACK_TO (PL_mainstack);
986
987 /* unwind main stack */
988 dounwind (-1);
989 }
990}
991
992static void
993coro_destruct_perl (pTHX_ struct coro *coro)
994{
995 SV *svf [9];
996
997 {
998 struct coro *current = SvSTATE_current;
999
1000 assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack));
1001
1002 save_perl (aTHX_ current);
1003 load_perl (aTHX_ coro);
1004 coro_unwind_stacks (aTHX);
1005 coro_destruct_stacks (aTHX);
1006
1007 // now save some sv's to be free'd later
1008 svf [0] = GvSV (PL_defgv);
1009 svf [1] = (SV *)GvAV (PL_defgv);
1010 svf [2] = GvSV (PL_errgv);
1011 svf [3] = (SV *)PL_defoutgv;
1012 svf [4] = PL_rs;
1013 svf [5] = GvSV (irsgv);
1014 svf [6] = (SV *)GvHV (PL_hintgv);
1015 svf [7] = PL_diehook;
1016 svf [8] = PL_warnhook;
1017 assert (9 == sizeof (svf) / sizeof (*svf));
1018
1019 load_perl (aTHX_ current);
1020 }
1021
1022 {
1023 int i;
1024
1025 for (i = 0; i < sizeof (svf) / sizeof (*svf); ++i)
1026 SvREFCNT_dec (svf [i]);
1027
1028 SvREFCNT_dec (coro->saved_deffh);
1029 SvREFCNT_dec (coro->rouse_cb);
1030 SvREFCNT_dec (coro->invoke_cb);
1031 SvREFCNT_dec (coro->invoke_av);
1032 }
1033}
1034
1035INLINE void
1036free_coro_mortal (pTHX)
1037{
1038 if (expect_true (coro_mortal))
1039 {
1040 SvREFCNT_dec (coro_mortal);
1041 coro_mortal = 0;
1042 }
1043}
1044
1045static int
1046runops_trace (pTHX)
1047{
1048 COP *oldcop = 0;
1049 int oldcxix = -2;
1050
1051 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
1052 {
1053 PERL_ASYNC_CHECK ();
1054
1055 if (cctx_current->flags & CC_TRACE_ALL)
1056 {
1057 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
1058 {
1059 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1060 SV **bot, **top;
1061 AV *av = newAV (); /* return values */
1062 SV **cb;
1063 dSP;
1064
1065 GV *gv = CvGV (cx->blk_sub.cv);
1066 SV *fullname = sv_2mortal (newSV (0));
1067 if (isGV (gv))
1068 gv_efullname3 (fullname, gv, 0);
1069
1070 bot = PL_stack_base + cx->blk_oldsp + 1;
1071 top = cx->blk_gimme == G_ARRAY ? SP + 1
1072 : cx->blk_gimme == G_SCALAR ? bot + 1
1073 : bot;
1074
1075 av_extend (av, top - bot);
1076 while (bot < top)
1077 av_push (av, SvREFCNT_inc_NN (*bot++));
1078
1079 PL_runops = RUNOPS_DEFAULT;
1080 ENTER;
1081 SAVETMPS;
1082 EXTEND (SP, 3);
1083 PUSHMARK (SP);
1084 PUSHs (&PL_sv_no);
1085 PUSHs (fullname);
1086 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1087 PUTBACK;
1088 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1089 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1090 SPAGAIN;
1091 FREETMPS;
1092 LEAVE;
1093 PL_runops = runops_trace;
1094 }
1095
1096 if (oldcop != PL_curcop)
1097 {
1098 oldcop = PL_curcop;
1099
1100 if (PL_curcop != &PL_compiling)
1101 {
1102 SV **cb;
1103
1104 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1105 {
1106 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1107
1108 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1109 {
1110 dSP;
1111 GV *gv = CvGV (cx->blk_sub.cv);
1112 SV *fullname = sv_2mortal (newSV (0));
1113
1114 if (isGV (gv))
1115 gv_efullname3 (fullname, gv, 0);
1116
1117 PL_runops = RUNOPS_DEFAULT;
1118 ENTER;
1119 SAVETMPS;
1120 EXTEND (SP, 3);
1121 PUSHMARK (SP);
1122 PUSHs (&PL_sv_yes);
1123 PUSHs (fullname);
1124 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1125 PUTBACK;
1126 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1127 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1128 SPAGAIN;
1129 FREETMPS;
1130 LEAVE;
1131 PL_runops = runops_trace;
1132 }
1133
1134 oldcxix = cxstack_ix;
1135 }
1136
1137 if (cctx_current->flags & CC_TRACE_LINE)
1138 {
1139 dSP;
1140
1141 PL_runops = RUNOPS_DEFAULT;
1142 ENTER;
1143 SAVETMPS;
1144 EXTEND (SP, 3);
1145 PL_runops = RUNOPS_DEFAULT;
1146 PUSHMARK (SP);
1147 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1148 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1149 PUTBACK;
1150 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1151 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1152 SPAGAIN;
1153 FREETMPS;
1154 LEAVE;
1155 PL_runops = runops_trace;
1156 }
1157 }
1158 }
1159 }
1160 }
1161
1162 TAINT_NOT;
1163 return 0;
1164}
1165
1166static struct CoroSLF cctx_ssl_frame;
1167
1168static void
1169slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1170{
1171 ta->prev = 0;
1172}
1173
1174static int
1175slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1176{
1177 *frame = cctx_ssl_frame;
1178
1179 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1180}
1181
1182/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1183static void NOINLINE
1184cctx_prepare (pTHX)
1185{
1186 PL_top_env = &PL_start_env;
1187
1188 if (cctx_current->flags & CC_TRACE)
1189 PL_runops = runops_trace;
1190
1191 /* we already must be executing an SLF op, there is no other valid way
1192 * that can lead to creation of a new cctx */
1193 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1194 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1195
1196 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1197 cctx_ssl_frame = slf_frame;
1198
1199 slf_frame.prepare = slf_prepare_set_stacklevel;
1200 slf_frame.check = slf_check_set_stacklevel;
1201}
1202
1203/* the tail of transfer: execute stuff we can only do after a transfer */
1204INLINE void
1205transfer_tail (pTHX)
1206{
1207 free_coro_mortal (aTHX);
1208}
1209
1210/*
1211 * this is a _very_ stripped down perl interpreter ;)
1212 */
1213static void
1214cctx_run (void *arg)
1215{
1216#ifdef USE_ITHREADS
1217# if CORO_PTHREAD
1218 PERL_SET_CONTEXT (coro_thx);
1219# endif
1220#endif
1221 {
1222 dTHX;
1223
1224 /* normally we would need to skip the entersub here */
1225 /* not doing so will re-execute it, which is exactly what we want */
1226 /* PL_nop = PL_nop->op_next */
1227
1228 /* inject a fake subroutine call to cctx_init */
1229 cctx_prepare (aTHX);
1230
1231 /* cctx_run is the alternative tail of transfer() */
1232 transfer_tail (aTHX);
1233
1234 /* somebody or something will hit me for both perl_run and PL_restartop */
1235 PL_restartop = PL_op;
1236 perl_run (PL_curinterp);
1237 /*
1238 * Unfortunately, there is no way to get at the return values of the
1239 * coro body here, as perl_run destroys these
1240 */
1241
1242 /*
1243 * If perl-run returns we assume exit() was being called or the coro
1244 * fell off the end, which seems to be the only valid (non-bug)
1245 * reason for perl_run to return. We try to exit by jumping to the
1246 * bootstrap-time "top" top_env, as we cannot restore the "main"
1247 * coroutine as Coro has no such concept.
1248 * This actually isn't valid with the pthread backend, but OSes requiring
1249 * that backend are too broken to do it in a standards-compliant way.
1250 */
1251 PL_top_env = main_top_env;
1252 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1253 }
1254}
1255
1256static coro_cctx *
1257cctx_new ()
1258{
1259 coro_cctx *cctx;
1260
1261 ++cctx_count;
1262 New (0, cctx, 1, coro_cctx);
1263
1264 cctx->gen = cctx_gen;
1265 cctx->flags = 0;
1266 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1267
1268 return cctx;
1269}
1270
1271/* create a new cctx only suitable as source */
1272static coro_cctx *
1273cctx_new_empty ()
1274{
1275 coro_cctx *cctx = cctx_new ();
1276
1277 cctx->sptr = 0;
1278 coro_create (&cctx->cctx, 0, 0, 0, 0);
1279
1280 return cctx;
1281}
1282
1283/* create a new cctx suitable as destination/running a perl interpreter */
1284static coro_cctx *
1285cctx_new_run ()
1286{
1287 coro_cctx *cctx = cctx_new ();
1288 void *stack_start;
1289 size_t stack_size;
1290
1291#if HAVE_MMAP
1292 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1293 /* mmap supposedly does allocate-on-write for us */
1294 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1295
1296 if (cctx->sptr != (void *)-1)
1297 {
1298 #if CORO_STACKGUARD
1299 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1300 #endif
1301 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1302 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1303 cctx->flags |= CC_MAPPED;
1304 }
1305 else
1306#endif
1307 {
1308 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1309 New (0, cctx->sptr, cctx_stacksize, long);
1310
1311 if (!cctx->sptr)
1312 {
1313 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1314 _exit (EXIT_FAILURE);
1315 }
1316
1317 stack_start = cctx->sptr;
1318 stack_size = cctx->ssize;
1319 }
1320
1321 #if CORO_USE_VALGRIND
1322 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1323 #endif
1324
1325 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1326
1327 return cctx;
1328}
1329
1330static void
1331cctx_destroy (coro_cctx *cctx)
1332{
1333 if (!cctx)
1334 return;
1335
1336 assert (("FATAL: tried to destroy current cctx", cctx != cctx_current));//D temporary?
1337
1338 --cctx_count;
1339 coro_destroy (&cctx->cctx);
1340
1341 /* coro_transfer creates new, empty cctx's */
1342 if (cctx->sptr)
1343 {
1344 #if CORO_USE_VALGRIND
1345 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1346 #endif
1347
1348#if HAVE_MMAP
1349 if (cctx->flags & CC_MAPPED)
1350 munmap (cctx->sptr, cctx->ssize);
1351 else
1352#endif
1353 Safefree (cctx->sptr);
1354 }
1355
1356 Safefree (cctx);
1357}
1358
1359/* wether this cctx should be destructed */
1360#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1361
1362static coro_cctx *
1363cctx_get (pTHX)
1364{
1365 while (expect_true (cctx_first))
1366 {
1367 coro_cctx *cctx = cctx_first;
1368 cctx_first = cctx->next;
1369 --cctx_idle;
1370
1371 if (expect_true (!CCTX_EXPIRED (cctx)))
1372 return cctx;
1373
1374 cctx_destroy (cctx);
1375 }
1376
1377 return cctx_new_run ();
1378}
1379
1380static void
1381cctx_put (coro_cctx *cctx)
1382{
1383 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1384
1385 /* free another cctx if overlimit */
1386 if (expect_false (cctx_idle >= cctx_max_idle))
1387 {
1388 coro_cctx *first = cctx_first;
1389 cctx_first = first->next;
1390 --cctx_idle;
1391
1392 cctx_destroy (first);
1393 }
1394
1395 ++cctx_idle;
1396 cctx->next = cctx_first;
1397 cctx_first = cctx;
1398}
1399
1400/** coroutine switching *****************************************************/
1401
1402static void
1403transfer_check (pTHX_ struct coro *prev, struct coro *next)
1404{
1405 /* TODO: throwing up here is considered harmful */
1406
1407 if (expect_true (prev != next))
1408 {
1409 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1410 croak ("Coro::State::transfer called with a blocked prev Coro::State, but can only transfer from running or new states,");
1411
1412 if (expect_false (next->flags & (CF_RUNNING | CF_DESTROYED | CF_SUSPENDED)))
1413 croak ("Coro::State::transfer called with running, destroyed or suspended next Coro::State, but can only transfer to inactive states,");
1414
1415#if !PERL_VERSION_ATLEAST (5,10,0)
1416 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1417 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1418#endif
1419 }
1420}
1421
1422/* always use the TRANSFER macro */
1423static void NOINLINE /* noinline so we have a fixed stackframe */
1424transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1425{
1426 dSTACKLEVEL;
1427
1428 /* sometimes transfer is only called to set idle_sp */
1429 if (expect_false (!prev))
1430 {
1431 cctx_current->idle_sp = STACKLEVEL;
1432 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1433 }
1434 else if (expect_true (prev != next))
1435 {
1436 coro_cctx *cctx_prev;
1437
1438 if (expect_false (prev->flags & CF_NEW))
1439 {
1440 /* create a new empty/source context */
1441 prev->flags &= ~CF_NEW;
1442 prev->flags |= CF_RUNNING;
1443 }
1444
1445 prev->flags &= ~CF_RUNNING;
1446 next->flags |= CF_RUNNING;
1447
1448 /* first get rid of the old state */
1449 save_perl (aTHX_ prev);
1450
1451 if (expect_false (next->flags & CF_NEW))
1452 {
1453 /* need to start coroutine */
1454 next->flags &= ~CF_NEW;
1455 /* setup coroutine call */
1456 coro_setup (aTHX_ next);
1457 }
1458 else
1459 load_perl (aTHX_ next);
1460
1461 /* possibly untie and reuse the cctx */
1462 if (expect_true (
1463 cctx_current->idle_sp == STACKLEVEL
1464 && !(cctx_current->flags & CC_TRACE)
1465 && !force_cctx
1466 ))
1467 {
1468 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1469 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1470
1471 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get. */
1472 /* without this the next cctx_get might destroy the running cctx while still in use */
1473 if (expect_false (CCTX_EXPIRED (cctx_current)))
1474 if (expect_true (!next->cctx))
1475 next->cctx = cctx_get (aTHX);
1476
1477 cctx_put (cctx_current);
1478 }
1479 else
1480 prev->cctx = cctx_current;
1481
1482 ++next->usecount;
1483
1484 cctx_prev = cctx_current;
1485 cctx_current = expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1486
1487 next->cctx = 0;
1488
1489 if (expect_false (cctx_prev != cctx_current))
1490 {
1491 cctx_prev->top_env = PL_top_env;
1492 PL_top_env = cctx_current->top_env;
1493 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1494 }
1495
1496 transfer_tail (aTHX);
1497 }
1498}
1499
1500#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1501#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1502
1503/** high level stuff ********************************************************/
1504
1505static int
1506coro_state_destroy (pTHX_ struct coro *coro)
1507{
1508 if (coro->flags & CF_DESTROYED)
1509 return 0;
1510
1511 if (coro->on_destroy && !PL_dirty)
1512 coro->on_destroy (aTHX_ coro);
1513
1514 coro->flags |= CF_DESTROYED;
1515
1516 if (coro->flags & CF_READY)
1517 {
1518 /* reduce nready, as destroying a ready coro effectively unreadies it */
1519 /* alternative: look through all ready queues and remove the coro */
1520 --coro_nready;
1521 }
1522 else
1523 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1524
1525 if (coro->mainstack
1526 && coro->mainstack != main_mainstack
1527 && coro->slot
1528 && !PL_dirty)
1529 coro_destruct_perl (aTHX_ coro);
1530
1531 cctx_destroy (coro->cctx);
1532 SvREFCNT_dec (coro->startcv);
1533 SvREFCNT_dec (coro->args);
1534 SvREFCNT_dec (CORO_THROW);
1535
1536 if (coro->next) coro->next->prev = coro->prev;
1537 if (coro->prev) coro->prev->next = coro->next;
1538 if (coro == coro_first) coro_first = coro->next;
1539
1540 return 1;
1541}
1542
1543static int
1544coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1545{
1546 struct coro *coro = (struct coro *)mg->mg_ptr;
1547 mg->mg_ptr = 0;
1548
1549 coro->hv = 0;
1550
1551 if (--coro->refcnt < 0)
1552 {
1553 coro_state_destroy (aTHX_ coro);
1554 Safefree (coro);
1555 }
1556
1557 return 0;
1558}
1559
1560static int
1561coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1562{
1563 struct coro *coro = (struct coro *)mg->mg_ptr;
1564
1565 ++coro->refcnt;
1566
1567 return 0;
1568}
1569
1570static MGVTBL coro_state_vtbl = {
1571 0, 0, 0, 0,
1572 coro_state_free,
1573 0,
1574#ifdef MGf_DUP
1575 coro_state_dup,
1576#else
1577# define MGf_DUP 0
1578#endif
1579};
1580
1581static void
1582prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1583{
1584 ta->prev = SvSTATE (prev_sv);
1585 ta->next = SvSTATE (next_sv);
1586 TRANSFER_CHECK (*ta);
1587}
1588
1589static void
1590api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1591{
1592 struct coro_transfer_args ta;
1593
1594 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1595 TRANSFER (ta, 1);
1596}
1597
1598/*****************************************************************************/
1599/* gensub: simple closure generation utility */
1600
1601#define GENSUB_ARG CvXSUBANY (cv).any_ptr
1602
1603/* create a closure from XS, returns a code reference */
1604/* the arg can be accessed via GENSUB_ARG from the callback */
1605/* the callback must use dXSARGS/XSRETURN */
1606static SV *
1607gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg)
1608{
1609 CV *cv = (CV *)newSV (0);
1610
1611 sv_upgrade ((SV *)cv, SVt_PVCV);
1612
1613 CvANON_on (cv);
1614 CvISXSUB_on (cv);
1615 CvXSUB (cv) = xsub;
1616 GENSUB_ARG = arg;
1617
1618 return newRV_noinc ((SV *)cv);
1619}
1620
1621/** Coro ********************************************************************/
1622
1623INLINE void
1624coro_enq (pTHX_ struct coro *coro)
1625{
1626 struct coro **ready = coro_ready [coro->prio - PRIO_MIN];
1627
1628 SvREFCNT_inc_NN (coro->hv);
1629
1630 coro->next_ready = 0;
1631 *(ready [0] ? &ready [1]->next_ready : &ready [0]) = coro;
1632 ready [1] = coro;
1633}
1634
1635INLINE struct coro *
1636coro_deq (pTHX)
1637{
1638 int prio;
1639
1640 for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= 0; )
1641 {
1642 struct coro **ready = coro_ready [prio];
1643
1644 if (ready [0])
1645 {
1646 struct coro *coro = ready [0];
1647 ready [0] = coro->next_ready;
1648 return coro;
1649 }
1650 }
1651
1652 return 0;
1653}
1654
1655static int
1656api_ready (pTHX_ SV *coro_sv)
1657{
1658 struct coro *coro;
1659 SV *sv_hook;
1660 void (*xs_hook)(void);
1661
1662 coro = SvSTATE (coro_sv);
1663
1664 if (coro->flags & CF_READY)
1665 return 0;
1666
1667 coro->flags |= CF_READY;
1668
1669 sv_hook = coro_nready ? 0 : coro_readyhook;
1670 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1671
1672 coro_enq (aTHX_ coro);
1673 ++coro_nready;
1674
1675 if (sv_hook)
1676 {
1677 dSP;
1678
1679 ENTER;
1680 SAVETMPS;
1681
1682 PUSHMARK (SP);
1683 PUTBACK;
1684 call_sv (sv_hook, G_VOID | G_DISCARD);
1685
1686 FREETMPS;
1687 LEAVE;
1688 }
1689
1690 if (xs_hook)
1691 xs_hook ();
1692
1693 return 1;
1694}
1695
1696static int
1697api_is_ready (pTHX_ SV *coro_sv)
1698{
1699 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1700}
1701
1702/* expects to own a reference to next->hv */
1703INLINE void
1704prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1705{
1706 SV *prev_sv = SvRV (coro_current);
1707
1708 ta->prev = SvSTATE_hv (prev_sv);
1709 ta->next = next;
1710
1711 TRANSFER_CHECK (*ta);
1712
1713 SvRV_set (coro_current, (SV *)next->hv);
1714
1715 free_coro_mortal (aTHX);
1716 coro_mortal = prev_sv;
1717}
1718
1719static void
1720prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1721{
1722 for (;;)
1723 {
1724 struct coro *next = coro_deq (aTHX);
1725
1726 if (expect_true (next))
1727 {
1728 /* cannot transfer to destroyed coros, skip and look for next */
1729 if (expect_false (next->flags & (CF_DESTROYED | CF_SUSPENDED)))
1730 SvREFCNT_dec (next->hv); /* coro_nready has already been taken care of by destroy */
1731 else
1732 {
1733 next->flags &= ~CF_READY;
1734 --coro_nready;
1735
1736 prepare_schedule_to (aTHX_ ta, next);
1737 break;
1738 }
1739 }
1740 else
1741 {
1742 /* nothing to schedule: call the idle handler */
1743 if (SvROK (sv_idle)
1744 && SvOBJECT (SvRV (sv_idle)))
1745 {
1746 ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */
1747 api_ready (aTHX_ SvRV (sv_idle));
1748 --coro_nready;
1749 }
1750 else
1751 {
1752 dSP;
1753
1754 ENTER;
1755 SAVETMPS;
1756
1757 PUSHMARK (SP);
1758 PUTBACK;
1759 call_sv (sv_idle, G_VOID | G_DISCARD);
1760
1761 FREETMPS;
1762 LEAVE;
1763 }
1764 }
1765 }
1766}
1767
1768INLINE void
1769prepare_cede (pTHX_ struct coro_transfer_args *ta)
1770{
1771 api_ready (aTHX_ coro_current);
1772 prepare_schedule (aTHX_ ta);
1773}
1774
1775INLINE void
1776prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1777{
1778 SV *prev = SvRV (coro_current);
1779
1780 if (coro_nready)
1781 {
1782 prepare_schedule (aTHX_ ta);
1783 api_ready (aTHX_ prev);
1784 }
1785 else
1786 prepare_nop (aTHX_ ta);
1787}
1788
1789static void
1790api_schedule (pTHX)
1791{
1792 struct coro_transfer_args ta;
1793
1794 prepare_schedule (aTHX_ &ta);
1795 TRANSFER (ta, 1);
1796}
1797
1798static void
1799api_schedule_to (pTHX_ SV *coro_sv)
1800{
1801 struct coro_transfer_args ta;
1802 struct coro *next = SvSTATE (coro_sv);
1803
1804 SvREFCNT_inc_NN (coro_sv);
1805 prepare_schedule_to (aTHX_ &ta, next);
1806}
1807
1808static int
1809api_cede (pTHX)
1810{
1811 struct coro_transfer_args ta;
1812
1813 prepare_cede (aTHX_ &ta);
1814
1815 if (expect_true (ta.prev != ta.next))
1816 {
1817 TRANSFER (ta, 1);
1818 return 1;
1819 }
1820 else
1821 return 0;
1822}
1823
1824static int
1825api_cede_notself (pTHX)
1826{
1827 if (coro_nready)
1828 {
1829 struct coro_transfer_args ta;
1830
1831 prepare_cede_notself (aTHX_ &ta);
1832 TRANSFER (ta, 1);
1833 return 1;
1834 }
1835 else
1836 return 0;
1837}
1838
1839static void
1840api_trace (pTHX_ SV *coro_sv, int flags)
1841{
1842 struct coro *coro = SvSTATE (coro_sv);
1843
1844 if (coro->flags & CF_RUNNING)
1845 croak ("cannot enable tracing on a running coroutine, caught");
1846
1847 if (flags & CC_TRACE)
1848 {
1849 if (!coro->cctx)
1850 coro->cctx = cctx_new_run ();
1851 else if (!(coro->cctx->flags & CC_TRACE))
1852 croak ("cannot enable tracing on coroutine with custom stack, caught");
1853
1854 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1855 }
1856 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1857 {
1858 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1859
1860 if (coro->flags & CF_RUNNING)
1861 PL_runops = RUNOPS_DEFAULT;
1862 else
1863 coro->slot->runops = RUNOPS_DEFAULT;
1864 }
1865}
1866
1867static void
1868coro_call_on_destroy (pTHX_ struct coro *coro)
1869{
1870 SV **on_destroyp = hv_fetch (coro->hv, "_on_destroy", sizeof ("_on_destroy") - 1, 0);
1871 SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0);
1872
1873 if (on_destroyp)
1874 {
1875 AV *on_destroy = (AV *)SvRV (*on_destroyp);
1876
1877 while (AvFILLp (on_destroy) >= 0)
1878 {
1879 dSP; /* don't disturb outer sp */
1880 SV *cb = av_pop (on_destroy);
1881
1882 PUSHMARK (SP);
1883
1884 if (statusp)
1885 {
1886 int i;
1887 AV *status = (AV *)SvRV (*statusp);
1888 EXTEND (SP, AvFILLp (status) + 1);
1889
1890 for (i = 0; i <= AvFILLp (status); ++i)
1891 PUSHs (AvARRAY (status)[i]);
1892 }
1893
1894 PUTBACK;
1895 call_sv (sv_2mortal (cb), G_VOID | G_DISCARD);
1896 }
1897 }
1898}
1899
1900static void
1901slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1902{
1903 int i;
1904 HV *hv = (HV *)SvRV (coro_current);
1905 AV *av = newAV ();
1906
1907 av_extend (av, items - 1);
1908 for (i = 0; i < items; ++i)
1909 av_push (av, SvREFCNT_inc_NN (arg [i]));
1910
1911 hv_store (hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0);
1912
1913 av_push (av_destroy, (SV *)newRV_inc ((SV *)hv)); /* RVinc for perl */
1914 api_ready (aTHX_ sv_manager);
1915
1916 frame->prepare = prepare_schedule;
1917 frame->check = slf_check_repeat;
1918
1919 /* as a minor optimisation, we could unwind all stacks here */
1920 /* but that puts extra pressure on pp_slf, and is not worth much */
1921 /*coro_unwind_stacks (aTHX);*/
1922}
1923
1924/*****************************************************************************/
1925/* async pool handler */
1926
1927static int
1928slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
1929{
1930 HV *hv = (HV *)SvRV (coro_current);
1931 struct coro *coro = (struct coro *)frame->data;
1932
1933 if (!coro->invoke_cb)
1934 return 1; /* loop till we have invoke */
1935 else
1936 {
1937 hv_store (hv, "desc", sizeof ("desc") - 1,
1938 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
1939
1940 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
1941
1942 {
1943 dSP;
1944 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
1945 PUTBACK;
1946 }
1947
1948 SvREFCNT_dec (GvAV (PL_defgv));
1949 GvAV (PL_defgv) = coro->invoke_av;
1950 coro->invoke_av = 0;
1951
1952 return 0;
1953 }
1954}
1955
1956static void
1957slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1958{
1959 HV *hv = (HV *)SvRV (coro_current);
1960 struct coro *coro = SvSTATE_hv ((SV *)hv);
1961
1962 if (expect_true (coro->saved_deffh))
1963 {
1964 /* subsequent iteration */
1965 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
1966 coro->saved_deffh = 0;
1967
1968 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
1969 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
1970 {
1971 coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate);
1972 coro->invoke_av = newAV ();
1973
1974 frame->prepare = prepare_nop;
1975 }
1976 else
1977 {
1978 av_clear (GvAV (PL_defgv));
1979 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
1980
1981 coro->prio = 0;
1982
1983 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
1984 api_trace (aTHX_ coro_current, 0);
1985
1986 frame->prepare = prepare_schedule;
1987 av_push (av_async_pool, SvREFCNT_inc (hv));
1988 }
1989 }
1990 else
1991 {
1992 /* first iteration, simply fall through */
1993 frame->prepare = prepare_nop;
1994 }
1995
1996 frame->check = slf_check_pool_handler;
1997 frame->data = (void *)coro;
1998}
1999
2000/*****************************************************************************/
2001/* rouse callback */
2002
2003#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
2004
2005static void
2006coro_rouse_callback (pTHX_ CV *cv)
2007{
2008 dXSARGS;
2009 SV *data = (SV *)GENSUB_ARG;
2010
2011 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2012 {
2013 /* first call, set args */
2014 SV *coro = SvRV (data);
2015 AV *av = newAV ();
2016
2017 SvRV_set (data, (SV *)av);
2018
2019 /* better take a full copy of the arguments */
2020 while (items--)
2021 av_store (av, items, newSVsv (ST (items)));
2022
2023 api_ready (aTHX_ coro);
2024 SvREFCNT_dec (coro);
2025 }
2026
2027 XSRETURN_EMPTY;
2028}
2029
2030static int
2031slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
2032{
2033 SV *data = (SV *)frame->data;
2034
2035 if (CORO_THROW)
2036 return 0;
2037
2038 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2039 return 1;
2040
2041 /* now push all results on the stack */
2042 {
2043 dSP;
2044 AV *av = (AV *)SvRV (data);
2045 int i;
2046
2047 EXTEND (SP, AvFILLp (av) + 1);
2048 for (i = 0; i <= AvFILLp (av); ++i)
2049 PUSHs (sv_2mortal (AvARRAY (av)[i]));
2050
2051 /* we have stolen the elements, so set length to zero and free */
2052 AvFILLp (av) = -1;
2053 av_undef (av);
2054
2055 PUTBACK;
2056 }
2057
2058 return 0;
2059}
2060
2061static void
2062slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2063{
2064 SV *cb;
2065
2066 if (items)
2067 cb = arg [0];
2068 else
2069 {
2070 struct coro *coro = SvSTATE_current;
2071
2072 if (!coro->rouse_cb)
2073 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
2074
2075 cb = sv_2mortal (coro->rouse_cb);
2076 coro->rouse_cb = 0;
2077 }
2078
2079 if (!SvROK (cb)
2080 || SvTYPE (SvRV (cb)) != SVt_PVCV
2081 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
2082 croak ("Coro::rouse_wait called with illegal callback argument,");
2083
2084 {
2085 CV *cv = (CV *)SvRV (cb); /* for GENSUB_ARG */
2086 SV *data = (SV *)GENSUB_ARG;
2087
2088 frame->data = (void *)data;
2089 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2090 frame->check = slf_check_rouse_wait;
2091 }
2092}
2093
2094static SV *
2095coro_new_rouse_cb (pTHX)
2096{
2097 HV *hv = (HV *)SvRV (coro_current);
2098 struct coro *coro = SvSTATE_hv (hv);
2099 SV *data = newRV_inc ((SV *)hv);
2100 SV *cb = gensub (aTHX_ coro_rouse_callback, (void *)data);
2101
2102 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2103 SvREFCNT_dec (data); /* magicext increases the refcount */
2104
2105 SvREFCNT_dec (coro->rouse_cb);
2106 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2107
2108 return cb;
2109}
2110
2111/*****************************************************************************/
2112/* schedule-like-function opcode (SLF) */
2113
2114static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2115static const CV *slf_cv;
2116static SV **slf_argv;
2117static int slf_argc, slf_arga; /* count, allocated */
2118static I32 slf_ax; /* top of stack, for restore */
2119
2120/* this restores the stack in the case we patched the entersub, to */
2121/* recreate the stack frame as perl will on following calls */
2122/* since entersub cleared the stack */
2123static OP *
2124pp_restore (pTHX)
2125{
2126 int i;
2127 SV **SP = PL_stack_base + slf_ax;
2128
2129 PUSHMARK (SP);
2130
2131 EXTEND (SP, slf_argc + 1);
2132
2133 for (i = 0; i < slf_argc; ++i)
2134 PUSHs (sv_2mortal (slf_argv [i]));
2135
2136 PUSHs ((SV *)CvGV (slf_cv));
2137
2138 RETURNOP (slf_restore.op_first);
2139}
2140
2141static void
2142slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2143{
2144 SV **arg = (SV **)slf_frame.data;
2145
2146 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2147}
2148
2149static void
2150slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2151{
2152 if (items != 2)
2153 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2154
2155 frame->prepare = slf_prepare_transfer;
2156 frame->check = slf_check_nop;
2157 frame->data = (void *)arg; /* let's hope it will stay valid */
2158}
2159
2160static void
2161slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2162{
2163 frame->prepare = prepare_schedule;
2164 frame->check = slf_check_nop;
2165}
2166
2167static void
2168slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2169{
2170 struct coro *next = (struct coro *)slf_frame.data;
2171
2172 SvREFCNT_inc_NN (next->hv);
2173 prepare_schedule_to (aTHX_ ta, next);
2174}
2175
2176static void
2177slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2178{
2179 if (!items)
2180 croak ("Coro::schedule_to expects a coroutine argument, caught");
2181
2182 frame->data = (void *)SvSTATE (arg [0]);
2183 frame->prepare = slf_prepare_schedule_to;
2184 frame->check = slf_check_nop;
2185}
2186
2187static void
2188slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2189{
2190 api_ready (aTHX_ SvRV (coro_current));
2191
2192 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2193}
2194
2195static void
2196slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2197{
2198 frame->prepare = prepare_cede;
2199 frame->check = slf_check_nop;
2200}
2201
2202static void
2203slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2204{
2205 frame->prepare = prepare_cede_notself;
2206 frame->check = slf_check_nop;
2207}
2208
2209/*
2210 * these not obviously related functions are all rolled into one
2211 * function to increase chances that they all will call transfer with the same
2212 * stack offset
2213 * SLF stands for "schedule-like-function".
2214 */
2215static OP *
2216pp_slf (pTHX)
2217{
2218 I32 checkmark; /* mark SP to see how many elements check has pushed */
2219
2220 /* set up the slf frame, unless it has already been set-up */
2221 /* the latter happens when a new coro has been started */
2222 /* or when a new cctx was attached to an existing coroutine */
2223 if (expect_true (!slf_frame.prepare))
2224 {
2225 /* first iteration */
2226 dSP;
2227 SV **arg = PL_stack_base + TOPMARK + 1;
2228 int items = SP - arg; /* args without function object */
2229 SV *gv = *sp;
2230
2231 /* do a quick consistency check on the "function" object, and if it isn't */
2232 /* for us, divert to the real entersub */
2233 if (SvTYPE (gv) != SVt_PVGV
2234 || !GvCV (gv)
2235 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2236 return PL_ppaddr[OP_ENTERSUB](aTHX);
2237
2238 if (!(PL_op->op_flags & OPf_STACKED))
2239 {
2240 /* ampersand-form of call, use @_ instead of stack */
2241 AV *av = GvAV (PL_defgv);
2242 arg = AvARRAY (av);
2243 items = AvFILLp (av) + 1;
2244 }
2245
2246 /* now call the init function, which needs to set up slf_frame */
2247 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2248 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2249
2250 /* pop args */
2251 SP = PL_stack_base + POPMARK;
2252
2253 PUTBACK;
2254 }
2255
2256 /* now that we have a slf_frame, interpret it! */
2257 /* we use a callback system not to make the code needlessly */
2258 /* complicated, but so we can run multiple perl coros from one cctx */
2259
2260 do
2261 {
2262 struct coro_transfer_args ta;
2263
2264 slf_frame.prepare (aTHX_ &ta);
2265 TRANSFER (ta, 0);
2266
2267 checkmark = PL_stack_sp - PL_stack_base;
2268 }
2269 while (slf_frame.check (aTHX_ &slf_frame));
2270
2271 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2272
2273 /* exception handling */
2274 if (expect_false (CORO_THROW))
2275 {
2276 SV *exception = sv_2mortal (CORO_THROW);
2277
2278 CORO_THROW = 0;
2279 sv_setsv (ERRSV, exception);
2280 croak (0);
2281 }
2282
2283 /* return value handling - mostly like entersub */
2284 /* make sure we put something on the stack in scalar context */
2285 if (GIMME_V == G_SCALAR)
2286 {
2287 dSP;
2288 SV **bot = PL_stack_base + checkmark;
2289
2290 if (sp == bot) /* too few, push undef */
2291 bot [1] = &PL_sv_undef;
2292 else if (sp != bot + 1) /* too many, take last one */
2293 bot [1] = *sp;
2294
2295 SP = bot + 1;
2296
2297 PUTBACK;
2298 }
2299
2300 return NORMAL;
2301}
2302
2303static void
2304api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2305{
2306 int i;
2307 SV **arg = PL_stack_base + ax;
2308 int items = PL_stack_sp - arg + 1;
2309
2310 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2311
2312 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2313 && PL_op->op_ppaddr != pp_slf)
2314 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2315
2316 CvFLAGS (cv) |= CVf_SLF;
2317 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2318 slf_cv = cv;
2319
2320 /* we patch the op, and then re-run the whole call */
2321 /* we have to put the same argument on the stack for this to work */
2322 /* and this will be done by pp_restore */
2323 slf_restore.op_next = (OP *)&slf_restore;
2324 slf_restore.op_type = OP_CUSTOM;
2325 slf_restore.op_ppaddr = pp_restore;
2326 slf_restore.op_first = PL_op;
2327
2328 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2329
2330 if (PL_op->op_flags & OPf_STACKED)
2331 {
2332 if (items > slf_arga)
2333 {
2334 slf_arga = items;
2335 free (slf_argv);
2336 slf_argv = malloc (slf_arga * sizeof (SV *));
2337 }
2338
2339 slf_argc = items;
2340
2341 for (i = 0; i < items; ++i)
2342 slf_argv [i] = SvREFCNT_inc (arg [i]);
2343 }
2344 else
2345 slf_argc = 0;
2346
2347 PL_op->op_ppaddr = pp_slf;
2348 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2349
2350 PL_op = (OP *)&slf_restore;
2351}
2352
2353/*****************************************************************************/
2354/* dynamic wind */
2355
2356static void
2357on_enterleave_call (pTHX_ SV *cb)
2358{
2359 dSP;
2360
2361 PUSHSTACK;
2362
2363 PUSHMARK (SP);
2364 PUTBACK;
2365 call_sv (cb, G_VOID | G_DISCARD);
2366 SPAGAIN;
2367
2368 POPSTACK;
2369}
2370
2371static SV *
2372coro_avp_pop_and_free (pTHX_ AV **avp)
2373{
2374 AV *av = *avp;
2375 SV *res = av_pop (av);
2376
2377 if (AvFILLp (av) < 0)
2378 {
2379 *avp = 0;
2380 SvREFCNT_dec (av);
2381 }
2382
2383 return res;
2384}
2385
2386static void
2387coro_pop_on_enter (pTHX_ void *coro)
2388{
2389 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_enter);
2390 SvREFCNT_dec (cb);
2391}
2392
2393static void
2394coro_pop_on_leave (pTHX_ void *coro)
2395{
2396 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_leave);
2397 on_enterleave_call (aTHX_ sv_2mortal (cb));
2398}
2399
2400/*****************************************************************************/
2401/* PerlIO::cede */
2402
2403typedef struct
2404{
2405 PerlIOBuf base;
2406 NV next, every;
2407} PerlIOCede;
2408
2409static IV
2410PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2411{
2412 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2413
2414 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2415 self->next = nvtime () + self->every;
2416
2417 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2418}
2419
2420static SV *
2421PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2422{
2423 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2424
2425 return newSVnv (self->every);
2426}
2427
2428static IV
2429PerlIOCede_flush (pTHX_ PerlIO *f)
2430{
2431 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2432 double now = nvtime ();
2433
2434 if (now >= self->next)
2435 {
2436 api_cede (aTHX);
2437 self->next = now + self->every;
2438 }
2439
2440 return PerlIOBuf_flush (aTHX_ f);
2441}
2442
2443static PerlIO_funcs PerlIO_cede =
2444{
2445 sizeof(PerlIO_funcs),
2446 "cede",
2447 sizeof(PerlIOCede),
2448 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2449 PerlIOCede_pushed,
2450 PerlIOBuf_popped,
2451 PerlIOBuf_open,
2452 PerlIOBase_binmode,
2453 PerlIOCede_getarg,
2454 PerlIOBase_fileno,
2455 PerlIOBuf_dup,
2456 PerlIOBuf_read,
2457 PerlIOBuf_unread,
2458 PerlIOBuf_write,
2459 PerlIOBuf_seek,
2460 PerlIOBuf_tell,
2461 PerlIOBuf_close,
2462 PerlIOCede_flush,
2463 PerlIOBuf_fill,
2464 PerlIOBase_eof,
2465 PerlIOBase_error,
2466 PerlIOBase_clearerr,
2467 PerlIOBase_setlinebuf,
2468 PerlIOBuf_get_base,
2469 PerlIOBuf_bufsiz,
2470 PerlIOBuf_get_ptr,
2471 PerlIOBuf_get_cnt,
2472 PerlIOBuf_set_ptrcnt,
2473};
2474
2475/*****************************************************************************/
2476/* Coro::Semaphore & Coro::Signal */
2477
2478static SV *
2479coro_waitarray_new (pTHX_ int count)
2480{
2481 /* a semaphore contains a counter IV in $sem->[0] and any waiters after that */
2482 AV *av = newAV ();
2483 SV **ary;
2484
2485 /* unfortunately, building manually saves memory */
2486 Newx (ary, 2, SV *);
2487 AvALLOC (av) = ary;
2488#if PERL_VERSION_ATLEAST (5,10,0)
2489 AvARRAY (av) = ary;
2490#else
2491 /* 5.8.8 needs this syntax instead of AvARRAY = ary, yet */
2492 /* -DDEBUGGING flags this as a bug, despite it perfectly working */
2493 SvPVX ((SV *)av) = (char *)ary;
2494#endif
2495 AvMAX (av) = 1;
2496 AvFILLp (av) = 0;
2497 ary [0] = newSViv (count);
2498
2499 return newRV_noinc ((SV *)av);
2500}
2501
2502/* semaphore */
2503
2504static void
2505coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2506{
2507 SV *count_sv = AvARRAY (av)[0];
2508 IV count = SvIVX (count_sv);
2509
2510 count += adjust;
2511 SvIVX (count_sv) = count;
2512
2513 /* now wake up as many waiters as are expected to lock */
2514 while (count > 0 && AvFILLp (av) > 0)
2515 {
2516 SV *cb;
2517
2518 /* swap first two elements so we can shift a waiter */
2519 AvARRAY (av)[0] = AvARRAY (av)[1];
2520 AvARRAY (av)[1] = count_sv;
2521 cb = av_shift (av);
2522
2523 if (SvOBJECT (cb))
2524 {
2525 api_ready (aTHX_ cb);
2526 --count;
2527 }
2528 else if (SvTYPE (cb) == SVt_PVCV)
2529 {
2530 dSP;
2531 PUSHMARK (SP);
2532 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2533 PUTBACK;
2534 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2535 }
2536
2537 SvREFCNT_dec (cb);
2538 }
2539}
2540
2541static void
2542coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2543{
2544 /* call $sem->adjust (0) to possibly wake up some other waiters */
2545 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2546}
2547
2548static int
2549slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2550{
2551 AV *av = (AV *)frame->data;
2552 SV *count_sv = AvARRAY (av)[0];
2553
2554 /* if we are about to throw, don't actually acquire the lock, just throw */
2555 if (CORO_THROW)
2556 return 0;
2557 else if (SvIVX (count_sv) > 0)
2558 {
2559 SvSTATE_current->on_destroy = 0;
2560
2561 if (acquire)
2562 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2563 else
2564 coro_semaphore_adjust (aTHX_ av, 0);
2565
2566 return 0;
2567 }
2568 else
2569 {
2570 int i;
2571 /* if we were woken up but can't down, we look through the whole */
2572 /* waiters list and only add us if we aren't in there already */
2573 /* this avoids some degenerate memory usage cases */
2574
2575 for (i = 1; i <= AvFILLp (av); ++i)
2576 if (AvARRAY (av)[i] == SvRV (coro_current))
2577 return 1;
2578
2579 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2580 return 1;
2581 }
2582}
2583
2584static int
2585slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2586{
2587 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2588}
2589
2590static int
2591slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2592{
2593 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2594}
2595
2596static void
2597slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2598{
2599 AV *av = (AV *)SvRV (arg [0]);
2600
2601 if (SvIVX (AvARRAY (av)[0]) > 0)
2602 {
2603 frame->data = (void *)av;
2604 frame->prepare = prepare_nop;
2605 }
2606 else
2607 {
2608 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2609
2610 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2611 frame->prepare = prepare_schedule;
2612
2613 /* to avoid race conditions when a woken-up coro gets terminated */
2614 /* we arrange for a temporary on_destroy that calls adjust (0) */
2615 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2616 }
2617}
2618
2619static void
2620slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2621{
2622 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2623 frame->check = slf_check_semaphore_down;
2624}
2625
2626static void
2627slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2628{
2629 if (items >= 2)
2630 {
2631 /* callback form */
2632 AV *av = (AV *)SvRV (arg [0]);
2633 CV *cb_cv = coro_sv_2cv (aTHX_ arg [1]);
2634
2635 av_push (av, (SV *)SvREFCNT_inc_NN (cb_cv));
2636
2637 if (SvIVX (AvARRAY (av)[0]) > 0)
2638 coro_semaphore_adjust (aTHX_ av, 0);
2639
2640 frame->prepare = prepare_nop;
2641 frame->check = slf_check_nop;
2642 }
2643 else
2644 {
2645 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2646 frame->check = slf_check_semaphore_wait;
2647 }
2648}
2649
2650/* signal */
2651
2652static void
2653coro_signal_wake (pTHX_ AV *av, int count)
2654{
2655 SvIVX (AvARRAY (av)[0]) = 0;
2656
2657 /* now signal count waiters */
2658 while (count > 0 && AvFILLp (av) > 0)
2659 {
2660 SV *cb;
2661
2662 /* swap first two elements so we can shift a waiter */
2663 cb = AvARRAY (av)[0];
2664 AvARRAY (av)[0] = AvARRAY (av)[1];
2665 AvARRAY (av)[1] = cb;
2666
2667 cb = av_shift (av);
2668
2669 api_ready (aTHX_ cb);
2670 sv_setiv (cb, 0); /* signal waiter */
2671 SvREFCNT_dec (cb);
2672
2673 --count;
2674 }
2675}
2676
2677static int
2678slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2679{
2680 /* if we are about to throw, also stop waiting */
2681 return SvROK ((SV *)frame->data) && !CORO_THROW;
2682}
2683
2684static void
2685slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2686{
2687 AV *av = (AV *)SvRV (arg [0]);
2688
2689 if (SvIVX (AvARRAY (av)[0]))
2690 {
2691 SvIVX (AvARRAY (av)[0]) = 0;
2692 frame->prepare = prepare_nop;
2693 frame->check = slf_check_nop;
2694 }
2695 else
2696 {
2697 SV *waiter = newRV_inc (SvRV (coro_current)); /* owned by signal av */
2698
2699 av_push (av, waiter);
2700
2701 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2702 frame->prepare = prepare_schedule;
2703 frame->check = slf_check_signal_wait;
2704 }
2705}
2706
2707/*****************************************************************************/
2708/* Coro::AIO */
2709
2710#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2711
2712/* helper storage struct */
2713struct io_state
2714{
2715 int errorno;
2716 I32 laststype; /* U16 in 5.10.0 */
2717 int laststatval;
2718 Stat_t statcache;
2719};
2720
2721static void
2722coro_aio_callback (pTHX_ CV *cv)
2723{
2724 dXSARGS;
2725 AV *state = (AV *)GENSUB_ARG;
2726 SV *coro = av_pop (state);
2727 SV *data_sv = newSV (sizeof (struct io_state));
2728
2729 av_extend (state, items - 1);
2730
2731 sv_upgrade (data_sv, SVt_PV);
2732 SvCUR_set (data_sv, sizeof (struct io_state));
2733 SvPOK_only (data_sv);
2734
2735 {
2736 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2737
2738 data->errorno = errno;
2739 data->laststype = PL_laststype;
2740 data->laststatval = PL_laststatval;
2741 data->statcache = PL_statcache;
2742 }
2743
2744 /* now build the result vector out of all the parameters and the data_sv */
2745 {
2746 int i;
2747
2748 for (i = 0; i < items; ++i)
2749 av_push (state, SvREFCNT_inc_NN (ST (i)));
2750 }
2751
2752 av_push (state, data_sv);
2753
2754 api_ready (aTHX_ coro);
2755 SvREFCNT_dec (coro);
2756 SvREFCNT_dec ((AV *)state);
2757}
2758
2759static int
2760slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2761{
2762 AV *state = (AV *)frame->data;
2763
2764 /* if we are about to throw, return early */
2765 /* this does not cancel the aio request, but at least */
2766 /* it quickly returns */
2767 if (CORO_THROW)
2768 return 0;
2769
2770 /* one element that is an RV? repeat! */
2771 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2772 return 1;
2773
2774 /* restore status */
2775 {
2776 SV *data_sv = av_pop (state);
2777 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2778
2779 errno = data->errorno;
2780 PL_laststype = data->laststype;
2781 PL_laststatval = data->laststatval;
2782 PL_statcache = data->statcache;
2783
2784 SvREFCNT_dec (data_sv);
2785 }
2786
2787 /* push result values */
2788 {
2789 dSP;
2790 int i;
2791
2792 EXTEND (SP, AvFILLp (state) + 1);
2793 for (i = 0; i <= AvFILLp (state); ++i)
2794 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2795
2796 PUTBACK;
2797 }
2798
2799 return 0;
2800}
2801
2802static void
2803slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2804{
2805 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2806 SV *coro_hv = SvRV (coro_current);
2807 struct coro *coro = SvSTATE_hv (coro_hv);
2808
2809 /* put our coroutine id on the state arg */
2810 av_push (state, SvREFCNT_inc_NN (coro_hv));
2811
2812 /* first see whether we have a non-zero priority and set it as AIO prio */
2813 if (coro->prio)
2814 {
2815 dSP;
2816
2817 static SV *prio_cv;
2818 static SV *prio_sv;
2819
2820 if (expect_false (!prio_cv))
2821 {
2822 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2823 prio_sv = newSViv (0);
2824 }
2825
2826 PUSHMARK (SP);
2827 sv_setiv (prio_sv, coro->prio);
2828 XPUSHs (prio_sv);
2829
2830 PUTBACK;
2831 call_sv (prio_cv, G_VOID | G_DISCARD);
2832 }
2833
2834 /* now call the original request */
2835 {
2836 dSP;
2837 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2838 int i;
2839
2840 PUSHMARK (SP);
2841
2842 /* first push all args to the stack */
2843 EXTEND (SP, items + 1);
2844
2845 for (i = 0; i < items; ++i)
2846 PUSHs (arg [i]);
2847
2848 /* now push the callback closure */
2849 PUSHs (sv_2mortal (gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2850
2851 /* now call the AIO function - we assume our request is uncancelable */
2852 PUTBACK;
2853 call_sv ((SV *)req, G_VOID | G_DISCARD);
2854 }
2855
2856 /* now that the requets is going, we loop toll we have a result */
2857 frame->data = (void *)state;
2858 frame->prepare = prepare_schedule;
2859 frame->check = slf_check_aio_req;
2860}
2861
2862static void
2863coro_aio_req_xs (pTHX_ CV *cv)
2864{
2865 dXSARGS;
2866
2867 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2868
2869 XSRETURN_EMPTY;
2870}
2871
2872/*****************************************************************************/
2873
2874#if CORO_CLONE
2875# include "clone.c"
2876#endif
2877
2878MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2879
2880PROTOTYPES: DISABLE
2881
2882BOOT:
2883{
2884#ifdef USE_ITHREADS
2885# if CORO_PTHREAD
2886 coro_thx = PERL_GET_CONTEXT;
2887# endif
2888#endif
2889 BOOT_PAGESIZE;
2890
2891 cctx_current = cctx_new_empty ();
2892
2893 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2894 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2895
2896 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2897 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2898 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
2899
2900 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
2901 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
2902 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
2903
2904 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
2905
2906 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
2907 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
2908 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
2909 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
2910
2911 main_mainstack = PL_mainstack;
2912 main_top_env = PL_top_env;
2913
2914 while (main_top_env->je_prev)
2915 main_top_env = main_top_env->je_prev;
2916
2917 {
2918 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2919
2920 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2921 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
2922
2923 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2924 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
2925 }
2926
2927 coroapi.ver = CORO_API_VERSION;
2928 coroapi.rev = CORO_API_REVISION;
2929
2930 coroapi.transfer = api_transfer;
2931
2932 coroapi.sv_state = SvSTATE_;
2933 coroapi.execute_slf = api_execute_slf;
2934 coroapi.prepare_nop = prepare_nop;
2935 coroapi.prepare_schedule = prepare_schedule;
2936 coroapi.prepare_cede = prepare_cede;
2937 coroapi.prepare_cede_notself = prepare_cede_notself;
2938
2939 {
2940 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
2941
2942 if (!svp) croak ("Time::HiRes is required");
2943 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
2944
2945 nvtime = INT2PTR (double (*)(), SvIV (*svp));
2946 }
2947
2948 assert (("PRIO_NORMAL must be 0", !PRIO_NORMAL));
2949}
2950
2951SV *
2952new (char *klass, ...)
2953 ALIAS:
2954 Coro::new = 1
2955 CODE:
2956{
2957 struct coro *coro;
2958 MAGIC *mg;
2959 HV *hv;
2960 CV *cb;
2961 int i;
2962
2963 if (items > 1)
2964 {
2965 cb = coro_sv_2cv (aTHX_ ST (1));
2966
2967 if (!ix)
235 { 2968 {
236 /* I never used formats, so how should I know how these are implemented? */ 2969 if (CvISXSUB (cb))
237 /* my bold guess is as a simple, plain sub... */ 2970 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
238 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 2971
2972 if (!CvROOT (cb))
2973 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
239 } 2974 }
240 } 2975 }
241 2976
242 if (top_si->si_type == PERLSI_MAIN)
243 break;
244
245 top_si = top_si->si_prev;
246 ccstk = top_si->si_cxstack;
247 cxix = top_si->si_cxix;
248 }
249
250 PUTBACK;
251 }
252
253 c->dowarn = PL_dowarn;
254 c->defav = GvAV (PL_defgv);
255 c->curstackinfo = PL_curstackinfo;
256 c->curstack = PL_curstack;
257 c->mainstack = PL_mainstack;
258 c->stack_sp = PL_stack_sp;
259 c->op = PL_op;
260 c->curpad = PL_curpad;
261 c->stack_base = PL_stack_base;
262 c->stack_max = PL_stack_max;
263 c->tmps_stack = PL_tmps_stack;
264 c->tmps_floor = PL_tmps_floor;
265 c->tmps_ix = PL_tmps_ix;
266 c->tmps_max = PL_tmps_max;
267 c->markstack = PL_markstack;
268 c->markstack_ptr = PL_markstack_ptr;
269 c->markstack_max = PL_markstack_max;
270 c->scopestack = PL_scopestack;
271 c->scopestack_ix = PL_scopestack_ix;
272 c->scopestack_max = PL_scopestack_max;
273 c->savestack = PL_savestack;
274 c->savestack_ix = PL_savestack_ix;
275 c->savestack_max = PL_savestack_max;
276 c->retstack = PL_retstack;
277 c->retstack_ix = PL_retstack_ix;
278 c->retstack_max = PL_retstack_max;
279 c->curcop = PL_curcop;
280}
281
282static void
283LOAD(pTHX_ Coro__State c)
284{
285 PL_dowarn = c->dowarn;
286 GvAV (PL_defgv) = c->defav;
287 PL_curstackinfo = c->curstackinfo;
288 PL_curstack = c->curstack;
289 PL_mainstack = c->mainstack;
290 PL_stack_sp = c->stack_sp;
291 PL_op = c->op;
292 PL_curpad = c->curpad;
293 PL_stack_base = c->stack_base;
294 PL_stack_max = c->stack_max;
295 PL_tmps_stack = c->tmps_stack;
296 PL_tmps_floor = c->tmps_floor;
297 PL_tmps_ix = c->tmps_ix;
298 PL_tmps_max = c->tmps_max;
299 PL_markstack = c->markstack;
300 PL_markstack_ptr = c->markstack_ptr;
301 PL_markstack_max = c->markstack_max;
302 PL_scopestack = c->scopestack;
303 PL_scopestack_ix = c->scopestack_ix;
304 PL_scopestack_max = c->scopestack_max;
305 PL_savestack = c->savestack;
306 PL_savestack_ix = c->savestack_ix;
307 PL_savestack_max = c->savestack_max;
308 PL_retstack = c->retstack;
309 PL_retstack_ix = c->retstack_ix;
310 PL_retstack_max = c->retstack_max;
311 PL_curcop = c->curcop;
312
313 {
314 dSP;
315 CV *cv;
316
317 /* now do the ugly restore mess */
318 while ((cv = (CV *)POPs))
319 {
320 AV *padlist = (AV *)POPs;
321
322 put_padlist (cv);
323 CvPADLIST(cv) = padlist;
324 CvDEPTH(cv) = (I32)POPs;
325
326#ifdef USE_THREADS
327 CvOWNER(cv) = (struct perl_thread *)POPs;
328 error does not work either
329#endif
330 }
331
332 PUTBACK;
333 }
334}
335
336/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
337STATIC void
338destroy_stacks(pTHX)
339{
340 /* die does this while calling POPSTACK, but I just don't see why. */
341 dounwind(-1);
342
343 /* is this ugly, I ask? */
344 while (PL_scopestack_ix)
345 LEAVE;
346
347 while (PL_curstackinfo->si_next)
348 PL_curstackinfo = PL_curstackinfo->si_next;
349
350 while (PL_curstackinfo)
351 {
352 PERL_SI *p = PL_curstackinfo->si_prev;
353
354 SvREFCNT_dec(PL_curstackinfo->si_stack);
355 Safefree(PL_curstackinfo->si_cxstack);
356 Safefree(PL_curstackinfo);
357 PL_curstackinfo = p;
358 }
359
360 if (PL_scopestack_ix != 0)
361 Perl_warner(aTHX_ WARN_INTERNAL,
362 "Unbalanced scopes: %ld more ENTERs than LEAVEs\n",
363 (long)PL_scopestack_ix);
364 if (PL_savestack_ix != 0)
365 Perl_warner(aTHX_ WARN_INTERNAL,
366 "Unbalanced saves: %ld more saves than restores\n",
367 (long)PL_savestack_ix);
368 if (PL_tmps_floor != -1)
369 Perl_warner(aTHX_ WARN_INTERNAL,"Unbalanced tmps: %ld more allocs than frees\n",
370 (long)PL_tmps_floor + 1);
371 /*
372 */
373 Safefree(PL_tmps_stack);
374 Safefree(PL_markstack);
375 Safefree(PL_scopestack);
376 Safefree(PL_savestack);
377 Safefree(PL_retstack);
378}
379
380#define SUB_INIT "Coro::State::_newcoro"
381
382MODULE = Coro::State PACKAGE = Coro::State
383
384PROTOTYPES: ENABLE
385
386BOOT:
387 if (!padlist_cache)
388 padlist_cache = newHV ();
389
390Coro::State
391_newprocess(args)
392 SV * args
393 PROTOTYPE: $
394 CODE:
395 Coro__State coro;
396
397 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV)
398 croak ("Coro::State::newprocess expects an arrayref");
399
400 New (0, coro, 1, struct coro); 2977 Newz (0, coro, 1, struct coro);
2978 coro->args = newAV ();
2979 coro->flags = CF_NEW;
401 2980
402 coro->mainstack = 0; /* actual work is done inside transfer */ 2981 if (coro_first) coro_first->prev = coro;
403 coro->args = (AV *)SvREFCNT_inc (SvRV (args)); 2982 coro->next = coro_first;
2983 coro_first = coro;
404 2984
405 RETVAL = coro; 2985 coro->hv = hv = newHV ();
2986 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
2987 mg->mg_flags |= MGf_DUP;
2988 RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1));
2989
2990 if (items > 1)
2991 {
2992 av_extend (coro->args, items - 1 + ix - 1);
2993
2994 if (ix)
2995 {
2996 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
2997 cb = cv_coro_run;
2998 }
2999
3000 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
3001
3002 for (i = 2; i < items; i++)
3003 av_push (coro->args, newSVsv (ST (i)));
3004 }
3005}
406 OUTPUT: 3006 OUTPUT:
407 RETVAL 3007 RETVAL
408 3008
409void 3009void
410transfer(prev,next) 3010transfer (...)
411 Coro::State_or_hashref prev 3011 PROTOTYPE: $$
412 Coro::State_or_hashref next 3012 CODE:
413 CODE: 3013 CORO_EXECUTE_SLF_XS (slf_init_transfer);
414 3014
415 if (prev != next) 3015bool
3016_destroy (SV *coro_sv)
3017 CODE:
3018 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
3019 OUTPUT:
3020 RETVAL
3021
3022void
3023_exit (int code)
3024 PROTOTYPE: $
3025 CODE:
3026 _exit (code);
3027
3028SV *
3029clone (Coro::State coro)
3030 CODE:
3031{
3032#if CORO_CLONE
3033 struct coro *ncoro = coro_clone (aTHX_ coro);
3034 MAGIC *mg;
3035 /* TODO: too much duplication */
3036 ncoro->hv = newHV ();
3037 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
3038 mg->mg_flags |= MGf_DUP;
3039 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
3040#else
3041 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
3042#endif
3043}
3044 OUTPUT:
3045 RETVAL
3046
3047int
3048cctx_stacksize (int new_stacksize = 0)
3049 PROTOTYPE: ;$
3050 CODE:
3051 RETVAL = cctx_stacksize;
3052 if (new_stacksize)
416 { 3053 {
417 PUTBACK; 3054 cctx_stacksize = new_stacksize;
418 SAVE (aTHX_ prev); 3055 ++cctx_gen;
419
420 /* 3056 }
421 * this could be done in newprocess which would lead to 3057 OUTPUT:
422 * extremely elegant and fast (just PUTBACK/SAVE/LOAD/SPAGAIN) 3058 RETVAL
423 * code here, but lazy allocation of stacks has also 3059
424 * some virtues and the overhead of the if() is nil. 3060int
3061cctx_max_idle (int max_idle = 0)
3062 PROTOTYPE: ;$
3063 CODE:
3064 RETVAL = cctx_max_idle;
3065 if (max_idle > 1)
3066 cctx_max_idle = max_idle;
3067 OUTPUT:
3068 RETVAL
3069
3070int
3071cctx_count ()
3072 PROTOTYPE:
3073 CODE:
3074 RETVAL = cctx_count;
3075 OUTPUT:
3076 RETVAL
3077
3078int
3079cctx_idle ()
3080 PROTOTYPE:
3081 CODE:
3082 RETVAL = cctx_idle;
3083 OUTPUT:
3084 RETVAL
3085
3086void
3087list ()
3088 PROTOTYPE:
3089 PPCODE:
3090{
3091 struct coro *coro;
3092 for (coro = coro_first; coro; coro = coro->next)
3093 if (coro->hv)
3094 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
3095}
3096
3097void
3098call (Coro::State coro, SV *coderef)
3099 ALIAS:
3100 eval = 1
3101 CODE:
3102{
3103 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
425 */ 3104 {
426 if (next->mainstack) 3105 struct coro *current = SvSTATE_current;
3106
3107 if (current != coro)
427 { 3108 {
428 LOAD (aTHX_ next); 3109 PUTBACK;
429 next->mainstack = 0; /* unnecessary but much cleaner */ 3110 save_perl (aTHX_ current);
3111 load_perl (aTHX_ coro);
430 SPAGAIN; 3112 SPAGAIN;
431 } 3113 }
3114
3115 PUSHSTACK;
3116
3117 PUSHMARK (SP);
3118 PUTBACK;
3119
3120 if (ix)
3121 eval_sv (coderef, 0);
432 else 3122 else
3123 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3124
3125 POPSTACK;
3126 SPAGAIN;
3127
3128 if (current != coro)
433 { 3129 {
434 /* 3130 PUTBACK;
435 * emulate part of the perl startup here. 3131 save_perl (aTHX_ coro);
436 */ 3132 load_perl (aTHX_ current);
437 UNOP myop;
438
439 init_stacks (); /* from perl.c */
440 PL_op = (OP *)&myop;
441 /*PL_curcop = 0;*/
442 GvAV (PL_defgv) = (AV *)SvREFCNT_inc ((SV *)next->args);
443
444 SPAGAIN; 3133 SPAGAIN;
445 Zero(&myop, 1, UNOP);
446 myop.op_next = Nullop;
447 myop.op_flags = OPf_WANT_VOID;
448
449 PUSHMARK(SP);
450 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
451 PUTBACK;
452 /*
453 * the next line is slightly wrong, as PL_op->op_next
454 * is actually being executed so we skip the first op.
455 * that doesn't matter, though, since it is only
456 * pp_nextstate and we never return...
457 */
458 PL_op = Perl_pp_entersub(aTHX);
459 SPAGAIN;
460
461 ENTER;
462 } 3134 }
463 } 3135 }
3136}
3137
3138SV *
3139is_ready (Coro::State coro)
3140 PROTOTYPE: $
3141 ALIAS:
3142 is_ready = CF_READY
3143 is_running = CF_RUNNING
3144 is_new = CF_NEW
3145 is_destroyed = CF_DESTROYED
3146 is_suspended = CF_SUSPENDED
3147 CODE:
3148 RETVAL = boolSV (coro->flags & ix);
3149 OUTPUT:
3150 RETVAL
464 3151
465void 3152void
466DESTROY(coro) 3153throw (Coro::State self, SV *throw = &PL_sv_undef)
467 Coro::State coro 3154 PROTOTYPE: $;$
468 CODE: 3155 CODE:
3156{
3157 struct coro *current = SvSTATE_current;
3158 SV **throwp = self == current ? &CORO_THROW : &self->except;
3159 SvREFCNT_dec (*throwp);
3160 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
3161}
469 3162
470 if (coro->mainstack) 3163void
3164api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3165 PROTOTYPE: $;$
3166 C_ARGS: aTHX_ coro, flags
3167
3168SV *
3169has_cctx (Coro::State coro)
3170 PROTOTYPE: $
3171 CODE:
3172 /* maybe manage the running flag differently */
3173 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3174 OUTPUT:
3175 RETVAL
3176
3177int
3178is_traced (Coro::State coro)
3179 PROTOTYPE: $
3180 CODE:
3181 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3182 OUTPUT:
3183 RETVAL
3184
3185UV
3186rss (Coro::State coro)
3187 PROTOTYPE: $
3188 ALIAS:
3189 usecount = 1
3190 CODE:
3191 switch (ix)
3192 {
3193 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3194 case 1: RETVAL = coro->usecount; break;
3195 }
3196 OUTPUT:
3197 RETVAL
3198
3199void
3200force_cctx ()
3201 PROTOTYPE:
3202 CODE:
3203 cctx_current->idle_sp = 0;
3204
3205void
3206swap_defsv (Coro::State self)
3207 PROTOTYPE: $
3208 ALIAS:
3209 swap_defav = 1
3210 CODE:
3211 if (!self->slot)
3212 croak ("cannot swap state with coroutine that has no saved state,");
3213 else
471 { 3214 {
472 struct coro temp; 3215 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3216 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
473 3217
3218 SV *tmp = *src; *src = *dst; *dst = tmp;
3219 }
3220
3221void
3222cancel (Coro::State self)
3223 CODE:
3224 coro_state_destroy (aTHX_ self);
3225 coro_call_on_destroy (aTHX_ self); /* actually only for Coro objects */
3226
3227
3228MODULE = Coro::State PACKAGE = Coro
3229
3230BOOT:
3231{
3232 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3233 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3234 cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD);
3235 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD);
3236 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3237 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3238 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3239 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3240 sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE);
3241
3242 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3243 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3244 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3245 cv_coro_state_new = get_cv ("Coro::State::new", 0); SvREADONLY_on (cv_coro_state_new);
3246
3247 coro_stash = gv_stashpv ("Coro", TRUE);
3248
3249 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (PRIO_MAX));
3250 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (PRIO_HIGH));
3251 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (PRIO_NORMAL));
3252 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (PRIO_LOW));
3253 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (PRIO_IDLE));
3254 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (PRIO_MIN));
3255
3256 {
3257 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3258
3259 coroapi.schedule = api_schedule;
3260 coroapi.schedule_to = api_schedule_to;
3261 coroapi.cede = api_cede;
3262 coroapi.cede_notself = api_cede_notself;
3263 coroapi.ready = api_ready;
3264 coroapi.is_ready = api_is_ready;
3265 coroapi.nready = coro_nready;
3266 coroapi.current = coro_current;
3267
3268 /*GCoroAPI = &coroapi;*/
3269 sv_setiv (sv, (IV)&coroapi);
3270 SvREADONLY_on (sv);
3271 }
3272}
3273
3274void
3275terminate (...)
3276 CODE:
3277 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3278
3279void
3280schedule (...)
3281 CODE:
3282 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3283
3284void
3285schedule_to (...)
3286 CODE:
3287 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3288
3289void
3290cede_to (...)
3291 CODE:
3292 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3293
3294void
3295cede (...)
3296 CODE:
3297 CORO_EXECUTE_SLF_XS (slf_init_cede);
3298
3299void
3300cede_notself (...)
3301 CODE:
3302 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3303
3304void
3305_set_current (SV *current)
3306 PROTOTYPE: $
3307 CODE:
3308 SvREFCNT_dec (SvRV (coro_current));
3309 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3310
3311void
3312_set_readyhook (SV *hook)
3313 PROTOTYPE: $
3314 CODE:
3315 SvREFCNT_dec (coro_readyhook);
3316 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
3317
3318int
3319prio (Coro::State coro, int newprio = 0)
3320 PROTOTYPE: $;$
3321 ALIAS:
3322 nice = 1
3323 CODE:
3324{
3325 RETVAL = coro->prio;
3326
3327 if (items > 1)
3328 {
3329 if (ix)
3330 newprio = coro->prio - newprio;
3331
3332 if (newprio < PRIO_MIN) newprio = PRIO_MIN;
3333 if (newprio > PRIO_MAX) newprio = PRIO_MAX;
3334
3335 coro->prio = newprio;
3336 }
3337}
3338 OUTPUT:
3339 RETVAL
3340
3341SV *
3342ready (SV *self)
3343 PROTOTYPE: $
3344 CODE:
3345 RETVAL = boolSV (api_ready (aTHX_ self));
3346 OUTPUT:
3347 RETVAL
3348
3349int
3350nready (...)
3351 PROTOTYPE:
3352 CODE:
3353 RETVAL = coro_nready;
3354 OUTPUT:
3355 RETVAL
3356
3357void
3358suspend (Coro::State self)
3359 PROTOTYPE: $
3360 CODE:
3361 self->flags |= CF_SUSPENDED;
3362
3363void
3364resume (Coro::State self)
3365 PROTOTYPE: $
3366 CODE:
3367 self->flags &= ~CF_SUSPENDED;
3368
3369void
3370_pool_handler (...)
3371 CODE:
3372 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3373
3374void
3375async_pool (SV *cv, ...)
3376 PROTOTYPE: &@
3377 PPCODE:
3378{
3379 HV *hv = (HV *)av_pop (av_async_pool);
3380 AV *av = newAV ();
3381 SV *cb = ST (0);
3382 int i;
3383
3384 av_extend (av, items - 2);
3385 for (i = 1; i < items; ++i)
3386 av_push (av, SvREFCNT_inc_NN (ST (i)));
3387
3388 if ((SV *)hv == &PL_sv_undef)
3389 {
3390 PUSHMARK (SP);
3391 EXTEND (SP, 2);
3392 PUSHs (sv_Coro);
3393 PUSHs ((SV *)cv_pool_handler);
474 PUTBACK; 3394 PUTBACK;
475 SAVE(aTHX_ (&temp)); 3395 call_sv ((SV *)cv_coro_state_new, G_SCALAR);
476 LOAD(aTHX_ coro);
477
478 destroy_stacks ();
479 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
480
481 LOAD((&temp));
482 SPAGAIN; 3396 SPAGAIN;
3397
3398 hv = (HV *)SvREFCNT_inc_NN (SvRV (POPs));
483 } 3399 }
484 3400
3401 {
3402 struct coro *coro = SvSTATE_hv (hv);
3403
3404 assert (!coro->invoke_cb);
3405 assert (!coro->invoke_av);
3406 coro->invoke_cb = SvREFCNT_inc (cb);
3407 coro->invoke_av = av;
3408 }
3409
3410 api_ready (aTHX_ (SV *)hv);
3411
3412 if (GIMME_V != G_VOID)
3413 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
3414 else
485 SvREFCNT_dec (coro->args); 3415 SvREFCNT_dec (hv);
486 Safefree (coro); 3416}
487 3417
3418SV *
3419rouse_cb ()
3420 PROTOTYPE:
3421 CODE:
3422 RETVAL = coro_new_rouse_cb (aTHX);
3423 OUTPUT:
3424 RETVAL
488 3425
3426void
3427rouse_wait (...)
3428 PROTOTYPE: ;$
3429 PPCODE:
3430 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
3431
3432void
3433on_enter (SV *block)
3434 ALIAS:
3435 on_leave = 1
3436 PROTOTYPE: &
3437 CODE:
3438{
3439 struct coro *coro = SvSTATE_current;
3440 AV **avp = ix ? &coro->on_leave : &coro->on_enter;
3441
3442 block = (SV *)coro_sv_2cv (aTHX_ block);
3443
3444 if (!*avp)
3445 *avp = newAV ();
3446
3447 av_push (*avp, SvREFCNT_inc (block));
3448
3449 if (!ix)
3450 on_enterleave_call (aTHX_ block);
3451
3452 LEAVE; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3453 SAVEDESTRUCTOR_X (ix ? coro_pop_on_leave : coro_pop_on_enter, (void *)coro);
3454 ENTER; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3455}
3456
3457
3458MODULE = Coro::State PACKAGE = PerlIO::cede
3459
3460BOOT:
3461 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3462
3463
3464MODULE = Coro::State PACKAGE = Coro::Semaphore
3465
3466SV *
3467new (SV *klass, SV *count = 0)
3468 CODE:
3469 RETVAL = sv_bless (
3470 coro_waitarray_new (aTHX_ count && SvOK (count) ? SvIV (count) : 1),
3471 GvSTASH (CvGV (cv))
3472 );
3473 OUTPUT:
3474 RETVAL
3475
3476# helper for Coro::Channel and others
3477SV *
3478_alloc (int count)
3479 CODE:
3480 RETVAL = coro_waitarray_new (aTHX_ count);
3481 OUTPUT:
3482 RETVAL
3483
3484SV *
3485count (SV *self)
3486 CODE:
3487 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3488 OUTPUT:
3489 RETVAL
3490
3491void
3492up (SV *self, int adjust = 1)
3493 ALIAS:
3494 adjust = 1
3495 CODE:
3496 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3497
3498void
3499down (...)
3500 CODE:
3501 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3502
3503void
3504wait (...)
3505 CODE:
3506 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3507
3508void
3509try (SV *self)
3510 PPCODE:
3511{
3512 AV *av = (AV *)SvRV (self);
3513 SV *count_sv = AvARRAY (av)[0];
3514 IV count = SvIVX (count_sv);
3515
3516 if (count > 0)
3517 {
3518 --count;
3519 SvIVX (count_sv) = count;
3520 XSRETURN_YES;
3521 }
3522 else
3523 XSRETURN_NO;
3524}
3525
3526void
3527waiters (SV *self)
3528 PPCODE:
3529{
3530 AV *av = (AV *)SvRV (self);
3531 int wcount = AvFILLp (av) + 1 - 1;
3532
3533 if (GIMME_V == G_SCALAR)
3534 XPUSHs (sv_2mortal (newSViv (wcount)));
3535 else
3536 {
3537 int i;
3538 EXTEND (SP, wcount);
3539 for (i = 1; i <= wcount; ++i)
3540 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3541 }
3542}
3543
3544MODULE = Coro::State PACKAGE = Coro::SemaphoreSet
3545
3546void
3547_may_delete (SV *sem, int count, int extra_refs)
3548 PPCODE:
3549{
3550 AV *av = (AV *)SvRV (sem);
3551
3552 if (SvREFCNT ((SV *)av) == 1 + extra_refs
3553 && AvFILLp (av) == 0 /* no waiters, just count */
3554 && SvIV (AvARRAY (av)[0]) == count)
3555 XSRETURN_YES;
3556
3557 XSRETURN_NO;
3558}
3559
3560MODULE = Coro::State PACKAGE = Coro::Signal
3561
3562SV *
3563new (SV *klass)
3564 CODE:
3565 RETVAL = sv_bless (
3566 coro_waitarray_new (aTHX_ 0),
3567 GvSTASH (CvGV (cv))
3568 );
3569 OUTPUT:
3570 RETVAL
3571
3572void
3573wait (...)
3574 CODE:
3575 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
3576
3577void
3578broadcast (SV *self)
3579 CODE:
3580{
3581 AV *av = (AV *)SvRV (self);
3582 coro_signal_wake (aTHX_ av, AvFILLp (av));
3583}
3584
3585void
3586send (SV *self)
3587 CODE:
3588{
3589 AV *av = (AV *)SvRV (self);
3590
3591 if (AvFILLp (av))
3592 coro_signal_wake (aTHX_ av, 1);
3593 else
3594 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3595}
3596
3597IV
3598awaited (SV *self)
3599 CODE:
3600 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3601 OUTPUT:
3602 RETVAL
3603
3604
3605MODULE = Coro::State PACKAGE = Coro::AnyEvent
3606
3607BOOT:
3608 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3609
3610void
3611_schedule (...)
3612 CODE:
3613{
3614 static int incede;
3615
3616 api_cede_notself (aTHX);
3617
3618 ++incede;
3619 while (coro_nready >= incede && api_cede (aTHX))
3620 ;
3621
3622 sv_setsv (sv_activity, &PL_sv_undef);
3623 if (coro_nready >= incede)
3624 {
3625 PUSHMARK (SP);
3626 PUTBACK;
3627 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3628 }
3629
3630 --incede;
3631}
3632
3633
3634MODULE = Coro::State PACKAGE = Coro::AIO
3635
3636void
3637_register (char *target, char *proto, SV *req)
3638 CODE:
3639{
3640 CV *req_cv = coro_sv_2cv (aTHX_ req);
3641 /* newXSproto doesn't return the CV on 5.8 */
3642 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3643 sv_setpv ((SV *)slf_cv, proto);
3644 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3645}
3646

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines