ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.3 by root, Tue Jul 17 00:24:15 2001 UTC vs.
Revision 1.361 by root, Wed Jul 1 07:26:40 2009 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "patchlevel.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
24#endif
25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
9#endif 35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
10 48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#define PERL_VERSION_ATLEAST(a,b,c) \
57 (PERL_REVISION > (a) \
58 || (PERL_REVISION == (a) \
59 && (PERL_VERSION > (b) \
60 || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))
61
62#if !PERL_VERSION_ATLEAST (5,6,0)
63# ifndef PL_ppaddr
64# define PL_ppaddr ppaddr
65# endif
66# ifndef call_sv
67# define call_sv perl_call_sv
68# endif
69# ifndef get_sv
70# define get_sv perl_get_sv
71# endif
72# ifndef get_cv
73# define get_cv perl_get_cv
74# endif
75# ifndef IS_PADGV
76# define IS_PADGV(v) 0
77# endif
78# ifndef IS_PADCONST
79# define IS_PADCONST(v) 0
80# endif
81#endif
82
83/* 5.11 */
84#ifndef CxHASARGS
85# define CxHASARGS(cx) (cx)->blk_sub.hasargs
86#endif
87
88/* 5.10.0 */
89#ifndef SvREFCNT_inc_NN
90# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
91#endif
92
93/* 5.8.8 */
94#ifndef GV_NOTQUAL
95# define GV_NOTQUAL 0
96#endif
97#ifndef newSV
98# define newSV(l) NEWSV(0,l)
99#endif
100#ifndef CvISXSUB_on
101# define CvISXSUB_on(cv) (void)cv
102#endif
103#ifndef CvISXSUB
104# define CvISXSUB(cv) (CvXSUB (cv) ? TRUE : FALSE)
105#endif
106#ifndef Newx
107# define Newx(ptr,nitems,type) New (0,ptr,nitems,type)
108#endif
109
110/* 5.8.7 */
111#ifndef SvRV_set
112# define SvRV_set(s,v) SvRV(s) = (v)
113#endif
114
115#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
116# undef CORO_STACKGUARD
117#endif
118
119#ifndef CORO_STACKGUARD
120# define CORO_STACKGUARD 0
121#endif
122
123/* prefer perl internal functions over our own? */
124#ifndef CORO_PREFER_PERL_FUNCTIONS
125# define CORO_PREFER_PERL_FUNCTIONS 0
126#endif
127
128/* The next macros try to return the current stack pointer, in an as
129 * portable way as possible. */
130#if __GNUC__ >= 4
131# define dSTACKLEVEL int stacklevel_dummy
132# define STACKLEVEL __builtin_frame_address (0)
133#else
134# define dSTACKLEVEL volatile void *stacklevel
135# define STACKLEVEL ((void *)&stacklevel)
136#endif
137
138#define IN_DESTRUCT PL_dirty
139
140#if __GNUC__ >= 3
141# define attribute(x) __attribute__(x)
142# define expect(expr,value) __builtin_expect ((expr), (value))
143# define INLINE static inline
144#else
145# define attribute(x)
146# define expect(expr,value) (expr)
147# define INLINE static
148#endif
149
150#define expect_false(expr) expect ((expr) != 0, 0)
151#define expect_true(expr) expect ((expr) != 0, 1)
152
153#define NOINLINE attribute ((noinline))
154
155#include "CoroAPI.h"
156#define GCoroAPI (&coroapi) /* very sneaky */
157
158#ifdef USE_ITHREADS
159# if CORO_PTHREAD
160static void *coro_thx;
161# endif
162#endif
163
164#ifdef __linux
165# include <time.h> /* for timespec */
166# include <syscall.h> /* for SYS_* */
167# ifdef SYS_clock_gettime
168# define coro_clock_gettime(id, ts) syscall (SYS_clock_gettime, (id), (ts))
169# define CORO_CLOCK_MONOTONIC 1
170# define CORO_CLOCK_THREAD_CPUTIME_ID 3
171# endif
172#endif
173
174static double (*nvtime)(); /* so why doesn't it take void? */
175static void (*u2time)(pTHX_ UV ret[2]);
176
177/* we hijack an hopefully unused CV flag for our purposes */
178#define CVf_SLF 0x4000
179static OP *pp_slf (pTHX);
180
181static U32 cctx_gen;
182static size_t cctx_stacksize = CORO_STACKSIZE;
183static struct CoroAPI coroapi;
184static AV *main_mainstack; /* used to differentiate between $main and others */
185static JMPENV *main_top_env;
186static HV *coro_state_stash, *coro_stash;
187static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
188
189static AV *av_destroy; /* destruction queue */
190static SV *sv_manager; /* the manager coro */
191static SV *sv_idle; /* $Coro::idle */
192
193static GV *irsgv; /* $/ */
194static GV *stdoutgv; /* *STDOUT */
195static SV *rv_diehook;
196static SV *rv_warnhook;
197static HV *hv_sig; /* %SIG */
198
199/* async_pool helper stuff */
200static SV *sv_pool_rss;
201static SV *sv_pool_size;
202static SV *sv_async_pool_idle; /* description string */
203static AV *av_async_pool; /* idle pool */
204static SV *sv_Coro; /* class string */
205static CV *cv_pool_handler;
206static CV *cv_coro_state_new;
207
208/* Coro::AnyEvent */
209static SV *sv_activity;
210
211/* enable processtime/realtime profiling */
212static char enable_times;
213typedef U32 coro_ts[2];
214static coro_ts time_real, time_cpu;
215static char times_valid;
216
217static struct coro_cctx *cctx_first;
218static int cctx_count, cctx_idle;
219
220enum {
221 CC_MAPPED = 0x01,
222 CC_NOREUSE = 0x02, /* throw this away after tracing */
223 CC_TRACE = 0x04,
224 CC_TRACE_SUB = 0x08, /* trace sub calls */
225 CC_TRACE_LINE = 0x10, /* trace each statement */
226 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
227};
228
229/* this is a structure representing a c-level coroutine */
230typedef struct coro_cctx
231{
232 struct coro_cctx *next;
233
234 /* the stack */
235 void *sptr;
236 size_t ssize;
237
238 /* cpu state */
239 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
240 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
241 JMPENV *top_env;
242 coro_context cctx;
243
244 U32 gen;
245#if CORO_USE_VALGRIND
246 int valgrind_id;
247#endif
248 unsigned char flags;
249} coro_cctx;
250
251coro_cctx *cctx_current; /* the currently running cctx */
252
253/*****************************************************************************/
254
255enum {
256 CF_RUNNING = 0x0001, /* coroutine is running */
257 CF_READY = 0x0002, /* coroutine is ready */
258 CF_NEW = 0x0004, /* has never been switched to */
259 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
260 CF_SUSPENDED = 0x0010, /* coroutine can't be scheduled */
261};
262
263/* the structure where most of the perl state is stored, overlaid on the cxstack */
264typedef struct
265{
266 SV *defsv;
267 AV *defav;
268 SV *errsv;
269 SV *irsgv;
270 HV *hinthv;
271#define VAR(name,type) type name;
272# include "state.h"
273#undef VAR
274} perl_slots;
275
276#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
277
278/* this is a structure representing a perl-level coroutine */
11struct coro { 279struct coro {
12 U8 dowarn; 280 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 281 coro_cctx *cctx;
14 282
15 PERL_SI *curstackinfo; 283 /* ready queue */
16 AV *curstack; 284 struct coro *next_ready;
285
286 /* state data */
287 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 288 AV *mainstack;
18 SV **stack_sp; 289 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 290
41 AV *args; 291 CV *startcv; /* the CV to execute */
292 AV *args; /* data associated with this coroutine (initial args) */
293 int refcnt; /* coroutines are refcounted, yes */
294 int flags; /* CF_ flags */
295 HV *hv; /* the perl hash associated with this coro, if any */
296 void (*on_destroy)(pTHX_ struct coro *coro);
297
298 /* statistics */
299 int usecount; /* number of transfers to this coro */
300
301 /* coro process data */
302 int prio;
303 SV *except; /* exception to be thrown */
304 SV *rouse_cb;
305
306 /* async_pool */
307 SV *saved_deffh;
308 SV *invoke_cb;
309 AV *invoke_av;
310
311 /* on_enter/on_leave */
312 AV *on_enter;
313 AV *on_leave;
314
315 /* times */
316 coro_ts t_cpu, t_real;
317
318 /* linked list */
319 struct coro *next, *prev;
42}; 320};
43 321
44typedef struct coro *Coro__State; 322typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 323typedef struct coro *Coro__State_or_hashref;
46 324
47static HV *padlist_cache; 325/* the following variables are effectively part of the perl context */
326/* and get copied between struct coro and these variables */
327/* the mainr easonw e don't support windows process emulation */
328static struct CoroSLF slf_frame; /* the current slf frame */
48 329
49/* mostly copied from op.c:cv_clone2 */ 330/** Coro ********************************************************************/
50STATIC AV * 331
51clone_padlist (AV *protopadlist) 332#define CORO_PRIO_MAX 3
333#define CORO_PRIO_HIGH 1
334#define CORO_PRIO_NORMAL 0
335#define CORO_PRIO_LOW -1
336#define CORO_PRIO_IDLE -3
337#define CORO_PRIO_MIN -4
338
339/* for Coro.pm */
340static SV *coro_current;
341static SV *coro_readyhook;
342static struct coro *coro_ready [CORO_PRIO_MAX - CORO_PRIO_MIN + 1][2]; /* head|tail */
343static CV *cv_coro_run, *cv_coro_terminate;
344static struct coro *coro_first;
345#define coro_nready coroapi.nready
346
347/** lowlevel stuff **********************************************************/
348
349static SV *
350coro_get_sv (pTHX_ const char *name, int create)
52{ 351{
53 AV *av; 352#if PERL_VERSION_ATLEAST (5,10,0)
54 I32 ix; 353 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 354 get_sv (name, create);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 355#endif
57 SV **pname = AvARRAY (protopad_name); 356 return get_sv (name, create);
58 SV **ppad = AvARRAY (protopad); 357}
59 I32 fname = AvFILLp (protopad_name); 358
60 I32 fpad = AvFILLp (protopad); 359static AV *
360coro_get_av (pTHX_ const char *name, int create)
361{
362#if PERL_VERSION_ATLEAST (5,10,0)
363 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
364 get_av (name, create);
365#endif
366 return get_av (name, create);
367}
368
369static HV *
370coro_get_hv (pTHX_ const char *name, int create)
371{
372#if PERL_VERSION_ATLEAST (5,10,0)
373 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
374 get_hv (name, create);
375#endif
376 return get_hv (name, create);
377}
378
379/* may croak */
380INLINE CV *
381coro_sv_2cv (pTHX_ SV *sv)
382{
383 HV *st;
384 GV *gvp;
385 CV *cv = sv_2cv (sv, &st, &gvp, 0);
386
387 if (!cv)
388 croak ("code reference expected");
389
390 return cv;
391}
392
393INLINE void
394coro_times_update ()
395{
396#ifdef coro_clock_gettime
397 struct timespec ts;
398
399 ts.tv_sec = ts.tv_nsec = 0;
400 coro_clock_gettime (CORO_CLOCK_THREAD_CPUTIME_ID, &ts);
401 time_cpu [0] = ts.tv_sec; time_cpu [1] = ts.tv_nsec;
402
403 ts.tv_sec = ts.tv_nsec = 0;
404 coro_clock_gettime (CORO_CLOCK_MONOTONIC, &ts);
405 time_real [0] = ts.tv_sec; time_real [1] = ts.tv_nsec;
406#else
407 dTHX;
408 UV tv[2];
409
410 u2time (aTHX_ &tv);
411 time_real [0] = tv [0];
412 time_real [1] = tv [1] * 1000;
413#endif
414}
415
416INLINE void
417coro_times_add (struct coro *c)
418{
419 c->t_real [1] += time_real [1];
420 if (c->t_real [1] > 1000000000) { c->t_real [1] -= 1000000000; ++c->t_real [0]; }
421 c->t_real [0] += time_real [0];
422
423 c->t_cpu [1] += time_cpu [1];
424 if (c->t_cpu [1] > 1000000000) { c->t_cpu [1] -= 1000000000; ++c->t_cpu [0]; }
425 c->t_cpu [0] += time_cpu [0];
426}
427
428INLINE void
429coro_times_sub (struct coro *c)
430{
431 if (c->t_real [1] < time_real [1]) { c->t_real [1] += 1000000000; --c->t_real [0]; }
432 c->t_real [1] -= time_real [1];
433 c->t_real [0] -= time_real [0];
434
435 if (c->t_cpu [1] < time_cpu [1]) { c->t_cpu [1] += 1000000000; --c->t_cpu [0]; }
436 c->t_cpu [1] -= time_cpu [1];
437 c->t_cpu [0] -= time_cpu [0];
438}
439
440/*****************************************************************************/
441/* magic glue */
442
443#define CORO_MAGIC_type_cv 26
444#define CORO_MAGIC_type_state PERL_MAGIC_ext
445
446#define CORO_MAGIC_NN(sv, type) \
447 (expect_true (SvMAGIC (sv)->mg_type == type) \
448 ? SvMAGIC (sv) \
449 : mg_find (sv, type))
450
451#define CORO_MAGIC(sv, type) \
452 (expect_true (SvMAGIC (sv)) \
453 ? CORO_MAGIC_NN (sv, type) \
454 : 0)
455
456#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
457#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
458
459INLINE struct coro *
460SvSTATE_ (pTHX_ SV *coro)
461{
462 HV *stash;
463 MAGIC *mg;
464
465 if (SvROK (coro))
466 coro = SvRV (coro);
467
468 if (expect_false (SvTYPE (coro) != SVt_PVHV))
469 croak ("Coro::State object required");
470
471 stash = SvSTASH (coro);
472 if (expect_false (stash != coro_stash && stash != coro_state_stash))
473 {
474 /* very slow, but rare, check */
475 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
476 croak ("Coro::State object required");
477 }
478
479 mg = CORO_MAGIC_state (coro);
480 return (struct coro *)mg->mg_ptr;
481}
482
483#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
484
485/* faster than SvSTATE, but expects a coroutine hv */
486#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
487#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
488
489/*****************************************************************************/
490/* padlist management and caching */
491
492static AV *
493coro_derive_padlist (pTHX_ CV *cv)
494{
495 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 496 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 497
72 newpadlist = newAV (); 498 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 499 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 500#if PERL_VERSION_ATLEAST (5,10,0)
501 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
502#else
503 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
504#endif
505 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
506 --AvFILLp (padlist);
507
508 av_store (newpadlist, 0, SvREFCNT_inc_NN (AvARRAY (padlist)[0]));
75 av_store (newpadlist, 1, (SV *) newpad); 509 av_store (newpadlist, 1, (SV *)newpad);
76 510
77 av = newAV (); /* will be @_ */ 511 return newpadlist;
78 av_extend (av, 0); 512}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 513
82 for (ix = fpad; ix > 0; ix--) 514static void
515free_padlist (pTHX_ AV *padlist)
516{
517 /* may be during global destruction */
518 if (!IN_DESTRUCT)
83 { 519 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 520 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 521
522 while (i > 0) /* special-case index 0 */
86 { 523 {
87 char *name = SvPVX (namesv); /* XXX */ 524 /* we try to be extra-careful here */
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 525 AV *av = (AV *)AvARRAY (padlist)[i--];
89 { /* lexical from outside? */ 526 I32 j = AvFILLp (av);
90 npad[ix] = SvREFCNT_inc (ppad[ix]); 527
91 } 528 while (j >= 0)
92 else 529 SvREFCNT_dec (AvARRAY (av)[j--]);
93 { /* our own lexical */ 530
94 SV *sv; 531 AvFILLp (av) = -1;
95 if (*name == '&') 532 SvREFCNT_dec (av);
96 sv = SvREFCNT_inc (ppad[ix]);
97 else if (*name == '@')
98 sv = (SV *) newAV ();
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 }
107 } 533 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix]))
109 {
110 npad[ix] = SvREFCNT_inc (ppad[ix]);
111 }
112 else
113 {
114 SV *sv = NEWSV (0, 0);
115 SvPADTMP_on (sv);
116 npad[ix] = sv;
117 }
118 }
119 534
120#if 0 /* NONOTUNDERSTOOD */ 535 SvREFCNT_dec (AvARRAY (padlist)[0]);
121 /* Now that vars are all in place, clone nested closures. */
122 536
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif
139
140 return newpadlist;
141}
142
143STATIC AV *
144free_padlist (AV *padlist)
145{
146 /* may be during global destruction */
147 if (SvREFCNT(padlist))
148 {
149 I32 i = AvFILLp(padlist); 537 AvFILLp (padlist) = -1;
150 while (i >= 0)
151 {
152 SV **svp = av_fetch(padlist, i--, FALSE);
153 SV *sv = svp ? *svp : Nullsv;
154 if (sv)
155 SvREFCNT_dec(sv);
156 }
157
158 SvREFCNT_dec((SV*)padlist); 538 SvREFCNT_dec ((SV*)padlist);
539 }
540}
541
542static int
543coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
544{
545 AV *padlist;
546 AV *av = (AV *)mg->mg_obj;
547
548 /* perl manages to free our internal AV and _then_ call us */
549 if (IN_DESTRUCT)
550 return;
551
552 /* casting is fun. */
553 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
554 free_padlist (aTHX_ padlist);
555
556 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
557
558 return 0;
559}
560
561static MGVTBL coro_cv_vtbl = {
562 0, 0, 0, 0,
563 coro_cv_free
564};
565
566/* the next two functions merely cache the padlists */
567static void
568get_padlist (pTHX_ CV *cv)
569{
570 MAGIC *mg = CORO_MAGIC_cv (cv);
571 AV *av;
572
573 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
574 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
575 else
576 {
577#if CORO_PREFER_PERL_FUNCTIONS
578 /* this is probably cleaner? but also slower! */
579 /* in practise, it seems to be less stable */
580 CV *cp = Perl_cv_clone (aTHX_ cv);
581 CvPADLIST (cv) = CvPADLIST (cp);
582 CvPADLIST (cp) = 0;
583 SvREFCNT_dec (cp);
584#else
585 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
586#endif
587 }
588}
589
590static void
591put_padlist (pTHX_ CV *cv)
592{
593 MAGIC *mg = CORO_MAGIC_cv (cv);
594 AV *av;
595
596 if (expect_false (!mg))
597 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
598
599 av = (AV *)mg->mg_obj;
600
601 if (expect_false (AvFILLp (av) >= AvMAX (av)))
602 av_extend (av, AvFILLp (av) + 1);
603
604 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
605}
606
607/** load & save, init *******************************************************/
608
609static void
610on_enterleave_call (pTHX_ SV *cb);
611
612static void
613load_perl (pTHX_ Coro__State c)
614{
615 perl_slots *slot = c->slot;
616 c->slot = 0;
617
618 PL_mainstack = c->mainstack;
619
620 GvSV (PL_defgv) = slot->defsv;
621 GvAV (PL_defgv) = slot->defav;
622 GvSV (PL_errgv) = slot->errsv;
623 GvSV (irsgv) = slot->irsgv;
624 GvHV (PL_hintgv) = slot->hinthv;
625
626 #define VAR(name,type) PL_ ## name = slot->name;
627 # include "state.h"
628 #undef VAR
629
630 {
631 dSP;
632
633 CV *cv;
634
635 /* now do the ugly restore mess */
636 while (expect_true (cv = (CV *)POPs))
637 {
638 put_padlist (aTHX_ cv); /* mark this padlist as available */
639 CvDEPTH (cv) = PTR2IV (POPs);
640 CvPADLIST (cv) = (AV *)POPs;
641 }
642
643 PUTBACK;
159 } 644 }
160}
161 645
162STATIC AV * 646 slf_frame = c->slf_frame;
163unuse_padlist (AV *padlist) 647 CORO_THROW = c->except;
164{
165 free_padlist (padlist);
166}
167 648
649 if (expect_false (enable_times))
650 {
651 if (expect_false (!times_valid))
652 coro_times_update ();
653
654 coro_times_sub (c);
655 }
656
657 if (expect_false (c->on_enter))
658 {
659 int i;
660
661 for (i = 0; i <= AvFILLp (c->on_enter); ++i)
662 on_enterleave_call (aTHX_ AvARRAY (c->on_enter)[i]);
663 }
664}
665
168static void 666static void
169SAVE(pTHX_ Coro__State c) 667save_perl (pTHX_ Coro__State c)
170{ 668{
669 if (expect_false (c->on_leave))
670 {
671 int i;
672
673 for (i = AvFILLp (c->on_leave); i >= 0; --i)
674 on_enterleave_call (aTHX_ AvARRAY (c->on_leave)[i]);
675 }
676
677 times_valid = 0;
678
679 if (expect_false (enable_times))
680 {
681 coro_times_update (); times_valid = 1;
682 coro_times_add (c);
683 }
684
685 c->except = CORO_THROW;
686 c->slf_frame = slf_frame;
687
171 { 688 {
172 dSP; 689 dSP;
173 I32 cxix = cxstack_ix; 690 I32 cxix = cxstack_ix;
691 PERL_CONTEXT *ccstk = cxstack;
174 PERL_SI *top_si = PL_curstackinfo; 692 PERL_SI *top_si = PL_curstackinfo;
175 PERL_CONTEXT *ccstk = cxstack;
176 693
177 /* 694 /*
178 * the worst thing you can imagine happens first - we have to save 695 * the worst thing you can imagine happens first - we have to save
179 * (and reinitialize) all cv's in the whole callchain :( 696 * (and reinitialize) all cv's in the whole callchain :(
180 */ 697 */
181 698
182 PUSHs (Nullsv); 699 XPUSHs (Nullsv);
183 /* this loop was inspired by pp_caller */ 700 /* this loop was inspired by pp_caller */
184 for (;;) 701 for (;;)
185 { 702 {
186 while (cxix >= 0) 703 while (expect_true (cxix >= 0))
187 { 704 {
188 PERL_CONTEXT *cx = &ccstk[--cxix]; 705 PERL_CONTEXT *cx = &ccstk[cxix--];
189 706
190 if (CxTYPE(cx) == CXt_SUB) 707 if (expect_true (CxTYPE (cx) == CXt_SUB) || expect_false (CxTYPE (cx) == CXt_FORMAT))
191 { 708 {
192 CV *cv = cx->blk_sub.cv; 709 CV *cv = cx->blk_sub.cv;
710
193 if (CvDEPTH(cv)) 711 if (expect_true (CvDEPTH (cv)))
194 { 712 {
195#ifdef USE_THREADS
196 XPUSHs ((SV *)CvOWNER(cv));
197#endif
198 EXTEND (SP, 3); 713 EXTEND (SP, 3);
199 PUSHs ((SV *)CvDEPTH(cv));
200 PUSHs ((SV *)CvPADLIST(cv)); 714 PUSHs ((SV *)CvPADLIST (cv));
715 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
201 PUSHs ((SV *)cv); 716 PUSHs ((SV *)cv);
202 717
203 CvPADLIST(cv) = clone_padlist (CvPADLIST(cv));
204
205 CvDEPTH(cv) = 0; 718 CvDEPTH (cv) = 0;
206#ifdef USE_THREADS 719 get_padlist (aTHX_ cv);
207 CvOWNER(cv) = 0;
208 error must unlock this cv etc.. etc...
209 if you are here wondering about this error message then
210 the reason is that it will not work as advertised yet
211#endif
212 } 720 }
213 } 721 }
214 else if (CxTYPE(cx) == CXt_FORMAT) 722 }
723
724 if (expect_true (top_si->si_type == PERLSI_MAIN))
725 break;
726
727 top_si = top_si->si_prev;
728 ccstk = top_si->si_cxstack;
729 cxix = top_si->si_cxix;
730 }
731
732 PUTBACK;
733 }
734
735 /* allocate some space on the context stack for our purposes */
736 /* we manually unroll here, as usually 2 slots is enough */
737 if (SLOT_COUNT >= 1) CXINC;
738 if (SLOT_COUNT >= 2) CXINC;
739 if (SLOT_COUNT >= 3) CXINC;
740 {
741 int i;
742 for (i = 3; i < SLOT_COUNT; ++i)
743 CXINC;
744 }
745 cxstack_ix -= SLOT_COUNT; /* undo allocation */
746
747 c->mainstack = PL_mainstack;
748
749 {
750 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
751
752 slot->defav = GvAV (PL_defgv);
753 slot->defsv = DEFSV;
754 slot->errsv = ERRSV;
755 slot->irsgv = GvSV (irsgv);
756 slot->hinthv = GvHV (PL_hintgv);
757
758 #define VAR(name,type) slot->name = PL_ ## name;
759 # include "state.h"
760 #undef VAR
761 }
762}
763
764/*
765 * allocate various perl stacks. This is almost an exact copy
766 * of perl.c:init_stacks, except that it uses less memory
767 * on the (sometimes correct) assumption that coroutines do
768 * not usually need a lot of stackspace.
769 */
770#if CORO_PREFER_PERL_FUNCTIONS
771# define coro_init_stacks(thx) init_stacks ()
772#else
773static void
774coro_init_stacks (pTHX)
775{
776 PL_curstackinfo = new_stackinfo(32, 8);
777 PL_curstackinfo->si_type = PERLSI_MAIN;
778 PL_curstack = PL_curstackinfo->si_stack;
779 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
780
781 PL_stack_base = AvARRAY(PL_curstack);
782 PL_stack_sp = PL_stack_base;
783 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
784
785 New(50,PL_tmps_stack,32,SV*);
786 PL_tmps_floor = -1;
787 PL_tmps_ix = -1;
788 PL_tmps_max = 32;
789
790 New(54,PL_markstack,16,I32);
791 PL_markstack_ptr = PL_markstack;
792 PL_markstack_max = PL_markstack + 16;
793
794#ifdef SET_MARK_OFFSET
795 SET_MARK_OFFSET;
796#endif
797
798 New(54,PL_scopestack,8,I32);
799 PL_scopestack_ix = 0;
800 PL_scopestack_max = 8;
801
802 New(54,PL_savestack,24,ANY);
803 PL_savestack_ix = 0;
804 PL_savestack_max = 24;
805
806#if !PERL_VERSION_ATLEAST (5,10,0)
807 New(54,PL_retstack,4,OP*);
808 PL_retstack_ix = 0;
809 PL_retstack_max = 4;
810#endif
811}
812#endif
813
814/*
815 * destroy the stacks, the callchain etc...
816 */
817static void
818coro_destruct_stacks (pTHX)
819{
820 while (PL_curstackinfo->si_next)
821 PL_curstackinfo = PL_curstackinfo->si_next;
822
823 while (PL_curstackinfo)
824 {
825 PERL_SI *p = PL_curstackinfo->si_prev;
826
827 if (!IN_DESTRUCT)
828 SvREFCNT_dec (PL_curstackinfo->si_stack);
829
830 Safefree (PL_curstackinfo->si_cxstack);
831 Safefree (PL_curstackinfo);
832 PL_curstackinfo = p;
833 }
834
835 Safefree (PL_tmps_stack);
836 Safefree (PL_markstack);
837 Safefree (PL_scopestack);
838 Safefree (PL_savestack);
839#if !PERL_VERSION_ATLEAST (5,10,0)
840 Safefree (PL_retstack);
841#endif
842}
843
844#define CORO_RSS \
845 rss += sizeof (SYM (curstackinfo)); \
846 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
847 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
848 rss += SYM (tmps_max) * sizeof (SV *); \
849 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
850 rss += SYM (scopestack_max) * sizeof (I32); \
851 rss += SYM (savestack_max) * sizeof (ANY);
852
853static size_t
854coro_rss (pTHX_ struct coro *coro)
855{
856 size_t rss = sizeof (*coro);
857
858 if (coro->mainstack)
859 {
860 if (coro->flags & CF_RUNNING)
861 {
862 #define SYM(sym) PL_ ## sym
863 CORO_RSS;
864 #undef SYM
865 }
866 else
867 {
868 #define SYM(sym) coro->slot->sym
869 CORO_RSS;
870 #undef SYM
871 }
872 }
873
874 return rss;
875}
876
877/** coroutine stack handling ************************************************/
878
879static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
880static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
881static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
882
883/* apparently < 5.8.8 */
884#ifndef MgPV_nolen_const
885#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
886 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
887 (const char*)(mg)->mg_ptr)
888#endif
889
890/*
891 * This overrides the default magic get method of %SIG elements.
892 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
893 * and instead of trying to save and restore the hash elements, we just provide
894 * readback here.
895 */
896static int
897coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
898{
899 const char *s = MgPV_nolen_const (mg);
900
901 if (*s == '_')
902 {
903 SV **svp = 0;
904
905 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
906 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
907
908 if (svp)
909 {
910 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
911 return 0;
912 }
913 }
914
915 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
916}
917
918static int
919coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
920{
921 const char *s = MgPV_nolen_const (mg);
922
923 if (*s == '_')
924 {
925 SV **svp = 0;
926
927 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
928 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
929
930 if (svp)
931 {
932 SV *old = *svp;
933 *svp = 0;
934 SvREFCNT_dec (old);
935 return 0;
936 }
937 }
938
939 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
940}
941
942static int
943coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
944{
945 const char *s = MgPV_nolen_const (mg);
946
947 if (*s == '_')
948 {
949 SV **svp = 0;
950
951 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
952 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
953
954 if (svp)
955 {
956 SV *old = *svp;
957 *svp = SvOK (sv) ? newSVsv (sv) : 0;
958 SvREFCNT_dec (old);
959 return 0;
960 }
961 }
962
963 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
964}
965
966static void
967prepare_nop (pTHX_ struct coro_transfer_args *ta)
968{
969 /* kind of mega-hacky, but works */
970 ta->next = ta->prev = (struct coro *)ta;
971}
972
973static int
974slf_check_nop (pTHX_ struct CoroSLF *frame)
975{
976 return 0;
977}
978
979static int
980slf_check_repeat (pTHX_ struct CoroSLF *frame)
981{
982 return 1;
983}
984
985static UNOP coro_setup_op;
986
987static void NOINLINE /* noinline to keep it out of the transfer fast path */
988coro_setup (pTHX_ struct coro *coro)
989{
990 /*
991 * emulate part of the perl startup here.
992 */
993 coro_init_stacks (aTHX);
994
995 PL_runops = RUNOPS_DEFAULT;
996 PL_curcop = &PL_compiling;
997 PL_in_eval = EVAL_NULL;
998 PL_comppad = 0;
999 PL_comppad_name = 0;
1000 PL_comppad_name_fill = 0;
1001 PL_comppad_name_floor = 0;
1002 PL_curpm = 0;
1003 PL_curpad = 0;
1004 PL_localizing = 0;
1005 PL_dirty = 0;
1006 PL_restartop = 0;
1007#if PERL_VERSION_ATLEAST (5,10,0)
1008 PL_parser = 0;
1009#endif
1010 PL_hints = 0;
1011
1012 /* recreate the die/warn hooks */
1013 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
1014 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
1015
1016 GvSV (PL_defgv) = newSV (0);
1017 GvAV (PL_defgv) = coro->args; coro->args = 0;
1018 GvSV (PL_errgv) = newSV (0);
1019 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
1020 GvHV (PL_hintgv) = 0;
1021 PL_rs = newSVsv (GvSV (irsgv));
1022 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
1023
1024 {
1025 dSP;
1026 UNOP myop;
1027
1028 Zero (&myop, 1, UNOP);
1029 myop.op_next = Nullop;
1030 myop.op_type = OP_ENTERSUB;
1031 myop.op_flags = OPf_WANT_VOID;
1032
1033 PUSHMARK (SP);
1034 PUSHs ((SV *)coro->startcv);
1035 PUTBACK;
1036 PL_op = (OP *)&myop;
1037 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
1038 }
1039
1040 /* this newly created coroutine might be run on an existing cctx which most
1041 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
1042 */
1043 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
1044 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
1045
1046 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
1047 coro_setup_op.op_next = PL_op;
1048 coro_setup_op.op_type = OP_ENTERSUB;
1049 coro_setup_op.op_ppaddr = pp_slf;
1050 /* no flags etc. required, as an init function won't be called */
1051
1052 PL_op = (OP *)&coro_setup_op;
1053
1054 /* copy throw, in case it was set before coro_setup */
1055 CORO_THROW = coro->except;
1056
1057 if (expect_false (enable_times))
1058 {
1059 coro_times_update ();
1060 coro_times_sub (coro);
1061 }
1062}
1063
1064static void
1065coro_unwind_stacks (pTHX)
1066{
1067 if (!IN_DESTRUCT)
1068 {
1069 /* restore all saved variables and stuff */
1070 LEAVE_SCOPE (0);
1071 assert (PL_tmps_floor == -1);
1072
1073 /* free all temporaries */
1074 FREETMPS;
1075 assert (PL_tmps_ix == -1);
1076
1077 /* unwind all extra stacks */
1078 POPSTACK_TO (PL_mainstack);
1079
1080 /* unwind main stack */
1081 dounwind (-1);
1082 }
1083}
1084
1085static void
1086coro_destruct_perl (pTHX_ struct coro *coro)
1087{
1088 SV *svf [9];
1089
1090 {
1091 struct coro *current = SvSTATE_current;
1092
1093 assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack));
1094
1095 save_perl (aTHX_ current);
1096 load_perl (aTHX_ coro);
1097
1098 coro_unwind_stacks (aTHX);
1099 coro_destruct_stacks (aTHX);
1100
1101 // now save some sv's to be free'd later
1102 svf [0] = GvSV (PL_defgv);
1103 svf [1] = (SV *)GvAV (PL_defgv);
1104 svf [2] = GvSV (PL_errgv);
1105 svf [3] = (SV *)PL_defoutgv;
1106 svf [4] = PL_rs;
1107 svf [5] = GvSV (irsgv);
1108 svf [6] = (SV *)GvHV (PL_hintgv);
1109 svf [7] = PL_diehook;
1110 svf [8] = PL_warnhook;
1111 assert (9 == sizeof (svf) / sizeof (*svf));
1112
1113 load_perl (aTHX_ current);
1114 }
1115
1116 {
1117 int i;
1118
1119 for (i = 0; i < sizeof (svf) / sizeof (*svf); ++i)
1120 SvREFCNT_dec (svf [i]);
1121
1122 SvREFCNT_dec (coro->saved_deffh);
1123 SvREFCNT_dec (coro->rouse_cb);
1124 SvREFCNT_dec (coro->invoke_cb);
1125 SvREFCNT_dec (coro->invoke_av);
1126 }
1127}
1128
1129INLINE void
1130free_coro_mortal (pTHX)
1131{
1132 if (expect_true (coro_mortal))
1133 {
1134 SvREFCNT_dec (coro_mortal);
1135 coro_mortal = 0;
1136 }
1137}
1138
1139static int
1140runops_trace (pTHX)
1141{
1142 COP *oldcop = 0;
1143 int oldcxix = -2;
1144
1145 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
1146 {
1147 PERL_ASYNC_CHECK ();
1148
1149 if (cctx_current->flags & CC_TRACE_ALL)
1150 {
1151 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
1152 {
1153 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1154 SV **bot, **top;
1155 AV *av = newAV (); /* return values */
1156 SV **cb;
1157 dSP;
1158
1159 GV *gv = CvGV (cx->blk_sub.cv);
1160 SV *fullname = sv_2mortal (newSV (0));
1161 if (isGV (gv))
1162 gv_efullname3 (fullname, gv, 0);
1163
1164 bot = PL_stack_base + cx->blk_oldsp + 1;
1165 top = cx->blk_gimme == G_ARRAY ? SP + 1
1166 : cx->blk_gimme == G_SCALAR ? bot + 1
1167 : bot;
1168
1169 av_extend (av, top - bot);
1170 while (bot < top)
1171 av_push (av, SvREFCNT_inc_NN (*bot++));
1172
1173 PL_runops = RUNOPS_DEFAULT;
1174 ENTER;
1175 SAVETMPS;
1176 EXTEND (SP, 3);
1177 PUSHMARK (SP);
1178 PUSHs (&PL_sv_no);
1179 PUSHs (fullname);
1180 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1181 PUTBACK;
1182 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1183 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1184 SPAGAIN;
1185 FREETMPS;
1186 LEAVE;
1187 PL_runops = runops_trace;
1188 }
1189
1190 if (oldcop != PL_curcop)
1191 {
1192 oldcop = PL_curcop;
1193
1194 if (PL_curcop != &PL_compiling)
1195 {
1196 SV **cb;
1197
1198 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1199 {
1200 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1201
1202 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1203 {
1204 dSP;
1205 GV *gv = CvGV (cx->blk_sub.cv);
1206 SV *fullname = sv_2mortal (newSV (0));
1207
1208 if (isGV (gv))
1209 gv_efullname3 (fullname, gv, 0);
1210
1211 PL_runops = RUNOPS_DEFAULT;
1212 ENTER;
1213 SAVETMPS;
1214 EXTEND (SP, 3);
1215 PUSHMARK (SP);
1216 PUSHs (&PL_sv_yes);
1217 PUSHs (fullname);
1218 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1219 PUTBACK;
1220 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1221 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1222 SPAGAIN;
1223 FREETMPS;
1224 LEAVE;
1225 PL_runops = runops_trace;
1226 }
1227
1228 oldcxix = cxstack_ix;
1229 }
1230
1231 if (cctx_current->flags & CC_TRACE_LINE)
1232 {
1233 dSP;
1234
1235 PL_runops = RUNOPS_DEFAULT;
1236 ENTER;
1237 SAVETMPS;
1238 EXTEND (SP, 3);
1239 PL_runops = RUNOPS_DEFAULT;
1240 PUSHMARK (SP);
1241 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1242 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1243 PUTBACK;
1244 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1245 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1246 SPAGAIN;
1247 FREETMPS;
1248 LEAVE;
1249 PL_runops = runops_trace;
1250 }
1251 }
1252 }
1253 }
1254 }
1255
1256 TAINT_NOT;
1257 return 0;
1258}
1259
1260static struct CoroSLF cctx_ssl_frame;
1261
1262static void
1263slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1264{
1265 ta->prev = 0;
1266}
1267
1268static int
1269slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1270{
1271 *frame = cctx_ssl_frame;
1272
1273 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1274}
1275
1276/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1277static void NOINLINE
1278cctx_prepare (pTHX)
1279{
1280 PL_top_env = &PL_start_env;
1281
1282 if (cctx_current->flags & CC_TRACE)
1283 PL_runops = runops_trace;
1284
1285 /* we already must be executing an SLF op, there is no other valid way
1286 * that can lead to creation of a new cctx */
1287 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1288 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1289
1290 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1291 cctx_ssl_frame = slf_frame;
1292
1293 slf_frame.prepare = slf_prepare_set_stacklevel;
1294 slf_frame.check = slf_check_set_stacklevel;
1295}
1296
1297/* the tail of transfer: execute stuff we can only do after a transfer */
1298INLINE void
1299transfer_tail (pTHX)
1300{
1301 free_coro_mortal (aTHX);
1302}
1303
1304/*
1305 * this is a _very_ stripped down perl interpreter ;)
1306 */
1307static void
1308cctx_run (void *arg)
1309{
1310#ifdef USE_ITHREADS
1311# if CORO_PTHREAD
1312 PERL_SET_CONTEXT (coro_thx);
1313# endif
1314#endif
1315 {
1316 dTHX;
1317
1318 /* normally we would need to skip the entersub here */
1319 /* not doing so will re-execute it, which is exactly what we want */
1320 /* PL_nop = PL_nop->op_next */
1321
1322 /* inject a fake subroutine call to cctx_init */
1323 cctx_prepare (aTHX);
1324
1325 /* cctx_run is the alternative tail of transfer() */
1326 transfer_tail (aTHX);
1327
1328 /* somebody or something will hit me for both perl_run and PL_restartop */
1329 PL_restartop = PL_op;
1330 perl_run (PL_curinterp);
1331 /*
1332 * Unfortunately, there is no way to get at the return values of the
1333 * coro body here, as perl_run destroys these
1334 */
1335
1336 /*
1337 * If perl-run returns we assume exit() was being called or the coro
1338 * fell off the end, which seems to be the only valid (non-bug)
1339 * reason for perl_run to return. We try to exit by jumping to the
1340 * bootstrap-time "top" top_env, as we cannot restore the "main"
1341 * coroutine as Coro has no such concept.
1342 * This actually isn't valid with the pthread backend, but OSes requiring
1343 * that backend are too broken to do it in a standards-compliant way.
1344 */
1345 PL_top_env = main_top_env;
1346 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1347 }
1348}
1349
1350static coro_cctx *
1351cctx_new ()
1352{
1353 coro_cctx *cctx;
1354
1355 ++cctx_count;
1356 New (0, cctx, 1, coro_cctx);
1357
1358 cctx->gen = cctx_gen;
1359 cctx->flags = 0;
1360 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1361
1362 return cctx;
1363}
1364
1365/* create a new cctx only suitable as source */
1366static coro_cctx *
1367cctx_new_empty ()
1368{
1369 coro_cctx *cctx = cctx_new ();
1370
1371 cctx->sptr = 0;
1372 coro_create (&cctx->cctx, 0, 0, 0, 0);
1373
1374 return cctx;
1375}
1376
1377/* create a new cctx suitable as destination/running a perl interpreter */
1378static coro_cctx *
1379cctx_new_run ()
1380{
1381 coro_cctx *cctx = cctx_new ();
1382 void *stack_start;
1383 size_t stack_size;
1384
1385#if HAVE_MMAP
1386 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1387 /* mmap supposedly does allocate-on-write for us */
1388 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1389
1390 if (cctx->sptr != (void *)-1)
1391 {
1392 #if CORO_STACKGUARD
1393 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1394 #endif
1395 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1396 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1397 cctx->flags |= CC_MAPPED;
1398 }
1399 else
1400#endif
1401 {
1402 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1403 New (0, cctx->sptr, cctx_stacksize, long);
1404
1405 if (!cctx->sptr)
1406 {
1407 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1408 _exit (EXIT_FAILURE);
1409 }
1410
1411 stack_start = cctx->sptr;
1412 stack_size = cctx->ssize;
1413 }
1414
1415 #if CORO_USE_VALGRIND
1416 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1417 #endif
1418
1419 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1420
1421 return cctx;
1422}
1423
1424static void
1425cctx_destroy (coro_cctx *cctx)
1426{
1427 if (!cctx)
1428 return;
1429
1430 assert (("FATAL: tried to destroy current cctx", cctx != cctx_current));//D temporary?
1431
1432 --cctx_count;
1433 coro_destroy (&cctx->cctx);
1434
1435 /* coro_transfer creates new, empty cctx's */
1436 if (cctx->sptr)
1437 {
1438 #if CORO_USE_VALGRIND
1439 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1440 #endif
1441
1442#if HAVE_MMAP
1443 if (cctx->flags & CC_MAPPED)
1444 munmap (cctx->sptr, cctx->ssize);
1445 else
1446#endif
1447 Safefree (cctx->sptr);
1448 }
1449
1450 Safefree (cctx);
1451}
1452
1453/* wether this cctx should be destructed */
1454#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1455
1456static coro_cctx *
1457cctx_get (pTHX)
1458{
1459 while (expect_true (cctx_first))
1460 {
1461 coro_cctx *cctx = cctx_first;
1462 cctx_first = cctx->next;
1463 --cctx_idle;
1464
1465 if (expect_true (!CCTX_EXPIRED (cctx)))
1466 return cctx;
1467
1468 cctx_destroy (cctx);
1469 }
1470
1471 return cctx_new_run ();
1472}
1473
1474static void
1475cctx_put (coro_cctx *cctx)
1476{
1477 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1478
1479 /* free another cctx if overlimit */
1480 if (expect_false (cctx_idle >= cctx_max_idle))
1481 {
1482 coro_cctx *first = cctx_first;
1483 cctx_first = first->next;
1484 --cctx_idle;
1485
1486 cctx_destroy (first);
1487 }
1488
1489 ++cctx_idle;
1490 cctx->next = cctx_first;
1491 cctx_first = cctx;
1492}
1493
1494/** coroutine switching *****************************************************/
1495
1496static void
1497transfer_check (pTHX_ struct coro *prev, struct coro *next)
1498{
1499 /* TODO: throwing up here is considered harmful */
1500
1501 if (expect_true (prev != next))
1502 {
1503 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1504 croak ("Coro::State::transfer called with a blocked prev Coro::State, but can only transfer from running or new states,");
1505
1506 if (expect_false (next->flags & (CF_RUNNING | CF_DESTROYED | CF_SUSPENDED)))
1507 croak ("Coro::State::transfer called with running, destroyed or suspended next Coro::State, but can only transfer to inactive states,");
1508
1509#if !PERL_VERSION_ATLEAST (5,10,0)
1510 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1511 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1512#endif
1513 }
1514}
1515
1516/* always use the TRANSFER macro */
1517static void NOINLINE /* noinline so we have a fixed stackframe */
1518transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1519{
1520 dSTACKLEVEL;
1521
1522 /* sometimes transfer is only called to set idle_sp */
1523 if (expect_false (!prev))
1524 {
1525 cctx_current->idle_sp = STACKLEVEL;
1526 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1527 }
1528 else if (expect_true (prev != next))
1529 {
1530 coro_cctx *cctx_prev;
1531
1532 if (expect_false (prev->flags & CF_NEW))
1533 {
1534 /* create a new empty/source context */
1535 prev->flags &= ~CF_NEW;
1536 prev->flags |= CF_RUNNING;
1537 }
1538
1539 prev->flags &= ~CF_RUNNING;
1540 next->flags |= CF_RUNNING;
1541
1542 /* first get rid of the old state */
1543 save_perl (aTHX_ prev);
1544
1545 if (expect_false (next->flags & CF_NEW))
1546 {
1547 /* need to start coroutine */
1548 next->flags &= ~CF_NEW;
1549 /* setup coroutine call */
1550 coro_setup (aTHX_ next);
1551 }
1552 else
1553 load_perl (aTHX_ next);
1554
1555 /* possibly untie and reuse the cctx */
1556 if (expect_true (
1557 cctx_current->idle_sp == STACKLEVEL
1558 && !(cctx_current->flags & CC_TRACE)
1559 && !force_cctx
1560 ))
1561 {
1562 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1563 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1564
1565 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get. */
1566 /* without this the next cctx_get might destroy the running cctx while still in use */
1567 if (expect_false (CCTX_EXPIRED (cctx_current)))
1568 if (expect_true (!next->cctx))
1569 next->cctx = cctx_get (aTHX);
1570
1571 cctx_put (cctx_current);
1572 }
1573 else
1574 prev->cctx = cctx_current;
1575
1576 ++next->usecount;
1577
1578 cctx_prev = cctx_current;
1579 cctx_current = expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1580
1581 next->cctx = 0;
1582
1583 if (expect_false (cctx_prev != cctx_current))
1584 {
1585 cctx_prev->top_env = PL_top_env;
1586 PL_top_env = cctx_current->top_env;
1587 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1588 }
1589
1590 transfer_tail (aTHX);
1591 }
1592}
1593
1594#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1595#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1596
1597/** high level stuff ********************************************************/
1598
1599static int
1600coro_state_destroy (pTHX_ struct coro *coro)
1601{
1602 if (coro->flags & CF_DESTROYED)
1603 return 0;
1604
1605 if (coro->on_destroy && !PL_dirty)
1606 coro->on_destroy (aTHX_ coro);
1607
1608 coro->flags |= CF_DESTROYED;
1609
1610 if (coro->flags & CF_READY)
1611 {
1612 /* reduce nready, as destroying a ready coro effectively unreadies it */
1613 /* alternative: look through all ready queues and remove the coro */
1614 --coro_nready;
1615 }
1616 else
1617 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1618
1619 if (coro->mainstack
1620 && coro->mainstack != main_mainstack
1621 && coro->slot
1622 && !PL_dirty)
1623 coro_destruct_perl (aTHX_ coro);
1624
1625 cctx_destroy (coro->cctx);
1626 SvREFCNT_dec (coro->startcv);
1627 SvREFCNT_dec (coro->args);
1628 SvREFCNT_dec (CORO_THROW);
1629
1630 if (coro->next) coro->next->prev = coro->prev;
1631 if (coro->prev) coro->prev->next = coro->next;
1632 if (coro == coro_first) coro_first = coro->next;
1633
1634 return 1;
1635}
1636
1637static int
1638coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1639{
1640 struct coro *coro = (struct coro *)mg->mg_ptr;
1641 mg->mg_ptr = 0;
1642
1643 coro->hv = 0;
1644
1645 if (--coro->refcnt < 0)
1646 {
1647 coro_state_destroy (aTHX_ coro);
1648 Safefree (coro);
1649 }
1650
1651 return 0;
1652}
1653
1654static int
1655coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1656{
1657 struct coro *coro = (struct coro *)mg->mg_ptr;
1658
1659 ++coro->refcnt;
1660
1661 return 0;
1662}
1663
1664static MGVTBL coro_state_vtbl = {
1665 0, 0, 0, 0,
1666 coro_state_free,
1667 0,
1668#ifdef MGf_DUP
1669 coro_state_dup,
1670#else
1671# define MGf_DUP 0
1672#endif
1673};
1674
1675static void
1676prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1677{
1678 ta->prev = SvSTATE (prev_sv);
1679 ta->next = SvSTATE (next_sv);
1680 TRANSFER_CHECK (*ta);
1681}
1682
1683static void
1684api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1685{
1686 struct coro_transfer_args ta;
1687
1688 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1689 TRANSFER (ta, 1);
1690}
1691
1692/*****************************************************************************/
1693/* gensub: simple closure generation utility */
1694
1695#define GENSUB_ARG CvXSUBANY (cv).any_ptr
1696
1697/* create a closure from XS, returns a code reference */
1698/* the arg can be accessed via GENSUB_ARG from the callback */
1699/* the callback must use dXSARGS/XSRETURN */
1700static SV *
1701gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg)
1702{
1703 CV *cv = (CV *)newSV (0);
1704
1705 sv_upgrade ((SV *)cv, SVt_PVCV);
1706
1707 CvANON_on (cv);
1708 CvISXSUB_on (cv);
1709 CvXSUB (cv) = xsub;
1710 GENSUB_ARG = arg;
1711
1712 return newRV_noinc ((SV *)cv);
1713}
1714
1715/** Coro ********************************************************************/
1716
1717INLINE void
1718coro_enq (pTHX_ struct coro *coro)
1719{
1720 struct coro **ready = coro_ready [coro->prio - CORO_PRIO_MIN];
1721
1722 SvREFCNT_inc_NN (coro->hv);
1723
1724 coro->next_ready = 0;
1725 *(ready [0] ? &ready [1]->next_ready : &ready [0]) = coro;
1726 ready [1] = coro;
1727}
1728
1729INLINE struct coro *
1730coro_deq (pTHX)
1731{
1732 int prio;
1733
1734 for (prio = CORO_PRIO_MAX - CORO_PRIO_MIN + 1; --prio >= 0; )
1735 {
1736 struct coro **ready = coro_ready [prio];
1737
1738 if (ready [0])
1739 {
1740 struct coro *coro = ready [0];
1741 ready [0] = coro->next_ready;
1742 return coro;
1743 }
1744 }
1745
1746 return 0;
1747}
1748
1749static int
1750api_ready (pTHX_ SV *coro_sv)
1751{
1752 struct coro *coro;
1753 SV *sv_hook;
1754 void (*xs_hook)(void);
1755
1756 coro = SvSTATE (coro_sv);
1757
1758 if (coro->flags & CF_READY)
1759 return 0;
1760
1761 coro->flags |= CF_READY;
1762
1763 sv_hook = coro_nready ? 0 : coro_readyhook;
1764 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1765
1766 coro_enq (aTHX_ coro);
1767 ++coro_nready;
1768
1769 if (sv_hook)
1770 {
1771 dSP;
1772
1773 ENTER;
1774 SAVETMPS;
1775
1776 PUSHMARK (SP);
1777 PUTBACK;
1778 call_sv (sv_hook, G_VOID | G_DISCARD);
1779
1780 FREETMPS;
1781 LEAVE;
1782 }
1783
1784 if (xs_hook)
1785 xs_hook ();
1786
1787 return 1;
1788}
1789
1790static int
1791api_is_ready (pTHX_ SV *coro_sv)
1792{
1793 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1794}
1795
1796/* expects to own a reference to next->hv */
1797INLINE void
1798prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1799{
1800 SV *prev_sv = SvRV (coro_current);
1801
1802 ta->prev = SvSTATE_hv (prev_sv);
1803 ta->next = next;
1804
1805 TRANSFER_CHECK (*ta);
1806
1807 SvRV_set (coro_current, (SV *)next->hv);
1808
1809 free_coro_mortal (aTHX);
1810 coro_mortal = prev_sv;
1811}
1812
1813static void
1814prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1815{
1816 for (;;)
1817 {
1818 struct coro *next = coro_deq (aTHX);
1819
1820 if (expect_true (next))
1821 {
1822 /* cannot transfer to destroyed coros, skip and look for next */
1823 if (expect_false (next->flags & (CF_DESTROYED | CF_SUSPENDED)))
1824 SvREFCNT_dec (next->hv); /* coro_nready has already been taken care of by destroy */
1825 else
1826 {
1827 next->flags &= ~CF_READY;
1828 --coro_nready;
1829
1830 prepare_schedule_to (aTHX_ ta, next);
1831 break;
1832 }
1833 }
1834 else
1835 {
1836 /* nothing to schedule: call the idle handler */
1837 if (SvROK (sv_idle)
1838 && SvOBJECT (SvRV (sv_idle)))
1839 {
1840 ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */
1841 api_ready (aTHX_ SvRV (sv_idle));
1842 --coro_nready;
1843 }
1844 else
1845 {
1846 dSP;
1847
1848 ENTER;
1849 SAVETMPS;
1850
1851 PUSHMARK (SP);
1852 PUTBACK;
1853 call_sv (sv_idle, G_VOID | G_DISCARD);
1854
1855 FREETMPS;
1856 LEAVE;
1857 }
1858 }
1859 }
1860}
1861
1862INLINE void
1863prepare_cede (pTHX_ struct coro_transfer_args *ta)
1864{
1865 api_ready (aTHX_ coro_current);
1866 prepare_schedule (aTHX_ ta);
1867}
1868
1869INLINE void
1870prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1871{
1872 SV *prev = SvRV (coro_current);
1873
1874 if (coro_nready)
1875 {
1876 prepare_schedule (aTHX_ ta);
1877 api_ready (aTHX_ prev);
1878 }
1879 else
1880 prepare_nop (aTHX_ ta);
1881}
1882
1883static void
1884api_schedule (pTHX)
1885{
1886 struct coro_transfer_args ta;
1887
1888 prepare_schedule (aTHX_ &ta);
1889 TRANSFER (ta, 1);
1890}
1891
1892static void
1893api_schedule_to (pTHX_ SV *coro_sv)
1894{
1895 struct coro_transfer_args ta;
1896 struct coro *next = SvSTATE (coro_sv);
1897
1898 SvREFCNT_inc_NN (coro_sv);
1899 prepare_schedule_to (aTHX_ &ta, next);
1900}
1901
1902static int
1903api_cede (pTHX)
1904{
1905 struct coro_transfer_args ta;
1906
1907 prepare_cede (aTHX_ &ta);
1908
1909 if (expect_true (ta.prev != ta.next))
1910 {
1911 TRANSFER (ta, 1);
1912 return 1;
1913 }
1914 else
1915 return 0;
1916}
1917
1918static int
1919api_cede_notself (pTHX)
1920{
1921 if (coro_nready)
1922 {
1923 struct coro_transfer_args ta;
1924
1925 prepare_cede_notself (aTHX_ &ta);
1926 TRANSFER (ta, 1);
1927 return 1;
1928 }
1929 else
1930 return 0;
1931}
1932
1933static void
1934api_trace (pTHX_ SV *coro_sv, int flags)
1935{
1936 struct coro *coro = SvSTATE (coro_sv);
1937
1938 if (coro->flags & CF_RUNNING)
1939 croak ("cannot enable tracing on a running coroutine, caught");
1940
1941 if (flags & CC_TRACE)
1942 {
1943 if (!coro->cctx)
1944 coro->cctx = cctx_new_run ();
1945 else if (!(coro->cctx->flags & CC_TRACE))
1946 croak ("cannot enable tracing on coroutine with custom stack, caught");
1947
1948 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1949 }
1950 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1951 {
1952 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1953
1954 if (coro->flags & CF_RUNNING)
1955 PL_runops = RUNOPS_DEFAULT;
1956 else
1957 coro->slot->runops = RUNOPS_DEFAULT;
1958 }
1959}
1960
1961static void
1962coro_call_on_destroy (pTHX_ struct coro *coro)
1963{
1964 SV **on_destroyp = hv_fetch (coro->hv, "_on_destroy", sizeof ("_on_destroy") - 1, 0);
1965 SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0);
1966
1967 if (on_destroyp)
1968 {
1969 AV *on_destroy = (AV *)SvRV (*on_destroyp);
1970
1971 while (AvFILLp (on_destroy) >= 0)
1972 {
1973 dSP; /* don't disturb outer sp */
1974 SV *cb = av_pop (on_destroy);
1975
1976 PUSHMARK (SP);
1977
1978 if (statusp)
1979 {
1980 int i;
1981 AV *status = (AV *)SvRV (*statusp);
1982 EXTEND (SP, AvFILLp (status) + 1);
1983
1984 for (i = 0; i <= AvFILLp (status); ++i)
1985 PUSHs (AvARRAY (status)[i]);
1986 }
1987
1988 PUTBACK;
1989 call_sv (sv_2mortal (cb), G_VOID | G_DISCARD);
1990 }
1991 }
1992}
1993
1994static void
1995slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1996{
1997 int i;
1998 HV *hv = (HV *)SvRV (coro_current);
1999 AV *av = newAV ();
2000
2001 av_extend (av, items - 1);
2002 for (i = 0; i < items; ++i)
2003 av_push (av, SvREFCNT_inc_NN (arg [i]));
2004
2005 hv_store (hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0);
2006
2007 av_push (av_destroy, (SV *)newRV_inc ((SV *)hv)); /* RVinc for perl */
2008 api_ready (aTHX_ sv_manager);
2009
2010 frame->prepare = prepare_schedule;
2011 frame->check = slf_check_repeat;
2012
2013 /* as a minor optimisation, we could unwind all stacks here */
2014 /* but that puts extra pressure on pp_slf, and is not worth much */
2015 /*coro_unwind_stacks (aTHX);*/
2016}
2017
2018/*****************************************************************************/
2019/* async pool handler */
2020
2021static int
2022slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
2023{
2024 HV *hv = (HV *)SvRV (coro_current);
2025 struct coro *coro = (struct coro *)frame->data;
2026
2027 if (!coro->invoke_cb)
2028 return 1; /* loop till we have invoke */
2029 else
2030 {
2031 hv_store (hv, "desc", sizeof ("desc") - 1,
2032 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
2033
2034 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
2035
2036 {
2037 dSP;
2038 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
2039 PUTBACK;
2040 }
2041
2042 SvREFCNT_dec (GvAV (PL_defgv));
2043 GvAV (PL_defgv) = coro->invoke_av;
2044 coro->invoke_av = 0;
2045
2046 return 0;
2047 }
2048}
2049
2050static void
2051slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2052{
2053 HV *hv = (HV *)SvRV (coro_current);
2054 struct coro *coro = SvSTATE_hv ((SV *)hv);
2055
2056 if (expect_true (coro->saved_deffh))
2057 {
2058 /* subsequent iteration */
2059 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
2060 coro->saved_deffh = 0;
2061
2062 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
2063 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
2064 {
2065 coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate);
2066 coro->invoke_av = newAV ();
2067
2068 frame->prepare = prepare_nop;
2069 }
2070 else
2071 {
2072 av_clear (GvAV (PL_defgv));
2073 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
2074
2075 coro->prio = 0;
2076
2077 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
2078 api_trace (aTHX_ coro_current, 0);
2079
2080 frame->prepare = prepare_schedule;
2081 av_push (av_async_pool, SvREFCNT_inc (hv));
2082 }
2083 }
2084 else
2085 {
2086 /* first iteration, simply fall through */
2087 frame->prepare = prepare_nop;
2088 }
2089
2090 frame->check = slf_check_pool_handler;
2091 frame->data = (void *)coro;
2092}
2093
2094/*****************************************************************************/
2095/* rouse callback */
2096
2097#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
2098
2099static void
2100coro_rouse_callback (pTHX_ CV *cv)
2101{
2102 dXSARGS;
2103 SV *data = (SV *)GENSUB_ARG;
2104
2105 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2106 {
2107 /* first call, set args */
2108 SV *coro = SvRV (data);
2109 AV *av = newAV ();
2110
2111 SvRV_set (data, (SV *)av);
2112
2113 /* better take a full copy of the arguments */
2114 while (items--)
2115 av_store (av, items, newSVsv (ST (items)));
2116
2117 api_ready (aTHX_ coro);
2118 SvREFCNT_dec (coro);
2119 }
2120
2121 XSRETURN_EMPTY;
2122}
2123
2124static int
2125slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
2126{
2127 SV *data = (SV *)frame->data;
2128
2129 if (CORO_THROW)
2130 return 0;
2131
2132 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2133 return 1;
2134
2135 /* now push all results on the stack */
2136 {
2137 dSP;
2138 AV *av = (AV *)SvRV (data);
2139 int i;
2140
2141 EXTEND (SP, AvFILLp (av) + 1);
2142 for (i = 0; i <= AvFILLp (av); ++i)
2143 PUSHs (sv_2mortal (AvARRAY (av)[i]));
2144
2145 /* we have stolen the elements, so set length to zero and free */
2146 AvFILLp (av) = -1;
2147 av_undef (av);
2148
2149 PUTBACK;
2150 }
2151
2152 return 0;
2153}
2154
2155static void
2156slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2157{
2158 SV *cb;
2159
2160 if (items)
2161 cb = arg [0];
2162 else
2163 {
2164 struct coro *coro = SvSTATE_current;
2165
2166 if (!coro->rouse_cb)
2167 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
2168
2169 cb = sv_2mortal (coro->rouse_cb);
2170 coro->rouse_cb = 0;
2171 }
2172
2173 if (!SvROK (cb)
2174 || SvTYPE (SvRV (cb)) != SVt_PVCV
2175 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
2176 croak ("Coro::rouse_wait called with illegal callback argument,");
2177
2178 {
2179 CV *cv = (CV *)SvRV (cb); /* for GENSUB_ARG */
2180 SV *data = (SV *)GENSUB_ARG;
2181
2182 frame->data = (void *)data;
2183 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2184 frame->check = slf_check_rouse_wait;
2185 }
2186}
2187
2188static SV *
2189coro_new_rouse_cb (pTHX)
2190{
2191 HV *hv = (HV *)SvRV (coro_current);
2192 struct coro *coro = SvSTATE_hv (hv);
2193 SV *data = newRV_inc ((SV *)hv);
2194 SV *cb = gensub (aTHX_ coro_rouse_callback, (void *)data);
2195
2196 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2197 SvREFCNT_dec (data); /* magicext increases the refcount */
2198
2199 SvREFCNT_dec (coro->rouse_cb);
2200 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2201
2202 return cb;
2203}
2204
2205/*****************************************************************************/
2206/* schedule-like-function opcode (SLF) */
2207
2208static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2209static const CV *slf_cv;
2210static SV **slf_argv;
2211static int slf_argc, slf_arga; /* count, allocated */
2212static I32 slf_ax; /* top of stack, for restore */
2213
2214/* this restores the stack in the case we patched the entersub, to */
2215/* recreate the stack frame as perl will on following calls */
2216/* since entersub cleared the stack */
2217static OP *
2218pp_restore (pTHX)
2219{
2220 int i;
2221 SV **SP = PL_stack_base + slf_ax;
2222
2223 PUSHMARK (SP);
2224
2225 EXTEND (SP, slf_argc + 1);
2226
2227 for (i = 0; i < slf_argc; ++i)
2228 PUSHs (sv_2mortal (slf_argv [i]));
2229
2230 PUSHs ((SV *)CvGV (slf_cv));
2231
2232 RETURNOP (slf_restore.op_first);
2233}
2234
2235static void
2236slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2237{
2238 SV **arg = (SV **)slf_frame.data;
2239
2240 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2241}
2242
2243static void
2244slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2245{
2246 if (items != 2)
2247 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2248
2249 frame->prepare = slf_prepare_transfer;
2250 frame->check = slf_check_nop;
2251 frame->data = (void *)arg; /* let's hope it will stay valid */
2252}
2253
2254static void
2255slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2256{
2257 frame->prepare = prepare_schedule;
2258 frame->check = slf_check_nop;
2259}
2260
2261static void
2262slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2263{
2264 struct coro *next = (struct coro *)slf_frame.data;
2265
2266 SvREFCNT_inc_NN (next->hv);
2267 prepare_schedule_to (aTHX_ ta, next);
2268}
2269
2270static void
2271slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2272{
2273 if (!items)
2274 croak ("Coro::schedule_to expects a coroutine argument, caught");
2275
2276 frame->data = (void *)SvSTATE (arg [0]);
2277 frame->prepare = slf_prepare_schedule_to;
2278 frame->check = slf_check_nop;
2279}
2280
2281static void
2282slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2283{
2284 api_ready (aTHX_ SvRV (coro_current));
2285
2286 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2287}
2288
2289static void
2290slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2291{
2292 frame->prepare = prepare_cede;
2293 frame->check = slf_check_nop;
2294}
2295
2296static void
2297slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2298{
2299 frame->prepare = prepare_cede_notself;
2300 frame->check = slf_check_nop;
2301}
2302
2303/*
2304 * these not obviously related functions are all rolled into one
2305 * function to increase chances that they all will call transfer with the same
2306 * stack offset
2307 * SLF stands for "schedule-like-function".
2308 */
2309static OP *
2310pp_slf (pTHX)
2311{
2312 I32 checkmark; /* mark SP to see how many elements check has pushed */
2313
2314 /* set up the slf frame, unless it has already been set-up */
2315 /* the latter happens when a new coro has been started */
2316 /* or when a new cctx was attached to an existing coroutine */
2317 if (expect_true (!slf_frame.prepare))
2318 {
2319 /* first iteration */
2320 dSP;
2321 SV **arg = PL_stack_base + TOPMARK + 1;
2322 int items = SP - arg; /* args without function object */
2323 SV *gv = *sp;
2324
2325 /* do a quick consistency check on the "function" object, and if it isn't */
2326 /* for us, divert to the real entersub */
2327 if (SvTYPE (gv) != SVt_PVGV
2328 || !GvCV (gv)
2329 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2330 return PL_ppaddr[OP_ENTERSUB](aTHX);
2331
2332 if (!(PL_op->op_flags & OPf_STACKED))
2333 {
2334 /* ampersand-form of call, use @_ instead of stack */
2335 AV *av = GvAV (PL_defgv);
2336 arg = AvARRAY (av);
2337 items = AvFILLp (av) + 1;
2338 }
2339
2340 /* now call the init function, which needs to set up slf_frame */
2341 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2342 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2343
2344 /* pop args */
2345 SP = PL_stack_base + POPMARK;
2346
2347 PUTBACK;
2348 }
2349
2350 /* now that we have a slf_frame, interpret it! */
2351 /* we use a callback system not to make the code needlessly */
2352 /* complicated, but so we can run multiple perl coros from one cctx */
2353
2354 do
2355 {
2356 struct coro_transfer_args ta;
2357
2358 slf_frame.prepare (aTHX_ &ta);
2359 TRANSFER (ta, 0);
2360
2361 checkmark = PL_stack_sp - PL_stack_base;
2362 }
2363 while (slf_frame.check (aTHX_ &slf_frame));
2364
2365 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2366
2367 /* exception handling */
2368 if (expect_false (CORO_THROW))
2369 {
2370 SV *exception = sv_2mortal (CORO_THROW);
2371
2372 CORO_THROW = 0;
2373 sv_setsv (ERRSV, exception);
2374 croak (0);
2375 }
2376
2377 /* return value handling - mostly like entersub */
2378 /* make sure we put something on the stack in scalar context */
2379 if (GIMME_V == G_SCALAR)
2380 {
2381 dSP;
2382 SV **bot = PL_stack_base + checkmark;
2383
2384 if (sp == bot) /* too few, push undef */
2385 bot [1] = &PL_sv_undef;
2386 else if (sp != bot + 1) /* too many, take last one */
2387 bot [1] = *sp;
2388
2389 SP = bot + 1;
2390
2391 PUTBACK;
2392 }
2393
2394 return NORMAL;
2395}
2396
2397static void
2398api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2399{
2400 int i;
2401 SV **arg = PL_stack_base + ax;
2402 int items = PL_stack_sp - arg + 1;
2403
2404 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2405
2406 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2407 && PL_op->op_ppaddr != pp_slf)
2408 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2409
2410 CvFLAGS (cv) |= CVf_SLF;
2411 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2412 slf_cv = cv;
2413
2414 /* we patch the op, and then re-run the whole call */
2415 /* we have to put the same argument on the stack for this to work */
2416 /* and this will be done by pp_restore */
2417 slf_restore.op_next = (OP *)&slf_restore;
2418 slf_restore.op_type = OP_CUSTOM;
2419 slf_restore.op_ppaddr = pp_restore;
2420 slf_restore.op_first = PL_op;
2421
2422 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2423
2424 if (PL_op->op_flags & OPf_STACKED)
2425 {
2426 if (items > slf_arga)
2427 {
2428 slf_arga = items;
2429 free (slf_argv);
2430 slf_argv = malloc (slf_arga * sizeof (SV *));
2431 }
2432
2433 slf_argc = items;
2434
2435 for (i = 0; i < items; ++i)
2436 slf_argv [i] = SvREFCNT_inc (arg [i]);
2437 }
2438 else
2439 slf_argc = 0;
2440
2441 PL_op->op_ppaddr = pp_slf;
2442 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2443
2444 PL_op = (OP *)&slf_restore;
2445}
2446
2447/*****************************************************************************/
2448/* dynamic wind */
2449
2450static void
2451on_enterleave_call (pTHX_ SV *cb)
2452{
2453 dSP;
2454
2455 PUSHSTACK;
2456
2457 PUSHMARK (SP);
2458 PUTBACK;
2459 call_sv (cb, G_VOID | G_DISCARD);
2460 SPAGAIN;
2461
2462 POPSTACK;
2463}
2464
2465static SV *
2466coro_avp_pop_and_free (pTHX_ AV **avp)
2467{
2468 AV *av = *avp;
2469 SV *res = av_pop (av);
2470
2471 if (AvFILLp (av) < 0)
2472 {
2473 *avp = 0;
2474 SvREFCNT_dec (av);
2475 }
2476
2477 return res;
2478}
2479
2480static void
2481coro_pop_on_enter (pTHX_ void *coro)
2482{
2483 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_enter);
2484 SvREFCNT_dec (cb);
2485}
2486
2487static void
2488coro_pop_on_leave (pTHX_ void *coro)
2489{
2490 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_leave);
2491 on_enterleave_call (aTHX_ sv_2mortal (cb));
2492}
2493
2494/*****************************************************************************/
2495/* PerlIO::cede */
2496
2497typedef struct
2498{
2499 PerlIOBuf base;
2500 NV next, every;
2501} PerlIOCede;
2502
2503static IV
2504PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2505{
2506 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2507
2508 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2509 self->next = nvtime () + self->every;
2510
2511 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2512}
2513
2514static SV *
2515PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2516{
2517 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2518
2519 return newSVnv (self->every);
2520}
2521
2522static IV
2523PerlIOCede_flush (pTHX_ PerlIO *f)
2524{
2525 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2526 double now = nvtime ();
2527
2528 if (now >= self->next)
2529 {
2530 api_cede (aTHX);
2531 self->next = now + self->every;
2532 }
2533
2534 return PerlIOBuf_flush (aTHX_ f);
2535}
2536
2537static PerlIO_funcs PerlIO_cede =
2538{
2539 sizeof(PerlIO_funcs),
2540 "cede",
2541 sizeof(PerlIOCede),
2542 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2543 PerlIOCede_pushed,
2544 PerlIOBuf_popped,
2545 PerlIOBuf_open,
2546 PerlIOBase_binmode,
2547 PerlIOCede_getarg,
2548 PerlIOBase_fileno,
2549 PerlIOBuf_dup,
2550 PerlIOBuf_read,
2551 PerlIOBuf_unread,
2552 PerlIOBuf_write,
2553 PerlIOBuf_seek,
2554 PerlIOBuf_tell,
2555 PerlIOBuf_close,
2556 PerlIOCede_flush,
2557 PerlIOBuf_fill,
2558 PerlIOBase_eof,
2559 PerlIOBase_error,
2560 PerlIOBase_clearerr,
2561 PerlIOBase_setlinebuf,
2562 PerlIOBuf_get_base,
2563 PerlIOBuf_bufsiz,
2564 PerlIOBuf_get_ptr,
2565 PerlIOBuf_get_cnt,
2566 PerlIOBuf_set_ptrcnt,
2567};
2568
2569/*****************************************************************************/
2570/* Coro::Semaphore & Coro::Signal */
2571
2572static SV *
2573coro_waitarray_new (pTHX_ int count)
2574{
2575 /* a waitarray=semaphore contains a counter IV in $sem->[0] and any waiters after that */
2576 AV *av = newAV ();
2577 SV **ary;
2578
2579 /* unfortunately, building manually saves memory */
2580 Newx (ary, 2, SV *);
2581 AvALLOC (av) = ary;
2582#if PERL_VERSION_ATLEAST (5,10,0)
2583 AvARRAY (av) = ary;
2584#else
2585 /* 5.8.8 needs this syntax instead of AvARRAY = ary, yet */
2586 /* -DDEBUGGING flags this as a bug, despite it perfectly working */
2587 SvPVX ((SV *)av) = (char *)ary;
2588#endif
2589 AvMAX (av) = 1;
2590 AvFILLp (av) = 0;
2591 ary [0] = newSViv (count);
2592
2593 return newRV_noinc ((SV *)av);
2594}
2595
2596/* semaphore */
2597
2598static void
2599coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2600{
2601 SV *count_sv = AvARRAY (av)[0];
2602 IV count = SvIVX (count_sv);
2603
2604 count += adjust;
2605 SvIVX (count_sv) = count;
2606
2607 /* now wake up as many waiters as are expected to lock */
2608 while (count > 0 && AvFILLp (av) > 0)
2609 {
2610 SV *cb;
2611
2612 /* swap first two elements so we can shift a waiter */
2613 AvARRAY (av)[0] = AvARRAY (av)[1];
2614 AvARRAY (av)[1] = count_sv;
2615 cb = av_shift (av);
2616
2617 if (SvOBJECT (cb))
2618 {
2619 api_ready (aTHX_ cb);
2620 --count;
2621 }
2622 else if (SvTYPE (cb) == SVt_PVCV)
2623 {
2624 dSP;
2625 PUSHMARK (SP);
2626 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2627 PUTBACK;
2628 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2629 }
2630
2631 SvREFCNT_dec (cb);
2632 }
2633}
2634
2635static void
2636coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2637{
2638 /* call $sem->adjust (0) to possibly wake up some other waiters */
2639 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2640}
2641
2642static int
2643slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2644{
2645 AV *av = (AV *)frame->data;
2646 SV *count_sv = AvARRAY (av)[0];
2647
2648 /* if we are about to throw, don't actually acquire the lock, just throw */
2649 if (CORO_THROW)
2650 return 0;
2651 else if (SvIVX (count_sv) > 0)
2652 {
2653 SvSTATE_current->on_destroy = 0;
2654
2655 if (acquire)
2656 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2657 else
2658 coro_semaphore_adjust (aTHX_ av, 0);
2659
2660 return 0;
2661 }
2662 else
2663 {
2664 int i;
2665 /* if we were woken up but can't down, we look through the whole */
2666 /* waiters list and only add us if we aren't in there already */
2667 /* this avoids some degenerate memory usage cases */
2668
2669 for (i = 1; i <= AvFILLp (av); ++i)
2670 if (AvARRAY (av)[i] == SvRV (coro_current))
2671 return 1;
2672
2673 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2674 return 1;
2675 }
2676}
2677
2678static int
2679slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2680{
2681 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2682}
2683
2684static int
2685slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2686{
2687 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2688}
2689
2690static void
2691slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2692{
2693 AV *av = (AV *)SvRV (arg [0]);
2694
2695 if (SvIVX (AvARRAY (av)[0]) > 0)
2696 {
2697 frame->data = (void *)av;
2698 frame->prepare = prepare_nop;
2699 }
2700 else
2701 {
2702 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2703
2704 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2705 frame->prepare = prepare_schedule;
2706
2707 /* to avoid race conditions when a woken-up coro gets terminated */
2708 /* we arrange for a temporary on_destroy that calls adjust (0) */
2709 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2710 }
2711}
2712
2713static void
2714slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2715{
2716 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2717 frame->check = slf_check_semaphore_down;
2718}
2719
2720static void
2721slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2722{
2723 if (items >= 2)
2724 {
2725 /* callback form */
2726 AV *av = (AV *)SvRV (arg [0]);
2727 CV *cb_cv = coro_sv_2cv (aTHX_ arg [1]);
2728
2729 av_push (av, SvREFCNT_inc_NN (cb_cv));
2730
2731 if (SvIVX (AvARRAY (av)[0]) > 0)
2732 coro_semaphore_adjust (aTHX_ av, 0);
2733
2734 frame->prepare = prepare_nop;
2735 frame->check = slf_check_nop;
2736 }
2737 else
2738 {
2739 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2740 frame->check = slf_check_semaphore_wait;
2741 }
2742}
2743
2744/* signal */
2745
2746static void
2747coro_signal_wake (pTHX_ AV *av, int count)
2748{
2749 SvIVX (AvARRAY (av)[0]) = 0;
2750
2751 /* now signal count waiters */
2752 while (count > 0 && AvFILLp (av) > 0)
2753 {
2754 SV *cb;
2755
2756 /* swap first two elements so we can shift a waiter */
2757 cb = AvARRAY (av)[0];
2758 AvARRAY (av)[0] = AvARRAY (av)[1];
2759 AvARRAY (av)[1] = cb;
2760
2761 cb = av_shift (av);
2762
2763 if (SvTYPE (cb) == SVt_PVCV)
2764 {
2765 dSP;
2766 PUSHMARK (SP);
2767 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2768 PUTBACK;
2769 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2770 }
2771 else
2772 {
2773 api_ready (aTHX_ cb);
2774 sv_setiv (cb, 0); /* signal waiter */
2775 }
2776
2777 SvREFCNT_dec (cb);
2778
2779 --count;
2780 }
2781}
2782
2783static int
2784slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2785{
2786 /* if we are about to throw, also stop waiting */
2787 return SvROK ((SV *)frame->data) && !CORO_THROW;
2788}
2789
2790static void
2791slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2792{
2793 AV *av = (AV *)SvRV (arg [0]);
2794
2795 if (items >= 2)
2796 {
2797 CV *cb_cv = coro_sv_2cv (aTHX_ arg [1]);
2798 av_push (av, SvREFCNT_inc_NN (cb_cv));
2799
2800 if (SvIVX (AvARRAY (av)[0]))
2801 coro_signal_wake (aTHX_ av, 1); /* ust be the only waiter */
2802
2803 frame->prepare = prepare_nop;
2804 frame->check = slf_check_nop;
2805 }
2806 else if (SvIVX (AvARRAY (av)[0]))
2807 {
2808 SvIVX (AvARRAY (av)[0]) = 0;
2809 frame->prepare = prepare_nop;
2810 frame->check = slf_check_nop;
2811 }
2812 else
2813 {
2814 SV *waiter = newSVsv (coro_current); /* owned by signal av */
2815
2816 av_push (av, waiter);
2817
2818 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2819 frame->prepare = prepare_schedule;
2820 frame->check = slf_check_signal_wait;
2821 }
2822}
2823
2824/*****************************************************************************/
2825/* Coro::AIO */
2826
2827#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2828
2829/* helper storage struct */
2830struct io_state
2831{
2832 int errorno;
2833 I32 laststype; /* U16 in 5.10.0 */
2834 int laststatval;
2835 Stat_t statcache;
2836};
2837
2838static void
2839coro_aio_callback (pTHX_ CV *cv)
2840{
2841 dXSARGS;
2842 AV *state = (AV *)GENSUB_ARG;
2843 SV *coro = av_pop (state);
2844 SV *data_sv = newSV (sizeof (struct io_state));
2845
2846 av_extend (state, items - 1);
2847
2848 sv_upgrade (data_sv, SVt_PV);
2849 SvCUR_set (data_sv, sizeof (struct io_state));
2850 SvPOK_only (data_sv);
2851
2852 {
2853 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2854
2855 data->errorno = errno;
2856 data->laststype = PL_laststype;
2857 data->laststatval = PL_laststatval;
2858 data->statcache = PL_statcache;
2859 }
2860
2861 /* now build the result vector out of all the parameters and the data_sv */
2862 {
2863 int i;
2864
2865 for (i = 0; i < items; ++i)
2866 av_push (state, SvREFCNT_inc_NN (ST (i)));
2867 }
2868
2869 av_push (state, data_sv);
2870
2871 api_ready (aTHX_ coro);
2872 SvREFCNT_dec (coro);
2873 SvREFCNT_dec ((AV *)state);
2874}
2875
2876static int
2877slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2878{
2879 AV *state = (AV *)frame->data;
2880
2881 /* if we are about to throw, return early */
2882 /* this does not cancel the aio request, but at least */
2883 /* it quickly returns */
2884 if (CORO_THROW)
2885 return 0;
2886
2887 /* one element that is an RV? repeat! */
2888 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2889 return 1;
2890
2891 /* restore status */
2892 {
2893 SV *data_sv = av_pop (state);
2894 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2895
2896 errno = data->errorno;
2897 PL_laststype = data->laststype;
2898 PL_laststatval = data->laststatval;
2899 PL_statcache = data->statcache;
2900
2901 SvREFCNT_dec (data_sv);
2902 }
2903
2904 /* push result values */
2905 {
2906 dSP;
2907 int i;
2908
2909 EXTEND (SP, AvFILLp (state) + 1);
2910 for (i = 0; i <= AvFILLp (state); ++i)
2911 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2912
2913 PUTBACK;
2914 }
2915
2916 return 0;
2917}
2918
2919static void
2920slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2921{
2922 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2923 SV *coro_hv = SvRV (coro_current);
2924 struct coro *coro = SvSTATE_hv (coro_hv);
2925
2926 /* put our coroutine id on the state arg */
2927 av_push (state, SvREFCNT_inc_NN (coro_hv));
2928
2929 /* first see whether we have a non-zero priority and set it as AIO prio */
2930 if (coro->prio)
2931 {
2932 dSP;
2933
2934 static SV *prio_cv;
2935 static SV *prio_sv;
2936
2937 if (expect_false (!prio_cv))
2938 {
2939 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2940 prio_sv = newSViv (0);
2941 }
2942
2943 PUSHMARK (SP);
2944 sv_setiv (prio_sv, coro->prio);
2945 XPUSHs (prio_sv);
2946
2947 PUTBACK;
2948 call_sv (prio_cv, G_VOID | G_DISCARD);
2949 }
2950
2951 /* now call the original request */
2952 {
2953 dSP;
2954 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2955 int i;
2956
2957 PUSHMARK (SP);
2958
2959 /* first push all args to the stack */
2960 EXTEND (SP, items + 1);
2961
2962 for (i = 0; i < items; ++i)
2963 PUSHs (arg [i]);
2964
2965 /* now push the callback closure */
2966 PUSHs (sv_2mortal (gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2967
2968 /* now call the AIO function - we assume our request is uncancelable */
2969 PUTBACK;
2970 call_sv ((SV *)req, G_VOID | G_DISCARD);
2971 }
2972
2973 /* now that the requets is going, we loop toll we have a result */
2974 frame->data = (void *)state;
2975 frame->prepare = prepare_schedule;
2976 frame->check = slf_check_aio_req;
2977}
2978
2979static void
2980coro_aio_req_xs (pTHX_ CV *cv)
2981{
2982 dXSARGS;
2983
2984 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2985
2986 XSRETURN_EMPTY;
2987}
2988
2989/*****************************************************************************/
2990
2991#if CORO_CLONE
2992# include "clone.c"
2993#endif
2994
2995MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2996
2997PROTOTYPES: DISABLE
2998
2999BOOT:
3000{
3001#ifdef USE_ITHREADS
3002# if CORO_PTHREAD
3003 coro_thx = PERL_GET_CONTEXT;
3004# endif
3005#endif
3006 BOOT_PAGESIZE;
3007
3008 cctx_current = cctx_new_empty ();
3009
3010 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
3011 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
3012
3013 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
3014 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
3015 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
3016
3017 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
3018 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
3019 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
3020
3021 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
3022
3023 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
3024 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
3025 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
3026 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
3027
3028 main_mainstack = PL_mainstack;
3029 main_top_env = PL_top_env;
3030
3031 while (main_top_env->je_prev)
3032 main_top_env = main_top_env->je_prev;
3033
3034 {
3035 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
3036
3037 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
3038 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
3039
3040 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
3041 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
3042 }
3043
3044 coroapi.ver = CORO_API_VERSION;
3045 coroapi.rev = CORO_API_REVISION;
3046
3047 coroapi.transfer = api_transfer;
3048
3049 coroapi.sv_state = SvSTATE_;
3050 coroapi.execute_slf = api_execute_slf;
3051 coroapi.prepare_nop = prepare_nop;
3052 coroapi.prepare_schedule = prepare_schedule;
3053 coroapi.prepare_cede = prepare_cede;
3054 coroapi.prepare_cede_notself = prepare_cede_notself;
3055
3056 {
3057 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
3058
3059 if (!svp) croak ("Time::HiRes is required");
3060 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
3061
3062 nvtime = INT2PTR (double (*)(), SvIV (*svp));
3063
3064 svp = hv_fetch (PL_modglobal, "Time::U2time", 12, 0);
3065 u2time = INT2PTR (double (*)(), SvIV (*svp));
3066 }
3067
3068 assert (("PRIO_NORMAL must be 0", !CORO_PRIO_NORMAL));
3069}
3070
3071SV *
3072new (char *klass, ...)
3073 ALIAS:
3074 Coro::new = 1
3075 CODE:
3076{
3077 struct coro *coro;
3078 MAGIC *mg;
3079 HV *hv;
3080 CV *cb;
3081 int i;
3082
3083 if (items > 1)
3084 {
3085 cb = coro_sv_2cv (aTHX_ ST (1));
3086
3087 if (!ix)
215 { 3088 {
216 /* I never used formats, so how should I know how these are implemented? */ 3089 if (CvISXSUB (cb))
217 /* my bold guess is as a simple, plain sub... */ 3090 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
218 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 3091
3092 if (!CvROOT (cb))
3093 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
219 } 3094 }
220 } 3095 }
221 3096
222 if (top_si->si_type == PERLSI_MAIN)
223 break;
224
225 top_si = top_si->si_prev;
226 ccstk = top_si->si_cxstack;
227 cxix = top_si->si_cxix;
228 }
229
230 PUTBACK;
231 }
232
233 c->dowarn = PL_dowarn;
234 c->defav = GvAV (PL_defgv);
235 c->curstackinfo = PL_curstackinfo;
236 c->curstack = PL_curstack;
237 c->mainstack = PL_mainstack;
238 c->stack_sp = PL_stack_sp;
239 c->op = PL_op;
240 c->curpad = PL_curpad;
241 c->stack_base = PL_stack_base;
242 c->stack_max = PL_stack_max;
243 c->tmps_stack = PL_tmps_stack;
244 c->tmps_floor = PL_tmps_floor;
245 c->tmps_ix = PL_tmps_ix;
246 c->tmps_max = PL_tmps_max;
247 c->markstack = PL_markstack;
248 c->markstack_ptr = PL_markstack_ptr;
249 c->markstack_max = PL_markstack_max;
250 c->scopestack = PL_scopestack;
251 c->scopestack_ix = PL_scopestack_ix;
252 c->scopestack_max = PL_scopestack_max;
253 c->savestack = PL_savestack;
254 c->savestack_ix = PL_savestack_ix;
255 c->savestack_max = PL_savestack_max;
256 c->retstack = PL_retstack;
257 c->retstack_ix = PL_retstack_ix;
258 c->retstack_max = PL_retstack_max;
259 c->curcop = PL_curcop;
260}
261
262static void
263LOAD(pTHX_ Coro__State c)
264{
265 PL_dowarn = c->dowarn;
266 GvAV (PL_defgv) = c->defav;
267 PL_curstackinfo = c->curstackinfo;
268 PL_curstack = c->curstack;
269 PL_mainstack = c->mainstack;
270 PL_stack_sp = c->stack_sp;
271 PL_op = c->op;
272 PL_curpad = c->curpad;
273 PL_stack_base = c->stack_base;
274 PL_stack_max = c->stack_max;
275 PL_tmps_stack = c->tmps_stack;
276 PL_tmps_floor = c->tmps_floor;
277 PL_tmps_ix = c->tmps_ix;
278 PL_tmps_max = c->tmps_max;
279 PL_markstack = c->markstack;
280 PL_markstack_ptr = c->markstack_ptr;
281 PL_markstack_max = c->markstack_max;
282 PL_scopestack = c->scopestack;
283 PL_scopestack_ix = c->scopestack_ix;
284 PL_scopestack_max = c->scopestack_max;
285 PL_savestack = c->savestack;
286 PL_savestack_ix = c->savestack_ix;
287 PL_savestack_max = c->savestack_max;
288 PL_retstack = c->retstack;
289 PL_retstack_ix = c->retstack_ix;
290 PL_retstack_max = c->retstack_max;
291 PL_curcop = c->curcop;
292
293 {
294 dSP;
295 CV *cv;
296
297 /* now do the ugly restore mess */
298 while ((cv = (CV *)POPs))
299 {
300 AV *padlist = (AV *)POPs;
301
302 unuse_padlist (CvPADLIST(cv));
303 CvPADLIST(cv) = padlist;
304 CvDEPTH(cv) = (I32)POPs;
305
306#ifdef USE_THREADS
307 CvOWNER(cv) = (struct perl_thread *)POPs;
308 error does not work either
309#endif
310 }
311
312 PUTBACK;
313 }
314}
315
316/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
317STATIC void
318S_nuke_stacks(pTHX)
319{
320 while (PL_curstackinfo->si_next)
321 PL_curstackinfo = PL_curstackinfo->si_next;
322 while (PL_curstackinfo) {
323 PERL_SI *p = PL_curstackinfo->si_prev;
324 /* curstackinfo->si_stack got nuked by sv_free_arenas() */
325 Safefree(PL_curstackinfo->si_cxstack);
326 Safefree(PL_curstackinfo);
327 PL_curstackinfo = p;
328 }
329 Safefree(PL_tmps_stack);
330 Safefree(PL_markstack);
331 Safefree(PL_scopestack);
332 Safefree(PL_savestack);
333 Safefree(PL_retstack);
334}
335
336#define SUB_INIT "Coro::State::_newcoro"
337
338MODULE = Coro::State PACKAGE = Coro::State
339
340PROTOTYPES: ENABLE
341
342BOOT:
343 if (!padlist_cache)
344 padlist_cache = newHV ();
345
346Coro::State
347_newprocess(args)
348 SV * args
349 PROTOTYPE: $
350 CODE:
351 Coro__State coro;
352
353 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV)
354 croak ("Coro::State::newprocess expects an arrayref");
355
356 New (0, coro, 1, struct coro); 3097 Newz (0, coro, 1, struct coro);
3098 coro->args = newAV ();
3099 coro->flags = CF_NEW;
357 3100
358 coro->mainstack = 0; /* actual work is done inside transfer */ 3101 if (coro_first) coro_first->prev = coro;
359 coro->args = (AV *)SvREFCNT_inc (SvRV (args)); 3102 coro->next = coro_first;
3103 coro_first = coro;
360 3104
361 RETVAL = coro; 3105 coro->hv = hv = newHV ();
3106 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
3107 mg->mg_flags |= MGf_DUP;
3108 RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1));
3109
3110 if (items > 1)
3111 {
3112 av_extend (coro->args, items - 1 + ix - 1);
3113
3114 if (ix)
3115 {
3116 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
3117 cb = cv_coro_run;
3118 }
3119
3120 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
3121
3122 for (i = 2; i < items; i++)
3123 av_push (coro->args, newSVsv (ST (i)));
3124 }
3125}
362 OUTPUT: 3126 OUTPUT:
363 RETVAL 3127 RETVAL
364 3128
365void 3129void
366transfer(prev,next) 3130transfer (...)
367 Coro::State_or_hashref prev 3131 PROTOTYPE: $$
368 Coro::State_or_hashref next 3132 CODE:
369 CODE: 3133 CORO_EXECUTE_SLF_XS (slf_init_transfer);
370 3134
371 if (prev != next) 3135bool
3136_destroy (SV *coro_sv)
3137 CODE:
3138 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
3139 OUTPUT:
3140 RETVAL
3141
3142void
3143_exit (int code)
3144 PROTOTYPE: $
3145 CODE:
3146 _exit (code);
3147
3148SV *
3149clone (Coro::State coro)
3150 CODE:
3151{
3152#if CORO_CLONE
3153 struct coro *ncoro = coro_clone (aTHX_ coro);
3154 MAGIC *mg;
3155 /* TODO: too much duplication */
3156 ncoro->hv = newHV ();
3157 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
3158 mg->mg_flags |= MGf_DUP;
3159 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
3160#else
3161 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
3162#endif
3163}
3164 OUTPUT:
3165 RETVAL
3166
3167int
3168cctx_stacksize (int new_stacksize = 0)
3169 PROTOTYPE: ;$
3170 CODE:
3171 RETVAL = cctx_stacksize;
3172 if (new_stacksize)
372 { 3173 {
373 PUTBACK; 3174 cctx_stacksize = new_stacksize;
374 SAVE (aTHX_ prev); 3175 ++cctx_gen;
375
376 /* 3176 }
377 * this could be done in newprocess which would lead to 3177 OUTPUT:
378 * extremely elegant and fast (just PUTBACK/SAVE/LOAD/SPAGAIN) 3178 RETVAL
379 * code here, but lazy allocation of stacks has also 3179
380 * some virtues and the overhead of the if() is nil. 3180int
3181cctx_max_idle (int max_idle = 0)
3182 PROTOTYPE: ;$
3183 CODE:
3184 RETVAL = cctx_max_idle;
3185 if (max_idle > 1)
3186 cctx_max_idle = max_idle;
3187 OUTPUT:
3188 RETVAL
3189
3190int
3191cctx_count ()
3192 PROTOTYPE:
3193 CODE:
3194 RETVAL = cctx_count;
3195 OUTPUT:
3196 RETVAL
3197
3198int
3199cctx_idle ()
3200 PROTOTYPE:
3201 CODE:
3202 RETVAL = cctx_idle;
3203 OUTPUT:
3204 RETVAL
3205
3206void
3207list ()
3208 PROTOTYPE:
3209 PPCODE:
3210{
3211 struct coro *coro;
3212 for (coro = coro_first; coro; coro = coro->next)
3213 if (coro->hv)
3214 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
3215}
3216
3217void
3218call (Coro::State coro, SV *coderef)
3219 ALIAS:
3220 eval = 1
3221 CODE:
3222{
3223 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
381 */ 3224 {
382 if (next->mainstack) 3225 struct coro *current = SvSTATE_current;
3226
3227 if (current != coro)
383 { 3228 {
384 LOAD (aTHX_ next); 3229 PUTBACK;
385 next->mainstack = 0; /* unnecessary but much cleaner */ 3230 save_perl (aTHX_ current);
3231 load_perl (aTHX_ coro);
386 SPAGAIN; 3232 SPAGAIN;
387 } 3233 }
3234
3235 PUSHSTACK;
3236
3237 PUSHMARK (SP);
3238 PUTBACK;
3239
3240 if (ix)
3241 eval_sv (coderef, 0);
388 else 3242 else
3243 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3244
3245 POPSTACK;
3246 SPAGAIN;
3247
3248 if (current != coro)
389 { 3249 {
390 /* 3250 PUTBACK;
391 * emulate part of the perl startup here. 3251 save_perl (aTHX_ coro);
392 */ 3252 load_perl (aTHX_ current);
393 UNOP myop;
394
395 init_stacks ();
396 PL_op = (OP *)&myop;
397 /*PL_curcop = 0;*/
398 GvAV (PL_defgv) = (SV *)SvREFCNT_inc (next->args);
399
400 SPAGAIN; 3253 SPAGAIN;
401 Zero(&myop, 1, UNOP);
402 myop.op_next = Nullop;
403 myop.op_flags = OPf_WANT_VOID;
404
405 PUSHMARK(SP);
406 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
407 PUTBACK;
408 /*
409 * the next line is slightly wrong, as PL_op->op_next
410 * is actually being executed so we skip the first op.
411 * that doesn't matter, though, since it is only
412 * pp_nextstate and we never return...
413 */
414 PL_op = Perl_pp_entersub(aTHX);
415 SPAGAIN;
416
417 ENTER;
418 } 3254 }
419 } 3255 }
3256}
3257
3258SV *
3259is_ready (Coro::State coro)
3260 PROTOTYPE: $
3261 ALIAS:
3262 is_ready = CF_READY
3263 is_running = CF_RUNNING
3264 is_new = CF_NEW
3265 is_destroyed = CF_DESTROYED
3266 is_suspended = CF_SUSPENDED
3267 CODE:
3268 RETVAL = boolSV (coro->flags & ix);
3269 OUTPUT:
3270 RETVAL
420 3271
421void 3272void
422DESTROY(coro) 3273throw (Coro::State self, SV *throw = &PL_sv_undef)
423 Coro::State coro 3274 PROTOTYPE: $;$
424 CODE: 3275 CODE:
3276{
3277 struct coro *current = SvSTATE_current;
3278 SV **throwp = self == current ? &CORO_THROW : &self->except;
3279 SvREFCNT_dec (*throwp);
3280 SvGETMAGIC (throw);
3281 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
3282}
425 3283
426 if (coro->mainstack) 3284void
3285api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3286 PROTOTYPE: $;$
3287 C_ARGS: aTHX_ coro, flags
3288
3289SV *
3290has_cctx (Coro::State coro)
3291 PROTOTYPE: $
3292 CODE:
3293 /* maybe manage the running flag differently */
3294 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3295 OUTPUT:
3296 RETVAL
3297
3298int
3299is_traced (Coro::State coro)
3300 PROTOTYPE: $
3301 CODE:
3302 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3303 OUTPUT:
3304 RETVAL
3305
3306UV
3307rss (Coro::State coro)
3308 PROTOTYPE: $
3309 ALIAS:
3310 usecount = 1
3311 CODE:
3312 switch (ix)
3313 {
3314 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3315 case 1: RETVAL = coro->usecount; break;
3316 }
3317 OUTPUT:
3318 RETVAL
3319
3320void
3321force_cctx ()
3322 PROTOTYPE:
3323 CODE:
3324 cctx_current->idle_sp = 0;
3325
3326void
3327swap_defsv (Coro::State self)
3328 PROTOTYPE: $
3329 ALIAS:
3330 swap_defav = 1
3331 CODE:
3332 if (!self->slot)
3333 croak ("cannot swap state with coroutine that has no saved state,");
3334 else
427 { 3335 {
428 struct coro temp; 3336 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3337 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
429 3338
3339 SV *tmp = *src; *src = *dst; *dst = tmp;
3340 }
3341
3342void
3343cancel (Coro::State self)
3344 CODE:
3345 coro_state_destroy (aTHX_ self);
3346 coro_call_on_destroy (aTHX_ self); /* actually only for Coro objects */
3347
3348
3349SV *
3350enable_times (int enabled = enable_times)
3351 CODE:
3352{
3353 RETVAL = boolSV (enable_times);
3354
3355 if (enabled != enable_times)
3356 {
3357 enable_times = enabled;
3358
3359 coro_times_update ();
3360 (enabled ? coro_times_sub : coro_times_add)(SvSTATE (coro_current));
3361 }
3362}
3363 OUTPUT:
3364 RETVAL
3365
3366void
3367times (Coro::State self)
3368 PPCODE:
3369{
3370 struct coro *current = SvSTATE (coro_current);
3371
3372 if (expect_false (current == self))
3373 {
3374 coro_times_update ();
3375 coro_times_add (SvSTATE (coro_current));
3376 }
3377
3378 EXTEND (SP, 2);
3379 PUSHs (sv_2mortal (newSVnv (self->t_real [0] + self->t_real [1] * 1e-9)));
3380 PUSHs (sv_2mortal (newSVnv (self->t_cpu [0] + self->t_cpu [1] * 1e-9)));
3381
3382 if (expect_false (current == self))
3383 coro_times_sub (SvSTATE (coro_current));
3384}
3385
3386MODULE = Coro::State PACKAGE = Coro
3387
3388BOOT:
3389{
3390 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3391 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3392 cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD);
3393 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD);
3394 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3395 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3396 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3397 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3398 sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE);
3399
3400 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3401 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3402 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3403 cv_coro_state_new = get_cv ("Coro::State::new", 0); SvREADONLY_on (cv_coro_state_new);
3404
3405 coro_stash = gv_stashpv ("Coro", TRUE);
3406
3407 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (CORO_PRIO_MAX));
3408 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (CORO_PRIO_HIGH));
3409 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (CORO_PRIO_NORMAL));
3410 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (CORO_PRIO_LOW));
3411 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (CORO_PRIO_IDLE));
3412 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (CORO_PRIO_MIN));
3413
3414 {
3415 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3416
3417 coroapi.schedule = api_schedule;
3418 coroapi.schedule_to = api_schedule_to;
3419 coroapi.cede = api_cede;
3420 coroapi.cede_notself = api_cede_notself;
3421 coroapi.ready = api_ready;
3422 coroapi.is_ready = api_is_ready;
3423 coroapi.nready = coro_nready;
3424 coroapi.current = coro_current;
3425
3426 /*GCoroAPI = &coroapi;*/
3427 sv_setiv (sv, (IV)&coroapi);
3428 SvREADONLY_on (sv);
3429 }
3430}
3431
3432void
3433terminate (...)
3434 CODE:
3435 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3436
3437void
3438schedule (...)
3439 CODE:
3440 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3441
3442void
3443schedule_to (...)
3444 CODE:
3445 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3446
3447void
3448cede_to (...)
3449 CODE:
3450 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3451
3452void
3453cede (...)
3454 CODE:
3455 CORO_EXECUTE_SLF_XS (slf_init_cede);
3456
3457void
3458cede_notself (...)
3459 CODE:
3460 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3461
3462void
3463_set_current (SV *current)
3464 PROTOTYPE: $
3465 CODE:
3466 SvREFCNT_dec (SvRV (coro_current));
3467 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3468
3469void
3470_set_readyhook (SV *hook)
3471 PROTOTYPE: $
3472 CODE:
3473 SvREFCNT_dec (coro_readyhook);
3474 SvGETMAGIC (hook);
3475 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
3476
3477int
3478prio (Coro::State coro, int newprio = 0)
3479 PROTOTYPE: $;$
3480 ALIAS:
3481 nice = 1
3482 CODE:
3483{
3484 RETVAL = coro->prio;
3485
3486 if (items > 1)
3487 {
3488 if (ix)
3489 newprio = coro->prio - newprio;
3490
3491 if (newprio < CORO_PRIO_MIN) newprio = CORO_PRIO_MIN;
3492 if (newprio > CORO_PRIO_MAX) newprio = CORO_PRIO_MAX;
3493
3494 coro->prio = newprio;
3495 }
3496}
3497 OUTPUT:
3498 RETVAL
3499
3500SV *
3501ready (SV *self)
3502 PROTOTYPE: $
3503 CODE:
3504 RETVAL = boolSV (api_ready (aTHX_ self));
3505 OUTPUT:
3506 RETVAL
3507
3508int
3509nready (...)
3510 PROTOTYPE:
3511 CODE:
3512 RETVAL = coro_nready;
3513 OUTPUT:
3514 RETVAL
3515
3516void
3517suspend (Coro::State self)
3518 PROTOTYPE: $
3519 CODE:
3520 self->flags |= CF_SUSPENDED;
3521
3522void
3523resume (Coro::State self)
3524 PROTOTYPE: $
3525 CODE:
3526 self->flags &= ~CF_SUSPENDED;
3527
3528void
3529_pool_handler (...)
3530 CODE:
3531 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3532
3533void
3534async_pool (SV *cv, ...)
3535 PROTOTYPE: &@
3536 PPCODE:
3537{
3538 HV *hv = (HV *)av_pop (av_async_pool);
3539 AV *av = newAV ();
3540 SV *cb = ST (0);
3541 int i;
3542
3543 av_extend (av, items - 2);
3544 for (i = 1; i < items; ++i)
3545 av_push (av, SvREFCNT_inc_NN (ST (i)));
3546
3547 if ((SV *)hv == &PL_sv_undef)
3548 {
3549 PUSHMARK (SP);
3550 EXTEND (SP, 2);
3551 PUSHs (sv_Coro);
3552 PUSHs ((SV *)cv_pool_handler);
430 PUTBACK; 3553 PUTBACK;
431 SAVE(aTHX_ (&temp)); 3554 call_sv ((SV *)cv_coro_state_new, G_SCALAR);
432 LOAD(aTHX_ coro);
433
434 S_nuke_stacks ();
435 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
436
437 LOAD((&temp));
438 SPAGAIN; 3555 SPAGAIN;
3556
3557 hv = (HV *)SvREFCNT_inc_NN (SvRV (POPs));
439 } 3558 }
440 3559
3560 {
3561 struct coro *coro = SvSTATE_hv (hv);
3562
3563 assert (!coro->invoke_cb);
3564 assert (!coro->invoke_av);
3565 coro->invoke_cb = SvREFCNT_inc (cb);
3566 coro->invoke_av = av;
3567 }
3568
3569 api_ready (aTHX_ (SV *)hv);
3570
3571 if (GIMME_V != G_VOID)
3572 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
3573 else
441 SvREFCNT_dec (coro->args); 3574 SvREFCNT_dec (hv);
442 Safefree (coro); 3575}
443 3576
3577SV *
3578rouse_cb ()
3579 PROTOTYPE:
3580 CODE:
3581 RETVAL = coro_new_rouse_cb (aTHX);
3582 OUTPUT:
3583 RETVAL
444 3584
3585void
3586rouse_wait (...)
3587 PROTOTYPE: ;$
3588 PPCODE:
3589 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
3590
3591void
3592on_enter (SV *block)
3593 ALIAS:
3594 on_leave = 1
3595 PROTOTYPE: &
3596 CODE:
3597{
3598 struct coro *coro = SvSTATE_current;
3599 AV **avp = ix ? &coro->on_leave : &coro->on_enter;
3600
3601 block = (SV *)coro_sv_2cv (aTHX_ block);
3602
3603 if (!*avp)
3604 *avp = newAV ();
3605
3606 av_push (*avp, SvREFCNT_inc (block));
3607
3608 if (!ix)
3609 on_enterleave_call (aTHX_ block);
3610
3611 LEAVE; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3612 SAVEDESTRUCTOR_X (ix ? coro_pop_on_leave : coro_pop_on_enter, (void *)coro);
3613 ENTER; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3614}
3615
3616
3617MODULE = Coro::State PACKAGE = PerlIO::cede
3618
3619BOOT:
3620 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3621
3622
3623MODULE = Coro::State PACKAGE = Coro::Semaphore
3624
3625SV *
3626new (SV *klass, SV *count = 0)
3627 CODE:
3628{
3629 int semcnt = 1;
3630
3631 if (count)
3632 {
3633 SvGETMAGIC (count);
3634
3635 if (SvOK (count))
3636 semcnt = SvIV (count);
3637 }
3638
3639 RETVAL = sv_bless (
3640 coro_waitarray_new (aTHX_ semcnt),
3641 GvSTASH (CvGV (cv))
3642 );
3643}
3644 OUTPUT:
3645 RETVAL
3646
3647# helper for Coro::Channel and others
3648SV *
3649_alloc (int count)
3650 CODE:
3651 RETVAL = coro_waitarray_new (aTHX_ count);
3652 OUTPUT:
3653 RETVAL
3654
3655SV *
3656count (SV *self)
3657 CODE:
3658 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3659 OUTPUT:
3660 RETVAL
3661
3662void
3663up (SV *self, int adjust = 1)
3664 ALIAS:
3665 adjust = 1
3666 CODE:
3667 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3668
3669void
3670down (...)
3671 CODE:
3672 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3673
3674void
3675wait (...)
3676 CODE:
3677 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3678
3679void
3680try (SV *self)
3681 PPCODE:
3682{
3683 AV *av = (AV *)SvRV (self);
3684 SV *count_sv = AvARRAY (av)[0];
3685 IV count = SvIVX (count_sv);
3686
3687 if (count > 0)
3688 {
3689 --count;
3690 SvIVX (count_sv) = count;
3691 XSRETURN_YES;
3692 }
3693 else
3694 XSRETURN_NO;
3695}
3696
3697void
3698waiters (SV *self)
3699 PPCODE:
3700{
3701 AV *av = (AV *)SvRV (self);
3702 int wcount = AvFILLp (av) + 1 - 1;
3703
3704 if (GIMME_V == G_SCALAR)
3705 XPUSHs (sv_2mortal (newSViv (wcount)));
3706 else
3707 {
3708 int i;
3709 EXTEND (SP, wcount);
3710 for (i = 1; i <= wcount; ++i)
3711 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3712 }
3713}
3714
3715MODULE = Coro::State PACKAGE = Coro::SemaphoreSet
3716
3717void
3718_may_delete (SV *sem, int count, int extra_refs)
3719 PPCODE:
3720{
3721 AV *av = (AV *)SvRV (sem);
3722
3723 if (SvREFCNT ((SV *)av) == 1 + extra_refs
3724 && AvFILLp (av) == 0 /* no waiters, just count */
3725 && SvIV (AvARRAY (av)[0]) == count)
3726 XSRETURN_YES;
3727
3728 XSRETURN_NO;
3729}
3730
3731MODULE = Coro::State PACKAGE = Coro::Signal
3732
3733SV *
3734new (SV *klass)
3735 CODE:
3736 RETVAL = sv_bless (
3737 coro_waitarray_new (aTHX_ 0),
3738 GvSTASH (CvGV (cv))
3739 );
3740 OUTPUT:
3741 RETVAL
3742
3743void
3744wait (...)
3745 CODE:
3746 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
3747
3748void
3749broadcast (SV *self)
3750 CODE:
3751{
3752 AV *av = (AV *)SvRV (self);
3753 coro_signal_wake (aTHX_ av, AvFILLp (av));
3754}
3755
3756void
3757send (SV *self)
3758 CODE:
3759{
3760 AV *av = (AV *)SvRV (self);
3761
3762 if (AvFILLp (av))
3763 coro_signal_wake (aTHX_ av, 1);
3764 else
3765 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3766}
3767
3768IV
3769awaited (SV *self)
3770 CODE:
3771 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3772 OUTPUT:
3773 RETVAL
3774
3775
3776MODULE = Coro::State PACKAGE = Coro::AnyEvent
3777
3778BOOT:
3779 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3780
3781void
3782_schedule (...)
3783 CODE:
3784{
3785 static int incede;
3786
3787 api_cede_notself (aTHX);
3788
3789 ++incede;
3790 while (coro_nready >= incede && api_cede (aTHX))
3791 ;
3792
3793 sv_setsv (sv_activity, &PL_sv_undef);
3794 if (coro_nready >= incede)
3795 {
3796 PUSHMARK (SP);
3797 PUTBACK;
3798 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3799 }
3800
3801 --incede;
3802}
3803
3804
3805MODULE = Coro::State PACKAGE = Coro::AIO
3806
3807void
3808_register (char *target, char *proto, SV *req)
3809 CODE:
3810{
3811 CV *req_cv = coro_sv_2cv (aTHX_ req);
3812 /* newXSproto doesn't return the CV on 5.8 */
3813 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3814 sv_setpv ((SV *)slf_cv, proto);
3815 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3816}
3817

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines