ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.3 by root, Tue Jul 17 00:24:15 2001 UTC vs.
Revision 1.366 by root, Tue Jul 21 03:39:58 2009 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "schmorp.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
9#endif 24#endif
10 25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
57# undef CORO_STACKGUARD
58#endif
59
60#ifndef CORO_STACKGUARD
61# define CORO_STACKGUARD 0
62#endif
63
64/* prefer perl internal functions over our own? */
65#ifndef CORO_PREFER_PERL_FUNCTIONS
66# define CORO_PREFER_PERL_FUNCTIONS 0
67#endif
68
69/* The next macros try to return the current stack pointer, in an as
70 * portable way as possible. */
71#if __GNUC__ >= 4
72# define dSTACKLEVEL int stacklevel_dummy
73# define STACKLEVEL __builtin_frame_address (0)
74#else
75# define dSTACKLEVEL volatile void *stacklevel
76# define STACKLEVEL ((void *)&stacklevel)
77#endif
78
79#define IN_DESTRUCT PL_dirty
80
81#if __GNUC__ >= 3
82# define attribute(x) __attribute__(x)
83# define expect(expr,value) __builtin_expect ((expr), (value))
84# define INLINE static inline
85#else
86# define attribute(x)
87# define expect(expr,value) (expr)
88# define INLINE static
89#endif
90
91#define expect_false(expr) expect ((expr) != 0, 0)
92#define expect_true(expr) expect ((expr) != 0, 1)
93
94#define NOINLINE attribute ((noinline))
95
96#include "CoroAPI.h"
97#define GCoroAPI (&coroapi) /* very sneaky */
98
99#ifdef USE_ITHREADS
100# if CORO_PTHREAD
101static void *coro_thx;
102# endif
103#endif
104
105#ifdef __linux
106# include <time.h> /* for timespec */
107# include <syscall.h> /* for SYS_* */
108# ifdef SYS_clock_gettime
109# define coro_clock_gettime(id, ts) syscall (SYS_clock_gettime, (id), (ts))
110# define CORO_CLOCK_MONOTONIC 1
111# define CORO_CLOCK_THREAD_CPUTIME_ID 3
112# endif
113#endif
114
115static double (*nvtime)(); /* so why doesn't it take void? */
116static void (*u2time)(pTHX_ UV ret[2]);
117
118/* we hijack an hopefully unused CV flag for our purposes */
119#define CVf_SLF 0x4000
120static OP *pp_slf (pTHX);
121
122static U32 cctx_gen;
123static size_t cctx_stacksize = CORO_STACKSIZE;
124static struct CoroAPI coroapi;
125static AV *main_mainstack; /* used to differentiate between $main and others */
126static JMPENV *main_top_env;
127static HV *coro_state_stash, *coro_stash;
128static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
129
130static AV *av_destroy; /* destruction queue */
131static SV *sv_manager; /* the manager coro */
132static SV *sv_idle; /* $Coro::idle */
133
134static GV *irsgv; /* $/ */
135static GV *stdoutgv; /* *STDOUT */
136static SV *rv_diehook;
137static SV *rv_warnhook;
138static HV *hv_sig; /* %SIG */
139
140/* async_pool helper stuff */
141static SV *sv_pool_rss;
142static SV *sv_pool_size;
143static SV *sv_async_pool_idle; /* description string */
144static AV *av_async_pool; /* idle pool */
145static SV *sv_Coro; /* class string */
146static CV *cv_pool_handler;
147static CV *cv_coro_state_new;
148
149/* Coro::AnyEvent */
150static SV *sv_activity;
151
152/* enable processtime/realtime profiling */
153static char enable_times;
154typedef U32 coro_ts[2];
155static coro_ts time_real, time_cpu;
156static char times_valid;
157
158static struct coro_cctx *cctx_first;
159static int cctx_count, cctx_idle;
160
161enum {
162 CC_MAPPED = 0x01,
163 CC_NOREUSE = 0x02, /* throw this away after tracing */
164 CC_TRACE = 0x04,
165 CC_TRACE_SUB = 0x08, /* trace sub calls */
166 CC_TRACE_LINE = 0x10, /* trace each statement */
167 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
168};
169
170/* this is a structure representing a c-level coroutine */
171typedef struct coro_cctx
172{
173 struct coro_cctx *next;
174
175 /* the stack */
176 void *sptr;
177 size_t ssize;
178
179 /* cpu state */
180 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
181 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
182 JMPENV *top_env;
183 coro_context cctx;
184
185 U32 gen;
186#if CORO_USE_VALGRIND
187 int valgrind_id;
188#endif
189 unsigned char flags;
190} coro_cctx;
191
192coro_cctx *cctx_current; /* the currently running cctx */
193
194/*****************************************************************************/
195
196enum {
197 CF_RUNNING = 0x0001, /* coroutine is running */
198 CF_READY = 0x0002, /* coroutine is ready */
199 CF_NEW = 0x0004, /* has never been switched to */
200 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
201 CF_SUSPENDED = 0x0010, /* coroutine can't be scheduled */
202};
203
204/* the structure where most of the perl state is stored, overlaid on the cxstack */
205typedef struct
206{
207 SV *defsv;
208 AV *defav;
209 SV *errsv;
210 SV *irsgv;
211 HV *hinthv;
212#define VAR(name,type) type name;
213# include "state.h"
214#undef VAR
215} perl_slots;
216
217#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
218
219/* this is a structure representing a perl-level coroutine */
11struct coro { 220struct coro {
12 U8 dowarn; 221 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 222 coro_cctx *cctx;
14 223
15 PERL_SI *curstackinfo; 224 /* ready queue */
16 AV *curstack; 225 struct coro *next_ready;
226
227 /* state data */
228 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 229 AV *mainstack;
18 SV **stack_sp; 230 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 231
41 AV *args; 232 CV *startcv; /* the CV to execute */
233 AV *args; /* data associated with this coroutine (initial args) */
234 int refcnt; /* coroutines are refcounted, yes */
235 int flags; /* CF_ flags */
236 HV *hv; /* the perl hash associated with this coro, if any */
237 void (*on_destroy)(pTHX_ struct coro *coro);
238
239 /* statistics */
240 int usecount; /* number of transfers to this coro */
241
242 /* coro process data */
243 int prio;
244 SV *except; /* exception to be thrown */
245 SV *rouse_cb;
246
247 /* async_pool */
248 SV *saved_deffh;
249 SV *invoke_cb;
250 AV *invoke_av;
251
252 /* on_enter/on_leave */
253 AV *on_enter;
254 AV *on_leave;
255
256 /* times */
257 coro_ts t_cpu, t_real;
258
259 /* linked list */
260 struct coro *next, *prev;
42}; 261};
43 262
44typedef struct coro *Coro__State; 263typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 264typedef struct coro *Coro__State_or_hashref;
46 265
47static HV *padlist_cache; 266/* the following variables are effectively part of the perl context */
267/* and get copied between struct coro and these variables */
268/* the mainr easonw e don't support windows process emulation */
269static struct CoroSLF slf_frame; /* the current slf frame */
48 270
49/* mostly copied from op.c:cv_clone2 */ 271/** Coro ********************************************************************/
50STATIC AV * 272
51clone_padlist (AV *protopadlist) 273#define CORO_PRIO_MAX 3
274#define CORO_PRIO_HIGH 1
275#define CORO_PRIO_NORMAL 0
276#define CORO_PRIO_LOW -1
277#define CORO_PRIO_IDLE -3
278#define CORO_PRIO_MIN -4
279
280/* for Coro.pm */
281static SV *coro_current;
282static SV *coro_readyhook;
283static struct coro *coro_ready [CORO_PRIO_MAX - CORO_PRIO_MIN + 1][2]; /* head|tail */
284static CV *cv_coro_run, *cv_coro_terminate;
285static struct coro *coro_first;
286#define coro_nready coroapi.nready
287
288/** Coro::Select ************************************************************/
289
290static OP *(*coro_old_pp_sselect) (pTHX);
291static SV *coro_select_select;
292
293/* horrible hack, but if it works... */
294static OP *
295coro_pp_sselect (aTHX)
52{ 296{
53 AV *av; 297 dSP;
54 I32 ix; 298 PUSHMARK (SP - 4); /* fake argument list */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 299 XPUSHs (coro_select_select);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 300 PUTBACK;
57 SV **pname = AvARRAY (protopad_name); 301
58 SV **ppad = AvARRAY (protopad); 302 /* entersub is an UNOP, select a LISTOP... keep your fingers crossed */
59 I32 fname = AvFILLp (protopad_name); 303 PL_op->op_flags |= OPf_STACKED;
60 I32 fpad = AvFILLp (protopad); 304 PL_op->op_private = 0;
305 return PL_ppaddr [OP_ENTERSUB](aTHX);
306}
307
308/** lowlevel stuff **********************************************************/
309
310static SV *
311coro_get_sv (pTHX_ const char *name, int create)
312{
313#if PERL_VERSION_ATLEAST (5,10,0)
314 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
315 get_sv (name, create);
316#endif
317 return get_sv (name, create);
318}
319
320static AV *
321coro_get_av (pTHX_ const char *name, int create)
322{
323#if PERL_VERSION_ATLEAST (5,10,0)
324 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
325 get_av (name, create);
326#endif
327 return get_av (name, create);
328}
329
330static HV *
331coro_get_hv (pTHX_ const char *name, int create)
332{
333#if PERL_VERSION_ATLEAST (5,10,0)
334 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
335 get_hv (name, create);
336#endif
337 return get_hv (name, create);
338}
339
340INLINE void
341coro_times_update ()
342{
343#ifdef coro_clock_gettime
344 struct timespec ts;
345
346 ts.tv_sec = ts.tv_nsec = 0;
347 coro_clock_gettime (CORO_CLOCK_THREAD_CPUTIME_ID, &ts);
348 time_cpu [0] = ts.tv_sec; time_cpu [1] = ts.tv_nsec;
349
350 ts.tv_sec = ts.tv_nsec = 0;
351 coro_clock_gettime (CORO_CLOCK_MONOTONIC, &ts);
352 time_real [0] = ts.tv_sec; time_real [1] = ts.tv_nsec;
353#else
354 dTHX;
355 UV tv[2];
356
357 u2time (aTHX_ tv);
358 time_real [0] = tv [0];
359 time_real [1] = tv [1] * 1000;
360#endif
361}
362
363INLINE void
364coro_times_add (struct coro *c)
365{
366 c->t_real [1] += time_real [1];
367 if (c->t_real [1] > 1000000000) { c->t_real [1] -= 1000000000; ++c->t_real [0]; }
368 c->t_real [0] += time_real [0];
369
370 c->t_cpu [1] += time_cpu [1];
371 if (c->t_cpu [1] > 1000000000) { c->t_cpu [1] -= 1000000000; ++c->t_cpu [0]; }
372 c->t_cpu [0] += time_cpu [0];
373}
374
375INLINE void
376coro_times_sub (struct coro *c)
377{
378 if (c->t_real [1] < time_real [1]) { c->t_real [1] += 1000000000; --c->t_real [0]; }
379 c->t_real [1] -= time_real [1];
380 c->t_real [0] -= time_real [0];
381
382 if (c->t_cpu [1] < time_cpu [1]) { c->t_cpu [1] += 1000000000; --c->t_cpu [0]; }
383 c->t_cpu [1] -= time_cpu [1];
384 c->t_cpu [0] -= time_cpu [0];
385}
386
387/*****************************************************************************/
388/* magic glue */
389
390#define CORO_MAGIC_type_cv 26
391#define CORO_MAGIC_type_state PERL_MAGIC_ext
392
393#define CORO_MAGIC_NN(sv, type) \
394 (expect_true (SvMAGIC (sv)->mg_type == type) \
395 ? SvMAGIC (sv) \
396 : mg_find (sv, type))
397
398#define CORO_MAGIC(sv, type) \
399 (expect_true (SvMAGIC (sv)) \
400 ? CORO_MAGIC_NN (sv, type) \
401 : 0)
402
403#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
404#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
405
406INLINE struct coro *
407SvSTATE_ (pTHX_ SV *coro)
408{
409 HV *stash;
410 MAGIC *mg;
411
412 if (SvROK (coro))
413 coro = SvRV (coro);
414
415 if (expect_false (SvTYPE (coro) != SVt_PVHV))
416 croak ("Coro::State object required");
417
418 stash = SvSTASH (coro);
419 if (expect_false (stash != coro_stash && stash != coro_state_stash))
420 {
421 /* very slow, but rare, check */
422 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
423 croak ("Coro::State object required");
424 }
425
426 mg = CORO_MAGIC_state (coro);
427 return (struct coro *)mg->mg_ptr;
428}
429
430#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
431
432/* faster than SvSTATE, but expects a coroutine hv */
433#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
434#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
435
436/*****************************************************************************/
437/* padlist management and caching */
438
439static AV *
440coro_derive_padlist (pTHX_ CV *cv)
441{
442 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 443 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 444
72 newpadlist = newAV (); 445 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 446 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 447#if PERL_VERSION_ATLEAST (5,10,0)
448 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
449#else
450 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
451#endif
452 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
453 --AvFILLp (padlist);
454
455 av_store (newpadlist, 0, SvREFCNT_inc_NN (AvARRAY (padlist)[0]));
75 av_store (newpadlist, 1, (SV *) newpad); 456 av_store (newpadlist, 1, (SV *)newpad);
76 457
77 av = newAV (); /* will be @_ */ 458 return newpadlist;
78 av_extend (av, 0); 459}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 460
82 for (ix = fpad; ix > 0; ix--) 461static void
462free_padlist (pTHX_ AV *padlist)
463{
464 /* may be during global destruction */
465 if (!IN_DESTRUCT)
83 { 466 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 467 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 468
469 while (i > 0) /* special-case index 0 */
86 { 470 {
87 char *name = SvPVX (namesv); /* XXX */ 471 /* we try to be extra-careful here */
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 472 AV *av = (AV *)AvARRAY (padlist)[i--];
89 { /* lexical from outside? */ 473 I32 j = AvFILLp (av);
90 npad[ix] = SvREFCNT_inc (ppad[ix]); 474
91 } 475 while (j >= 0)
92 else 476 SvREFCNT_dec (AvARRAY (av)[j--]);
93 { /* our own lexical */ 477
94 SV *sv; 478 AvFILLp (av) = -1;
95 if (*name == '&') 479 SvREFCNT_dec (av);
96 sv = SvREFCNT_inc (ppad[ix]);
97 else if (*name == '@')
98 sv = (SV *) newAV ();
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 }
107 } 480 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix])) 481
109 { 482 SvREFCNT_dec (AvARRAY (padlist)[0]);
110 npad[ix] = SvREFCNT_inc (ppad[ix]); 483
111 } 484 AvFILLp (padlist) = -1;
485 SvREFCNT_dec ((SV*)padlist);
486 }
487}
488
489static int
490coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
491{
492 AV *padlist;
493 AV *av = (AV *)mg->mg_obj;
494
495 /* perl manages to free our internal AV and _then_ call us */
496 if (IN_DESTRUCT)
497 return 0;
498
499 /* casting is fun. */
500 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
501 free_padlist (aTHX_ padlist);
502
503 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
504
505 return 0;
506}
507
508static MGVTBL coro_cv_vtbl = {
509 0, 0, 0, 0,
510 coro_cv_free
511};
512
513/* the next two functions merely cache the padlists */
514static void
515get_padlist (pTHX_ CV *cv)
516{
517 MAGIC *mg = CORO_MAGIC_cv (cv);
518 AV *av;
519
520 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
521 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
112 else 522 else
113 { 523 {
114 SV *sv = NEWSV (0, 0); 524#if CORO_PREFER_PERL_FUNCTIONS
115 SvPADTMP_on (sv); 525 /* this is probably cleaner? but also slower! */
116 npad[ix] = sv; 526 /* in practise, it seems to be less stable */
117 } 527 CV *cp = Perl_cv_clone (aTHX_ cv);
118 } 528 CvPADLIST (cv) = CvPADLIST (cp);
119 529 CvPADLIST (cp) = 0;
120#if 0 /* NONOTUNDERSTOOD */ 530 SvREFCNT_dec (cp);
121 /* Now that vars are all in place, clone nested closures. */ 531#else
122 532 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif 533#endif
139 534 }
140 return newpadlist;
141} 535}
142 536
143STATIC AV * 537static void
144free_padlist (AV *padlist) 538put_padlist (pTHX_ CV *cv)
145{ 539{
146 /* may be during global destruction */ 540 MAGIC *mg = CORO_MAGIC_cv (cv);
147 if (SvREFCNT(padlist)) 541 AV *av;
542
543 if (expect_false (!mg))
544 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
545
546 av = (AV *)mg->mg_obj;
547
548 if (expect_false (AvFILLp (av) >= AvMAX (av)))
549 av_extend (av, AvFILLp (av) + 1);
550
551 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
552}
553
554/** load & save, init *******************************************************/
555
556static void
557on_enterleave_call (pTHX_ SV *cb);
558
559static void
560load_perl (pTHX_ Coro__State c)
561{
562 perl_slots *slot = c->slot;
563 c->slot = 0;
564
565 PL_mainstack = c->mainstack;
566
567 GvSV (PL_defgv) = slot->defsv;
568 GvAV (PL_defgv) = slot->defav;
569 GvSV (PL_errgv) = slot->errsv;
570 GvSV (irsgv) = slot->irsgv;
571 GvHV (PL_hintgv) = slot->hinthv;
572
573 #define VAR(name,type) PL_ ## name = slot->name;
574 # include "state.h"
575 #undef VAR
576
148 { 577 {
149 I32 i = AvFILLp(padlist); 578 dSP;
150 while (i >= 0) 579
580 CV *cv;
581
582 /* now do the ugly restore mess */
583 while (expect_true (cv = (CV *)POPs))
151 { 584 {
152 SV **svp = av_fetch(padlist, i--, FALSE); 585 put_padlist (aTHX_ cv); /* mark this padlist as available */
153 SV *sv = svp ? *svp : Nullsv; 586 CvDEPTH (cv) = PTR2IV (POPs);
154 if (sv) 587 CvPADLIST (cv) = (AV *)POPs;
155 SvREFCNT_dec(sv);
156 } 588 }
157 589
158 SvREFCNT_dec((SV*)padlist); 590 PUTBACK;
159 } 591 }
160}
161 592
162STATIC AV * 593 slf_frame = c->slf_frame;
163unuse_padlist (AV *padlist) 594 CORO_THROW = c->except;
164{
165 free_padlist (padlist);
166}
167 595
596 if (expect_false (enable_times))
597 {
598 if (expect_false (!times_valid))
599 coro_times_update ();
600
601 coro_times_sub (c);
602 }
603
604 if (expect_false (c->on_enter))
605 {
606 int i;
607
608 for (i = 0; i <= AvFILLp (c->on_enter); ++i)
609 on_enterleave_call (aTHX_ AvARRAY (c->on_enter)[i]);
610 }
611}
612
168static void 613static void
169SAVE(pTHX_ Coro__State c) 614save_perl (pTHX_ Coro__State c)
170{ 615{
616 if (expect_false (c->on_leave))
617 {
618 int i;
619
620 for (i = AvFILLp (c->on_leave); i >= 0; --i)
621 on_enterleave_call (aTHX_ AvARRAY (c->on_leave)[i]);
622 }
623
624 times_valid = 0;
625
626 if (expect_false (enable_times))
627 {
628 coro_times_update (); times_valid = 1;
629 coro_times_add (c);
630 }
631
632 c->except = CORO_THROW;
633 c->slf_frame = slf_frame;
634
171 { 635 {
172 dSP; 636 dSP;
173 I32 cxix = cxstack_ix; 637 I32 cxix = cxstack_ix;
638 PERL_CONTEXT *ccstk = cxstack;
174 PERL_SI *top_si = PL_curstackinfo; 639 PERL_SI *top_si = PL_curstackinfo;
175 PERL_CONTEXT *ccstk = cxstack;
176 640
177 /* 641 /*
178 * the worst thing you can imagine happens first - we have to save 642 * the worst thing you can imagine happens first - we have to save
179 * (and reinitialize) all cv's in the whole callchain :( 643 * (and reinitialize) all cv's in the whole callchain :(
180 */ 644 */
181 645
182 PUSHs (Nullsv); 646 XPUSHs (Nullsv);
183 /* this loop was inspired by pp_caller */ 647 /* this loop was inspired by pp_caller */
184 for (;;) 648 for (;;)
185 { 649 {
186 while (cxix >= 0) 650 while (expect_true (cxix >= 0))
187 { 651 {
188 PERL_CONTEXT *cx = &ccstk[--cxix]; 652 PERL_CONTEXT *cx = &ccstk[cxix--];
189 653
190 if (CxTYPE(cx) == CXt_SUB) 654 if (expect_true (CxTYPE (cx) == CXt_SUB) || expect_false (CxTYPE (cx) == CXt_FORMAT))
191 { 655 {
192 CV *cv = cx->blk_sub.cv; 656 CV *cv = cx->blk_sub.cv;
657
193 if (CvDEPTH(cv)) 658 if (expect_true (CvDEPTH (cv)))
194 { 659 {
195#ifdef USE_THREADS
196 XPUSHs ((SV *)CvOWNER(cv));
197#endif
198 EXTEND (SP, 3); 660 EXTEND (SP, 3);
199 PUSHs ((SV *)CvDEPTH(cv));
200 PUSHs ((SV *)CvPADLIST(cv)); 661 PUSHs ((SV *)CvPADLIST (cv));
662 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
201 PUSHs ((SV *)cv); 663 PUSHs ((SV *)cv);
202 664
203 CvPADLIST(cv) = clone_padlist (CvPADLIST(cv));
204
205 CvDEPTH(cv) = 0; 665 CvDEPTH (cv) = 0;
206#ifdef USE_THREADS 666 get_padlist (aTHX_ cv);
207 CvOWNER(cv) = 0;
208 error must unlock this cv etc.. etc...
209 if you are here wondering about this error message then
210 the reason is that it will not work as advertised yet
211#endif
212 } 667 }
213 } 668 }
214 else if (CxTYPE(cx) == CXt_FORMAT) 669 }
670
671 if (expect_true (top_si->si_type == PERLSI_MAIN))
672 break;
673
674 top_si = top_si->si_prev;
675 ccstk = top_si->si_cxstack;
676 cxix = top_si->si_cxix;
677 }
678
679 PUTBACK;
680 }
681
682 /* allocate some space on the context stack for our purposes */
683 /* we manually unroll here, as usually 2 slots is enough */
684 if (SLOT_COUNT >= 1) CXINC;
685 if (SLOT_COUNT >= 2) CXINC;
686 if (SLOT_COUNT >= 3) CXINC;
687 {
688 unsigned int i;
689 for (i = 3; i < SLOT_COUNT; ++i)
690 CXINC;
691 }
692 cxstack_ix -= SLOT_COUNT; /* undo allocation */
693
694 c->mainstack = PL_mainstack;
695
696 {
697 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
698
699 slot->defav = GvAV (PL_defgv);
700 slot->defsv = DEFSV;
701 slot->errsv = ERRSV;
702 slot->irsgv = GvSV (irsgv);
703 slot->hinthv = GvHV (PL_hintgv);
704
705 #define VAR(name,type) slot->name = PL_ ## name;
706 # include "state.h"
707 #undef VAR
708 }
709}
710
711/*
712 * allocate various perl stacks. This is almost an exact copy
713 * of perl.c:init_stacks, except that it uses less memory
714 * on the (sometimes correct) assumption that coroutines do
715 * not usually need a lot of stackspace.
716 */
717#if CORO_PREFER_PERL_FUNCTIONS
718# define coro_init_stacks(thx) init_stacks ()
719#else
720static void
721coro_init_stacks (pTHX)
722{
723 PL_curstackinfo = new_stackinfo(32, 8);
724 PL_curstackinfo->si_type = PERLSI_MAIN;
725 PL_curstack = PL_curstackinfo->si_stack;
726 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
727
728 PL_stack_base = AvARRAY(PL_curstack);
729 PL_stack_sp = PL_stack_base;
730 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
731
732 New(50,PL_tmps_stack,32,SV*);
733 PL_tmps_floor = -1;
734 PL_tmps_ix = -1;
735 PL_tmps_max = 32;
736
737 New(54,PL_markstack,16,I32);
738 PL_markstack_ptr = PL_markstack;
739 PL_markstack_max = PL_markstack + 16;
740
741#ifdef SET_MARK_OFFSET
742 SET_MARK_OFFSET;
743#endif
744
745 New(54,PL_scopestack,8,I32);
746 PL_scopestack_ix = 0;
747 PL_scopestack_max = 8;
748
749 New(54,PL_savestack,24,ANY);
750 PL_savestack_ix = 0;
751 PL_savestack_max = 24;
752
753#if !PERL_VERSION_ATLEAST (5,10,0)
754 New(54,PL_retstack,4,OP*);
755 PL_retstack_ix = 0;
756 PL_retstack_max = 4;
757#endif
758}
759#endif
760
761/*
762 * destroy the stacks, the callchain etc...
763 */
764static void
765coro_destruct_stacks (pTHX)
766{
767 while (PL_curstackinfo->si_next)
768 PL_curstackinfo = PL_curstackinfo->si_next;
769
770 while (PL_curstackinfo)
771 {
772 PERL_SI *p = PL_curstackinfo->si_prev;
773
774 if (!IN_DESTRUCT)
775 SvREFCNT_dec (PL_curstackinfo->si_stack);
776
777 Safefree (PL_curstackinfo->si_cxstack);
778 Safefree (PL_curstackinfo);
779 PL_curstackinfo = p;
780 }
781
782 Safefree (PL_tmps_stack);
783 Safefree (PL_markstack);
784 Safefree (PL_scopestack);
785 Safefree (PL_savestack);
786#if !PERL_VERSION_ATLEAST (5,10,0)
787 Safefree (PL_retstack);
788#endif
789}
790
791#define CORO_RSS \
792 rss += sizeof (SYM (curstackinfo)); \
793 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
794 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
795 rss += SYM (tmps_max) * sizeof (SV *); \
796 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
797 rss += SYM (scopestack_max) * sizeof (I32); \
798 rss += SYM (savestack_max) * sizeof (ANY);
799
800static size_t
801coro_rss (pTHX_ struct coro *coro)
802{
803 size_t rss = sizeof (*coro);
804
805 if (coro->mainstack)
806 {
807 if (coro->flags & CF_RUNNING)
808 {
809 #define SYM(sym) PL_ ## sym
810 CORO_RSS;
811 #undef SYM
812 }
813 else
814 {
815 #define SYM(sym) coro->slot->sym
816 CORO_RSS;
817 #undef SYM
818 }
819 }
820
821 return rss;
822}
823
824/** coroutine stack handling ************************************************/
825
826static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
827static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
828static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
829
830/* apparently < 5.8.8 */
831#ifndef MgPV_nolen_const
832#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
833 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
834 (const char*)(mg)->mg_ptr)
835#endif
836
837/*
838 * This overrides the default magic get method of %SIG elements.
839 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
840 * and instead of trying to save and restore the hash elements, we just provide
841 * readback here.
842 */
843static int
844coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
845{
846 const char *s = MgPV_nolen_const (mg);
847
848 if (*s == '_')
849 {
850 SV **svp = 0;
851
852 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
853 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
854
855 if (svp)
856 {
857 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
858 return 0;
859 }
860 }
861
862 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
863}
864
865static int
866coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
867{
868 const char *s = MgPV_nolen_const (mg);
869
870 if (*s == '_')
871 {
872 SV **svp = 0;
873
874 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
875 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
876
877 if (svp)
878 {
879 SV *old = *svp;
880 *svp = 0;
881 SvREFCNT_dec (old);
882 return 0;
883 }
884 }
885
886 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
887}
888
889static int
890coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
891{
892 const char *s = MgPV_nolen_const (mg);
893
894 if (*s == '_')
895 {
896 SV **svp = 0;
897
898 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
899 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
900
901 if (svp)
902 {
903 SV *old = *svp;
904 *svp = SvOK (sv) ? newSVsv (sv) : 0;
905 SvREFCNT_dec (old);
906 return 0;
907 }
908 }
909
910 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
911}
912
913static void
914prepare_nop (pTHX_ struct coro_transfer_args *ta)
915{
916 /* kind of mega-hacky, but works */
917 ta->next = ta->prev = (struct coro *)ta;
918}
919
920static int
921slf_check_nop (pTHX_ struct CoroSLF *frame)
922{
923 return 0;
924}
925
926static int
927slf_check_repeat (pTHX_ struct CoroSLF *frame)
928{
929 return 1;
930}
931
932static UNOP coro_setup_op;
933
934static void NOINLINE /* noinline to keep it out of the transfer fast path */
935coro_setup (pTHX_ struct coro *coro)
936{
937 /*
938 * emulate part of the perl startup here.
939 */
940 coro_init_stacks (aTHX);
941
942 PL_runops = RUNOPS_DEFAULT;
943 PL_curcop = &PL_compiling;
944 PL_in_eval = EVAL_NULL;
945 PL_comppad = 0;
946 PL_comppad_name = 0;
947 PL_comppad_name_fill = 0;
948 PL_comppad_name_floor = 0;
949 PL_curpm = 0;
950 PL_curpad = 0;
951 PL_localizing = 0;
952 PL_dirty = 0;
953 PL_restartop = 0;
954#if PERL_VERSION_ATLEAST (5,10,0)
955 PL_parser = 0;
956#endif
957 PL_hints = 0;
958
959 /* recreate the die/warn hooks */
960 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
961 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
962
963 GvSV (PL_defgv) = newSV (0);
964 GvAV (PL_defgv) = coro->args; coro->args = 0;
965 GvSV (PL_errgv) = newSV (0);
966 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
967 GvHV (PL_hintgv) = 0;
968 PL_rs = newSVsv (GvSV (irsgv));
969 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
970
971 {
972 dSP;
973 UNOP myop;
974
975 Zero (&myop, 1, UNOP);
976 myop.op_next = Nullop;
977 myop.op_type = OP_ENTERSUB;
978 myop.op_flags = OPf_WANT_VOID;
979
980 PUSHMARK (SP);
981 PUSHs ((SV *)coro->startcv);
982 PUTBACK;
983 PL_op = (OP *)&myop;
984 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
985 }
986
987 /* this newly created coroutine might be run on an existing cctx which most
988 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
989 */
990 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
991 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
992
993 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
994 coro_setup_op.op_next = PL_op;
995 coro_setup_op.op_type = OP_ENTERSUB;
996 coro_setup_op.op_ppaddr = pp_slf;
997 /* no flags etc. required, as an init function won't be called */
998
999 PL_op = (OP *)&coro_setup_op;
1000
1001 /* copy throw, in case it was set before coro_setup */
1002 CORO_THROW = coro->except;
1003
1004 if (expect_false (enable_times))
1005 {
1006 coro_times_update ();
1007 coro_times_sub (coro);
1008 }
1009}
1010
1011static void
1012coro_unwind_stacks (pTHX)
1013{
1014 if (!IN_DESTRUCT)
1015 {
1016 /* restore all saved variables and stuff */
1017 LEAVE_SCOPE (0);
1018 assert (PL_tmps_floor == -1);
1019
1020 /* free all temporaries */
1021 FREETMPS;
1022 assert (PL_tmps_ix == -1);
1023
1024 /* unwind all extra stacks */
1025 POPSTACK_TO (PL_mainstack);
1026
1027 /* unwind main stack */
1028 dounwind (-1);
1029 }
1030}
1031
1032static void
1033coro_destruct_perl (pTHX_ struct coro *coro)
1034{
1035 SV *svf [9];
1036
1037 {
1038 struct coro *current = SvSTATE_current;
1039
1040 assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack));
1041
1042 save_perl (aTHX_ current);
1043 load_perl (aTHX_ coro);
1044
1045 coro_unwind_stacks (aTHX);
1046 coro_destruct_stacks (aTHX);
1047
1048 // now save some sv's to be free'd later
1049 svf [0] = GvSV (PL_defgv);
1050 svf [1] = (SV *)GvAV (PL_defgv);
1051 svf [2] = GvSV (PL_errgv);
1052 svf [3] = (SV *)PL_defoutgv;
1053 svf [4] = PL_rs;
1054 svf [5] = GvSV (irsgv);
1055 svf [6] = (SV *)GvHV (PL_hintgv);
1056 svf [7] = PL_diehook;
1057 svf [8] = PL_warnhook;
1058 assert (9 == sizeof (svf) / sizeof (*svf));
1059
1060 load_perl (aTHX_ current);
1061 }
1062
1063 {
1064 unsigned int i;
1065
1066 for (i = 0; i < sizeof (svf) / sizeof (*svf); ++i)
1067 SvREFCNT_dec (svf [i]);
1068
1069 SvREFCNT_dec (coro->saved_deffh);
1070 SvREFCNT_dec (coro->rouse_cb);
1071 SvREFCNT_dec (coro->invoke_cb);
1072 SvREFCNT_dec (coro->invoke_av);
1073 }
1074}
1075
1076INLINE void
1077free_coro_mortal (pTHX)
1078{
1079 if (expect_true (coro_mortal))
1080 {
1081 SvREFCNT_dec (coro_mortal);
1082 coro_mortal = 0;
1083 }
1084}
1085
1086static int
1087runops_trace (pTHX)
1088{
1089 COP *oldcop = 0;
1090 int oldcxix = -2;
1091
1092 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
1093 {
1094 PERL_ASYNC_CHECK ();
1095
1096 if (cctx_current->flags & CC_TRACE_ALL)
1097 {
1098 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
1099 {
1100 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1101 SV **bot, **top;
1102 AV *av = newAV (); /* return values */
1103 SV **cb;
1104 dSP;
1105
1106 GV *gv = CvGV (cx->blk_sub.cv);
1107 SV *fullname = sv_2mortal (newSV (0));
1108 if (isGV (gv))
1109 gv_efullname3 (fullname, gv, 0);
1110
1111 bot = PL_stack_base + cx->blk_oldsp + 1;
1112 top = cx->blk_gimme == G_ARRAY ? SP + 1
1113 : cx->blk_gimme == G_SCALAR ? bot + 1
1114 : bot;
1115
1116 av_extend (av, top - bot);
1117 while (bot < top)
1118 av_push (av, SvREFCNT_inc_NN (*bot++));
1119
1120 PL_runops = RUNOPS_DEFAULT;
1121 ENTER;
1122 SAVETMPS;
1123 EXTEND (SP, 3);
1124 PUSHMARK (SP);
1125 PUSHs (&PL_sv_no);
1126 PUSHs (fullname);
1127 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1128 PUTBACK;
1129 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1130 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1131 SPAGAIN;
1132 FREETMPS;
1133 LEAVE;
1134 PL_runops = runops_trace;
1135 }
1136
1137 if (oldcop != PL_curcop)
1138 {
1139 oldcop = PL_curcop;
1140
1141 if (PL_curcop != &PL_compiling)
1142 {
1143 SV **cb;
1144
1145 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1146 {
1147 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1148
1149 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1150 {
1151 dSP;
1152 GV *gv = CvGV (cx->blk_sub.cv);
1153 SV *fullname = sv_2mortal (newSV (0));
1154
1155 if (isGV (gv))
1156 gv_efullname3 (fullname, gv, 0);
1157
1158 PL_runops = RUNOPS_DEFAULT;
1159 ENTER;
1160 SAVETMPS;
1161 EXTEND (SP, 3);
1162 PUSHMARK (SP);
1163 PUSHs (&PL_sv_yes);
1164 PUSHs (fullname);
1165 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1166 PUTBACK;
1167 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1168 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1169 SPAGAIN;
1170 FREETMPS;
1171 LEAVE;
1172 PL_runops = runops_trace;
1173 }
1174
1175 oldcxix = cxstack_ix;
1176 }
1177
1178 if (cctx_current->flags & CC_TRACE_LINE)
1179 {
1180 dSP;
1181
1182 PL_runops = RUNOPS_DEFAULT;
1183 ENTER;
1184 SAVETMPS;
1185 EXTEND (SP, 3);
1186 PL_runops = RUNOPS_DEFAULT;
1187 PUSHMARK (SP);
1188 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1189 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1190 PUTBACK;
1191 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1192 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1193 SPAGAIN;
1194 FREETMPS;
1195 LEAVE;
1196 PL_runops = runops_trace;
1197 }
1198 }
1199 }
1200 }
1201 }
1202
1203 TAINT_NOT;
1204 return 0;
1205}
1206
1207static struct CoroSLF cctx_ssl_frame;
1208
1209static void
1210slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1211{
1212 ta->prev = 0;
1213}
1214
1215static int
1216slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1217{
1218 *frame = cctx_ssl_frame;
1219
1220 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1221}
1222
1223/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1224static void NOINLINE
1225cctx_prepare (pTHX)
1226{
1227 PL_top_env = &PL_start_env;
1228
1229 if (cctx_current->flags & CC_TRACE)
1230 PL_runops = runops_trace;
1231
1232 /* we already must be executing an SLF op, there is no other valid way
1233 * that can lead to creation of a new cctx */
1234 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1235 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1236
1237 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1238 cctx_ssl_frame = slf_frame;
1239
1240 slf_frame.prepare = slf_prepare_set_stacklevel;
1241 slf_frame.check = slf_check_set_stacklevel;
1242}
1243
1244/* the tail of transfer: execute stuff we can only do after a transfer */
1245INLINE void
1246transfer_tail (pTHX)
1247{
1248 free_coro_mortal (aTHX);
1249}
1250
1251/*
1252 * this is a _very_ stripped down perl interpreter ;)
1253 */
1254static void
1255cctx_run (void *arg)
1256{
1257#ifdef USE_ITHREADS
1258# if CORO_PTHREAD
1259 PERL_SET_CONTEXT (coro_thx);
1260# endif
1261#endif
1262 {
1263 dTHX;
1264
1265 /* normally we would need to skip the entersub here */
1266 /* not doing so will re-execute it, which is exactly what we want */
1267 /* PL_nop = PL_nop->op_next */
1268
1269 /* inject a fake subroutine call to cctx_init */
1270 cctx_prepare (aTHX);
1271
1272 /* cctx_run is the alternative tail of transfer() */
1273 transfer_tail (aTHX);
1274
1275 /* somebody or something will hit me for both perl_run and PL_restartop */
1276 PL_restartop = PL_op;
1277 perl_run (PL_curinterp);
1278 /*
1279 * Unfortunately, there is no way to get at the return values of the
1280 * coro body here, as perl_run destroys these
1281 */
1282
1283 /*
1284 * If perl-run returns we assume exit() was being called or the coro
1285 * fell off the end, which seems to be the only valid (non-bug)
1286 * reason for perl_run to return. We try to exit by jumping to the
1287 * bootstrap-time "top" top_env, as we cannot restore the "main"
1288 * coroutine as Coro has no such concept.
1289 * This actually isn't valid with the pthread backend, but OSes requiring
1290 * that backend are too broken to do it in a standards-compliant way.
1291 */
1292 PL_top_env = main_top_env;
1293 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1294 }
1295}
1296
1297static coro_cctx *
1298cctx_new ()
1299{
1300 coro_cctx *cctx;
1301
1302 ++cctx_count;
1303 New (0, cctx, 1, coro_cctx);
1304
1305 cctx->gen = cctx_gen;
1306 cctx->flags = 0;
1307 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1308
1309 return cctx;
1310}
1311
1312/* create a new cctx only suitable as source */
1313static coro_cctx *
1314cctx_new_empty ()
1315{
1316 coro_cctx *cctx = cctx_new ();
1317
1318 cctx->sptr = 0;
1319 coro_create (&cctx->cctx, 0, 0, 0, 0);
1320
1321 return cctx;
1322}
1323
1324/* create a new cctx suitable as destination/running a perl interpreter */
1325static coro_cctx *
1326cctx_new_run ()
1327{
1328 coro_cctx *cctx = cctx_new ();
1329 void *stack_start;
1330 size_t stack_size;
1331
1332#if HAVE_MMAP
1333 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1334 /* mmap supposedly does allocate-on-write for us */
1335 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1336
1337 if (cctx->sptr != (void *)-1)
1338 {
1339 #if CORO_STACKGUARD
1340 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1341 #endif
1342 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1343 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1344 cctx->flags |= CC_MAPPED;
1345 }
1346 else
1347#endif
1348 {
1349 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1350 New (0, cctx->sptr, cctx_stacksize, long);
1351
1352 if (!cctx->sptr)
1353 {
1354 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1355 _exit (EXIT_FAILURE);
1356 }
1357
1358 stack_start = cctx->sptr;
1359 stack_size = cctx->ssize;
1360 }
1361
1362 #if CORO_USE_VALGRIND
1363 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1364 #endif
1365
1366 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1367
1368 return cctx;
1369}
1370
1371static void
1372cctx_destroy (coro_cctx *cctx)
1373{
1374 if (!cctx)
1375 return;
1376
1377 assert (("FATAL: tried to destroy current cctx", cctx != cctx_current));//D temporary?
1378
1379 --cctx_count;
1380 coro_destroy (&cctx->cctx);
1381
1382 /* coro_transfer creates new, empty cctx's */
1383 if (cctx->sptr)
1384 {
1385 #if CORO_USE_VALGRIND
1386 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1387 #endif
1388
1389#if HAVE_MMAP
1390 if (cctx->flags & CC_MAPPED)
1391 munmap (cctx->sptr, cctx->ssize);
1392 else
1393#endif
1394 Safefree (cctx->sptr);
1395 }
1396
1397 Safefree (cctx);
1398}
1399
1400/* wether this cctx should be destructed */
1401#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1402
1403static coro_cctx *
1404cctx_get (pTHX)
1405{
1406 while (expect_true (cctx_first))
1407 {
1408 coro_cctx *cctx = cctx_first;
1409 cctx_first = cctx->next;
1410 --cctx_idle;
1411
1412 if (expect_true (!CCTX_EXPIRED (cctx)))
1413 return cctx;
1414
1415 cctx_destroy (cctx);
1416 }
1417
1418 return cctx_new_run ();
1419}
1420
1421static void
1422cctx_put (coro_cctx *cctx)
1423{
1424 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1425
1426 /* free another cctx if overlimit */
1427 if (expect_false (cctx_idle >= cctx_max_idle))
1428 {
1429 coro_cctx *first = cctx_first;
1430 cctx_first = first->next;
1431 --cctx_idle;
1432
1433 cctx_destroy (first);
1434 }
1435
1436 ++cctx_idle;
1437 cctx->next = cctx_first;
1438 cctx_first = cctx;
1439}
1440
1441/** coroutine switching *****************************************************/
1442
1443static void
1444transfer_check (pTHX_ struct coro *prev, struct coro *next)
1445{
1446 /* TODO: throwing up here is considered harmful */
1447
1448 if (expect_true (prev != next))
1449 {
1450 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1451 croak ("Coro::State::transfer called with a blocked prev Coro::State, but can only transfer from running or new states,");
1452
1453 if (expect_false (next->flags & (CF_RUNNING | CF_DESTROYED | CF_SUSPENDED)))
1454 croak ("Coro::State::transfer called with running, destroyed or suspended next Coro::State, but can only transfer to inactive states,");
1455
1456#if !PERL_VERSION_ATLEAST (5,10,0)
1457 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1458 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1459#endif
1460 }
1461}
1462
1463/* always use the TRANSFER macro */
1464static void NOINLINE /* noinline so we have a fixed stackframe */
1465transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1466{
1467 dSTACKLEVEL;
1468
1469 /* sometimes transfer is only called to set idle_sp */
1470 if (expect_false (!prev))
1471 {
1472 cctx_current->idle_sp = STACKLEVEL;
1473 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1474 }
1475 else if (expect_true (prev != next))
1476 {
1477 coro_cctx *cctx_prev;
1478
1479 if (expect_false (prev->flags & CF_NEW))
1480 {
1481 /* create a new empty/source context */
1482 prev->flags &= ~CF_NEW;
1483 prev->flags |= CF_RUNNING;
1484 }
1485
1486 prev->flags &= ~CF_RUNNING;
1487 next->flags |= CF_RUNNING;
1488
1489 /* first get rid of the old state */
1490 save_perl (aTHX_ prev);
1491
1492 if (expect_false (next->flags & CF_NEW))
1493 {
1494 /* need to start coroutine */
1495 next->flags &= ~CF_NEW;
1496 /* setup coroutine call */
1497 coro_setup (aTHX_ next);
1498 }
1499 else
1500 load_perl (aTHX_ next);
1501
1502 /* possibly untie and reuse the cctx */
1503 if (expect_true (
1504 cctx_current->idle_sp == STACKLEVEL
1505 && !(cctx_current->flags & CC_TRACE)
1506 && !force_cctx
1507 ))
1508 {
1509 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1510 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1511
1512 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get. */
1513 /* without this the next cctx_get might destroy the running cctx while still in use */
1514 if (expect_false (CCTX_EXPIRED (cctx_current)))
1515 if (expect_true (!next->cctx))
1516 next->cctx = cctx_get (aTHX);
1517
1518 cctx_put (cctx_current);
1519 }
1520 else
1521 prev->cctx = cctx_current;
1522
1523 ++next->usecount;
1524
1525 cctx_prev = cctx_current;
1526 cctx_current = expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1527
1528 next->cctx = 0;
1529
1530 if (expect_false (cctx_prev != cctx_current))
1531 {
1532 cctx_prev->top_env = PL_top_env;
1533 PL_top_env = cctx_current->top_env;
1534 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1535 }
1536
1537 transfer_tail (aTHX);
1538 }
1539}
1540
1541#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1542#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1543
1544/** high level stuff ********************************************************/
1545
1546static int
1547coro_state_destroy (pTHX_ struct coro *coro)
1548{
1549 if (coro->flags & CF_DESTROYED)
1550 return 0;
1551
1552 if (coro->on_destroy && !PL_dirty)
1553 coro->on_destroy (aTHX_ coro);
1554
1555 coro->flags |= CF_DESTROYED;
1556
1557 if (coro->flags & CF_READY)
1558 {
1559 /* reduce nready, as destroying a ready coro effectively unreadies it */
1560 /* alternative: look through all ready queues and remove the coro */
1561 --coro_nready;
1562 }
1563 else
1564 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1565
1566 if (coro->mainstack
1567 && coro->mainstack != main_mainstack
1568 && coro->slot
1569 && !PL_dirty)
1570 coro_destruct_perl (aTHX_ coro);
1571
1572 cctx_destroy (coro->cctx);
1573 SvREFCNT_dec (coro->startcv);
1574 SvREFCNT_dec (coro->args);
1575 SvREFCNT_dec (CORO_THROW);
1576
1577 if (coro->next) coro->next->prev = coro->prev;
1578 if (coro->prev) coro->prev->next = coro->next;
1579 if (coro == coro_first) coro_first = coro->next;
1580
1581 return 1;
1582}
1583
1584static int
1585coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1586{
1587 struct coro *coro = (struct coro *)mg->mg_ptr;
1588 mg->mg_ptr = 0;
1589
1590 coro->hv = 0;
1591
1592 if (--coro->refcnt < 0)
1593 {
1594 coro_state_destroy (aTHX_ coro);
1595 Safefree (coro);
1596 }
1597
1598 return 0;
1599}
1600
1601static int
1602coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1603{
1604 struct coro *coro = (struct coro *)mg->mg_ptr;
1605
1606 ++coro->refcnt;
1607
1608 return 0;
1609}
1610
1611static MGVTBL coro_state_vtbl = {
1612 0, 0, 0, 0,
1613 coro_state_free,
1614 0,
1615#ifdef MGf_DUP
1616 coro_state_dup,
1617#else
1618# define MGf_DUP 0
1619#endif
1620};
1621
1622static void
1623prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1624{
1625 ta->prev = SvSTATE (prev_sv);
1626 ta->next = SvSTATE (next_sv);
1627 TRANSFER_CHECK (*ta);
1628}
1629
1630static void
1631api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1632{
1633 struct coro_transfer_args ta;
1634
1635 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1636 TRANSFER (ta, 1);
1637}
1638
1639/** Coro ********************************************************************/
1640
1641INLINE void
1642coro_enq (pTHX_ struct coro *coro)
1643{
1644 struct coro **ready = coro_ready [coro->prio - CORO_PRIO_MIN];
1645
1646 SvREFCNT_inc_NN (coro->hv);
1647
1648 coro->next_ready = 0;
1649 *(ready [0] ? &ready [1]->next_ready : &ready [0]) = coro;
1650 ready [1] = coro;
1651}
1652
1653INLINE struct coro *
1654coro_deq (pTHX)
1655{
1656 int prio;
1657
1658 for (prio = CORO_PRIO_MAX - CORO_PRIO_MIN + 1; --prio >= 0; )
1659 {
1660 struct coro **ready = coro_ready [prio];
1661
1662 if (ready [0])
1663 {
1664 struct coro *coro = ready [0];
1665 ready [0] = coro->next_ready;
1666 return coro;
1667 }
1668 }
1669
1670 return 0;
1671}
1672
1673static void
1674invoke_sv_ready_hook_helper (void)
1675{
1676 dTHX;
1677 dSP;
1678
1679 ENTER;
1680 SAVETMPS;
1681
1682 PUSHMARK (SP);
1683 PUTBACK;
1684 call_sv (coro_readyhook, G_VOID | G_DISCARD);
1685
1686 FREETMPS;
1687 LEAVE;
1688}
1689
1690static int
1691api_ready (pTHX_ SV *coro_sv)
1692{
1693 struct coro *coro = SvSTATE (coro_sv);
1694
1695 if (coro->flags & CF_READY)
1696 return 0;
1697
1698 coro->flags |= CF_READY;
1699
1700 coro_enq (aTHX_ coro);
1701
1702 if (!coro_nready++)
1703 if (coroapi.readyhook)
1704 coroapi.readyhook ();
1705
1706 return 1;
1707}
1708
1709static int
1710api_is_ready (pTHX_ SV *coro_sv)
1711{
1712 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1713}
1714
1715/* expects to own a reference to next->hv */
1716INLINE void
1717prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1718{
1719 SV *prev_sv = SvRV (coro_current);
1720
1721 ta->prev = SvSTATE_hv (prev_sv);
1722 ta->next = next;
1723
1724 TRANSFER_CHECK (*ta);
1725
1726 SvRV_set (coro_current, (SV *)next->hv);
1727
1728 free_coro_mortal (aTHX);
1729 coro_mortal = prev_sv;
1730}
1731
1732static void
1733prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1734{
1735 for (;;)
1736 {
1737 struct coro *next = coro_deq (aTHX);
1738
1739 if (expect_true (next))
1740 {
1741 /* cannot transfer to destroyed coros, skip and look for next */
1742 if (expect_false (next->flags & (CF_DESTROYED | CF_SUSPENDED)))
1743 SvREFCNT_dec (next->hv); /* coro_nready has already been taken care of by destroy */
1744 else
1745 {
1746 next->flags &= ~CF_READY;
1747 --coro_nready;
1748
1749 prepare_schedule_to (aTHX_ ta, next);
1750 break;
1751 }
1752 }
1753 else
1754 {
1755 /* nothing to schedule: call the idle handler */
1756 if (SvROK (sv_idle)
1757 && SvOBJECT (SvRV (sv_idle)))
1758 {
1759 ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */
1760 api_ready (aTHX_ SvRV (sv_idle));
1761 --coro_nready;
1762 }
1763 else
1764 {
1765 dSP;
1766
1767 ENTER;
1768 SAVETMPS;
1769
1770 PUSHMARK (SP);
1771 PUTBACK;
1772 call_sv (sv_idle, G_VOID | G_DISCARD);
1773
1774 FREETMPS;
1775 LEAVE;
1776 }
1777 }
1778 }
1779}
1780
1781INLINE void
1782prepare_cede (pTHX_ struct coro_transfer_args *ta)
1783{
1784 api_ready (aTHX_ coro_current);
1785 prepare_schedule (aTHX_ ta);
1786}
1787
1788INLINE void
1789prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1790{
1791 SV *prev = SvRV (coro_current);
1792
1793 if (coro_nready)
1794 {
1795 prepare_schedule (aTHX_ ta);
1796 api_ready (aTHX_ prev);
1797 }
1798 else
1799 prepare_nop (aTHX_ ta);
1800}
1801
1802static void
1803api_schedule (pTHX)
1804{
1805 struct coro_transfer_args ta;
1806
1807 prepare_schedule (aTHX_ &ta);
1808 TRANSFER (ta, 1);
1809}
1810
1811static void
1812api_schedule_to (pTHX_ SV *coro_sv)
1813{
1814 struct coro_transfer_args ta;
1815 struct coro *next = SvSTATE (coro_sv);
1816
1817 SvREFCNT_inc_NN (coro_sv);
1818 prepare_schedule_to (aTHX_ &ta, next);
1819}
1820
1821static int
1822api_cede (pTHX)
1823{
1824 struct coro_transfer_args ta;
1825
1826 prepare_cede (aTHX_ &ta);
1827
1828 if (expect_true (ta.prev != ta.next))
1829 {
1830 TRANSFER (ta, 1);
1831 return 1;
1832 }
1833 else
1834 return 0;
1835}
1836
1837static int
1838api_cede_notself (pTHX)
1839{
1840 if (coro_nready)
1841 {
1842 struct coro_transfer_args ta;
1843
1844 prepare_cede_notself (aTHX_ &ta);
1845 TRANSFER (ta, 1);
1846 return 1;
1847 }
1848 else
1849 return 0;
1850}
1851
1852static void
1853api_trace (pTHX_ SV *coro_sv, int flags)
1854{
1855 struct coro *coro = SvSTATE (coro_sv);
1856
1857 if (coro->flags & CF_RUNNING)
1858 croak ("cannot enable tracing on a running coroutine, caught");
1859
1860 if (flags & CC_TRACE)
1861 {
1862 if (!coro->cctx)
1863 coro->cctx = cctx_new_run ();
1864 else if (!(coro->cctx->flags & CC_TRACE))
1865 croak ("cannot enable tracing on coroutine with custom stack, caught");
1866
1867 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1868 }
1869 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1870 {
1871 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1872
1873 if (coro->flags & CF_RUNNING)
1874 PL_runops = RUNOPS_DEFAULT;
1875 else
1876 coro->slot->runops = RUNOPS_DEFAULT;
1877 }
1878}
1879
1880static void
1881coro_call_on_destroy (pTHX_ struct coro *coro)
1882{
1883 SV **on_destroyp = hv_fetch (coro->hv, "_on_destroy", sizeof ("_on_destroy") - 1, 0);
1884 SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0);
1885
1886 if (on_destroyp)
1887 {
1888 AV *on_destroy = (AV *)SvRV (*on_destroyp);
1889
1890 while (AvFILLp (on_destroy) >= 0)
1891 {
1892 dSP; /* don't disturb outer sp */
1893 SV *cb = av_pop (on_destroy);
1894
1895 PUSHMARK (SP);
1896
1897 if (statusp)
1898 {
1899 int i;
1900 AV *status = (AV *)SvRV (*statusp);
1901 EXTEND (SP, AvFILLp (status) + 1);
1902
1903 for (i = 0; i <= AvFILLp (status); ++i)
1904 PUSHs (AvARRAY (status)[i]);
1905 }
1906
1907 PUTBACK;
1908 call_sv (sv_2mortal (cb), G_VOID | G_DISCARD);
1909 }
1910 }
1911}
1912
1913static void
1914slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1915{
1916 int i;
1917 HV *hv = (HV *)SvRV (coro_current);
1918 AV *av = newAV ();
1919
1920 av_extend (av, items - 1);
1921 for (i = 0; i < items; ++i)
1922 av_push (av, SvREFCNT_inc_NN (arg [i]));
1923
1924 hv_store (hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0);
1925
1926 av_push (av_destroy, (SV *)newRV_inc ((SV *)hv)); /* RVinc for perl */
1927 api_ready (aTHX_ sv_manager);
1928
1929 frame->prepare = prepare_schedule;
1930 frame->check = slf_check_repeat;
1931
1932 /* as a minor optimisation, we could unwind all stacks here */
1933 /* but that puts extra pressure on pp_slf, and is not worth much */
1934 /*coro_unwind_stacks (aTHX);*/
1935}
1936
1937/*****************************************************************************/
1938/* async pool handler */
1939
1940static int
1941slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
1942{
1943 HV *hv = (HV *)SvRV (coro_current);
1944 struct coro *coro = (struct coro *)frame->data;
1945
1946 if (!coro->invoke_cb)
1947 return 1; /* loop till we have invoke */
1948 else
1949 {
1950 hv_store (hv, "desc", sizeof ("desc") - 1,
1951 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
1952
1953 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
1954
1955 {
1956 dSP;
1957 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
1958 PUTBACK;
1959 }
1960
1961 SvREFCNT_dec (GvAV (PL_defgv));
1962 GvAV (PL_defgv) = coro->invoke_av;
1963 coro->invoke_av = 0;
1964
1965 return 0;
1966 }
1967}
1968
1969static void
1970slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1971{
1972 HV *hv = (HV *)SvRV (coro_current);
1973 struct coro *coro = SvSTATE_hv ((SV *)hv);
1974
1975 if (expect_true (coro->saved_deffh))
1976 {
1977 /* subsequent iteration */
1978 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
1979 coro->saved_deffh = 0;
1980
1981 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
1982 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
1983 {
1984 coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate);
1985 coro->invoke_av = newAV ();
1986
1987 frame->prepare = prepare_nop;
1988 }
1989 else
1990 {
1991 av_clear (GvAV (PL_defgv));
1992 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
1993
1994 coro->prio = 0;
1995
1996 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
1997 api_trace (aTHX_ coro_current, 0);
1998
1999 frame->prepare = prepare_schedule;
2000 av_push (av_async_pool, SvREFCNT_inc (hv));
2001 }
2002 }
2003 else
2004 {
2005 /* first iteration, simply fall through */
2006 frame->prepare = prepare_nop;
2007 }
2008
2009 frame->check = slf_check_pool_handler;
2010 frame->data = (void *)coro;
2011}
2012
2013/*****************************************************************************/
2014/* rouse callback */
2015
2016#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
2017
2018static void
2019coro_rouse_callback (pTHX_ CV *cv)
2020{
2021 dXSARGS;
2022 SV *data = (SV *)S_GENSUB_ARG;
2023
2024 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2025 {
2026 /* first call, set args */
2027 SV *coro = SvRV (data);
2028 AV *av = newAV ();
2029
2030 SvRV_set (data, (SV *)av);
2031
2032 /* better take a full copy of the arguments */
2033 while (items--)
2034 av_store (av, items, newSVsv (ST (items)));
2035
2036 api_ready (aTHX_ coro);
2037 SvREFCNT_dec (coro);
2038 }
2039
2040 XSRETURN_EMPTY;
2041}
2042
2043static int
2044slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
2045{
2046 SV *data = (SV *)frame->data;
2047
2048 if (CORO_THROW)
2049 return 0;
2050
2051 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2052 return 1;
2053
2054 /* now push all results on the stack */
2055 {
2056 dSP;
2057 AV *av = (AV *)SvRV (data);
2058 int i;
2059
2060 EXTEND (SP, AvFILLp (av) + 1);
2061 for (i = 0; i <= AvFILLp (av); ++i)
2062 PUSHs (sv_2mortal (AvARRAY (av)[i]));
2063
2064 /* we have stolen the elements, so set length to zero and free */
2065 AvFILLp (av) = -1;
2066 av_undef (av);
2067
2068 PUTBACK;
2069 }
2070
2071 return 0;
2072}
2073
2074static void
2075slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2076{
2077 SV *cb;
2078
2079 if (items)
2080 cb = arg [0];
2081 else
2082 {
2083 struct coro *coro = SvSTATE_current;
2084
2085 if (!coro->rouse_cb)
2086 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
2087
2088 cb = sv_2mortal (coro->rouse_cb);
2089 coro->rouse_cb = 0;
2090 }
2091
2092 if (!SvROK (cb)
2093 || SvTYPE (SvRV (cb)) != SVt_PVCV
2094 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
2095 croak ("Coro::rouse_wait called with illegal callback argument,");
2096
2097 {
2098 CV *cv = (CV *)SvRV (cb); /* for S_GENSUB_ARG */
2099 SV *data = (SV *)S_GENSUB_ARG;
2100
2101 frame->data = (void *)data;
2102 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2103 frame->check = slf_check_rouse_wait;
2104 }
2105}
2106
2107static SV *
2108coro_new_rouse_cb (pTHX)
2109{
2110 HV *hv = (HV *)SvRV (coro_current);
2111 struct coro *coro = SvSTATE_hv (hv);
2112 SV *data = newRV_inc ((SV *)hv);
2113 SV *cb = s_gensub (aTHX_ coro_rouse_callback, (void *)data);
2114
2115 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2116 SvREFCNT_dec (data); /* magicext increases the refcount */
2117
2118 SvREFCNT_dec (coro->rouse_cb);
2119 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2120
2121 return cb;
2122}
2123
2124/*****************************************************************************/
2125/* schedule-like-function opcode (SLF) */
2126
2127static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2128static const CV *slf_cv;
2129static SV **slf_argv;
2130static int slf_argc, slf_arga; /* count, allocated */
2131static I32 slf_ax; /* top of stack, for restore */
2132
2133/* this restores the stack in the case we patched the entersub, to */
2134/* recreate the stack frame as perl will on following calls */
2135/* since entersub cleared the stack */
2136static OP *
2137pp_restore (pTHX)
2138{
2139 int i;
2140 SV **SP = PL_stack_base + slf_ax;
2141
2142 PUSHMARK (SP);
2143
2144 EXTEND (SP, slf_argc + 1);
2145
2146 for (i = 0; i < slf_argc; ++i)
2147 PUSHs (sv_2mortal (slf_argv [i]));
2148
2149 PUSHs ((SV *)CvGV (slf_cv));
2150
2151 RETURNOP (slf_restore.op_first);
2152}
2153
2154static void
2155slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2156{
2157 SV **arg = (SV **)slf_frame.data;
2158
2159 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2160}
2161
2162static void
2163slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2164{
2165 if (items != 2)
2166 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2167
2168 frame->prepare = slf_prepare_transfer;
2169 frame->check = slf_check_nop;
2170 frame->data = (void *)arg; /* let's hope it will stay valid */
2171}
2172
2173static void
2174slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2175{
2176 frame->prepare = prepare_schedule;
2177 frame->check = slf_check_nop;
2178}
2179
2180static void
2181slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2182{
2183 struct coro *next = (struct coro *)slf_frame.data;
2184
2185 SvREFCNT_inc_NN (next->hv);
2186 prepare_schedule_to (aTHX_ ta, next);
2187}
2188
2189static void
2190slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2191{
2192 if (!items)
2193 croak ("Coro::schedule_to expects a coroutine argument, caught");
2194
2195 frame->data = (void *)SvSTATE (arg [0]);
2196 frame->prepare = slf_prepare_schedule_to;
2197 frame->check = slf_check_nop;
2198}
2199
2200static void
2201slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2202{
2203 api_ready (aTHX_ SvRV (coro_current));
2204
2205 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2206}
2207
2208static void
2209slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2210{
2211 frame->prepare = prepare_cede;
2212 frame->check = slf_check_nop;
2213}
2214
2215static void
2216slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2217{
2218 frame->prepare = prepare_cede_notself;
2219 frame->check = slf_check_nop;
2220}
2221
2222/*
2223 * these not obviously related functions are all rolled into one
2224 * function to increase chances that they all will call transfer with the same
2225 * stack offset
2226 * SLF stands for "schedule-like-function".
2227 */
2228static OP *
2229pp_slf (pTHX)
2230{
2231 I32 checkmark; /* mark SP to see how many elements check has pushed */
2232
2233 /* set up the slf frame, unless it has already been set-up */
2234 /* the latter happens when a new coro has been started */
2235 /* or when a new cctx was attached to an existing coroutine */
2236 if (expect_true (!slf_frame.prepare))
2237 {
2238 /* first iteration */
2239 dSP;
2240 SV **arg = PL_stack_base + TOPMARK + 1;
2241 int items = SP - arg; /* args without function object */
2242 SV *gv = *sp;
2243
2244 /* do a quick consistency check on the "function" object, and if it isn't */
2245 /* for us, divert to the real entersub */
2246 if (SvTYPE (gv) != SVt_PVGV
2247 || !GvCV (gv)
2248 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2249 return PL_ppaddr[OP_ENTERSUB](aTHX);
2250
2251 if (!(PL_op->op_flags & OPf_STACKED))
2252 {
2253 /* ampersand-form of call, use @_ instead of stack */
2254 AV *av = GvAV (PL_defgv);
2255 arg = AvARRAY (av);
2256 items = AvFILLp (av) + 1;
2257 }
2258
2259 /* now call the init function, which needs to set up slf_frame */
2260 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2261 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2262
2263 /* pop args */
2264 SP = PL_stack_base + POPMARK;
2265
2266 PUTBACK;
2267 }
2268
2269 /* now that we have a slf_frame, interpret it! */
2270 /* we use a callback system not to make the code needlessly */
2271 /* complicated, but so we can run multiple perl coros from one cctx */
2272
2273 do
2274 {
2275 struct coro_transfer_args ta;
2276
2277 slf_frame.prepare (aTHX_ &ta);
2278 TRANSFER (ta, 0);
2279
2280 checkmark = PL_stack_sp - PL_stack_base;
2281 }
2282 while (slf_frame.check (aTHX_ &slf_frame));
2283
2284 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2285
2286 /* exception handling */
2287 if (expect_false (CORO_THROW))
2288 {
2289 SV *exception = sv_2mortal (CORO_THROW);
2290
2291 CORO_THROW = 0;
2292 sv_setsv (ERRSV, exception);
2293 croak (0);
2294 }
2295
2296 /* return value handling - mostly like entersub */
2297 /* make sure we put something on the stack in scalar context */
2298 if (GIMME_V == G_SCALAR)
2299 {
2300 dSP;
2301 SV **bot = PL_stack_base + checkmark;
2302
2303 if (sp == bot) /* too few, push undef */
2304 bot [1] = &PL_sv_undef;
2305 else if (sp != bot + 1) /* too many, take last one */
2306 bot [1] = *sp;
2307
2308 SP = bot + 1;
2309
2310 PUTBACK;
2311 }
2312
2313 return NORMAL;
2314}
2315
2316static void
2317api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2318{
2319 int i;
2320 SV **arg = PL_stack_base + ax;
2321 int items = PL_stack_sp - arg + 1;
2322
2323 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2324
2325 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2326 && PL_op->op_ppaddr != pp_slf)
2327 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2328
2329 CvFLAGS (cv) |= CVf_SLF;
2330 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2331 slf_cv = cv;
2332
2333 /* we patch the op, and then re-run the whole call */
2334 /* we have to put the same argument on the stack for this to work */
2335 /* and this will be done by pp_restore */
2336 slf_restore.op_next = (OP *)&slf_restore;
2337 slf_restore.op_type = OP_CUSTOM;
2338 slf_restore.op_ppaddr = pp_restore;
2339 slf_restore.op_first = PL_op;
2340
2341 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2342
2343 if (PL_op->op_flags & OPf_STACKED)
2344 {
2345 if (items > slf_arga)
2346 {
2347 slf_arga = items;
2348 free (slf_argv);
2349 slf_argv = malloc (slf_arga * sizeof (SV *));
2350 }
2351
2352 slf_argc = items;
2353
2354 for (i = 0; i < items; ++i)
2355 slf_argv [i] = SvREFCNT_inc (arg [i]);
2356 }
2357 else
2358 slf_argc = 0;
2359
2360 PL_op->op_ppaddr = pp_slf;
2361 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2362
2363 PL_op = (OP *)&slf_restore;
2364}
2365
2366/*****************************************************************************/
2367/* dynamic wind */
2368
2369static void
2370on_enterleave_call (pTHX_ SV *cb)
2371{
2372 dSP;
2373
2374 PUSHSTACK;
2375
2376 PUSHMARK (SP);
2377 PUTBACK;
2378 call_sv (cb, G_VOID | G_DISCARD);
2379 SPAGAIN;
2380
2381 POPSTACK;
2382}
2383
2384static SV *
2385coro_avp_pop_and_free (pTHX_ AV **avp)
2386{
2387 AV *av = *avp;
2388 SV *res = av_pop (av);
2389
2390 if (AvFILLp (av) < 0)
2391 {
2392 *avp = 0;
2393 SvREFCNT_dec (av);
2394 }
2395
2396 return res;
2397}
2398
2399static void
2400coro_pop_on_enter (pTHX_ void *coro)
2401{
2402 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_enter);
2403 SvREFCNT_dec (cb);
2404}
2405
2406static void
2407coro_pop_on_leave (pTHX_ void *coro)
2408{
2409 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_leave);
2410 on_enterleave_call (aTHX_ sv_2mortal (cb));
2411}
2412
2413/*****************************************************************************/
2414/* PerlIO::cede */
2415
2416typedef struct
2417{
2418 PerlIOBuf base;
2419 NV next, every;
2420} PerlIOCede;
2421
2422static IV
2423PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2424{
2425 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2426
2427 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2428 self->next = nvtime () + self->every;
2429
2430 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2431}
2432
2433static SV *
2434PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2435{
2436 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2437
2438 return newSVnv (self->every);
2439}
2440
2441static IV
2442PerlIOCede_flush (pTHX_ PerlIO *f)
2443{
2444 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2445 double now = nvtime ();
2446
2447 if (now >= self->next)
2448 {
2449 api_cede (aTHX);
2450 self->next = now + self->every;
2451 }
2452
2453 return PerlIOBuf_flush (aTHX_ f);
2454}
2455
2456static PerlIO_funcs PerlIO_cede =
2457{
2458 sizeof(PerlIO_funcs),
2459 "cede",
2460 sizeof(PerlIOCede),
2461 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2462 PerlIOCede_pushed,
2463 PerlIOBuf_popped,
2464 PerlIOBuf_open,
2465 PerlIOBase_binmode,
2466 PerlIOCede_getarg,
2467 PerlIOBase_fileno,
2468 PerlIOBuf_dup,
2469 PerlIOBuf_read,
2470 PerlIOBuf_unread,
2471 PerlIOBuf_write,
2472 PerlIOBuf_seek,
2473 PerlIOBuf_tell,
2474 PerlIOBuf_close,
2475 PerlIOCede_flush,
2476 PerlIOBuf_fill,
2477 PerlIOBase_eof,
2478 PerlIOBase_error,
2479 PerlIOBase_clearerr,
2480 PerlIOBase_setlinebuf,
2481 PerlIOBuf_get_base,
2482 PerlIOBuf_bufsiz,
2483 PerlIOBuf_get_ptr,
2484 PerlIOBuf_get_cnt,
2485 PerlIOBuf_set_ptrcnt,
2486};
2487
2488/*****************************************************************************/
2489/* Coro::Semaphore & Coro::Signal */
2490
2491static SV *
2492coro_waitarray_new (pTHX_ int count)
2493{
2494 /* a waitarray=semaphore contains a counter IV in $sem->[0] and any waiters after that */
2495 AV *av = newAV ();
2496 SV **ary;
2497
2498 /* unfortunately, building manually saves memory */
2499 Newx (ary, 2, SV *);
2500 AvALLOC (av) = ary;
2501#if PERL_VERSION_ATLEAST (5,10,0)
2502 AvARRAY (av) = ary;
2503#else
2504 /* 5.8.8 needs this syntax instead of AvARRAY = ary, yet */
2505 /* -DDEBUGGING flags this as a bug, despite it perfectly working */
2506 SvPVX ((SV *)av) = (char *)ary;
2507#endif
2508 AvMAX (av) = 1;
2509 AvFILLp (av) = 0;
2510 ary [0] = newSViv (count);
2511
2512 return newRV_noinc ((SV *)av);
2513}
2514
2515/* semaphore */
2516
2517static void
2518coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2519{
2520 SV *count_sv = AvARRAY (av)[0];
2521 IV count = SvIVX (count_sv);
2522
2523 count += adjust;
2524 SvIVX (count_sv) = count;
2525
2526 /* now wake up as many waiters as are expected to lock */
2527 while (count > 0 && AvFILLp (av) > 0)
2528 {
2529 SV *cb;
2530
2531 /* swap first two elements so we can shift a waiter */
2532 AvARRAY (av)[0] = AvARRAY (av)[1];
2533 AvARRAY (av)[1] = count_sv;
2534 cb = av_shift (av);
2535
2536 if (SvOBJECT (cb))
2537 {
2538 api_ready (aTHX_ cb);
2539 --count;
2540 }
2541 else if (SvTYPE (cb) == SVt_PVCV)
2542 {
2543 dSP;
2544 PUSHMARK (SP);
2545 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2546 PUTBACK;
2547 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2548 }
2549
2550 SvREFCNT_dec (cb);
2551 }
2552}
2553
2554static void
2555coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2556{
2557 /* call $sem->adjust (0) to possibly wake up some other waiters */
2558 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2559}
2560
2561static int
2562slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2563{
2564 AV *av = (AV *)frame->data;
2565 SV *count_sv = AvARRAY (av)[0];
2566
2567 /* if we are about to throw, don't actually acquire the lock, just throw */
2568 if (CORO_THROW)
2569 return 0;
2570 else if (SvIVX (count_sv) > 0)
2571 {
2572 SvSTATE_current->on_destroy = 0;
2573
2574 if (acquire)
2575 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2576 else
2577 coro_semaphore_adjust (aTHX_ av, 0);
2578
2579 return 0;
2580 }
2581 else
2582 {
2583 int i;
2584 /* if we were woken up but can't down, we look through the whole */
2585 /* waiters list and only add us if we aren't in there already */
2586 /* this avoids some degenerate memory usage cases */
2587
2588 for (i = 1; i <= AvFILLp (av); ++i)
2589 if (AvARRAY (av)[i] == SvRV (coro_current))
2590 return 1;
2591
2592 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2593 return 1;
2594 }
2595}
2596
2597static int
2598slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2599{
2600 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2601}
2602
2603static int
2604slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2605{
2606 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2607}
2608
2609static void
2610slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2611{
2612 AV *av = (AV *)SvRV (arg [0]);
2613
2614 if (SvIVX (AvARRAY (av)[0]) > 0)
2615 {
2616 frame->data = (void *)av;
2617 frame->prepare = prepare_nop;
2618 }
2619 else
2620 {
2621 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2622
2623 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2624 frame->prepare = prepare_schedule;
2625
2626 /* to avoid race conditions when a woken-up coro gets terminated */
2627 /* we arrange for a temporary on_destroy that calls adjust (0) */
2628 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2629 }
2630}
2631
2632static void
2633slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2634{
2635 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2636 frame->check = slf_check_semaphore_down;
2637}
2638
2639static void
2640slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2641{
2642 if (items >= 2)
2643 {
2644 /* callback form */
2645 AV *av = (AV *)SvRV (arg [0]);
2646 SV *cb_cv = s_get_cv_croak (arg [1]);
2647
2648 av_push (av, SvREFCNT_inc_NN (cb_cv));
2649
2650 if (SvIVX (AvARRAY (av)[0]) > 0)
2651 coro_semaphore_adjust (aTHX_ av, 0);
2652
2653 frame->prepare = prepare_nop;
2654 frame->check = slf_check_nop;
2655 }
2656 else
2657 {
2658 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2659 frame->check = slf_check_semaphore_wait;
2660 }
2661}
2662
2663/* signal */
2664
2665static void
2666coro_signal_wake (pTHX_ AV *av, int count)
2667{
2668 SvIVX (AvARRAY (av)[0]) = 0;
2669
2670 /* now signal count waiters */
2671 while (count > 0 && AvFILLp (av) > 0)
2672 {
2673 SV *cb;
2674
2675 /* swap first two elements so we can shift a waiter */
2676 cb = AvARRAY (av)[0];
2677 AvARRAY (av)[0] = AvARRAY (av)[1];
2678 AvARRAY (av)[1] = cb;
2679
2680 cb = av_shift (av);
2681
2682 if (SvTYPE (cb) == SVt_PVCV)
2683 {
2684 dSP;
2685 PUSHMARK (SP);
2686 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2687 PUTBACK;
2688 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2689 }
2690 else
2691 {
2692 api_ready (aTHX_ cb);
2693 sv_setiv (cb, 0); /* signal waiter */
2694 }
2695
2696 SvREFCNT_dec (cb);
2697
2698 --count;
2699 }
2700}
2701
2702static int
2703slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2704{
2705 /* if we are about to throw, also stop waiting */
2706 return SvROK ((SV *)frame->data) && !CORO_THROW;
2707}
2708
2709static void
2710slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2711{
2712 AV *av = (AV *)SvRV (arg [0]);
2713
2714 if (items >= 2)
2715 {
2716 SV *cb_cv = s_get_cv_croak (arg [1]);
2717 av_push (av, SvREFCNT_inc_NN (cb_cv));
2718
2719 if (SvIVX (AvARRAY (av)[0]))
2720 coro_signal_wake (aTHX_ av, 1); /* ust be the only waiter */
2721
2722 frame->prepare = prepare_nop;
2723 frame->check = slf_check_nop;
2724 }
2725 else if (SvIVX (AvARRAY (av)[0]))
2726 {
2727 SvIVX (AvARRAY (av)[0]) = 0;
2728 frame->prepare = prepare_nop;
2729 frame->check = slf_check_nop;
2730 }
2731 else
2732 {
2733 SV *waiter = newSVsv (coro_current); /* owned by signal av */
2734
2735 av_push (av, waiter);
2736
2737 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2738 frame->prepare = prepare_schedule;
2739 frame->check = slf_check_signal_wait;
2740 }
2741}
2742
2743/*****************************************************************************/
2744/* Coro::AIO */
2745
2746#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2747
2748/* helper storage struct */
2749struct io_state
2750{
2751 int errorno;
2752 I32 laststype; /* U16 in 5.10.0 */
2753 int laststatval;
2754 Stat_t statcache;
2755};
2756
2757static void
2758coro_aio_callback (pTHX_ CV *cv)
2759{
2760 dXSARGS;
2761 AV *state = (AV *)S_GENSUB_ARG;
2762 SV *coro = av_pop (state);
2763 SV *data_sv = newSV (sizeof (struct io_state));
2764
2765 av_extend (state, items - 1);
2766
2767 sv_upgrade (data_sv, SVt_PV);
2768 SvCUR_set (data_sv, sizeof (struct io_state));
2769 SvPOK_only (data_sv);
2770
2771 {
2772 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2773
2774 data->errorno = errno;
2775 data->laststype = PL_laststype;
2776 data->laststatval = PL_laststatval;
2777 data->statcache = PL_statcache;
2778 }
2779
2780 /* now build the result vector out of all the parameters and the data_sv */
2781 {
2782 int i;
2783
2784 for (i = 0; i < items; ++i)
2785 av_push (state, SvREFCNT_inc_NN (ST (i)));
2786 }
2787
2788 av_push (state, data_sv);
2789
2790 api_ready (aTHX_ coro);
2791 SvREFCNT_dec (coro);
2792 SvREFCNT_dec ((AV *)state);
2793}
2794
2795static int
2796slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2797{
2798 AV *state = (AV *)frame->data;
2799
2800 /* if we are about to throw, return early */
2801 /* this does not cancel the aio request, but at least */
2802 /* it quickly returns */
2803 if (CORO_THROW)
2804 return 0;
2805
2806 /* one element that is an RV? repeat! */
2807 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2808 return 1;
2809
2810 /* restore status */
2811 {
2812 SV *data_sv = av_pop (state);
2813 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2814
2815 errno = data->errorno;
2816 PL_laststype = data->laststype;
2817 PL_laststatval = data->laststatval;
2818 PL_statcache = data->statcache;
2819
2820 SvREFCNT_dec (data_sv);
2821 }
2822
2823 /* push result values */
2824 {
2825 dSP;
2826 int i;
2827
2828 EXTEND (SP, AvFILLp (state) + 1);
2829 for (i = 0; i <= AvFILLp (state); ++i)
2830 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2831
2832 PUTBACK;
2833 }
2834
2835 return 0;
2836}
2837
2838static void
2839slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2840{
2841 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2842 SV *coro_hv = SvRV (coro_current);
2843 struct coro *coro = SvSTATE_hv (coro_hv);
2844
2845 /* put our coroutine id on the state arg */
2846 av_push (state, SvREFCNT_inc_NN (coro_hv));
2847
2848 /* first see whether we have a non-zero priority and set it as AIO prio */
2849 if (coro->prio)
2850 {
2851 dSP;
2852
2853 static SV *prio_cv;
2854 static SV *prio_sv;
2855
2856 if (expect_false (!prio_cv))
2857 {
2858 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2859 prio_sv = newSViv (0);
2860 }
2861
2862 PUSHMARK (SP);
2863 sv_setiv (prio_sv, coro->prio);
2864 XPUSHs (prio_sv);
2865
2866 PUTBACK;
2867 call_sv (prio_cv, G_VOID | G_DISCARD);
2868 }
2869
2870 /* now call the original request */
2871 {
2872 dSP;
2873 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2874 int i;
2875
2876 PUSHMARK (SP);
2877
2878 /* first push all args to the stack */
2879 EXTEND (SP, items + 1);
2880
2881 for (i = 0; i < items; ++i)
2882 PUSHs (arg [i]);
2883
2884 /* now push the callback closure */
2885 PUSHs (sv_2mortal (s_gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2886
2887 /* now call the AIO function - we assume our request is uncancelable */
2888 PUTBACK;
2889 call_sv ((SV *)req, G_VOID | G_DISCARD);
2890 }
2891
2892 /* now that the requets is going, we loop toll we have a result */
2893 frame->data = (void *)state;
2894 frame->prepare = prepare_schedule;
2895 frame->check = slf_check_aio_req;
2896}
2897
2898static void
2899coro_aio_req_xs (pTHX_ CV *cv)
2900{
2901 dXSARGS;
2902
2903 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2904
2905 XSRETURN_EMPTY;
2906}
2907
2908/*****************************************************************************/
2909
2910#if CORO_CLONE
2911# include "clone.c"
2912#endif
2913
2914MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2915
2916PROTOTYPES: DISABLE
2917
2918BOOT:
2919{
2920#ifdef USE_ITHREADS
2921# if CORO_PTHREAD
2922 coro_thx = PERL_GET_CONTEXT;
2923# endif
2924#endif
2925 BOOT_PAGESIZE;
2926
2927 cctx_current = cctx_new_empty ();
2928
2929 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2930 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2931
2932 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2933 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2934 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
2935
2936 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
2937 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
2938 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
2939
2940 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
2941
2942 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
2943 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
2944 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
2945 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
2946
2947 main_mainstack = PL_mainstack;
2948 main_top_env = PL_top_env;
2949
2950 while (main_top_env->je_prev)
2951 main_top_env = main_top_env->je_prev;
2952
2953 {
2954 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2955
2956 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2957 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
2958
2959 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2960 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
2961 }
2962
2963 coroapi.ver = CORO_API_VERSION;
2964 coroapi.rev = CORO_API_REVISION;
2965
2966 coroapi.transfer = api_transfer;
2967
2968 coroapi.sv_state = SvSTATE_;
2969 coroapi.execute_slf = api_execute_slf;
2970 coroapi.prepare_nop = prepare_nop;
2971 coroapi.prepare_schedule = prepare_schedule;
2972 coroapi.prepare_cede = prepare_cede;
2973 coroapi.prepare_cede_notself = prepare_cede_notself;
2974
2975 {
2976 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
2977
2978 if (!svp) croak ("Time::HiRes is required");
2979 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
2980
2981 nvtime = INT2PTR (double (*)(), SvIV (*svp));
2982
2983 svp = hv_fetch (PL_modglobal, "Time::U2time", 12, 0);
2984 u2time = INT2PTR (void (*)(pTHX_ UV ret[2]), SvIV (*svp));
2985 }
2986
2987 assert (("PRIO_NORMAL must be 0", !CORO_PRIO_NORMAL));
2988}
2989
2990SV *
2991new (char *klass, ...)
2992 ALIAS:
2993 Coro::new = 1
2994 CODE:
2995{
2996 struct coro *coro;
2997 MAGIC *mg;
2998 HV *hv;
2999 SV *cb;
3000 int i;
3001
3002 if (items > 1)
3003 {
3004 cb = s_get_cv_croak (ST (1));
3005
3006 if (!ix)
215 { 3007 {
216 /* I never used formats, so how should I know how these are implemented? */ 3008 if (CvISXSUB (cb))
217 /* my bold guess is as a simple, plain sub... */ 3009 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
218 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 3010
3011 if (!CvROOT (cb))
3012 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
219 } 3013 }
220 } 3014 }
221 3015
222 if (top_si->si_type == PERLSI_MAIN)
223 break;
224
225 top_si = top_si->si_prev;
226 ccstk = top_si->si_cxstack;
227 cxix = top_si->si_cxix;
228 }
229
230 PUTBACK;
231 }
232
233 c->dowarn = PL_dowarn;
234 c->defav = GvAV (PL_defgv);
235 c->curstackinfo = PL_curstackinfo;
236 c->curstack = PL_curstack;
237 c->mainstack = PL_mainstack;
238 c->stack_sp = PL_stack_sp;
239 c->op = PL_op;
240 c->curpad = PL_curpad;
241 c->stack_base = PL_stack_base;
242 c->stack_max = PL_stack_max;
243 c->tmps_stack = PL_tmps_stack;
244 c->tmps_floor = PL_tmps_floor;
245 c->tmps_ix = PL_tmps_ix;
246 c->tmps_max = PL_tmps_max;
247 c->markstack = PL_markstack;
248 c->markstack_ptr = PL_markstack_ptr;
249 c->markstack_max = PL_markstack_max;
250 c->scopestack = PL_scopestack;
251 c->scopestack_ix = PL_scopestack_ix;
252 c->scopestack_max = PL_scopestack_max;
253 c->savestack = PL_savestack;
254 c->savestack_ix = PL_savestack_ix;
255 c->savestack_max = PL_savestack_max;
256 c->retstack = PL_retstack;
257 c->retstack_ix = PL_retstack_ix;
258 c->retstack_max = PL_retstack_max;
259 c->curcop = PL_curcop;
260}
261
262static void
263LOAD(pTHX_ Coro__State c)
264{
265 PL_dowarn = c->dowarn;
266 GvAV (PL_defgv) = c->defav;
267 PL_curstackinfo = c->curstackinfo;
268 PL_curstack = c->curstack;
269 PL_mainstack = c->mainstack;
270 PL_stack_sp = c->stack_sp;
271 PL_op = c->op;
272 PL_curpad = c->curpad;
273 PL_stack_base = c->stack_base;
274 PL_stack_max = c->stack_max;
275 PL_tmps_stack = c->tmps_stack;
276 PL_tmps_floor = c->tmps_floor;
277 PL_tmps_ix = c->tmps_ix;
278 PL_tmps_max = c->tmps_max;
279 PL_markstack = c->markstack;
280 PL_markstack_ptr = c->markstack_ptr;
281 PL_markstack_max = c->markstack_max;
282 PL_scopestack = c->scopestack;
283 PL_scopestack_ix = c->scopestack_ix;
284 PL_scopestack_max = c->scopestack_max;
285 PL_savestack = c->savestack;
286 PL_savestack_ix = c->savestack_ix;
287 PL_savestack_max = c->savestack_max;
288 PL_retstack = c->retstack;
289 PL_retstack_ix = c->retstack_ix;
290 PL_retstack_max = c->retstack_max;
291 PL_curcop = c->curcop;
292
293 {
294 dSP;
295 CV *cv;
296
297 /* now do the ugly restore mess */
298 while ((cv = (CV *)POPs))
299 {
300 AV *padlist = (AV *)POPs;
301
302 unuse_padlist (CvPADLIST(cv));
303 CvPADLIST(cv) = padlist;
304 CvDEPTH(cv) = (I32)POPs;
305
306#ifdef USE_THREADS
307 CvOWNER(cv) = (struct perl_thread *)POPs;
308 error does not work either
309#endif
310 }
311
312 PUTBACK;
313 }
314}
315
316/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
317STATIC void
318S_nuke_stacks(pTHX)
319{
320 while (PL_curstackinfo->si_next)
321 PL_curstackinfo = PL_curstackinfo->si_next;
322 while (PL_curstackinfo) {
323 PERL_SI *p = PL_curstackinfo->si_prev;
324 /* curstackinfo->si_stack got nuked by sv_free_arenas() */
325 Safefree(PL_curstackinfo->si_cxstack);
326 Safefree(PL_curstackinfo);
327 PL_curstackinfo = p;
328 }
329 Safefree(PL_tmps_stack);
330 Safefree(PL_markstack);
331 Safefree(PL_scopestack);
332 Safefree(PL_savestack);
333 Safefree(PL_retstack);
334}
335
336#define SUB_INIT "Coro::State::_newcoro"
337
338MODULE = Coro::State PACKAGE = Coro::State
339
340PROTOTYPES: ENABLE
341
342BOOT:
343 if (!padlist_cache)
344 padlist_cache = newHV ();
345
346Coro::State
347_newprocess(args)
348 SV * args
349 PROTOTYPE: $
350 CODE:
351 Coro__State coro;
352
353 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV)
354 croak ("Coro::State::newprocess expects an arrayref");
355
356 New (0, coro, 1, struct coro); 3016 Newz (0, coro, 1, struct coro);
3017 coro->args = newAV ();
3018 coro->flags = CF_NEW;
357 3019
358 coro->mainstack = 0; /* actual work is done inside transfer */ 3020 if (coro_first) coro_first->prev = coro;
359 coro->args = (AV *)SvREFCNT_inc (SvRV (args)); 3021 coro->next = coro_first;
3022 coro_first = coro;
360 3023
361 RETVAL = coro; 3024 coro->hv = hv = newHV ();
3025 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
3026 mg->mg_flags |= MGf_DUP;
3027 RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1));
3028
3029 if (items > 1)
3030 {
3031 av_extend (coro->args, items - 1 + ix - 1);
3032
3033 if (ix)
3034 {
3035 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
3036 cb = (SV *)cv_coro_run;
3037 }
3038
3039 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
3040
3041 for (i = 2; i < items; i++)
3042 av_push (coro->args, newSVsv (ST (i)));
3043 }
3044}
362 OUTPUT: 3045 OUTPUT:
363 RETVAL 3046 RETVAL
364 3047
365void 3048void
366transfer(prev,next) 3049transfer (...)
367 Coro::State_or_hashref prev 3050 PROTOTYPE: $$
368 Coro::State_or_hashref next 3051 CODE:
369 CODE: 3052 CORO_EXECUTE_SLF_XS (slf_init_transfer);
370 3053
371 if (prev != next) 3054bool
3055_destroy (SV *coro_sv)
3056 CODE:
3057 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
3058 OUTPUT:
3059 RETVAL
3060
3061void
3062_exit (int code)
3063 PROTOTYPE: $
3064 CODE:
3065 _exit (code);
3066
3067SV *
3068clone (Coro::State coro)
3069 CODE:
3070{
3071#if CORO_CLONE
3072 struct coro *ncoro = coro_clone (aTHX_ coro);
3073 MAGIC *mg;
3074 /* TODO: too much duplication */
3075 ncoro->hv = newHV ();
3076 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
3077 mg->mg_flags |= MGf_DUP;
3078 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
3079#else
3080 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
3081#endif
3082}
3083 OUTPUT:
3084 RETVAL
3085
3086int
3087cctx_stacksize (int new_stacksize = 0)
3088 PROTOTYPE: ;$
3089 CODE:
3090 RETVAL = cctx_stacksize;
3091 if (new_stacksize)
372 { 3092 {
373 PUTBACK; 3093 cctx_stacksize = new_stacksize;
374 SAVE (aTHX_ prev); 3094 ++cctx_gen;
375
376 /* 3095 }
377 * this could be done in newprocess which would lead to 3096 OUTPUT:
378 * extremely elegant and fast (just PUTBACK/SAVE/LOAD/SPAGAIN) 3097 RETVAL
379 * code here, but lazy allocation of stacks has also 3098
380 * some virtues and the overhead of the if() is nil. 3099int
3100cctx_max_idle (int max_idle = 0)
3101 PROTOTYPE: ;$
3102 CODE:
3103 RETVAL = cctx_max_idle;
3104 if (max_idle > 1)
3105 cctx_max_idle = max_idle;
3106 OUTPUT:
3107 RETVAL
3108
3109int
3110cctx_count ()
3111 PROTOTYPE:
3112 CODE:
3113 RETVAL = cctx_count;
3114 OUTPUT:
3115 RETVAL
3116
3117int
3118cctx_idle ()
3119 PROTOTYPE:
3120 CODE:
3121 RETVAL = cctx_idle;
3122 OUTPUT:
3123 RETVAL
3124
3125void
3126list ()
3127 PROTOTYPE:
3128 PPCODE:
3129{
3130 struct coro *coro;
3131 for (coro = coro_first; coro; coro = coro->next)
3132 if (coro->hv)
3133 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
3134}
3135
3136void
3137call (Coro::State coro, SV *coderef)
3138 ALIAS:
3139 eval = 1
3140 CODE:
3141{
3142 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
381 */ 3143 {
382 if (next->mainstack) 3144 struct coro *current = SvSTATE_current;
3145
3146 if (current != coro)
383 { 3147 {
384 LOAD (aTHX_ next); 3148 PUTBACK;
385 next->mainstack = 0; /* unnecessary but much cleaner */ 3149 save_perl (aTHX_ current);
3150 load_perl (aTHX_ coro);
386 SPAGAIN; 3151 SPAGAIN;
387 } 3152 }
3153
3154 PUSHSTACK;
3155
3156 PUSHMARK (SP);
3157 PUTBACK;
3158
3159 if (ix)
3160 eval_sv (coderef, 0);
388 else 3161 else
3162 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3163
3164 POPSTACK;
3165 SPAGAIN;
3166
3167 if (current != coro)
389 { 3168 {
390 /* 3169 PUTBACK;
391 * emulate part of the perl startup here. 3170 save_perl (aTHX_ coro);
392 */ 3171 load_perl (aTHX_ current);
393 UNOP myop;
394
395 init_stacks ();
396 PL_op = (OP *)&myop;
397 /*PL_curcop = 0;*/
398 GvAV (PL_defgv) = (SV *)SvREFCNT_inc (next->args);
399
400 SPAGAIN; 3172 SPAGAIN;
401 Zero(&myop, 1, UNOP);
402 myop.op_next = Nullop;
403 myop.op_flags = OPf_WANT_VOID;
404
405 PUSHMARK(SP);
406 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
407 PUTBACK;
408 /*
409 * the next line is slightly wrong, as PL_op->op_next
410 * is actually being executed so we skip the first op.
411 * that doesn't matter, though, since it is only
412 * pp_nextstate and we never return...
413 */
414 PL_op = Perl_pp_entersub(aTHX);
415 SPAGAIN;
416
417 ENTER;
418 } 3173 }
419 } 3174 }
3175}
3176
3177SV *
3178is_ready (Coro::State coro)
3179 PROTOTYPE: $
3180 ALIAS:
3181 is_ready = CF_READY
3182 is_running = CF_RUNNING
3183 is_new = CF_NEW
3184 is_destroyed = CF_DESTROYED
3185 is_suspended = CF_SUSPENDED
3186 CODE:
3187 RETVAL = boolSV (coro->flags & ix);
3188 OUTPUT:
3189 RETVAL
420 3190
421void 3191void
422DESTROY(coro) 3192throw (Coro::State self, SV *throw = &PL_sv_undef)
423 Coro::State coro 3193 PROTOTYPE: $;$
424 CODE: 3194 CODE:
3195{
3196 struct coro *current = SvSTATE_current;
3197 SV **throwp = self == current ? &CORO_THROW : &self->except;
3198 SvREFCNT_dec (*throwp);
3199 SvGETMAGIC (throw);
3200 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
3201}
425 3202
426 if (coro->mainstack) 3203void
3204api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3205 PROTOTYPE: $;$
3206 C_ARGS: aTHX_ coro, flags
3207
3208SV *
3209has_cctx (Coro::State coro)
3210 PROTOTYPE: $
3211 CODE:
3212 /* maybe manage the running flag differently */
3213 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3214 OUTPUT:
3215 RETVAL
3216
3217int
3218is_traced (Coro::State coro)
3219 PROTOTYPE: $
3220 CODE:
3221 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3222 OUTPUT:
3223 RETVAL
3224
3225UV
3226rss (Coro::State coro)
3227 PROTOTYPE: $
3228 ALIAS:
3229 usecount = 1
3230 CODE:
3231 switch (ix)
3232 {
3233 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3234 case 1: RETVAL = coro->usecount; break;
3235 }
3236 OUTPUT:
3237 RETVAL
3238
3239void
3240force_cctx ()
3241 PROTOTYPE:
3242 CODE:
3243 cctx_current->idle_sp = 0;
3244
3245void
3246swap_defsv (Coro::State self)
3247 PROTOTYPE: $
3248 ALIAS:
3249 swap_defav = 1
3250 CODE:
3251 if (!self->slot)
3252 croak ("cannot swap state with coroutine that has no saved state,");
3253 else
427 { 3254 {
428 struct coro temp; 3255 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3256 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
429 3257
3258 SV *tmp = *src; *src = *dst; *dst = tmp;
3259 }
3260
3261void
3262cancel (Coro::State self)
3263 CODE:
3264 coro_state_destroy (aTHX_ self);
3265 coro_call_on_destroy (aTHX_ self); /* actually only for Coro objects */
3266
3267
3268SV *
3269enable_times (int enabled = enable_times)
3270 CODE:
3271{
3272 RETVAL = boolSV (enable_times);
3273
3274 if (enabled != enable_times)
3275 {
3276 enable_times = enabled;
3277
3278 coro_times_update ();
3279 (enabled ? coro_times_sub : coro_times_add)(SvSTATE (coro_current));
3280 }
3281}
3282 OUTPUT:
3283 RETVAL
3284
3285void
3286times (Coro::State self)
3287 PPCODE:
3288{
3289 struct coro *current = SvSTATE (coro_current);
3290
3291 if (expect_false (current == self))
3292 {
3293 coro_times_update ();
3294 coro_times_add (SvSTATE (coro_current));
3295 }
3296
3297 EXTEND (SP, 2);
3298 PUSHs (sv_2mortal (newSVnv (self->t_real [0] + self->t_real [1] * 1e-9)));
3299 PUSHs (sv_2mortal (newSVnv (self->t_cpu [0] + self->t_cpu [1] * 1e-9)));
3300
3301 if (expect_false (current == self))
3302 coro_times_sub (SvSTATE (coro_current));
3303}
3304
3305MODULE = Coro::State PACKAGE = Coro
3306
3307BOOT:
3308{
3309 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3310 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3311 cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD);
3312 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD);
3313 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3314 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3315 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3316 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3317 sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE);
3318
3319 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3320 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3321 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3322 cv_coro_state_new = get_cv ("Coro::State::new", 0); SvREADONLY_on (cv_coro_state_new);
3323
3324 coro_stash = gv_stashpv ("Coro", TRUE);
3325
3326 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (CORO_PRIO_MAX));
3327 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (CORO_PRIO_HIGH));
3328 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (CORO_PRIO_NORMAL));
3329 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (CORO_PRIO_LOW));
3330 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (CORO_PRIO_IDLE));
3331 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (CORO_PRIO_MIN));
3332
3333 {
3334 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3335
3336 coroapi.schedule = api_schedule;
3337 coroapi.schedule_to = api_schedule_to;
3338 coroapi.cede = api_cede;
3339 coroapi.cede_notself = api_cede_notself;
3340 coroapi.ready = api_ready;
3341 coroapi.is_ready = api_is_ready;
3342 coroapi.nready = coro_nready;
3343 coroapi.current = coro_current;
3344
3345 /*GCoroAPI = &coroapi;*/
3346 sv_setiv (sv, (IV)&coroapi);
3347 SvREADONLY_on (sv);
3348 }
3349}
3350
3351void
3352terminate (...)
3353 CODE:
3354 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3355
3356void
3357schedule (...)
3358 CODE:
3359 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3360
3361void
3362schedule_to (...)
3363 CODE:
3364 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3365
3366void
3367cede_to (...)
3368 CODE:
3369 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3370
3371void
3372cede (...)
3373 CODE:
3374 CORO_EXECUTE_SLF_XS (slf_init_cede);
3375
3376void
3377cede_notself (...)
3378 CODE:
3379 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3380
3381void
3382_set_current (SV *current)
3383 PROTOTYPE: $
3384 CODE:
3385 SvREFCNT_dec (SvRV (coro_current));
3386 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3387
3388void
3389_set_readyhook (SV *hook)
3390 PROTOTYPE: $
3391 CODE:
3392 SvREFCNT_dec (coro_readyhook);
3393 SvGETMAGIC (hook);
3394 if (SvOK (hook))
3395 {
3396 coro_readyhook = newSVsv (hook);
3397 CORO_READYHOOK = invoke_sv_ready_hook_helper;
3398 }
3399 else
3400 {
3401 coro_readyhook = 0;
3402 CORO_READYHOOK = 0;
3403 }
3404
3405int
3406prio (Coro::State coro, int newprio = 0)
3407 PROTOTYPE: $;$
3408 ALIAS:
3409 nice = 1
3410 CODE:
3411{
3412 RETVAL = coro->prio;
3413
3414 if (items > 1)
3415 {
3416 if (ix)
3417 newprio = coro->prio - newprio;
3418
3419 if (newprio < CORO_PRIO_MIN) newprio = CORO_PRIO_MIN;
3420 if (newprio > CORO_PRIO_MAX) newprio = CORO_PRIO_MAX;
3421
3422 coro->prio = newprio;
3423 }
3424}
3425 OUTPUT:
3426 RETVAL
3427
3428SV *
3429ready (SV *self)
3430 PROTOTYPE: $
3431 CODE:
3432 RETVAL = boolSV (api_ready (aTHX_ self));
3433 OUTPUT:
3434 RETVAL
3435
3436int
3437nready (...)
3438 PROTOTYPE:
3439 CODE:
3440 RETVAL = coro_nready;
3441 OUTPUT:
3442 RETVAL
3443
3444void
3445suspend (Coro::State self)
3446 PROTOTYPE: $
3447 CODE:
3448 self->flags |= CF_SUSPENDED;
3449
3450void
3451resume (Coro::State self)
3452 PROTOTYPE: $
3453 CODE:
3454 self->flags &= ~CF_SUSPENDED;
3455
3456void
3457_pool_handler (...)
3458 CODE:
3459 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3460
3461void
3462async_pool (SV *cv, ...)
3463 PROTOTYPE: &@
3464 PPCODE:
3465{
3466 HV *hv = (HV *)av_pop (av_async_pool);
3467 AV *av = newAV ();
3468 SV *cb = ST (0);
3469 int i;
3470
3471 av_extend (av, items - 2);
3472 for (i = 1; i < items; ++i)
3473 av_push (av, SvREFCNT_inc_NN (ST (i)));
3474
3475 if ((SV *)hv == &PL_sv_undef)
3476 {
3477 PUSHMARK (SP);
3478 EXTEND (SP, 2);
3479 PUSHs (sv_Coro);
3480 PUSHs ((SV *)cv_pool_handler);
430 PUTBACK; 3481 PUTBACK;
431 SAVE(aTHX_ (&temp)); 3482 call_sv ((SV *)cv_coro_state_new, G_SCALAR);
432 LOAD(aTHX_ coro);
433
434 S_nuke_stacks ();
435 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
436
437 LOAD((&temp));
438 SPAGAIN; 3483 SPAGAIN;
3484
3485 hv = (HV *)SvREFCNT_inc_NN (SvRV (POPs));
439 } 3486 }
440 3487
3488 {
3489 struct coro *coro = SvSTATE_hv (hv);
3490
3491 assert (!coro->invoke_cb);
3492 assert (!coro->invoke_av);
3493 coro->invoke_cb = SvREFCNT_inc (cb);
3494 coro->invoke_av = av;
3495 }
3496
3497 api_ready (aTHX_ (SV *)hv);
3498
3499 if (GIMME_V != G_VOID)
3500 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
3501 else
441 SvREFCNT_dec (coro->args); 3502 SvREFCNT_dec (hv);
442 Safefree (coro); 3503}
443 3504
3505SV *
3506rouse_cb ()
3507 PROTOTYPE:
3508 CODE:
3509 RETVAL = coro_new_rouse_cb (aTHX);
3510 OUTPUT:
3511 RETVAL
444 3512
3513void
3514rouse_wait (...)
3515 PROTOTYPE: ;$
3516 PPCODE:
3517 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
3518
3519void
3520on_enter (SV *block)
3521 ALIAS:
3522 on_leave = 1
3523 PROTOTYPE: &
3524 CODE:
3525{
3526 struct coro *coro = SvSTATE_current;
3527 AV **avp = ix ? &coro->on_leave : &coro->on_enter;
3528
3529 block = s_get_cv_croak (block);
3530
3531 if (!*avp)
3532 *avp = newAV ();
3533
3534 av_push (*avp, SvREFCNT_inc (block));
3535
3536 if (!ix)
3537 on_enterleave_call (aTHX_ block);
3538
3539 LEAVE; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3540 SAVEDESTRUCTOR_X (ix ? coro_pop_on_leave : coro_pop_on_enter, (void *)coro);
3541 ENTER; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3542}
3543
3544
3545MODULE = Coro::State PACKAGE = PerlIO::cede
3546
3547BOOT:
3548 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3549
3550
3551MODULE = Coro::State PACKAGE = Coro::Semaphore
3552
3553SV *
3554new (SV *klass, SV *count = 0)
3555 CODE:
3556{
3557 int semcnt = 1;
3558
3559 if (count)
3560 {
3561 SvGETMAGIC (count);
3562
3563 if (SvOK (count))
3564 semcnt = SvIV (count);
3565 }
3566
3567 RETVAL = sv_bless (
3568 coro_waitarray_new (aTHX_ semcnt),
3569 GvSTASH (CvGV (cv))
3570 );
3571}
3572 OUTPUT:
3573 RETVAL
3574
3575# helper for Coro::Channel and others
3576SV *
3577_alloc (int count)
3578 CODE:
3579 RETVAL = coro_waitarray_new (aTHX_ count);
3580 OUTPUT:
3581 RETVAL
3582
3583SV *
3584count (SV *self)
3585 CODE:
3586 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3587 OUTPUT:
3588 RETVAL
3589
3590void
3591up (SV *self, int adjust = 1)
3592 ALIAS:
3593 adjust = 1
3594 CODE:
3595 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3596
3597void
3598down (...)
3599 CODE:
3600 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3601
3602void
3603wait (...)
3604 CODE:
3605 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3606
3607void
3608try (SV *self)
3609 PPCODE:
3610{
3611 AV *av = (AV *)SvRV (self);
3612 SV *count_sv = AvARRAY (av)[0];
3613 IV count = SvIVX (count_sv);
3614
3615 if (count > 0)
3616 {
3617 --count;
3618 SvIVX (count_sv) = count;
3619 XSRETURN_YES;
3620 }
3621 else
3622 XSRETURN_NO;
3623}
3624
3625void
3626waiters (SV *self)
3627 PPCODE:
3628{
3629 AV *av = (AV *)SvRV (self);
3630 int wcount = AvFILLp (av) + 1 - 1;
3631
3632 if (GIMME_V == G_SCALAR)
3633 XPUSHs (sv_2mortal (newSViv (wcount)));
3634 else
3635 {
3636 int i;
3637 EXTEND (SP, wcount);
3638 for (i = 1; i <= wcount; ++i)
3639 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3640 }
3641}
3642
3643MODULE = Coro::State PACKAGE = Coro::SemaphoreSet
3644
3645void
3646_may_delete (SV *sem, int count, int extra_refs)
3647 PPCODE:
3648{
3649 AV *av = (AV *)SvRV (sem);
3650
3651 if (SvREFCNT ((SV *)av) == 1 + extra_refs
3652 && AvFILLp (av) == 0 /* no waiters, just count */
3653 && SvIV (AvARRAY (av)[0]) == count)
3654 XSRETURN_YES;
3655
3656 XSRETURN_NO;
3657}
3658
3659MODULE = Coro::State PACKAGE = Coro::Signal
3660
3661SV *
3662new (SV *klass)
3663 CODE:
3664 RETVAL = sv_bless (
3665 coro_waitarray_new (aTHX_ 0),
3666 GvSTASH (CvGV (cv))
3667 );
3668 OUTPUT:
3669 RETVAL
3670
3671void
3672wait (...)
3673 CODE:
3674 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
3675
3676void
3677broadcast (SV *self)
3678 CODE:
3679{
3680 AV *av = (AV *)SvRV (self);
3681 coro_signal_wake (aTHX_ av, AvFILLp (av));
3682}
3683
3684void
3685send (SV *self)
3686 CODE:
3687{
3688 AV *av = (AV *)SvRV (self);
3689
3690 if (AvFILLp (av))
3691 coro_signal_wake (aTHX_ av, 1);
3692 else
3693 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3694}
3695
3696IV
3697awaited (SV *self)
3698 CODE:
3699 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3700 OUTPUT:
3701 RETVAL
3702
3703
3704MODULE = Coro::State PACKAGE = Coro::AnyEvent
3705
3706BOOT:
3707 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3708
3709void
3710_schedule (...)
3711 CODE:
3712{
3713 static int incede;
3714
3715 api_cede_notself (aTHX);
3716
3717 ++incede;
3718 while (coro_nready >= incede && api_cede (aTHX))
3719 ;
3720
3721 sv_setsv (sv_activity, &PL_sv_undef);
3722 if (coro_nready >= incede)
3723 {
3724 PUSHMARK (SP);
3725 PUTBACK;
3726 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3727 }
3728
3729 --incede;
3730}
3731
3732
3733MODULE = Coro::State PACKAGE = Coro::AIO
3734
3735void
3736_register (char *target, char *proto, SV *req)
3737 CODE:
3738{
3739 SV *req_cv = s_get_cv_croak (req);
3740 /* newXSproto doesn't return the CV on 5.8 */
3741 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3742 sv_setpv ((SV *)slf_cv, proto);
3743 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3744}
3745
3746MODULE = Coro::State PACKAGE = Coro::Select
3747
3748void
3749patch_pp_sselect ()
3750 CODE:
3751 if (!coro_old_pp_sselect)
3752 {
3753 coro_select_select = (SV *)get_cv ("Coro::Select::select", 0);
3754 coro_old_pp_sselect = PL_ppaddr [OP_SSELECT];
3755 PL_ppaddr [OP_SSELECT] = coro_pp_sselect;
3756 }
3757
3758void
3759unpatch_pp_sselect ()
3760 CODE:
3761 if (coro_old_pp_sselect)
3762 {
3763 PL_ppaddr [OP_SSELECT] = coro_old_pp_sselect;
3764 coro_old_pp_sselect = 0;
3765 }
3766
3767

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines