ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.4 by root, Tue Jul 17 02:21:56 2001 UTC vs.
Revision 1.280 by root, Sun Nov 16 09:43:18 2008 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "patchlevel.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp // deep magic, don't ask
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
24#endif
25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
9#endif 35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
10 48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#define PERL_VERSION_ATLEAST(a,b,c) \
57 (PERL_REVISION > (a) \
58 || (PERL_REVISION == (a) \
59 && (PERL_VERSION > (b) \
60 || (PERL_VERSION == (b) && PERLSUBVERSION >= (c)))))
61
62#if !PERL_VERSION_ATLEAST (5,6,0)
63# ifndef PL_ppaddr
64# define PL_ppaddr ppaddr
65# endif
66# ifndef call_sv
67# define call_sv perl_call_sv
68# endif
69# ifndef get_sv
70# define get_sv perl_get_sv
71# endif
72# ifndef get_cv
73# define get_cv perl_get_cv
74# endif
75# ifndef IS_PADGV
76# define IS_PADGV(v) 0
77# endif
78# ifndef IS_PADCONST
79# define IS_PADCONST(v) 0
80# endif
81#endif
82
83/* 5.11 */
84#ifndef CxHASARGS
85# define CxHASARGS(cx) (cx)->blk_sub.hasargs
86#endif
87
88/* 5.10.0 */
89#ifndef SvREFCNT_inc_NN
90# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
91#endif
92
93/* 5.8.8 */
94#ifndef GV_NOTQUAL
95# define GV_NOTQUAL 0
96#endif
97#ifndef newSV
98# define newSV(l) NEWSV(0,l)
99#endif
100
101/* 5.8.7 */
102#ifndef SvRV_set
103# define SvRV_set(s,v) SvRV(s) = (v)
104#endif
105
106#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
107# undef CORO_STACKGUARD
108#endif
109
110#ifndef CORO_STACKGUARD
111# define CORO_STACKGUARD 0
112#endif
113
114/* prefer perl internal functions over our own? */
115#ifndef CORO_PREFER_PERL_FUNCTIONS
116# define CORO_PREFER_PERL_FUNCTIONS 0
117#endif
118
119/* The next macros try to return the current stack pointer, in an as
120 * portable way as possible. */
121#if __GNUC__ >= 4
122# define dSTACKLEVEL void *stacklevel = __builtin_frame_address (0)
123#else
124# define dSTACKLEVEL volatile void *stacklevel = (volatile void *)&stacklevel
125#endif
126
127#define IN_DESTRUCT (PL_main_cv == Nullcv)
128
129#if __GNUC__ >= 3
130# define attribute(x) __attribute__(x)
131# define expect(expr,value) __builtin_expect ((expr),(value))
132# define INLINE static inline
133#else
134# define attribute(x)
135# define expect(expr,value) (expr)
136# define INLINE static
137#endif
138
139#define expect_false(expr) expect ((expr) != 0, 0)
140#define expect_true(expr) expect ((expr) != 0, 1)
141
142#define NOINLINE attribute ((noinline))
143
144#include "CoroAPI.h"
145
146#ifdef USE_ITHREADS
147# if CORO_PTHREAD
148static void *coro_thx;
149# endif
150#endif
151
152/* helper storage struct for Coro::AIO */
153struct io_state
154{
155 AV *res;
156 int errorno;
157 I32 laststype; /* U16 in 5.10.0 */
158 int laststatval;
159 Stat_t statcache;
160};
161
162static double (*nvtime)(); /* so why doesn't it take void? */
163
164static U32 cctx_gen;
165static size_t cctx_stacksize = CORO_STACKSIZE;
166static struct CoroAPI coroapi;
167static AV *main_mainstack; /* used to differentiate between $main and others */
168static JMPENV *main_top_env;
169static HV *coro_state_stash, *coro_stash;
170static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
171static volatile struct coro *transfer_next;
172
173static GV *irsgv; /* $/ */
174static GV *stdoutgv; /* *STDOUT */
175static SV *rv_diehook;
176static SV *rv_warnhook;
177static HV *hv_sig; /* %SIG */
178
179/* async_pool helper stuff */
180static SV *sv_pool_rss;
181static SV *sv_pool_size;
182static AV *av_async_pool;
183
184/* Coro::AnyEvent */
185static SV *sv_activity;
186
187static struct coro_cctx *cctx_first;
188static int cctx_count, cctx_idle;
189
190enum {
191 CC_MAPPED = 0x01,
192 CC_NOREUSE = 0x02, /* throw this away after tracing */
193 CC_TRACE = 0x04,
194 CC_TRACE_SUB = 0x08, /* trace sub calls */
195 CC_TRACE_LINE = 0x10, /* trace each statement */
196 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
197};
198
199/* this is a structure representing a c-level coroutine */
200typedef struct coro_cctx
201{
202 struct coro_cctx *next;
203
204 /* the stack */
205 void *sptr;
206 size_t ssize;
207
208 /* cpu state */
209 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
210 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
211 JMPENV *top_env;
212 coro_context cctx;
213
214 U32 gen;
215#if CORO_USE_VALGRIND
216 int valgrind_id;
217#endif
218 unsigned char flags;
219} coro_cctx;
220
221enum {
222 CF_RUNNING = 0x0001, /* coroutine is running */
223 CF_READY = 0x0002, /* coroutine is ready */
224 CF_NEW = 0x0004, /* has never been switched to */
225 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
226};
227
228/* the structure where most of the perl state is stored, overlaid on the cxstack */
229typedef struct
230{
231 SV *defsv;
232 AV *defav;
233 SV *errsv;
234 SV *irsgv;
235#define VAR(name,type) type name;
236# include "state.h"
237#undef VAR
238} perl_slots;
239
240#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
241
242/* this is a structure representing a perl-level coroutine */
11struct coro { 243struct coro {
12 U8 dowarn; 244 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 245 coro_cctx *cctx;
14 246
15 PERL_SI *curstackinfo; 247 /* process data */
16 AV *curstack; 248 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 249 AV *mainstack;
18 SV **stack_sp; 250 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 251
41 AV *args; 252 AV *args; /* data associated with this coroutine (initial args) */
253 int refcnt; /* coroutines are refcounted, yes */
254 int flags; /* CF_ flags */
255 HV *hv; /* the perl hash associated with this coro, if any */
256 void (*on_destroy)(pTHX_ struct coro *coro);
257
258 /* statistics */
259 int usecount; /* number of transfers to this coro */
260
261 /* coro process data */
262 int prio;
263 SV *throw; /* exception to be thrown */
264
265 /* async_pool */
266 SV *saved_deffh;
267
268 /* linked list */
269 struct coro *next, *prev;
42}; 270};
43 271
44typedef struct coro *Coro__State; 272typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 273typedef struct coro *Coro__State_or_hashref;
46 274
47static HV *padlist_cache; 275static struct CoroSLF slf_frame; /* the current slf frame */
48 276
49/* mostly copied from op.c:cv_clone2 */ 277/** Coro ********************************************************************/
50STATIC AV * 278
51clone_padlist (AV *protopadlist) 279#define PRIO_MAX 3
280#define PRIO_HIGH 1
281#define PRIO_NORMAL 0
282#define PRIO_LOW -1
283#define PRIO_IDLE -3
284#define PRIO_MIN -4
285
286/* for Coro.pm */
287static SV *coro_current;
288static SV *coro_readyhook;
289static AV *coro_ready [PRIO_MAX - PRIO_MIN + 1];
290static struct coro *coro_first;
291#define coro_nready coroapi.nready
292
293/** lowlevel stuff **********************************************************/
294
295static SV *
296coro_get_sv (pTHX_ const char *name, int create)
52{ 297{
53 AV *av; 298#if PERL_VERSION_ATLEAST (5,10,0)
54 I32 ix; 299 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 300 get_sv (name, create);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 301#endif
57 SV **pname = AvARRAY (protopad_name); 302 return get_sv (name, create);
58 SV **ppad = AvARRAY (protopad); 303}
59 I32 fname = AvFILLp (protopad_name); 304
60 I32 fpad = AvFILLp (protopad); 305static AV *
306coro_get_av (pTHX_ const char *name, int create)
307{
308#if PERL_VERSION_ATLEAST (5,10,0)
309 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
310 get_av (name, create);
311#endif
312 return get_av (name, create);
313}
314
315static HV *
316coro_get_hv (pTHX_ const char *name, int create)
317{
318#if PERL_VERSION_ATLEAST (5,10,0)
319 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
320 get_hv (name, create);
321#endif
322 return get_hv (name, create);
323}
324
325static AV *
326coro_clone_padlist (pTHX_ CV *cv)
327{
328 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 329 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 330
72 newpadlist = newAV (); 331 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 332 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 333#if PERL_VERSION_ATLEAST (5,10,0)
334 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
335#else
336 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
337#endif
338 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
339 --AvFILLp (padlist);
340
341 av_store (newpadlist, 0, SvREFCNT_inc_NN (*av_fetch (padlist, 0, FALSE)));
75 av_store (newpadlist, 1, (SV *) newpad); 342 av_store (newpadlist, 1, (SV *)newpad);
76 343
77 av = newAV (); /* will be @_ */ 344 return newpadlist;
78 av_extend (av, 0); 345}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 346
82 for (ix = fpad; ix > 0; ix--) 347static void
348free_padlist (pTHX_ AV *padlist)
349{
350 /* may be during global destruction */
351 if (SvREFCNT (padlist))
83 { 352 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 353 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 354 while (i >= 0)
86 { 355 {
87 char *name = SvPVX (namesv); /* XXX */ 356 SV **svp = av_fetch (padlist, i--, FALSE);
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 357 if (svp)
89 { /* lexical from outside? */
90 npad[ix] = SvREFCNT_inc (ppad[ix]);
91 } 358 {
92 else
93 { /* our own lexical */
94 SV *sv; 359 SV *sv;
95 if (*name == '&') 360 while (&PL_sv_undef != (sv = av_pop ((AV *)*svp)))
96 sv = SvREFCNT_inc (ppad[ix]); 361 SvREFCNT_dec (sv);
97 else if (*name == '@') 362
98 sv = (SV *) newAV (); 363 SvREFCNT_dec (*svp);
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 } 364 }
107 } 365 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix]))
109 {
110 npad[ix] = SvREFCNT_inc (ppad[ix]);
111 }
112 else
113 {
114 SV *sv = NEWSV (0, 0);
115 SvPADTMP_on (sv);
116 npad[ix] = sv;
117 }
118 }
119 366
120#if 0 /* NONOTUNDERSTOOD */
121 /* Now that vars are all in place, clone nested closures. */
122
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif
139
140 return newpadlist;
141}
142
143STATIC AV *
144free_padlist (AV *padlist)
145{
146 /* may be during global destruction */
147 if (SvREFCNT(padlist))
148 {
149 I32 i = AvFILLp(padlist);
150 while (i >= 0)
151 {
152 SV **svp = av_fetch(padlist, i--, FALSE);
153 SV *sv = svp ? *svp : Nullsv;
154 if (sv)
155 SvREFCNT_dec(sv);
156 }
157
158 SvREFCNT_dec((SV*)padlist); 367 SvREFCNT_dec ((SV*)padlist);
368 }
369}
370
371static int
372coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
373{
374 AV *padlist;
375 AV *av = (AV *)mg->mg_obj;
376
377 /* casting is fun. */
378 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
379 free_padlist (aTHX_ padlist);
380
381 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
382
383 return 0;
384}
385
386#define CORO_MAGIC_type_cv PERL_MAGIC_ext
387#define CORO_MAGIC_type_state PERL_MAGIC_ext
388
389static MGVTBL coro_cv_vtbl = {
390 0, 0, 0, 0,
391 coro_cv_free
392};
393
394#define CORO_MAGIC(sv, type) \
395 SvMAGIC (sv) \
396 ? SvMAGIC (sv)->mg_type == type \
397 ? SvMAGIC (sv) \
398 : mg_find (sv, type) \
399 : 0
400
401#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
402#define CORO_MAGIC_state(sv) CORO_MAGIC (((SV *)(sv)), CORO_MAGIC_type_state)
403
404INLINE struct coro *
405SvSTATE_ (pTHX_ SV *coro)
406{
407 HV *stash;
408 MAGIC *mg;
409
410 if (SvROK (coro))
411 coro = SvRV (coro);
412
413 if (expect_false (SvTYPE (coro) != SVt_PVHV))
414 croak ("Coro::State object required");
415
416 stash = SvSTASH (coro);
417 if (expect_false (stash != coro_stash && stash != coro_state_stash))
418 {
419 /* very slow, but rare, check */
420 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
421 croak ("Coro::State object required");
422 }
423
424 mg = CORO_MAGIC_state (coro);
425 return (struct coro *)mg->mg_ptr;
426}
427
428#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
429
430/* the next two functions merely cache the padlists */
431static void
432get_padlist (pTHX_ CV *cv)
433{
434 MAGIC *mg = CORO_MAGIC_cv (cv);
435 AV *av;
436
437 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
438 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
439 else
440 {
441#if CORO_PREFER_PERL_FUNCTIONS
442 /* this is probably cleaner? but also slower! */
443 /* in practise, it seems to be less stable */
444 CV *cp = Perl_cv_clone (cv);
445 CvPADLIST (cv) = CvPADLIST (cp);
446 CvPADLIST (cp) = 0;
447 SvREFCNT_dec (cp);
448#else
449 CvPADLIST (cv) = coro_clone_padlist (aTHX_ cv);
450#endif
451 }
452}
453
454static void
455put_padlist (pTHX_ CV *cv)
456{
457 MAGIC *mg = CORO_MAGIC_cv (cv);
458 AV *av;
459
460 if (expect_false (!mg))
461 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
462
463 av = (AV *)mg->mg_obj;
464
465 if (expect_false (AvFILLp (av) >= AvMAX (av)))
466 av_extend (av, AvMAX (av) + 1);
467
468 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
469}
470
471/** load & save, init *******************************************************/
472
473static void
474load_perl (pTHX_ Coro__State c)
475{
476 perl_slots *slot = c->slot;
477 c->slot = 0;
478
479 PL_mainstack = c->mainstack;
480
481 GvSV (PL_defgv) = slot->defsv;
482 GvAV (PL_defgv) = slot->defav;
483 GvSV (PL_errgv) = slot->errsv;
484 GvSV (irsgv) = slot->irsgv;
485
486 #define VAR(name,type) PL_ ## name = slot->name;
487 # include "state.h"
488 #undef VAR
489
490 {
491 dSP;
492
493 CV *cv;
494
495 /* now do the ugly restore mess */
496 while (expect_true (cv = (CV *)POPs))
497 {
498 put_padlist (aTHX_ cv); /* mark this padlist as available */
499 CvDEPTH (cv) = PTR2IV (POPs);
500 CvPADLIST (cv) = (AV *)POPs;
501 }
502
503 PUTBACK;
159 } 504 }
160}
161 505
162/* the next tow functions merely cache the padlists */ 506 slf_frame = c->slf_frame;
163STATIC void
164get_padlist (CV *cv)
165{
166 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 0);
167
168 if (he && AvFILLp ((AV *)*he) >= 0)
169 CvPADLIST (cv) = (AV *)av_pop ((AV *)*he);
170 else
171 CvPADLIST (cv) = clone_padlist (CvPADLIST (cv));
172} 507}
173 508
174STATIC void
175put_padlist (CV *cv)
176{
177 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 1);
178
179 if (SvTYPE (*he) != SVt_PVAV)
180 {
181 SvREFCNT_dec (*he);
182 *he = (SV *)newAV ();
183 }
184
185 av_push ((AV *)*he, (SV *)CvPADLIST (cv));
186}
187
188static void 509static void
189SAVE(pTHX_ Coro__State c) 510save_perl (pTHX_ Coro__State c)
190{ 511{
512 c->slf_frame = slf_frame;
513
191 { 514 {
192 dSP; 515 dSP;
193 I32 cxix = cxstack_ix; 516 I32 cxix = cxstack_ix;
517 PERL_CONTEXT *ccstk = cxstack;
194 PERL_SI *top_si = PL_curstackinfo; 518 PERL_SI *top_si = PL_curstackinfo;
195 PERL_CONTEXT *ccstk = cxstack;
196 519
197 /* 520 /*
198 * the worst thing you can imagine happens first - we have to save 521 * the worst thing you can imagine happens first - we have to save
199 * (and reinitialize) all cv's in the whole callchain :( 522 * (and reinitialize) all cv's in the whole callchain :(
200 */ 523 */
201 524
202 PUSHs (Nullsv); 525 XPUSHs (Nullsv);
203 /* this loop was inspired by pp_caller */ 526 /* this loop was inspired by pp_caller */
204 for (;;) 527 for (;;)
205 { 528 {
206 while (cxix >= 0) 529 while (expect_true (cxix >= 0))
207 { 530 {
208 PERL_CONTEXT *cx = &ccstk[cxix--]; 531 PERL_CONTEXT *cx = &ccstk[cxix--];
209 532
210 if (CxTYPE(cx) == CXt_SUB) 533 if (expect_true (CxTYPE (cx) == CXt_SUB || CxTYPE (cx) == CXt_FORMAT))
211 { 534 {
212 CV *cv = cx->blk_sub.cv; 535 CV *cv = cx->blk_sub.cv;
536
213 if (CvDEPTH(cv)) 537 if (expect_true (CvDEPTH (cv)))
214 { 538 {
215#ifdef USE_THREADS
216 XPUSHs ((SV *)CvOWNER(cv));
217#endif
218 EXTEND (SP, 3); 539 EXTEND (SP, 3);
219 PUSHs ((SV *)CvDEPTH(cv));
220 PUSHs ((SV *)CvPADLIST(cv)); 540 PUSHs ((SV *)CvPADLIST (cv));
541 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
221 PUSHs ((SV *)cv); 542 PUSHs ((SV *)cv);
222 543
223 get_padlist (cv);
224
225 CvDEPTH(cv) = 0; 544 CvDEPTH (cv) = 0;
226#ifdef USE_THREADS 545 get_padlist (aTHX_ cv);
227 CvOWNER(cv) = 0;
228 error must unlock this cv etc.. etc...
229 if you are here wondering about this error message then
230 the reason is that it will not work as advertised yet
231#endif
232 } 546 }
233 } 547 }
234 else if (CxTYPE(cx) == CXt_FORMAT) 548 }
549
550 if (expect_true (top_si->si_type == PERLSI_MAIN))
551 break;
552
553 top_si = top_si->si_prev;
554 ccstk = top_si->si_cxstack;
555 cxix = top_si->si_cxix;
556 }
557
558 PUTBACK;
559 }
560
561 /* allocate some space on the context stack for our purposes */
562 /* we manually unroll here, as usually 2 slots is enough */
563 if (SLOT_COUNT >= 1) CXINC;
564 if (SLOT_COUNT >= 2) CXINC;
565 if (SLOT_COUNT >= 3) CXINC;
566 {
567 int i;
568 for (i = 3; i < SLOT_COUNT; ++i)
569 CXINC;
570 }
571 cxstack_ix -= SLOT_COUNT; /* undo allocation */
572
573 c->mainstack = PL_mainstack;
574
575 {
576 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
577
578 slot->defav = GvAV (PL_defgv);
579 slot->defsv = DEFSV;
580 slot->errsv = ERRSV;
581 slot->irsgv = GvSV (irsgv);
582
583 #define VAR(name,type) slot->name = PL_ ## name;
584 # include "state.h"
585 #undef VAR
586 }
587}
588
589/*
590 * allocate various perl stacks. This is almost an exact copy
591 * of perl.c:init_stacks, except that it uses less memory
592 * on the (sometimes correct) assumption that coroutines do
593 * not usually need a lot of stackspace.
594 */
595#if CORO_PREFER_PERL_FUNCTIONS
596# define coro_init_stacks init_stacks
597#else
598static void
599coro_init_stacks (pTHX)
600{
601 PL_curstackinfo = new_stackinfo(32, 8);
602 PL_curstackinfo->si_type = PERLSI_MAIN;
603 PL_curstack = PL_curstackinfo->si_stack;
604 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
605
606 PL_stack_base = AvARRAY(PL_curstack);
607 PL_stack_sp = PL_stack_base;
608 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
609
610 New(50,PL_tmps_stack,32,SV*);
611 PL_tmps_floor = -1;
612 PL_tmps_ix = -1;
613 PL_tmps_max = 32;
614
615 New(54,PL_markstack,16,I32);
616 PL_markstack_ptr = PL_markstack;
617 PL_markstack_max = PL_markstack + 16;
618
619#ifdef SET_MARK_OFFSET
620 SET_MARK_OFFSET;
621#endif
622
623 New(54,PL_scopestack,8,I32);
624 PL_scopestack_ix = 0;
625 PL_scopestack_max = 8;
626
627 New(54,PL_savestack,24,ANY);
628 PL_savestack_ix = 0;
629 PL_savestack_max = 24;
630
631#if !PERL_VERSION_ATLEAST (5,10,0)
632 New(54,PL_retstack,4,OP*);
633 PL_retstack_ix = 0;
634 PL_retstack_max = 4;
635#endif
636}
637#endif
638
639/*
640 * destroy the stacks, the callchain etc...
641 */
642static void
643coro_destruct_stacks (pTHX)
644{
645 while (PL_curstackinfo->si_next)
646 PL_curstackinfo = PL_curstackinfo->si_next;
647
648 while (PL_curstackinfo)
649 {
650 PERL_SI *p = PL_curstackinfo->si_prev;
651
652 if (!IN_DESTRUCT)
653 SvREFCNT_dec (PL_curstackinfo->si_stack);
654
655 Safefree (PL_curstackinfo->si_cxstack);
656 Safefree (PL_curstackinfo);
657 PL_curstackinfo = p;
658 }
659
660 Safefree (PL_tmps_stack);
661 Safefree (PL_markstack);
662 Safefree (PL_scopestack);
663 Safefree (PL_savestack);
664#if !PERL_VERSION_ATLEAST (5,10,0)
665 Safefree (PL_retstack);
666#endif
667}
668
669static size_t
670coro_rss (pTHX_ struct coro *coro)
671{
672 size_t rss = sizeof (*coro);
673
674 if (coro->mainstack)
675 {
676 perl_slots tmp_slot;
677 perl_slots *slot;
678
679 if (coro->flags & CF_RUNNING)
680 {
681 slot = &tmp_slot;
682
683 #define VAR(name,type) slot->name = PL_ ## name;
684 # include "state.h"
685 #undef VAR
686 }
687 else
688 slot = coro->slot;
689
690 if (slot)
691 {
692 rss += sizeof (slot->curstackinfo);
693 rss += (slot->curstackinfo->si_cxmax + 1) * sizeof (PERL_CONTEXT);
694 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (slot->curstack)) * sizeof (SV *);
695 rss += slot->tmps_max * sizeof (SV *);
696 rss += (slot->markstack_max - slot->markstack_ptr) * sizeof (I32);
697 rss += slot->scopestack_max * sizeof (I32);
698 rss += slot->savestack_max * sizeof (ANY);
699
700#if !PERL_VERSION_ATLEAST (5,10,0)
701 rss += slot->retstack_max * sizeof (OP *);
702#endif
703 }
704 }
705
706 return rss;
707}
708
709/** coroutine stack handling ************************************************/
710
711static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
712static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
713static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
714
715/* apparently < 5.8.8 */
716#ifndef MgPV_nolen_const
717#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
718 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
719 (const char*)(mg)->mg_ptr)
720#endif
721
722/*
723 * This overrides the default magic get method of %SIG elements.
724 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
725 * and instead of tryign to save and restore the hash elements, we just provide
726 * readback here.
727 * We only do this when the hook is != 0, as they are often set to 0 temporarily,
728 * not expecting this to actually change the hook. This is a potential problem
729 * when a schedule happens then, but we ignore this.
730 */
731static int
732coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
733{
734 const char *s = MgPV_nolen_const (mg);
735
736 if (*s == '_')
737 {
738 SV **svp = 0;
739
740 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
741 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
742
743 if (svp)
744 {
745 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
746 return 0;
747 }
748 }
749
750 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
751}
752
753static int
754coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
755{
756 const char *s = MgPV_nolen_const (mg);
757
758 if (*s == '_')
759 {
760 SV **svp = 0;
761
762 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
763 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
764
765 if (svp)
766 {
767 SV *old = *svp;
768 *svp = 0;
769 SvREFCNT_dec (old);
770 return 0;
771 }
772 }
773
774 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
775}
776
777static int
778coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
779{
780 const char *s = MgPV_nolen_const (mg);
781
782 if (*s == '_')
783 {
784 SV **svp = 0;
785
786 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
787 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
788
789 if (svp)
790 {
791 SV *old = *svp;
792 *svp = newSVsv (sv);
793 SvREFCNT_dec (old);
794 return 0;
795 }
796 }
797
798 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
799}
800
801static void
802prepare_nop (pTHX_ struct coro_transfer_args *ta)
803{
804 /* kind of mega-hacky, but works */
805 ta->next = ta->prev = (struct coro *)ta;
806}
807
808static int
809slf_check_nop (pTHX_ struct CoroSLF *frame)
810{
811 return 0;
812}
813
814static void
815coro_setup (pTHX_ struct coro *coro)
816{
817 /*
818 * emulate part of the perl startup here.
819 */
820 coro_init_stacks (aTHX);
821
822 PL_runops = RUNOPS_DEFAULT;
823 PL_curcop = &PL_compiling;
824 PL_in_eval = EVAL_NULL;
825 PL_comppad = 0;
826 PL_curpm = 0;
827 PL_curpad = 0;
828 PL_localizing = 0;
829 PL_dirty = 0;
830 PL_restartop = 0;
831#if PERL_VERSION_ATLEAST (5,10,0)
832 PL_parser = 0;
833#endif
834
835 /* recreate the die/warn hooks */
836 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
837 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
838
839 GvSV (PL_defgv) = newSV (0);
840 GvAV (PL_defgv) = coro->args; coro->args = 0;
841 GvSV (PL_errgv) = newSV (0);
842 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
843 PL_rs = newSVsv (GvSV (irsgv));
844 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
845
846 {
847 dSP;
848 UNOP myop;
849
850 Zero (&myop, 1, UNOP);
851 myop.op_next = Nullop;
852 myop.op_flags = OPf_WANT_VOID;
853
854 PUSHMARK (SP);
855 XPUSHs (sv_2mortal (av_shift (GvAV (PL_defgv))));
856 PUTBACK;
857 PL_op = (OP *)&myop;
858 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
859 SPAGAIN;
860 }
861
862 /* this newly created coroutine might be run on an existing cctx which most
863 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
864 */
865 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
866 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
867}
868
869static void
870coro_destruct (pTHX_ struct coro *coro)
871{
872 if (!IN_DESTRUCT)
873 {
874 /* restore all saved variables and stuff */
875 LEAVE_SCOPE (0);
876 assert (PL_tmps_floor == -1);
877
878 /* free all temporaries */
879 FREETMPS;
880 assert (PL_tmps_ix == -1);
881
882 /* unwind all extra stacks */
883 POPSTACK_TO (PL_mainstack);
884
885 /* unwind main stack */
886 dounwind (-1);
887 }
888
889 SvREFCNT_dec (GvSV (PL_defgv));
890 SvREFCNT_dec (GvAV (PL_defgv));
891 SvREFCNT_dec (GvSV (PL_errgv));
892 SvREFCNT_dec (PL_defoutgv);
893 SvREFCNT_dec (PL_rs);
894 SvREFCNT_dec (GvSV (irsgv));
895
896 SvREFCNT_dec (PL_diehook);
897 SvREFCNT_dec (PL_warnhook);
898
899 SvREFCNT_dec (coro->saved_deffh);
900 SvREFCNT_dec (coro->throw);
901
902 coro_destruct_stacks (aTHX);
903}
904
905INLINE void
906free_coro_mortal (pTHX)
907{
908 if (expect_true (coro_mortal))
909 {
910 SvREFCNT_dec (coro_mortal);
911 coro_mortal = 0;
912 }
913}
914
915static int
916runops_trace (pTHX)
917{
918 COP *oldcop = 0;
919 int oldcxix = -2;
920 struct coro *coro = SvSTATE (coro_current); /* trace cctx is tied to specific coro */
921 coro_cctx *cctx = coro->cctx;
922
923 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
924 {
925 PERL_ASYNC_CHECK ();
926
927 if (cctx->flags & CC_TRACE_ALL)
928 {
929 if (PL_op->op_type == OP_LEAVESUB && cctx->flags & CC_TRACE_SUB)
930 {
931 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
932 SV **bot, **top;
933 AV *av = newAV (); /* return values */
934 SV **cb;
935 dSP;
936
937 GV *gv = CvGV (cx->blk_sub.cv);
938 SV *fullname = sv_2mortal (newSV (0));
939 if (isGV (gv))
940 gv_efullname3 (fullname, gv, 0);
941
942 bot = PL_stack_base + cx->blk_oldsp + 1;
943 top = cx->blk_gimme == G_ARRAY ? SP + 1
944 : cx->blk_gimme == G_SCALAR ? bot + 1
945 : bot;
946
947 av_extend (av, top - bot);
948 while (bot < top)
949 av_push (av, SvREFCNT_inc_NN (*bot++));
950
951 PL_runops = RUNOPS_DEFAULT;
952 ENTER;
953 SAVETMPS;
954 EXTEND (SP, 3);
955 PUSHMARK (SP);
956 PUSHs (&PL_sv_no);
957 PUSHs (fullname);
958 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
959 PUTBACK;
960 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
961 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
962 SPAGAIN;
963 FREETMPS;
964 LEAVE;
965 PL_runops = runops_trace;
966 }
967
968 if (oldcop != PL_curcop)
969 {
970 oldcop = PL_curcop;
971
972 if (PL_curcop != &PL_compiling)
973 {
974 SV **cb;
975
976 if (oldcxix != cxstack_ix && cctx->flags & CC_TRACE_SUB)
977 {
978 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
979
980 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
981 {
982 runops_proc_t old_runops = PL_runops;
983 dSP;
984 GV *gv = CvGV (cx->blk_sub.cv);
985 SV *fullname = sv_2mortal (newSV (0));
986
987 if (isGV (gv))
988 gv_efullname3 (fullname, gv, 0);
989
990 PL_runops = RUNOPS_DEFAULT;
991 ENTER;
992 SAVETMPS;
993 EXTEND (SP, 3);
994 PUSHMARK (SP);
995 PUSHs (&PL_sv_yes);
996 PUSHs (fullname);
997 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
998 PUTBACK;
999 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1000 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1001 SPAGAIN;
1002 FREETMPS;
1003 LEAVE;
1004 PL_runops = runops_trace;
1005 }
1006
1007 oldcxix = cxstack_ix;
1008 }
1009
1010 if (cctx->flags & CC_TRACE_LINE)
1011 {
1012 dSP;
1013
1014 PL_runops = RUNOPS_DEFAULT;
1015 ENTER;
1016 SAVETMPS;
1017 EXTEND (SP, 3);
1018 PL_runops = RUNOPS_DEFAULT;
1019 PUSHMARK (SP);
1020 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1021 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1022 PUTBACK;
1023 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1024 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1025 SPAGAIN;
1026 FREETMPS;
1027 LEAVE;
1028 PL_runops = runops_trace;
1029 }
1030 }
1031 }
1032 }
1033 }
1034
1035 TAINT_NOT;
1036 return 0;
1037}
1038
1039static void
1040prepare_set_stacklevel (struct coro_transfer_args *ta, struct coro_cctx *cctx)
1041{
1042 ta->prev = (struct coro *)cctx;
1043 ta->next = 0;
1044}
1045
1046/* inject a fake call to Coro::State::_cctx_init into the execution */
1047/* _cctx_init should be careful, as it could be called at almost any time */
1048/* during execution of a perl program */
1049/* also initialises PL_top_env */
1050static void NOINLINE
1051cctx_prepare (pTHX_ coro_cctx *cctx)
1052{
1053 dSP;
1054 UNOP myop;
1055
1056 PL_top_env = &PL_start_env;
1057
1058 if (cctx->flags & CC_TRACE)
1059 PL_runops = runops_trace;
1060
1061 Zero (&myop, 1, UNOP);
1062 myop.op_next = PL_op;
1063 myop.op_flags = OPf_WANT_VOID | OPf_STACKED;
1064
1065 PUSHMARK (SP);
1066 EXTEND (SP, 2);
1067 PUSHs (sv_2mortal (newSViv ((IV)cctx)));
1068 PUSHs ((SV *)get_cv ("Coro::State::_cctx_init", FALSE));
1069 PUTBACK;
1070 PL_op = (OP *)&myop;
1071 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
1072 SPAGAIN;
1073}
1074
1075/* the tail of transfer: execute stuff we can only do after a transfer */
1076INLINE void
1077transfer_tail (pTHX)
1078{
1079 struct coro *next = (struct coro *)transfer_next;
1080 assert (!(transfer_next = 0)); /* just used for the side effect when asserts are enabled */
1081 assert (("FATAL: next coroutine was zero in transfer_tail (please report)", next));
1082
1083 free_coro_mortal (aTHX);
1084
1085 if (expect_false (next->throw))
1086 {
1087 SV *exception = sv_2mortal (next->throw);
1088
1089 next->throw = 0;
1090 sv_setsv (ERRSV, exception);
1091 croak (0);
1092 }
1093}
1094
1095/*
1096 * this is a _very_ stripped down perl interpreter ;)
1097 */
1098static void
1099cctx_run (void *arg)
1100{
1101#ifdef USE_ITHREADS
1102# if CORO_PTHREAD
1103 PERL_SET_CONTEXT (coro_thx);
1104# endif
1105#endif
1106 {
1107 dTHX;
1108
1109 /* normally we would need to skip the entersub here */
1110 /* not doing so will re-execute it, which is exactly what we want */
1111 /* PL_nop = PL_nop->op_next */
1112
1113 /* inject a fake subroutine call to cctx_init */
1114 cctx_prepare (aTHX_ (coro_cctx *)arg);
1115
1116 /* cctx_run is the alternative tail of transfer() */
1117 /* TODO: throwing an exception here might be deadly, VERIFY */
1118 transfer_tail (aTHX);
1119
1120 /* somebody or something will hit me for both perl_run and PL_restartop */
1121 PL_restartop = PL_op;
1122 perl_run (PL_curinterp);
1123
1124 /*
1125 * If perl-run returns we assume exit() was being called or the coro
1126 * fell off the end, which seems to be the only valid (non-bug)
1127 * reason for perl_run to return. We try to exit by jumping to the
1128 * bootstrap-time "top" top_env, as we cannot restore the "main"
1129 * coroutine as Coro has no such concept
1130 */
1131 PL_top_env = main_top_env;
1132 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1133 }
1134}
1135
1136static coro_cctx *
1137cctx_new ()
1138{
1139 coro_cctx *cctx;
1140
1141 ++cctx_count;
1142 New (0, cctx, 1, coro_cctx);
1143
1144 cctx->gen = cctx_gen;
1145 cctx->flags = 0;
1146 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1147
1148 return cctx;
1149}
1150
1151/* create a new cctx only suitable as source */
1152static coro_cctx *
1153cctx_new_empty ()
1154{
1155 coro_cctx *cctx = cctx_new ();
1156
1157 cctx->sptr = 0;
1158 coro_create (&cctx->cctx, 0, 0, 0, 0);
1159
1160 return cctx;
1161}
1162
1163/* create a new cctx suitable as destination/running a perl interpreter */
1164static coro_cctx *
1165cctx_new_run ()
1166{
1167 coro_cctx *cctx = cctx_new ();
1168 void *stack_start;
1169 size_t stack_size;
1170
1171#if HAVE_MMAP
1172 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1173 /* mmap supposedly does allocate-on-write for us */
1174 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1175
1176 if (cctx->sptr != (void *)-1)
1177 {
1178 #if CORO_STACKGUARD
1179 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1180 #endif
1181 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1182 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1183 cctx->flags |= CC_MAPPED;
1184 }
1185 else
1186#endif
1187 {
1188 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1189 New (0, cctx->sptr, cctx_stacksize, long);
1190
1191 if (!cctx->sptr)
1192 {
1193 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1194 _exit (EXIT_FAILURE);
1195 }
1196
1197 stack_start = cctx->sptr;
1198 stack_size = cctx->ssize;
1199 }
1200
1201 #if CORO_USE_VALGRIND
1202 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1203 #endif
1204
1205 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1206
1207 return cctx;
1208}
1209
1210static void
1211cctx_destroy (coro_cctx *cctx)
1212{
1213 if (!cctx)
1214 return;
1215
1216 --cctx_count;
1217 coro_destroy (&cctx->cctx);
1218
1219 /* coro_transfer creates new, empty cctx's */
1220 if (cctx->sptr)
1221 {
1222 #if CORO_USE_VALGRIND
1223 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1224 #endif
1225
1226#if HAVE_MMAP
1227 if (cctx->flags & CC_MAPPED)
1228 munmap (cctx->sptr, cctx->ssize);
1229 else
1230#endif
1231 Safefree (cctx->sptr);
1232 }
1233
1234 Safefree (cctx);
1235}
1236
1237/* wether this cctx should be destructed */
1238#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1239
1240static coro_cctx *
1241cctx_get (pTHX)
1242{
1243 while (expect_true (cctx_first))
1244 {
1245 coro_cctx *cctx = cctx_first;
1246 cctx_first = cctx->next;
1247 --cctx_idle;
1248
1249 if (expect_true (!CCTX_EXPIRED (cctx)))
1250 return cctx;
1251
1252 cctx_destroy (cctx);
1253 }
1254
1255 return cctx_new_run ();
1256}
1257
1258static void
1259cctx_put (coro_cctx *cctx)
1260{
1261 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1262
1263 /* free another cctx if overlimit */
1264 if (expect_false (cctx_idle >= cctx_max_idle))
1265 {
1266 coro_cctx *first = cctx_first;
1267 cctx_first = first->next;
1268 --cctx_idle;
1269
1270 cctx_destroy (first);
1271 }
1272
1273 ++cctx_idle;
1274 cctx->next = cctx_first;
1275 cctx_first = cctx;
1276}
1277
1278/** coroutine switching *****************************************************/
1279
1280static void
1281transfer_check (pTHX_ struct coro *prev, struct coro *next)
1282{
1283 if (expect_true (prev != next))
1284 {
1285 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1286 croak ("Coro::State::transfer called with non-running/new prev Coro::State, but can only transfer from running or new states,");
1287
1288 if (expect_false (next->flags & CF_RUNNING))
1289 croak ("Coro::State::transfer called with running next Coro::State, but can only transfer to inactive states,");
1290
1291 if (expect_false (next->flags & CF_DESTROYED))
1292 croak ("Coro::State::transfer called with destroyed next Coro::State, but can only transfer to inactive states,");
1293
1294#if !PERL_VERSION_ATLEAST (5,10,0)
1295 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1296 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1297#endif
1298 }
1299}
1300
1301/* always use the TRANSFER macro */
1302static void NOINLINE
1303transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1304{
1305 dSTACKLEVEL;
1306
1307 /* sometimes transfer is only called to set idle_sp */
1308 if (expect_false (!next))
1309 {
1310 ((coro_cctx *)prev)->idle_sp = (void *)stacklevel;
1311 assert (((coro_cctx *)prev)->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1312 }
1313 else if (expect_true (prev != next))
1314 {
1315 coro_cctx *prev__cctx;
1316
1317 if (expect_false (prev->flags & CF_NEW))
1318 {
1319 /* create a new empty/source context */
1320 prev->cctx = cctx_new_empty ();
1321 prev->flags &= ~CF_NEW;
1322 prev->flags |= CF_RUNNING;
1323 }
1324
1325 prev->flags &= ~CF_RUNNING;
1326 next->flags |= CF_RUNNING;
1327
1328 /* first get rid of the old state */
1329 save_perl (aTHX_ prev);
1330
1331 if (expect_false (next->flags & CF_NEW))
1332 {
1333 /* need to start coroutine */
1334 next->flags &= ~CF_NEW;
1335 /* setup coroutine call */
1336 coro_setup (aTHX_ next);
1337 }
1338 else
1339 load_perl (aTHX_ next);
1340
1341 prev__cctx = prev->cctx;
1342
1343 /* possibly untie and reuse the cctx */
1344 if (expect_true (
1345 prev__cctx->idle_sp == (void *)stacklevel
1346 && !(prev__cctx->flags & CC_TRACE)
1347 && !force_cctx
1348 ))
1349 {
1350 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1351 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == prev__cctx->idle_te));
1352
1353 prev->cctx = 0;
1354
1355 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get */
1356 /* without this the next cctx_get might destroy the prev__cctx while still in use */
1357 if (expect_false (CCTX_EXPIRED (prev__cctx)))
1358 if (!next->cctx)
1359 next->cctx = cctx_get (aTHX);
1360
1361 cctx_put (prev__cctx);
1362 }
1363
1364 ++next->usecount;
1365
1366 if (expect_true (!next->cctx))
1367 next->cctx = cctx_get (aTHX);
1368
1369 assert (("FATAL: transfer_next already nonzero in Coro (please report)", !transfer_next));
1370 transfer_next = next;
1371
1372 if (expect_false (prev__cctx != next->cctx))
1373 {
1374 prev__cctx->top_env = PL_top_env;
1375 PL_top_env = next->cctx->top_env;
1376 coro_transfer (&prev__cctx->cctx, &next->cctx->cctx);
1377 }
1378
1379 transfer_tail (aTHX);
1380 }
1381}
1382
1383#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1384#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1385
1386/** high level stuff ********************************************************/
1387
1388static int
1389coro_state_destroy (pTHX_ struct coro *coro)
1390{
1391 if (coro->flags & CF_DESTROYED)
1392 return 0;
1393
1394 if (coro->on_destroy)
1395 coro->on_destroy (aTHX_ coro);
1396
1397 coro->flags |= CF_DESTROYED;
1398
1399 if (coro->flags & CF_READY)
1400 {
1401 /* reduce nready, as destroying a ready coro effectively unreadies it */
1402 /* alternative: look through all ready queues and remove the coro */
1403 --coro_nready;
1404 }
1405 else
1406 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1407
1408 if (coro->mainstack && coro->mainstack != main_mainstack)
1409 {
1410 struct coro temp;
1411
1412 assert (("FATAL: tried to destroy currently running coroutine (please report)", !(coro->flags & CF_RUNNING)));
1413
1414 save_perl (aTHX_ &temp);
1415 load_perl (aTHX_ coro);
1416
1417 coro_destruct (aTHX_ coro);
1418
1419 load_perl (aTHX_ &temp);
1420
1421 coro->slot = 0;
1422 }
1423
1424 cctx_destroy (coro->cctx);
1425 SvREFCNT_dec (coro->args);
1426
1427 if (coro->next) coro->next->prev = coro->prev;
1428 if (coro->prev) coro->prev->next = coro->next;
1429 if (coro == coro_first) coro_first = coro->next;
1430
1431 return 1;
1432}
1433
1434static int
1435coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1436{
1437 struct coro *coro = (struct coro *)mg->mg_ptr;
1438 mg->mg_ptr = 0;
1439
1440 coro->hv = 0;
1441
1442 if (--coro->refcnt < 0)
1443 {
1444 coro_state_destroy (aTHX_ coro);
1445 Safefree (coro);
1446 }
1447
1448 return 0;
1449}
1450
1451static int
1452coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1453{
1454 struct coro *coro = (struct coro *)mg->mg_ptr;
1455
1456 ++coro->refcnt;
1457
1458 return 0;
1459}
1460
1461static MGVTBL coro_state_vtbl = {
1462 0, 0, 0, 0,
1463 coro_state_free,
1464 0,
1465#ifdef MGf_DUP
1466 coro_state_dup,
1467#else
1468# define MGf_DUP 0
1469#endif
1470};
1471
1472static void
1473prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1474{
1475 ta->prev = SvSTATE (prev_sv);
1476 ta->next = SvSTATE (next_sv);
1477 TRANSFER_CHECK (*ta);
1478}
1479
1480static void
1481api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1482{
1483 struct coro_transfer_args ta;
1484
1485 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1486 TRANSFER (ta, 1);
1487}
1488
1489/** Coro ********************************************************************/
1490
1491static void
1492coro_enq (pTHX_ SV *coro_sv)
1493{
1494 av_push (coro_ready [SvSTATE (coro_sv)->prio - PRIO_MIN], coro_sv);
1495}
1496
1497static SV *
1498coro_deq (pTHX)
1499{
1500 int prio;
1501
1502 for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= 0; )
1503 if (AvFILLp (coro_ready [prio]) >= 0)
1504 return av_shift (coro_ready [prio]);
1505
1506 return 0;
1507}
1508
1509static int
1510api_ready (pTHX_ SV *coro_sv)
1511{
1512 struct coro *coro;
1513 SV *sv_hook;
1514 void (*xs_hook)(void);
1515
1516 if (SvROK (coro_sv))
1517 coro_sv = SvRV (coro_sv);
1518
1519 coro = SvSTATE (coro_sv);
1520
1521 if (coro->flags & CF_READY)
1522 return 0;
1523
1524 coro->flags |= CF_READY;
1525
1526 sv_hook = coro_nready ? 0 : coro_readyhook;
1527 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1528
1529 coro_enq (aTHX_ SvREFCNT_inc_NN (coro_sv));
1530 ++coro_nready;
1531
1532 if (sv_hook)
1533 {
1534 dSP;
1535
1536 ENTER;
1537 SAVETMPS;
1538
1539 PUSHMARK (SP);
1540 PUTBACK;
1541 call_sv (sv_hook, G_DISCARD);
1542 SPAGAIN;
1543
1544 FREETMPS;
1545 LEAVE;
1546 }
1547
1548 if (xs_hook)
1549 xs_hook ();
1550
1551 return 1;
1552}
1553
1554static int
1555api_is_ready (pTHX_ SV *coro_sv)
1556{
1557 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1558}
1559
1560INLINE void
1561prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1562{
1563 SV *prev_sv, *next_sv;
1564
1565 for (;;)
1566 {
1567 next_sv = coro_deq (aTHX);
1568
1569 /* nothing to schedule: call the idle handler */
1570 if (expect_false (!next_sv))
1571 {
1572 dSP;
1573
1574 ENTER;
1575 SAVETMPS;
1576
1577 PUSHMARK (SP);
1578 PUTBACK;
1579 call_sv (get_sv ("Coro::idle", FALSE), G_DISCARD);
1580 SPAGAIN;
1581
1582 FREETMPS;
1583 LEAVE;
1584 continue;
1585 }
1586
1587 ta->next = SvSTATE (next_sv);
1588
1589 /* cannot transfer to destroyed coros, skip and look for next */
1590 if (expect_false (ta->next->flags & CF_DESTROYED))
1591 {
1592 SvREFCNT_dec (next_sv);
1593 /* coro_nready has already been taken care of by destroy */
1594 continue;
1595 }
1596
1597 --coro_nready;
1598 break;
1599 }
1600
1601 /* free this only after the transfer */
1602 prev_sv = SvRV (coro_current);
1603 ta->prev = SvSTATE (prev_sv);
1604 TRANSFER_CHECK (*ta);
1605 assert (("FATAL: next coroutine isn't marked as ready in Coro (please report)", ta->next->flags & CF_READY));
1606 ta->next->flags &= ~CF_READY;
1607 SvRV_set (coro_current, next_sv);
1608
1609 free_coro_mortal (aTHX);
1610 coro_mortal = prev_sv;
1611}
1612
1613INLINE void
1614prepare_cede (pTHX_ struct coro_transfer_args *ta)
1615{
1616 api_ready (aTHX_ coro_current);
1617 prepare_schedule (aTHX_ ta);
1618}
1619
1620INLINE void
1621prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1622{
1623 SV *prev = SvRV (coro_current);
1624
1625 if (coro_nready)
1626 {
1627 prepare_schedule (aTHX_ ta);
1628 api_ready (aTHX_ prev);
1629 }
1630 else
1631 prepare_nop (aTHX_ ta);
1632}
1633
1634static void
1635api_schedule (pTHX)
1636{
1637 struct coro_transfer_args ta;
1638
1639 prepare_schedule (aTHX_ &ta);
1640 TRANSFER (ta, 1);
1641}
1642
1643static int
1644api_cede (pTHX)
1645{
1646 struct coro_transfer_args ta;
1647
1648 prepare_cede (aTHX_ &ta);
1649
1650 if (expect_true (ta.prev != ta.next))
1651 {
1652 TRANSFER (ta, 1);
1653 return 1;
1654 }
1655 else
1656 return 0;
1657}
1658
1659static int
1660api_cede_notself (pTHX)
1661{
1662 if (coro_nready)
1663 {
1664 struct coro_transfer_args ta;
1665
1666 prepare_cede_notself (aTHX_ &ta);
1667 TRANSFER (ta, 1);
1668 return 1;
1669 }
1670 else
1671 return 0;
1672}
1673
1674static void
1675api_trace (pTHX_ SV *coro_sv, int flags)
1676{
1677 struct coro *coro = SvSTATE (coro_sv);
1678
1679 if (flags & CC_TRACE)
1680 {
1681 if (!coro->cctx)
1682 coro->cctx = cctx_new_run ();
1683 else if (!(coro->cctx->flags & CC_TRACE))
1684 croak ("cannot enable tracing on coroutine with custom stack,");
1685
1686 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1687 }
1688 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1689 {
1690 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1691
1692 if (coro->flags & CF_RUNNING)
1693 PL_runops = RUNOPS_DEFAULT;
1694 else
1695 coro->slot->runops = RUNOPS_DEFAULT;
1696 }
1697}
1698
1699/*****************************************************************************/
1700/* PerlIO::cede */
1701
1702typedef struct
1703{
1704 PerlIOBuf base;
1705 NV next, every;
1706} PerlIOCede;
1707
1708static IV
1709PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
1710{
1711 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
1712
1713 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
1714 self->next = nvtime () + self->every;
1715
1716 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
1717}
1718
1719static SV *
1720PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
1721{
1722 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
1723
1724 return newSVnv (self->every);
1725}
1726
1727static IV
1728PerlIOCede_flush (pTHX_ PerlIO *f)
1729{
1730 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
1731 double now = nvtime ();
1732
1733 if (now >= self->next)
1734 {
1735 api_cede (aTHX);
1736 self->next = now + self->every;
1737 }
1738
1739 return PerlIOBuf_flush (aTHX_ f);
1740}
1741
1742static PerlIO_funcs PerlIO_cede =
1743{
1744 sizeof(PerlIO_funcs),
1745 "cede",
1746 sizeof(PerlIOCede),
1747 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
1748 PerlIOCede_pushed,
1749 PerlIOBuf_popped,
1750 PerlIOBuf_open,
1751 PerlIOBase_binmode,
1752 PerlIOCede_getarg,
1753 PerlIOBase_fileno,
1754 PerlIOBuf_dup,
1755 PerlIOBuf_read,
1756 PerlIOBuf_unread,
1757 PerlIOBuf_write,
1758 PerlIOBuf_seek,
1759 PerlIOBuf_tell,
1760 PerlIOBuf_close,
1761 PerlIOCede_flush,
1762 PerlIOBuf_fill,
1763 PerlIOBase_eof,
1764 PerlIOBase_error,
1765 PerlIOBase_clearerr,
1766 PerlIOBase_setlinebuf,
1767 PerlIOBuf_get_base,
1768 PerlIOBuf_bufsiz,
1769 PerlIOBuf_get_ptr,
1770 PerlIOBuf_get_cnt,
1771 PerlIOBuf_set_ptrcnt,
1772};
1773
1774/*****************************************************************************/
1775
1776static const CV *slf_cv; /* for quick consistency check */
1777
1778static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
1779static SV *slf_arg0;
1780static SV *slf_arg1;
1781static SV *slf_arg2;
1782
1783/* this restores the stack in the case we patched the entersub, to */
1784/* recreate the stack frame as perl will on following calls */
1785/* since entersub cleared the stack */
1786static OP *
1787pp_restore (pTHX)
1788{
1789 dSP;
1790
1791 PUSHMARK (SP);
1792
1793 EXTEND (SP, 3);
1794 if (slf_arg0) PUSHs (sv_2mortal (slf_arg0));
1795 if (slf_arg1) PUSHs (sv_2mortal (slf_arg1));
1796 if (slf_arg2) PUSHs (sv_2mortal (slf_arg2));
1797 PUSHs ((SV *)CvGV (slf_cv));
1798
1799 RETURNOP (slf_restore.op_first);
1800}
1801
1802static void
1803slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1804{
1805 prepare_set_stacklevel (ta, (struct coro_cctx *)slf_frame.data);
1806}
1807
1808static void
1809slf_init_set_stacklevel (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1810{
1811 assert (("FATAL: set_stacklevel needs the coro cctx as sole argument", items == 1));
1812
1813 frame->prepare = slf_prepare_set_stacklevel;
1814 frame->check = slf_check_nop;
1815 frame->data = (void *)SvIV (arg [0]);
1816}
1817
1818static void
1819slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
1820{
1821 SV **arg = (SV **)slf_frame.data;
1822
1823 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
1824}
1825
1826static void
1827slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1828{
1829 if (items != 2)
1830 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
1831
1832 frame->prepare = slf_prepare_transfer;
1833 frame->check = slf_check_nop;
1834 frame->data = (void *)arg; /* let's hope it will stay valid */
1835}
1836
1837static void
1838slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1839{
1840 frame->prepare = prepare_schedule;
1841 frame->check = slf_check_nop;
1842}
1843
1844static void
1845slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1846{
1847 frame->prepare = prepare_cede;
1848 frame->check = slf_check_nop;
1849}
1850
1851static void
1852slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1853{
1854 frame->prepare = prepare_cede_notself;
1855 frame->check = slf_check_nop;
1856}
1857
1858/* we hijack an hopefully unused CV flag for our purposes */
1859#define CVf_SLF 0x4000
1860
1861/*
1862 * these not obviously related functions are all rolled into one
1863 * function to increase chances that they all will call transfer with the same
1864 * stack offset
1865 * SLF stands for "schedule-like-function".
1866 */
1867static OP *
1868pp_slf (pTHX)
1869{
1870 I32 checkmark; /* mark SP to see how many elements check has pushed */
1871
1872 /* set up the slf frame, unless it has already been set-up */
1873 /* the latter happens when a new coro has been started */
1874 /* or when a new cctx was attached to an existing coroutine */
1875 if (expect_true (!slf_frame.prepare))
1876 {
1877 /* first iteration */
1878 dSP;
1879 SV **arg = PL_stack_base + TOPMARK + 1;
1880 int items = SP - arg; /* args without function object */
1881 SV *gv = *sp;
1882
1883 /* do a quick consistency check on the "function" object, and if it isn't */
1884 /* for us, divert to the real entersub */
1885 if (SvTYPE (gv) != SVt_PVGV || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
1886 return PL_ppaddr[OP_ENTERSUB](aTHX);
1887
1888 /* pop args */
1889 SP = PL_stack_base + POPMARK;
1890
1891 if (!(PL_op->op_flags & OPf_STACKED))
1892 {
1893 /* ampersand-form of call, use @_ instead of stack */
1894 AV *av = GvAV (PL_defgv);
1895 arg = AvARRAY (av);
1896 items = AvFILLp (av) + 1;
1897 }
1898
1899 PUTBACK;
1900
1901 /* now call the init function, which needs to set up slf_frame */
1902 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
1903 (aTHX_ &slf_frame, GvCV (gv), arg, items);
1904 }
1905
1906 /* now that we have a slf_frame, interpret it! */
1907 /* we use a callback system not to make the code needlessly */
1908 /* complicated, but so we can run multiple perl coros from one cctx */
1909
1910 do
1911 {
1912 struct coro_transfer_args ta;
1913
1914 slf_frame.prepare (aTHX_ &ta);
1915 TRANSFER (ta, 0);
1916
1917 checkmark = PL_stack_sp - PL_stack_base;
1918 }
1919 while (slf_frame.check (aTHX_ &slf_frame));
1920
1921 {
1922 dSP;
1923 SV **bot = PL_stack_base + checkmark;
1924 int gimme = GIMME_V;
1925
1926 slf_frame.prepare = 0; /* invalidate the frame, so it gets initialised again next time */
1927
1928 /* make sure we put something on the stack in scalar context */
1929 if (gimme == G_SCALAR)
1930 {
1931 if (sp == bot)
1932 XPUSHs (&PL_sv_undef);
1933
1934 SP = bot + 1;
1935 }
1936
1937 PUTBACK;
1938 }
1939
1940 return NORMAL;
1941}
1942
1943static void
1944api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, SV **arg, int items)
1945{
1946 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
1947
1948 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
1949 && PL_op->op_ppaddr != pp_slf)
1950 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
1951
1952 if (items > 3)
1953 croak ("Coro only supports up to three arguments to SLF functions currently (not %d), caught", items);
1954
1955 CvFLAGS (cv) |= CVf_SLF;
1956 CvXSUBANY (cv).any_ptr = (void *)init_cb;
1957 slf_cv = cv;
1958
1959 /* we patch the op, and then re-run the whole call */
1960 /* we have to put the same argument on the stack for this to work */
1961 /* and this will be done by pp_restore */
1962 slf_restore.op_next = (OP *)&slf_restore;
1963 slf_restore.op_type = OP_CUSTOM;
1964 slf_restore.op_ppaddr = pp_restore;
1965 slf_restore.op_first = PL_op;
1966
1967 slf_arg0 = items > 0 ? SvREFCNT_inc (arg [0]) : 0;
1968 slf_arg1 = items > 1 ? SvREFCNT_inc (arg [1]) : 0;
1969 slf_arg2 = items > 2 ? SvREFCNT_inc (arg [2]) : 0;
1970
1971 PL_op->op_ppaddr = pp_slf;
1972
1973 PL_op = (OP *)&slf_restore;
1974}
1975
1976/*****************************************************************************/
1977
1978static void
1979coro_semaphore_adjust (AV *av, int adjust)
1980{
1981 SV *count_sv = AvARRAY (av)[0];
1982 IV count = SvIVX (count_sv);
1983
1984 count += adjust;
1985 SvIVX (count_sv) = count;
1986
1987 /* now wake up as many waiters as possible */
1988 while (count > 0 && AvFILLp (av) >= count)
1989 {
1990 SV *cb;
1991
1992 /* swap first two elements so we can shift a waiter */
1993 AvARRAY (av)[0] = AvARRAY (av)[1];
1994 AvARRAY (av)[1] = count_sv;
1995 cb = av_shift (av);
1996
1997 if (SvOBJECT (cb))
1998 api_ready (aTHX_ cb);
1999 else
2000 croak ("callbacks not yet supported");
2001
2002 SvREFCNT_dec (cb);
2003
2004 --count;
2005 }
2006}
2007
2008static void
2009coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2010{
2011 /* call $sem->adjust (0) to possibly wake up some waiters */
2012 coro_semaphore_adjust ((AV *)coro->slf_frame.data, 0);
2013}
2014
2015static int
2016slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2017{
2018 AV *av = (AV *)frame->data;
2019 SV *count_sv = AvARRAY (av)[0];
2020
2021 if (SvIVX (count_sv) > 0)
2022 {
2023 SvSTATE (coro_current)->on_destroy = 0;
2024 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2025 return 0;
2026 }
2027 else
2028 {
2029 int i;
2030 /* if we were woken up but can't down, we look through the whole */
2031 /* waiters list and only add us if we aren't in there already */
2032 /* this avoids some degenerate memory usage cases */
2033
2034 for (i = 1; i <= AvFILLp (av); ++i)
2035 if (AvARRAY (av)[i] == SvRV (coro_current))
2036 return 1;
2037
2038 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2039 return 1;
2040 }
2041}
2042
2043static void
2044slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2045{
2046 AV *av = (AV *)SvRV (arg [0]);
2047
2048 if (SvIVX (AvARRAY (av)[0]) > 0)
2049 {
2050 frame->data = (void *)av;
2051 frame->prepare = prepare_nop;
2052 }
2053 else
2054 {
2055 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2056
2057 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2058 frame->prepare = prepare_schedule;
2059
2060 /* to avoid race conditions when a woken-up coro gets terminated */
2061 /* we arrange for a temporary on_destroy that calls adjust (0) */
2062 SvSTATE (coro_current)->on_destroy = coro_semaphore_on_destroy;
2063 }
2064
2065 frame->check = slf_check_semaphore_down;
2066
2067}
2068
2069/*****************************************************************************/
2070
2071#define GENSUB_ARG CvXSUBANY (cv).any_ptr
2072
2073/* create a closure from XS, returns a code reference */
2074/* the arg can be accessed via GENSUB_ARG from the callback */
2075/* the callback must use dXSARGS/XSRETURN */
2076static SV *
2077gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg)
2078{
2079 CV *cv = (CV *)NEWSV (0, 0);
2080
2081 sv_upgrade ((SV *)cv, SVt_PVCV);
2082
2083 CvANON_on (cv);
2084 CvISXSUB_on (cv);
2085 CvXSUB (cv) = xsub;
2086 GENSUB_ARG = arg;
2087
2088 return newRV_noinc ((SV *)cv);
2089}
2090
2091/*****************************************************************************/
2092
2093MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2094
2095PROTOTYPES: DISABLE
2096
2097BOOT:
2098{
2099#ifdef USE_ITHREADS
2100 MUTEX_INIT (&coro_lock);
2101# if CORO_PTHREAD
2102 coro_thx = PERL_GET_CONTEXT;
2103# endif
2104#endif
2105 BOOT_PAGESIZE;
2106
2107 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2108 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2109
2110 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2111 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2112 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
2113
2114 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
2115 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
2116 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
2117
2118 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
2119
2120 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
2121 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
2122 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
2123 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
2124
2125 main_mainstack = PL_mainstack;
2126 main_top_env = PL_top_env;
2127
2128 while (main_top_env->je_prev)
2129 main_top_env = main_top_env->je_prev;
2130
2131 {
2132 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2133
2134 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2135 hv_store_ent (PL_custom_op_names, slf,
2136 newSVpv ("coro_slf", 0), 0);
2137
2138 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2139 hv_store_ent (PL_custom_op_descs, slf,
2140 newSVpv ("coro schedule like function", 0), 0);
2141 }
2142
2143 coroapi.ver = CORO_API_VERSION;
2144 coroapi.rev = CORO_API_REVISION;
2145
2146 coroapi.transfer = api_transfer;
2147
2148 coroapi.sv_state = SvSTATE_;
2149 coroapi.execute_slf = api_execute_slf;
2150 coroapi.prepare_nop = prepare_nop;
2151 coroapi.prepare_schedule = prepare_schedule;
2152 coroapi.prepare_cede = prepare_cede;
2153 coroapi.prepare_cede_notself = prepare_cede_notself;
2154
2155 {
2156 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
2157
2158 if (!svp) croak ("Time::HiRes is required");
2159 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
2160
2161 nvtime = INT2PTR (double (*)(), SvIV (*svp));
2162 }
2163
2164 assert (("PRIO_NORMAL must be 0", !PRIO_NORMAL));
2165}
2166
2167SV *
2168new (char *klass, ...)
2169 CODE:
2170{
2171 struct coro *coro;
2172 MAGIC *mg;
2173 HV *hv;
2174 int i;
2175
2176 Newz (0, coro, 1, struct coro);
2177 coro->args = newAV ();
2178 coro->flags = CF_NEW;
2179
2180 if (coro_first) coro_first->prev = coro;
2181 coro->next = coro_first;
2182 coro_first = coro;
2183
2184 coro->hv = hv = newHV ();
2185 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
2186 mg->mg_flags |= MGf_DUP;
2187 RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1));
2188
2189 av_extend (coro->args, items - 1);
2190 for (i = 1; i < items; i++)
2191 av_push (coro->args, newSVsv (ST (i)));
2192}
2193 OUTPUT:
2194 RETVAL
2195
2196void
2197_set_stacklevel (...)
2198 CODE:
2199 api_execute_slf (aTHX_ cv, slf_init_set_stacklevel, &ST (0), items);
2200
2201void
2202transfer (...)
2203 PROTOTYPE: $$
2204 CODE:
2205 api_execute_slf (aTHX_ cv, slf_init_transfer, &ST (0), items);
2206
2207bool
2208_destroy (SV *coro_sv)
2209 CODE:
2210 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
2211 OUTPUT:
2212 RETVAL
2213
2214void
2215_exit (int code)
2216 PROTOTYPE: $
2217 CODE:
2218 _exit (code);
2219
2220int
2221cctx_stacksize (int new_stacksize = 0)
2222 PROTOTYPE: ;$
2223 CODE:
2224 RETVAL = cctx_stacksize;
2225 if (new_stacksize)
2226 {
2227 cctx_stacksize = new_stacksize;
2228 ++cctx_gen;
2229 }
2230 OUTPUT:
2231 RETVAL
2232
2233int
2234cctx_max_idle (int max_idle = 0)
2235 PROTOTYPE: ;$
2236 CODE:
2237 RETVAL = cctx_max_idle;
2238 if (max_idle > 1)
2239 cctx_max_idle = max_idle;
2240 OUTPUT:
2241 RETVAL
2242
2243int
2244cctx_count ()
2245 PROTOTYPE:
2246 CODE:
2247 RETVAL = cctx_count;
2248 OUTPUT:
2249 RETVAL
2250
2251int
2252cctx_idle ()
2253 PROTOTYPE:
2254 CODE:
2255 RETVAL = cctx_idle;
2256 OUTPUT:
2257 RETVAL
2258
2259void
2260list ()
2261 PROTOTYPE:
2262 PPCODE:
2263{
2264 struct coro *coro;
2265 for (coro = coro_first; coro; coro = coro->next)
2266 if (coro->hv)
2267 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
2268}
2269
2270void
2271call (Coro::State coro, SV *coderef)
2272 ALIAS:
2273 eval = 1
2274 CODE:
2275{
2276 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
2277 {
2278 struct coro temp;
2279
2280 if (!(coro->flags & CF_RUNNING))
235 { 2281 {
236 /* I never used formats, so how should I know how these are implemented? */ 2282 PUTBACK;
237 /* my bold guess is as a simple, plain sub... */ 2283 save_perl (aTHX_ &temp);
238 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 2284 load_perl (aTHX_ coro);
2285 }
2286
2287 {
2288 dSP;
2289 ENTER;
2290 SAVETMPS;
2291 PUTBACK;
2292 PUSHSTACK;
2293 PUSHMARK (SP);
2294
2295 if (ix)
2296 eval_sv (coderef, 0);
2297 else
2298 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
2299
2300 POPSTACK;
2301 SPAGAIN;
2302 FREETMPS;
2303 LEAVE;
2304 PUTBACK;
2305 }
2306
2307 if (!(coro->flags & CF_RUNNING))
2308 {
2309 save_perl (aTHX_ coro);
2310 load_perl (aTHX_ &temp);
2311 SPAGAIN;
239 } 2312 }
240 } 2313 }
241
242 if (top_si->si_type == PERLSI_MAIN)
243 break;
244
245 top_si = top_si->si_prev;
246 ccstk = top_si->si_cxstack;
247 cxix = top_si->si_cxix;
248 }
249
250 PUTBACK;
251 }
252
253 c->dowarn = PL_dowarn;
254 c->defav = GvAV (PL_defgv);
255 c->curstackinfo = PL_curstackinfo;
256 c->curstack = PL_curstack;
257 c->mainstack = PL_mainstack;
258 c->stack_sp = PL_stack_sp;
259 c->op = PL_op;
260 c->curpad = PL_curpad;
261 c->stack_base = PL_stack_base;
262 c->stack_max = PL_stack_max;
263 c->tmps_stack = PL_tmps_stack;
264 c->tmps_floor = PL_tmps_floor;
265 c->tmps_ix = PL_tmps_ix;
266 c->tmps_max = PL_tmps_max;
267 c->markstack = PL_markstack;
268 c->markstack_ptr = PL_markstack_ptr;
269 c->markstack_max = PL_markstack_max;
270 c->scopestack = PL_scopestack;
271 c->scopestack_ix = PL_scopestack_ix;
272 c->scopestack_max = PL_scopestack_max;
273 c->savestack = PL_savestack;
274 c->savestack_ix = PL_savestack_ix;
275 c->savestack_max = PL_savestack_max;
276 c->retstack = PL_retstack;
277 c->retstack_ix = PL_retstack_ix;
278 c->retstack_max = PL_retstack_max;
279 c->curcop = PL_curcop;
280} 2314}
281 2315
282static void 2316SV *
283LOAD(pTHX_ Coro__State c) 2317is_ready (Coro::State coro)
284{ 2318 PROTOTYPE: $
285 PL_dowarn = c->dowarn; 2319 ALIAS:
286 GvAV (PL_defgv) = c->defav; 2320 is_ready = CF_READY
287 PL_curstackinfo = c->curstackinfo; 2321 is_running = CF_RUNNING
288 PL_curstack = c->curstack; 2322 is_new = CF_NEW
289 PL_mainstack = c->mainstack; 2323 is_destroyed = CF_DESTROYED
290 PL_stack_sp = c->stack_sp; 2324 CODE:
291 PL_op = c->op; 2325 RETVAL = boolSV (coro->flags & ix);
292 PL_curpad = c->curpad; 2326 OUTPUT:
293 PL_stack_base = c->stack_base; 2327 RETVAL
294 PL_stack_max = c->stack_max;
295 PL_tmps_stack = c->tmps_stack;
296 PL_tmps_floor = c->tmps_floor;
297 PL_tmps_ix = c->tmps_ix;
298 PL_tmps_max = c->tmps_max;
299 PL_markstack = c->markstack;
300 PL_markstack_ptr = c->markstack_ptr;
301 PL_markstack_max = c->markstack_max;
302 PL_scopestack = c->scopestack;
303 PL_scopestack_ix = c->scopestack_ix;
304 PL_scopestack_max = c->scopestack_max;
305 PL_savestack = c->savestack;
306 PL_savestack_ix = c->savestack_ix;
307 PL_savestack_max = c->savestack_max;
308 PL_retstack = c->retstack;
309 PL_retstack_ix = c->retstack_ix;
310 PL_retstack_max = c->retstack_max;
311 PL_curcop = c->curcop;
312 2328
2329void
2330throw (Coro::State self, SV *throw = &PL_sv_undef)
2331 PROTOTYPE: $;$
2332 CODE:
2333 SvREFCNT_dec (self->throw);
2334 self->throw = SvOK (throw) ? newSVsv (throw) : 0;
2335
2336void
2337api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
2338 PROTOTYPE: $;$
2339 C_ARGS: aTHX_ coro, flags
2340
2341SV *
2342has_cctx (Coro::State coro)
2343 PROTOTYPE: $
2344 CODE:
2345 RETVAL = boolSV (!!coro->cctx);
2346 OUTPUT:
2347 RETVAL
2348
2349int
2350is_traced (Coro::State coro)
2351 PROTOTYPE: $
2352 CODE:
2353 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
2354 OUTPUT:
2355 RETVAL
2356
2357UV
2358rss (Coro::State coro)
2359 PROTOTYPE: $
2360 ALIAS:
2361 usecount = 1
2362 CODE:
2363 switch (ix)
313 { 2364 {
314 dSP; 2365 case 0: RETVAL = coro_rss (aTHX_ coro); break;
315 CV *cv; 2366 case 1: RETVAL = coro->usecount; break;
316
317 /* now do the ugly restore mess */
318 while ((cv = (CV *)POPs))
319 {
320 AV *padlist = (AV *)POPs;
321
322 put_padlist (cv);
323 CvPADLIST(cv) = padlist;
324 CvDEPTH(cv) = (I32)POPs;
325
326#ifdef USE_THREADS
327 CvOWNER(cv) = (struct perl_thread *)POPs;
328 error does not work either
329#endif
330 } 2367 }
2368 OUTPUT:
2369 RETVAL
331 2370
332 PUTBACK; 2371void
333 } 2372force_cctx ()
334} 2373 PROTOTYPE:
2374 CODE:
2375 struct coro *coro = SvSTATE (coro_current);
2376 coro->cctx->idle_sp = 0;
335 2377
336/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */ 2378void
337STATIC void 2379swap_defsv (Coro::State self)
338destroy_stacks(pTHX) 2380 PROTOTYPE: $
339{ 2381 ALIAS:
340 dSP; 2382 swap_defav = 1
2383 CODE:
2384 if (!self->slot)
2385 croak ("cannot swap state with coroutine that has no saved state,");
2386 else
2387 {
2388 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
2389 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
341 2390
342 /* die does this while calling POPSTACK, but I just don't see why. */ 2391 SV *tmp = *src; *src = *dst; *dst = tmp;
343 dounwind(-1); 2392 }
344 2393
345 /* is this ugly, I ask? */
346 while (PL_scopestack_ix)
347 LEAVE;
348
349 while (PL_curstackinfo->si_next)
350 PL_curstackinfo = PL_curstackinfo->si_next;
351
352 while (PL_curstackinfo)
353 {
354 PERL_SI *p = PL_curstackinfo->si_prev;
355
356 SvREFCNT_dec(PL_curstackinfo->si_stack);
357 Safefree(PL_curstackinfo->si_cxstack);
358 Safefree(PL_curstackinfo);
359 PL_curstackinfo = p;
360 }
361
362 if (PL_scopestack_ix != 0)
363 Perl_warner(aTHX_ WARN_INTERNAL,
364 "Unbalanced scopes: %ld more ENTERs than LEAVEs\n",
365 (long)PL_scopestack_ix);
366 if (PL_savestack_ix != 0)
367 Perl_warner(aTHX_ WARN_INTERNAL,
368 "Unbalanced saves: %ld more saves than restores\n",
369 (long)PL_savestack_ix);
370 if (PL_tmps_floor != -1)
371 Perl_warner(aTHX_ WARN_INTERNAL,"Unbalanced tmps: %ld more allocs than frees\n",
372 (long)PL_tmps_floor + 1);
373 /*
374 */
375 Safefree(PL_tmps_stack);
376 Safefree(PL_markstack);
377 Safefree(PL_scopestack);
378 Safefree(PL_savestack);
379 Safefree(PL_retstack);
380}
381
382#define SUB_INIT "Coro::State::_newcoro"
383
384MODULE = Coro::State PACKAGE = Coro::State 2394MODULE = Coro::State PACKAGE = Coro
385
386PROTOTYPES: ENABLE
387 2395
388BOOT: 2396BOOT:
389 if (!padlist_cache) 2397{
390 padlist_cache = newHV (); 2398 int i;
391 2399
392Coro::State 2400 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
393_newprocess(args) 2401 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
394 SV * args 2402 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
2403
2404 coro_current = coro_get_sv (aTHX_ "Coro::current", FALSE);
2405 SvREADONLY_on (coro_current);
2406
2407 coro_stash = gv_stashpv ("Coro", TRUE);
2408
2409 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (PRIO_MAX));
2410 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (PRIO_HIGH));
2411 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (PRIO_NORMAL));
2412 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (PRIO_LOW));
2413 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (PRIO_IDLE));
2414 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (PRIO_MIN));
2415
2416 for (i = PRIO_MAX - PRIO_MIN + 1; i--; )
2417 coro_ready[i] = newAV ();
2418
2419 {
2420 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
2421
2422 coroapi.schedule = api_schedule;
2423 coroapi.cede = api_cede;
2424 coroapi.cede_notself = api_cede_notself;
2425 coroapi.ready = api_ready;
2426 coroapi.is_ready = api_is_ready;
2427 coroapi.nready = coro_nready;
2428 coroapi.current = coro_current;
2429
2430 GCoroAPI = &coroapi;
2431 sv_setiv (sv, (IV)&coroapi);
2432 SvREADONLY_on (sv);
2433 }
2434}
2435
2436void
2437schedule (...)
2438 CODE:
2439 api_execute_slf (aTHX_ cv, slf_init_schedule, &ST (0), 0);
2440
2441void
2442cede (...)
2443 CODE:
2444 api_execute_slf (aTHX_ cv, slf_init_cede, &ST (0), 0);
2445
2446void
2447cede_notself (...)
2448 CODE:
2449 api_execute_slf (aTHX_ cv, slf_init_cede_notself, &ST (0), 0);
2450
2451void
2452_set_current (SV *current)
395 PROTOTYPE: $ 2453 PROTOTYPE: $
2454 CODE:
2455 SvREFCNT_dec (SvRV (coro_current));
2456 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
2457
2458void
2459_set_readyhook (SV *hook)
2460 PROTOTYPE: $
396 CODE: 2461 CODE:
397 Coro__State coro; 2462 SvREFCNT_dec (coro_readyhook);
2463 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
398 2464
399 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV) 2465int
400 croak ("Coro::State::newprocess expects an arrayref"); 2466prio (Coro::State coro, int newprio = 0)
2467 PROTOTYPE: $;$
2468 ALIAS:
2469 nice = 1
401 2470 CODE:
402 New (0, coro, 1, struct coro); 2471{
403
404 coro->mainstack = 0; /* actual work is done inside transfer */
405 coro->args = (AV *)SvREFCNT_inc (SvRV (args));
406
407 RETVAL = coro; 2472 RETVAL = coro->prio;
408 OUTPUT: 2473
2474 if (items > 1)
2475 {
2476 if (ix)
2477 newprio = coro->prio - newprio;
2478
2479 if (newprio < PRIO_MIN) newprio = PRIO_MIN;
2480 if (newprio > PRIO_MAX) newprio = PRIO_MAX;
2481
2482 coro->prio = newprio;
2483 }
2484}
2485 OUTPUT:
409 RETVAL 2486 RETVAL
410 2487
2488SV *
2489ready (SV *self)
2490 PROTOTYPE: $
2491 CODE:
2492 RETVAL = boolSV (api_ready (aTHX_ self));
2493 OUTPUT:
2494 RETVAL
2495
2496int
2497nready (...)
2498 PROTOTYPE:
2499 CODE:
2500 RETVAL = coro_nready;
2501 OUTPUT:
2502 RETVAL
2503
2504# for async_pool speedup
411void 2505void
412transfer(prev,next) 2506_pool_1 (SV *cb)
413 Coro::State_or_hashref prev 2507 CODE:
414 Coro::State_or_hashref next 2508{
415 CODE: 2509 struct coro *coro = SvSTATE (coro_current);
2510 HV *hv = (HV *)SvRV (coro_current);
2511 AV *defav = GvAV (PL_defgv);
2512 SV *invoke = hv_delete (hv, "_invoke", sizeof ("_invoke") - 1, 0);
2513 AV *invoke_av;
2514 int i, len;
416 2515
417 if (prev != next) 2516 if (!invoke)
418 { 2517 {
2518 SV *old = PL_diehook;
2519 PL_diehook = 0;
2520 SvREFCNT_dec (old);
2521 croak ("\3async_pool terminate\2\n");
2522 }
2523
2524 SvREFCNT_dec (coro->saved_deffh);
2525 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
2526
2527 hv_store (hv, "desc", sizeof ("desc") - 1,
2528 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
2529
2530 invoke_av = (AV *)SvRV (invoke);
2531 len = av_len (invoke_av);
2532
2533 sv_setsv (cb, AvARRAY (invoke_av)[0]);
2534
2535 if (len > 0)
2536 {
2537 av_fill (defav, len - 1);
2538 for (i = 0; i < len; ++i)
2539 av_store (defav, i, SvREFCNT_inc_NN (AvARRAY (invoke_av)[i + 1]));
2540 }
2541}
2542
2543void
2544_pool_2 (SV *cb)
2545 CODE:
2546{
2547 struct coro *coro = SvSTATE (coro_current);
2548
2549 sv_setsv (cb, &PL_sv_undef);
2550
2551 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
2552 coro->saved_deffh = 0;
2553
2554 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
2555 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
2556 {
2557 SV *old = PL_diehook;
2558 PL_diehook = 0;
2559 SvREFCNT_dec (old);
2560 croak ("\3async_pool terminate\2\n");
2561 }
2562
2563 av_clear (GvAV (PL_defgv));
2564 hv_store ((HV *)SvRV (coro_current), "desc", sizeof ("desc") - 1,
2565 newSVpvn ("[async_pool idle]", sizeof ("[async_pool idle]") - 1), 0);
2566
2567 coro->prio = 0;
2568
2569 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
2570 api_trace (aTHX_ coro_current, 0);
2571
2572 av_push (av_async_pool, newSVsv (coro_current));
2573}
2574
2575
2576MODULE = Coro::State PACKAGE = Coro::AIO
2577
2578void
2579_get_state (SV *self)
2580 PROTOTYPE: $
2581 PPCODE:
2582{
2583 AV *defav = GvAV (PL_defgv);
2584 AV *av = newAV ();
2585 int i;
2586 SV *data_sv = newSV (sizeof (struct io_state));
2587 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2588 SvCUR_set (data_sv, sizeof (struct io_state));
2589 SvPOK_only (data_sv);
2590
2591 data->errorno = errno;
2592 data->laststype = PL_laststype;
2593 data->laststatval = PL_laststatval;
2594 data->statcache = PL_statcache;
2595
2596 av_extend (av, AvFILLp (defav) + 1 + 1);
2597
2598 for (i = 0; i <= AvFILLp (defav); ++i)
2599 av_push (av, SvREFCNT_inc_NN (AvARRAY (defav)[i]));
2600
2601 av_push (av, data_sv);
2602
2603 XPUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
2604
2605 api_ready (aTHX_ self);
2606}
2607
2608void
2609_set_state (SV *state)
2610 PROTOTYPE: $
2611 PPCODE:
2612{
2613 AV *av = (AV *)SvRV (state);
2614 struct io_state *data = (struct io_state *)SvPVX (AvARRAY (av)[AvFILLp (av)]);
2615 int i;
2616
2617 errno = data->errorno;
2618 PL_laststype = data->laststype;
2619 PL_laststatval = data->laststatval;
2620 PL_statcache = data->statcache;
2621
2622 EXTEND (SP, AvFILLp (av));
2623 for (i = 0; i < AvFILLp (av); ++i)
2624 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (av)[i])));
2625}
2626
2627
2628MODULE = Coro::State PACKAGE = Coro::AnyEvent
2629
2630BOOT:
2631 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
2632
2633void
2634_schedule (...)
2635 CODE:
2636{
2637 static int incede;
2638
2639 api_cede_notself (aTHX);
2640
2641 ++incede;
2642 while (coro_nready >= incede && api_cede (aTHX))
2643 ;
2644
2645 sv_setsv (sv_activity, &PL_sv_undef);
2646 if (coro_nready >= incede)
2647 {
2648 PUSHMARK (SP);
419 PUTBACK; 2649 PUTBACK;
420 SAVE (aTHX_ prev); 2650 call_pv ("Coro::AnyEvent::_activity", G_DISCARD | G_EVAL);
421
422 /*
423 * this could be done in newprocess which would lead to
424 * extremely elegant and fast (just PUTBACK/SAVE/LOAD/SPAGAIN)
425 * code here, but lazy allocation of stacks has also
426 * some virtues and the overhead of the if() is nil.
427 */
428 if (next->mainstack)
429 {
430 LOAD (aTHX_ next);
431 next->mainstack = 0; /* unnecessary but much cleaner */
432 SPAGAIN;
433 }
434 else
435 {
436 /*
437 * emulate part of the perl startup here.
438 */
439 UNOP myop;
440
441 init_stacks (); /* from perl.c */
442 PL_op = (OP *)&myop;
443 /*PL_curcop = 0;*/
444 GvAV (PL_defgv) = (SV *)SvREFCNT_inc (next->args);
445
446 SPAGAIN;
447 Zero(&myop, 1, UNOP);
448 myop.op_next = Nullop;
449 myop.op_flags = OPf_WANT_VOID;
450
451 PUSHMARK(SP);
452 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
453 PUTBACK;
454 /*
455 * the next line is slightly wrong, as PL_op->op_next
456 * is actually being executed so we skip the first op.
457 * that doesn't matter, though, since it is only
458 * pp_nextstate and we never return...
459 */
460 PL_op = Perl_pp_entersub(aTHX);
461 SPAGAIN;
462
463 ENTER;
464 }
465 }
466
467void
468DESTROY(coro)
469 Coro::State coro
470 CODE:
471
472 if (coro->mainstack)
473 {
474 struct coro temp;
475
476 PUTBACK;
477 SAVE(aTHX_ (&temp));
478 LOAD(aTHX_ coro);
479
480 destroy_stacks ();
481 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
482
483 LOAD((&temp));
484 SPAGAIN; 2651 SPAGAIN;
485 } 2652 }
486 2653
487 SvREFCNT_dec (coro->args); 2654 --incede;
488 Safefree (coro); 2655}
489 2656
490 2657
2658MODULE = Coro::State PACKAGE = PerlIO::cede
2659
2660BOOT:
2661 PerlIO_define_layer (aTHX_ &PerlIO_cede);
2662
2663MODULE = Coro::State PACKAGE = Coro::Semaphore
2664
2665SV *
2666new (SV *klass, SV *count_ = 0)
2667 CODE:
2668{
2669 /* a semaphore contains a counter IV in $sem->[0] and any waiters after that */
2670 AV *av = newAV ();
2671 av_push (av, newSViv (count_ && SvOK (count_) ? SvIV (count_) : 1));
2672 RETVAL = sv_bless (newRV_noinc ((SV *)av), GvSTASH (CvGV (cv)));
2673}
2674 OUTPUT:
2675 RETVAL
2676
2677SV *
2678count (SV *self)
2679 CODE:
2680 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
2681 OUTPUT:
2682 RETVAL
2683
2684void
2685up (SV *self, int adjust = 1)
2686 ALIAS:
2687 adjust = 1
2688 CODE:
2689 coro_semaphore_adjust ((AV *)SvRV (self), ix ? adjust : 1);
2690
2691void
2692down (SV *self)
2693 CODE:
2694 api_execute_slf (aTHX_ cv, slf_init_semaphore_down, &ST (0), 1);
2695
2696void
2697try (SV *self)
2698 PPCODE:
2699{
2700 AV *av = (AV *)SvRV (self);
2701 SV *count_sv = AvARRAY (av)[0];
2702 IV count = SvIVX (count_sv);
2703
2704 if (count > 0)
2705 {
2706 --count;
2707 SvIVX (count_sv) = count;
2708 XSRETURN_YES;
2709 }
2710 else
2711 XSRETURN_NO;
2712}
2713
2714void
2715waiters (SV *self)
2716 CODE:
2717{
2718 AV *av = (AV *)SvRV (self);
2719
2720 if (GIMME_V == G_SCALAR)
2721 XPUSHs (sv_2mortal (newSVsv (AvARRAY (av)[0])));
2722 else
2723 {
2724 int i;
2725 EXTEND (SP, AvFILLp (av) + 1 - 1);
2726 for (i = 1; i <= AvFILLp (av); ++i)
2727 PUSHs (newSVsv (AvARRAY (av)[i]));
2728 }
2729}
2730

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines