ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.6 by root, Tue Jul 17 15:42:28 2001 UTC vs.
Revision 1.416 by root, Mon Dec 12 15:19:56 2011 UTC

1/* this works around a bug in mingw32 providing a non-working setjmp */
2#define USE_NO_MINGW_SETJMP_TWO_ARGS
3
4#define NDEBUG 1
5
6#include "libcoro/coro.c"
7
8#define PERL_NO_GET_CONTEXT
9#define PERL_EXT
10
1#include "EXTERN.h" 11#include "EXTERN.h"
2#include "perl.h" 12#include "perl.h"
3#include "XSUB.h" 13#include "XSUB.h"
14#include "perliol.h"
4 15
5#if 0 16#include "schmorp.h"
6# define CHK(x) (void *)0 17#include "ecb.h"
18
19#include <stddef.h>
20#include <stdio.h>
21#include <errno.h>
22#include <assert.h>
23
24#ifndef SVs_PADSTALE
25# define SVs_PADSTALE 0
26#endif
27
28#if defined(_WIN32)
29# undef HAS_GETTIMEOFDAY
30# undef setjmp
31# undef longjmp
32# undef _exit
33# define setjmp _setjmp /* deep magic */
7#else 34#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 35# include <inttypes.h> /* most portable stdint.h */
36#endif
37
38#if HAVE_MMAP
39# include <unistd.h>
40# include <sys/mman.h>
41# ifndef MAP_ANONYMOUS
42# ifdef MAP_ANON
43# define MAP_ANONYMOUS MAP_ANON
44# else
45# undef HAVE_MMAP
46# endif
9#endif 47# endif
48# include <limits.h>
49# ifndef PAGESIZE
50# define PAGESIZE pagesize
51# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
52static long pagesize;
53# else
54# define BOOT_PAGESIZE (void)0
55# endif
56#else
57# define PAGESIZE 0
58# define BOOT_PAGESIZE (void)0
59#endif
10 60
61#if CORO_USE_VALGRIND
62# include <valgrind/valgrind.h>
63#endif
64
65/* the maximum number of idle cctx that will be pooled */
66static int cctx_max_idle = 4;
67
68#if defined(DEBUGGING) && PERL_VERSION_ATLEAST(5,12,0)
69# define HAS_SCOPESTACK_NAME 1
70#endif
71
72#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
73# undef CORO_STACKGUARD
74#endif
75
76#ifndef CORO_STACKGUARD
77# define CORO_STACKGUARD 0
78#endif
79
80/* prefer perl internal functions over our own? */
81#ifndef CORO_PREFER_PERL_FUNCTIONS
82# define CORO_PREFER_PERL_FUNCTIONS 0
83#endif
84
85/* The next macros try to return the current stack pointer, in an as
86 * portable way as possible. */
87#if __GNUC__ >= 4
88# define dSTACKLEVEL int stacklevel_dummy
89# define STACKLEVEL __builtin_frame_address (0)
90#else
91# define dSTACKLEVEL volatile void *stacklevel
92# define STACKLEVEL ((void *)&stacklevel)
93#endif
94
95#define IN_DESTRUCT PL_dirty
96
97#include "CoroAPI.h"
98#define GCoroAPI (&coroapi) /* very sneaky */
99
100#ifdef USE_ITHREADS
101# if CORO_PTHREAD
102static void *coro_thx;
103# endif
104#endif
105
106/* used in state.h */
107#define VAR(name,type) VARx(name, PL_ ## name, type)
108
109#ifdef __linux
110# include <time.h> /* for timespec */
111# include <syscall.h> /* for SYS_* */
112# ifdef SYS_clock_gettime
113# define coro_clock_gettime(id, ts) syscall (SYS_clock_gettime, (id), (ts))
114# define CORO_CLOCK_MONOTONIC 1
115# define CORO_CLOCK_THREAD_CPUTIME_ID 3
116# endif
117#endif
118
119static double (*nvtime)(); /* so why doesn't it take void? */
120static void (*u2time)(pTHX_ UV ret[2]);
121
122/* we hijack an hopefully unused CV flag for our purposes */
123#define CVf_SLF 0x4000
124static OP *pp_slf (pTHX);
125static void slf_destroy (pTHX_ struct coro *coro);
126
127static U32 cctx_gen;
128static size_t cctx_stacksize = CORO_STACKSIZE;
129static struct CoroAPI coroapi;
130static AV *main_mainstack; /* used to differentiate between $main and others */
131static JMPENV *main_top_env;
132static HV *coro_state_stash, *coro_stash;
133static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
134
135static AV *av_destroy; /* destruction queue */
136static SV *sv_manager; /* the manager coro */
137static SV *sv_idle; /* $Coro::idle */
138
139static GV *irsgv; /* $/ */
140static GV *stdoutgv; /* *STDOUT */
141static SV *rv_diehook;
142static SV *rv_warnhook;
143static HV *hv_sig; /* %SIG */
144
145/* async_pool helper stuff */
146static SV *sv_pool_rss;
147static SV *sv_pool_size;
148static SV *sv_async_pool_idle; /* description string */
149static AV *av_async_pool; /* idle pool */
150static SV *sv_Coro; /* class string */
151static CV *cv_pool_handler;
152
153/* Coro::AnyEvent */
154static SV *sv_activity;
155
156/* enable processtime/realtime profiling */
157static char enable_times;
158typedef U32 coro_ts[2];
159static coro_ts time_real, time_cpu;
160static char times_valid;
161
162static struct coro_cctx *cctx_first;
163static int cctx_count, cctx_idle;
164
165enum
166{
167 CC_MAPPED = 0x01,
168 CC_NOREUSE = 0x02, /* throw this away after tracing */
169 CC_TRACE = 0x04,
170 CC_TRACE_SUB = 0x08, /* trace sub calls */
171 CC_TRACE_LINE = 0x10, /* trace each statement */
172 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
173};
174
175/* this is a structure representing a c-level coroutine */
176typedef struct coro_cctx
177{
178 struct coro_cctx *next;
179
180 /* the stack */
181 void *sptr;
182 size_t ssize;
183
184 /* cpu state */
185 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
186#ifndef NDEBUG
187 JMPENV *idle_te; /* same as idle_sp, but for top_env */
188#endif
189 JMPENV *top_env;
190 coro_context cctx;
191
192 U32 gen;
193#if CORO_USE_VALGRIND
194 int valgrind_id;
195#endif
196 unsigned char flags;
197} coro_cctx;
198
199static coro_cctx *cctx_current; /* the currently running cctx */
200
201/*****************************************************************************/
202
203static MGVTBL coro_state_vtbl;
204
205enum
206{
207 CF_RUNNING = 0x0001, /* coroutine is running */
208 CF_READY = 0x0002, /* coroutine is ready */
209 CF_NEW = 0x0004, /* has never been switched to */
210 CF_ZOMBIE = 0x0008, /* coroutine data has been freed */
211 CF_SUSPENDED = 0x0010, /* coroutine can't be scheduled */
212 CF_NOCANCEL = 0x0020, /* cannot cancel, set slf_frame.data to 1 (hackish) */
213};
214
215/* the structure where most of the perl state is stored, overlaid on the cxstack */
216typedef struct
217{
218#define VARx(name,expr,type) type name;
219# include "state.h"
220#undef VARx
221} perl_slots;
222
223/* how many context stack entries do we need for perl_slots */
224#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
225
226/* this is a structure representing a perl-level coroutine */
11struct coro { 227struct coro
12 U8 dowarn; 228{
13 AV *defav; 229 /* the C coroutine allocated to this perl coroutine, if any */
14 230 coro_cctx *cctx;
15 PERL_SI *curstackinfo; 231
16 AV *curstack; 232 /* ready queue */
233 struct coro *next_ready;
234
235 /* state data */
236 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 237 AV *mainstack;
18 SV **stack_sp; 238 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 239
41 AV *args; 240 CV *startcv; /* the CV to execute */
241 AV *args; /* data associated with this coroutine (initial args) */
242 int flags; /* CF_ flags */
243 HV *hv; /* the perl hash associated with this coro, if any */
244
245 /* statistics */
246 int usecount; /* number of transfers to this coro */
247
248 /* coro process data */
249 int prio;
250 SV *except; /* exception to be thrown */
251 SV *rouse_cb; /* last rouse callback */
252 AV *on_destroy; /* callbacks or coros to notify on destroy */
253 AV *status; /* the exit status list */
254
255 /* async_pool */
256 SV *saved_deffh;
257 SV *invoke_cb;
258 AV *invoke_av;
259
260 /* on_enter/on_leave */
261 AV *on_enter;
262 AV *on_leave;
263
264 /* swap_sv */
265 AV *swap_sv;
266
267 /* times */
268 coro_ts t_cpu, t_real;
269
270 /* linked list */
271 struct coro *next, *prev;
42}; 272};
43 273
44typedef struct coro *Coro__State; 274typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 275typedef struct coro *Coro__State_or_hashref;
46 276
47static HV *padlist_cache; 277/* the following variables are effectively part of the perl context */
278/* and get copied between struct coro and these variables */
279/* the main reason we don't support windows process emulation */
280static struct CoroSLF slf_frame; /* the current slf frame */
48 281
49/* mostly copied from op.c:cv_clone2 */ 282/** Coro ********************************************************************/
50STATIC AV * 283
51clone_padlist (AV *protopadlist) 284#define CORO_PRIO_MAX 3
285#define CORO_PRIO_HIGH 1
286#define CORO_PRIO_NORMAL 0
287#define CORO_PRIO_LOW -1
288#define CORO_PRIO_IDLE -3
289#define CORO_PRIO_MIN -4
290
291/* for Coro.pm */
292static SV *coro_current;
293static SV *coro_readyhook;
294static struct coro *coro_ready [CORO_PRIO_MAX - CORO_PRIO_MIN + 1][2]; /* head|tail */
295static CV *cv_coro_run;
296static struct coro *coro_first;
297#define coro_nready coroapi.nready
298
299/** JIT *********************************************************************/
300
301#if CORO_JIT
302 /* APPLE doesn't have HAVE_MMAP though */
303 #define CORO_JIT_UNIXY (__linux || __FreeBSD__ || __OpenBSD__ || __NetBSD__ || __solaris || __APPLE__)
304 #ifndef CORO_JIT_TYPE
305 #if __x86_64 && CORO_JIT_UNIXY
306 #define CORO_JIT_TYPE "amd64-unix"
307 #elif __i386 && CORO_JIT_UNIXY
308 #define CORO_JIT_TYPE "x86-unix"
309 #endif
310 #endif
311#endif
312
313#if !defined(CORO_JIT_TYPE) || !HAVE_MMAP
314 #undef CORO_JIT
315#endif
316
317#if CORO_JIT
318 typedef void (*load_save_perl_slots_type)(perl_slots *);
319 static load_save_perl_slots_type load_perl_slots, save_perl_slots;
320#endif
321
322/** Coro::Select ************************************************************/
323
324static OP *(*coro_old_pp_sselect) (pTHX);
325static SV *coro_select_select;
326
327/* horrible hack, but if it works... */
328static OP *
329coro_pp_sselect (pTHX)
52{ 330{
53 AV *av; 331 dSP;
54 I32 ix; 332 PUSHMARK (SP - 4); /* fake argument list */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 333 XPUSHs (coro_select_select);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 334 PUTBACK;
57 SV **pname = AvARRAY (protopad_name); 335
58 SV **ppad = AvARRAY (protopad); 336 /* entersub is an UNOP, select a LISTOP... keep your fingers crossed */
59 I32 fname = AvFILLp (protopad_name); 337 PL_op->op_flags |= OPf_STACKED;
60 I32 fpad = AvFILLp (protopad); 338 PL_op->op_private = 0;
339 return PL_ppaddr [OP_ENTERSUB](aTHX);
340}
341
342/** time stuff **************************************************************/
343
344#ifdef HAS_GETTIMEOFDAY
345
346ecb_inline void
347coro_u2time (pTHX_ UV ret[2])
348{
349 struct timeval tv;
350 gettimeofday (&tv, 0);
351
352 ret [0] = tv.tv_sec;
353 ret [1] = tv.tv_usec;
354}
355
356ecb_inline double
357coro_nvtime (void)
358{
359 struct timeval tv;
360 gettimeofday (&tv, 0);
361
362 return tv.tv_sec + tv.tv_usec * 1e-6;
363}
364
365ecb_inline void
366time_init (pTHX)
367{
368 nvtime = coro_nvtime;
369 u2time = coro_u2time;
370}
371
372#else
373
374ecb_inline void
375time_init (pTHX)
376{
377 SV **svp;
378
379 require_pv ("Time/HiRes.pm");
380
381 svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
382
383 if (!svp) croak ("Time::HiRes is required, but missing. Caught");
384 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer. Caught");
385
386 nvtime = INT2PTR (double (*)(), SvIV (*svp));
387
388 svp = hv_fetch (PL_modglobal, "Time::U2time", 12, 0);
389 u2time = INT2PTR (void (*)(pTHX_ UV ret[2]), SvIV (*svp));
390}
391
392#endif
393
394/** lowlevel stuff **********************************************************/
395
396static SV * ecb_noinline
397coro_get_sv (pTHX_ const char *name, int create)
398{
399#if PERL_VERSION_ATLEAST (5,10,0)
400 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
401 get_sv (name, create);
402#endif
403 return get_sv (name, create);
404}
405
406static AV * ecb_noinline
407coro_get_av (pTHX_ const char *name, int create)
408{
409#if PERL_VERSION_ATLEAST (5,10,0)
410 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
411 get_av (name, create);
412#endif
413 return get_av (name, create);
414}
415
416static HV * ecb_noinline
417coro_get_hv (pTHX_ const char *name, int create)
418{
419#if PERL_VERSION_ATLEAST (5,10,0)
420 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
421 get_hv (name, create);
422#endif
423 return get_hv (name, create);
424}
425
426ecb_inline void
427coro_times_update (void)
428{
429#ifdef coro_clock_gettime
430 struct timespec ts;
431
432 ts.tv_sec = ts.tv_nsec = 0;
433 coro_clock_gettime (CORO_CLOCK_THREAD_CPUTIME_ID, &ts);
434 time_cpu [0] = ts.tv_sec; time_cpu [1] = ts.tv_nsec;
435
436 ts.tv_sec = ts.tv_nsec = 0;
437 coro_clock_gettime (CORO_CLOCK_MONOTONIC, &ts);
438 time_real [0] = ts.tv_sec; time_real [1] = ts.tv_nsec;
439#else
440 dTHX;
441 UV tv[2];
442
443 u2time (aTHX_ tv);
444 time_real [0] = tv [0];
445 time_real [1] = tv [1] * 1000;
446#endif
447}
448
449ecb_inline void
450coro_times_add (struct coro *c)
451{
452 c->t_real [1] += time_real [1];
453 if (c->t_real [1] > 1000000000) { c->t_real [1] -= 1000000000; ++c->t_real [0]; }
454 c->t_real [0] += time_real [0];
455
456 c->t_cpu [1] += time_cpu [1];
457 if (c->t_cpu [1] > 1000000000) { c->t_cpu [1] -= 1000000000; ++c->t_cpu [0]; }
458 c->t_cpu [0] += time_cpu [0];
459}
460
461ecb_inline void
462coro_times_sub (struct coro *c)
463{
464 if (c->t_real [1] < time_real [1]) { c->t_real [1] += 1000000000; --c->t_real [0]; }
465 c->t_real [1] -= time_real [1];
466 c->t_real [0] -= time_real [0];
467
468 if (c->t_cpu [1] < time_cpu [1]) { c->t_cpu [1] += 1000000000; --c->t_cpu [0]; }
469 c->t_cpu [1] -= time_cpu [1];
470 c->t_cpu [0] -= time_cpu [0];
471}
472
473/*****************************************************************************/
474/* magic glue */
475
476#define CORO_MAGIC_type_cv 26
477#define CORO_MAGIC_type_state PERL_MAGIC_ext
478
479#define CORO_MAGIC_NN(sv, type) \
480 (ecb_expect_true (SvMAGIC (sv)->mg_type == type) \
481 ? SvMAGIC (sv) \
482 : mg_find (sv, type))
483
484#define CORO_MAGIC(sv, type) \
485 (ecb_expect_true (SvMAGIC (sv)) \
486 ? CORO_MAGIC_NN (sv, type) \
487 : 0)
488
489#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
490#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
491
492ecb_inline MAGIC *
493SvSTATEhv_p (pTHX_ SV *coro)
494{
495 MAGIC *mg;
496
497 if (ecb_expect_true (
498 SvTYPE (coro) == SVt_PVHV
499 && (mg = CORO_MAGIC_state (coro))
500 && mg->mg_virtual == &coro_state_vtbl
501 ))
502 return mg;
503
504 return 0;
505}
506
507ecb_inline struct coro *
508SvSTATE_ (pTHX_ SV *coro)
509{
510 MAGIC *mg;
511
512 if (SvROK (coro))
513 coro = SvRV (coro);
514
515 mg = SvSTATEhv_p (aTHX_ coro);
516 if (!mg)
517 croak ("Coro::State object required");
518
519 return (struct coro *)mg->mg_ptr;
520}
521
522#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
523
524/* faster than SvSTATE, but expects a coroutine hv */
525#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
526#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
527
528/*****************************************************************************/
529/* padlist management and caching */
530
531ecb_inline AV *
532coro_derive_padlist (pTHX_ CV *cv)
533{
534 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 535 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 536
72 newpadlist = newAV (); 537 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 538 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 539#if PERL_VERSION_ATLEAST (5,10,0)
540 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
541#else
542 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
543#endif
544 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
545 --AvFILLp (padlist);
546
547 av_store (newpadlist, 0, SvREFCNT_inc_NN (AvARRAY (padlist)[0]));
75 av_store (newpadlist, 1, (SV *) newpad); 548 av_store (newpadlist, 1, (SV *)newpad);
76 549
77 av = newAV (); /* will be @_ */ 550 return newpadlist;
78 av_extend (av, 0); 551}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 552
82 for (ix = fpad; ix > 0; ix--) 553ecb_inline void
554free_padlist (pTHX_ AV *padlist)
555{
556 /* may be during global destruction */
557 if (!IN_DESTRUCT)
83 { 558 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 559 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 560
561 while (i > 0) /* special-case index 0 */
86 { 562 {
87 char *name = SvPVX (namesv); /* XXX */ 563 /* we try to be extra-careful here */
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 564 AV *av = (AV *)AvARRAY (padlist)[i--];
89 { /* lexical from outside? */ 565 I32 j = AvFILLp (av);
90 npad[ix] = SvREFCNT_inc (ppad[ix]); 566
91 } 567 while (j >= 0)
92 else 568 SvREFCNT_dec (AvARRAY (av)[j--]);
93 { /* our own lexical */ 569
94 SV *sv; 570 AvFILLp (av) = -1;
95 if (*name == '&') 571 SvREFCNT_dec (av);
96 sv = SvREFCNT_inc (ppad[ix]);
97 else if (*name == '@')
98 sv = (SV *) newAV ();
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 }
107 } 572 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix]))
109 {
110 npad[ix] = SvREFCNT_inc (ppad[ix]);
111 }
112 else
113 {
114 SV *sv = NEWSV (0, 0);
115 SvPADTMP_on (sv);
116 npad[ix] = sv;
117 }
118 }
119 573
120#if 0 /* NONOTUNDERSTOOD */ 574 SvREFCNT_dec (AvARRAY (padlist)[0]);
121 /* Now that vars are all in place, clone nested closures. */
122 575
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif
139
140 return newpadlist;
141}
142
143STATIC AV *
144free_padlist (AV *padlist)
145{
146 /* may be during global destruction */
147 if (SvREFCNT(padlist))
148 {
149 I32 i = AvFILLp(padlist); 576 AvFILLp (padlist) = -1;
150 while (i >= 0)
151 {
152 SV **svp = av_fetch(padlist, i--, FALSE);
153 SV *sv = svp ? *svp : Nullsv;
154 if (sv)
155 SvREFCNT_dec(sv);
156 }
157
158 SvREFCNT_dec((SV*)padlist); 577 SvREFCNT_dec ((SV*)padlist);
578 }
579}
580
581static int
582coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
583{
584 AV *padlist;
585 AV *av = (AV *)mg->mg_obj;
586
587 /* perl manages to free our internal AV and _then_ call us */
588 if (IN_DESTRUCT)
589 return 0;
590
591 /* casting is fun. */
592 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
593 free_padlist (aTHX_ padlist);
594
595 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
596
597 return 0;
598}
599
600static MGVTBL coro_cv_vtbl = {
601 0, 0, 0, 0,
602 coro_cv_free
603};
604
605/* the next two functions merely cache the padlists */
606ecb_inline void
607get_padlist (pTHX_ CV *cv)
608{
609 MAGIC *mg = CORO_MAGIC_cv (cv);
610 AV *av;
611
612 if (ecb_expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
613 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
614 else
615 {
616#if CORO_PREFER_PERL_FUNCTIONS
617 /* this is probably cleaner? but also slower! */
618 /* in practise, it seems to be less stable */
619 CV *cp = Perl_cv_clone (aTHX_ cv);
620 CvPADLIST (cv) = CvPADLIST (cp);
621 CvPADLIST (cp) = 0;
622 SvREFCNT_dec (cp);
623#else
624 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
625#endif
626 }
627}
628
629ecb_inline void
630put_padlist (pTHX_ CV *cv)
631{
632 MAGIC *mg = CORO_MAGIC_cv (cv);
633 AV *av;
634
635 if (ecb_expect_false (!mg))
636 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
637
638 av = (AV *)mg->mg_obj;
639
640 if (ecb_expect_false (AvFILLp (av) >= AvMAX (av)))
641 av_extend (av, AvFILLp (av) + 1);
642
643 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
644}
645
646/** load & save, init *******************************************************/
647
648ecb_inline void
649swap_sv (SV *a, SV *b)
650{
651 const U32 keep = SVs_PADSTALE | SVs_PADTMP | SVs_PADMY; /* keep these flags */
652 SV tmp;
653
654 /* swap sv_any */
655 SvANY (&tmp) = SvANY (a); SvANY (a) = SvANY (b); SvANY (b) = SvANY (&tmp);
656
657 /* swap sv_flags */
658 SvFLAGS (&tmp) = SvFLAGS (a);
659 SvFLAGS (a) = (SvFLAGS (a) & keep) | (SvFLAGS (b ) & ~keep);
660 SvFLAGS (b) = (SvFLAGS (b) & keep) | (SvFLAGS (&tmp) & ~keep);
661
662#if PERL_VERSION_ATLEAST (5,10,0)
663 /* perl 5.10 and later complicates this _quite_ a bit, but it also
664 * is much faster, so no quarrels here. alternatively, we could
665 * sv_upgrade to avoid this.
666 */
667 {
668 /* swap sv_u */
669 tmp.sv_u = a->sv_u; a->sv_u = b->sv_u; b->sv_u = tmp.sv_u;
670
671 /* if SvANY points to the head, we need to adjust the pointers,
672 * as the pointer for a still points to b, and maybe vice versa.
673 */
674 #define svany_in_head(type) \
675 (((1 << SVt_NULL) | (1 << SVt_BIND) | (1 << SVt_IV) | (1 << SVt_RV)) & (1 << (type)))
676
677 if (svany_in_head (SvTYPE (a)))
678 SvANY (a) = (void *)((PTRV)SvANY (a) - (PTRV)b + (PTRV)a);
679
680 if (svany_in_head (SvTYPE (b)))
681 SvANY (b) = (void *)((PTRV)SvANY (b) - (PTRV)a + (PTRV)b);
159 } 682 }
683#endif
160} 684}
161 685
162/* the next tow functions merely cache the padlists */ 686/* swap sv heads, at least logically */
163STATIC void 687static void
164get_padlist (CV *cv) 688swap_svs (pTHX_ Coro__State c)
165{ 689{
166 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 0); 690 int i;
167 691
168 if (he && AvFILLp ((AV *)*he) >= 0) 692 for (i = 0; i <= AvFILLp (c->swap_sv); i += 2)
169 CvPADLIST (cv) = (AV *)av_pop ((AV *)*he); 693 swap_sv (AvARRAY (c->swap_sv)[i], AvARRAY (c->swap_sv)[i + 1]);
170 else
171 CvPADLIST (cv) = clone_padlist (CvPADLIST (cv));
172} 694}
173 695
174STATIC void 696#define SWAP_SVS(coro) \
175put_padlist (CV *cv) 697 if (ecb_expect_false ((coro)->swap_sv)) \
176{ 698 swap_svs (aTHX_ (coro))
177 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 1);
178 699
179 if (SvTYPE (*he) != SVt_PVAV) 700static void
701on_enterleave_call (pTHX_ SV *cb);
702
703static void
704load_perl (pTHX_ Coro__State c)
705{
706 perl_slots *slot = c->slot;
707 c->slot = 0;
708
709 PL_mainstack = c->mainstack;
710
711#if CORO_JIT
712 load_perl_slots (slot);
713#else
714 #define VARx(name,expr,type) expr = slot->name;
715 # include "state.h"
716 #undef VARx
717#endif
718
719 {
720 dSP;
721
722 CV *cv;
723
724 /* now do the ugly restore mess */
725 while (ecb_expect_true (cv = (CV *)POPs))
180 { 726 {
181 SvREFCNT_dec (*he); 727 put_padlist (aTHX_ cv); /* mark this padlist as available */
182 *he = (SV *)newAV (); 728 CvDEPTH (cv) = PTR2IV (POPs);
729 CvPADLIST (cv) = (AV *)POPs;
183 } 730 }
184 731
185 av_push ((AV *)*he, (SV *)CvPADLIST (cv)); 732 PUTBACK;
186} 733 }
187 734
735 slf_frame = c->slf_frame;
736 CORO_THROW = c->except;
737
738 if (ecb_expect_false (enable_times))
739 {
740 if (ecb_expect_false (!times_valid))
741 coro_times_update ();
742
743 coro_times_sub (c);
744 }
745
746 if (ecb_expect_false (c->on_enter))
747 {
748 int i;
749
750 for (i = 0; i <= AvFILLp (c->on_enter); ++i)
751 on_enterleave_call (aTHX_ AvARRAY (c->on_enter)[i]);
752 }
753
754 SWAP_SVS (c);
755}
756
188static void 757static void
189save_state(pTHX_ Coro__State c) 758save_perl (pTHX_ Coro__State c)
190{ 759{
760 SWAP_SVS (c);
761
762 if (ecb_expect_false (c->on_leave))
763 {
764 int i;
765
766 for (i = AvFILLp (c->on_leave); i >= 0; --i)
767 on_enterleave_call (aTHX_ AvARRAY (c->on_leave)[i]);
768 }
769
770 times_valid = 0;
771
772 if (ecb_expect_false (enable_times))
773 {
774 coro_times_update (); times_valid = 1;
775 coro_times_add (c);
776 }
777
778 c->except = CORO_THROW;
779 c->slf_frame = slf_frame;
780
191 { 781 {
192 dSP; 782 dSP;
193 I32 cxix = cxstack_ix; 783 I32 cxix = cxstack_ix;
784 PERL_CONTEXT *ccstk = cxstack;
194 PERL_SI *top_si = PL_curstackinfo; 785 PERL_SI *top_si = PL_curstackinfo;
195 PERL_CONTEXT *ccstk = cxstack;
196 786
197 /* 787 /*
198 * the worst thing you can imagine happens first - we have to save 788 * the worst thing you can imagine happens first - we have to save
199 * (and reinitialize) all cv's in the whole callchain :( 789 * (and reinitialize) all cv's in the whole callchain :(
200 */ 790 */
201 791
202 PUSHs (Nullsv); 792 XPUSHs (Nullsv);
203 /* this loop was inspired by pp_caller */ 793 /* this loop was inspired by pp_caller */
204 for (;;) 794 for (;;)
205 { 795 {
206 while (cxix >= 0) 796 while (ecb_expect_true (cxix >= 0))
207 { 797 {
208 PERL_CONTEXT *cx = &ccstk[cxix--]; 798 PERL_CONTEXT *cx = &ccstk[cxix--];
209 799
210 if (CxTYPE(cx) == CXt_SUB) 800 if (ecb_expect_true (CxTYPE (cx) == CXt_SUB) || ecb_expect_false (CxTYPE (cx) == CXt_FORMAT))
211 { 801 {
212 CV *cv = cx->blk_sub.cv; 802 CV *cv = cx->blk_sub.cv;
803
213 if (CvDEPTH(cv)) 804 if (ecb_expect_true (CvDEPTH (cv)))
214 { 805 {
215#ifdef USE_THREADS
216 XPUSHs ((SV *)CvOWNER(cv));
217#endif
218 EXTEND (SP, 3); 806 EXTEND (SP, 3);
219 PUSHs ((SV *)CvDEPTH(cv));
220 PUSHs ((SV *)CvPADLIST(cv)); 807 PUSHs ((SV *)CvPADLIST (cv));
808 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
221 PUSHs ((SV *)cv); 809 PUSHs ((SV *)cv);
222 810
223 get_padlist (cv);
224
225 CvDEPTH(cv) = 0; 811 CvDEPTH (cv) = 0;
226#ifdef USE_THREADS 812 get_padlist (aTHX_ cv);
227 CvOWNER(cv) = 0;
228 error must unlock this cv etc.. etc...
229 if you are here wondering about this error message then
230 the reason is that it will not work as advertised yet
231#endif
232 } 813 }
233 } 814 }
234 else if (CxTYPE(cx) == CXt_FORMAT) 815 }
816
817 if (ecb_expect_true (top_si->si_type == PERLSI_MAIN))
818 break;
819
820 top_si = top_si->si_prev;
821 ccstk = top_si->si_cxstack;
822 cxix = top_si->si_cxix;
823 }
824
825 PUTBACK;
826 }
827
828 /* allocate some space on the context stack for our purposes */
829 if (ecb_expect_false (cxstack_ix + (int)SLOT_COUNT >= cxstack_max))
830 {
831 unsigned int i;
832
833 for (i = 0; i < SLOT_COUNT; ++i)
834 CXINC;
835
836 cxstack_ix -= SLOT_COUNT; /* undo allocation */
837 }
838
839 c->mainstack = PL_mainstack;
840
841 {
842 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
843
844#if CORO_JIT
845 save_perl_slots (slot);
846#else
847 #define VARx(name,expr,type) slot->name = expr;
848 # include "state.h"
849 #undef VARx
850#endif
851 }
852}
853
854/*
855 * allocate various perl stacks. This is almost an exact copy
856 * of perl.c:init_stacks, except that it uses less memory
857 * on the (sometimes correct) assumption that coroutines do
858 * not usually need a lot of stackspace.
859 */
860#if CORO_PREFER_PERL_FUNCTIONS
861# define coro_init_stacks(thx) init_stacks ()
862#else
863static void
864coro_init_stacks (pTHX)
865{
866 PL_curstackinfo = new_stackinfo(32, 4 + SLOT_COUNT); /* 3 is minimum due to perl rounding down in scope.c:GROW() */
867 PL_curstackinfo->si_type = PERLSI_MAIN;
868 PL_curstack = PL_curstackinfo->si_stack;
869 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
870
871 PL_stack_base = AvARRAY(PL_curstack);
872 PL_stack_sp = PL_stack_base;
873 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
874
875 New(50,PL_tmps_stack,32,SV*);
876 PL_tmps_floor = -1;
877 PL_tmps_ix = -1;
878 PL_tmps_max = 32;
879
880 New(54,PL_markstack,16,I32);
881 PL_markstack_ptr = PL_markstack;
882 PL_markstack_max = PL_markstack + 16;
883
884#ifdef SET_MARK_OFFSET
885 SET_MARK_OFFSET;
886#endif
887
888 New(54,PL_scopestack,8,I32);
889 PL_scopestack_ix = 0;
890 PL_scopestack_max = 8;
891#if HAS_SCOPESTACK_NAME
892 New(54,PL_scopestack_name,8,const char*);
893#endif
894
895 New(54,PL_savestack,24,ANY);
896 PL_savestack_ix = 0;
897 PL_savestack_max = 24;
898
899#if !PERL_VERSION_ATLEAST (5,10,0)
900 New(54,PL_retstack,4,OP*);
901 PL_retstack_ix = 0;
902 PL_retstack_max = 4;
903#endif
904}
905#endif
906
907/*
908 * destroy the stacks, the callchain etc...
909 */
910static void
911coro_destruct_stacks (pTHX)
912{
913 while (PL_curstackinfo->si_next)
914 PL_curstackinfo = PL_curstackinfo->si_next;
915
916 while (PL_curstackinfo)
917 {
918 PERL_SI *p = PL_curstackinfo->si_prev;
919
920 if (!IN_DESTRUCT)
921 SvREFCNT_dec (PL_curstackinfo->si_stack);
922
923 Safefree (PL_curstackinfo->si_cxstack);
924 Safefree (PL_curstackinfo);
925 PL_curstackinfo = p;
926 }
927
928 Safefree (PL_tmps_stack);
929 Safefree (PL_markstack);
930 Safefree (PL_scopestack);
931#if HAS_SCOPESTACK_NAME
932 Safefree (PL_scopestack_name);
933#endif
934 Safefree (PL_savestack);
935#if !PERL_VERSION_ATLEAST (5,10,0)
936 Safefree (PL_retstack);
937#endif
938}
939
940#define CORO_RSS \
941 rss += sizeof (SYM (curstackinfo)); \
942 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
943 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
944 rss += SYM (tmps_max) * sizeof (SV *); \
945 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
946 rss += SYM (scopestack_max) * sizeof (I32); \
947 rss += SYM (savestack_max) * sizeof (ANY);
948
949static size_t
950coro_rss (pTHX_ struct coro *coro)
951{
952 size_t rss = sizeof (*coro);
953
954 if (coro->mainstack)
955 {
956 if (coro->flags & CF_RUNNING)
957 {
958 #define SYM(sym) PL_ ## sym
959 CORO_RSS;
960 #undef SYM
961 }
962 else
963 {
964 #define SYM(sym) coro->slot->sym
965 CORO_RSS;
966 #undef SYM
967 }
968 }
969
970 return rss;
971}
972
973/** coroutine stack handling ************************************************/
974
975static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
976static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
977static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
978
979/* apparently < 5.8.8 */
980#ifndef MgPV_nolen_const
981#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
982 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
983 (const char*)(mg)->mg_ptr)
984#endif
985
986/*
987 * This overrides the default magic get method of %SIG elements.
988 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
989 * and instead of trying to save and restore the hash elements (extremely slow),
990 * we just provide our own readback here.
991 */
992static int ecb_cold
993coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
994{
995 const char *s = MgPV_nolen_const (mg);
996
997 if (*s == '_')
998 {
999 SV **svp = 0;
1000
1001 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
1002 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
1003
1004 if (svp)
1005 {
1006 SV *ssv;
1007
1008 if (!*svp)
1009 ssv = &PL_sv_undef;
1010 else if (SvTYPE (*svp) == SVt_PVCV) /* perlio directly stores a CV in warnhook. ugh. */
1011 ssv = sv_2mortal (newRV_inc (*svp));
1012 else
1013 ssv = *svp;
1014
1015 sv_setsv (sv, ssv);
1016 return 0;
1017 }
1018 }
1019
1020 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
1021}
1022
1023static int ecb_cold
1024coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
1025{
1026 const char *s = MgPV_nolen_const (mg);
1027
1028 if (*s == '_')
1029 {
1030 SV **svp = 0;
1031
1032 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
1033 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
1034
1035 if (svp)
1036 {
1037 SV *old = *svp;
1038 *svp = 0;
1039 SvREFCNT_dec (old);
1040 return 0;
1041 }
1042 }
1043
1044 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
1045}
1046
1047static int ecb_cold
1048coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
1049{
1050 const char *s = MgPV_nolen_const (mg);
1051
1052 if (*s == '_')
1053 {
1054 SV **svp = 0;
1055
1056 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
1057 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
1058
1059 if (svp)
1060 {
1061 SV *old = *svp;
1062 *svp = SvOK (sv) ? newSVsv (sv) : 0;
1063 SvREFCNT_dec (old);
1064 return 0;
1065 }
1066 }
1067
1068 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
1069}
1070
1071static void
1072prepare_nop (pTHX_ struct coro_transfer_args *ta)
1073{
1074 /* kind of mega-hacky, but works */
1075 ta->next = ta->prev = (struct coro *)ta;
1076}
1077
1078static int
1079slf_check_nop (pTHX_ struct CoroSLF *frame)
1080{
1081 return 0;
1082}
1083
1084static int
1085slf_check_repeat (pTHX_ struct CoroSLF *frame)
1086{
1087 return 1;
1088}
1089
1090static UNOP init_perl_op;
1091
1092ecb_noinline static void /* noinline to keep it out of the transfer fast path */
1093init_perl (pTHX_ struct coro *coro)
1094{
1095 /*
1096 * emulate part of the perl startup here.
1097 */
1098 coro_init_stacks (aTHX);
1099
1100 PL_runops = RUNOPS_DEFAULT;
1101 PL_curcop = &PL_compiling;
1102 PL_in_eval = EVAL_NULL;
1103 PL_comppad = 0;
1104 PL_comppad_name = 0;
1105 PL_comppad_name_fill = 0;
1106 PL_comppad_name_floor = 0;
1107 PL_curpm = 0;
1108 PL_curpad = 0;
1109 PL_localizing = 0;
1110 PL_restartop = 0;
1111#if PERL_VERSION_ATLEAST (5,10,0)
1112 PL_parser = 0;
1113#endif
1114 PL_hints = 0;
1115
1116 /* recreate the die/warn hooks */
1117 PL_diehook = SvREFCNT_inc (rv_diehook);
1118 PL_warnhook = SvREFCNT_inc (rv_warnhook);
1119
1120 GvSV (PL_defgv) = newSV (0);
1121 GvAV (PL_defgv) = coro->args; coro->args = 0;
1122 GvSV (PL_errgv) = newSV (0);
1123 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
1124 GvHV (PL_hintgv) = 0;
1125 PL_rs = newSVsv (GvSV (irsgv));
1126 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
1127
1128 {
1129 dSP;
1130 UNOP myop;
1131
1132 Zero (&myop, 1, UNOP);
1133 myop.op_next = Nullop;
1134 myop.op_type = OP_ENTERSUB;
1135 myop.op_flags = OPf_WANT_VOID;
1136
1137 PUSHMARK (SP);
1138 PUSHs ((SV *)coro->startcv);
1139 PUTBACK;
1140 PL_op = (OP *)&myop;
1141 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
1142 }
1143
1144 /* this newly created coroutine might be run on an existing cctx which most
1145 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
1146 */
1147 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
1148 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
1149 slf_frame.destroy = 0;
1150
1151 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
1152 init_perl_op.op_next = PL_op;
1153 init_perl_op.op_type = OP_ENTERSUB;
1154 init_perl_op.op_ppaddr = pp_slf;
1155 /* no flags etc. required, as an init function won't be called */
1156
1157 PL_op = (OP *)&init_perl_op;
1158
1159 /* copy throw, in case it was set before init_perl */
1160 CORO_THROW = coro->except;
1161
1162 SWAP_SVS (coro);
1163
1164 if (ecb_expect_false (enable_times))
1165 {
1166 coro_times_update ();
1167 coro_times_sub (coro);
1168 }
1169}
1170
1171static void
1172coro_unwind_stacks (pTHX)
1173{
1174 if (!IN_DESTRUCT)
1175 {
1176 /* restore all saved variables and stuff */
1177 LEAVE_SCOPE (0);
1178 assert (PL_tmps_floor == -1);
1179
1180 /* free all temporaries */
1181 FREETMPS;
1182 assert (PL_tmps_ix == -1);
1183
1184 /* unwind all extra stacks */
1185 POPSTACK_TO (PL_mainstack);
1186
1187 /* unwind main stack */
1188 dounwind (-1);
1189 }
1190}
1191
1192static void
1193destroy_perl (pTHX_ struct coro *coro)
1194{
1195 SV *svf [9];
1196
1197 {
1198 SV *old_current = SvRV (coro_current);
1199 struct coro *current = SvSTATE (old_current);
1200
1201 assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack));
1202
1203 save_perl (aTHX_ current);
1204
1205 /* this will cause transfer_check to croak on block*/
1206 SvRV_set (coro_current, (SV *)coro->hv);
1207
1208 load_perl (aTHX_ coro);
1209
1210 coro_unwind_stacks (aTHX);
1211
1212 /* restore swapped sv's */
1213 SWAP_SVS (coro);
1214
1215 coro_destruct_stacks (aTHX);
1216
1217 /* now save some sv's to be free'd later */
1218 svf [0] = GvSV (PL_defgv);
1219 svf [1] = (SV *)GvAV (PL_defgv);
1220 svf [2] = GvSV (PL_errgv);
1221 svf [3] = (SV *)PL_defoutgv;
1222 svf [4] = PL_rs;
1223 svf [5] = GvSV (irsgv);
1224 svf [6] = (SV *)GvHV (PL_hintgv);
1225 svf [7] = PL_diehook;
1226 svf [8] = PL_warnhook;
1227 assert (9 == sizeof (svf) / sizeof (*svf));
1228
1229 SvRV_set (coro_current, old_current);
1230
1231 load_perl (aTHX_ current);
1232 }
1233
1234 {
1235 unsigned int i;
1236
1237 for (i = 0; i < sizeof (svf) / sizeof (*svf); ++i)
1238 SvREFCNT_dec (svf [i]);
1239
1240 SvREFCNT_dec (coro->saved_deffh);
1241 SvREFCNT_dec (coro->rouse_cb);
1242 SvREFCNT_dec (coro->invoke_cb);
1243 SvREFCNT_dec (coro->invoke_av);
1244 }
1245}
1246
1247ecb_inline void
1248free_coro_mortal (pTHX)
1249{
1250 if (ecb_expect_true (coro_mortal))
1251 {
1252 SvREFCNT_dec ((SV *)coro_mortal);
1253 coro_mortal = 0;
1254 }
1255}
1256
1257static int
1258runops_trace (pTHX)
1259{
1260 COP *oldcop = 0;
1261 int oldcxix = -2;
1262
1263 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
1264 {
1265 PERL_ASYNC_CHECK ();
1266
1267 if (cctx_current->flags & CC_TRACE_ALL)
1268 {
1269 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
1270 {
1271 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1272 SV **bot, **top;
1273 AV *av = newAV (); /* return values */
1274 SV **cb;
1275 dSP;
1276
1277 GV *gv = CvGV (cx->blk_sub.cv);
1278 SV *fullname = sv_2mortal (newSV (0));
1279 if (isGV (gv))
1280 gv_efullname3 (fullname, gv, 0);
1281
1282 bot = PL_stack_base + cx->blk_oldsp + 1;
1283 top = cx->blk_gimme == G_ARRAY ? SP + 1
1284 : cx->blk_gimme == G_SCALAR ? bot + 1
1285 : bot;
1286
1287 av_extend (av, top - bot);
1288 while (bot < top)
1289 av_push (av, SvREFCNT_inc_NN (*bot++));
1290
1291 PL_runops = RUNOPS_DEFAULT;
1292 ENTER;
1293 SAVETMPS;
1294 EXTEND (SP, 3);
1295 PUSHMARK (SP);
1296 PUSHs (&PL_sv_no);
1297 PUSHs (fullname);
1298 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1299 PUTBACK;
1300 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1301 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1302 SPAGAIN;
1303 FREETMPS;
1304 LEAVE;
1305 PL_runops = runops_trace;
1306 }
1307
1308 if (oldcop != PL_curcop)
1309 {
1310 oldcop = PL_curcop;
1311
1312 if (PL_curcop != &PL_compiling)
1313 {
1314 SV **cb;
1315
1316 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1317 {
1318 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1319
1320 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1321 {
1322 dSP;
1323 GV *gv = CvGV (cx->blk_sub.cv);
1324 SV *fullname = sv_2mortal (newSV (0));
1325
1326 if (isGV (gv))
1327 gv_efullname3 (fullname, gv, 0);
1328
1329 PL_runops = RUNOPS_DEFAULT;
1330 ENTER;
1331 SAVETMPS;
1332 EXTEND (SP, 3);
1333 PUSHMARK (SP);
1334 PUSHs (&PL_sv_yes);
1335 PUSHs (fullname);
1336 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1337 PUTBACK;
1338 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1339 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1340 SPAGAIN;
1341 FREETMPS;
1342 LEAVE;
1343 PL_runops = runops_trace;
1344 }
1345
1346 oldcxix = cxstack_ix;
1347 }
1348
1349 if (cctx_current->flags & CC_TRACE_LINE)
1350 {
1351 dSP;
1352
1353 PL_runops = RUNOPS_DEFAULT;
1354 ENTER;
1355 SAVETMPS;
1356 EXTEND (SP, 3);
1357 PL_runops = RUNOPS_DEFAULT;
1358 PUSHMARK (SP);
1359 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1360 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1361 PUTBACK;
1362 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1363 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1364 SPAGAIN;
1365 FREETMPS;
1366 LEAVE;
1367 PL_runops = runops_trace;
1368 }
1369 }
1370 }
1371 }
1372 }
1373
1374 TAINT_NOT;
1375 return 0;
1376}
1377
1378static struct CoroSLF cctx_ssl_frame;
1379
1380static void
1381slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1382{
1383 ta->prev = 0;
1384}
1385
1386static int
1387slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1388{
1389 *frame = cctx_ssl_frame;
1390
1391 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1392}
1393
1394/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1395static void ecb_noinline
1396cctx_prepare (pTHX)
1397{
1398 PL_top_env = &PL_start_env;
1399
1400 if (cctx_current->flags & CC_TRACE)
1401 PL_runops = runops_trace;
1402
1403 /* we already must be executing an SLF op, there is no other valid way
1404 * that can lead to creation of a new cctx */
1405 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1406 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1407
1408 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1409 cctx_ssl_frame = slf_frame;
1410
1411 slf_frame.prepare = slf_prepare_set_stacklevel;
1412 slf_frame.check = slf_check_set_stacklevel;
1413}
1414
1415/* the tail of transfer: execute stuff we can only do after a transfer */
1416ecb_inline void
1417transfer_tail (pTHX)
1418{
1419 free_coro_mortal (aTHX);
1420}
1421
1422/*
1423 * this is a _very_ stripped down perl interpreter ;)
1424 */
1425static void
1426cctx_run (void *arg)
1427{
1428#ifdef USE_ITHREADS
1429# if CORO_PTHREAD
1430 PERL_SET_CONTEXT (coro_thx);
1431# endif
1432#endif
1433 {
1434 dTHX;
1435
1436 /* normally we would need to skip the entersub here */
1437 /* not doing so will re-execute it, which is exactly what we want */
1438 /* PL_nop = PL_nop->op_next */
1439
1440 /* inject a fake subroutine call to cctx_init */
1441 cctx_prepare (aTHX);
1442
1443 /* cctx_run is the alternative tail of transfer() */
1444 transfer_tail (aTHX);
1445
1446 /* somebody or something will hit me for both perl_run and PL_restartop */
1447 PL_restartop = PL_op;
1448 perl_run (PL_curinterp);
1449 /*
1450 * Unfortunately, there is no way to get at the return values of the
1451 * coro body here, as perl_run destroys these. Likewise, we cannot catch
1452 * runtime errors here, as this is just a random interpreter, not a thread.
1453 */
1454
1455 /*
1456 * If perl-run returns we assume exit() was being called or the coro
1457 * fell off the end, which seems to be the only valid (non-bug)
1458 * reason for perl_run to return. We try to exit by jumping to the
1459 * bootstrap-time "top" top_env, as we cannot restore the "main"
1460 * coroutine as Coro has no such concept.
1461 * This actually isn't valid with the pthread backend, but OSes requiring
1462 * that backend are too broken to do it in a standards-compliant way.
1463 */
1464 PL_top_env = main_top_env;
1465 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1466 }
1467}
1468
1469static coro_cctx *
1470cctx_new (void)
1471{
1472 coro_cctx *cctx;
1473
1474 ++cctx_count;
1475 New (0, cctx, 1, coro_cctx);
1476
1477 cctx->gen = cctx_gen;
1478 cctx->flags = 0;
1479 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1480
1481 return cctx;
1482}
1483
1484/* create a new cctx only suitable as source */
1485static coro_cctx *
1486cctx_new_empty (void)
1487{
1488 coro_cctx *cctx = cctx_new ();
1489
1490 cctx->sptr = 0;
1491 coro_create (&cctx->cctx, 0, 0, 0, 0);
1492
1493 return cctx;
1494}
1495
1496/* create a new cctx suitable as destination/running a perl interpreter */
1497static coro_cctx *
1498cctx_new_run (void)
1499{
1500 coro_cctx *cctx = cctx_new ();
1501 void *stack_start;
1502 size_t stack_size;
1503
1504#if HAVE_MMAP
1505 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1506 /* mmap supposedly does allocate-on-write for us */
1507 cctx->sptr = mmap (0, cctx->ssize, PROT_READ | PROT_WRITE | PROT_EXEC, MAP_ANONYMOUS, 0, 0);
1508
1509 if (cctx->sptr != (void *)-1)
1510 {
1511 #if CORO_STACKGUARD
1512 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1513 #endif
1514 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1515 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1516 cctx->flags |= CC_MAPPED;
1517 }
1518 else
1519#endif
1520 {
1521 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1522 New (0, cctx->sptr, cctx_stacksize, long);
1523
1524 if (!cctx->sptr)
1525 {
1526 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1527 _exit (EXIT_FAILURE);
1528 }
1529
1530 stack_start = cctx->sptr;
1531 stack_size = cctx->ssize;
1532 }
1533
1534 #if CORO_USE_VALGRIND
1535 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1536 #endif
1537
1538 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1539
1540 return cctx;
1541}
1542
1543static void
1544cctx_destroy (coro_cctx *cctx)
1545{
1546 if (!cctx)
1547 return;
1548
1549 assert (("FATAL: tried to destroy current cctx", cctx != cctx_current));
1550
1551 --cctx_count;
1552 coro_destroy (&cctx->cctx);
1553
1554 /* coro_transfer creates new, empty cctx's */
1555 if (cctx->sptr)
1556 {
1557 #if CORO_USE_VALGRIND
1558 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1559 #endif
1560
1561#if HAVE_MMAP
1562 if (cctx->flags & CC_MAPPED)
1563 munmap (cctx->sptr, cctx->ssize);
1564 else
1565#endif
1566 Safefree (cctx->sptr);
1567 }
1568
1569 Safefree (cctx);
1570}
1571
1572/* wether this cctx should be destructed */
1573#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1574
1575static coro_cctx *
1576cctx_get (pTHX)
1577{
1578 while (ecb_expect_true (cctx_first))
1579 {
1580 coro_cctx *cctx = cctx_first;
1581 cctx_first = cctx->next;
1582 --cctx_idle;
1583
1584 if (ecb_expect_true (!CCTX_EXPIRED (cctx)))
1585 return cctx;
1586
1587 cctx_destroy (cctx);
1588 }
1589
1590 return cctx_new_run ();
1591}
1592
1593static void
1594cctx_put (coro_cctx *cctx)
1595{
1596 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1597
1598 /* free another cctx if overlimit */
1599 if (ecb_expect_false (cctx_idle >= cctx_max_idle))
1600 {
1601 coro_cctx *first = cctx_first;
1602 cctx_first = first->next;
1603 --cctx_idle;
1604
1605 cctx_destroy (first);
1606 }
1607
1608 ++cctx_idle;
1609 cctx->next = cctx_first;
1610 cctx_first = cctx;
1611}
1612
1613/** coroutine switching *****************************************************/
1614
1615static void
1616transfer_check (pTHX_ struct coro *prev, struct coro *next)
1617{
1618 /* TODO: throwing up here is considered harmful */
1619
1620 if (ecb_expect_true (prev != next))
1621 {
1622 if (ecb_expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1623 croak ("Coro::State::transfer called with a blocked prev Coro::State, but can only transfer from running or new states,");
1624
1625 if (ecb_expect_false (next->flags & (CF_RUNNING | CF_ZOMBIE | CF_SUSPENDED)))
1626 croak ("Coro::State::transfer called with running, destroyed or suspended next Coro::State, but can only transfer to inactive states,");
1627
1628#if !PERL_VERSION_ATLEAST (5,10,0)
1629 if (ecb_expect_false (PL_lex_state != LEX_NOTPARSING))
1630 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1631#endif
1632 }
1633}
1634
1635/* always use the TRANSFER macro */
1636static void ecb_noinline /* noinline so we have a fixed stackframe */
1637transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1638{
1639 dSTACKLEVEL;
1640
1641 /* sometimes transfer is only called to set idle_sp */
1642 if (ecb_expect_false (!prev))
1643 {
1644 cctx_current->idle_sp = STACKLEVEL;
1645 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1646 }
1647 else if (ecb_expect_true (prev != next))
1648 {
1649 coro_cctx *cctx_prev;
1650
1651 if (ecb_expect_false (prev->flags & CF_NEW))
1652 {
1653 /* create a new empty/source context */
1654 prev->flags &= ~CF_NEW;
1655 prev->flags |= CF_RUNNING;
1656 }
1657
1658 prev->flags &= ~CF_RUNNING;
1659 next->flags |= CF_RUNNING;
1660
1661 /* first get rid of the old state */
1662 save_perl (aTHX_ prev);
1663
1664 if (ecb_expect_false (next->flags & CF_NEW))
1665 {
1666 /* need to start coroutine */
1667 next->flags &= ~CF_NEW;
1668 /* setup coroutine call */
1669 init_perl (aTHX_ next);
1670 }
1671 else
1672 load_perl (aTHX_ next);
1673
1674 /* possibly untie and reuse the cctx */
1675 if (ecb_expect_true (
1676 cctx_current->idle_sp == STACKLEVEL
1677 && !(cctx_current->flags & CC_TRACE)
1678 && !force_cctx
1679 ))
1680 {
1681 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1682 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1683
1684 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get. */
1685 /* without this the next cctx_get might destroy the running cctx while still in use */
1686 if (ecb_expect_false (CCTX_EXPIRED (cctx_current)))
1687 if (ecb_expect_true (!next->cctx))
1688 next->cctx = cctx_get (aTHX);
1689
1690 cctx_put (cctx_current);
1691 }
1692 else
1693 prev->cctx = cctx_current;
1694
1695 ++next->usecount;
1696
1697 cctx_prev = cctx_current;
1698 cctx_current = ecb_expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1699
1700 next->cctx = 0;
1701
1702 if (ecb_expect_false (cctx_prev != cctx_current))
1703 {
1704 cctx_prev->top_env = PL_top_env;
1705 PL_top_env = cctx_current->top_env;
1706 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1707 }
1708
1709 transfer_tail (aTHX);
1710 }
1711}
1712
1713#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1714#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1715
1716/** high level stuff ********************************************************/
1717
1718/* this function is actually Coro, not Coro::State, but we call it from here */
1719/* because it is convenient - but it hasn't been declared yet for that reason */
1720static void
1721coro_call_on_destroy (pTHX_ struct coro *coro);
1722
1723static void
1724coro_state_destroy (pTHX_ struct coro *coro)
1725{
1726 if (coro->flags & CF_ZOMBIE)
1727 return;
1728
1729 slf_destroy (aTHX_ coro);
1730
1731 coro->flags |= CF_ZOMBIE;
1732
1733 if (coro->flags & CF_READY)
1734 {
1735 /* reduce nready, as destroying a ready coro effectively unreadies it */
1736 /* alternative: look through all ready queues and remove the coro */
1737 --coro_nready;
1738 }
1739 else
1740 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1741
1742 if (coro->next) coro->next->prev = coro->prev;
1743 if (coro->prev) coro->prev->next = coro->next;
1744 if (coro == coro_first) coro_first = coro->next;
1745
1746 if (coro->mainstack
1747 && coro->mainstack != main_mainstack
1748 && coro->slot
1749 && !PL_dirty)
1750 destroy_perl (aTHX_ coro);
1751
1752 cctx_destroy (coro->cctx);
1753 SvREFCNT_dec (coro->startcv);
1754 SvREFCNT_dec (coro->args);
1755 SvREFCNT_dec (coro->swap_sv);
1756 SvREFCNT_dec (CORO_THROW);
1757
1758 coro_call_on_destroy (aTHX_ coro);
1759
1760 /* more destruction mayhem in coro_state_free */
1761}
1762
1763static int
1764coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1765{
1766 struct coro *coro = (struct coro *)mg->mg_ptr;
1767 mg->mg_ptr = 0;
1768
1769 coro_state_destroy (aTHX_ coro);
1770 SvREFCNT_dec (coro->on_destroy);
1771 SvREFCNT_dec (coro->status);
1772
1773 Safefree (coro);
1774
1775 return 0;
1776}
1777
1778static int ecb_cold
1779coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1780{
1781 /* called when perl clones the current process the slow way (windows process emulation) */
1782 /* WE SIMply nuke the pointers in the copy, causing perl to croak */
1783 mg->mg_ptr = 0;
1784 mg->mg_virtual = 0;
1785
1786 return 0;
1787}
1788
1789static MGVTBL coro_state_vtbl = {
1790 0, 0, 0, 0,
1791 coro_state_free,
1792 0,
1793#ifdef MGf_DUP
1794 coro_state_dup,
1795#else
1796# define MGf_DUP 0
1797#endif
1798};
1799
1800static void
1801prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1802{
1803 ta->prev = SvSTATE (prev_sv);
1804 ta->next = SvSTATE (next_sv);
1805 TRANSFER_CHECK (*ta);
1806}
1807
1808static void
1809api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1810{
1811 struct coro_transfer_args ta;
1812
1813 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1814 TRANSFER (ta, 1);
1815}
1816
1817/** Coro ********************************************************************/
1818
1819ecb_inline void
1820coro_enq (pTHX_ struct coro *coro)
1821{
1822 struct coro **ready = coro_ready [coro->prio - CORO_PRIO_MIN];
1823
1824 SvREFCNT_inc_NN (coro->hv);
1825
1826 coro->next_ready = 0;
1827 *(ready [0] ? &ready [1]->next_ready : &ready [0]) = coro;
1828 ready [1] = coro;
1829}
1830
1831ecb_inline struct coro *
1832coro_deq (pTHX)
1833{
1834 int prio;
1835
1836 for (prio = CORO_PRIO_MAX - CORO_PRIO_MIN + 1; --prio >= 0; )
1837 {
1838 struct coro **ready = coro_ready [prio];
1839
1840 if (ready [0])
1841 {
1842 struct coro *coro = ready [0];
1843 ready [0] = coro->next_ready;
1844 return coro;
1845 }
1846 }
1847
1848 return 0;
1849}
1850
1851static void
1852invoke_sv_ready_hook_helper (void)
1853{
1854 dTHX;
1855 dSP;
1856
1857 ENTER;
1858 SAVETMPS;
1859
1860 PUSHMARK (SP);
1861 PUTBACK;
1862 call_sv (coro_readyhook, G_VOID | G_DISCARD);
1863
1864 FREETMPS;
1865 LEAVE;
1866}
1867
1868static int
1869api_ready (pTHX_ SV *coro_sv)
1870{
1871 struct coro *coro = SvSTATE (coro_sv);
1872
1873 if (coro->flags & CF_READY)
1874 return 0;
1875
1876 coro->flags |= CF_READY;
1877
1878 coro_enq (aTHX_ coro);
1879
1880 if (!coro_nready++)
1881 if (coroapi.readyhook)
1882 coroapi.readyhook ();
1883
1884 return 1;
1885}
1886
1887static int
1888api_is_ready (pTHX_ SV *coro_sv)
1889{
1890 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1891}
1892
1893/* expects to own a reference to next->hv */
1894ecb_inline void
1895prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1896{
1897 SV *prev_sv = SvRV (coro_current);
1898
1899 ta->prev = SvSTATE_hv (prev_sv);
1900 ta->next = next;
1901
1902 TRANSFER_CHECK (*ta);
1903
1904 SvRV_set (coro_current, (SV *)next->hv);
1905
1906 free_coro_mortal (aTHX);
1907 coro_mortal = prev_sv;
1908}
1909
1910static void
1911prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1912{
1913 for (;;)
1914 {
1915 struct coro *next = coro_deq (aTHX);
1916
1917 if (ecb_expect_true (next))
1918 {
1919 /* cannot transfer to destroyed coros, skip and look for next */
1920 if (ecb_expect_false (next->flags & (CF_ZOMBIE | CF_SUSPENDED)))
1921 SvREFCNT_dec (next->hv); /* coro_nready has already been taken care of by destroy */
1922 else
1923 {
1924 next->flags &= ~CF_READY;
1925 --coro_nready;
1926
1927 prepare_schedule_to (aTHX_ ta, next);
1928 break;
1929 }
1930 }
1931 else
1932 {
1933 /* nothing to schedule: call the idle handler */
1934 if (SvROK (sv_idle)
1935 && SvOBJECT (SvRV (sv_idle)))
1936 {
1937 if (SvRV (sv_idle) == SvRV (coro_current))
1938 croak ("FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught");
1939
1940 ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */
1941 api_ready (aTHX_ SvRV (sv_idle));
1942 --coro_nready;
1943 }
1944 else
1945 {
1946 /* TODO: deprecated, remove, cannot work reliably *//*D*/
1947 dSP;
1948
1949 ENTER;
1950 SAVETMPS;
1951
1952 PUSHMARK (SP);
1953 PUTBACK;
1954 call_sv (sv_idle, G_VOID | G_DISCARD);
1955
1956 FREETMPS;
1957 LEAVE;
1958 }
1959 }
1960 }
1961}
1962
1963ecb_inline void
1964prepare_cede (pTHX_ struct coro_transfer_args *ta)
1965{
1966 api_ready (aTHX_ coro_current);
1967 prepare_schedule (aTHX_ ta);
1968}
1969
1970ecb_inline void
1971prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1972{
1973 SV *prev = SvRV (coro_current);
1974
1975 if (coro_nready)
1976 {
1977 prepare_schedule (aTHX_ ta);
1978 api_ready (aTHX_ prev);
1979 }
1980 else
1981 prepare_nop (aTHX_ ta);
1982}
1983
1984static void
1985api_schedule (pTHX)
1986{
1987 struct coro_transfer_args ta;
1988
1989 prepare_schedule (aTHX_ &ta);
1990 TRANSFER (ta, 1);
1991}
1992
1993static void
1994api_schedule_to (pTHX_ SV *coro_sv)
1995{
1996 struct coro_transfer_args ta;
1997 struct coro *next = SvSTATE (coro_sv);
1998
1999 SvREFCNT_inc_NN (coro_sv);
2000 prepare_schedule_to (aTHX_ &ta, next);
2001}
2002
2003static int
2004api_cede (pTHX)
2005{
2006 struct coro_transfer_args ta;
2007
2008 prepare_cede (aTHX_ &ta);
2009
2010 if (ecb_expect_true (ta.prev != ta.next))
2011 {
2012 TRANSFER (ta, 1);
2013 return 1;
2014 }
2015 else
2016 return 0;
2017}
2018
2019static int
2020api_cede_notself (pTHX)
2021{
2022 if (coro_nready)
2023 {
2024 struct coro_transfer_args ta;
2025
2026 prepare_cede_notself (aTHX_ &ta);
2027 TRANSFER (ta, 1);
2028 return 1;
2029 }
2030 else
2031 return 0;
2032}
2033
2034static void
2035api_trace (pTHX_ SV *coro_sv, int flags)
2036{
2037 struct coro *coro = SvSTATE (coro_sv);
2038
2039 if (coro->flags & CF_RUNNING)
2040 croak ("cannot enable tracing on a running coroutine, caught");
2041
2042 if (flags & CC_TRACE)
2043 {
2044 if (!coro->cctx)
2045 coro->cctx = cctx_new_run ();
2046 else if (!(coro->cctx->flags & CC_TRACE))
2047 croak ("cannot enable tracing on coroutine with custom stack, caught");
2048
2049 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
2050 }
2051 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
2052 {
2053 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
2054
2055 if (coro->flags & CF_RUNNING)
2056 PL_runops = RUNOPS_DEFAULT;
2057 else
2058 coro->slot->runops = RUNOPS_DEFAULT;
2059 }
2060}
2061
2062static void
2063coro_push_av (pTHX_ AV *av, I32 gimme_v)
2064{
2065 if (AvFILLp (av) >= 0 && gimme_v != G_VOID)
2066 {
2067 dSP;
2068
2069 if (gimme_v == G_SCALAR)
2070 XPUSHs (AvARRAY (av)[AvFILLp (av)]);
2071 else
2072 {
2073 int i;
2074 EXTEND (SP, AvFILLp (av) + 1);
2075
2076 for (i = 0; i <= AvFILLp (av); ++i)
2077 PUSHs (AvARRAY (av)[i]);
2078 }
2079
2080 PUTBACK;
2081 }
2082}
2083
2084static void
2085coro_push_on_destroy (pTHX_ struct coro *coro, SV *cb)
2086{
2087 if (!coro->on_destroy)
2088 coro->on_destroy = newAV ();
2089
2090 av_push (coro->on_destroy, cb);
2091}
2092
2093static void
2094slf_destroy_join (pTHX_ struct CoroSLF *frame)
2095{
2096 SvREFCNT_dec ((SV *)((struct coro *)frame->data)->hv);
2097}
2098
2099static int
2100slf_check_join (pTHX_ struct CoroSLF *frame)
2101{
2102 struct coro *coro = (struct coro *)frame->data;
2103
2104 if (!coro->status)
2105 return 1;
2106
2107 frame->destroy = 0;
2108
2109 coro_push_av (aTHX_ coro->status, GIMME_V);
2110
2111 SvREFCNT_dec ((SV *)coro->hv);
2112
2113 return 0;
2114}
2115
2116static void
2117slf_init_join (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2118{
2119 struct coro *coro = SvSTATE (items > 0 ? arg [0] : &PL_sv_undef);
2120
2121 if (items > 1)
2122 croak ("join called with too many arguments");
2123
2124 if (coro->status)
2125 frame->prepare = prepare_nop;
2126 else
2127 {
2128 coro_push_on_destroy (aTHX_ coro, SvREFCNT_inc_NN (SvRV (coro_current)));
2129 frame->prepare = prepare_schedule;
2130 }
2131
2132 frame->check = slf_check_join;
2133 frame->destroy = slf_destroy_join;
2134 frame->data = (void *)coro;
2135 SvREFCNT_inc (coro->hv);
2136}
2137
2138static void
2139coro_call_on_destroy (pTHX_ struct coro *coro)
2140{
2141 AV *od = coro->on_destroy;
2142
2143 if (!od)
2144 return;
2145
2146 while (AvFILLp (od) >= 0)
2147 {
2148 SV *cb = sv_2mortal (av_pop (od));
2149
2150 /* coro hv's (and only hv's at the moment) are supported as well */
2151 if (SvSTATEhv_p (aTHX_ cb))
2152 api_ready (aTHX_ cb);
2153 else
2154 {
2155 dSP; /* don't disturb outer sp */
2156 PUSHMARK (SP);
2157
2158 if (coro->status)
2159 {
2160 PUTBACK;
2161 coro_push_av (aTHX_ coro->status, G_ARRAY);
2162 SPAGAIN;
2163 }
2164
2165 PUTBACK;
2166 call_sv (cb, G_VOID | G_DISCARD);
2167 }
2168 }
2169}
2170
2171static void
2172coro_set_status (pTHX_ struct coro *coro, SV **arg, int items)
2173{
2174 AV *av;
2175
2176 if (coro->status)
2177 {
2178 av = coro->status;
2179 av_clear (av);
2180 }
2181 else
2182 av = coro->status = newAV ();
2183
2184 /* items are actually not so common, so optimise for this case */
2185 if (items)
2186 {
2187 int i;
2188
2189 av_extend (av, items - 1);
2190
2191 for (i = 0; i < items; ++i)
2192 av_push (av, SvREFCNT_inc_NN (arg [i]));
2193 }
2194}
2195
2196static void
2197slf_init_terminate_cancel_common (pTHX_ struct CoroSLF *frame, HV *coro_hv)
2198{
2199 av_push (av_destroy, (SV *)newRV_inc ((SV *)coro_hv)); /* RVinc for perl */
2200 api_ready (aTHX_ sv_manager);
2201
2202 frame->prepare = prepare_schedule;
2203 frame->check = slf_check_repeat;
2204
2205 /* as a minor optimisation, we could unwind all stacks here */
2206 /* but that puts extra pressure on pp_slf, and is not worth much */
2207 /*coro_unwind_stacks (aTHX);*/
2208}
2209
2210static void
2211slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2212{
2213 HV *coro_hv = (HV *)SvRV (coro_current);
2214
2215 coro_set_status (aTHX_ SvSTATE ((SV *)coro_hv), arg, items);
2216 slf_init_terminate_cancel_common (aTHX_ frame, coro_hv);
2217}
2218
2219static void
2220slf_init_cancel (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2221{
2222 HV *coro_hv;
2223 struct coro *coro;
2224
2225 if (items <= 0)
2226 croak ("Coro::cancel called without coro object,");
2227
2228 coro = SvSTATE (arg [0]);
2229 coro_hv = coro->hv;
2230
2231 coro_set_status (aTHX_ coro, arg + 1, items - 1);
2232
2233 if (ecb_expect_false (coro->flags & CF_NOCANCEL))
2234 {
2235 /* coro currently busy cancelling something, so just notify it */
2236 coro->slf_frame.data = (void *)coro;
2237
2238 frame->prepare = prepare_nop;
2239 frame->check = slf_check_nop;
2240 }
2241 else if (coro_hv == (HV *)SvRV (coro_current))
2242 {
2243 /* cancelling the current coro is allowed, and equals terminate */
2244 slf_init_terminate_cancel_common (aTHX_ frame, coro_hv);
2245 }
2246 else
2247 {
2248 struct coro *self = SvSTATE_current;
2249
2250 /* otherwise we cancel directly, purely for speed reasons
2251 * unfortunately, this requires some magic trickery, as
2252 * somebody else could cancel us, so we have to fight the cancellation.
2253 * this is ugly, and hopefully fully worth the extra speed.
2254 * besides, I can't get the slow-but-safe version working...
2255 */
2256 slf_frame.data = 0;
2257 self->flags |= CF_NOCANCEL;
2258 coro_state_destroy (aTHX_ coro);
2259 self->flags &= ~CF_NOCANCEL;
2260
2261 if (slf_frame.data)
2262 {
2263 /* while we were busy we have been cancelled, so terminate */
2264 slf_init_terminate_cancel_common (aTHX_ frame, self->hv);
2265 }
2266 else
2267 {
2268 frame->prepare = prepare_nop;
2269 frame->check = slf_check_nop;
2270 }
2271 }
2272}
2273
2274static int
2275slf_check_safe_cancel (pTHX_ struct CoroSLF *frame)
2276{
2277 frame->prepare = 0;
2278 coro_unwind_stacks (aTHX);
2279
2280 slf_init_terminate_cancel_common (aTHX_ frame, (HV *)SvRV (coro_current));
2281
2282 return 1;
2283}
2284
2285static int
2286safe_cancel (pTHX_ struct coro *coro, SV **arg, int items)
2287{
2288 if (coro->cctx)
2289 croak ("coro inside C callback, unable to cancel at this time, caught");
2290
2291 if (coro->flags & CF_NEW)
2292 {
2293 coro_set_status (aTHX_ coro, arg, items);
2294 coro_state_destroy (aTHX_ coro);
2295 }
2296 else
2297 {
2298 if (!coro->slf_frame.prepare)
2299 croak ("coro outside an SLF function, unable to cancel at this time, caught");
2300
2301 slf_destroy (aTHX_ coro);
2302
2303 coro_set_status (aTHX_ coro, arg, items);
2304 coro->slf_frame.prepare = prepare_nop;
2305 coro->slf_frame.check = slf_check_safe_cancel;
2306
2307 api_ready (aTHX_ (SV *)coro->hv);
2308 }
2309
2310 return 1;
2311}
2312
2313/*****************************************************************************/
2314/* async pool handler */
2315
2316static int
2317slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
2318{
2319 HV *hv = (HV *)SvRV (coro_current);
2320 struct coro *coro = (struct coro *)frame->data;
2321
2322 if (!coro->invoke_cb)
2323 return 1; /* loop till we have invoke */
2324 else
2325 {
2326 hv_store (hv, "desc", sizeof ("desc") - 1,
2327 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
2328
2329 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
2330
2331 {
2332 dSP;
2333 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
2334 PUTBACK;
2335 }
2336
2337 SvREFCNT_dec (GvAV (PL_defgv));
2338 GvAV (PL_defgv) = coro->invoke_av;
2339 coro->invoke_av = 0;
2340
2341 return 0;
2342 }
2343}
2344
2345static void
2346slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2347{
2348 HV *hv = (HV *)SvRV (coro_current);
2349 struct coro *coro = SvSTATE_hv ((SV *)hv);
2350
2351 if (ecb_expect_true (coro->saved_deffh))
2352 {
2353 /* subsequent iteration */
2354 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
2355 coro->saved_deffh = 0;
2356
2357 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
2358 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
2359 {
2360 slf_init_terminate_cancel_common (aTHX_ frame, hv);
2361 return;
2362 }
2363 else
2364 {
2365 av_clear (GvAV (PL_defgv));
2366 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
2367
2368 coro->prio = 0;
2369
2370 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
2371 api_trace (aTHX_ coro_current, 0);
2372
2373 frame->prepare = prepare_schedule;
2374 av_push (av_async_pool, SvREFCNT_inc (hv));
2375 }
2376 }
2377 else
2378 {
2379 /* first iteration, simply fall through */
2380 frame->prepare = prepare_nop;
2381 }
2382
2383 frame->check = slf_check_pool_handler;
2384 frame->data = (void *)coro;
2385}
2386
2387/*****************************************************************************/
2388/* rouse callback */
2389
2390#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
2391
2392static void
2393coro_rouse_callback (pTHX_ CV *cv)
2394{
2395 dXSARGS;
2396 SV *data = (SV *)S_GENSUB_ARG;
2397
2398 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2399 {
2400 /* first call, set args */
2401 SV *coro = SvRV (data);
2402 AV *av = newAV ();
2403
2404 SvRV_set (data, (SV *)av);
2405
2406 /* better take a full copy of the arguments */
2407 while (items--)
2408 av_store (av, items, newSVsv (ST (items)));
2409
2410 api_ready (aTHX_ coro);
2411 SvREFCNT_dec (coro);
2412 }
2413
2414 XSRETURN_EMPTY;
2415}
2416
2417static int
2418slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
2419{
2420 SV *data = (SV *)frame->data;
2421
2422 if (CORO_THROW)
2423 return 0;
2424
2425 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2426 return 1;
2427
2428 /* now push all results on the stack */
2429 {
2430 dSP;
2431 AV *av = (AV *)SvRV (data);
2432 int i;
2433
2434 EXTEND (SP, AvFILLp (av) + 1);
2435 for (i = 0; i <= AvFILLp (av); ++i)
2436 PUSHs (sv_2mortal (AvARRAY (av)[i]));
2437
2438 /* we have stolen the elements, so set length to zero and free */
2439 AvFILLp (av) = -1;
2440 av_undef (av);
2441
2442 PUTBACK;
2443 }
2444
2445 return 0;
2446}
2447
2448static void
2449slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2450{
2451 SV *cb;
2452
2453 if (items)
2454 cb = arg [0];
2455 else
2456 {
2457 struct coro *coro = SvSTATE_current;
2458
2459 if (!coro->rouse_cb)
2460 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
2461
2462 cb = sv_2mortal (coro->rouse_cb);
2463 coro->rouse_cb = 0;
2464 }
2465
2466 if (!SvROK (cb)
2467 || SvTYPE (SvRV (cb)) != SVt_PVCV
2468 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
2469 croak ("Coro::rouse_wait called with illegal callback argument,");
2470
2471 {
2472 CV *cv = (CV *)SvRV (cb); /* for S_GENSUB_ARG */
2473 SV *data = (SV *)S_GENSUB_ARG;
2474
2475 frame->data = (void *)data;
2476 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2477 frame->check = slf_check_rouse_wait;
2478 }
2479}
2480
2481static SV *
2482coro_new_rouse_cb (pTHX)
2483{
2484 HV *hv = (HV *)SvRV (coro_current);
2485 struct coro *coro = SvSTATE_hv (hv);
2486 SV *data = newRV_inc ((SV *)hv);
2487 SV *cb = s_gensub (aTHX_ coro_rouse_callback, (void *)data);
2488
2489 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2490 SvREFCNT_dec (data); /* magicext increases the refcount */
2491
2492 SvREFCNT_dec (coro->rouse_cb);
2493 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2494
2495 return cb;
2496}
2497
2498/*****************************************************************************/
2499/* schedule-like-function opcode (SLF) */
2500
2501static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2502static const CV *slf_cv;
2503static SV **slf_argv;
2504static int slf_argc, slf_arga; /* count, allocated */
2505static I32 slf_ax; /* top of stack, for restore */
2506
2507/* this restores the stack in the case we patched the entersub, to */
2508/* recreate the stack frame as perl will on following calls */
2509/* since entersub cleared the stack */
2510static OP *
2511pp_restore (pTHX)
2512{
2513 int i;
2514 SV **SP = PL_stack_base + slf_ax;
2515
2516 PUSHMARK (SP);
2517
2518 EXTEND (SP, slf_argc + 1);
2519
2520 for (i = 0; i < slf_argc; ++i)
2521 PUSHs (sv_2mortal (slf_argv [i]));
2522
2523 PUSHs ((SV *)CvGV (slf_cv));
2524
2525 RETURNOP (slf_restore.op_first);
2526}
2527
2528static void
2529slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2530{
2531 SV **arg = (SV **)slf_frame.data;
2532
2533 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2534}
2535
2536static void
2537slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2538{
2539 if (items != 2)
2540 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2541
2542 frame->prepare = slf_prepare_transfer;
2543 frame->check = slf_check_nop;
2544 frame->data = (void *)arg; /* let's hope it will stay valid */
2545}
2546
2547static void
2548slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2549{
2550 frame->prepare = prepare_schedule;
2551 frame->check = slf_check_nop;
2552}
2553
2554static void
2555slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2556{
2557 struct coro *next = (struct coro *)slf_frame.data;
2558
2559 SvREFCNT_inc_NN (next->hv);
2560 prepare_schedule_to (aTHX_ ta, next);
2561}
2562
2563static void
2564slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2565{
2566 if (!items)
2567 croak ("Coro::schedule_to expects a coroutine argument, caught");
2568
2569 frame->data = (void *)SvSTATE (arg [0]);
2570 frame->prepare = slf_prepare_schedule_to;
2571 frame->check = slf_check_nop;
2572}
2573
2574static void
2575slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2576{
2577 api_ready (aTHX_ SvRV (coro_current));
2578
2579 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2580}
2581
2582static void
2583slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2584{
2585 frame->prepare = prepare_cede;
2586 frame->check = slf_check_nop;
2587}
2588
2589static void
2590slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2591{
2592 frame->prepare = prepare_cede_notself;
2593 frame->check = slf_check_nop;
2594}
2595
2596/* "undo"/cancel a running slf call - used when cancelling a coro, mainly */
2597static void
2598slf_destroy (pTHX_ struct coro *coro)
2599{
2600 /* this callback is reserved for slf functions needing to do cleanup */
2601 if (coro->slf_frame.destroy && coro->slf_frame.prepare && !PL_dirty)
2602 coro->slf_frame.destroy (aTHX_ &coro->slf_frame);
2603
2604 /*
2605 * The on_destroy above most likely is from an SLF call.
2606 * Since by definition the SLF call will not finish when we destroy
2607 * the coro, we will have to force-finish it here, otherwise
2608 * cleanup functions cannot call SLF functions.
2609 */
2610 coro->slf_frame.prepare = 0;
2611}
2612
2613/*
2614 * these not obviously related functions are all rolled into one
2615 * function to increase chances that they all will call transfer with the same
2616 * stack offset
2617 * SLF stands for "schedule-like-function".
2618 */
2619static OP *
2620pp_slf (pTHX)
2621{
2622 I32 checkmark; /* mark SP to see how many elements check has pushed */
2623
2624 /* set up the slf frame, unless it has already been set-up */
2625 /* the latter happens when a new coro has been started */
2626 /* or when a new cctx was attached to an existing coroutine */
2627 if (ecb_expect_true (!slf_frame.prepare))
2628 {
2629 /* first iteration */
2630 dSP;
2631 SV **arg = PL_stack_base + TOPMARK + 1;
2632 int items = SP - arg; /* args without function object */
2633 SV *gv = *sp;
2634
2635 /* do a quick consistency check on the "function" object, and if it isn't */
2636 /* for us, divert to the real entersub */
2637 if (SvTYPE (gv) != SVt_PVGV
2638 || !GvCV (gv)
2639 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2640 return PL_ppaddr[OP_ENTERSUB](aTHX);
2641
2642 if (!(PL_op->op_flags & OPf_STACKED))
2643 {
2644 /* ampersand-form of call, use @_ instead of stack */
2645 AV *av = GvAV (PL_defgv);
2646 arg = AvARRAY (av);
2647 items = AvFILLp (av) + 1;
2648 }
2649
2650 /* now call the init function, which needs to set up slf_frame */
2651 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2652 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2653
2654 /* pop args */
2655 SP = PL_stack_base + POPMARK;
2656
2657 PUTBACK;
2658 }
2659
2660 /* now that we have a slf_frame, interpret it! */
2661 /* we use a callback system not to make the code needlessly */
2662 /* complicated, but so we can run multiple perl coros from one cctx */
2663
2664 do
2665 {
2666 struct coro_transfer_args ta;
2667
2668 slf_frame.prepare (aTHX_ &ta);
2669 TRANSFER (ta, 0);
2670
2671 checkmark = PL_stack_sp - PL_stack_base;
2672 }
2673 while (slf_frame.check (aTHX_ &slf_frame));
2674
2675 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2676
2677 /* exception handling */
2678 if (ecb_expect_false (CORO_THROW))
2679 {
2680 SV *exception = sv_2mortal (CORO_THROW);
2681
2682 CORO_THROW = 0;
2683 sv_setsv (ERRSV, exception);
2684 croak (0);
2685 }
2686
2687 /* return value handling - mostly like entersub */
2688 /* make sure we put something on the stack in scalar context */
2689 if (GIMME_V == G_SCALAR
2690 && ecb_expect_false (PL_stack_sp != PL_stack_base + checkmark + 1))
2691 {
2692 dSP;
2693 SV **bot = PL_stack_base + checkmark;
2694
2695 if (sp == bot) /* too few, push undef */
2696 bot [1] = &PL_sv_undef;
2697 else /* too many, take last one */
2698 bot [1] = *sp;
2699
2700 SP = bot + 1;
2701
2702 PUTBACK;
2703 }
2704
2705 return NORMAL;
2706}
2707
2708static void
2709api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2710{
2711 int i;
2712 SV **arg = PL_stack_base + ax;
2713 int items = PL_stack_sp - arg + 1;
2714
2715 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2716
2717 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2718 && PL_op->op_ppaddr != pp_slf)
2719 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2720
2721 CvFLAGS (cv) |= CVf_SLF;
2722 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2723 slf_cv = cv;
2724
2725 /* we patch the op, and then re-run the whole call */
2726 /* we have to put the same argument on the stack for this to work */
2727 /* and this will be done by pp_restore */
2728 slf_restore.op_next = (OP *)&slf_restore;
2729 slf_restore.op_type = OP_CUSTOM;
2730 slf_restore.op_ppaddr = pp_restore;
2731 slf_restore.op_first = PL_op;
2732
2733 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2734
2735 if (PL_op->op_flags & OPf_STACKED)
2736 {
2737 if (items > slf_arga)
2738 {
2739 slf_arga = items;
2740 Safefree (slf_argv);
2741 New (0, slf_argv, slf_arga, SV *);
2742 }
2743
2744 slf_argc = items;
2745
2746 for (i = 0; i < items; ++i)
2747 slf_argv [i] = SvREFCNT_inc (arg [i]);
2748 }
2749 else
2750 slf_argc = 0;
2751
2752 PL_op->op_ppaddr = pp_slf;
2753 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2754
2755 PL_op = (OP *)&slf_restore;
2756}
2757
2758/*****************************************************************************/
2759/* dynamic wind */
2760
2761static void
2762on_enterleave_call (pTHX_ SV *cb)
2763{
2764 dSP;
2765
2766 PUSHSTACK;
2767
2768 PUSHMARK (SP);
2769 PUTBACK;
2770 call_sv (cb, G_VOID | G_DISCARD);
2771 SPAGAIN;
2772
2773 POPSTACK;
2774}
2775
2776static SV *
2777coro_avp_pop_and_free (pTHX_ AV **avp)
2778{
2779 AV *av = *avp;
2780 SV *res = av_pop (av);
2781
2782 if (AvFILLp (av) < 0)
2783 {
2784 *avp = 0;
2785 SvREFCNT_dec (av);
2786 }
2787
2788 return res;
2789}
2790
2791static void
2792coro_pop_on_enter (pTHX_ void *coro)
2793{
2794 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_enter);
2795 SvREFCNT_dec (cb);
2796}
2797
2798static void
2799coro_pop_on_leave (pTHX_ void *coro)
2800{
2801 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_leave);
2802 on_enterleave_call (aTHX_ sv_2mortal (cb));
2803}
2804
2805/*****************************************************************************/
2806/* PerlIO::cede */
2807
2808typedef struct
2809{
2810 PerlIOBuf base;
2811 NV next, every;
2812} PerlIOCede;
2813
2814static IV ecb_cold
2815PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2816{
2817 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2818
2819 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2820 self->next = nvtime () + self->every;
2821
2822 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2823}
2824
2825static SV * ecb_cold
2826PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2827{
2828 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2829
2830 return newSVnv (self->every);
2831}
2832
2833static IV
2834PerlIOCede_flush (pTHX_ PerlIO *f)
2835{
2836 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2837 double now = nvtime ();
2838
2839 if (now >= self->next)
2840 {
2841 api_cede (aTHX);
2842 self->next = now + self->every;
2843 }
2844
2845 return PerlIOBuf_flush (aTHX_ f);
2846}
2847
2848static PerlIO_funcs PerlIO_cede =
2849{
2850 sizeof(PerlIO_funcs),
2851 "cede",
2852 sizeof(PerlIOCede),
2853 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2854 PerlIOCede_pushed,
2855 PerlIOBuf_popped,
2856 PerlIOBuf_open,
2857 PerlIOBase_binmode,
2858 PerlIOCede_getarg,
2859 PerlIOBase_fileno,
2860 PerlIOBuf_dup,
2861 PerlIOBuf_read,
2862 PerlIOBuf_unread,
2863 PerlIOBuf_write,
2864 PerlIOBuf_seek,
2865 PerlIOBuf_tell,
2866 PerlIOBuf_close,
2867 PerlIOCede_flush,
2868 PerlIOBuf_fill,
2869 PerlIOBase_eof,
2870 PerlIOBase_error,
2871 PerlIOBase_clearerr,
2872 PerlIOBase_setlinebuf,
2873 PerlIOBuf_get_base,
2874 PerlIOBuf_bufsiz,
2875 PerlIOBuf_get_ptr,
2876 PerlIOBuf_get_cnt,
2877 PerlIOBuf_set_ptrcnt,
2878};
2879
2880/*****************************************************************************/
2881/* Coro::Semaphore & Coro::Signal */
2882
2883static SV *
2884coro_waitarray_new (pTHX_ int count)
2885{
2886 /* a waitarray=semaphore contains a counter IV in $sem->[0] and any waiters after that */
2887 AV *av = newAV ();
2888 SV **ary;
2889
2890 /* unfortunately, building manually saves memory */
2891 Newx (ary, 2, SV *);
2892 AvALLOC (av) = ary;
2893#if PERL_VERSION_ATLEAST (5,10,0)
2894 AvARRAY (av) = ary;
2895#else
2896 /* 5.8.8 needs this syntax instead of AvARRAY = ary, yet */
2897 /* -DDEBUGGING flags this as a bug, despite it perfectly working */
2898 SvPVX ((SV *)av) = (char *)ary;
2899#endif
2900 AvMAX (av) = 1;
2901 AvFILLp (av) = 0;
2902 ary [0] = newSViv (count);
2903
2904 return newRV_noinc ((SV *)av);
2905}
2906
2907/* semaphore */
2908
2909static void
2910coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2911{
2912 SV *count_sv = AvARRAY (av)[0];
2913 IV count = SvIVX (count_sv);
2914
2915 count += adjust;
2916 SvIVX (count_sv) = count;
2917
2918 /* now wake up as many waiters as are expected to lock */
2919 while (count > 0 && AvFILLp (av) > 0)
2920 {
2921 SV *cb;
2922
2923 /* swap first two elements so we can shift a waiter */
2924 AvARRAY (av)[0] = AvARRAY (av)[1];
2925 AvARRAY (av)[1] = count_sv;
2926 cb = av_shift (av);
2927
2928 if (SvOBJECT (cb))
2929 {
2930 api_ready (aTHX_ cb);
2931 --count;
2932 }
2933 else if (SvTYPE (cb) == SVt_PVCV)
2934 {
2935 dSP;
2936 PUSHMARK (SP);
2937 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2938 PUTBACK;
2939 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2940 }
2941
2942 SvREFCNT_dec (cb);
2943 }
2944}
2945
2946static void
2947coro_semaphore_destroy (pTHX_ struct CoroSLF *frame)
2948{
2949 /* call $sem->adjust (0) to possibly wake up some other waiters */
2950 coro_semaphore_adjust (aTHX_ (AV *)frame->data, 0);
2951}
2952
2953static int
2954slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2955{
2956 AV *av = (AV *)frame->data;
2957 SV *count_sv = AvARRAY (av)[0];
2958 SV *coro_hv = SvRV (coro_current);
2959
2960 /* if we are about to throw, don't actually acquire the lock, just throw */
2961 if (CORO_THROW)
2962 return 0;
2963 else if (SvIVX (count_sv) > 0)
2964 {
2965 frame->destroy = 0;
2966
2967 if (acquire)
2968 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2969 else
2970 coro_semaphore_adjust (aTHX_ av, 0);
2971
2972 return 0;
2973 }
2974 else
2975 {
2976 int i;
2977 /* if we were woken up but can't down, we look through the whole */
2978 /* waiters list and only add us if we aren't in there already */
2979 /* this avoids some degenerate memory usage cases */
2980 for (i = AvFILLp (av); i > 0; --i) /* i > 0 is not an off-by-one bug */
2981 if (AvARRAY (av)[i] == coro_hv)
2982 return 1;
2983
2984 av_push (av, SvREFCNT_inc (coro_hv));
2985 return 1;
2986 }
2987}
2988
2989static int
2990slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2991{
2992 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2993}
2994
2995static int
2996slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2997{
2998 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2999}
3000
3001static void
3002slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
3003{
3004 AV *av = (AV *)SvRV (arg [0]);
3005
3006 if (SvIVX (AvARRAY (av)[0]) > 0)
3007 {
3008 frame->data = (void *)av;
3009 frame->prepare = prepare_nop;
3010 }
3011 else
3012 {
3013 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
3014
3015 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
3016 frame->prepare = prepare_schedule;
3017 /* to avoid race conditions when a woken-up coro gets terminated */
3018 /* we arrange for a temporary on_destroy that calls adjust (0) */
3019 frame->destroy = coro_semaphore_destroy;
3020 }
3021}
3022
3023static void
3024slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
3025{
3026 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
3027 frame->check = slf_check_semaphore_down;
3028}
3029
3030static void
3031slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
3032{
3033 if (items >= 2)
3034 {
3035 /* callback form */
3036 AV *av = (AV *)SvRV (arg [0]);
3037 SV *cb_cv = s_get_cv_croak (arg [1]);
3038
3039 av_push (av, SvREFCNT_inc_NN (cb_cv));
3040
3041 if (SvIVX (AvARRAY (av)[0]) > 0)
3042 coro_semaphore_adjust (aTHX_ av, 0);
3043
3044 frame->prepare = prepare_nop;
3045 frame->check = slf_check_nop;
3046 }
3047 else
3048 {
3049 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
3050 frame->check = slf_check_semaphore_wait;
3051 }
3052}
3053
3054/* signal */
3055
3056static void
3057coro_signal_wake (pTHX_ AV *av, int count)
3058{
3059 SvIVX (AvARRAY (av)[0]) = 0;
3060
3061 /* now signal count waiters */
3062 while (count > 0 && AvFILLp (av) > 0)
3063 {
3064 SV *cb;
3065
3066 /* swap first two elements so we can shift a waiter */
3067 cb = AvARRAY (av)[0];
3068 AvARRAY (av)[0] = AvARRAY (av)[1];
3069 AvARRAY (av)[1] = cb;
3070
3071 cb = av_shift (av);
3072
3073 if (SvTYPE (cb) == SVt_PVCV)
3074 {
3075 dSP;
3076 PUSHMARK (SP);
3077 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
3078 PUTBACK;
3079 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
3080 }
3081 else
3082 {
3083 api_ready (aTHX_ cb);
3084 sv_setiv (cb, 0); /* signal waiter */
3085 }
3086
3087 SvREFCNT_dec (cb);
3088
3089 --count;
3090 }
3091}
3092
3093static int
3094slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
3095{
3096 /* if we are about to throw, also stop waiting */
3097 return SvROK ((SV *)frame->data) && !CORO_THROW;
3098}
3099
3100static void
3101slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
3102{
3103 AV *av = (AV *)SvRV (arg [0]);
3104
3105 if (items >= 2)
3106 {
3107 SV *cb_cv = s_get_cv_croak (arg [1]);
3108 av_push (av, SvREFCNT_inc_NN (cb_cv));
3109
3110 if (SvIVX (AvARRAY (av)[0]))
3111 coro_signal_wake (aTHX_ av, 1); /* must be the only waiter */
3112
3113 frame->prepare = prepare_nop;
3114 frame->check = slf_check_nop;
3115 }
3116 else if (SvIVX (AvARRAY (av)[0]))
3117 {
3118 SvIVX (AvARRAY (av)[0]) = 0;
3119 frame->prepare = prepare_nop;
3120 frame->check = slf_check_nop;
3121 }
3122 else
3123 {
3124 SV *waiter = newSVsv (coro_current); /* owned by signal av */
3125
3126 av_push (av, waiter);
3127
3128 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
3129 frame->prepare = prepare_schedule;
3130 frame->check = slf_check_signal_wait;
3131 }
3132}
3133
3134/*****************************************************************************/
3135/* Coro::AIO */
3136
3137#define CORO_MAGIC_type_aio PERL_MAGIC_ext
3138
3139/* helper storage struct */
3140struct io_state
3141{
3142 int errorno;
3143 I32 laststype; /* U16 in 5.10.0 */
3144 int laststatval;
3145 Stat_t statcache;
3146};
3147
3148static void
3149coro_aio_callback (pTHX_ CV *cv)
3150{
3151 dXSARGS;
3152 AV *state = (AV *)S_GENSUB_ARG;
3153 SV *coro = av_pop (state);
3154 SV *data_sv = newSV (sizeof (struct io_state));
3155
3156 av_extend (state, items - 1);
3157
3158 sv_upgrade (data_sv, SVt_PV);
3159 SvCUR_set (data_sv, sizeof (struct io_state));
3160 SvPOK_only (data_sv);
3161
3162 {
3163 struct io_state *data = (struct io_state *)SvPVX (data_sv);
3164
3165 data->errorno = errno;
3166 data->laststype = PL_laststype;
3167 data->laststatval = PL_laststatval;
3168 data->statcache = PL_statcache;
3169 }
3170
3171 /* now build the result vector out of all the parameters and the data_sv */
3172 {
3173 int i;
3174
3175 for (i = 0; i < items; ++i)
3176 av_push (state, SvREFCNT_inc_NN (ST (i)));
3177 }
3178
3179 av_push (state, data_sv);
3180
3181 api_ready (aTHX_ coro);
3182 SvREFCNT_dec (coro);
3183 SvREFCNT_dec ((AV *)state);
3184}
3185
3186static int
3187slf_check_aio_req (pTHX_ struct CoroSLF *frame)
3188{
3189 AV *state = (AV *)frame->data;
3190
3191 /* if we are about to throw, return early */
3192 /* this does not cancel the aio request, but at least */
3193 /* it quickly returns */
3194 if (CORO_THROW)
3195 return 0;
3196
3197 /* one element that is an RV? repeat! */
3198 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
3199 return 1;
3200
3201 /* restore status */
3202 {
3203 SV *data_sv = av_pop (state);
3204 struct io_state *data = (struct io_state *)SvPVX (data_sv);
3205
3206 errno = data->errorno;
3207 PL_laststype = data->laststype;
3208 PL_laststatval = data->laststatval;
3209 PL_statcache = data->statcache;
3210
3211 SvREFCNT_dec (data_sv);
3212 }
3213
3214 /* push result values */
3215 {
3216 dSP;
3217 int i;
3218
3219 EXTEND (SP, AvFILLp (state) + 1);
3220 for (i = 0; i <= AvFILLp (state); ++i)
3221 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
3222
3223 PUTBACK;
3224 }
3225
3226 return 0;
3227}
3228
3229static void
3230slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
3231{
3232 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
3233 SV *coro_hv = SvRV (coro_current);
3234 struct coro *coro = SvSTATE_hv (coro_hv);
3235
3236 /* put our coroutine id on the state arg */
3237 av_push (state, SvREFCNT_inc_NN (coro_hv));
3238
3239 /* first see whether we have a non-zero priority and set it as AIO prio */
3240 if (coro->prio)
3241 {
3242 dSP;
3243
3244 static SV *prio_cv;
3245 static SV *prio_sv;
3246
3247 if (ecb_expect_false (!prio_cv))
3248 {
3249 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
3250 prio_sv = newSViv (0);
3251 }
3252
3253 PUSHMARK (SP);
3254 sv_setiv (prio_sv, coro->prio);
3255 XPUSHs (prio_sv);
3256
3257 PUTBACK;
3258 call_sv (prio_cv, G_VOID | G_DISCARD);
3259 }
3260
3261 /* now call the original request */
3262 {
3263 dSP;
3264 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
3265 int i;
3266
3267 PUSHMARK (SP);
3268
3269 /* first push all args to the stack */
3270 EXTEND (SP, items + 1);
3271
3272 for (i = 0; i < items; ++i)
3273 PUSHs (arg [i]);
3274
3275 /* now push the callback closure */
3276 PUSHs (sv_2mortal (s_gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
3277
3278 /* now call the AIO function - we assume our request is uncancelable */
3279 PUTBACK;
3280 call_sv ((SV *)req, G_VOID | G_DISCARD);
3281 }
3282
3283 /* now that the request is going, we loop till we have a result */
3284 frame->data = (void *)state;
3285 frame->prepare = prepare_schedule;
3286 frame->check = slf_check_aio_req;
3287}
3288
3289static void
3290coro_aio_req_xs (pTHX_ CV *cv)
3291{
3292 dXSARGS;
3293
3294 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
3295
3296 XSRETURN_EMPTY;
3297}
3298
3299/*****************************************************************************/
3300
3301#if CORO_CLONE
3302# include "clone.c"
3303#endif
3304
3305/*****************************************************************************/
3306
3307static SV *
3308coro_new (pTHX_ HV *stash, SV **argv, int argc, int is_coro)
3309{
3310 SV *coro_sv;
3311 struct coro *coro;
3312 MAGIC *mg;
3313 HV *hv;
3314 SV *cb;
3315 int i;
3316
3317 if (argc > 0)
3318 {
3319 cb = s_get_cv_croak (argv [0]);
3320
3321 if (!is_coro)
3322 {
3323 if (CvISXSUB (cb))
3324 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
3325
3326 if (!CvROOT (cb))
3327 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
3328 }
3329 }
3330
3331 Newz (0, coro, 1, struct coro);
3332 coro->args = newAV ();
3333 coro->flags = CF_NEW;
3334
3335 if (coro_first) coro_first->prev = coro;
3336 coro->next = coro_first;
3337 coro_first = coro;
3338
3339 coro->hv = hv = newHV ();
3340 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
3341 mg->mg_flags |= MGf_DUP;
3342 coro_sv = sv_bless (newRV_noinc ((SV *)hv), stash);
3343
3344 if (argc > 0)
3345 {
3346 av_extend (coro->args, argc + is_coro - 1);
3347
3348 if (is_coro)
3349 {
3350 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
3351 cb = (SV *)cv_coro_run;
3352 }
3353
3354 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
3355
3356 for (i = 1; i < argc; i++)
3357 av_push (coro->args, newSVsv (argv [i]));
3358 }
3359
3360 return coro_sv;
3361}
3362
3363#ifndef __cplusplus
3364ecb_cold XS(boot_Coro__State);
3365#endif
3366
3367#if CORO_JIT
3368
3369static void ecb_noinline ecb_cold
3370pushav_4uv (pTHX_ UV a, UV b, UV c, UV d)
3371{
3372 dSP;
3373 AV *av = newAV ();
3374
3375 av_store (av, 3, newSVuv (d));
3376 av_store (av, 2, newSVuv (c));
3377 av_store (av, 1, newSVuv (b));
3378 av_store (av, 0, newSVuv (a));
3379
3380 XPUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
3381
3382 PUTBACK;
3383}
3384
3385static void ecb_noinline ecb_cold
3386jit_init (pTHX)
3387{
3388 dSP;
3389 SV *load, *save;
3390 char *map_base;
3391 char *load_ptr, *save_ptr;
3392 STRLEN load_len, save_len, map_len;
3393 int count;
3394
3395 eval_pv ("require 'Coro/jit-" CORO_JIT_TYPE ".pl'", 1);
3396
3397 PUSHMARK (SP);
3398#define VARx(name,expr,type) pushav_4uv (aTHX_ (UV)&(expr), sizeof (expr), offsetof (perl_slots, name), sizeof (type));
3399# include "state.h"
3400#undef VARx
3401 count = call_pv ("Coro::State::_jit", G_ARRAY);
3402 SPAGAIN;
3403
3404 save = POPs; save_ptr = SvPVbyte (save, save_len);
3405 load = POPs; load_ptr = SvPVbyte (load, load_len);
3406
3407 map_len = load_len + save_len + 16;
3408
3409 map_base = mmap (0, map_len, PROT_READ | PROT_WRITE | PROT_EXEC, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
3410
3411 assert (("Coro: unable to mmap jit code page, cannot continue.", map_base != (char *)MAP_FAILED));
3412
3413 load_perl_slots = (load_save_perl_slots_type)map_base;
3414 memcpy (map_base, load_ptr, load_len);
3415
3416 map_base += (load_len + 15) & ~15;
3417
3418 save_perl_slots = (load_save_perl_slots_type)map_base;
3419 memcpy (map_base, save_ptr, save_len);
3420
3421 /* we are good citizens and try to make the page read-only, so the evil evil */
3422 /* hackers might have it a bit more difficult */
3423 mprotect (map_base, map_len, PROT_READ | PROT_EXEC);
3424
3425 PUTBACK;
3426 eval_pv ("undef &Coro::State::_jit", 1);
3427}
3428
3429#endif
3430
3431MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
3432
3433PROTOTYPES: DISABLE
3434
3435BOOT:
3436{
3437#ifdef USE_ITHREADS
3438# if CORO_PTHREAD
3439 coro_thx = PERL_GET_CONTEXT;
3440# endif
3441#endif
3442 BOOT_PAGESIZE;
3443
3444 /* perl defines these to check for existance first, but why it doesn't */
3445 /* just create them one at init time is not clear to me, except for */
3446 /* programs trying to delete them, but... */
3447 /* anyway, we declare this as invalid and make sure they are initialised here */
3448 DEFSV;
3449 ERRSV;
3450
3451 cctx_current = cctx_new_empty ();
3452
3453 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
3454 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
3455
3456 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
3457 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
3458 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
3459
3460 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
3461 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
3462 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
3463
3464 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
3465
3466 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
3467 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
3468 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
3469 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
3470
3471 main_mainstack = PL_mainstack;
3472 main_top_env = PL_top_env;
3473
3474 while (main_top_env->je_prev)
3475 main_top_env = main_top_env->je_prev;
3476
3477 {
3478 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
3479
3480 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
3481 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
3482
3483 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
3484 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
3485 }
3486
3487 coroapi.ver = CORO_API_VERSION;
3488 coroapi.rev = CORO_API_REVISION;
3489
3490 coroapi.transfer = api_transfer;
3491
3492 coroapi.sv_state = SvSTATE_;
3493 coroapi.execute_slf = api_execute_slf;
3494 coroapi.prepare_nop = prepare_nop;
3495 coroapi.prepare_schedule = prepare_schedule;
3496 coroapi.prepare_cede = prepare_cede;
3497 coroapi.prepare_cede_notself = prepare_cede_notself;
3498
3499 time_init (aTHX);
3500
3501 assert (("PRIO_NORMAL must be 0", !CORO_PRIO_NORMAL));
3502#if CORO_JIT
3503 PUTBACK;
3504 jit_init (aTHX);
3505 SPAGAIN;
3506#endif
3507}
3508
3509SV *
3510new (SV *klass, ...)
3511 ALIAS:
3512 Coro::new = 1
3513 CODE:
3514 RETVAL = coro_new (aTHX_ ix ? coro_stash : coro_state_stash, &ST (1), items - 1, ix);
3515 OUTPUT:
3516 RETVAL
3517
3518void
3519transfer (...)
3520 PROTOTYPE: $$
3521 CODE:
3522 CORO_EXECUTE_SLF_XS (slf_init_transfer);
3523
3524void
3525_exit (int code)
3526 PROTOTYPE: $
3527 CODE:
3528 _exit (code);
3529
3530SV *
3531clone (Coro::State coro)
3532 CODE:
3533{
3534#if CORO_CLONE
3535 struct coro *ncoro = coro_clone (aTHX_ coro);
3536 MAGIC *mg;
3537 /* TODO: too much duplication */
3538 ncoro->hv = newHV ();
3539 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
3540 mg->mg_flags |= MGf_DUP;
3541 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
3542#else
3543 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
3544#endif
3545}
3546 OUTPUT:
3547 RETVAL
3548
3549int
3550cctx_stacksize (int new_stacksize = 0)
3551 PROTOTYPE: ;$
3552 CODE:
3553 RETVAL = cctx_stacksize;
3554 if (new_stacksize)
3555 {
3556 cctx_stacksize = new_stacksize;
3557 ++cctx_gen;
3558 }
3559 OUTPUT:
3560 RETVAL
3561
3562int
3563cctx_max_idle (int max_idle = 0)
3564 PROTOTYPE: ;$
3565 CODE:
3566 RETVAL = cctx_max_idle;
3567 if (max_idle > 1)
3568 cctx_max_idle = max_idle;
3569 OUTPUT:
3570 RETVAL
3571
3572int
3573cctx_count ()
3574 PROTOTYPE:
3575 CODE:
3576 RETVAL = cctx_count;
3577 OUTPUT:
3578 RETVAL
3579
3580int
3581cctx_idle ()
3582 PROTOTYPE:
3583 CODE:
3584 RETVAL = cctx_idle;
3585 OUTPUT:
3586 RETVAL
3587
3588void
3589list ()
3590 PROTOTYPE:
3591 PPCODE:
3592{
3593 struct coro *coro;
3594 for (coro = coro_first; coro; coro = coro->next)
3595 if (coro->hv)
3596 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
3597}
3598
3599void
3600call (Coro::State coro, SV *coderef)
3601 ALIAS:
3602 eval = 1
3603 CODE:
3604{
3605 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
3606 {
3607 struct coro *current = SvSTATE_current;
3608 struct CoroSLF slf_save;
3609
3610 if (current != coro)
235 { 3611 {
236 /* I never used formats, so how should I know how these are implemented? */ 3612 PUTBACK;
237 /* my bold guess is as a simple, plain sub... */ 3613 save_perl (aTHX_ current);
238 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 3614 load_perl (aTHX_ coro);
3615 /* the coro is most likely in an active SLF call.
3616 * while not strictly required (the code we execute is
3617 * not allowed to call any SLF functions), it's cleaner
3618 * to reinitialise the slf_frame and restore it later.
3619 * This might one day allow us to actually do SLF calls
3620 * from code executed here.
3621 */
3622 slf_save = slf_frame;
3623 slf_frame.prepare = 0;
3624 SPAGAIN;
3625 }
3626
3627 PUSHSTACK;
3628
3629 PUSHMARK (SP);
3630 PUTBACK;
3631
3632 if (ix)
3633 eval_sv (coderef, 0);
3634 else
3635 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3636
3637 POPSTACK;
3638 SPAGAIN;
3639
3640 if (current != coro)
3641 {
3642 PUTBACK;
3643 slf_frame = slf_save;
3644 save_perl (aTHX_ coro);
3645 load_perl (aTHX_ current);
3646 SPAGAIN;
239 } 3647 }
240 } 3648 }
241
242 if (top_si->si_type == PERLSI_MAIN)
243 break;
244
245 top_si = top_si->si_prev;
246 ccstk = top_si->si_cxstack;
247 cxix = top_si->si_cxix;
248 }
249
250 PUTBACK;
251 }
252
253 c->dowarn = PL_dowarn;
254 c->defav = GvAV (PL_defgv);
255 c->curstackinfo = PL_curstackinfo;
256 c->curstack = PL_curstack;
257 c->mainstack = PL_mainstack;
258 c->stack_sp = PL_stack_sp;
259 c->op = PL_op;
260 c->curpad = PL_curpad;
261 c->stack_base = PL_stack_base;
262 c->stack_max = PL_stack_max;
263 c->tmps_stack = PL_tmps_stack;
264 c->tmps_floor = PL_tmps_floor;
265 c->tmps_ix = PL_tmps_ix;
266 c->tmps_max = PL_tmps_max;
267 c->markstack = PL_markstack;
268 c->markstack_ptr = PL_markstack_ptr;
269 c->markstack_max = PL_markstack_max;
270 c->scopestack = PL_scopestack;
271 c->scopestack_ix = PL_scopestack_ix;
272 c->scopestack_max = PL_scopestack_max;
273 c->savestack = PL_savestack;
274 c->savestack_ix = PL_savestack_ix;
275 c->savestack_max = PL_savestack_max;
276 c->retstack = PL_retstack;
277 c->retstack_ix = PL_retstack_ix;
278 c->retstack_max = PL_retstack_max;
279 c->curcop = PL_curcop;
280} 3649}
281 3650
282#define LOAD(state) do { load_state(aTHX_ state); SPAGAIN; } while (0) 3651SV *
283#define SAVE(state) do { PUTBACK; save_state(aTHX_ state); } while (0) 3652is_ready (Coro::State coro)
284
285static void
286load_state(pTHX_ Coro__State c)
287{
288 PL_dowarn = c->dowarn;
289 GvAV (PL_defgv) = c->defav;
290 PL_curstackinfo = c->curstackinfo;
291 PL_curstack = c->curstack;
292 PL_mainstack = c->mainstack;
293 PL_stack_sp = c->stack_sp;
294 PL_op = c->op;
295 PL_curpad = c->curpad;
296 PL_stack_base = c->stack_base;
297 PL_stack_max = c->stack_max;
298 PL_tmps_stack = c->tmps_stack;
299 PL_tmps_floor = c->tmps_floor;
300 PL_tmps_ix = c->tmps_ix;
301 PL_tmps_max = c->tmps_max;
302 PL_markstack = c->markstack;
303 PL_markstack_ptr = c->markstack_ptr;
304 PL_markstack_max = c->markstack_max;
305 PL_scopestack = c->scopestack;
306 PL_scopestack_ix = c->scopestack_ix;
307 PL_scopestack_max = c->scopestack_max;
308 PL_savestack = c->savestack;
309 PL_savestack_ix = c->savestack_ix;
310 PL_savestack_max = c->savestack_max;
311 PL_retstack = c->retstack;
312 PL_retstack_ix = c->retstack_ix;
313 PL_retstack_max = c->retstack_max;
314 PL_curcop = c->curcop;
315
316 {
317 dSP;
318 CV *cv;
319
320 /* now do the ugly restore mess */
321 while ((cv = (CV *)POPs))
322 {
323 AV *padlist = (AV *)POPs;
324
325 put_padlist (cv);
326 CvPADLIST(cv) = padlist;
327 CvDEPTH(cv) = (I32)POPs;
328
329#ifdef USE_THREADS
330 CvOWNER(cv) = (struct perl_thread *)POPs;
331 error does not work either
332#endif
333 }
334
335 PUTBACK;
336 }
337}
338
339/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
340STATIC void
341destroy_stacks(pTHX)
342{
343 /* die does this while calling POPSTACK, but I just don't see why. */
344 /* OTOH, die does not have a memleak, but we do... */
345 dounwind(-1);
346
347 /* is this ugly, I ask? */
348 while (PL_scopestack_ix)
349 LEAVE;
350
351 while (PL_curstackinfo->si_next)
352 PL_curstackinfo = PL_curstackinfo->si_next;
353
354 while (PL_curstackinfo)
355 {
356 PERL_SI *p = PL_curstackinfo->si_prev;
357
358 SvREFCNT_dec(PL_curstackinfo->si_stack);
359 Safefree(PL_curstackinfo->si_cxstack);
360 Safefree(PL_curstackinfo);
361 PL_curstackinfo = p;
362 }
363
364 if (PL_scopestack_ix != 0)
365 Perl_warner(aTHX_ WARN_INTERNAL,
366 "Unbalanced scopes: %ld more ENTERs than LEAVEs\n",
367 (long)PL_scopestack_ix);
368 if (PL_savestack_ix != 0)
369 Perl_warner(aTHX_ WARN_INTERNAL,
370 "Unbalanced saves: %ld more saves than restores\n",
371 (long)PL_savestack_ix);
372 if (PL_tmps_floor != -1)
373 Perl_warner(aTHX_ WARN_INTERNAL,"Unbalanced tmps: %ld more allocs than frees\n",
374 (long)PL_tmps_floor + 1);
375 /*
376 */
377 Safefree(PL_tmps_stack);
378 Safefree(PL_markstack);
379 Safefree(PL_scopestack);
380 Safefree(PL_savestack);
381 Safefree(PL_retstack);
382}
383
384#define SUB_INIT "Coro::State::_newcoro"
385
386MODULE = Coro::State PACKAGE = Coro::State
387
388PROTOTYPES: ENABLE
389
390BOOT:
391 if (!padlist_cache)
392 padlist_cache = newHV ();
393
394Coro::State
395_newprocess(args)
396 SV * args
397 PROTOTYPE: $ 3653 PROTOTYPE: $
3654 ALIAS:
3655 is_ready = CF_READY
3656 is_running = CF_RUNNING
3657 is_new = CF_NEW
3658 is_destroyed = CF_ZOMBIE
3659 is_zombie = CF_ZOMBIE
3660 is_suspended = CF_SUSPENDED
3661 CODE:
3662 RETVAL = boolSV (coro->flags & ix);
3663 OUTPUT:
3664 RETVAL
3665
3666void
3667throw (Coro::State self, SV *exception = &PL_sv_undef)
3668 PROTOTYPE: $;$
398 CODE: 3669 CODE:
399 Coro__State coro; 3670{
3671 struct coro *current = SvSTATE_current;
3672 SV **exceptionp = self == current ? &CORO_THROW : &self->except;
3673 SvREFCNT_dec (*exceptionp);
3674 SvGETMAGIC (exception);
3675 *exceptionp = SvOK (exception) ? newSVsv (exception) : 0;
3676}
400 3677
401 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV) 3678void
402 croak ("Coro::State::newprocess expects an arrayref"); 3679api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3680 PROTOTYPE: $;$
3681 C_ARGS: aTHX_ coro, flags
3682
3683SV *
3684has_cctx (Coro::State coro)
3685 PROTOTYPE: $
3686 CODE:
3687 /* maybe manage the running flag differently */
3688 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3689 OUTPUT:
3690 RETVAL
3691
3692int
3693is_traced (Coro::State coro)
3694 PROTOTYPE: $
3695 CODE:
3696 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3697 OUTPUT:
3698 RETVAL
3699
3700UV
3701rss (Coro::State coro)
3702 PROTOTYPE: $
3703 ALIAS:
3704 usecount = 1
3705 CODE:
3706 switch (ix)
3707 {
3708 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3709 case 1: RETVAL = coro->usecount; break;
403 3710 }
404 New (0, coro, 1, struct coro); 3711 OUTPUT:
405
406 coro->mainstack = 0; /* actual work is done inside transfer */
407 coro->args = (AV *)SvREFCNT_inc (SvRV (args));
408
409 RETVAL = coro; 3712 RETVAL
3713
3714void
3715force_cctx ()
3716 PROTOTYPE:
3717 CODE:
3718 cctx_current->idle_sp = 0;
3719
3720void
3721swap_defsv (Coro::State self)
3722 PROTOTYPE: $
3723 ALIAS:
3724 swap_defav = 1
3725 CODE:
3726 if (!self->slot)
3727 croak ("cannot swap state with coroutine that has no saved state,");
3728 else
3729 {
3730 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3731 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
3732
3733 SV *tmp = *src; *src = *dst; *dst = tmp;
3734 }
3735
3736void
3737cancel (Coro::State self)
3738 CODE:
3739 coro_state_destroy (aTHX_ self);
3740
3741SV *
3742enable_times (int enabled = enable_times)
3743 CODE:
3744{
3745 RETVAL = boolSV (enable_times);
3746
3747 if (enabled != enable_times)
3748 {
3749 enable_times = enabled;
3750
3751 coro_times_update ();
3752 (enabled ? coro_times_sub : coro_times_add)(SvSTATE (coro_current));
3753 }
3754}
410 OUTPUT: 3755 OUTPUT:
411 RETVAL 3756 RETVAL
412 3757
413void 3758void
414transfer(prev,next) 3759times (Coro::State self)
415 Coro::State_or_hashref prev 3760 PPCODE:
416 Coro::State_or_hashref next 3761{
3762 struct coro *current = SvSTATE (coro_current);
3763
3764 if (ecb_expect_false (current == self))
3765 {
3766 coro_times_update ();
3767 coro_times_add (SvSTATE (coro_current));
3768 }
3769
3770 EXTEND (SP, 2);
3771 PUSHs (sv_2mortal (newSVnv (self->t_real [0] + self->t_real [1] * 1e-9)));
3772 PUSHs (sv_2mortal (newSVnv (self->t_cpu [0] + self->t_cpu [1] * 1e-9)));
3773
3774 if (ecb_expect_false (current == self))
3775 coro_times_sub (SvSTATE (coro_current));
3776}
3777
3778void
3779swap_sv (Coro::State coro, SV *sv, SV *swapsv)
3780 CODE:
3781{
3782 struct coro *current = SvSTATE_current;
3783
3784 if (current == coro)
3785 SWAP_SVS (current);
3786
3787 if (!coro->swap_sv)
3788 coro->swap_sv = newAV ();
3789
3790 av_push (coro->swap_sv, SvREFCNT_inc_NN (SvRV (sv )));
3791 av_push (coro->swap_sv, SvREFCNT_inc_NN (SvRV (swapsv)));
3792
3793 if (current == coro)
3794 SWAP_SVS (current);
3795}
3796
3797
3798MODULE = Coro::State PACKAGE = Coro
3799
3800BOOT:
3801{
3802 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3803 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3804 cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD);
3805 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3806 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3807 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3808 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3809 sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE);
3810
3811 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3812 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3813 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3814 CvNODEBUG_on (get_cv ("Coro::_pool_handler", 0)); /* work around a debugger bug */
3815
3816 coro_stash = gv_stashpv ("Coro", TRUE);
3817
3818 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (CORO_PRIO_MAX));
3819 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (CORO_PRIO_HIGH));
3820 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (CORO_PRIO_NORMAL));
3821 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (CORO_PRIO_LOW));
3822 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (CORO_PRIO_IDLE));
3823 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (CORO_PRIO_MIN));
3824
3825 {
3826 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3827
3828 coroapi.schedule = api_schedule;
3829 coroapi.schedule_to = api_schedule_to;
3830 coroapi.cede = api_cede;
3831 coroapi.cede_notself = api_cede_notself;
3832 coroapi.ready = api_ready;
3833 coroapi.is_ready = api_is_ready;
3834 coroapi.nready = coro_nready;
3835 coroapi.current = coro_current;
3836
3837 /*GCoroAPI = &coroapi;*/
3838 sv_setiv (sv, (IV)&coroapi);
3839 SvREADONLY_on (sv);
3840 }
3841}
3842
3843SV *
3844async (...)
3845 PROTOTYPE: &@
417 CODE: 3846 CODE:
3847 RETVAL = coro_new (aTHX_ coro_stash, &ST (0), items, 1);
3848 api_ready (aTHX_ RETVAL);
3849 OUTPUT:
3850 RETVAL
418 3851
419 if (prev != next) 3852void
3853_destroy (Coro::State coro)
3854 CODE:
3855 /* used by the manager thread */
3856 coro_state_destroy (aTHX_ coro);
3857
3858void
3859on_destroy (Coro::State coro, SV *cb)
3860 CODE:
3861 coro_push_on_destroy (aTHX_ coro, newSVsv (cb));
3862
3863void
3864join (...)
3865 CODE:
3866 CORO_EXECUTE_SLF_XS (slf_init_join);
3867
3868void
3869terminate (...)
3870 CODE:
3871 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3872
3873void
3874cancel (...)
3875 CODE:
3876 CORO_EXECUTE_SLF_XS (slf_init_cancel);
3877
3878int
3879safe_cancel (Coro::State self, ...)
3880 C_ARGS: aTHX_ self, &ST (1), items - 1
3881
3882void
3883schedule (...)
3884 CODE:
3885 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3886
3887void
3888schedule_to (...)
3889 CODE:
3890 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3891
3892void
3893cede_to (...)
3894 CODE:
3895 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3896
3897void
3898cede (...)
3899 CODE:
3900 CORO_EXECUTE_SLF_XS (slf_init_cede);
3901
3902void
3903cede_notself (...)
3904 CODE:
3905 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3906
3907void
3908_set_current (SV *current)
3909 PROTOTYPE: $
3910 CODE:
3911 SvREFCNT_dec (SvRV (coro_current));
3912 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3913
3914void
3915_set_readyhook (SV *hook)
3916 PROTOTYPE: $
3917 CODE:
3918 SvREFCNT_dec (coro_readyhook);
3919 SvGETMAGIC (hook);
3920 if (SvOK (hook))
3921 {
3922 coro_readyhook = newSVsv (hook);
3923 CORO_READYHOOK = invoke_sv_ready_hook_helper;
3924 }
3925 else
420 { 3926 {
421 /* 3927 coro_readyhook = 0;
422 * this could be done in newprocess which would lead to 3928 CORO_READYHOOK = 0;
423 * extremely elegant and fast (just SAVE/LOAD)
424 * code here, but lazy allocation of stacks has also
425 * some virtues and the overhead of the if() is nil.
426 */
427 if (next->mainstack)
428 {
429 SAVE (prev);
430 LOAD (next);
431 /* mark this state as in-use */
432 next->mainstack = 0;
433 next->tmps_ix = -2;
434 }
435 else if (next->tmps_ix == -2)
436 {
437 croak ("tried to transfer to running coroutine");
438 }
439 else
440 {
441 SAVE (prev);
442
443 /*
444 * emulate part of the perl startup here.
445 */
446 UNOP myop;
447
448 init_stacks (); /* from perl.c */
449 PL_op = (OP *)&myop;
450 /*PL_curcop = 0;*/
451 GvAV (PL_defgv) = (AV *)SvREFCNT_inc ((SV *)next->args);
452
453 SPAGAIN;
454 Zero(&myop, 1, UNOP);
455 myop.op_next = Nullop;
456 myop.op_flags = OPf_WANT_VOID;
457
458 PUSHMARK(SP);
459 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
460 PUTBACK;
461 /*
462 * the next line is slightly wrong, as PL_op->op_next
463 * is actually being executed so we skip the first op.
464 * that doesn't matter, though, since it is only
465 * pp_nextstate and we never return...
466 */
467 PL_op = Perl_pp_entersub(aTHX);
468 SPAGAIN;
469
470 ENTER;
471 }
472 } 3929 }
473 3930
3931int
3932prio (Coro::State coro, int newprio = 0)
3933 PROTOTYPE: $;$
3934 ALIAS:
3935 nice = 1
3936 CODE:
3937{
3938 RETVAL = coro->prio;
3939
3940 if (items > 1)
3941 {
3942 if (ix)
3943 newprio = coro->prio - newprio;
3944
3945 if (newprio < CORO_PRIO_MIN) newprio = CORO_PRIO_MIN;
3946 if (newprio > CORO_PRIO_MAX) newprio = CORO_PRIO_MAX;
3947
3948 coro->prio = newprio;
3949 }
3950}
3951 OUTPUT:
3952 RETVAL
3953
3954SV *
3955ready (SV *self)
3956 PROTOTYPE: $
3957 CODE:
3958 RETVAL = boolSV (api_ready (aTHX_ self));
3959 OUTPUT:
3960 RETVAL
3961
3962int
3963nready (...)
3964 PROTOTYPE:
3965 CODE:
3966 RETVAL = coro_nready;
3967 OUTPUT:
3968 RETVAL
3969
474void 3970void
475DESTROY(coro) 3971suspend (Coro::State self)
476 Coro::State coro 3972 PROTOTYPE: $
3973 CODE:
3974 self->flags |= CF_SUSPENDED;
3975
3976void
3977resume (Coro::State self)
3978 PROTOTYPE: $
3979 CODE:
3980 self->flags &= ~CF_SUSPENDED;
3981
3982void
3983_pool_handler (...)
3984 CODE:
3985 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3986
3987void
3988async_pool (SV *cv, ...)
3989 PROTOTYPE: &@
3990 PPCODE:
3991{
3992 HV *hv = (HV *)av_pop (av_async_pool);
3993 AV *av = newAV ();
3994 SV *cb = ST (0);
3995 int i;
3996
3997 av_extend (av, items - 2);
3998 for (i = 1; i < items; ++i)
3999 av_push (av, SvREFCNT_inc_NN (ST (i)));
4000
4001 if ((SV *)hv == &PL_sv_undef)
4002 {
4003 SV *sv = coro_new (aTHX_ coro_stash, (SV **)&cv_pool_handler, 1, 1);
4004 hv = (HV *)SvREFCNT_inc_NN (SvRV (sv));
4005 SvREFCNT_dec (sv);
4006 }
4007
4008 {
4009 struct coro *coro = SvSTATE_hv (hv);
4010
4011 assert (!coro->invoke_cb);
4012 assert (!coro->invoke_av);
4013 coro->invoke_cb = SvREFCNT_inc (cb);
4014 coro->invoke_av = av;
4015 }
4016
4017 api_ready (aTHX_ (SV *)hv);
4018
4019 if (GIMME_V != G_VOID)
4020 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
4021 else
4022 SvREFCNT_dec (hv);
4023}
4024
4025SV *
4026rouse_cb ()
4027 PROTOTYPE:
4028 CODE:
4029 RETVAL = coro_new_rouse_cb (aTHX);
4030 OUTPUT:
4031 RETVAL
4032
4033void
4034rouse_wait (...)
4035 PROTOTYPE: ;$
4036 PPCODE:
4037 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
4038
4039void
4040on_enter (SV *block)
4041 ALIAS:
4042 on_leave = 1
4043 PROTOTYPE: &
4044 CODE:
4045{
4046 struct coro *coro = SvSTATE_current;
4047 AV **avp = ix ? &coro->on_leave : &coro->on_enter;
4048
4049 block = s_get_cv_croak (block);
4050
4051 if (!*avp)
4052 *avp = newAV ();
4053
4054 av_push (*avp, SvREFCNT_inc (block));
4055
4056 if (!ix)
4057 on_enterleave_call (aTHX_ block);
4058
4059 LEAVE; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
4060 SAVEDESTRUCTOR_X (ix ? coro_pop_on_leave : coro_pop_on_enter, (void *)coro);
4061 ENTER; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
4062}
4063
4064
4065MODULE = Coro::State PACKAGE = PerlIO::cede
4066
4067BOOT:
4068 PerlIO_define_layer (aTHX_ &PerlIO_cede);
4069
4070
4071MODULE = Coro::State PACKAGE = Coro::Semaphore
4072
4073SV *
4074new (SV *klass, SV *count = 0)
4075 CODE:
4076{
4077 int semcnt = 1;
4078
4079 if (count)
4080 {
4081 SvGETMAGIC (count);
4082
4083 if (SvOK (count))
4084 semcnt = SvIV (count);
4085 }
4086
4087 RETVAL = sv_bless (
4088 coro_waitarray_new (aTHX_ semcnt),
4089 GvSTASH (CvGV (cv))
4090 );
4091}
4092 OUTPUT:
4093 RETVAL
4094
4095# helper for Coro::Channel and others
4096SV *
4097_alloc (int count)
4098 CODE:
4099 RETVAL = coro_waitarray_new (aTHX_ count);
4100 OUTPUT:
4101 RETVAL
4102
4103SV *
4104count (SV *self)
4105 CODE:
4106 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
4107 OUTPUT:
4108 RETVAL
4109
4110void
4111up (SV *self, int adjust = 1)
4112 ALIAS:
4113 adjust = 1
477 CODE: 4114 CODE:
4115 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
478 4116
479 if (coro->mainstack) 4117void
4118down (...)
4119 CODE:
4120 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
4121
4122void
4123wait (...)
4124 CODE:
4125 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
4126
4127void
4128try (SV *self)
4129 PPCODE:
4130{
4131 AV *av = (AV *)SvRV (self);
4132 SV *count_sv = AvARRAY (av)[0];
4133 IV count = SvIVX (count_sv);
4134
4135 if (count > 0)
480 { 4136 {
481 struct coro temp; 4137 --count;
482 4138 SvIVX (count_sv) = count;
483 SAVE(aTHX_ (&temp)); 4139 XSRETURN_YES;
484 LOAD(aTHX_ coro);
485
486 destroy_stacks ();
487 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
488
489 LOAD((&temp));
490 } 4140 }
4141 else
4142 XSRETURN_NO;
4143}
491 4144
492 SvREFCNT_dec (coro->args); 4145void
493 Safefree (coro); 4146waiters (SV *self)
4147 PPCODE:
4148{
4149 AV *av = (AV *)SvRV (self);
4150 int wcount = AvFILLp (av) + 1 - 1;
494 4151
4152 if (GIMME_V == G_SCALAR)
4153 XPUSHs (sv_2mortal (newSViv (wcount)));
4154 else
4155 {
4156 int i;
4157 EXTEND (SP, wcount);
4158 for (i = 1; i <= wcount; ++i)
4159 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
4160 }
4161}
495 4162
4163MODULE = Coro::State PACKAGE = Coro::SemaphoreSet
4164
4165void
4166_may_delete (SV *sem, int count, unsigned int extra_refs)
4167 PPCODE:
4168{
4169 AV *av = (AV *)SvRV (sem);
4170
4171 if (SvREFCNT ((SV *)av) == 1 + extra_refs
4172 && AvFILLp (av) == 0 /* no waiters, just count */
4173 && SvIV (AvARRAY (av)[0]) == count)
4174 XSRETURN_YES;
4175
4176 XSRETURN_NO;
4177}
4178
4179MODULE = Coro::State PACKAGE = Coro::Signal
4180
4181SV *
4182new (SV *klass)
4183 CODE:
4184 RETVAL = sv_bless (
4185 coro_waitarray_new (aTHX_ 0),
4186 GvSTASH (CvGV (cv))
4187 );
4188 OUTPUT:
4189 RETVAL
4190
4191void
4192wait (...)
4193 CODE:
4194 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
4195
4196void
4197broadcast (SV *self)
4198 CODE:
4199{
4200 AV *av = (AV *)SvRV (self);
4201 coro_signal_wake (aTHX_ av, AvFILLp (av));
4202}
4203
4204void
4205send (SV *self)
4206 CODE:
4207{
4208 AV *av = (AV *)SvRV (self);
4209
4210 if (AvFILLp (av))
4211 coro_signal_wake (aTHX_ av, 1);
4212 else
4213 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
4214}
4215
4216IV
4217awaited (SV *self)
4218 CODE:
4219 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
4220 OUTPUT:
4221 RETVAL
4222
4223
4224MODULE = Coro::State PACKAGE = Coro::AnyEvent
4225
4226BOOT:
4227 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
4228
4229void
4230_schedule (...)
4231 CODE:
4232{
4233 static int incede;
4234
4235 api_cede_notself (aTHX);
4236
4237 ++incede;
4238 while (coro_nready >= incede && api_cede (aTHX))
4239 ;
4240
4241 sv_setsv (sv_activity, &PL_sv_undef);
4242 if (coro_nready >= incede)
4243 {
4244 PUSHMARK (SP);
4245 PUTBACK;
4246 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
4247 }
4248
4249 --incede;
4250}
4251
4252
4253MODULE = Coro::State PACKAGE = Coro::AIO
4254
4255void
4256_register (char *target, char *proto, SV *req)
4257 CODE:
4258{
4259 SV *req_cv = s_get_cv_croak (req);
4260 /* newXSproto doesn't return the CV on 5.8 */
4261 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
4262 sv_setpv ((SV *)slf_cv, proto);
4263 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
4264}
4265
4266MODULE = Coro::State PACKAGE = Coro::Select
4267
4268void
4269patch_pp_sselect ()
4270 CODE:
4271 if (!coro_old_pp_sselect)
4272 {
4273 coro_select_select = (SV *)get_cv ("Coro::Select::select", 0);
4274 coro_old_pp_sselect = PL_ppaddr [OP_SSELECT];
4275 PL_ppaddr [OP_SSELECT] = coro_pp_sselect;
4276 }
4277
4278void
4279unpatch_pp_sselect ()
4280 CODE:
4281 if (coro_old_pp_sselect)
4282 {
4283 PL_ppaddr [OP_SSELECT] = coro_old_pp_sselect;
4284 coro_old_pp_sselect = 0;
4285 }
4286

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines