ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.6 by root, Tue Jul 17 15:42:28 2001 UTC vs.
Revision 1.328 by root, Tue Nov 25 09:49:43 2008 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "patchlevel.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
24#endif
25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
9#endif 35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
10 48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#define PERL_VERSION_ATLEAST(a,b,c) \
57 (PERL_REVISION > (a) \
58 || (PERL_REVISION == (a) \
59 && (PERL_VERSION > (b) \
60 || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))
61
62#if !PERL_VERSION_ATLEAST (5,6,0)
63# ifndef PL_ppaddr
64# define PL_ppaddr ppaddr
65# endif
66# ifndef call_sv
67# define call_sv perl_call_sv
68# endif
69# ifndef get_sv
70# define get_sv perl_get_sv
71# endif
72# ifndef get_cv
73# define get_cv perl_get_cv
74# endif
75# ifndef IS_PADGV
76# define IS_PADGV(v) 0
77# endif
78# ifndef IS_PADCONST
79# define IS_PADCONST(v) 0
80# endif
81#endif
82
83/* 5.11 */
84#ifndef CxHASARGS
85# define CxHASARGS(cx) (cx)->blk_sub.hasargs
86#endif
87
88/* 5.10.0 */
89#ifndef SvREFCNT_inc_NN
90# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
91#endif
92
93/* 5.8.8 */
94#ifndef GV_NOTQUAL
95# define GV_NOTQUAL 0
96#endif
97#ifndef newSV
98# define newSV(l) NEWSV(0,l)
99#endif
100#ifndef CvISXSUB_on
101# define CvISXSUB_on(cv) (void)cv
102#endif
103#ifndef CvISXSUB
104# define CvISXSUB(cv) (CvXSUB (cv) ? TRUE : FALSE)
105#endif
106#ifndef Newx
107# define Newx(ptr,nitems,type) New (0,ptr,nitems,type)
108#endif
109
110/* 5.8.7 */
111#ifndef SvRV_set
112# define SvRV_set(s,v) SvRV(s) = (v)
113#endif
114
115#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
116# undef CORO_STACKGUARD
117#endif
118
119#ifndef CORO_STACKGUARD
120# define CORO_STACKGUARD 0
121#endif
122
123/* prefer perl internal functions over our own? */
124#ifndef CORO_PREFER_PERL_FUNCTIONS
125# define CORO_PREFER_PERL_FUNCTIONS 0
126#endif
127
128/* The next macros try to return the current stack pointer, in an as
129 * portable way as possible. */
130#if __GNUC__ >= 4
131# define dSTACKLEVEL int stacklevel_dummy
132# define STACKLEVEL __builtin_frame_address (0)
133#else
134# define dSTACKLEVEL volatile void *stacklevel
135# define STACKLEVEL ((void *)&stacklevel)
136#endif
137
138#define IN_DESTRUCT (PL_main_cv == Nullcv)
139
140#if __GNUC__ >= 3
141# define attribute(x) __attribute__(x)
142# define expect(expr,value) __builtin_expect ((expr),(value))
143# define INLINE static inline
144#else
145# define attribute(x)
146# define expect(expr,value) (expr)
147# define INLINE static
148#endif
149
150#define expect_false(expr) expect ((expr) != 0, 0)
151#define expect_true(expr) expect ((expr) != 0, 1)
152
153#define NOINLINE attribute ((noinline))
154
155#include "CoroAPI.h"
156#define GCoroAPI (&coroapi) /* very sneaky */
157
158#ifdef USE_ITHREADS
159# if CORO_PTHREAD
160static void *coro_thx;
161# endif
162#endif
163
164static double (*nvtime)(); /* so why doesn't it take void? */
165
166/* we hijack an hopefully unused CV flag for our purposes */
167#define CVf_SLF 0x4000
168static OP *pp_slf (pTHX);
169
170static U32 cctx_gen;
171static size_t cctx_stacksize = CORO_STACKSIZE;
172static struct CoroAPI coroapi;
173static AV *main_mainstack; /* used to differentiate between $main and others */
174static JMPENV *main_top_env;
175static HV *coro_state_stash, *coro_stash;
176static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
177
178static AV *av_destroy; /* destruction queue */
179static SV *sv_manager; /* the manager coro */
180static SV *sv_idle; /* $Coro::idle */
181
182static GV *irsgv; /* $/ */
183static GV *stdoutgv; /* *STDOUT */
184static SV *rv_diehook;
185static SV *rv_warnhook;
186static HV *hv_sig; /* %SIG */
187
188/* async_pool helper stuff */
189static SV *sv_pool_rss;
190static SV *sv_pool_size;
191static SV *sv_async_pool_idle; /* description string */
192static AV *av_async_pool; /* idle pool */
193static SV *sv_Coro; /* class string */
194static CV *cv_pool_handler;
195static CV *cv_coro_state_new;
196
197/* Coro::AnyEvent */
198static SV *sv_activity;
199
200static struct coro_cctx *cctx_first;
201static int cctx_count, cctx_idle;
202
203enum {
204 CC_MAPPED = 0x01,
205 CC_NOREUSE = 0x02, /* throw this away after tracing */
206 CC_TRACE = 0x04,
207 CC_TRACE_SUB = 0x08, /* trace sub calls */
208 CC_TRACE_LINE = 0x10, /* trace each statement */
209 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
210};
211
212/* this is a structure representing a c-level coroutine */
213typedef struct coro_cctx
214{
215 struct coro_cctx *next;
216
217 /* the stack */
218 void *sptr;
219 size_t ssize;
220
221 /* cpu state */
222 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
223 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
224 JMPENV *top_env;
225 coro_context cctx;
226
227 U32 gen;
228#if CORO_USE_VALGRIND
229 int valgrind_id;
230#endif
231 unsigned char flags;
232} coro_cctx;
233
234coro_cctx *cctx_current; /* the currently running cctx */
235
236/*****************************************************************************/
237
238enum {
239 CF_RUNNING = 0x0001, /* coroutine is running */
240 CF_READY = 0x0002, /* coroutine is ready */
241 CF_NEW = 0x0004, /* has never been switched to */
242 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
243};
244
245/* the structure where most of the perl state is stored, overlaid on the cxstack */
246typedef struct
247{
248 SV *defsv;
249 AV *defav;
250 SV *errsv;
251 SV *irsgv;
252 HV *hinthv;
253#define VAR(name,type) type name;
254# include "state.h"
255#undef VAR
256} perl_slots;
257
258#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
259
260/* this is a structure representing a perl-level coroutine */
11struct coro { 261struct coro {
12 U8 dowarn; 262 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 263 coro_cctx *cctx;
14 264
15 PERL_SI *curstackinfo; 265 /* state data */
16 AV *curstack; 266 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 267 AV *mainstack;
18 SV **stack_sp; 268 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 269
41 AV *args; 270 CV *startcv; /* the CV to execute */
271 AV *args; /* data associated with this coroutine (initial args) */
272 int refcnt; /* coroutines are refcounted, yes */
273 int flags; /* CF_ flags */
274 HV *hv; /* the perl hash associated with this coro, if any */
275 void (*on_destroy)(pTHX_ struct coro *coro);
276
277 /* statistics */
278 int usecount; /* number of transfers to this coro */
279
280 /* coro process data */
281 int prio;
282 SV *except; /* exception to be thrown */
283 SV *rouse_cb;
284
285 /* async_pool */
286 SV *saved_deffh;
287 SV *invoke_cb;
288 AV *invoke_av;
289
290 /* linked list */
291 struct coro *next, *prev;
42}; 292};
43 293
44typedef struct coro *Coro__State; 294typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 295typedef struct coro *Coro__State_or_hashref;
46 296
47static HV *padlist_cache; 297/* the following variables are effectively part of the perl context */
298/* and get copied between struct coro and these variables */
299/* the mainr easonw e don't support windows process emulation */
300static struct CoroSLF slf_frame; /* the current slf frame */
48 301
49/* mostly copied from op.c:cv_clone2 */ 302/** Coro ********************************************************************/
50STATIC AV * 303
51clone_padlist (AV *protopadlist) 304#define PRIO_MAX 3
305#define PRIO_HIGH 1
306#define PRIO_NORMAL 0
307#define PRIO_LOW -1
308#define PRIO_IDLE -3
309#define PRIO_MIN -4
310
311/* for Coro.pm */
312static SV *coro_current;
313static SV *coro_readyhook;
314static AV *coro_ready [PRIO_MAX - PRIO_MIN + 1];
315static CV *cv_coro_run, *cv_coro_terminate;
316static struct coro *coro_first;
317#define coro_nready coroapi.nready
318
319/** lowlevel stuff **********************************************************/
320
321static SV *
322coro_get_sv (pTHX_ const char *name, int create)
52{ 323{
53 AV *av; 324#if PERL_VERSION_ATLEAST (5,10,0)
54 I32 ix; 325 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 326 get_sv (name, create);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 327#endif
57 SV **pname = AvARRAY (protopad_name); 328 return get_sv (name, create);
58 SV **ppad = AvARRAY (protopad); 329}
59 I32 fname = AvFILLp (protopad_name); 330
60 I32 fpad = AvFILLp (protopad); 331static AV *
332coro_get_av (pTHX_ const char *name, int create)
333{
334#if PERL_VERSION_ATLEAST (5,10,0)
335 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
336 get_av (name, create);
337#endif
338 return get_av (name, create);
339}
340
341static HV *
342coro_get_hv (pTHX_ const char *name, int create)
343{
344#if PERL_VERSION_ATLEAST (5,10,0)
345 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
346 get_hv (name, create);
347#endif
348 return get_hv (name, create);
349}
350
351/* may croak */
352INLINE CV *
353coro_sv_2cv (pTHX_ SV *sv)
354{
355 HV *st;
356 GV *gvp;
357 return sv_2cv (sv, &st, &gvp, 0);
358}
359
360static AV *
361coro_derive_padlist (pTHX_ CV *cv)
362{
363 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 364 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 365
72 newpadlist = newAV (); 366 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 367 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 368#if PERL_VERSION_ATLEAST (5,10,0)
369 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
370#else
371 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
372#endif
373 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
374 --AvFILLp (padlist);
375
376 av_store (newpadlist, 0, SvREFCNT_inc_NN (*av_fetch (padlist, 0, FALSE)));
75 av_store (newpadlist, 1, (SV *) newpad); 377 av_store (newpadlist, 1, (SV *)newpad);
76 378
77 av = newAV (); /* will be @_ */ 379 return newpadlist;
78 av_extend (av, 0); 380}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 381
82 for (ix = fpad; ix > 0; ix--) 382static void
383free_padlist (pTHX_ AV *padlist)
384{
385 /* may be during global destruction */
386 if (SvREFCNT (padlist))
83 { 387 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 388 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 389 while (i >= 0)
86 { 390 {
87 char *name = SvPVX (namesv); /* XXX */ 391 SV **svp = av_fetch (padlist, i--, FALSE);
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 392 if (svp)
89 { /* lexical from outside? */
90 npad[ix] = SvREFCNT_inc (ppad[ix]);
91 } 393 {
92 else
93 { /* our own lexical */
94 SV *sv; 394 SV *sv;
95 if (*name == '&') 395 while (&PL_sv_undef != (sv = av_pop ((AV *)*svp)))
96 sv = SvREFCNT_inc (ppad[ix]); 396 SvREFCNT_dec (sv);
97 else if (*name == '@') 397
98 sv = (SV *) newAV (); 398 SvREFCNT_dec (*svp);
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 } 399 }
107 } 400 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix]))
109 {
110 npad[ix] = SvREFCNT_inc (ppad[ix]);
111 }
112 else
113 {
114 SV *sv = NEWSV (0, 0);
115 SvPADTMP_on (sv);
116 npad[ix] = sv;
117 }
118 }
119 401
120#if 0 /* NONOTUNDERSTOOD */
121 /* Now that vars are all in place, clone nested closures. */
122
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif
139
140 return newpadlist;
141}
142
143STATIC AV *
144free_padlist (AV *padlist)
145{
146 /* may be during global destruction */
147 if (SvREFCNT(padlist))
148 {
149 I32 i = AvFILLp(padlist);
150 while (i >= 0)
151 {
152 SV **svp = av_fetch(padlist, i--, FALSE);
153 SV *sv = svp ? *svp : Nullsv;
154 if (sv)
155 SvREFCNT_dec(sv);
156 }
157
158 SvREFCNT_dec((SV*)padlist); 402 SvREFCNT_dec ((SV*)padlist);
403 }
404}
405
406static int
407coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
408{
409 AV *padlist;
410 AV *av = (AV *)mg->mg_obj;
411
412 /* casting is fun. */
413 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
414 free_padlist (aTHX_ padlist);
415
416 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
417
418 return 0;
419}
420
421#define CORO_MAGIC_type_cv 26
422#define CORO_MAGIC_type_state PERL_MAGIC_ext
423
424static MGVTBL coro_cv_vtbl = {
425 0, 0, 0, 0,
426 coro_cv_free
427};
428
429#define CORO_MAGIC_NN(sv, type) \
430 (expect_true (SvMAGIC (sv)->mg_type == type) \
431 ? SvMAGIC (sv) \
432 : mg_find (sv, type))
433
434#define CORO_MAGIC(sv, type) \
435 (expect_true (SvMAGIC (sv)) \
436 ? CORO_MAGIC_NN (sv, type) \
437 : 0)
438
439#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
440#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
441
442INLINE struct coro *
443SvSTATE_ (pTHX_ SV *coro)
444{
445 HV *stash;
446 MAGIC *mg;
447
448 if (SvROK (coro))
449 coro = SvRV (coro);
450
451 if (expect_false (SvTYPE (coro) != SVt_PVHV))
452 croak ("Coro::State object required");
453
454 stash = SvSTASH (coro);
455 if (expect_false (stash != coro_stash && stash != coro_state_stash))
456 {
457 /* very slow, but rare, check */
458 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
459 croak ("Coro::State object required");
460 }
461
462 mg = CORO_MAGIC_state (coro);
463 return (struct coro *)mg->mg_ptr;
464}
465
466#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
467
468/* faster than SvSTATE, but expects a coroutine hv */
469#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
470#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
471
472/* the next two functions merely cache the padlists */
473static void
474get_padlist (pTHX_ CV *cv)
475{
476 MAGIC *mg = CORO_MAGIC_cv (cv);
477 AV *av;
478
479 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
480 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
481 else
482 {
483#if CORO_PREFER_PERL_FUNCTIONS
484 /* this is probably cleaner? but also slower! */
485 /* in practise, it seems to be less stable */
486 CV *cp = Perl_cv_clone (aTHX_ cv);
487 CvPADLIST (cv) = CvPADLIST (cp);
488 CvPADLIST (cp) = 0;
489 SvREFCNT_dec (cp);
490#else
491 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
492#endif
493 }
494}
495
496static void
497put_padlist (pTHX_ CV *cv)
498{
499 MAGIC *mg = CORO_MAGIC_cv (cv);
500 AV *av;
501
502 if (expect_false (!mg))
503 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
504
505 av = (AV *)mg->mg_obj;
506
507 if (expect_false (AvFILLp (av) >= AvMAX (av)))
508 av_extend (av, AvMAX (av) + 1);
509
510 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
511}
512
513/** load & save, init *******************************************************/
514
515static void
516load_perl (pTHX_ Coro__State c)
517{
518 perl_slots *slot = c->slot;
519 c->slot = 0;
520
521 PL_mainstack = c->mainstack;
522
523 GvSV (PL_defgv) = slot->defsv;
524 GvAV (PL_defgv) = slot->defav;
525 GvSV (PL_errgv) = slot->errsv;
526 GvSV (irsgv) = slot->irsgv;
527 GvHV (PL_hintgv) = slot->hinthv;
528
529 #define VAR(name,type) PL_ ## name = slot->name;
530 # include "state.h"
531 #undef VAR
532
533 {
534 dSP;
535
536 CV *cv;
537
538 /* now do the ugly restore mess */
539 while (expect_true (cv = (CV *)POPs))
540 {
541 put_padlist (aTHX_ cv); /* mark this padlist as available */
542 CvDEPTH (cv) = PTR2IV (POPs);
543 CvPADLIST (cv) = (AV *)POPs;
544 }
545
546 PUTBACK;
159 } 547 }
160}
161 548
162/* the next tow functions merely cache the padlists */ 549 slf_frame = c->slf_frame;
163STATIC void 550 CORO_THROW = c->except;
164get_padlist (CV *cv)
165{
166 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 0);
167
168 if (he && AvFILLp ((AV *)*he) >= 0)
169 CvPADLIST (cv) = (AV *)av_pop ((AV *)*he);
170 else
171 CvPADLIST (cv) = clone_padlist (CvPADLIST (cv));
172} 551}
173 552
174STATIC void
175put_padlist (CV *cv)
176{
177 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 1);
178
179 if (SvTYPE (*he) != SVt_PVAV)
180 {
181 SvREFCNT_dec (*he);
182 *he = (SV *)newAV ();
183 }
184
185 av_push ((AV *)*he, (SV *)CvPADLIST (cv));
186}
187
188static void 553static void
189save_state(pTHX_ Coro__State c) 554save_perl (pTHX_ Coro__State c)
190{ 555{
556 c->except = CORO_THROW;
557 c->slf_frame = slf_frame;
558
191 { 559 {
192 dSP; 560 dSP;
193 I32 cxix = cxstack_ix; 561 I32 cxix = cxstack_ix;
562 PERL_CONTEXT *ccstk = cxstack;
194 PERL_SI *top_si = PL_curstackinfo; 563 PERL_SI *top_si = PL_curstackinfo;
195 PERL_CONTEXT *ccstk = cxstack;
196 564
197 /* 565 /*
198 * the worst thing you can imagine happens first - we have to save 566 * the worst thing you can imagine happens first - we have to save
199 * (and reinitialize) all cv's in the whole callchain :( 567 * (and reinitialize) all cv's in the whole callchain :(
200 */ 568 */
201 569
202 PUSHs (Nullsv); 570 XPUSHs (Nullsv);
203 /* this loop was inspired by pp_caller */ 571 /* this loop was inspired by pp_caller */
204 for (;;) 572 for (;;)
205 { 573 {
206 while (cxix >= 0) 574 while (expect_true (cxix >= 0))
207 { 575 {
208 PERL_CONTEXT *cx = &ccstk[cxix--]; 576 PERL_CONTEXT *cx = &ccstk[cxix--];
209 577
210 if (CxTYPE(cx) == CXt_SUB) 578 if (expect_true (CxTYPE (cx) == CXt_SUB || CxTYPE (cx) == CXt_FORMAT))
211 { 579 {
212 CV *cv = cx->blk_sub.cv; 580 CV *cv = cx->blk_sub.cv;
581
213 if (CvDEPTH(cv)) 582 if (expect_true (CvDEPTH (cv)))
214 { 583 {
215#ifdef USE_THREADS
216 XPUSHs ((SV *)CvOWNER(cv));
217#endif
218 EXTEND (SP, 3); 584 EXTEND (SP, 3);
219 PUSHs ((SV *)CvDEPTH(cv));
220 PUSHs ((SV *)CvPADLIST(cv)); 585 PUSHs ((SV *)CvPADLIST (cv));
586 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
221 PUSHs ((SV *)cv); 587 PUSHs ((SV *)cv);
222 588
223 get_padlist (cv);
224
225 CvDEPTH(cv) = 0; 589 CvDEPTH (cv) = 0;
226#ifdef USE_THREADS 590 get_padlist (aTHX_ cv);
227 CvOWNER(cv) = 0;
228 error must unlock this cv etc.. etc...
229 if you are here wondering about this error message then
230 the reason is that it will not work as advertised yet
231#endif
232 } 591 }
233 } 592 }
234 else if (CxTYPE(cx) == CXt_FORMAT) 593 }
594
595 if (expect_true (top_si->si_type == PERLSI_MAIN))
596 break;
597
598 top_si = top_si->si_prev;
599 ccstk = top_si->si_cxstack;
600 cxix = top_si->si_cxix;
601 }
602
603 PUTBACK;
604 }
605
606 /* allocate some space on the context stack for our purposes */
607 /* we manually unroll here, as usually 2 slots is enough */
608 if (SLOT_COUNT >= 1) CXINC;
609 if (SLOT_COUNT >= 2) CXINC;
610 if (SLOT_COUNT >= 3) CXINC;
611 {
612 int i;
613 for (i = 3; i < SLOT_COUNT; ++i)
614 CXINC;
615 }
616 cxstack_ix -= SLOT_COUNT; /* undo allocation */
617
618 c->mainstack = PL_mainstack;
619
620 {
621 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
622
623 slot->defav = GvAV (PL_defgv);
624 slot->defsv = DEFSV;
625 slot->errsv = ERRSV;
626 slot->irsgv = GvSV (irsgv);
627 slot->hinthv = GvHV (PL_hintgv);
628
629 #define VAR(name,type) slot->name = PL_ ## name;
630 # include "state.h"
631 #undef VAR
632 }
633}
634
635/*
636 * allocate various perl stacks. This is almost an exact copy
637 * of perl.c:init_stacks, except that it uses less memory
638 * on the (sometimes correct) assumption that coroutines do
639 * not usually need a lot of stackspace.
640 */
641#if CORO_PREFER_PERL_FUNCTIONS
642# define coro_init_stacks(thx) init_stacks ()
643#else
644static void
645coro_init_stacks (pTHX)
646{
647 PL_curstackinfo = new_stackinfo(32, 8);
648 PL_curstackinfo->si_type = PERLSI_MAIN;
649 PL_curstack = PL_curstackinfo->si_stack;
650 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
651
652 PL_stack_base = AvARRAY(PL_curstack);
653 PL_stack_sp = PL_stack_base;
654 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
655
656 New(50,PL_tmps_stack,32,SV*);
657 PL_tmps_floor = -1;
658 PL_tmps_ix = -1;
659 PL_tmps_max = 32;
660
661 New(54,PL_markstack,16,I32);
662 PL_markstack_ptr = PL_markstack;
663 PL_markstack_max = PL_markstack + 16;
664
665#ifdef SET_MARK_OFFSET
666 SET_MARK_OFFSET;
667#endif
668
669 New(54,PL_scopestack,8,I32);
670 PL_scopestack_ix = 0;
671 PL_scopestack_max = 8;
672
673 New(54,PL_savestack,24,ANY);
674 PL_savestack_ix = 0;
675 PL_savestack_max = 24;
676
677#if !PERL_VERSION_ATLEAST (5,10,0)
678 New(54,PL_retstack,4,OP*);
679 PL_retstack_ix = 0;
680 PL_retstack_max = 4;
681#endif
682}
683#endif
684
685/*
686 * destroy the stacks, the callchain etc...
687 */
688static void
689coro_destruct_stacks (pTHX)
690{
691 while (PL_curstackinfo->si_next)
692 PL_curstackinfo = PL_curstackinfo->si_next;
693
694 while (PL_curstackinfo)
695 {
696 PERL_SI *p = PL_curstackinfo->si_prev;
697
698 if (!IN_DESTRUCT)
699 SvREFCNT_dec (PL_curstackinfo->si_stack);
700
701 Safefree (PL_curstackinfo->si_cxstack);
702 Safefree (PL_curstackinfo);
703 PL_curstackinfo = p;
704 }
705
706 Safefree (PL_tmps_stack);
707 Safefree (PL_markstack);
708 Safefree (PL_scopestack);
709 Safefree (PL_savestack);
710#if !PERL_VERSION_ATLEAST (5,10,0)
711 Safefree (PL_retstack);
712#endif
713}
714
715#define CORO_RSS \
716 rss += sizeof (SYM (curstackinfo)); \
717 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
718 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
719 rss += SYM (tmps_max) * sizeof (SV *); \
720 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
721 rss += SYM (scopestack_max) * sizeof (I32); \
722 rss += SYM (savestack_max) * sizeof (ANY);
723
724static size_t
725coro_rss (pTHX_ struct coro *coro)
726{
727 size_t rss = sizeof (*coro);
728
729 if (coro->mainstack)
730 {
731 if (coro->flags & CF_RUNNING)
732 {
733 #define SYM(sym) PL_ ## sym
734 CORO_RSS;
735 #undef SYM
736 }
737 else
738 {
739 #define SYM(sym) coro->slot->sym
740 CORO_RSS;
741 #undef SYM
742 }
743 }
744
745 return rss;
746}
747
748/** coroutine stack handling ************************************************/
749
750static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
751static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
752static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
753
754/* apparently < 5.8.8 */
755#ifndef MgPV_nolen_const
756#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
757 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
758 (const char*)(mg)->mg_ptr)
759#endif
760
761/*
762 * This overrides the default magic get method of %SIG elements.
763 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
764 * and instead of tryign to save and restore the hash elements, we just provide
765 * readback here.
766 * We only do this when the hook is != 0, as they are often set to 0 temporarily,
767 * not expecting this to actually change the hook. This is a potential problem
768 * when a schedule happens then, but we ignore this.
769 */
770static int
771coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
772{
773 const char *s = MgPV_nolen_const (mg);
774
775 if (*s == '_')
776 {
777 SV **svp = 0;
778
779 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
780 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
781
782 if (svp)
783 {
784 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
785 return 0;
786 }
787 }
788
789 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
790}
791
792static int
793coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
794{
795 const char *s = MgPV_nolen_const (mg);
796
797 if (*s == '_')
798 {
799 SV **svp = 0;
800
801 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
802 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
803
804 if (svp)
805 {
806 SV *old = *svp;
807 *svp = 0;
808 SvREFCNT_dec (old);
809 return 0;
810 }
811 }
812
813 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
814}
815
816static int
817coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
818{
819 const char *s = MgPV_nolen_const (mg);
820
821 if (*s == '_')
822 {
823 SV **svp = 0;
824
825 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
826 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
827
828 if (svp)
829 {
830 SV *old = *svp;
831 *svp = newSVsv (sv);
832 SvREFCNT_dec (old);
833 return 0;
834 }
835 }
836
837 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
838}
839
840static void
841prepare_nop (pTHX_ struct coro_transfer_args *ta)
842{
843 /* kind of mega-hacky, but works */
844 ta->next = ta->prev = (struct coro *)ta;
845}
846
847static int
848slf_check_nop (pTHX_ struct CoroSLF *frame)
849{
850 return 0;
851}
852
853static int
854slf_check_repeat (pTHX_ struct CoroSLF *frame)
855{
856 return 1;
857}
858
859static UNOP coro_setup_op;
860
861static void NOINLINE /* noinline to keep it out of the transfer fast path */
862coro_setup (pTHX_ struct coro *coro)
863{
864 /*
865 * emulate part of the perl startup here.
866 */
867 coro_init_stacks (aTHX);
868
869 PL_runops = RUNOPS_DEFAULT;
870 PL_curcop = &PL_compiling;
871 PL_in_eval = EVAL_NULL;
872 PL_comppad = 0;
873 PL_comppad_name = 0;
874 PL_comppad_name_fill = 0;
875 PL_comppad_name_floor = 0;
876 PL_curpm = 0;
877 PL_curpad = 0;
878 PL_localizing = 0;
879 PL_dirty = 0;
880 PL_restartop = 0;
881#if PERL_VERSION_ATLEAST (5,10,0)
882 PL_parser = 0;
883#endif
884 PL_hints = 0;
885
886 /* recreate the die/warn hooks */
887 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
888 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
889
890 GvSV (PL_defgv) = newSV (0);
891 GvAV (PL_defgv) = coro->args; coro->args = 0;
892 GvSV (PL_errgv) = newSV (0);
893 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
894 GvHV (PL_hintgv) = 0;
895 PL_rs = newSVsv (GvSV (irsgv));
896 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
897
898 {
899 dSP;
900 UNOP myop;
901
902 Zero (&myop, 1, UNOP);
903 myop.op_next = Nullop;
904 myop.op_type = OP_ENTERSUB;
905 myop.op_flags = OPf_WANT_VOID;
906
907 PUSHMARK (SP);
908 PUSHs ((SV *)coro->startcv);
909 PUTBACK;
910 PL_op = (OP *)&myop;
911 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
912 }
913
914 /* this newly created coroutine might be run on an existing cctx which most
915 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
916 */
917 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
918 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
919
920 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
921 coro_setup_op.op_next = PL_op;
922 coro_setup_op.op_type = OP_ENTERSUB;
923 coro_setup_op.op_ppaddr = pp_slf;
924 /* no flags etc. required, as an init function won't be called */
925
926 PL_op = (OP *)&coro_setup_op;
927
928 /* copy throw, in case it was set before coro_setup */
929 CORO_THROW = coro->except;
930}
931
932static void
933coro_destruct (pTHX_ struct coro *coro)
934{
935 if (!IN_DESTRUCT)
936 {
937 /* restore all saved variables and stuff */
938 LEAVE_SCOPE (0);
939 assert (PL_tmps_floor == -1);
940
941 /* free all temporaries */
942 FREETMPS;
943 assert (PL_tmps_ix == -1);
944
945 /* unwind all extra stacks */
946 POPSTACK_TO (PL_mainstack);
947
948 /* unwind main stack */
949 dounwind (-1);
950 }
951
952 SvREFCNT_dec (GvSV (PL_defgv));
953 SvREFCNT_dec (GvAV (PL_defgv));
954 SvREFCNT_dec (GvSV (PL_errgv));
955 SvREFCNT_dec (PL_defoutgv);
956 SvREFCNT_dec (PL_rs);
957 SvREFCNT_dec (GvSV (irsgv));
958 SvREFCNT_dec (GvHV (PL_hintgv));
959
960 SvREFCNT_dec (PL_diehook);
961 SvREFCNT_dec (PL_warnhook);
962
963 SvREFCNT_dec (coro->saved_deffh);
964 SvREFCNT_dec (coro->rouse_cb);
965 SvREFCNT_dec (coro->invoke_cb);
966 SvREFCNT_dec (coro->invoke_av);
967
968 coro_destruct_stacks (aTHX);
969}
970
971INLINE void
972free_coro_mortal (pTHX)
973{
974 if (expect_true (coro_mortal))
975 {
976 SvREFCNT_dec (coro_mortal);
977 coro_mortal = 0;
978 }
979}
980
981static int
982runops_trace (pTHX)
983{
984 COP *oldcop = 0;
985 int oldcxix = -2;
986
987 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
988 {
989 PERL_ASYNC_CHECK ();
990
991 if (cctx_current->flags & CC_TRACE_ALL)
992 {
993 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
994 {
995 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
996 SV **bot, **top;
997 AV *av = newAV (); /* return values */
998 SV **cb;
999 dSP;
1000
1001 GV *gv = CvGV (cx->blk_sub.cv);
1002 SV *fullname = sv_2mortal (newSV (0));
1003 if (isGV (gv))
1004 gv_efullname3 (fullname, gv, 0);
1005
1006 bot = PL_stack_base + cx->blk_oldsp + 1;
1007 top = cx->blk_gimme == G_ARRAY ? SP + 1
1008 : cx->blk_gimme == G_SCALAR ? bot + 1
1009 : bot;
1010
1011 av_extend (av, top - bot);
1012 while (bot < top)
1013 av_push (av, SvREFCNT_inc_NN (*bot++));
1014
1015 PL_runops = RUNOPS_DEFAULT;
1016 ENTER;
1017 SAVETMPS;
1018 EXTEND (SP, 3);
1019 PUSHMARK (SP);
1020 PUSHs (&PL_sv_no);
1021 PUSHs (fullname);
1022 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1023 PUTBACK;
1024 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1025 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1026 SPAGAIN;
1027 FREETMPS;
1028 LEAVE;
1029 PL_runops = runops_trace;
1030 }
1031
1032 if (oldcop != PL_curcop)
1033 {
1034 oldcop = PL_curcop;
1035
1036 if (PL_curcop != &PL_compiling)
1037 {
1038 SV **cb;
1039
1040 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1041 {
1042 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1043
1044 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1045 {
1046 dSP;
1047 GV *gv = CvGV (cx->blk_sub.cv);
1048 SV *fullname = sv_2mortal (newSV (0));
1049
1050 if (isGV (gv))
1051 gv_efullname3 (fullname, gv, 0);
1052
1053 PL_runops = RUNOPS_DEFAULT;
1054 ENTER;
1055 SAVETMPS;
1056 EXTEND (SP, 3);
1057 PUSHMARK (SP);
1058 PUSHs (&PL_sv_yes);
1059 PUSHs (fullname);
1060 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1061 PUTBACK;
1062 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1063 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1064 SPAGAIN;
1065 FREETMPS;
1066 LEAVE;
1067 PL_runops = runops_trace;
1068 }
1069
1070 oldcxix = cxstack_ix;
1071 }
1072
1073 if (cctx_current->flags & CC_TRACE_LINE)
1074 {
1075 dSP;
1076
1077 PL_runops = RUNOPS_DEFAULT;
1078 ENTER;
1079 SAVETMPS;
1080 EXTEND (SP, 3);
1081 PL_runops = RUNOPS_DEFAULT;
1082 PUSHMARK (SP);
1083 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1084 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1085 PUTBACK;
1086 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1087 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1088 SPAGAIN;
1089 FREETMPS;
1090 LEAVE;
1091 PL_runops = runops_trace;
1092 }
1093 }
1094 }
1095 }
1096 }
1097
1098 TAINT_NOT;
1099 return 0;
1100}
1101
1102static struct CoroSLF cctx_ssl_frame;
1103
1104static void
1105slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1106{
1107 ta->prev = 0;
1108}
1109
1110static int
1111slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1112{
1113 *frame = cctx_ssl_frame;
1114
1115 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1116}
1117
1118/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1119static void NOINLINE
1120cctx_prepare (pTHX)
1121{
1122 PL_top_env = &PL_start_env;
1123
1124 if (cctx_current->flags & CC_TRACE)
1125 PL_runops = runops_trace;
1126
1127 /* we already must be executing an SLF op, there is no other valid way
1128 * that can lead to creation of a new cctx */
1129 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1130 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1131
1132 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1133 cctx_ssl_frame = slf_frame;
1134
1135 slf_frame.prepare = slf_prepare_set_stacklevel;
1136 slf_frame.check = slf_check_set_stacklevel;
1137}
1138
1139/* the tail of transfer: execute stuff we can only do after a transfer */
1140INLINE void
1141transfer_tail (pTHX)
1142{
1143 free_coro_mortal (aTHX);
1144}
1145
1146/*
1147 * this is a _very_ stripped down perl interpreter ;)
1148 */
1149static void
1150cctx_run (void *arg)
1151{
1152#ifdef USE_ITHREADS
1153# if CORO_PTHREAD
1154 PERL_SET_CONTEXT (coro_thx);
1155# endif
1156#endif
1157 {
1158 dTHX;
1159
1160 /* normally we would need to skip the entersub here */
1161 /* not doing so will re-execute it, which is exactly what we want */
1162 /* PL_nop = PL_nop->op_next */
1163
1164 /* inject a fake subroutine call to cctx_init */
1165 cctx_prepare (aTHX);
1166
1167 /* cctx_run is the alternative tail of transfer() */
1168 transfer_tail (aTHX);
1169
1170 /* somebody or something will hit me for both perl_run and PL_restartop */
1171 PL_restartop = PL_op;
1172 perl_run (PL_curinterp);
1173 /*
1174 * Unfortunately, there is no way to get at the return values of the
1175 * coro body here, as perl_run destroys these
1176 */
1177
1178 /*
1179 * If perl-run returns we assume exit() was being called or the coro
1180 * fell off the end, which seems to be the only valid (non-bug)
1181 * reason for perl_run to return. We try to exit by jumping to the
1182 * bootstrap-time "top" top_env, as we cannot restore the "main"
1183 * coroutine as Coro has no such concept.
1184 * This actually isn't valid with the pthread backend, but OSes requiring
1185 * that backend are too broken to do it in a standards-compliant way.
1186 */
1187 PL_top_env = main_top_env;
1188 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1189 }
1190}
1191
1192static coro_cctx *
1193cctx_new ()
1194{
1195 coro_cctx *cctx;
1196
1197 ++cctx_count;
1198 New (0, cctx, 1, coro_cctx);
1199
1200 cctx->gen = cctx_gen;
1201 cctx->flags = 0;
1202 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1203
1204 return cctx;
1205}
1206
1207/* create a new cctx only suitable as source */
1208static coro_cctx *
1209cctx_new_empty ()
1210{
1211 coro_cctx *cctx = cctx_new ();
1212
1213 cctx->sptr = 0;
1214 coro_create (&cctx->cctx, 0, 0, 0, 0);
1215
1216 return cctx;
1217}
1218
1219/* create a new cctx suitable as destination/running a perl interpreter */
1220static coro_cctx *
1221cctx_new_run ()
1222{
1223 coro_cctx *cctx = cctx_new ();
1224 void *stack_start;
1225 size_t stack_size;
1226
1227#if HAVE_MMAP
1228 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1229 /* mmap supposedly does allocate-on-write for us */
1230 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1231
1232 if (cctx->sptr != (void *)-1)
1233 {
1234 #if CORO_STACKGUARD
1235 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1236 #endif
1237 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1238 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1239 cctx->flags |= CC_MAPPED;
1240 }
1241 else
1242#endif
1243 {
1244 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1245 New (0, cctx->sptr, cctx_stacksize, long);
1246
1247 if (!cctx->sptr)
1248 {
1249 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1250 _exit (EXIT_FAILURE);
1251 }
1252
1253 stack_start = cctx->sptr;
1254 stack_size = cctx->ssize;
1255 }
1256
1257 #if CORO_USE_VALGRIND
1258 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1259 #endif
1260
1261 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1262
1263 return cctx;
1264}
1265
1266static void
1267cctx_destroy (coro_cctx *cctx)
1268{
1269 if (!cctx)
1270 return;
1271
1272 assert (cctx != cctx_current);//D temporary
1273
1274 --cctx_count;
1275 coro_destroy (&cctx->cctx);
1276
1277 /* coro_transfer creates new, empty cctx's */
1278 if (cctx->sptr)
1279 {
1280 #if CORO_USE_VALGRIND
1281 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1282 #endif
1283
1284#if HAVE_MMAP
1285 if (cctx->flags & CC_MAPPED)
1286 munmap (cctx->sptr, cctx->ssize);
1287 else
1288#endif
1289 Safefree (cctx->sptr);
1290 }
1291
1292 Safefree (cctx);
1293}
1294
1295/* wether this cctx should be destructed */
1296#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1297
1298static coro_cctx *
1299cctx_get (pTHX)
1300{
1301 while (expect_true (cctx_first))
1302 {
1303 coro_cctx *cctx = cctx_first;
1304 cctx_first = cctx->next;
1305 --cctx_idle;
1306
1307 if (expect_true (!CCTX_EXPIRED (cctx)))
1308 return cctx;
1309
1310 cctx_destroy (cctx);
1311 }
1312
1313 return cctx_new_run ();
1314}
1315
1316static void
1317cctx_put (coro_cctx *cctx)
1318{
1319 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1320
1321 /* free another cctx if overlimit */
1322 if (expect_false (cctx_idle >= cctx_max_idle))
1323 {
1324 coro_cctx *first = cctx_first;
1325 cctx_first = first->next;
1326 --cctx_idle;
1327
1328 cctx_destroy (first);
1329 }
1330
1331 ++cctx_idle;
1332 cctx->next = cctx_first;
1333 cctx_first = cctx;
1334}
1335
1336/** coroutine switching *****************************************************/
1337
1338static void
1339transfer_check (pTHX_ struct coro *prev, struct coro *next)
1340{
1341 /* TODO: throwing up here is considered harmful */
1342
1343 if (expect_true (prev != next))
1344 {
1345 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1346 croak ("Coro::State::transfer called with a suspended prev Coro::State, but can only transfer from running or new states,");
1347
1348 if (expect_false (next->flags & CF_RUNNING))
1349 croak ("Coro::State::transfer called with running next Coro::State, but can only transfer to inactive states,");
1350
1351 if (expect_false (next->flags & CF_DESTROYED))
1352 croak ("Coro::State::transfer called with destroyed next Coro::State, but can only transfer to inactive states,");
1353
1354#if !PERL_VERSION_ATLEAST (5,10,0)
1355 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1356 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1357#endif
1358 }
1359}
1360
1361/* always use the TRANSFER macro */
1362static void NOINLINE /* noinline so we have a fixed stackframe */
1363transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1364{
1365 dSTACKLEVEL;
1366
1367 /* sometimes transfer is only called to set idle_sp */
1368 if (expect_false (!prev))
1369 {
1370 cctx_current->idle_sp = STACKLEVEL;
1371 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1372 }
1373 else if (expect_true (prev != next))
1374 {
1375 coro_cctx *cctx_prev;
1376
1377 if (expect_false (prev->flags & CF_NEW))
1378 {
1379 /* create a new empty/source context */
1380 prev->flags &= ~CF_NEW;
1381 prev->flags |= CF_RUNNING;
1382 }
1383
1384 prev->flags &= ~CF_RUNNING;
1385 next->flags |= CF_RUNNING;
1386
1387 /* first get rid of the old state */
1388 save_perl (aTHX_ prev);
1389
1390 if (expect_false (next->flags & CF_NEW))
1391 {
1392 /* need to start coroutine */
1393 next->flags &= ~CF_NEW;
1394 /* setup coroutine call */
1395 coro_setup (aTHX_ next);
1396 }
1397 else
1398 load_perl (aTHX_ next);
1399
1400 assert (!prev->cctx);//D temporary
1401
1402 /* possibly untie and reuse the cctx */
1403 if (expect_true (
1404 cctx_current->idle_sp == STACKLEVEL
1405 && !(cctx_current->flags & CC_TRACE)
1406 && !force_cctx
1407 ))
1408 {
1409 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1410 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1411
1412 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get. */
1413 /* without this the next cctx_get might destroy the running cctx while still in use */
1414 if (expect_false (CCTX_EXPIRED (cctx_current)))
1415 if (expect_true (!next->cctx))
1416 next->cctx = cctx_get (aTHX);
1417
1418 cctx_put (cctx_current);
1419 }
1420 else
1421 prev->cctx = cctx_current;
1422
1423 ++next->usecount;
1424
1425 cctx_prev = cctx_current;
1426 cctx_current = expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1427
1428 next->cctx = 0;
1429
1430 if (expect_false (cctx_prev != cctx_current))
1431 {
1432 cctx_prev->top_env = PL_top_env;
1433 PL_top_env = cctx_current->top_env;
1434 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1435 }
1436
1437 transfer_tail (aTHX);
1438 }
1439}
1440
1441#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1442#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1443
1444/** high level stuff ********************************************************/
1445
1446static int
1447coro_state_destroy (pTHX_ struct coro *coro)
1448{
1449 if (coro->flags & CF_DESTROYED)
1450 return 0;
1451
1452 if (coro->on_destroy)
1453 coro->on_destroy (aTHX_ coro);
1454
1455 coro->flags |= CF_DESTROYED;
1456
1457 if (coro->flags & CF_READY)
1458 {
1459 /* reduce nready, as destroying a ready coro effectively unreadies it */
1460 /* alternative: look through all ready queues and remove the coro */
1461 --coro_nready;
1462 }
1463 else
1464 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1465
1466 if (coro->mainstack && coro->mainstack != main_mainstack)
1467 {
1468 struct coro temp;
1469
1470 assert (("FATAL: tried to destroy currently running coroutine (please report)", !(coro->flags & CF_RUNNING)));
1471
1472 save_perl (aTHX_ &temp);
1473 load_perl (aTHX_ coro);
1474
1475 coro_destruct (aTHX_ coro);
1476
1477 load_perl (aTHX_ &temp);
1478
1479 coro->slot = 0;
1480 }
1481
1482 cctx_destroy (coro->cctx);
1483 SvREFCNT_dec (coro->startcv);
1484 SvREFCNT_dec (coro->args);
1485 SvREFCNT_dec (CORO_THROW);
1486
1487 if (coro->next) coro->next->prev = coro->prev;
1488 if (coro->prev) coro->prev->next = coro->next;
1489 if (coro == coro_first) coro_first = coro->next;
1490
1491 return 1;
1492}
1493
1494static int
1495coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1496{
1497 struct coro *coro = (struct coro *)mg->mg_ptr;
1498 mg->mg_ptr = 0;
1499
1500 coro->hv = 0;
1501
1502 if (--coro->refcnt < 0)
1503 {
1504 coro_state_destroy (aTHX_ coro);
1505 Safefree (coro);
1506 }
1507
1508 return 0;
1509}
1510
1511static int
1512coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1513{
1514 struct coro *coro = (struct coro *)mg->mg_ptr;
1515
1516 ++coro->refcnt;
1517
1518 return 0;
1519}
1520
1521static MGVTBL coro_state_vtbl = {
1522 0, 0, 0, 0,
1523 coro_state_free,
1524 0,
1525#ifdef MGf_DUP
1526 coro_state_dup,
1527#else
1528# define MGf_DUP 0
1529#endif
1530};
1531
1532static void
1533prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1534{
1535 ta->prev = SvSTATE (prev_sv);
1536 ta->next = SvSTATE (next_sv);
1537 TRANSFER_CHECK (*ta);
1538}
1539
1540static void
1541api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1542{
1543 struct coro_transfer_args ta;
1544
1545 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1546 TRANSFER (ta, 1);
1547}
1548
1549/*****************************************************************************/
1550/* gensub: simple closure generation utility */
1551
1552#define GENSUB_ARG CvXSUBANY (cv).any_ptr
1553
1554/* create a closure from XS, returns a code reference */
1555/* the arg can be accessed via GENSUB_ARG from the callback */
1556/* the callback must use dXSARGS/XSRETURN */
1557static SV *
1558gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg)
1559{
1560 CV *cv = (CV *)newSV (0);
1561
1562 sv_upgrade ((SV *)cv, SVt_PVCV);
1563
1564 CvANON_on (cv);
1565 CvISXSUB_on (cv);
1566 CvXSUB (cv) = xsub;
1567 GENSUB_ARG = arg;
1568
1569 return newRV_noinc ((SV *)cv);
1570}
1571
1572/** Coro ********************************************************************/
1573
1574INLINE void
1575coro_enq (pTHX_ struct coro *coro)
1576{
1577 av_push (coro_ready [coro->prio - PRIO_MIN], SvREFCNT_inc_NN (coro->hv));
1578}
1579
1580INLINE SV *
1581coro_deq (pTHX)
1582{
1583 int prio;
1584
1585 for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= 0; )
1586 if (AvFILLp (coro_ready [prio]) >= 0)
1587 return av_shift (coro_ready [prio]);
1588
1589 return 0;
1590}
1591
1592static int
1593api_ready (pTHX_ SV *coro_sv)
1594{
1595 struct coro *coro;
1596 SV *sv_hook;
1597 void (*xs_hook)(void);
1598
1599 coro = SvSTATE (coro_sv);
1600
1601 if (coro->flags & CF_READY)
1602 return 0;
1603
1604 coro->flags |= CF_READY;
1605
1606 sv_hook = coro_nready ? 0 : coro_readyhook;
1607 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1608
1609 coro_enq (aTHX_ coro);
1610 ++coro_nready;
1611
1612 if (sv_hook)
1613 {
1614 dSP;
1615
1616 ENTER;
1617 SAVETMPS;
1618
1619 PUSHMARK (SP);
1620 PUTBACK;
1621 call_sv (sv_hook, G_VOID | G_DISCARD);
1622
1623 FREETMPS;
1624 LEAVE;
1625 }
1626
1627 if (xs_hook)
1628 xs_hook ();
1629
1630 return 1;
1631}
1632
1633static int
1634api_is_ready (pTHX_ SV *coro_sv)
1635{
1636 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1637}
1638
1639/* expects to own a reference to next->hv */
1640INLINE void
1641prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1642{
1643 SV *prev_sv = SvRV (coro_current);
1644
1645 ta->prev = SvSTATE_hv (prev_sv);
1646 ta->next = next;
1647
1648 TRANSFER_CHECK (*ta);
1649
1650 SvRV_set (coro_current, (SV *)next->hv);
1651
1652 free_coro_mortal (aTHX);
1653 coro_mortal = prev_sv;
1654}
1655
1656static void
1657prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1658{
1659 for (;;)
1660 {
1661 SV *next_sv = coro_deq (aTHX);
1662
1663 if (expect_true (next_sv))
1664 {
1665 struct coro *next = SvSTATE_hv (next_sv);
1666
1667 /* cannot transfer to destroyed coros, skip and look for next */
1668 if (expect_false (next->flags & CF_DESTROYED))
1669 SvREFCNT_dec (next_sv); /* coro_nready has already been taken care of by destroy */
1670 else
1671 {
1672 next->flags &= ~CF_READY;
1673 --coro_nready;
1674
1675 prepare_schedule_to (aTHX_ ta, next);
1676 break;
1677 }
1678 }
1679 else
1680 {
1681 /* nothing to schedule: call the idle handler */
1682 if (SvROK (sv_idle)
1683 && SvOBJECT (SvRV (sv_idle)))
1684 {
1685 ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */
1686 api_ready (aTHX_ SvRV (sv_idle));
1687 --coro_nready;
1688 }
1689 else
1690 {
1691 dSP;
1692
1693 ENTER;
1694 SAVETMPS;
1695
1696 PUSHMARK (SP);
1697 PUTBACK;
1698 call_sv (sv_idle, G_VOID | G_DISCARD);
1699
1700 FREETMPS;
1701 LEAVE;
1702 }
1703 }
1704 }
1705}
1706
1707INLINE void
1708prepare_cede (pTHX_ struct coro_transfer_args *ta)
1709{
1710 api_ready (aTHX_ coro_current);
1711 prepare_schedule (aTHX_ ta);
1712}
1713
1714INLINE void
1715prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1716{
1717 SV *prev = SvRV (coro_current);
1718
1719 if (coro_nready)
1720 {
1721 prepare_schedule (aTHX_ ta);
1722 api_ready (aTHX_ prev);
1723 }
1724 else
1725 prepare_nop (aTHX_ ta);
1726}
1727
1728static void
1729api_schedule (pTHX)
1730{
1731 struct coro_transfer_args ta;
1732
1733 prepare_schedule (aTHX_ &ta);
1734 TRANSFER (ta, 1);
1735}
1736
1737static void
1738api_schedule_to (pTHX_ SV *coro_sv)
1739{
1740 struct coro_transfer_args ta;
1741 struct coro *next = SvSTATE (coro_sv);
1742
1743 SvREFCNT_inc_NN (coro_sv);
1744 prepare_schedule_to (aTHX_ &ta, next);
1745}
1746
1747static int
1748api_cede (pTHX)
1749{
1750 struct coro_transfer_args ta;
1751
1752 prepare_cede (aTHX_ &ta);
1753
1754 if (expect_true (ta.prev != ta.next))
1755 {
1756 TRANSFER (ta, 1);
1757 return 1;
1758 }
1759 else
1760 return 0;
1761}
1762
1763static int
1764api_cede_notself (pTHX)
1765{
1766 if (coro_nready)
1767 {
1768 struct coro_transfer_args ta;
1769
1770 prepare_cede_notself (aTHX_ &ta);
1771 TRANSFER (ta, 1);
1772 return 1;
1773 }
1774 else
1775 return 0;
1776}
1777
1778static void
1779api_trace (pTHX_ SV *coro_sv, int flags)
1780{
1781 struct coro *coro = SvSTATE (coro_sv);
1782
1783 if (coro->flags & CF_RUNNING)
1784 croak ("cannot enable tracing on a running coroutine, caught");
1785
1786 if (flags & CC_TRACE)
1787 {
1788 if (!coro->cctx)
1789 coro->cctx = cctx_new_run ();
1790 else if (!(coro->cctx->flags & CC_TRACE))
1791 croak ("cannot enable tracing on coroutine with custom stack, caught");
1792
1793 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1794 }
1795 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1796 {
1797 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1798
1799 if (coro->flags & CF_RUNNING)
1800 PL_runops = RUNOPS_DEFAULT;
1801 else
1802 coro->slot->runops = RUNOPS_DEFAULT;
1803 }
1804}
1805
1806static void
1807coro_call_on_destroy (pTHX_ struct coro *coro)
1808{
1809 SV **on_destroyp = hv_fetch (coro->hv, "_on_destroy", sizeof ("_on_destroy") - 1, 0);
1810 SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0);
1811
1812 if (on_destroyp)
1813 {
1814 AV *on_destroy = (AV *)SvRV (*on_destroyp);
1815
1816 while (AvFILLp (on_destroy) >= 0)
1817 {
1818 dSP; /* don't disturb outer sp */
1819 SV *cb = av_pop (on_destroy);
1820
1821 PUSHMARK (SP);
1822
1823 if (statusp)
1824 {
1825 int i;
1826 AV *status = (AV *)SvRV (*statusp);
1827 EXTEND (SP, AvFILLp (status) + 1);
1828
1829 for (i = 0; i <= AvFILLp (status); ++i)
1830 PUSHs (AvARRAY (status)[i]);
1831 }
1832
1833 PUTBACK;
1834 call_sv (sv_2mortal (cb), G_VOID | G_DISCARD);
1835 }
1836 }
1837}
1838
1839static void
1840slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1841{
1842 int i;
1843 HV *hv = (HV *)SvRV (coro_current);
1844 AV *av = newAV ();
1845
1846 av_extend (av, items - 1);
1847 for (i = 0; i < items; ++i)
1848 av_push (av, SvREFCNT_inc_NN (arg [i]));
1849
1850 hv_store (hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0);
1851
1852 av_push (av_destroy, (SV *)newRV_inc ((SV *)hv)); /* RVinc for perl */
1853 api_ready (aTHX_ sv_manager);
1854
1855 frame->prepare = prepare_schedule;
1856 frame->check = slf_check_repeat;
1857}
1858
1859/*****************************************************************************/
1860/* async pool handler */
1861
1862static int
1863slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
1864{
1865 HV *hv = (HV *)SvRV (coro_current);
1866 struct coro *coro = (struct coro *)frame->data;
1867
1868 if (!coro->invoke_cb)
1869 return 1; /* loop till we have invoke */
1870 else
1871 {
1872 hv_store (hv, "desc", sizeof ("desc") - 1,
1873 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
1874
1875 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
1876
1877 {
1878 dSP;
1879 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
1880 PUTBACK;
1881 }
1882
1883 SvREFCNT_dec (GvAV (PL_defgv));
1884 GvAV (PL_defgv) = coro->invoke_av;
1885 coro->invoke_av = 0;
1886
1887 return 0;
1888 }
1889}
1890
1891static void
1892slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1893{
1894 HV *hv = (HV *)SvRV (coro_current);
1895 struct coro *coro = SvSTATE_hv ((SV *)hv);
1896
1897 if (expect_true (coro->saved_deffh))
1898 {
1899 /* subsequent iteration */
1900 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
1901 coro->saved_deffh = 0;
1902
1903 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
1904 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
1905 {
1906 coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate);
1907 coro->invoke_av = newAV ();
1908
1909 frame->prepare = prepare_nop;
1910 }
1911 else
1912 {
1913 av_clear (GvAV (PL_defgv));
1914 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
1915
1916 coro->prio = 0;
1917
1918 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
1919 api_trace (aTHX_ coro_current, 0);
1920
1921 frame->prepare = prepare_schedule;
1922 av_push (av_async_pool, SvREFCNT_inc (hv));
1923 }
1924 }
1925 else
1926 {
1927 /* first iteration, simply fall through */
1928 frame->prepare = prepare_nop;
1929 }
1930
1931 frame->check = slf_check_pool_handler;
1932 frame->data = (void *)coro;
1933}
1934
1935/*****************************************************************************/
1936/* rouse callback */
1937
1938#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
1939
1940static void
1941coro_rouse_callback (pTHX_ CV *cv)
1942{
1943 dXSARGS;
1944 SV *data = (SV *)GENSUB_ARG;
1945
1946 if (SvTYPE (SvRV (data)) != SVt_PVAV)
1947 {
1948 /* first call, set args */
1949 AV *av = newAV ();
1950 SV *coro = SvRV (data);
1951
1952 SvRV_set (data, (SV *)av);
1953 api_ready (aTHX_ coro);
1954 SvREFCNT_dec (coro);
1955
1956 /* better take a full copy of the arguments */
1957 while (items--)
1958 av_store (av, items, newSVsv (ST (items)));
1959 }
1960
1961 XSRETURN_EMPTY;
1962}
1963
1964static int
1965slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
1966{
1967 SV *data = (SV *)frame->data;
1968
1969 if (CORO_THROW)
1970 return 0;
1971
1972 if (SvTYPE (SvRV (data)) != SVt_PVAV)
1973 return 1;
1974
1975 /* now push all results on the stack */
1976 {
1977 dSP;
1978 AV *av = (AV *)SvRV (data);
1979 int i;
1980
1981 EXTEND (SP, AvFILLp (av) + 1);
1982 for (i = 0; i <= AvFILLp (av); ++i)
1983 PUSHs (sv_2mortal (AvARRAY (av)[i]));
1984
1985 /* we have stolen the elements, so ste length to zero and free */
1986 AvFILLp (av) = -1;
1987 av_undef (av);
1988
1989 PUTBACK;
1990 }
1991
1992 return 0;
1993}
1994
1995static void
1996slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1997{
1998 SV *cb;
1999
2000 if (items)
2001 cb = arg [0];
2002 else
2003 {
2004 struct coro *coro = SvSTATE_current;
2005
2006 if (!coro->rouse_cb)
2007 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
2008
2009 cb = sv_2mortal (coro->rouse_cb);
2010 coro->rouse_cb = 0;
2011 }
2012
2013 if (!SvROK (cb)
2014 || SvTYPE (SvRV (cb)) != SVt_PVCV
2015 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
2016 croak ("Coro::rouse_wait called with illegal callback argument,");
2017
2018 {
2019 CV *cv = (CV *)SvRV (cb); /* for GENSUB_ARG */
2020 SV *data = (SV *)GENSUB_ARG;
2021
2022 frame->data = (void *)data;
2023 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2024 frame->check = slf_check_rouse_wait;
2025 }
2026}
2027
2028static SV *
2029coro_new_rouse_cb (pTHX)
2030{
2031 HV *hv = (HV *)SvRV (coro_current);
2032 struct coro *coro = SvSTATE_hv (hv);
2033 SV *data = newRV_inc ((SV *)hv);
2034 SV *cb = gensub (aTHX_ coro_rouse_callback, (void *)data);
2035
2036 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2037 SvREFCNT_dec (data); /* magicext increases the refcount */
2038
2039 SvREFCNT_dec (coro->rouse_cb);
2040 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2041
2042 return cb;
2043}
2044
2045/*****************************************************************************/
2046/* schedule-like-function opcode (SLF) */
2047
2048static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2049static const CV *slf_cv;
2050static SV **slf_argv;
2051static int slf_argc, slf_arga; /* count, allocated */
2052static I32 slf_ax; /* top of stack, for restore */
2053
2054/* this restores the stack in the case we patched the entersub, to */
2055/* recreate the stack frame as perl will on following calls */
2056/* since entersub cleared the stack */
2057static OP *
2058pp_restore (pTHX)
2059{
2060 int i;
2061 SV **SP = PL_stack_base + slf_ax;
2062
2063 PUSHMARK (SP);
2064
2065 EXTEND (SP, slf_argc + 1);
2066
2067 for (i = 0; i < slf_argc; ++i)
2068 PUSHs (sv_2mortal (slf_argv [i]));
2069
2070 PUSHs ((SV *)CvGV (slf_cv));
2071
2072 RETURNOP (slf_restore.op_first);
2073}
2074
2075static void
2076slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2077{
2078 SV **arg = (SV **)slf_frame.data;
2079
2080 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2081}
2082
2083static void
2084slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2085{
2086 if (items != 2)
2087 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2088
2089 frame->prepare = slf_prepare_transfer;
2090 frame->check = slf_check_nop;
2091 frame->data = (void *)arg; /* let's hope it will stay valid */
2092}
2093
2094static void
2095slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2096{
2097 frame->prepare = prepare_schedule;
2098 frame->check = slf_check_nop;
2099}
2100
2101static void
2102slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2103{
2104 struct coro *next = (struct coro *)slf_frame.data;
2105
2106 SvREFCNT_inc_NN (next->hv);
2107 prepare_schedule_to (aTHX_ ta, next);
2108}
2109
2110static void
2111slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2112{
2113 if (!items)
2114 croak ("Coro::schedule_to expects a coroutine argument, caught");
2115
2116 frame->data = (void *)SvSTATE (arg [0]);
2117 frame->prepare = slf_prepare_schedule_to;
2118 frame->check = slf_check_nop;
2119}
2120
2121static void
2122slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2123{
2124 api_ready (aTHX_ SvRV (coro_current));
2125
2126 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2127}
2128
2129static void
2130slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2131{
2132 frame->prepare = prepare_cede;
2133 frame->check = slf_check_nop;
2134}
2135
2136static void
2137slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2138{
2139 frame->prepare = prepare_cede_notself;
2140 frame->check = slf_check_nop;
2141}
2142
2143/*
2144 * these not obviously related functions are all rolled into one
2145 * function to increase chances that they all will call transfer with the same
2146 * stack offset
2147 * SLF stands for "schedule-like-function".
2148 */
2149static OP *
2150pp_slf (pTHX)
2151{
2152 I32 checkmark; /* mark SP to see how many elements check has pushed */
2153
2154 /* set up the slf frame, unless it has already been set-up */
2155 /* the latter happens when a new coro has been started */
2156 /* or when a new cctx was attached to an existing coroutine */
2157 if (expect_true (!slf_frame.prepare))
2158 {
2159 /* first iteration */
2160 dSP;
2161 SV **arg = PL_stack_base + TOPMARK + 1;
2162 int items = SP - arg; /* args without function object */
2163 SV *gv = *sp;
2164
2165 /* do a quick consistency check on the "function" object, and if it isn't */
2166 /* for us, divert to the real entersub */
2167 if (SvTYPE (gv) != SVt_PVGV
2168 || !GvCV (gv)
2169 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2170 return PL_ppaddr[OP_ENTERSUB](aTHX);
2171
2172 if (!(PL_op->op_flags & OPf_STACKED))
2173 {
2174 /* ampersand-form of call, use @_ instead of stack */
2175 AV *av = GvAV (PL_defgv);
2176 arg = AvARRAY (av);
2177 items = AvFILLp (av) + 1;
2178 }
2179
2180 /* now call the init function, which needs to set up slf_frame */
2181 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2182 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2183
2184 /* pop args */
2185 SP = PL_stack_base + POPMARK;
2186
2187 PUTBACK;
2188 }
2189
2190 /* now that we have a slf_frame, interpret it! */
2191 /* we use a callback system not to make the code needlessly */
2192 /* complicated, but so we can run multiple perl coros from one cctx */
2193
2194 do
2195 {
2196 struct coro_transfer_args ta;
2197
2198 slf_frame.prepare (aTHX_ &ta);
2199 TRANSFER (ta, 0);
2200
2201 checkmark = PL_stack_sp - PL_stack_base;
2202 }
2203 while (slf_frame.check (aTHX_ &slf_frame));
2204
2205 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2206
2207 /* exception handling */
2208 if (expect_false (CORO_THROW))
2209 {
2210 SV *exception = sv_2mortal (CORO_THROW);
2211
2212 CORO_THROW = 0;
2213 sv_setsv (ERRSV, exception);
2214 croak (0);
2215 }
2216
2217 /* return value handling - mostly like entersub */
2218 /* make sure we put something on the stack in scalar context */
2219 if (GIMME_V == G_SCALAR)
2220 {
2221 dSP;
2222 SV **bot = PL_stack_base + checkmark;
2223
2224 if (sp == bot) /* too few, push undef */
2225 bot [1] = &PL_sv_undef;
2226 else if (sp != bot + 1) /* too many, take last one */
2227 bot [1] = *sp;
2228
2229 SP = bot + 1;
2230
2231 PUTBACK;
2232 }
2233
2234 return NORMAL;
2235}
2236
2237static void
2238api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2239{
2240 int i;
2241 SV **arg = PL_stack_base + ax;
2242 int items = PL_stack_sp - arg + 1;
2243
2244 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2245
2246 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2247 && PL_op->op_ppaddr != pp_slf)
2248 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2249
2250 CvFLAGS (cv) |= CVf_SLF;
2251 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2252 slf_cv = cv;
2253
2254 /* we patch the op, and then re-run the whole call */
2255 /* we have to put the same argument on the stack for this to work */
2256 /* and this will be done by pp_restore */
2257 slf_restore.op_next = (OP *)&slf_restore;
2258 slf_restore.op_type = OP_CUSTOM;
2259 slf_restore.op_ppaddr = pp_restore;
2260 slf_restore.op_first = PL_op;
2261
2262 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2263
2264 if (PL_op->op_flags & OPf_STACKED)
2265 {
2266 if (items > slf_arga)
2267 {
2268 slf_arga = items;
2269 free (slf_argv);
2270 slf_argv = malloc (slf_arga * sizeof (SV *));
2271 }
2272
2273 slf_argc = items;
2274
2275 for (i = 0; i < items; ++i)
2276 slf_argv [i] = SvREFCNT_inc (arg [i]);
2277 }
2278 else
2279 slf_argc = 0;
2280
2281 PL_op->op_ppaddr = pp_slf;
2282 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2283
2284 PL_op = (OP *)&slf_restore;
2285}
2286
2287/*****************************************************************************/
2288/* PerlIO::cede */
2289
2290typedef struct
2291{
2292 PerlIOBuf base;
2293 NV next, every;
2294} PerlIOCede;
2295
2296static IV
2297PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2298{
2299 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2300
2301 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2302 self->next = nvtime () + self->every;
2303
2304 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2305}
2306
2307static SV *
2308PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2309{
2310 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2311
2312 return newSVnv (self->every);
2313}
2314
2315static IV
2316PerlIOCede_flush (pTHX_ PerlIO *f)
2317{
2318 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2319 double now = nvtime ();
2320
2321 if (now >= self->next)
2322 {
2323 api_cede (aTHX);
2324 self->next = now + self->every;
2325 }
2326
2327 return PerlIOBuf_flush (aTHX_ f);
2328}
2329
2330static PerlIO_funcs PerlIO_cede =
2331{
2332 sizeof(PerlIO_funcs),
2333 "cede",
2334 sizeof(PerlIOCede),
2335 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2336 PerlIOCede_pushed,
2337 PerlIOBuf_popped,
2338 PerlIOBuf_open,
2339 PerlIOBase_binmode,
2340 PerlIOCede_getarg,
2341 PerlIOBase_fileno,
2342 PerlIOBuf_dup,
2343 PerlIOBuf_read,
2344 PerlIOBuf_unread,
2345 PerlIOBuf_write,
2346 PerlIOBuf_seek,
2347 PerlIOBuf_tell,
2348 PerlIOBuf_close,
2349 PerlIOCede_flush,
2350 PerlIOBuf_fill,
2351 PerlIOBase_eof,
2352 PerlIOBase_error,
2353 PerlIOBase_clearerr,
2354 PerlIOBase_setlinebuf,
2355 PerlIOBuf_get_base,
2356 PerlIOBuf_bufsiz,
2357 PerlIOBuf_get_ptr,
2358 PerlIOBuf_get_cnt,
2359 PerlIOBuf_set_ptrcnt,
2360};
2361
2362/*****************************************************************************/
2363/* Coro::Semaphore & Coro::Signal */
2364
2365static SV *
2366coro_waitarray_new (pTHX_ int count)
2367{
2368 /* a semaphore contains a counter IV in $sem->[0] and any waiters after that */
2369 AV *av = newAV ();
2370 SV **ary;
2371
2372 /* unfortunately, building manually saves memory */
2373 Newx (ary, 2, SV *);
2374 AvALLOC (av) = ary;
2375 /*AvARRAY (av) = ary;*/
2376 SvPVX ((SV *)av) = (char *)ary; /* 5.8.8 needs this syntax instead of AvARRAY = ary */
2377 AvMAX (av) = 1;
2378 AvFILLp (av) = 0;
2379 ary [0] = newSViv (count);
2380
2381 return newRV_noinc ((SV *)av);
2382}
2383
2384/* semaphore */
2385
2386static void
2387coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2388{
2389 SV *count_sv = AvARRAY (av)[0];
2390 IV count = SvIVX (count_sv);
2391
2392 count += adjust;
2393 SvIVX (count_sv) = count;
2394
2395 /* now wake up as many waiters as are expected to lock */
2396 while (count > 0 && AvFILLp (av) > 0)
2397 {
2398 SV *cb;
2399
2400 /* swap first two elements so we can shift a waiter */
2401 AvARRAY (av)[0] = AvARRAY (av)[1];
2402 AvARRAY (av)[1] = count_sv;
2403 cb = av_shift (av);
2404
2405 if (SvOBJECT (cb))
2406 {
2407 api_ready (aTHX_ cb);
2408 --count;
2409 }
2410 else if (SvTYPE (cb) == SVt_PVCV)
2411 {
2412 dSP;
2413 PUSHMARK (SP);
2414 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2415 PUTBACK;
2416 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2417 }
2418
2419 SvREFCNT_dec (cb);
2420 }
2421}
2422
2423static void
2424coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2425{
2426 /* call $sem->adjust (0) to possibly wake up some other waiters */
2427 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2428}
2429
2430static int
2431slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2432{
2433 AV *av = (AV *)frame->data;
2434 SV *count_sv = AvARRAY (av)[0];
2435
2436 /* if we are about to throw, don't actually acquire the lock, just throw */
2437 if (CORO_THROW)
2438 return 0;
2439 else if (SvIVX (count_sv) > 0)
2440 {
2441 SvSTATE_current->on_destroy = 0;
2442
2443 if (acquire)
2444 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2445 else
2446 coro_semaphore_adjust (aTHX_ av, 0);
2447
2448 return 0;
2449 }
2450 else
2451 {
2452 int i;
2453 /* if we were woken up but can't down, we look through the whole */
2454 /* waiters list and only add us if we aren't in there already */
2455 /* this avoids some degenerate memory usage cases */
2456
2457 for (i = 1; i <= AvFILLp (av); ++i)
2458 if (AvARRAY (av)[i] == SvRV (coro_current))
2459 return 1;
2460
2461 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2462 return 1;
2463 }
2464}
2465
2466static int
2467slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2468{
2469 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2470}
2471
2472static int
2473slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2474{
2475 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2476}
2477
2478static void
2479slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2480{
2481 AV *av = (AV *)SvRV (arg [0]);
2482
2483 if (SvIVX (AvARRAY (av)[0]) > 0)
2484 {
2485 frame->data = (void *)av;
2486 frame->prepare = prepare_nop;
2487 }
2488 else
2489 {
2490 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2491
2492 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2493 frame->prepare = prepare_schedule;
2494
2495 /* to avoid race conditions when a woken-up coro gets terminated */
2496 /* we arrange for a temporary on_destroy that calls adjust (0) */
2497 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2498 }
2499}
2500
2501static void
2502slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2503{
2504 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2505 frame->check = slf_check_semaphore_down;
2506}
2507
2508static void
2509slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2510{
2511 if (items >= 2)
2512 {
2513 /* callback form */
2514 AV *av = (AV *)SvRV (arg [0]);
2515 CV *cb_cv = coro_sv_2cv (aTHX_ arg [1]);
2516
2517 av_push (av, (SV *)SvREFCNT_inc_NN (cb_cv));
2518
2519 if (SvIVX (AvARRAY (av)[0]) > 0)
2520 coro_semaphore_adjust (aTHX_ av, 0);
2521
2522 frame->prepare = prepare_nop;
2523 frame->check = slf_check_nop;
2524 }
2525 else
2526 {
2527 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2528 frame->check = slf_check_semaphore_wait;
2529 }
2530}
2531
2532/* signal */
2533
2534static void
2535coro_signal_wake (pTHX_ AV *av, int count)
2536{
2537 SvIVX (AvARRAY (av)[0]) = 0;
2538
2539 /* now signal count waiters */
2540 while (count > 0 && AvFILLp (av) > 0)
2541 {
2542 SV *cb;
2543
2544 /* swap first two elements so we can shift a waiter */
2545 cb = AvARRAY (av)[0];
2546 AvARRAY (av)[0] = AvARRAY (av)[1];
2547 AvARRAY (av)[1] = cb;
2548
2549 cb = av_shift (av);
2550
2551 api_ready (aTHX_ cb);
2552 sv_setiv (cb, 0); /* signal waiter */
2553 SvREFCNT_dec (cb);
2554
2555 --count;
2556 }
2557}
2558
2559static int
2560slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2561{
2562 /* if we are about to throw, also stop waiting */
2563 return SvROK ((SV *)frame->data) && !CORO_THROW;
2564}
2565
2566static void
2567slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2568{
2569 AV *av = (AV *)SvRV (arg [0]);
2570
2571 if (SvIVX (AvARRAY (av)[0]))
2572 {
2573 SvIVX (AvARRAY (av)[0]) = 0;
2574 frame->prepare = prepare_nop;
2575 frame->check = slf_check_nop;
2576 }
2577 else
2578 {
2579 SV *waiter = newRV_inc (SvRV (coro_current)); /* owned by signal av */
2580
2581 av_push (av, waiter);
2582
2583 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2584 frame->prepare = prepare_schedule;
2585 frame->check = slf_check_signal_wait;
2586 }
2587}
2588
2589/*****************************************************************************/
2590/* Coro::AIO */
2591
2592#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2593
2594/* helper storage struct */
2595struct io_state
2596{
2597 int errorno;
2598 I32 laststype; /* U16 in 5.10.0 */
2599 int laststatval;
2600 Stat_t statcache;
2601};
2602
2603static void
2604coro_aio_callback (pTHX_ CV *cv)
2605{
2606 dXSARGS;
2607 AV *state = (AV *)GENSUB_ARG;
2608 SV *coro = av_pop (state);
2609 SV *data_sv = newSV (sizeof (struct io_state));
2610
2611 av_extend (state, items - 1);
2612
2613 sv_upgrade (data_sv, SVt_PV);
2614 SvCUR_set (data_sv, sizeof (struct io_state));
2615 SvPOK_only (data_sv);
2616
2617 {
2618 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2619
2620 data->errorno = errno;
2621 data->laststype = PL_laststype;
2622 data->laststatval = PL_laststatval;
2623 data->statcache = PL_statcache;
2624 }
2625
2626 /* now build the result vector out of all the parameters and the data_sv */
2627 {
2628 int i;
2629
2630 for (i = 0; i < items; ++i)
2631 av_push (state, SvREFCNT_inc_NN (ST (i)));
2632 }
2633
2634 av_push (state, data_sv);
2635
2636 api_ready (aTHX_ coro);
2637 SvREFCNT_dec (coro);
2638 SvREFCNT_dec ((AV *)state);
2639}
2640
2641static int
2642slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2643{
2644 AV *state = (AV *)frame->data;
2645
2646 /* if we are about to throw, return early */
2647 /* this does not cancel the aio request, but at least */
2648 /* it quickly returns */
2649 if (CORO_THROW)
2650 return 0;
2651
2652 /* one element that is an RV? repeat! */
2653 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2654 return 1;
2655
2656 /* restore status */
2657 {
2658 SV *data_sv = av_pop (state);
2659 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2660
2661 errno = data->errorno;
2662 PL_laststype = data->laststype;
2663 PL_laststatval = data->laststatval;
2664 PL_statcache = data->statcache;
2665
2666 SvREFCNT_dec (data_sv);
2667 }
2668
2669 /* push result values */
2670 {
2671 dSP;
2672 int i;
2673
2674 EXTEND (SP, AvFILLp (state) + 1);
2675 for (i = 0; i <= AvFILLp (state); ++i)
2676 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2677
2678 PUTBACK;
2679 }
2680
2681 return 0;
2682}
2683
2684static void
2685slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2686{
2687 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2688 SV *coro_hv = SvRV (coro_current);
2689 struct coro *coro = SvSTATE_hv (coro_hv);
2690
2691 /* put our coroutine id on the state arg */
2692 av_push (state, SvREFCNT_inc_NN (coro_hv));
2693
2694 /* first see whether we have a non-zero priority and set it as AIO prio */
2695 if (coro->prio)
2696 {
2697 dSP;
2698
2699 static SV *prio_cv;
2700 static SV *prio_sv;
2701
2702 if (expect_false (!prio_cv))
2703 {
2704 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2705 prio_sv = newSViv (0);
2706 }
2707
2708 PUSHMARK (SP);
2709 sv_setiv (prio_sv, coro->prio);
2710 XPUSHs (prio_sv);
2711
2712 PUTBACK;
2713 call_sv (prio_cv, G_VOID | G_DISCARD);
2714 }
2715
2716 /* now call the original request */
2717 {
2718 dSP;
2719 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2720 int i;
2721
2722 PUSHMARK (SP);
2723
2724 /* first push all args to the stack */
2725 EXTEND (SP, items + 1);
2726
2727 for (i = 0; i < items; ++i)
2728 PUSHs (arg [i]);
2729
2730 /* now push the callback closure */
2731 PUSHs (sv_2mortal (gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2732
2733 /* now call the AIO function - we assume our request is uncancelable */
2734 PUTBACK;
2735 call_sv ((SV *)req, G_VOID | G_DISCARD);
2736 }
2737
2738 /* now that the requets is going, we loop toll we have a result */
2739 frame->data = (void *)state;
2740 frame->prepare = prepare_schedule;
2741 frame->check = slf_check_aio_req;
2742}
2743
2744static void
2745coro_aio_req_xs (pTHX_ CV *cv)
2746{
2747 dXSARGS;
2748
2749 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2750
2751 XSRETURN_EMPTY;
2752}
2753
2754/*****************************************************************************/
2755
2756#if CORO_CLONE
2757# include "clone.c"
2758#endif
2759
2760MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2761
2762PROTOTYPES: DISABLE
2763
2764BOOT:
2765{
2766#ifdef USE_ITHREADS
2767# if CORO_PTHREAD
2768 coro_thx = PERL_GET_CONTEXT;
2769# endif
2770#endif
2771 BOOT_PAGESIZE;
2772
2773 cctx_current = cctx_new_empty ();
2774
2775 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2776 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2777
2778 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2779 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2780 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
2781
2782 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
2783 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
2784 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
2785
2786 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
2787
2788 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
2789 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
2790 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
2791 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
2792
2793 main_mainstack = PL_mainstack;
2794 main_top_env = PL_top_env;
2795
2796 while (main_top_env->je_prev)
2797 main_top_env = main_top_env->je_prev;
2798
2799 {
2800 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2801
2802 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2803 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
2804
2805 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2806 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
2807 }
2808
2809 coroapi.ver = CORO_API_VERSION;
2810 coroapi.rev = CORO_API_REVISION;
2811
2812 coroapi.transfer = api_transfer;
2813
2814 coroapi.sv_state = SvSTATE_;
2815 coroapi.execute_slf = api_execute_slf;
2816 coroapi.prepare_nop = prepare_nop;
2817 coroapi.prepare_schedule = prepare_schedule;
2818 coroapi.prepare_cede = prepare_cede;
2819 coroapi.prepare_cede_notself = prepare_cede_notself;
2820
2821 {
2822 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
2823
2824 if (!svp) croak ("Time::HiRes is required");
2825 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
2826
2827 nvtime = INT2PTR (double (*)(), SvIV (*svp));
2828 }
2829
2830 assert (("PRIO_NORMAL must be 0", !PRIO_NORMAL));
2831}
2832
2833SV *
2834new (char *klass, ...)
2835 ALIAS:
2836 Coro::new = 1
2837 CODE:
2838{
2839 struct coro *coro;
2840 MAGIC *mg;
2841 HV *hv;
2842 CV *cb;
2843 int i;
2844
2845 if (items > 1)
2846 {
2847 cb = coro_sv_2cv (aTHX_ ST (1));
2848
2849 if (!ix)
235 { 2850 {
236 /* I never used formats, so how should I know how these are implemented? */ 2851 if (CvISXSUB (cb))
237 /* my bold guess is as a simple, plain sub... */ 2852 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
238 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 2853
2854 if (!CvROOT (cb))
2855 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
239 } 2856 }
240 } 2857 }
241 2858
242 if (top_si->si_type == PERLSI_MAIN)
243 break;
244
245 top_si = top_si->si_prev;
246 ccstk = top_si->si_cxstack;
247 cxix = top_si->si_cxix;
248 }
249
250 PUTBACK;
251 }
252
253 c->dowarn = PL_dowarn;
254 c->defav = GvAV (PL_defgv);
255 c->curstackinfo = PL_curstackinfo;
256 c->curstack = PL_curstack;
257 c->mainstack = PL_mainstack;
258 c->stack_sp = PL_stack_sp;
259 c->op = PL_op;
260 c->curpad = PL_curpad;
261 c->stack_base = PL_stack_base;
262 c->stack_max = PL_stack_max;
263 c->tmps_stack = PL_tmps_stack;
264 c->tmps_floor = PL_tmps_floor;
265 c->tmps_ix = PL_tmps_ix;
266 c->tmps_max = PL_tmps_max;
267 c->markstack = PL_markstack;
268 c->markstack_ptr = PL_markstack_ptr;
269 c->markstack_max = PL_markstack_max;
270 c->scopestack = PL_scopestack;
271 c->scopestack_ix = PL_scopestack_ix;
272 c->scopestack_max = PL_scopestack_max;
273 c->savestack = PL_savestack;
274 c->savestack_ix = PL_savestack_ix;
275 c->savestack_max = PL_savestack_max;
276 c->retstack = PL_retstack;
277 c->retstack_ix = PL_retstack_ix;
278 c->retstack_max = PL_retstack_max;
279 c->curcop = PL_curcop;
280}
281
282#define LOAD(state) do { load_state(aTHX_ state); SPAGAIN; } while (0)
283#define SAVE(state) do { PUTBACK; save_state(aTHX_ state); } while (0)
284
285static void
286load_state(pTHX_ Coro__State c)
287{
288 PL_dowarn = c->dowarn;
289 GvAV (PL_defgv) = c->defav;
290 PL_curstackinfo = c->curstackinfo;
291 PL_curstack = c->curstack;
292 PL_mainstack = c->mainstack;
293 PL_stack_sp = c->stack_sp;
294 PL_op = c->op;
295 PL_curpad = c->curpad;
296 PL_stack_base = c->stack_base;
297 PL_stack_max = c->stack_max;
298 PL_tmps_stack = c->tmps_stack;
299 PL_tmps_floor = c->tmps_floor;
300 PL_tmps_ix = c->tmps_ix;
301 PL_tmps_max = c->tmps_max;
302 PL_markstack = c->markstack;
303 PL_markstack_ptr = c->markstack_ptr;
304 PL_markstack_max = c->markstack_max;
305 PL_scopestack = c->scopestack;
306 PL_scopestack_ix = c->scopestack_ix;
307 PL_scopestack_max = c->scopestack_max;
308 PL_savestack = c->savestack;
309 PL_savestack_ix = c->savestack_ix;
310 PL_savestack_max = c->savestack_max;
311 PL_retstack = c->retstack;
312 PL_retstack_ix = c->retstack_ix;
313 PL_retstack_max = c->retstack_max;
314 PL_curcop = c->curcop;
315
316 {
317 dSP;
318 CV *cv;
319
320 /* now do the ugly restore mess */
321 while ((cv = (CV *)POPs))
322 {
323 AV *padlist = (AV *)POPs;
324
325 put_padlist (cv);
326 CvPADLIST(cv) = padlist;
327 CvDEPTH(cv) = (I32)POPs;
328
329#ifdef USE_THREADS
330 CvOWNER(cv) = (struct perl_thread *)POPs;
331 error does not work either
332#endif
333 }
334
335 PUTBACK;
336 }
337}
338
339/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
340STATIC void
341destroy_stacks(pTHX)
342{
343 /* die does this while calling POPSTACK, but I just don't see why. */
344 /* OTOH, die does not have a memleak, but we do... */
345 dounwind(-1);
346
347 /* is this ugly, I ask? */
348 while (PL_scopestack_ix)
349 LEAVE;
350
351 while (PL_curstackinfo->si_next)
352 PL_curstackinfo = PL_curstackinfo->si_next;
353
354 while (PL_curstackinfo)
355 {
356 PERL_SI *p = PL_curstackinfo->si_prev;
357
358 SvREFCNT_dec(PL_curstackinfo->si_stack);
359 Safefree(PL_curstackinfo->si_cxstack);
360 Safefree(PL_curstackinfo);
361 PL_curstackinfo = p;
362 }
363
364 if (PL_scopestack_ix != 0)
365 Perl_warner(aTHX_ WARN_INTERNAL,
366 "Unbalanced scopes: %ld more ENTERs than LEAVEs\n",
367 (long)PL_scopestack_ix);
368 if (PL_savestack_ix != 0)
369 Perl_warner(aTHX_ WARN_INTERNAL,
370 "Unbalanced saves: %ld more saves than restores\n",
371 (long)PL_savestack_ix);
372 if (PL_tmps_floor != -1)
373 Perl_warner(aTHX_ WARN_INTERNAL,"Unbalanced tmps: %ld more allocs than frees\n",
374 (long)PL_tmps_floor + 1);
375 /*
376 */
377 Safefree(PL_tmps_stack);
378 Safefree(PL_markstack);
379 Safefree(PL_scopestack);
380 Safefree(PL_savestack);
381 Safefree(PL_retstack);
382}
383
384#define SUB_INIT "Coro::State::_newcoro"
385
386MODULE = Coro::State PACKAGE = Coro::State
387
388PROTOTYPES: ENABLE
389
390BOOT:
391 if (!padlist_cache)
392 padlist_cache = newHV ();
393
394Coro::State
395_newprocess(args)
396 SV * args
397 PROTOTYPE: $
398 CODE:
399 Coro__State coro;
400
401 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV)
402 croak ("Coro::State::newprocess expects an arrayref");
403
404 New (0, coro, 1, struct coro); 2859 Newz (0, coro, 1, struct coro);
2860 coro->args = newAV ();
2861 coro->flags = CF_NEW;
405 2862
406 coro->mainstack = 0; /* actual work is done inside transfer */ 2863 if (coro_first) coro_first->prev = coro;
407 coro->args = (AV *)SvREFCNT_inc (SvRV (args)); 2864 coro->next = coro_first;
2865 coro_first = coro;
408 2866
409 RETVAL = coro; 2867 coro->hv = hv = newHV ();
2868 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
2869 mg->mg_flags |= MGf_DUP;
2870 RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1));
2871
2872 if (items > 1)
2873 {
2874 av_extend (coro->args, items - 1 + ix - 1);
2875
2876 if (ix)
2877 {
2878 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
2879 cb = cv_coro_run;
2880 }
2881
2882 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
2883
2884 for (i = 2; i < items; i++)
2885 av_push (coro->args, newSVsv (ST (i)));
2886 }
2887}
410 OUTPUT: 2888 OUTPUT:
411 RETVAL 2889 RETVAL
412 2890
413void 2891void
414transfer(prev,next) 2892transfer (...)
415 Coro::State_or_hashref prev 2893 PROTOTYPE: $$
416 Coro::State_or_hashref next 2894 CODE:
417 CODE: 2895 CORO_EXECUTE_SLF_XS (slf_init_transfer);
418 2896
419 if (prev != next) 2897bool
2898_destroy (SV *coro_sv)
2899 CODE:
2900 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
2901 OUTPUT:
2902 RETVAL
2903
2904void
2905_exit (int code)
2906 PROTOTYPE: $
2907 CODE:
2908 _exit (code);
2909
2910SV *
2911clone (Coro::State coro)
2912 CODE:
2913{
2914#if CORO_CLONE
2915 struct coro *ncoro = coro_clone (coro);
2916 MAGIC *mg;
2917 /* TODO: too much duplication */
2918 ncoro->hv = newHV ();
2919 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
2920 mg->mg_flags |= MGf_DUP;
2921 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
2922#else
2923 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
2924#endif
2925}
2926 OUTPUT:
2927 RETVAL
2928
2929int
2930cctx_stacksize (int new_stacksize = 0)
2931 PROTOTYPE: ;$
2932 CODE:
2933 RETVAL = cctx_stacksize;
2934 if (new_stacksize)
420 { 2935 {
2936 cctx_stacksize = new_stacksize;
2937 ++cctx_gen;
421 /* 2938 }
422 * this could be done in newprocess which would lead to 2939 OUTPUT:
423 * extremely elegant and fast (just SAVE/LOAD) 2940 RETVAL
424 * code here, but lazy allocation of stacks has also 2941
425 * some virtues and the overhead of the if() is nil. 2942int
2943cctx_max_idle (int max_idle = 0)
2944 PROTOTYPE: ;$
2945 CODE:
2946 RETVAL = cctx_max_idle;
2947 if (max_idle > 1)
2948 cctx_max_idle = max_idle;
2949 OUTPUT:
2950 RETVAL
2951
2952int
2953cctx_count ()
2954 PROTOTYPE:
2955 CODE:
2956 RETVAL = cctx_count;
2957 OUTPUT:
2958 RETVAL
2959
2960int
2961cctx_idle ()
2962 PROTOTYPE:
2963 CODE:
2964 RETVAL = cctx_idle;
2965 OUTPUT:
2966 RETVAL
2967
2968void
2969list ()
2970 PROTOTYPE:
2971 PPCODE:
2972{
2973 struct coro *coro;
2974 for (coro = coro_first; coro; coro = coro->next)
2975 if (coro->hv)
2976 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
2977}
2978
2979void
2980call (Coro::State coro, SV *coderef)
2981 ALIAS:
2982 eval = 1
2983 CODE:
2984{
2985 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
426 */ 2986 {
427 if (next->mainstack) 2987 struct coro temp;
2988
2989 if (!(coro->flags & CF_RUNNING))
428 { 2990 {
429 SAVE (prev); 2991 PUTBACK;
430 LOAD (next); 2992 save_perl (aTHX_ &temp);
431 /* mark this state as in-use */ 2993 load_perl (aTHX_ coro);
432 next->mainstack = 0;
433 next->tmps_ix = -2;
434 } 2994 }
435 else if (next->tmps_ix == -2) 2995
2996 {
2997 dSP;
2998 ENTER;
2999 SAVETMPS;
3000 PUTBACK;
3001 PUSHSTACK;
3002 PUSHMARK (SP);
3003
3004 if (ix)
3005 eval_sv (coderef, 0);
3006 else
3007 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3008
3009 POPSTACK;
3010 SPAGAIN;
3011 FREETMPS;
3012 LEAVE;
3013 PUTBACK;
3014 }
3015
3016 if (!(coro->flags & CF_RUNNING))
436 { 3017 {
437 croak ("tried to transfer to running coroutine"); 3018 save_perl (aTHX_ coro);
438 } 3019 load_perl (aTHX_ &temp);
439 else
440 {
441 SAVE (prev);
442
443 /*
444 * emulate part of the perl startup here.
445 */
446 UNOP myop;
447
448 init_stacks (); /* from perl.c */
449 PL_op = (OP *)&myop;
450 /*PL_curcop = 0;*/
451 GvAV (PL_defgv) = (AV *)SvREFCNT_inc ((SV *)next->args);
452
453 SPAGAIN; 3020 SPAGAIN;
454 Zero(&myop, 1, UNOP);
455 myop.op_next = Nullop;
456 myop.op_flags = OPf_WANT_VOID;
457
458 PUSHMARK(SP);
459 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
460 PUTBACK;
461 /*
462 * the next line is slightly wrong, as PL_op->op_next
463 * is actually being executed so we skip the first op.
464 * that doesn't matter, though, since it is only
465 * pp_nextstate and we never return...
466 */
467 PL_op = Perl_pp_entersub(aTHX);
468 SPAGAIN;
469
470 ENTER;
471 } 3021 }
472 } 3022 }
3023}
3024
3025SV *
3026is_ready (Coro::State coro)
3027 PROTOTYPE: $
3028 ALIAS:
3029 is_ready = CF_READY
3030 is_running = CF_RUNNING
3031 is_new = CF_NEW
3032 is_destroyed = CF_DESTROYED
3033 CODE:
3034 RETVAL = boolSV (coro->flags & ix);
3035 OUTPUT:
3036 RETVAL
473 3037
474void 3038void
475DESTROY(coro) 3039throw (Coro::State self, SV *throw = &PL_sv_undef)
476 Coro::State coro 3040 PROTOTYPE: $;$
477 CODE: 3041 CODE:
3042{
3043 struct coro *current = SvSTATE_current;
3044 SV **throwp = self == current ? &CORO_THROW : &self->except;
3045 SvREFCNT_dec (*throwp);
3046 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
3047}
478 3048
479 if (coro->mainstack) 3049void
3050api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3051 PROTOTYPE: $;$
3052 C_ARGS: aTHX_ coro, flags
3053
3054SV *
3055has_cctx (Coro::State coro)
3056 PROTOTYPE: $
3057 CODE:
3058 /* maybe manage the running flag differently */
3059 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3060 OUTPUT:
3061 RETVAL
3062
3063int
3064is_traced (Coro::State coro)
3065 PROTOTYPE: $
3066 CODE:
3067 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3068 OUTPUT:
3069 RETVAL
3070
3071UV
3072rss (Coro::State coro)
3073 PROTOTYPE: $
3074 ALIAS:
3075 usecount = 1
3076 CODE:
3077 switch (ix)
3078 {
3079 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3080 case 1: RETVAL = coro->usecount; break;
3081 }
3082 OUTPUT:
3083 RETVAL
3084
3085void
3086force_cctx ()
3087 PROTOTYPE:
3088 CODE:
3089 cctx_current->idle_sp = 0;
3090
3091void
3092swap_defsv (Coro::State self)
3093 PROTOTYPE: $
3094 ALIAS:
3095 swap_defav = 1
3096 CODE:
3097 if (!self->slot)
3098 croak ("cannot swap state with coroutine that has no saved state,");
3099 else
480 { 3100 {
481 struct coro temp; 3101 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3102 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
482 3103
483 SAVE(aTHX_ (&temp)); 3104 SV *tmp = *src; *src = *dst; *dst = tmp;
484 LOAD(aTHX_ coro);
485
486 destroy_stacks ();
487 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
488
489 LOAD((&temp));
490 } 3105 }
491 3106
3107
3108MODULE = Coro::State PACKAGE = Coro
3109
3110BOOT:
3111{
3112 int i;
3113
3114 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3115 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3116 cv_coro_run = get_cv ( "Coro::_terminate", GV_ADD);
3117 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD);
3118 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3119 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3120 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3121 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3122 sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE);
3123
3124 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3125 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3126 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3127 cv_coro_state_new = get_cv ("Coro::State::new", 0); SvREADONLY_on (cv_coro_state_new);
3128
3129 coro_stash = gv_stashpv ("Coro", TRUE);
3130
3131 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (PRIO_MAX));
3132 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (PRIO_HIGH));
3133 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (PRIO_NORMAL));
3134 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (PRIO_LOW));
3135 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (PRIO_IDLE));
3136 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (PRIO_MIN));
3137
3138 for (i = PRIO_MAX - PRIO_MIN + 1; i--; )
3139 coro_ready[i] = newAV ();
3140
3141 {
3142 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3143
3144 coroapi.schedule = api_schedule;
3145 coroapi.schedule_to = api_schedule_to;
3146 coroapi.cede = api_cede;
3147 coroapi.cede_notself = api_cede_notself;
3148 coroapi.ready = api_ready;
3149 coroapi.is_ready = api_is_ready;
3150 coroapi.nready = coro_nready;
3151 coroapi.current = coro_current;
3152
3153 /*GCoroAPI = &coroapi;*/
3154 sv_setiv (sv, (IV)&coroapi);
3155 SvREADONLY_on (sv);
3156 }
3157}
3158
3159void
3160terminate (...)
3161 CODE:
3162 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3163
3164void
3165schedule (...)
3166 CODE:
3167 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3168
3169void
3170schedule_to (...)
3171 CODE:
3172 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3173
3174void
3175cede_to (...)
3176 CODE:
3177 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3178
3179void
3180cede (...)
3181 CODE:
3182 CORO_EXECUTE_SLF_XS (slf_init_cede);
3183
3184void
3185cede_notself (...)
3186 CODE:
3187 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3188
3189void
3190_cancel (Coro::State self)
3191 CODE:
3192 coro_state_destroy (aTHX_ self);
3193 coro_call_on_destroy (aTHX_ self);
3194
3195void
3196_set_current (SV *current)
3197 PROTOTYPE: $
3198 CODE:
3199 SvREFCNT_dec (SvRV (coro_current));
3200 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3201
3202void
3203_set_readyhook (SV *hook)
3204 PROTOTYPE: $
3205 CODE:
492 SvREFCNT_dec (coro->args); 3206 SvREFCNT_dec (coro_readyhook);
493 Safefree (coro); 3207 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
494 3208
3209int
3210prio (Coro::State coro, int newprio = 0)
3211 PROTOTYPE: $;$
3212 ALIAS:
3213 nice = 1
3214 CODE:
3215{
3216 RETVAL = coro->prio;
495 3217
3218 if (items > 1)
3219 {
3220 if (ix)
3221 newprio = coro->prio - newprio;
3222
3223 if (newprio < PRIO_MIN) newprio = PRIO_MIN;
3224 if (newprio > PRIO_MAX) newprio = PRIO_MAX;
3225
3226 coro->prio = newprio;
3227 }
3228}
3229 OUTPUT:
3230 RETVAL
3231
3232SV *
3233ready (SV *self)
3234 PROTOTYPE: $
3235 CODE:
3236 RETVAL = boolSV (api_ready (aTHX_ self));
3237 OUTPUT:
3238 RETVAL
3239
3240int
3241nready (...)
3242 PROTOTYPE:
3243 CODE:
3244 RETVAL = coro_nready;
3245 OUTPUT:
3246 RETVAL
3247
3248void
3249_pool_handler (...)
3250 CODE:
3251 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3252
3253void
3254async_pool (SV *cv, ...)
3255 PROTOTYPE: &@
3256 PPCODE:
3257{
3258 HV *hv = (HV *)av_pop (av_async_pool);
3259 AV *av = newAV ();
3260 SV *cb = ST (0);
3261 int i;
3262
3263 av_extend (av, items - 2);
3264 for (i = 1; i < items; ++i)
3265 av_push (av, SvREFCNT_inc_NN (ST (i)));
3266
3267 if ((SV *)hv == &PL_sv_undef)
3268 {
3269 PUSHMARK (SP);
3270 EXTEND (SP, 2);
3271 PUSHs (sv_Coro);
3272 PUSHs ((SV *)cv_pool_handler);
3273 PUTBACK;
3274 call_sv ((SV *)cv_coro_state_new, G_SCALAR);
3275 SPAGAIN;
3276
3277 hv = (HV *)SvREFCNT_inc_NN (SvRV (POPs));
3278 }
3279
3280 {
3281 struct coro *coro = SvSTATE_hv (hv);
3282
3283 assert (!coro->invoke_cb);
3284 assert (!coro->invoke_av);
3285 coro->invoke_cb = SvREFCNT_inc (cb);
3286 coro->invoke_av = av;
3287 }
3288
3289 api_ready (aTHX_ (SV *)hv);
3290
3291 if (GIMME_V != G_VOID)
3292 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
3293 else
3294 SvREFCNT_dec (hv);
3295}
3296
3297SV *
3298rouse_cb ()
3299 PROTOTYPE:
3300 CODE:
3301 RETVAL = coro_new_rouse_cb (aTHX);
3302 OUTPUT:
3303 RETVAL
3304
3305void
3306rouse_wait (...)
3307 PROTOTYPE: ;$
3308 PPCODE:
3309 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
3310
3311
3312MODULE = Coro::State PACKAGE = PerlIO::cede
3313
3314BOOT:
3315 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3316
3317
3318MODULE = Coro::State PACKAGE = Coro::Semaphore
3319
3320SV *
3321new (SV *klass, SV *count = 0)
3322 CODE:
3323 RETVAL = sv_bless (
3324 coro_waitarray_new (aTHX_ count && SvOK (count) ? SvIV (count) : 1),
3325 GvSTASH (CvGV (cv))
3326 );
3327 OUTPUT:
3328 RETVAL
3329
3330# helper for Coro::Channel
3331SV *
3332_alloc (int count)
3333 CODE:
3334 RETVAL = coro_waitarray_new (aTHX_ count);
3335 OUTPUT:
3336 RETVAL
3337
3338SV *
3339count (SV *self)
3340 CODE:
3341 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3342 OUTPUT:
3343 RETVAL
3344
3345void
3346up (SV *self, int adjust = 1)
3347 ALIAS:
3348 adjust = 1
3349 CODE:
3350 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3351
3352void
3353down (...)
3354 CODE:
3355 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3356
3357void
3358wait (...)
3359 CODE:
3360 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3361
3362void
3363try (SV *self)
3364 PPCODE:
3365{
3366 AV *av = (AV *)SvRV (self);
3367 SV *count_sv = AvARRAY (av)[0];
3368 IV count = SvIVX (count_sv);
3369
3370 if (count > 0)
3371 {
3372 --count;
3373 SvIVX (count_sv) = count;
3374 XSRETURN_YES;
3375 }
3376 else
3377 XSRETURN_NO;
3378}
3379
3380void
3381waiters (SV *self)
3382 PPCODE:
3383{
3384 AV *av = (AV *)SvRV (self);
3385 int wcount = AvFILLp (av) + 1 - 1;
3386
3387 if (GIMME_V == G_SCALAR)
3388 XPUSHs (sv_2mortal (newSViv (wcount)));
3389 else
3390 {
3391 int i;
3392 EXTEND (SP, wcount);
3393 for (i = 1; i <= wcount; ++i)
3394 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3395 }
3396}
3397
3398MODULE = Coro::State PACKAGE = Coro::Signal
3399
3400SV *
3401new (SV *klass)
3402 CODE:
3403 RETVAL = sv_bless (
3404 coro_waitarray_new (aTHX_ 0),
3405 GvSTASH (CvGV (cv))
3406 );
3407 OUTPUT:
3408 RETVAL
3409
3410void
3411wait (...)
3412 CODE:
3413 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
3414
3415void
3416broadcast (SV *self)
3417 CODE:
3418{
3419 AV *av = (AV *)SvRV (self);
3420 coro_signal_wake (aTHX_ av, AvFILLp (av));
3421}
3422
3423void
3424send (SV *self)
3425 CODE:
3426{
3427 AV *av = (AV *)SvRV (self);
3428
3429 if (AvFILLp (av))
3430 coro_signal_wake (aTHX_ av, 1);
3431 else
3432 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3433}
3434
3435IV
3436awaited (SV *self)
3437 CODE:
3438 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3439 OUTPUT:
3440 RETVAL
3441
3442
3443MODULE = Coro::State PACKAGE = Coro::AnyEvent
3444
3445BOOT:
3446 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3447
3448void
3449_schedule (...)
3450 CODE:
3451{
3452 static int incede;
3453
3454 api_cede_notself (aTHX);
3455
3456 ++incede;
3457 while (coro_nready >= incede && api_cede (aTHX))
3458 ;
3459
3460 sv_setsv (sv_activity, &PL_sv_undef);
3461 if (coro_nready >= incede)
3462 {
3463 PUSHMARK (SP);
3464 PUTBACK;
3465 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3466 }
3467
3468 --incede;
3469}
3470
3471
3472MODULE = Coro::State PACKAGE = Coro::AIO
3473
3474void
3475_register (char *target, char *proto, SV *req)
3476 CODE:
3477{
3478 CV *req_cv = coro_sv_2cv (aTHX_ req);
3479 /* newXSproto doesn't return the CV on 5.8 */
3480 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3481 sv_setpv ((SV *)slf_cv, proto);
3482 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3483}
3484

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines