ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.4 by root, Tue Jul 17 02:21:56 2001 UTC vs.
Revision 1.373 by root, Thu Oct 1 23:51:33 2009 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "schmorp.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
9#endif 24#endif
10 25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
57# undef CORO_STACKGUARD
58#endif
59
60#ifndef CORO_STACKGUARD
61# define CORO_STACKGUARD 0
62#endif
63
64/* prefer perl internal functions over our own? */
65#ifndef CORO_PREFER_PERL_FUNCTIONS
66# define CORO_PREFER_PERL_FUNCTIONS 0
67#endif
68
69/* The next macros try to return the current stack pointer, in an as
70 * portable way as possible. */
71#if __GNUC__ >= 4
72# define dSTACKLEVEL int stacklevel_dummy
73# define STACKLEVEL __builtin_frame_address (0)
74#else
75# define dSTACKLEVEL volatile void *stacklevel
76# define STACKLEVEL ((void *)&stacklevel)
77#endif
78
79#define IN_DESTRUCT PL_dirty
80
81#if __GNUC__ >= 3
82# define attribute(x) __attribute__(x)
83# define expect(expr,value) __builtin_expect ((expr), (value))
84# define INLINE static inline
85#else
86# define attribute(x)
87# define expect(expr,value) (expr)
88# define INLINE static
89#endif
90
91#define expect_false(expr) expect ((expr) != 0, 0)
92#define expect_true(expr) expect ((expr) != 0, 1)
93
94#define NOINLINE attribute ((noinline))
95
96#include "CoroAPI.h"
97#define GCoroAPI (&coroapi) /* very sneaky */
98
99#ifdef USE_ITHREADS
100# if CORO_PTHREAD
101static void *coro_thx;
102# endif
103#endif
104
105#ifdef __linux
106# include <time.h> /* for timespec */
107# include <syscall.h> /* for SYS_* */
108# ifdef SYS_clock_gettime
109# define coro_clock_gettime(id, ts) syscall (SYS_clock_gettime, (id), (ts))
110# define CORO_CLOCK_MONOTONIC 1
111# define CORO_CLOCK_THREAD_CPUTIME_ID 3
112# endif
113#endif
114
115static double (*nvtime)(); /* so why doesn't it take void? */
116static void (*u2time)(pTHX_ UV ret[2]);
117
118/* we hijack an hopefully unused CV flag for our purposes */
119#define CVf_SLF 0x4000
120static OP *pp_slf (pTHX);
121
122static U32 cctx_gen;
123static size_t cctx_stacksize = CORO_STACKSIZE;
124static struct CoroAPI coroapi;
125static AV *main_mainstack; /* used to differentiate between $main and others */
126static JMPENV *main_top_env;
127static HV *coro_state_stash, *coro_stash;
128static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
129
130static AV *av_destroy; /* destruction queue */
131static SV *sv_manager; /* the manager coro */
132static SV *sv_idle; /* $Coro::idle */
133
134static GV *irsgv; /* $/ */
135static GV *stdoutgv; /* *STDOUT */
136static SV *rv_diehook;
137static SV *rv_warnhook;
138static HV *hv_sig; /* %SIG */
139
140/* async_pool helper stuff */
141static SV *sv_pool_rss;
142static SV *sv_pool_size;
143static SV *sv_async_pool_idle; /* description string */
144static AV *av_async_pool; /* idle pool */
145static SV *sv_Coro; /* class string */
146static CV *cv_pool_handler;
147
148/* Coro::AnyEvent */
149static SV *sv_activity;
150
151/* enable processtime/realtime profiling */
152static char enable_times;
153typedef U32 coro_ts[2];
154static coro_ts time_real, time_cpu;
155static char times_valid;
156
157static struct coro_cctx *cctx_first;
158static int cctx_count, cctx_idle;
159
160enum {
161 CC_MAPPED = 0x01,
162 CC_NOREUSE = 0x02, /* throw this away after tracing */
163 CC_TRACE = 0x04,
164 CC_TRACE_SUB = 0x08, /* trace sub calls */
165 CC_TRACE_LINE = 0x10, /* trace each statement */
166 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
167};
168
169/* this is a structure representing a c-level coroutine */
170typedef struct coro_cctx
171{
172 struct coro_cctx *next;
173
174 /* the stack */
175 void *sptr;
176 size_t ssize;
177
178 /* cpu state */
179 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
180 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
181 JMPENV *top_env;
182 coro_context cctx;
183
184 U32 gen;
185#if CORO_USE_VALGRIND
186 int valgrind_id;
187#endif
188 unsigned char flags;
189} coro_cctx;
190
191coro_cctx *cctx_current; /* the currently running cctx */
192
193/*****************************************************************************/
194
195enum {
196 CF_RUNNING = 0x0001, /* coroutine is running */
197 CF_READY = 0x0002, /* coroutine is ready */
198 CF_NEW = 0x0004, /* has never been switched to */
199 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
200 CF_SUSPENDED = 0x0010, /* coroutine can't be scheduled */
201};
202
203/* the structure where most of the perl state is stored, overlaid on the cxstack */
204typedef struct
205{
206 SV *defsv;
207 AV *defav;
208 SV *errsv;
209 SV *irsgv;
210 HV *hinthv;
211#define VAR(name,type) type name;
212# include "state.h"
213#undef VAR
214} perl_slots;
215
216#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
217
218/* this is a structure representing a perl-level coroutine */
11struct coro { 219struct coro {
12 U8 dowarn; 220 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 221 coro_cctx *cctx;
14 222
15 PERL_SI *curstackinfo; 223 /* ready queue */
16 AV *curstack; 224 struct coro *next_ready;
225
226 /* state data */
227 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 228 AV *mainstack;
18 SV **stack_sp; 229 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 230
41 AV *args; 231 CV *startcv; /* the CV to execute */
232 AV *args; /* data associated with this coroutine (initial args) */
233 int refcnt; /* coroutines are refcounted, yes */
234 int flags; /* CF_ flags */
235 HV *hv; /* the perl hash associated with this coro, if any */
236 void (*on_destroy)(pTHX_ struct coro *coro);
237
238 /* statistics */
239 int usecount; /* number of transfers to this coro */
240
241 /* coro process data */
242 int prio;
243 SV *except; /* exception to be thrown */
244 SV *rouse_cb;
245
246 /* async_pool */
247 SV *saved_deffh;
248 SV *invoke_cb;
249 AV *invoke_av;
250
251 /* on_enter/on_leave */
252 AV *on_enter;
253 AV *on_leave;
254
255 /* times */
256 coro_ts t_cpu, t_real;
257
258 /* linked list */
259 struct coro *next, *prev;
42}; 260};
43 261
44typedef struct coro *Coro__State; 262typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 263typedef struct coro *Coro__State_or_hashref;
46 264
47static HV *padlist_cache; 265/* the following variables are effectively part of the perl context */
266/* and get copied between struct coro and these variables */
267/* the mainr easonw e don't support windows process emulation */
268static struct CoroSLF slf_frame; /* the current slf frame */
48 269
49/* mostly copied from op.c:cv_clone2 */ 270/** Coro ********************************************************************/
50STATIC AV * 271
51clone_padlist (AV *protopadlist) 272#define CORO_PRIO_MAX 3
273#define CORO_PRIO_HIGH 1
274#define CORO_PRIO_NORMAL 0
275#define CORO_PRIO_LOW -1
276#define CORO_PRIO_IDLE -3
277#define CORO_PRIO_MIN -4
278
279/* for Coro.pm */
280static SV *coro_current;
281static SV *coro_readyhook;
282static struct coro *coro_ready [CORO_PRIO_MAX - CORO_PRIO_MIN + 1][2]; /* head|tail */
283static CV *cv_coro_run, *cv_coro_terminate;
284static struct coro *coro_first;
285#define coro_nready coroapi.nready
286
287/** Coro::Select ************************************************************/
288
289static OP *(*coro_old_pp_sselect) (pTHX);
290static SV *coro_select_select;
291
292/* horrible hack, but if it works... */
293static OP *
294coro_pp_sselect (pTHX)
52{ 295{
53 AV *av; 296 dSP;
54 I32 ix; 297 PUSHMARK (SP - 4); /* fake argument list */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 298 XPUSHs (coro_select_select);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 299 PUTBACK;
57 SV **pname = AvARRAY (protopad_name); 300
58 SV **ppad = AvARRAY (protopad); 301 /* entersub is an UNOP, select a LISTOP... keep your fingers crossed */
59 I32 fname = AvFILLp (protopad_name); 302 PL_op->op_flags |= OPf_STACKED;
60 I32 fpad = AvFILLp (protopad); 303 PL_op->op_private = 0;
304 return PL_ppaddr [OP_ENTERSUB](aTHX);
305}
306
307/** lowlevel stuff **********************************************************/
308
309static SV *
310coro_get_sv (pTHX_ const char *name, int create)
311{
312#if PERL_VERSION_ATLEAST (5,10,0)
313 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
314 get_sv (name, create);
315#endif
316 return get_sv (name, create);
317}
318
319static AV *
320coro_get_av (pTHX_ const char *name, int create)
321{
322#if PERL_VERSION_ATLEAST (5,10,0)
323 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
324 get_av (name, create);
325#endif
326 return get_av (name, create);
327}
328
329static HV *
330coro_get_hv (pTHX_ const char *name, int create)
331{
332#if PERL_VERSION_ATLEAST (5,10,0)
333 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
334 get_hv (name, create);
335#endif
336 return get_hv (name, create);
337}
338
339INLINE void
340coro_times_update ()
341{
342#ifdef coro_clock_gettime
343 struct timespec ts;
344
345 ts.tv_sec = ts.tv_nsec = 0;
346 coro_clock_gettime (CORO_CLOCK_THREAD_CPUTIME_ID, &ts);
347 time_cpu [0] = ts.tv_sec; time_cpu [1] = ts.tv_nsec;
348
349 ts.tv_sec = ts.tv_nsec = 0;
350 coro_clock_gettime (CORO_CLOCK_MONOTONIC, &ts);
351 time_real [0] = ts.tv_sec; time_real [1] = ts.tv_nsec;
352#else
353 dTHX;
354 UV tv[2];
355
356 u2time (aTHX_ tv);
357 time_real [0] = tv [0];
358 time_real [1] = tv [1] * 1000;
359#endif
360}
361
362INLINE void
363coro_times_add (struct coro *c)
364{
365 c->t_real [1] += time_real [1];
366 if (c->t_real [1] > 1000000000) { c->t_real [1] -= 1000000000; ++c->t_real [0]; }
367 c->t_real [0] += time_real [0];
368
369 c->t_cpu [1] += time_cpu [1];
370 if (c->t_cpu [1] > 1000000000) { c->t_cpu [1] -= 1000000000; ++c->t_cpu [0]; }
371 c->t_cpu [0] += time_cpu [0];
372}
373
374INLINE void
375coro_times_sub (struct coro *c)
376{
377 if (c->t_real [1] < time_real [1]) { c->t_real [1] += 1000000000; --c->t_real [0]; }
378 c->t_real [1] -= time_real [1];
379 c->t_real [0] -= time_real [0];
380
381 if (c->t_cpu [1] < time_cpu [1]) { c->t_cpu [1] += 1000000000; --c->t_cpu [0]; }
382 c->t_cpu [1] -= time_cpu [1];
383 c->t_cpu [0] -= time_cpu [0];
384}
385
386/*****************************************************************************/
387/* magic glue */
388
389#define CORO_MAGIC_type_cv 26
390#define CORO_MAGIC_type_state PERL_MAGIC_ext
391
392#define CORO_MAGIC_NN(sv, type) \
393 (expect_true (SvMAGIC (sv)->mg_type == type) \
394 ? SvMAGIC (sv) \
395 : mg_find (sv, type))
396
397#define CORO_MAGIC(sv, type) \
398 (expect_true (SvMAGIC (sv)) \
399 ? CORO_MAGIC_NN (sv, type) \
400 : 0)
401
402#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
403#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
404
405INLINE struct coro *
406SvSTATE_ (pTHX_ SV *coro)
407{
408 HV *stash;
409 MAGIC *mg;
410
411 if (SvROK (coro))
412 coro = SvRV (coro);
413
414 if (expect_false (SvTYPE (coro) != SVt_PVHV))
415 croak ("Coro::State object required");
416
417 stash = SvSTASH (coro);
418 if (expect_false (stash != coro_stash && stash != coro_state_stash))
419 {
420 /* very slow, but rare, check */
421 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
422 croak ("Coro::State object required");
423 }
424
425 mg = CORO_MAGIC_state (coro);
426 return (struct coro *)mg->mg_ptr;
427}
428
429#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
430
431/* faster than SvSTATE, but expects a coroutine hv */
432#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
433#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
434
435/*****************************************************************************/
436/* padlist management and caching */
437
438static AV *
439coro_derive_padlist (pTHX_ CV *cv)
440{
441 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 442 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 443
72 newpadlist = newAV (); 444 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 445 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 446#if PERL_VERSION_ATLEAST (5,10,0)
447 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
448#else
449 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
450#endif
451 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
452 --AvFILLp (padlist);
453
454 av_store (newpadlist, 0, SvREFCNT_inc_NN (AvARRAY (padlist)[0]));
75 av_store (newpadlist, 1, (SV *) newpad); 455 av_store (newpadlist, 1, (SV *)newpad);
76 456
77 av = newAV (); /* will be @_ */ 457 return newpadlist;
78 av_extend (av, 0); 458}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 459
82 for (ix = fpad; ix > 0; ix--) 460static void
461free_padlist (pTHX_ AV *padlist)
462{
463 /* may be during global destruction */
464 if (!IN_DESTRUCT)
83 { 465 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 466 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 467
468 while (i > 0) /* special-case index 0 */
86 { 469 {
87 char *name = SvPVX (namesv); /* XXX */ 470 /* we try to be extra-careful here */
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 471 AV *av = (AV *)AvARRAY (padlist)[i--];
89 { /* lexical from outside? */ 472 I32 j = AvFILLp (av);
90 npad[ix] = SvREFCNT_inc (ppad[ix]); 473
91 } 474 while (j >= 0)
92 else 475 SvREFCNT_dec (AvARRAY (av)[j--]);
93 { /* our own lexical */ 476
94 SV *sv; 477 AvFILLp (av) = -1;
95 if (*name == '&') 478 SvREFCNT_dec (av);
96 sv = SvREFCNT_inc (ppad[ix]);
97 else if (*name == '@')
98 sv = (SV *) newAV ();
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 }
107 } 479 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix])) 480
109 { 481 SvREFCNT_dec (AvARRAY (padlist)[0]);
110 npad[ix] = SvREFCNT_inc (ppad[ix]); 482
111 } 483 AvFILLp (padlist) = -1;
484 SvREFCNT_dec ((SV*)padlist);
485 }
486}
487
488static int
489coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
490{
491 AV *padlist;
492 AV *av = (AV *)mg->mg_obj;
493
494 /* perl manages to free our internal AV and _then_ call us */
495 if (IN_DESTRUCT)
496 return 0;
497
498 /* casting is fun. */
499 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
500 free_padlist (aTHX_ padlist);
501
502 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
503
504 return 0;
505}
506
507static MGVTBL coro_cv_vtbl = {
508 0, 0, 0, 0,
509 coro_cv_free
510};
511
512/* the next two functions merely cache the padlists */
513static void
514get_padlist (pTHX_ CV *cv)
515{
516 MAGIC *mg = CORO_MAGIC_cv (cv);
517 AV *av;
518
519 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
520 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
112 else 521 else
113 { 522 {
114 SV *sv = NEWSV (0, 0); 523#if CORO_PREFER_PERL_FUNCTIONS
115 SvPADTMP_on (sv); 524 /* this is probably cleaner? but also slower! */
116 npad[ix] = sv; 525 /* in practise, it seems to be less stable */
117 } 526 CV *cp = Perl_cv_clone (aTHX_ cv);
118 } 527 CvPADLIST (cv) = CvPADLIST (cp);
119 528 CvPADLIST (cp) = 0;
120#if 0 /* NONOTUNDERSTOOD */ 529 SvREFCNT_dec (cp);
121 /* Now that vars are all in place, clone nested closures. */ 530#else
122 531 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif 532#endif
139 533 }
140 return newpadlist;
141} 534}
142 535
143STATIC AV * 536static void
144free_padlist (AV *padlist) 537put_padlist (pTHX_ CV *cv)
145{ 538{
146 /* may be during global destruction */ 539 MAGIC *mg = CORO_MAGIC_cv (cv);
147 if (SvREFCNT(padlist)) 540 AV *av;
541
542 if (expect_false (!mg))
543 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
544
545 av = (AV *)mg->mg_obj;
546
547 if (expect_false (AvFILLp (av) >= AvMAX (av)))
548 av_extend (av, AvFILLp (av) + 1);
549
550 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
551}
552
553/** load & save, init *******************************************************/
554
555static void
556on_enterleave_call (pTHX_ SV *cb);
557
558static void
559load_perl (pTHX_ Coro__State c)
560{
561 perl_slots *slot = c->slot;
562 c->slot = 0;
563
564 PL_mainstack = c->mainstack;
565
566 GvSV (PL_defgv) = slot->defsv;
567 GvAV (PL_defgv) = slot->defav;
568 GvSV (PL_errgv) = slot->errsv;
569 GvSV (irsgv) = slot->irsgv;
570 GvHV (PL_hintgv) = slot->hinthv;
571
572 #define VAR(name,type) PL_ ## name = slot->name;
573 # include "state.h"
574 #undef VAR
575
148 { 576 {
149 I32 i = AvFILLp(padlist); 577 dSP;
150 while (i >= 0) 578
579 CV *cv;
580
581 /* now do the ugly restore mess */
582 while (expect_true (cv = (CV *)POPs))
151 { 583 {
152 SV **svp = av_fetch(padlist, i--, FALSE); 584 put_padlist (aTHX_ cv); /* mark this padlist as available */
153 SV *sv = svp ? *svp : Nullsv; 585 CvDEPTH (cv) = PTR2IV (POPs);
154 if (sv) 586 CvPADLIST (cv) = (AV *)POPs;
155 SvREFCNT_dec(sv);
156 } 587 }
157 588
158 SvREFCNT_dec((SV*)padlist); 589 PUTBACK;
159 } 590 }
160}
161 591
162/* the next tow functions merely cache the padlists */ 592 slf_frame = c->slf_frame;
163STATIC void 593 CORO_THROW = c->except;
164get_padlist (CV *cv)
165{
166 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 0);
167 594
168 if (he && AvFILLp ((AV *)*he) >= 0) 595 if (expect_false (enable_times))
169 CvPADLIST (cv) = (AV *)av_pop ((AV *)*he);
170 else
171 CvPADLIST (cv) = clone_padlist (CvPADLIST (cv));
172}
173
174STATIC void
175put_padlist (CV *cv)
176{
177 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 1);
178
179 if (SvTYPE (*he) != SVt_PVAV)
180 {
181 SvREFCNT_dec (*he);
182 *he = (SV *)newAV ();
183 } 596 {
597 if (expect_false (!times_valid))
598 coro_times_update ();
184 599
185 av_push ((AV *)*he, (SV *)CvPADLIST (cv)); 600 coro_times_sub (c);
186} 601 }
187 602
603 if (expect_false (c->on_enter))
604 {
605 int i;
606
607 for (i = 0; i <= AvFILLp (c->on_enter); ++i)
608 on_enterleave_call (aTHX_ AvARRAY (c->on_enter)[i]);
609 }
610}
611
188static void 612static void
189SAVE(pTHX_ Coro__State c) 613save_perl (pTHX_ Coro__State c)
190{ 614{
615 if (expect_false (c->on_leave))
616 {
617 int i;
618
619 for (i = AvFILLp (c->on_leave); i >= 0; --i)
620 on_enterleave_call (aTHX_ AvARRAY (c->on_leave)[i]);
621 }
622
623 times_valid = 0;
624
625 if (expect_false (enable_times))
626 {
627 coro_times_update (); times_valid = 1;
628 coro_times_add (c);
629 }
630
631 c->except = CORO_THROW;
632 c->slf_frame = slf_frame;
633
191 { 634 {
192 dSP; 635 dSP;
193 I32 cxix = cxstack_ix; 636 I32 cxix = cxstack_ix;
637 PERL_CONTEXT *ccstk = cxstack;
194 PERL_SI *top_si = PL_curstackinfo; 638 PERL_SI *top_si = PL_curstackinfo;
195 PERL_CONTEXT *ccstk = cxstack;
196 639
197 /* 640 /*
198 * the worst thing you can imagine happens first - we have to save 641 * the worst thing you can imagine happens first - we have to save
199 * (and reinitialize) all cv's in the whole callchain :( 642 * (and reinitialize) all cv's in the whole callchain :(
200 */ 643 */
201 644
202 PUSHs (Nullsv); 645 XPUSHs (Nullsv);
203 /* this loop was inspired by pp_caller */ 646 /* this loop was inspired by pp_caller */
204 for (;;) 647 for (;;)
205 { 648 {
206 while (cxix >= 0) 649 while (expect_true (cxix >= 0))
207 { 650 {
208 PERL_CONTEXT *cx = &ccstk[cxix--]; 651 PERL_CONTEXT *cx = &ccstk[cxix--];
209 652
210 if (CxTYPE(cx) == CXt_SUB) 653 if (expect_true (CxTYPE (cx) == CXt_SUB) || expect_false (CxTYPE (cx) == CXt_FORMAT))
211 { 654 {
212 CV *cv = cx->blk_sub.cv; 655 CV *cv = cx->blk_sub.cv;
656
213 if (CvDEPTH(cv)) 657 if (expect_true (CvDEPTH (cv)))
214 { 658 {
215#ifdef USE_THREADS
216 XPUSHs ((SV *)CvOWNER(cv));
217#endif
218 EXTEND (SP, 3); 659 EXTEND (SP, 3);
219 PUSHs ((SV *)CvDEPTH(cv));
220 PUSHs ((SV *)CvPADLIST(cv)); 660 PUSHs ((SV *)CvPADLIST (cv));
661 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
221 PUSHs ((SV *)cv); 662 PUSHs ((SV *)cv);
222 663
223 get_padlist (cv);
224
225 CvDEPTH(cv) = 0; 664 CvDEPTH (cv) = 0;
226#ifdef USE_THREADS 665 get_padlist (aTHX_ cv);
227 CvOWNER(cv) = 0;
228 error must unlock this cv etc.. etc...
229 if you are here wondering about this error message then
230 the reason is that it will not work as advertised yet
231#endif
232 } 666 }
233 } 667 }
234 else if (CxTYPE(cx) == CXt_FORMAT) 668 }
669
670 if (expect_true (top_si->si_type == PERLSI_MAIN))
671 break;
672
673 top_si = top_si->si_prev;
674 ccstk = top_si->si_cxstack;
675 cxix = top_si->si_cxix;
676 }
677
678 PUTBACK;
679 }
680
681 /* allocate some space on the context stack for our purposes */
682 /* we manually unroll here, as usually 2 slots is enough */
683 if (SLOT_COUNT >= 1) CXINC;
684 if (SLOT_COUNT >= 2) CXINC;
685 if (SLOT_COUNT >= 3) CXINC;
686 {
687 unsigned int i;
688 for (i = 3; i < SLOT_COUNT; ++i)
689 CXINC;
690 }
691 cxstack_ix -= SLOT_COUNT; /* undo allocation */
692
693 c->mainstack = PL_mainstack;
694
695 {
696 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
697
698 slot->defav = GvAV (PL_defgv);
699 slot->defsv = DEFSV;
700 slot->errsv = ERRSV;
701 slot->irsgv = GvSV (irsgv);
702 slot->hinthv = GvHV (PL_hintgv);
703
704 #define VAR(name,type) slot->name = PL_ ## name;
705 # include "state.h"
706 #undef VAR
707 }
708}
709
710/*
711 * allocate various perl stacks. This is almost an exact copy
712 * of perl.c:init_stacks, except that it uses less memory
713 * on the (sometimes correct) assumption that coroutines do
714 * not usually need a lot of stackspace.
715 */
716#if CORO_PREFER_PERL_FUNCTIONS
717# define coro_init_stacks(thx) init_stacks ()
718#else
719static void
720coro_init_stacks (pTHX)
721{
722 PL_curstackinfo = new_stackinfo(32, 8);
723 PL_curstackinfo->si_type = PERLSI_MAIN;
724 PL_curstack = PL_curstackinfo->si_stack;
725 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
726
727 PL_stack_base = AvARRAY(PL_curstack);
728 PL_stack_sp = PL_stack_base;
729 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
730
731 New(50,PL_tmps_stack,32,SV*);
732 PL_tmps_floor = -1;
733 PL_tmps_ix = -1;
734 PL_tmps_max = 32;
735
736 New(54,PL_markstack,16,I32);
737 PL_markstack_ptr = PL_markstack;
738 PL_markstack_max = PL_markstack + 16;
739
740#ifdef SET_MARK_OFFSET
741 SET_MARK_OFFSET;
742#endif
743
744 New(54,PL_scopestack,8,I32);
745 PL_scopestack_ix = 0;
746 PL_scopestack_max = 8;
747
748 New(54,PL_savestack,24,ANY);
749 PL_savestack_ix = 0;
750 PL_savestack_max = 24;
751
752#if !PERL_VERSION_ATLEAST (5,10,0)
753 New(54,PL_retstack,4,OP*);
754 PL_retstack_ix = 0;
755 PL_retstack_max = 4;
756#endif
757}
758#endif
759
760/*
761 * destroy the stacks, the callchain etc...
762 */
763static void
764coro_destruct_stacks (pTHX)
765{
766 while (PL_curstackinfo->si_next)
767 PL_curstackinfo = PL_curstackinfo->si_next;
768
769 while (PL_curstackinfo)
770 {
771 PERL_SI *p = PL_curstackinfo->si_prev;
772
773 if (!IN_DESTRUCT)
774 SvREFCNT_dec (PL_curstackinfo->si_stack);
775
776 Safefree (PL_curstackinfo->si_cxstack);
777 Safefree (PL_curstackinfo);
778 PL_curstackinfo = p;
779 }
780
781 Safefree (PL_tmps_stack);
782 Safefree (PL_markstack);
783 Safefree (PL_scopestack);
784 Safefree (PL_savestack);
785#if !PERL_VERSION_ATLEAST (5,10,0)
786 Safefree (PL_retstack);
787#endif
788}
789
790#define CORO_RSS \
791 rss += sizeof (SYM (curstackinfo)); \
792 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
793 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
794 rss += SYM (tmps_max) * sizeof (SV *); \
795 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
796 rss += SYM (scopestack_max) * sizeof (I32); \
797 rss += SYM (savestack_max) * sizeof (ANY);
798
799static size_t
800coro_rss (pTHX_ struct coro *coro)
801{
802 size_t rss = sizeof (*coro);
803
804 if (coro->mainstack)
805 {
806 if (coro->flags & CF_RUNNING)
807 {
808 #define SYM(sym) PL_ ## sym
809 CORO_RSS;
810 #undef SYM
811 }
812 else
813 {
814 #define SYM(sym) coro->slot->sym
815 CORO_RSS;
816 #undef SYM
817 }
818 }
819
820 return rss;
821}
822
823/** coroutine stack handling ************************************************/
824
825static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
826static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
827static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
828
829/* apparently < 5.8.8 */
830#ifndef MgPV_nolen_const
831#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
832 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
833 (const char*)(mg)->mg_ptr)
834#endif
835
836/*
837 * This overrides the default magic get method of %SIG elements.
838 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
839 * and instead of trying to save and restore the hash elements, we just provide
840 * readback here.
841 */
842static int
843coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
844{
845 const char *s = MgPV_nolen_const (mg);
846
847 if (*s == '_')
848 {
849 SV **svp = 0;
850
851 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
852 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
853
854 if (svp)
855 {
856 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
857 return 0;
858 }
859 }
860
861 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
862}
863
864static int
865coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
866{
867 const char *s = MgPV_nolen_const (mg);
868
869 if (*s == '_')
870 {
871 SV **svp = 0;
872
873 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
874 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
875
876 if (svp)
877 {
878 SV *old = *svp;
879 *svp = 0;
880 SvREFCNT_dec (old);
881 return 0;
882 }
883 }
884
885 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
886}
887
888static int
889coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
890{
891 const char *s = MgPV_nolen_const (mg);
892
893 if (*s == '_')
894 {
895 SV **svp = 0;
896
897 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
898 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
899
900 if (svp)
901 {
902 SV *old = *svp;
903 *svp = SvOK (sv) ? newSVsv (sv) : 0;
904 SvREFCNT_dec (old);
905 return 0;
906 }
907 }
908
909 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
910}
911
912static void
913prepare_nop (pTHX_ struct coro_transfer_args *ta)
914{
915 /* kind of mega-hacky, but works */
916 ta->next = ta->prev = (struct coro *)ta;
917}
918
919static int
920slf_check_nop (pTHX_ struct CoroSLF *frame)
921{
922 return 0;
923}
924
925static int
926slf_check_repeat (pTHX_ struct CoroSLF *frame)
927{
928 return 1;
929}
930
931static UNOP coro_setup_op;
932
933static void NOINLINE /* noinline to keep it out of the transfer fast path */
934coro_setup (pTHX_ struct coro *coro)
935{
936 /*
937 * emulate part of the perl startup here.
938 */
939 coro_init_stacks (aTHX);
940
941 PL_runops = RUNOPS_DEFAULT;
942 PL_curcop = &PL_compiling;
943 PL_in_eval = EVAL_NULL;
944 PL_comppad = 0;
945 PL_comppad_name = 0;
946 PL_comppad_name_fill = 0;
947 PL_comppad_name_floor = 0;
948 PL_curpm = 0;
949 PL_curpad = 0;
950 PL_localizing = 0;
951 PL_dirty = 0;
952 PL_restartop = 0;
953#if PERL_VERSION_ATLEAST (5,10,0)
954 PL_parser = 0;
955#endif
956 PL_hints = 0;
957
958 /* recreate the die/warn hooks */
959 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
960 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
961
962 GvSV (PL_defgv) = newSV (0);
963 GvAV (PL_defgv) = coro->args; coro->args = 0;
964 GvSV (PL_errgv) = newSV (0);
965 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
966 GvHV (PL_hintgv) = 0;
967 PL_rs = newSVsv (GvSV (irsgv));
968 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
969
970 {
971 dSP;
972 UNOP myop;
973
974 Zero (&myop, 1, UNOP);
975 myop.op_next = Nullop;
976 myop.op_type = OP_ENTERSUB;
977 myop.op_flags = OPf_WANT_VOID;
978
979 PUSHMARK (SP);
980 PUSHs ((SV *)coro->startcv);
981 PUTBACK;
982 PL_op = (OP *)&myop;
983 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
984 }
985
986 /* this newly created coroutine might be run on an existing cctx which most
987 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
988 */
989 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
990 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
991
992 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
993 coro_setup_op.op_next = PL_op;
994 coro_setup_op.op_type = OP_ENTERSUB;
995 coro_setup_op.op_ppaddr = pp_slf;
996 /* no flags etc. required, as an init function won't be called */
997
998 PL_op = (OP *)&coro_setup_op;
999
1000 /* copy throw, in case it was set before coro_setup */
1001 CORO_THROW = coro->except;
1002
1003 if (expect_false (enable_times))
1004 {
1005 coro_times_update ();
1006 coro_times_sub (coro);
1007 }
1008}
1009
1010static void
1011coro_unwind_stacks (pTHX)
1012{
1013 if (!IN_DESTRUCT)
1014 {
1015 /* restore all saved variables and stuff */
1016 LEAVE_SCOPE (0);
1017 assert (PL_tmps_floor == -1);
1018
1019 /* free all temporaries */
1020 FREETMPS;
1021 assert (PL_tmps_ix == -1);
1022
1023 /* unwind all extra stacks */
1024 POPSTACK_TO (PL_mainstack);
1025
1026 /* unwind main stack */
1027 dounwind (-1);
1028 }
1029}
1030
1031static void
1032coro_destruct_perl (pTHX_ struct coro *coro)
1033{
1034 SV *svf [9];
1035
1036 {
1037 struct coro *current = SvSTATE_current;
1038
1039 assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack));
1040
1041 save_perl (aTHX_ current);
1042 load_perl (aTHX_ coro);
1043
1044 coro_unwind_stacks (aTHX);
1045 coro_destruct_stacks (aTHX);
1046
1047 // now save some sv's to be free'd later
1048 svf [0] = GvSV (PL_defgv);
1049 svf [1] = (SV *)GvAV (PL_defgv);
1050 svf [2] = GvSV (PL_errgv);
1051 svf [3] = (SV *)PL_defoutgv;
1052 svf [4] = PL_rs;
1053 svf [5] = GvSV (irsgv);
1054 svf [6] = (SV *)GvHV (PL_hintgv);
1055 svf [7] = PL_diehook;
1056 svf [8] = PL_warnhook;
1057 assert (9 == sizeof (svf) / sizeof (*svf));
1058
1059 load_perl (aTHX_ current);
1060 }
1061
1062 {
1063 unsigned int i;
1064
1065 for (i = 0; i < sizeof (svf) / sizeof (*svf); ++i)
1066 SvREFCNT_dec (svf [i]);
1067
1068 SvREFCNT_dec (coro->saved_deffh);
1069 SvREFCNT_dec (coro->rouse_cb);
1070 SvREFCNT_dec (coro->invoke_cb);
1071 SvREFCNT_dec (coro->invoke_av);
1072 }
1073}
1074
1075INLINE void
1076free_coro_mortal (pTHX)
1077{
1078 if (expect_true (coro_mortal))
1079 {
1080 SvREFCNT_dec (coro_mortal);
1081 coro_mortal = 0;
1082 }
1083}
1084
1085static int
1086runops_trace (pTHX)
1087{
1088 COP *oldcop = 0;
1089 int oldcxix = -2;
1090
1091 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
1092 {
1093 PERL_ASYNC_CHECK ();
1094
1095 if (cctx_current->flags & CC_TRACE_ALL)
1096 {
1097 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
1098 {
1099 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1100 SV **bot, **top;
1101 AV *av = newAV (); /* return values */
1102 SV **cb;
1103 dSP;
1104
1105 GV *gv = CvGV (cx->blk_sub.cv);
1106 SV *fullname = sv_2mortal (newSV (0));
1107 if (isGV (gv))
1108 gv_efullname3 (fullname, gv, 0);
1109
1110 bot = PL_stack_base + cx->blk_oldsp + 1;
1111 top = cx->blk_gimme == G_ARRAY ? SP + 1
1112 : cx->blk_gimme == G_SCALAR ? bot + 1
1113 : bot;
1114
1115 av_extend (av, top - bot);
1116 while (bot < top)
1117 av_push (av, SvREFCNT_inc_NN (*bot++));
1118
1119 PL_runops = RUNOPS_DEFAULT;
1120 ENTER;
1121 SAVETMPS;
1122 EXTEND (SP, 3);
1123 PUSHMARK (SP);
1124 PUSHs (&PL_sv_no);
1125 PUSHs (fullname);
1126 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1127 PUTBACK;
1128 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1129 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1130 SPAGAIN;
1131 FREETMPS;
1132 LEAVE;
1133 PL_runops = runops_trace;
1134 }
1135
1136 if (oldcop != PL_curcop)
1137 {
1138 oldcop = PL_curcop;
1139
1140 if (PL_curcop != &PL_compiling)
1141 {
1142 SV **cb;
1143
1144 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1145 {
1146 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1147
1148 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1149 {
1150 dSP;
1151 GV *gv = CvGV (cx->blk_sub.cv);
1152 SV *fullname = sv_2mortal (newSV (0));
1153
1154 if (isGV (gv))
1155 gv_efullname3 (fullname, gv, 0);
1156
1157 PL_runops = RUNOPS_DEFAULT;
1158 ENTER;
1159 SAVETMPS;
1160 EXTEND (SP, 3);
1161 PUSHMARK (SP);
1162 PUSHs (&PL_sv_yes);
1163 PUSHs (fullname);
1164 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1165 PUTBACK;
1166 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1167 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1168 SPAGAIN;
1169 FREETMPS;
1170 LEAVE;
1171 PL_runops = runops_trace;
1172 }
1173
1174 oldcxix = cxstack_ix;
1175 }
1176
1177 if (cctx_current->flags & CC_TRACE_LINE)
1178 {
1179 dSP;
1180
1181 PL_runops = RUNOPS_DEFAULT;
1182 ENTER;
1183 SAVETMPS;
1184 EXTEND (SP, 3);
1185 PL_runops = RUNOPS_DEFAULT;
1186 PUSHMARK (SP);
1187 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1188 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1189 PUTBACK;
1190 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1191 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1192 SPAGAIN;
1193 FREETMPS;
1194 LEAVE;
1195 PL_runops = runops_trace;
1196 }
1197 }
1198 }
1199 }
1200 }
1201
1202 TAINT_NOT;
1203 return 0;
1204}
1205
1206static struct CoroSLF cctx_ssl_frame;
1207
1208static void
1209slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1210{
1211 ta->prev = 0;
1212}
1213
1214static int
1215slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1216{
1217 *frame = cctx_ssl_frame;
1218
1219 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1220}
1221
1222/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1223static void NOINLINE
1224cctx_prepare (pTHX)
1225{
1226 PL_top_env = &PL_start_env;
1227
1228 if (cctx_current->flags & CC_TRACE)
1229 PL_runops = runops_trace;
1230
1231 /* we already must be executing an SLF op, there is no other valid way
1232 * that can lead to creation of a new cctx */
1233 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1234 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1235
1236 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1237 cctx_ssl_frame = slf_frame;
1238
1239 slf_frame.prepare = slf_prepare_set_stacklevel;
1240 slf_frame.check = slf_check_set_stacklevel;
1241}
1242
1243/* the tail of transfer: execute stuff we can only do after a transfer */
1244INLINE void
1245transfer_tail (pTHX)
1246{
1247 free_coro_mortal (aTHX);
1248}
1249
1250/*
1251 * this is a _very_ stripped down perl interpreter ;)
1252 */
1253static void
1254cctx_run (void *arg)
1255{
1256#ifdef USE_ITHREADS
1257# if CORO_PTHREAD
1258 PERL_SET_CONTEXT (coro_thx);
1259# endif
1260#endif
1261 {
1262 dTHX;
1263
1264 /* normally we would need to skip the entersub here */
1265 /* not doing so will re-execute it, which is exactly what we want */
1266 /* PL_nop = PL_nop->op_next */
1267
1268 /* inject a fake subroutine call to cctx_init */
1269 cctx_prepare (aTHX);
1270
1271 /* cctx_run is the alternative tail of transfer() */
1272 transfer_tail (aTHX);
1273
1274 /* somebody or something will hit me for both perl_run and PL_restartop */
1275 PL_restartop = PL_op;
1276 perl_run (PL_curinterp);
1277 /*
1278 * Unfortunately, there is no way to get at the return values of the
1279 * coro body here, as perl_run destroys these. Likewise, we cannot catch
1280 * runtime errors here, as this is just a random interpreter, not a thread.
1281 */
1282
1283 /*
1284 * If perl-run returns we assume exit() was being called or the coro
1285 * fell off the end, which seems to be the only valid (non-bug)
1286 * reason for perl_run to return. We try to exit by jumping to the
1287 * bootstrap-time "top" top_env, as we cannot restore the "main"
1288 * coroutine as Coro has no such concept.
1289 * This actually isn't valid with the pthread backend, but OSes requiring
1290 * that backend are too broken to do it in a standards-compliant way.
1291 */
1292 PL_top_env = main_top_env;
1293 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1294 }
1295}
1296
1297static coro_cctx *
1298cctx_new ()
1299{
1300 coro_cctx *cctx;
1301
1302 ++cctx_count;
1303 New (0, cctx, 1, coro_cctx);
1304
1305 cctx->gen = cctx_gen;
1306 cctx->flags = 0;
1307 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1308
1309 return cctx;
1310}
1311
1312/* create a new cctx only suitable as source */
1313static coro_cctx *
1314cctx_new_empty ()
1315{
1316 coro_cctx *cctx = cctx_new ();
1317
1318 cctx->sptr = 0;
1319 coro_create (&cctx->cctx, 0, 0, 0, 0);
1320
1321 return cctx;
1322}
1323
1324/* create a new cctx suitable as destination/running a perl interpreter */
1325static coro_cctx *
1326cctx_new_run ()
1327{
1328 coro_cctx *cctx = cctx_new ();
1329 void *stack_start;
1330 size_t stack_size;
1331
1332#if HAVE_MMAP
1333 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1334 /* mmap supposedly does allocate-on-write for us */
1335 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1336
1337 if (cctx->sptr != (void *)-1)
1338 {
1339 #if CORO_STACKGUARD
1340 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1341 #endif
1342 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1343 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1344 cctx->flags |= CC_MAPPED;
1345 }
1346 else
1347#endif
1348 {
1349 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1350 New (0, cctx->sptr, cctx_stacksize, long);
1351
1352 if (!cctx->sptr)
1353 {
1354 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1355 _exit (EXIT_FAILURE);
1356 }
1357
1358 stack_start = cctx->sptr;
1359 stack_size = cctx->ssize;
1360 }
1361
1362 #if CORO_USE_VALGRIND
1363 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1364 #endif
1365
1366 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1367
1368 return cctx;
1369}
1370
1371static void
1372cctx_destroy (coro_cctx *cctx)
1373{
1374 if (!cctx)
1375 return;
1376
1377 assert (("FATAL: tried to destroy current cctx", cctx != cctx_current));//D temporary?
1378
1379 --cctx_count;
1380 coro_destroy (&cctx->cctx);
1381
1382 /* coro_transfer creates new, empty cctx's */
1383 if (cctx->sptr)
1384 {
1385 #if CORO_USE_VALGRIND
1386 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1387 #endif
1388
1389#if HAVE_MMAP
1390 if (cctx->flags & CC_MAPPED)
1391 munmap (cctx->sptr, cctx->ssize);
1392 else
1393#endif
1394 Safefree (cctx->sptr);
1395 }
1396
1397 Safefree (cctx);
1398}
1399
1400/* wether this cctx should be destructed */
1401#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1402
1403static coro_cctx *
1404cctx_get (pTHX)
1405{
1406 while (expect_true (cctx_first))
1407 {
1408 coro_cctx *cctx = cctx_first;
1409 cctx_first = cctx->next;
1410 --cctx_idle;
1411
1412 if (expect_true (!CCTX_EXPIRED (cctx)))
1413 return cctx;
1414
1415 cctx_destroy (cctx);
1416 }
1417
1418 return cctx_new_run ();
1419}
1420
1421static void
1422cctx_put (coro_cctx *cctx)
1423{
1424 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1425
1426 /* free another cctx if overlimit */
1427 if (expect_false (cctx_idle >= cctx_max_idle))
1428 {
1429 coro_cctx *first = cctx_first;
1430 cctx_first = first->next;
1431 --cctx_idle;
1432
1433 cctx_destroy (first);
1434 }
1435
1436 ++cctx_idle;
1437 cctx->next = cctx_first;
1438 cctx_first = cctx;
1439}
1440
1441/** coroutine switching *****************************************************/
1442
1443static void
1444transfer_check (pTHX_ struct coro *prev, struct coro *next)
1445{
1446 /* TODO: throwing up here is considered harmful */
1447
1448 if (expect_true (prev != next))
1449 {
1450 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1451 croak ("Coro::State::transfer called with a blocked prev Coro::State, but can only transfer from running or new states,");
1452
1453 if (expect_false (next->flags & (CF_RUNNING | CF_DESTROYED | CF_SUSPENDED)))
1454 croak ("Coro::State::transfer called with running, destroyed or suspended next Coro::State, but can only transfer to inactive states,");
1455
1456#if !PERL_VERSION_ATLEAST (5,10,0)
1457 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1458 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1459#endif
1460 }
1461}
1462
1463/* always use the TRANSFER macro */
1464static void NOINLINE /* noinline so we have a fixed stackframe */
1465transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1466{
1467 dSTACKLEVEL;
1468
1469 /* sometimes transfer is only called to set idle_sp */
1470 if (expect_false (!prev))
1471 {
1472 cctx_current->idle_sp = STACKLEVEL;
1473 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1474 }
1475 else if (expect_true (prev != next))
1476 {
1477 coro_cctx *cctx_prev;
1478
1479 if (expect_false (prev->flags & CF_NEW))
1480 {
1481 /* create a new empty/source context */
1482 prev->flags &= ~CF_NEW;
1483 prev->flags |= CF_RUNNING;
1484 }
1485
1486 prev->flags &= ~CF_RUNNING;
1487 next->flags |= CF_RUNNING;
1488
1489 /* first get rid of the old state */
1490 save_perl (aTHX_ prev);
1491
1492 if (expect_false (next->flags & CF_NEW))
1493 {
1494 /* need to start coroutine */
1495 next->flags &= ~CF_NEW;
1496 /* setup coroutine call */
1497 coro_setup (aTHX_ next);
1498 }
1499 else
1500 load_perl (aTHX_ next);
1501
1502 /* possibly untie and reuse the cctx */
1503 if (expect_true (
1504 cctx_current->idle_sp == STACKLEVEL
1505 && !(cctx_current->flags & CC_TRACE)
1506 && !force_cctx
1507 ))
1508 {
1509 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1510 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1511
1512 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get. */
1513 /* without this the next cctx_get might destroy the running cctx while still in use */
1514 if (expect_false (CCTX_EXPIRED (cctx_current)))
1515 if (expect_true (!next->cctx))
1516 next->cctx = cctx_get (aTHX);
1517
1518 cctx_put (cctx_current);
1519 }
1520 else
1521 prev->cctx = cctx_current;
1522
1523 ++next->usecount;
1524
1525 cctx_prev = cctx_current;
1526 cctx_current = expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1527
1528 next->cctx = 0;
1529
1530 if (expect_false (cctx_prev != cctx_current))
1531 {
1532 cctx_prev->top_env = PL_top_env;
1533 PL_top_env = cctx_current->top_env;
1534 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1535 }
1536
1537 transfer_tail (aTHX);
1538 }
1539}
1540
1541#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1542#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1543
1544/** high level stuff ********************************************************/
1545
1546static int
1547coro_state_destroy (pTHX_ struct coro *coro)
1548{
1549 if (coro->flags & CF_DESTROYED)
1550 return 0;
1551
1552 if (coro->on_destroy && !PL_dirty)
1553 coro->on_destroy (aTHX_ coro);
1554
1555 coro->flags |= CF_DESTROYED;
1556
1557 if (coro->flags & CF_READY)
1558 {
1559 /* reduce nready, as destroying a ready coro effectively unreadies it */
1560 /* alternative: look through all ready queues and remove the coro */
1561 --coro_nready;
1562 }
1563 else
1564 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1565
1566 if (coro->mainstack
1567 && coro->mainstack != main_mainstack
1568 && coro->slot
1569 && !PL_dirty)
1570 coro_destruct_perl (aTHX_ coro);
1571
1572 cctx_destroy (coro->cctx);
1573 SvREFCNT_dec (coro->startcv);
1574 SvREFCNT_dec (coro->args);
1575 SvREFCNT_dec (CORO_THROW);
1576
1577 if (coro->next) coro->next->prev = coro->prev;
1578 if (coro->prev) coro->prev->next = coro->next;
1579 if (coro == coro_first) coro_first = coro->next;
1580
1581 return 1;
1582}
1583
1584static int
1585coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1586{
1587 struct coro *coro = (struct coro *)mg->mg_ptr;
1588 mg->mg_ptr = 0;
1589
1590 coro->hv = 0;
1591
1592 if (--coro->refcnt < 0)
1593 {
1594 coro_state_destroy (aTHX_ coro);
1595 Safefree (coro);
1596 }
1597
1598 return 0;
1599}
1600
1601static int
1602coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1603{
1604 struct coro *coro = (struct coro *)mg->mg_ptr;
1605
1606 ++coro->refcnt;
1607
1608 return 0;
1609}
1610
1611static MGVTBL coro_state_vtbl = {
1612 0, 0, 0, 0,
1613 coro_state_free,
1614 0,
1615#ifdef MGf_DUP
1616 coro_state_dup,
1617#else
1618# define MGf_DUP 0
1619#endif
1620};
1621
1622static void
1623prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1624{
1625 ta->prev = SvSTATE (prev_sv);
1626 ta->next = SvSTATE (next_sv);
1627 TRANSFER_CHECK (*ta);
1628}
1629
1630static void
1631api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1632{
1633 struct coro_transfer_args ta;
1634
1635 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1636 TRANSFER (ta, 1);
1637}
1638
1639/** Coro ********************************************************************/
1640
1641INLINE void
1642coro_enq (pTHX_ struct coro *coro)
1643{
1644 struct coro **ready = coro_ready [coro->prio - CORO_PRIO_MIN];
1645
1646 SvREFCNT_inc_NN (coro->hv);
1647
1648 coro->next_ready = 0;
1649 *(ready [0] ? &ready [1]->next_ready : &ready [0]) = coro;
1650 ready [1] = coro;
1651}
1652
1653INLINE struct coro *
1654coro_deq (pTHX)
1655{
1656 int prio;
1657
1658 for (prio = CORO_PRIO_MAX - CORO_PRIO_MIN + 1; --prio >= 0; )
1659 {
1660 struct coro **ready = coro_ready [prio];
1661
1662 if (ready [0])
1663 {
1664 struct coro *coro = ready [0];
1665 ready [0] = coro->next_ready;
1666 return coro;
1667 }
1668 }
1669
1670 return 0;
1671}
1672
1673static void
1674invoke_sv_ready_hook_helper (void)
1675{
1676 dTHX;
1677 dSP;
1678
1679 ENTER;
1680 SAVETMPS;
1681
1682 PUSHMARK (SP);
1683 PUTBACK;
1684 call_sv (coro_readyhook, G_VOID | G_DISCARD);
1685
1686 FREETMPS;
1687 LEAVE;
1688}
1689
1690static int
1691api_ready (pTHX_ SV *coro_sv)
1692{
1693 struct coro *coro = SvSTATE (coro_sv);
1694
1695 if (coro->flags & CF_READY)
1696 return 0;
1697
1698 coro->flags |= CF_READY;
1699
1700 coro_enq (aTHX_ coro);
1701
1702 if (!coro_nready++)
1703 if (coroapi.readyhook)
1704 coroapi.readyhook ();
1705
1706 return 1;
1707}
1708
1709static int
1710api_is_ready (pTHX_ SV *coro_sv)
1711{
1712 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1713}
1714
1715/* expects to own a reference to next->hv */
1716INLINE void
1717prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1718{
1719 SV *prev_sv = SvRV (coro_current);
1720
1721 ta->prev = SvSTATE_hv (prev_sv);
1722 ta->next = next;
1723
1724 TRANSFER_CHECK (*ta);
1725
1726 SvRV_set (coro_current, (SV *)next->hv);
1727
1728 free_coro_mortal (aTHX);
1729 coro_mortal = prev_sv;
1730}
1731
1732static void
1733prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1734{
1735 for (;;)
1736 {
1737 struct coro *next = coro_deq (aTHX);
1738
1739 if (expect_true (next))
1740 {
1741 /* cannot transfer to destroyed coros, skip and look for next */
1742 if (expect_false (next->flags & (CF_DESTROYED | CF_SUSPENDED)))
1743 SvREFCNT_dec (next->hv); /* coro_nready has already been taken care of by destroy */
1744 else
1745 {
1746 next->flags &= ~CF_READY;
1747 --coro_nready;
1748
1749 prepare_schedule_to (aTHX_ ta, next);
1750 break;
1751 }
1752 }
1753 else
1754 {
1755 /* nothing to schedule: call the idle handler */
1756 if (SvROK (sv_idle)
1757 && SvOBJECT (SvRV (sv_idle)))
1758 {
1759 ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */
1760 api_ready (aTHX_ SvRV (sv_idle));
1761 --coro_nready;
1762 }
1763 else
1764 {
1765 /* TODO: deprecated, remove, cannot work reliably *//*D*/
1766 dSP;
1767
1768 ENTER;
1769 SAVETMPS;
1770
1771 PUSHMARK (SP);
1772 PUTBACK;
1773 call_sv (sv_idle, G_VOID | G_DISCARD);
1774
1775 FREETMPS;
1776 LEAVE;
1777 }
1778 }
1779 }
1780}
1781
1782INLINE void
1783prepare_cede (pTHX_ struct coro_transfer_args *ta)
1784{
1785 api_ready (aTHX_ coro_current);
1786 prepare_schedule (aTHX_ ta);
1787}
1788
1789INLINE void
1790prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1791{
1792 SV *prev = SvRV (coro_current);
1793
1794 if (coro_nready)
1795 {
1796 prepare_schedule (aTHX_ ta);
1797 api_ready (aTHX_ prev);
1798 }
1799 else
1800 prepare_nop (aTHX_ ta);
1801}
1802
1803static void
1804api_schedule (pTHX)
1805{
1806 struct coro_transfer_args ta;
1807
1808 prepare_schedule (aTHX_ &ta);
1809 TRANSFER (ta, 1);
1810}
1811
1812static void
1813api_schedule_to (pTHX_ SV *coro_sv)
1814{
1815 struct coro_transfer_args ta;
1816 struct coro *next = SvSTATE (coro_sv);
1817
1818 SvREFCNT_inc_NN (coro_sv);
1819 prepare_schedule_to (aTHX_ &ta, next);
1820}
1821
1822static int
1823api_cede (pTHX)
1824{
1825 struct coro_transfer_args ta;
1826
1827 prepare_cede (aTHX_ &ta);
1828
1829 if (expect_true (ta.prev != ta.next))
1830 {
1831 TRANSFER (ta, 1);
1832 return 1;
1833 }
1834 else
1835 return 0;
1836}
1837
1838static int
1839api_cede_notself (pTHX)
1840{
1841 if (coro_nready)
1842 {
1843 struct coro_transfer_args ta;
1844
1845 prepare_cede_notself (aTHX_ &ta);
1846 TRANSFER (ta, 1);
1847 return 1;
1848 }
1849 else
1850 return 0;
1851}
1852
1853static void
1854api_trace (pTHX_ SV *coro_sv, int flags)
1855{
1856 struct coro *coro = SvSTATE (coro_sv);
1857
1858 if (coro->flags & CF_RUNNING)
1859 croak ("cannot enable tracing on a running coroutine, caught");
1860
1861 if (flags & CC_TRACE)
1862 {
1863 if (!coro->cctx)
1864 coro->cctx = cctx_new_run ();
1865 else if (!(coro->cctx->flags & CC_TRACE))
1866 croak ("cannot enable tracing on coroutine with custom stack, caught");
1867
1868 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1869 }
1870 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1871 {
1872 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1873
1874 if (coro->flags & CF_RUNNING)
1875 PL_runops = RUNOPS_DEFAULT;
1876 else
1877 coro->slot->runops = RUNOPS_DEFAULT;
1878 }
1879}
1880
1881static void
1882coro_call_on_destroy (pTHX_ struct coro *coro)
1883{
1884 SV **on_destroyp = hv_fetch (coro->hv, "_on_destroy", sizeof ("_on_destroy") - 1, 0);
1885 SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0);
1886
1887 if (on_destroyp)
1888 {
1889 AV *on_destroy = (AV *)SvRV (*on_destroyp);
1890
1891 while (AvFILLp (on_destroy) >= 0)
1892 {
1893 dSP; /* don't disturb outer sp */
1894 SV *cb = av_pop (on_destroy);
1895
1896 PUSHMARK (SP);
1897
1898 if (statusp)
1899 {
1900 int i;
1901 AV *status = (AV *)SvRV (*statusp);
1902 EXTEND (SP, AvFILLp (status) + 1);
1903
1904 for (i = 0; i <= AvFILLp (status); ++i)
1905 PUSHs (AvARRAY (status)[i]);
1906 }
1907
1908 PUTBACK;
1909 call_sv (sv_2mortal (cb), G_VOID | G_DISCARD);
1910 }
1911 }
1912}
1913
1914static void
1915slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1916{
1917 int i;
1918 HV *hv = (HV *)SvRV (coro_current);
1919 AV *av = newAV ();
1920
1921 /* items are actually not so common, so optimise for this case */
1922 if (items)
1923 {
1924 av_extend (av, items - 1);
1925
1926 for (i = 0; i < items; ++i)
1927 av_push (av, SvREFCNT_inc_NN (arg [i]));
1928 }
1929
1930 hv_store (hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0);
1931
1932 av_push (av_destroy, (SV *)newRV_inc ((SV *)hv)); /* RVinc for perl */
1933 api_ready (aTHX_ sv_manager);
1934
1935 frame->prepare = prepare_schedule;
1936 frame->check = slf_check_repeat;
1937
1938 /* as a minor optimisation, we could unwind all stacks here */
1939 /* but that puts extra pressure on pp_slf, and is not worth much */
1940 /*coro_unwind_stacks (aTHX);*/
1941}
1942
1943/*****************************************************************************/
1944/* async pool handler */
1945
1946static int
1947slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
1948{
1949 HV *hv = (HV *)SvRV (coro_current);
1950 struct coro *coro = (struct coro *)frame->data;
1951
1952 if (!coro->invoke_cb)
1953 return 1; /* loop till we have invoke */
1954 else
1955 {
1956 hv_store (hv, "desc", sizeof ("desc") - 1,
1957 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
1958
1959 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
1960
1961 {
1962 dSP;
1963 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
1964 PUTBACK;
1965 }
1966
1967 SvREFCNT_dec (GvAV (PL_defgv));
1968 GvAV (PL_defgv) = coro->invoke_av;
1969 coro->invoke_av = 0;
1970
1971 return 0;
1972 }
1973}
1974
1975static void
1976slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1977{
1978 HV *hv = (HV *)SvRV (coro_current);
1979 struct coro *coro = SvSTATE_hv ((SV *)hv);
1980
1981 if (expect_true (coro->saved_deffh))
1982 {
1983 /* subsequent iteration */
1984 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
1985 coro->saved_deffh = 0;
1986
1987 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
1988 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
1989 {
1990 coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate);
1991 coro->invoke_av = newAV ();
1992
1993 frame->prepare = prepare_nop;
1994 }
1995 else
1996 {
1997 av_clear (GvAV (PL_defgv));
1998 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
1999
2000 coro->prio = 0;
2001
2002 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
2003 api_trace (aTHX_ coro_current, 0);
2004
2005 frame->prepare = prepare_schedule;
2006 av_push (av_async_pool, SvREFCNT_inc (hv));
2007 }
2008 }
2009 else
2010 {
2011 /* first iteration, simply fall through */
2012 frame->prepare = prepare_nop;
2013 }
2014
2015 frame->check = slf_check_pool_handler;
2016 frame->data = (void *)coro;
2017}
2018
2019/*****************************************************************************/
2020/* rouse callback */
2021
2022#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
2023
2024static void
2025coro_rouse_callback (pTHX_ CV *cv)
2026{
2027 dXSARGS;
2028 SV *data = (SV *)S_GENSUB_ARG;
2029
2030 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2031 {
2032 /* first call, set args */
2033 SV *coro = SvRV (data);
2034 AV *av = newAV ();
2035
2036 SvRV_set (data, (SV *)av);
2037
2038 /* better take a full copy of the arguments */
2039 while (items--)
2040 av_store (av, items, newSVsv (ST (items)));
2041
2042 api_ready (aTHX_ coro);
2043 SvREFCNT_dec (coro);
2044 }
2045
2046 XSRETURN_EMPTY;
2047}
2048
2049static int
2050slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
2051{
2052 SV *data = (SV *)frame->data;
2053
2054 if (CORO_THROW)
2055 return 0;
2056
2057 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2058 return 1;
2059
2060 /* now push all results on the stack */
2061 {
2062 dSP;
2063 AV *av = (AV *)SvRV (data);
2064 int i;
2065
2066 EXTEND (SP, AvFILLp (av) + 1);
2067 for (i = 0; i <= AvFILLp (av); ++i)
2068 PUSHs (sv_2mortal (AvARRAY (av)[i]));
2069
2070 /* we have stolen the elements, so set length to zero and free */
2071 AvFILLp (av) = -1;
2072 av_undef (av);
2073
2074 PUTBACK;
2075 }
2076
2077 return 0;
2078}
2079
2080static void
2081slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2082{
2083 SV *cb;
2084
2085 if (items)
2086 cb = arg [0];
2087 else
2088 {
2089 struct coro *coro = SvSTATE_current;
2090
2091 if (!coro->rouse_cb)
2092 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
2093
2094 cb = sv_2mortal (coro->rouse_cb);
2095 coro->rouse_cb = 0;
2096 }
2097
2098 if (!SvROK (cb)
2099 || SvTYPE (SvRV (cb)) != SVt_PVCV
2100 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
2101 croak ("Coro::rouse_wait called with illegal callback argument,");
2102
2103 {
2104 CV *cv = (CV *)SvRV (cb); /* for S_GENSUB_ARG */
2105 SV *data = (SV *)S_GENSUB_ARG;
2106
2107 frame->data = (void *)data;
2108 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2109 frame->check = slf_check_rouse_wait;
2110 }
2111}
2112
2113static SV *
2114coro_new_rouse_cb (pTHX)
2115{
2116 HV *hv = (HV *)SvRV (coro_current);
2117 struct coro *coro = SvSTATE_hv (hv);
2118 SV *data = newRV_inc ((SV *)hv);
2119 SV *cb = s_gensub (aTHX_ coro_rouse_callback, (void *)data);
2120
2121 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2122 SvREFCNT_dec (data); /* magicext increases the refcount */
2123
2124 SvREFCNT_dec (coro->rouse_cb);
2125 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2126
2127 return cb;
2128}
2129
2130/*****************************************************************************/
2131/* schedule-like-function opcode (SLF) */
2132
2133static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2134static const CV *slf_cv;
2135static SV **slf_argv;
2136static int slf_argc, slf_arga; /* count, allocated */
2137static I32 slf_ax; /* top of stack, for restore */
2138
2139/* this restores the stack in the case we patched the entersub, to */
2140/* recreate the stack frame as perl will on following calls */
2141/* since entersub cleared the stack */
2142static OP *
2143pp_restore (pTHX)
2144{
2145 int i;
2146 SV **SP = PL_stack_base + slf_ax;
2147
2148 PUSHMARK (SP);
2149
2150 EXTEND (SP, slf_argc + 1);
2151
2152 for (i = 0; i < slf_argc; ++i)
2153 PUSHs (sv_2mortal (slf_argv [i]));
2154
2155 PUSHs ((SV *)CvGV (slf_cv));
2156
2157 RETURNOP (slf_restore.op_first);
2158}
2159
2160static void
2161slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2162{
2163 SV **arg = (SV **)slf_frame.data;
2164
2165 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2166}
2167
2168static void
2169slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2170{
2171 if (items != 2)
2172 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2173
2174 frame->prepare = slf_prepare_transfer;
2175 frame->check = slf_check_nop;
2176 frame->data = (void *)arg; /* let's hope it will stay valid */
2177}
2178
2179static void
2180slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2181{
2182 frame->prepare = prepare_schedule;
2183 frame->check = slf_check_nop;
2184}
2185
2186static void
2187slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2188{
2189 struct coro *next = (struct coro *)slf_frame.data;
2190
2191 SvREFCNT_inc_NN (next->hv);
2192 prepare_schedule_to (aTHX_ ta, next);
2193}
2194
2195static void
2196slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2197{
2198 if (!items)
2199 croak ("Coro::schedule_to expects a coroutine argument, caught");
2200
2201 frame->data = (void *)SvSTATE (arg [0]);
2202 frame->prepare = slf_prepare_schedule_to;
2203 frame->check = slf_check_nop;
2204}
2205
2206static void
2207slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2208{
2209 api_ready (aTHX_ SvRV (coro_current));
2210
2211 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2212}
2213
2214static void
2215slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2216{
2217 frame->prepare = prepare_cede;
2218 frame->check = slf_check_nop;
2219}
2220
2221static void
2222slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2223{
2224 frame->prepare = prepare_cede_notself;
2225 frame->check = slf_check_nop;
2226}
2227
2228/*
2229 * these not obviously related functions are all rolled into one
2230 * function to increase chances that they all will call transfer with the same
2231 * stack offset
2232 * SLF stands for "schedule-like-function".
2233 */
2234static OP *
2235pp_slf (pTHX)
2236{
2237 I32 checkmark; /* mark SP to see how many elements check has pushed */
2238
2239 /* set up the slf frame, unless it has already been set-up */
2240 /* the latter happens when a new coro has been started */
2241 /* or when a new cctx was attached to an existing coroutine */
2242 if (expect_true (!slf_frame.prepare))
2243 {
2244 /* first iteration */
2245 dSP;
2246 SV **arg = PL_stack_base + TOPMARK + 1;
2247 int items = SP - arg; /* args without function object */
2248 SV *gv = *sp;
2249
2250 /* do a quick consistency check on the "function" object, and if it isn't */
2251 /* for us, divert to the real entersub */
2252 if (SvTYPE (gv) != SVt_PVGV
2253 || !GvCV (gv)
2254 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2255 return PL_ppaddr[OP_ENTERSUB](aTHX);
2256
2257 if (!(PL_op->op_flags & OPf_STACKED))
2258 {
2259 /* ampersand-form of call, use @_ instead of stack */
2260 AV *av = GvAV (PL_defgv);
2261 arg = AvARRAY (av);
2262 items = AvFILLp (av) + 1;
2263 }
2264
2265 /* now call the init function, which needs to set up slf_frame */
2266 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2267 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2268
2269 /* pop args */
2270 SP = PL_stack_base + POPMARK;
2271
2272 PUTBACK;
2273 }
2274
2275 /* now that we have a slf_frame, interpret it! */
2276 /* we use a callback system not to make the code needlessly */
2277 /* complicated, but so we can run multiple perl coros from one cctx */
2278
2279 do
2280 {
2281 struct coro_transfer_args ta;
2282
2283 slf_frame.prepare (aTHX_ &ta);
2284 TRANSFER (ta, 0);
2285
2286 checkmark = PL_stack_sp - PL_stack_base;
2287 }
2288 while (slf_frame.check (aTHX_ &slf_frame));
2289
2290 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2291
2292 /* exception handling */
2293 if (expect_false (CORO_THROW))
2294 {
2295 SV *exception = sv_2mortal (CORO_THROW);
2296
2297 CORO_THROW = 0;
2298 sv_setsv (ERRSV, exception);
2299 croak (0);
2300 }
2301
2302 /* return value handling - mostly like entersub */
2303 /* make sure we put something on the stack in scalar context */
2304 if (GIMME_V == G_SCALAR)
2305 {
2306 dSP;
2307 SV **bot = PL_stack_base + checkmark;
2308
2309 if (sp == bot) /* too few, push undef */
2310 bot [1] = &PL_sv_undef;
2311 else if (sp != bot + 1) /* too many, take last one */
2312 bot [1] = *sp;
2313
2314 SP = bot + 1;
2315
2316 PUTBACK;
2317 }
2318
2319 return NORMAL;
2320}
2321
2322static void
2323api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2324{
2325 int i;
2326 SV **arg = PL_stack_base + ax;
2327 int items = PL_stack_sp - arg + 1;
2328
2329 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2330
2331 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2332 && PL_op->op_ppaddr != pp_slf)
2333 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2334
2335 CvFLAGS (cv) |= CVf_SLF;
2336 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2337 slf_cv = cv;
2338
2339 /* we patch the op, and then re-run the whole call */
2340 /* we have to put the same argument on the stack for this to work */
2341 /* and this will be done by pp_restore */
2342 slf_restore.op_next = (OP *)&slf_restore;
2343 slf_restore.op_type = OP_CUSTOM;
2344 slf_restore.op_ppaddr = pp_restore;
2345 slf_restore.op_first = PL_op;
2346
2347 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2348
2349 if (PL_op->op_flags & OPf_STACKED)
2350 {
2351 if (items > slf_arga)
2352 {
2353 slf_arga = items;
2354 free (slf_argv);
2355 slf_argv = malloc (slf_arga * sizeof (SV *));
2356 }
2357
2358 slf_argc = items;
2359
2360 for (i = 0; i < items; ++i)
2361 slf_argv [i] = SvREFCNT_inc (arg [i]);
2362 }
2363 else
2364 slf_argc = 0;
2365
2366 PL_op->op_ppaddr = pp_slf;
2367 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2368
2369 PL_op = (OP *)&slf_restore;
2370}
2371
2372/*****************************************************************************/
2373/* dynamic wind */
2374
2375static void
2376on_enterleave_call (pTHX_ SV *cb)
2377{
2378 dSP;
2379
2380 PUSHSTACK;
2381
2382 PUSHMARK (SP);
2383 PUTBACK;
2384 call_sv (cb, G_VOID | G_DISCARD);
2385 SPAGAIN;
2386
2387 POPSTACK;
2388}
2389
2390static SV *
2391coro_avp_pop_and_free (pTHX_ AV **avp)
2392{
2393 AV *av = *avp;
2394 SV *res = av_pop (av);
2395
2396 if (AvFILLp (av) < 0)
2397 {
2398 *avp = 0;
2399 SvREFCNT_dec (av);
2400 }
2401
2402 return res;
2403}
2404
2405static void
2406coro_pop_on_enter (pTHX_ void *coro)
2407{
2408 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_enter);
2409 SvREFCNT_dec (cb);
2410}
2411
2412static void
2413coro_pop_on_leave (pTHX_ void *coro)
2414{
2415 SV *cb = coro_avp_pop_and_free (aTHX_ &((struct coro *)coro)->on_leave);
2416 on_enterleave_call (aTHX_ sv_2mortal (cb));
2417}
2418
2419/*****************************************************************************/
2420/* PerlIO::cede */
2421
2422typedef struct
2423{
2424 PerlIOBuf base;
2425 NV next, every;
2426} PerlIOCede;
2427
2428static IV
2429PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2430{
2431 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2432
2433 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2434 self->next = nvtime () + self->every;
2435
2436 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2437}
2438
2439static SV *
2440PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2441{
2442 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2443
2444 return newSVnv (self->every);
2445}
2446
2447static IV
2448PerlIOCede_flush (pTHX_ PerlIO *f)
2449{
2450 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2451 double now = nvtime ();
2452
2453 if (now >= self->next)
2454 {
2455 api_cede (aTHX);
2456 self->next = now + self->every;
2457 }
2458
2459 return PerlIOBuf_flush (aTHX_ f);
2460}
2461
2462static PerlIO_funcs PerlIO_cede =
2463{
2464 sizeof(PerlIO_funcs),
2465 "cede",
2466 sizeof(PerlIOCede),
2467 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2468 PerlIOCede_pushed,
2469 PerlIOBuf_popped,
2470 PerlIOBuf_open,
2471 PerlIOBase_binmode,
2472 PerlIOCede_getarg,
2473 PerlIOBase_fileno,
2474 PerlIOBuf_dup,
2475 PerlIOBuf_read,
2476 PerlIOBuf_unread,
2477 PerlIOBuf_write,
2478 PerlIOBuf_seek,
2479 PerlIOBuf_tell,
2480 PerlIOBuf_close,
2481 PerlIOCede_flush,
2482 PerlIOBuf_fill,
2483 PerlIOBase_eof,
2484 PerlIOBase_error,
2485 PerlIOBase_clearerr,
2486 PerlIOBase_setlinebuf,
2487 PerlIOBuf_get_base,
2488 PerlIOBuf_bufsiz,
2489 PerlIOBuf_get_ptr,
2490 PerlIOBuf_get_cnt,
2491 PerlIOBuf_set_ptrcnt,
2492};
2493
2494/*****************************************************************************/
2495/* Coro::Semaphore & Coro::Signal */
2496
2497static SV *
2498coro_waitarray_new (pTHX_ int count)
2499{
2500 /* a waitarray=semaphore contains a counter IV in $sem->[0] and any waiters after that */
2501 AV *av = newAV ();
2502 SV **ary;
2503
2504 /* unfortunately, building manually saves memory */
2505 Newx (ary, 2, SV *);
2506 AvALLOC (av) = ary;
2507#if PERL_VERSION_ATLEAST (5,10,0)
2508 AvARRAY (av) = ary;
2509#else
2510 /* 5.8.8 needs this syntax instead of AvARRAY = ary, yet */
2511 /* -DDEBUGGING flags this as a bug, despite it perfectly working */
2512 SvPVX ((SV *)av) = (char *)ary;
2513#endif
2514 AvMAX (av) = 1;
2515 AvFILLp (av) = 0;
2516 ary [0] = newSViv (count);
2517
2518 return newRV_noinc ((SV *)av);
2519}
2520
2521/* semaphore */
2522
2523static void
2524coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2525{
2526 SV *count_sv = AvARRAY (av)[0];
2527 IV count = SvIVX (count_sv);
2528
2529 count += adjust;
2530 SvIVX (count_sv) = count;
2531
2532 /* now wake up as many waiters as are expected to lock */
2533 while (count > 0 && AvFILLp (av) > 0)
2534 {
2535 SV *cb;
2536
2537 /* swap first two elements so we can shift a waiter */
2538 AvARRAY (av)[0] = AvARRAY (av)[1];
2539 AvARRAY (av)[1] = count_sv;
2540 cb = av_shift (av);
2541
2542 if (SvOBJECT (cb))
2543 {
2544 api_ready (aTHX_ cb);
2545 --count;
2546 }
2547 else if (SvTYPE (cb) == SVt_PVCV)
2548 {
2549 dSP;
2550 PUSHMARK (SP);
2551 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2552 PUTBACK;
2553 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2554 }
2555
2556 SvREFCNT_dec (cb);
2557 }
2558}
2559
2560static void
2561coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2562{
2563 /* call $sem->adjust (0) to possibly wake up some other waiters */
2564 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2565}
2566
2567static int
2568slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2569{
2570 AV *av = (AV *)frame->data;
2571 SV *count_sv = AvARRAY (av)[0];
2572
2573 /* if we are about to throw, don't actually acquire the lock, just throw */
2574 if (CORO_THROW)
2575 return 0;
2576 else if (SvIVX (count_sv) > 0)
2577 {
2578 SvSTATE_current->on_destroy = 0;
2579
2580 if (acquire)
2581 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2582 else
2583 coro_semaphore_adjust (aTHX_ av, 0);
2584
2585 return 0;
2586 }
2587 else
2588 {
2589 int i;
2590 /* if we were woken up but can't down, we look through the whole */
2591 /* waiters list and only add us if we aren't in there already */
2592 /* this avoids some degenerate memory usage cases */
2593
2594 for (i = 1; i <= AvFILLp (av); ++i)
2595 if (AvARRAY (av)[i] == SvRV (coro_current))
2596 return 1;
2597
2598 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2599 return 1;
2600 }
2601}
2602
2603static int
2604slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2605{
2606 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2607}
2608
2609static int
2610slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2611{
2612 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2613}
2614
2615static void
2616slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2617{
2618 AV *av = (AV *)SvRV (arg [0]);
2619
2620 if (SvIVX (AvARRAY (av)[0]) > 0)
2621 {
2622 frame->data = (void *)av;
2623 frame->prepare = prepare_nop;
2624 }
2625 else
2626 {
2627 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2628
2629 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2630 frame->prepare = prepare_schedule;
2631
2632 /* to avoid race conditions when a woken-up coro gets terminated */
2633 /* we arrange for a temporary on_destroy that calls adjust (0) */
2634 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2635 }
2636}
2637
2638static void
2639slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2640{
2641 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2642 frame->check = slf_check_semaphore_down;
2643}
2644
2645static void
2646slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2647{
2648 if (items >= 2)
2649 {
2650 /* callback form */
2651 AV *av = (AV *)SvRV (arg [0]);
2652 SV *cb_cv = s_get_cv_croak (arg [1]);
2653
2654 av_push (av, SvREFCNT_inc_NN (cb_cv));
2655
2656 if (SvIVX (AvARRAY (av)[0]) > 0)
2657 coro_semaphore_adjust (aTHX_ av, 0);
2658
2659 frame->prepare = prepare_nop;
2660 frame->check = slf_check_nop;
2661 }
2662 else
2663 {
2664 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2665 frame->check = slf_check_semaphore_wait;
2666 }
2667}
2668
2669/* signal */
2670
2671static void
2672coro_signal_wake (pTHX_ AV *av, int count)
2673{
2674 SvIVX (AvARRAY (av)[0]) = 0;
2675
2676 /* now signal count waiters */
2677 while (count > 0 && AvFILLp (av) > 0)
2678 {
2679 SV *cb;
2680
2681 /* swap first two elements so we can shift a waiter */
2682 cb = AvARRAY (av)[0];
2683 AvARRAY (av)[0] = AvARRAY (av)[1];
2684 AvARRAY (av)[1] = cb;
2685
2686 cb = av_shift (av);
2687
2688 if (SvTYPE (cb) == SVt_PVCV)
2689 {
2690 dSP;
2691 PUSHMARK (SP);
2692 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2693 PUTBACK;
2694 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2695 }
2696 else
2697 {
2698 api_ready (aTHX_ cb);
2699 sv_setiv (cb, 0); /* signal waiter */
2700 }
2701
2702 SvREFCNT_dec (cb);
2703
2704 --count;
2705 }
2706}
2707
2708static int
2709slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2710{
2711 /* if we are about to throw, also stop waiting */
2712 return SvROK ((SV *)frame->data) && !CORO_THROW;
2713}
2714
2715static void
2716slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2717{
2718 AV *av = (AV *)SvRV (arg [0]);
2719
2720 if (items >= 2)
2721 {
2722 SV *cb_cv = s_get_cv_croak (arg [1]);
2723 av_push (av, SvREFCNT_inc_NN (cb_cv));
2724
2725 if (SvIVX (AvARRAY (av)[0]))
2726 coro_signal_wake (aTHX_ av, 1); /* ust be the only waiter */
2727
2728 frame->prepare = prepare_nop;
2729 frame->check = slf_check_nop;
2730 }
2731 else if (SvIVX (AvARRAY (av)[0]))
2732 {
2733 SvIVX (AvARRAY (av)[0]) = 0;
2734 frame->prepare = prepare_nop;
2735 frame->check = slf_check_nop;
2736 }
2737 else
2738 {
2739 SV *waiter = newSVsv (coro_current); /* owned by signal av */
2740
2741 av_push (av, waiter);
2742
2743 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2744 frame->prepare = prepare_schedule;
2745 frame->check = slf_check_signal_wait;
2746 }
2747}
2748
2749/*****************************************************************************/
2750/* Coro::AIO */
2751
2752#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2753
2754/* helper storage struct */
2755struct io_state
2756{
2757 int errorno;
2758 I32 laststype; /* U16 in 5.10.0 */
2759 int laststatval;
2760 Stat_t statcache;
2761};
2762
2763static void
2764coro_aio_callback (pTHX_ CV *cv)
2765{
2766 dXSARGS;
2767 AV *state = (AV *)S_GENSUB_ARG;
2768 SV *coro = av_pop (state);
2769 SV *data_sv = newSV (sizeof (struct io_state));
2770
2771 av_extend (state, items - 1);
2772
2773 sv_upgrade (data_sv, SVt_PV);
2774 SvCUR_set (data_sv, sizeof (struct io_state));
2775 SvPOK_only (data_sv);
2776
2777 {
2778 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2779
2780 data->errorno = errno;
2781 data->laststype = PL_laststype;
2782 data->laststatval = PL_laststatval;
2783 data->statcache = PL_statcache;
2784 }
2785
2786 /* now build the result vector out of all the parameters and the data_sv */
2787 {
2788 int i;
2789
2790 for (i = 0; i < items; ++i)
2791 av_push (state, SvREFCNT_inc_NN (ST (i)));
2792 }
2793
2794 av_push (state, data_sv);
2795
2796 api_ready (aTHX_ coro);
2797 SvREFCNT_dec (coro);
2798 SvREFCNT_dec ((AV *)state);
2799}
2800
2801static int
2802slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2803{
2804 AV *state = (AV *)frame->data;
2805
2806 /* if we are about to throw, return early */
2807 /* this does not cancel the aio request, but at least */
2808 /* it quickly returns */
2809 if (CORO_THROW)
2810 return 0;
2811
2812 /* one element that is an RV? repeat! */
2813 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2814 return 1;
2815
2816 /* restore status */
2817 {
2818 SV *data_sv = av_pop (state);
2819 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2820
2821 errno = data->errorno;
2822 PL_laststype = data->laststype;
2823 PL_laststatval = data->laststatval;
2824 PL_statcache = data->statcache;
2825
2826 SvREFCNT_dec (data_sv);
2827 }
2828
2829 /* push result values */
2830 {
2831 dSP;
2832 int i;
2833
2834 EXTEND (SP, AvFILLp (state) + 1);
2835 for (i = 0; i <= AvFILLp (state); ++i)
2836 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2837
2838 PUTBACK;
2839 }
2840
2841 return 0;
2842}
2843
2844static void
2845slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2846{
2847 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2848 SV *coro_hv = SvRV (coro_current);
2849 struct coro *coro = SvSTATE_hv (coro_hv);
2850
2851 /* put our coroutine id on the state arg */
2852 av_push (state, SvREFCNT_inc_NN (coro_hv));
2853
2854 /* first see whether we have a non-zero priority and set it as AIO prio */
2855 if (coro->prio)
2856 {
2857 dSP;
2858
2859 static SV *prio_cv;
2860 static SV *prio_sv;
2861
2862 if (expect_false (!prio_cv))
2863 {
2864 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2865 prio_sv = newSViv (0);
2866 }
2867
2868 PUSHMARK (SP);
2869 sv_setiv (prio_sv, coro->prio);
2870 XPUSHs (prio_sv);
2871
2872 PUTBACK;
2873 call_sv (prio_cv, G_VOID | G_DISCARD);
2874 }
2875
2876 /* now call the original request */
2877 {
2878 dSP;
2879 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2880 int i;
2881
2882 PUSHMARK (SP);
2883
2884 /* first push all args to the stack */
2885 EXTEND (SP, items + 1);
2886
2887 for (i = 0; i < items; ++i)
2888 PUSHs (arg [i]);
2889
2890 /* now push the callback closure */
2891 PUSHs (sv_2mortal (s_gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2892
2893 /* now call the AIO function - we assume our request is uncancelable */
2894 PUTBACK;
2895 call_sv ((SV *)req, G_VOID | G_DISCARD);
2896 }
2897
2898 /* now that the requets is going, we loop toll we have a result */
2899 frame->data = (void *)state;
2900 frame->prepare = prepare_schedule;
2901 frame->check = slf_check_aio_req;
2902}
2903
2904static void
2905coro_aio_req_xs (pTHX_ CV *cv)
2906{
2907 dXSARGS;
2908
2909 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2910
2911 XSRETURN_EMPTY;
2912}
2913
2914/*****************************************************************************/
2915
2916#if CORO_CLONE
2917# include "clone.c"
2918#endif
2919
2920/*****************************************************************************/
2921
2922static SV *
2923coro_new (pTHX_ HV *stash, SV **argv, int argc, int is_coro)
2924{
2925 SV *coro_sv;
2926 struct coro *coro;
2927 MAGIC *mg;
2928 HV *hv;
2929 SV *cb;
2930 int i;
2931
2932 if (argc > 0)
2933 {
2934 cb = s_get_cv_croak (argv [0]);
2935
2936 if (!is_coro)
2937 {
2938 if (CvISXSUB (cb))
2939 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
2940
2941 if (!CvROOT (cb))
2942 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
2943 }
2944 }
2945
2946 Newz (0, coro, 1, struct coro);
2947 coro->args = newAV ();
2948 coro->flags = CF_NEW;
2949
2950 if (coro_first) coro_first->prev = coro;
2951 coro->next = coro_first;
2952 coro_first = coro;
2953
2954 coro->hv = hv = newHV ();
2955 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
2956 mg->mg_flags |= MGf_DUP;
2957 coro_sv = sv_bless (newRV_noinc ((SV *)hv), stash);
2958
2959 if (argc > 0)
2960 {
2961 av_extend (coro->args, argc + is_coro - 1);
2962
2963 if (is_coro)
2964 {
2965 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
2966 cb = (SV *)cv_coro_run;
2967 }
2968
2969 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
2970
2971 for (i = 1; i < argc; i++)
2972 av_push (coro->args, newSVsv (argv [i]));
2973 }
2974
2975 return coro_sv;
2976}
2977
2978MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2979
2980PROTOTYPES: DISABLE
2981
2982BOOT:
2983{
2984#ifdef USE_ITHREADS
2985# if CORO_PTHREAD
2986 coro_thx = PERL_GET_CONTEXT;
2987# endif
2988#endif
2989 BOOT_PAGESIZE;
2990
2991 cctx_current = cctx_new_empty ();
2992
2993 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2994 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2995
2996 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2997 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2998 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
2999
3000 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
3001 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
3002 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
3003
3004 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
3005
3006 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
3007 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
3008 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
3009 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
3010
3011 main_mainstack = PL_mainstack;
3012 main_top_env = PL_top_env;
3013
3014 while (main_top_env->je_prev)
3015 main_top_env = main_top_env->je_prev;
3016
3017 {
3018 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
3019
3020 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
3021 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
3022
3023 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
3024 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
3025 }
3026
3027 coroapi.ver = CORO_API_VERSION;
3028 coroapi.rev = CORO_API_REVISION;
3029
3030 coroapi.transfer = api_transfer;
3031
3032 coroapi.sv_state = SvSTATE_;
3033 coroapi.execute_slf = api_execute_slf;
3034 coroapi.prepare_nop = prepare_nop;
3035 coroapi.prepare_schedule = prepare_schedule;
3036 coroapi.prepare_cede = prepare_cede;
3037 coroapi.prepare_cede_notself = prepare_cede_notself;
3038
3039 {
3040 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
3041
3042 if (!svp) croak ("Time::HiRes is required");
3043 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
3044
3045 nvtime = INT2PTR (double (*)(), SvIV (*svp));
3046
3047 svp = hv_fetch (PL_modglobal, "Time::U2time", 12, 0);
3048 u2time = INT2PTR (void (*)(pTHX_ UV ret[2]), SvIV (*svp));
3049 }
3050
3051 assert (("PRIO_NORMAL must be 0", !CORO_PRIO_NORMAL));
3052}
3053
3054SV *
3055new (SV *klass, ...)
3056 ALIAS:
3057 Coro::new = 1
3058 CODE:
3059 RETVAL = coro_new (aTHX_ ix ? coro_stash : coro_state_stash, &ST (1), items - 1, ix);
3060 OUTPUT:
3061 RETVAL
3062
3063void
3064transfer (...)
3065 PROTOTYPE: $$
3066 CODE:
3067 CORO_EXECUTE_SLF_XS (slf_init_transfer);
3068
3069bool
3070_destroy (SV *coro_sv)
3071 CODE:
3072 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
3073 OUTPUT:
3074 RETVAL
3075
3076void
3077_exit (int code)
3078 PROTOTYPE: $
3079 CODE:
3080 _exit (code);
3081
3082SV *
3083clone (Coro::State coro)
3084 CODE:
3085{
3086#if CORO_CLONE
3087 struct coro *ncoro = coro_clone (aTHX_ coro);
3088 MAGIC *mg;
3089 /* TODO: too much duplication */
3090 ncoro->hv = newHV ();
3091 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
3092 mg->mg_flags |= MGf_DUP;
3093 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
3094#else
3095 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
3096#endif
3097}
3098 OUTPUT:
3099 RETVAL
3100
3101int
3102cctx_stacksize (int new_stacksize = 0)
3103 PROTOTYPE: ;$
3104 CODE:
3105 RETVAL = cctx_stacksize;
3106 if (new_stacksize)
3107 {
3108 cctx_stacksize = new_stacksize;
3109 ++cctx_gen;
3110 }
3111 OUTPUT:
3112 RETVAL
3113
3114int
3115cctx_max_idle (int max_idle = 0)
3116 PROTOTYPE: ;$
3117 CODE:
3118 RETVAL = cctx_max_idle;
3119 if (max_idle > 1)
3120 cctx_max_idle = max_idle;
3121 OUTPUT:
3122 RETVAL
3123
3124int
3125cctx_count ()
3126 PROTOTYPE:
3127 CODE:
3128 RETVAL = cctx_count;
3129 OUTPUT:
3130 RETVAL
3131
3132int
3133cctx_idle ()
3134 PROTOTYPE:
3135 CODE:
3136 RETVAL = cctx_idle;
3137 OUTPUT:
3138 RETVAL
3139
3140void
3141list ()
3142 PROTOTYPE:
3143 PPCODE:
3144{
3145 struct coro *coro;
3146 for (coro = coro_first; coro; coro = coro->next)
3147 if (coro->hv)
3148 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
3149}
3150
3151void
3152call (Coro::State coro, SV *coderef)
3153 ALIAS:
3154 eval = 1
3155 CODE:
3156{
3157 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
3158 {
3159 struct coro *current = SvSTATE_current;
3160
3161 if (current != coro)
235 { 3162 {
236 /* I never used formats, so how should I know how these are implemented? */ 3163 PUTBACK;
237 /* my bold guess is as a simple, plain sub... */ 3164 save_perl (aTHX_ current);
238 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 3165 load_perl (aTHX_ coro);
3166 SPAGAIN;
3167 }
3168
3169 PUSHSTACK;
3170
3171 PUSHMARK (SP);
3172 PUTBACK;
3173
3174 if (ix)
3175 eval_sv (coderef, 0);
3176 else
3177 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3178
3179 POPSTACK;
3180 SPAGAIN;
3181
3182 if (current != coro)
3183 {
3184 PUTBACK;
3185 save_perl (aTHX_ coro);
3186 load_perl (aTHX_ current);
3187 SPAGAIN;
239 } 3188 }
240 } 3189 }
241
242 if (top_si->si_type == PERLSI_MAIN)
243 break;
244
245 top_si = top_si->si_prev;
246 ccstk = top_si->si_cxstack;
247 cxix = top_si->si_cxix;
248 }
249
250 PUTBACK;
251 }
252
253 c->dowarn = PL_dowarn;
254 c->defav = GvAV (PL_defgv);
255 c->curstackinfo = PL_curstackinfo;
256 c->curstack = PL_curstack;
257 c->mainstack = PL_mainstack;
258 c->stack_sp = PL_stack_sp;
259 c->op = PL_op;
260 c->curpad = PL_curpad;
261 c->stack_base = PL_stack_base;
262 c->stack_max = PL_stack_max;
263 c->tmps_stack = PL_tmps_stack;
264 c->tmps_floor = PL_tmps_floor;
265 c->tmps_ix = PL_tmps_ix;
266 c->tmps_max = PL_tmps_max;
267 c->markstack = PL_markstack;
268 c->markstack_ptr = PL_markstack_ptr;
269 c->markstack_max = PL_markstack_max;
270 c->scopestack = PL_scopestack;
271 c->scopestack_ix = PL_scopestack_ix;
272 c->scopestack_max = PL_scopestack_max;
273 c->savestack = PL_savestack;
274 c->savestack_ix = PL_savestack_ix;
275 c->savestack_max = PL_savestack_max;
276 c->retstack = PL_retstack;
277 c->retstack_ix = PL_retstack_ix;
278 c->retstack_max = PL_retstack_max;
279 c->curcop = PL_curcop;
280} 3190}
281 3191
282static void 3192SV *
283LOAD(pTHX_ Coro__State c) 3193is_ready (Coro::State coro)
284{
285 PL_dowarn = c->dowarn;
286 GvAV (PL_defgv) = c->defav;
287 PL_curstackinfo = c->curstackinfo;
288 PL_curstack = c->curstack;
289 PL_mainstack = c->mainstack;
290 PL_stack_sp = c->stack_sp;
291 PL_op = c->op;
292 PL_curpad = c->curpad;
293 PL_stack_base = c->stack_base;
294 PL_stack_max = c->stack_max;
295 PL_tmps_stack = c->tmps_stack;
296 PL_tmps_floor = c->tmps_floor;
297 PL_tmps_ix = c->tmps_ix;
298 PL_tmps_max = c->tmps_max;
299 PL_markstack = c->markstack;
300 PL_markstack_ptr = c->markstack_ptr;
301 PL_markstack_max = c->markstack_max;
302 PL_scopestack = c->scopestack;
303 PL_scopestack_ix = c->scopestack_ix;
304 PL_scopestack_max = c->scopestack_max;
305 PL_savestack = c->savestack;
306 PL_savestack_ix = c->savestack_ix;
307 PL_savestack_max = c->savestack_max;
308 PL_retstack = c->retstack;
309 PL_retstack_ix = c->retstack_ix;
310 PL_retstack_max = c->retstack_max;
311 PL_curcop = c->curcop;
312
313 {
314 dSP;
315 CV *cv;
316
317 /* now do the ugly restore mess */
318 while ((cv = (CV *)POPs))
319 {
320 AV *padlist = (AV *)POPs;
321
322 put_padlist (cv);
323 CvPADLIST(cv) = padlist;
324 CvDEPTH(cv) = (I32)POPs;
325
326#ifdef USE_THREADS
327 CvOWNER(cv) = (struct perl_thread *)POPs;
328 error does not work either
329#endif
330 }
331
332 PUTBACK;
333 }
334}
335
336/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
337STATIC void
338destroy_stacks(pTHX)
339{
340 dSP;
341
342 /* die does this while calling POPSTACK, but I just don't see why. */
343 dounwind(-1);
344
345 /* is this ugly, I ask? */
346 while (PL_scopestack_ix)
347 LEAVE;
348
349 while (PL_curstackinfo->si_next)
350 PL_curstackinfo = PL_curstackinfo->si_next;
351
352 while (PL_curstackinfo)
353 {
354 PERL_SI *p = PL_curstackinfo->si_prev;
355
356 SvREFCNT_dec(PL_curstackinfo->si_stack);
357 Safefree(PL_curstackinfo->si_cxstack);
358 Safefree(PL_curstackinfo);
359 PL_curstackinfo = p;
360 }
361
362 if (PL_scopestack_ix != 0)
363 Perl_warner(aTHX_ WARN_INTERNAL,
364 "Unbalanced scopes: %ld more ENTERs than LEAVEs\n",
365 (long)PL_scopestack_ix);
366 if (PL_savestack_ix != 0)
367 Perl_warner(aTHX_ WARN_INTERNAL,
368 "Unbalanced saves: %ld more saves than restores\n",
369 (long)PL_savestack_ix);
370 if (PL_tmps_floor != -1)
371 Perl_warner(aTHX_ WARN_INTERNAL,"Unbalanced tmps: %ld more allocs than frees\n",
372 (long)PL_tmps_floor + 1);
373 /*
374 */
375 Safefree(PL_tmps_stack);
376 Safefree(PL_markstack);
377 Safefree(PL_scopestack);
378 Safefree(PL_savestack);
379 Safefree(PL_retstack);
380}
381
382#define SUB_INIT "Coro::State::_newcoro"
383
384MODULE = Coro::State PACKAGE = Coro::State
385
386PROTOTYPES: ENABLE
387
388BOOT:
389 if (!padlist_cache)
390 padlist_cache = newHV ();
391
392Coro::State
393_newprocess(args)
394 SV * args
395 PROTOTYPE: $ 3194 PROTOTYPE: $
3195 ALIAS:
3196 is_ready = CF_READY
3197 is_running = CF_RUNNING
3198 is_new = CF_NEW
3199 is_destroyed = CF_DESTROYED
3200 is_suspended = CF_SUSPENDED
3201 CODE:
3202 RETVAL = boolSV (coro->flags & ix);
3203 OUTPUT:
3204 RETVAL
3205
3206void
3207throw (Coro::State self, SV *throw = &PL_sv_undef)
3208 PROTOTYPE: $;$
396 CODE: 3209 CODE:
397 Coro__State coro; 3210{
3211 struct coro *current = SvSTATE_current;
3212 SV **throwp = self == current ? &CORO_THROW : &self->except;
3213 SvREFCNT_dec (*throwp);
3214 SvGETMAGIC (throw);
3215 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
3216}
398 3217
399 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV) 3218void
400 croak ("Coro::State::newprocess expects an arrayref"); 3219api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3220 PROTOTYPE: $;$
3221 C_ARGS: aTHX_ coro, flags
3222
3223SV *
3224has_cctx (Coro::State coro)
3225 PROTOTYPE: $
3226 CODE:
3227 /* maybe manage the running flag differently */
3228 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3229 OUTPUT:
3230 RETVAL
3231
3232int
3233is_traced (Coro::State coro)
3234 PROTOTYPE: $
3235 CODE:
3236 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3237 OUTPUT:
3238 RETVAL
3239
3240UV
3241rss (Coro::State coro)
3242 PROTOTYPE: $
3243 ALIAS:
3244 usecount = 1
3245 CODE:
3246 switch (ix)
3247 {
3248 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3249 case 1: RETVAL = coro->usecount; break;
401 3250 }
402 New (0, coro, 1, struct coro); 3251 OUTPUT:
403
404 coro->mainstack = 0; /* actual work is done inside transfer */
405 coro->args = (AV *)SvREFCNT_inc (SvRV (args));
406
407 RETVAL = coro; 3252 RETVAL
3253
3254void
3255force_cctx ()
3256 PROTOTYPE:
3257 CODE:
3258 cctx_current->idle_sp = 0;
3259
3260void
3261swap_defsv (Coro::State self)
3262 PROTOTYPE: $
3263 ALIAS:
3264 swap_defav = 1
3265 CODE:
3266 if (!self->slot)
3267 croak ("cannot swap state with coroutine that has no saved state,");
3268 else
3269 {
3270 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3271 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
3272
3273 SV *tmp = *src; *src = *dst; *dst = tmp;
3274 }
3275
3276void
3277cancel (Coro::State self)
3278 CODE:
3279 coro_state_destroy (aTHX_ self);
3280 coro_call_on_destroy (aTHX_ self); /* actually only for Coro objects */
3281
3282
3283SV *
3284enable_times (int enabled = enable_times)
3285 CODE:
3286{
3287 RETVAL = boolSV (enable_times);
3288
3289 if (enabled != enable_times)
3290 {
3291 enable_times = enabled;
3292
3293 coro_times_update ();
3294 (enabled ? coro_times_sub : coro_times_add)(SvSTATE (coro_current));
3295 }
3296}
408 OUTPUT: 3297 OUTPUT:
409 RETVAL 3298 RETVAL
410 3299
411void 3300void
412transfer(prev,next) 3301times (Coro::State self)
413 Coro::State_or_hashref prev 3302 PPCODE:
414 Coro::State_or_hashref next 3303{
3304 struct coro *current = SvSTATE (coro_current);
3305
3306 if (expect_false (current == self))
3307 {
3308 coro_times_update ();
3309 coro_times_add (SvSTATE (coro_current));
3310 }
3311
3312 EXTEND (SP, 2);
3313 PUSHs (sv_2mortal (newSVnv (self->t_real [0] + self->t_real [1] * 1e-9)));
3314 PUSHs (sv_2mortal (newSVnv (self->t_cpu [0] + self->t_cpu [1] * 1e-9)));
3315
3316 if (expect_false (current == self))
3317 coro_times_sub (SvSTATE (coro_current));
3318}
3319
3320MODULE = Coro::State PACKAGE = Coro
3321
3322BOOT:
3323{
3324 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3325 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3326 cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD);
3327 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD);
3328 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3329 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3330 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3331 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3332 sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE);
3333
3334 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3335 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3336 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3337 CvNODEBUG_on (get_cv ("Coro::_pool_handler", 0)); /* work around a debugger bug */
3338
3339 coro_stash = gv_stashpv ("Coro", TRUE);
3340
3341 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (CORO_PRIO_MAX));
3342 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (CORO_PRIO_HIGH));
3343 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (CORO_PRIO_NORMAL));
3344 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (CORO_PRIO_LOW));
3345 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (CORO_PRIO_IDLE));
3346 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (CORO_PRIO_MIN));
3347
3348 {
3349 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3350
3351 coroapi.schedule = api_schedule;
3352 coroapi.schedule_to = api_schedule_to;
3353 coroapi.cede = api_cede;
3354 coroapi.cede_notself = api_cede_notself;
3355 coroapi.ready = api_ready;
3356 coroapi.is_ready = api_is_ready;
3357 coroapi.nready = coro_nready;
3358 coroapi.current = coro_current;
3359
3360 /*GCoroAPI = &coroapi;*/
3361 sv_setiv (sv, (IV)&coroapi);
3362 SvREADONLY_on (sv);
3363 }
3364}
3365
3366SV *
3367async (...)
3368 PROTOTYPE: &@
415 CODE: 3369 CODE:
3370 RETVAL = coro_new (aTHX_ coro_stash, &ST (0), items, 1);
3371 api_ready (aTHX_ RETVAL);
3372 OUTPUT:
3373 RETVAL
416 3374
417 if (prev != next) 3375void
3376terminate (...)
3377 CODE:
3378 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3379
3380void
3381schedule (...)
3382 CODE:
3383 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3384
3385void
3386schedule_to (...)
3387 CODE:
3388 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3389
3390void
3391cede_to (...)
3392 CODE:
3393 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3394
3395void
3396cede (...)
3397 CODE:
3398 CORO_EXECUTE_SLF_XS (slf_init_cede);
3399
3400void
3401cede_notself (...)
3402 CODE:
3403 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3404
3405void
3406_set_current (SV *current)
3407 PROTOTYPE: $
3408 CODE:
3409 SvREFCNT_dec (SvRV (coro_current));
3410 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3411
3412void
3413_set_readyhook (SV *hook)
3414 PROTOTYPE: $
3415 CODE:
3416 SvREFCNT_dec (coro_readyhook);
3417 SvGETMAGIC (hook);
3418 if (SvOK (hook))
3419 {
3420 coro_readyhook = newSVsv (hook);
3421 CORO_READYHOOK = invoke_sv_ready_hook_helper;
3422 }
3423 else
418 { 3424 {
3425 coro_readyhook = 0;
3426 CORO_READYHOOK = 0;
3427 }
3428
3429int
3430prio (Coro::State coro, int newprio = 0)
3431 PROTOTYPE: $;$
3432 ALIAS:
3433 nice = 1
3434 CODE:
3435{
3436 RETVAL = coro->prio;
3437
3438 if (items > 1)
3439 {
3440 if (ix)
3441 newprio = coro->prio - newprio;
3442
3443 if (newprio < CORO_PRIO_MIN) newprio = CORO_PRIO_MIN;
3444 if (newprio > CORO_PRIO_MAX) newprio = CORO_PRIO_MAX;
3445
3446 coro->prio = newprio;
3447 }
3448}
3449 OUTPUT:
3450 RETVAL
3451
3452SV *
3453ready (SV *self)
3454 PROTOTYPE: $
3455 CODE:
3456 RETVAL = boolSV (api_ready (aTHX_ self));
3457 OUTPUT:
3458 RETVAL
3459
3460int
3461nready (...)
3462 PROTOTYPE:
3463 CODE:
3464 RETVAL = coro_nready;
3465 OUTPUT:
3466 RETVAL
3467
3468void
3469suspend (Coro::State self)
3470 PROTOTYPE: $
3471 CODE:
3472 self->flags |= CF_SUSPENDED;
3473
3474void
3475resume (Coro::State self)
3476 PROTOTYPE: $
3477 CODE:
3478 self->flags &= ~CF_SUSPENDED;
3479
3480void
3481_pool_handler (...)
3482 CODE:
3483 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3484
3485void
3486async_pool (SV *cv, ...)
3487 PROTOTYPE: &@
3488 PPCODE:
3489{
3490 HV *hv = (HV *)av_pop (av_async_pool);
3491 AV *av = newAV ();
3492 SV *cb = ST (0);
3493 int i;
3494
3495 av_extend (av, items - 2);
3496 for (i = 1; i < items; ++i)
3497 av_push (av, SvREFCNT_inc_NN (ST (i)));
3498
3499 if ((SV *)hv == &PL_sv_undef)
3500 {
3501 SV *sv = coro_new (aTHX_ coro_stash, (SV **)&cv_pool_handler, 1, 1);
3502 hv = (HV *)SvREFCNT_inc_NN (SvRV (sv));
3503 SvREFCNT_dec (sv);
3504 }
3505
3506 {
3507 struct coro *coro = SvSTATE_hv (hv);
3508
3509 assert (!coro->invoke_cb);
3510 assert (!coro->invoke_av);
3511 coro->invoke_cb = SvREFCNT_inc (cb);
3512 coro->invoke_av = av;
3513 }
3514
3515 api_ready (aTHX_ (SV *)hv);
3516
3517 if (GIMME_V != G_VOID)
3518 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
3519 else
3520 SvREFCNT_dec (hv);
3521}
3522
3523SV *
3524rouse_cb ()
3525 PROTOTYPE:
3526 CODE:
3527 RETVAL = coro_new_rouse_cb (aTHX);
3528 OUTPUT:
3529 RETVAL
3530
3531void
3532rouse_wait (...)
3533 PROTOTYPE: ;$
3534 PPCODE:
3535 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
3536
3537void
3538on_enter (SV *block)
3539 ALIAS:
3540 on_leave = 1
3541 PROTOTYPE: &
3542 CODE:
3543{
3544 struct coro *coro = SvSTATE_current;
3545 AV **avp = ix ? &coro->on_leave : &coro->on_enter;
3546
3547 block = s_get_cv_croak (block);
3548
3549 if (!*avp)
3550 *avp = newAV ();
3551
3552 av_push (*avp, SvREFCNT_inc (block));
3553
3554 if (!ix)
3555 on_enterleave_call (aTHX_ block);
3556
3557 LEAVE; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3558 SAVEDESTRUCTOR_X (ix ? coro_pop_on_leave : coro_pop_on_enter, (void *)coro);
3559 ENTER; /* pp_entersub unfortunately forces an ENTER/LEAVE around XS calls */
3560}
3561
3562
3563MODULE = Coro::State PACKAGE = PerlIO::cede
3564
3565BOOT:
3566 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3567
3568
3569MODULE = Coro::State PACKAGE = Coro::Semaphore
3570
3571SV *
3572new (SV *klass, SV *count = 0)
3573 CODE:
3574{
3575 int semcnt = 1;
3576
3577 if (count)
3578 {
3579 SvGETMAGIC (count);
3580
3581 if (SvOK (count))
3582 semcnt = SvIV (count);
3583 }
3584
3585 RETVAL = sv_bless (
3586 coro_waitarray_new (aTHX_ semcnt),
3587 GvSTASH (CvGV (cv))
3588 );
3589}
3590 OUTPUT:
3591 RETVAL
3592
3593# helper for Coro::Channel and others
3594SV *
3595_alloc (int count)
3596 CODE:
3597 RETVAL = coro_waitarray_new (aTHX_ count);
3598 OUTPUT:
3599 RETVAL
3600
3601SV *
3602count (SV *self)
3603 CODE:
3604 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3605 OUTPUT:
3606 RETVAL
3607
3608void
3609up (SV *self, int adjust = 1)
3610 ALIAS:
3611 adjust = 1
3612 CODE:
3613 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3614
3615void
3616down (...)
3617 CODE:
3618 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3619
3620void
3621wait (...)
3622 CODE:
3623 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3624
3625void
3626try (SV *self)
3627 PPCODE:
3628{
3629 AV *av = (AV *)SvRV (self);
3630 SV *count_sv = AvARRAY (av)[0];
3631 IV count = SvIVX (count_sv);
3632
3633 if (count > 0)
3634 {
3635 --count;
3636 SvIVX (count_sv) = count;
3637 XSRETURN_YES;
3638 }
3639 else
3640 XSRETURN_NO;
3641}
3642
3643void
3644waiters (SV *self)
3645 PPCODE:
3646{
3647 AV *av = (AV *)SvRV (self);
3648 int wcount = AvFILLp (av) + 1 - 1;
3649
3650 if (GIMME_V == G_SCALAR)
3651 XPUSHs (sv_2mortal (newSViv (wcount)));
3652 else
3653 {
3654 int i;
3655 EXTEND (SP, wcount);
3656 for (i = 1; i <= wcount; ++i)
3657 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3658 }
3659}
3660
3661MODULE = Coro::State PACKAGE = Coro::SemaphoreSet
3662
3663void
3664_may_delete (SV *sem, int count, int extra_refs)
3665 PPCODE:
3666{
3667 AV *av = (AV *)SvRV (sem);
3668
3669 if (SvREFCNT ((SV *)av) == 1 + extra_refs
3670 && AvFILLp (av) == 0 /* no waiters, just count */
3671 && SvIV (AvARRAY (av)[0]) == count)
3672 XSRETURN_YES;
3673
3674 XSRETURN_NO;
3675}
3676
3677MODULE = Coro::State PACKAGE = Coro::Signal
3678
3679SV *
3680new (SV *klass)
3681 CODE:
3682 RETVAL = sv_bless (
3683 coro_waitarray_new (aTHX_ 0),
3684 GvSTASH (CvGV (cv))
3685 );
3686 OUTPUT:
3687 RETVAL
3688
3689void
3690wait (...)
3691 CODE:
3692 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
3693
3694void
3695broadcast (SV *self)
3696 CODE:
3697{
3698 AV *av = (AV *)SvRV (self);
3699 coro_signal_wake (aTHX_ av, AvFILLp (av));
3700}
3701
3702void
3703send (SV *self)
3704 CODE:
3705{
3706 AV *av = (AV *)SvRV (self);
3707
3708 if (AvFILLp (av))
3709 coro_signal_wake (aTHX_ av, 1);
3710 else
3711 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3712}
3713
3714IV
3715awaited (SV *self)
3716 CODE:
3717 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3718 OUTPUT:
3719 RETVAL
3720
3721
3722MODULE = Coro::State PACKAGE = Coro::AnyEvent
3723
3724BOOT:
3725 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3726
3727void
3728_schedule (...)
3729 CODE:
3730{
3731 static int incede;
3732
3733 api_cede_notself (aTHX);
3734
3735 ++incede;
3736 while (coro_nready >= incede && api_cede (aTHX))
3737 ;
3738
3739 sv_setsv (sv_activity, &PL_sv_undef);
3740 if (coro_nready >= incede)
3741 {
3742 PUSHMARK (SP);
419 PUTBACK; 3743 PUTBACK;
420 SAVE (aTHX_ prev); 3744 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
421
422 /*
423 * this could be done in newprocess which would lead to
424 * extremely elegant and fast (just PUTBACK/SAVE/LOAD/SPAGAIN)
425 * code here, but lazy allocation of stacks has also
426 * some virtues and the overhead of the if() is nil.
427 */
428 if (next->mainstack)
429 {
430 LOAD (aTHX_ next);
431 next->mainstack = 0; /* unnecessary but much cleaner */
432 SPAGAIN;
433 }
434 else
435 {
436 /*
437 * emulate part of the perl startup here.
438 */
439 UNOP myop;
440
441 init_stacks (); /* from perl.c */
442 PL_op = (OP *)&myop;
443 /*PL_curcop = 0;*/
444 GvAV (PL_defgv) = (SV *)SvREFCNT_inc (next->args);
445
446 SPAGAIN;
447 Zero(&myop, 1, UNOP);
448 myop.op_next = Nullop;
449 myop.op_flags = OPf_WANT_VOID;
450
451 PUSHMARK(SP);
452 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
453 PUTBACK;
454 /*
455 * the next line is slightly wrong, as PL_op->op_next
456 * is actually being executed so we skip the first op.
457 * that doesn't matter, though, since it is only
458 * pp_nextstate and we never return...
459 */
460 PL_op = Perl_pp_entersub(aTHX);
461 SPAGAIN;
462
463 ENTER;
464 }
465 } 3745 }
466 3746
3747 --incede;
3748}
3749
3750
3751MODULE = Coro::State PACKAGE = Coro::AIO
3752
467void 3753void
468DESTROY(coro) 3754_register (char *target, char *proto, SV *req)
469 Coro::State coro 3755 CODE:
470 CODE: 3756{
3757 SV *req_cv = s_get_cv_croak (req);
3758 /* newXSproto doesn't return the CV on 5.8 */
3759 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3760 sv_setpv ((SV *)slf_cv, proto);
3761 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3762}
471 3763
472 if (coro->mainstack) 3764MODULE = Coro::State PACKAGE = Coro::Select
3765
3766void
3767patch_pp_sselect ()
3768 CODE:
3769 if (!coro_old_pp_sselect)
473 { 3770 {
474 struct coro temp; 3771 coro_select_select = (SV *)get_cv ("Coro::Select::select", 0);
475 3772 coro_old_pp_sselect = PL_ppaddr [OP_SSELECT];
476 PUTBACK; 3773 PL_ppaddr [OP_SSELECT] = coro_pp_sselect;
477 SAVE(aTHX_ (&temp));
478 LOAD(aTHX_ coro);
479
480 destroy_stacks ();
481 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
482
483 LOAD((&temp));
484 SPAGAIN;
485 } 3774 }
486 3775
487 SvREFCNT_dec (coro->args); 3776void
488 Safefree (coro); 3777unpatch_pp_sselect ()
3778 CODE:
3779 if (coro_old_pp_sselect)
3780 {
3781 PL_ppaddr [OP_SSELECT] = coro_old_pp_sselect;
3782 coro_old_pp_sselect = 0;
3783 }
489 3784
490 3785

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines