ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.6 by root, Tue Jul 17 15:42:28 2001 UTC vs.
Revision 1.413 by root, Fri Jun 17 18:55:08 2011 UTC

1/* this works around a bug in mingw32 providing a non-working setjmp */
2#define USE_NO_MINGW_SETJMP_TWO_ARGS
3
4#define NDEBUG 1
5
6#include "libcoro/coro.c"
7
8#define PERL_NO_GET_CONTEXT
9#define PERL_EXT
10
1#include "EXTERN.h" 11#include "EXTERN.h"
2#include "perl.h" 12#include "perl.h"
3#include "XSUB.h" 13#include "XSUB.h"
14#include "perliol.h"
4 15
5#if 0 16#include "schmorp.h"
6# define CHK(x) (void *)0 17#include "ecb.h"
18
19#include <stddef.h>
20#include <stdio.h>
21#include <errno.h>
22#include <assert.h>
23
24#ifndef SVs_PADSTALE
25# define SVs_PADSTALE 0
26#endif
27
28#if defined(_WIN32)
29# undef HAS_GETTIMEOFDAY
30# undef setjmp
31# undef longjmp
32# undef _exit
33# define setjmp _setjmp /* deep magic */
7#else 34#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 35# include <inttypes.h> /* most portable stdint.h */
9#endif 36#endif
10 37
38#if HAVE_MMAP
39# include <unistd.h>
40# include <sys/mman.h>
41# ifndef MAP_ANONYMOUS
42# ifdef MAP_ANON
43# define MAP_ANONYMOUS MAP_ANON
44# else
45# undef HAVE_MMAP
46# endif
47# endif
48# include <limits.h>
49# ifndef PAGESIZE
50# define PAGESIZE pagesize
51# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
52static long pagesize;
53# else
54# define BOOT_PAGESIZE (void)0
55# endif
56#else
57# define PAGESIZE 0
58# define BOOT_PAGESIZE (void)0
59#endif
60
61#if CORO_USE_VALGRIND
62# include <valgrind/valgrind.h>
63#endif
64
65/* the maximum number of idle cctx that will be pooled */
66static int cctx_max_idle = 4;
67
68#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
69# undef CORO_STACKGUARD
70#endif
71
72#ifndef CORO_STACKGUARD
73# define CORO_STACKGUARD 0
74#endif
75
76/* prefer perl internal functions over our own? */
77#ifndef CORO_PREFER_PERL_FUNCTIONS
78# define CORO_PREFER_PERL_FUNCTIONS 0
79#endif
80
81/* The next macros try to return the current stack pointer, in an as
82 * portable way as possible. */
83#if __GNUC__ >= 4
84# define dSTACKLEVEL int stacklevel_dummy
85# define STACKLEVEL __builtin_frame_address (0)
86#else
87# define dSTACKLEVEL volatile void *stacklevel
88# define STACKLEVEL ((void *)&stacklevel)
89#endif
90
91#define IN_DESTRUCT PL_dirty
92
93#include "CoroAPI.h"
94#define GCoroAPI (&coroapi) /* very sneaky */
95
96#ifdef USE_ITHREADS
97# if CORO_PTHREAD
98static void *coro_thx;
99# endif
100#endif
101
102/* used in state.h */
103#define VAR(name,type) VARx(name, PL_ ## name, type)
104
105#ifdef __linux
106# include <time.h> /* for timespec */
107# include <syscall.h> /* for SYS_* */
108# ifdef SYS_clock_gettime
109# define coro_clock_gettime(id, ts) syscall (SYS_clock_gettime, (id), (ts))
110# define CORO_CLOCK_MONOTONIC 1
111# define CORO_CLOCK_THREAD_CPUTIME_ID 3
112# endif
113#endif
114
115static double (*nvtime)(); /* so why doesn't it take void? */
116static void (*u2time)(pTHX_ UV ret[2]);
117
118/* we hijack an hopefully unused CV flag for our purposes */
119#define CVf_SLF 0x4000
120static OP *pp_slf (pTHX);
121static void slf_destroy (pTHX_ struct coro *coro);
122
123static U32 cctx_gen;
124static size_t cctx_stacksize = CORO_STACKSIZE;
125static struct CoroAPI coroapi;
126static AV *main_mainstack; /* used to differentiate between $main and others */
127static JMPENV *main_top_env;
128static HV *coro_state_stash, *coro_stash;
129static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
130
131static AV *av_destroy; /* destruction queue */
132static SV *sv_manager; /* the manager coro */
133static SV *sv_idle; /* $Coro::idle */
134
135static GV *irsgv; /* $/ */
136static GV *stdoutgv; /* *STDOUT */
137static SV *rv_diehook;
138static SV *rv_warnhook;
139static HV *hv_sig; /* %SIG */
140
141/* async_pool helper stuff */
142static SV *sv_pool_rss;
143static SV *sv_pool_size;
144static SV *sv_async_pool_idle; /* description string */
145static AV *av_async_pool; /* idle pool */
146static SV *sv_Coro; /* class string */
147static CV *cv_pool_handler;
148
149/* Coro::AnyEvent */
150static SV *sv_activity;
151
152/* enable processtime/realtime profiling */
153static char enable_times;
154typedef U32 coro_ts[2];
155static coro_ts time_real, time_cpu;
156static char times_valid;
157
158static struct coro_cctx *cctx_first;
159static int cctx_count, cctx_idle;
160
161enum
162{
163 CC_MAPPED = 0x01,
164 CC_NOREUSE = 0x02, /* throw this away after tracing */
165 CC_TRACE = 0x04,
166 CC_TRACE_SUB = 0x08, /* trace sub calls */
167 CC_TRACE_LINE = 0x10, /* trace each statement */
168 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
169};
170
171/* this is a structure representing a c-level coroutine */
172typedef struct coro_cctx
173{
174 struct coro_cctx *next;
175
176 /* the stack */
177 void *sptr;
178 size_t ssize;
179
180 /* cpu state */
181 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
182 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
183 JMPENV *top_env;
184 coro_context cctx;
185
186 U32 gen;
187#if CORO_USE_VALGRIND
188 int valgrind_id;
189#endif
190 unsigned char flags;
191} coro_cctx;
192
193static coro_cctx *cctx_current; /* the currently running cctx */
194
195/*****************************************************************************/
196
197static MGVTBL coro_state_vtbl;
198
199enum
200{
201 CF_RUNNING = 0x0001, /* coroutine is running */
202 CF_READY = 0x0002, /* coroutine is ready */
203 CF_NEW = 0x0004, /* has never been switched to */
204 CF_ZOMBIE = 0x0008, /* coroutine data has been freed */
205 CF_SUSPENDED = 0x0010, /* coroutine can't be scheduled */
206 CF_NOCANCEL = 0x0020, /* cannot cancel, set slf_frame.data to 1 (hackish) */
207};
208
209/* the structure where most of the perl state is stored, overlaid on the cxstack */
210typedef struct
211{
212#define VARx(name,expr,type) type name;
213# include "state.h"
214#undef VARx
215} perl_slots;
216
217/* how many context stack entries do we need for perl_slots */
218#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
219
220/* this is a structure representing a perl-level coroutine */
11struct coro { 221struct coro
12 U8 dowarn; 222{
13 AV *defav; 223 /* the C coroutine allocated to this perl coroutine, if any */
14 224 coro_cctx *cctx;
15 PERL_SI *curstackinfo; 225
16 AV *curstack; 226 /* ready queue */
227 struct coro *next_ready;
228
229 /* state data */
230 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 231 AV *mainstack;
18 SV **stack_sp; 232 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 233
41 AV *args; 234 CV *startcv; /* the CV to execute */
235 AV *args; /* data associated with this coroutine (initial args) */
236 int flags; /* CF_ flags */
237 HV *hv; /* the perl hash associated with this coro, if any */
238
239 /* statistics */
240 int usecount; /* number of transfers to this coro */
241
242 /* coro process data */
243 int prio;
244 SV *except; /* exception to be thrown */
245 SV *rouse_cb; /* last rouse callback */
246 AV *on_destroy; /* callbacks or coros to notify on destroy */
247 AV *status; /* the exit status list */
248
249 /* async_pool */
250 SV *saved_deffh;
251 SV *invoke_cb;
252 AV *invoke_av;
253
254 /* on_enter/on_leave */
255 AV *on_enter;
256 AV *on_leave;
257
258 /* swap_sv */
259 AV *swap_sv;
260
261 /* times */
262 coro_ts t_cpu, t_real;
263
264 /* linked list */
265 struct coro *next, *prev;
42}; 266};
43 267
44typedef struct coro *Coro__State; 268typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 269typedef struct coro *Coro__State_or_hashref;
46 270
47static HV *padlist_cache; 271/* the following variables are effectively part of the perl context */
272/* and get copied between struct coro and these variables */
273/* the main reason we don't support windows process emulation */
274static struct CoroSLF slf_frame; /* the current slf frame */
48 275
49/* mostly copied from op.c:cv_clone2 */ 276/** Coro ********************************************************************/
50STATIC AV * 277
51clone_padlist (AV *protopadlist) 278#define CORO_PRIO_MAX 3
279#define CORO_PRIO_HIGH 1
280#define CORO_PRIO_NORMAL 0
281#define CORO_PRIO_LOW -1
282#define CORO_PRIO_IDLE -3
283#define CORO_PRIO_MIN -4
284
285/* for Coro.pm */
286static SV *coro_current;
287static SV *coro_readyhook;
288static struct coro *coro_ready [CORO_PRIO_MAX - CORO_PRIO_MIN + 1][2]; /* head|tail */
289static CV *cv_coro_run;
290static struct coro *coro_first;
291#define coro_nready coroapi.nready
292
293/** JIT *********************************************************************/
294
295#if CORO_JIT
296 /* APPLE doesn't have HAVE_MMAP though */
297 #define CORO_JIT_UNIXY (__linux || __FreeBSD__ || __OpenBSD__ || __NetBSD__ || __solaris || __APPLE__)
298 #ifndef CORO_JIT_TYPE
299 #if __x86_64 && CORO_JIT_UNIXY
300 #define CORO_JIT_TYPE "amd64-unix"
301 #elif __i386 && CORO_JIT_UNIXY
302 #define CORO_JIT_TYPE "x86-unix"
303 #endif
304 #endif
305#endif
306
307#if !defined(CORO_JIT_TYPE) || !HAVE_MMAP
308 #undef CORO_JIT
309#endif
310
311#if CORO_JIT
312 typedef void (*load_save_perl_slots_type)(perl_slots *);
313 static load_save_perl_slots_type load_perl_slots, save_perl_slots;
314#endif
315
316/** Coro::Select ************************************************************/
317
318static OP *(*coro_old_pp_sselect) (pTHX);
319static SV *coro_select_select;
320
321/* horrible hack, but if it works... */
322static OP *
323coro_pp_sselect (pTHX)
52{ 324{
53 AV *av; 325 dSP;
54 I32 ix; 326 PUSHMARK (SP - 4); /* fake argument list */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 327 XPUSHs (coro_select_select);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 328 PUTBACK;
57 SV **pname = AvARRAY (protopad_name); 329
58 SV **ppad = AvARRAY (protopad); 330 /* entersub is an UNOP, select a LISTOP... keep your fingers crossed */
59 I32 fname = AvFILLp (protopad_name); 331 PL_op->op_flags |= OPf_STACKED;
60 I32 fpad = AvFILLp (protopad); 332 PL_op->op_private = 0;
333 return PL_ppaddr [OP_ENTERSUB](aTHX);
334}
335
336/** time stuff **************************************************************/
337
338#ifdef HAS_GETTIMEOFDAY
339
340ecb_inline void
341coro_u2time (pTHX_ UV ret[2])
342{
343 struct timeval tv;
344 gettimeofday (&tv, 0);
345
346 ret [0] = tv.tv_sec;
347 ret [1] = tv.tv_usec;
348}
349
350ecb_inline double
351coro_nvtime (void)
352{
353 struct timeval tv;
354 gettimeofday (&tv, 0);
355
356 return tv.tv_sec + tv.tv_usec * 1e-6;
357}
358
359ecb_inline void
360time_init (pTHX)
361{
362 nvtime = coro_nvtime;
363 u2time = coro_u2time;
364}
365
366#else
367
368ecb_inline void
369time_init (pTHX)
370{
371 SV **svp;
372
373 require_pv ("Time/HiRes.pm");
374
375 svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
376
377 if (!svp) croak ("Time::HiRes is required, but missing. Caught");
378 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer. Caught");
379
380 nvtime = INT2PTR (double (*)(), SvIV (*svp));
381
382 svp = hv_fetch (PL_modglobal, "Time::U2time", 12, 0);
383 u2time = INT2PTR (void (*)(pTHX_ UV ret[2]), SvIV (*svp));
384}
385
386#endif
387
388/** lowlevel stuff **********************************************************/
389
390static SV * ecb_noinline
391coro_get_sv (pTHX_ const char *name, int create)
392{
393#if PERL_VERSION_ATLEAST (5,10,0)
394 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
395 get_sv (name, create);
396#endif
397 return get_sv (name, create);
398}
399
400static AV * ecb_noinline
401coro_get_av (pTHX_ const char *name, int create)
402{
403#if PERL_VERSION_ATLEAST (5,10,0)
404 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
405 get_av (name, create);
406#endif
407 return get_av (name, create);
408}
409
410static HV * ecb_noinline
411coro_get_hv (pTHX_ const char *name, int create)
412{
413#if PERL_VERSION_ATLEAST (5,10,0)
414 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
415 get_hv (name, create);
416#endif
417 return get_hv (name, create);
418}
419
420ecb_inline void
421coro_times_update (void)
422{
423#ifdef coro_clock_gettime
424 struct timespec ts;
425
426 ts.tv_sec = ts.tv_nsec = 0;
427 coro_clock_gettime (CORO_CLOCK_THREAD_CPUTIME_ID, &ts);
428 time_cpu [0] = ts.tv_sec; time_cpu [1] = ts.tv_nsec;
429
430 ts.tv_sec = ts.tv_nsec = 0;
431 coro_clock_gettime (CORO_CLOCK_MONOTONIC, &ts);
432 time_real [0] = ts.tv_sec; time_real [1] = ts.tv_nsec;
433#else
434 dTHX;
435 UV tv[2];
436
437 u2time (aTHX_ tv);
438 time_real [0] = tv [0];
439 time_real [1] = tv [1] * 1000;
440#endif
441}
442
443ecb_inline void
444coro_times_add (struct coro *c)
445{
446 c->t_real [1] += time_real [1];
447 if (c->t_real [1] > 1000000000) { c->t_real [1] -= 1000000000; ++c->t_real [0]; }
448 c->t_real [0] += time_real [0];
449
450 c->t_cpu [1] += time_cpu [1];
451 if (c->t_cpu [1] > 1000000000) { c->t_cpu [1] -= 1000000000; ++c->t_cpu [0]; }
452 c->t_cpu [0] += time_cpu [0];
453}
454
455ecb_inline void
456coro_times_sub (struct coro *c)
457{
458 if (c->t_real [1] < time_real [1]) { c->t_real [1] += 1000000000; --c->t_real [0]; }
459 c->t_real [1] -= time_real [1];
460 c->t_real [0] -= time_real [0];
461
462 if (c->t_cpu [1] < time_cpu [1]) { c->t_cpu [1] += 1000000000; --c->t_cpu [0]; }
463 c->t_cpu [1] -= time_cpu [1];
464 c->t_cpu [0] -= time_cpu [0];
465}
466
467/*****************************************************************************/
468/* magic glue */
469
470#define CORO_MAGIC_type_cv 26
471#define CORO_MAGIC_type_state PERL_MAGIC_ext
472
473#define CORO_MAGIC_NN(sv, type) \
474 (ecb_expect_true (SvMAGIC (sv)->mg_type == type) \
475 ? SvMAGIC (sv) \
476 : mg_find (sv, type))
477
478#define CORO_MAGIC(sv, type) \
479 (ecb_expect_true (SvMAGIC (sv)) \
480 ? CORO_MAGIC_NN (sv, type) \
481 : 0)
482
483#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
484#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
485
486ecb_inline MAGIC *
487SvSTATEhv_p (pTHX_ SV *coro)
488{
489 MAGIC *mg;
490
491 if (ecb_expect_true (
492 SvTYPE (coro) == SVt_PVHV
493 && (mg = CORO_MAGIC_state (coro))
494 && mg->mg_virtual == &coro_state_vtbl
495 ))
496 return mg;
497
498 return 0;
499}
500
501ecb_inline struct coro *
502SvSTATE_ (pTHX_ SV *coro)
503{
504 MAGIC *mg;
505
506 if (SvROK (coro))
507 coro = SvRV (coro);
508
509 mg = SvSTATEhv_p (aTHX_ coro);
510 if (!mg)
511 croak ("Coro::State object required");
512
513 return (struct coro *)mg->mg_ptr;
514}
515
516#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
517
518/* faster than SvSTATE, but expects a coroutine hv */
519#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
520#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
521
522/*****************************************************************************/
523/* padlist management and caching */
524
525ecb_inline AV *
526coro_derive_padlist (pTHX_ CV *cv)
527{
528 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 529 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 530
72 newpadlist = newAV (); 531 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 532 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 533#if PERL_VERSION_ATLEAST (5,10,0)
534 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
535#else
536 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
537#endif
538 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
539 --AvFILLp (padlist);
540
541 av_store (newpadlist, 0, SvREFCNT_inc_NN (AvARRAY (padlist)[0]));
75 av_store (newpadlist, 1, (SV *) newpad); 542 av_store (newpadlist, 1, (SV *)newpad);
76 543
77 av = newAV (); /* will be @_ */ 544 return newpadlist;
78 av_extend (av, 0); 545}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 546
82 for (ix = fpad; ix > 0; ix--) 547ecb_inline void
548free_padlist (pTHX_ AV *padlist)
549{
550 /* may be during global destruction */
551 if (!IN_DESTRUCT)
83 { 552 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 553 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 554
555 while (i > 0) /* special-case index 0 */
86 { 556 {
87 char *name = SvPVX (namesv); /* XXX */ 557 /* we try to be extra-careful here */
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 558 AV *av = (AV *)AvARRAY (padlist)[i--];
89 { /* lexical from outside? */ 559 I32 j = AvFILLp (av);
90 npad[ix] = SvREFCNT_inc (ppad[ix]); 560
91 } 561 while (j >= 0)
92 else 562 SvREFCNT_dec (AvARRAY (av)[j--]);
93 { /* our own lexical */ 563
94 SV *sv; 564 AvFILLp (av) = -1;
95 if (*name == '&') 565 SvREFCNT_dec (av);
96 sv = SvREFCNT_inc (ppad[ix]);
97 else if (*name == '@')
98 sv = (SV *) newAV ();
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 }
107 } 566 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix])) 567
109 { 568 SvREFCNT_dec (AvARRAY (padlist)[0]);
110 npad[ix] = SvREFCNT_inc (ppad[ix]); 569
111 } 570 AvFILLp (padlist) = -1;
571 SvREFCNT_dec ((SV*)padlist);
572 }
573}
574
575static int
576coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
577{
578 AV *padlist;
579 AV *av = (AV *)mg->mg_obj;
580
581 /* perl manages to free our internal AV and _then_ call us */
582 if (IN_DESTRUCT)
583 return 0;
584
585 /* casting is fun. */
586 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
587 free_padlist (aTHX_ padlist);
588
589 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
590
591 return 0;
592}
593
594static MGVTBL coro_cv_vtbl = {
595 0, 0, 0, 0,
596 coro_cv_free
597};
598
599/* the next two functions merely cache the padlists */
600ecb_inline void
601get_padlist (pTHX_ CV *cv)
602{
603 MAGIC *mg = CORO_MAGIC_cv (cv);
604 AV *av;
605
606 if (ecb_expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
607 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
112 else 608 else
113 { 609 {
114 SV *sv = NEWSV (0, 0); 610#if CORO_PREFER_PERL_FUNCTIONS
115 SvPADTMP_on (sv); 611 /* this is probably cleaner? but also slower! */
116 npad[ix] = sv; 612 /* in practise, it seems to be less stable */
117 } 613 CV *cp = Perl_cv_clone (aTHX_ cv);
118 } 614 CvPADLIST (cv) = CvPADLIST (cp);
119 615 CvPADLIST (cp) = 0;
120#if 0 /* NONOTUNDERSTOOD */ 616 SvREFCNT_dec (cp);
121 /* Now that vars are all in place, clone nested closures. */ 617#else
122 618 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif 619#endif
139 620 }
140 return newpadlist;
141} 621}
142 622
143STATIC AV * 623ecb_inline void
144free_padlist (AV *padlist) 624put_padlist (pTHX_ CV *cv)
145{ 625{
146 /* may be during global destruction */ 626 MAGIC *mg = CORO_MAGIC_cv (cv);
147 if (SvREFCNT(padlist)) 627 AV *av;
628
629 if (ecb_expect_false (!mg))
630 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
631
632 av = (AV *)mg->mg_obj;
633
634 if (ecb_expect_false (AvFILLp (av) >= AvMAX (av)))
635 av_extend (av, AvFILLp (av) + 1);
636
637 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
638}
639
640/** load & save, init *******************************************************/
641
642/* swap sv heads, at least logically */
643static void
644swap_svs (pTHX_ Coro__State c)
645{
646 int i;
647
648 for (i = 0; i <= AvFILLp (c->swap_sv); )
148 { 649 {
149 I32 i = AvFILLp(padlist); 650 SV *a = AvARRAY (c->swap_sv)[i++];
150 while (i >= 0) 651 SV *b = AvARRAY (c->swap_sv)[i++];
652
653 const U32 keep = SVs_PADSTALE | SVs_PADTMP | SVs_PADMY; /* keep these flags */
654 SV tmp;
655
656 /* swap sv_any */
657 SvANY (&tmp) = SvANY (a); SvANY (a) = SvANY (b); SvANY (b) = SvANY (&tmp);
658
659 /* swap sv_flags */
660 SvFLAGS (&tmp) = SvFLAGS (a);
661 SvFLAGS (a) = (SvFLAGS (a) & keep) | (SvFLAGS (b ) & ~keep);
662 SvFLAGS (b) = (SvFLAGS (b) & keep) | (SvFLAGS (&tmp) & ~keep);
663
664#if PERL_VERSION_ATLEAST (5,10,0)
665 /* perl 5.10 complicates this _quite_ a bit, but it also is
666 * much faster, so no quarrels here. alternatively, we could
667 * sv_upgrade to avoid this.
668 */
151 { 669 {
152 SV **svp = av_fetch(padlist, i--, FALSE); 670 /* swap sv_u */
153 SV *sv = svp ? *svp : Nullsv; 671 tmp.sv_u = a->sv_u; a->sv_u = b->sv_u; b->sv_u = tmp.sv_u;
154 if (sv) 672
155 SvREFCNT_dec(sv); 673 /* if SvANY points to the head, we need to adjust the pointers,
674 * as the pointer for a still points to b, and maybe vice versa.
675 */
676 #define svany_in_head(type) \
677 (((1 << SVt_NULL) | (1 << SVt_BIND) | (1 << SVt_IV) | (1 << SVt_RV)) & (1 << (type)))
678
679 if (svany_in_head (SvTYPE (a)))
680 SvANY (a) = (void *)((PTRV)SvANY (a) - (PTRV)b + (PTRV)a);
681
682 if (svany_in_head (SvTYPE (b)))
683 SvANY (b) = (void *)((PTRV)SvANY (b) - (PTRV)a + (PTRV)b);
156 } 684 }
685#endif
686 }
687}
157 688
158 SvREFCNT_dec((SV*)padlist); 689#define SWAP_SVS(coro) \
690 if (ecb_expect_false ((coro)->swap_sv)) \
691 swap_svs (aTHX_ (coro))
692
693static void
694on_enterleave_call (pTHX_ SV *cb);
695
696static void
697load_perl (pTHX_ Coro__State c)
698{
699 perl_slots *slot = c->slot;
700 c->slot = 0;
701
702 PL_mainstack = c->mainstack;
703
704#if CORO_JIT
705 load_perl_slots (slot);
706#else
707 #define VARx(name,expr,type) expr = slot->name;
708 # include "state.h"
709 #undef VARx
710#endif
711
712 {
713 dSP;
714
715 CV *cv;
716
717 /* now do the ugly restore mess */
718 while (ecb_expect_true (cv = (CV *)POPs))
719 {
720 put_padlist (aTHX_ cv); /* mark this padlist as available */
721 CvDEPTH (cv) = PTR2IV (POPs);
722 CvPADLIST (cv) = (AV *)POPs;
723 }
724
725 PUTBACK;
159 } 726 }
160}
161 727
162/* the next tow functions merely cache the padlists */ 728 slf_frame = c->slf_frame;
163STATIC void 729 CORO_THROW = c->except;
164get_padlist (CV *cv)
165{
166 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 0);
167 730
168 if (he && AvFILLp ((AV *)*he) >= 0) 731 if (ecb_expect_false (enable_times))
169 CvPADLIST (cv) = (AV *)av_pop ((AV *)*he);
170 else
171 CvPADLIST (cv) = clone_padlist (CvPADLIST (cv));
172}
173
174STATIC void
175put_padlist (CV *cv)
176{
177 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 1);
178
179 if (SvTYPE (*he) != SVt_PVAV)
180 {
181 SvREFCNT_dec (*he);
182 *he = (SV *)newAV ();
183 } 732 {
733 if (ecb_expect_false (!times_valid))
734 coro_times_update ();
184 735
185 av_push ((AV *)*he, (SV *)CvPADLIST (cv)); 736 coro_times_sub (c);
186} 737 }
187 738
739 if (ecb_expect_false (c->on_enter))
740 {
741 int i;
742
743 for (i = 0; i <= AvFILLp (c->on_enter); ++i)
744 on_enterleave_call (aTHX_ AvARRAY (c->on_enter)[i]);
745 }
746
747 SWAP_SVS (c);
748}
749
188static void 750static void
189save_state(pTHX_ Coro__State c) 751save_perl (pTHX_ Coro__State c)
190{ 752{
753 SWAP_SVS (c);
754
755 if (ecb_expect_false (c->on_leave))
756 {
757 int i;
758
759 for (i = AvFILLp (c->on_leave); i >= 0; --i)
760 on_enterleave_call (aTHX_ AvARRAY (c->on_leave)[i]);
761 }
762
763 times_valid = 0;
764
765 if (ecb_expect_false (enable_times))
766 {
767 coro_times_update (); times_valid = 1;
768 coro_times_add (c);
769 }
770
771 c->except = CORO_THROW;
772 c->slf_frame = slf_frame;
773
191 { 774 {
192 dSP; 775 dSP;
193 I32 cxix = cxstack_ix; 776 I32 cxix = cxstack_ix;
777 PERL_CONTEXT *ccstk = cxstack;
194 PERL_SI *top_si = PL_curstackinfo; 778 PERL_SI *top_si = PL_curstackinfo;
195 PERL_CONTEXT *ccstk = cxstack;
196 779
197 /* 780 /*
198 * the worst thing you can imagine happens first - we have to save 781 * the worst thing you can imagine happens first - we have to save
199 * (and reinitialize) all cv's in the whole callchain :( 782 * (and reinitialize) all cv's in the whole callchain :(
200 */ 783 */
201 784
202 PUSHs (Nullsv); 785 XPUSHs (Nullsv);
203 /* this loop was inspired by pp_caller */ 786 /* this loop was inspired by pp_caller */
204 for (;;) 787 for (;;)
205 { 788 {
206 while (cxix >= 0) 789 while (ecb_expect_true (cxix >= 0))
207 { 790 {
208 PERL_CONTEXT *cx = &ccstk[cxix--]; 791 PERL_CONTEXT *cx = &ccstk[cxix--];
209 792
210 if (CxTYPE(cx) == CXt_SUB) 793 if (ecb_expect_true (CxTYPE (cx) == CXt_SUB) || ecb_expect_false (CxTYPE (cx) == CXt_FORMAT))
211 { 794 {
212 CV *cv = cx->blk_sub.cv; 795 CV *cv = cx->blk_sub.cv;
796
213 if (CvDEPTH(cv)) 797 if (ecb_expect_true (CvDEPTH (cv)))
214 { 798 {
215#ifdef USE_THREADS
216 XPUSHs ((SV *)CvOWNER(cv));
217#endif
218 EXTEND (SP, 3); 799 EXTEND (SP, 3);
219 PUSHs ((SV *)CvDEPTH(cv));
220 PUSHs ((SV *)CvPADLIST(cv)); 800 PUSHs ((SV *)CvPADLIST (cv));
801 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
221 PUSHs ((SV *)cv); 802 PUSHs ((SV *)cv);
222 803
223 get_padlist (cv);
224
225 CvDEPTH(cv) = 0; 804 CvDEPTH (cv) = 0;
226#ifdef USE_THREADS 805 get_padlist (aTHX_ cv);
227 CvOWNER(cv) = 0;
228 error must unlock this cv etc.. etc...
229 if you are here wondering about this error message then
230 the reason is that it will not work as advertised yet
231#endif
232 } 806 }
233 } 807 }
234 else if (CxTYPE(cx) == CXt_FORMAT) 808 }
809
810 if (ecb_expect_true (top_si->si_type == PERLSI_MAIN))
811 break;
812
813 top_si = top_si->si_prev;
814 ccstk = top_si->si_cxstack;
815 cxix = top_si->si_cxix;
816 }
817
818 PUTBACK;
819 }
820
821 /* allocate some space on the context stack for our purposes */
822 if (ecb_expect_false (cxstack_ix + (int)SLOT_COUNT >= cxstack_max))
823 {
824 unsigned int i;
825
826 for (i = 0; i < SLOT_COUNT; ++i)
827 CXINC;
828
829 cxstack_ix -= SLOT_COUNT; /* undo allocation */
830 }
831
832 c->mainstack = PL_mainstack;
833
834 {
835 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
836
837#if CORO_JIT
838 save_perl_slots (slot);
839#else
840 #define VARx(name,expr,type) slot->name = expr;
841 # include "state.h"
842 #undef VARx
843#endif
844 }
845}
846
847/*
848 * allocate various perl stacks. This is almost an exact copy
849 * of perl.c:init_stacks, except that it uses less memory
850 * on the (sometimes correct) assumption that coroutines do
851 * not usually need a lot of stackspace.
852 */
853#if CORO_PREFER_PERL_FUNCTIONS
854# define coro_init_stacks(thx) init_stacks ()
855#else
856static void
857coro_init_stacks (pTHX)
858{
859 PL_curstackinfo = new_stackinfo(32, 4 + SLOT_COUNT); /* 3 is minimum due to perl rounding down in scope.c:GROW() */
860 PL_curstackinfo->si_type = PERLSI_MAIN;
861 PL_curstack = PL_curstackinfo->si_stack;
862 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
863
864 PL_stack_base = AvARRAY(PL_curstack);
865 PL_stack_sp = PL_stack_base;
866 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
867
868 New(50,PL_tmps_stack,32,SV*);
869 PL_tmps_floor = -1;
870 PL_tmps_ix = -1;
871 PL_tmps_max = 32;
872
873 New(54,PL_markstack,16,I32);
874 PL_markstack_ptr = PL_markstack;
875 PL_markstack_max = PL_markstack + 16;
876
877#ifdef SET_MARK_OFFSET
878 SET_MARK_OFFSET;
879#endif
880
881 New(54,PL_scopestack,8,I32);
882 PL_scopestack_ix = 0;
883 PL_scopestack_max = 8;
884
885 New(54,PL_savestack,24,ANY);
886 PL_savestack_ix = 0;
887 PL_savestack_max = 24;
888
889#if !PERL_VERSION_ATLEAST (5,10,0)
890 New(54,PL_retstack,4,OP*);
891 PL_retstack_ix = 0;
892 PL_retstack_max = 4;
893#endif
894}
895#endif
896
897/*
898 * destroy the stacks, the callchain etc...
899 */
900static void
901coro_destruct_stacks (pTHX)
902{
903 while (PL_curstackinfo->si_next)
904 PL_curstackinfo = PL_curstackinfo->si_next;
905
906 while (PL_curstackinfo)
907 {
908 PERL_SI *p = PL_curstackinfo->si_prev;
909
910 if (!IN_DESTRUCT)
911 SvREFCNT_dec (PL_curstackinfo->si_stack);
912
913 Safefree (PL_curstackinfo->si_cxstack);
914 Safefree (PL_curstackinfo);
915 PL_curstackinfo = p;
916 }
917
918 Safefree (PL_tmps_stack);
919 Safefree (PL_markstack);
920 Safefree (PL_scopestack);
921 Safefree (PL_savestack);
922#if !PERL_VERSION_ATLEAST (5,10,0)
923 Safefree (PL_retstack);
924#endif
925}
926
927#define CORO_RSS \
928 rss += sizeof (SYM (curstackinfo)); \
929 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
930 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
931 rss += SYM (tmps_max) * sizeof (SV *); \
932 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
933 rss += SYM (scopestack_max) * sizeof (I32); \
934 rss += SYM (savestack_max) * sizeof (ANY);
935
936static size_t
937coro_rss (pTHX_ struct coro *coro)
938{
939 size_t rss = sizeof (*coro);
940
941 if (coro->mainstack)
942 {
943 if (coro->flags & CF_RUNNING)
944 {
945 #define SYM(sym) PL_ ## sym
946 CORO_RSS;
947 #undef SYM
948 }
949 else
950 {
951 #define SYM(sym) coro->slot->sym
952 CORO_RSS;
953 #undef SYM
954 }
955 }
956
957 return rss;
958}
959
960/** coroutine stack handling ************************************************/
961
962static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
963static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
964static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
965
966/* apparently < 5.8.8 */
967#ifndef MgPV_nolen_const
968#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
969 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
970 (const char*)(mg)->mg_ptr)
971#endif
972
973/*
974 * This overrides the default magic get method of %SIG elements.
975 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
976 * and instead of trying to save and restore the hash elements (extremely slow),
977 * we just provide our own readback here.
978 */
979static int ecb_cold
980coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
981{
982 const char *s = MgPV_nolen_const (mg);
983
984 if (*s == '_')
985 {
986 SV **svp = 0;
987
988 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
989 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
990
991 if (svp)
992 {
993 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
994 return 0;
995 }
996 }
997
998 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
999}
1000
1001static int ecb_cold
1002coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
1003{
1004 const char *s = MgPV_nolen_const (mg);
1005
1006 if (*s == '_')
1007 {
1008 SV **svp = 0;
1009
1010 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
1011 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
1012
1013 if (svp)
1014 {
1015 SV *old = *svp;
1016 *svp = 0;
1017 SvREFCNT_dec (old);
1018 return 0;
1019 }
1020 }
1021
1022 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
1023}
1024
1025static int ecb_cold
1026coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
1027{
1028 const char *s = MgPV_nolen_const (mg);
1029
1030 if (*s == '_')
1031 {
1032 SV **svp = 0;
1033
1034 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
1035 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
1036
1037 if (svp)
1038 {
1039 SV *old = *svp;
1040 *svp = SvOK (sv) ? newSVsv (sv) : 0;
1041 SvREFCNT_dec (old);
1042 return 0;
1043 }
1044 }
1045
1046 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
1047}
1048
1049static void
1050prepare_nop (pTHX_ struct coro_transfer_args *ta)
1051{
1052 /* kind of mega-hacky, but works */
1053 ta->next = ta->prev = (struct coro *)ta;
1054}
1055
1056static int
1057slf_check_nop (pTHX_ struct CoroSLF *frame)
1058{
1059 return 0;
1060}
1061
1062static int
1063slf_check_repeat (pTHX_ struct CoroSLF *frame)
1064{
1065 return 1;
1066}
1067
1068static UNOP init_perl_op;
1069
1070ecb_noinline static void /* noinline to keep it out of the transfer fast path */
1071init_perl (pTHX_ struct coro *coro)
1072{
1073 /*
1074 * emulate part of the perl startup here.
1075 */
1076 coro_init_stacks (aTHX);
1077
1078 PL_runops = RUNOPS_DEFAULT;
1079 PL_curcop = &PL_compiling;
1080 PL_in_eval = EVAL_NULL;
1081 PL_comppad = 0;
1082 PL_comppad_name = 0;
1083 PL_comppad_name_fill = 0;
1084 PL_comppad_name_floor = 0;
1085 PL_curpm = 0;
1086 PL_curpad = 0;
1087 PL_localizing = 0;
1088 PL_restartop = 0;
1089#if PERL_VERSION_ATLEAST (5,10,0)
1090 PL_parser = 0;
1091#endif
1092 PL_hints = 0;
1093
1094 /* recreate the die/warn hooks */
1095 PL_diehook = SvREFCNT_inc (rv_diehook);
1096 PL_warnhook = SvREFCNT_inc (rv_warnhook);
1097
1098 GvSV (PL_defgv) = newSV (0);
1099 GvAV (PL_defgv) = coro->args; coro->args = 0;
1100 GvSV (PL_errgv) = newSV (0);
1101 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
1102 GvHV (PL_hintgv) = 0;
1103 PL_rs = newSVsv (GvSV (irsgv));
1104 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
1105
1106 {
1107 dSP;
1108 UNOP myop;
1109
1110 Zero (&myop, 1, UNOP);
1111 myop.op_next = Nullop;
1112 myop.op_type = OP_ENTERSUB;
1113 myop.op_flags = OPf_WANT_VOID;
1114
1115 PUSHMARK (SP);
1116 PUSHs ((SV *)coro->startcv);
1117 PUTBACK;
1118 PL_op = (OP *)&myop;
1119 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
1120 }
1121
1122 /* this newly created coroutine might be run on an existing cctx which most
1123 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
1124 */
1125 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
1126 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
1127 slf_frame.destroy = 0;
1128
1129 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
1130 init_perl_op.op_next = PL_op;
1131 init_perl_op.op_type = OP_ENTERSUB;
1132 init_perl_op.op_ppaddr = pp_slf;
1133 /* no flags etc. required, as an init function won't be called */
1134
1135 PL_op = (OP *)&init_perl_op;
1136
1137 /* copy throw, in case it was set before init_perl */
1138 CORO_THROW = coro->except;
1139
1140 SWAP_SVS (coro);
1141
1142 if (ecb_expect_false (enable_times))
1143 {
1144 coro_times_update ();
1145 coro_times_sub (coro);
1146 }
1147}
1148
1149static void
1150coro_unwind_stacks (pTHX)
1151{
1152 if (!IN_DESTRUCT)
1153 {
1154 /* restore all saved variables and stuff */
1155 LEAVE_SCOPE (0);
1156 assert (PL_tmps_floor == -1);
1157
1158 /* free all temporaries */
1159 FREETMPS;
1160 assert (PL_tmps_ix == -1);
1161
1162 /* unwind all extra stacks */
1163 POPSTACK_TO (PL_mainstack);
1164
1165 /* unwind main stack */
1166 dounwind (-1);
1167 }
1168}
1169
1170static void
1171destroy_perl (pTHX_ struct coro *coro)
1172{
1173 SV *svf [9];
1174
1175 {
1176 SV *old_current = SvRV (coro_current);
1177 struct coro *current = SvSTATE (old_current);
1178
1179 assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack));
1180
1181 save_perl (aTHX_ current);
1182
1183 /* this will cause transfer_check to croak on block*/
1184 SvRV_set (coro_current, (SV *)coro->hv);
1185
1186 load_perl (aTHX_ coro);
1187
1188 coro_unwind_stacks (aTHX);
1189
1190 /* restore swapped sv's */
1191 SWAP_SVS (coro);
1192
1193 coro_destruct_stacks (aTHX);
1194
1195 /* now save some sv's to be free'd later */
1196 svf [0] = GvSV (PL_defgv);
1197 svf [1] = (SV *)GvAV (PL_defgv);
1198 svf [2] = GvSV (PL_errgv);
1199 svf [3] = (SV *)PL_defoutgv;
1200 svf [4] = PL_rs;
1201 svf [5] = GvSV (irsgv);
1202 svf [6] = (SV *)GvHV (PL_hintgv);
1203 svf [7] = PL_diehook;
1204 svf [8] = PL_warnhook;
1205 assert (9 == sizeof (svf) / sizeof (*svf));
1206
1207 SvRV_set (coro_current, old_current);
1208
1209 load_perl (aTHX_ current);
1210 }
1211
1212 {
1213 unsigned int i;
1214
1215 for (i = 0; i < sizeof (svf) / sizeof (*svf); ++i)
1216 SvREFCNT_dec (svf [i]);
1217
1218 SvREFCNT_dec (coro->saved_deffh);
1219 SvREFCNT_dec (coro->rouse_cb);
1220 SvREFCNT_dec (coro->invoke_cb);
1221 SvREFCNT_dec (coro->invoke_av);
1222 }
1223}
1224
1225ecb_inline void
1226free_coro_mortal (pTHX)
1227{
1228 if (ecb_expect_true (coro_mortal))
1229 {
1230 SvREFCNT_dec ((SV *)coro_mortal);
1231 coro_mortal = 0;
1232 }
1233}
1234
1235static int
1236runops_trace (pTHX)
1237{
1238 COP *oldcop = 0;
1239 int oldcxix = -2;
1240
1241 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
1242 {
1243 PERL_ASYNC_CHECK ();
1244
1245 if (cctx_current->flags & CC_TRACE_ALL)
1246 {
1247 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
1248 {
1249 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1250 SV **bot, **top;
1251 AV *av = newAV (); /* return values */
1252 SV **cb;
1253 dSP;
1254
1255 GV *gv = CvGV (cx->blk_sub.cv);
1256 SV *fullname = sv_2mortal (newSV (0));
1257 if (isGV (gv))
1258 gv_efullname3 (fullname, gv, 0);
1259
1260 bot = PL_stack_base + cx->blk_oldsp + 1;
1261 top = cx->blk_gimme == G_ARRAY ? SP + 1
1262 : cx->blk_gimme == G_SCALAR ? bot + 1
1263 : bot;
1264
1265 av_extend (av, top - bot);
1266 while (bot < top)
1267 av_push (av, SvREFCNT_inc_NN (*bot++));
1268
1269 PL_runops = RUNOPS_DEFAULT;
1270 ENTER;
1271 SAVETMPS;
1272 EXTEND (SP, 3);
1273 PUSHMARK (SP);
1274 PUSHs (&PL_sv_no);
1275 PUSHs (fullname);
1276 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1277 PUTBACK;
1278 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1279 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1280 SPAGAIN;
1281 FREETMPS;
1282 LEAVE;
1283 PL_runops = runops_trace;
1284 }
1285
1286 if (oldcop != PL_curcop)
1287 {
1288 oldcop = PL_curcop;
1289
1290 if (PL_curcop != &PL_compiling)
1291 {
1292 SV **cb;
1293
1294 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1295 {
1296 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1297
1298 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1299 {
1300 dSP;
1301 GV *gv = CvGV (cx->blk_sub.cv);
1302 SV *fullname = sv_2mortal (newSV (0));
1303
1304 if (isGV (gv))
1305 gv_efullname3 (fullname, gv, 0);
1306
1307 PL_runops = RUNOPS_DEFAULT;
1308 ENTER;
1309 SAVETMPS;
1310 EXTEND (SP, 3);
1311 PUSHMARK (SP);
1312 PUSHs (&PL_sv_yes);
1313 PUSHs (fullname);
1314 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1315 PUTBACK;
1316 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1317 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1318 SPAGAIN;
1319 FREETMPS;
1320 LEAVE;
1321 PL_runops = runops_trace;
1322 }
1323
1324 oldcxix = cxstack_ix;
1325 }
1326
1327 if (cctx_current->flags & CC_TRACE_LINE)
1328 {
1329 dSP;
1330
1331 PL_runops = RUNOPS_DEFAULT;
1332 ENTER;
1333 SAVETMPS;
1334 EXTEND (SP, 3);
1335 PL_runops = RUNOPS_DEFAULT;
1336 PUSHMARK (SP);
1337 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1338 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1339 PUTBACK;
1340 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1341 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1342 SPAGAIN;
1343 FREETMPS;
1344 LEAVE;
1345 PL_runops = runops_trace;
1346 }
1347 }
1348 }
1349 }
1350 }
1351
1352 TAINT_NOT;
1353 return 0;
1354}
1355
1356static struct CoroSLF cctx_ssl_frame;
1357
1358static void
1359slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1360{
1361 ta->prev = 0;
1362}
1363
1364static int
1365slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1366{
1367 *frame = cctx_ssl_frame;
1368
1369 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1370}
1371
1372/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1373static void ecb_noinline
1374cctx_prepare (pTHX)
1375{
1376 PL_top_env = &PL_start_env;
1377
1378 if (cctx_current->flags & CC_TRACE)
1379 PL_runops = runops_trace;
1380
1381 /* we already must be executing an SLF op, there is no other valid way
1382 * that can lead to creation of a new cctx */
1383 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1384 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1385
1386 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1387 cctx_ssl_frame = slf_frame;
1388
1389 slf_frame.prepare = slf_prepare_set_stacklevel;
1390 slf_frame.check = slf_check_set_stacklevel;
1391}
1392
1393/* the tail of transfer: execute stuff we can only do after a transfer */
1394ecb_inline void
1395transfer_tail (pTHX)
1396{
1397 free_coro_mortal (aTHX);
1398}
1399
1400/*
1401 * this is a _very_ stripped down perl interpreter ;)
1402 */
1403static void
1404cctx_run (void *arg)
1405{
1406#ifdef USE_ITHREADS
1407# if CORO_PTHREAD
1408 PERL_SET_CONTEXT (coro_thx);
1409# endif
1410#endif
1411 {
1412 dTHX;
1413
1414 /* normally we would need to skip the entersub here */
1415 /* not doing so will re-execute it, which is exactly what we want */
1416 /* PL_nop = PL_nop->op_next */
1417
1418 /* inject a fake subroutine call to cctx_init */
1419 cctx_prepare (aTHX);
1420
1421 /* cctx_run is the alternative tail of transfer() */
1422 transfer_tail (aTHX);
1423
1424 /* somebody or something will hit me for both perl_run and PL_restartop */
1425 PL_restartop = PL_op;
1426 perl_run (PL_curinterp);
1427 /*
1428 * Unfortunately, there is no way to get at the return values of the
1429 * coro body here, as perl_run destroys these. Likewise, we cannot catch
1430 * runtime errors here, as this is just a random interpreter, not a thread.
1431 */
1432
1433 /*
1434 * If perl-run returns we assume exit() was being called or the coro
1435 * fell off the end, which seems to be the only valid (non-bug)
1436 * reason for perl_run to return. We try to exit by jumping to the
1437 * bootstrap-time "top" top_env, as we cannot restore the "main"
1438 * coroutine as Coro has no such concept.
1439 * This actually isn't valid with the pthread backend, but OSes requiring
1440 * that backend are too broken to do it in a standards-compliant way.
1441 */
1442 PL_top_env = main_top_env;
1443 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1444 }
1445}
1446
1447static coro_cctx *
1448cctx_new (void)
1449{
1450 coro_cctx *cctx;
1451
1452 ++cctx_count;
1453 New (0, cctx, 1, coro_cctx);
1454
1455 cctx->gen = cctx_gen;
1456 cctx->flags = 0;
1457 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1458
1459 return cctx;
1460}
1461
1462/* create a new cctx only suitable as source */
1463static coro_cctx *
1464cctx_new_empty (void)
1465{
1466 coro_cctx *cctx = cctx_new ();
1467
1468 cctx->sptr = 0;
1469 coro_create (&cctx->cctx, 0, 0, 0, 0);
1470
1471 return cctx;
1472}
1473
1474/* create a new cctx suitable as destination/running a perl interpreter */
1475static coro_cctx *
1476cctx_new_run (void)
1477{
1478 coro_cctx *cctx = cctx_new ();
1479 void *stack_start;
1480 size_t stack_size;
1481
1482#if HAVE_MMAP
1483 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1484 /* mmap supposedly does allocate-on-write for us */
1485 cctx->sptr = mmap (0, cctx->ssize, PROT_READ | PROT_WRITE | PROT_EXEC, MAP_ANONYMOUS, 0, 0);
1486
1487 if (cctx->sptr != (void *)-1)
1488 {
1489 #if CORO_STACKGUARD
1490 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1491 #endif
1492 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1493 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1494 cctx->flags |= CC_MAPPED;
1495 }
1496 else
1497#endif
1498 {
1499 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1500 New (0, cctx->sptr, cctx_stacksize, long);
1501
1502 if (!cctx->sptr)
1503 {
1504 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1505 _exit (EXIT_FAILURE);
1506 }
1507
1508 stack_start = cctx->sptr;
1509 stack_size = cctx->ssize;
1510 }
1511
1512 #if CORO_USE_VALGRIND
1513 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1514 #endif
1515
1516 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1517
1518 return cctx;
1519}
1520
1521static void
1522cctx_destroy (coro_cctx *cctx)
1523{
1524 if (!cctx)
1525 return;
1526
1527 assert (("FATAL: tried to destroy current cctx", cctx != cctx_current));
1528
1529 --cctx_count;
1530 coro_destroy (&cctx->cctx);
1531
1532 /* coro_transfer creates new, empty cctx's */
1533 if (cctx->sptr)
1534 {
1535 #if CORO_USE_VALGRIND
1536 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1537 #endif
1538
1539#if HAVE_MMAP
1540 if (cctx->flags & CC_MAPPED)
1541 munmap (cctx->sptr, cctx->ssize);
1542 else
1543#endif
1544 Safefree (cctx->sptr);
1545 }
1546
1547 Safefree (cctx);
1548}
1549
1550/* wether this cctx should be destructed */
1551#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1552
1553static coro_cctx *
1554cctx_get (pTHX)
1555{
1556 while (ecb_expect_true (cctx_first))
1557 {
1558 coro_cctx *cctx = cctx_first;
1559 cctx_first = cctx->next;
1560 --cctx_idle;
1561
1562 if (ecb_expect_true (!CCTX_EXPIRED (cctx)))
1563 return cctx;
1564
1565 cctx_destroy (cctx);
1566 }
1567
1568 return cctx_new_run ();
1569}
1570
1571static void
1572cctx_put (coro_cctx *cctx)
1573{
1574 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1575
1576 /* free another cctx if overlimit */
1577 if (ecb_expect_false (cctx_idle >= cctx_max_idle))
1578 {
1579 coro_cctx *first = cctx_first;
1580 cctx_first = first->next;
1581 --cctx_idle;
1582
1583 cctx_destroy (first);
1584 }
1585
1586 ++cctx_idle;
1587 cctx->next = cctx_first;
1588 cctx_first = cctx;
1589}
1590
1591/** coroutine switching *****************************************************/
1592
1593static void
1594transfer_check (pTHX_ struct coro *prev, struct coro *next)
1595{
1596 /* TODO: throwing up here is considered harmful */
1597
1598 if (ecb_expect_true (prev != next))
1599 {
1600 if (ecb_expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1601 croak ("Coro::State::transfer called with a blocked prev Coro::State, but can only transfer from running or new states,");
1602
1603 if (ecb_expect_false (next->flags & (CF_RUNNING | CF_ZOMBIE | CF_SUSPENDED)))
1604 croak ("Coro::State::transfer called with running, destroyed or suspended next Coro::State, but can only transfer to inactive states,");
1605
1606#if !PERL_VERSION_ATLEAST (5,10,0)
1607 if (ecb_expect_false (PL_lex_state != LEX_NOTPARSING))
1608 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1609#endif
1610 }
1611}
1612
1613/* always use the TRANSFER macro */
1614static void ecb_noinline /* noinline so we have a fixed stackframe */
1615transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1616{
1617 dSTACKLEVEL;
1618
1619 /* sometimes transfer is only called to set idle_sp */
1620 if (ecb_expect_false (!prev))
1621 {
1622 cctx_current->idle_sp = STACKLEVEL;
1623 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1624 }
1625 else if (ecb_expect_true (prev != next))
1626 {
1627 coro_cctx *cctx_prev;
1628
1629 if (ecb_expect_false (prev->flags & CF_NEW))
1630 {
1631 /* create a new empty/source context */
1632 prev->flags &= ~CF_NEW;
1633 prev->flags |= CF_RUNNING;
1634 }
1635
1636 prev->flags &= ~CF_RUNNING;
1637 next->flags |= CF_RUNNING;
1638
1639 /* first get rid of the old state */
1640 save_perl (aTHX_ prev);
1641
1642 if (ecb_expect_false (next->flags & CF_NEW))
1643 {
1644 /* need to start coroutine */
1645 next->flags &= ~CF_NEW;
1646 /* setup coroutine call */
1647 init_perl (aTHX_ next);
1648 }
1649 else
1650 load_perl (aTHX_ next);
1651
1652 /* possibly untie and reuse the cctx */
1653 if (ecb_expect_true (
1654 cctx_current->idle_sp == STACKLEVEL
1655 && !(cctx_current->flags & CC_TRACE)
1656 && !force_cctx
1657 ))
1658 {
1659 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1660 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1661
1662 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get. */
1663 /* without this the next cctx_get might destroy the running cctx while still in use */
1664 if (ecb_expect_false (CCTX_EXPIRED (cctx_current)))
1665 if (ecb_expect_true (!next->cctx))
1666 next->cctx = cctx_get (aTHX);
1667
1668 cctx_put (cctx_current);
1669 }
1670 else
1671 prev->cctx = cctx_current;
1672
1673 ++next->usecount;
1674
1675 cctx_prev = cctx_current;
1676 cctx_current = ecb_expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1677
1678 next->cctx = 0;
1679
1680 if (ecb_expect_false (cctx_prev != cctx_current))
1681 {
1682 cctx_prev->top_env = PL_top_env;
1683 PL_top_env = cctx_current->top_env;
1684 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1685 }
1686
1687 transfer_tail (aTHX);
1688 }
1689}
1690
1691#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1692#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1693
1694/** high level stuff ********************************************************/
1695
1696/* this function is actually Coro, not Coro::State, but we call it from here */
1697/* because it is convenient - but it hasn't been declared yet for that reason */
1698static void
1699coro_call_on_destroy (pTHX_ struct coro *coro);
1700
1701static void
1702coro_state_destroy (pTHX_ struct coro *coro)
1703{
1704 if (coro->flags & CF_ZOMBIE)
1705 return;
1706
1707 slf_destroy (aTHX_ coro);
1708
1709 coro->flags |= CF_ZOMBIE;
1710
1711 if (coro->flags & CF_READY)
1712 {
1713 /* reduce nready, as destroying a ready coro effectively unreadies it */
1714 /* alternative: look through all ready queues and remove the coro */
1715 --coro_nready;
1716 }
1717 else
1718 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1719
1720 if (coro->next) coro->next->prev = coro->prev;
1721 if (coro->prev) coro->prev->next = coro->next;
1722 if (coro == coro_first) coro_first = coro->next;
1723
1724 if (coro->mainstack
1725 && coro->mainstack != main_mainstack
1726 && coro->slot
1727 && !PL_dirty)
1728 destroy_perl (aTHX_ coro);
1729
1730 cctx_destroy (coro->cctx);
1731 SvREFCNT_dec (coro->startcv);
1732 SvREFCNT_dec (coro->args);
1733 SvREFCNT_dec (coro->swap_sv);
1734 SvREFCNT_dec (CORO_THROW);
1735
1736 coro_call_on_destroy (aTHX_ coro);
1737
1738 /* more destruction mayhem in coro_state_free */
1739}
1740
1741static int
1742coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1743{
1744 struct coro *coro = (struct coro *)mg->mg_ptr;
1745 mg->mg_ptr = 0;
1746
1747 coro_state_destroy (aTHX_ coro);
1748 SvREFCNT_dec (coro->on_destroy);
1749 SvREFCNT_dec (coro->status);
1750
1751 Safefree (coro);
1752
1753 return 0;
1754}
1755
1756static int ecb_cold
1757coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1758{
1759 /* called when perl clones the current process the slow way (windows process emulation) */
1760 /* WE SIMply nuke the pointers in the copy, causing perl to croak */
1761 mg->mg_ptr = 0;
1762 mg->mg_virtual = 0;
1763
1764 return 0;
1765}
1766
1767static MGVTBL coro_state_vtbl = {
1768 0, 0, 0, 0,
1769 coro_state_free,
1770 0,
1771#ifdef MGf_DUP
1772 coro_state_dup,
1773#else
1774# define MGf_DUP 0
1775#endif
1776};
1777
1778static void
1779prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1780{
1781 ta->prev = SvSTATE (prev_sv);
1782 ta->next = SvSTATE (next_sv);
1783 TRANSFER_CHECK (*ta);
1784}
1785
1786static void
1787api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1788{
1789 struct coro_transfer_args ta;
1790
1791 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1792 TRANSFER (ta, 1);
1793}
1794
1795/** Coro ********************************************************************/
1796
1797ecb_inline void
1798coro_enq (pTHX_ struct coro *coro)
1799{
1800 struct coro **ready = coro_ready [coro->prio - CORO_PRIO_MIN];
1801
1802 SvREFCNT_inc_NN (coro->hv);
1803
1804 coro->next_ready = 0;
1805 *(ready [0] ? &ready [1]->next_ready : &ready [0]) = coro;
1806 ready [1] = coro;
1807}
1808
1809ecb_inline struct coro *
1810coro_deq (pTHX)
1811{
1812 int prio;
1813
1814 for (prio = CORO_PRIO_MAX - CORO_PRIO_MIN + 1; --prio >= 0; )
1815 {
1816 struct coro **ready = coro_ready [prio];
1817
1818 if (ready [0])
1819 {
1820 struct coro *coro = ready [0];
1821 ready [0] = coro->next_ready;
1822 return coro;
1823 }
1824 }
1825
1826 return 0;
1827}
1828
1829static void
1830invoke_sv_ready_hook_helper (void)
1831{
1832 dTHX;
1833 dSP;
1834
1835 ENTER;
1836 SAVETMPS;
1837
1838 PUSHMARK (SP);
1839 PUTBACK;
1840 call_sv (coro_readyhook, G_VOID | G_DISCARD);
1841
1842 FREETMPS;
1843 LEAVE;
1844}
1845
1846static int
1847api_ready (pTHX_ SV *coro_sv)
1848{
1849 struct coro *coro = SvSTATE (coro_sv);
1850
1851 if (coro->flags & CF_READY)
1852 return 0;
1853
1854 coro->flags |= CF_READY;
1855
1856 coro_enq (aTHX_ coro);
1857
1858 if (!coro_nready++)
1859 if (coroapi.readyhook)
1860 coroapi.readyhook ();
1861
1862 return 1;
1863}
1864
1865static int
1866api_is_ready (pTHX_ SV *coro_sv)
1867{
1868 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1869}
1870
1871/* expects to own a reference to next->hv */
1872ecb_inline void
1873prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1874{
1875 SV *prev_sv = SvRV (coro_current);
1876
1877 ta->prev = SvSTATE_hv (prev_sv);
1878 ta->next = next;
1879
1880 TRANSFER_CHECK (*ta);
1881
1882 SvRV_set (coro_current, (SV *)next->hv);
1883
1884 free_coro_mortal (aTHX);
1885 coro_mortal = prev_sv;
1886}
1887
1888static void
1889prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1890{
1891 for (;;)
1892 {
1893 struct coro *next = coro_deq (aTHX);
1894
1895 if (ecb_expect_true (next))
1896 {
1897 /* cannot transfer to destroyed coros, skip and look for next */
1898 if (ecb_expect_false (next->flags & (CF_ZOMBIE | CF_SUSPENDED)))
1899 SvREFCNT_dec (next->hv); /* coro_nready has already been taken care of by destroy */
1900 else
1901 {
1902 next->flags &= ~CF_READY;
1903 --coro_nready;
1904
1905 prepare_schedule_to (aTHX_ ta, next);
1906 break;
1907 }
1908 }
1909 else
1910 {
1911 /* nothing to schedule: call the idle handler */
1912 if (SvROK (sv_idle)
1913 && SvOBJECT (SvRV (sv_idle)))
1914 {
1915 if (SvRV (sv_idle) == SvRV (coro_current))
1916 croak ("FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught");
1917
1918 ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */
1919 api_ready (aTHX_ SvRV (sv_idle));
1920 --coro_nready;
1921 }
1922 else
1923 {
1924 /* TODO: deprecated, remove, cannot work reliably *//*D*/
1925 dSP;
1926
1927 ENTER;
1928 SAVETMPS;
1929
1930 PUSHMARK (SP);
1931 PUTBACK;
1932 call_sv (sv_idle, G_VOID | G_DISCARD);
1933
1934 FREETMPS;
1935 LEAVE;
1936 }
1937 }
1938 }
1939}
1940
1941ecb_inline void
1942prepare_cede (pTHX_ struct coro_transfer_args *ta)
1943{
1944 api_ready (aTHX_ coro_current);
1945 prepare_schedule (aTHX_ ta);
1946}
1947
1948ecb_inline void
1949prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1950{
1951 SV *prev = SvRV (coro_current);
1952
1953 if (coro_nready)
1954 {
1955 prepare_schedule (aTHX_ ta);
1956 api_ready (aTHX_ prev);
1957 }
1958 else
1959 prepare_nop (aTHX_ ta);
1960}
1961
1962static void
1963api_schedule (pTHX)
1964{
1965 struct coro_transfer_args ta;
1966
1967 prepare_schedule (aTHX_ &ta);
1968 TRANSFER (ta, 1);
1969}
1970
1971static void
1972api_schedule_to (pTHX_ SV *coro_sv)
1973{
1974 struct coro_transfer_args ta;
1975 struct coro *next = SvSTATE (coro_sv);
1976
1977 SvREFCNT_inc_NN (coro_sv);
1978 prepare_schedule_to (aTHX_ &ta, next);
1979}
1980
1981static int
1982api_cede (pTHX)
1983{
1984 struct coro_transfer_args ta;
1985
1986 prepare_cede (aTHX_ &ta);
1987
1988 if (ecb_expect_true (ta.prev != ta.next))
1989 {
1990 TRANSFER (ta, 1);
1991 return 1;
1992 }
1993 else
1994 return 0;
1995}
1996
1997static int
1998api_cede_notself (pTHX)
1999{
2000 if (coro_nready)
2001 {
2002 struct coro_transfer_args ta;
2003
2004 prepare_cede_notself (aTHX_ &ta);
2005 TRANSFER (ta, 1);
2006 return 1;
2007 }
2008 else
2009 return 0;
2010}
2011
2012static void
2013api_trace (pTHX_ SV *coro_sv, int flags)
2014{
2015 struct coro *coro = SvSTATE (coro_sv);
2016
2017 if (coro->flags & CF_RUNNING)
2018 croak ("cannot enable tracing on a running coroutine, caught");
2019
2020 if (flags & CC_TRACE)
2021 {
2022 if (!coro->cctx)
2023 coro->cctx = cctx_new_run ();
2024 else if (!(coro->cctx->flags & CC_TRACE))
2025 croak ("cannot enable tracing on coroutine with custom stack, caught");
2026
2027 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
2028 }
2029 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
2030 {
2031 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
2032
2033 if (coro->flags & CF_RUNNING)
2034 PL_runops = RUNOPS_DEFAULT;
2035 else
2036 coro->slot->runops = RUNOPS_DEFAULT;
2037 }
2038}
2039
2040static void
2041coro_push_av (pTHX_ AV *av, I32 gimme_v)
2042{
2043 if (AvFILLp (av) >= 0 && gimme_v != G_VOID)
2044 {
2045 dSP;
2046
2047 if (gimme_v == G_SCALAR)
2048 XPUSHs (AvARRAY (av)[AvFILLp (av)]);
2049 else
2050 {
2051 int i;
2052 EXTEND (SP, AvFILLp (av) + 1);
2053
2054 for (i = 0; i <= AvFILLp (av); ++i)
2055 PUSHs (AvARRAY (av)[i]);
2056 }
2057
2058 PUTBACK;
2059 }
2060}
2061
2062static void
2063coro_push_on_destroy (pTHX_ struct coro *coro, SV *cb)
2064{
2065 if (!coro->on_destroy)
2066 coro->on_destroy = newAV ();
2067
2068 av_push (coro->on_destroy, cb);
2069}
2070
2071static void
2072slf_destroy_join (pTHX_ struct CoroSLF *frame)
2073{
2074 SvREFCNT_dec ((SV *)((struct coro *)frame->data)->hv);
2075}
2076
2077static int
2078slf_check_join (pTHX_ struct CoroSLF *frame)
2079{
2080 struct coro *coro = (struct coro *)frame->data;
2081
2082 if (!coro->status)
2083 return 1;
2084
2085 frame->destroy = 0;
2086
2087 coro_push_av (aTHX_ coro->status, GIMME_V);
2088
2089 SvREFCNT_dec ((SV *)coro->hv);
2090
2091 return 0;
2092}
2093
2094static void
2095slf_init_join (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2096{
2097 struct coro *coro = SvSTATE (items > 0 ? arg [0] : &PL_sv_undef);
2098
2099 if (items > 1)
2100 croak ("join called with too many arguments");
2101
2102 if (coro->status)
2103 frame->prepare = prepare_nop;
2104 else
2105 {
2106 coro_push_on_destroy (aTHX_ coro, SvREFCNT_inc_NN (SvRV (coro_current)));
2107 frame->prepare = prepare_schedule;
2108 }
2109
2110 frame->check = slf_check_join;
2111 frame->destroy = slf_destroy_join;
2112 frame->data = (void *)coro;
2113 SvREFCNT_inc (coro->hv);
2114}
2115
2116static void
2117coro_call_on_destroy (pTHX_ struct coro *coro)
2118{
2119 AV *od = coro->on_destroy;
2120
2121 if (!od)
2122 return;
2123
2124 while (AvFILLp (od) >= 0)
2125 {
2126 SV *cb = sv_2mortal (av_pop (od));
2127
2128 /* coro hv's (and only hv's at the moment) are supported as well */
2129 if (SvSTATEhv_p (aTHX_ cb))
2130 api_ready (aTHX_ cb);
2131 else
2132 {
2133 dSP; /* don't disturb outer sp */
2134 PUSHMARK (SP);
2135
2136 if (coro->status)
2137 {
2138 PUTBACK;
2139 coro_push_av (aTHX_ coro->status, G_ARRAY);
2140 SPAGAIN;
2141 }
2142
2143 PUTBACK;
2144 call_sv (cb, G_VOID | G_DISCARD);
2145 }
2146 }
2147}
2148
2149static void
2150coro_set_status (pTHX_ struct coro *coro, SV **arg, int items)
2151{
2152 AV *av;
2153
2154 if (coro->status)
2155 {
2156 av = coro->status;
2157 av_clear (av);
2158 }
2159 else
2160 av = coro->status = newAV ();
2161
2162 /* items are actually not so common, so optimise for this case */
2163 if (items)
2164 {
2165 int i;
2166
2167 av_extend (av, items - 1);
2168
2169 for (i = 0; i < items; ++i)
2170 av_push (av, SvREFCNT_inc_NN (arg [i]));
2171 }
2172}
2173
2174static void
2175slf_init_terminate_cancel_common (pTHX_ struct CoroSLF *frame, HV *coro_hv)
2176{
2177 av_push (av_destroy, (SV *)newRV_inc ((SV *)coro_hv)); /* RVinc for perl */
2178 api_ready (aTHX_ sv_manager);
2179
2180 frame->prepare = prepare_schedule;
2181 frame->check = slf_check_repeat;
2182
2183 /* as a minor optimisation, we could unwind all stacks here */
2184 /* but that puts extra pressure on pp_slf, and is not worth much */
2185 /*coro_unwind_stacks (aTHX);*/
2186}
2187
2188static void
2189slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2190{
2191 HV *coro_hv = (HV *)SvRV (coro_current);
2192
2193 coro_set_status (aTHX_ SvSTATE ((SV *)coro_hv), arg, items);
2194 slf_init_terminate_cancel_common (aTHX_ frame, coro_hv);
2195}
2196
2197static void
2198slf_init_cancel (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2199{
2200 HV *coro_hv;
2201 struct coro *coro;
2202
2203 if (items <= 0)
2204 croak ("Coro::cancel called without coro object,");
2205
2206 coro = SvSTATE (arg [0]);
2207 coro_hv = coro->hv;
2208
2209 coro_set_status (aTHX_ coro, arg + 1, items - 1);
2210
2211 if (ecb_expect_false (coro->flags & CF_NOCANCEL))
2212 {
2213 /* coro currently busy cancelling something, so just notify it */
2214 coro->slf_frame.data = (void *)coro;
2215
2216 frame->prepare = prepare_nop;
2217 frame->check = slf_check_nop;
2218 }
2219 else if (coro_hv == (HV *)SvRV (coro_current))
2220 {
2221 /* cancelling the current coro is allowed, and equals terminate */
2222 slf_init_terminate_cancel_common (aTHX_ frame, coro_hv);
2223 }
2224 else
2225 {
2226 struct coro *self = SvSTATE_current;
2227
2228 /* otherwise we cancel directly, purely for speed reasons
2229 * unfortunately, this requires some magic trickery, as
2230 * somebody else could cancel us, so we have to fight the cancellation.
2231 * this is ugly, and hopefully fully worth the extra speed.
2232 * besides, I can't get the slow-but-safe version working...
2233 */
2234 slf_frame.data = 0;
2235 self->flags |= CF_NOCANCEL;
2236 coro_state_destroy (aTHX_ coro);
2237 self->flags &= ~CF_NOCANCEL;
2238
2239 if (slf_frame.data)
2240 {
2241 /* while we were busy we have been cancelled, so terminate */
2242 slf_init_terminate_cancel_common (aTHX_ frame, self->hv);
2243 }
2244 else
2245 {
2246 frame->prepare = prepare_nop;
2247 frame->check = slf_check_nop;
2248 }
2249 }
2250}
2251
2252static int
2253slf_check_safe_cancel (pTHX_ struct CoroSLF *frame)
2254{
2255 frame->prepare = 0;
2256 coro_unwind_stacks (aTHX);
2257
2258 slf_init_terminate_cancel_common (aTHX_ frame, (HV *)SvRV (coro_current));
2259
2260 return 1;
2261}
2262
2263static int
2264safe_cancel (pTHX_ struct coro *coro, SV **arg, int items)
2265{
2266 if (coro->cctx)
2267 croak ("coro inside C callback, unable to cancel at this time, caught");
2268
2269 if (coro->flags & CF_NEW)
2270 {
2271 coro_set_status (aTHX_ coro, arg, items);
2272 coro_state_destroy (aTHX_ coro);
2273 }
2274 else
2275 {
2276 if (!coro->slf_frame.prepare)
2277 croak ("coro outside an SLF function, unable to cancel at this time, caught");
2278
2279 slf_destroy (aTHX_ coro);
2280
2281 coro_set_status (aTHX_ coro, arg, items);
2282 coro->slf_frame.prepare = prepare_nop;
2283 coro->slf_frame.check = slf_check_safe_cancel;
2284
2285 api_ready (aTHX_ (SV *)coro->hv);
2286 }
2287
2288 return 1;
2289}
2290
2291/*****************************************************************************/
2292/* async pool handler */
2293
2294static int
2295slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
2296{
2297 HV *hv = (HV *)SvRV (coro_current);
2298 struct coro *coro = (struct coro *)frame->data;
2299
2300 if (!coro->invoke_cb)
2301 return 1; /* loop till we have invoke */
2302 else
2303 {
2304 hv_store (hv, "desc", sizeof ("desc") - 1,
2305 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
2306
2307 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
2308
2309 {
2310 dSP;
2311 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
2312 PUTBACK;
2313 }
2314
2315 SvREFCNT_dec (GvAV (PL_defgv));
2316 GvAV (PL_defgv) = coro->invoke_av;
2317 coro->invoke_av = 0;
2318
2319 return 0;
2320 }
2321}
2322
2323static void
2324slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2325{
2326 HV *hv = (HV *)SvRV (coro_current);
2327 struct coro *coro = SvSTATE_hv ((SV *)hv);
2328
2329 if (ecb_expect_true (coro->saved_deffh))
2330 {
2331 /* subsequent iteration */
2332 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
2333 coro->saved_deffh = 0;
2334
2335 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
2336 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
2337 {
2338 slf_init_terminate_cancel_common (aTHX_ frame, hv);
2339 return;
2340 }
2341 else
2342 {
2343 av_clear (GvAV (PL_defgv));
2344 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
2345
2346 coro->prio = 0;
2347
2348 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
2349 api_trace (aTHX_ coro_current, 0);
2350
2351 frame->prepare = prepare_schedule;
2352 av_push (av_async_pool, SvREFCNT_inc (hv));
2353 }
2354 }
2355 else
2356 {
2357 /* first iteration, simply fall through */
2358 frame->prepare = prepare_nop;
2359 }
2360
2361 frame->check = slf_check_pool_handler;
2362 frame->data = (void *)coro;
2363}
2364
2365/*****************************************************************************/
2366/* rouse callback */
2367
2368#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
2369
2370static void
2371coro_rouse_callback (pTHX_ CV *cv)
2372{
2373 dXSARGS;
2374 SV *data = (SV *)S_GENSUB_ARG;
2375
2376 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2377 {
2378 /* first call, set args */
2379 SV *coro = SvRV (data);
2380 AV *av = newAV ();
2381
2382 SvRV_set (data, (SV *)av);
2383
2384 /* better take a full copy of the arguments */
2385 while (items--)
2386 av_store (av, items, newSVsv (ST (items)));
2387
2388 api_ready (aTHX_ coro);
2389 SvREFCNT_dec (coro);
2390 }
2391
2392 XSRETURN_EMPTY;
2393}
2394
2395static int
2396slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
2397{
2398 SV *data = (SV *)frame->data;
2399
2400 if (CORO_THROW)
2401 return 0;
2402
2403 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2404 return 1;
2405
2406 /* now push all results on the stack */
2407 {
2408 dSP;
2409 AV *av = (AV *)SvRV (data);
2410 int i;
2411
2412 EXTEND (SP, AvFILLp (av) + 1);
2413 for (i = 0; i <= AvFILLp (av); ++i)
2414 PUSHs (sv_2mortal (AvARRAY (av)[i]));
2415
2416 /* we have stolen the elements, so set length to zero and free */
2417 AvFILLp (av) = -1;
2418 av_undef (av);
2419
2420 PUTBACK;
2421 }
2422
2423 return 0;
2424}
2425
2426static void
2427slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2428{
2429 SV *cb;
2430
2431 if (items)
2432 cb = arg [0];
2433 else
2434 {
2435 struct coro *coro = SvSTATE_current;
2436
2437 if (!coro->rouse_cb)
2438 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
2439
2440 cb = sv_2mortal (coro->rouse_cb);
2441 coro->rouse_cb = 0;
2442 }
2443
2444 if (!SvROK (cb)
2445 || SvTYPE (SvRV (cb)) != SVt_PVCV
2446 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
2447 croak ("Coro::rouse_wait called with illegal callback argument,");
2448
2449 {
2450 CV *cv = (CV *)SvRV (cb); /* for S_GENSUB_ARG */
2451 SV *data = (SV *)S_GENSUB_ARG;
2452
2453 frame->data = (void *)data;
2454 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2455 frame->check = slf_check_rouse_wait;
2456 }
2457}
2458
2459static SV *
2460coro_new_rouse_cb (pTHX)
2461{
2462 HV *hv = (HV *)SvRV (coro_current);
2463 struct coro *coro = SvSTATE_hv (hv);
2464 SV *data = newRV_inc ((SV *)hv);
2465 SV *cb = s_gensub (aTHX_ coro_rouse_callback, (void *)data);
2466
2467 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2468 SvREFCNT_dec (data); /* magicext increases the refcount */
2469
2470 SvREFCNT_dec (coro->rouse_cb);
2471 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2472
2473 return cb;
2474}
2475
2476/*****************************************************************************/
2477/* schedule-like-function opcode (SLF) */
2478
2479static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2480static const CV *slf_cv;
2481static SV **slf_argv;
2482static int slf_argc, slf_arga; /* count, allocated */
2483static I32 slf_ax; /* top of stack, for restore */
2484
2485/* this restores the stack in the case we patched the entersub, to */
2486/* recreate the stack frame as perl will on following calls */
2487/* since entersub cleared the stack */
2488static OP *
2489pp_restore (pTHX)
2490{
2491 int i;
2492 SV **SP = PL_stack_base + slf_ax;
2493
2494 PUSHMARK (SP);
2495
2496 EXTEND (SP, slf_argc + 1);
2497
2498 for (i = 0; i < slf_argc; ++i)
2499 PUSHs (sv_2mortal (slf_argv [i]));
2500
2501 PUSHs ((SV *)CvGV (slf_cv));
2502
2503 RETURNOP (slf_restore.op_first);
2504}
2505
2506static void
2507slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2508{
2509 SV **arg = (SV **)slf_frame.data;
2510
2511 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2512}
2513
2514static void
2515slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2516{
2517 if (items != 2)
2518 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2519
2520 frame->prepare = slf_prepare_transfer;
2521 frame->check = slf_check_nop;
2522 frame->data = (void *)arg; /* let's hope it will stay valid */
2523}
2524
2525static void
2526slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2527{
2528 frame->prepare = prepare_schedule;
2529 frame->check = slf_check_nop;
2530}
2531
2532static void
2533slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2534{
2535 struct coro *next = (struct coro *)slf_frame.data;
2536
2537 SvREFCNT_inc_NN (next->hv);
2538 prepare_schedule_to (aTHX_ ta, next);
2539}
2540
2541static void
2542slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2543{
2544 if (!items)
2545 croak ("Coro::schedule_to expects a coroutine argument, caught");
2546
2547 frame->data = (void *)SvSTATE (arg [0]);
2548 frame->prepare = slf_prepare_schedule_to;
2549 frame->check = slf_check_nop;
2550}
2551
2552static void
2553slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2554{
2555 api_ready (aTHX_ SvRV (coro_current));
2556
2557 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2558}
2559
2560static void
2561slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2562{
2563 frame->prepare = prepare_cede;
2564 frame->check = slf_check_nop;
2565}
2566
2567static void
2568slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2569{
2570 frame->prepare = prepare_cede_notself;
2571 frame->check = slf_check_nop;
2572}
2573
2574/* "undo"/cancel a running slf call - used when cancelling a coro, mainly */
2575static void
2576slf_destroy (pTHX_ struct coro *coro)
2577{
2578 /* this callback is reserved for slf functions needing to do cleanup */
2579 if (coro->slf_frame.destroy && coro->slf_frame.prepare && !PL_dirty)
2580 coro->slf_frame.destroy (aTHX_ &coro->slf_frame);
2581
2582 /*
2583 * The on_destroy above most likely is from an SLF call.
2584 * Since by definition the SLF call will not finish when we destroy
2585 * the coro, we will have to force-finish it here, otherwise
2586 * cleanup functions cannot call SLF functions.
2587 */
2588 coro->slf_frame.prepare = 0;
2589}
2590
2591/*
2592 * these not obviously related functions are all rolled into one
2593 * function to increase chances that they all will call transfer with the same
2594 * stack offset
2595 * SLF stands for "schedule-like-function".
2596 */
2597static OP *
2598pp_slf (pTHX)
2599{
2600 I32 checkmark; /* mark SP to see how many elements check has pushed */
2601
2602 /* set up the slf frame, unless it has already been set-up */
2603 /* the latter happens when a new coro has been started */
2604 /* or when a new cctx was attached to an existing coroutine */
2605 if (ecb_expect_true (!slf_frame.prepare))
2606 {
2607 /* first iteration */
2608 dSP;
2609 SV **arg = PL_stack_base + TOPMARK + 1;
2610 int items = SP - arg; /* args without function object */
2611 SV *gv = *sp;
2612
2613 /* do a quick consistency check on the "function" object, and if it isn't */
2614 /* for us, divert to the real entersub */
2615 if (SvTYPE (gv) != SVt_PVGV
2616 || !GvCV (gv)
2617 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2618 return PL_ppaddr[OP_ENTERSUB](aTHX);
2619
2620 if (!(PL_op->op_flags & OPf_STACKED))
2621 {
2622 /* ampersand-form of call, use @_ instead of stack */
2623 AV *av = GvAV (PL_defgv);
2624 arg = AvARRAY (av);
2625 items = AvFILLp (av) + 1;
2626 }
2627
2628 /* now call the init function, which needs to set up slf_frame */
2629 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2630 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2631
2632 /* pop args */
2633 SP = PL_stack_base + POPMARK;
2634
2635 PUTBACK;
2636 }
2637
2638 /* now that we have a slf_frame, interpret it! */
2639 /* we use a callback system not to make the code needlessly */
2640 /* complicated, but so we can run multiple perl coros from one cctx */
2641
2642 do
2643 {
2644 struct coro_transfer_args ta;
2645
2646 slf_frame.prepare (aTHX_ &ta);
2647 TRANSFER (ta, 0);
2648
2649 checkmark = PL_stack_sp - PL_stack_base;
2650 }
2651 while (slf_frame.check (aTHX_ &slf_frame));
2652
2653 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2654
2655 /* exception handling */
2656 if (ecb_expect_false (CORO_THROW))
2657 {
2658 SV *exception = sv_2mortal (CORO_THROW);
2659
2660 CORO_THROW = 0;
2661 sv_setsv (ERRSV, exception);
2662 croak (0);
2663 }
2664
2665 /* return value handling - mostly like entersub */
2666 /* make sure we put something on the stack in scalar context */
2667 if (GIMME_V == G_SCALAR
2668 && ecb_expect_false (PL_stack_sp != PL_stack_base + checkmark + 1))
2669 {
2670 dSP;
2671 SV **bot = PL_stack_base + checkmark;
2672
2673 if (sp == bot) /* too few, push undef */
2674 bot [1] = &PL_sv_undef;
2675 else /* too many, take last one */
2676 bot [1] = *sp;
2677
2678 SP = bot + 1;
2679
2680 PUTBACK;
2681 }
2682
2683 return NORMAL;
2684}
2685
2686static void
2687api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2688{
2689 int i;
2690 SV **arg = PL_stack_base + ax;
2691 int items = PL_stack_sp - arg + 1;
2692
2693 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2694
2695 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2696 && PL_op->op_ppaddr != pp_slf)
2697 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2698
2699 CvFLAGS (cv) |= CVf_SLF;
2700 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2701 slf_cv = cv;
2702
2703 /* we patch the op, and then re-run the whole call */
2704 /* we have to put the same argument on the stack for this to work */
2705 /* and this will be done by pp_restore */
2706 slf_restore.op_next = (OP *)&slf_restore;
2707 slf_restore.op_type = OP_CUSTOM;
2708 slf_restore.op_ppaddr = pp_restore;
2709 slf_restore.op_first = PL_op;
2710
2711 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2712
2713 if (PL_op->op_flags & OPf_STACKED)
2714 {
2715 if (items > slf_arga)
2716 {
2717 slf_arga = items;
2718 Safefree (slf_argv);
2719 New (0, slf_argv, slf_arga, SV *);
2720 }
2721
2722 slf_argc = items;
2723
2724 for (i = 0; i < items; ++i)
2725 slf_argv [i] = SvREFCNT_inc (arg [i]);
2726 }
2727 else
2728 slf_argc = 0;
2729
2730 PL_op->op_ppaddr = pp_slf;
2731 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2732
2733 PL_op = (OP *)&slf_restore;
2734}
2735
2736/*****************************************************************************/
2737/* dynamic wind */
2738
2739static void
2740on_enterleave_call (pTHX_ SV *cb)
2741{
2742 dSP;
2743
2744 PUSHSTACK;
2745
2746 PUSHMARK (SP);
2747 PUTBACK;
2748 call_sv (cb, G_VOID | G_DISCARD);
2749 SPAGAIN;
2750
2751 POPSTACK;
2752}
2753
2754static SV *
2755coro_avp_pop_and_free (pTHX_ AV **avp)
2756{
2757 AV *av = *avp;
2758 SV *res = av_pop (av);
2759
2760 if (AvFILLp (av) < 0)
2761 {
2762 *avp = 0;
2763 SvREFCNT_dec (av);
2764 }
2765
2766 return res;
2767}
2768
2769static void
2770coro_pop_on_enter (pTHX_ void *coro)
2771{
2772 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_enter);
2773 SvREFCNT_dec (cb);
2774}
2775
2776static void
2777coro_pop_on_leave (pTHX_ void *coro)
2778{
2779 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_leave);
2780 on_enterleave_call (aTHX_ sv_2mortal (cb));
2781}
2782
2783/*****************************************************************************/
2784/* PerlIO::cede */
2785
2786typedef struct
2787{
2788 PerlIOBuf base;
2789 NV next, every;
2790} PerlIOCede;
2791
2792static IV ecb_cold
2793PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2794{
2795 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2796
2797 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2798 self->next = nvtime () + self->every;
2799
2800 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2801}
2802
2803static SV * ecb_cold
2804PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2805{
2806 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2807
2808 return newSVnv (self->every);
2809}
2810
2811static IV
2812PerlIOCede_flush (pTHX_ PerlIO *f)
2813{
2814 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2815 double now = nvtime ();
2816
2817 if (now >= self->next)
2818 {
2819 api_cede (aTHX);
2820 self->next = now + self->every;
2821 }
2822
2823 return PerlIOBuf_flush (aTHX_ f);
2824}
2825
2826static PerlIO_funcs PerlIO_cede =
2827{
2828 sizeof(PerlIO_funcs),
2829 "cede",
2830 sizeof(PerlIOCede),
2831 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2832 PerlIOCede_pushed,
2833 PerlIOBuf_popped,
2834 PerlIOBuf_open,
2835 PerlIOBase_binmode,
2836 PerlIOCede_getarg,
2837 PerlIOBase_fileno,
2838 PerlIOBuf_dup,
2839 PerlIOBuf_read,
2840 PerlIOBuf_unread,
2841 PerlIOBuf_write,
2842 PerlIOBuf_seek,
2843 PerlIOBuf_tell,
2844 PerlIOBuf_close,
2845 PerlIOCede_flush,
2846 PerlIOBuf_fill,
2847 PerlIOBase_eof,
2848 PerlIOBase_error,
2849 PerlIOBase_clearerr,
2850 PerlIOBase_setlinebuf,
2851 PerlIOBuf_get_base,
2852 PerlIOBuf_bufsiz,
2853 PerlIOBuf_get_ptr,
2854 PerlIOBuf_get_cnt,
2855 PerlIOBuf_set_ptrcnt,
2856};
2857
2858/*****************************************************************************/
2859/* Coro::Semaphore & Coro::Signal */
2860
2861static SV *
2862coro_waitarray_new (pTHX_ int count)
2863{
2864 /* a waitarray=semaphore contains a counter IV in $sem->[0] and any waiters after that */
2865 AV *av = newAV ();
2866 SV **ary;
2867
2868 /* unfortunately, building manually saves memory */
2869 Newx (ary, 2, SV *);
2870 AvALLOC (av) = ary;
2871#if PERL_VERSION_ATLEAST (5,10,0)
2872 AvARRAY (av) = ary;
2873#else
2874 /* 5.8.8 needs this syntax instead of AvARRAY = ary, yet */
2875 /* -DDEBUGGING flags this as a bug, despite it perfectly working */
2876 SvPVX ((SV *)av) = (char *)ary;
2877#endif
2878 AvMAX (av) = 1;
2879 AvFILLp (av) = 0;
2880 ary [0] = newSViv (count);
2881
2882 return newRV_noinc ((SV *)av);
2883}
2884
2885/* semaphore */
2886
2887static void
2888coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2889{
2890 SV *count_sv = AvARRAY (av)[0];
2891 IV count = SvIVX (count_sv);
2892
2893 count += adjust;
2894 SvIVX (count_sv) = count;
2895
2896 /* now wake up as many waiters as are expected to lock */
2897 while (count > 0 && AvFILLp (av) > 0)
2898 {
2899 SV *cb;
2900
2901 /* swap first two elements so we can shift a waiter */
2902 AvARRAY (av)[0] = AvARRAY (av)[1];
2903 AvARRAY (av)[1] = count_sv;
2904 cb = av_shift (av);
2905
2906 if (SvOBJECT (cb))
2907 {
2908 api_ready (aTHX_ cb);
2909 --count;
2910 }
2911 else if (SvTYPE (cb) == SVt_PVCV)
2912 {
2913 dSP;
2914 PUSHMARK (SP);
2915 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2916 PUTBACK;
2917 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2918 }
2919
2920 SvREFCNT_dec (cb);
2921 }
2922}
2923
2924static void
2925coro_semaphore_destroy (pTHX_ struct CoroSLF *frame)
2926{
2927 /* call $sem->adjust (0) to possibly wake up some other waiters */
2928 coro_semaphore_adjust (aTHX_ (AV *)frame->data, 0);
2929}
2930
2931static int
2932slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2933{
2934 AV *av = (AV *)frame->data;
2935 SV *count_sv = AvARRAY (av)[0];
2936 SV *coro_hv = SvRV (coro_current);
2937
2938 /* if we are about to throw, don't actually acquire the lock, just throw */
2939 if (CORO_THROW)
2940 return 0;
2941 else if (SvIVX (count_sv) > 0)
2942 {
2943 frame->destroy = 0;
2944
2945 if (acquire)
2946 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2947 else
2948 coro_semaphore_adjust (aTHX_ av, 0);
2949
2950 return 0;
2951 }
2952 else
2953 {
2954 int i;
2955 /* if we were woken up but can't down, we look through the whole */
2956 /* waiters list and only add us if we aren't in there already */
2957 /* this avoids some degenerate memory usage cases */
2958 for (i = AvFILLp (av); i > 0; --i) /* i > 0 is not an off-by-one bug */
2959 if (AvARRAY (av)[i] == coro_hv)
2960 return 1;
2961
2962 av_push (av, SvREFCNT_inc (coro_hv));
2963 return 1;
2964 }
2965}
2966
2967static int
2968slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2969{
2970 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2971}
2972
2973static int
2974slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2975{
2976 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2977}
2978
2979static void
2980slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2981{
2982 AV *av = (AV *)SvRV (arg [0]);
2983
2984 if (SvIVX (AvARRAY (av)[0]) > 0)
2985 {
2986 frame->data = (void *)av;
2987 frame->prepare = prepare_nop;
2988 }
2989 else
2990 {
2991 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2992
2993 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2994 frame->prepare = prepare_schedule;
2995 /* to avoid race conditions when a woken-up coro gets terminated */
2996 /* we arrange for a temporary on_destroy that calls adjust (0) */
2997 frame->destroy = coro_semaphore_destroy;
2998 }
2999}
3000
3001static void
3002slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
3003{
3004 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
3005 frame->check = slf_check_semaphore_down;
3006}
3007
3008static void
3009slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
3010{
3011 if (items >= 2)
3012 {
3013 /* callback form */
3014 AV *av = (AV *)SvRV (arg [0]);
3015 SV *cb_cv = s_get_cv_croak (arg [1]);
3016
3017 av_push (av, SvREFCNT_inc_NN (cb_cv));
3018
3019 if (SvIVX (AvARRAY (av)[0]) > 0)
3020 coro_semaphore_adjust (aTHX_ av, 0);
3021
3022 frame->prepare = prepare_nop;
3023 frame->check = slf_check_nop;
3024 }
3025 else
3026 {
3027 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
3028 frame->check = slf_check_semaphore_wait;
3029 }
3030}
3031
3032/* signal */
3033
3034static void
3035coro_signal_wake (pTHX_ AV *av, int count)
3036{
3037 SvIVX (AvARRAY (av)[0]) = 0;
3038
3039 /* now signal count waiters */
3040 while (count > 0 && AvFILLp (av) > 0)
3041 {
3042 SV *cb;
3043
3044 /* swap first two elements so we can shift a waiter */
3045 cb = AvARRAY (av)[0];
3046 AvARRAY (av)[0] = AvARRAY (av)[1];
3047 AvARRAY (av)[1] = cb;
3048
3049 cb = av_shift (av);
3050
3051 if (SvTYPE (cb) == SVt_PVCV)
3052 {
3053 dSP;
3054 PUSHMARK (SP);
3055 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
3056 PUTBACK;
3057 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
3058 }
3059 else
3060 {
3061 api_ready (aTHX_ cb);
3062 sv_setiv (cb, 0); /* signal waiter */
3063 }
3064
3065 SvREFCNT_dec (cb);
3066
3067 --count;
3068 }
3069}
3070
3071static int
3072slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
3073{
3074 /* if we are about to throw, also stop waiting */
3075 return SvROK ((SV *)frame->data) && !CORO_THROW;
3076}
3077
3078static void
3079slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
3080{
3081 AV *av = (AV *)SvRV (arg [0]);
3082
3083 if (items >= 2)
3084 {
3085 SV *cb_cv = s_get_cv_croak (arg [1]);
3086 av_push (av, SvREFCNT_inc_NN (cb_cv));
3087
3088 if (SvIVX (AvARRAY (av)[0]))
3089 coro_signal_wake (aTHX_ av, 1); /* must be the only waiter */
3090
3091 frame->prepare = prepare_nop;
3092 frame->check = slf_check_nop;
3093 }
3094 else if (SvIVX (AvARRAY (av)[0]))
3095 {
3096 SvIVX (AvARRAY (av)[0]) = 0;
3097 frame->prepare = prepare_nop;
3098 frame->check = slf_check_nop;
3099 }
3100 else
3101 {
3102 SV *waiter = newSVsv (coro_current); /* owned by signal av */
3103
3104 av_push (av, waiter);
3105
3106 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
3107 frame->prepare = prepare_schedule;
3108 frame->check = slf_check_signal_wait;
3109 }
3110}
3111
3112/*****************************************************************************/
3113/* Coro::AIO */
3114
3115#define CORO_MAGIC_type_aio PERL_MAGIC_ext
3116
3117/* helper storage struct */
3118struct io_state
3119{
3120 int errorno;
3121 I32 laststype; /* U16 in 5.10.0 */
3122 int laststatval;
3123 Stat_t statcache;
3124};
3125
3126static void
3127coro_aio_callback (pTHX_ CV *cv)
3128{
3129 dXSARGS;
3130 AV *state = (AV *)S_GENSUB_ARG;
3131 SV *coro = av_pop (state);
3132 SV *data_sv = newSV (sizeof (struct io_state));
3133
3134 av_extend (state, items - 1);
3135
3136 sv_upgrade (data_sv, SVt_PV);
3137 SvCUR_set (data_sv, sizeof (struct io_state));
3138 SvPOK_only (data_sv);
3139
3140 {
3141 struct io_state *data = (struct io_state *)SvPVX (data_sv);
3142
3143 data->errorno = errno;
3144 data->laststype = PL_laststype;
3145 data->laststatval = PL_laststatval;
3146 data->statcache = PL_statcache;
3147 }
3148
3149 /* now build the result vector out of all the parameters and the data_sv */
3150 {
3151 int i;
3152
3153 for (i = 0; i < items; ++i)
3154 av_push (state, SvREFCNT_inc_NN (ST (i)));
3155 }
3156
3157 av_push (state, data_sv);
3158
3159 api_ready (aTHX_ coro);
3160 SvREFCNT_dec (coro);
3161 SvREFCNT_dec ((AV *)state);
3162}
3163
3164static int
3165slf_check_aio_req (pTHX_ struct CoroSLF *frame)
3166{
3167 AV *state = (AV *)frame->data;
3168
3169 /* if we are about to throw, return early */
3170 /* this does not cancel the aio request, but at least */
3171 /* it quickly returns */
3172 if (CORO_THROW)
3173 return 0;
3174
3175 /* one element that is an RV? repeat! */
3176 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
3177 return 1;
3178
3179 /* restore status */
3180 {
3181 SV *data_sv = av_pop (state);
3182 struct io_state *data = (struct io_state *)SvPVX (data_sv);
3183
3184 errno = data->errorno;
3185 PL_laststype = data->laststype;
3186 PL_laststatval = data->laststatval;
3187 PL_statcache = data->statcache;
3188
3189 SvREFCNT_dec (data_sv);
3190 }
3191
3192 /* push result values */
3193 {
3194 dSP;
3195 int i;
3196
3197 EXTEND (SP, AvFILLp (state) + 1);
3198 for (i = 0; i <= AvFILLp (state); ++i)
3199 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
3200
3201 PUTBACK;
3202 }
3203
3204 return 0;
3205}
3206
3207static void
3208slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
3209{
3210 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
3211 SV *coro_hv = SvRV (coro_current);
3212 struct coro *coro = SvSTATE_hv (coro_hv);
3213
3214 /* put our coroutine id on the state arg */
3215 av_push (state, SvREFCNT_inc_NN (coro_hv));
3216
3217 /* first see whether we have a non-zero priority and set it as AIO prio */
3218 if (coro->prio)
3219 {
3220 dSP;
3221
3222 static SV *prio_cv;
3223 static SV *prio_sv;
3224
3225 if (ecb_expect_false (!prio_cv))
3226 {
3227 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
3228 prio_sv = newSViv (0);
3229 }
3230
3231 PUSHMARK (SP);
3232 sv_setiv (prio_sv, coro->prio);
3233 XPUSHs (prio_sv);
3234
3235 PUTBACK;
3236 call_sv (prio_cv, G_VOID | G_DISCARD);
3237 }
3238
3239 /* now call the original request */
3240 {
3241 dSP;
3242 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
3243 int i;
3244
3245 PUSHMARK (SP);
3246
3247 /* first push all args to the stack */
3248 EXTEND (SP, items + 1);
3249
3250 for (i = 0; i < items; ++i)
3251 PUSHs (arg [i]);
3252
3253 /* now push the callback closure */
3254 PUSHs (sv_2mortal (s_gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
3255
3256 /* now call the AIO function - we assume our request is uncancelable */
3257 PUTBACK;
3258 call_sv ((SV *)req, G_VOID | G_DISCARD);
3259 }
3260
3261 /* now that the request is going, we loop till we have a result */
3262 frame->data = (void *)state;
3263 frame->prepare = prepare_schedule;
3264 frame->check = slf_check_aio_req;
3265}
3266
3267static void
3268coro_aio_req_xs (pTHX_ CV *cv)
3269{
3270 dXSARGS;
3271
3272 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
3273
3274 XSRETURN_EMPTY;
3275}
3276
3277/*****************************************************************************/
3278
3279#if CORO_CLONE
3280# include "clone.c"
3281#endif
3282
3283/*****************************************************************************/
3284
3285static SV *
3286coro_new (pTHX_ HV *stash, SV **argv, int argc, int is_coro)
3287{
3288 SV *coro_sv;
3289 struct coro *coro;
3290 MAGIC *mg;
3291 HV *hv;
3292 SV *cb;
3293 int i;
3294
3295 if (argc > 0)
3296 {
3297 cb = s_get_cv_croak (argv [0]);
3298
3299 if (!is_coro)
3300 {
3301 if (CvISXSUB (cb))
3302 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
3303
3304 if (!CvROOT (cb))
3305 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
3306 }
3307 }
3308
3309 Newz (0, coro, 1, struct coro);
3310 coro->args = newAV ();
3311 coro->flags = CF_NEW;
3312
3313 if (coro_first) coro_first->prev = coro;
3314 coro->next = coro_first;
3315 coro_first = coro;
3316
3317 coro->hv = hv = newHV ();
3318 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
3319 mg->mg_flags |= MGf_DUP;
3320 coro_sv = sv_bless (newRV_noinc ((SV *)hv), stash);
3321
3322 if (argc > 0)
3323 {
3324 av_extend (coro->args, argc + is_coro - 1);
3325
3326 if (is_coro)
3327 {
3328 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
3329 cb = (SV *)cv_coro_run;
3330 }
3331
3332 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
3333
3334 for (i = 1; i < argc; i++)
3335 av_push (coro->args, newSVsv (argv [i]));
3336 }
3337
3338 return coro_sv;
3339}
3340
3341#ifndef __cplusplus
3342ecb_cold XS(boot_Coro__State);
3343#endif
3344
3345#if CORO_JIT
3346
3347static void ecb_noinline ecb_cold
3348pushav_4uv (pTHX_ UV a, UV b, UV c, UV d)
3349{
3350 dSP;
3351 AV *av = newAV ();
3352
3353 av_store (av, 3, newSVuv (d));
3354 av_store (av, 2, newSVuv (c));
3355 av_store (av, 1, newSVuv (b));
3356 av_store (av, 0, newSVuv (a));
3357
3358 XPUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
3359
3360 PUTBACK;
3361}
3362
3363static void ecb_noinline ecb_cold
3364jit_init (pTHX)
3365{
3366 dSP;
3367 SV *load, *save;
3368 char *map_base;
3369 char *load_ptr, *save_ptr;
3370 STRLEN load_len, save_len, map_len;
3371 int count;
3372
3373 eval_pv ("require 'Coro/jit-" CORO_JIT_TYPE ".pl'", 1);
3374
3375 PUSHMARK (SP);
3376#define VARx(name,expr,type) pushav_4uv (aTHX_ (UV)&(expr), sizeof (expr), offsetof (perl_slots, name), sizeof (type));
3377# include "state.h"
3378#undef VARx
3379 count = call_pv ("Coro::State::_jit", G_ARRAY);
3380 SPAGAIN;
3381
3382 save = POPs; save_ptr = SvPVbyte (save, save_len);
3383 load = POPs; load_ptr = SvPVbyte (load, load_len);
3384
3385 map_len = load_len + save_len + 16;
3386
3387 map_base = mmap (0, map_len, PROT_READ | PROT_WRITE | PROT_EXEC, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
3388
3389 assert (("Coro: unable to mmap jit code page, cannot continue.", map_base != (char *)MAP_FAILED));
3390
3391 load_perl_slots = (load_save_perl_slots_type)map_base;
3392 memcpy (map_base, load_ptr, load_len);
3393
3394 map_base += (load_len + 15) & ~15;
3395
3396 save_perl_slots = (load_save_perl_slots_type)map_base;
3397 memcpy (map_base, save_ptr, save_len);
3398
3399 /* we are good citizens and try to make the page read-only, so the evil evil */
3400 /* hackers might have it a bit more difficult */
3401 mprotect (map_base, map_len, PROT_READ | PROT_EXEC);
3402
3403 PUTBACK;
3404 eval_pv ("undef &Coro::State::_jit", 1);
3405}
3406
3407#endif
3408
3409MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
3410
3411PROTOTYPES: DISABLE
3412
3413BOOT:
3414{
3415#ifdef USE_ITHREADS
3416# if CORO_PTHREAD
3417 coro_thx = PERL_GET_CONTEXT;
3418# endif
3419#endif
3420 BOOT_PAGESIZE;
3421
3422 /* perl defines these to check for existance first, but why it doesn't */
3423 /* just create them one at init time is not clear to me, except for */
3424 /* programs trying to delete them, but... */
3425 /* anyway, we declare this as invalid and make sure they are initialised here */
3426 DEFSV;
3427 ERRSV;
3428
3429 cctx_current = cctx_new_empty ();
3430
3431 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
3432 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
3433
3434 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
3435 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
3436 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
3437
3438 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
3439 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
3440 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
3441
3442 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
3443
3444 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
3445 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
3446 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
3447 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
3448
3449 main_mainstack = PL_mainstack;
3450 main_top_env = PL_top_env;
3451
3452 while (main_top_env->je_prev)
3453 main_top_env = main_top_env->je_prev;
3454
3455 {
3456 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
3457
3458 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
3459 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
3460
3461 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
3462 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
3463 }
3464
3465 coroapi.ver = CORO_API_VERSION;
3466 coroapi.rev = CORO_API_REVISION;
3467
3468 coroapi.transfer = api_transfer;
3469
3470 coroapi.sv_state = SvSTATE_;
3471 coroapi.execute_slf = api_execute_slf;
3472 coroapi.prepare_nop = prepare_nop;
3473 coroapi.prepare_schedule = prepare_schedule;
3474 coroapi.prepare_cede = prepare_cede;
3475 coroapi.prepare_cede_notself = prepare_cede_notself;
3476
3477 time_init (aTHX);
3478
3479 assert (("PRIO_NORMAL must be 0", !CORO_PRIO_NORMAL));
3480#if CORO_JIT
3481 PUTBACK;
3482 jit_init (aTHX);
3483 SPAGAIN;
3484#endif
3485}
3486
3487SV *
3488new (SV *klass, ...)
3489 ALIAS:
3490 Coro::new = 1
3491 CODE:
3492 RETVAL = coro_new (aTHX_ ix ? coro_stash : coro_state_stash, &ST (1), items - 1, ix);
3493 OUTPUT:
3494 RETVAL
3495
3496void
3497transfer (...)
3498 PROTOTYPE: $$
3499 CODE:
3500 CORO_EXECUTE_SLF_XS (slf_init_transfer);
3501
3502void
3503_exit (int code)
3504 PROTOTYPE: $
3505 CODE:
3506 _exit (code);
3507
3508SV *
3509clone (Coro::State coro)
3510 CODE:
3511{
3512#if CORO_CLONE
3513 struct coro *ncoro = coro_clone (aTHX_ coro);
3514 MAGIC *mg;
3515 /* TODO: too much duplication */
3516 ncoro->hv = newHV ();
3517 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
3518 mg->mg_flags |= MGf_DUP;
3519 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
3520#else
3521 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
3522#endif
3523}
3524 OUTPUT:
3525 RETVAL
3526
3527int
3528cctx_stacksize (int new_stacksize = 0)
3529 PROTOTYPE: ;$
3530 CODE:
3531 RETVAL = cctx_stacksize;
3532 if (new_stacksize)
3533 {
3534 cctx_stacksize = new_stacksize;
3535 ++cctx_gen;
3536 }
3537 OUTPUT:
3538 RETVAL
3539
3540int
3541cctx_max_idle (int max_idle = 0)
3542 PROTOTYPE: ;$
3543 CODE:
3544 RETVAL = cctx_max_idle;
3545 if (max_idle > 1)
3546 cctx_max_idle = max_idle;
3547 OUTPUT:
3548 RETVAL
3549
3550int
3551cctx_count ()
3552 PROTOTYPE:
3553 CODE:
3554 RETVAL = cctx_count;
3555 OUTPUT:
3556 RETVAL
3557
3558int
3559cctx_idle ()
3560 PROTOTYPE:
3561 CODE:
3562 RETVAL = cctx_idle;
3563 OUTPUT:
3564 RETVAL
3565
3566void
3567list ()
3568 PROTOTYPE:
3569 PPCODE:
3570{
3571 struct coro *coro;
3572 for (coro = coro_first; coro; coro = coro->next)
3573 if (coro->hv)
3574 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
3575}
3576
3577void
3578call (Coro::State coro, SV *coderef)
3579 ALIAS:
3580 eval = 1
3581 CODE:
3582{
3583 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
3584 {
3585 struct coro *current = SvSTATE_current;
3586 struct CoroSLF slf_save;
3587
3588 if (current != coro)
235 { 3589 {
236 /* I never used formats, so how should I know how these are implemented? */ 3590 PUTBACK;
237 /* my bold guess is as a simple, plain sub... */ 3591 save_perl (aTHX_ current);
238 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 3592 load_perl (aTHX_ coro);
3593 /* the coro is most likely in an active SLF call.
3594 * while not strictly required (the code we execute is
3595 * not allowed to call any SLF functions), it's cleaner
3596 * to reinitialise the slf_frame and restore it later.
3597 * This might one day allow us to actually do SLF calls
3598 * from code executed here.
3599 */
3600 slf_save = slf_frame;
3601 slf_frame.prepare = 0;
3602 SPAGAIN;
3603 }
3604
3605 PUSHSTACK;
3606
3607 PUSHMARK (SP);
3608 PUTBACK;
3609
3610 if (ix)
3611 eval_sv (coderef, 0);
3612 else
3613 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3614
3615 POPSTACK;
3616 SPAGAIN;
3617
3618 if (current != coro)
3619 {
3620 PUTBACK;
3621 slf_frame = slf_save;
3622 save_perl (aTHX_ coro);
3623 load_perl (aTHX_ current);
3624 SPAGAIN;
239 } 3625 }
240 } 3626 }
241
242 if (top_si->si_type == PERLSI_MAIN)
243 break;
244
245 top_si = top_si->si_prev;
246 ccstk = top_si->si_cxstack;
247 cxix = top_si->si_cxix;
248 }
249
250 PUTBACK;
251 }
252
253 c->dowarn = PL_dowarn;
254 c->defav = GvAV (PL_defgv);
255 c->curstackinfo = PL_curstackinfo;
256 c->curstack = PL_curstack;
257 c->mainstack = PL_mainstack;
258 c->stack_sp = PL_stack_sp;
259 c->op = PL_op;
260 c->curpad = PL_curpad;
261 c->stack_base = PL_stack_base;
262 c->stack_max = PL_stack_max;
263 c->tmps_stack = PL_tmps_stack;
264 c->tmps_floor = PL_tmps_floor;
265 c->tmps_ix = PL_tmps_ix;
266 c->tmps_max = PL_tmps_max;
267 c->markstack = PL_markstack;
268 c->markstack_ptr = PL_markstack_ptr;
269 c->markstack_max = PL_markstack_max;
270 c->scopestack = PL_scopestack;
271 c->scopestack_ix = PL_scopestack_ix;
272 c->scopestack_max = PL_scopestack_max;
273 c->savestack = PL_savestack;
274 c->savestack_ix = PL_savestack_ix;
275 c->savestack_max = PL_savestack_max;
276 c->retstack = PL_retstack;
277 c->retstack_ix = PL_retstack_ix;
278 c->retstack_max = PL_retstack_max;
279 c->curcop = PL_curcop;
280} 3627}
281 3628
282#define LOAD(state) do { load_state(aTHX_ state); SPAGAIN; } while (0) 3629SV *
283#define SAVE(state) do { PUTBACK; save_state(aTHX_ state); } while (0) 3630is_ready (Coro::State coro)
284
285static void
286load_state(pTHX_ Coro__State c)
287{
288 PL_dowarn = c->dowarn;
289 GvAV (PL_defgv) = c->defav;
290 PL_curstackinfo = c->curstackinfo;
291 PL_curstack = c->curstack;
292 PL_mainstack = c->mainstack;
293 PL_stack_sp = c->stack_sp;
294 PL_op = c->op;
295 PL_curpad = c->curpad;
296 PL_stack_base = c->stack_base;
297 PL_stack_max = c->stack_max;
298 PL_tmps_stack = c->tmps_stack;
299 PL_tmps_floor = c->tmps_floor;
300 PL_tmps_ix = c->tmps_ix;
301 PL_tmps_max = c->tmps_max;
302 PL_markstack = c->markstack;
303 PL_markstack_ptr = c->markstack_ptr;
304 PL_markstack_max = c->markstack_max;
305 PL_scopestack = c->scopestack;
306 PL_scopestack_ix = c->scopestack_ix;
307 PL_scopestack_max = c->scopestack_max;
308 PL_savestack = c->savestack;
309 PL_savestack_ix = c->savestack_ix;
310 PL_savestack_max = c->savestack_max;
311 PL_retstack = c->retstack;
312 PL_retstack_ix = c->retstack_ix;
313 PL_retstack_max = c->retstack_max;
314 PL_curcop = c->curcop;
315
316 {
317 dSP;
318 CV *cv;
319
320 /* now do the ugly restore mess */
321 while ((cv = (CV *)POPs))
322 {
323 AV *padlist = (AV *)POPs;
324
325 put_padlist (cv);
326 CvPADLIST(cv) = padlist;
327 CvDEPTH(cv) = (I32)POPs;
328
329#ifdef USE_THREADS
330 CvOWNER(cv) = (struct perl_thread *)POPs;
331 error does not work either
332#endif
333 }
334
335 PUTBACK;
336 }
337}
338
339/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
340STATIC void
341destroy_stacks(pTHX)
342{
343 /* die does this while calling POPSTACK, but I just don't see why. */
344 /* OTOH, die does not have a memleak, but we do... */
345 dounwind(-1);
346
347 /* is this ugly, I ask? */
348 while (PL_scopestack_ix)
349 LEAVE;
350
351 while (PL_curstackinfo->si_next)
352 PL_curstackinfo = PL_curstackinfo->si_next;
353
354 while (PL_curstackinfo)
355 {
356 PERL_SI *p = PL_curstackinfo->si_prev;
357
358 SvREFCNT_dec(PL_curstackinfo->si_stack);
359 Safefree(PL_curstackinfo->si_cxstack);
360 Safefree(PL_curstackinfo);
361 PL_curstackinfo = p;
362 }
363
364 if (PL_scopestack_ix != 0)
365 Perl_warner(aTHX_ WARN_INTERNAL,
366 "Unbalanced scopes: %ld more ENTERs than LEAVEs\n",
367 (long)PL_scopestack_ix);
368 if (PL_savestack_ix != 0)
369 Perl_warner(aTHX_ WARN_INTERNAL,
370 "Unbalanced saves: %ld more saves than restores\n",
371 (long)PL_savestack_ix);
372 if (PL_tmps_floor != -1)
373 Perl_warner(aTHX_ WARN_INTERNAL,"Unbalanced tmps: %ld more allocs than frees\n",
374 (long)PL_tmps_floor + 1);
375 /*
376 */
377 Safefree(PL_tmps_stack);
378 Safefree(PL_markstack);
379 Safefree(PL_scopestack);
380 Safefree(PL_savestack);
381 Safefree(PL_retstack);
382}
383
384#define SUB_INIT "Coro::State::_newcoro"
385
386MODULE = Coro::State PACKAGE = Coro::State
387
388PROTOTYPES: ENABLE
389
390BOOT:
391 if (!padlist_cache)
392 padlist_cache = newHV ();
393
394Coro::State
395_newprocess(args)
396 SV * args
397 PROTOTYPE: $ 3631 PROTOTYPE: $
3632 ALIAS:
3633 is_ready = CF_READY
3634 is_running = CF_RUNNING
3635 is_new = CF_NEW
3636 is_destroyed = CF_ZOMBIE
3637 is_zombie = CF_ZOMBIE
3638 is_suspended = CF_SUSPENDED
3639 CODE:
3640 RETVAL = boolSV (coro->flags & ix);
3641 OUTPUT:
3642 RETVAL
3643
3644void
3645throw (Coro::State self, SV *exception = &PL_sv_undef)
3646 PROTOTYPE: $;$
398 CODE: 3647 CODE:
399 Coro__State coro; 3648{
3649 struct coro *current = SvSTATE_current;
3650 SV **exceptionp = self == current ? &CORO_THROW : &self->except;
3651 SvREFCNT_dec (*exceptionp);
3652 SvGETMAGIC (exception);
3653 *exceptionp = SvOK (exception) ? newSVsv (exception) : 0;
3654}
400 3655
401 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV) 3656void
402 croak ("Coro::State::newprocess expects an arrayref"); 3657api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3658 PROTOTYPE: $;$
3659 C_ARGS: aTHX_ coro, flags
3660
3661SV *
3662has_cctx (Coro::State coro)
3663 PROTOTYPE: $
3664 CODE:
3665 /* maybe manage the running flag differently */
3666 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3667 OUTPUT:
3668 RETVAL
3669
3670int
3671is_traced (Coro::State coro)
3672 PROTOTYPE: $
3673 CODE:
3674 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3675 OUTPUT:
3676 RETVAL
3677
3678UV
3679rss (Coro::State coro)
3680 PROTOTYPE: $
3681 ALIAS:
3682 usecount = 1
3683 CODE:
3684 switch (ix)
3685 {
3686 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3687 case 1: RETVAL = coro->usecount; break;
403 3688 }
404 New (0, coro, 1, struct coro); 3689 OUTPUT:
405
406 coro->mainstack = 0; /* actual work is done inside transfer */
407 coro->args = (AV *)SvREFCNT_inc (SvRV (args));
408
409 RETVAL = coro; 3690 RETVAL
3691
3692void
3693force_cctx ()
3694 PROTOTYPE:
3695 CODE:
3696 cctx_current->idle_sp = 0;
3697
3698void
3699swap_defsv (Coro::State self)
3700 PROTOTYPE: $
3701 ALIAS:
3702 swap_defav = 1
3703 CODE:
3704 if (!self->slot)
3705 croak ("cannot swap state with coroutine that has no saved state,");
3706 else
3707 {
3708 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3709 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
3710
3711 SV *tmp = *src; *src = *dst; *dst = tmp;
3712 }
3713
3714void
3715cancel (Coro::State self)
3716 CODE:
3717 coro_state_destroy (aTHX_ self);
3718
3719SV *
3720enable_times (int enabled = enable_times)
3721 CODE:
3722{
3723 RETVAL = boolSV (enable_times);
3724
3725 if (enabled != enable_times)
3726 {
3727 enable_times = enabled;
3728
3729 coro_times_update ();
3730 (enabled ? coro_times_sub : coro_times_add)(SvSTATE (coro_current));
3731 }
3732}
410 OUTPUT: 3733 OUTPUT:
411 RETVAL 3734 RETVAL
412 3735
413void 3736void
414transfer(prev,next) 3737times (Coro::State self)
415 Coro::State_or_hashref prev 3738 PPCODE:
416 Coro::State_or_hashref next 3739{
3740 struct coro *current = SvSTATE (coro_current);
3741
3742 if (ecb_expect_false (current == self))
3743 {
3744 coro_times_update ();
3745 coro_times_add (SvSTATE (coro_current));
3746 }
3747
3748 EXTEND (SP, 2);
3749 PUSHs (sv_2mortal (newSVnv (self->t_real [0] + self->t_real [1] * 1e-9)));
3750 PUSHs (sv_2mortal (newSVnv (self->t_cpu [0] + self->t_cpu [1] * 1e-9)));
3751
3752 if (ecb_expect_false (current == self))
3753 coro_times_sub (SvSTATE (coro_current));
3754}
3755
3756void
3757swap_sv (Coro::State coro, SV *sv, SV *swapsv)
3758 CODE:
3759{
3760 struct coro *current = SvSTATE_current;
3761
3762 if (current == coro)
3763 SWAP_SVS (current);
3764
3765 if (!coro->swap_sv)
3766 coro->swap_sv = newAV ();
3767
3768 av_push (coro->swap_sv, SvREFCNT_inc_NN (SvRV (sv )));
3769 av_push (coro->swap_sv, SvREFCNT_inc_NN (SvRV (swapsv)));
3770
3771 if (current == coro)
3772 SWAP_SVS (current);
3773}
3774
3775
3776MODULE = Coro::State PACKAGE = Coro
3777
3778BOOT:
3779{
3780 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3781 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3782 cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD);
3783 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3784 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3785 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3786 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3787 sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE);
3788
3789 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3790 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3791 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3792 CvNODEBUG_on (get_cv ("Coro::_pool_handler", 0)); /* work around a debugger bug */
3793
3794 coro_stash = gv_stashpv ("Coro", TRUE);
3795
3796 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (CORO_PRIO_MAX));
3797 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (CORO_PRIO_HIGH));
3798 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (CORO_PRIO_NORMAL));
3799 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (CORO_PRIO_LOW));
3800 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (CORO_PRIO_IDLE));
3801 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (CORO_PRIO_MIN));
3802
3803 {
3804 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3805
3806 coroapi.schedule = api_schedule;
3807 coroapi.schedule_to = api_schedule_to;
3808 coroapi.cede = api_cede;
3809 coroapi.cede_notself = api_cede_notself;
3810 coroapi.ready = api_ready;
3811 coroapi.is_ready = api_is_ready;
3812 coroapi.nready = coro_nready;
3813 coroapi.current = coro_current;
3814
3815 /*GCoroAPI = &coroapi;*/
3816 sv_setiv (sv, (IV)&coroapi);
3817 SvREADONLY_on (sv);
3818 }
3819}
3820
3821SV *
3822async (...)
3823 PROTOTYPE: &@
417 CODE: 3824 CODE:
3825 RETVAL = coro_new (aTHX_ coro_stash, &ST (0), items, 1);
3826 api_ready (aTHX_ RETVAL);
3827 OUTPUT:
3828 RETVAL
418 3829
419 if (prev != next) 3830void
3831_destroy (Coro::State coro)
3832 CODE:
3833 /* used by the manager thread */
3834 coro_state_destroy (aTHX_ coro);
3835
3836void
3837on_destroy (Coro::State coro, SV *cb)
3838 CODE:
3839 coro_push_on_destroy (aTHX_ coro, newSVsv (cb));
3840
3841void
3842join (...)
3843 CODE:
3844 CORO_EXECUTE_SLF_XS (slf_init_join);
3845
3846void
3847terminate (...)
3848 CODE:
3849 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3850
3851void
3852cancel (...)
3853 CODE:
3854 CORO_EXECUTE_SLF_XS (slf_init_cancel);
3855
3856int
3857safe_cancel (Coro::State self, ...)
3858 C_ARGS: aTHX_ self, &ST (1), items - 1
3859
3860void
3861schedule (...)
3862 CODE:
3863 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3864
3865void
3866schedule_to (...)
3867 CODE:
3868 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3869
3870void
3871cede_to (...)
3872 CODE:
3873 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3874
3875void
3876cede (...)
3877 CODE:
3878 CORO_EXECUTE_SLF_XS (slf_init_cede);
3879
3880void
3881cede_notself (...)
3882 CODE:
3883 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3884
3885void
3886_set_current (SV *current)
3887 PROTOTYPE: $
3888 CODE:
3889 SvREFCNT_dec (SvRV (coro_current));
3890 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3891
3892void
3893_set_readyhook (SV *hook)
3894 PROTOTYPE: $
3895 CODE:
3896 SvREFCNT_dec (coro_readyhook);
3897 SvGETMAGIC (hook);
3898 if (SvOK (hook))
3899 {
3900 coro_readyhook = newSVsv (hook);
3901 CORO_READYHOOK = invoke_sv_ready_hook_helper;
3902 }
3903 else
420 { 3904 {
421 /* 3905 coro_readyhook = 0;
422 * this could be done in newprocess which would lead to 3906 CORO_READYHOOK = 0;
423 * extremely elegant and fast (just SAVE/LOAD)
424 * code here, but lazy allocation of stacks has also
425 * some virtues and the overhead of the if() is nil.
426 */
427 if (next->mainstack)
428 {
429 SAVE (prev);
430 LOAD (next);
431 /* mark this state as in-use */
432 next->mainstack = 0;
433 next->tmps_ix = -2;
434 }
435 else if (next->tmps_ix == -2)
436 {
437 croak ("tried to transfer to running coroutine");
438 }
439 else
440 {
441 SAVE (prev);
442
443 /*
444 * emulate part of the perl startup here.
445 */
446 UNOP myop;
447
448 init_stacks (); /* from perl.c */
449 PL_op = (OP *)&myop;
450 /*PL_curcop = 0;*/
451 GvAV (PL_defgv) = (AV *)SvREFCNT_inc ((SV *)next->args);
452
453 SPAGAIN;
454 Zero(&myop, 1, UNOP);
455 myop.op_next = Nullop;
456 myop.op_flags = OPf_WANT_VOID;
457
458 PUSHMARK(SP);
459 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
460 PUTBACK;
461 /*
462 * the next line is slightly wrong, as PL_op->op_next
463 * is actually being executed so we skip the first op.
464 * that doesn't matter, though, since it is only
465 * pp_nextstate and we never return...
466 */
467 PL_op = Perl_pp_entersub(aTHX);
468 SPAGAIN;
469
470 ENTER;
471 }
472 } 3907 }
473 3908
3909int
3910prio (Coro::State coro, int newprio = 0)
3911 PROTOTYPE: $;$
3912 ALIAS:
3913 nice = 1
3914 CODE:
3915{
3916 RETVAL = coro->prio;
3917
3918 if (items > 1)
3919 {
3920 if (ix)
3921 newprio = coro->prio - newprio;
3922
3923 if (newprio < CORO_PRIO_MIN) newprio = CORO_PRIO_MIN;
3924 if (newprio > CORO_PRIO_MAX) newprio = CORO_PRIO_MAX;
3925
3926 coro->prio = newprio;
3927 }
3928}
3929 OUTPUT:
3930 RETVAL
3931
3932SV *
3933ready (SV *self)
3934 PROTOTYPE: $
3935 CODE:
3936 RETVAL = boolSV (api_ready (aTHX_ self));
3937 OUTPUT:
3938 RETVAL
3939
3940int
3941nready (...)
3942 PROTOTYPE:
3943 CODE:
3944 RETVAL = coro_nready;
3945 OUTPUT:
3946 RETVAL
3947
474void 3948void
475DESTROY(coro) 3949suspend (Coro::State self)
476 Coro::State coro 3950 PROTOTYPE: $
3951 CODE:
3952 self->flags |= CF_SUSPENDED;
3953
3954void
3955resume (Coro::State self)
3956 PROTOTYPE: $
3957 CODE:
3958 self->flags &= ~CF_SUSPENDED;
3959
3960void
3961_pool_handler (...)
3962 CODE:
3963 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3964
3965void
3966async_pool (SV *cv, ...)
3967 PROTOTYPE: &@
3968 PPCODE:
3969{
3970 HV *hv = (HV *)av_pop (av_async_pool);
3971 AV *av = newAV ();
3972 SV *cb = ST (0);
3973 int i;
3974
3975 av_extend (av, items - 2);
3976 for (i = 1; i < items; ++i)
3977 av_push (av, SvREFCNT_inc_NN (ST (i)));
3978
3979 if ((SV *)hv == &PL_sv_undef)
3980 {
3981 SV *sv = coro_new (aTHX_ coro_stash, (SV **)&cv_pool_handler, 1, 1);
3982 hv = (HV *)SvREFCNT_inc_NN (SvRV (sv));
3983 SvREFCNT_dec (sv);
3984 }
3985
3986 {
3987 struct coro *coro = SvSTATE_hv (hv);
3988
3989 assert (!coro->invoke_cb);
3990 assert (!coro->invoke_av);
3991 coro->invoke_cb = SvREFCNT_inc (cb);
3992 coro->invoke_av = av;
3993 }
3994
3995 api_ready (aTHX_ (SV *)hv);
3996
3997 if (GIMME_V != G_VOID)
3998 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
3999 else
4000 SvREFCNT_dec (hv);
4001}
4002
4003SV *
4004rouse_cb ()
4005 PROTOTYPE:
4006 CODE:
4007 RETVAL = coro_new_rouse_cb (aTHX);
4008 OUTPUT:
4009 RETVAL
4010
4011void
4012rouse_wait (...)
4013 PROTOTYPE: ;$
4014 PPCODE:
4015 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
4016
4017void
4018on_enter (SV *block)
4019 ALIAS:
4020 on_leave = 1
4021 PROTOTYPE: &
4022 CODE:
4023{
4024 struct coro *coro = SvSTATE_current;
4025 AV **avp = ix ? &coro->on_leave : &coro->on_enter;
4026
4027 block = s_get_cv_croak (block);
4028
4029 if (!*avp)
4030 *avp = newAV ();
4031
4032 av_push (*avp, SvREFCNT_inc (block));
4033
4034 if (!ix)
4035 on_enterleave_call (aTHX_ block);
4036
4037 LEAVE; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
4038 SAVEDESTRUCTOR_X (ix ? coro_pop_on_leave : coro_pop_on_enter, (void *)coro);
4039 ENTER; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
4040}
4041
4042
4043MODULE = Coro::State PACKAGE = PerlIO::cede
4044
4045BOOT:
4046 PerlIO_define_layer (aTHX_ &PerlIO_cede);
4047
4048
4049MODULE = Coro::State PACKAGE = Coro::Semaphore
4050
4051SV *
4052new (SV *klass, SV *count = 0)
4053 CODE:
4054{
4055 int semcnt = 1;
4056
4057 if (count)
4058 {
4059 SvGETMAGIC (count);
4060
4061 if (SvOK (count))
4062 semcnt = SvIV (count);
4063 }
4064
4065 RETVAL = sv_bless (
4066 coro_waitarray_new (aTHX_ semcnt),
4067 GvSTASH (CvGV (cv))
4068 );
4069}
4070 OUTPUT:
4071 RETVAL
4072
4073# helper for Coro::Channel and others
4074SV *
4075_alloc (int count)
4076 CODE:
4077 RETVAL = coro_waitarray_new (aTHX_ count);
4078 OUTPUT:
4079 RETVAL
4080
4081SV *
4082count (SV *self)
4083 CODE:
4084 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
4085 OUTPUT:
4086 RETVAL
4087
4088void
4089up (SV *self, int adjust = 1)
4090 ALIAS:
4091 adjust = 1
477 CODE: 4092 CODE:
4093 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
478 4094
479 if (coro->mainstack) 4095void
4096down (...)
4097 CODE:
4098 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
4099
4100void
4101wait (...)
4102 CODE:
4103 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
4104
4105void
4106try (SV *self)
4107 PPCODE:
4108{
4109 AV *av = (AV *)SvRV (self);
4110 SV *count_sv = AvARRAY (av)[0];
4111 IV count = SvIVX (count_sv);
4112
4113 if (count > 0)
480 { 4114 {
481 struct coro temp; 4115 --count;
482 4116 SvIVX (count_sv) = count;
483 SAVE(aTHX_ (&temp)); 4117 XSRETURN_YES;
484 LOAD(aTHX_ coro);
485
486 destroy_stacks ();
487 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
488
489 LOAD((&temp));
490 } 4118 }
4119 else
4120 XSRETURN_NO;
4121}
491 4122
492 SvREFCNT_dec (coro->args); 4123void
493 Safefree (coro); 4124waiters (SV *self)
4125 PPCODE:
4126{
4127 AV *av = (AV *)SvRV (self);
4128 int wcount = AvFILLp (av) + 1 - 1;
494 4129
4130 if (GIMME_V == G_SCALAR)
4131 XPUSHs (sv_2mortal (newSViv (wcount)));
4132 else
4133 {
4134 int i;
4135 EXTEND (SP, wcount);
4136 for (i = 1; i <= wcount; ++i)
4137 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
4138 }
4139}
495 4140
4141MODULE = Coro::State PACKAGE = Coro::SemaphoreSet
4142
4143void
4144_may_delete (SV *sem, int count, unsigned int extra_refs)
4145 PPCODE:
4146{
4147 AV *av = (AV *)SvRV (sem);
4148
4149 if (SvREFCNT ((SV *)av) == 1 + extra_refs
4150 && AvFILLp (av) == 0 /* no waiters, just count */
4151 && SvIV (AvARRAY (av)[0]) == count)
4152 XSRETURN_YES;
4153
4154 XSRETURN_NO;
4155}
4156
4157MODULE = Coro::State PACKAGE = Coro::Signal
4158
4159SV *
4160new (SV *klass)
4161 CODE:
4162 RETVAL = sv_bless (
4163 coro_waitarray_new (aTHX_ 0),
4164 GvSTASH (CvGV (cv))
4165 );
4166 OUTPUT:
4167 RETVAL
4168
4169void
4170wait (...)
4171 CODE:
4172 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
4173
4174void
4175broadcast (SV *self)
4176 CODE:
4177{
4178 AV *av = (AV *)SvRV (self);
4179 coro_signal_wake (aTHX_ av, AvFILLp (av));
4180}
4181
4182void
4183send (SV *self)
4184 CODE:
4185{
4186 AV *av = (AV *)SvRV (self);
4187
4188 if (AvFILLp (av))
4189 coro_signal_wake (aTHX_ av, 1);
4190 else
4191 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
4192}
4193
4194IV
4195awaited (SV *self)
4196 CODE:
4197 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
4198 OUTPUT:
4199 RETVAL
4200
4201
4202MODULE = Coro::State PACKAGE = Coro::AnyEvent
4203
4204BOOT:
4205 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
4206
4207void
4208_schedule (...)
4209 CODE:
4210{
4211 static int incede;
4212
4213 api_cede_notself (aTHX);
4214
4215 ++incede;
4216 while (coro_nready >= incede && api_cede (aTHX))
4217 ;
4218
4219 sv_setsv (sv_activity, &PL_sv_undef);
4220 if (coro_nready >= incede)
4221 {
4222 PUSHMARK (SP);
4223 PUTBACK;
4224 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
4225 }
4226
4227 --incede;
4228}
4229
4230
4231MODULE = Coro::State PACKAGE = Coro::AIO
4232
4233void
4234_register (char *target, char *proto, SV *req)
4235 CODE:
4236{
4237 SV *req_cv = s_get_cv_croak (req);
4238 /* newXSproto doesn't return the CV on 5.8 */
4239 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
4240 sv_setpv ((SV *)slf_cv, proto);
4241 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
4242}
4243
4244MODULE = Coro::State PACKAGE = Coro::Select
4245
4246void
4247patch_pp_sselect ()
4248 CODE:
4249 if (!coro_old_pp_sselect)
4250 {
4251 coro_select_select = (SV *)get_cv ("Coro::Select::select", 0);
4252 coro_old_pp_sselect = PL_ppaddr [OP_SSELECT];
4253 PL_ppaddr [OP_SSELECT] = coro_pp_sselect;
4254 }
4255
4256void
4257unpatch_pp_sselect ()
4258 CODE:
4259 if (coro_old_pp_sselect)
4260 {
4261 PL_ppaddr [OP_SSELECT] = coro_old_pp_sselect;
4262 coro_old_pp_sselect = 0;
4263 }
4264

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines