ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.3 by root, Tue Jul 17 00:24:15 2001 UTC vs.
Revision 1.364 by root, Thu Jul 16 02:19:17 2009 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "schmorp.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
9#endif 24#endif
10 25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
57# undef CORO_STACKGUARD
58#endif
59
60#ifndef CORO_STACKGUARD
61# define CORO_STACKGUARD 0
62#endif
63
64/* prefer perl internal functions over our own? */
65#ifndef CORO_PREFER_PERL_FUNCTIONS
66# define CORO_PREFER_PERL_FUNCTIONS 0
67#endif
68
69/* The next macros try to return the current stack pointer, in an as
70 * portable way as possible. */
71#if __GNUC__ >= 4
72# define dSTACKLEVEL int stacklevel_dummy
73# define STACKLEVEL __builtin_frame_address (0)
74#else
75# define dSTACKLEVEL volatile void *stacklevel
76# define STACKLEVEL ((void *)&stacklevel)
77#endif
78
79#define IN_DESTRUCT PL_dirty
80
81#if __GNUC__ >= 3
82# define attribute(x) __attribute__(x)
83# define expect(expr,value) __builtin_expect ((expr), (value))
84# define INLINE static inline
85#else
86# define attribute(x)
87# define expect(expr,value) (expr)
88# define INLINE static
89#endif
90
91#define expect_false(expr) expect ((expr) != 0, 0)
92#define expect_true(expr) expect ((expr) != 0, 1)
93
94#define NOINLINE attribute ((noinline))
95
96#include "CoroAPI.h"
97#define GCoroAPI (&coroapi) /* very sneaky */
98
99#ifdef USE_ITHREADS
100# if CORO_PTHREAD
101static void *coro_thx;
102# endif
103#endif
104
105#ifdef __linux
106# include <time.h> /* for timespec */
107# include <syscall.h> /* for SYS_* */
108# ifdef SYS_clock_gettime
109# define coro_clock_gettime(id, ts) syscall (SYS_clock_gettime, (id), (ts))
110# define CORO_CLOCK_MONOTONIC 1
111# define CORO_CLOCK_THREAD_CPUTIME_ID 3
112# endif
113#endif
114
115static double (*nvtime)(); /* so why doesn't it take void? */
116static void (*u2time)(pTHX_ UV ret[2]);
117
118/* we hijack an hopefully unused CV flag for our purposes */
119#define CVf_SLF 0x4000
120static OP *pp_slf (pTHX);
121
122static U32 cctx_gen;
123static size_t cctx_stacksize = CORO_STACKSIZE;
124static struct CoroAPI coroapi;
125static AV *main_mainstack; /* used to differentiate between $main and others */
126static JMPENV *main_top_env;
127static HV *coro_state_stash, *coro_stash;
128static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
129
130static AV *av_destroy; /* destruction queue */
131static SV *sv_manager; /* the manager coro */
132static SV *sv_idle; /* $Coro::idle */
133
134static GV *irsgv; /* $/ */
135static GV *stdoutgv; /* *STDOUT */
136static SV *rv_diehook;
137static SV *rv_warnhook;
138static HV *hv_sig; /* %SIG */
139
140/* async_pool helper stuff */
141static SV *sv_pool_rss;
142static SV *sv_pool_size;
143static SV *sv_async_pool_idle; /* description string */
144static AV *av_async_pool; /* idle pool */
145static SV *sv_Coro; /* class string */
146static CV *cv_pool_handler;
147static CV *cv_coro_state_new;
148
149/* Coro::AnyEvent */
150static SV *sv_activity;
151
152/* enable processtime/realtime profiling */
153static char enable_times;
154typedef U32 coro_ts[2];
155static coro_ts time_real, time_cpu;
156static char times_valid;
157
158static struct coro_cctx *cctx_first;
159static int cctx_count, cctx_idle;
160
161enum {
162 CC_MAPPED = 0x01,
163 CC_NOREUSE = 0x02, /* throw this away after tracing */
164 CC_TRACE = 0x04,
165 CC_TRACE_SUB = 0x08, /* trace sub calls */
166 CC_TRACE_LINE = 0x10, /* trace each statement */
167 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
168};
169
170/* this is a structure representing a c-level coroutine */
171typedef struct coro_cctx
172{
173 struct coro_cctx *next;
174
175 /* the stack */
176 void *sptr;
177 size_t ssize;
178
179 /* cpu state */
180 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
181 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
182 JMPENV *top_env;
183 coro_context cctx;
184
185 U32 gen;
186#if CORO_USE_VALGRIND
187 int valgrind_id;
188#endif
189 unsigned char flags;
190} coro_cctx;
191
192coro_cctx *cctx_current; /* the currently running cctx */
193
194/*****************************************************************************/
195
196enum {
197 CF_RUNNING = 0x0001, /* coroutine is running */
198 CF_READY = 0x0002, /* coroutine is ready */
199 CF_NEW = 0x0004, /* has never been switched to */
200 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
201 CF_SUSPENDED = 0x0010, /* coroutine can't be scheduled */
202};
203
204/* the structure where most of the perl state is stored, overlaid on the cxstack */
205typedef struct
206{
207 SV *defsv;
208 AV *defav;
209 SV *errsv;
210 SV *irsgv;
211 HV *hinthv;
212#define VAR(name,type) type name;
213# include "state.h"
214#undef VAR
215} perl_slots;
216
217#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
218
219/* this is a structure representing a perl-level coroutine */
11struct coro { 220struct coro {
12 U8 dowarn; 221 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 222 coro_cctx *cctx;
14 223
15 PERL_SI *curstackinfo; 224 /* ready queue */
16 AV *curstack; 225 struct coro *next_ready;
226
227 /* state data */
228 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 229 AV *mainstack;
18 SV **stack_sp; 230 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 231
41 AV *args; 232 CV *startcv; /* the CV to execute */
233 AV *args; /* data associated with this coroutine (initial args) */
234 int refcnt; /* coroutines are refcounted, yes */
235 int flags; /* CF_ flags */
236 HV *hv; /* the perl hash associated with this coro, if any */
237 void (*on_destroy)(pTHX_ struct coro *coro);
238
239 /* statistics */
240 int usecount; /* number of transfers to this coro */
241
242 /* coro process data */
243 int prio;
244 SV *except; /* exception to be thrown */
245 SV *rouse_cb;
246
247 /* async_pool */
248 SV *saved_deffh;
249 SV *invoke_cb;
250 AV *invoke_av;
251
252 /* on_enter/on_leave */
253 AV *on_enter;
254 AV *on_leave;
255
256 /* times */
257 coro_ts t_cpu, t_real;
258
259 /* linked list */
260 struct coro *next, *prev;
42}; 261};
43 262
44typedef struct coro *Coro__State; 263typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 264typedef struct coro *Coro__State_or_hashref;
46 265
47static HV *padlist_cache; 266/* the following variables are effectively part of the perl context */
267/* and get copied between struct coro and these variables */
268/* the mainr easonw e don't support windows process emulation */
269static struct CoroSLF slf_frame; /* the current slf frame */
48 270
49/* mostly copied from op.c:cv_clone2 */ 271/** Coro ********************************************************************/
50STATIC AV * 272
51clone_padlist (AV *protopadlist) 273#define CORO_PRIO_MAX 3
274#define CORO_PRIO_HIGH 1
275#define CORO_PRIO_NORMAL 0
276#define CORO_PRIO_LOW -1
277#define CORO_PRIO_IDLE -3
278#define CORO_PRIO_MIN -4
279
280/* for Coro.pm */
281static SV *coro_current;
282static SV *coro_readyhook;
283static struct coro *coro_ready [CORO_PRIO_MAX - CORO_PRIO_MIN + 1][2]; /* head|tail */
284static CV *cv_coro_run, *cv_coro_terminate;
285static struct coro *coro_first;
286#define coro_nready coroapi.nready
287
288/** Coro::Select ************************************************************/
289
290static OP *(*coro_old_pp_sselect) (pTHX);
291static SV *coro_select_select;
292
293/* horrible hack, but if it works... */
294static OP *
295coro_pp_sselect (aTHX)
52{ 296{
53 AV *av; 297 dSP;
54 I32 ix; 298 PUSHMARK (SP - 4); /* fake argument list */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 299 XPUSHs (coro_select_select);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 300 PUTBACK;
57 SV **pname = AvARRAY (protopad_name); 301
58 SV **ppad = AvARRAY (protopad); 302 /* entersub is an UNOP, select a LISTOP... keep your fingers crossed */
59 I32 fname = AvFILLp (protopad_name); 303 PL_op->op_flags |= OPf_STACKED;
60 I32 fpad = AvFILLp (protopad); 304 PL_op->op_private = 0;
305 return PL_ppaddr [OP_ENTERSUB](aTHX);
306}
307
308/** lowlevel stuff **********************************************************/
309
310static SV *
311coro_get_sv (pTHX_ const char *name, int create)
312{
313#if PERL_VERSION_ATLEAST (5,10,0)
314 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
315 get_sv (name, create);
316#endif
317 return get_sv (name, create);
318}
319
320static AV *
321coro_get_av (pTHX_ const char *name, int create)
322{
323#if PERL_VERSION_ATLEAST (5,10,0)
324 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
325 get_av (name, create);
326#endif
327 return get_av (name, create);
328}
329
330static HV *
331coro_get_hv (pTHX_ const char *name, int create)
332{
333#if PERL_VERSION_ATLEAST (5,10,0)
334 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
335 get_hv (name, create);
336#endif
337 return get_hv (name, create);
338}
339
340INLINE void
341coro_times_update ()
342{
343#ifdef coro_clock_gettime
344 struct timespec ts;
345
346 ts.tv_sec = ts.tv_nsec = 0;
347 coro_clock_gettime (CORO_CLOCK_THREAD_CPUTIME_ID, &ts);
348 time_cpu [0] = ts.tv_sec; time_cpu [1] = ts.tv_nsec;
349
350 ts.tv_sec = ts.tv_nsec = 0;
351 coro_clock_gettime (CORO_CLOCK_MONOTONIC, &ts);
352 time_real [0] = ts.tv_sec; time_real [1] = ts.tv_nsec;
353#else
354 dTHX;
355 UV tv[2];
356
357 u2time (aTHX_ tv);
358 time_real [0] = tv [0];
359 time_real [1] = tv [1] * 1000;
360#endif
361}
362
363INLINE void
364coro_times_add (struct coro *c)
365{
366 c->t_real [1] += time_real [1];
367 if (c->t_real [1] > 1000000000) { c->t_real [1] -= 1000000000; ++c->t_real [0]; }
368 c->t_real [0] += time_real [0];
369
370 c->t_cpu [1] += time_cpu [1];
371 if (c->t_cpu [1] > 1000000000) { c->t_cpu [1] -= 1000000000; ++c->t_cpu [0]; }
372 c->t_cpu [0] += time_cpu [0];
373}
374
375INLINE void
376coro_times_sub (struct coro *c)
377{
378 if (c->t_real [1] < time_real [1]) { c->t_real [1] += 1000000000; --c->t_real [0]; }
379 c->t_real [1] -= time_real [1];
380 c->t_real [0] -= time_real [0];
381
382 if (c->t_cpu [1] < time_cpu [1]) { c->t_cpu [1] += 1000000000; --c->t_cpu [0]; }
383 c->t_cpu [1] -= time_cpu [1];
384 c->t_cpu [0] -= time_cpu [0];
385}
386
387/*****************************************************************************/
388/* magic glue */
389
390#define CORO_MAGIC_type_cv 26
391#define CORO_MAGIC_type_state PERL_MAGIC_ext
392
393#define CORO_MAGIC_NN(sv, type) \
394 (expect_true (SvMAGIC (sv)->mg_type == type) \
395 ? SvMAGIC (sv) \
396 : mg_find (sv, type))
397
398#define CORO_MAGIC(sv, type) \
399 (expect_true (SvMAGIC (sv)) \
400 ? CORO_MAGIC_NN (sv, type) \
401 : 0)
402
403#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
404#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
405
406INLINE struct coro *
407SvSTATE_ (pTHX_ SV *coro)
408{
409 HV *stash;
410 MAGIC *mg;
411
412 if (SvROK (coro))
413 coro = SvRV (coro);
414
415 if (expect_false (SvTYPE (coro) != SVt_PVHV))
416 croak ("Coro::State object required");
417
418 stash = SvSTASH (coro);
419 if (expect_false (stash != coro_stash && stash != coro_state_stash))
420 {
421 /* very slow, but rare, check */
422 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
423 croak ("Coro::State object required");
424 }
425
426 mg = CORO_MAGIC_state (coro);
427 return (struct coro *)mg->mg_ptr;
428}
429
430#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
431
432/* faster than SvSTATE, but expects a coroutine hv */
433#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
434#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
435
436/*****************************************************************************/
437/* padlist management and caching */
438
439static AV *
440coro_derive_padlist (pTHX_ CV *cv)
441{
442 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 443 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 444
72 newpadlist = newAV (); 445 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 446 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 447#if PERL_VERSION_ATLEAST (5,10,0)
448 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
449#else
450 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
451#endif
452 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
453 --AvFILLp (padlist);
454
455 av_store (newpadlist, 0, SvREFCNT_inc_NN (AvARRAY (padlist)[0]));
75 av_store (newpadlist, 1, (SV *) newpad); 456 av_store (newpadlist, 1, (SV *)newpad);
76 457
77 av = newAV (); /* will be @_ */ 458 return newpadlist;
78 av_extend (av, 0); 459}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 460
82 for (ix = fpad; ix > 0; ix--) 461static void
462free_padlist (pTHX_ AV *padlist)
463{
464 /* may be during global destruction */
465 if (!IN_DESTRUCT)
83 { 466 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 467 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 468
469 while (i > 0) /* special-case index 0 */
86 { 470 {
87 char *name = SvPVX (namesv); /* XXX */ 471 /* we try to be extra-careful here */
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 472 AV *av = (AV *)AvARRAY (padlist)[i--];
89 { /* lexical from outside? */ 473 I32 j = AvFILLp (av);
90 npad[ix] = SvREFCNT_inc (ppad[ix]); 474
91 } 475 while (j >= 0)
92 else 476 SvREFCNT_dec (AvARRAY (av)[j--]);
93 { /* our own lexical */ 477
94 SV *sv; 478 AvFILLp (av) = -1;
95 if (*name == '&') 479 SvREFCNT_dec (av);
96 sv = SvREFCNT_inc (ppad[ix]);
97 else if (*name == '@')
98 sv = (SV *) newAV ();
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 }
107 } 480 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix])) 481
109 { 482 SvREFCNT_dec (AvARRAY (padlist)[0]);
110 npad[ix] = SvREFCNT_inc (ppad[ix]); 483
111 } 484 AvFILLp (padlist) = -1;
485 SvREFCNT_dec ((SV*)padlist);
486 }
487}
488
489static int
490coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
491{
492 AV *padlist;
493 AV *av = (AV *)mg->mg_obj;
494
495 /* perl manages to free our internal AV and _then_ call us */
496 if (IN_DESTRUCT)
497 return 0;
498
499 /* casting is fun. */
500 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
501 free_padlist (aTHX_ padlist);
502
503 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
504
505 return 0;
506}
507
508static MGVTBL coro_cv_vtbl = {
509 0, 0, 0, 0,
510 coro_cv_free
511};
512
513/* the next two functions merely cache the padlists */
514static void
515get_padlist (pTHX_ CV *cv)
516{
517 MAGIC *mg = CORO_MAGIC_cv (cv);
518 AV *av;
519
520 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
521 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
112 else 522 else
113 { 523 {
114 SV *sv = NEWSV (0, 0); 524#if CORO_PREFER_PERL_FUNCTIONS
115 SvPADTMP_on (sv); 525 /* this is probably cleaner? but also slower! */
116 npad[ix] = sv; 526 /* in practise, it seems to be less stable */
117 } 527 CV *cp = Perl_cv_clone (aTHX_ cv);
118 } 528 CvPADLIST (cv) = CvPADLIST (cp);
119 529 CvPADLIST (cp) = 0;
120#if 0 /* NONOTUNDERSTOOD */ 530 SvREFCNT_dec (cp);
121 /* Now that vars are all in place, clone nested closures. */ 531#else
122 532 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif 533#endif
139 534 }
140 return newpadlist;
141} 535}
142 536
143STATIC AV * 537static void
144free_padlist (AV *padlist) 538put_padlist (pTHX_ CV *cv)
145{ 539{
146 /* may be during global destruction */ 540 MAGIC *mg = CORO_MAGIC_cv (cv);
147 if (SvREFCNT(padlist)) 541 AV *av;
542
543 if (expect_false (!mg))
544 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
545
546 av = (AV *)mg->mg_obj;
547
548 if (expect_false (AvFILLp (av) >= AvMAX (av)))
549 av_extend (av, AvFILLp (av) + 1);
550
551 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
552}
553
554/** load & save, init *******************************************************/
555
556static void
557on_enterleave_call (pTHX_ SV *cb);
558
559static void
560load_perl (pTHX_ Coro__State c)
561{
562 perl_slots *slot = c->slot;
563 c->slot = 0;
564
565 PL_mainstack = c->mainstack;
566
567 GvSV (PL_defgv) = slot->defsv;
568 GvAV (PL_defgv) = slot->defav;
569 GvSV (PL_errgv) = slot->errsv;
570 GvSV (irsgv) = slot->irsgv;
571 GvHV (PL_hintgv) = slot->hinthv;
572
573 #define VAR(name,type) PL_ ## name = slot->name;
574 # include "state.h"
575 #undef VAR
576
148 { 577 {
149 I32 i = AvFILLp(padlist); 578 dSP;
150 while (i >= 0) 579
580 CV *cv;
581
582 /* now do the ugly restore mess */
583 while (expect_true (cv = (CV *)POPs))
151 { 584 {
152 SV **svp = av_fetch(padlist, i--, FALSE); 585 put_padlist (aTHX_ cv); /* mark this padlist as available */
153 SV *sv = svp ? *svp : Nullsv; 586 CvDEPTH (cv) = PTR2IV (POPs);
154 if (sv) 587 CvPADLIST (cv) = (AV *)POPs;
155 SvREFCNT_dec(sv);
156 } 588 }
157 589
158 SvREFCNT_dec((SV*)padlist); 590 PUTBACK;
159 } 591 }
160}
161 592
162STATIC AV * 593 slf_frame = c->slf_frame;
163unuse_padlist (AV *padlist) 594 CORO_THROW = c->except;
164{
165 free_padlist (padlist);
166}
167 595
596 if (expect_false (enable_times))
597 {
598 if (expect_false (!times_valid))
599 coro_times_update ();
600
601 coro_times_sub (c);
602 }
603
604 if (expect_false (c->on_enter))
605 {
606 int i;
607
608 for (i = 0; i <= AvFILLp (c->on_enter); ++i)
609 on_enterleave_call (aTHX_ AvARRAY (c->on_enter)[i]);
610 }
611}
612
168static void 613static void
169SAVE(pTHX_ Coro__State c) 614save_perl (pTHX_ Coro__State c)
170{ 615{
616 if (expect_false (c->on_leave))
617 {
618 int i;
619
620 for (i = AvFILLp (c->on_leave); i >= 0; --i)
621 on_enterleave_call (aTHX_ AvARRAY (c->on_leave)[i]);
622 }
623
624 times_valid = 0;
625
626 if (expect_false (enable_times))
627 {
628 coro_times_update (); times_valid = 1;
629 coro_times_add (c);
630 }
631
632 c->except = CORO_THROW;
633 c->slf_frame = slf_frame;
634
171 { 635 {
172 dSP; 636 dSP;
173 I32 cxix = cxstack_ix; 637 I32 cxix = cxstack_ix;
638 PERL_CONTEXT *ccstk = cxstack;
174 PERL_SI *top_si = PL_curstackinfo; 639 PERL_SI *top_si = PL_curstackinfo;
175 PERL_CONTEXT *ccstk = cxstack;
176 640
177 /* 641 /*
178 * the worst thing you can imagine happens first - we have to save 642 * the worst thing you can imagine happens first - we have to save
179 * (and reinitialize) all cv's in the whole callchain :( 643 * (and reinitialize) all cv's in the whole callchain :(
180 */ 644 */
181 645
182 PUSHs (Nullsv); 646 XPUSHs (Nullsv);
183 /* this loop was inspired by pp_caller */ 647 /* this loop was inspired by pp_caller */
184 for (;;) 648 for (;;)
185 { 649 {
186 while (cxix >= 0) 650 while (expect_true (cxix >= 0))
187 { 651 {
188 PERL_CONTEXT *cx = &ccstk[--cxix]; 652 PERL_CONTEXT *cx = &ccstk[cxix--];
189 653
190 if (CxTYPE(cx) == CXt_SUB) 654 if (expect_true (CxTYPE (cx) == CXt_SUB) || expect_false (CxTYPE (cx) == CXt_FORMAT))
191 { 655 {
192 CV *cv = cx->blk_sub.cv; 656 CV *cv = cx->blk_sub.cv;
657
193 if (CvDEPTH(cv)) 658 if (expect_true (CvDEPTH (cv)))
194 { 659 {
195#ifdef USE_THREADS
196 XPUSHs ((SV *)CvOWNER(cv));
197#endif
198 EXTEND (SP, 3); 660 EXTEND (SP, 3);
199 PUSHs ((SV *)CvDEPTH(cv));
200 PUSHs ((SV *)CvPADLIST(cv)); 661 PUSHs ((SV *)CvPADLIST (cv));
662 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
201 PUSHs ((SV *)cv); 663 PUSHs ((SV *)cv);
202 664
203 CvPADLIST(cv) = clone_padlist (CvPADLIST(cv));
204
205 CvDEPTH(cv) = 0; 665 CvDEPTH (cv) = 0;
206#ifdef USE_THREADS 666 get_padlist (aTHX_ cv);
207 CvOWNER(cv) = 0;
208 error must unlock this cv etc.. etc...
209 if you are here wondering about this error message then
210 the reason is that it will not work as advertised yet
211#endif
212 } 667 }
213 } 668 }
214 else if (CxTYPE(cx) == CXt_FORMAT) 669 }
670
671 if (expect_true (top_si->si_type == PERLSI_MAIN))
672 break;
673
674 top_si = top_si->si_prev;
675 ccstk = top_si->si_cxstack;
676 cxix = top_si->si_cxix;
677 }
678
679 PUTBACK;
680 }
681
682 /* allocate some space on the context stack for our purposes */
683 /* we manually unroll here, as usually 2 slots is enough */
684 if (SLOT_COUNT >= 1) CXINC;
685 if (SLOT_COUNT >= 2) CXINC;
686 if (SLOT_COUNT >= 3) CXINC;
687 {
688 unsigned int i;
689 for (i = 3; i < SLOT_COUNT; ++i)
690 CXINC;
691 }
692 cxstack_ix -= SLOT_COUNT; /* undo allocation */
693
694 c->mainstack = PL_mainstack;
695
696 {
697 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
698
699 slot->defav = GvAV (PL_defgv);
700 slot->defsv = DEFSV;
701 slot->errsv = ERRSV;
702 slot->irsgv = GvSV (irsgv);
703 slot->hinthv = GvHV (PL_hintgv);
704
705 #define VAR(name,type) slot->name = PL_ ## name;
706 # include "state.h"
707 #undef VAR
708 }
709}
710
711/*
712 * allocate various perl stacks. This is almost an exact copy
713 * of perl.c:init_stacks, except that it uses less memory
714 * on the (sometimes correct) assumption that coroutines do
715 * not usually need a lot of stackspace.
716 */
717#if CORO_PREFER_PERL_FUNCTIONS
718# define coro_init_stacks(thx) init_stacks ()
719#else
720static void
721coro_init_stacks (pTHX)
722{
723 PL_curstackinfo = new_stackinfo(32, 8);
724 PL_curstackinfo->si_type = PERLSI_MAIN;
725 PL_curstack = PL_curstackinfo->si_stack;
726 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
727
728 PL_stack_base = AvARRAY(PL_curstack);
729 PL_stack_sp = PL_stack_base;
730 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
731
732 New(50,PL_tmps_stack,32,SV*);
733 PL_tmps_floor = -1;
734 PL_tmps_ix = -1;
735 PL_tmps_max = 32;
736
737 New(54,PL_markstack,16,I32);
738 PL_markstack_ptr = PL_markstack;
739 PL_markstack_max = PL_markstack + 16;
740
741#ifdef SET_MARK_OFFSET
742 SET_MARK_OFFSET;
743#endif
744
745 New(54,PL_scopestack,8,I32);
746 PL_scopestack_ix = 0;
747 PL_scopestack_max = 8;
748
749 New(54,PL_savestack,24,ANY);
750 PL_savestack_ix = 0;
751 PL_savestack_max = 24;
752
753#if !PERL_VERSION_ATLEAST (5,10,0)
754 New(54,PL_retstack,4,OP*);
755 PL_retstack_ix = 0;
756 PL_retstack_max = 4;
757#endif
758}
759#endif
760
761/*
762 * destroy the stacks, the callchain etc...
763 */
764static void
765coro_destruct_stacks (pTHX)
766{
767 while (PL_curstackinfo->si_next)
768 PL_curstackinfo = PL_curstackinfo->si_next;
769
770 while (PL_curstackinfo)
771 {
772 PERL_SI *p = PL_curstackinfo->si_prev;
773
774 if (!IN_DESTRUCT)
775 SvREFCNT_dec (PL_curstackinfo->si_stack);
776
777 Safefree (PL_curstackinfo->si_cxstack);
778 Safefree (PL_curstackinfo);
779 PL_curstackinfo = p;
780 }
781
782 Safefree (PL_tmps_stack);
783 Safefree (PL_markstack);
784 Safefree (PL_scopestack);
785 Safefree (PL_savestack);
786#if !PERL_VERSION_ATLEAST (5,10,0)
787 Safefree (PL_retstack);
788#endif
789}
790
791#define CORO_RSS \
792 rss += sizeof (SYM (curstackinfo)); \
793 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
794 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
795 rss += SYM (tmps_max) * sizeof (SV *); \
796 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
797 rss += SYM (scopestack_max) * sizeof (I32); \
798 rss += SYM (savestack_max) * sizeof (ANY);
799
800static size_t
801coro_rss (pTHX_ struct coro *coro)
802{
803 size_t rss = sizeof (*coro);
804
805 if (coro->mainstack)
806 {
807 if (coro->flags & CF_RUNNING)
808 {
809 #define SYM(sym) PL_ ## sym
810 CORO_RSS;
811 #undef SYM
812 }
813 else
814 {
815 #define SYM(sym) coro->slot->sym
816 CORO_RSS;
817 #undef SYM
818 }
819 }
820
821 return rss;
822}
823
824/** coroutine stack handling ************************************************/
825
826static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
827static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
828static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
829
830/* apparently < 5.8.8 */
831#ifndef MgPV_nolen_const
832#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
833 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
834 (const char*)(mg)->mg_ptr)
835#endif
836
837/*
838 * This overrides the default magic get method of %SIG elements.
839 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
840 * and instead of trying to save and restore the hash elements, we just provide
841 * readback here.
842 */
843static int
844coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
845{
846 const char *s = MgPV_nolen_const (mg);
847
848 if (*s == '_')
849 {
850 SV **svp = 0;
851
852 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
853 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
854
855 if (svp)
856 {
857 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
858 return 0;
859 }
860 }
861
862 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
863}
864
865static int
866coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
867{
868 const char *s = MgPV_nolen_const (mg);
869
870 if (*s == '_')
871 {
872 SV **svp = 0;
873
874 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
875 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
876
877 if (svp)
878 {
879 SV *old = *svp;
880 *svp = 0;
881 SvREFCNT_dec (old);
882 return 0;
883 }
884 }
885
886 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
887}
888
889static int
890coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
891{
892 const char *s = MgPV_nolen_const (mg);
893
894 if (*s == '_')
895 {
896 SV **svp = 0;
897
898 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
899 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
900
901 if (svp)
902 {
903 SV *old = *svp;
904 *svp = SvOK (sv) ? newSVsv (sv) : 0;
905 SvREFCNT_dec (old);
906 return 0;
907 }
908 }
909
910 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
911}
912
913static void
914prepare_nop (pTHX_ struct coro_transfer_args *ta)
915{
916 /* kind of mega-hacky, but works */
917 ta->next = ta->prev = (struct coro *)ta;
918}
919
920static int
921slf_check_nop (pTHX_ struct CoroSLF *frame)
922{
923 return 0;
924}
925
926static int
927slf_check_repeat (pTHX_ struct CoroSLF *frame)
928{
929 return 1;
930}
931
932static UNOP coro_setup_op;
933
934static void NOINLINE /* noinline to keep it out of the transfer fast path */
935coro_setup (pTHX_ struct coro *coro)
936{
937 /*
938 * emulate part of the perl startup here.
939 */
940 coro_init_stacks (aTHX);
941
942 PL_runops = RUNOPS_DEFAULT;
943 PL_curcop = &PL_compiling;
944 PL_in_eval = EVAL_NULL;
945 PL_comppad = 0;
946 PL_comppad_name = 0;
947 PL_comppad_name_fill = 0;
948 PL_comppad_name_floor = 0;
949 PL_curpm = 0;
950 PL_curpad = 0;
951 PL_localizing = 0;
952 PL_dirty = 0;
953 PL_restartop = 0;
954#if PERL_VERSION_ATLEAST (5,10,0)
955 PL_parser = 0;
956#endif
957 PL_hints = 0;
958
959 /* recreate the die/warn hooks */
960 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
961 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
962
963 GvSV (PL_defgv) = newSV (0);
964 GvAV (PL_defgv) = coro->args; coro->args = 0;
965 GvSV (PL_errgv) = newSV (0);
966 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
967 GvHV (PL_hintgv) = 0;
968 PL_rs = newSVsv (GvSV (irsgv));
969 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
970
971 {
972 dSP;
973 UNOP myop;
974
975 Zero (&myop, 1, UNOP);
976 myop.op_next = Nullop;
977 myop.op_type = OP_ENTERSUB;
978 myop.op_flags = OPf_WANT_VOID;
979
980 PUSHMARK (SP);
981 PUSHs ((SV *)coro->startcv);
982 PUTBACK;
983 PL_op = (OP *)&myop;
984 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
985 }
986
987 /* this newly created coroutine might be run on an existing cctx which most
988 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
989 */
990 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
991 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
992
993 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
994 coro_setup_op.op_next = PL_op;
995 coro_setup_op.op_type = OP_ENTERSUB;
996 coro_setup_op.op_ppaddr = pp_slf;
997 /* no flags etc. required, as an init function won't be called */
998
999 PL_op = (OP *)&coro_setup_op;
1000
1001 /* copy throw, in case it was set before coro_setup */
1002 CORO_THROW = coro->except;
1003
1004 if (expect_false (enable_times))
1005 {
1006 coro_times_update ();
1007 coro_times_sub (coro);
1008 }
1009}
1010
1011static void
1012coro_unwind_stacks (pTHX)
1013{
1014 if (!IN_DESTRUCT)
1015 {
1016 /* restore all saved variables and stuff */
1017 LEAVE_SCOPE (0);
1018 assert (PL_tmps_floor == -1);
1019
1020 /* free all temporaries */
1021 FREETMPS;
1022 assert (PL_tmps_ix == -1);
1023
1024 /* unwind all extra stacks */
1025 POPSTACK_TO (PL_mainstack);
1026
1027 /* unwind main stack */
1028 dounwind (-1);
1029 }
1030}
1031
1032static void
1033coro_destruct_perl (pTHX_ struct coro *coro)
1034{
1035 SV *svf [9];
1036
1037 {
1038 struct coro *current = SvSTATE_current;
1039
1040 assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack));
1041
1042 save_perl (aTHX_ current);
1043 load_perl (aTHX_ coro);
1044
1045 coro_unwind_stacks (aTHX);
1046 coro_destruct_stacks (aTHX);
1047
1048 // now save some sv's to be free'd later
1049 svf [0] = GvSV (PL_defgv);
1050 svf [1] = (SV *)GvAV (PL_defgv);
1051 svf [2] = GvSV (PL_errgv);
1052 svf [3] = (SV *)PL_defoutgv;
1053 svf [4] = PL_rs;
1054 svf [5] = GvSV (irsgv);
1055 svf [6] = (SV *)GvHV (PL_hintgv);
1056 svf [7] = PL_diehook;
1057 svf [8] = PL_warnhook;
1058 assert (9 == sizeof (svf) / sizeof (*svf));
1059
1060 load_perl (aTHX_ current);
1061 }
1062
1063 {
1064 unsigned int i;
1065
1066 for (i = 0; i < sizeof (svf) / sizeof (*svf); ++i)
1067 SvREFCNT_dec (svf [i]);
1068
1069 SvREFCNT_dec (coro->saved_deffh);
1070 SvREFCNT_dec (coro->rouse_cb);
1071 SvREFCNT_dec (coro->invoke_cb);
1072 SvREFCNT_dec (coro->invoke_av);
1073 }
1074}
1075
1076INLINE void
1077free_coro_mortal (pTHX)
1078{
1079 if (expect_true (coro_mortal))
1080 {
1081 SvREFCNT_dec (coro_mortal);
1082 coro_mortal = 0;
1083 }
1084}
1085
1086static int
1087runops_trace (pTHX)
1088{
1089 COP *oldcop = 0;
1090 int oldcxix = -2;
1091
1092 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
1093 {
1094 PERL_ASYNC_CHECK ();
1095
1096 if (cctx_current->flags & CC_TRACE_ALL)
1097 {
1098 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
1099 {
1100 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1101 SV **bot, **top;
1102 AV *av = newAV (); /* return values */
1103 SV **cb;
1104 dSP;
1105
1106 GV *gv = CvGV (cx->blk_sub.cv);
1107 SV *fullname = sv_2mortal (newSV (0));
1108 if (isGV (gv))
1109 gv_efullname3 (fullname, gv, 0);
1110
1111 bot = PL_stack_base + cx->blk_oldsp + 1;
1112 top = cx->blk_gimme == G_ARRAY ? SP + 1
1113 : cx->blk_gimme == G_SCALAR ? bot + 1
1114 : bot;
1115
1116 av_extend (av, top - bot);
1117 while (bot < top)
1118 av_push (av, SvREFCNT_inc_NN (*bot++));
1119
1120 PL_runops = RUNOPS_DEFAULT;
1121 ENTER;
1122 SAVETMPS;
1123 EXTEND (SP, 3);
1124 PUSHMARK (SP);
1125 PUSHs (&PL_sv_no);
1126 PUSHs (fullname);
1127 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1128 PUTBACK;
1129 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1130 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1131 SPAGAIN;
1132 FREETMPS;
1133 LEAVE;
1134 PL_runops = runops_trace;
1135 }
1136
1137 if (oldcop != PL_curcop)
1138 {
1139 oldcop = PL_curcop;
1140
1141 if (PL_curcop != &PL_compiling)
1142 {
1143 SV **cb;
1144
1145 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1146 {
1147 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1148
1149 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1150 {
1151 dSP;
1152 GV *gv = CvGV (cx->blk_sub.cv);
1153 SV *fullname = sv_2mortal (newSV (0));
1154
1155 if (isGV (gv))
1156 gv_efullname3 (fullname, gv, 0);
1157
1158 PL_runops = RUNOPS_DEFAULT;
1159 ENTER;
1160 SAVETMPS;
1161 EXTEND (SP, 3);
1162 PUSHMARK (SP);
1163 PUSHs (&PL_sv_yes);
1164 PUSHs (fullname);
1165 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1166 PUTBACK;
1167 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1168 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1169 SPAGAIN;
1170 FREETMPS;
1171 LEAVE;
1172 PL_runops = runops_trace;
1173 }
1174
1175 oldcxix = cxstack_ix;
1176 }
1177
1178 if (cctx_current->flags & CC_TRACE_LINE)
1179 {
1180 dSP;
1181
1182 PL_runops = RUNOPS_DEFAULT;
1183 ENTER;
1184 SAVETMPS;
1185 EXTEND (SP, 3);
1186 PL_runops = RUNOPS_DEFAULT;
1187 PUSHMARK (SP);
1188 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1189 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1190 PUTBACK;
1191 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1192 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1193 SPAGAIN;
1194 FREETMPS;
1195 LEAVE;
1196 PL_runops = runops_trace;
1197 }
1198 }
1199 }
1200 }
1201 }
1202
1203 TAINT_NOT;
1204 return 0;
1205}
1206
1207static struct CoroSLF cctx_ssl_frame;
1208
1209static void
1210slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1211{
1212 ta->prev = 0;
1213}
1214
1215static int
1216slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1217{
1218 *frame = cctx_ssl_frame;
1219
1220 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1221}
1222
1223/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1224static void NOINLINE
1225cctx_prepare (pTHX)
1226{
1227 PL_top_env = &PL_start_env;
1228
1229 if (cctx_current->flags & CC_TRACE)
1230 PL_runops = runops_trace;
1231
1232 /* we already must be executing an SLF op, there is no other valid way
1233 * that can lead to creation of a new cctx */
1234 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1235 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1236
1237 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1238 cctx_ssl_frame = slf_frame;
1239
1240 slf_frame.prepare = slf_prepare_set_stacklevel;
1241 slf_frame.check = slf_check_set_stacklevel;
1242}
1243
1244/* the tail of transfer: execute stuff we can only do after a transfer */
1245INLINE void
1246transfer_tail (pTHX)
1247{
1248 free_coro_mortal (aTHX);
1249}
1250
1251/*
1252 * this is a _very_ stripped down perl interpreter ;)
1253 */
1254static void
1255cctx_run (void *arg)
1256{
1257#ifdef USE_ITHREADS
1258# if CORO_PTHREAD
1259 PERL_SET_CONTEXT (coro_thx);
1260# endif
1261#endif
1262 {
1263 dTHX;
1264
1265 /* normally we would need to skip the entersub here */
1266 /* not doing so will re-execute it, which is exactly what we want */
1267 /* PL_nop = PL_nop->op_next */
1268
1269 /* inject a fake subroutine call to cctx_init */
1270 cctx_prepare (aTHX);
1271
1272 /* cctx_run is the alternative tail of transfer() */
1273 transfer_tail (aTHX);
1274
1275 /* somebody or something will hit me for both perl_run and PL_restartop */
1276 PL_restartop = PL_op;
1277 perl_run (PL_curinterp);
1278 /*
1279 * Unfortunately, there is no way to get at the return values of the
1280 * coro body here, as perl_run destroys these
1281 */
1282
1283 /*
1284 * If perl-run returns we assume exit() was being called or the coro
1285 * fell off the end, which seems to be the only valid (non-bug)
1286 * reason for perl_run to return. We try to exit by jumping to the
1287 * bootstrap-time "top" top_env, as we cannot restore the "main"
1288 * coroutine as Coro has no such concept.
1289 * This actually isn't valid with the pthread backend, but OSes requiring
1290 * that backend are too broken to do it in a standards-compliant way.
1291 */
1292 PL_top_env = main_top_env;
1293 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1294 }
1295}
1296
1297static coro_cctx *
1298cctx_new ()
1299{
1300 coro_cctx *cctx;
1301
1302 ++cctx_count;
1303 New (0, cctx, 1, coro_cctx);
1304
1305 cctx->gen = cctx_gen;
1306 cctx->flags = 0;
1307 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1308
1309 return cctx;
1310}
1311
1312/* create a new cctx only suitable as source */
1313static coro_cctx *
1314cctx_new_empty ()
1315{
1316 coro_cctx *cctx = cctx_new ();
1317
1318 cctx->sptr = 0;
1319 coro_create (&cctx->cctx, 0, 0, 0, 0);
1320
1321 return cctx;
1322}
1323
1324/* create a new cctx suitable as destination/running a perl interpreter */
1325static coro_cctx *
1326cctx_new_run ()
1327{
1328 coro_cctx *cctx = cctx_new ();
1329 void *stack_start;
1330 size_t stack_size;
1331
1332#if HAVE_MMAP
1333 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1334 /* mmap supposedly does allocate-on-write for us */
1335 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1336
1337 if (cctx->sptr != (void *)-1)
1338 {
1339 #if CORO_STACKGUARD
1340 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1341 #endif
1342 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1343 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1344 cctx->flags |= CC_MAPPED;
1345 }
1346 else
1347#endif
1348 {
1349 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1350 New (0, cctx->sptr, cctx_stacksize, long);
1351
1352 if (!cctx->sptr)
1353 {
1354 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1355 _exit (EXIT_FAILURE);
1356 }
1357
1358 stack_start = cctx->sptr;
1359 stack_size = cctx->ssize;
1360 }
1361
1362 #if CORO_USE_VALGRIND
1363 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1364 #endif
1365
1366 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1367
1368 return cctx;
1369}
1370
1371static void
1372cctx_destroy (coro_cctx *cctx)
1373{
1374 if (!cctx)
1375 return;
1376
1377 assert (("FATAL: tried to destroy current cctx", cctx != cctx_current));//D temporary?
1378
1379 --cctx_count;
1380 coro_destroy (&cctx->cctx);
1381
1382 /* coro_transfer creates new, empty cctx's */
1383 if (cctx->sptr)
1384 {
1385 #if CORO_USE_VALGRIND
1386 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1387 #endif
1388
1389#if HAVE_MMAP
1390 if (cctx->flags & CC_MAPPED)
1391 munmap (cctx->sptr, cctx->ssize);
1392 else
1393#endif
1394 Safefree (cctx->sptr);
1395 }
1396
1397 Safefree (cctx);
1398}
1399
1400/* wether this cctx should be destructed */
1401#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1402
1403static coro_cctx *
1404cctx_get (pTHX)
1405{
1406 while (expect_true (cctx_first))
1407 {
1408 coro_cctx *cctx = cctx_first;
1409 cctx_first = cctx->next;
1410 --cctx_idle;
1411
1412 if (expect_true (!CCTX_EXPIRED (cctx)))
1413 return cctx;
1414
1415 cctx_destroy (cctx);
1416 }
1417
1418 return cctx_new_run ();
1419}
1420
1421static void
1422cctx_put (coro_cctx *cctx)
1423{
1424 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1425
1426 /* free another cctx if overlimit */
1427 if (expect_false (cctx_idle >= cctx_max_idle))
1428 {
1429 coro_cctx *first = cctx_first;
1430 cctx_first = first->next;
1431 --cctx_idle;
1432
1433 cctx_destroy (first);
1434 }
1435
1436 ++cctx_idle;
1437 cctx->next = cctx_first;
1438 cctx_first = cctx;
1439}
1440
1441/** coroutine switching *****************************************************/
1442
1443static void
1444transfer_check (pTHX_ struct coro *prev, struct coro *next)
1445{
1446 /* TODO: throwing up here is considered harmful */
1447
1448 if (expect_true (prev != next))
1449 {
1450 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1451 croak ("Coro::State::transfer called with a blocked prev Coro::State, but can only transfer from running or new states,");
1452
1453 if (expect_false (next->flags & (CF_RUNNING | CF_DESTROYED | CF_SUSPENDED)))
1454 croak ("Coro::State::transfer called with running, destroyed or suspended next Coro::State, but can only transfer to inactive states,");
1455
1456#if !PERL_VERSION_ATLEAST (5,10,0)
1457 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1458 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1459#endif
1460 }
1461}
1462
1463/* always use the TRANSFER macro */
1464static void NOINLINE /* noinline so we have a fixed stackframe */
1465transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1466{
1467 dSTACKLEVEL;
1468
1469 /* sometimes transfer is only called to set idle_sp */
1470 if (expect_false (!prev))
1471 {
1472 cctx_current->idle_sp = STACKLEVEL;
1473 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1474 }
1475 else if (expect_true (prev != next))
1476 {
1477 coro_cctx *cctx_prev;
1478
1479 if (expect_false (prev->flags & CF_NEW))
1480 {
1481 /* create a new empty/source context */
1482 prev->flags &= ~CF_NEW;
1483 prev->flags |= CF_RUNNING;
1484 }
1485
1486 prev->flags &= ~CF_RUNNING;
1487 next->flags |= CF_RUNNING;
1488
1489 /* first get rid of the old state */
1490 save_perl (aTHX_ prev);
1491
1492 if (expect_false (next->flags & CF_NEW))
1493 {
1494 /* need to start coroutine */
1495 next->flags &= ~CF_NEW;
1496 /* setup coroutine call */
1497 coro_setup (aTHX_ next);
1498 }
1499 else
1500 load_perl (aTHX_ next);
1501
1502 /* possibly untie and reuse the cctx */
1503 if (expect_true (
1504 cctx_current->idle_sp == STACKLEVEL
1505 && !(cctx_current->flags & CC_TRACE)
1506 && !force_cctx
1507 ))
1508 {
1509 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1510 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1511
1512 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get. */
1513 /* without this the next cctx_get might destroy the running cctx while still in use */
1514 if (expect_false (CCTX_EXPIRED (cctx_current)))
1515 if (expect_true (!next->cctx))
1516 next->cctx = cctx_get (aTHX);
1517
1518 cctx_put (cctx_current);
1519 }
1520 else
1521 prev->cctx = cctx_current;
1522
1523 ++next->usecount;
1524
1525 cctx_prev = cctx_current;
1526 cctx_current = expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1527
1528 next->cctx = 0;
1529
1530 if (expect_false (cctx_prev != cctx_current))
1531 {
1532 cctx_prev->top_env = PL_top_env;
1533 PL_top_env = cctx_current->top_env;
1534 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1535 }
1536
1537 transfer_tail (aTHX);
1538 }
1539}
1540
1541#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1542#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1543
1544/** high level stuff ********************************************************/
1545
1546static int
1547coro_state_destroy (pTHX_ struct coro *coro)
1548{
1549 if (coro->flags & CF_DESTROYED)
1550 return 0;
1551
1552 if (coro->on_destroy && !PL_dirty)
1553 coro->on_destroy (aTHX_ coro);
1554
1555 coro->flags |= CF_DESTROYED;
1556
1557 if (coro->flags & CF_READY)
1558 {
1559 /* reduce nready, as destroying a ready coro effectively unreadies it */
1560 /* alternative: look through all ready queues and remove the coro */
1561 --coro_nready;
1562 }
1563 else
1564 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1565
1566 if (coro->mainstack
1567 && coro->mainstack != main_mainstack
1568 && coro->slot
1569 && !PL_dirty)
1570 coro_destruct_perl (aTHX_ coro);
1571
1572 cctx_destroy (coro->cctx);
1573 SvREFCNT_dec (coro->startcv);
1574 SvREFCNT_dec (coro->args);
1575 SvREFCNT_dec (CORO_THROW);
1576
1577 if (coro->next) coro->next->prev = coro->prev;
1578 if (coro->prev) coro->prev->next = coro->next;
1579 if (coro == coro_first) coro_first = coro->next;
1580
1581 return 1;
1582}
1583
1584static int
1585coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1586{
1587 struct coro *coro = (struct coro *)mg->mg_ptr;
1588 mg->mg_ptr = 0;
1589
1590 coro->hv = 0;
1591
1592 if (--coro->refcnt < 0)
1593 {
1594 coro_state_destroy (aTHX_ coro);
1595 Safefree (coro);
1596 }
1597
1598 return 0;
1599}
1600
1601static int
1602coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1603{
1604 struct coro *coro = (struct coro *)mg->mg_ptr;
1605
1606 ++coro->refcnt;
1607
1608 return 0;
1609}
1610
1611static MGVTBL coro_state_vtbl = {
1612 0, 0, 0, 0,
1613 coro_state_free,
1614 0,
1615#ifdef MGf_DUP
1616 coro_state_dup,
1617#else
1618# define MGf_DUP 0
1619#endif
1620};
1621
1622static void
1623prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1624{
1625 ta->prev = SvSTATE (prev_sv);
1626 ta->next = SvSTATE (next_sv);
1627 TRANSFER_CHECK (*ta);
1628}
1629
1630static void
1631api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1632{
1633 struct coro_transfer_args ta;
1634
1635 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1636 TRANSFER (ta, 1);
1637}
1638
1639/** Coro ********************************************************************/
1640
1641INLINE void
1642coro_enq (pTHX_ struct coro *coro)
1643{
1644 struct coro **ready = coro_ready [coro->prio - CORO_PRIO_MIN];
1645
1646 SvREFCNT_inc_NN (coro->hv);
1647
1648 coro->next_ready = 0;
1649 *(ready [0] ? &ready [1]->next_ready : &ready [0]) = coro;
1650 ready [1] = coro;
1651}
1652
1653INLINE struct coro *
1654coro_deq (pTHX)
1655{
1656 int prio;
1657
1658 for (prio = CORO_PRIO_MAX - CORO_PRIO_MIN + 1; --prio >= 0; )
1659 {
1660 struct coro **ready = coro_ready [prio];
1661
1662 if (ready [0])
1663 {
1664 struct coro *coro = ready [0];
1665 ready [0] = coro->next_ready;
1666 return coro;
1667 }
1668 }
1669
1670 return 0;
1671}
1672
1673static int
1674api_ready (pTHX_ SV *coro_sv)
1675{
1676 struct coro *coro;
1677 SV *sv_hook;
1678 void (*xs_hook)(void);
1679
1680 coro = SvSTATE (coro_sv);
1681
1682 if (coro->flags & CF_READY)
1683 return 0;
1684
1685 coro->flags |= CF_READY;
1686
1687 sv_hook = coro_nready ? 0 : coro_readyhook;
1688 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1689
1690 coro_enq (aTHX_ coro);
1691 ++coro_nready;
1692
1693 if (sv_hook)
1694 {
1695 dSP;
1696
1697 ENTER;
1698 SAVETMPS;
1699
1700 PUSHMARK (SP);
1701 PUTBACK;
1702 call_sv (sv_hook, G_VOID | G_DISCARD);
1703
1704 FREETMPS;
1705 LEAVE;
1706 }
1707
1708 if (xs_hook)
1709 xs_hook ();
1710
1711 return 1;
1712}
1713
1714static int
1715api_is_ready (pTHX_ SV *coro_sv)
1716{
1717 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1718}
1719
1720/* expects to own a reference to next->hv */
1721INLINE void
1722prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1723{
1724 SV *prev_sv = SvRV (coro_current);
1725
1726 ta->prev = SvSTATE_hv (prev_sv);
1727 ta->next = next;
1728
1729 TRANSFER_CHECK (*ta);
1730
1731 SvRV_set (coro_current, (SV *)next->hv);
1732
1733 free_coro_mortal (aTHX);
1734 coro_mortal = prev_sv;
1735}
1736
1737static void
1738prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1739{
1740 for (;;)
1741 {
1742 struct coro *next = coro_deq (aTHX);
1743
1744 if (expect_true (next))
1745 {
1746 /* cannot transfer to destroyed coros, skip and look for next */
1747 if (expect_false (next->flags & (CF_DESTROYED | CF_SUSPENDED)))
1748 SvREFCNT_dec (next->hv); /* coro_nready has already been taken care of by destroy */
1749 else
1750 {
1751 next->flags &= ~CF_READY;
1752 --coro_nready;
1753
1754 prepare_schedule_to (aTHX_ ta, next);
1755 break;
1756 }
1757 }
1758 else
1759 {
1760 /* nothing to schedule: call the idle handler */
1761 if (SvROK (sv_idle)
1762 && SvOBJECT (SvRV (sv_idle)))
1763 {
1764 ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */
1765 api_ready (aTHX_ SvRV (sv_idle));
1766 --coro_nready;
1767 }
1768 else
1769 {
1770 dSP;
1771
1772 ENTER;
1773 SAVETMPS;
1774
1775 PUSHMARK (SP);
1776 PUTBACK;
1777 call_sv (sv_idle, G_VOID | G_DISCARD);
1778
1779 FREETMPS;
1780 LEAVE;
1781 }
1782 }
1783 }
1784}
1785
1786INLINE void
1787prepare_cede (pTHX_ struct coro_transfer_args *ta)
1788{
1789 api_ready (aTHX_ coro_current);
1790 prepare_schedule (aTHX_ ta);
1791}
1792
1793INLINE void
1794prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1795{
1796 SV *prev = SvRV (coro_current);
1797
1798 if (coro_nready)
1799 {
1800 prepare_schedule (aTHX_ ta);
1801 api_ready (aTHX_ prev);
1802 }
1803 else
1804 prepare_nop (aTHX_ ta);
1805}
1806
1807static void
1808api_schedule (pTHX)
1809{
1810 struct coro_transfer_args ta;
1811
1812 prepare_schedule (aTHX_ &ta);
1813 TRANSFER (ta, 1);
1814}
1815
1816static void
1817api_schedule_to (pTHX_ SV *coro_sv)
1818{
1819 struct coro_transfer_args ta;
1820 struct coro *next = SvSTATE (coro_sv);
1821
1822 SvREFCNT_inc_NN (coro_sv);
1823 prepare_schedule_to (aTHX_ &ta, next);
1824}
1825
1826static int
1827api_cede (pTHX)
1828{
1829 struct coro_transfer_args ta;
1830
1831 prepare_cede (aTHX_ &ta);
1832
1833 if (expect_true (ta.prev != ta.next))
1834 {
1835 TRANSFER (ta, 1);
1836 return 1;
1837 }
1838 else
1839 return 0;
1840}
1841
1842static int
1843api_cede_notself (pTHX)
1844{
1845 if (coro_nready)
1846 {
1847 struct coro_transfer_args ta;
1848
1849 prepare_cede_notself (aTHX_ &ta);
1850 TRANSFER (ta, 1);
1851 return 1;
1852 }
1853 else
1854 return 0;
1855}
1856
1857static void
1858api_trace (pTHX_ SV *coro_sv, int flags)
1859{
1860 struct coro *coro = SvSTATE (coro_sv);
1861
1862 if (coro->flags & CF_RUNNING)
1863 croak ("cannot enable tracing on a running coroutine, caught");
1864
1865 if (flags & CC_TRACE)
1866 {
1867 if (!coro->cctx)
1868 coro->cctx = cctx_new_run ();
1869 else if (!(coro->cctx->flags & CC_TRACE))
1870 croak ("cannot enable tracing on coroutine with custom stack, caught");
1871
1872 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1873 }
1874 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1875 {
1876 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1877
1878 if (coro->flags & CF_RUNNING)
1879 PL_runops = RUNOPS_DEFAULT;
1880 else
1881 coro->slot->runops = RUNOPS_DEFAULT;
1882 }
1883}
1884
1885static void
1886coro_call_on_destroy (pTHX_ struct coro *coro)
1887{
1888 SV **on_destroyp = hv_fetch (coro->hv, "_on_destroy", sizeof ("_on_destroy") - 1, 0);
1889 SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0);
1890
1891 if (on_destroyp)
1892 {
1893 AV *on_destroy = (AV *)SvRV (*on_destroyp);
1894
1895 while (AvFILLp (on_destroy) >= 0)
1896 {
1897 dSP; /* don't disturb outer sp */
1898 SV *cb = av_pop (on_destroy);
1899
1900 PUSHMARK (SP);
1901
1902 if (statusp)
1903 {
1904 int i;
1905 AV *status = (AV *)SvRV (*statusp);
1906 EXTEND (SP, AvFILLp (status) + 1);
1907
1908 for (i = 0; i <= AvFILLp (status); ++i)
1909 PUSHs (AvARRAY (status)[i]);
1910 }
1911
1912 PUTBACK;
1913 call_sv (sv_2mortal (cb), G_VOID | G_DISCARD);
1914 }
1915 }
1916}
1917
1918static void
1919slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1920{
1921 int i;
1922 HV *hv = (HV *)SvRV (coro_current);
1923 AV *av = newAV ();
1924
1925 av_extend (av, items - 1);
1926 for (i = 0; i < items; ++i)
1927 av_push (av, SvREFCNT_inc_NN (arg [i]));
1928
1929 hv_store (hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0);
1930
1931 av_push (av_destroy, (SV *)newRV_inc ((SV *)hv)); /* RVinc for perl */
1932 api_ready (aTHX_ sv_manager);
1933
1934 frame->prepare = prepare_schedule;
1935 frame->check = slf_check_repeat;
1936
1937 /* as a minor optimisation, we could unwind all stacks here */
1938 /* but that puts extra pressure on pp_slf, and is not worth much */
1939 /*coro_unwind_stacks (aTHX);*/
1940}
1941
1942/*****************************************************************************/
1943/* async pool handler */
1944
1945static int
1946slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
1947{
1948 HV *hv = (HV *)SvRV (coro_current);
1949 struct coro *coro = (struct coro *)frame->data;
1950
1951 if (!coro->invoke_cb)
1952 return 1; /* loop till we have invoke */
1953 else
1954 {
1955 hv_store (hv, "desc", sizeof ("desc") - 1,
1956 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
1957
1958 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
1959
1960 {
1961 dSP;
1962 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
1963 PUTBACK;
1964 }
1965
1966 SvREFCNT_dec (GvAV (PL_defgv));
1967 GvAV (PL_defgv) = coro->invoke_av;
1968 coro->invoke_av = 0;
1969
1970 return 0;
1971 }
1972}
1973
1974static void
1975slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1976{
1977 HV *hv = (HV *)SvRV (coro_current);
1978 struct coro *coro = SvSTATE_hv ((SV *)hv);
1979
1980 if (expect_true (coro->saved_deffh))
1981 {
1982 /* subsequent iteration */
1983 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
1984 coro->saved_deffh = 0;
1985
1986 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
1987 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
1988 {
1989 coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate);
1990 coro->invoke_av = newAV ();
1991
1992 frame->prepare = prepare_nop;
1993 }
1994 else
1995 {
1996 av_clear (GvAV (PL_defgv));
1997 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
1998
1999 coro->prio = 0;
2000
2001 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
2002 api_trace (aTHX_ coro_current, 0);
2003
2004 frame->prepare = prepare_schedule;
2005 av_push (av_async_pool, SvREFCNT_inc (hv));
2006 }
2007 }
2008 else
2009 {
2010 /* first iteration, simply fall through */
2011 frame->prepare = prepare_nop;
2012 }
2013
2014 frame->check = slf_check_pool_handler;
2015 frame->data = (void *)coro;
2016}
2017
2018/*****************************************************************************/
2019/* rouse callback */
2020
2021#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
2022
2023static void
2024coro_rouse_callback (pTHX_ CV *cv)
2025{
2026 dXSARGS;
2027 SV *data = (SV *)S_GENSUB_ARG;
2028
2029 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2030 {
2031 /* first call, set args */
2032 SV *coro = SvRV (data);
2033 AV *av = newAV ();
2034
2035 SvRV_set (data, (SV *)av);
2036
2037 /* better take a full copy of the arguments */
2038 while (items--)
2039 av_store (av, items, newSVsv (ST (items)));
2040
2041 api_ready (aTHX_ coro);
2042 SvREFCNT_dec (coro);
2043 }
2044
2045 XSRETURN_EMPTY;
2046}
2047
2048static int
2049slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
2050{
2051 SV *data = (SV *)frame->data;
2052
2053 if (CORO_THROW)
2054 return 0;
2055
2056 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2057 return 1;
2058
2059 /* now push all results on the stack */
2060 {
2061 dSP;
2062 AV *av = (AV *)SvRV (data);
2063 int i;
2064
2065 EXTEND (SP, AvFILLp (av) + 1);
2066 for (i = 0; i <= AvFILLp (av); ++i)
2067 PUSHs (sv_2mortal (AvARRAY (av)[i]));
2068
2069 /* we have stolen the elements, so set length to zero and free */
2070 AvFILLp (av) = -1;
2071 av_undef (av);
2072
2073 PUTBACK;
2074 }
2075
2076 return 0;
2077}
2078
2079static void
2080slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2081{
2082 SV *cb;
2083
2084 if (items)
2085 cb = arg [0];
2086 else
2087 {
2088 struct coro *coro = SvSTATE_current;
2089
2090 if (!coro->rouse_cb)
2091 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
2092
2093 cb = sv_2mortal (coro->rouse_cb);
2094 coro->rouse_cb = 0;
2095 }
2096
2097 if (!SvROK (cb)
2098 || SvTYPE (SvRV (cb)) != SVt_PVCV
2099 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
2100 croak ("Coro::rouse_wait called with illegal callback argument,");
2101
2102 {
2103 CV *cv = (CV *)SvRV (cb); /* for S_GENSUB_ARG */
2104 SV *data = (SV *)S_GENSUB_ARG;
2105
2106 frame->data = (void *)data;
2107 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2108 frame->check = slf_check_rouse_wait;
2109 }
2110}
2111
2112static SV *
2113coro_new_rouse_cb (pTHX)
2114{
2115 HV *hv = (HV *)SvRV (coro_current);
2116 struct coro *coro = SvSTATE_hv (hv);
2117 SV *data = newRV_inc ((SV *)hv);
2118 SV *cb = s_gensub (aTHX_ coro_rouse_callback, (void *)data);
2119
2120 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2121 SvREFCNT_dec (data); /* magicext increases the refcount */
2122
2123 SvREFCNT_dec (coro->rouse_cb);
2124 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2125
2126 return cb;
2127}
2128
2129/*****************************************************************************/
2130/* schedule-like-function opcode (SLF) */
2131
2132static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2133static const CV *slf_cv;
2134static SV **slf_argv;
2135static int slf_argc, slf_arga; /* count, allocated */
2136static I32 slf_ax; /* top of stack, for restore */
2137
2138/* this restores the stack in the case we patched the entersub, to */
2139/* recreate the stack frame as perl will on following calls */
2140/* since entersub cleared the stack */
2141static OP *
2142pp_restore (pTHX)
2143{
2144 int i;
2145 SV **SP = PL_stack_base + slf_ax;
2146
2147 PUSHMARK (SP);
2148
2149 EXTEND (SP, slf_argc + 1);
2150
2151 for (i = 0; i < slf_argc; ++i)
2152 PUSHs (sv_2mortal (slf_argv [i]));
2153
2154 PUSHs ((SV *)CvGV (slf_cv));
2155
2156 RETURNOP (slf_restore.op_first);
2157}
2158
2159static void
2160slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2161{
2162 SV **arg = (SV **)slf_frame.data;
2163
2164 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2165}
2166
2167static void
2168slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2169{
2170 if (items != 2)
2171 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2172
2173 frame->prepare = slf_prepare_transfer;
2174 frame->check = slf_check_nop;
2175 frame->data = (void *)arg; /* let's hope it will stay valid */
2176}
2177
2178static void
2179slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2180{
2181 frame->prepare = prepare_schedule;
2182 frame->check = slf_check_nop;
2183}
2184
2185static void
2186slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2187{
2188 struct coro *next = (struct coro *)slf_frame.data;
2189
2190 SvREFCNT_inc_NN (next->hv);
2191 prepare_schedule_to (aTHX_ ta, next);
2192}
2193
2194static void
2195slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2196{
2197 if (!items)
2198 croak ("Coro::schedule_to expects a coroutine argument, caught");
2199
2200 frame->data = (void *)SvSTATE (arg [0]);
2201 frame->prepare = slf_prepare_schedule_to;
2202 frame->check = slf_check_nop;
2203}
2204
2205static void
2206slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2207{
2208 api_ready (aTHX_ SvRV (coro_current));
2209
2210 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2211}
2212
2213static void
2214slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2215{
2216 frame->prepare = prepare_cede;
2217 frame->check = slf_check_nop;
2218}
2219
2220static void
2221slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2222{
2223 frame->prepare = prepare_cede_notself;
2224 frame->check = slf_check_nop;
2225}
2226
2227/*
2228 * these not obviously related functions are all rolled into one
2229 * function to increase chances that they all will call transfer with the same
2230 * stack offset
2231 * SLF stands for "schedule-like-function".
2232 */
2233static OP *
2234pp_slf (pTHX)
2235{
2236 I32 checkmark; /* mark SP to see how many elements check has pushed */
2237
2238 /* set up the slf frame, unless it has already been set-up */
2239 /* the latter happens when a new coro has been started */
2240 /* or when a new cctx was attached to an existing coroutine */
2241 if (expect_true (!slf_frame.prepare))
2242 {
2243 /* first iteration */
2244 dSP;
2245 SV **arg = PL_stack_base + TOPMARK + 1;
2246 int items = SP - arg; /* args without function object */
2247 SV *gv = *sp;
2248
2249 /* do a quick consistency check on the "function" object, and if it isn't */
2250 /* for us, divert to the real entersub */
2251 if (SvTYPE (gv) != SVt_PVGV
2252 || !GvCV (gv)
2253 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2254 return PL_ppaddr[OP_ENTERSUB](aTHX);
2255
2256 if (!(PL_op->op_flags & OPf_STACKED))
2257 {
2258 /* ampersand-form of call, use @_ instead of stack */
2259 AV *av = GvAV (PL_defgv);
2260 arg = AvARRAY (av);
2261 items = AvFILLp (av) + 1;
2262 }
2263
2264 /* now call the init function, which needs to set up slf_frame */
2265 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2266 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2267
2268 /* pop args */
2269 SP = PL_stack_base + POPMARK;
2270
2271 PUTBACK;
2272 }
2273
2274 /* now that we have a slf_frame, interpret it! */
2275 /* we use a callback system not to make the code needlessly */
2276 /* complicated, but so we can run multiple perl coros from one cctx */
2277
2278 do
2279 {
2280 struct coro_transfer_args ta;
2281
2282 slf_frame.prepare (aTHX_ &ta);
2283 TRANSFER (ta, 0);
2284
2285 checkmark = PL_stack_sp - PL_stack_base;
2286 }
2287 while (slf_frame.check (aTHX_ &slf_frame));
2288
2289 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2290
2291 /* exception handling */
2292 if (expect_false (CORO_THROW))
2293 {
2294 SV *exception = sv_2mortal (CORO_THROW);
2295
2296 CORO_THROW = 0;
2297 sv_setsv (ERRSV, exception);
2298 croak (0);
2299 }
2300
2301 /* return value handling - mostly like entersub */
2302 /* make sure we put something on the stack in scalar context */
2303 if (GIMME_V == G_SCALAR)
2304 {
2305 dSP;
2306 SV **bot = PL_stack_base + checkmark;
2307
2308 if (sp == bot) /* too few, push undef */
2309 bot [1] = &PL_sv_undef;
2310 else if (sp != bot + 1) /* too many, take last one */
2311 bot [1] = *sp;
2312
2313 SP = bot + 1;
2314
2315 PUTBACK;
2316 }
2317
2318 return NORMAL;
2319}
2320
2321static void
2322api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2323{
2324 int i;
2325 SV **arg = PL_stack_base + ax;
2326 int items = PL_stack_sp - arg + 1;
2327
2328 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2329
2330 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2331 && PL_op->op_ppaddr != pp_slf)
2332 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2333
2334 CvFLAGS (cv) |= CVf_SLF;
2335 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2336 slf_cv = cv;
2337
2338 /* we patch the op, and then re-run the whole call */
2339 /* we have to put the same argument on the stack for this to work */
2340 /* and this will be done by pp_restore */
2341 slf_restore.op_next = (OP *)&slf_restore;
2342 slf_restore.op_type = OP_CUSTOM;
2343 slf_restore.op_ppaddr = pp_restore;
2344 slf_restore.op_first = PL_op;
2345
2346 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2347
2348 if (PL_op->op_flags & OPf_STACKED)
2349 {
2350 if (items > slf_arga)
2351 {
2352 slf_arga = items;
2353 free (slf_argv);
2354 slf_argv = malloc (slf_arga * sizeof (SV *));
2355 }
2356
2357 slf_argc = items;
2358
2359 for (i = 0; i < items; ++i)
2360 slf_argv [i] = SvREFCNT_inc (arg [i]);
2361 }
2362 else
2363 slf_argc = 0;
2364
2365 PL_op->op_ppaddr = pp_slf;
2366 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2367
2368 PL_op = (OP *)&slf_restore;
2369}
2370
2371/*****************************************************************************/
2372/* dynamic wind */
2373
2374static void
2375on_enterleave_call (pTHX_ SV *cb)
2376{
2377 dSP;
2378
2379 PUSHSTACK;
2380
2381 PUSHMARK (SP);
2382 PUTBACK;
2383 call_sv (cb, G_VOID | G_DISCARD);
2384 SPAGAIN;
2385
2386 POPSTACK;
2387}
2388
2389static SV *
2390coro_avp_pop_and_free (pTHX_ AV **avp)
2391{
2392 AV *av = *avp;
2393 SV *res = av_pop (av);
2394
2395 if (AvFILLp (av) < 0)
2396 {
2397 *avp = 0;
2398 SvREFCNT_dec (av);
2399 }
2400
2401 return res;
2402}
2403
2404static void
2405coro_pop_on_enter (pTHX_ void *coro)
2406{
2407 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_enter);
2408 SvREFCNT_dec (cb);
2409}
2410
2411static void
2412coro_pop_on_leave (pTHX_ void *coro)
2413{
2414 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_leave);
2415 on_enterleave_call (aTHX_ sv_2mortal (cb));
2416}
2417
2418/*****************************************************************************/
2419/* PerlIO::cede */
2420
2421typedef struct
2422{
2423 PerlIOBuf base;
2424 NV next, every;
2425} PerlIOCede;
2426
2427static IV
2428PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2429{
2430 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2431
2432 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2433 self->next = nvtime () + self->every;
2434
2435 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2436}
2437
2438static SV *
2439PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2440{
2441 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2442
2443 return newSVnv (self->every);
2444}
2445
2446static IV
2447PerlIOCede_flush (pTHX_ PerlIO *f)
2448{
2449 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2450 double now = nvtime ();
2451
2452 if (now >= self->next)
2453 {
2454 api_cede (aTHX);
2455 self->next = now + self->every;
2456 }
2457
2458 return PerlIOBuf_flush (aTHX_ f);
2459}
2460
2461static PerlIO_funcs PerlIO_cede =
2462{
2463 sizeof(PerlIO_funcs),
2464 "cede",
2465 sizeof(PerlIOCede),
2466 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2467 PerlIOCede_pushed,
2468 PerlIOBuf_popped,
2469 PerlIOBuf_open,
2470 PerlIOBase_binmode,
2471 PerlIOCede_getarg,
2472 PerlIOBase_fileno,
2473 PerlIOBuf_dup,
2474 PerlIOBuf_read,
2475 PerlIOBuf_unread,
2476 PerlIOBuf_write,
2477 PerlIOBuf_seek,
2478 PerlIOBuf_tell,
2479 PerlIOBuf_close,
2480 PerlIOCede_flush,
2481 PerlIOBuf_fill,
2482 PerlIOBase_eof,
2483 PerlIOBase_error,
2484 PerlIOBase_clearerr,
2485 PerlIOBase_setlinebuf,
2486 PerlIOBuf_get_base,
2487 PerlIOBuf_bufsiz,
2488 PerlIOBuf_get_ptr,
2489 PerlIOBuf_get_cnt,
2490 PerlIOBuf_set_ptrcnt,
2491};
2492
2493/*****************************************************************************/
2494/* Coro::Semaphore & Coro::Signal */
2495
2496static SV *
2497coro_waitarray_new (pTHX_ int count)
2498{
2499 /* a waitarray=semaphore contains a counter IV in $sem->[0] and any waiters after that */
2500 AV *av = newAV ();
2501 SV **ary;
2502
2503 /* unfortunately, building manually saves memory */
2504 Newx (ary, 2, SV *);
2505 AvALLOC (av) = ary;
2506#if PERL_VERSION_ATLEAST (5,10,0)
2507 AvARRAY (av) = ary;
2508#else
2509 /* 5.8.8 needs this syntax instead of AvARRAY = ary, yet */
2510 /* -DDEBUGGING flags this as a bug, despite it perfectly working */
2511 SvPVX ((SV *)av) = (char *)ary;
2512#endif
2513 AvMAX (av) = 1;
2514 AvFILLp (av) = 0;
2515 ary [0] = newSViv (count);
2516
2517 return newRV_noinc ((SV *)av);
2518}
2519
2520/* semaphore */
2521
2522static void
2523coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2524{
2525 SV *count_sv = AvARRAY (av)[0];
2526 IV count = SvIVX (count_sv);
2527
2528 count += adjust;
2529 SvIVX (count_sv) = count;
2530
2531 /* now wake up as many waiters as are expected to lock */
2532 while (count > 0 && AvFILLp (av) > 0)
2533 {
2534 SV *cb;
2535
2536 /* swap first two elements so we can shift a waiter */
2537 AvARRAY (av)[0] = AvARRAY (av)[1];
2538 AvARRAY (av)[1] = count_sv;
2539 cb = av_shift (av);
2540
2541 if (SvOBJECT (cb))
2542 {
2543 api_ready (aTHX_ cb);
2544 --count;
2545 }
2546 else if (SvTYPE (cb) == SVt_PVCV)
2547 {
2548 dSP;
2549 PUSHMARK (SP);
2550 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2551 PUTBACK;
2552 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2553 }
2554
2555 SvREFCNT_dec (cb);
2556 }
2557}
2558
2559static void
2560coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2561{
2562 /* call $sem->adjust (0) to possibly wake up some other waiters */
2563 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2564}
2565
2566static int
2567slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2568{
2569 AV *av = (AV *)frame->data;
2570 SV *count_sv = AvARRAY (av)[0];
2571
2572 /* if we are about to throw, don't actually acquire the lock, just throw */
2573 if (CORO_THROW)
2574 return 0;
2575 else if (SvIVX (count_sv) > 0)
2576 {
2577 SvSTATE_current->on_destroy = 0;
2578
2579 if (acquire)
2580 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2581 else
2582 coro_semaphore_adjust (aTHX_ av, 0);
2583
2584 return 0;
2585 }
2586 else
2587 {
2588 int i;
2589 /* if we were woken up but can't down, we look through the whole */
2590 /* waiters list and only add us if we aren't in there already */
2591 /* this avoids some degenerate memory usage cases */
2592
2593 for (i = 1; i <= AvFILLp (av); ++i)
2594 if (AvARRAY (av)[i] == SvRV (coro_current))
2595 return 1;
2596
2597 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2598 return 1;
2599 }
2600}
2601
2602static int
2603slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2604{
2605 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2606}
2607
2608static int
2609slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2610{
2611 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2612}
2613
2614static void
2615slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2616{
2617 AV *av = (AV *)SvRV (arg [0]);
2618
2619 if (SvIVX (AvARRAY (av)[0]) > 0)
2620 {
2621 frame->data = (void *)av;
2622 frame->prepare = prepare_nop;
2623 }
2624 else
2625 {
2626 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2627
2628 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2629 frame->prepare = prepare_schedule;
2630
2631 /* to avoid race conditions when a woken-up coro gets terminated */
2632 /* we arrange for a temporary on_destroy that calls adjust (0) */
2633 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2634 }
2635}
2636
2637static void
2638slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2639{
2640 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2641 frame->check = slf_check_semaphore_down;
2642}
2643
2644static void
2645slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2646{
2647 if (items >= 2)
2648 {
2649 /* callback form */
2650 AV *av = (AV *)SvRV (arg [0]);
2651 SV *cb_cv = s_get_cv_croak (arg [1]);
2652
2653 av_push (av, SvREFCNT_inc_NN (cb_cv));
2654
2655 if (SvIVX (AvARRAY (av)[0]) > 0)
2656 coro_semaphore_adjust (aTHX_ av, 0);
2657
2658 frame->prepare = prepare_nop;
2659 frame->check = slf_check_nop;
2660 }
2661 else
2662 {
2663 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2664 frame->check = slf_check_semaphore_wait;
2665 }
2666}
2667
2668/* signal */
2669
2670static void
2671coro_signal_wake (pTHX_ AV *av, int count)
2672{
2673 SvIVX (AvARRAY (av)[0]) = 0;
2674
2675 /* now signal count waiters */
2676 while (count > 0 && AvFILLp (av) > 0)
2677 {
2678 SV *cb;
2679
2680 /* swap first two elements so we can shift a waiter */
2681 cb = AvARRAY (av)[0];
2682 AvARRAY (av)[0] = AvARRAY (av)[1];
2683 AvARRAY (av)[1] = cb;
2684
2685 cb = av_shift (av);
2686
2687 if (SvTYPE (cb) == SVt_PVCV)
2688 {
2689 dSP;
2690 PUSHMARK (SP);
2691 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2692 PUTBACK;
2693 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2694 }
2695 else
2696 {
2697 api_ready (aTHX_ cb);
2698 sv_setiv (cb, 0); /* signal waiter */
2699 }
2700
2701 SvREFCNT_dec (cb);
2702
2703 --count;
2704 }
2705}
2706
2707static int
2708slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2709{
2710 /* if we are about to throw, also stop waiting */
2711 return SvROK ((SV *)frame->data) && !CORO_THROW;
2712}
2713
2714static void
2715slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2716{
2717 AV *av = (AV *)SvRV (arg [0]);
2718
2719 if (items >= 2)
2720 {
2721 SV *cb_cv = s_get_cv_croak (arg [1]);
2722 av_push (av, SvREFCNT_inc_NN (cb_cv));
2723
2724 if (SvIVX (AvARRAY (av)[0]))
2725 coro_signal_wake (aTHX_ av, 1); /* ust be the only waiter */
2726
2727 frame->prepare = prepare_nop;
2728 frame->check = slf_check_nop;
2729 }
2730 else if (SvIVX (AvARRAY (av)[0]))
2731 {
2732 SvIVX (AvARRAY (av)[0]) = 0;
2733 frame->prepare = prepare_nop;
2734 frame->check = slf_check_nop;
2735 }
2736 else
2737 {
2738 SV *waiter = newSVsv (coro_current); /* owned by signal av */
2739
2740 av_push (av, waiter);
2741
2742 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2743 frame->prepare = prepare_schedule;
2744 frame->check = slf_check_signal_wait;
2745 }
2746}
2747
2748/*****************************************************************************/
2749/* Coro::AIO */
2750
2751#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2752
2753/* helper storage struct */
2754struct io_state
2755{
2756 int errorno;
2757 I32 laststype; /* U16 in 5.10.0 */
2758 int laststatval;
2759 Stat_t statcache;
2760};
2761
2762static void
2763coro_aio_callback (pTHX_ CV *cv)
2764{
2765 dXSARGS;
2766 AV *state = (AV *)S_GENSUB_ARG;
2767 SV *coro = av_pop (state);
2768 SV *data_sv = newSV (sizeof (struct io_state));
2769
2770 av_extend (state, items - 1);
2771
2772 sv_upgrade (data_sv, SVt_PV);
2773 SvCUR_set (data_sv, sizeof (struct io_state));
2774 SvPOK_only (data_sv);
2775
2776 {
2777 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2778
2779 data->errorno = errno;
2780 data->laststype = PL_laststype;
2781 data->laststatval = PL_laststatval;
2782 data->statcache = PL_statcache;
2783 }
2784
2785 /* now build the result vector out of all the parameters and the data_sv */
2786 {
2787 int i;
2788
2789 for (i = 0; i < items; ++i)
2790 av_push (state, SvREFCNT_inc_NN (ST (i)));
2791 }
2792
2793 av_push (state, data_sv);
2794
2795 api_ready (aTHX_ coro);
2796 SvREFCNT_dec (coro);
2797 SvREFCNT_dec ((AV *)state);
2798}
2799
2800static int
2801slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2802{
2803 AV *state = (AV *)frame->data;
2804
2805 /* if we are about to throw, return early */
2806 /* this does not cancel the aio request, but at least */
2807 /* it quickly returns */
2808 if (CORO_THROW)
2809 return 0;
2810
2811 /* one element that is an RV? repeat! */
2812 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2813 return 1;
2814
2815 /* restore status */
2816 {
2817 SV *data_sv = av_pop (state);
2818 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2819
2820 errno = data->errorno;
2821 PL_laststype = data->laststype;
2822 PL_laststatval = data->laststatval;
2823 PL_statcache = data->statcache;
2824
2825 SvREFCNT_dec (data_sv);
2826 }
2827
2828 /* push result values */
2829 {
2830 dSP;
2831 int i;
2832
2833 EXTEND (SP, AvFILLp (state) + 1);
2834 for (i = 0; i <= AvFILLp (state); ++i)
2835 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2836
2837 PUTBACK;
2838 }
2839
2840 return 0;
2841}
2842
2843static void
2844slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2845{
2846 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2847 SV *coro_hv = SvRV (coro_current);
2848 struct coro *coro = SvSTATE_hv (coro_hv);
2849
2850 /* put our coroutine id on the state arg */
2851 av_push (state, SvREFCNT_inc_NN (coro_hv));
2852
2853 /* first see whether we have a non-zero priority and set it as AIO prio */
2854 if (coro->prio)
2855 {
2856 dSP;
2857
2858 static SV *prio_cv;
2859 static SV *prio_sv;
2860
2861 if (expect_false (!prio_cv))
2862 {
2863 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2864 prio_sv = newSViv (0);
2865 }
2866
2867 PUSHMARK (SP);
2868 sv_setiv (prio_sv, coro->prio);
2869 XPUSHs (prio_sv);
2870
2871 PUTBACK;
2872 call_sv (prio_cv, G_VOID | G_DISCARD);
2873 }
2874
2875 /* now call the original request */
2876 {
2877 dSP;
2878 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2879 int i;
2880
2881 PUSHMARK (SP);
2882
2883 /* first push all args to the stack */
2884 EXTEND (SP, items + 1);
2885
2886 for (i = 0; i < items; ++i)
2887 PUSHs (arg [i]);
2888
2889 /* now push the callback closure */
2890 PUSHs (sv_2mortal (s_gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2891
2892 /* now call the AIO function - we assume our request is uncancelable */
2893 PUTBACK;
2894 call_sv ((SV *)req, G_VOID | G_DISCARD);
2895 }
2896
2897 /* now that the requets is going, we loop toll we have a result */
2898 frame->data = (void *)state;
2899 frame->prepare = prepare_schedule;
2900 frame->check = slf_check_aio_req;
2901}
2902
2903static void
2904coro_aio_req_xs (pTHX_ CV *cv)
2905{
2906 dXSARGS;
2907
2908 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2909
2910 XSRETURN_EMPTY;
2911}
2912
2913/*****************************************************************************/
2914
2915#if CORO_CLONE
2916# include "clone.c"
2917#endif
2918
2919MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2920
2921PROTOTYPES: DISABLE
2922
2923BOOT:
2924{
2925#ifdef USE_ITHREADS
2926# if CORO_PTHREAD
2927 coro_thx = PERL_GET_CONTEXT;
2928# endif
2929#endif
2930 BOOT_PAGESIZE;
2931
2932 cctx_current = cctx_new_empty ();
2933
2934 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2935 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2936
2937 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2938 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2939 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
2940
2941 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
2942 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
2943 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
2944
2945 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
2946
2947 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
2948 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
2949 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
2950 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
2951
2952 main_mainstack = PL_mainstack;
2953 main_top_env = PL_top_env;
2954
2955 while (main_top_env->je_prev)
2956 main_top_env = main_top_env->je_prev;
2957
2958 {
2959 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2960
2961 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2962 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
2963
2964 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2965 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
2966 }
2967
2968 coroapi.ver = CORO_API_VERSION;
2969 coroapi.rev = CORO_API_REVISION;
2970
2971 coroapi.transfer = api_transfer;
2972
2973 coroapi.sv_state = SvSTATE_;
2974 coroapi.execute_slf = api_execute_slf;
2975 coroapi.prepare_nop = prepare_nop;
2976 coroapi.prepare_schedule = prepare_schedule;
2977 coroapi.prepare_cede = prepare_cede;
2978 coroapi.prepare_cede_notself = prepare_cede_notself;
2979
2980 {
2981 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
2982
2983 if (!svp) croak ("Time::HiRes is required");
2984 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
2985
2986 nvtime = INT2PTR (double (*)(), SvIV (*svp));
2987
2988 svp = hv_fetch (PL_modglobal, "Time::U2time", 12, 0);
2989 u2time = INT2PTR (void (*)(pTHX_ UV ret[2]), SvIV (*svp));
2990 }
2991
2992 assert (("PRIO_NORMAL must be 0", !CORO_PRIO_NORMAL));
2993}
2994
2995SV *
2996new (char *klass, ...)
2997 ALIAS:
2998 Coro::new = 1
2999 CODE:
3000{
3001 struct coro *coro;
3002 MAGIC *mg;
3003 HV *hv;
3004 SV *cb;
3005 int i;
3006
3007 if (items > 1)
3008 {
3009 cb = s_get_cv_croak (ST (1));
3010
3011 if (!ix)
215 { 3012 {
216 /* I never used formats, so how should I know how these are implemented? */ 3013 if (CvISXSUB (cb))
217 /* my bold guess is as a simple, plain sub... */ 3014 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
218 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 3015
3016 if (!CvROOT (cb))
3017 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
219 } 3018 }
220 } 3019 }
221 3020
222 if (top_si->si_type == PERLSI_MAIN)
223 break;
224
225 top_si = top_si->si_prev;
226 ccstk = top_si->si_cxstack;
227 cxix = top_si->si_cxix;
228 }
229
230 PUTBACK;
231 }
232
233 c->dowarn = PL_dowarn;
234 c->defav = GvAV (PL_defgv);
235 c->curstackinfo = PL_curstackinfo;
236 c->curstack = PL_curstack;
237 c->mainstack = PL_mainstack;
238 c->stack_sp = PL_stack_sp;
239 c->op = PL_op;
240 c->curpad = PL_curpad;
241 c->stack_base = PL_stack_base;
242 c->stack_max = PL_stack_max;
243 c->tmps_stack = PL_tmps_stack;
244 c->tmps_floor = PL_tmps_floor;
245 c->tmps_ix = PL_tmps_ix;
246 c->tmps_max = PL_tmps_max;
247 c->markstack = PL_markstack;
248 c->markstack_ptr = PL_markstack_ptr;
249 c->markstack_max = PL_markstack_max;
250 c->scopestack = PL_scopestack;
251 c->scopestack_ix = PL_scopestack_ix;
252 c->scopestack_max = PL_scopestack_max;
253 c->savestack = PL_savestack;
254 c->savestack_ix = PL_savestack_ix;
255 c->savestack_max = PL_savestack_max;
256 c->retstack = PL_retstack;
257 c->retstack_ix = PL_retstack_ix;
258 c->retstack_max = PL_retstack_max;
259 c->curcop = PL_curcop;
260}
261
262static void
263LOAD(pTHX_ Coro__State c)
264{
265 PL_dowarn = c->dowarn;
266 GvAV (PL_defgv) = c->defav;
267 PL_curstackinfo = c->curstackinfo;
268 PL_curstack = c->curstack;
269 PL_mainstack = c->mainstack;
270 PL_stack_sp = c->stack_sp;
271 PL_op = c->op;
272 PL_curpad = c->curpad;
273 PL_stack_base = c->stack_base;
274 PL_stack_max = c->stack_max;
275 PL_tmps_stack = c->tmps_stack;
276 PL_tmps_floor = c->tmps_floor;
277 PL_tmps_ix = c->tmps_ix;
278 PL_tmps_max = c->tmps_max;
279 PL_markstack = c->markstack;
280 PL_markstack_ptr = c->markstack_ptr;
281 PL_markstack_max = c->markstack_max;
282 PL_scopestack = c->scopestack;
283 PL_scopestack_ix = c->scopestack_ix;
284 PL_scopestack_max = c->scopestack_max;
285 PL_savestack = c->savestack;
286 PL_savestack_ix = c->savestack_ix;
287 PL_savestack_max = c->savestack_max;
288 PL_retstack = c->retstack;
289 PL_retstack_ix = c->retstack_ix;
290 PL_retstack_max = c->retstack_max;
291 PL_curcop = c->curcop;
292
293 {
294 dSP;
295 CV *cv;
296
297 /* now do the ugly restore mess */
298 while ((cv = (CV *)POPs))
299 {
300 AV *padlist = (AV *)POPs;
301
302 unuse_padlist (CvPADLIST(cv));
303 CvPADLIST(cv) = padlist;
304 CvDEPTH(cv) = (I32)POPs;
305
306#ifdef USE_THREADS
307 CvOWNER(cv) = (struct perl_thread *)POPs;
308 error does not work either
309#endif
310 }
311
312 PUTBACK;
313 }
314}
315
316/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
317STATIC void
318S_nuke_stacks(pTHX)
319{
320 while (PL_curstackinfo->si_next)
321 PL_curstackinfo = PL_curstackinfo->si_next;
322 while (PL_curstackinfo) {
323 PERL_SI *p = PL_curstackinfo->si_prev;
324 /* curstackinfo->si_stack got nuked by sv_free_arenas() */
325 Safefree(PL_curstackinfo->si_cxstack);
326 Safefree(PL_curstackinfo);
327 PL_curstackinfo = p;
328 }
329 Safefree(PL_tmps_stack);
330 Safefree(PL_markstack);
331 Safefree(PL_scopestack);
332 Safefree(PL_savestack);
333 Safefree(PL_retstack);
334}
335
336#define SUB_INIT "Coro::State::_newcoro"
337
338MODULE = Coro::State PACKAGE = Coro::State
339
340PROTOTYPES: ENABLE
341
342BOOT:
343 if (!padlist_cache)
344 padlist_cache = newHV ();
345
346Coro::State
347_newprocess(args)
348 SV * args
349 PROTOTYPE: $
350 CODE:
351 Coro__State coro;
352
353 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV)
354 croak ("Coro::State::newprocess expects an arrayref");
355
356 New (0, coro, 1, struct coro); 3021 Newz (0, coro, 1, struct coro);
3022 coro->args = newAV ();
3023 coro->flags = CF_NEW;
357 3024
358 coro->mainstack = 0; /* actual work is done inside transfer */ 3025 if (coro_first) coro_first->prev = coro;
359 coro->args = (AV *)SvREFCNT_inc (SvRV (args)); 3026 coro->next = coro_first;
3027 coro_first = coro;
360 3028
361 RETVAL = coro; 3029 coro->hv = hv = newHV ();
3030 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
3031 mg->mg_flags |= MGf_DUP;
3032 RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1));
3033
3034 if (items > 1)
3035 {
3036 av_extend (coro->args, items - 1 + ix - 1);
3037
3038 if (ix)
3039 {
3040 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
3041 cb = (SV *)cv_coro_run;
3042 }
3043
3044 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
3045
3046 for (i = 2; i < items; i++)
3047 av_push (coro->args, newSVsv (ST (i)));
3048 }
3049}
362 OUTPUT: 3050 OUTPUT:
363 RETVAL 3051 RETVAL
364 3052
365void 3053void
366transfer(prev,next) 3054transfer (...)
367 Coro::State_or_hashref prev 3055 PROTOTYPE: $$
368 Coro::State_or_hashref next 3056 CODE:
369 CODE: 3057 CORO_EXECUTE_SLF_XS (slf_init_transfer);
370 3058
371 if (prev != next) 3059bool
3060_destroy (SV *coro_sv)
3061 CODE:
3062 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
3063 OUTPUT:
3064 RETVAL
3065
3066void
3067_exit (int code)
3068 PROTOTYPE: $
3069 CODE:
3070 _exit (code);
3071
3072SV *
3073clone (Coro::State coro)
3074 CODE:
3075{
3076#if CORO_CLONE
3077 struct coro *ncoro = coro_clone (aTHX_ coro);
3078 MAGIC *mg;
3079 /* TODO: too much duplication */
3080 ncoro->hv = newHV ();
3081 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
3082 mg->mg_flags |= MGf_DUP;
3083 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
3084#else
3085 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
3086#endif
3087}
3088 OUTPUT:
3089 RETVAL
3090
3091int
3092cctx_stacksize (int new_stacksize = 0)
3093 PROTOTYPE: ;$
3094 CODE:
3095 RETVAL = cctx_stacksize;
3096 if (new_stacksize)
372 { 3097 {
373 PUTBACK; 3098 cctx_stacksize = new_stacksize;
374 SAVE (aTHX_ prev); 3099 ++cctx_gen;
375
376 /* 3100 }
377 * this could be done in newprocess which would lead to 3101 OUTPUT:
378 * extremely elegant and fast (just PUTBACK/SAVE/LOAD/SPAGAIN) 3102 RETVAL
379 * code here, but lazy allocation of stacks has also 3103
380 * some virtues and the overhead of the if() is nil. 3104int
3105cctx_max_idle (int max_idle = 0)
3106 PROTOTYPE: ;$
3107 CODE:
3108 RETVAL = cctx_max_idle;
3109 if (max_idle > 1)
3110 cctx_max_idle = max_idle;
3111 OUTPUT:
3112 RETVAL
3113
3114int
3115cctx_count ()
3116 PROTOTYPE:
3117 CODE:
3118 RETVAL = cctx_count;
3119 OUTPUT:
3120 RETVAL
3121
3122int
3123cctx_idle ()
3124 PROTOTYPE:
3125 CODE:
3126 RETVAL = cctx_idle;
3127 OUTPUT:
3128 RETVAL
3129
3130void
3131list ()
3132 PROTOTYPE:
3133 PPCODE:
3134{
3135 struct coro *coro;
3136 for (coro = coro_first; coro; coro = coro->next)
3137 if (coro->hv)
3138 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
3139}
3140
3141void
3142call (Coro::State coro, SV *coderef)
3143 ALIAS:
3144 eval = 1
3145 CODE:
3146{
3147 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
381 */ 3148 {
382 if (next->mainstack) 3149 struct coro *current = SvSTATE_current;
3150
3151 if (current != coro)
383 { 3152 {
384 LOAD (aTHX_ next); 3153 PUTBACK;
385 next->mainstack = 0; /* unnecessary but much cleaner */ 3154 save_perl (aTHX_ current);
3155 load_perl (aTHX_ coro);
386 SPAGAIN; 3156 SPAGAIN;
387 } 3157 }
3158
3159 PUSHSTACK;
3160
3161 PUSHMARK (SP);
3162 PUTBACK;
3163
3164 if (ix)
3165 eval_sv (coderef, 0);
388 else 3166 else
3167 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3168
3169 POPSTACK;
3170 SPAGAIN;
3171
3172 if (current != coro)
389 { 3173 {
390 /* 3174 PUTBACK;
391 * emulate part of the perl startup here. 3175 save_perl (aTHX_ coro);
392 */ 3176 load_perl (aTHX_ current);
393 UNOP myop;
394
395 init_stacks ();
396 PL_op = (OP *)&myop;
397 /*PL_curcop = 0;*/
398 GvAV (PL_defgv) = (SV *)SvREFCNT_inc (next->args);
399
400 SPAGAIN; 3177 SPAGAIN;
401 Zero(&myop, 1, UNOP);
402 myop.op_next = Nullop;
403 myop.op_flags = OPf_WANT_VOID;
404
405 PUSHMARK(SP);
406 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
407 PUTBACK;
408 /*
409 * the next line is slightly wrong, as PL_op->op_next
410 * is actually being executed so we skip the first op.
411 * that doesn't matter, though, since it is only
412 * pp_nextstate and we never return...
413 */
414 PL_op = Perl_pp_entersub(aTHX);
415 SPAGAIN;
416
417 ENTER;
418 } 3178 }
419 } 3179 }
3180}
3181
3182SV *
3183is_ready (Coro::State coro)
3184 PROTOTYPE: $
3185 ALIAS:
3186 is_ready = CF_READY
3187 is_running = CF_RUNNING
3188 is_new = CF_NEW
3189 is_destroyed = CF_DESTROYED
3190 is_suspended = CF_SUSPENDED
3191 CODE:
3192 RETVAL = boolSV (coro->flags & ix);
3193 OUTPUT:
3194 RETVAL
420 3195
421void 3196void
422DESTROY(coro) 3197throw (Coro::State self, SV *throw = &PL_sv_undef)
423 Coro::State coro 3198 PROTOTYPE: $;$
424 CODE: 3199 CODE:
3200{
3201 struct coro *current = SvSTATE_current;
3202 SV **throwp = self == current ? &CORO_THROW : &self->except;
3203 SvREFCNT_dec (*throwp);
3204 SvGETMAGIC (throw);
3205 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
3206}
425 3207
426 if (coro->mainstack) 3208void
3209api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3210 PROTOTYPE: $;$
3211 C_ARGS: aTHX_ coro, flags
3212
3213SV *
3214has_cctx (Coro::State coro)
3215 PROTOTYPE: $
3216 CODE:
3217 /* maybe manage the running flag differently */
3218 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3219 OUTPUT:
3220 RETVAL
3221
3222int
3223is_traced (Coro::State coro)
3224 PROTOTYPE: $
3225 CODE:
3226 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3227 OUTPUT:
3228 RETVAL
3229
3230UV
3231rss (Coro::State coro)
3232 PROTOTYPE: $
3233 ALIAS:
3234 usecount = 1
3235 CODE:
3236 switch (ix)
3237 {
3238 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3239 case 1: RETVAL = coro->usecount; break;
3240 }
3241 OUTPUT:
3242 RETVAL
3243
3244void
3245force_cctx ()
3246 PROTOTYPE:
3247 CODE:
3248 cctx_current->idle_sp = 0;
3249
3250void
3251swap_defsv (Coro::State self)
3252 PROTOTYPE: $
3253 ALIAS:
3254 swap_defav = 1
3255 CODE:
3256 if (!self->slot)
3257 croak ("cannot swap state with coroutine that has no saved state,");
3258 else
427 { 3259 {
428 struct coro temp; 3260 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3261 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
429 3262
3263 SV *tmp = *src; *src = *dst; *dst = tmp;
3264 }
3265
3266void
3267cancel (Coro::State self)
3268 CODE:
3269 coro_state_destroy (aTHX_ self);
3270 coro_call_on_destroy (aTHX_ self); /* actually only for Coro objects */
3271
3272
3273SV *
3274enable_times (int enabled = enable_times)
3275 CODE:
3276{
3277 RETVAL = boolSV (enable_times);
3278
3279 if (enabled != enable_times)
3280 {
3281 enable_times = enabled;
3282
3283 coro_times_update ();
3284 (enabled ? coro_times_sub : coro_times_add)(SvSTATE (coro_current));
3285 }
3286}
3287 OUTPUT:
3288 RETVAL
3289
3290void
3291times (Coro::State self)
3292 PPCODE:
3293{
3294 struct coro *current = SvSTATE (coro_current);
3295
3296 if (expect_false (current == self))
3297 {
3298 coro_times_update ();
3299 coro_times_add (SvSTATE (coro_current));
3300 }
3301
3302 EXTEND (SP, 2);
3303 PUSHs (sv_2mortal (newSVnv (self->t_real [0] + self->t_real [1] * 1e-9)));
3304 PUSHs (sv_2mortal (newSVnv (self->t_cpu [0] + self->t_cpu [1] * 1e-9)));
3305
3306 if (expect_false (current == self))
3307 coro_times_sub (SvSTATE (coro_current));
3308}
3309
3310MODULE = Coro::State PACKAGE = Coro
3311
3312BOOT:
3313{
3314 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3315 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3316 cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD);
3317 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD);
3318 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3319 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3320 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3321 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3322 sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE);
3323
3324 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3325 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3326 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3327 cv_coro_state_new = get_cv ("Coro::State::new", 0); SvREADONLY_on (cv_coro_state_new);
3328
3329 coro_stash = gv_stashpv ("Coro", TRUE);
3330
3331 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (CORO_PRIO_MAX));
3332 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (CORO_PRIO_HIGH));
3333 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (CORO_PRIO_NORMAL));
3334 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (CORO_PRIO_LOW));
3335 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (CORO_PRIO_IDLE));
3336 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (CORO_PRIO_MIN));
3337
3338 {
3339 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3340
3341 coroapi.schedule = api_schedule;
3342 coroapi.schedule_to = api_schedule_to;
3343 coroapi.cede = api_cede;
3344 coroapi.cede_notself = api_cede_notself;
3345 coroapi.ready = api_ready;
3346 coroapi.is_ready = api_is_ready;
3347 coroapi.nready = coro_nready;
3348 coroapi.current = coro_current;
3349
3350 /*GCoroAPI = &coroapi;*/
3351 sv_setiv (sv, (IV)&coroapi);
3352 SvREADONLY_on (sv);
3353 }
3354}
3355
3356void
3357terminate (...)
3358 CODE:
3359 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3360
3361void
3362schedule (...)
3363 CODE:
3364 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3365
3366void
3367schedule_to (...)
3368 CODE:
3369 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3370
3371void
3372cede_to (...)
3373 CODE:
3374 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3375
3376void
3377cede (...)
3378 CODE:
3379 CORO_EXECUTE_SLF_XS (slf_init_cede);
3380
3381void
3382cede_notself (...)
3383 CODE:
3384 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3385
3386void
3387_set_current (SV *current)
3388 PROTOTYPE: $
3389 CODE:
3390 SvREFCNT_dec (SvRV (coro_current));
3391 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3392
3393void
3394_set_readyhook (SV *hook)
3395 PROTOTYPE: $
3396 CODE:
3397 SvREFCNT_dec (coro_readyhook);
3398 SvGETMAGIC (hook);
3399 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
3400
3401int
3402prio (Coro::State coro, int newprio = 0)
3403 PROTOTYPE: $;$
3404 ALIAS:
3405 nice = 1
3406 CODE:
3407{
3408 RETVAL = coro->prio;
3409
3410 if (items > 1)
3411 {
3412 if (ix)
3413 newprio = coro->prio - newprio;
3414
3415 if (newprio < CORO_PRIO_MIN) newprio = CORO_PRIO_MIN;
3416 if (newprio > CORO_PRIO_MAX) newprio = CORO_PRIO_MAX;
3417
3418 coro->prio = newprio;
3419 }
3420}
3421 OUTPUT:
3422 RETVAL
3423
3424SV *
3425ready (SV *self)
3426 PROTOTYPE: $
3427 CODE:
3428 RETVAL = boolSV (api_ready (aTHX_ self));
3429 OUTPUT:
3430 RETVAL
3431
3432int
3433nready (...)
3434 PROTOTYPE:
3435 CODE:
3436 RETVAL = coro_nready;
3437 OUTPUT:
3438 RETVAL
3439
3440void
3441suspend (Coro::State self)
3442 PROTOTYPE: $
3443 CODE:
3444 self->flags |= CF_SUSPENDED;
3445
3446void
3447resume (Coro::State self)
3448 PROTOTYPE: $
3449 CODE:
3450 self->flags &= ~CF_SUSPENDED;
3451
3452void
3453_pool_handler (...)
3454 CODE:
3455 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3456
3457void
3458async_pool (SV *cv, ...)
3459 PROTOTYPE: &@
3460 PPCODE:
3461{
3462 HV *hv = (HV *)av_pop (av_async_pool);
3463 AV *av = newAV ();
3464 SV *cb = ST (0);
3465 int i;
3466
3467 av_extend (av, items - 2);
3468 for (i = 1; i < items; ++i)
3469 av_push (av, SvREFCNT_inc_NN (ST (i)));
3470
3471 if ((SV *)hv == &PL_sv_undef)
3472 {
3473 PUSHMARK (SP);
3474 EXTEND (SP, 2);
3475 PUSHs (sv_Coro);
3476 PUSHs ((SV *)cv_pool_handler);
430 PUTBACK; 3477 PUTBACK;
431 SAVE(aTHX_ (&temp)); 3478 call_sv ((SV *)cv_coro_state_new, G_SCALAR);
432 LOAD(aTHX_ coro);
433
434 S_nuke_stacks ();
435 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
436
437 LOAD((&temp));
438 SPAGAIN; 3479 SPAGAIN;
3480
3481 hv = (HV *)SvREFCNT_inc_NN (SvRV (POPs));
439 } 3482 }
440 3483
3484 {
3485 struct coro *coro = SvSTATE_hv (hv);
3486
3487 assert (!coro->invoke_cb);
3488 assert (!coro->invoke_av);
3489 coro->invoke_cb = SvREFCNT_inc (cb);
3490 coro->invoke_av = av;
3491 }
3492
3493 api_ready (aTHX_ (SV *)hv);
3494
3495 if (GIMME_V != G_VOID)
3496 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
3497 else
441 SvREFCNT_dec (coro->args); 3498 SvREFCNT_dec (hv);
442 Safefree (coro); 3499}
443 3500
3501SV *
3502rouse_cb ()
3503 PROTOTYPE:
3504 CODE:
3505 RETVAL = coro_new_rouse_cb (aTHX);
3506 OUTPUT:
3507 RETVAL
444 3508
3509void
3510rouse_wait (...)
3511 PROTOTYPE: ;$
3512 PPCODE:
3513 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
3514
3515void
3516on_enter (SV *block)
3517 ALIAS:
3518 on_leave = 1
3519 PROTOTYPE: &
3520 CODE:
3521{
3522 struct coro *coro = SvSTATE_current;
3523 AV **avp = ix ? &coro->on_leave : &coro->on_enter;
3524
3525 block = s_get_cv_croak (block);
3526
3527 if (!*avp)
3528 *avp = newAV ();
3529
3530 av_push (*avp, SvREFCNT_inc (block));
3531
3532 if (!ix)
3533 on_enterleave_call (aTHX_ block);
3534
3535 LEAVE; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3536 SAVEDESTRUCTOR_X (ix ? coro_pop_on_leave : coro_pop_on_enter, (void *)coro);
3537 ENTER; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3538}
3539
3540
3541MODULE = Coro::State PACKAGE = PerlIO::cede
3542
3543BOOT:
3544 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3545
3546
3547MODULE = Coro::State PACKAGE = Coro::Semaphore
3548
3549SV *
3550new (SV *klass, SV *count = 0)
3551 CODE:
3552{
3553 int semcnt = 1;
3554
3555 if (count)
3556 {
3557 SvGETMAGIC (count);
3558
3559 if (SvOK (count))
3560 semcnt = SvIV (count);
3561 }
3562
3563 RETVAL = sv_bless (
3564 coro_waitarray_new (aTHX_ semcnt),
3565 GvSTASH (CvGV (cv))
3566 );
3567}
3568 OUTPUT:
3569 RETVAL
3570
3571# helper for Coro::Channel and others
3572SV *
3573_alloc (int count)
3574 CODE:
3575 RETVAL = coro_waitarray_new (aTHX_ count);
3576 OUTPUT:
3577 RETVAL
3578
3579SV *
3580count (SV *self)
3581 CODE:
3582 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3583 OUTPUT:
3584 RETVAL
3585
3586void
3587up (SV *self, int adjust = 1)
3588 ALIAS:
3589 adjust = 1
3590 CODE:
3591 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3592
3593void
3594down (...)
3595 CODE:
3596 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3597
3598void
3599wait (...)
3600 CODE:
3601 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3602
3603void
3604try (SV *self)
3605 PPCODE:
3606{
3607 AV *av = (AV *)SvRV (self);
3608 SV *count_sv = AvARRAY (av)[0];
3609 IV count = SvIVX (count_sv);
3610
3611 if (count > 0)
3612 {
3613 --count;
3614 SvIVX (count_sv) = count;
3615 XSRETURN_YES;
3616 }
3617 else
3618 XSRETURN_NO;
3619}
3620
3621void
3622waiters (SV *self)
3623 PPCODE:
3624{
3625 AV *av = (AV *)SvRV (self);
3626 int wcount = AvFILLp (av) + 1 - 1;
3627
3628 if (GIMME_V == G_SCALAR)
3629 XPUSHs (sv_2mortal (newSViv (wcount)));
3630 else
3631 {
3632 int i;
3633 EXTEND (SP, wcount);
3634 for (i = 1; i <= wcount; ++i)
3635 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3636 }
3637}
3638
3639MODULE = Coro::State PACKAGE = Coro::SemaphoreSet
3640
3641void
3642_may_delete (SV *sem, int count, int extra_refs)
3643 PPCODE:
3644{
3645 AV *av = (AV *)SvRV (sem);
3646
3647 if (SvREFCNT ((SV *)av) == 1 + extra_refs
3648 && AvFILLp (av) == 0 /* no waiters, just count */
3649 && SvIV (AvARRAY (av)[0]) == count)
3650 XSRETURN_YES;
3651
3652 XSRETURN_NO;
3653}
3654
3655MODULE = Coro::State PACKAGE = Coro::Signal
3656
3657SV *
3658new (SV *klass)
3659 CODE:
3660 RETVAL = sv_bless (
3661 coro_waitarray_new (aTHX_ 0),
3662 GvSTASH (CvGV (cv))
3663 );
3664 OUTPUT:
3665 RETVAL
3666
3667void
3668wait (...)
3669 CODE:
3670 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
3671
3672void
3673broadcast (SV *self)
3674 CODE:
3675{
3676 AV *av = (AV *)SvRV (self);
3677 coro_signal_wake (aTHX_ av, AvFILLp (av));
3678}
3679
3680void
3681send (SV *self)
3682 CODE:
3683{
3684 AV *av = (AV *)SvRV (self);
3685
3686 if (AvFILLp (av))
3687 coro_signal_wake (aTHX_ av, 1);
3688 else
3689 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3690}
3691
3692IV
3693awaited (SV *self)
3694 CODE:
3695 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3696 OUTPUT:
3697 RETVAL
3698
3699
3700MODULE = Coro::State PACKAGE = Coro::AnyEvent
3701
3702BOOT:
3703 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3704
3705void
3706_schedule (...)
3707 CODE:
3708{
3709 static int incede;
3710
3711 api_cede_notself (aTHX);
3712
3713 ++incede;
3714 while (coro_nready >= incede && api_cede (aTHX))
3715 ;
3716
3717 sv_setsv (sv_activity, &PL_sv_undef);
3718 if (coro_nready >= incede)
3719 {
3720 PUSHMARK (SP);
3721 PUTBACK;
3722 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3723 }
3724
3725 --incede;
3726}
3727
3728
3729MODULE = Coro::State PACKAGE = Coro::AIO
3730
3731void
3732_register (char *target, char *proto, SV *req)
3733 CODE:
3734{
3735 SV *req_cv = s_get_cv_croak (req);
3736 /* newXSproto doesn't return the CV on 5.8 */
3737 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3738 sv_setpv ((SV *)slf_cv, proto);
3739 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3740}
3741
3742MODULE = Coro::State PACKAGE = Coro::Select
3743
3744void
3745patch_pp_sselect ()
3746 CODE:
3747 if (!coro_old_pp_sselect)
3748 {
3749 coro_select_select = (SV *)get_cv ("Coro::Select::select", 0);
3750 coro_old_pp_sselect = PL_ppaddr [OP_SSELECT];
3751 PL_ppaddr [OP_SSELECT] = coro_pp_sselect;
3752 }
3753
3754void
3755unpatch_pp_sselect ()
3756 CODE:
3757 if (coro_old_pp_sselect)
3758 {
3759 PL_ppaddr [OP_SSELECT] = coro_old_pp_sselect;
3760 coro_old_pp_sselect = 0;
3761 }
3762
3763

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines