ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.5 by root, Tue Jul 17 02:55:29 2001 UTC vs.
Revision 1.329 by root, Tue Nov 25 20:48:41 2008 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "patchlevel.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
24#endif
25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
9#endif 35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
10 48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#define PERL_VERSION_ATLEAST(a,b,c) \
57 (PERL_REVISION > (a) \
58 || (PERL_REVISION == (a) \
59 && (PERL_VERSION > (b) \
60 || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))
61
62#if !PERL_VERSION_ATLEAST (5,6,0)
63# ifndef PL_ppaddr
64# define PL_ppaddr ppaddr
65# endif
66# ifndef call_sv
67# define call_sv perl_call_sv
68# endif
69# ifndef get_sv
70# define get_sv perl_get_sv
71# endif
72# ifndef get_cv
73# define get_cv perl_get_cv
74# endif
75# ifndef IS_PADGV
76# define IS_PADGV(v) 0
77# endif
78# ifndef IS_PADCONST
79# define IS_PADCONST(v) 0
80# endif
81#endif
82
83/* 5.11 */
84#ifndef CxHASARGS
85# define CxHASARGS(cx) (cx)->blk_sub.hasargs
86#endif
87
88/* 5.10.0 */
89#ifndef SvREFCNT_inc_NN
90# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
91#endif
92
93/* 5.8.8 */
94#ifndef GV_NOTQUAL
95# define GV_NOTQUAL 0
96#endif
97#ifndef newSV
98# define newSV(l) NEWSV(0,l)
99#endif
100#ifndef CvISXSUB_on
101# define CvISXSUB_on(cv) (void)cv
102#endif
103#ifndef CvISXSUB
104# define CvISXSUB(cv) (CvXSUB (cv) ? TRUE : FALSE)
105#endif
106#ifndef Newx
107# define Newx(ptr,nitems,type) New (0,ptr,nitems,type)
108#endif
109
110/* 5.8.7 */
111#ifndef SvRV_set
112# define SvRV_set(s,v) SvRV(s) = (v)
113#endif
114
115#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
116# undef CORO_STACKGUARD
117#endif
118
119#ifndef CORO_STACKGUARD
120# define CORO_STACKGUARD 0
121#endif
122
123/* prefer perl internal functions over our own? */
124#ifndef CORO_PREFER_PERL_FUNCTIONS
125# define CORO_PREFER_PERL_FUNCTIONS 0
126#endif
127
128/* The next macros try to return the current stack pointer, in an as
129 * portable way as possible. */
130#if __GNUC__ >= 4
131# define dSTACKLEVEL int stacklevel_dummy
132# define STACKLEVEL __builtin_frame_address (0)
133#else
134# define dSTACKLEVEL volatile void *stacklevel
135# define STACKLEVEL ((void *)&stacklevel)
136#endif
137
138#define IN_DESTRUCT (PL_main_cv == Nullcv)
139
140#if __GNUC__ >= 3
141# define attribute(x) __attribute__(x)
142# define expect(expr,value) __builtin_expect ((expr),(value))
143# define INLINE static inline
144#else
145# define attribute(x)
146# define expect(expr,value) (expr)
147# define INLINE static
148#endif
149
150#define expect_false(expr) expect ((expr) != 0, 0)
151#define expect_true(expr) expect ((expr) != 0, 1)
152
153#define NOINLINE attribute ((noinline))
154
155#include "CoroAPI.h"
156#define GCoroAPI (&coroapi) /* very sneaky */
157
158#ifdef USE_ITHREADS
159# if CORO_PTHREAD
160static void *coro_thx;
161# endif
162#endif
163
164static double (*nvtime)(); /* so why doesn't it take void? */
165
166/* we hijack an hopefully unused CV flag for our purposes */
167#define CVf_SLF 0x4000
168static OP *pp_slf (pTHX);
169
170static U32 cctx_gen;
171static size_t cctx_stacksize = CORO_STACKSIZE;
172static struct CoroAPI coroapi;
173static AV *main_mainstack; /* used to differentiate between $main and others */
174static JMPENV *main_top_env;
175static HV *coro_state_stash, *coro_stash;
176static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
177
178static AV *av_destroy; /* destruction queue */
179static SV *sv_manager; /* the manager coro */
180static SV *sv_idle; /* $Coro::idle */
181
182static GV *irsgv; /* $/ */
183static GV *stdoutgv; /* *STDOUT */
184static SV *rv_diehook;
185static SV *rv_warnhook;
186static HV *hv_sig; /* %SIG */
187
188/* async_pool helper stuff */
189static SV *sv_pool_rss;
190static SV *sv_pool_size;
191static SV *sv_async_pool_idle; /* description string */
192static AV *av_async_pool; /* idle pool */
193static SV *sv_Coro; /* class string */
194static CV *cv_pool_handler;
195static CV *cv_coro_state_new;
196
197/* Coro::AnyEvent */
198static SV *sv_activity;
199
200static struct coro_cctx *cctx_first;
201static int cctx_count, cctx_idle;
202
203enum {
204 CC_MAPPED = 0x01,
205 CC_NOREUSE = 0x02, /* throw this away after tracing */
206 CC_TRACE = 0x04,
207 CC_TRACE_SUB = 0x08, /* trace sub calls */
208 CC_TRACE_LINE = 0x10, /* trace each statement */
209 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
210};
211
212/* this is a structure representing a c-level coroutine */
213typedef struct coro_cctx
214{
215 struct coro_cctx *next;
216
217 /* the stack */
218 void *sptr;
219 size_t ssize;
220
221 /* cpu state */
222 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
223 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
224 JMPENV *top_env;
225 coro_context cctx;
226
227 U32 gen;
228#if CORO_USE_VALGRIND
229 int valgrind_id;
230#endif
231 unsigned char flags;
232} coro_cctx;
233
234coro_cctx *cctx_current; /* the currently running cctx */
235
236/*****************************************************************************/
237
238enum {
239 CF_RUNNING = 0x0001, /* coroutine is running */
240 CF_READY = 0x0002, /* coroutine is ready */
241 CF_NEW = 0x0004, /* has never been switched to */
242 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
243};
244
245/* the structure where most of the perl state is stored, overlaid on the cxstack */
246typedef struct
247{
248 SV *defsv;
249 AV *defav;
250 SV *errsv;
251 SV *irsgv;
252 HV *hinthv;
253#define VAR(name,type) type name;
254# include "state.h"
255#undef VAR
256} perl_slots;
257
258#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
259
260/* this is a structure representing a perl-level coroutine */
11struct coro { 261struct coro {
12 U8 dowarn; 262 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 263 coro_cctx *cctx;
14 264
15 PERL_SI *curstackinfo; 265 /* state data */
16 AV *curstack; 266 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 267 AV *mainstack;
18 SV **stack_sp; 268 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 269
41 AV *args; 270 CV *startcv; /* the CV to execute */
271 AV *args; /* data associated with this coroutine (initial args) */
272 int refcnt; /* coroutines are refcounted, yes */
273 int flags; /* CF_ flags */
274 HV *hv; /* the perl hash associated with this coro, if any */
275 void (*on_destroy)(pTHX_ struct coro *coro);
276
277 /* statistics */
278 int usecount; /* number of transfers to this coro */
279
280 /* coro process data */
281 int prio;
282 SV *except; /* exception to be thrown */
283 SV *rouse_cb;
284
285 /* async_pool */
286 SV *saved_deffh;
287 SV *invoke_cb;
288 AV *invoke_av;
289
290 /* linked list */
291 struct coro *next, *prev;
42}; 292};
43 293
44typedef struct coro *Coro__State; 294typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 295typedef struct coro *Coro__State_or_hashref;
46 296
47static HV *padlist_cache; 297/* the following variables are effectively part of the perl context */
298/* and get copied between struct coro and these variables */
299/* the mainr easonw e don't support windows process emulation */
300static struct CoroSLF slf_frame; /* the current slf frame */
48 301
49/* mostly copied from op.c:cv_clone2 */ 302/** Coro ********************************************************************/
50STATIC AV * 303
51clone_padlist (AV *protopadlist) 304#define PRIO_MAX 3
305#define PRIO_HIGH 1
306#define PRIO_NORMAL 0
307#define PRIO_LOW -1
308#define PRIO_IDLE -3
309#define PRIO_MIN -4
310
311/* for Coro.pm */
312static SV *coro_current;
313static SV *coro_readyhook;
314static AV *coro_ready [PRIO_MAX - PRIO_MIN + 1];
315static CV *cv_coro_run, *cv_coro_terminate;
316static struct coro *coro_first;
317#define coro_nready coroapi.nready
318
319/** lowlevel stuff **********************************************************/
320
321static SV *
322coro_get_sv (pTHX_ const char *name, int create)
52{ 323{
53 AV *av; 324#if PERL_VERSION_ATLEAST (5,10,0)
54 I32 ix; 325 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 326 get_sv (name, create);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 327#endif
57 SV **pname = AvARRAY (protopad_name); 328 return get_sv (name, create);
58 SV **ppad = AvARRAY (protopad); 329}
59 I32 fname = AvFILLp (protopad_name); 330
60 I32 fpad = AvFILLp (protopad); 331static AV *
332coro_get_av (pTHX_ const char *name, int create)
333{
334#if PERL_VERSION_ATLEAST (5,10,0)
335 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
336 get_av (name, create);
337#endif
338 return get_av (name, create);
339}
340
341static HV *
342coro_get_hv (pTHX_ const char *name, int create)
343{
344#if PERL_VERSION_ATLEAST (5,10,0)
345 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
346 get_hv (name, create);
347#endif
348 return get_hv (name, create);
349}
350
351/* may croak */
352INLINE CV *
353coro_sv_2cv (pTHX_ SV *sv)
354{
355 HV *st;
356 GV *gvp;
357 return sv_2cv (sv, &st, &gvp, 0);
358}
359
360static AV *
361coro_derive_padlist (pTHX_ CV *cv)
362{
363 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 364 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 365
72 newpadlist = newAV (); 366 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 367 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 368#if PERL_VERSION_ATLEAST (5,10,0)
369 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
370#else
371 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
372#endif
373 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
374 --AvFILLp (padlist);
375
376 av_store (newpadlist, 0, SvREFCNT_inc_NN (*av_fetch (padlist, 0, FALSE)));
75 av_store (newpadlist, 1, (SV *) newpad); 377 av_store (newpadlist, 1, (SV *)newpad);
76 378
77 av = newAV (); /* will be @_ */ 379 return newpadlist;
78 av_extend (av, 0); 380}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 381
82 for (ix = fpad; ix > 0; ix--) 382static void
383free_padlist (pTHX_ AV *padlist)
384{
385 /* may be during global destruction */
386 if (SvREFCNT (padlist))
83 { 387 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 388 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 389 while (i >= 0)
86 { 390 {
87 char *name = SvPVX (namesv); /* XXX */ 391 SV **svp = av_fetch (padlist, i--, FALSE);
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 392 if (svp)
89 { /* lexical from outside? */
90 npad[ix] = SvREFCNT_inc (ppad[ix]);
91 } 393 {
92 else
93 { /* our own lexical */
94 SV *sv; 394 SV *sv;
95 if (*name == '&') 395 while (&PL_sv_undef != (sv = av_pop ((AV *)*svp)))
96 sv = SvREFCNT_inc (ppad[ix]); 396 SvREFCNT_dec (sv);
97 else if (*name == '@') 397
98 sv = (SV *) newAV (); 398 SvREFCNT_dec (*svp);
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 } 399 }
107 } 400 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix]))
109 {
110 npad[ix] = SvREFCNT_inc (ppad[ix]);
111 }
112 else
113 {
114 SV *sv = NEWSV (0, 0);
115 SvPADTMP_on (sv);
116 npad[ix] = sv;
117 }
118 }
119 401
120#if 0 /* NONOTUNDERSTOOD */
121 /* Now that vars are all in place, clone nested closures. */
122
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif
139
140 return newpadlist;
141}
142
143STATIC AV *
144free_padlist (AV *padlist)
145{
146 /* may be during global destruction */
147 if (SvREFCNT(padlist))
148 {
149 I32 i = AvFILLp(padlist);
150 while (i >= 0)
151 {
152 SV **svp = av_fetch(padlist, i--, FALSE);
153 SV *sv = svp ? *svp : Nullsv;
154 if (sv)
155 SvREFCNT_dec(sv);
156 }
157
158 SvREFCNT_dec((SV*)padlist); 402 SvREFCNT_dec ((SV*)padlist);
403 }
404}
405
406static int
407coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
408{
409 AV *padlist;
410 AV *av = (AV *)mg->mg_obj;
411
412 /* casting is fun. */
413 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
414 free_padlist (aTHX_ padlist);
415
416 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
417
418 return 0;
419}
420
421#define CORO_MAGIC_type_cv 26
422#define CORO_MAGIC_type_state PERL_MAGIC_ext
423
424static MGVTBL coro_cv_vtbl = {
425 0, 0, 0, 0,
426 coro_cv_free
427};
428
429#define CORO_MAGIC_NN(sv, type) \
430 (expect_true (SvMAGIC (sv)->mg_type == type) \
431 ? SvMAGIC (sv) \
432 : mg_find (sv, type))
433
434#define CORO_MAGIC(sv, type) \
435 (expect_true (SvMAGIC (sv)) \
436 ? CORO_MAGIC_NN (sv, type) \
437 : 0)
438
439#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
440#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
441
442INLINE struct coro *
443SvSTATE_ (pTHX_ SV *coro)
444{
445 HV *stash;
446 MAGIC *mg;
447
448 if (SvROK (coro))
449 coro = SvRV (coro);
450
451 if (expect_false (SvTYPE (coro) != SVt_PVHV))
452 croak ("Coro::State object required");
453
454 stash = SvSTASH (coro);
455 if (expect_false (stash != coro_stash && stash != coro_state_stash))
456 {
457 /* very slow, but rare, check */
458 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
459 croak ("Coro::State object required");
460 }
461
462 mg = CORO_MAGIC_state (coro);
463 return (struct coro *)mg->mg_ptr;
464}
465
466#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
467
468/* faster than SvSTATE, but expects a coroutine hv */
469#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
470#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
471
472/* the next two functions merely cache the padlists */
473static void
474get_padlist (pTHX_ CV *cv)
475{
476 MAGIC *mg = CORO_MAGIC_cv (cv);
477 AV *av;
478
479 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
480 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
481 else
482 {
483#if CORO_PREFER_PERL_FUNCTIONS
484 /* this is probably cleaner? but also slower! */
485 /* in practise, it seems to be less stable */
486 CV *cp = Perl_cv_clone (aTHX_ cv);
487 CvPADLIST (cv) = CvPADLIST (cp);
488 CvPADLIST (cp) = 0;
489 SvREFCNT_dec (cp);
490#else
491 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
492#endif
493 }
494}
495
496static void
497put_padlist (pTHX_ CV *cv)
498{
499 MAGIC *mg = CORO_MAGIC_cv (cv);
500 AV *av;
501
502 if (expect_false (!mg))
503 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
504
505 av = (AV *)mg->mg_obj;
506
507 if (expect_false (AvFILLp (av) >= AvMAX (av)))
508 av_extend (av, AvMAX (av) + 1);
509
510 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
511}
512
513/** load & save, init *******************************************************/
514
515static void
516load_perl (pTHX_ Coro__State c)
517{
518 perl_slots *slot = c->slot;
519 c->slot = 0;
520
521 PL_mainstack = c->mainstack;
522
523 GvSV (PL_defgv) = slot->defsv;
524 GvAV (PL_defgv) = slot->defav;
525 GvSV (PL_errgv) = slot->errsv;
526 GvSV (irsgv) = slot->irsgv;
527 GvHV (PL_hintgv) = slot->hinthv;
528
529 #define VAR(name,type) PL_ ## name = slot->name;
530 # include "state.h"
531 #undef VAR
532
533 {
534 dSP;
535
536 CV *cv;
537
538 /* now do the ugly restore mess */
539 while (expect_true (cv = (CV *)POPs))
540 {
541 put_padlist (aTHX_ cv); /* mark this padlist as available */
542 CvDEPTH (cv) = PTR2IV (POPs);
543 CvPADLIST (cv) = (AV *)POPs;
544 }
545
546 PUTBACK;
159 } 547 }
160}
161 548
162/* the next tow functions merely cache the padlists */ 549 slf_frame = c->slf_frame;
163STATIC void 550 CORO_THROW = c->except;
164get_padlist (CV *cv)
165{
166 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 0);
167
168 if (he && AvFILLp ((AV *)*he) >= 0)
169 CvPADLIST (cv) = (AV *)av_pop ((AV *)*he);
170 else
171 CvPADLIST (cv) = clone_padlist (CvPADLIST (cv));
172} 551}
173 552
174STATIC void
175put_padlist (CV *cv)
176{
177 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 1);
178
179 if (SvTYPE (*he) != SVt_PVAV)
180 {
181 SvREFCNT_dec (*he);
182 *he = (SV *)newAV ();
183 }
184
185 av_push ((AV *)*he, (SV *)CvPADLIST (cv));
186}
187
188static void 553static void
189SAVE(pTHX_ Coro__State c) 554save_perl (pTHX_ Coro__State c)
190{ 555{
556 c->except = CORO_THROW;
557 c->slf_frame = slf_frame;
558
191 { 559 {
192 dSP; 560 dSP;
193 I32 cxix = cxstack_ix; 561 I32 cxix = cxstack_ix;
562 PERL_CONTEXT *ccstk = cxstack;
194 PERL_SI *top_si = PL_curstackinfo; 563 PERL_SI *top_si = PL_curstackinfo;
195 PERL_CONTEXT *ccstk = cxstack;
196 564
197 /* 565 /*
198 * the worst thing you can imagine happens first - we have to save 566 * the worst thing you can imagine happens first - we have to save
199 * (and reinitialize) all cv's in the whole callchain :( 567 * (and reinitialize) all cv's in the whole callchain :(
200 */ 568 */
201 569
202 PUSHs (Nullsv); 570 XPUSHs (Nullsv);
203 /* this loop was inspired by pp_caller */ 571 /* this loop was inspired by pp_caller */
204 for (;;) 572 for (;;)
205 { 573 {
206 while (cxix >= 0) 574 while (expect_true (cxix >= 0))
207 { 575 {
208 PERL_CONTEXT *cx = &ccstk[cxix--]; 576 PERL_CONTEXT *cx = &ccstk[cxix--];
209 577
210 if (CxTYPE(cx) == CXt_SUB) 578 if (expect_true (CxTYPE (cx) == CXt_SUB || CxTYPE (cx) == CXt_FORMAT))
211 { 579 {
212 CV *cv = cx->blk_sub.cv; 580 CV *cv = cx->blk_sub.cv;
581
213 if (CvDEPTH(cv)) 582 if (expect_true (CvDEPTH (cv)))
214 { 583 {
215#ifdef USE_THREADS
216 XPUSHs ((SV *)CvOWNER(cv));
217#endif
218 EXTEND (SP, 3); 584 EXTEND (SP, 3);
219 PUSHs ((SV *)CvDEPTH(cv));
220 PUSHs ((SV *)CvPADLIST(cv)); 585 PUSHs ((SV *)CvPADLIST (cv));
586 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
221 PUSHs ((SV *)cv); 587 PUSHs ((SV *)cv);
222 588
223 get_padlist (cv);
224
225 CvDEPTH(cv) = 0; 589 CvDEPTH (cv) = 0;
226#ifdef USE_THREADS 590 get_padlist (aTHX_ cv);
227 CvOWNER(cv) = 0;
228 error must unlock this cv etc.. etc...
229 if you are here wondering about this error message then
230 the reason is that it will not work as advertised yet
231#endif
232 } 591 }
233 } 592 }
234 else if (CxTYPE(cx) == CXt_FORMAT) 593 }
594
595 if (expect_true (top_si->si_type == PERLSI_MAIN))
596 break;
597
598 top_si = top_si->si_prev;
599 ccstk = top_si->si_cxstack;
600 cxix = top_si->si_cxix;
601 }
602
603 PUTBACK;
604 }
605
606 /* allocate some space on the context stack for our purposes */
607 /* we manually unroll here, as usually 2 slots is enough */
608 if (SLOT_COUNT >= 1) CXINC;
609 if (SLOT_COUNT >= 2) CXINC;
610 if (SLOT_COUNT >= 3) CXINC;
611 {
612 int i;
613 for (i = 3; i < SLOT_COUNT; ++i)
614 CXINC;
615 }
616 cxstack_ix -= SLOT_COUNT; /* undo allocation */
617
618 c->mainstack = PL_mainstack;
619
620 {
621 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
622
623 slot->defav = GvAV (PL_defgv);
624 slot->defsv = DEFSV;
625 slot->errsv = ERRSV;
626 slot->irsgv = GvSV (irsgv);
627 slot->hinthv = GvHV (PL_hintgv);
628
629 #define VAR(name,type) slot->name = PL_ ## name;
630 # include "state.h"
631 #undef VAR
632 }
633}
634
635/*
636 * allocate various perl stacks. This is almost an exact copy
637 * of perl.c:init_stacks, except that it uses less memory
638 * on the (sometimes correct) assumption that coroutines do
639 * not usually need a lot of stackspace.
640 */
641#if CORO_PREFER_PERL_FUNCTIONS
642# define coro_init_stacks(thx) init_stacks ()
643#else
644static void
645coro_init_stacks (pTHX)
646{
647 PL_curstackinfo = new_stackinfo(32, 8);
648 PL_curstackinfo->si_type = PERLSI_MAIN;
649 PL_curstack = PL_curstackinfo->si_stack;
650 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
651
652 PL_stack_base = AvARRAY(PL_curstack);
653 PL_stack_sp = PL_stack_base;
654 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
655
656 New(50,PL_tmps_stack,32,SV*);
657 PL_tmps_floor = -1;
658 PL_tmps_ix = -1;
659 PL_tmps_max = 32;
660
661 New(54,PL_markstack,16,I32);
662 PL_markstack_ptr = PL_markstack;
663 PL_markstack_max = PL_markstack + 16;
664
665#ifdef SET_MARK_OFFSET
666 SET_MARK_OFFSET;
667#endif
668
669 New(54,PL_scopestack,8,I32);
670 PL_scopestack_ix = 0;
671 PL_scopestack_max = 8;
672
673 New(54,PL_savestack,24,ANY);
674 PL_savestack_ix = 0;
675 PL_savestack_max = 24;
676
677#if !PERL_VERSION_ATLEAST (5,10,0)
678 New(54,PL_retstack,4,OP*);
679 PL_retstack_ix = 0;
680 PL_retstack_max = 4;
681#endif
682}
683#endif
684
685/*
686 * destroy the stacks, the callchain etc...
687 */
688static void
689coro_destruct_stacks (pTHX)
690{
691 while (PL_curstackinfo->si_next)
692 PL_curstackinfo = PL_curstackinfo->si_next;
693
694 while (PL_curstackinfo)
695 {
696 PERL_SI *p = PL_curstackinfo->si_prev;
697
698 if (!IN_DESTRUCT)
699 SvREFCNT_dec (PL_curstackinfo->si_stack);
700
701 Safefree (PL_curstackinfo->si_cxstack);
702 Safefree (PL_curstackinfo);
703 PL_curstackinfo = p;
704 }
705
706 Safefree (PL_tmps_stack);
707 Safefree (PL_markstack);
708 Safefree (PL_scopestack);
709 Safefree (PL_savestack);
710#if !PERL_VERSION_ATLEAST (5,10,0)
711 Safefree (PL_retstack);
712#endif
713}
714
715#define CORO_RSS \
716 rss += sizeof (SYM (curstackinfo)); \
717 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
718 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
719 rss += SYM (tmps_max) * sizeof (SV *); \
720 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
721 rss += SYM (scopestack_max) * sizeof (I32); \
722 rss += SYM (savestack_max) * sizeof (ANY);
723
724static size_t
725coro_rss (pTHX_ struct coro *coro)
726{
727 size_t rss = sizeof (*coro);
728
729 if (coro->mainstack)
730 {
731 if (coro->flags & CF_RUNNING)
732 {
733 #define SYM(sym) PL_ ## sym
734 CORO_RSS;
735 #undef SYM
736 }
737 else
738 {
739 #define SYM(sym) coro->slot->sym
740 CORO_RSS;
741 #undef SYM
742 }
743 }
744
745 return rss;
746}
747
748/** coroutine stack handling ************************************************/
749
750static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
751static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
752static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
753
754/* apparently < 5.8.8 */
755#ifndef MgPV_nolen_const
756#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
757 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
758 (const char*)(mg)->mg_ptr)
759#endif
760
761/*
762 * This overrides the default magic get method of %SIG elements.
763 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
764 * and instead of tryign to save and restore the hash elements, we just provide
765 * readback here.
766 * We only do this when the hook is != 0, as they are often set to 0 temporarily,
767 * not expecting this to actually change the hook. This is a potential problem
768 * when a schedule happens then, but we ignore this.
769 */
770static int
771coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
772{
773 const char *s = MgPV_nolen_const (mg);
774
775 if (*s == '_')
776 {
777 SV **svp = 0;
778
779 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
780 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
781
782 if (svp)
783 {
784 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
785 return 0;
786 }
787 }
788
789 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
790}
791
792static int
793coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
794{
795 const char *s = MgPV_nolen_const (mg);
796
797 if (*s == '_')
798 {
799 SV **svp = 0;
800
801 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
802 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
803
804 if (svp)
805 {
806 SV *old = *svp;
807 *svp = 0;
808 SvREFCNT_dec (old);
809 return 0;
810 }
811 }
812
813 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
814}
815
816static int
817coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
818{
819 const char *s = MgPV_nolen_const (mg);
820
821 if (*s == '_')
822 {
823 SV **svp = 0;
824
825 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
826 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
827
828 if (svp)
829 {
830 SV *old = *svp;
831 *svp = newSVsv (sv);
832 SvREFCNT_dec (old);
833 return 0;
834 }
835 }
836
837 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
838}
839
840static void
841prepare_nop (pTHX_ struct coro_transfer_args *ta)
842{
843 /* kind of mega-hacky, but works */
844 ta->next = ta->prev = (struct coro *)ta;
845}
846
847static int
848slf_check_nop (pTHX_ struct CoroSLF *frame)
849{
850 return 0;
851}
852
853static int
854slf_check_repeat (pTHX_ struct CoroSLF *frame)
855{
856 return 1;
857}
858
859static UNOP coro_setup_op;
860
861static void NOINLINE /* noinline to keep it out of the transfer fast path */
862coro_setup (pTHX_ struct coro *coro)
863{
864 /*
865 * emulate part of the perl startup here.
866 */
867 coro_init_stacks (aTHX);
868
869 PL_runops = RUNOPS_DEFAULT;
870 PL_curcop = &PL_compiling;
871 PL_in_eval = EVAL_NULL;
872 PL_comppad = 0;
873 PL_comppad_name = 0;
874 PL_comppad_name_fill = 0;
875 PL_comppad_name_floor = 0;
876 PL_curpm = 0;
877 PL_curpad = 0;
878 PL_localizing = 0;
879 PL_dirty = 0;
880 PL_restartop = 0;
881#if PERL_VERSION_ATLEAST (5,10,0)
882 PL_parser = 0;
883#endif
884 PL_hints = 0;
885
886 /* recreate the die/warn hooks */
887 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
888 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
889
890 GvSV (PL_defgv) = newSV (0);
891 GvAV (PL_defgv) = coro->args; coro->args = 0;
892 GvSV (PL_errgv) = newSV (0);
893 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
894 GvHV (PL_hintgv) = 0;
895 PL_rs = newSVsv (GvSV (irsgv));
896 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
897
898 {
899 dSP;
900 UNOP myop;
901
902 Zero (&myop, 1, UNOP);
903 myop.op_next = Nullop;
904 myop.op_type = OP_ENTERSUB;
905 myop.op_flags = OPf_WANT_VOID;
906
907 PUSHMARK (SP);
908 PUSHs ((SV *)coro->startcv);
909 PUTBACK;
910 PL_op = (OP *)&myop;
911 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
912 }
913
914 /* this newly created coroutine might be run on an existing cctx which most
915 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
916 */
917 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
918 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
919
920 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
921 coro_setup_op.op_next = PL_op;
922 coro_setup_op.op_type = OP_ENTERSUB;
923 coro_setup_op.op_ppaddr = pp_slf;
924 /* no flags etc. required, as an init function won't be called */
925
926 PL_op = (OP *)&coro_setup_op;
927
928 /* copy throw, in case it was set before coro_setup */
929 CORO_THROW = coro->except;
930}
931
932static void
933coro_destruct (pTHX_ struct coro *coro)
934{
935 if (!IN_DESTRUCT)
936 {
937 /* restore all saved variables and stuff */
938 LEAVE_SCOPE (0);
939 assert (PL_tmps_floor == -1);
940
941 /* free all temporaries */
942 FREETMPS;
943 assert (PL_tmps_ix == -1);
944
945 /* unwind all extra stacks */
946 POPSTACK_TO (PL_mainstack);
947
948 /* unwind main stack */
949 dounwind (-1);
950 }
951
952 SvREFCNT_dec (GvSV (PL_defgv));
953 SvREFCNT_dec (GvAV (PL_defgv));
954 SvREFCNT_dec (GvSV (PL_errgv));
955 SvREFCNT_dec (PL_defoutgv);
956 SvREFCNT_dec (PL_rs);
957 SvREFCNT_dec (GvSV (irsgv));
958 SvREFCNT_dec (GvHV (PL_hintgv));
959
960 SvREFCNT_dec (PL_diehook);
961 SvREFCNT_dec (PL_warnhook);
962
963 SvREFCNT_dec (coro->saved_deffh);
964 SvREFCNT_dec (coro->rouse_cb);
965 SvREFCNT_dec (coro->invoke_cb);
966 SvREFCNT_dec (coro->invoke_av);
967
968 coro_destruct_stacks (aTHX);
969}
970
971INLINE void
972free_coro_mortal (pTHX)
973{
974 if (expect_true (coro_mortal))
975 {
976 SvREFCNT_dec (coro_mortal);
977 coro_mortal = 0;
978 }
979}
980
981static int
982runops_trace (pTHX)
983{
984 COP *oldcop = 0;
985 int oldcxix = -2;
986
987 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
988 {
989 PERL_ASYNC_CHECK ();
990
991 if (cctx_current->flags & CC_TRACE_ALL)
992 {
993 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
994 {
995 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
996 SV **bot, **top;
997 AV *av = newAV (); /* return values */
998 SV **cb;
999 dSP;
1000
1001 GV *gv = CvGV (cx->blk_sub.cv);
1002 SV *fullname = sv_2mortal (newSV (0));
1003 if (isGV (gv))
1004 gv_efullname3 (fullname, gv, 0);
1005
1006 bot = PL_stack_base + cx->blk_oldsp + 1;
1007 top = cx->blk_gimme == G_ARRAY ? SP + 1
1008 : cx->blk_gimme == G_SCALAR ? bot + 1
1009 : bot;
1010
1011 av_extend (av, top - bot);
1012 while (bot < top)
1013 av_push (av, SvREFCNT_inc_NN (*bot++));
1014
1015 PL_runops = RUNOPS_DEFAULT;
1016 ENTER;
1017 SAVETMPS;
1018 EXTEND (SP, 3);
1019 PUSHMARK (SP);
1020 PUSHs (&PL_sv_no);
1021 PUSHs (fullname);
1022 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1023 PUTBACK;
1024 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1025 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1026 SPAGAIN;
1027 FREETMPS;
1028 LEAVE;
1029 PL_runops = runops_trace;
1030 }
1031
1032 if (oldcop != PL_curcop)
1033 {
1034 oldcop = PL_curcop;
1035
1036 if (PL_curcop != &PL_compiling)
1037 {
1038 SV **cb;
1039
1040 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1041 {
1042 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1043
1044 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1045 {
1046 dSP;
1047 GV *gv = CvGV (cx->blk_sub.cv);
1048 SV *fullname = sv_2mortal (newSV (0));
1049
1050 if (isGV (gv))
1051 gv_efullname3 (fullname, gv, 0);
1052
1053 PL_runops = RUNOPS_DEFAULT;
1054 ENTER;
1055 SAVETMPS;
1056 EXTEND (SP, 3);
1057 PUSHMARK (SP);
1058 PUSHs (&PL_sv_yes);
1059 PUSHs (fullname);
1060 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1061 PUTBACK;
1062 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1063 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1064 SPAGAIN;
1065 FREETMPS;
1066 LEAVE;
1067 PL_runops = runops_trace;
1068 }
1069
1070 oldcxix = cxstack_ix;
1071 }
1072
1073 if (cctx_current->flags & CC_TRACE_LINE)
1074 {
1075 dSP;
1076
1077 PL_runops = RUNOPS_DEFAULT;
1078 ENTER;
1079 SAVETMPS;
1080 EXTEND (SP, 3);
1081 PL_runops = RUNOPS_DEFAULT;
1082 PUSHMARK (SP);
1083 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1084 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1085 PUTBACK;
1086 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1087 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1088 SPAGAIN;
1089 FREETMPS;
1090 LEAVE;
1091 PL_runops = runops_trace;
1092 }
1093 }
1094 }
1095 }
1096 }
1097
1098 TAINT_NOT;
1099 return 0;
1100}
1101
1102static struct CoroSLF cctx_ssl_frame;
1103
1104static void
1105slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1106{
1107 ta->prev = 0;
1108}
1109
1110static int
1111slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1112{
1113 *frame = cctx_ssl_frame;
1114
1115 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1116}
1117
1118/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1119static void NOINLINE
1120cctx_prepare (pTHX)
1121{
1122 PL_top_env = &PL_start_env;
1123
1124 if (cctx_current->flags & CC_TRACE)
1125 PL_runops = runops_trace;
1126
1127 /* we already must be executing an SLF op, there is no other valid way
1128 * that can lead to creation of a new cctx */
1129 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1130 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1131
1132 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1133 cctx_ssl_frame = slf_frame;
1134
1135 slf_frame.prepare = slf_prepare_set_stacklevel;
1136 slf_frame.check = slf_check_set_stacklevel;
1137}
1138
1139/* the tail of transfer: execute stuff we can only do after a transfer */
1140INLINE void
1141transfer_tail (pTHX)
1142{
1143 free_coro_mortal (aTHX);
1144}
1145
1146/*
1147 * this is a _very_ stripped down perl interpreter ;)
1148 */
1149static void
1150cctx_run (void *arg)
1151{
1152#ifdef USE_ITHREADS
1153# if CORO_PTHREAD
1154 PERL_SET_CONTEXT (coro_thx);
1155# endif
1156#endif
1157 {
1158 dTHX;
1159
1160 /* normally we would need to skip the entersub here */
1161 /* not doing so will re-execute it, which is exactly what we want */
1162 /* PL_nop = PL_nop->op_next */
1163
1164 /* inject a fake subroutine call to cctx_init */
1165 cctx_prepare (aTHX);
1166
1167 /* cctx_run is the alternative tail of transfer() */
1168 transfer_tail (aTHX);
1169
1170 /* somebody or something will hit me for both perl_run and PL_restartop */
1171 PL_restartop = PL_op;
1172 perl_run (PL_curinterp);
1173 /*
1174 * Unfortunately, there is no way to get at the return values of the
1175 * coro body here, as perl_run destroys these
1176 */
1177
1178 /*
1179 * If perl-run returns we assume exit() was being called or the coro
1180 * fell off the end, which seems to be the only valid (non-bug)
1181 * reason for perl_run to return. We try to exit by jumping to the
1182 * bootstrap-time "top" top_env, as we cannot restore the "main"
1183 * coroutine as Coro has no such concept.
1184 * This actually isn't valid with the pthread backend, but OSes requiring
1185 * that backend are too broken to do it in a standards-compliant way.
1186 */
1187 PL_top_env = main_top_env;
1188 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1189 }
1190}
1191
1192static coro_cctx *
1193cctx_new ()
1194{
1195 coro_cctx *cctx;
1196
1197 ++cctx_count;
1198 New (0, cctx, 1, coro_cctx);
1199
1200 cctx->gen = cctx_gen;
1201 cctx->flags = 0;
1202 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1203
1204 return cctx;
1205}
1206
1207/* create a new cctx only suitable as source */
1208static coro_cctx *
1209cctx_new_empty ()
1210{
1211 coro_cctx *cctx = cctx_new ();
1212
1213 cctx->sptr = 0;
1214 coro_create (&cctx->cctx, 0, 0, 0, 0);
1215
1216 return cctx;
1217}
1218
1219/* create a new cctx suitable as destination/running a perl interpreter */
1220static coro_cctx *
1221cctx_new_run ()
1222{
1223 coro_cctx *cctx = cctx_new ();
1224 void *stack_start;
1225 size_t stack_size;
1226
1227#if HAVE_MMAP
1228 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1229 /* mmap supposedly does allocate-on-write for us */
1230 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1231
1232 if (cctx->sptr != (void *)-1)
1233 {
1234 #if CORO_STACKGUARD
1235 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1236 #endif
1237 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1238 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1239 cctx->flags |= CC_MAPPED;
1240 }
1241 else
1242#endif
1243 {
1244 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1245 New (0, cctx->sptr, cctx_stacksize, long);
1246
1247 if (!cctx->sptr)
1248 {
1249 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1250 _exit (EXIT_FAILURE);
1251 }
1252
1253 stack_start = cctx->sptr;
1254 stack_size = cctx->ssize;
1255 }
1256
1257 #if CORO_USE_VALGRIND
1258 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1259 #endif
1260
1261 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1262
1263 return cctx;
1264}
1265
1266static void
1267cctx_destroy (coro_cctx *cctx)
1268{
1269 if (!cctx)
1270 return;
1271
1272 assert (cctx != cctx_current);//D temporary
1273
1274 --cctx_count;
1275 coro_destroy (&cctx->cctx);
1276
1277 /* coro_transfer creates new, empty cctx's */
1278 if (cctx->sptr)
1279 {
1280 #if CORO_USE_VALGRIND
1281 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1282 #endif
1283
1284#if HAVE_MMAP
1285 if (cctx->flags & CC_MAPPED)
1286 munmap (cctx->sptr, cctx->ssize);
1287 else
1288#endif
1289 Safefree (cctx->sptr);
1290 }
1291
1292 Safefree (cctx);
1293}
1294
1295/* wether this cctx should be destructed */
1296#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1297
1298static coro_cctx *
1299cctx_get (pTHX)
1300{
1301 while (expect_true (cctx_first))
1302 {
1303 coro_cctx *cctx = cctx_first;
1304 cctx_first = cctx->next;
1305 --cctx_idle;
1306
1307 if (expect_true (!CCTX_EXPIRED (cctx)))
1308 return cctx;
1309
1310 cctx_destroy (cctx);
1311 }
1312
1313 return cctx_new_run ();
1314}
1315
1316static void
1317cctx_put (coro_cctx *cctx)
1318{
1319 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1320
1321 /* free another cctx if overlimit */
1322 if (expect_false (cctx_idle >= cctx_max_idle))
1323 {
1324 coro_cctx *first = cctx_first;
1325 cctx_first = first->next;
1326 --cctx_idle;
1327
1328 cctx_destroy (first);
1329 }
1330
1331 ++cctx_idle;
1332 cctx->next = cctx_first;
1333 cctx_first = cctx;
1334}
1335
1336/** coroutine switching *****************************************************/
1337
1338static void
1339transfer_check (pTHX_ struct coro *prev, struct coro *next)
1340{
1341 /* TODO: throwing up here is considered harmful */
1342
1343 if (expect_true (prev != next))
1344 {
1345 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1346 croak ("Coro::State::transfer called with a suspended prev Coro::State, but can only transfer from running or new states,");
1347
1348 if (expect_false (next->flags & CF_RUNNING))
1349 croak ("Coro::State::transfer called with running next Coro::State, but can only transfer to inactive states,");
1350
1351 if (expect_false (next->flags & CF_DESTROYED))
1352 croak ("Coro::State::transfer called with destroyed next Coro::State, but can only transfer to inactive states,");
1353
1354#if !PERL_VERSION_ATLEAST (5,10,0)
1355 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1356 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1357#endif
1358 }
1359}
1360
1361/* always use the TRANSFER macro */
1362static void NOINLINE /* noinline so we have a fixed stackframe */
1363transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1364{
1365 dSTACKLEVEL;
1366
1367 /* sometimes transfer is only called to set idle_sp */
1368 if (expect_false (!prev))
1369 {
1370 cctx_current->idle_sp = STACKLEVEL;
1371 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1372 }
1373 else if (expect_true (prev != next))
1374 {
1375 coro_cctx *cctx_prev;
1376
1377 if (expect_false (prev->flags & CF_NEW))
1378 {
1379 /* create a new empty/source context */
1380 prev->flags &= ~CF_NEW;
1381 prev->flags |= CF_RUNNING;
1382 }
1383
1384 prev->flags &= ~CF_RUNNING;
1385 next->flags |= CF_RUNNING;
1386
1387 /* first get rid of the old state */
1388 save_perl (aTHX_ prev);
1389
1390 if (expect_false (next->flags & CF_NEW))
1391 {
1392 /* need to start coroutine */
1393 next->flags &= ~CF_NEW;
1394 /* setup coroutine call */
1395 coro_setup (aTHX_ next);
1396 }
1397 else
1398 load_perl (aTHX_ next);
1399
1400 assert (!prev->cctx);//D temporary
1401
1402 /* possibly untie and reuse the cctx */
1403 if (expect_true (
1404 cctx_current->idle_sp == STACKLEVEL
1405 && !(cctx_current->flags & CC_TRACE)
1406 && !force_cctx
1407 ))
1408 {
1409 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1410 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1411
1412 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get. */
1413 /* without this the next cctx_get might destroy the running cctx while still in use */
1414 if (expect_false (CCTX_EXPIRED (cctx_current)))
1415 if (expect_true (!next->cctx))
1416 next->cctx = cctx_get (aTHX);
1417
1418 cctx_put (cctx_current);
1419 }
1420 else
1421 prev->cctx = cctx_current;
1422
1423 ++next->usecount;
1424
1425 cctx_prev = cctx_current;
1426 cctx_current = expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1427
1428 next->cctx = 0;
1429
1430 if (expect_false (cctx_prev != cctx_current))
1431 {
1432 cctx_prev->top_env = PL_top_env;
1433 PL_top_env = cctx_current->top_env;
1434 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1435 }
1436
1437 transfer_tail (aTHX);
1438 }
1439}
1440
1441#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1442#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1443
1444/** high level stuff ********************************************************/
1445
1446static int
1447coro_state_destroy (pTHX_ struct coro *coro)
1448{
1449 if (coro->flags & CF_DESTROYED)
1450 return 0;
1451
1452 if (coro->on_destroy)
1453 coro->on_destroy (aTHX_ coro);
1454
1455 coro->flags |= CF_DESTROYED;
1456
1457 if (coro->flags & CF_READY)
1458 {
1459 /* reduce nready, as destroying a ready coro effectively unreadies it */
1460 /* alternative: look through all ready queues and remove the coro */
1461 --coro_nready;
1462 }
1463 else
1464 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1465
1466 if (coro->mainstack && coro->mainstack != main_mainstack)
1467 {
1468 struct coro temp;
1469
1470 assert (("FATAL: tried to destroy currently running coroutine (please report)", !(coro->flags & CF_RUNNING)));
1471
1472 save_perl (aTHX_ &temp);
1473 load_perl (aTHX_ coro);
1474
1475 coro_destruct (aTHX_ coro);
1476
1477 load_perl (aTHX_ &temp);
1478
1479 coro->slot = 0;
1480 }
1481
1482 cctx_destroy (coro->cctx);
1483 SvREFCNT_dec (coro->startcv);
1484 SvREFCNT_dec (coro->args);
1485 SvREFCNT_dec (CORO_THROW);
1486
1487 if (coro->next) coro->next->prev = coro->prev;
1488 if (coro->prev) coro->prev->next = coro->next;
1489 if (coro == coro_first) coro_first = coro->next;
1490
1491 return 1;
1492}
1493
1494static int
1495coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1496{
1497 struct coro *coro = (struct coro *)mg->mg_ptr;
1498 mg->mg_ptr = 0;
1499
1500 coro->hv = 0;
1501
1502 if (--coro->refcnt < 0)
1503 {
1504 coro_state_destroy (aTHX_ coro);
1505 Safefree (coro);
1506 }
1507
1508 return 0;
1509}
1510
1511static int
1512coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1513{
1514 struct coro *coro = (struct coro *)mg->mg_ptr;
1515
1516 ++coro->refcnt;
1517
1518 return 0;
1519}
1520
1521static MGVTBL coro_state_vtbl = {
1522 0, 0, 0, 0,
1523 coro_state_free,
1524 0,
1525#ifdef MGf_DUP
1526 coro_state_dup,
1527#else
1528# define MGf_DUP 0
1529#endif
1530};
1531
1532static void
1533prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1534{
1535 ta->prev = SvSTATE (prev_sv);
1536 ta->next = SvSTATE (next_sv);
1537 TRANSFER_CHECK (*ta);
1538}
1539
1540static void
1541api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1542{
1543 struct coro_transfer_args ta;
1544
1545 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1546 TRANSFER (ta, 1);
1547}
1548
1549/*****************************************************************************/
1550/* gensub: simple closure generation utility */
1551
1552#define GENSUB_ARG CvXSUBANY (cv).any_ptr
1553
1554/* create a closure from XS, returns a code reference */
1555/* the arg can be accessed via GENSUB_ARG from the callback */
1556/* the callback must use dXSARGS/XSRETURN */
1557static SV *
1558gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg)
1559{
1560 CV *cv = (CV *)newSV (0);
1561
1562 sv_upgrade ((SV *)cv, SVt_PVCV);
1563
1564 CvANON_on (cv);
1565 CvISXSUB_on (cv);
1566 CvXSUB (cv) = xsub;
1567 GENSUB_ARG = arg;
1568
1569 return newRV_noinc ((SV *)cv);
1570}
1571
1572/** Coro ********************************************************************/
1573
1574INLINE void
1575coro_enq (pTHX_ struct coro *coro)
1576{
1577 av_push (coro_ready [coro->prio - PRIO_MIN], SvREFCNT_inc_NN (coro->hv));
1578}
1579
1580INLINE SV *
1581coro_deq (pTHX)
1582{
1583 int prio;
1584
1585 for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= 0; )
1586 if (AvFILLp (coro_ready [prio]) >= 0)
1587 return av_shift (coro_ready [prio]);
1588
1589 return 0;
1590}
1591
1592static int
1593api_ready (pTHX_ SV *coro_sv)
1594{
1595 struct coro *coro;
1596 SV *sv_hook;
1597 void (*xs_hook)(void);
1598
1599 coro = SvSTATE (coro_sv);
1600
1601 if (coro->flags & CF_READY)
1602 return 0;
1603
1604 coro->flags |= CF_READY;
1605
1606 sv_hook = coro_nready ? 0 : coro_readyhook;
1607 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1608
1609 coro_enq (aTHX_ coro);
1610 ++coro_nready;
1611
1612 if (sv_hook)
1613 {
1614 dSP;
1615
1616 ENTER;
1617 SAVETMPS;
1618
1619 PUSHMARK (SP);
1620 PUTBACK;
1621 call_sv (sv_hook, G_VOID | G_DISCARD);
1622
1623 FREETMPS;
1624 LEAVE;
1625 }
1626
1627 if (xs_hook)
1628 xs_hook ();
1629
1630 return 1;
1631}
1632
1633static int
1634api_is_ready (pTHX_ SV *coro_sv)
1635{
1636 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1637}
1638
1639/* expects to own a reference to next->hv */
1640INLINE void
1641prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1642{
1643 SV *prev_sv = SvRV (coro_current);
1644
1645 ta->prev = SvSTATE_hv (prev_sv);
1646 ta->next = next;
1647
1648 TRANSFER_CHECK (*ta);
1649
1650 SvRV_set (coro_current, (SV *)next->hv);
1651
1652 free_coro_mortal (aTHX);
1653 coro_mortal = prev_sv;
1654}
1655
1656static void
1657prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1658{
1659 for (;;)
1660 {
1661 SV *next_sv = coro_deq (aTHX);
1662
1663 if (expect_true (next_sv))
1664 {
1665 struct coro *next = SvSTATE_hv (next_sv);
1666
1667 /* cannot transfer to destroyed coros, skip and look for next */
1668 if (expect_false (next->flags & CF_DESTROYED))
1669 SvREFCNT_dec (next_sv); /* coro_nready has already been taken care of by destroy */
1670 else
1671 {
1672 next->flags &= ~CF_READY;
1673 --coro_nready;
1674
1675 prepare_schedule_to (aTHX_ ta, next);
1676 break;
1677 }
1678 }
1679 else
1680 {
1681 /* nothing to schedule: call the idle handler */
1682 if (SvROK (sv_idle)
1683 && SvOBJECT (SvRV (sv_idle)))
1684 {
1685 ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */
1686 api_ready (aTHX_ SvRV (sv_idle));
1687 --coro_nready;
1688 }
1689 else
1690 {
1691 dSP;
1692
1693 ENTER;
1694 SAVETMPS;
1695
1696 PUSHMARK (SP);
1697 PUTBACK;
1698 call_sv (sv_idle, G_VOID | G_DISCARD);
1699
1700 FREETMPS;
1701 LEAVE;
1702 }
1703 }
1704 }
1705}
1706
1707INLINE void
1708prepare_cede (pTHX_ struct coro_transfer_args *ta)
1709{
1710 api_ready (aTHX_ coro_current);
1711 prepare_schedule (aTHX_ ta);
1712}
1713
1714INLINE void
1715prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1716{
1717 SV *prev = SvRV (coro_current);
1718
1719 if (coro_nready)
1720 {
1721 prepare_schedule (aTHX_ ta);
1722 api_ready (aTHX_ prev);
1723 }
1724 else
1725 prepare_nop (aTHX_ ta);
1726}
1727
1728static void
1729api_schedule (pTHX)
1730{
1731 struct coro_transfer_args ta;
1732
1733 prepare_schedule (aTHX_ &ta);
1734 TRANSFER (ta, 1);
1735}
1736
1737static void
1738api_schedule_to (pTHX_ SV *coro_sv)
1739{
1740 struct coro_transfer_args ta;
1741 struct coro *next = SvSTATE (coro_sv);
1742
1743 SvREFCNT_inc_NN (coro_sv);
1744 prepare_schedule_to (aTHX_ &ta, next);
1745}
1746
1747static int
1748api_cede (pTHX)
1749{
1750 struct coro_transfer_args ta;
1751
1752 prepare_cede (aTHX_ &ta);
1753
1754 if (expect_true (ta.prev != ta.next))
1755 {
1756 TRANSFER (ta, 1);
1757 return 1;
1758 }
1759 else
1760 return 0;
1761}
1762
1763static int
1764api_cede_notself (pTHX)
1765{
1766 if (coro_nready)
1767 {
1768 struct coro_transfer_args ta;
1769
1770 prepare_cede_notself (aTHX_ &ta);
1771 TRANSFER (ta, 1);
1772 return 1;
1773 }
1774 else
1775 return 0;
1776}
1777
1778static void
1779api_trace (pTHX_ SV *coro_sv, int flags)
1780{
1781 struct coro *coro = SvSTATE (coro_sv);
1782
1783 if (coro->flags & CF_RUNNING)
1784 croak ("cannot enable tracing on a running coroutine, caught");
1785
1786 if (flags & CC_TRACE)
1787 {
1788 if (!coro->cctx)
1789 coro->cctx = cctx_new_run ();
1790 else if (!(coro->cctx->flags & CC_TRACE))
1791 croak ("cannot enable tracing on coroutine with custom stack, caught");
1792
1793 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1794 }
1795 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1796 {
1797 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1798
1799 if (coro->flags & CF_RUNNING)
1800 PL_runops = RUNOPS_DEFAULT;
1801 else
1802 coro->slot->runops = RUNOPS_DEFAULT;
1803 }
1804}
1805
1806static void
1807coro_call_on_destroy (pTHX_ struct coro *coro)
1808{
1809 SV **on_destroyp = hv_fetch (coro->hv, "_on_destroy", sizeof ("_on_destroy") - 1, 0);
1810 SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0);
1811
1812 if (on_destroyp)
1813 {
1814 AV *on_destroy = (AV *)SvRV (*on_destroyp);
1815
1816 while (AvFILLp (on_destroy) >= 0)
1817 {
1818 dSP; /* don't disturb outer sp */
1819 SV *cb = av_pop (on_destroy);
1820
1821 PUSHMARK (SP);
1822
1823 if (statusp)
1824 {
1825 int i;
1826 AV *status = (AV *)SvRV (*statusp);
1827 EXTEND (SP, AvFILLp (status) + 1);
1828
1829 for (i = 0; i <= AvFILLp (status); ++i)
1830 PUSHs (AvARRAY (status)[i]);
1831 }
1832
1833 PUTBACK;
1834 call_sv (sv_2mortal (cb), G_VOID | G_DISCARD);
1835 }
1836 }
1837}
1838
1839static void
1840slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1841{
1842 int i;
1843 HV *hv = (HV *)SvRV (coro_current);
1844 AV *av = newAV ();
1845
1846 av_extend (av, items - 1);
1847 for (i = 0; i < items; ++i)
1848 av_push (av, SvREFCNT_inc_NN (arg [i]));
1849
1850 hv_store (hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0);
1851
1852 av_push (av_destroy, (SV *)newRV_inc ((SV *)hv)); /* RVinc for perl */
1853 api_ready (aTHX_ sv_manager);
1854
1855 frame->prepare = prepare_schedule;
1856 frame->check = slf_check_repeat;
1857}
1858
1859/*****************************************************************************/
1860/* async pool handler */
1861
1862static int
1863slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
1864{
1865 HV *hv = (HV *)SvRV (coro_current);
1866 struct coro *coro = (struct coro *)frame->data;
1867
1868 if (!coro->invoke_cb)
1869 return 1; /* loop till we have invoke */
1870 else
1871 {
1872 hv_store (hv, "desc", sizeof ("desc") - 1,
1873 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
1874
1875 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
1876
1877 {
1878 dSP;
1879 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
1880 PUTBACK;
1881 }
1882
1883 SvREFCNT_dec (GvAV (PL_defgv));
1884 GvAV (PL_defgv) = coro->invoke_av;
1885 coro->invoke_av = 0;
1886
1887 return 0;
1888 }
1889}
1890
1891static void
1892slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1893{
1894 HV *hv = (HV *)SvRV (coro_current);
1895 struct coro *coro = SvSTATE_hv ((SV *)hv);
1896
1897 if (expect_true (coro->saved_deffh))
1898 {
1899 /* subsequent iteration */
1900 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
1901 coro->saved_deffh = 0;
1902
1903 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
1904 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
1905 {
1906 coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate);
1907 coro->invoke_av = newAV ();
1908
1909 frame->prepare = prepare_nop;
1910 }
1911 else
1912 {
1913 av_clear (GvAV (PL_defgv));
1914 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
1915
1916 coro->prio = 0;
1917
1918 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
1919 api_trace (aTHX_ coro_current, 0);
1920
1921 frame->prepare = prepare_schedule;
1922 av_push (av_async_pool, SvREFCNT_inc (hv));
1923 }
1924 }
1925 else
1926 {
1927 /* first iteration, simply fall through */
1928 frame->prepare = prepare_nop;
1929 }
1930
1931 frame->check = slf_check_pool_handler;
1932 frame->data = (void *)coro;
1933}
1934
1935/*****************************************************************************/
1936/* rouse callback */
1937
1938#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
1939
1940static void
1941coro_rouse_callback (pTHX_ CV *cv)
1942{
1943 dXSARGS;
1944 SV *data = (SV *)GENSUB_ARG;
1945
1946 if (SvTYPE (SvRV (data)) != SVt_PVAV)
1947 {
1948 /* first call, set args */
1949 AV *av = newAV ();
1950 SV *coro = SvRV (data);
1951
1952 SvRV_set (data, (SV *)av);
1953 api_ready (aTHX_ coro);
1954 SvREFCNT_dec (coro);
1955
1956 /* better take a full copy of the arguments */
1957 while (items--)
1958 av_store (av, items, newSVsv (ST (items)));
1959 }
1960
1961 XSRETURN_EMPTY;
1962}
1963
1964static int
1965slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
1966{
1967 SV *data = (SV *)frame->data;
1968
1969 if (CORO_THROW)
1970 return 0;
1971
1972 if (SvTYPE (SvRV (data)) != SVt_PVAV)
1973 return 1;
1974
1975 /* now push all results on the stack */
1976 {
1977 dSP;
1978 AV *av = (AV *)SvRV (data);
1979 int i;
1980
1981 EXTEND (SP, AvFILLp (av) + 1);
1982 for (i = 0; i <= AvFILLp (av); ++i)
1983 PUSHs (sv_2mortal (AvARRAY (av)[i]));
1984
1985 /* we have stolen the elements, so ste length to zero and free */
1986 AvFILLp (av) = -1;
1987 av_undef (av);
1988
1989 PUTBACK;
1990 }
1991
1992 return 0;
1993}
1994
1995static void
1996slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1997{
1998 SV *cb;
1999
2000 if (items)
2001 cb = arg [0];
2002 else
2003 {
2004 struct coro *coro = SvSTATE_current;
2005
2006 if (!coro->rouse_cb)
2007 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
2008
2009 cb = sv_2mortal (coro->rouse_cb);
2010 coro->rouse_cb = 0;
2011 }
2012
2013 if (!SvROK (cb)
2014 || SvTYPE (SvRV (cb)) != SVt_PVCV
2015 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
2016 croak ("Coro::rouse_wait called with illegal callback argument,");
2017
2018 {
2019 CV *cv = (CV *)SvRV (cb); /* for GENSUB_ARG */
2020 SV *data = (SV *)GENSUB_ARG;
2021
2022 frame->data = (void *)data;
2023 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2024 frame->check = slf_check_rouse_wait;
2025 }
2026}
2027
2028static SV *
2029coro_new_rouse_cb (pTHX)
2030{
2031 HV *hv = (HV *)SvRV (coro_current);
2032 struct coro *coro = SvSTATE_hv (hv);
2033 SV *data = newRV_inc ((SV *)hv);
2034 SV *cb = gensub (aTHX_ coro_rouse_callback, (void *)data);
2035
2036 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2037 SvREFCNT_dec (data); /* magicext increases the refcount */
2038
2039 SvREFCNT_dec (coro->rouse_cb);
2040 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2041
2042 return cb;
2043}
2044
2045/*****************************************************************************/
2046/* schedule-like-function opcode (SLF) */
2047
2048static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2049static const CV *slf_cv;
2050static SV **slf_argv;
2051static int slf_argc, slf_arga; /* count, allocated */
2052static I32 slf_ax; /* top of stack, for restore */
2053
2054/* this restores the stack in the case we patched the entersub, to */
2055/* recreate the stack frame as perl will on following calls */
2056/* since entersub cleared the stack */
2057static OP *
2058pp_restore (pTHX)
2059{
2060 int i;
2061 SV **SP = PL_stack_base + slf_ax;
2062
2063 PUSHMARK (SP);
2064
2065 EXTEND (SP, slf_argc + 1);
2066
2067 for (i = 0; i < slf_argc; ++i)
2068 PUSHs (sv_2mortal (slf_argv [i]));
2069
2070 PUSHs ((SV *)CvGV (slf_cv));
2071
2072 RETURNOP (slf_restore.op_first);
2073}
2074
2075static void
2076slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2077{
2078 SV **arg = (SV **)slf_frame.data;
2079
2080 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2081}
2082
2083static void
2084slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2085{
2086 if (items != 2)
2087 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2088
2089 frame->prepare = slf_prepare_transfer;
2090 frame->check = slf_check_nop;
2091 frame->data = (void *)arg; /* let's hope it will stay valid */
2092}
2093
2094static void
2095slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2096{
2097 frame->prepare = prepare_schedule;
2098 frame->check = slf_check_nop;
2099}
2100
2101static void
2102slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2103{
2104 struct coro *next = (struct coro *)slf_frame.data;
2105
2106 SvREFCNT_inc_NN (next->hv);
2107 prepare_schedule_to (aTHX_ ta, next);
2108}
2109
2110static void
2111slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2112{
2113 if (!items)
2114 croak ("Coro::schedule_to expects a coroutine argument, caught");
2115
2116 frame->data = (void *)SvSTATE (arg [0]);
2117 frame->prepare = slf_prepare_schedule_to;
2118 frame->check = slf_check_nop;
2119}
2120
2121static void
2122slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2123{
2124 api_ready (aTHX_ SvRV (coro_current));
2125
2126 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2127}
2128
2129static void
2130slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2131{
2132 frame->prepare = prepare_cede;
2133 frame->check = slf_check_nop;
2134}
2135
2136static void
2137slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2138{
2139 frame->prepare = prepare_cede_notself;
2140 frame->check = slf_check_nop;
2141}
2142
2143/*
2144 * these not obviously related functions are all rolled into one
2145 * function to increase chances that they all will call transfer with the same
2146 * stack offset
2147 * SLF stands for "schedule-like-function".
2148 */
2149static OP *
2150pp_slf (pTHX)
2151{
2152 I32 checkmark; /* mark SP to see how many elements check has pushed */
2153
2154 /* set up the slf frame, unless it has already been set-up */
2155 /* the latter happens when a new coro has been started */
2156 /* or when a new cctx was attached to an existing coroutine */
2157 if (expect_true (!slf_frame.prepare))
2158 {
2159 /* first iteration */
2160 dSP;
2161 SV **arg = PL_stack_base + TOPMARK + 1;
2162 int items = SP - arg; /* args without function object */
2163 SV *gv = *sp;
2164
2165 /* do a quick consistency check on the "function" object, and if it isn't */
2166 /* for us, divert to the real entersub */
2167 if (SvTYPE (gv) != SVt_PVGV
2168 || !GvCV (gv)
2169 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2170 return PL_ppaddr[OP_ENTERSUB](aTHX);
2171
2172 if (!(PL_op->op_flags & OPf_STACKED))
2173 {
2174 /* ampersand-form of call, use @_ instead of stack */
2175 AV *av = GvAV (PL_defgv);
2176 arg = AvARRAY (av);
2177 items = AvFILLp (av) + 1;
2178 }
2179
2180 /* now call the init function, which needs to set up slf_frame */
2181 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2182 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2183
2184 /* pop args */
2185 SP = PL_stack_base + POPMARK;
2186
2187 PUTBACK;
2188 }
2189
2190 /* now that we have a slf_frame, interpret it! */
2191 /* we use a callback system not to make the code needlessly */
2192 /* complicated, but so we can run multiple perl coros from one cctx */
2193
2194 do
2195 {
2196 struct coro_transfer_args ta;
2197
2198 slf_frame.prepare (aTHX_ &ta);
2199 TRANSFER (ta, 0);
2200
2201 checkmark = PL_stack_sp - PL_stack_base;
2202 }
2203 while (slf_frame.check (aTHX_ &slf_frame));
2204
2205 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2206
2207 /* exception handling */
2208 if (expect_false (CORO_THROW))
2209 {
2210 SV *exception = sv_2mortal (CORO_THROW);
2211
2212 CORO_THROW = 0;
2213 sv_setsv (ERRSV, exception);
2214 croak (0);
2215 }
2216
2217 /* return value handling - mostly like entersub */
2218 /* make sure we put something on the stack in scalar context */
2219 if (GIMME_V == G_SCALAR)
2220 {
2221 dSP;
2222 SV **bot = PL_stack_base + checkmark;
2223
2224 if (sp == bot) /* too few, push undef */
2225 bot [1] = &PL_sv_undef;
2226 else if (sp != bot + 1) /* too many, take last one */
2227 bot [1] = *sp;
2228
2229 SP = bot + 1;
2230
2231 PUTBACK;
2232 }
2233
2234 return NORMAL;
2235}
2236
2237static void
2238api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2239{
2240 int i;
2241 SV **arg = PL_stack_base + ax;
2242 int items = PL_stack_sp - arg + 1;
2243
2244 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2245
2246 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2247 && PL_op->op_ppaddr != pp_slf)
2248 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2249
2250 CvFLAGS (cv) |= CVf_SLF;
2251 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2252 slf_cv = cv;
2253
2254 /* we patch the op, and then re-run the whole call */
2255 /* we have to put the same argument on the stack for this to work */
2256 /* and this will be done by pp_restore */
2257 slf_restore.op_next = (OP *)&slf_restore;
2258 slf_restore.op_type = OP_CUSTOM;
2259 slf_restore.op_ppaddr = pp_restore;
2260 slf_restore.op_first = PL_op;
2261
2262 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2263
2264 if (PL_op->op_flags & OPf_STACKED)
2265 {
2266 if (items > slf_arga)
2267 {
2268 slf_arga = items;
2269 free (slf_argv);
2270 slf_argv = malloc (slf_arga * sizeof (SV *));
2271 }
2272
2273 slf_argc = items;
2274
2275 for (i = 0; i < items; ++i)
2276 slf_argv [i] = SvREFCNT_inc (arg [i]);
2277 }
2278 else
2279 slf_argc = 0;
2280
2281 PL_op->op_ppaddr = pp_slf;
2282 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2283
2284 PL_op = (OP *)&slf_restore;
2285}
2286
2287/*****************************************************************************/
2288/* PerlIO::cede */
2289
2290typedef struct
2291{
2292 PerlIOBuf base;
2293 NV next, every;
2294} PerlIOCede;
2295
2296static IV
2297PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2298{
2299 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2300
2301 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2302 self->next = nvtime () + self->every;
2303
2304 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2305}
2306
2307static SV *
2308PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2309{
2310 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2311
2312 return newSVnv (self->every);
2313}
2314
2315static IV
2316PerlIOCede_flush (pTHX_ PerlIO *f)
2317{
2318 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2319 double now = nvtime ();
2320
2321 if (now >= self->next)
2322 {
2323 api_cede (aTHX);
2324 self->next = now + self->every;
2325 }
2326
2327 return PerlIOBuf_flush (aTHX_ f);
2328}
2329
2330static PerlIO_funcs PerlIO_cede =
2331{
2332 sizeof(PerlIO_funcs),
2333 "cede",
2334 sizeof(PerlIOCede),
2335 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2336 PerlIOCede_pushed,
2337 PerlIOBuf_popped,
2338 PerlIOBuf_open,
2339 PerlIOBase_binmode,
2340 PerlIOCede_getarg,
2341 PerlIOBase_fileno,
2342 PerlIOBuf_dup,
2343 PerlIOBuf_read,
2344 PerlIOBuf_unread,
2345 PerlIOBuf_write,
2346 PerlIOBuf_seek,
2347 PerlIOBuf_tell,
2348 PerlIOBuf_close,
2349 PerlIOCede_flush,
2350 PerlIOBuf_fill,
2351 PerlIOBase_eof,
2352 PerlIOBase_error,
2353 PerlIOBase_clearerr,
2354 PerlIOBase_setlinebuf,
2355 PerlIOBuf_get_base,
2356 PerlIOBuf_bufsiz,
2357 PerlIOBuf_get_ptr,
2358 PerlIOBuf_get_cnt,
2359 PerlIOBuf_set_ptrcnt,
2360};
2361
2362/*****************************************************************************/
2363/* Coro::Semaphore & Coro::Signal */
2364
2365static SV *
2366coro_waitarray_new (pTHX_ int count)
2367{
2368 /* a semaphore contains a counter IV in $sem->[0] and any waiters after that */
2369 AV *av = newAV ();
2370 SV **ary;
2371
2372 /* unfortunately, building manually saves memory */
2373 Newx (ary, 2, SV *);
2374 AvALLOC (av) = ary;
2375#if PERL_VERSION_ATLEAST (5,10,0)
2376 AvARRAY (av) = ary;
2377#else
2378 /* 5.8.8 needs this syntax instead of AvARRAY = ary, yet */
2379 /* -DDEBUGGING flags this as a bug, despite it perfectly working */
2380 SvPVX ((SV *)av) = (char *)ary;
2381#endif
2382 AvMAX (av) = 1;
2383 AvFILLp (av) = 0;
2384 ary [0] = newSViv (count);
2385
2386 return newRV_noinc ((SV *)av);
2387}
2388
2389/* semaphore */
2390
2391static void
2392coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2393{
2394 SV *count_sv = AvARRAY (av)[0];
2395 IV count = SvIVX (count_sv);
2396
2397 count += adjust;
2398 SvIVX (count_sv) = count;
2399
2400 /* now wake up as many waiters as are expected to lock */
2401 while (count > 0 && AvFILLp (av) > 0)
2402 {
2403 SV *cb;
2404
2405 /* swap first two elements so we can shift a waiter */
2406 AvARRAY (av)[0] = AvARRAY (av)[1];
2407 AvARRAY (av)[1] = count_sv;
2408 cb = av_shift (av);
2409
2410 if (SvOBJECT (cb))
2411 {
2412 api_ready (aTHX_ cb);
2413 --count;
2414 }
2415 else if (SvTYPE (cb) == SVt_PVCV)
2416 {
2417 dSP;
2418 PUSHMARK (SP);
2419 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2420 PUTBACK;
2421 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2422 }
2423
2424 SvREFCNT_dec (cb);
2425 }
2426}
2427
2428static void
2429coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2430{
2431 /* call $sem->adjust (0) to possibly wake up some other waiters */
2432 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2433}
2434
2435static int
2436slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2437{
2438 AV *av = (AV *)frame->data;
2439 SV *count_sv = AvARRAY (av)[0];
2440
2441 /* if we are about to throw, don't actually acquire the lock, just throw */
2442 if (CORO_THROW)
2443 return 0;
2444 else if (SvIVX (count_sv) > 0)
2445 {
2446 SvSTATE_current->on_destroy = 0;
2447
2448 if (acquire)
2449 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2450 else
2451 coro_semaphore_adjust (aTHX_ av, 0);
2452
2453 return 0;
2454 }
2455 else
2456 {
2457 int i;
2458 /* if we were woken up but can't down, we look through the whole */
2459 /* waiters list and only add us if we aren't in there already */
2460 /* this avoids some degenerate memory usage cases */
2461
2462 for (i = 1; i <= AvFILLp (av); ++i)
2463 if (AvARRAY (av)[i] == SvRV (coro_current))
2464 return 1;
2465
2466 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2467 return 1;
2468 }
2469}
2470
2471static int
2472slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2473{
2474 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2475}
2476
2477static int
2478slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2479{
2480 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2481}
2482
2483static void
2484slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2485{
2486 AV *av = (AV *)SvRV (arg [0]);
2487
2488 if (SvIVX (AvARRAY (av)[0]) > 0)
2489 {
2490 frame->data = (void *)av;
2491 frame->prepare = prepare_nop;
2492 }
2493 else
2494 {
2495 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2496
2497 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2498 frame->prepare = prepare_schedule;
2499
2500 /* to avoid race conditions when a woken-up coro gets terminated */
2501 /* we arrange for a temporary on_destroy that calls adjust (0) */
2502 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2503 }
2504}
2505
2506static void
2507slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2508{
2509 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2510 frame->check = slf_check_semaphore_down;
2511}
2512
2513static void
2514slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2515{
2516 if (items >= 2)
2517 {
2518 /* callback form */
2519 AV *av = (AV *)SvRV (arg [0]);
2520 CV *cb_cv = coro_sv_2cv (aTHX_ arg [1]);
2521
2522 av_push (av, (SV *)SvREFCNT_inc_NN (cb_cv));
2523
2524 if (SvIVX (AvARRAY (av)[0]) > 0)
2525 coro_semaphore_adjust (aTHX_ av, 0);
2526
2527 frame->prepare = prepare_nop;
2528 frame->check = slf_check_nop;
2529 }
2530 else
2531 {
2532 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2533 frame->check = slf_check_semaphore_wait;
2534 }
2535}
2536
2537/* signal */
2538
2539static void
2540coro_signal_wake (pTHX_ AV *av, int count)
2541{
2542 SvIVX (AvARRAY (av)[0]) = 0;
2543
2544 /* now signal count waiters */
2545 while (count > 0 && AvFILLp (av) > 0)
2546 {
2547 SV *cb;
2548
2549 /* swap first two elements so we can shift a waiter */
2550 cb = AvARRAY (av)[0];
2551 AvARRAY (av)[0] = AvARRAY (av)[1];
2552 AvARRAY (av)[1] = cb;
2553
2554 cb = av_shift (av);
2555
2556 api_ready (aTHX_ cb);
2557 sv_setiv (cb, 0); /* signal waiter */
2558 SvREFCNT_dec (cb);
2559
2560 --count;
2561 }
2562}
2563
2564static int
2565slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2566{
2567 /* if we are about to throw, also stop waiting */
2568 return SvROK ((SV *)frame->data) && !CORO_THROW;
2569}
2570
2571static void
2572slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2573{
2574 AV *av = (AV *)SvRV (arg [0]);
2575
2576 if (SvIVX (AvARRAY (av)[0]))
2577 {
2578 SvIVX (AvARRAY (av)[0]) = 0;
2579 frame->prepare = prepare_nop;
2580 frame->check = slf_check_nop;
2581 }
2582 else
2583 {
2584 SV *waiter = newRV_inc (SvRV (coro_current)); /* owned by signal av */
2585
2586 av_push (av, waiter);
2587
2588 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2589 frame->prepare = prepare_schedule;
2590 frame->check = slf_check_signal_wait;
2591 }
2592}
2593
2594/*****************************************************************************/
2595/* Coro::AIO */
2596
2597#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2598
2599/* helper storage struct */
2600struct io_state
2601{
2602 int errorno;
2603 I32 laststype; /* U16 in 5.10.0 */
2604 int laststatval;
2605 Stat_t statcache;
2606};
2607
2608static void
2609coro_aio_callback (pTHX_ CV *cv)
2610{
2611 dXSARGS;
2612 AV *state = (AV *)GENSUB_ARG;
2613 SV *coro = av_pop (state);
2614 SV *data_sv = newSV (sizeof (struct io_state));
2615
2616 av_extend (state, items - 1);
2617
2618 sv_upgrade (data_sv, SVt_PV);
2619 SvCUR_set (data_sv, sizeof (struct io_state));
2620 SvPOK_only (data_sv);
2621
2622 {
2623 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2624
2625 data->errorno = errno;
2626 data->laststype = PL_laststype;
2627 data->laststatval = PL_laststatval;
2628 data->statcache = PL_statcache;
2629 }
2630
2631 /* now build the result vector out of all the parameters and the data_sv */
2632 {
2633 int i;
2634
2635 for (i = 0; i < items; ++i)
2636 av_push (state, SvREFCNT_inc_NN (ST (i)));
2637 }
2638
2639 av_push (state, data_sv);
2640
2641 api_ready (aTHX_ coro);
2642 SvREFCNT_dec (coro);
2643 SvREFCNT_dec ((AV *)state);
2644}
2645
2646static int
2647slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2648{
2649 AV *state = (AV *)frame->data;
2650
2651 /* if we are about to throw, return early */
2652 /* this does not cancel the aio request, but at least */
2653 /* it quickly returns */
2654 if (CORO_THROW)
2655 return 0;
2656
2657 /* one element that is an RV? repeat! */
2658 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2659 return 1;
2660
2661 /* restore status */
2662 {
2663 SV *data_sv = av_pop (state);
2664 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2665
2666 errno = data->errorno;
2667 PL_laststype = data->laststype;
2668 PL_laststatval = data->laststatval;
2669 PL_statcache = data->statcache;
2670
2671 SvREFCNT_dec (data_sv);
2672 }
2673
2674 /* push result values */
2675 {
2676 dSP;
2677 int i;
2678
2679 EXTEND (SP, AvFILLp (state) + 1);
2680 for (i = 0; i <= AvFILLp (state); ++i)
2681 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2682
2683 PUTBACK;
2684 }
2685
2686 return 0;
2687}
2688
2689static void
2690slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2691{
2692 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2693 SV *coro_hv = SvRV (coro_current);
2694 struct coro *coro = SvSTATE_hv (coro_hv);
2695
2696 /* put our coroutine id on the state arg */
2697 av_push (state, SvREFCNT_inc_NN (coro_hv));
2698
2699 /* first see whether we have a non-zero priority and set it as AIO prio */
2700 if (coro->prio)
2701 {
2702 dSP;
2703
2704 static SV *prio_cv;
2705 static SV *prio_sv;
2706
2707 if (expect_false (!prio_cv))
2708 {
2709 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2710 prio_sv = newSViv (0);
2711 }
2712
2713 PUSHMARK (SP);
2714 sv_setiv (prio_sv, coro->prio);
2715 XPUSHs (prio_sv);
2716
2717 PUTBACK;
2718 call_sv (prio_cv, G_VOID | G_DISCARD);
2719 }
2720
2721 /* now call the original request */
2722 {
2723 dSP;
2724 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2725 int i;
2726
2727 PUSHMARK (SP);
2728
2729 /* first push all args to the stack */
2730 EXTEND (SP, items + 1);
2731
2732 for (i = 0; i < items; ++i)
2733 PUSHs (arg [i]);
2734
2735 /* now push the callback closure */
2736 PUSHs (sv_2mortal (gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2737
2738 /* now call the AIO function - we assume our request is uncancelable */
2739 PUTBACK;
2740 call_sv ((SV *)req, G_VOID | G_DISCARD);
2741 }
2742
2743 /* now that the requets is going, we loop toll we have a result */
2744 frame->data = (void *)state;
2745 frame->prepare = prepare_schedule;
2746 frame->check = slf_check_aio_req;
2747}
2748
2749static void
2750coro_aio_req_xs (pTHX_ CV *cv)
2751{
2752 dXSARGS;
2753
2754 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2755
2756 XSRETURN_EMPTY;
2757}
2758
2759/*****************************************************************************/
2760
2761#if CORO_CLONE
2762# include "clone.c"
2763#endif
2764
2765MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2766
2767PROTOTYPES: DISABLE
2768
2769BOOT:
2770{
2771#ifdef USE_ITHREADS
2772# if CORO_PTHREAD
2773 coro_thx = PERL_GET_CONTEXT;
2774# endif
2775#endif
2776 BOOT_PAGESIZE;
2777
2778 cctx_current = cctx_new_empty ();
2779
2780 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2781 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2782
2783 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2784 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2785 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
2786
2787 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
2788 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
2789 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
2790
2791 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
2792
2793 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
2794 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
2795 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
2796 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
2797
2798 main_mainstack = PL_mainstack;
2799 main_top_env = PL_top_env;
2800
2801 while (main_top_env->je_prev)
2802 main_top_env = main_top_env->je_prev;
2803
2804 {
2805 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2806
2807 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2808 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
2809
2810 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2811 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
2812 }
2813
2814 coroapi.ver = CORO_API_VERSION;
2815 coroapi.rev = CORO_API_REVISION;
2816
2817 coroapi.transfer = api_transfer;
2818
2819 coroapi.sv_state = SvSTATE_;
2820 coroapi.execute_slf = api_execute_slf;
2821 coroapi.prepare_nop = prepare_nop;
2822 coroapi.prepare_schedule = prepare_schedule;
2823 coroapi.prepare_cede = prepare_cede;
2824 coroapi.prepare_cede_notself = prepare_cede_notself;
2825
2826 {
2827 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
2828
2829 if (!svp) croak ("Time::HiRes is required");
2830 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
2831
2832 nvtime = INT2PTR (double (*)(), SvIV (*svp));
2833 }
2834
2835 assert (("PRIO_NORMAL must be 0", !PRIO_NORMAL));
2836}
2837
2838SV *
2839new (char *klass, ...)
2840 ALIAS:
2841 Coro::new = 1
2842 CODE:
2843{
2844 struct coro *coro;
2845 MAGIC *mg;
2846 HV *hv;
2847 CV *cb;
2848 int i;
2849
2850 if (items > 1)
2851 {
2852 cb = coro_sv_2cv (aTHX_ ST (1));
2853
2854 if (!ix)
235 { 2855 {
236 /* I never used formats, so how should I know how these are implemented? */ 2856 if (CvISXSUB (cb))
237 /* my bold guess is as a simple, plain sub... */ 2857 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
238 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 2858
2859 if (!CvROOT (cb))
2860 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
239 } 2861 }
240 } 2862 }
241 2863
242 if (top_si->si_type == PERLSI_MAIN)
243 break;
244
245 top_si = top_si->si_prev;
246 ccstk = top_si->si_cxstack;
247 cxix = top_si->si_cxix;
248 }
249
250 PUTBACK;
251 }
252
253 c->dowarn = PL_dowarn;
254 c->defav = GvAV (PL_defgv);
255 c->curstackinfo = PL_curstackinfo;
256 c->curstack = PL_curstack;
257 c->mainstack = PL_mainstack;
258 c->stack_sp = PL_stack_sp;
259 c->op = PL_op;
260 c->curpad = PL_curpad;
261 c->stack_base = PL_stack_base;
262 c->stack_max = PL_stack_max;
263 c->tmps_stack = PL_tmps_stack;
264 c->tmps_floor = PL_tmps_floor;
265 c->tmps_ix = PL_tmps_ix;
266 c->tmps_max = PL_tmps_max;
267 c->markstack = PL_markstack;
268 c->markstack_ptr = PL_markstack_ptr;
269 c->markstack_max = PL_markstack_max;
270 c->scopestack = PL_scopestack;
271 c->scopestack_ix = PL_scopestack_ix;
272 c->scopestack_max = PL_scopestack_max;
273 c->savestack = PL_savestack;
274 c->savestack_ix = PL_savestack_ix;
275 c->savestack_max = PL_savestack_max;
276 c->retstack = PL_retstack;
277 c->retstack_ix = PL_retstack_ix;
278 c->retstack_max = PL_retstack_max;
279 c->curcop = PL_curcop;
280}
281
282static void
283LOAD(pTHX_ Coro__State c)
284{
285 PL_dowarn = c->dowarn;
286 GvAV (PL_defgv) = c->defav;
287 PL_curstackinfo = c->curstackinfo;
288 PL_curstack = c->curstack;
289 PL_mainstack = c->mainstack;
290 PL_stack_sp = c->stack_sp;
291 PL_op = c->op;
292 PL_curpad = c->curpad;
293 PL_stack_base = c->stack_base;
294 PL_stack_max = c->stack_max;
295 PL_tmps_stack = c->tmps_stack;
296 PL_tmps_floor = c->tmps_floor;
297 PL_tmps_ix = c->tmps_ix;
298 PL_tmps_max = c->tmps_max;
299 PL_markstack = c->markstack;
300 PL_markstack_ptr = c->markstack_ptr;
301 PL_markstack_max = c->markstack_max;
302 PL_scopestack = c->scopestack;
303 PL_scopestack_ix = c->scopestack_ix;
304 PL_scopestack_max = c->scopestack_max;
305 PL_savestack = c->savestack;
306 PL_savestack_ix = c->savestack_ix;
307 PL_savestack_max = c->savestack_max;
308 PL_retstack = c->retstack;
309 PL_retstack_ix = c->retstack_ix;
310 PL_retstack_max = c->retstack_max;
311 PL_curcop = c->curcop;
312
313 {
314 dSP;
315 CV *cv;
316
317 /* now do the ugly restore mess */
318 while ((cv = (CV *)POPs))
319 {
320 AV *padlist = (AV *)POPs;
321
322 put_padlist (cv);
323 CvPADLIST(cv) = padlist;
324 CvDEPTH(cv) = (I32)POPs;
325
326#ifdef USE_THREADS
327 CvOWNER(cv) = (struct perl_thread *)POPs;
328 error does not work either
329#endif
330 }
331
332 PUTBACK;
333 }
334}
335
336/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
337STATIC void
338destroy_stacks(pTHX)
339{
340 /* die does this while calling POPSTACK, but I just don't see why. */
341 dounwind(-1);
342
343 /* is this ugly, I ask? */
344 while (PL_scopestack_ix)
345 LEAVE;
346
347 while (PL_curstackinfo->si_next)
348 PL_curstackinfo = PL_curstackinfo->si_next;
349
350 while (PL_curstackinfo)
351 {
352 PERL_SI *p = PL_curstackinfo->si_prev;
353
354 SvREFCNT_dec(PL_curstackinfo->si_stack);
355 Safefree(PL_curstackinfo->si_cxstack);
356 Safefree(PL_curstackinfo);
357 PL_curstackinfo = p;
358 }
359
360 if (PL_scopestack_ix != 0)
361 Perl_warner(aTHX_ WARN_INTERNAL,
362 "Unbalanced scopes: %ld more ENTERs than LEAVEs\n",
363 (long)PL_scopestack_ix);
364 if (PL_savestack_ix != 0)
365 Perl_warner(aTHX_ WARN_INTERNAL,
366 "Unbalanced saves: %ld more saves than restores\n",
367 (long)PL_savestack_ix);
368 if (PL_tmps_floor != -1)
369 Perl_warner(aTHX_ WARN_INTERNAL,"Unbalanced tmps: %ld more allocs than frees\n",
370 (long)PL_tmps_floor + 1);
371 /*
372 */
373 Safefree(PL_tmps_stack);
374 Safefree(PL_markstack);
375 Safefree(PL_scopestack);
376 Safefree(PL_savestack);
377 Safefree(PL_retstack);
378}
379
380#define SUB_INIT "Coro::State::_newcoro"
381
382MODULE = Coro::State PACKAGE = Coro::State
383
384PROTOTYPES: ENABLE
385
386BOOT:
387 if (!padlist_cache)
388 padlist_cache = newHV ();
389
390Coro::State
391_newprocess(args)
392 SV * args
393 PROTOTYPE: $
394 CODE:
395 Coro__State coro;
396
397 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV)
398 croak ("Coro::State::newprocess expects an arrayref");
399
400 New (0, coro, 1, struct coro); 2864 Newz (0, coro, 1, struct coro);
2865 coro->args = newAV ();
2866 coro->flags = CF_NEW;
401 2867
402 coro->mainstack = 0; /* actual work is done inside transfer */ 2868 if (coro_first) coro_first->prev = coro;
403 coro->args = (AV *)SvREFCNT_inc (SvRV (args)); 2869 coro->next = coro_first;
2870 coro_first = coro;
404 2871
405 RETVAL = coro; 2872 coro->hv = hv = newHV ();
2873 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
2874 mg->mg_flags |= MGf_DUP;
2875 RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1));
2876
2877 if (items > 1)
2878 {
2879 av_extend (coro->args, items - 1 + ix - 1);
2880
2881 if (ix)
2882 {
2883 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
2884 cb = cv_coro_run;
2885 }
2886
2887 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
2888
2889 for (i = 2; i < items; i++)
2890 av_push (coro->args, newSVsv (ST (i)));
2891 }
2892}
406 OUTPUT: 2893 OUTPUT:
407 RETVAL 2894 RETVAL
408 2895
409void 2896void
410transfer(prev,next) 2897transfer (...)
411 Coro::State_or_hashref prev 2898 PROTOTYPE: $$
412 Coro::State_or_hashref next 2899 CODE:
413 CODE: 2900 CORO_EXECUTE_SLF_XS (slf_init_transfer);
414 2901
415 if (prev != next) 2902bool
2903_destroy (SV *coro_sv)
2904 CODE:
2905 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
2906 OUTPUT:
2907 RETVAL
2908
2909void
2910_exit (int code)
2911 PROTOTYPE: $
2912 CODE:
2913 _exit (code);
2914
2915SV *
2916clone (Coro::State coro)
2917 CODE:
2918{
2919#if CORO_CLONE
2920 struct coro *ncoro = coro_clone (coro);
2921 MAGIC *mg;
2922 /* TODO: too much duplication */
2923 ncoro->hv = newHV ();
2924 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
2925 mg->mg_flags |= MGf_DUP;
2926 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
2927#else
2928 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
2929#endif
2930}
2931 OUTPUT:
2932 RETVAL
2933
2934int
2935cctx_stacksize (int new_stacksize = 0)
2936 PROTOTYPE: ;$
2937 CODE:
2938 RETVAL = cctx_stacksize;
2939 if (new_stacksize)
416 { 2940 {
417 PUTBACK; 2941 cctx_stacksize = new_stacksize;
418 SAVE (aTHX_ prev); 2942 ++cctx_gen;
419
420 /* 2943 }
421 * this could be done in newprocess which would lead to 2944 OUTPUT:
422 * extremely elegant and fast (just PUTBACK/SAVE/LOAD/SPAGAIN) 2945 RETVAL
423 * code here, but lazy allocation of stacks has also 2946
424 * some virtues and the overhead of the if() is nil. 2947int
2948cctx_max_idle (int max_idle = 0)
2949 PROTOTYPE: ;$
2950 CODE:
2951 RETVAL = cctx_max_idle;
2952 if (max_idle > 1)
2953 cctx_max_idle = max_idle;
2954 OUTPUT:
2955 RETVAL
2956
2957int
2958cctx_count ()
2959 PROTOTYPE:
2960 CODE:
2961 RETVAL = cctx_count;
2962 OUTPUT:
2963 RETVAL
2964
2965int
2966cctx_idle ()
2967 PROTOTYPE:
2968 CODE:
2969 RETVAL = cctx_idle;
2970 OUTPUT:
2971 RETVAL
2972
2973void
2974list ()
2975 PROTOTYPE:
2976 PPCODE:
2977{
2978 struct coro *coro;
2979 for (coro = coro_first; coro; coro = coro->next)
2980 if (coro->hv)
2981 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
2982}
2983
2984void
2985call (Coro::State coro, SV *coderef)
2986 ALIAS:
2987 eval = 1
2988 CODE:
2989{
2990 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
425 */ 2991 {
426 if (next->mainstack) 2992 struct coro temp;
2993
2994 if (!(coro->flags & CF_RUNNING))
427 { 2995 {
428 LOAD (aTHX_ next); 2996 PUTBACK;
429 next->mainstack = 0; /* unnecessary but much cleaner */ 2997 save_perl (aTHX_ &temp);
2998 load_perl (aTHX_ coro);
2999 }
3000
3001 {
3002 dSP;
3003 ENTER;
3004 SAVETMPS;
3005 PUTBACK;
3006 PUSHSTACK;
3007 PUSHMARK (SP);
3008
3009 if (ix)
3010 eval_sv (coderef, 0);
3011 else
3012 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3013
3014 POPSTACK;
3015 SPAGAIN;
3016 FREETMPS;
3017 LEAVE;
3018 PUTBACK;
3019 }
3020
3021 if (!(coro->flags & CF_RUNNING))
3022 {
3023 save_perl (aTHX_ coro);
3024 load_perl (aTHX_ &temp);
430 SPAGAIN; 3025 SPAGAIN;
431 } 3026 }
432 else
433 {
434 /*
435 * emulate part of the perl startup here.
436 */
437 UNOP myop;
438
439 init_stacks (); /* from perl.c */
440 PL_op = (OP *)&myop;
441 /*PL_curcop = 0;*/
442 GvAV (PL_defgv) = (AV *)SvREFCNT_inc ((SV *)next->args);
443
444 SPAGAIN;
445 Zero(&myop, 1, UNOP);
446 myop.op_next = Nullop;
447 myop.op_flags = OPf_WANT_VOID;
448
449 PUSHMARK(SP);
450 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
451 PUTBACK;
452 /*
453 * the next line is slightly wrong, as PL_op->op_next
454 * is actually being executed so we skip the first op.
455 * that doesn't matter, though, since it is only
456 * pp_nextstate and we never return...
457 */
458 PL_op = Perl_pp_entersub(aTHX);
459 SPAGAIN;
460
461 ENTER;
462 }
463 } 3027 }
3028}
3029
3030SV *
3031is_ready (Coro::State coro)
3032 PROTOTYPE: $
3033 ALIAS:
3034 is_ready = CF_READY
3035 is_running = CF_RUNNING
3036 is_new = CF_NEW
3037 is_destroyed = CF_DESTROYED
3038 CODE:
3039 RETVAL = boolSV (coro->flags & ix);
3040 OUTPUT:
3041 RETVAL
464 3042
465void 3043void
466DESTROY(coro) 3044throw (Coro::State self, SV *throw = &PL_sv_undef)
467 Coro::State coro 3045 PROTOTYPE: $;$
468 CODE: 3046 CODE:
3047{
3048 struct coro *current = SvSTATE_current;
3049 SV **throwp = self == current ? &CORO_THROW : &self->except;
3050 SvREFCNT_dec (*throwp);
3051 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
3052}
469 3053
470 if (coro->mainstack) 3054void
3055api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3056 PROTOTYPE: $;$
3057 C_ARGS: aTHX_ coro, flags
3058
3059SV *
3060has_cctx (Coro::State coro)
3061 PROTOTYPE: $
3062 CODE:
3063 /* maybe manage the running flag differently */
3064 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3065 OUTPUT:
3066 RETVAL
3067
3068int
3069is_traced (Coro::State coro)
3070 PROTOTYPE: $
3071 CODE:
3072 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3073 OUTPUT:
3074 RETVAL
3075
3076UV
3077rss (Coro::State coro)
3078 PROTOTYPE: $
3079 ALIAS:
3080 usecount = 1
3081 CODE:
3082 switch (ix)
3083 {
3084 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3085 case 1: RETVAL = coro->usecount; break;
3086 }
3087 OUTPUT:
3088 RETVAL
3089
3090void
3091force_cctx ()
3092 PROTOTYPE:
3093 CODE:
3094 cctx_current->idle_sp = 0;
3095
3096void
3097swap_defsv (Coro::State self)
3098 PROTOTYPE: $
3099 ALIAS:
3100 swap_defav = 1
3101 CODE:
3102 if (!self->slot)
3103 croak ("cannot swap state with coroutine that has no saved state,");
3104 else
471 { 3105 {
472 struct coro temp; 3106 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3107 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
473 3108
3109 SV *tmp = *src; *src = *dst; *dst = tmp;
3110 }
3111
3112
3113MODULE = Coro::State PACKAGE = Coro
3114
3115BOOT:
3116{
3117 int i;
3118
3119 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3120 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3121 cv_coro_run = get_cv ( "Coro::_terminate", GV_ADD);
3122 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD);
3123 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3124 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3125 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3126 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3127 sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE);
3128
3129 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3130 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3131 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3132 cv_coro_state_new = get_cv ("Coro::State::new", 0); SvREADONLY_on (cv_coro_state_new);
3133
3134 coro_stash = gv_stashpv ("Coro", TRUE);
3135
3136 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (PRIO_MAX));
3137 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (PRIO_HIGH));
3138 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (PRIO_NORMAL));
3139 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (PRIO_LOW));
3140 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (PRIO_IDLE));
3141 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (PRIO_MIN));
3142
3143 for (i = PRIO_MAX - PRIO_MIN + 1; i--; )
3144 coro_ready[i] = newAV ();
3145
3146 {
3147 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3148
3149 coroapi.schedule = api_schedule;
3150 coroapi.schedule_to = api_schedule_to;
3151 coroapi.cede = api_cede;
3152 coroapi.cede_notself = api_cede_notself;
3153 coroapi.ready = api_ready;
3154 coroapi.is_ready = api_is_ready;
3155 coroapi.nready = coro_nready;
3156 coroapi.current = coro_current;
3157
3158 /*GCoroAPI = &coroapi;*/
3159 sv_setiv (sv, (IV)&coroapi);
3160 SvREADONLY_on (sv);
3161 }
3162}
3163
3164void
3165terminate (...)
3166 CODE:
3167 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3168
3169void
3170schedule (...)
3171 CODE:
3172 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3173
3174void
3175schedule_to (...)
3176 CODE:
3177 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3178
3179void
3180cede_to (...)
3181 CODE:
3182 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3183
3184void
3185cede (...)
3186 CODE:
3187 CORO_EXECUTE_SLF_XS (slf_init_cede);
3188
3189void
3190cede_notself (...)
3191 CODE:
3192 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3193
3194void
3195_cancel (Coro::State self)
3196 CODE:
3197 coro_state_destroy (aTHX_ self);
3198 coro_call_on_destroy (aTHX_ self);
3199
3200void
3201_set_current (SV *current)
3202 PROTOTYPE: $
3203 CODE:
3204 SvREFCNT_dec (SvRV (coro_current));
3205 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3206
3207void
3208_set_readyhook (SV *hook)
3209 PROTOTYPE: $
3210 CODE:
3211 SvREFCNT_dec (coro_readyhook);
3212 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
3213
3214int
3215prio (Coro::State coro, int newprio = 0)
3216 PROTOTYPE: $;$
3217 ALIAS:
3218 nice = 1
3219 CODE:
3220{
3221 RETVAL = coro->prio;
3222
3223 if (items > 1)
3224 {
3225 if (ix)
3226 newprio = coro->prio - newprio;
3227
3228 if (newprio < PRIO_MIN) newprio = PRIO_MIN;
3229 if (newprio > PRIO_MAX) newprio = PRIO_MAX;
3230
3231 coro->prio = newprio;
3232 }
3233}
3234 OUTPUT:
3235 RETVAL
3236
3237SV *
3238ready (SV *self)
3239 PROTOTYPE: $
3240 CODE:
3241 RETVAL = boolSV (api_ready (aTHX_ self));
3242 OUTPUT:
3243 RETVAL
3244
3245int
3246nready (...)
3247 PROTOTYPE:
3248 CODE:
3249 RETVAL = coro_nready;
3250 OUTPUT:
3251 RETVAL
3252
3253void
3254_pool_handler (...)
3255 CODE:
3256 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3257
3258void
3259async_pool (SV *cv, ...)
3260 PROTOTYPE: &@
3261 PPCODE:
3262{
3263 HV *hv = (HV *)av_pop (av_async_pool);
3264 AV *av = newAV ();
3265 SV *cb = ST (0);
3266 int i;
3267
3268 av_extend (av, items - 2);
3269 for (i = 1; i < items; ++i)
3270 av_push (av, SvREFCNT_inc_NN (ST (i)));
3271
3272 if ((SV *)hv == &PL_sv_undef)
3273 {
3274 PUSHMARK (SP);
3275 EXTEND (SP, 2);
3276 PUSHs (sv_Coro);
3277 PUSHs ((SV *)cv_pool_handler);
474 PUTBACK; 3278 PUTBACK;
475 SAVE(aTHX_ (&temp)); 3279 call_sv ((SV *)cv_coro_state_new, G_SCALAR);
476 LOAD(aTHX_ coro);
477
478 destroy_stacks ();
479 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
480
481 LOAD((&temp));
482 SPAGAIN; 3280 SPAGAIN;
3281
3282 hv = (HV *)SvREFCNT_inc_NN (SvRV (POPs));
483 } 3283 }
484 3284
3285 {
3286 struct coro *coro = SvSTATE_hv (hv);
3287
3288 assert (!coro->invoke_cb);
3289 assert (!coro->invoke_av);
3290 coro->invoke_cb = SvREFCNT_inc (cb);
3291 coro->invoke_av = av;
3292 }
3293
3294 api_ready (aTHX_ (SV *)hv);
3295
3296 if (GIMME_V != G_VOID)
3297 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
3298 else
485 SvREFCNT_dec (coro->args); 3299 SvREFCNT_dec (hv);
486 Safefree (coro); 3300}
487 3301
3302SV *
3303rouse_cb ()
3304 PROTOTYPE:
3305 CODE:
3306 RETVAL = coro_new_rouse_cb (aTHX);
3307 OUTPUT:
3308 RETVAL
488 3309
3310void
3311rouse_wait (...)
3312 PROTOTYPE: ;$
3313 PPCODE:
3314 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
3315
3316
3317MODULE = Coro::State PACKAGE = PerlIO::cede
3318
3319BOOT:
3320 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3321
3322
3323MODULE = Coro::State PACKAGE = Coro::Semaphore
3324
3325SV *
3326new (SV *klass, SV *count = 0)
3327 CODE:
3328 RETVAL = sv_bless (
3329 coro_waitarray_new (aTHX_ count && SvOK (count) ? SvIV (count) : 1),
3330 GvSTASH (CvGV (cv))
3331 );
3332 OUTPUT:
3333 RETVAL
3334
3335# helper for Coro::Channel
3336SV *
3337_alloc (int count)
3338 CODE:
3339 RETVAL = coro_waitarray_new (aTHX_ count);
3340 OUTPUT:
3341 RETVAL
3342
3343SV *
3344count (SV *self)
3345 CODE:
3346 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3347 OUTPUT:
3348 RETVAL
3349
3350void
3351up (SV *self, int adjust = 1)
3352 ALIAS:
3353 adjust = 1
3354 CODE:
3355 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3356
3357void
3358down (...)
3359 CODE:
3360 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3361
3362void
3363wait (...)
3364 CODE:
3365 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3366
3367void
3368try (SV *self)
3369 PPCODE:
3370{
3371 AV *av = (AV *)SvRV (self);
3372 SV *count_sv = AvARRAY (av)[0];
3373 IV count = SvIVX (count_sv);
3374
3375 if (count > 0)
3376 {
3377 --count;
3378 SvIVX (count_sv) = count;
3379 XSRETURN_YES;
3380 }
3381 else
3382 XSRETURN_NO;
3383}
3384
3385void
3386waiters (SV *self)
3387 PPCODE:
3388{
3389 AV *av = (AV *)SvRV (self);
3390 int wcount = AvFILLp (av) + 1 - 1;
3391
3392 if (GIMME_V == G_SCALAR)
3393 XPUSHs (sv_2mortal (newSViv (wcount)));
3394 else
3395 {
3396 int i;
3397 EXTEND (SP, wcount);
3398 for (i = 1; i <= wcount; ++i)
3399 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3400 }
3401}
3402
3403MODULE = Coro::State PACKAGE = Coro::Signal
3404
3405SV *
3406new (SV *klass)
3407 CODE:
3408 RETVAL = sv_bless (
3409 coro_waitarray_new (aTHX_ 0),
3410 GvSTASH (CvGV (cv))
3411 );
3412 OUTPUT:
3413 RETVAL
3414
3415void
3416wait (...)
3417 CODE:
3418 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
3419
3420void
3421broadcast (SV *self)
3422 CODE:
3423{
3424 AV *av = (AV *)SvRV (self);
3425 coro_signal_wake (aTHX_ av, AvFILLp (av));
3426}
3427
3428void
3429send (SV *self)
3430 CODE:
3431{
3432 AV *av = (AV *)SvRV (self);
3433
3434 if (AvFILLp (av))
3435 coro_signal_wake (aTHX_ av, 1);
3436 else
3437 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3438}
3439
3440IV
3441awaited (SV *self)
3442 CODE:
3443 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3444 OUTPUT:
3445 RETVAL
3446
3447
3448MODULE = Coro::State PACKAGE = Coro::AnyEvent
3449
3450BOOT:
3451 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3452
3453void
3454_schedule (...)
3455 CODE:
3456{
3457 static int incede;
3458
3459 api_cede_notself (aTHX);
3460
3461 ++incede;
3462 while (coro_nready >= incede && api_cede (aTHX))
3463 ;
3464
3465 sv_setsv (sv_activity, &PL_sv_undef);
3466 if (coro_nready >= incede)
3467 {
3468 PUSHMARK (SP);
3469 PUTBACK;
3470 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3471 }
3472
3473 --incede;
3474}
3475
3476
3477MODULE = Coro::State PACKAGE = Coro::AIO
3478
3479void
3480_register (char *target, char *proto, SV *req)
3481 CODE:
3482{
3483 CV *req_cv = coro_sv_2cv (aTHX_ req);
3484 /* newXSproto doesn't return the CV on 5.8 */
3485 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3486 sv_setpv ((SV *)slf_cv, proto);
3487 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3488}
3489

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines