ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.5 by root, Tue Jul 17 02:55:29 2001 UTC vs.
Revision 1.372 by root, Thu Oct 1 23:50:23 2009 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "schmorp.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
9#endif 24#endif
10 25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
57# undef CORO_STACKGUARD
58#endif
59
60#ifndef CORO_STACKGUARD
61# define CORO_STACKGUARD 0
62#endif
63
64/* prefer perl internal functions over our own? */
65#ifndef CORO_PREFER_PERL_FUNCTIONS
66# define CORO_PREFER_PERL_FUNCTIONS 0
67#endif
68
69/* The next macros try to return the current stack pointer, in an as
70 * portable way as possible. */
71#if __GNUC__ >= 4
72# define dSTACKLEVEL int stacklevel_dummy
73# define STACKLEVEL __builtin_frame_address (0)
74#else
75# define dSTACKLEVEL volatile void *stacklevel
76# define STACKLEVEL ((void *)&stacklevel)
77#endif
78
79#define IN_DESTRUCT PL_dirty
80
81#if __GNUC__ >= 3
82# define attribute(x) __attribute__(x)
83# define expect(expr,value) __builtin_expect ((expr), (value))
84# define INLINE static inline
85#else
86# define attribute(x)
87# define expect(expr,value) (expr)
88# define INLINE static
89#endif
90
91#define expect_false(expr) expect ((expr) != 0, 0)
92#define expect_true(expr) expect ((expr) != 0, 1)
93
94#define NOINLINE attribute ((noinline))
95
96#include "CoroAPI.h"
97#define GCoroAPI (&coroapi) /* very sneaky */
98
99#ifdef USE_ITHREADS
100# if CORO_PTHREAD
101static void *coro_thx;
102# endif
103#endif
104
105#ifdef __linux
106# include <time.h> /* for timespec */
107# include <syscall.h> /* for SYS_* */
108# ifdef SYS_clock_gettime
109# define coro_clock_gettime(id, ts) syscall (SYS_clock_gettime, (id), (ts))
110# define CORO_CLOCK_MONOTONIC 1
111# define CORO_CLOCK_THREAD_CPUTIME_ID 3
112# endif
113#endif
114
115static double (*nvtime)(); /* so why doesn't it take void? */
116static void (*u2time)(pTHX_ UV ret[2]);
117
118/* we hijack an hopefully unused CV flag for our purposes */
119#define CVf_SLF 0x4000
120static OP *pp_slf (pTHX);
121
122static U32 cctx_gen;
123static size_t cctx_stacksize = CORO_STACKSIZE;
124static struct CoroAPI coroapi;
125static AV *main_mainstack; /* used to differentiate between $main and others */
126static JMPENV *main_top_env;
127static HV *coro_state_stash, *coro_stash;
128static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
129
130static AV *av_destroy; /* destruction queue */
131static SV *sv_manager; /* the manager coro */
132static SV *sv_idle; /* $Coro::idle */
133
134static GV *irsgv; /* $/ */
135static GV *stdoutgv; /* *STDOUT */
136static SV *rv_diehook;
137static SV *rv_warnhook;
138static HV *hv_sig; /* %SIG */
139
140/* async_pool helper stuff */
141static SV *sv_pool_rss;
142static SV *sv_pool_size;
143static SV *sv_async_pool_idle; /* description string */
144static AV *av_async_pool; /* idle pool */
145static SV *sv_Coro; /* class string */
146static CV *cv_pool_handler;
147
148/* Coro::AnyEvent */
149static SV *sv_activity;
150
151/* enable processtime/realtime profiling */
152static char enable_times;
153typedef U32 coro_ts[2];
154static coro_ts time_real, time_cpu;
155static char times_valid;
156
157static struct coro_cctx *cctx_first;
158static int cctx_count, cctx_idle;
159
160enum {
161 CC_MAPPED = 0x01,
162 CC_NOREUSE = 0x02, /* throw this away after tracing */
163 CC_TRACE = 0x04,
164 CC_TRACE_SUB = 0x08, /* trace sub calls */
165 CC_TRACE_LINE = 0x10, /* trace each statement */
166 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
167};
168
169/* this is a structure representing a c-level coroutine */
170typedef struct coro_cctx
171{
172 struct coro_cctx *next;
173
174 /* the stack */
175 void *sptr;
176 size_t ssize;
177
178 /* cpu state */
179 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
180 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
181 JMPENV *top_env;
182 coro_context cctx;
183
184 U32 gen;
185#if CORO_USE_VALGRIND
186 int valgrind_id;
187#endif
188 unsigned char flags;
189} coro_cctx;
190
191coro_cctx *cctx_current; /* the currently running cctx */
192
193/*****************************************************************************/
194
195enum {
196 CF_RUNNING = 0x0001, /* coroutine is running */
197 CF_READY = 0x0002, /* coroutine is ready */
198 CF_NEW = 0x0004, /* has never been switched to */
199 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
200 CF_SUSPENDED = 0x0010, /* coroutine can't be scheduled */
201};
202
203/* the structure where most of the perl state is stored, overlaid on the cxstack */
204typedef struct
205{
206 SV *defsv;
207 AV *defav;
208 SV *errsv;
209 SV *irsgv;
210 HV *hinthv;
211#define VAR(name,type) type name;
212# include "state.h"
213#undef VAR
214} perl_slots;
215
216#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
217
218/* this is a structure representing a perl-level coroutine */
11struct coro { 219struct coro {
12 U8 dowarn; 220 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 221 coro_cctx *cctx;
14 222
15 PERL_SI *curstackinfo; 223 /* ready queue */
16 AV *curstack; 224 struct coro *next_ready;
225
226 /* state data */
227 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 228 AV *mainstack;
18 SV **stack_sp; 229 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 230
41 AV *args; 231 CV *startcv; /* the CV to execute */
232 AV *args; /* data associated with this coroutine (initial args) */
233 int refcnt; /* coroutines are refcounted, yes */
234 int flags; /* CF_ flags */
235 HV *hv; /* the perl hash associated with this coro, if any */
236 void (*on_destroy)(pTHX_ struct coro *coro);
237
238 /* statistics */
239 int usecount; /* number of transfers to this coro */
240
241 /* coro process data */
242 int prio;
243 SV *except; /* exception to be thrown */
244 SV *rouse_cb;
245
246 /* async_pool */
247 SV *saved_deffh;
248 SV *invoke_cb;
249 AV *invoke_av;
250
251 /* on_enter/on_leave */
252 AV *on_enter;
253 AV *on_leave;
254
255 /* times */
256 coro_ts t_cpu, t_real;
257
258 /* linked list */
259 struct coro *next, *prev;
42}; 260};
43 261
44typedef struct coro *Coro__State; 262typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 263typedef struct coro *Coro__State_or_hashref;
46 264
47static HV *padlist_cache; 265/* the following variables are effectively part of the perl context */
266/* and get copied between struct coro and these variables */
267/* the mainr easonw e don't support windows process emulation */
268static struct CoroSLF slf_frame; /* the current slf frame */
48 269
49/* mostly copied from op.c:cv_clone2 */ 270/** Coro ********************************************************************/
50STATIC AV * 271
51clone_padlist (AV *protopadlist) 272#define CORO_PRIO_MAX 3
273#define CORO_PRIO_HIGH 1
274#define CORO_PRIO_NORMAL 0
275#define CORO_PRIO_LOW -1
276#define CORO_PRIO_IDLE -3
277#define CORO_PRIO_MIN -4
278
279/* for Coro.pm */
280static SV *coro_current;
281static SV *coro_readyhook;
282static struct coro *coro_ready [CORO_PRIO_MAX - CORO_PRIO_MIN + 1][2]; /* head|tail */
283static CV *cv_coro_run, *cv_coro_terminate;
284static struct coro *coro_first;
285#define coro_nready coroapi.nready
286
287/** Coro::Select ************************************************************/
288
289static OP *(*coro_old_pp_sselect) (pTHX);
290static SV *coro_select_select;
291
292/* horrible hack, but if it works... */
293static OP *
294coro_pp_sselect (pTHX)
52{ 295{
53 AV *av; 296 dSP;
54 I32 ix; 297 PUSHMARK (SP - 4); /* fake argument list */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 298 XPUSHs (coro_select_select);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 299 PUTBACK;
57 SV **pname = AvARRAY (protopad_name); 300
58 SV **ppad = AvARRAY (protopad); 301 /* entersub is an UNOP, select a LISTOP... keep your fingers crossed */
59 I32 fname = AvFILLp (protopad_name); 302 PL_op->op_flags |= OPf_STACKED;
60 I32 fpad = AvFILLp (protopad); 303 PL_op->op_private = 0;
304 return PL_ppaddr [OP_ENTERSUB](aTHX);
305}
306
307/** lowlevel stuff **********************************************************/
308
309static SV *
310coro_get_sv (pTHX_ const char *name, int create)
311{
312#if PERL_VERSION_ATLEAST (5,10,0)
313 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
314 get_sv (name, create);
315#endif
316 return get_sv (name, create);
317}
318
319static AV *
320coro_get_av (pTHX_ const char *name, int create)
321{
322#if PERL_VERSION_ATLEAST (5,10,0)
323 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
324 get_av (name, create);
325#endif
326 return get_av (name, create);
327}
328
329static HV *
330coro_get_hv (pTHX_ const char *name, int create)
331{
332#if PERL_VERSION_ATLEAST (5,10,0)
333 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
334 get_hv (name, create);
335#endif
336 return get_hv (name, create);
337}
338
339INLINE void
340coro_times_update ()
341{
342#ifdef coro_clock_gettime
343 struct timespec ts;
344
345 ts.tv_sec = ts.tv_nsec = 0;
346 coro_clock_gettime (CORO_CLOCK_THREAD_CPUTIME_ID, &ts);
347 time_cpu [0] = ts.tv_sec; time_cpu [1] = ts.tv_nsec;
348
349 ts.tv_sec = ts.tv_nsec = 0;
350 coro_clock_gettime (CORO_CLOCK_MONOTONIC, &ts);
351 time_real [0] = ts.tv_sec; time_real [1] = ts.tv_nsec;
352#else
353 dTHX;
354 UV tv[2];
355
356 u2time (aTHX_ tv);
357 time_real [0] = tv [0];
358 time_real [1] = tv [1] * 1000;
359#endif
360}
361
362INLINE void
363coro_times_add (struct coro *c)
364{
365 c->t_real [1] += time_real [1];
366 if (c->t_real [1] > 1000000000) { c->t_real [1] -= 1000000000; ++c->t_real [0]; }
367 c->t_real [0] += time_real [0];
368
369 c->t_cpu [1] += time_cpu [1];
370 if (c->t_cpu [1] > 1000000000) { c->t_cpu [1] -= 1000000000; ++c->t_cpu [0]; }
371 c->t_cpu [0] += time_cpu [0];
372}
373
374INLINE void
375coro_times_sub (struct coro *c)
376{
377 if (c->t_real [1] < time_real [1]) { c->t_real [1] += 1000000000; --c->t_real [0]; }
378 c->t_real [1] -= time_real [1];
379 c->t_real [0] -= time_real [0];
380
381 if (c->t_cpu [1] < time_cpu [1]) { c->t_cpu [1] += 1000000000; --c->t_cpu [0]; }
382 c->t_cpu [1] -= time_cpu [1];
383 c->t_cpu [0] -= time_cpu [0];
384}
385
386/*****************************************************************************/
387/* magic glue */
388
389#define CORO_MAGIC_type_cv 26
390#define CORO_MAGIC_type_state PERL_MAGIC_ext
391
392#define CORO_MAGIC_NN(sv, type) \
393 (expect_true (SvMAGIC (sv)->mg_type == type) \
394 ? SvMAGIC (sv) \
395 : mg_find (sv, type))
396
397#define CORO_MAGIC(sv, type) \
398 (expect_true (SvMAGIC (sv)) \
399 ? CORO_MAGIC_NN (sv, type) \
400 : 0)
401
402#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
403#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
404
405INLINE struct coro *
406SvSTATE_ (pTHX_ SV *coro)
407{
408 HV *stash;
409 MAGIC *mg;
410
411 if (SvROK (coro))
412 coro = SvRV (coro);
413
414 if (expect_false (SvTYPE (coro) != SVt_PVHV))
415 croak ("Coro::State object required");
416
417 stash = SvSTASH (coro);
418 if (expect_false (stash != coro_stash && stash != coro_state_stash))
419 {
420 /* very slow, but rare, check */
421 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
422 croak ("Coro::State object required");
423 }
424
425 mg = CORO_MAGIC_state (coro);
426 return (struct coro *)mg->mg_ptr;
427}
428
429#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
430
431/* faster than SvSTATE, but expects a coroutine hv */
432#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
433#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
434
435/*****************************************************************************/
436/* padlist management and caching */
437
438static AV *
439coro_derive_padlist (pTHX_ CV *cv)
440{
441 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 442 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 443
72 newpadlist = newAV (); 444 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 445 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 446#if PERL_VERSION_ATLEAST (5,10,0)
447 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
448#else
449 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
450#endif
451 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
452 --AvFILLp (padlist);
453
454 av_store (newpadlist, 0, SvREFCNT_inc_NN (AvARRAY (padlist)[0]));
75 av_store (newpadlist, 1, (SV *) newpad); 455 av_store (newpadlist, 1, (SV *)newpad);
76 456
77 av = newAV (); /* will be @_ */ 457 return newpadlist;
78 av_extend (av, 0); 458}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 459
82 for (ix = fpad; ix > 0; ix--) 460static void
461free_padlist (pTHX_ AV *padlist)
462{
463 /* may be during global destruction */
464 if (!IN_DESTRUCT)
83 { 465 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 466 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 467
468 while (i > 0) /* special-case index 0 */
86 { 469 {
87 char *name = SvPVX (namesv); /* XXX */ 470 /* we try to be extra-careful here */
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 471 AV *av = (AV *)AvARRAY (padlist)[i--];
89 { /* lexical from outside? */ 472 I32 j = AvFILLp (av);
90 npad[ix] = SvREFCNT_inc (ppad[ix]); 473
91 } 474 while (j >= 0)
92 else 475 SvREFCNT_dec (AvARRAY (av)[j--]);
93 { /* our own lexical */ 476
94 SV *sv; 477 AvFILLp (av) = -1;
95 if (*name == '&') 478 SvREFCNT_dec (av);
96 sv = SvREFCNT_inc (ppad[ix]);
97 else if (*name == '@')
98 sv = (SV *) newAV ();
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 }
107 } 479 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix])) 480
109 { 481 SvREFCNT_dec (AvARRAY (padlist)[0]);
110 npad[ix] = SvREFCNT_inc (ppad[ix]); 482
111 } 483 AvFILLp (padlist) = -1;
484 SvREFCNT_dec ((SV*)padlist);
485 }
486}
487
488static int
489coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
490{
491 AV *padlist;
492 AV *av = (AV *)mg->mg_obj;
493
494 /* perl manages to free our internal AV and _then_ call us */
495 if (IN_DESTRUCT)
496 return 0;
497
498 /* casting is fun. */
499 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
500 free_padlist (aTHX_ padlist);
501
502 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
503
504 return 0;
505}
506
507static MGVTBL coro_cv_vtbl = {
508 0, 0, 0, 0,
509 coro_cv_free
510};
511
512/* the next two functions merely cache the padlists */
513static void
514get_padlist (pTHX_ CV *cv)
515{
516 MAGIC *mg = CORO_MAGIC_cv (cv);
517 AV *av;
518
519 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
520 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
112 else 521 else
113 { 522 {
114 SV *sv = NEWSV (0, 0); 523#if CORO_PREFER_PERL_FUNCTIONS
115 SvPADTMP_on (sv); 524 /* this is probably cleaner? but also slower! */
116 npad[ix] = sv; 525 /* in practise, it seems to be less stable */
117 } 526 CV *cp = Perl_cv_clone (aTHX_ cv);
118 } 527 CvPADLIST (cv) = CvPADLIST (cp);
119 528 CvPADLIST (cp) = 0;
120#if 0 /* NONOTUNDERSTOOD */ 529 SvREFCNT_dec (cp);
121 /* Now that vars are all in place, clone nested closures. */ 530#else
122 531 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif 532#endif
139 533 }
140 return newpadlist;
141} 534}
142 535
143STATIC AV * 536static void
144free_padlist (AV *padlist) 537put_padlist (pTHX_ CV *cv)
145{ 538{
146 /* may be during global destruction */ 539 MAGIC *mg = CORO_MAGIC_cv (cv);
147 if (SvREFCNT(padlist)) 540 AV *av;
541
542 if (expect_false (!mg))
543 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
544
545 av = (AV *)mg->mg_obj;
546
547 if (expect_false (AvFILLp (av) >= AvMAX (av)))
548 av_extend (av, AvFILLp (av) + 1);
549
550 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
551}
552
553/** load & save, init *******************************************************/
554
555static void
556on_enterleave_call (pTHX_ SV *cb);
557
558static void
559load_perl (pTHX_ Coro__State c)
560{
561 perl_slots *slot = c->slot;
562 c->slot = 0;
563
564 PL_mainstack = c->mainstack;
565
566 GvSV (PL_defgv) = slot->defsv;
567 GvAV (PL_defgv) = slot->defav;
568 GvSV (PL_errgv) = slot->errsv;
569 GvSV (irsgv) = slot->irsgv;
570 GvHV (PL_hintgv) = slot->hinthv;
571
572 #define VAR(name,type) PL_ ## name = slot->name;
573 # include "state.h"
574 #undef VAR
575
148 { 576 {
149 I32 i = AvFILLp(padlist); 577 dSP;
150 while (i >= 0) 578
579 CV *cv;
580
581 /* now do the ugly restore mess */
582 while (expect_true (cv = (CV *)POPs))
151 { 583 {
152 SV **svp = av_fetch(padlist, i--, FALSE); 584 put_padlist (aTHX_ cv); /* mark this padlist as available */
153 SV *sv = svp ? *svp : Nullsv; 585 CvDEPTH (cv) = PTR2IV (POPs);
154 if (sv) 586 CvPADLIST (cv) = (AV *)POPs;
155 SvREFCNT_dec(sv);
156 } 587 }
157 588
158 SvREFCNT_dec((SV*)padlist); 589 PUTBACK;
159 } 590 }
160}
161 591
162/* the next tow functions merely cache the padlists */ 592 slf_frame = c->slf_frame;
163STATIC void 593 CORO_THROW = c->except;
164get_padlist (CV *cv)
165{
166 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 0);
167 594
168 if (he && AvFILLp ((AV *)*he) >= 0) 595 if (expect_false (enable_times))
169 CvPADLIST (cv) = (AV *)av_pop ((AV *)*he);
170 else
171 CvPADLIST (cv) = clone_padlist (CvPADLIST (cv));
172}
173
174STATIC void
175put_padlist (CV *cv)
176{
177 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 1);
178
179 if (SvTYPE (*he) != SVt_PVAV)
180 {
181 SvREFCNT_dec (*he);
182 *he = (SV *)newAV ();
183 } 596 {
597 if (expect_false (!times_valid))
598 coro_times_update ();
184 599
185 av_push ((AV *)*he, (SV *)CvPADLIST (cv)); 600 coro_times_sub (c);
186} 601 }
187 602
603 if (expect_false (c->on_enter))
604 {
605 int i;
606
607 for (i = 0; i <= AvFILLp (c->on_enter); ++i)
608 on_enterleave_call (aTHX_ AvARRAY (c->on_enter)[i]);
609 }
610}
611
188static void 612static void
189SAVE(pTHX_ Coro__State c) 613save_perl (pTHX_ Coro__State c)
190{ 614{
615 if (expect_false (c->on_leave))
616 {
617 int i;
618
619 for (i = AvFILLp (c->on_leave); i >= 0; --i)
620 on_enterleave_call (aTHX_ AvARRAY (c->on_leave)[i]);
621 }
622
623 times_valid = 0;
624
625 if (expect_false (enable_times))
626 {
627 coro_times_update (); times_valid = 1;
628 coro_times_add (c);
629 }
630
631 c->except = CORO_THROW;
632 c->slf_frame = slf_frame;
633
191 { 634 {
192 dSP; 635 dSP;
193 I32 cxix = cxstack_ix; 636 I32 cxix = cxstack_ix;
637 PERL_CONTEXT *ccstk = cxstack;
194 PERL_SI *top_si = PL_curstackinfo; 638 PERL_SI *top_si = PL_curstackinfo;
195 PERL_CONTEXT *ccstk = cxstack;
196 639
197 /* 640 /*
198 * the worst thing you can imagine happens first - we have to save 641 * the worst thing you can imagine happens first - we have to save
199 * (and reinitialize) all cv's in the whole callchain :( 642 * (and reinitialize) all cv's in the whole callchain :(
200 */ 643 */
201 644
202 PUSHs (Nullsv); 645 XPUSHs (Nullsv);
203 /* this loop was inspired by pp_caller */ 646 /* this loop was inspired by pp_caller */
204 for (;;) 647 for (;;)
205 { 648 {
206 while (cxix >= 0) 649 while (expect_true (cxix >= 0))
207 { 650 {
208 PERL_CONTEXT *cx = &ccstk[cxix--]; 651 PERL_CONTEXT *cx = &ccstk[cxix--];
209 652
210 if (CxTYPE(cx) == CXt_SUB) 653 if (expect_true (CxTYPE (cx) == CXt_SUB) || expect_false (CxTYPE (cx) == CXt_FORMAT))
211 { 654 {
212 CV *cv = cx->blk_sub.cv; 655 CV *cv = cx->blk_sub.cv;
656
213 if (CvDEPTH(cv)) 657 if (expect_true (CvDEPTH (cv)))
214 { 658 {
215#ifdef USE_THREADS
216 XPUSHs ((SV *)CvOWNER(cv));
217#endif
218 EXTEND (SP, 3); 659 EXTEND (SP, 3);
219 PUSHs ((SV *)CvDEPTH(cv));
220 PUSHs ((SV *)CvPADLIST(cv)); 660 PUSHs ((SV *)CvPADLIST (cv));
661 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
221 PUSHs ((SV *)cv); 662 PUSHs ((SV *)cv);
222 663
223 get_padlist (cv);
224
225 CvDEPTH(cv) = 0; 664 CvDEPTH (cv) = 0;
226#ifdef USE_THREADS 665 get_padlist (aTHX_ cv);
227 CvOWNER(cv) = 0;
228 error must unlock this cv etc.. etc...
229 if you are here wondering about this error message then
230 the reason is that it will not work as advertised yet
231#endif
232 } 666 }
233 } 667 }
234 else if (CxTYPE(cx) == CXt_FORMAT) 668 }
669
670 if (expect_true (top_si->si_type == PERLSI_MAIN))
671 break;
672
673 top_si = top_si->si_prev;
674 ccstk = top_si->si_cxstack;
675 cxix = top_si->si_cxix;
676 }
677
678 PUTBACK;
679 }
680
681 /* allocate some space on the context stack for our purposes */
682 /* we manually unroll here, as usually 2 slots is enough */
683 if (SLOT_COUNT >= 1) CXINC;
684 if (SLOT_COUNT >= 2) CXINC;
685 if (SLOT_COUNT >= 3) CXINC;
686 {
687 unsigned int i;
688 for (i = 3; i < SLOT_COUNT; ++i)
689 CXINC;
690 }
691 cxstack_ix -= SLOT_COUNT; /* undo allocation */
692
693 c->mainstack = PL_mainstack;
694
695 {
696 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
697
698 slot->defav = GvAV (PL_defgv);
699 slot->defsv = DEFSV;
700 slot->errsv = ERRSV;
701 slot->irsgv = GvSV (irsgv);
702 slot->hinthv = GvHV (PL_hintgv);
703
704 #define VAR(name,type) slot->name = PL_ ## name;
705 # include "state.h"
706 #undef VAR
707 }
708}
709
710/*
711 * allocate various perl stacks. This is almost an exact copy
712 * of perl.c:init_stacks, except that it uses less memory
713 * on the (sometimes correct) assumption that coroutines do
714 * not usually need a lot of stackspace.
715 */
716#if CORO_PREFER_PERL_FUNCTIONS
717# define coro_init_stacks(thx) init_stacks ()
718#else
719static void
720coro_init_stacks (pTHX)
721{
722 PL_curstackinfo = new_stackinfo(32, 8);
723 PL_curstackinfo->si_type = PERLSI_MAIN;
724 PL_curstack = PL_curstackinfo->si_stack;
725 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
726
727 PL_stack_base = AvARRAY(PL_curstack);
728 PL_stack_sp = PL_stack_base;
729 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
730
731 New(50,PL_tmps_stack,32,SV*);
732 PL_tmps_floor = -1;
733 PL_tmps_ix = -1;
734 PL_tmps_max = 32;
735
736 New(54,PL_markstack,16,I32);
737 PL_markstack_ptr = PL_markstack;
738 PL_markstack_max = PL_markstack + 16;
739
740#ifdef SET_MARK_OFFSET
741 SET_MARK_OFFSET;
742#endif
743
744 New(54,PL_scopestack,8,I32);
745 PL_scopestack_ix = 0;
746 PL_scopestack_max = 8;
747
748 New(54,PL_savestack,24,ANY);
749 PL_savestack_ix = 0;
750 PL_savestack_max = 24;
751
752#if !PERL_VERSION_ATLEAST (5,10,0)
753 New(54,PL_retstack,4,OP*);
754 PL_retstack_ix = 0;
755 PL_retstack_max = 4;
756#endif
757}
758#endif
759
760/*
761 * destroy the stacks, the callchain etc...
762 */
763static void
764coro_destruct_stacks (pTHX)
765{
766 while (PL_curstackinfo->si_next)
767 PL_curstackinfo = PL_curstackinfo->si_next;
768
769 while (PL_curstackinfo)
770 {
771 PERL_SI *p = PL_curstackinfo->si_prev;
772
773 if (!IN_DESTRUCT)
774 SvREFCNT_dec (PL_curstackinfo->si_stack);
775
776 Safefree (PL_curstackinfo->si_cxstack);
777 Safefree (PL_curstackinfo);
778 PL_curstackinfo = p;
779 }
780
781 Safefree (PL_tmps_stack);
782 Safefree (PL_markstack);
783 Safefree (PL_scopestack);
784 Safefree (PL_savestack);
785#if !PERL_VERSION_ATLEAST (5,10,0)
786 Safefree (PL_retstack);
787#endif
788}
789
790#define CORO_RSS \
791 rss += sizeof (SYM (curstackinfo)); \
792 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
793 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
794 rss += SYM (tmps_max) * sizeof (SV *); \
795 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
796 rss += SYM (scopestack_max) * sizeof (I32); \
797 rss += SYM (savestack_max) * sizeof (ANY);
798
799static size_t
800coro_rss (pTHX_ struct coro *coro)
801{
802 size_t rss = sizeof (*coro);
803
804 if (coro->mainstack)
805 {
806 if (coro->flags & CF_RUNNING)
807 {
808 #define SYM(sym) PL_ ## sym
809 CORO_RSS;
810 #undef SYM
811 }
812 else
813 {
814 #define SYM(sym) coro->slot->sym
815 CORO_RSS;
816 #undef SYM
817 }
818 }
819
820 return rss;
821}
822
823/** coroutine stack handling ************************************************/
824
825static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
826static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
827static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
828
829/* apparently < 5.8.8 */
830#ifndef MgPV_nolen_const
831#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
832 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
833 (const char*)(mg)->mg_ptr)
834#endif
835
836/*
837 * This overrides the default magic get method of %SIG elements.
838 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
839 * and instead of trying to save and restore the hash elements, we just provide
840 * readback here.
841 */
842static int
843coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
844{
845 const char *s = MgPV_nolen_const (mg);
846
847 if (*s == '_')
848 {
849 SV **svp = 0;
850
851 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
852 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
853
854 if (svp)
855 {
856 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
857 return 0;
858 }
859 }
860
861 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
862}
863
864static int
865coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
866{
867 const char *s = MgPV_nolen_const (mg);
868
869 if (*s == '_')
870 {
871 SV **svp = 0;
872
873 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
874 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
875
876 if (svp)
877 {
878 SV *old = *svp;
879 *svp = 0;
880 SvREFCNT_dec (old);
881 return 0;
882 }
883 }
884
885 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
886}
887
888static int
889coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
890{
891 const char *s = MgPV_nolen_const (mg);
892
893 if (*s == '_')
894 {
895 SV **svp = 0;
896
897 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
898 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
899
900 if (svp)
901 {
902 SV *old = *svp;
903 *svp = SvOK (sv) ? newSVsv (sv) : 0;
904 SvREFCNT_dec (old);
905 return 0;
906 }
907 }
908
909 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
910}
911
912static void
913prepare_nop (pTHX_ struct coro_transfer_args *ta)
914{
915 /* kind of mega-hacky, but works */
916 ta->next = ta->prev = (struct coro *)ta;
917}
918
919static int
920slf_check_nop (pTHX_ struct CoroSLF *frame)
921{
922 return 0;
923}
924
925static int
926slf_check_repeat (pTHX_ struct CoroSLF *frame)
927{
928 return 1;
929}
930
931static UNOP coro_setup_op;
932
933static void NOINLINE /* noinline to keep it out of the transfer fast path */
934coro_setup (pTHX_ struct coro *coro)
935{
936 /*
937 * emulate part of the perl startup here.
938 */
939 coro_init_stacks (aTHX);
940
941 PL_runops = RUNOPS_DEFAULT;
942 PL_curcop = &PL_compiling;
943 PL_in_eval = EVAL_NULL;
944 PL_comppad = 0;
945 PL_comppad_name = 0;
946 PL_comppad_name_fill = 0;
947 PL_comppad_name_floor = 0;
948 PL_curpm = 0;
949 PL_curpad = 0;
950 PL_localizing = 0;
951 PL_dirty = 0;
952 PL_restartop = 0;
953#if PERL_VERSION_ATLEAST (5,10,0)
954 PL_parser = 0;
955#endif
956 PL_hints = 0;
957
958 /* recreate the die/warn hooks */
959 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
960 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
961
962 GvSV (PL_defgv) = newSV (0);
963 GvAV (PL_defgv) = coro->args; coro->args = 0;
964 GvSV (PL_errgv) = newSV (0);
965 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
966 GvHV (PL_hintgv) = 0;
967 PL_rs = newSVsv (GvSV (irsgv));
968 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
969
970 {
971 dSP;
972 UNOP myop;
973
974 Zero (&myop, 1, UNOP);
975 myop.op_next = Nullop;
976 myop.op_type = OP_ENTERSUB;
977 myop.op_flags = OPf_WANT_VOID;
978
979 PUSHMARK (SP);
980 PUSHs ((SV *)coro->startcv);
981 PUTBACK;
982 PL_op = (OP *)&myop;
983 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
984 }
985
986 /* this newly created coroutine might be run on an existing cctx which most
987 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
988 */
989 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
990 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
991
992 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
993 coro_setup_op.op_next = PL_op;
994 coro_setup_op.op_type = OP_ENTERSUB;
995 coro_setup_op.op_ppaddr = pp_slf;
996 /* no flags etc. required, as an init function won't be called */
997
998 PL_op = (OP *)&coro_setup_op;
999
1000 /* copy throw, in case it was set before coro_setup */
1001 CORO_THROW = coro->except;
1002
1003 if (expect_false (enable_times))
1004 {
1005 coro_times_update ();
1006 coro_times_sub (coro);
1007 }
1008}
1009
1010static void
1011coro_unwind_stacks (pTHX)
1012{
1013 if (!IN_DESTRUCT)
1014 {
1015 /* restore all saved variables and stuff */
1016 LEAVE_SCOPE (0);
1017 assert (PL_tmps_floor == -1);
1018
1019 /* free all temporaries */
1020 FREETMPS;
1021 assert (PL_tmps_ix == -1);
1022
1023 /* unwind all extra stacks */
1024 POPSTACK_TO (PL_mainstack);
1025
1026 /* unwind main stack */
1027 dounwind (-1);
1028 }
1029}
1030
1031static void
1032coro_destruct_perl (pTHX_ struct coro *coro)
1033{
1034 SV *svf [9];
1035
1036 {
1037 struct coro *current = SvSTATE_current;
1038
1039 assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack));
1040
1041 save_perl (aTHX_ current);
1042 load_perl (aTHX_ coro);
1043
1044 coro_unwind_stacks (aTHX);
1045 coro_destruct_stacks (aTHX);
1046
1047 // now save some sv's to be free'd later
1048 svf [0] = GvSV (PL_defgv);
1049 svf [1] = (SV *)GvAV (PL_defgv);
1050 svf [2] = GvSV (PL_errgv);
1051 svf [3] = (SV *)PL_defoutgv;
1052 svf [4] = PL_rs;
1053 svf [5] = GvSV (irsgv);
1054 svf [6] = (SV *)GvHV (PL_hintgv);
1055 svf [7] = PL_diehook;
1056 svf [8] = PL_warnhook;
1057 assert (9 == sizeof (svf) / sizeof (*svf));
1058
1059 load_perl (aTHX_ current);
1060 }
1061
1062 {
1063 unsigned int i;
1064
1065 for (i = 0; i < sizeof (svf) / sizeof (*svf); ++i)
1066 SvREFCNT_dec (svf [i]);
1067
1068 SvREFCNT_dec (coro->saved_deffh);
1069 SvREFCNT_dec (coro->rouse_cb);
1070 SvREFCNT_dec (coro->invoke_cb);
1071 SvREFCNT_dec (coro->invoke_av);
1072 }
1073}
1074
1075INLINE void
1076free_coro_mortal (pTHX)
1077{
1078 if (expect_true (coro_mortal))
1079 {
1080 SvREFCNT_dec (coro_mortal);
1081 coro_mortal = 0;
1082 }
1083}
1084
1085static int
1086runops_trace (pTHX)
1087{
1088 COP *oldcop = 0;
1089 int oldcxix = -2;
1090
1091 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
1092 {
1093 PERL_ASYNC_CHECK ();
1094
1095 if (cctx_current->flags & CC_TRACE_ALL)
1096 {
1097 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
1098 {
1099 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1100 SV **bot, **top;
1101 AV *av = newAV (); /* return values */
1102 SV **cb;
1103 dSP;
1104
1105 GV *gv = CvGV (cx->blk_sub.cv);
1106 SV *fullname = sv_2mortal (newSV (0));
1107 if (isGV (gv))
1108 gv_efullname3 (fullname, gv, 0);
1109
1110 bot = PL_stack_base + cx->blk_oldsp + 1;
1111 top = cx->blk_gimme == G_ARRAY ? SP + 1
1112 : cx->blk_gimme == G_SCALAR ? bot + 1
1113 : bot;
1114
1115 av_extend (av, top - bot);
1116 while (bot < top)
1117 av_push (av, SvREFCNT_inc_NN (*bot++));
1118
1119 PL_runops = RUNOPS_DEFAULT;
1120 ENTER;
1121 SAVETMPS;
1122 EXTEND (SP, 3);
1123 PUSHMARK (SP);
1124 PUSHs (&PL_sv_no);
1125 PUSHs (fullname);
1126 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1127 PUTBACK;
1128 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1129 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1130 SPAGAIN;
1131 FREETMPS;
1132 LEAVE;
1133 PL_runops = runops_trace;
1134 }
1135
1136 if (oldcop != PL_curcop)
1137 {
1138 oldcop = PL_curcop;
1139
1140 if (PL_curcop != &PL_compiling)
1141 {
1142 SV **cb;
1143
1144 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1145 {
1146 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1147
1148 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1149 {
1150 dSP;
1151 GV *gv = CvGV (cx->blk_sub.cv);
1152 SV *fullname = sv_2mortal (newSV (0));
1153
1154 if (isGV (gv))
1155 gv_efullname3 (fullname, gv, 0);
1156
1157 PL_runops = RUNOPS_DEFAULT;
1158 ENTER;
1159 SAVETMPS;
1160 EXTEND (SP, 3);
1161 PUSHMARK (SP);
1162 PUSHs (&PL_sv_yes);
1163 PUSHs (fullname);
1164 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1165 PUTBACK;
1166 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1167 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1168 SPAGAIN;
1169 FREETMPS;
1170 LEAVE;
1171 PL_runops = runops_trace;
1172 }
1173
1174 oldcxix = cxstack_ix;
1175 }
1176
1177 if (cctx_current->flags & CC_TRACE_LINE)
1178 {
1179 dSP;
1180
1181 PL_runops = RUNOPS_DEFAULT;
1182 ENTER;
1183 SAVETMPS;
1184 EXTEND (SP, 3);
1185 PL_runops = RUNOPS_DEFAULT;
1186 PUSHMARK (SP);
1187 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1188 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1189 PUTBACK;
1190 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1191 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1192 SPAGAIN;
1193 FREETMPS;
1194 LEAVE;
1195 PL_runops = runops_trace;
1196 }
1197 }
1198 }
1199 }
1200 }
1201
1202 TAINT_NOT;
1203 return 0;
1204}
1205
1206static struct CoroSLF cctx_ssl_frame;
1207
1208static void
1209slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1210{
1211 ta->prev = 0;
1212}
1213
1214static int
1215slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1216{
1217 *frame = cctx_ssl_frame;
1218
1219 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1220}
1221
1222/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1223static void NOINLINE
1224cctx_prepare (pTHX)
1225{
1226 PL_top_env = &PL_start_env;
1227
1228 if (cctx_current->flags & CC_TRACE)
1229 PL_runops = runops_trace;
1230
1231 /* we already must be executing an SLF op, there is no other valid way
1232 * that can lead to creation of a new cctx */
1233 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1234 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1235
1236 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1237 cctx_ssl_frame = slf_frame;
1238
1239 slf_frame.prepare = slf_prepare_set_stacklevel;
1240 slf_frame.check = slf_check_set_stacklevel;
1241}
1242
1243/* the tail of transfer: execute stuff we can only do after a transfer */
1244INLINE void
1245transfer_tail (pTHX)
1246{
1247 free_coro_mortal (aTHX);
1248}
1249
1250/*
1251 * this is a _very_ stripped down perl interpreter ;)
1252 */
1253static void
1254cctx_run (void *arg)
1255{
1256#ifdef USE_ITHREADS
1257# if CORO_PTHREAD
1258 PERL_SET_CONTEXT (coro_thx);
1259# endif
1260#endif
1261 {
1262 dTHX;
1263
1264 /* normally we would need to skip the entersub here */
1265 /* not doing so will re-execute it, which is exactly what we want */
1266 /* PL_nop = PL_nop->op_next */
1267
1268 /* inject a fake subroutine call to cctx_init */
1269 cctx_prepare (aTHX);
1270
1271 /* cctx_run is the alternative tail of transfer() */
1272 transfer_tail (aTHX);
1273
1274 /* somebody or something will hit me for both perl_run and PL_restartop */
1275 PL_restartop = PL_op;
1276 perl_run (PL_curinterp);
1277 /*
1278 * Unfortunately, there is no way to get at the return values of the
1279 * coro body here, as perl_run destroys these. Likewise, we cannot catch
1280 * runtime errors here, as this is just a random interpreter, not a thread.
1281 */
1282
1283 /*
1284 * If perl-run returns we assume exit() was being called or the coro
1285 * fell off the end, which seems to be the only valid (non-bug)
1286 * reason for perl_run to return. We try to exit by jumping to the
1287 * bootstrap-time "top" top_env, as we cannot restore the "main"
1288 * coroutine as Coro has no such concept.
1289 * This actually isn't valid with the pthread backend, but OSes requiring
1290 * that backend are too broken to do it in a standards-compliant way.
1291 */
1292 PL_top_env = main_top_env;
1293 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1294 }
1295}
1296
1297static coro_cctx *
1298cctx_new ()
1299{
1300 coro_cctx *cctx;
1301
1302 ++cctx_count;
1303 New (0, cctx, 1, coro_cctx);
1304
1305 cctx->gen = cctx_gen;
1306 cctx->flags = 0;
1307 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1308
1309 return cctx;
1310}
1311
1312/* create a new cctx only suitable as source */
1313static coro_cctx *
1314cctx_new_empty ()
1315{
1316 coro_cctx *cctx = cctx_new ();
1317
1318 cctx->sptr = 0;
1319 coro_create (&cctx->cctx, 0, 0, 0, 0);
1320
1321 return cctx;
1322}
1323
1324/* create a new cctx suitable as destination/running a perl interpreter */
1325static coro_cctx *
1326cctx_new_run ()
1327{
1328 coro_cctx *cctx = cctx_new ();
1329 void *stack_start;
1330 size_t stack_size;
1331
1332#if HAVE_MMAP
1333 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1334 /* mmap supposedly does allocate-on-write for us */
1335 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1336
1337 if (cctx->sptr != (void *)-1)
1338 {
1339 #if CORO_STACKGUARD
1340 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1341 #endif
1342 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1343 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1344 cctx->flags |= CC_MAPPED;
1345 }
1346 else
1347#endif
1348 {
1349 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1350 New (0, cctx->sptr, cctx_stacksize, long);
1351
1352 if (!cctx->sptr)
1353 {
1354 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1355 _exit (EXIT_FAILURE);
1356 }
1357
1358 stack_start = cctx->sptr;
1359 stack_size = cctx->ssize;
1360 }
1361
1362 #if CORO_USE_VALGRIND
1363 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1364 #endif
1365
1366 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1367
1368 return cctx;
1369}
1370
1371static void
1372cctx_destroy (coro_cctx *cctx)
1373{
1374 if (!cctx)
1375 return;
1376
1377 assert (("FATAL: tried to destroy current cctx", cctx != cctx_current));//D temporary?
1378
1379 --cctx_count;
1380 coro_destroy (&cctx->cctx);
1381
1382 /* coro_transfer creates new, empty cctx's */
1383 if (cctx->sptr)
1384 {
1385 #if CORO_USE_VALGRIND
1386 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1387 #endif
1388
1389#if HAVE_MMAP
1390 if (cctx->flags & CC_MAPPED)
1391 munmap (cctx->sptr, cctx->ssize);
1392 else
1393#endif
1394 Safefree (cctx->sptr);
1395 }
1396
1397 Safefree (cctx);
1398}
1399
1400/* wether this cctx should be destructed */
1401#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1402
1403static coro_cctx *
1404cctx_get (pTHX)
1405{
1406 while (expect_true (cctx_first))
1407 {
1408 coro_cctx *cctx = cctx_first;
1409 cctx_first = cctx->next;
1410 --cctx_idle;
1411
1412 if (expect_true (!CCTX_EXPIRED (cctx)))
1413 return cctx;
1414
1415 cctx_destroy (cctx);
1416 }
1417
1418 return cctx_new_run ();
1419}
1420
1421static void
1422cctx_put (coro_cctx *cctx)
1423{
1424 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1425
1426 /* free another cctx if overlimit */
1427 if (expect_false (cctx_idle >= cctx_max_idle))
1428 {
1429 coro_cctx *first = cctx_first;
1430 cctx_first = first->next;
1431 --cctx_idle;
1432
1433 cctx_destroy (first);
1434 }
1435
1436 ++cctx_idle;
1437 cctx->next = cctx_first;
1438 cctx_first = cctx;
1439}
1440
1441/** coroutine switching *****************************************************/
1442
1443static void
1444transfer_check (pTHX_ struct coro *prev, struct coro *next)
1445{
1446 /* TODO: throwing up here is considered harmful */
1447
1448 if (expect_true (prev != next))
1449 {
1450 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1451 croak ("Coro::State::transfer called with a blocked prev Coro::State, but can only transfer from running or new states,");
1452
1453 if (expect_false (next->flags & (CF_RUNNING | CF_DESTROYED | CF_SUSPENDED)))
1454 croak ("Coro::State::transfer called with running, destroyed or suspended next Coro::State, but can only transfer to inactive states,");
1455
1456#if !PERL_VERSION_ATLEAST (5,10,0)
1457 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1458 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1459#endif
1460 }
1461}
1462
1463/* always use the TRANSFER macro */
1464static void NOINLINE /* noinline so we have a fixed stackframe */
1465transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1466{
1467 dSTACKLEVEL;
1468
1469 /* sometimes transfer is only called to set idle_sp */
1470 if (expect_false (!prev))
1471 {
1472 cctx_current->idle_sp = STACKLEVEL;
1473 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1474 }
1475 else if (expect_true (prev != next))
1476 {
1477 coro_cctx *cctx_prev;
1478
1479 if (expect_false (prev->flags & CF_NEW))
1480 {
1481 /* create a new empty/source context */
1482 prev->flags &= ~CF_NEW;
1483 prev->flags |= CF_RUNNING;
1484 }
1485
1486 prev->flags &= ~CF_RUNNING;
1487 next->flags |= CF_RUNNING;
1488
1489 /* first get rid of the old state */
1490 save_perl (aTHX_ prev);
1491
1492 if (expect_false (next->flags & CF_NEW))
1493 {
1494 /* need to start coroutine */
1495 next->flags &= ~CF_NEW;
1496 /* setup coroutine call */
1497 coro_setup (aTHX_ next);
1498 }
1499 else
1500 load_perl (aTHX_ next);
1501
1502 /* possibly untie and reuse the cctx */
1503 if (expect_true (
1504 cctx_current->idle_sp == STACKLEVEL
1505 && !(cctx_current->flags & CC_TRACE)
1506 && !force_cctx
1507 ))
1508 {
1509 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1510 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1511
1512 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get. */
1513 /* without this the next cctx_get might destroy the running cctx while still in use */
1514 if (expect_false (CCTX_EXPIRED (cctx_current)))
1515 if (expect_true (!next->cctx))
1516 next->cctx = cctx_get (aTHX);
1517
1518 cctx_put (cctx_current);
1519 }
1520 else
1521 prev->cctx = cctx_current;
1522
1523 ++next->usecount;
1524
1525 cctx_prev = cctx_current;
1526 cctx_current = expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1527
1528 next->cctx = 0;
1529
1530 if (expect_false (cctx_prev != cctx_current))
1531 {
1532 cctx_prev->top_env = PL_top_env;
1533 PL_top_env = cctx_current->top_env;
1534 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1535 }
1536
1537 transfer_tail (aTHX);
1538 }
1539}
1540
1541#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1542#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1543
1544/** high level stuff ********************************************************/
1545
1546static int
1547coro_state_destroy (pTHX_ struct coro *coro)
1548{
1549 if (coro->flags & CF_DESTROYED)
1550 return 0;
1551
1552 if (coro->on_destroy && !PL_dirty)
1553 coro->on_destroy (aTHX_ coro);
1554
1555 coro->flags |= CF_DESTROYED;
1556
1557 if (coro->flags & CF_READY)
1558 {
1559 /* reduce nready, as destroying a ready coro effectively unreadies it */
1560 /* alternative: look through all ready queues and remove the coro */
1561 --coro_nready;
1562 }
1563 else
1564 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1565
1566 if (coro->mainstack
1567 && coro->mainstack != main_mainstack
1568 && coro->slot
1569 && !PL_dirty)
1570 coro_destruct_perl (aTHX_ coro);
1571
1572 cctx_destroy (coro->cctx);
1573 SvREFCNT_dec (coro->startcv);
1574 SvREFCNT_dec (coro->args);
1575 SvREFCNT_dec (CORO_THROW);
1576
1577 if (coro->next) coro->next->prev = coro->prev;
1578 if (coro->prev) coro->prev->next = coro->next;
1579 if (coro == coro_first) coro_first = coro->next;
1580
1581 return 1;
1582}
1583
1584static int
1585coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1586{
1587 struct coro *coro = (struct coro *)mg->mg_ptr;
1588 mg->mg_ptr = 0;
1589
1590 coro->hv = 0;
1591
1592 if (--coro->refcnt < 0)
1593 {
1594 coro_state_destroy (aTHX_ coro);
1595 Safefree (coro);
1596 }
1597
1598 return 0;
1599}
1600
1601static int
1602coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1603{
1604 struct coro *coro = (struct coro *)mg->mg_ptr;
1605
1606 ++coro->refcnt;
1607
1608 return 0;
1609}
1610
1611static MGVTBL coro_state_vtbl = {
1612 0, 0, 0, 0,
1613 coro_state_free,
1614 0,
1615#ifdef MGf_DUP
1616 coro_state_dup,
1617#else
1618# define MGf_DUP 0
1619#endif
1620};
1621
1622static void
1623prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1624{
1625 ta->prev = SvSTATE (prev_sv);
1626 ta->next = SvSTATE (next_sv);
1627 TRANSFER_CHECK (*ta);
1628}
1629
1630static void
1631api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1632{
1633 struct coro_transfer_args ta;
1634
1635 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1636 TRANSFER (ta, 1);
1637}
1638
1639/** Coro ********************************************************************/
1640
1641INLINE void
1642coro_enq (pTHX_ struct coro *coro)
1643{
1644 struct coro **ready = coro_ready [coro->prio - CORO_PRIO_MIN];
1645
1646 SvREFCNT_inc_NN (coro->hv);
1647
1648 coro->next_ready = 0;
1649 *(ready [0] ? &ready [1]->next_ready : &ready [0]) = coro;
1650 ready [1] = coro;
1651}
1652
1653INLINE struct coro *
1654coro_deq (pTHX)
1655{
1656 int prio;
1657
1658 for (prio = CORO_PRIO_MAX - CORO_PRIO_MIN + 1; --prio >= 0; )
1659 {
1660 struct coro **ready = coro_ready [prio];
1661
1662 if (ready [0])
1663 {
1664 struct coro *coro = ready [0];
1665 ready [0] = coro->next_ready;
1666 return coro;
1667 }
1668 }
1669
1670 return 0;
1671}
1672
1673static void
1674invoke_sv_ready_hook_helper (void)
1675{
1676 dTHX;
1677 dSP;
1678
1679 ENTER;
1680 SAVETMPS;
1681
1682 PUSHMARK (SP);
1683 PUTBACK;
1684 call_sv (coro_readyhook, G_VOID | G_DISCARD);
1685
1686 FREETMPS;
1687 LEAVE;
1688}
1689
1690static int
1691api_ready (pTHX_ SV *coro_sv)
1692{
1693 struct coro *coro = SvSTATE (coro_sv);
1694
1695 if (coro->flags & CF_READY)
1696 return 0;
1697
1698 coro->flags |= CF_READY;
1699
1700 coro_enq (aTHX_ coro);
1701
1702 if (!coro_nready++)
1703 if (coroapi.readyhook)
1704 coroapi.readyhook ();
1705
1706 return 1;
1707}
1708
1709static int
1710api_is_ready (pTHX_ SV *coro_sv)
1711{
1712 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1713}
1714
1715/* expects to own a reference to next->hv */
1716INLINE void
1717prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1718{
1719 SV *prev_sv = SvRV (coro_current);
1720
1721 ta->prev = SvSTATE_hv (prev_sv);
1722 ta->next = next;
1723
1724 TRANSFER_CHECK (*ta);
1725
1726 SvRV_set (coro_current, (SV *)next->hv);
1727
1728 free_coro_mortal (aTHX);
1729 coro_mortal = prev_sv;
1730}
1731
1732static void
1733prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1734{
1735 for (;;)
1736 {
1737 struct coro *next = coro_deq (aTHX);
1738
1739 if (expect_true (next))
1740 {
1741 /* cannot transfer to destroyed coros, skip and look for next */
1742 if (expect_false (next->flags & (CF_DESTROYED | CF_SUSPENDED)))
1743 SvREFCNT_dec (next->hv); /* coro_nready has already been taken care of by destroy */
1744 else
1745 {
1746 next->flags &= ~CF_READY;
1747 --coro_nready;
1748
1749 prepare_schedule_to (aTHX_ ta, next);
1750 break;
1751 }
1752 }
1753 else
1754 {
1755 /* nothing to schedule: call the idle handler */
1756 if (SvROK (sv_idle)
1757 && SvOBJECT (SvRV (sv_idle)))
1758 {
1759 ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */
1760 api_ready (aTHX_ SvRV (sv_idle));
1761 --coro_nready;
1762 }
1763 else
1764 {
1765 /* TODO: deprecated, remove, cannot work reliably *//*D*/
1766 dSP;
1767
1768 ENTER;
1769 SAVETMPS;
1770
1771 PUSHMARK (SP);
1772 PUTBACK;
1773 call_sv (sv_idle, G_VOID | G_DISCARD);
1774
1775 FREETMPS;
1776 LEAVE;
1777 }
1778 }
1779 }
1780}
1781
1782INLINE void
1783prepare_cede (pTHX_ struct coro_transfer_args *ta)
1784{
1785 api_ready (aTHX_ coro_current);
1786 prepare_schedule (aTHX_ ta);
1787}
1788
1789INLINE void
1790prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1791{
1792 SV *prev = SvRV (coro_current);
1793
1794 if (coro_nready)
1795 {
1796 prepare_schedule (aTHX_ ta);
1797 api_ready (aTHX_ prev);
1798 }
1799 else
1800 prepare_nop (aTHX_ ta);
1801}
1802
1803static void
1804api_schedule (pTHX)
1805{
1806 struct coro_transfer_args ta;
1807
1808 prepare_schedule (aTHX_ &ta);
1809 TRANSFER (ta, 1);
1810}
1811
1812static void
1813api_schedule_to (pTHX_ SV *coro_sv)
1814{
1815 struct coro_transfer_args ta;
1816 struct coro *next = SvSTATE (coro_sv);
1817
1818 SvREFCNT_inc_NN (coro_sv);
1819 prepare_schedule_to (aTHX_ &ta, next);
1820}
1821
1822static int
1823api_cede (pTHX)
1824{
1825 struct coro_transfer_args ta;
1826
1827 prepare_cede (aTHX_ &ta);
1828
1829 if (expect_true (ta.prev != ta.next))
1830 {
1831 TRANSFER (ta, 1);
1832 return 1;
1833 }
1834 else
1835 return 0;
1836}
1837
1838static int
1839api_cede_notself (pTHX)
1840{
1841 if (coro_nready)
1842 {
1843 struct coro_transfer_args ta;
1844
1845 prepare_cede_notself (aTHX_ &ta);
1846 TRANSFER (ta, 1);
1847 return 1;
1848 }
1849 else
1850 return 0;
1851}
1852
1853static void
1854api_trace (pTHX_ SV *coro_sv, int flags)
1855{
1856 struct coro *coro = SvSTATE (coro_sv);
1857
1858 if (coro->flags & CF_RUNNING)
1859 croak ("cannot enable tracing on a running coroutine, caught");
1860
1861 if (flags & CC_TRACE)
1862 {
1863 if (!coro->cctx)
1864 coro->cctx = cctx_new_run ();
1865 else if (!(coro->cctx->flags & CC_TRACE))
1866 croak ("cannot enable tracing on coroutine with custom stack, caught");
1867
1868 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1869 }
1870 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1871 {
1872 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1873
1874 if (coro->flags & CF_RUNNING)
1875 PL_runops = RUNOPS_DEFAULT;
1876 else
1877 coro->slot->runops = RUNOPS_DEFAULT;
1878 }
1879}
1880
1881static void
1882coro_call_on_destroy (pTHX_ struct coro *coro)
1883{
1884 SV **on_destroyp = hv_fetch (coro->hv, "_on_destroy", sizeof ("_on_destroy") - 1, 0);
1885 SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0);
1886
1887 if (on_destroyp)
1888 {
1889 AV *on_destroy = (AV *)SvRV (*on_destroyp);
1890
1891 while (AvFILLp (on_destroy) >= 0)
1892 {
1893 dSP; /* don't disturb outer sp */
1894 SV *cb = av_pop (on_destroy);
1895
1896 PUSHMARK (SP);
1897
1898 if (statusp)
1899 {
1900 int i;
1901 AV *status = (AV *)SvRV (*statusp);
1902 EXTEND (SP, AvFILLp (status) + 1);
1903
1904 for (i = 0; i <= AvFILLp (status); ++i)
1905 PUSHs (AvARRAY (status)[i]);
1906 }
1907
1908 PUTBACK;
1909 call_sv (sv_2mortal (cb), G_VOID | G_DISCARD);
1910 }
1911 }
1912}
1913
1914static void
1915slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1916{
1917 int i;
1918 HV *hv = (HV *)SvRV (coro_current);
1919 AV *av = newAV ();
1920
1921 /* items are actually not so common, so optimise for this case */
1922 if (items)
1923 {
1924 av_extend (av, items - 1);
1925
1926 for (i = 0; i < items; ++i)
1927 av_push (av, SvREFCNT_inc_NN (arg [i]));
1928 }
1929
1930 hv_store (hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0);
1931
1932 av_push (av_destroy, (SV *)newRV_inc ((SV *)hv)); /* RVinc for perl */
1933 api_ready (aTHX_ sv_manager);
1934
1935 frame->prepare = prepare_schedule;
1936 frame->check = slf_check_repeat;
1937
1938 /* as a minor optimisation, we could unwind all stacks here */
1939 /* but that puts extra pressure on pp_slf, and is not worth much */
1940 /*coro_unwind_stacks (aTHX);*/
1941}
1942
1943/*****************************************************************************/
1944/* async pool handler */
1945
1946static int
1947slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
1948{
1949 HV *hv = (HV *)SvRV (coro_current);
1950 struct coro *coro = (struct coro *)frame->data;
1951
1952 if (!coro->invoke_cb)
1953 return 1; /* loop till we have invoke */
1954 else
1955 {
1956 hv_store (hv, "desc", sizeof ("desc") - 1,
1957 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
1958
1959 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
1960
1961 {
1962 dSP;
1963 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
1964 PUTBACK;
1965 }
1966
1967 SvREFCNT_dec (GvAV (PL_defgv));
1968 GvAV (PL_defgv) = coro->invoke_av;
1969 coro->invoke_av = 0;
1970
1971 return 0;
1972 }
1973}
1974
1975static void
1976slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1977{
1978 HV *hv = (HV *)SvRV (coro_current);
1979 struct coro *coro = SvSTATE_hv ((SV *)hv);
1980
1981 if (expect_true (coro->saved_deffh))
1982 {
1983 /* subsequent iteration */
1984 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
1985 coro->saved_deffh = 0;
1986
1987 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
1988 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
1989 {
1990 coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate);
1991 coro->invoke_av = newAV ();
1992
1993 frame->prepare = prepare_nop;
1994 }
1995 else
1996 {
1997 av_clear (GvAV (PL_defgv));
1998 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
1999
2000 coro->prio = 0;
2001
2002 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
2003 api_trace (aTHX_ coro_current, 0);
2004
2005 frame->prepare = prepare_schedule;
2006 av_push (av_async_pool, SvREFCNT_inc (hv));
2007 }
2008 }
2009 else
2010 {
2011 /* first iteration, simply fall through */
2012 frame->prepare = prepare_nop;
2013 }
2014
2015 frame->check = slf_check_pool_handler;
2016 frame->data = (void *)coro;
2017}
2018
2019/*****************************************************************************/
2020/* rouse callback */
2021
2022#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
2023
2024static void
2025coro_rouse_callback (pTHX_ CV *cv)
2026{
2027 dXSARGS;
2028 SV *data = (SV *)S_GENSUB_ARG;
2029
2030 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2031 {
2032 /* first call, set args */
2033 SV *coro = SvRV (data);
2034 AV *av = newAV ();
2035
2036 SvRV_set (data, (SV *)av);
2037
2038 /* better take a full copy of the arguments */
2039 while (items--)
2040 av_store (av, items, newSVsv (ST (items)));
2041
2042 api_ready (aTHX_ coro);
2043 SvREFCNT_dec (coro);
2044 }
2045
2046 XSRETURN_EMPTY;
2047}
2048
2049static int
2050slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
2051{
2052 SV *data = (SV *)frame->data;
2053
2054 if (CORO_THROW)
2055 return 0;
2056
2057 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2058 return 1;
2059
2060 /* now push all results on the stack */
2061 {
2062 dSP;
2063 AV *av = (AV *)SvRV (data);
2064 int i;
2065
2066 EXTEND (SP, AvFILLp (av) + 1);
2067 for (i = 0; i <= AvFILLp (av); ++i)
2068 PUSHs (sv_2mortal (AvARRAY (av)[i]));
2069
2070 /* we have stolen the elements, so set length to zero and free */
2071 AvFILLp (av) = -1;
2072 av_undef (av);
2073
2074 PUTBACK;
2075 }
2076
2077 return 0;
2078}
2079
2080static void
2081slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2082{
2083 SV *cb;
2084
2085 if (items)
2086 cb = arg [0];
2087 else
2088 {
2089 struct coro *coro = SvSTATE_current;
2090
2091 if (!coro->rouse_cb)
2092 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
2093
2094 cb = sv_2mortal (coro->rouse_cb);
2095 coro->rouse_cb = 0;
2096 }
2097
2098 if (!SvROK (cb)
2099 || SvTYPE (SvRV (cb)) != SVt_PVCV
2100 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
2101 croak ("Coro::rouse_wait called with illegal callback argument,");
2102
2103 {
2104 CV *cv = (CV *)SvRV (cb); /* for S_GENSUB_ARG */
2105 SV *data = (SV *)S_GENSUB_ARG;
2106
2107 frame->data = (void *)data;
2108 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2109 frame->check = slf_check_rouse_wait;
2110 }
2111}
2112
2113static SV *
2114coro_new_rouse_cb (pTHX)
2115{
2116 HV *hv = (HV *)SvRV (coro_current);
2117 struct coro *coro = SvSTATE_hv (hv);
2118 SV *data = newRV_inc ((SV *)hv);
2119 SV *cb = s_gensub (aTHX_ coro_rouse_callback, (void *)data);
2120
2121 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2122 SvREFCNT_dec (data); /* magicext increases the refcount */
2123
2124 SvREFCNT_dec (coro->rouse_cb);
2125 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2126
2127 return cb;
2128}
2129
2130/*****************************************************************************/
2131/* schedule-like-function opcode (SLF) */
2132
2133static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2134static const CV *slf_cv;
2135static SV **slf_argv;
2136static int slf_argc, slf_arga; /* count, allocated */
2137static I32 slf_ax; /* top of stack, for restore */
2138
2139/* this restores the stack in the case we patched the entersub, to */
2140/* recreate the stack frame as perl will on following calls */
2141/* since entersub cleared the stack */
2142static OP *
2143pp_restore (pTHX)
2144{
2145 int i;
2146 SV **SP = PL_stack_base + slf_ax;
2147
2148 PUSHMARK (SP);
2149
2150 EXTEND (SP, slf_argc + 1);
2151
2152 for (i = 0; i < slf_argc; ++i)
2153 PUSHs (sv_2mortal (slf_argv [i]));
2154
2155 PUSHs ((SV *)CvGV (slf_cv));
2156
2157 RETURNOP (slf_restore.op_first);
2158}
2159
2160static void
2161slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2162{
2163 SV **arg = (SV **)slf_frame.data;
2164
2165 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2166}
2167
2168static void
2169slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2170{
2171 if (items != 2)
2172 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2173
2174 frame->prepare = slf_prepare_transfer;
2175 frame->check = slf_check_nop;
2176 frame->data = (void *)arg; /* let's hope it will stay valid */
2177}
2178
2179static void
2180slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2181{
2182 frame->prepare = prepare_schedule;
2183 frame->check = slf_check_nop;
2184}
2185
2186static void
2187slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2188{
2189 struct coro *next = (struct coro *)slf_frame.data;
2190
2191 SvREFCNT_inc_NN (next->hv);
2192 prepare_schedule_to (aTHX_ ta, next);
2193}
2194
2195static void
2196slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2197{
2198 if (!items)
2199 croak ("Coro::schedule_to expects a coroutine argument, caught");
2200
2201 frame->data = (void *)SvSTATE (arg [0]);
2202 frame->prepare = slf_prepare_schedule_to;
2203 frame->check = slf_check_nop;
2204}
2205
2206static void
2207slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2208{
2209 api_ready (aTHX_ SvRV (coro_current));
2210
2211 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2212}
2213
2214static void
2215slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2216{
2217 frame->prepare = prepare_cede;
2218 frame->check = slf_check_nop;
2219}
2220
2221static void
2222slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2223{
2224 frame->prepare = prepare_cede_notself;
2225 frame->check = slf_check_nop;
2226}
2227
2228/*
2229 * these not obviously related functions are all rolled into one
2230 * function to increase chances that they all will call transfer with the same
2231 * stack offset
2232 * SLF stands for "schedule-like-function".
2233 */
2234static OP *
2235pp_slf (pTHX)
2236{
2237 I32 checkmark; /* mark SP to see how many elements check has pushed */
2238
2239 /* set up the slf frame, unless it has already been set-up */
2240 /* the latter happens when a new coro has been started */
2241 /* or when a new cctx was attached to an existing coroutine */
2242 if (expect_true (!slf_frame.prepare))
2243 {
2244 /* first iteration */
2245 dSP;
2246 SV **arg = PL_stack_base + TOPMARK + 1;
2247 int items = SP - arg; /* args without function object */
2248 SV *gv = *sp;
2249
2250 /* do a quick consistency check on the "function" object, and if it isn't */
2251 /* for us, divert to the real entersub */
2252 if (SvTYPE (gv) != SVt_PVGV
2253 || !GvCV (gv)
2254 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2255 return PL_ppaddr[OP_ENTERSUB](aTHX);
2256
2257 if (!(PL_op->op_flags & OPf_STACKED))
2258 {
2259 /* ampersand-form of call, use @_ instead of stack */
2260 AV *av = GvAV (PL_defgv);
2261 arg = AvARRAY (av);
2262 items = AvFILLp (av) + 1;
2263 }
2264
2265 /* now call the init function, which needs to set up slf_frame */
2266 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2267 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2268
2269 /* pop args */
2270 SP = PL_stack_base + POPMARK;
2271
2272 PUTBACK;
2273 }
2274
2275 /* now that we have a slf_frame, interpret it! */
2276 /* we use a callback system not to make the code needlessly */
2277 /* complicated, but so we can run multiple perl coros from one cctx */
2278
2279 do
2280 {
2281 struct coro_transfer_args ta;
2282
2283 slf_frame.prepare (aTHX_ &ta);
2284 TRANSFER (ta, 0);
2285
2286 checkmark = PL_stack_sp - PL_stack_base;
2287 }
2288 while (slf_frame.check (aTHX_ &slf_frame));
2289
2290 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2291
2292 /* exception handling */
2293 if (expect_false (CORO_THROW))
2294 {
2295 SV *exception = sv_2mortal (CORO_THROW);
2296
2297 CORO_THROW = 0;
2298 sv_setsv (ERRSV, exception);
2299 croak (0);
2300 }
2301
2302 /* return value handling - mostly like entersub */
2303 /* make sure we put something on the stack in scalar context */
2304 if (GIMME_V == G_SCALAR)
2305 {
2306 dSP;
2307 SV **bot = PL_stack_base + checkmark;
2308
2309 if (sp == bot) /* too few, push undef */
2310 bot [1] = &PL_sv_undef;
2311 else if (sp != bot + 1) /* too many, take last one */
2312 bot [1] = *sp;
2313
2314 SP = bot + 1;
2315
2316 PUTBACK;
2317 }
2318
2319 return NORMAL;
2320}
2321
2322static void
2323api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2324{
2325 int i;
2326 SV **arg = PL_stack_base + ax;
2327 int items = PL_stack_sp - arg + 1;
2328
2329 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2330
2331 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2332 && PL_op->op_ppaddr != pp_slf)
2333 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2334
2335 CvFLAGS (cv) |= CVf_SLF;
2336 CvNODEBUG_on (cv);
2337 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2338 slf_cv = cv;
2339
2340 /* we patch the op, and then re-run the whole call */
2341 /* we have to put the same argument on the stack for this to work */
2342 /* and this will be done by pp_restore */
2343 slf_restore.op_next = (OP *)&slf_restore;
2344 slf_restore.op_type = OP_CUSTOM;
2345 slf_restore.op_ppaddr = pp_restore;
2346 slf_restore.op_first = PL_op;
2347
2348 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2349
2350 if (PL_op->op_flags & OPf_STACKED)
2351 {
2352 if (items > slf_arga)
2353 {
2354 slf_arga = items;
2355 free (slf_argv);
2356 slf_argv = malloc (slf_arga * sizeof (SV *));
2357 }
2358
2359 slf_argc = items;
2360
2361 for (i = 0; i < items; ++i)
2362 slf_argv [i] = SvREFCNT_inc (arg [i]);
2363 }
2364 else
2365 slf_argc = 0;
2366
2367 PL_op->op_ppaddr = pp_slf;
2368 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2369
2370 PL_op = (OP *)&slf_restore;
2371}
2372
2373/*****************************************************************************/
2374/* dynamic wind */
2375
2376static void
2377on_enterleave_call (pTHX_ SV *cb)
2378{
2379 dSP;
2380
2381 PUSHSTACK;
2382
2383 PUSHMARK (SP);
2384 PUTBACK;
2385 call_sv (cb, G_VOID | G_DISCARD);
2386 SPAGAIN;
2387
2388 POPSTACK;
2389}
2390
2391static SV *
2392coro_avp_pop_and_free (pTHX_ AV **avp)
2393{
2394 AV *av = *avp;
2395 SV *res = av_pop (av);
2396
2397 if (AvFILLp (av) < 0)
2398 {
2399 *avp = 0;
2400 SvREFCNT_dec (av);
2401 }
2402
2403 return res;
2404}
2405
2406static void
2407coro_pop_on_enter (pTHX_ void *coro)
2408{
2409 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_enter);
2410 SvREFCNT_dec (cb);
2411}
2412
2413static void
2414coro_pop_on_leave (pTHX_ void *coro)
2415{
2416 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_leave);
2417 on_enterleave_call (aTHX_ sv_2mortal (cb));
2418}
2419
2420/*****************************************************************************/
2421/* PerlIO::cede */
2422
2423typedef struct
2424{
2425 PerlIOBuf base;
2426 NV next, every;
2427} PerlIOCede;
2428
2429static IV
2430PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2431{
2432 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2433
2434 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2435 self->next = nvtime () + self->every;
2436
2437 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2438}
2439
2440static SV *
2441PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2442{
2443 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2444
2445 return newSVnv (self->every);
2446}
2447
2448static IV
2449PerlIOCede_flush (pTHX_ PerlIO *f)
2450{
2451 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2452 double now = nvtime ();
2453
2454 if (now >= self->next)
2455 {
2456 api_cede (aTHX);
2457 self->next = now + self->every;
2458 }
2459
2460 return PerlIOBuf_flush (aTHX_ f);
2461}
2462
2463static PerlIO_funcs PerlIO_cede =
2464{
2465 sizeof(PerlIO_funcs),
2466 "cede",
2467 sizeof(PerlIOCede),
2468 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2469 PerlIOCede_pushed,
2470 PerlIOBuf_popped,
2471 PerlIOBuf_open,
2472 PerlIOBase_binmode,
2473 PerlIOCede_getarg,
2474 PerlIOBase_fileno,
2475 PerlIOBuf_dup,
2476 PerlIOBuf_read,
2477 PerlIOBuf_unread,
2478 PerlIOBuf_write,
2479 PerlIOBuf_seek,
2480 PerlIOBuf_tell,
2481 PerlIOBuf_close,
2482 PerlIOCede_flush,
2483 PerlIOBuf_fill,
2484 PerlIOBase_eof,
2485 PerlIOBase_error,
2486 PerlIOBase_clearerr,
2487 PerlIOBase_setlinebuf,
2488 PerlIOBuf_get_base,
2489 PerlIOBuf_bufsiz,
2490 PerlIOBuf_get_ptr,
2491 PerlIOBuf_get_cnt,
2492 PerlIOBuf_set_ptrcnt,
2493};
2494
2495/*****************************************************************************/
2496/* Coro::Semaphore & Coro::Signal */
2497
2498static SV *
2499coro_waitarray_new (pTHX_ int count)
2500{
2501 /* a waitarray=semaphore contains a counter IV in $sem->[0] and any waiters after that */
2502 AV *av = newAV ();
2503 SV **ary;
2504
2505 /* unfortunately, building manually saves memory */
2506 Newx (ary, 2, SV *);
2507 AvALLOC (av) = ary;
2508#if PERL_VERSION_ATLEAST (5,10,0)
2509 AvARRAY (av) = ary;
2510#else
2511 /* 5.8.8 needs this syntax instead of AvARRAY = ary, yet */
2512 /* -DDEBUGGING flags this as a bug, despite it perfectly working */
2513 SvPVX ((SV *)av) = (char *)ary;
2514#endif
2515 AvMAX (av) = 1;
2516 AvFILLp (av) = 0;
2517 ary [0] = newSViv (count);
2518
2519 return newRV_noinc ((SV *)av);
2520}
2521
2522/* semaphore */
2523
2524static void
2525coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2526{
2527 SV *count_sv = AvARRAY (av)[0];
2528 IV count = SvIVX (count_sv);
2529
2530 count += adjust;
2531 SvIVX (count_sv) = count;
2532
2533 /* now wake up as many waiters as are expected to lock */
2534 while (count > 0 && AvFILLp (av) > 0)
2535 {
2536 SV *cb;
2537
2538 /* swap first two elements so we can shift a waiter */
2539 AvARRAY (av)[0] = AvARRAY (av)[1];
2540 AvARRAY (av)[1] = count_sv;
2541 cb = av_shift (av);
2542
2543 if (SvOBJECT (cb))
2544 {
2545 api_ready (aTHX_ cb);
2546 --count;
2547 }
2548 else if (SvTYPE (cb) == SVt_PVCV)
2549 {
2550 dSP;
2551 PUSHMARK (SP);
2552 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2553 PUTBACK;
2554 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2555 }
2556
2557 SvREFCNT_dec (cb);
2558 }
2559}
2560
2561static void
2562coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2563{
2564 /* call $sem->adjust (0) to possibly wake up some other waiters */
2565 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2566}
2567
2568static int
2569slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2570{
2571 AV *av = (AV *)frame->data;
2572 SV *count_sv = AvARRAY (av)[0];
2573
2574 /* if we are about to throw, don't actually acquire the lock, just throw */
2575 if (CORO_THROW)
2576 return 0;
2577 else if (SvIVX (count_sv) > 0)
2578 {
2579 SvSTATE_current->on_destroy = 0;
2580
2581 if (acquire)
2582 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2583 else
2584 coro_semaphore_adjust (aTHX_ av, 0);
2585
2586 return 0;
2587 }
2588 else
2589 {
2590 int i;
2591 /* if we were woken up but can't down, we look through the whole */
2592 /* waiters list and only add us if we aren't in there already */
2593 /* this avoids some degenerate memory usage cases */
2594
2595 for (i = 1; i <= AvFILLp (av); ++i)
2596 if (AvARRAY (av)[i] == SvRV (coro_current))
2597 return 1;
2598
2599 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2600 return 1;
2601 }
2602}
2603
2604static int
2605slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2606{
2607 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2608}
2609
2610static int
2611slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2612{
2613 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2614}
2615
2616static void
2617slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2618{
2619 AV *av = (AV *)SvRV (arg [0]);
2620
2621 if (SvIVX (AvARRAY (av)[0]) > 0)
2622 {
2623 frame->data = (void *)av;
2624 frame->prepare = prepare_nop;
2625 }
2626 else
2627 {
2628 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2629
2630 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2631 frame->prepare = prepare_schedule;
2632
2633 /* to avoid race conditions when a woken-up coro gets terminated */
2634 /* we arrange for a temporary on_destroy that calls adjust (0) */
2635 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2636 }
2637}
2638
2639static void
2640slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2641{
2642 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2643 frame->check = slf_check_semaphore_down;
2644}
2645
2646static void
2647slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2648{
2649 if (items >= 2)
2650 {
2651 /* callback form */
2652 AV *av = (AV *)SvRV (arg [0]);
2653 SV *cb_cv = s_get_cv_croak (arg [1]);
2654
2655 av_push (av, SvREFCNT_inc_NN (cb_cv));
2656
2657 if (SvIVX (AvARRAY (av)[0]) > 0)
2658 coro_semaphore_adjust (aTHX_ av, 0);
2659
2660 frame->prepare = prepare_nop;
2661 frame->check = slf_check_nop;
2662 }
2663 else
2664 {
2665 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2666 frame->check = slf_check_semaphore_wait;
2667 }
2668}
2669
2670/* signal */
2671
2672static void
2673coro_signal_wake (pTHX_ AV *av, int count)
2674{
2675 SvIVX (AvARRAY (av)[0]) = 0;
2676
2677 /* now signal count waiters */
2678 while (count > 0 && AvFILLp (av) > 0)
2679 {
2680 SV *cb;
2681
2682 /* swap first two elements so we can shift a waiter */
2683 cb = AvARRAY (av)[0];
2684 AvARRAY (av)[0] = AvARRAY (av)[1];
2685 AvARRAY (av)[1] = cb;
2686
2687 cb = av_shift (av);
2688
2689 if (SvTYPE (cb) == SVt_PVCV)
2690 {
2691 dSP;
2692 PUSHMARK (SP);
2693 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2694 PUTBACK;
2695 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2696 }
2697 else
2698 {
2699 api_ready (aTHX_ cb);
2700 sv_setiv (cb, 0); /* signal waiter */
2701 }
2702
2703 SvREFCNT_dec (cb);
2704
2705 --count;
2706 }
2707}
2708
2709static int
2710slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2711{
2712 /* if we are about to throw, also stop waiting */
2713 return SvROK ((SV *)frame->data) && !CORO_THROW;
2714}
2715
2716static void
2717slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2718{
2719 AV *av = (AV *)SvRV (arg [0]);
2720
2721 if (items >= 2)
2722 {
2723 SV *cb_cv = s_get_cv_croak (arg [1]);
2724 av_push (av, SvREFCNT_inc_NN (cb_cv));
2725
2726 if (SvIVX (AvARRAY (av)[0]))
2727 coro_signal_wake (aTHX_ av, 1); /* ust be the only waiter */
2728
2729 frame->prepare = prepare_nop;
2730 frame->check = slf_check_nop;
2731 }
2732 else if (SvIVX (AvARRAY (av)[0]))
2733 {
2734 SvIVX (AvARRAY (av)[0]) = 0;
2735 frame->prepare = prepare_nop;
2736 frame->check = slf_check_nop;
2737 }
2738 else
2739 {
2740 SV *waiter = newSVsv (coro_current); /* owned by signal av */
2741
2742 av_push (av, waiter);
2743
2744 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2745 frame->prepare = prepare_schedule;
2746 frame->check = slf_check_signal_wait;
2747 }
2748}
2749
2750/*****************************************************************************/
2751/* Coro::AIO */
2752
2753#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2754
2755/* helper storage struct */
2756struct io_state
2757{
2758 int errorno;
2759 I32 laststype; /* U16 in 5.10.0 */
2760 int laststatval;
2761 Stat_t statcache;
2762};
2763
2764static void
2765coro_aio_callback (pTHX_ CV *cv)
2766{
2767 dXSARGS;
2768 AV *state = (AV *)S_GENSUB_ARG;
2769 SV *coro = av_pop (state);
2770 SV *data_sv = newSV (sizeof (struct io_state));
2771
2772 av_extend (state, items - 1);
2773
2774 sv_upgrade (data_sv, SVt_PV);
2775 SvCUR_set (data_sv, sizeof (struct io_state));
2776 SvPOK_only (data_sv);
2777
2778 {
2779 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2780
2781 data->errorno = errno;
2782 data->laststype = PL_laststype;
2783 data->laststatval = PL_laststatval;
2784 data->statcache = PL_statcache;
2785 }
2786
2787 /* now build the result vector out of all the parameters and the data_sv */
2788 {
2789 int i;
2790
2791 for (i = 0; i < items; ++i)
2792 av_push (state, SvREFCNT_inc_NN (ST (i)));
2793 }
2794
2795 av_push (state, data_sv);
2796
2797 api_ready (aTHX_ coro);
2798 SvREFCNT_dec (coro);
2799 SvREFCNT_dec ((AV *)state);
2800}
2801
2802static int
2803slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2804{
2805 AV *state = (AV *)frame->data;
2806
2807 /* if we are about to throw, return early */
2808 /* this does not cancel the aio request, but at least */
2809 /* it quickly returns */
2810 if (CORO_THROW)
2811 return 0;
2812
2813 /* one element that is an RV? repeat! */
2814 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2815 return 1;
2816
2817 /* restore status */
2818 {
2819 SV *data_sv = av_pop (state);
2820 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2821
2822 errno = data->errorno;
2823 PL_laststype = data->laststype;
2824 PL_laststatval = data->laststatval;
2825 PL_statcache = data->statcache;
2826
2827 SvREFCNT_dec (data_sv);
2828 }
2829
2830 /* push result values */
2831 {
2832 dSP;
2833 int i;
2834
2835 EXTEND (SP, AvFILLp (state) + 1);
2836 for (i = 0; i <= AvFILLp (state); ++i)
2837 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2838
2839 PUTBACK;
2840 }
2841
2842 return 0;
2843}
2844
2845static void
2846slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2847{
2848 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2849 SV *coro_hv = SvRV (coro_current);
2850 struct coro *coro = SvSTATE_hv (coro_hv);
2851
2852 /* put our coroutine id on the state arg */
2853 av_push (state, SvREFCNT_inc_NN (coro_hv));
2854
2855 /* first see whether we have a non-zero priority and set it as AIO prio */
2856 if (coro->prio)
2857 {
2858 dSP;
2859
2860 static SV *prio_cv;
2861 static SV *prio_sv;
2862
2863 if (expect_false (!prio_cv))
2864 {
2865 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2866 prio_sv = newSViv (0);
2867 }
2868
2869 PUSHMARK (SP);
2870 sv_setiv (prio_sv, coro->prio);
2871 XPUSHs (prio_sv);
2872
2873 PUTBACK;
2874 call_sv (prio_cv, G_VOID | G_DISCARD);
2875 }
2876
2877 /* now call the original request */
2878 {
2879 dSP;
2880 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2881 int i;
2882
2883 PUSHMARK (SP);
2884
2885 /* first push all args to the stack */
2886 EXTEND (SP, items + 1);
2887
2888 for (i = 0; i < items; ++i)
2889 PUSHs (arg [i]);
2890
2891 /* now push the callback closure */
2892 PUSHs (sv_2mortal (s_gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2893
2894 /* now call the AIO function - we assume our request is uncancelable */
2895 PUTBACK;
2896 call_sv ((SV *)req, G_VOID | G_DISCARD);
2897 }
2898
2899 /* now that the requets is going, we loop toll we have a result */
2900 frame->data = (void *)state;
2901 frame->prepare = prepare_schedule;
2902 frame->check = slf_check_aio_req;
2903}
2904
2905static void
2906coro_aio_req_xs (pTHX_ CV *cv)
2907{
2908 dXSARGS;
2909
2910 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2911
2912 XSRETURN_EMPTY;
2913}
2914
2915/*****************************************************************************/
2916
2917#if CORO_CLONE
2918# include "clone.c"
2919#endif
2920
2921/*****************************************************************************/
2922
2923static SV *
2924coro_new (pTHX_ HV *stash, SV **argv, int argc, int is_coro)
2925{
2926 SV *coro_sv;
2927 struct coro *coro;
2928 MAGIC *mg;
2929 HV *hv;
2930 SV *cb;
2931 int i;
2932
2933 if (argc > 0)
2934 {
2935 cb = s_get_cv_croak (argv [0]);
2936
2937 if (!is_coro)
2938 {
2939 if (CvISXSUB (cb))
2940 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
2941
2942 if (!CvROOT (cb))
2943 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
2944 }
2945 }
2946
2947 Newz (0, coro, 1, struct coro);
2948 coro->args = newAV ();
2949 coro->flags = CF_NEW;
2950
2951 if (coro_first) coro_first->prev = coro;
2952 coro->next = coro_first;
2953 coro_first = coro;
2954
2955 coro->hv = hv = newHV ();
2956 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
2957 mg->mg_flags |= MGf_DUP;
2958 coro_sv = sv_bless (newRV_noinc ((SV *)hv), stash);
2959
2960 if (argc > 0)
2961 {
2962 av_extend (coro->args, argc + is_coro - 1);
2963
2964 if (is_coro)
2965 {
2966 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
2967 cb = (SV *)cv_coro_run;
2968 }
2969
2970 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
2971
2972 for (i = 1; i < argc; i++)
2973 av_push (coro->args, newSVsv (argv [i]));
2974 }
2975
2976 return coro_sv;
2977}
2978
2979MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2980
2981PROTOTYPES: DISABLE
2982
2983BOOT:
2984{
2985#ifdef USE_ITHREADS
2986# if CORO_PTHREAD
2987 coro_thx = PERL_GET_CONTEXT;
2988# endif
2989#endif
2990 BOOT_PAGESIZE;
2991
2992 cctx_current = cctx_new_empty ();
2993
2994 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2995 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2996
2997 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2998 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2999 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
3000
3001 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
3002 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
3003 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
3004
3005 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
3006
3007 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
3008 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
3009 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
3010 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
3011
3012 main_mainstack = PL_mainstack;
3013 main_top_env = PL_top_env;
3014
3015 while (main_top_env->je_prev)
3016 main_top_env = main_top_env->je_prev;
3017
3018 {
3019 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
3020
3021 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
3022 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
3023
3024 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
3025 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
3026 }
3027
3028 coroapi.ver = CORO_API_VERSION;
3029 coroapi.rev = CORO_API_REVISION;
3030
3031 coroapi.transfer = api_transfer;
3032
3033 coroapi.sv_state = SvSTATE_;
3034 coroapi.execute_slf = api_execute_slf;
3035 coroapi.prepare_nop = prepare_nop;
3036 coroapi.prepare_schedule = prepare_schedule;
3037 coroapi.prepare_cede = prepare_cede;
3038 coroapi.prepare_cede_notself = prepare_cede_notself;
3039
3040 {
3041 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
3042
3043 if (!svp) croak ("Time::HiRes is required");
3044 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
3045
3046 nvtime = INT2PTR (double (*)(), SvIV (*svp));
3047
3048 svp = hv_fetch (PL_modglobal, "Time::U2time", 12, 0);
3049 u2time = INT2PTR (void (*)(pTHX_ UV ret[2]), SvIV (*svp));
3050 }
3051
3052 assert (("PRIO_NORMAL must be 0", !CORO_PRIO_NORMAL));
3053}
3054
3055SV *
3056new (SV *klass, ...)
3057 ALIAS:
3058 Coro::new = 1
3059 CODE:
3060 RETVAL = coro_new (aTHX_ ix ? coro_stash : coro_state_stash, &ST (1), items - 1, ix);
3061 OUTPUT:
3062 RETVAL
3063
3064void
3065transfer (...)
3066 PROTOTYPE: $$
3067 CODE:
3068 CORO_EXECUTE_SLF_XS (slf_init_transfer);
3069
3070bool
3071_destroy (SV *coro_sv)
3072 CODE:
3073 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
3074 OUTPUT:
3075 RETVAL
3076
3077void
3078_exit (int code)
3079 PROTOTYPE: $
3080 CODE:
3081 _exit (code);
3082
3083SV *
3084clone (Coro::State coro)
3085 CODE:
3086{
3087#if CORO_CLONE
3088 struct coro *ncoro = coro_clone (aTHX_ coro);
3089 MAGIC *mg;
3090 /* TODO: too much duplication */
3091 ncoro->hv = newHV ();
3092 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
3093 mg->mg_flags |= MGf_DUP;
3094 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
3095#else
3096 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
3097#endif
3098}
3099 OUTPUT:
3100 RETVAL
3101
3102int
3103cctx_stacksize (int new_stacksize = 0)
3104 PROTOTYPE: ;$
3105 CODE:
3106 RETVAL = cctx_stacksize;
3107 if (new_stacksize)
3108 {
3109 cctx_stacksize = new_stacksize;
3110 ++cctx_gen;
3111 }
3112 OUTPUT:
3113 RETVAL
3114
3115int
3116cctx_max_idle (int max_idle = 0)
3117 PROTOTYPE: ;$
3118 CODE:
3119 RETVAL = cctx_max_idle;
3120 if (max_idle > 1)
3121 cctx_max_idle = max_idle;
3122 OUTPUT:
3123 RETVAL
3124
3125int
3126cctx_count ()
3127 PROTOTYPE:
3128 CODE:
3129 RETVAL = cctx_count;
3130 OUTPUT:
3131 RETVAL
3132
3133int
3134cctx_idle ()
3135 PROTOTYPE:
3136 CODE:
3137 RETVAL = cctx_idle;
3138 OUTPUT:
3139 RETVAL
3140
3141void
3142list ()
3143 PROTOTYPE:
3144 PPCODE:
3145{
3146 struct coro *coro;
3147 for (coro = coro_first; coro; coro = coro->next)
3148 if (coro->hv)
3149 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
3150}
3151
3152void
3153call (Coro::State coro, SV *coderef)
3154 ALIAS:
3155 eval = 1
3156 CODE:
3157{
3158 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
3159 {
3160 struct coro *current = SvSTATE_current;
3161
3162 if (current != coro)
235 { 3163 {
236 /* I never used formats, so how should I know how these are implemented? */ 3164 PUTBACK;
237 /* my bold guess is as a simple, plain sub... */ 3165 save_perl (aTHX_ current);
238 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 3166 load_perl (aTHX_ coro);
3167 SPAGAIN;
3168 }
3169
3170 PUSHSTACK;
3171
3172 PUSHMARK (SP);
3173 PUTBACK;
3174
3175 if (ix)
3176 eval_sv (coderef, 0);
3177 else
3178 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3179
3180 POPSTACK;
3181 SPAGAIN;
3182
3183 if (current != coro)
3184 {
3185 PUTBACK;
3186 save_perl (aTHX_ coro);
3187 load_perl (aTHX_ current);
3188 SPAGAIN;
239 } 3189 }
240 } 3190 }
241
242 if (top_si->si_type == PERLSI_MAIN)
243 break;
244
245 top_si = top_si->si_prev;
246 ccstk = top_si->si_cxstack;
247 cxix = top_si->si_cxix;
248 }
249
250 PUTBACK;
251 }
252
253 c->dowarn = PL_dowarn;
254 c->defav = GvAV (PL_defgv);
255 c->curstackinfo = PL_curstackinfo;
256 c->curstack = PL_curstack;
257 c->mainstack = PL_mainstack;
258 c->stack_sp = PL_stack_sp;
259 c->op = PL_op;
260 c->curpad = PL_curpad;
261 c->stack_base = PL_stack_base;
262 c->stack_max = PL_stack_max;
263 c->tmps_stack = PL_tmps_stack;
264 c->tmps_floor = PL_tmps_floor;
265 c->tmps_ix = PL_tmps_ix;
266 c->tmps_max = PL_tmps_max;
267 c->markstack = PL_markstack;
268 c->markstack_ptr = PL_markstack_ptr;
269 c->markstack_max = PL_markstack_max;
270 c->scopestack = PL_scopestack;
271 c->scopestack_ix = PL_scopestack_ix;
272 c->scopestack_max = PL_scopestack_max;
273 c->savestack = PL_savestack;
274 c->savestack_ix = PL_savestack_ix;
275 c->savestack_max = PL_savestack_max;
276 c->retstack = PL_retstack;
277 c->retstack_ix = PL_retstack_ix;
278 c->retstack_max = PL_retstack_max;
279 c->curcop = PL_curcop;
280} 3191}
281 3192
282static void 3193SV *
283LOAD(pTHX_ Coro__State c) 3194is_ready (Coro::State coro)
284{
285 PL_dowarn = c->dowarn;
286 GvAV (PL_defgv) = c->defav;
287 PL_curstackinfo = c->curstackinfo;
288 PL_curstack = c->curstack;
289 PL_mainstack = c->mainstack;
290 PL_stack_sp = c->stack_sp;
291 PL_op = c->op;
292 PL_curpad = c->curpad;
293 PL_stack_base = c->stack_base;
294 PL_stack_max = c->stack_max;
295 PL_tmps_stack = c->tmps_stack;
296 PL_tmps_floor = c->tmps_floor;
297 PL_tmps_ix = c->tmps_ix;
298 PL_tmps_max = c->tmps_max;
299 PL_markstack = c->markstack;
300 PL_markstack_ptr = c->markstack_ptr;
301 PL_markstack_max = c->markstack_max;
302 PL_scopestack = c->scopestack;
303 PL_scopestack_ix = c->scopestack_ix;
304 PL_scopestack_max = c->scopestack_max;
305 PL_savestack = c->savestack;
306 PL_savestack_ix = c->savestack_ix;
307 PL_savestack_max = c->savestack_max;
308 PL_retstack = c->retstack;
309 PL_retstack_ix = c->retstack_ix;
310 PL_retstack_max = c->retstack_max;
311 PL_curcop = c->curcop;
312
313 {
314 dSP;
315 CV *cv;
316
317 /* now do the ugly restore mess */
318 while ((cv = (CV *)POPs))
319 {
320 AV *padlist = (AV *)POPs;
321
322 put_padlist (cv);
323 CvPADLIST(cv) = padlist;
324 CvDEPTH(cv) = (I32)POPs;
325
326#ifdef USE_THREADS
327 CvOWNER(cv) = (struct perl_thread *)POPs;
328 error does not work either
329#endif
330 }
331
332 PUTBACK;
333 }
334}
335
336/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
337STATIC void
338destroy_stacks(pTHX)
339{
340 /* die does this while calling POPSTACK, but I just don't see why. */
341 dounwind(-1);
342
343 /* is this ugly, I ask? */
344 while (PL_scopestack_ix)
345 LEAVE;
346
347 while (PL_curstackinfo->si_next)
348 PL_curstackinfo = PL_curstackinfo->si_next;
349
350 while (PL_curstackinfo)
351 {
352 PERL_SI *p = PL_curstackinfo->si_prev;
353
354 SvREFCNT_dec(PL_curstackinfo->si_stack);
355 Safefree(PL_curstackinfo->si_cxstack);
356 Safefree(PL_curstackinfo);
357 PL_curstackinfo = p;
358 }
359
360 if (PL_scopestack_ix != 0)
361 Perl_warner(aTHX_ WARN_INTERNAL,
362 "Unbalanced scopes: %ld more ENTERs than LEAVEs\n",
363 (long)PL_scopestack_ix);
364 if (PL_savestack_ix != 0)
365 Perl_warner(aTHX_ WARN_INTERNAL,
366 "Unbalanced saves: %ld more saves than restores\n",
367 (long)PL_savestack_ix);
368 if (PL_tmps_floor != -1)
369 Perl_warner(aTHX_ WARN_INTERNAL,"Unbalanced tmps: %ld more allocs than frees\n",
370 (long)PL_tmps_floor + 1);
371 /*
372 */
373 Safefree(PL_tmps_stack);
374 Safefree(PL_markstack);
375 Safefree(PL_scopestack);
376 Safefree(PL_savestack);
377 Safefree(PL_retstack);
378}
379
380#define SUB_INIT "Coro::State::_newcoro"
381
382MODULE = Coro::State PACKAGE = Coro::State
383
384PROTOTYPES: ENABLE
385
386BOOT:
387 if (!padlist_cache)
388 padlist_cache = newHV ();
389
390Coro::State
391_newprocess(args)
392 SV * args
393 PROTOTYPE: $ 3195 PROTOTYPE: $
3196 ALIAS:
3197 is_ready = CF_READY
3198 is_running = CF_RUNNING
3199 is_new = CF_NEW
3200 is_destroyed = CF_DESTROYED
3201 is_suspended = CF_SUSPENDED
3202 CODE:
3203 RETVAL = boolSV (coro->flags & ix);
3204 OUTPUT:
3205 RETVAL
3206
3207void
3208throw (Coro::State self, SV *throw = &PL_sv_undef)
3209 PROTOTYPE: $;$
394 CODE: 3210 CODE:
395 Coro__State coro; 3211{
3212 struct coro *current = SvSTATE_current;
3213 SV **throwp = self == current ? &CORO_THROW : &self->except;
3214 SvREFCNT_dec (*throwp);
3215 SvGETMAGIC (throw);
3216 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
3217}
396 3218
397 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV) 3219void
398 croak ("Coro::State::newprocess expects an arrayref"); 3220api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3221 PROTOTYPE: $;$
3222 C_ARGS: aTHX_ coro, flags
3223
3224SV *
3225has_cctx (Coro::State coro)
3226 PROTOTYPE: $
3227 CODE:
3228 /* maybe manage the running flag differently */
3229 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3230 OUTPUT:
3231 RETVAL
3232
3233int
3234is_traced (Coro::State coro)
3235 PROTOTYPE: $
3236 CODE:
3237 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3238 OUTPUT:
3239 RETVAL
3240
3241UV
3242rss (Coro::State coro)
3243 PROTOTYPE: $
3244 ALIAS:
3245 usecount = 1
3246 CODE:
3247 switch (ix)
3248 {
3249 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3250 case 1: RETVAL = coro->usecount; break;
399 3251 }
400 New (0, coro, 1, struct coro); 3252 OUTPUT:
401
402 coro->mainstack = 0; /* actual work is done inside transfer */
403 coro->args = (AV *)SvREFCNT_inc (SvRV (args));
404
405 RETVAL = coro; 3253 RETVAL
3254
3255void
3256force_cctx ()
3257 PROTOTYPE:
3258 CODE:
3259 cctx_current->idle_sp = 0;
3260
3261void
3262swap_defsv (Coro::State self)
3263 PROTOTYPE: $
3264 ALIAS:
3265 swap_defav = 1
3266 CODE:
3267 if (!self->slot)
3268 croak ("cannot swap state with coroutine that has no saved state,");
3269 else
3270 {
3271 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3272 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
3273
3274 SV *tmp = *src; *src = *dst; *dst = tmp;
3275 }
3276
3277void
3278cancel (Coro::State self)
3279 CODE:
3280 coro_state_destroy (aTHX_ self);
3281 coro_call_on_destroy (aTHX_ self); /* actually only for Coro objects */
3282
3283
3284SV *
3285enable_times (int enabled = enable_times)
3286 CODE:
3287{
3288 RETVAL = boolSV (enable_times);
3289
3290 if (enabled != enable_times)
3291 {
3292 enable_times = enabled;
3293
3294 coro_times_update ();
3295 (enabled ? coro_times_sub : coro_times_add)(SvSTATE (coro_current));
3296 }
3297}
406 OUTPUT: 3298 OUTPUT:
407 RETVAL 3299 RETVAL
408 3300
409void 3301void
410transfer(prev,next) 3302times (Coro::State self)
411 Coro::State_or_hashref prev 3303 PPCODE:
412 Coro::State_or_hashref next 3304{
3305 struct coro *current = SvSTATE (coro_current);
3306
3307 if (expect_false (current == self))
3308 {
3309 coro_times_update ();
3310 coro_times_add (SvSTATE (coro_current));
3311 }
3312
3313 EXTEND (SP, 2);
3314 PUSHs (sv_2mortal (newSVnv (self->t_real [0] + self->t_real [1] * 1e-9)));
3315 PUSHs (sv_2mortal (newSVnv (self->t_cpu [0] + self->t_cpu [1] * 1e-9)));
3316
3317 if (expect_false (current == self))
3318 coro_times_sub (SvSTATE (coro_current));
3319}
3320
3321MODULE = Coro::State PACKAGE = Coro
3322
3323BOOT:
3324{
3325 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3326 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3327 cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD);
3328 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD);
3329 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3330 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3331 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3332 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3333 sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE);
3334
3335 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3336 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3337 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3338 CvNODEBUG_on (get_cv ("Coro::_pool_handler", 0)); /* work around a debugger bug */
3339
3340 coro_stash = gv_stashpv ("Coro", TRUE);
3341
3342 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (CORO_PRIO_MAX));
3343 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (CORO_PRIO_HIGH));
3344 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (CORO_PRIO_NORMAL));
3345 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (CORO_PRIO_LOW));
3346 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (CORO_PRIO_IDLE));
3347 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (CORO_PRIO_MIN));
3348
3349 {
3350 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3351
3352 coroapi.schedule = api_schedule;
3353 coroapi.schedule_to = api_schedule_to;
3354 coroapi.cede = api_cede;
3355 coroapi.cede_notself = api_cede_notself;
3356 coroapi.ready = api_ready;
3357 coroapi.is_ready = api_is_ready;
3358 coroapi.nready = coro_nready;
3359 coroapi.current = coro_current;
3360
3361 /*GCoroAPI = &coroapi;*/
3362 sv_setiv (sv, (IV)&coroapi);
3363 SvREADONLY_on (sv);
3364 }
3365}
3366
3367SV *
3368async (...)
3369 PROTOTYPE: &@
413 CODE: 3370 CODE:
3371 RETVAL = coro_new (aTHX_ coro_stash, &ST (0), items, 1);
3372 api_ready (aTHX_ RETVAL);
3373 OUTPUT:
3374 RETVAL
414 3375
415 if (prev != next) 3376void
3377terminate (...)
3378 CODE:
3379 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3380
3381void
3382schedule (...)
3383 CODE:
3384 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3385
3386void
3387schedule_to (...)
3388 CODE:
3389 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3390
3391void
3392cede_to (...)
3393 CODE:
3394 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3395
3396void
3397cede (...)
3398 CODE:
3399 CORO_EXECUTE_SLF_XS (slf_init_cede);
3400
3401void
3402cede_notself (...)
3403 CODE:
3404 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3405
3406void
3407_set_current (SV *current)
3408 PROTOTYPE: $
3409 CODE:
3410 SvREFCNT_dec (SvRV (coro_current));
3411 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3412
3413void
3414_set_readyhook (SV *hook)
3415 PROTOTYPE: $
3416 CODE:
3417 SvREFCNT_dec (coro_readyhook);
3418 SvGETMAGIC (hook);
3419 if (SvOK (hook))
3420 {
3421 coro_readyhook = newSVsv (hook);
3422 CORO_READYHOOK = invoke_sv_ready_hook_helper;
3423 }
3424 else
416 { 3425 {
3426 coro_readyhook = 0;
3427 CORO_READYHOOK = 0;
3428 }
3429
3430int
3431prio (Coro::State coro, int newprio = 0)
3432 PROTOTYPE: $;$
3433 ALIAS:
3434 nice = 1
3435 CODE:
3436{
3437 RETVAL = coro->prio;
3438
3439 if (items > 1)
3440 {
3441 if (ix)
3442 newprio = coro->prio - newprio;
3443
3444 if (newprio < CORO_PRIO_MIN) newprio = CORO_PRIO_MIN;
3445 if (newprio > CORO_PRIO_MAX) newprio = CORO_PRIO_MAX;
3446
3447 coro->prio = newprio;
3448 }
3449}
3450 OUTPUT:
3451 RETVAL
3452
3453SV *
3454ready (SV *self)
3455 PROTOTYPE: $
3456 CODE:
3457 RETVAL = boolSV (api_ready (aTHX_ self));
3458 OUTPUT:
3459 RETVAL
3460
3461int
3462nready (...)
3463 PROTOTYPE:
3464 CODE:
3465 RETVAL = coro_nready;
3466 OUTPUT:
3467 RETVAL
3468
3469void
3470suspend (Coro::State self)
3471 PROTOTYPE: $
3472 CODE:
3473 self->flags |= CF_SUSPENDED;
3474
3475void
3476resume (Coro::State self)
3477 PROTOTYPE: $
3478 CODE:
3479 self->flags &= ~CF_SUSPENDED;
3480
3481void
3482_pool_handler (...)
3483 CODE:
3484 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3485
3486void
3487async_pool (SV *cv, ...)
3488 PROTOTYPE: &@
3489 PPCODE:
3490{
3491 HV *hv = (HV *)av_pop (av_async_pool);
3492 AV *av = newAV ();
3493 SV *cb = ST (0);
3494 int i;
3495
3496 av_extend (av, items - 2);
3497 for (i = 1; i < items; ++i)
3498 av_push (av, SvREFCNT_inc_NN (ST (i)));
3499
3500 if ((SV *)hv == &PL_sv_undef)
3501 {
3502 SV *sv = coro_new (aTHX_ coro_stash, (SV **)&cv_pool_handler, 1, 1);
3503 hv = (HV *)SvREFCNT_inc_NN (SvRV (sv));
3504 SvREFCNT_dec (sv);
3505 }
3506
3507 {
3508 struct coro *coro = SvSTATE_hv (hv);
3509
3510 assert (!coro->invoke_cb);
3511 assert (!coro->invoke_av);
3512 coro->invoke_cb = SvREFCNT_inc (cb);
3513 coro->invoke_av = av;
3514 }
3515
3516 api_ready (aTHX_ (SV *)hv);
3517
3518 if (GIMME_V != G_VOID)
3519 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
3520 else
3521 SvREFCNT_dec (hv);
3522}
3523
3524SV *
3525rouse_cb ()
3526 PROTOTYPE:
3527 CODE:
3528 RETVAL = coro_new_rouse_cb (aTHX);
3529 OUTPUT:
3530 RETVAL
3531
3532void
3533rouse_wait (...)
3534 PROTOTYPE: ;$
3535 PPCODE:
3536 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
3537
3538void
3539on_enter (SV *block)
3540 ALIAS:
3541 on_leave = 1
3542 PROTOTYPE: &
3543 CODE:
3544{
3545 struct coro *coro = SvSTATE_current;
3546 AV **avp = ix ? &coro->on_leave : &coro->on_enter;
3547
3548 block = s_get_cv_croak (block);
3549
3550 if (!*avp)
3551 *avp = newAV ();
3552
3553 av_push (*avp, SvREFCNT_inc (block));
3554
3555 if (!ix)
3556 on_enterleave_call (aTHX_ block);
3557
3558 LEAVE; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3559 SAVEDESTRUCTOR_X (ix ? coro_pop_on_leave : coro_pop_on_enter, (void *)coro);
3560 ENTER; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3561}
3562
3563
3564MODULE = Coro::State PACKAGE = PerlIO::cede
3565
3566BOOT:
3567 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3568
3569
3570MODULE = Coro::State PACKAGE = Coro::Semaphore
3571
3572SV *
3573new (SV *klass, SV *count = 0)
3574 CODE:
3575{
3576 int semcnt = 1;
3577
3578 if (count)
3579 {
3580 SvGETMAGIC (count);
3581
3582 if (SvOK (count))
3583 semcnt = SvIV (count);
3584 }
3585
3586 RETVAL = sv_bless (
3587 coro_waitarray_new (aTHX_ semcnt),
3588 GvSTASH (CvGV (cv))
3589 );
3590}
3591 OUTPUT:
3592 RETVAL
3593
3594# helper for Coro::Channel and others
3595SV *
3596_alloc (int count)
3597 CODE:
3598 RETVAL = coro_waitarray_new (aTHX_ count);
3599 OUTPUT:
3600 RETVAL
3601
3602SV *
3603count (SV *self)
3604 CODE:
3605 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3606 OUTPUT:
3607 RETVAL
3608
3609void
3610up (SV *self, int adjust = 1)
3611 ALIAS:
3612 adjust = 1
3613 CODE:
3614 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3615
3616void
3617down (...)
3618 CODE:
3619 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3620
3621void
3622wait (...)
3623 CODE:
3624 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3625
3626void
3627try (SV *self)
3628 PPCODE:
3629{
3630 AV *av = (AV *)SvRV (self);
3631 SV *count_sv = AvARRAY (av)[0];
3632 IV count = SvIVX (count_sv);
3633
3634 if (count > 0)
3635 {
3636 --count;
3637 SvIVX (count_sv) = count;
3638 XSRETURN_YES;
3639 }
3640 else
3641 XSRETURN_NO;
3642}
3643
3644void
3645waiters (SV *self)
3646 PPCODE:
3647{
3648 AV *av = (AV *)SvRV (self);
3649 int wcount = AvFILLp (av) + 1 - 1;
3650
3651 if (GIMME_V == G_SCALAR)
3652 XPUSHs (sv_2mortal (newSViv (wcount)));
3653 else
3654 {
3655 int i;
3656 EXTEND (SP, wcount);
3657 for (i = 1; i <= wcount; ++i)
3658 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3659 }
3660}
3661
3662MODULE = Coro::State PACKAGE = Coro::SemaphoreSet
3663
3664void
3665_may_delete (SV *sem, int count, int extra_refs)
3666 PPCODE:
3667{
3668 AV *av = (AV *)SvRV (sem);
3669
3670 if (SvREFCNT ((SV *)av) == 1 + extra_refs
3671 && AvFILLp (av) == 0 /* no waiters, just count */
3672 && SvIV (AvARRAY (av)[0]) == count)
3673 XSRETURN_YES;
3674
3675 XSRETURN_NO;
3676}
3677
3678MODULE = Coro::State PACKAGE = Coro::Signal
3679
3680SV *
3681new (SV *klass)
3682 CODE:
3683 RETVAL = sv_bless (
3684 coro_waitarray_new (aTHX_ 0),
3685 GvSTASH (CvGV (cv))
3686 );
3687 OUTPUT:
3688 RETVAL
3689
3690void
3691wait (...)
3692 CODE:
3693 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
3694
3695void
3696broadcast (SV *self)
3697 CODE:
3698{
3699 AV *av = (AV *)SvRV (self);
3700 coro_signal_wake (aTHX_ av, AvFILLp (av));
3701}
3702
3703void
3704send (SV *self)
3705 CODE:
3706{
3707 AV *av = (AV *)SvRV (self);
3708
3709 if (AvFILLp (av))
3710 coro_signal_wake (aTHX_ av, 1);
3711 else
3712 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3713}
3714
3715IV
3716awaited (SV *self)
3717 CODE:
3718 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3719 OUTPUT:
3720 RETVAL
3721
3722
3723MODULE = Coro::State PACKAGE = Coro::AnyEvent
3724
3725BOOT:
3726 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3727
3728void
3729_schedule (...)
3730 CODE:
3731{
3732 static int incede;
3733
3734 api_cede_notself (aTHX);
3735
3736 ++incede;
3737 while (coro_nready >= incede && api_cede (aTHX))
3738 ;
3739
3740 sv_setsv (sv_activity, &PL_sv_undef);
3741 if (coro_nready >= incede)
3742 {
3743 PUSHMARK (SP);
417 PUTBACK; 3744 PUTBACK;
418 SAVE (aTHX_ prev); 3745 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
419
420 /*
421 * this could be done in newprocess which would lead to
422 * extremely elegant and fast (just PUTBACK/SAVE/LOAD/SPAGAIN)
423 * code here, but lazy allocation of stacks has also
424 * some virtues and the overhead of the if() is nil.
425 */
426 if (next->mainstack)
427 {
428 LOAD (aTHX_ next);
429 next->mainstack = 0; /* unnecessary but much cleaner */
430 SPAGAIN;
431 }
432 else
433 {
434 /*
435 * emulate part of the perl startup here.
436 */
437 UNOP myop;
438
439 init_stacks (); /* from perl.c */
440 PL_op = (OP *)&myop;
441 /*PL_curcop = 0;*/
442 GvAV (PL_defgv) = (AV *)SvREFCNT_inc ((SV *)next->args);
443
444 SPAGAIN;
445 Zero(&myop, 1, UNOP);
446 myop.op_next = Nullop;
447 myop.op_flags = OPf_WANT_VOID;
448
449 PUSHMARK(SP);
450 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
451 PUTBACK;
452 /*
453 * the next line is slightly wrong, as PL_op->op_next
454 * is actually being executed so we skip the first op.
455 * that doesn't matter, though, since it is only
456 * pp_nextstate and we never return...
457 */
458 PL_op = Perl_pp_entersub(aTHX);
459 SPAGAIN;
460
461 ENTER;
462 }
463 } 3746 }
464 3747
3748 --incede;
3749}
3750
3751
3752MODULE = Coro::State PACKAGE = Coro::AIO
3753
465void 3754void
466DESTROY(coro) 3755_register (char *target, char *proto, SV *req)
467 Coro::State coro 3756 CODE:
468 CODE: 3757{
3758 SV *req_cv = s_get_cv_croak (req);
3759 /* newXSproto doesn't return the CV on 5.8 */
3760 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3761 sv_setpv ((SV *)slf_cv, proto);
3762 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3763}
469 3764
470 if (coro->mainstack) 3765MODULE = Coro::State PACKAGE = Coro::Select
3766
3767void
3768patch_pp_sselect ()
3769 CODE:
3770 if (!coro_old_pp_sselect)
471 { 3771 {
472 struct coro temp; 3772 coro_select_select = (SV *)get_cv ("Coro::Select::select", 0);
473 3773 coro_old_pp_sselect = PL_ppaddr [OP_SSELECT];
474 PUTBACK; 3774 PL_ppaddr [OP_SSELECT] = coro_pp_sselect;
475 SAVE(aTHX_ (&temp));
476 LOAD(aTHX_ coro);
477
478 destroy_stacks ();
479 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
480
481 LOAD((&temp));
482 SPAGAIN;
483 } 3775 }
484 3776
485 SvREFCNT_dec (coro->args); 3777void
486 Safefree (coro); 3778unpatch_pp_sselect ()
3779 CODE:
3780 if (coro_old_pp_sselect)
3781 {
3782 PL_ppaddr [OP_SSELECT] = coro_old_pp_sselect;
3783 coro_old_pp_sselect = 0;
3784 }
487 3785
488 3786

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines