ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.5 by root, Tue Jul 17 02:55:29 2001 UTC vs.
Revision 1.363 by root, Tue Jul 14 00:55:10 2009 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "schmorp.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
9#endif 24#endif
10 25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
57# undef CORO_STACKGUARD
58#endif
59
60#ifndef CORO_STACKGUARD
61# define CORO_STACKGUARD 0
62#endif
63
64/* prefer perl internal functions over our own? */
65#ifndef CORO_PREFER_PERL_FUNCTIONS
66# define CORO_PREFER_PERL_FUNCTIONS 0
67#endif
68
69/* The next macros try to return the current stack pointer, in an as
70 * portable way as possible. */
71#if __GNUC__ >= 4
72# define dSTACKLEVEL int stacklevel_dummy
73# define STACKLEVEL __builtin_frame_address (0)
74#else
75# define dSTACKLEVEL volatile void *stacklevel
76# define STACKLEVEL ((void *)&stacklevel)
77#endif
78
79#define IN_DESTRUCT PL_dirty
80
81#if __GNUC__ >= 3
82# define attribute(x) __attribute__(x)
83# define expect(expr,value) __builtin_expect ((expr), (value))
84# define INLINE static inline
85#else
86# define attribute(x)
87# define expect(expr,value) (expr)
88# define INLINE static
89#endif
90
91#define expect_false(expr) expect ((expr) != 0, 0)
92#define expect_true(expr) expect ((expr) != 0, 1)
93
94#define NOINLINE attribute ((noinline))
95
96#include "CoroAPI.h"
97#define GCoroAPI (&coroapi) /* very sneaky */
98
99#ifdef USE_ITHREADS
100# if CORO_PTHREAD
101static void *coro_thx;
102# endif
103#endif
104
105#ifdef __linux
106# include <time.h> /* for timespec */
107# include <syscall.h> /* for SYS_* */
108# ifdef SYS_clock_gettime
109# define coro_clock_gettime(id, ts) syscall (SYS_clock_gettime, (id), (ts))
110# define CORO_CLOCK_MONOTONIC 1
111# define CORO_CLOCK_THREAD_CPUTIME_ID 3
112# endif
113#endif
114
115static double (*nvtime)(); /* so why doesn't it take void? */
116static void (*u2time)(pTHX_ UV ret[2]);
117
118/* we hijack an hopefully unused CV flag for our purposes */
119#define CVf_SLF 0x4000
120static OP *pp_slf (pTHX);
121
122static U32 cctx_gen;
123static size_t cctx_stacksize = CORO_STACKSIZE;
124static struct CoroAPI coroapi;
125static AV *main_mainstack; /* used to differentiate between $main and others */
126static JMPENV *main_top_env;
127static HV *coro_state_stash, *coro_stash;
128static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
129
130static AV *av_destroy; /* destruction queue */
131static SV *sv_manager; /* the manager coro */
132static SV *sv_idle; /* $Coro::idle */
133
134static GV *irsgv; /* $/ */
135static GV *stdoutgv; /* *STDOUT */
136static SV *rv_diehook;
137static SV *rv_warnhook;
138static HV *hv_sig; /* %SIG */
139
140/* async_pool helper stuff */
141static SV *sv_pool_rss;
142static SV *sv_pool_size;
143static SV *sv_async_pool_idle; /* description string */
144static AV *av_async_pool; /* idle pool */
145static SV *sv_Coro; /* class string */
146static CV *cv_pool_handler;
147static CV *cv_coro_state_new;
148
149/* Coro::AnyEvent */
150static SV *sv_activity;
151
152/* enable processtime/realtime profiling */
153static char enable_times;
154typedef U32 coro_ts[2];
155static coro_ts time_real, time_cpu;
156static char times_valid;
157
158static struct coro_cctx *cctx_first;
159static int cctx_count, cctx_idle;
160
161enum {
162 CC_MAPPED = 0x01,
163 CC_NOREUSE = 0x02, /* throw this away after tracing */
164 CC_TRACE = 0x04,
165 CC_TRACE_SUB = 0x08, /* trace sub calls */
166 CC_TRACE_LINE = 0x10, /* trace each statement */
167 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
168};
169
170/* this is a structure representing a c-level coroutine */
171typedef struct coro_cctx
172{
173 struct coro_cctx *next;
174
175 /* the stack */
176 void *sptr;
177 size_t ssize;
178
179 /* cpu state */
180 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
181 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
182 JMPENV *top_env;
183 coro_context cctx;
184
185 U32 gen;
186#if CORO_USE_VALGRIND
187 int valgrind_id;
188#endif
189 unsigned char flags;
190} coro_cctx;
191
192coro_cctx *cctx_current; /* the currently running cctx */
193
194/*****************************************************************************/
195
196enum {
197 CF_RUNNING = 0x0001, /* coroutine is running */
198 CF_READY = 0x0002, /* coroutine is ready */
199 CF_NEW = 0x0004, /* has never been switched to */
200 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
201 CF_SUSPENDED = 0x0010, /* coroutine can't be scheduled */
202};
203
204/* the structure where most of the perl state is stored, overlaid on the cxstack */
205typedef struct
206{
207 SV *defsv;
208 AV *defav;
209 SV *errsv;
210 SV *irsgv;
211 HV *hinthv;
212#define VAR(name,type) type name;
213# include "state.h"
214#undef VAR
215} perl_slots;
216
217#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
218
219/* this is a structure representing a perl-level coroutine */
11struct coro { 220struct coro {
12 U8 dowarn; 221 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 222 coro_cctx *cctx;
14 223
15 PERL_SI *curstackinfo; 224 /* ready queue */
16 AV *curstack; 225 struct coro *next_ready;
226
227 /* state data */
228 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 229 AV *mainstack;
18 SV **stack_sp; 230 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 231
41 AV *args; 232 CV *startcv; /* the CV to execute */
233 AV *args; /* data associated with this coroutine (initial args) */
234 int refcnt; /* coroutines are refcounted, yes */
235 int flags; /* CF_ flags */
236 HV *hv; /* the perl hash associated with this coro, if any */
237 void (*on_destroy)(pTHX_ struct coro *coro);
238
239 /* statistics */
240 int usecount; /* number of transfers to this coro */
241
242 /* coro process data */
243 int prio;
244 SV *except; /* exception to be thrown */
245 SV *rouse_cb;
246
247 /* async_pool */
248 SV *saved_deffh;
249 SV *invoke_cb;
250 AV *invoke_av;
251
252 /* on_enter/on_leave */
253 AV *on_enter;
254 AV *on_leave;
255
256 /* times */
257 coro_ts t_cpu, t_real;
258
259 /* linked list */
260 struct coro *next, *prev;
42}; 261};
43 262
44typedef struct coro *Coro__State; 263typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 264typedef struct coro *Coro__State_or_hashref;
46 265
47static HV *padlist_cache; 266/* the following variables are effectively part of the perl context */
267/* and get copied between struct coro and these variables */
268/* the mainr easonw e don't support windows process emulation */
269static struct CoroSLF slf_frame; /* the current slf frame */
48 270
49/* mostly copied from op.c:cv_clone2 */ 271/** Coro ********************************************************************/
50STATIC AV * 272
51clone_padlist (AV *protopadlist) 273#define CORO_PRIO_MAX 3
274#define CORO_PRIO_HIGH 1
275#define CORO_PRIO_NORMAL 0
276#define CORO_PRIO_LOW -1
277#define CORO_PRIO_IDLE -3
278#define CORO_PRIO_MIN -4
279
280/* for Coro.pm */
281static SV *coro_current;
282static SV *coro_readyhook;
283static struct coro *coro_ready [CORO_PRIO_MAX - CORO_PRIO_MIN + 1][2]; /* head|tail */
284static CV *cv_coro_run, *cv_coro_terminate;
285static struct coro *coro_first;
286#define coro_nready coroapi.nready
287
288/** lowlevel stuff **********************************************************/
289
290static SV *
291coro_get_sv (pTHX_ const char *name, int create)
52{ 292{
53 AV *av; 293#if PERL_VERSION_ATLEAST (5,10,0)
54 I32 ix; 294 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 295 get_sv (name, create);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 296#endif
57 SV **pname = AvARRAY (protopad_name); 297 return get_sv (name, create);
58 SV **ppad = AvARRAY (protopad); 298}
59 I32 fname = AvFILLp (protopad_name); 299
60 I32 fpad = AvFILLp (protopad); 300static AV *
301coro_get_av (pTHX_ const char *name, int create)
302{
303#if PERL_VERSION_ATLEAST (5,10,0)
304 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
305 get_av (name, create);
306#endif
307 return get_av (name, create);
308}
309
310static HV *
311coro_get_hv (pTHX_ const char *name, int create)
312{
313#if PERL_VERSION_ATLEAST (5,10,0)
314 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
315 get_hv (name, create);
316#endif
317 return get_hv (name, create);
318}
319
320INLINE void
321coro_times_update ()
322{
323#ifdef coro_clock_gettime
324 struct timespec ts;
325
326 ts.tv_sec = ts.tv_nsec = 0;
327 coro_clock_gettime (CORO_CLOCK_THREAD_CPUTIME_ID, &ts);
328 time_cpu [0] = ts.tv_sec; time_cpu [1] = ts.tv_nsec;
329
330 ts.tv_sec = ts.tv_nsec = 0;
331 coro_clock_gettime (CORO_CLOCK_MONOTONIC, &ts);
332 time_real [0] = ts.tv_sec; time_real [1] = ts.tv_nsec;
333#else
334 dTHX;
335 UV tv[2];
336
337 u2time (aTHX_ tv);
338 time_real [0] = tv [0];
339 time_real [1] = tv [1] * 1000;
340#endif
341}
342
343INLINE void
344coro_times_add (struct coro *c)
345{
346 c->t_real [1] += time_real [1];
347 if (c->t_real [1] > 1000000000) { c->t_real [1] -= 1000000000; ++c->t_real [0]; }
348 c->t_real [0] += time_real [0];
349
350 c->t_cpu [1] += time_cpu [1];
351 if (c->t_cpu [1] > 1000000000) { c->t_cpu [1] -= 1000000000; ++c->t_cpu [0]; }
352 c->t_cpu [0] += time_cpu [0];
353}
354
355INLINE void
356coro_times_sub (struct coro *c)
357{
358 if (c->t_real [1] < time_real [1]) { c->t_real [1] += 1000000000; --c->t_real [0]; }
359 c->t_real [1] -= time_real [1];
360 c->t_real [0] -= time_real [0];
361
362 if (c->t_cpu [1] < time_cpu [1]) { c->t_cpu [1] += 1000000000; --c->t_cpu [0]; }
363 c->t_cpu [1] -= time_cpu [1];
364 c->t_cpu [0] -= time_cpu [0];
365}
366
367/*****************************************************************************/
368/* magic glue */
369
370#define CORO_MAGIC_type_cv 26
371#define CORO_MAGIC_type_state PERL_MAGIC_ext
372
373#define CORO_MAGIC_NN(sv, type) \
374 (expect_true (SvMAGIC (sv)->mg_type == type) \
375 ? SvMAGIC (sv) \
376 : mg_find (sv, type))
377
378#define CORO_MAGIC(sv, type) \
379 (expect_true (SvMAGIC (sv)) \
380 ? CORO_MAGIC_NN (sv, type) \
381 : 0)
382
383#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
384#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
385
386INLINE struct coro *
387SvSTATE_ (pTHX_ SV *coro)
388{
389 HV *stash;
390 MAGIC *mg;
391
392 if (SvROK (coro))
393 coro = SvRV (coro);
394
395 if (expect_false (SvTYPE (coro) != SVt_PVHV))
396 croak ("Coro::State object required");
397
398 stash = SvSTASH (coro);
399 if (expect_false (stash != coro_stash && stash != coro_state_stash))
400 {
401 /* very slow, but rare, check */
402 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
403 croak ("Coro::State object required");
404 }
405
406 mg = CORO_MAGIC_state (coro);
407 return (struct coro *)mg->mg_ptr;
408}
409
410#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
411
412/* faster than SvSTATE, but expects a coroutine hv */
413#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
414#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
415
416/*****************************************************************************/
417/* padlist management and caching */
418
419static AV *
420coro_derive_padlist (pTHX_ CV *cv)
421{
422 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 423 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 424
72 newpadlist = newAV (); 425 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 426 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 427#if PERL_VERSION_ATLEAST (5,10,0)
428 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
429#else
430 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
431#endif
432 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
433 --AvFILLp (padlist);
434
435 av_store (newpadlist, 0, SvREFCNT_inc_NN (AvARRAY (padlist)[0]));
75 av_store (newpadlist, 1, (SV *) newpad); 436 av_store (newpadlist, 1, (SV *)newpad);
76 437
77 av = newAV (); /* will be @_ */ 438 return newpadlist;
78 av_extend (av, 0); 439}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 440
82 for (ix = fpad; ix > 0; ix--) 441static void
442free_padlist (pTHX_ AV *padlist)
443{
444 /* may be during global destruction */
445 if (!IN_DESTRUCT)
83 { 446 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 447 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 448
449 while (i > 0) /* special-case index 0 */
86 { 450 {
87 char *name = SvPVX (namesv); /* XXX */ 451 /* we try to be extra-careful here */
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 452 AV *av = (AV *)AvARRAY (padlist)[i--];
89 { /* lexical from outside? */ 453 I32 j = AvFILLp (av);
90 npad[ix] = SvREFCNT_inc (ppad[ix]); 454
91 } 455 while (j >= 0)
92 else 456 SvREFCNT_dec (AvARRAY (av)[j--]);
93 { /* our own lexical */ 457
94 SV *sv; 458 AvFILLp (av) = -1;
95 if (*name == '&') 459 SvREFCNT_dec (av);
96 sv = SvREFCNT_inc (ppad[ix]);
97 else if (*name == '@')
98 sv = (SV *) newAV ();
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 }
107 } 460 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix])) 461
109 { 462 SvREFCNT_dec (AvARRAY (padlist)[0]);
110 npad[ix] = SvREFCNT_inc (ppad[ix]); 463
111 } 464 AvFILLp (padlist) = -1;
465 SvREFCNT_dec ((SV*)padlist);
466 }
467}
468
469static int
470coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
471{
472 AV *padlist;
473 AV *av = (AV *)mg->mg_obj;
474
475 /* perl manages to free our internal AV and _then_ call us */
476 if (IN_DESTRUCT)
477 return 0;
478
479 /* casting is fun. */
480 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
481 free_padlist (aTHX_ padlist);
482
483 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
484
485 return 0;
486}
487
488static MGVTBL coro_cv_vtbl = {
489 0, 0, 0, 0,
490 coro_cv_free
491};
492
493/* the next two functions merely cache the padlists */
494static void
495get_padlist (pTHX_ CV *cv)
496{
497 MAGIC *mg = CORO_MAGIC_cv (cv);
498 AV *av;
499
500 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
501 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
112 else 502 else
113 { 503 {
114 SV *sv = NEWSV (0, 0); 504#if CORO_PREFER_PERL_FUNCTIONS
115 SvPADTMP_on (sv); 505 /* this is probably cleaner? but also slower! */
116 npad[ix] = sv; 506 /* in practise, it seems to be less stable */
117 } 507 CV *cp = Perl_cv_clone (aTHX_ cv);
118 } 508 CvPADLIST (cv) = CvPADLIST (cp);
119 509 CvPADLIST (cp) = 0;
120#if 0 /* NONOTUNDERSTOOD */ 510 SvREFCNT_dec (cp);
121 /* Now that vars are all in place, clone nested closures. */ 511#else
122 512 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif 513#endif
139 514 }
140 return newpadlist;
141} 515}
142 516
143STATIC AV * 517static void
144free_padlist (AV *padlist) 518put_padlist (pTHX_ CV *cv)
145{ 519{
146 /* may be during global destruction */ 520 MAGIC *mg = CORO_MAGIC_cv (cv);
147 if (SvREFCNT(padlist)) 521 AV *av;
522
523 if (expect_false (!mg))
524 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
525
526 av = (AV *)mg->mg_obj;
527
528 if (expect_false (AvFILLp (av) >= AvMAX (av)))
529 av_extend (av, AvFILLp (av) + 1);
530
531 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
532}
533
534/** load & save, init *******************************************************/
535
536static void
537on_enterleave_call (pTHX_ SV *cb);
538
539static void
540load_perl (pTHX_ Coro__State c)
541{
542 perl_slots *slot = c->slot;
543 c->slot = 0;
544
545 PL_mainstack = c->mainstack;
546
547 GvSV (PL_defgv) = slot->defsv;
548 GvAV (PL_defgv) = slot->defav;
549 GvSV (PL_errgv) = slot->errsv;
550 GvSV (irsgv) = slot->irsgv;
551 GvHV (PL_hintgv) = slot->hinthv;
552
553 #define VAR(name,type) PL_ ## name = slot->name;
554 # include "state.h"
555 #undef VAR
556
148 { 557 {
149 I32 i = AvFILLp(padlist); 558 dSP;
150 while (i >= 0) 559
560 CV *cv;
561
562 /* now do the ugly restore mess */
563 while (expect_true (cv = (CV *)POPs))
151 { 564 {
152 SV **svp = av_fetch(padlist, i--, FALSE); 565 put_padlist (aTHX_ cv); /* mark this padlist as available */
153 SV *sv = svp ? *svp : Nullsv; 566 CvDEPTH (cv) = PTR2IV (POPs);
154 if (sv) 567 CvPADLIST (cv) = (AV *)POPs;
155 SvREFCNT_dec(sv);
156 } 568 }
157 569
158 SvREFCNT_dec((SV*)padlist); 570 PUTBACK;
159 } 571 }
160}
161 572
162/* the next tow functions merely cache the padlists */ 573 slf_frame = c->slf_frame;
163STATIC void 574 CORO_THROW = c->except;
164get_padlist (CV *cv)
165{
166 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 0);
167 575
168 if (he && AvFILLp ((AV *)*he) >= 0) 576 if (expect_false (enable_times))
169 CvPADLIST (cv) = (AV *)av_pop ((AV *)*he);
170 else
171 CvPADLIST (cv) = clone_padlist (CvPADLIST (cv));
172}
173
174STATIC void
175put_padlist (CV *cv)
176{
177 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 1);
178
179 if (SvTYPE (*he) != SVt_PVAV)
180 {
181 SvREFCNT_dec (*he);
182 *he = (SV *)newAV ();
183 } 577 {
578 if (expect_false (!times_valid))
579 coro_times_update ();
184 580
185 av_push ((AV *)*he, (SV *)CvPADLIST (cv)); 581 coro_times_sub (c);
186} 582 }
187 583
584 if (expect_false (c->on_enter))
585 {
586 int i;
587
588 for (i = 0; i <= AvFILLp (c->on_enter); ++i)
589 on_enterleave_call (aTHX_ AvARRAY (c->on_enter)[i]);
590 }
591}
592
188static void 593static void
189SAVE(pTHX_ Coro__State c) 594save_perl (pTHX_ Coro__State c)
190{ 595{
596 if (expect_false (c->on_leave))
597 {
598 int i;
599
600 for (i = AvFILLp (c->on_leave); i >= 0; --i)
601 on_enterleave_call (aTHX_ AvARRAY (c->on_leave)[i]);
602 }
603
604 times_valid = 0;
605
606 if (expect_false (enable_times))
607 {
608 coro_times_update (); times_valid = 1;
609 coro_times_add (c);
610 }
611
612 c->except = CORO_THROW;
613 c->slf_frame = slf_frame;
614
191 { 615 {
192 dSP; 616 dSP;
193 I32 cxix = cxstack_ix; 617 I32 cxix = cxstack_ix;
618 PERL_CONTEXT *ccstk = cxstack;
194 PERL_SI *top_si = PL_curstackinfo; 619 PERL_SI *top_si = PL_curstackinfo;
195 PERL_CONTEXT *ccstk = cxstack;
196 620
197 /* 621 /*
198 * the worst thing you can imagine happens first - we have to save 622 * the worst thing you can imagine happens first - we have to save
199 * (and reinitialize) all cv's in the whole callchain :( 623 * (and reinitialize) all cv's in the whole callchain :(
200 */ 624 */
201 625
202 PUSHs (Nullsv); 626 XPUSHs (Nullsv);
203 /* this loop was inspired by pp_caller */ 627 /* this loop was inspired by pp_caller */
204 for (;;) 628 for (;;)
205 { 629 {
206 while (cxix >= 0) 630 while (expect_true (cxix >= 0))
207 { 631 {
208 PERL_CONTEXT *cx = &ccstk[cxix--]; 632 PERL_CONTEXT *cx = &ccstk[cxix--];
209 633
210 if (CxTYPE(cx) == CXt_SUB) 634 if (expect_true (CxTYPE (cx) == CXt_SUB) || expect_false (CxTYPE (cx) == CXt_FORMAT))
211 { 635 {
212 CV *cv = cx->blk_sub.cv; 636 CV *cv = cx->blk_sub.cv;
637
213 if (CvDEPTH(cv)) 638 if (expect_true (CvDEPTH (cv)))
214 { 639 {
215#ifdef USE_THREADS
216 XPUSHs ((SV *)CvOWNER(cv));
217#endif
218 EXTEND (SP, 3); 640 EXTEND (SP, 3);
219 PUSHs ((SV *)CvDEPTH(cv));
220 PUSHs ((SV *)CvPADLIST(cv)); 641 PUSHs ((SV *)CvPADLIST (cv));
642 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
221 PUSHs ((SV *)cv); 643 PUSHs ((SV *)cv);
222 644
223 get_padlist (cv);
224
225 CvDEPTH(cv) = 0; 645 CvDEPTH (cv) = 0;
226#ifdef USE_THREADS 646 get_padlist (aTHX_ cv);
227 CvOWNER(cv) = 0;
228 error must unlock this cv etc.. etc...
229 if you are here wondering about this error message then
230 the reason is that it will not work as advertised yet
231#endif
232 } 647 }
233 } 648 }
234 else if (CxTYPE(cx) == CXt_FORMAT) 649 }
650
651 if (expect_true (top_si->si_type == PERLSI_MAIN))
652 break;
653
654 top_si = top_si->si_prev;
655 ccstk = top_si->si_cxstack;
656 cxix = top_si->si_cxix;
657 }
658
659 PUTBACK;
660 }
661
662 /* allocate some space on the context stack for our purposes */
663 /* we manually unroll here, as usually 2 slots is enough */
664 if (SLOT_COUNT >= 1) CXINC;
665 if (SLOT_COUNT >= 2) CXINC;
666 if (SLOT_COUNT >= 3) CXINC;
667 {
668 unsigned int i;
669 for (i = 3; i < SLOT_COUNT; ++i)
670 CXINC;
671 }
672 cxstack_ix -= SLOT_COUNT; /* undo allocation */
673
674 c->mainstack = PL_mainstack;
675
676 {
677 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
678
679 slot->defav = GvAV (PL_defgv);
680 slot->defsv = DEFSV;
681 slot->errsv = ERRSV;
682 slot->irsgv = GvSV (irsgv);
683 slot->hinthv = GvHV (PL_hintgv);
684
685 #define VAR(name,type) slot->name = PL_ ## name;
686 # include "state.h"
687 #undef VAR
688 }
689}
690
691/*
692 * allocate various perl stacks. This is almost an exact copy
693 * of perl.c:init_stacks, except that it uses less memory
694 * on the (sometimes correct) assumption that coroutines do
695 * not usually need a lot of stackspace.
696 */
697#if CORO_PREFER_PERL_FUNCTIONS
698# define coro_init_stacks(thx) init_stacks ()
699#else
700static void
701coro_init_stacks (pTHX)
702{
703 PL_curstackinfo = new_stackinfo(32, 8);
704 PL_curstackinfo->si_type = PERLSI_MAIN;
705 PL_curstack = PL_curstackinfo->si_stack;
706 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
707
708 PL_stack_base = AvARRAY(PL_curstack);
709 PL_stack_sp = PL_stack_base;
710 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
711
712 New(50,PL_tmps_stack,32,SV*);
713 PL_tmps_floor = -1;
714 PL_tmps_ix = -1;
715 PL_tmps_max = 32;
716
717 New(54,PL_markstack,16,I32);
718 PL_markstack_ptr = PL_markstack;
719 PL_markstack_max = PL_markstack + 16;
720
721#ifdef SET_MARK_OFFSET
722 SET_MARK_OFFSET;
723#endif
724
725 New(54,PL_scopestack,8,I32);
726 PL_scopestack_ix = 0;
727 PL_scopestack_max = 8;
728
729 New(54,PL_savestack,24,ANY);
730 PL_savestack_ix = 0;
731 PL_savestack_max = 24;
732
733#if !PERL_VERSION_ATLEAST (5,10,0)
734 New(54,PL_retstack,4,OP*);
735 PL_retstack_ix = 0;
736 PL_retstack_max = 4;
737#endif
738}
739#endif
740
741/*
742 * destroy the stacks, the callchain etc...
743 */
744static void
745coro_destruct_stacks (pTHX)
746{
747 while (PL_curstackinfo->si_next)
748 PL_curstackinfo = PL_curstackinfo->si_next;
749
750 while (PL_curstackinfo)
751 {
752 PERL_SI *p = PL_curstackinfo->si_prev;
753
754 if (!IN_DESTRUCT)
755 SvREFCNT_dec (PL_curstackinfo->si_stack);
756
757 Safefree (PL_curstackinfo->si_cxstack);
758 Safefree (PL_curstackinfo);
759 PL_curstackinfo = p;
760 }
761
762 Safefree (PL_tmps_stack);
763 Safefree (PL_markstack);
764 Safefree (PL_scopestack);
765 Safefree (PL_savestack);
766#if !PERL_VERSION_ATLEAST (5,10,0)
767 Safefree (PL_retstack);
768#endif
769}
770
771#define CORO_RSS \
772 rss += sizeof (SYM (curstackinfo)); \
773 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
774 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
775 rss += SYM (tmps_max) * sizeof (SV *); \
776 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
777 rss += SYM (scopestack_max) * sizeof (I32); \
778 rss += SYM (savestack_max) * sizeof (ANY);
779
780static size_t
781coro_rss (pTHX_ struct coro *coro)
782{
783 size_t rss = sizeof (*coro);
784
785 if (coro->mainstack)
786 {
787 if (coro->flags & CF_RUNNING)
788 {
789 #define SYM(sym) PL_ ## sym
790 CORO_RSS;
791 #undef SYM
792 }
793 else
794 {
795 #define SYM(sym) coro->slot->sym
796 CORO_RSS;
797 #undef SYM
798 }
799 }
800
801 return rss;
802}
803
804/** coroutine stack handling ************************************************/
805
806static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
807static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
808static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
809
810/* apparently < 5.8.8 */
811#ifndef MgPV_nolen_const
812#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
813 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
814 (const char*)(mg)->mg_ptr)
815#endif
816
817/*
818 * This overrides the default magic get method of %SIG elements.
819 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
820 * and instead of trying to save and restore the hash elements, we just provide
821 * readback here.
822 */
823static int
824coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
825{
826 const char *s = MgPV_nolen_const (mg);
827
828 if (*s == '_')
829 {
830 SV **svp = 0;
831
832 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
833 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
834
835 if (svp)
836 {
837 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
838 return 0;
839 }
840 }
841
842 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
843}
844
845static int
846coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
847{
848 const char *s = MgPV_nolen_const (mg);
849
850 if (*s == '_')
851 {
852 SV **svp = 0;
853
854 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
855 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
856
857 if (svp)
858 {
859 SV *old = *svp;
860 *svp = 0;
861 SvREFCNT_dec (old);
862 return 0;
863 }
864 }
865
866 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
867}
868
869static int
870coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
871{
872 const char *s = MgPV_nolen_const (mg);
873
874 if (*s == '_')
875 {
876 SV **svp = 0;
877
878 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
879 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
880
881 if (svp)
882 {
883 SV *old = *svp;
884 *svp = SvOK (sv) ? newSVsv (sv) : 0;
885 SvREFCNT_dec (old);
886 return 0;
887 }
888 }
889
890 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
891}
892
893static void
894prepare_nop (pTHX_ struct coro_transfer_args *ta)
895{
896 /* kind of mega-hacky, but works */
897 ta->next = ta->prev = (struct coro *)ta;
898}
899
900static int
901slf_check_nop (pTHX_ struct CoroSLF *frame)
902{
903 return 0;
904}
905
906static int
907slf_check_repeat (pTHX_ struct CoroSLF *frame)
908{
909 return 1;
910}
911
912static UNOP coro_setup_op;
913
914static void NOINLINE /* noinline to keep it out of the transfer fast path */
915coro_setup (pTHX_ struct coro *coro)
916{
917 /*
918 * emulate part of the perl startup here.
919 */
920 coro_init_stacks (aTHX);
921
922 PL_runops = RUNOPS_DEFAULT;
923 PL_curcop = &PL_compiling;
924 PL_in_eval = EVAL_NULL;
925 PL_comppad = 0;
926 PL_comppad_name = 0;
927 PL_comppad_name_fill = 0;
928 PL_comppad_name_floor = 0;
929 PL_curpm = 0;
930 PL_curpad = 0;
931 PL_localizing = 0;
932 PL_dirty = 0;
933 PL_restartop = 0;
934#if PERL_VERSION_ATLEAST (5,10,0)
935 PL_parser = 0;
936#endif
937 PL_hints = 0;
938
939 /* recreate the die/warn hooks */
940 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
941 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
942
943 GvSV (PL_defgv) = newSV (0);
944 GvAV (PL_defgv) = coro->args; coro->args = 0;
945 GvSV (PL_errgv) = newSV (0);
946 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
947 GvHV (PL_hintgv) = 0;
948 PL_rs = newSVsv (GvSV (irsgv));
949 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
950
951 {
952 dSP;
953 UNOP myop;
954
955 Zero (&myop, 1, UNOP);
956 myop.op_next = Nullop;
957 myop.op_type = OP_ENTERSUB;
958 myop.op_flags = OPf_WANT_VOID;
959
960 PUSHMARK (SP);
961 PUSHs ((SV *)coro->startcv);
962 PUTBACK;
963 PL_op = (OP *)&myop;
964 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
965 }
966
967 /* this newly created coroutine might be run on an existing cctx which most
968 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
969 */
970 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
971 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
972
973 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
974 coro_setup_op.op_next = PL_op;
975 coro_setup_op.op_type = OP_ENTERSUB;
976 coro_setup_op.op_ppaddr = pp_slf;
977 /* no flags etc. required, as an init function won't be called */
978
979 PL_op = (OP *)&coro_setup_op;
980
981 /* copy throw, in case it was set before coro_setup */
982 CORO_THROW = coro->except;
983
984 if (expect_false (enable_times))
985 {
986 coro_times_update ();
987 coro_times_sub (coro);
988 }
989}
990
991static void
992coro_unwind_stacks (pTHX)
993{
994 if (!IN_DESTRUCT)
995 {
996 /* restore all saved variables and stuff */
997 LEAVE_SCOPE (0);
998 assert (PL_tmps_floor == -1);
999
1000 /* free all temporaries */
1001 FREETMPS;
1002 assert (PL_tmps_ix == -1);
1003
1004 /* unwind all extra stacks */
1005 POPSTACK_TO (PL_mainstack);
1006
1007 /* unwind main stack */
1008 dounwind (-1);
1009 }
1010}
1011
1012static void
1013coro_destruct_perl (pTHX_ struct coro *coro)
1014{
1015 SV *svf [9];
1016
1017 {
1018 struct coro *current = SvSTATE_current;
1019
1020 assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack));
1021
1022 save_perl (aTHX_ current);
1023 load_perl (aTHX_ coro);
1024
1025 coro_unwind_stacks (aTHX);
1026 coro_destruct_stacks (aTHX);
1027
1028 // now save some sv's to be free'd later
1029 svf [0] = GvSV (PL_defgv);
1030 svf [1] = (SV *)GvAV (PL_defgv);
1031 svf [2] = GvSV (PL_errgv);
1032 svf [3] = (SV *)PL_defoutgv;
1033 svf [4] = PL_rs;
1034 svf [5] = GvSV (irsgv);
1035 svf [6] = (SV *)GvHV (PL_hintgv);
1036 svf [7] = PL_diehook;
1037 svf [8] = PL_warnhook;
1038 assert (9 == sizeof (svf) / sizeof (*svf));
1039
1040 load_perl (aTHX_ current);
1041 }
1042
1043 {
1044 unsigned int i;
1045
1046 for (i = 0; i < sizeof (svf) / sizeof (*svf); ++i)
1047 SvREFCNT_dec (svf [i]);
1048
1049 SvREFCNT_dec (coro->saved_deffh);
1050 SvREFCNT_dec (coro->rouse_cb);
1051 SvREFCNT_dec (coro->invoke_cb);
1052 SvREFCNT_dec (coro->invoke_av);
1053 }
1054}
1055
1056INLINE void
1057free_coro_mortal (pTHX)
1058{
1059 if (expect_true (coro_mortal))
1060 {
1061 SvREFCNT_dec (coro_mortal);
1062 coro_mortal = 0;
1063 }
1064}
1065
1066static int
1067runops_trace (pTHX)
1068{
1069 COP *oldcop = 0;
1070 int oldcxix = -2;
1071
1072 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
1073 {
1074 PERL_ASYNC_CHECK ();
1075
1076 if (cctx_current->flags & CC_TRACE_ALL)
1077 {
1078 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
1079 {
1080 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1081 SV **bot, **top;
1082 AV *av = newAV (); /* return values */
1083 SV **cb;
1084 dSP;
1085
1086 GV *gv = CvGV (cx->blk_sub.cv);
1087 SV *fullname = sv_2mortal (newSV (0));
1088 if (isGV (gv))
1089 gv_efullname3 (fullname, gv, 0);
1090
1091 bot = PL_stack_base + cx->blk_oldsp + 1;
1092 top = cx->blk_gimme == G_ARRAY ? SP + 1
1093 : cx->blk_gimme == G_SCALAR ? bot + 1
1094 : bot;
1095
1096 av_extend (av, top - bot);
1097 while (bot < top)
1098 av_push (av, SvREFCNT_inc_NN (*bot++));
1099
1100 PL_runops = RUNOPS_DEFAULT;
1101 ENTER;
1102 SAVETMPS;
1103 EXTEND (SP, 3);
1104 PUSHMARK (SP);
1105 PUSHs (&PL_sv_no);
1106 PUSHs (fullname);
1107 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1108 PUTBACK;
1109 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1110 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1111 SPAGAIN;
1112 FREETMPS;
1113 LEAVE;
1114 PL_runops = runops_trace;
1115 }
1116
1117 if (oldcop != PL_curcop)
1118 {
1119 oldcop = PL_curcop;
1120
1121 if (PL_curcop != &PL_compiling)
1122 {
1123 SV **cb;
1124
1125 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1126 {
1127 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1128
1129 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1130 {
1131 dSP;
1132 GV *gv = CvGV (cx->blk_sub.cv);
1133 SV *fullname = sv_2mortal (newSV (0));
1134
1135 if (isGV (gv))
1136 gv_efullname3 (fullname, gv, 0);
1137
1138 PL_runops = RUNOPS_DEFAULT;
1139 ENTER;
1140 SAVETMPS;
1141 EXTEND (SP, 3);
1142 PUSHMARK (SP);
1143 PUSHs (&PL_sv_yes);
1144 PUSHs (fullname);
1145 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1146 PUTBACK;
1147 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1148 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1149 SPAGAIN;
1150 FREETMPS;
1151 LEAVE;
1152 PL_runops = runops_trace;
1153 }
1154
1155 oldcxix = cxstack_ix;
1156 }
1157
1158 if (cctx_current->flags & CC_TRACE_LINE)
1159 {
1160 dSP;
1161
1162 PL_runops = RUNOPS_DEFAULT;
1163 ENTER;
1164 SAVETMPS;
1165 EXTEND (SP, 3);
1166 PL_runops = RUNOPS_DEFAULT;
1167 PUSHMARK (SP);
1168 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1169 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1170 PUTBACK;
1171 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1172 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1173 SPAGAIN;
1174 FREETMPS;
1175 LEAVE;
1176 PL_runops = runops_trace;
1177 }
1178 }
1179 }
1180 }
1181 }
1182
1183 TAINT_NOT;
1184 return 0;
1185}
1186
1187static struct CoroSLF cctx_ssl_frame;
1188
1189static void
1190slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1191{
1192 ta->prev = 0;
1193}
1194
1195static int
1196slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1197{
1198 *frame = cctx_ssl_frame;
1199
1200 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1201}
1202
1203/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1204static void NOINLINE
1205cctx_prepare (pTHX)
1206{
1207 PL_top_env = &PL_start_env;
1208
1209 if (cctx_current->flags & CC_TRACE)
1210 PL_runops = runops_trace;
1211
1212 /* we already must be executing an SLF op, there is no other valid way
1213 * that can lead to creation of a new cctx */
1214 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1215 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1216
1217 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1218 cctx_ssl_frame = slf_frame;
1219
1220 slf_frame.prepare = slf_prepare_set_stacklevel;
1221 slf_frame.check = slf_check_set_stacklevel;
1222}
1223
1224/* the tail of transfer: execute stuff we can only do after a transfer */
1225INLINE void
1226transfer_tail (pTHX)
1227{
1228 free_coro_mortal (aTHX);
1229}
1230
1231/*
1232 * this is a _very_ stripped down perl interpreter ;)
1233 */
1234static void
1235cctx_run (void *arg)
1236{
1237#ifdef USE_ITHREADS
1238# if CORO_PTHREAD
1239 PERL_SET_CONTEXT (coro_thx);
1240# endif
1241#endif
1242 {
1243 dTHX;
1244
1245 /* normally we would need to skip the entersub here */
1246 /* not doing so will re-execute it, which is exactly what we want */
1247 /* PL_nop = PL_nop->op_next */
1248
1249 /* inject a fake subroutine call to cctx_init */
1250 cctx_prepare (aTHX);
1251
1252 /* cctx_run is the alternative tail of transfer() */
1253 transfer_tail (aTHX);
1254
1255 /* somebody or something will hit me for both perl_run and PL_restartop */
1256 PL_restartop = PL_op;
1257 perl_run (PL_curinterp);
1258 /*
1259 * Unfortunately, there is no way to get at the return values of the
1260 * coro body here, as perl_run destroys these
1261 */
1262
1263 /*
1264 * If perl-run returns we assume exit() was being called or the coro
1265 * fell off the end, which seems to be the only valid (non-bug)
1266 * reason for perl_run to return. We try to exit by jumping to the
1267 * bootstrap-time "top" top_env, as we cannot restore the "main"
1268 * coroutine as Coro has no such concept.
1269 * This actually isn't valid with the pthread backend, but OSes requiring
1270 * that backend are too broken to do it in a standards-compliant way.
1271 */
1272 PL_top_env = main_top_env;
1273 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1274 }
1275}
1276
1277static coro_cctx *
1278cctx_new ()
1279{
1280 coro_cctx *cctx;
1281
1282 ++cctx_count;
1283 New (0, cctx, 1, coro_cctx);
1284
1285 cctx->gen = cctx_gen;
1286 cctx->flags = 0;
1287 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1288
1289 return cctx;
1290}
1291
1292/* create a new cctx only suitable as source */
1293static coro_cctx *
1294cctx_new_empty ()
1295{
1296 coro_cctx *cctx = cctx_new ();
1297
1298 cctx->sptr = 0;
1299 coro_create (&cctx->cctx, 0, 0, 0, 0);
1300
1301 return cctx;
1302}
1303
1304/* create a new cctx suitable as destination/running a perl interpreter */
1305static coro_cctx *
1306cctx_new_run ()
1307{
1308 coro_cctx *cctx = cctx_new ();
1309 void *stack_start;
1310 size_t stack_size;
1311
1312#if HAVE_MMAP
1313 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1314 /* mmap supposedly does allocate-on-write for us */
1315 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1316
1317 if (cctx->sptr != (void *)-1)
1318 {
1319 #if CORO_STACKGUARD
1320 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1321 #endif
1322 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1323 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1324 cctx->flags |= CC_MAPPED;
1325 }
1326 else
1327#endif
1328 {
1329 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1330 New (0, cctx->sptr, cctx_stacksize, long);
1331
1332 if (!cctx->sptr)
1333 {
1334 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1335 _exit (EXIT_FAILURE);
1336 }
1337
1338 stack_start = cctx->sptr;
1339 stack_size = cctx->ssize;
1340 }
1341
1342 #if CORO_USE_VALGRIND
1343 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1344 #endif
1345
1346 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1347
1348 return cctx;
1349}
1350
1351static void
1352cctx_destroy (coro_cctx *cctx)
1353{
1354 if (!cctx)
1355 return;
1356
1357 assert (("FATAL: tried to destroy current cctx", cctx != cctx_current));//D temporary?
1358
1359 --cctx_count;
1360 coro_destroy (&cctx->cctx);
1361
1362 /* coro_transfer creates new, empty cctx's */
1363 if (cctx->sptr)
1364 {
1365 #if CORO_USE_VALGRIND
1366 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1367 #endif
1368
1369#if HAVE_MMAP
1370 if (cctx->flags & CC_MAPPED)
1371 munmap (cctx->sptr, cctx->ssize);
1372 else
1373#endif
1374 Safefree (cctx->sptr);
1375 }
1376
1377 Safefree (cctx);
1378}
1379
1380/* wether this cctx should be destructed */
1381#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1382
1383static coro_cctx *
1384cctx_get (pTHX)
1385{
1386 while (expect_true (cctx_first))
1387 {
1388 coro_cctx *cctx = cctx_first;
1389 cctx_first = cctx->next;
1390 --cctx_idle;
1391
1392 if (expect_true (!CCTX_EXPIRED (cctx)))
1393 return cctx;
1394
1395 cctx_destroy (cctx);
1396 }
1397
1398 return cctx_new_run ();
1399}
1400
1401static void
1402cctx_put (coro_cctx *cctx)
1403{
1404 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1405
1406 /* free another cctx if overlimit */
1407 if (expect_false (cctx_idle >= cctx_max_idle))
1408 {
1409 coro_cctx *first = cctx_first;
1410 cctx_first = first->next;
1411 --cctx_idle;
1412
1413 cctx_destroy (first);
1414 }
1415
1416 ++cctx_idle;
1417 cctx->next = cctx_first;
1418 cctx_first = cctx;
1419}
1420
1421/** coroutine switching *****************************************************/
1422
1423static void
1424transfer_check (pTHX_ struct coro *prev, struct coro *next)
1425{
1426 /* TODO: throwing up here is considered harmful */
1427
1428 if (expect_true (prev != next))
1429 {
1430 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1431 croak ("Coro::State::transfer called with a blocked prev Coro::State, but can only transfer from running or new states,");
1432
1433 if (expect_false (next->flags & (CF_RUNNING | CF_DESTROYED | CF_SUSPENDED)))
1434 croak ("Coro::State::transfer called with running, destroyed or suspended next Coro::State, but can only transfer to inactive states,");
1435
1436#if !PERL_VERSION_ATLEAST (5,10,0)
1437 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1438 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1439#endif
1440 }
1441}
1442
1443/* always use the TRANSFER macro */
1444static void NOINLINE /* noinline so we have a fixed stackframe */
1445transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1446{
1447 dSTACKLEVEL;
1448
1449 /* sometimes transfer is only called to set idle_sp */
1450 if (expect_false (!prev))
1451 {
1452 cctx_current->idle_sp = STACKLEVEL;
1453 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1454 }
1455 else if (expect_true (prev != next))
1456 {
1457 coro_cctx *cctx_prev;
1458
1459 if (expect_false (prev->flags & CF_NEW))
1460 {
1461 /* create a new empty/source context */
1462 prev->flags &= ~CF_NEW;
1463 prev->flags |= CF_RUNNING;
1464 }
1465
1466 prev->flags &= ~CF_RUNNING;
1467 next->flags |= CF_RUNNING;
1468
1469 /* first get rid of the old state */
1470 save_perl (aTHX_ prev);
1471
1472 if (expect_false (next->flags & CF_NEW))
1473 {
1474 /* need to start coroutine */
1475 next->flags &= ~CF_NEW;
1476 /* setup coroutine call */
1477 coro_setup (aTHX_ next);
1478 }
1479 else
1480 load_perl (aTHX_ next);
1481
1482 /* possibly untie and reuse the cctx */
1483 if (expect_true (
1484 cctx_current->idle_sp == STACKLEVEL
1485 && !(cctx_current->flags & CC_TRACE)
1486 && !force_cctx
1487 ))
1488 {
1489 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1490 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1491
1492 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get. */
1493 /* without this the next cctx_get might destroy the running cctx while still in use */
1494 if (expect_false (CCTX_EXPIRED (cctx_current)))
1495 if (expect_true (!next->cctx))
1496 next->cctx = cctx_get (aTHX);
1497
1498 cctx_put (cctx_current);
1499 }
1500 else
1501 prev->cctx = cctx_current;
1502
1503 ++next->usecount;
1504
1505 cctx_prev = cctx_current;
1506 cctx_current = expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1507
1508 next->cctx = 0;
1509
1510 if (expect_false (cctx_prev != cctx_current))
1511 {
1512 cctx_prev->top_env = PL_top_env;
1513 PL_top_env = cctx_current->top_env;
1514 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1515 }
1516
1517 transfer_tail (aTHX);
1518 }
1519}
1520
1521#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1522#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1523
1524/** high level stuff ********************************************************/
1525
1526static int
1527coro_state_destroy (pTHX_ struct coro *coro)
1528{
1529 if (coro->flags & CF_DESTROYED)
1530 return 0;
1531
1532 if (coro->on_destroy && !PL_dirty)
1533 coro->on_destroy (aTHX_ coro);
1534
1535 coro->flags |= CF_DESTROYED;
1536
1537 if (coro->flags & CF_READY)
1538 {
1539 /* reduce nready, as destroying a ready coro effectively unreadies it */
1540 /* alternative: look through all ready queues and remove the coro */
1541 --coro_nready;
1542 }
1543 else
1544 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1545
1546 if (coro->mainstack
1547 && coro->mainstack != main_mainstack
1548 && coro->slot
1549 && !PL_dirty)
1550 coro_destruct_perl (aTHX_ coro);
1551
1552 cctx_destroy (coro->cctx);
1553 SvREFCNT_dec (coro->startcv);
1554 SvREFCNT_dec (coro->args);
1555 SvREFCNT_dec (CORO_THROW);
1556
1557 if (coro->next) coro->next->prev = coro->prev;
1558 if (coro->prev) coro->prev->next = coro->next;
1559 if (coro == coro_first) coro_first = coro->next;
1560
1561 return 1;
1562}
1563
1564static int
1565coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1566{
1567 struct coro *coro = (struct coro *)mg->mg_ptr;
1568 mg->mg_ptr = 0;
1569
1570 coro->hv = 0;
1571
1572 if (--coro->refcnt < 0)
1573 {
1574 coro_state_destroy (aTHX_ coro);
1575 Safefree (coro);
1576 }
1577
1578 return 0;
1579}
1580
1581static int
1582coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1583{
1584 struct coro *coro = (struct coro *)mg->mg_ptr;
1585
1586 ++coro->refcnt;
1587
1588 return 0;
1589}
1590
1591static MGVTBL coro_state_vtbl = {
1592 0, 0, 0, 0,
1593 coro_state_free,
1594 0,
1595#ifdef MGf_DUP
1596 coro_state_dup,
1597#else
1598# define MGf_DUP 0
1599#endif
1600};
1601
1602static void
1603prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1604{
1605 ta->prev = SvSTATE (prev_sv);
1606 ta->next = SvSTATE (next_sv);
1607 TRANSFER_CHECK (*ta);
1608}
1609
1610static void
1611api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1612{
1613 struct coro_transfer_args ta;
1614
1615 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1616 TRANSFER (ta, 1);
1617}
1618
1619/** Coro ********************************************************************/
1620
1621INLINE void
1622coro_enq (pTHX_ struct coro *coro)
1623{
1624 struct coro **ready = coro_ready [coro->prio - CORO_PRIO_MIN];
1625
1626 SvREFCNT_inc_NN (coro->hv);
1627
1628 coro->next_ready = 0;
1629 *(ready [0] ? &ready [1]->next_ready : &ready [0]) = coro;
1630 ready [1] = coro;
1631}
1632
1633INLINE struct coro *
1634coro_deq (pTHX)
1635{
1636 int prio;
1637
1638 for (prio = CORO_PRIO_MAX - CORO_PRIO_MIN + 1; --prio >= 0; )
1639 {
1640 struct coro **ready = coro_ready [prio];
1641
1642 if (ready [0])
1643 {
1644 struct coro *coro = ready [0];
1645 ready [0] = coro->next_ready;
1646 return coro;
1647 }
1648 }
1649
1650 return 0;
1651}
1652
1653static int
1654api_ready (pTHX_ SV *coro_sv)
1655{
1656 struct coro *coro;
1657 SV *sv_hook;
1658 void (*xs_hook)(void);
1659
1660 coro = SvSTATE (coro_sv);
1661
1662 if (coro->flags & CF_READY)
1663 return 0;
1664
1665 coro->flags |= CF_READY;
1666
1667 sv_hook = coro_nready ? 0 : coro_readyhook;
1668 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1669
1670 coro_enq (aTHX_ coro);
1671 ++coro_nready;
1672
1673 if (sv_hook)
1674 {
1675 dSP;
1676
1677 ENTER;
1678 SAVETMPS;
1679
1680 PUSHMARK (SP);
1681 PUTBACK;
1682 call_sv (sv_hook, G_VOID | G_DISCARD);
1683
1684 FREETMPS;
1685 LEAVE;
1686 }
1687
1688 if (xs_hook)
1689 xs_hook ();
1690
1691 return 1;
1692}
1693
1694static int
1695api_is_ready (pTHX_ SV *coro_sv)
1696{
1697 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1698}
1699
1700/* expects to own a reference to next->hv */
1701INLINE void
1702prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1703{
1704 SV *prev_sv = SvRV (coro_current);
1705
1706 ta->prev = SvSTATE_hv (prev_sv);
1707 ta->next = next;
1708
1709 TRANSFER_CHECK (*ta);
1710
1711 SvRV_set (coro_current, (SV *)next->hv);
1712
1713 free_coro_mortal (aTHX);
1714 coro_mortal = prev_sv;
1715}
1716
1717static void
1718prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1719{
1720 for (;;)
1721 {
1722 struct coro *next = coro_deq (aTHX);
1723
1724 if (expect_true (next))
1725 {
1726 /* cannot transfer to destroyed coros, skip and look for next */
1727 if (expect_false (next->flags & (CF_DESTROYED | CF_SUSPENDED)))
1728 SvREFCNT_dec (next->hv); /* coro_nready has already been taken care of by destroy */
1729 else
1730 {
1731 next->flags &= ~CF_READY;
1732 --coro_nready;
1733
1734 prepare_schedule_to (aTHX_ ta, next);
1735 break;
1736 }
1737 }
1738 else
1739 {
1740 /* nothing to schedule: call the idle handler */
1741 if (SvROK (sv_idle)
1742 && SvOBJECT (SvRV (sv_idle)))
1743 {
1744 ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */
1745 api_ready (aTHX_ SvRV (sv_idle));
1746 --coro_nready;
1747 }
1748 else
1749 {
1750 dSP;
1751
1752 ENTER;
1753 SAVETMPS;
1754
1755 PUSHMARK (SP);
1756 PUTBACK;
1757 call_sv (sv_idle, G_VOID | G_DISCARD);
1758
1759 FREETMPS;
1760 LEAVE;
1761 }
1762 }
1763 }
1764}
1765
1766INLINE void
1767prepare_cede (pTHX_ struct coro_transfer_args *ta)
1768{
1769 api_ready (aTHX_ coro_current);
1770 prepare_schedule (aTHX_ ta);
1771}
1772
1773INLINE void
1774prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1775{
1776 SV *prev = SvRV (coro_current);
1777
1778 if (coro_nready)
1779 {
1780 prepare_schedule (aTHX_ ta);
1781 api_ready (aTHX_ prev);
1782 }
1783 else
1784 prepare_nop (aTHX_ ta);
1785}
1786
1787static void
1788api_schedule (pTHX)
1789{
1790 struct coro_transfer_args ta;
1791
1792 prepare_schedule (aTHX_ &ta);
1793 TRANSFER (ta, 1);
1794}
1795
1796static void
1797api_schedule_to (pTHX_ SV *coro_sv)
1798{
1799 struct coro_transfer_args ta;
1800 struct coro *next = SvSTATE (coro_sv);
1801
1802 SvREFCNT_inc_NN (coro_sv);
1803 prepare_schedule_to (aTHX_ &ta, next);
1804}
1805
1806static int
1807api_cede (pTHX)
1808{
1809 struct coro_transfer_args ta;
1810
1811 prepare_cede (aTHX_ &ta);
1812
1813 if (expect_true (ta.prev != ta.next))
1814 {
1815 TRANSFER (ta, 1);
1816 return 1;
1817 }
1818 else
1819 return 0;
1820}
1821
1822static int
1823api_cede_notself (pTHX)
1824{
1825 if (coro_nready)
1826 {
1827 struct coro_transfer_args ta;
1828
1829 prepare_cede_notself (aTHX_ &ta);
1830 TRANSFER (ta, 1);
1831 return 1;
1832 }
1833 else
1834 return 0;
1835}
1836
1837static void
1838api_trace (pTHX_ SV *coro_sv, int flags)
1839{
1840 struct coro *coro = SvSTATE (coro_sv);
1841
1842 if (coro->flags & CF_RUNNING)
1843 croak ("cannot enable tracing on a running coroutine, caught");
1844
1845 if (flags & CC_TRACE)
1846 {
1847 if (!coro->cctx)
1848 coro->cctx = cctx_new_run ();
1849 else if (!(coro->cctx->flags & CC_TRACE))
1850 croak ("cannot enable tracing on coroutine with custom stack, caught");
1851
1852 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1853 }
1854 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1855 {
1856 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1857
1858 if (coro->flags & CF_RUNNING)
1859 PL_runops = RUNOPS_DEFAULT;
1860 else
1861 coro->slot->runops = RUNOPS_DEFAULT;
1862 }
1863}
1864
1865static void
1866coro_call_on_destroy (pTHX_ struct coro *coro)
1867{
1868 SV **on_destroyp = hv_fetch (coro->hv, "_on_destroy", sizeof ("_on_destroy") - 1, 0);
1869 SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0);
1870
1871 if (on_destroyp)
1872 {
1873 AV *on_destroy = (AV *)SvRV (*on_destroyp);
1874
1875 while (AvFILLp (on_destroy) >= 0)
1876 {
1877 dSP; /* don't disturb outer sp */
1878 SV *cb = av_pop (on_destroy);
1879
1880 PUSHMARK (SP);
1881
1882 if (statusp)
1883 {
1884 int i;
1885 AV *status = (AV *)SvRV (*statusp);
1886 EXTEND (SP, AvFILLp (status) + 1);
1887
1888 for (i = 0; i <= AvFILLp (status); ++i)
1889 PUSHs (AvARRAY (status)[i]);
1890 }
1891
1892 PUTBACK;
1893 call_sv (sv_2mortal (cb), G_VOID | G_DISCARD);
1894 }
1895 }
1896}
1897
1898static void
1899slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1900{
1901 int i;
1902 HV *hv = (HV *)SvRV (coro_current);
1903 AV *av = newAV ();
1904
1905 av_extend (av, items - 1);
1906 for (i = 0; i < items; ++i)
1907 av_push (av, SvREFCNT_inc_NN (arg [i]));
1908
1909 hv_store (hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0);
1910
1911 av_push (av_destroy, (SV *)newRV_inc ((SV *)hv)); /* RVinc for perl */
1912 api_ready (aTHX_ sv_manager);
1913
1914 frame->prepare = prepare_schedule;
1915 frame->check = slf_check_repeat;
1916
1917 /* as a minor optimisation, we could unwind all stacks here */
1918 /* but that puts extra pressure on pp_slf, and is not worth much */
1919 /*coro_unwind_stacks (aTHX);*/
1920}
1921
1922/*****************************************************************************/
1923/* async pool handler */
1924
1925static int
1926slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
1927{
1928 HV *hv = (HV *)SvRV (coro_current);
1929 struct coro *coro = (struct coro *)frame->data;
1930
1931 if (!coro->invoke_cb)
1932 return 1; /* loop till we have invoke */
1933 else
1934 {
1935 hv_store (hv, "desc", sizeof ("desc") - 1,
1936 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
1937
1938 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
1939
1940 {
1941 dSP;
1942 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
1943 PUTBACK;
1944 }
1945
1946 SvREFCNT_dec (GvAV (PL_defgv));
1947 GvAV (PL_defgv) = coro->invoke_av;
1948 coro->invoke_av = 0;
1949
1950 return 0;
1951 }
1952}
1953
1954static void
1955slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1956{
1957 HV *hv = (HV *)SvRV (coro_current);
1958 struct coro *coro = SvSTATE_hv ((SV *)hv);
1959
1960 if (expect_true (coro->saved_deffh))
1961 {
1962 /* subsequent iteration */
1963 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
1964 coro->saved_deffh = 0;
1965
1966 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
1967 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
1968 {
1969 coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate);
1970 coro->invoke_av = newAV ();
1971
1972 frame->prepare = prepare_nop;
1973 }
1974 else
1975 {
1976 av_clear (GvAV (PL_defgv));
1977 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
1978
1979 coro->prio = 0;
1980
1981 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
1982 api_trace (aTHX_ coro_current, 0);
1983
1984 frame->prepare = prepare_schedule;
1985 av_push (av_async_pool, SvREFCNT_inc (hv));
1986 }
1987 }
1988 else
1989 {
1990 /* first iteration, simply fall through */
1991 frame->prepare = prepare_nop;
1992 }
1993
1994 frame->check = slf_check_pool_handler;
1995 frame->data = (void *)coro;
1996}
1997
1998/*****************************************************************************/
1999/* rouse callback */
2000
2001#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
2002
2003static void
2004coro_rouse_callback (pTHX_ CV *cv)
2005{
2006 dXSARGS;
2007 SV *data = (SV *)S_GENSUB_ARG;
2008
2009 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2010 {
2011 /* first call, set args */
2012 SV *coro = SvRV (data);
2013 AV *av = newAV ();
2014
2015 SvRV_set (data, (SV *)av);
2016
2017 /* better take a full copy of the arguments */
2018 while (items--)
2019 av_store (av, items, newSVsv (ST (items)));
2020
2021 api_ready (aTHX_ coro);
2022 SvREFCNT_dec (coro);
2023 }
2024
2025 XSRETURN_EMPTY;
2026}
2027
2028static int
2029slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
2030{
2031 SV *data = (SV *)frame->data;
2032
2033 if (CORO_THROW)
2034 return 0;
2035
2036 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2037 return 1;
2038
2039 /* now push all results on the stack */
2040 {
2041 dSP;
2042 AV *av = (AV *)SvRV (data);
2043 int i;
2044
2045 EXTEND (SP, AvFILLp (av) + 1);
2046 for (i = 0; i <= AvFILLp (av); ++i)
2047 PUSHs (sv_2mortal (AvARRAY (av)[i]));
2048
2049 /* we have stolen the elements, so set length to zero and free */
2050 AvFILLp (av) = -1;
2051 av_undef (av);
2052
2053 PUTBACK;
2054 }
2055
2056 return 0;
2057}
2058
2059static void
2060slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2061{
2062 SV *cb;
2063
2064 if (items)
2065 cb = arg [0];
2066 else
2067 {
2068 struct coro *coro = SvSTATE_current;
2069
2070 if (!coro->rouse_cb)
2071 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
2072
2073 cb = sv_2mortal (coro->rouse_cb);
2074 coro->rouse_cb = 0;
2075 }
2076
2077 if (!SvROK (cb)
2078 || SvTYPE (SvRV (cb)) != SVt_PVCV
2079 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
2080 croak ("Coro::rouse_wait called with illegal callback argument,");
2081
2082 {
2083 CV *cv = (CV *)SvRV (cb); /* for S_GENSUB_ARG */
2084 SV *data = (SV *)S_GENSUB_ARG;
2085
2086 frame->data = (void *)data;
2087 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2088 frame->check = slf_check_rouse_wait;
2089 }
2090}
2091
2092static SV *
2093coro_new_rouse_cb (pTHX)
2094{
2095 HV *hv = (HV *)SvRV (coro_current);
2096 struct coro *coro = SvSTATE_hv (hv);
2097 SV *data = newRV_inc ((SV *)hv);
2098 SV *cb = s_gensub (aTHX_ coro_rouse_callback, (void *)data);
2099
2100 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2101 SvREFCNT_dec (data); /* magicext increases the refcount */
2102
2103 SvREFCNT_dec (coro->rouse_cb);
2104 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2105
2106 return cb;
2107}
2108
2109/*****************************************************************************/
2110/* schedule-like-function opcode (SLF) */
2111
2112static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2113static const CV *slf_cv;
2114static SV **slf_argv;
2115static int slf_argc, slf_arga; /* count, allocated */
2116static I32 slf_ax; /* top of stack, for restore */
2117
2118/* this restores the stack in the case we patched the entersub, to */
2119/* recreate the stack frame as perl will on following calls */
2120/* since entersub cleared the stack */
2121static OP *
2122pp_restore (pTHX)
2123{
2124 int i;
2125 SV **SP = PL_stack_base + slf_ax;
2126
2127 PUSHMARK (SP);
2128
2129 EXTEND (SP, slf_argc + 1);
2130
2131 for (i = 0; i < slf_argc; ++i)
2132 PUSHs (sv_2mortal (slf_argv [i]));
2133
2134 PUSHs ((SV *)CvGV (slf_cv));
2135
2136 RETURNOP (slf_restore.op_first);
2137}
2138
2139static void
2140slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2141{
2142 SV **arg = (SV **)slf_frame.data;
2143
2144 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2145}
2146
2147static void
2148slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2149{
2150 if (items != 2)
2151 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2152
2153 frame->prepare = slf_prepare_transfer;
2154 frame->check = slf_check_nop;
2155 frame->data = (void *)arg; /* let's hope it will stay valid */
2156}
2157
2158static void
2159slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2160{
2161 frame->prepare = prepare_schedule;
2162 frame->check = slf_check_nop;
2163}
2164
2165static void
2166slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2167{
2168 struct coro *next = (struct coro *)slf_frame.data;
2169
2170 SvREFCNT_inc_NN (next->hv);
2171 prepare_schedule_to (aTHX_ ta, next);
2172}
2173
2174static void
2175slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2176{
2177 if (!items)
2178 croak ("Coro::schedule_to expects a coroutine argument, caught");
2179
2180 frame->data = (void *)SvSTATE (arg [0]);
2181 frame->prepare = slf_prepare_schedule_to;
2182 frame->check = slf_check_nop;
2183}
2184
2185static void
2186slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2187{
2188 api_ready (aTHX_ SvRV (coro_current));
2189
2190 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2191}
2192
2193static void
2194slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2195{
2196 frame->prepare = prepare_cede;
2197 frame->check = slf_check_nop;
2198}
2199
2200static void
2201slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2202{
2203 frame->prepare = prepare_cede_notself;
2204 frame->check = slf_check_nop;
2205}
2206
2207/*
2208 * these not obviously related functions are all rolled into one
2209 * function to increase chances that they all will call transfer with the same
2210 * stack offset
2211 * SLF stands for "schedule-like-function".
2212 */
2213static OP *
2214pp_slf (pTHX)
2215{
2216 I32 checkmark; /* mark SP to see how many elements check has pushed */
2217
2218 /* set up the slf frame, unless it has already been set-up */
2219 /* the latter happens when a new coro has been started */
2220 /* or when a new cctx was attached to an existing coroutine */
2221 if (expect_true (!slf_frame.prepare))
2222 {
2223 /* first iteration */
2224 dSP;
2225 SV **arg = PL_stack_base + TOPMARK + 1;
2226 int items = SP - arg; /* args without function object */
2227 SV *gv = *sp;
2228
2229 /* do a quick consistency check on the "function" object, and if it isn't */
2230 /* for us, divert to the real entersub */
2231 if (SvTYPE (gv) != SVt_PVGV
2232 || !GvCV (gv)
2233 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2234 return PL_ppaddr[OP_ENTERSUB](aTHX);
2235
2236 if (!(PL_op->op_flags & OPf_STACKED))
2237 {
2238 /* ampersand-form of call, use @_ instead of stack */
2239 AV *av = GvAV (PL_defgv);
2240 arg = AvARRAY (av);
2241 items = AvFILLp (av) + 1;
2242 }
2243
2244 /* now call the init function, which needs to set up slf_frame */
2245 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2246 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2247
2248 /* pop args */
2249 SP = PL_stack_base + POPMARK;
2250
2251 PUTBACK;
2252 }
2253
2254 /* now that we have a slf_frame, interpret it! */
2255 /* we use a callback system not to make the code needlessly */
2256 /* complicated, but so we can run multiple perl coros from one cctx */
2257
2258 do
2259 {
2260 struct coro_transfer_args ta;
2261
2262 slf_frame.prepare (aTHX_ &ta);
2263 TRANSFER (ta, 0);
2264
2265 checkmark = PL_stack_sp - PL_stack_base;
2266 }
2267 while (slf_frame.check (aTHX_ &slf_frame));
2268
2269 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2270
2271 /* exception handling */
2272 if (expect_false (CORO_THROW))
2273 {
2274 SV *exception = sv_2mortal (CORO_THROW);
2275
2276 CORO_THROW = 0;
2277 sv_setsv (ERRSV, exception);
2278 croak (0);
2279 }
2280
2281 /* return value handling - mostly like entersub */
2282 /* make sure we put something on the stack in scalar context */
2283 if (GIMME_V == G_SCALAR)
2284 {
2285 dSP;
2286 SV **bot = PL_stack_base + checkmark;
2287
2288 if (sp == bot) /* too few, push undef */
2289 bot [1] = &PL_sv_undef;
2290 else if (sp != bot + 1) /* too many, take last one */
2291 bot [1] = *sp;
2292
2293 SP = bot + 1;
2294
2295 PUTBACK;
2296 }
2297
2298 return NORMAL;
2299}
2300
2301static void
2302api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2303{
2304 int i;
2305 SV **arg = PL_stack_base + ax;
2306 int items = PL_stack_sp - arg + 1;
2307
2308 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2309
2310 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2311 && PL_op->op_ppaddr != pp_slf)
2312 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2313
2314 CvFLAGS (cv) |= CVf_SLF;
2315 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2316 slf_cv = cv;
2317
2318 /* we patch the op, and then re-run the whole call */
2319 /* we have to put the same argument on the stack for this to work */
2320 /* and this will be done by pp_restore */
2321 slf_restore.op_next = (OP *)&slf_restore;
2322 slf_restore.op_type = OP_CUSTOM;
2323 slf_restore.op_ppaddr = pp_restore;
2324 slf_restore.op_first = PL_op;
2325
2326 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2327
2328 if (PL_op->op_flags & OPf_STACKED)
2329 {
2330 if (items > slf_arga)
2331 {
2332 slf_arga = items;
2333 free (slf_argv);
2334 slf_argv = malloc (slf_arga * sizeof (SV *));
2335 }
2336
2337 slf_argc = items;
2338
2339 for (i = 0; i < items; ++i)
2340 slf_argv [i] = SvREFCNT_inc (arg [i]);
2341 }
2342 else
2343 slf_argc = 0;
2344
2345 PL_op->op_ppaddr = pp_slf;
2346 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2347
2348 PL_op = (OP *)&slf_restore;
2349}
2350
2351/*****************************************************************************/
2352/* dynamic wind */
2353
2354static void
2355on_enterleave_call (pTHX_ SV *cb)
2356{
2357 dSP;
2358
2359 PUSHSTACK;
2360
2361 PUSHMARK (SP);
2362 PUTBACK;
2363 call_sv (cb, G_VOID | G_DISCARD);
2364 SPAGAIN;
2365
2366 POPSTACK;
2367}
2368
2369static SV *
2370coro_avp_pop_and_free (pTHX_ AV **avp)
2371{
2372 AV *av = *avp;
2373 SV *res = av_pop (av);
2374
2375 if (AvFILLp (av) < 0)
2376 {
2377 *avp = 0;
2378 SvREFCNT_dec (av);
2379 }
2380
2381 return res;
2382}
2383
2384static void
2385coro_pop_on_enter (pTHX_ void *coro)
2386{
2387 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_enter);
2388 SvREFCNT_dec (cb);
2389}
2390
2391static void
2392coro_pop_on_leave (pTHX_ void *coro)
2393{
2394 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_leave);
2395 on_enterleave_call (aTHX_ sv_2mortal (cb));
2396}
2397
2398/*****************************************************************************/
2399/* PerlIO::cede */
2400
2401typedef struct
2402{
2403 PerlIOBuf base;
2404 NV next, every;
2405} PerlIOCede;
2406
2407static IV
2408PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2409{
2410 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2411
2412 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2413 self->next = nvtime () + self->every;
2414
2415 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2416}
2417
2418static SV *
2419PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2420{
2421 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2422
2423 return newSVnv (self->every);
2424}
2425
2426static IV
2427PerlIOCede_flush (pTHX_ PerlIO *f)
2428{
2429 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2430 double now = nvtime ();
2431
2432 if (now >= self->next)
2433 {
2434 api_cede (aTHX);
2435 self->next = now + self->every;
2436 }
2437
2438 return PerlIOBuf_flush (aTHX_ f);
2439}
2440
2441static PerlIO_funcs PerlIO_cede =
2442{
2443 sizeof(PerlIO_funcs),
2444 "cede",
2445 sizeof(PerlIOCede),
2446 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2447 PerlIOCede_pushed,
2448 PerlIOBuf_popped,
2449 PerlIOBuf_open,
2450 PerlIOBase_binmode,
2451 PerlIOCede_getarg,
2452 PerlIOBase_fileno,
2453 PerlIOBuf_dup,
2454 PerlIOBuf_read,
2455 PerlIOBuf_unread,
2456 PerlIOBuf_write,
2457 PerlIOBuf_seek,
2458 PerlIOBuf_tell,
2459 PerlIOBuf_close,
2460 PerlIOCede_flush,
2461 PerlIOBuf_fill,
2462 PerlIOBase_eof,
2463 PerlIOBase_error,
2464 PerlIOBase_clearerr,
2465 PerlIOBase_setlinebuf,
2466 PerlIOBuf_get_base,
2467 PerlIOBuf_bufsiz,
2468 PerlIOBuf_get_ptr,
2469 PerlIOBuf_get_cnt,
2470 PerlIOBuf_set_ptrcnt,
2471};
2472
2473/*****************************************************************************/
2474/* Coro::Semaphore & Coro::Signal */
2475
2476static SV *
2477coro_waitarray_new (pTHX_ int count)
2478{
2479 /* a waitarray=semaphore contains a counter IV in $sem->[0] and any waiters after that */
2480 AV *av = newAV ();
2481 SV **ary;
2482
2483 /* unfortunately, building manually saves memory */
2484 Newx (ary, 2, SV *);
2485 AvALLOC (av) = ary;
2486#if PERL_VERSION_ATLEAST (5,10,0)
2487 AvARRAY (av) = ary;
2488#else
2489 /* 5.8.8 needs this syntax instead of AvARRAY = ary, yet */
2490 /* -DDEBUGGING flags this as a bug, despite it perfectly working */
2491 SvPVX ((SV *)av) = (char *)ary;
2492#endif
2493 AvMAX (av) = 1;
2494 AvFILLp (av) = 0;
2495 ary [0] = newSViv (count);
2496
2497 return newRV_noinc ((SV *)av);
2498}
2499
2500/* semaphore */
2501
2502static void
2503coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2504{
2505 SV *count_sv = AvARRAY (av)[0];
2506 IV count = SvIVX (count_sv);
2507
2508 count += adjust;
2509 SvIVX (count_sv) = count;
2510
2511 /* now wake up as many waiters as are expected to lock */
2512 while (count > 0 && AvFILLp (av) > 0)
2513 {
2514 SV *cb;
2515
2516 /* swap first two elements so we can shift a waiter */
2517 AvARRAY (av)[0] = AvARRAY (av)[1];
2518 AvARRAY (av)[1] = count_sv;
2519 cb = av_shift (av);
2520
2521 if (SvOBJECT (cb))
2522 {
2523 api_ready (aTHX_ cb);
2524 --count;
2525 }
2526 else if (SvTYPE (cb) == SVt_PVCV)
2527 {
2528 dSP;
2529 PUSHMARK (SP);
2530 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2531 PUTBACK;
2532 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2533 }
2534
2535 SvREFCNT_dec (cb);
2536 }
2537}
2538
2539static void
2540coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2541{
2542 /* call $sem->adjust (0) to possibly wake up some other waiters */
2543 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2544}
2545
2546static int
2547slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2548{
2549 AV *av = (AV *)frame->data;
2550 SV *count_sv = AvARRAY (av)[0];
2551
2552 /* if we are about to throw, don't actually acquire the lock, just throw */
2553 if (CORO_THROW)
2554 return 0;
2555 else if (SvIVX (count_sv) > 0)
2556 {
2557 SvSTATE_current->on_destroy = 0;
2558
2559 if (acquire)
2560 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2561 else
2562 coro_semaphore_adjust (aTHX_ av, 0);
2563
2564 return 0;
2565 }
2566 else
2567 {
2568 int i;
2569 /* if we were woken up but can't down, we look through the whole */
2570 /* waiters list and only add us if we aren't in there already */
2571 /* this avoids some degenerate memory usage cases */
2572
2573 for (i = 1; i <= AvFILLp (av); ++i)
2574 if (AvARRAY (av)[i] == SvRV (coro_current))
2575 return 1;
2576
2577 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2578 return 1;
2579 }
2580}
2581
2582static int
2583slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2584{
2585 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2586}
2587
2588static int
2589slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2590{
2591 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2592}
2593
2594static void
2595slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2596{
2597 AV *av = (AV *)SvRV (arg [0]);
2598
2599 if (SvIVX (AvARRAY (av)[0]) > 0)
2600 {
2601 frame->data = (void *)av;
2602 frame->prepare = prepare_nop;
2603 }
2604 else
2605 {
2606 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2607
2608 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2609 frame->prepare = prepare_schedule;
2610
2611 /* to avoid race conditions when a woken-up coro gets terminated */
2612 /* we arrange for a temporary on_destroy that calls adjust (0) */
2613 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2614 }
2615}
2616
2617static void
2618slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2619{
2620 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2621 frame->check = slf_check_semaphore_down;
2622}
2623
2624static void
2625slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2626{
2627 if (items >= 2)
2628 {
2629 /* callback form */
2630 AV *av = (AV *)SvRV (arg [0]);
2631 SV *cb_cv = s_get_cv_croak (arg [1]);
2632
2633 av_push (av, SvREFCNT_inc_NN (cb_cv));
2634
2635 if (SvIVX (AvARRAY (av)[0]) > 0)
2636 coro_semaphore_adjust (aTHX_ av, 0);
2637
2638 frame->prepare = prepare_nop;
2639 frame->check = slf_check_nop;
2640 }
2641 else
2642 {
2643 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2644 frame->check = slf_check_semaphore_wait;
2645 }
2646}
2647
2648/* signal */
2649
2650static void
2651coro_signal_wake (pTHX_ AV *av, int count)
2652{
2653 SvIVX (AvARRAY (av)[0]) = 0;
2654
2655 /* now signal count waiters */
2656 while (count > 0 && AvFILLp (av) > 0)
2657 {
2658 SV *cb;
2659
2660 /* swap first two elements so we can shift a waiter */
2661 cb = AvARRAY (av)[0];
2662 AvARRAY (av)[0] = AvARRAY (av)[1];
2663 AvARRAY (av)[1] = cb;
2664
2665 cb = av_shift (av);
2666
2667 if (SvTYPE (cb) == SVt_PVCV)
2668 {
2669 dSP;
2670 PUSHMARK (SP);
2671 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2672 PUTBACK;
2673 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2674 }
2675 else
2676 {
2677 api_ready (aTHX_ cb);
2678 sv_setiv (cb, 0); /* signal waiter */
2679 }
2680
2681 SvREFCNT_dec (cb);
2682
2683 --count;
2684 }
2685}
2686
2687static int
2688slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2689{
2690 /* if we are about to throw, also stop waiting */
2691 return SvROK ((SV *)frame->data) && !CORO_THROW;
2692}
2693
2694static void
2695slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2696{
2697 AV *av = (AV *)SvRV (arg [0]);
2698
2699 if (items >= 2)
2700 {
2701 SV *cb_cv = s_get_cv_croak (arg [1]);
2702 av_push (av, SvREFCNT_inc_NN (cb_cv));
2703
2704 if (SvIVX (AvARRAY (av)[0]))
2705 coro_signal_wake (aTHX_ av, 1); /* ust be the only waiter */
2706
2707 frame->prepare = prepare_nop;
2708 frame->check = slf_check_nop;
2709 }
2710 else if (SvIVX (AvARRAY (av)[0]))
2711 {
2712 SvIVX (AvARRAY (av)[0]) = 0;
2713 frame->prepare = prepare_nop;
2714 frame->check = slf_check_nop;
2715 }
2716 else
2717 {
2718 SV *waiter = newSVsv (coro_current); /* owned by signal av */
2719
2720 av_push (av, waiter);
2721
2722 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2723 frame->prepare = prepare_schedule;
2724 frame->check = slf_check_signal_wait;
2725 }
2726}
2727
2728/*****************************************************************************/
2729/* Coro::AIO */
2730
2731#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2732
2733/* helper storage struct */
2734struct io_state
2735{
2736 int errorno;
2737 I32 laststype; /* U16 in 5.10.0 */
2738 int laststatval;
2739 Stat_t statcache;
2740};
2741
2742static void
2743coro_aio_callback (pTHX_ CV *cv)
2744{
2745 dXSARGS;
2746 AV *state = (AV *)S_GENSUB_ARG;
2747 SV *coro = av_pop (state);
2748 SV *data_sv = newSV (sizeof (struct io_state));
2749
2750 av_extend (state, items - 1);
2751
2752 sv_upgrade (data_sv, SVt_PV);
2753 SvCUR_set (data_sv, sizeof (struct io_state));
2754 SvPOK_only (data_sv);
2755
2756 {
2757 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2758
2759 data->errorno = errno;
2760 data->laststype = PL_laststype;
2761 data->laststatval = PL_laststatval;
2762 data->statcache = PL_statcache;
2763 }
2764
2765 /* now build the result vector out of all the parameters and the data_sv */
2766 {
2767 int i;
2768
2769 for (i = 0; i < items; ++i)
2770 av_push (state, SvREFCNT_inc_NN (ST (i)));
2771 }
2772
2773 av_push (state, data_sv);
2774
2775 api_ready (aTHX_ coro);
2776 SvREFCNT_dec (coro);
2777 SvREFCNT_dec ((AV *)state);
2778}
2779
2780static int
2781slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2782{
2783 AV *state = (AV *)frame->data;
2784
2785 /* if we are about to throw, return early */
2786 /* this does not cancel the aio request, but at least */
2787 /* it quickly returns */
2788 if (CORO_THROW)
2789 return 0;
2790
2791 /* one element that is an RV? repeat! */
2792 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2793 return 1;
2794
2795 /* restore status */
2796 {
2797 SV *data_sv = av_pop (state);
2798 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2799
2800 errno = data->errorno;
2801 PL_laststype = data->laststype;
2802 PL_laststatval = data->laststatval;
2803 PL_statcache = data->statcache;
2804
2805 SvREFCNT_dec (data_sv);
2806 }
2807
2808 /* push result values */
2809 {
2810 dSP;
2811 int i;
2812
2813 EXTEND (SP, AvFILLp (state) + 1);
2814 for (i = 0; i <= AvFILLp (state); ++i)
2815 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2816
2817 PUTBACK;
2818 }
2819
2820 return 0;
2821}
2822
2823static void
2824slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2825{
2826 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2827 SV *coro_hv = SvRV (coro_current);
2828 struct coro *coro = SvSTATE_hv (coro_hv);
2829
2830 /* put our coroutine id on the state arg */
2831 av_push (state, SvREFCNT_inc_NN (coro_hv));
2832
2833 /* first see whether we have a non-zero priority and set it as AIO prio */
2834 if (coro->prio)
2835 {
2836 dSP;
2837
2838 static SV *prio_cv;
2839 static SV *prio_sv;
2840
2841 if (expect_false (!prio_cv))
2842 {
2843 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2844 prio_sv = newSViv (0);
2845 }
2846
2847 PUSHMARK (SP);
2848 sv_setiv (prio_sv, coro->prio);
2849 XPUSHs (prio_sv);
2850
2851 PUTBACK;
2852 call_sv (prio_cv, G_VOID | G_DISCARD);
2853 }
2854
2855 /* now call the original request */
2856 {
2857 dSP;
2858 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2859 int i;
2860
2861 PUSHMARK (SP);
2862
2863 /* first push all args to the stack */
2864 EXTEND (SP, items + 1);
2865
2866 for (i = 0; i < items; ++i)
2867 PUSHs (arg [i]);
2868
2869 /* now push the callback closure */
2870 PUSHs (sv_2mortal (s_gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2871
2872 /* now call the AIO function - we assume our request is uncancelable */
2873 PUTBACK;
2874 call_sv ((SV *)req, G_VOID | G_DISCARD);
2875 }
2876
2877 /* now that the requets is going, we loop toll we have a result */
2878 frame->data = (void *)state;
2879 frame->prepare = prepare_schedule;
2880 frame->check = slf_check_aio_req;
2881}
2882
2883static void
2884coro_aio_req_xs (pTHX_ CV *cv)
2885{
2886 dXSARGS;
2887
2888 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2889
2890 XSRETURN_EMPTY;
2891}
2892
2893/*****************************************************************************/
2894
2895#if CORO_CLONE
2896# include "clone.c"
2897#endif
2898
2899MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2900
2901PROTOTYPES: DISABLE
2902
2903BOOT:
2904{
2905#ifdef USE_ITHREADS
2906# if CORO_PTHREAD
2907 coro_thx = PERL_GET_CONTEXT;
2908# endif
2909#endif
2910 BOOT_PAGESIZE;
2911
2912 cctx_current = cctx_new_empty ();
2913
2914 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2915 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2916
2917 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2918 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2919 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
2920
2921 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
2922 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
2923 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
2924
2925 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
2926
2927 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
2928 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
2929 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
2930 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
2931
2932 main_mainstack = PL_mainstack;
2933 main_top_env = PL_top_env;
2934
2935 while (main_top_env->je_prev)
2936 main_top_env = main_top_env->je_prev;
2937
2938 {
2939 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2940
2941 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2942 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
2943
2944 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2945 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
2946 }
2947
2948 coroapi.ver = CORO_API_VERSION;
2949 coroapi.rev = CORO_API_REVISION;
2950
2951 coroapi.transfer = api_transfer;
2952
2953 coroapi.sv_state = SvSTATE_;
2954 coroapi.execute_slf = api_execute_slf;
2955 coroapi.prepare_nop = prepare_nop;
2956 coroapi.prepare_schedule = prepare_schedule;
2957 coroapi.prepare_cede = prepare_cede;
2958 coroapi.prepare_cede_notself = prepare_cede_notself;
2959
2960 {
2961 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
2962
2963 if (!svp) croak ("Time::HiRes is required");
2964 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
2965
2966 nvtime = INT2PTR (double (*)(), SvIV (*svp));
2967
2968 svp = hv_fetch (PL_modglobal, "Time::U2time", 12, 0);
2969 u2time = INT2PTR (void (*)(pTHX_ UV ret[2]), SvIV (*svp));
2970 }
2971
2972 assert (("PRIO_NORMAL must be 0", !CORO_PRIO_NORMAL));
2973}
2974
2975SV *
2976new (char *klass, ...)
2977 ALIAS:
2978 Coro::new = 1
2979 CODE:
2980{
2981 struct coro *coro;
2982 MAGIC *mg;
2983 HV *hv;
2984 SV *cb;
2985 int i;
2986
2987 if (items > 1)
2988 {
2989 cb = s_get_cv_croak (ST (1));
2990
2991 if (!ix)
235 { 2992 {
236 /* I never used formats, so how should I know how these are implemented? */ 2993 if (CvISXSUB (cb))
237 /* my bold guess is as a simple, plain sub... */ 2994 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
238 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 2995
2996 if (!CvROOT (cb))
2997 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
239 } 2998 }
240 } 2999 }
241 3000
242 if (top_si->si_type == PERLSI_MAIN)
243 break;
244
245 top_si = top_si->si_prev;
246 ccstk = top_si->si_cxstack;
247 cxix = top_si->si_cxix;
248 }
249
250 PUTBACK;
251 }
252
253 c->dowarn = PL_dowarn;
254 c->defav = GvAV (PL_defgv);
255 c->curstackinfo = PL_curstackinfo;
256 c->curstack = PL_curstack;
257 c->mainstack = PL_mainstack;
258 c->stack_sp = PL_stack_sp;
259 c->op = PL_op;
260 c->curpad = PL_curpad;
261 c->stack_base = PL_stack_base;
262 c->stack_max = PL_stack_max;
263 c->tmps_stack = PL_tmps_stack;
264 c->tmps_floor = PL_tmps_floor;
265 c->tmps_ix = PL_tmps_ix;
266 c->tmps_max = PL_tmps_max;
267 c->markstack = PL_markstack;
268 c->markstack_ptr = PL_markstack_ptr;
269 c->markstack_max = PL_markstack_max;
270 c->scopestack = PL_scopestack;
271 c->scopestack_ix = PL_scopestack_ix;
272 c->scopestack_max = PL_scopestack_max;
273 c->savestack = PL_savestack;
274 c->savestack_ix = PL_savestack_ix;
275 c->savestack_max = PL_savestack_max;
276 c->retstack = PL_retstack;
277 c->retstack_ix = PL_retstack_ix;
278 c->retstack_max = PL_retstack_max;
279 c->curcop = PL_curcop;
280}
281
282static void
283LOAD(pTHX_ Coro__State c)
284{
285 PL_dowarn = c->dowarn;
286 GvAV (PL_defgv) = c->defav;
287 PL_curstackinfo = c->curstackinfo;
288 PL_curstack = c->curstack;
289 PL_mainstack = c->mainstack;
290 PL_stack_sp = c->stack_sp;
291 PL_op = c->op;
292 PL_curpad = c->curpad;
293 PL_stack_base = c->stack_base;
294 PL_stack_max = c->stack_max;
295 PL_tmps_stack = c->tmps_stack;
296 PL_tmps_floor = c->tmps_floor;
297 PL_tmps_ix = c->tmps_ix;
298 PL_tmps_max = c->tmps_max;
299 PL_markstack = c->markstack;
300 PL_markstack_ptr = c->markstack_ptr;
301 PL_markstack_max = c->markstack_max;
302 PL_scopestack = c->scopestack;
303 PL_scopestack_ix = c->scopestack_ix;
304 PL_scopestack_max = c->scopestack_max;
305 PL_savestack = c->savestack;
306 PL_savestack_ix = c->savestack_ix;
307 PL_savestack_max = c->savestack_max;
308 PL_retstack = c->retstack;
309 PL_retstack_ix = c->retstack_ix;
310 PL_retstack_max = c->retstack_max;
311 PL_curcop = c->curcop;
312
313 {
314 dSP;
315 CV *cv;
316
317 /* now do the ugly restore mess */
318 while ((cv = (CV *)POPs))
319 {
320 AV *padlist = (AV *)POPs;
321
322 put_padlist (cv);
323 CvPADLIST(cv) = padlist;
324 CvDEPTH(cv) = (I32)POPs;
325
326#ifdef USE_THREADS
327 CvOWNER(cv) = (struct perl_thread *)POPs;
328 error does not work either
329#endif
330 }
331
332 PUTBACK;
333 }
334}
335
336/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
337STATIC void
338destroy_stacks(pTHX)
339{
340 /* die does this while calling POPSTACK, but I just don't see why. */
341 dounwind(-1);
342
343 /* is this ugly, I ask? */
344 while (PL_scopestack_ix)
345 LEAVE;
346
347 while (PL_curstackinfo->si_next)
348 PL_curstackinfo = PL_curstackinfo->si_next;
349
350 while (PL_curstackinfo)
351 {
352 PERL_SI *p = PL_curstackinfo->si_prev;
353
354 SvREFCNT_dec(PL_curstackinfo->si_stack);
355 Safefree(PL_curstackinfo->si_cxstack);
356 Safefree(PL_curstackinfo);
357 PL_curstackinfo = p;
358 }
359
360 if (PL_scopestack_ix != 0)
361 Perl_warner(aTHX_ WARN_INTERNAL,
362 "Unbalanced scopes: %ld more ENTERs than LEAVEs\n",
363 (long)PL_scopestack_ix);
364 if (PL_savestack_ix != 0)
365 Perl_warner(aTHX_ WARN_INTERNAL,
366 "Unbalanced saves: %ld more saves than restores\n",
367 (long)PL_savestack_ix);
368 if (PL_tmps_floor != -1)
369 Perl_warner(aTHX_ WARN_INTERNAL,"Unbalanced tmps: %ld more allocs than frees\n",
370 (long)PL_tmps_floor + 1);
371 /*
372 */
373 Safefree(PL_tmps_stack);
374 Safefree(PL_markstack);
375 Safefree(PL_scopestack);
376 Safefree(PL_savestack);
377 Safefree(PL_retstack);
378}
379
380#define SUB_INIT "Coro::State::_newcoro"
381
382MODULE = Coro::State PACKAGE = Coro::State
383
384PROTOTYPES: ENABLE
385
386BOOT:
387 if (!padlist_cache)
388 padlist_cache = newHV ();
389
390Coro::State
391_newprocess(args)
392 SV * args
393 PROTOTYPE: $
394 CODE:
395 Coro__State coro;
396
397 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV)
398 croak ("Coro::State::newprocess expects an arrayref");
399
400 New (0, coro, 1, struct coro); 3001 Newz (0, coro, 1, struct coro);
3002 coro->args = newAV ();
3003 coro->flags = CF_NEW;
401 3004
402 coro->mainstack = 0; /* actual work is done inside transfer */ 3005 if (coro_first) coro_first->prev = coro;
403 coro->args = (AV *)SvREFCNT_inc (SvRV (args)); 3006 coro->next = coro_first;
3007 coro_first = coro;
404 3008
405 RETVAL = coro; 3009 coro->hv = hv = newHV ();
3010 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
3011 mg->mg_flags |= MGf_DUP;
3012 RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1));
3013
3014 if (items > 1)
3015 {
3016 av_extend (coro->args, items - 1 + ix - 1);
3017
3018 if (ix)
3019 {
3020 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
3021 cb = (SV *)cv_coro_run;
3022 }
3023
3024 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
3025
3026 for (i = 2; i < items; i++)
3027 av_push (coro->args, newSVsv (ST (i)));
3028 }
3029}
406 OUTPUT: 3030 OUTPUT:
407 RETVAL 3031 RETVAL
408 3032
409void 3033void
410transfer(prev,next) 3034transfer (...)
411 Coro::State_or_hashref prev 3035 PROTOTYPE: $$
412 Coro::State_or_hashref next 3036 CODE:
413 CODE: 3037 CORO_EXECUTE_SLF_XS (slf_init_transfer);
414 3038
415 if (prev != next) 3039bool
3040_destroy (SV *coro_sv)
3041 CODE:
3042 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
3043 OUTPUT:
3044 RETVAL
3045
3046void
3047_exit (int code)
3048 PROTOTYPE: $
3049 CODE:
3050 _exit (code);
3051
3052SV *
3053clone (Coro::State coro)
3054 CODE:
3055{
3056#if CORO_CLONE
3057 struct coro *ncoro = coro_clone (aTHX_ coro);
3058 MAGIC *mg;
3059 /* TODO: too much duplication */
3060 ncoro->hv = newHV ();
3061 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
3062 mg->mg_flags |= MGf_DUP;
3063 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
3064#else
3065 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
3066#endif
3067}
3068 OUTPUT:
3069 RETVAL
3070
3071int
3072cctx_stacksize (int new_stacksize = 0)
3073 PROTOTYPE: ;$
3074 CODE:
3075 RETVAL = cctx_stacksize;
3076 if (new_stacksize)
416 { 3077 {
417 PUTBACK; 3078 cctx_stacksize = new_stacksize;
418 SAVE (aTHX_ prev); 3079 ++cctx_gen;
419
420 /* 3080 }
421 * this could be done in newprocess which would lead to 3081 OUTPUT:
422 * extremely elegant and fast (just PUTBACK/SAVE/LOAD/SPAGAIN) 3082 RETVAL
423 * code here, but lazy allocation of stacks has also 3083
424 * some virtues and the overhead of the if() is nil. 3084int
3085cctx_max_idle (int max_idle = 0)
3086 PROTOTYPE: ;$
3087 CODE:
3088 RETVAL = cctx_max_idle;
3089 if (max_idle > 1)
3090 cctx_max_idle = max_idle;
3091 OUTPUT:
3092 RETVAL
3093
3094int
3095cctx_count ()
3096 PROTOTYPE:
3097 CODE:
3098 RETVAL = cctx_count;
3099 OUTPUT:
3100 RETVAL
3101
3102int
3103cctx_idle ()
3104 PROTOTYPE:
3105 CODE:
3106 RETVAL = cctx_idle;
3107 OUTPUT:
3108 RETVAL
3109
3110void
3111list ()
3112 PROTOTYPE:
3113 PPCODE:
3114{
3115 struct coro *coro;
3116 for (coro = coro_first; coro; coro = coro->next)
3117 if (coro->hv)
3118 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
3119}
3120
3121void
3122call (Coro::State coro, SV *coderef)
3123 ALIAS:
3124 eval = 1
3125 CODE:
3126{
3127 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
425 */ 3128 {
426 if (next->mainstack) 3129 struct coro *current = SvSTATE_current;
3130
3131 if (current != coro)
427 { 3132 {
428 LOAD (aTHX_ next); 3133 PUTBACK;
429 next->mainstack = 0; /* unnecessary but much cleaner */ 3134 save_perl (aTHX_ current);
3135 load_perl (aTHX_ coro);
430 SPAGAIN; 3136 SPAGAIN;
431 } 3137 }
3138
3139 PUSHSTACK;
3140
3141 PUSHMARK (SP);
3142 PUTBACK;
3143
3144 if (ix)
3145 eval_sv (coderef, 0);
432 else 3146 else
3147 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3148
3149 POPSTACK;
3150 SPAGAIN;
3151
3152 if (current != coro)
433 { 3153 {
434 /* 3154 PUTBACK;
435 * emulate part of the perl startup here. 3155 save_perl (aTHX_ coro);
436 */ 3156 load_perl (aTHX_ current);
437 UNOP myop;
438
439 init_stacks (); /* from perl.c */
440 PL_op = (OP *)&myop;
441 /*PL_curcop = 0;*/
442 GvAV (PL_defgv) = (AV *)SvREFCNT_inc ((SV *)next->args);
443
444 SPAGAIN; 3157 SPAGAIN;
445 Zero(&myop, 1, UNOP);
446 myop.op_next = Nullop;
447 myop.op_flags = OPf_WANT_VOID;
448
449 PUSHMARK(SP);
450 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
451 PUTBACK;
452 /*
453 * the next line is slightly wrong, as PL_op->op_next
454 * is actually being executed so we skip the first op.
455 * that doesn't matter, though, since it is only
456 * pp_nextstate and we never return...
457 */
458 PL_op = Perl_pp_entersub(aTHX);
459 SPAGAIN;
460
461 ENTER;
462 } 3158 }
463 } 3159 }
3160}
3161
3162SV *
3163is_ready (Coro::State coro)
3164 PROTOTYPE: $
3165 ALIAS:
3166 is_ready = CF_READY
3167 is_running = CF_RUNNING
3168 is_new = CF_NEW
3169 is_destroyed = CF_DESTROYED
3170 is_suspended = CF_SUSPENDED
3171 CODE:
3172 RETVAL = boolSV (coro->flags & ix);
3173 OUTPUT:
3174 RETVAL
464 3175
465void 3176void
466DESTROY(coro) 3177throw (Coro::State self, SV *throw = &PL_sv_undef)
467 Coro::State coro 3178 PROTOTYPE: $;$
468 CODE: 3179 CODE:
3180{
3181 struct coro *current = SvSTATE_current;
3182 SV **throwp = self == current ? &CORO_THROW : &self->except;
3183 SvREFCNT_dec (*throwp);
3184 SvGETMAGIC (throw);
3185 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
3186}
469 3187
470 if (coro->mainstack) 3188void
3189api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3190 PROTOTYPE: $;$
3191 C_ARGS: aTHX_ coro, flags
3192
3193SV *
3194has_cctx (Coro::State coro)
3195 PROTOTYPE: $
3196 CODE:
3197 /* maybe manage the running flag differently */
3198 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3199 OUTPUT:
3200 RETVAL
3201
3202int
3203is_traced (Coro::State coro)
3204 PROTOTYPE: $
3205 CODE:
3206 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3207 OUTPUT:
3208 RETVAL
3209
3210UV
3211rss (Coro::State coro)
3212 PROTOTYPE: $
3213 ALIAS:
3214 usecount = 1
3215 CODE:
3216 switch (ix)
3217 {
3218 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3219 case 1: RETVAL = coro->usecount; break;
3220 }
3221 OUTPUT:
3222 RETVAL
3223
3224void
3225force_cctx ()
3226 PROTOTYPE:
3227 CODE:
3228 cctx_current->idle_sp = 0;
3229
3230void
3231swap_defsv (Coro::State self)
3232 PROTOTYPE: $
3233 ALIAS:
3234 swap_defav = 1
3235 CODE:
3236 if (!self->slot)
3237 croak ("cannot swap state with coroutine that has no saved state,");
3238 else
471 { 3239 {
472 struct coro temp; 3240 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3241 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
473 3242
3243 SV *tmp = *src; *src = *dst; *dst = tmp;
3244 }
3245
3246void
3247cancel (Coro::State self)
3248 CODE:
3249 coro_state_destroy (aTHX_ self);
3250 coro_call_on_destroy (aTHX_ self); /* actually only for Coro objects */
3251
3252
3253SV *
3254enable_times (int enabled = enable_times)
3255 CODE:
3256{
3257 RETVAL = boolSV (enable_times);
3258
3259 if (enabled != enable_times)
3260 {
3261 enable_times = enabled;
3262
3263 coro_times_update ();
3264 (enabled ? coro_times_sub : coro_times_add)(SvSTATE (coro_current));
3265 }
3266}
3267 OUTPUT:
3268 RETVAL
3269
3270void
3271times (Coro::State self)
3272 PPCODE:
3273{
3274 struct coro *current = SvSTATE (coro_current);
3275
3276 if (expect_false (current == self))
3277 {
3278 coro_times_update ();
3279 coro_times_add (SvSTATE (coro_current));
3280 }
3281
3282 EXTEND (SP, 2);
3283 PUSHs (sv_2mortal (newSVnv (self->t_real [0] + self->t_real [1] * 1e-9)));
3284 PUSHs (sv_2mortal (newSVnv (self->t_cpu [0] + self->t_cpu [1] * 1e-9)));
3285
3286 if (expect_false (current == self))
3287 coro_times_sub (SvSTATE (coro_current));
3288}
3289
3290MODULE = Coro::State PACKAGE = Coro
3291
3292BOOT:
3293{
3294 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3295 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3296 cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD);
3297 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD);
3298 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3299 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3300 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3301 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3302 sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE);
3303
3304 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3305 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3306 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3307 cv_coro_state_new = get_cv ("Coro::State::new", 0); SvREADONLY_on (cv_coro_state_new);
3308
3309 coro_stash = gv_stashpv ("Coro", TRUE);
3310
3311 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (CORO_PRIO_MAX));
3312 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (CORO_PRIO_HIGH));
3313 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (CORO_PRIO_NORMAL));
3314 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (CORO_PRIO_LOW));
3315 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (CORO_PRIO_IDLE));
3316 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (CORO_PRIO_MIN));
3317
3318 {
3319 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3320
3321 coroapi.schedule = api_schedule;
3322 coroapi.schedule_to = api_schedule_to;
3323 coroapi.cede = api_cede;
3324 coroapi.cede_notself = api_cede_notself;
3325 coroapi.ready = api_ready;
3326 coroapi.is_ready = api_is_ready;
3327 coroapi.nready = coro_nready;
3328 coroapi.current = coro_current;
3329
3330 /*GCoroAPI = &coroapi;*/
3331 sv_setiv (sv, (IV)&coroapi);
3332 SvREADONLY_on (sv);
3333 }
3334}
3335
3336void
3337terminate (...)
3338 CODE:
3339 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3340
3341void
3342schedule (...)
3343 CODE:
3344 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3345
3346void
3347schedule_to (...)
3348 CODE:
3349 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3350
3351void
3352cede_to (...)
3353 CODE:
3354 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3355
3356void
3357cede (...)
3358 CODE:
3359 CORO_EXECUTE_SLF_XS (slf_init_cede);
3360
3361void
3362cede_notself (...)
3363 CODE:
3364 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3365
3366void
3367_set_current (SV *current)
3368 PROTOTYPE: $
3369 CODE:
3370 SvREFCNT_dec (SvRV (coro_current));
3371 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3372
3373void
3374_set_readyhook (SV *hook)
3375 PROTOTYPE: $
3376 CODE:
3377 SvREFCNT_dec (coro_readyhook);
3378 SvGETMAGIC (hook);
3379 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
3380
3381int
3382prio (Coro::State coro, int newprio = 0)
3383 PROTOTYPE: $;$
3384 ALIAS:
3385 nice = 1
3386 CODE:
3387{
3388 RETVAL = coro->prio;
3389
3390 if (items > 1)
3391 {
3392 if (ix)
3393 newprio = coro->prio - newprio;
3394
3395 if (newprio < CORO_PRIO_MIN) newprio = CORO_PRIO_MIN;
3396 if (newprio > CORO_PRIO_MAX) newprio = CORO_PRIO_MAX;
3397
3398 coro->prio = newprio;
3399 }
3400}
3401 OUTPUT:
3402 RETVAL
3403
3404SV *
3405ready (SV *self)
3406 PROTOTYPE: $
3407 CODE:
3408 RETVAL = boolSV (api_ready (aTHX_ self));
3409 OUTPUT:
3410 RETVAL
3411
3412int
3413nready (...)
3414 PROTOTYPE:
3415 CODE:
3416 RETVAL = coro_nready;
3417 OUTPUT:
3418 RETVAL
3419
3420void
3421suspend (Coro::State self)
3422 PROTOTYPE: $
3423 CODE:
3424 self->flags |= CF_SUSPENDED;
3425
3426void
3427resume (Coro::State self)
3428 PROTOTYPE: $
3429 CODE:
3430 self->flags &= ~CF_SUSPENDED;
3431
3432void
3433_pool_handler (...)
3434 CODE:
3435 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3436
3437void
3438async_pool (SV *cv, ...)
3439 PROTOTYPE: &@
3440 PPCODE:
3441{
3442 HV *hv = (HV *)av_pop (av_async_pool);
3443 AV *av = newAV ();
3444 SV *cb = ST (0);
3445 int i;
3446
3447 av_extend (av, items - 2);
3448 for (i = 1; i < items; ++i)
3449 av_push (av, SvREFCNT_inc_NN (ST (i)));
3450
3451 if ((SV *)hv == &PL_sv_undef)
3452 {
3453 PUSHMARK (SP);
3454 EXTEND (SP, 2);
3455 PUSHs (sv_Coro);
3456 PUSHs ((SV *)cv_pool_handler);
474 PUTBACK; 3457 PUTBACK;
475 SAVE(aTHX_ (&temp)); 3458 call_sv ((SV *)cv_coro_state_new, G_SCALAR);
476 LOAD(aTHX_ coro);
477
478 destroy_stacks ();
479 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
480
481 LOAD((&temp));
482 SPAGAIN; 3459 SPAGAIN;
3460
3461 hv = (HV *)SvREFCNT_inc_NN (SvRV (POPs));
483 } 3462 }
484 3463
3464 {
3465 struct coro *coro = SvSTATE_hv (hv);
3466
3467 assert (!coro->invoke_cb);
3468 assert (!coro->invoke_av);
3469 coro->invoke_cb = SvREFCNT_inc (cb);
3470 coro->invoke_av = av;
3471 }
3472
3473 api_ready (aTHX_ (SV *)hv);
3474
3475 if (GIMME_V != G_VOID)
3476 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
3477 else
485 SvREFCNT_dec (coro->args); 3478 SvREFCNT_dec (hv);
486 Safefree (coro); 3479}
487 3480
3481SV *
3482rouse_cb ()
3483 PROTOTYPE:
3484 CODE:
3485 RETVAL = coro_new_rouse_cb (aTHX);
3486 OUTPUT:
3487 RETVAL
488 3488
3489void
3490rouse_wait (...)
3491 PROTOTYPE: ;$
3492 PPCODE:
3493 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
3494
3495void
3496on_enter (SV *block)
3497 ALIAS:
3498 on_leave = 1
3499 PROTOTYPE: &
3500 CODE:
3501{
3502 struct coro *coro = SvSTATE_current;
3503 AV **avp = ix ? &coro->on_leave : &coro->on_enter;
3504
3505 block = s_get_cv_croak (block);
3506
3507 if (!*avp)
3508 *avp = newAV ();
3509
3510 av_push (*avp, SvREFCNT_inc (block));
3511
3512 if (!ix)
3513 on_enterleave_call (aTHX_ block);
3514
3515 LEAVE; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3516 SAVEDESTRUCTOR_X (ix ? coro_pop_on_leave : coro_pop_on_enter, (void *)coro);
3517 ENTER; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3518}
3519
3520
3521MODULE = Coro::State PACKAGE = PerlIO::cede
3522
3523BOOT:
3524 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3525
3526
3527MODULE = Coro::State PACKAGE = Coro::Semaphore
3528
3529SV *
3530new (SV *klass, SV *count = 0)
3531 CODE:
3532{
3533 int semcnt = 1;
3534
3535 if (count)
3536 {
3537 SvGETMAGIC (count);
3538
3539 if (SvOK (count))
3540 semcnt = SvIV (count);
3541 }
3542
3543 RETVAL = sv_bless (
3544 coro_waitarray_new (aTHX_ semcnt),
3545 GvSTASH (CvGV (cv))
3546 );
3547}
3548 OUTPUT:
3549 RETVAL
3550
3551# helper for Coro::Channel and others
3552SV *
3553_alloc (int count)
3554 CODE:
3555 RETVAL = coro_waitarray_new (aTHX_ count);
3556 OUTPUT:
3557 RETVAL
3558
3559SV *
3560count (SV *self)
3561 CODE:
3562 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3563 OUTPUT:
3564 RETVAL
3565
3566void
3567up (SV *self, int adjust = 1)
3568 ALIAS:
3569 adjust = 1
3570 CODE:
3571 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3572
3573void
3574down (...)
3575 CODE:
3576 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3577
3578void
3579wait (...)
3580 CODE:
3581 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3582
3583void
3584try (SV *self)
3585 PPCODE:
3586{
3587 AV *av = (AV *)SvRV (self);
3588 SV *count_sv = AvARRAY (av)[0];
3589 IV count = SvIVX (count_sv);
3590
3591 if (count > 0)
3592 {
3593 --count;
3594 SvIVX (count_sv) = count;
3595 XSRETURN_YES;
3596 }
3597 else
3598 XSRETURN_NO;
3599}
3600
3601void
3602waiters (SV *self)
3603 PPCODE:
3604{
3605 AV *av = (AV *)SvRV (self);
3606 int wcount = AvFILLp (av) + 1 - 1;
3607
3608 if (GIMME_V == G_SCALAR)
3609 XPUSHs (sv_2mortal (newSViv (wcount)));
3610 else
3611 {
3612 int i;
3613 EXTEND (SP, wcount);
3614 for (i = 1; i <= wcount; ++i)
3615 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3616 }
3617}
3618
3619MODULE = Coro::State PACKAGE = Coro::SemaphoreSet
3620
3621void
3622_may_delete (SV *sem, int count, int extra_refs)
3623 PPCODE:
3624{
3625 AV *av = (AV *)SvRV (sem);
3626
3627 if (SvREFCNT ((SV *)av) == 1 + extra_refs
3628 && AvFILLp (av) == 0 /* no waiters, just count */
3629 && SvIV (AvARRAY (av)[0]) == count)
3630 XSRETURN_YES;
3631
3632 XSRETURN_NO;
3633}
3634
3635MODULE = Coro::State PACKAGE = Coro::Signal
3636
3637SV *
3638new (SV *klass)
3639 CODE:
3640 RETVAL = sv_bless (
3641 coro_waitarray_new (aTHX_ 0),
3642 GvSTASH (CvGV (cv))
3643 );
3644 OUTPUT:
3645 RETVAL
3646
3647void
3648wait (...)
3649 CODE:
3650 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
3651
3652void
3653broadcast (SV *self)
3654 CODE:
3655{
3656 AV *av = (AV *)SvRV (self);
3657 coro_signal_wake (aTHX_ av, AvFILLp (av));
3658}
3659
3660void
3661send (SV *self)
3662 CODE:
3663{
3664 AV *av = (AV *)SvRV (self);
3665
3666 if (AvFILLp (av))
3667 coro_signal_wake (aTHX_ av, 1);
3668 else
3669 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3670}
3671
3672IV
3673awaited (SV *self)
3674 CODE:
3675 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3676 OUTPUT:
3677 RETVAL
3678
3679
3680MODULE = Coro::State PACKAGE = Coro::AnyEvent
3681
3682BOOT:
3683 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3684
3685void
3686_schedule (...)
3687 CODE:
3688{
3689 static int incede;
3690
3691 api_cede_notself (aTHX);
3692
3693 ++incede;
3694 while (coro_nready >= incede && api_cede (aTHX))
3695 ;
3696
3697 sv_setsv (sv_activity, &PL_sv_undef);
3698 if (coro_nready >= incede)
3699 {
3700 PUSHMARK (SP);
3701 PUTBACK;
3702 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3703 }
3704
3705 --incede;
3706}
3707
3708
3709MODULE = Coro::State PACKAGE = Coro::AIO
3710
3711void
3712_register (char *target, char *proto, SV *req)
3713 CODE:
3714{
3715 SV *req_cv = s_get_cv_croak (req);
3716 /* newXSproto doesn't return the CV on 5.8 */
3717 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3718 sv_setpv ((SV *)slf_cv, proto);
3719 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3720}
3721

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines