ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.4 by root, Tue Jul 17 02:21:56 2001 UTC vs.
Revision 1.323 by root, Sat Nov 22 06:03:10 2008 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "patchlevel.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
24#endif
25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
9#endif 35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
10 48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#define PERL_VERSION_ATLEAST(a,b,c) \
57 (PERL_REVISION > (a) \
58 || (PERL_REVISION == (a) \
59 && (PERL_VERSION > (b) \
60 || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))
61
62#if !PERL_VERSION_ATLEAST (5,6,0)
63# ifndef PL_ppaddr
64# define PL_ppaddr ppaddr
65# endif
66# ifndef call_sv
67# define call_sv perl_call_sv
68# endif
69# ifndef get_sv
70# define get_sv perl_get_sv
71# endif
72# ifndef get_cv
73# define get_cv perl_get_cv
74# endif
75# ifndef IS_PADGV
76# define IS_PADGV(v) 0
77# endif
78# ifndef IS_PADCONST
79# define IS_PADCONST(v) 0
80# endif
81#endif
82
83/* 5.11 */
84#ifndef CxHASARGS
85# define CxHASARGS(cx) (cx)->blk_sub.hasargs
86#endif
87
88/* 5.10.0 */
89#ifndef SvREFCNT_inc_NN
90# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
91#endif
92
93/* 5.8.8 */
94#ifndef GV_NOTQUAL
95# define GV_NOTQUAL 0
96#endif
97#ifndef newSV
98# define newSV(l) NEWSV(0,l)
99#endif
100#ifndef CvISXSUB_on
101# define CvISXSUB_on(cv) (void)cv
102#endif
103#ifndef CvISXSUB
104# define CvISXSUB(cv) (CvXSUB (cv) ? TRUE : FALSE)
105#endif
106#ifndef Newx
107# define Newx(ptr,nitems,type) New (0,ptr,nitems,type)
108#endif
109
110/* 5.8.7 */
111#ifndef SvRV_set
112# define SvRV_set(s,v) SvRV(s) = (v)
113#endif
114
115#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
116# undef CORO_STACKGUARD
117#endif
118
119#ifndef CORO_STACKGUARD
120# define CORO_STACKGUARD 0
121#endif
122
123/* prefer perl internal functions over our own? */
124#ifndef CORO_PREFER_PERL_FUNCTIONS
125# define CORO_PREFER_PERL_FUNCTIONS 0
126#endif
127
128/* The next macros try to return the current stack pointer, in an as
129 * portable way as possible. */
130#if __GNUC__ >= 4
131# define dSTACKLEVEL int stacklevel_dummy
132# define STACKLEVEL __builtin_frame_address (0)
133#else
134# define dSTACKLEVEL volatile void *stacklevel
135# define STACKLEVEL ((void *)&stacklevel)
136#endif
137
138#define IN_DESTRUCT (PL_main_cv == Nullcv)
139
140#if __GNUC__ >= 3
141# define attribute(x) __attribute__(x)
142# define expect(expr,value) __builtin_expect ((expr),(value))
143# define INLINE static inline
144#else
145# define attribute(x)
146# define expect(expr,value) (expr)
147# define INLINE static
148#endif
149
150#define expect_false(expr) expect ((expr) != 0, 0)
151#define expect_true(expr) expect ((expr) != 0, 1)
152
153#define NOINLINE attribute ((noinline))
154
155#include "CoroAPI.h"
156#define GCoroAPI (&coroapi) /* very sneaky */
157
158#ifdef USE_ITHREADS
159# if CORO_PTHREAD
160static void *coro_thx;
161# endif
162#endif
163
164static double (*nvtime)(); /* so why doesn't it take void? */
165
166/* we hijack an hopefully unused CV flag for our purposes */
167#define CVf_SLF 0x4000
168static OP *pp_slf (pTHX);
169
170static U32 cctx_gen;
171static size_t cctx_stacksize = CORO_STACKSIZE;
172static struct CoroAPI coroapi;
173static AV *main_mainstack; /* used to differentiate between $main and others */
174static JMPENV *main_top_env;
175static HV *coro_state_stash, *coro_stash;
176static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
177
178static AV *av_destroy; /* destruction queue */
179static SV *sv_manager; /* the manager coro */
180
181static GV *irsgv; /* $/ */
182static GV *stdoutgv; /* *STDOUT */
183static SV *rv_diehook;
184static SV *rv_warnhook;
185static HV *hv_sig; /* %SIG */
186
187/* async_pool helper stuff */
188static SV *sv_pool_rss;
189static SV *sv_pool_size;
190static SV *sv_async_pool_idle; /* description string */
191static AV *av_async_pool; /* idle pool */
192static SV *sv_Coro; /* class string */
193static CV *cv_pool_handler;
194static CV *cv_coro_state_new;
195
196/* Coro::AnyEvent */
197static SV *sv_activity;
198
199static struct coro_cctx *cctx_first;
200static int cctx_count, cctx_idle;
201
202enum {
203 CC_MAPPED = 0x01,
204 CC_NOREUSE = 0x02, /* throw this away after tracing */
205 CC_TRACE = 0x04,
206 CC_TRACE_SUB = 0x08, /* trace sub calls */
207 CC_TRACE_LINE = 0x10, /* trace each statement */
208 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
209};
210
211/* this is a structure representing a c-level coroutine */
212typedef struct coro_cctx
213{
214 struct coro_cctx *next;
215
216 /* the stack */
217 void *sptr;
218 size_t ssize;
219
220 /* cpu state */
221 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
222 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
223 JMPENV *top_env;
224 coro_context cctx;
225
226 U32 gen;
227#if CORO_USE_VALGRIND
228 int valgrind_id;
229#endif
230 unsigned char flags;
231} coro_cctx;
232
233coro_cctx *cctx_current; /* the currently running cctx */
234
235/*****************************************************************************/
236
237enum {
238 CF_RUNNING = 0x0001, /* coroutine is running */
239 CF_READY = 0x0002, /* coroutine is ready */
240 CF_NEW = 0x0004, /* has never been switched to */
241 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
242};
243
244/* the structure where most of the perl state is stored, overlaid on the cxstack */
245typedef struct
246{
247 SV *defsv;
248 AV *defav;
249 SV *errsv;
250 SV *irsgv;
251#define VAR(name,type) type name;
252# include "state.h"
253#undef VAR
254} perl_slots;
255
256#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
257
258/* this is a structure representing a perl-level coroutine */
11struct coro { 259struct coro {
12 U8 dowarn; 260 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 261 coro_cctx *cctx;
14 262
15 PERL_SI *curstackinfo; 263 /* state data */
16 AV *curstack; 264 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 265 AV *mainstack;
18 SV **stack_sp; 266 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 267
41 AV *args; 268 CV *startcv; /* the CV to execute */
269 AV *args; /* data associated with this coroutine (initial args) */
270 int refcnt; /* coroutines are refcounted, yes */
271 int flags; /* CF_ flags */
272 HV *hv; /* the perl hash associated with this coro, if any */
273 void (*on_destroy)(pTHX_ struct coro *coro);
274
275 /* statistics */
276 int usecount; /* number of transfers to this coro */
277
278 /* coro process data */
279 int prio;
280 SV *except; /* exception to be thrown */
281 SV *rouse_cb;
282
283 /* async_pool */
284 SV *saved_deffh;
285 SV *invoke_cb;
286 AV *invoke_av;
287
288 /* linked list */
289 struct coro *next, *prev;
42}; 290};
43 291
44typedef struct coro *Coro__State; 292typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 293typedef struct coro *Coro__State_or_hashref;
46 294
47static HV *padlist_cache; 295/* the following variables are effectively part of the perl context */
296/* and get copied between struct coro and these variables */
297/* the mainr easonw e don't support windows process emulation */
298static struct CoroSLF slf_frame; /* the current slf frame */
48 299
49/* mostly copied from op.c:cv_clone2 */ 300/** Coro ********************************************************************/
50STATIC AV * 301
51clone_padlist (AV *protopadlist) 302#define PRIO_MAX 3
303#define PRIO_HIGH 1
304#define PRIO_NORMAL 0
305#define PRIO_LOW -1
306#define PRIO_IDLE -3
307#define PRIO_MIN -4
308
309/* for Coro.pm */
310static SV *coro_current;
311static SV *coro_readyhook;
312static AV *coro_ready [PRIO_MAX - PRIO_MIN + 1];
313static CV *cv_coro_run, *cv_coro_terminate;
314static struct coro *coro_first;
315#define coro_nready coroapi.nready
316
317/** lowlevel stuff **********************************************************/
318
319static SV *
320coro_get_sv (pTHX_ const char *name, int create)
52{ 321{
53 AV *av; 322#if PERL_VERSION_ATLEAST (5,10,0)
54 I32 ix; 323 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 324 get_sv (name, create);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 325#endif
57 SV **pname = AvARRAY (protopad_name); 326 return get_sv (name, create);
58 SV **ppad = AvARRAY (protopad); 327}
59 I32 fname = AvFILLp (protopad_name); 328
60 I32 fpad = AvFILLp (protopad); 329static AV *
330coro_get_av (pTHX_ const char *name, int create)
331{
332#if PERL_VERSION_ATLEAST (5,10,0)
333 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
334 get_av (name, create);
335#endif
336 return get_av (name, create);
337}
338
339static HV *
340coro_get_hv (pTHX_ const char *name, int create)
341{
342#if PERL_VERSION_ATLEAST (5,10,0)
343 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
344 get_hv (name, create);
345#endif
346 return get_hv (name, create);
347}
348
349/* may croak */
350INLINE CV *
351coro_sv_2cv (pTHX_ SV *sv)
352{
353 HV *st;
354 GV *gvp;
355 return sv_2cv (sv, &st, &gvp, 0);
356}
357
358static AV *
359coro_derive_padlist (pTHX_ CV *cv)
360{
361 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 362 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 363
72 newpadlist = newAV (); 364 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 365 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 366#if PERL_VERSION_ATLEAST (5,10,0)
367 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
368#else
369 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
370#endif
371 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
372 --AvFILLp (padlist);
373
374 av_store (newpadlist, 0, SvREFCNT_inc_NN (*av_fetch (padlist, 0, FALSE)));
75 av_store (newpadlist, 1, (SV *) newpad); 375 av_store (newpadlist, 1, (SV *)newpad);
76 376
77 av = newAV (); /* will be @_ */ 377 return newpadlist;
78 av_extend (av, 0); 378}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 379
82 for (ix = fpad; ix > 0; ix--) 380static void
381free_padlist (pTHX_ AV *padlist)
382{
383 /* may be during global destruction */
384 if (SvREFCNT (padlist))
83 { 385 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 386 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 387 while (i >= 0)
86 { 388 {
87 char *name = SvPVX (namesv); /* XXX */ 389 SV **svp = av_fetch (padlist, i--, FALSE);
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 390 if (svp)
89 { /* lexical from outside? */
90 npad[ix] = SvREFCNT_inc (ppad[ix]);
91 } 391 {
92 else
93 { /* our own lexical */
94 SV *sv; 392 SV *sv;
95 if (*name == '&') 393 while (&PL_sv_undef != (sv = av_pop ((AV *)*svp)))
96 sv = SvREFCNT_inc (ppad[ix]); 394 SvREFCNT_dec (sv);
97 else if (*name == '@') 395
98 sv = (SV *) newAV (); 396 SvREFCNT_dec (*svp);
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 } 397 }
107 } 398 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix]))
109 {
110 npad[ix] = SvREFCNT_inc (ppad[ix]);
111 }
112 else
113 {
114 SV *sv = NEWSV (0, 0);
115 SvPADTMP_on (sv);
116 npad[ix] = sv;
117 }
118 }
119 399
120#if 0 /* NONOTUNDERSTOOD */
121 /* Now that vars are all in place, clone nested closures. */
122
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif
139
140 return newpadlist;
141}
142
143STATIC AV *
144free_padlist (AV *padlist)
145{
146 /* may be during global destruction */
147 if (SvREFCNT(padlist))
148 {
149 I32 i = AvFILLp(padlist);
150 while (i >= 0)
151 {
152 SV **svp = av_fetch(padlist, i--, FALSE);
153 SV *sv = svp ? *svp : Nullsv;
154 if (sv)
155 SvREFCNT_dec(sv);
156 }
157
158 SvREFCNT_dec((SV*)padlist); 400 SvREFCNT_dec ((SV*)padlist);
401 }
402}
403
404static int
405coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
406{
407 AV *padlist;
408 AV *av = (AV *)mg->mg_obj;
409
410 /* casting is fun. */
411 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
412 free_padlist (aTHX_ padlist);
413
414 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
415
416 return 0;
417}
418
419#define CORO_MAGIC_type_cv 26
420#define CORO_MAGIC_type_state PERL_MAGIC_ext
421
422static MGVTBL coro_cv_vtbl = {
423 0, 0, 0, 0,
424 coro_cv_free
425};
426
427#define CORO_MAGIC_NN(sv, type) \
428 (expect_true (SvMAGIC (sv)->mg_type == type) \
429 ? SvMAGIC (sv) \
430 : mg_find (sv, type))
431
432#define CORO_MAGIC(sv, type) \
433 (expect_true (SvMAGIC (sv)) \
434 ? CORO_MAGIC_NN (sv, type) \
435 : 0)
436
437#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
438#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
439
440INLINE struct coro *
441SvSTATE_ (pTHX_ SV *coro)
442{
443 HV *stash;
444 MAGIC *mg;
445
446 if (SvROK (coro))
447 coro = SvRV (coro);
448
449 if (expect_false (SvTYPE (coro) != SVt_PVHV))
450 croak ("Coro::State object required");
451
452 stash = SvSTASH (coro);
453 if (expect_false (stash != coro_stash && stash != coro_state_stash))
454 {
455 /* very slow, but rare, check */
456 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
457 croak ("Coro::State object required");
458 }
459
460 mg = CORO_MAGIC_state (coro);
461 return (struct coro *)mg->mg_ptr;
462}
463
464#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
465
466/* faster than SvSTATE, but expects a coroutine hv */
467#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
468#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
469
470/* the next two functions merely cache the padlists */
471static void
472get_padlist (pTHX_ CV *cv)
473{
474 MAGIC *mg = CORO_MAGIC_cv (cv);
475 AV *av;
476
477 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
478 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
479 else
480 {
481#if CORO_PREFER_PERL_FUNCTIONS
482 /* this is probably cleaner? but also slower! */
483 /* in practise, it seems to be less stable */
484 CV *cp = Perl_cv_clone (aTHX_ cv);
485 CvPADLIST (cv) = CvPADLIST (cp);
486 CvPADLIST (cp) = 0;
487 SvREFCNT_dec (cp);
488#else
489 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
490#endif
491 }
492}
493
494static void
495put_padlist (pTHX_ CV *cv)
496{
497 MAGIC *mg = CORO_MAGIC_cv (cv);
498 AV *av;
499
500 if (expect_false (!mg))
501 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
502
503 av = (AV *)mg->mg_obj;
504
505 if (expect_false (AvFILLp (av) >= AvMAX (av)))
506 av_extend (av, AvMAX (av) + 1);
507
508 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
509}
510
511/** load & save, init *******************************************************/
512
513static void
514load_perl (pTHX_ Coro__State c)
515{
516 perl_slots *slot = c->slot;
517 c->slot = 0;
518
519 PL_mainstack = c->mainstack;
520
521 GvSV (PL_defgv) = slot->defsv;
522 GvAV (PL_defgv) = slot->defav;
523 GvSV (PL_errgv) = slot->errsv;
524 GvSV (irsgv) = slot->irsgv;
525
526 #define VAR(name,type) PL_ ## name = slot->name;
527 # include "state.h"
528 #undef VAR
529
530 {
531 dSP;
532
533 CV *cv;
534
535 /* now do the ugly restore mess */
536 while (expect_true (cv = (CV *)POPs))
537 {
538 put_padlist (aTHX_ cv); /* mark this padlist as available */
539 CvDEPTH (cv) = PTR2IV (POPs);
540 CvPADLIST (cv) = (AV *)POPs;
541 }
542
543 PUTBACK;
159 } 544 }
160}
161 545
162/* the next tow functions merely cache the padlists */ 546 slf_frame = c->slf_frame;
163STATIC void 547 CORO_THROW = c->except;
164get_padlist (CV *cv)
165{
166 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 0);
167
168 if (he && AvFILLp ((AV *)*he) >= 0)
169 CvPADLIST (cv) = (AV *)av_pop ((AV *)*he);
170 else
171 CvPADLIST (cv) = clone_padlist (CvPADLIST (cv));
172} 548}
173 549
174STATIC void
175put_padlist (CV *cv)
176{
177 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 1);
178
179 if (SvTYPE (*he) != SVt_PVAV)
180 {
181 SvREFCNT_dec (*he);
182 *he = (SV *)newAV ();
183 }
184
185 av_push ((AV *)*he, (SV *)CvPADLIST (cv));
186}
187
188static void 550static void
189SAVE(pTHX_ Coro__State c) 551save_perl (pTHX_ Coro__State c)
190{ 552{
553 c->except = CORO_THROW;
554 c->slf_frame = slf_frame;
555
191 { 556 {
192 dSP; 557 dSP;
193 I32 cxix = cxstack_ix; 558 I32 cxix = cxstack_ix;
559 PERL_CONTEXT *ccstk = cxstack;
194 PERL_SI *top_si = PL_curstackinfo; 560 PERL_SI *top_si = PL_curstackinfo;
195 PERL_CONTEXT *ccstk = cxstack;
196 561
197 /* 562 /*
198 * the worst thing you can imagine happens first - we have to save 563 * the worst thing you can imagine happens first - we have to save
199 * (and reinitialize) all cv's in the whole callchain :( 564 * (and reinitialize) all cv's in the whole callchain :(
200 */ 565 */
201 566
202 PUSHs (Nullsv); 567 XPUSHs (Nullsv);
203 /* this loop was inspired by pp_caller */ 568 /* this loop was inspired by pp_caller */
204 for (;;) 569 for (;;)
205 { 570 {
206 while (cxix >= 0) 571 while (expect_true (cxix >= 0))
207 { 572 {
208 PERL_CONTEXT *cx = &ccstk[cxix--]; 573 PERL_CONTEXT *cx = &ccstk[cxix--];
209 574
210 if (CxTYPE(cx) == CXt_SUB) 575 if (expect_true (CxTYPE (cx) == CXt_SUB || CxTYPE (cx) == CXt_FORMAT))
211 { 576 {
212 CV *cv = cx->blk_sub.cv; 577 CV *cv = cx->blk_sub.cv;
578
213 if (CvDEPTH(cv)) 579 if (expect_true (CvDEPTH (cv)))
214 { 580 {
215#ifdef USE_THREADS
216 XPUSHs ((SV *)CvOWNER(cv));
217#endif
218 EXTEND (SP, 3); 581 EXTEND (SP, 3);
219 PUSHs ((SV *)CvDEPTH(cv));
220 PUSHs ((SV *)CvPADLIST(cv)); 582 PUSHs ((SV *)CvPADLIST (cv));
583 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
221 PUSHs ((SV *)cv); 584 PUSHs ((SV *)cv);
222 585
223 get_padlist (cv);
224
225 CvDEPTH(cv) = 0; 586 CvDEPTH (cv) = 0;
226#ifdef USE_THREADS 587 get_padlist (aTHX_ cv);
227 CvOWNER(cv) = 0;
228 error must unlock this cv etc.. etc...
229 if you are here wondering about this error message then
230 the reason is that it will not work as advertised yet
231#endif
232 } 588 }
233 } 589 }
234 else if (CxTYPE(cx) == CXt_FORMAT) 590 }
591
592 if (expect_true (top_si->si_type == PERLSI_MAIN))
593 break;
594
595 top_si = top_si->si_prev;
596 ccstk = top_si->si_cxstack;
597 cxix = top_si->si_cxix;
598 }
599
600 PUTBACK;
601 }
602
603 /* allocate some space on the context stack for our purposes */
604 /* we manually unroll here, as usually 2 slots is enough */
605 if (SLOT_COUNT >= 1) CXINC;
606 if (SLOT_COUNT >= 2) CXINC;
607 if (SLOT_COUNT >= 3) CXINC;
608 {
609 int i;
610 for (i = 3; i < SLOT_COUNT; ++i)
611 CXINC;
612 }
613 cxstack_ix -= SLOT_COUNT; /* undo allocation */
614
615 c->mainstack = PL_mainstack;
616
617 {
618 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
619
620 slot->defav = GvAV (PL_defgv);
621 slot->defsv = DEFSV;
622 slot->errsv = ERRSV;
623 slot->irsgv = GvSV (irsgv);
624
625 #define VAR(name,type) slot->name = PL_ ## name;
626 # include "state.h"
627 #undef VAR
628 }
629}
630
631/*
632 * allocate various perl stacks. This is almost an exact copy
633 * of perl.c:init_stacks, except that it uses less memory
634 * on the (sometimes correct) assumption that coroutines do
635 * not usually need a lot of stackspace.
636 */
637#if CORO_PREFER_PERL_FUNCTIONS
638# define coro_init_stacks(thx) init_stacks ()
639#else
640static void
641coro_init_stacks (pTHX)
642{
643 PL_curstackinfo = new_stackinfo(32, 8);
644 PL_curstackinfo->si_type = PERLSI_MAIN;
645 PL_curstack = PL_curstackinfo->si_stack;
646 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
647
648 PL_stack_base = AvARRAY(PL_curstack);
649 PL_stack_sp = PL_stack_base;
650 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
651
652 New(50,PL_tmps_stack,32,SV*);
653 PL_tmps_floor = -1;
654 PL_tmps_ix = -1;
655 PL_tmps_max = 32;
656
657 New(54,PL_markstack,16,I32);
658 PL_markstack_ptr = PL_markstack;
659 PL_markstack_max = PL_markstack + 16;
660
661#ifdef SET_MARK_OFFSET
662 SET_MARK_OFFSET;
663#endif
664
665 New(54,PL_scopestack,8,I32);
666 PL_scopestack_ix = 0;
667 PL_scopestack_max = 8;
668
669 New(54,PL_savestack,24,ANY);
670 PL_savestack_ix = 0;
671 PL_savestack_max = 24;
672
673#if !PERL_VERSION_ATLEAST (5,10,0)
674 New(54,PL_retstack,4,OP*);
675 PL_retstack_ix = 0;
676 PL_retstack_max = 4;
677#endif
678}
679#endif
680
681/*
682 * destroy the stacks, the callchain etc...
683 */
684static void
685coro_destruct_stacks (pTHX)
686{
687 while (PL_curstackinfo->si_next)
688 PL_curstackinfo = PL_curstackinfo->si_next;
689
690 while (PL_curstackinfo)
691 {
692 PERL_SI *p = PL_curstackinfo->si_prev;
693
694 if (!IN_DESTRUCT)
695 SvREFCNT_dec (PL_curstackinfo->si_stack);
696
697 Safefree (PL_curstackinfo->si_cxstack);
698 Safefree (PL_curstackinfo);
699 PL_curstackinfo = p;
700 }
701
702 Safefree (PL_tmps_stack);
703 Safefree (PL_markstack);
704 Safefree (PL_scopestack);
705 Safefree (PL_savestack);
706#if !PERL_VERSION_ATLEAST (5,10,0)
707 Safefree (PL_retstack);
708#endif
709}
710
711#define CORO_RSS \
712 rss += sizeof (SYM (curstackinfo)); \
713 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
714 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
715 rss += SYM (tmps_max) * sizeof (SV *); \
716 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
717 rss += SYM (scopestack_max) * sizeof (I32); \
718 rss += SYM (savestack_max) * sizeof (ANY);
719
720static size_t
721coro_rss (pTHX_ struct coro *coro)
722{
723 size_t rss = sizeof (*coro);
724
725 if (coro->mainstack)
726 {
727 if (coro->flags & CF_RUNNING)
728 {
729 #define SYM(sym) PL_ ## sym
730 CORO_RSS;
731 #undef SYM
732 }
733 else
734 {
735 #define SYM(sym) coro->slot->sym
736 CORO_RSS;
737 #undef SYM
738 }
739 }
740
741 return rss;
742}
743
744/** coroutine stack handling ************************************************/
745
746static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
747static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
748static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
749
750/* apparently < 5.8.8 */
751#ifndef MgPV_nolen_const
752#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
753 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
754 (const char*)(mg)->mg_ptr)
755#endif
756
757/*
758 * This overrides the default magic get method of %SIG elements.
759 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
760 * and instead of tryign to save and restore the hash elements, we just provide
761 * readback here.
762 * We only do this when the hook is != 0, as they are often set to 0 temporarily,
763 * not expecting this to actually change the hook. This is a potential problem
764 * when a schedule happens then, but we ignore this.
765 */
766static int
767coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
768{
769 const char *s = MgPV_nolen_const (mg);
770
771 if (*s == '_')
772 {
773 SV **svp = 0;
774
775 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
776 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
777
778 if (svp)
779 {
780 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
781 return 0;
782 }
783 }
784
785 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
786}
787
788static int
789coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
790{
791 const char *s = MgPV_nolen_const (mg);
792
793 if (*s == '_')
794 {
795 SV **svp = 0;
796
797 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
798 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
799
800 if (svp)
801 {
802 SV *old = *svp;
803 *svp = 0;
804 SvREFCNT_dec (old);
805 return 0;
806 }
807 }
808
809 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
810}
811
812static int
813coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
814{
815 const char *s = MgPV_nolen_const (mg);
816
817 if (*s == '_')
818 {
819 SV **svp = 0;
820
821 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
822 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
823
824 if (svp)
825 {
826 SV *old = *svp;
827 *svp = newSVsv (sv);
828 SvREFCNT_dec (old);
829 return 0;
830 }
831 }
832
833 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
834}
835
836static void
837prepare_nop (pTHX_ struct coro_transfer_args *ta)
838{
839 /* kind of mega-hacky, but works */
840 ta->next = ta->prev = (struct coro *)ta;
841}
842
843static int
844slf_check_nop (pTHX_ struct CoroSLF *frame)
845{
846 return 0;
847}
848
849static int
850slf_check_repeat (pTHX_ struct CoroSLF *frame)
851{
852 return 1;
853}
854
855static UNOP coro_setup_op;
856
857static void NOINLINE /* noinline to keep it out of the transfer fast path */
858coro_setup (pTHX_ struct coro *coro)
859{
860 /*
861 * emulate part of the perl startup here.
862 */
863 coro_init_stacks (aTHX);
864
865 PL_runops = RUNOPS_DEFAULT;
866 PL_curcop = &PL_compiling;
867 PL_in_eval = EVAL_NULL;
868 PL_comppad = 0;
869 PL_curpm = 0;
870 PL_curpad = 0;
871 PL_localizing = 0;
872 PL_dirty = 0;
873 PL_restartop = 0;
874#if PERL_VERSION_ATLEAST (5,10,0)
875 PL_parser = 0;
876#endif
877
878 /* recreate the die/warn hooks */
879 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
880 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
881
882 GvSV (PL_defgv) = newSV (0);
883 GvAV (PL_defgv) = coro->args; coro->args = 0;
884 GvSV (PL_errgv) = newSV (0);
885 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
886 PL_rs = newSVsv (GvSV (irsgv));
887 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
888
889 {
890 dSP;
891 UNOP myop;
892
893 Zero (&myop, 1, UNOP);
894 myop.op_next = Nullop;
895 myop.op_type = OP_ENTERSUB;
896 myop.op_flags = OPf_WANT_VOID;
897
898 PUSHMARK (SP);
899 PUSHs ((SV *)coro->startcv);
900 PUTBACK;
901 PL_op = (OP *)&myop;
902 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
903 }
904
905 /* this newly created coroutine might be run on an existing cctx which most
906 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
907 */
908 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
909 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
910
911 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
912 coro_setup_op.op_next = PL_op;
913 coro_setup_op.op_type = OP_ENTERSUB;
914 coro_setup_op.op_ppaddr = pp_slf;
915 /* no flags etc. required, as an init function won't be called */
916
917 PL_op = (OP *)&coro_setup_op;
918
919 /* copy throw, in case it was set before coro_setup */
920 CORO_THROW = coro->except;
921}
922
923static void
924coro_destruct (pTHX_ struct coro *coro)
925{
926 if (!IN_DESTRUCT)
927 {
928 /* restore all saved variables and stuff */
929 LEAVE_SCOPE (0);
930 assert (PL_tmps_floor == -1);
931
932 /* free all temporaries */
933 FREETMPS;
934 assert (PL_tmps_ix == -1);
935
936 /* unwind all extra stacks */
937 POPSTACK_TO (PL_mainstack);
938
939 /* unwind main stack */
940 dounwind (-1);
941 }
942
943 SvREFCNT_dec (GvSV (PL_defgv));
944 SvREFCNT_dec (GvAV (PL_defgv));
945 SvREFCNT_dec (GvSV (PL_errgv));
946 SvREFCNT_dec (PL_defoutgv);
947 SvREFCNT_dec (PL_rs);
948 SvREFCNT_dec (GvSV (irsgv));
949
950 SvREFCNT_dec (PL_diehook);
951 SvREFCNT_dec (PL_warnhook);
952
953 SvREFCNT_dec (coro->saved_deffh);
954 SvREFCNT_dec (coro->rouse_cb);
955 SvREFCNT_dec (coro->invoke_cb);
956 SvREFCNT_dec (coro->invoke_av);
957
958 coro_destruct_stacks (aTHX);
959}
960
961INLINE void
962free_coro_mortal (pTHX)
963{
964 if (expect_true (coro_mortal))
965 {
966 SvREFCNT_dec (coro_mortal);
967 coro_mortal = 0;
968 }
969}
970
971static int
972runops_trace (pTHX)
973{
974 COP *oldcop = 0;
975 int oldcxix = -2;
976
977 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
978 {
979 PERL_ASYNC_CHECK ();
980
981 if (cctx_current->flags & CC_TRACE_ALL)
982 {
983 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
984 {
985 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
986 SV **bot, **top;
987 AV *av = newAV (); /* return values */
988 SV **cb;
989 dSP;
990
991 GV *gv = CvGV (cx->blk_sub.cv);
992 SV *fullname = sv_2mortal (newSV (0));
993 if (isGV (gv))
994 gv_efullname3 (fullname, gv, 0);
995
996 bot = PL_stack_base + cx->blk_oldsp + 1;
997 top = cx->blk_gimme == G_ARRAY ? SP + 1
998 : cx->blk_gimme == G_SCALAR ? bot + 1
999 : bot;
1000
1001 av_extend (av, top - bot);
1002 while (bot < top)
1003 av_push (av, SvREFCNT_inc_NN (*bot++));
1004
1005 PL_runops = RUNOPS_DEFAULT;
1006 ENTER;
1007 SAVETMPS;
1008 EXTEND (SP, 3);
1009 PUSHMARK (SP);
1010 PUSHs (&PL_sv_no);
1011 PUSHs (fullname);
1012 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1013 PUTBACK;
1014 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1015 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1016 SPAGAIN;
1017 FREETMPS;
1018 LEAVE;
1019 PL_runops = runops_trace;
1020 }
1021
1022 if (oldcop != PL_curcop)
1023 {
1024 oldcop = PL_curcop;
1025
1026 if (PL_curcop != &PL_compiling)
1027 {
1028 SV **cb;
1029
1030 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1031 {
1032 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1033
1034 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1035 {
1036 dSP;
1037 GV *gv = CvGV (cx->blk_sub.cv);
1038 SV *fullname = sv_2mortal (newSV (0));
1039
1040 if (isGV (gv))
1041 gv_efullname3 (fullname, gv, 0);
1042
1043 PL_runops = RUNOPS_DEFAULT;
1044 ENTER;
1045 SAVETMPS;
1046 EXTEND (SP, 3);
1047 PUSHMARK (SP);
1048 PUSHs (&PL_sv_yes);
1049 PUSHs (fullname);
1050 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1051 PUTBACK;
1052 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1053 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1054 SPAGAIN;
1055 FREETMPS;
1056 LEAVE;
1057 PL_runops = runops_trace;
1058 }
1059
1060 oldcxix = cxstack_ix;
1061 }
1062
1063 if (cctx_current->flags & CC_TRACE_LINE)
1064 {
1065 dSP;
1066
1067 PL_runops = RUNOPS_DEFAULT;
1068 ENTER;
1069 SAVETMPS;
1070 EXTEND (SP, 3);
1071 PL_runops = RUNOPS_DEFAULT;
1072 PUSHMARK (SP);
1073 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1074 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1075 PUTBACK;
1076 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1077 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1078 SPAGAIN;
1079 FREETMPS;
1080 LEAVE;
1081 PL_runops = runops_trace;
1082 }
1083 }
1084 }
1085 }
1086 }
1087
1088 TAINT_NOT;
1089 return 0;
1090}
1091
1092static struct CoroSLF cctx_ssl_frame;
1093
1094static void
1095slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1096{
1097 ta->prev = 0;
1098}
1099
1100static int
1101slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1102{
1103 *frame = cctx_ssl_frame;
1104
1105 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1106}
1107
1108/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1109static void NOINLINE
1110cctx_prepare (pTHX)
1111{
1112 PL_top_env = &PL_start_env;
1113
1114 if (cctx_current->flags & CC_TRACE)
1115 PL_runops = runops_trace;
1116
1117 /* we already must be executing an SLF op, there is no other valid way
1118 * that can lead to creation of a new cctx */
1119 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1120 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1121
1122 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1123 cctx_ssl_frame = slf_frame;
1124
1125 slf_frame.prepare = slf_prepare_set_stacklevel;
1126 slf_frame.check = slf_check_set_stacklevel;
1127}
1128
1129/* the tail of transfer: execute stuff we can only do after a transfer */
1130INLINE void
1131transfer_tail (pTHX)
1132{
1133 free_coro_mortal (aTHX);
1134}
1135
1136/*
1137 * this is a _very_ stripped down perl interpreter ;)
1138 */
1139static void
1140cctx_run (void *arg)
1141{
1142#ifdef USE_ITHREADS
1143# if CORO_PTHREAD
1144 PERL_SET_CONTEXT (coro_thx);
1145# endif
1146#endif
1147 {
1148 dTHX;
1149
1150 /* normally we would need to skip the entersub here */
1151 /* not doing so will re-execute it, which is exactly what we want */
1152 /* PL_nop = PL_nop->op_next */
1153
1154 /* inject a fake subroutine call to cctx_init */
1155 cctx_prepare (aTHX);
1156
1157 /* cctx_run is the alternative tail of transfer() */
1158 transfer_tail (aTHX);
1159
1160 /* somebody or something will hit me for both perl_run and PL_restartop */
1161 PL_restartop = PL_op;
1162 perl_run (PL_curinterp);
1163 /*
1164 * Unfortunately, there is no way to get at the return values of the
1165 * coro body here, as perl_run destroys these
1166 */
1167
1168 /*
1169 * If perl-run returns we assume exit() was being called or the coro
1170 * fell off the end, which seems to be the only valid (non-bug)
1171 * reason for perl_run to return. We try to exit by jumping to the
1172 * bootstrap-time "top" top_env, as we cannot restore the "main"
1173 * coroutine as Coro has no such concept
1174 */
1175 PL_top_env = main_top_env;
1176 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1177 }
1178}
1179
1180static coro_cctx *
1181cctx_new ()
1182{
1183 coro_cctx *cctx;
1184
1185 ++cctx_count;
1186 New (0, cctx, 1, coro_cctx);
1187
1188 cctx->gen = cctx_gen;
1189 cctx->flags = 0;
1190 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1191
1192 return cctx;
1193}
1194
1195/* create a new cctx only suitable as source */
1196static coro_cctx *
1197cctx_new_empty ()
1198{
1199 coro_cctx *cctx = cctx_new ();
1200
1201 cctx->sptr = 0;
1202 coro_create (&cctx->cctx, 0, 0, 0, 0);
1203
1204 return cctx;
1205}
1206
1207/* create a new cctx suitable as destination/running a perl interpreter */
1208static coro_cctx *
1209cctx_new_run ()
1210{
1211 coro_cctx *cctx = cctx_new ();
1212 void *stack_start;
1213 size_t stack_size;
1214
1215#if HAVE_MMAP
1216 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1217 /* mmap supposedly does allocate-on-write for us */
1218 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1219
1220 if (cctx->sptr != (void *)-1)
1221 {
1222 #if CORO_STACKGUARD
1223 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1224 #endif
1225 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1226 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1227 cctx->flags |= CC_MAPPED;
1228 }
1229 else
1230#endif
1231 {
1232 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1233 New (0, cctx->sptr, cctx_stacksize, long);
1234
1235 if (!cctx->sptr)
1236 {
1237 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1238 _exit (EXIT_FAILURE);
1239 }
1240
1241 stack_start = cctx->sptr;
1242 stack_size = cctx->ssize;
1243 }
1244
1245 #if CORO_USE_VALGRIND
1246 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1247 #endif
1248
1249 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1250
1251 return cctx;
1252}
1253
1254static void
1255cctx_destroy (coro_cctx *cctx)
1256{
1257 if (!cctx)
1258 return;
1259
1260 assert (cctx != cctx_current);//D temporary
1261
1262 --cctx_count;
1263 coro_destroy (&cctx->cctx);
1264
1265 /* coro_transfer creates new, empty cctx's */
1266 if (cctx->sptr)
1267 {
1268 #if CORO_USE_VALGRIND
1269 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1270 #endif
1271
1272#if HAVE_MMAP
1273 if (cctx->flags & CC_MAPPED)
1274 munmap (cctx->sptr, cctx->ssize);
1275 else
1276#endif
1277 Safefree (cctx->sptr);
1278 }
1279
1280 Safefree (cctx);
1281}
1282
1283/* wether this cctx should be destructed */
1284#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1285
1286static coro_cctx *
1287cctx_get (pTHX)
1288{
1289 while (expect_true (cctx_first))
1290 {
1291 coro_cctx *cctx = cctx_first;
1292 cctx_first = cctx->next;
1293 --cctx_idle;
1294
1295 if (expect_true (!CCTX_EXPIRED (cctx)))
1296 return cctx;
1297
1298 cctx_destroy (cctx);
1299 }
1300
1301 return cctx_new_run ();
1302}
1303
1304static void
1305cctx_put (coro_cctx *cctx)
1306{
1307 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1308
1309 /* free another cctx if overlimit */
1310 if (expect_false (cctx_idle >= cctx_max_idle))
1311 {
1312 coro_cctx *first = cctx_first;
1313 cctx_first = first->next;
1314 --cctx_idle;
1315
1316 cctx_destroy (first);
1317 }
1318
1319 ++cctx_idle;
1320 cctx->next = cctx_first;
1321 cctx_first = cctx;
1322}
1323
1324/** coroutine switching *****************************************************/
1325
1326static void
1327transfer_check (pTHX_ struct coro *prev, struct coro *next)
1328{
1329 /* TODO: throwing up here is considered harmful */
1330
1331 if (expect_true (prev != next))
1332 {
1333 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1334 croak ("Coro::State::transfer called with non-running/new prev Coro::State, but can only transfer from running or new states,");
1335
1336 if (expect_false (next->flags & CF_RUNNING))
1337 croak ("Coro::State::transfer called with running next Coro::State, but can only transfer to inactive states,");
1338
1339 if (expect_false (next->flags & CF_DESTROYED))
1340 croak ("Coro::State::transfer called with destroyed next Coro::State, but can only transfer to inactive states,");
1341
1342#if !PERL_VERSION_ATLEAST (5,10,0)
1343 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1344 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1345#endif
1346 }
1347}
1348
1349/* always use the TRANSFER macro */
1350static void NOINLINE /* noinline so we have a fixed stackframe */
1351transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1352{
1353 dSTACKLEVEL;
1354
1355 /* sometimes transfer is only called to set idle_sp */
1356 if (expect_false (!prev))
1357 {
1358 cctx_current->idle_sp = STACKLEVEL;
1359 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1360 }
1361 else if (expect_true (prev != next))
1362 {
1363 coro_cctx *cctx_prev;
1364
1365 if (expect_false (prev->flags & CF_NEW))
1366 {
1367 /* create a new empty/source context */
1368 prev->flags &= ~CF_NEW;
1369 prev->flags |= CF_RUNNING;
1370 }
1371
1372 prev->flags &= ~CF_RUNNING;
1373 next->flags |= CF_RUNNING;
1374
1375 /* first get rid of the old state */
1376 save_perl (aTHX_ prev);
1377
1378 if (expect_false (next->flags & CF_NEW))
1379 {
1380 /* need to start coroutine */
1381 next->flags &= ~CF_NEW;
1382 /* setup coroutine call */
1383 coro_setup (aTHX_ next);
1384 }
1385 else
1386 load_perl (aTHX_ next);
1387
1388 /* possibly untie and reuse the cctx */
1389 if (expect_true (
1390 cctx_current->idle_sp == STACKLEVEL
1391 && !(cctx_current->flags & CC_TRACE)
1392 && !force_cctx
1393 ))
1394 {
1395 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1396 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1397
1398 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get */
1399 /* without this the next cctx_get might destroy the prev__cctx while still in use */
1400 if (expect_false (CCTX_EXPIRED (cctx_current)))
1401 if (!next->cctx)
1402 next->cctx = cctx_get (aTHX);
1403
1404 cctx_put (cctx_current);
1405 assert (!prev->cctx);//D temporary
1406 }
1407 else
1408 prev->cctx = cctx_current;
1409
1410 ++next->usecount;
1411
1412 cctx_prev = cctx_current;
1413 cctx_current = expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1414
1415 next->cctx = 0;
1416
1417 if (expect_false (cctx_prev != cctx_current))
1418 {
1419 cctx_prev->top_env = PL_top_env;
1420 PL_top_env = cctx_current->top_env;
1421 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1422 }
1423
1424 transfer_tail (aTHX);
1425 }
1426}
1427
1428#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1429#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1430
1431/** high level stuff ********************************************************/
1432
1433static int
1434coro_state_destroy (pTHX_ struct coro *coro)
1435{
1436 if (coro->flags & CF_DESTROYED)
1437 return 0;
1438
1439 if (coro->on_destroy)
1440 coro->on_destroy (aTHX_ coro);
1441
1442 coro->flags |= CF_DESTROYED;
1443
1444 if (coro->flags & CF_READY)
1445 {
1446 /* reduce nready, as destroying a ready coro effectively unreadies it */
1447 /* alternative: look through all ready queues and remove the coro */
1448 --coro_nready;
1449 }
1450 else
1451 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1452
1453 if (coro->mainstack && coro->mainstack != main_mainstack)
1454 {
1455 struct coro temp;
1456
1457 assert (("FATAL: tried to destroy currently running coroutine (please report)", !(coro->flags & CF_RUNNING)));
1458
1459 save_perl (aTHX_ &temp);
1460 load_perl (aTHX_ coro);
1461
1462 coro_destruct (aTHX_ coro);
1463
1464 load_perl (aTHX_ &temp);
1465
1466 coro->slot = 0;
1467 }
1468
1469 cctx_destroy (coro->cctx);
1470 SvREFCNT_dec (coro->startcv);
1471 SvREFCNT_dec (coro->args);
1472 SvREFCNT_dec (CORO_THROW);
1473
1474 if (coro->next) coro->next->prev = coro->prev;
1475 if (coro->prev) coro->prev->next = coro->next;
1476 if (coro == coro_first) coro_first = coro->next;
1477
1478 return 1;
1479}
1480
1481static int
1482coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1483{
1484 struct coro *coro = (struct coro *)mg->mg_ptr;
1485 mg->mg_ptr = 0;
1486
1487 coro->hv = 0;
1488
1489 if (--coro->refcnt < 0)
1490 {
1491 coro_state_destroy (aTHX_ coro);
1492 Safefree (coro);
1493 }
1494
1495 return 0;
1496}
1497
1498static int
1499coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1500{
1501 struct coro *coro = (struct coro *)mg->mg_ptr;
1502
1503 ++coro->refcnt;
1504
1505 return 0;
1506}
1507
1508static MGVTBL coro_state_vtbl = {
1509 0, 0, 0, 0,
1510 coro_state_free,
1511 0,
1512#ifdef MGf_DUP
1513 coro_state_dup,
1514#else
1515# define MGf_DUP 0
1516#endif
1517};
1518
1519static void
1520prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1521{
1522 ta->prev = SvSTATE (prev_sv);
1523 ta->next = SvSTATE (next_sv);
1524 TRANSFER_CHECK (*ta);
1525}
1526
1527static void
1528api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1529{
1530 struct coro_transfer_args ta;
1531
1532 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1533 TRANSFER (ta, 1);
1534}
1535
1536/*****************************************************************************/
1537/* gensub: simple closure generation utility */
1538
1539#define GENSUB_ARG CvXSUBANY (cv).any_ptr
1540
1541/* create a closure from XS, returns a code reference */
1542/* the arg can be accessed via GENSUB_ARG from the callback */
1543/* the callback must use dXSARGS/XSRETURN */
1544static SV *
1545gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg)
1546{
1547 CV *cv = (CV *)newSV (0);
1548
1549 sv_upgrade ((SV *)cv, SVt_PVCV);
1550
1551 CvANON_on (cv);
1552 CvISXSUB_on (cv);
1553 CvXSUB (cv) = xsub;
1554 GENSUB_ARG = arg;
1555
1556 return newRV_noinc ((SV *)cv);
1557}
1558
1559/** Coro ********************************************************************/
1560
1561INLINE void
1562coro_enq (pTHX_ struct coro *coro)
1563{
1564 av_push (coro_ready [coro->prio - PRIO_MIN], SvREFCNT_inc_NN (coro->hv));
1565}
1566
1567INLINE SV *
1568coro_deq (pTHX)
1569{
1570 int prio;
1571
1572 for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= 0; )
1573 if (AvFILLp (coro_ready [prio]) >= 0)
1574 return av_shift (coro_ready [prio]);
1575
1576 return 0;
1577}
1578
1579static int
1580api_ready (pTHX_ SV *coro_sv)
1581{
1582 struct coro *coro;
1583 SV *sv_hook;
1584 void (*xs_hook)(void);
1585
1586 if (SvROK (coro_sv))
1587 coro_sv = SvRV (coro_sv);
1588
1589 coro = SvSTATE (coro_sv);
1590
1591 if (coro->flags & CF_READY)
1592 return 0;
1593
1594 coro->flags |= CF_READY;
1595
1596 sv_hook = coro_nready ? 0 : coro_readyhook;
1597 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1598
1599 coro_enq (aTHX_ coro);
1600 ++coro_nready;
1601
1602 if (sv_hook)
1603 {
1604 dSP;
1605
1606 ENTER;
1607 SAVETMPS;
1608
1609 PUSHMARK (SP);
1610 PUTBACK;
1611 call_sv (sv_hook, G_VOID | G_DISCARD);
1612
1613 FREETMPS;
1614 LEAVE;
1615 }
1616
1617 if (xs_hook)
1618 xs_hook ();
1619
1620 return 1;
1621}
1622
1623static int
1624api_is_ready (pTHX_ SV *coro_sv)
1625{
1626 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1627}
1628
1629/* expects to own a reference to next->hv */
1630INLINE void
1631prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1632{
1633 SV *prev_sv = SvRV (coro_current);
1634
1635 ta->prev = SvSTATE_hv (prev_sv);
1636 ta->next = next;
1637
1638 TRANSFER_CHECK (*ta);
1639
1640 SvRV_set (coro_current, (SV *)next->hv);
1641
1642 free_coro_mortal (aTHX);
1643 coro_mortal = prev_sv;
1644}
1645
1646static void
1647prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1648{
1649 for (;;)
1650 {
1651 SV *next_sv = coro_deq (aTHX);
1652
1653 if (expect_true (next_sv))
1654 {
1655 struct coro *next = SvSTATE_hv (next_sv);
1656
1657 /* cannot transfer to destroyed coros, skip and look for next */
1658 if (expect_false (next->flags & CF_DESTROYED))
1659 SvREFCNT_dec (next_sv); /* coro_nready has already been taken care of by destroy */
1660 else
1661 {
1662 next->flags &= ~CF_READY;
1663 --coro_nready;
1664
1665 prepare_schedule_to (aTHX_ ta, next);
1666 break;
1667 }
1668 }
1669 else
1670 {
1671 /* nothing to schedule: call the idle handler */
1672 dSP;
1673
1674 ENTER;
1675 SAVETMPS;
1676
1677 PUSHMARK (SP);
1678 PUTBACK;
1679 call_sv (get_sv ("Coro::idle", FALSE), G_VOID | G_DISCARD);
1680
1681 FREETMPS;
1682 LEAVE;
1683 }
1684 }
1685}
1686
1687INLINE void
1688prepare_cede (pTHX_ struct coro_transfer_args *ta)
1689{
1690 api_ready (aTHX_ coro_current);
1691 prepare_schedule (aTHX_ ta);
1692}
1693
1694INLINE void
1695prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1696{
1697 SV *prev = SvRV (coro_current);
1698
1699 if (coro_nready)
1700 {
1701 prepare_schedule (aTHX_ ta);
1702 api_ready (aTHX_ prev);
1703 }
1704 else
1705 prepare_nop (aTHX_ ta);
1706}
1707
1708static void
1709api_schedule (pTHX)
1710{
1711 struct coro_transfer_args ta;
1712
1713 prepare_schedule (aTHX_ &ta);
1714 TRANSFER (ta, 1);
1715}
1716
1717static void
1718api_schedule_to (pTHX_ SV *coro_sv)
1719{
1720 struct coro_transfer_args ta;
1721 struct coro *next = SvSTATE (coro_sv);
1722
1723 SvREFCNT_inc_NN (coro_sv);
1724 prepare_schedule_to (aTHX_ &ta, next);
1725}
1726
1727static int
1728api_cede (pTHX)
1729{
1730 struct coro_transfer_args ta;
1731
1732 prepare_cede (aTHX_ &ta);
1733
1734 if (expect_true (ta.prev != ta.next))
1735 {
1736 TRANSFER (ta, 1);
1737 return 1;
1738 }
1739 else
1740 return 0;
1741}
1742
1743static int
1744api_cede_notself (pTHX)
1745{
1746 if (coro_nready)
1747 {
1748 struct coro_transfer_args ta;
1749
1750 prepare_cede_notself (aTHX_ &ta);
1751 TRANSFER (ta, 1);
1752 return 1;
1753 }
1754 else
1755 return 0;
1756}
1757
1758static void
1759api_trace (pTHX_ SV *coro_sv, int flags)
1760{
1761 struct coro *coro = SvSTATE (coro_sv);
1762
1763 if (flags & CC_TRACE)
1764 {
1765 if (!coro->cctx)
1766 coro->cctx = cctx_new_run ();
1767 else if (!(coro->cctx->flags & CC_TRACE))
1768 croak ("cannot enable tracing on coroutine with custom stack,");
1769
1770 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1771 }
1772 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1773 {
1774 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1775
1776 if (coro->flags & CF_RUNNING)
1777 PL_runops = RUNOPS_DEFAULT;
1778 else
1779 coro->slot->runops = RUNOPS_DEFAULT;
1780 }
1781}
1782
1783static void
1784coro_call_on_destroy (pTHX_ struct coro *coro)
1785{
1786 SV **on_destroyp = hv_fetch (coro->hv, "_on_destroy", sizeof ("_on_destroy") - 1, 0);
1787 SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0);
1788
1789 if (on_destroyp)
1790 {
1791 AV *on_destroy = (AV *)SvRV (*on_destroyp);
1792
1793 while (AvFILLp (on_destroy) >= 0)
1794 {
1795 dSP; /* don't disturb outer sp */
1796 SV *cb = av_pop (on_destroy);
1797
1798 PUSHMARK (SP);
1799
1800 if (statusp)
1801 {
1802 int i;
1803 AV *status = (AV *)SvRV (*statusp);
1804 EXTEND (SP, AvFILLp (status) + 1);
1805
1806 for (i = 0; i <= AvFILLp (status); ++i)
1807 PUSHs (AvARRAY (status)[i]);
1808 }
1809
1810 PUTBACK;
1811 call_sv (sv_2mortal (cb), G_VOID | G_DISCARD);
1812 }
1813 }
1814}
1815
1816static void
1817slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1818{
1819 int i;
1820 HV *hv = (HV *)SvRV (coro_current);
1821 AV *av = newAV ();
1822
1823 av_extend (av, items - 1);
1824 for (i = 0; i < items; ++i)
1825 av_push (av, SvREFCNT_inc_NN (arg [i]));
1826
1827 hv_store (hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0);
1828
1829 av_push (av_destroy, (SV *)newRV_inc ((SV *)hv)); /* RVinc for perl */
1830 api_ready (aTHX_ sv_manager);
1831
1832 frame->prepare = prepare_schedule;
1833 frame->check = slf_check_repeat;
1834}
1835
1836/*****************************************************************************/
1837/* async pool handler */
1838
1839static int
1840slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
1841{
1842 HV *hv = (HV *)SvRV (coro_current);
1843 struct coro *coro = (struct coro *)frame->data;
1844
1845 if (!coro->invoke_cb)
1846 return 1; /* loop till we have invoke */
1847 else
1848 {
1849 hv_store (hv, "desc", sizeof ("desc") - 1,
1850 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
1851
1852 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
1853
1854 {
1855 dSP;
1856 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
1857 PUTBACK;
1858 }
1859
1860 SvREFCNT_dec (GvAV (PL_defgv));
1861 GvAV (PL_defgv) = coro->invoke_av;
1862 coro->invoke_av = 0;
1863
1864 return 0;
1865 }
1866}
1867
1868static void
1869slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1870{
1871 HV *hv = (HV *)SvRV (coro_current);
1872 struct coro *coro = SvSTATE_hv ((SV *)hv);
1873
1874 if (expect_true (coro->saved_deffh))
1875 {
1876 /* subsequent iteration */
1877 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
1878 coro->saved_deffh = 0;
1879
1880 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
1881 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
1882 {
1883 coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate);
1884 coro->invoke_av = newAV ();
1885
1886 frame->prepare = prepare_nop;
1887 }
1888 else
1889 {
1890 av_clear (GvAV (PL_defgv));
1891 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
1892
1893 coro->prio = 0;
1894
1895 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
1896 api_trace (aTHX_ coro_current, 0);
1897
1898 frame->prepare = prepare_schedule;
1899 av_push (av_async_pool, SvREFCNT_inc (hv));
1900 }
1901 }
1902 else
1903 {
1904 /* first iteration, simply fall through */
1905 frame->prepare = prepare_nop;
1906 }
1907
1908 frame->check = slf_check_pool_handler;
1909 frame->data = (void *)coro;
1910}
1911
1912/*****************************************************************************/
1913/* rouse callback */
1914
1915#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
1916
1917static void
1918coro_rouse_callback (pTHX_ CV *cv)
1919{
1920 dXSARGS;
1921 SV *data = (SV *)GENSUB_ARG;
1922
1923 if (SvTYPE (SvRV (data)) != SVt_PVAV)
1924 {
1925 /* first call, set args */
1926 AV *av = newAV ();
1927 SV *coro = SvRV (data);
1928
1929 SvRV_set (data, (SV *)av);
1930 api_ready (aTHX_ coro);
1931 SvREFCNT_dec (coro);
1932
1933 /* better take a full copy of the arguments */
1934 while (items--)
1935 av_store (av, items, newSVsv (ST (items)));
1936 }
1937
1938 XSRETURN_EMPTY;
1939}
1940
1941static int
1942slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
1943{
1944 SV *data = (SV *)frame->data;
1945
1946 if (CORO_THROW)
1947 return 0;
1948
1949 if (SvTYPE (SvRV (data)) != SVt_PVAV)
1950 return 1;
1951
1952 /* now push all results on the stack */
1953 {
1954 dSP;
1955 AV *av = (AV *)SvRV (data);
1956 int i;
1957
1958 EXTEND (SP, AvFILLp (av) + 1);
1959 for (i = 0; i <= AvFILLp (av); ++i)
1960 PUSHs (sv_2mortal (AvARRAY (av)[i]));
1961
1962 /* we have stolen the elements, so ste length to zero and free */
1963 AvFILLp (av) = -1;
1964 av_undef (av);
1965
1966 PUTBACK;
1967 }
1968
1969 return 0;
1970}
1971
1972static void
1973slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1974{
1975 SV *cb;
1976
1977 if (items)
1978 cb = arg [0];
1979 else
1980 {
1981 struct coro *coro = SvSTATE_current;
1982
1983 if (!coro->rouse_cb)
1984 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
1985
1986 cb = sv_2mortal (coro->rouse_cb);
1987 coro->rouse_cb = 0;
1988 }
1989
1990 if (!SvROK (cb)
1991 || SvTYPE (SvRV (cb)) != SVt_PVCV
1992 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
1993 croak ("Coro::rouse_wait called with illegal callback argument,");
1994
1995 {
1996 CV *cv = (CV *)SvRV (cb); /* for GENSUB_ARG */
1997 SV *data = (SV *)GENSUB_ARG;
1998
1999 frame->data = (void *)data;
2000 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2001 frame->check = slf_check_rouse_wait;
2002 }
2003}
2004
2005static SV *
2006coro_new_rouse_cb (pTHX)
2007{
2008 HV *hv = (HV *)SvRV (coro_current);
2009 struct coro *coro = SvSTATE_hv (hv);
2010 SV *data = newRV_inc ((SV *)hv);
2011 SV *cb = gensub (aTHX_ coro_rouse_callback, (void *)data);
2012
2013 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2014 SvREFCNT_dec (data); /* magicext increases the refcount */
2015
2016 SvREFCNT_dec (coro->rouse_cb);
2017 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2018
2019 return cb;
2020}
2021
2022/*****************************************************************************/
2023/* schedule-like-function opcode (SLF) */
2024
2025static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2026static const CV *slf_cv;
2027static SV **slf_argv;
2028static int slf_argc, slf_arga; /* count, allocated */
2029static I32 slf_ax; /* top of stack, for restore */
2030
2031/* this restores the stack in the case we patched the entersub, to */
2032/* recreate the stack frame as perl will on following calls */
2033/* since entersub cleared the stack */
2034static OP *
2035pp_restore (pTHX)
2036{
2037 int i;
2038 SV **SP = PL_stack_base + slf_ax;
2039
2040 PUSHMARK (SP);
2041
2042 EXTEND (SP, slf_argc + 1);
2043
2044 for (i = 0; i < slf_argc; ++i)
2045 PUSHs (sv_2mortal (slf_argv [i]));
2046
2047 PUSHs ((SV *)CvGV (slf_cv));
2048
2049 RETURNOP (slf_restore.op_first);
2050}
2051
2052static void
2053slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2054{
2055 SV **arg = (SV **)slf_frame.data;
2056
2057 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2058}
2059
2060static void
2061slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2062{
2063 if (items != 2)
2064 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2065
2066 frame->prepare = slf_prepare_transfer;
2067 frame->check = slf_check_nop;
2068 frame->data = (void *)arg; /* let's hope it will stay valid */
2069}
2070
2071static void
2072slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2073{
2074 frame->prepare = prepare_schedule;
2075 frame->check = slf_check_nop;
2076}
2077
2078static void
2079slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2080{
2081 struct coro *next = (struct coro *)slf_frame.data;
2082
2083 SvREFCNT_inc_NN (next->hv);
2084 prepare_schedule_to (aTHX_ ta, next);
2085}
2086
2087static void
2088slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2089{
2090 if (!items)
2091 croak ("Coro::schedule_to expects a coroutine argument, caught");
2092
2093 frame->data = (void *)SvSTATE (arg [0]);
2094 frame->prepare = slf_prepare_schedule_to;
2095 frame->check = slf_check_nop;
2096}
2097
2098static void
2099slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2100{
2101 api_ready (aTHX_ SvRV (coro_current));
2102
2103 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2104}
2105
2106static void
2107slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2108{
2109 frame->prepare = prepare_cede;
2110 frame->check = slf_check_nop;
2111}
2112
2113static void
2114slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2115{
2116 frame->prepare = prepare_cede_notself;
2117 frame->check = slf_check_nop;
2118}
2119
2120/*
2121 * these not obviously related functions are all rolled into one
2122 * function to increase chances that they all will call transfer with the same
2123 * stack offset
2124 * SLF stands for "schedule-like-function".
2125 */
2126static OP *
2127pp_slf (pTHX)
2128{
2129 I32 checkmark; /* mark SP to see how many elements check has pushed */
2130
2131 /* set up the slf frame, unless it has already been set-up */
2132 /* the latter happens when a new coro has been started */
2133 /* or when a new cctx was attached to an existing coroutine */
2134 if (expect_true (!slf_frame.prepare))
2135 {
2136 /* first iteration */
2137 dSP;
2138 SV **arg = PL_stack_base + TOPMARK + 1;
2139 int items = SP - arg; /* args without function object */
2140 SV *gv = *sp;
2141
2142 /* do a quick consistency check on the "function" object, and if it isn't */
2143 /* for us, divert to the real entersub */
2144 if (SvTYPE (gv) != SVt_PVGV
2145 || !GvCV (gv)
2146 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2147 return PL_ppaddr[OP_ENTERSUB](aTHX);
2148
2149 if (!(PL_op->op_flags & OPf_STACKED))
2150 {
2151 /* ampersand-form of call, use @_ instead of stack */
2152 AV *av = GvAV (PL_defgv);
2153 arg = AvARRAY (av);
2154 items = AvFILLp (av) + 1;
2155 }
2156
2157 /* now call the init function, which needs to set up slf_frame */
2158 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2159 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2160
2161 /* pop args */
2162 SP = PL_stack_base + POPMARK;
2163
2164 PUTBACK;
2165 }
2166
2167 /* now that we have a slf_frame, interpret it! */
2168 /* we use a callback system not to make the code needlessly */
2169 /* complicated, but so we can run multiple perl coros from one cctx */
2170
2171 do
2172 {
2173 struct coro_transfer_args ta;
2174
2175 slf_frame.prepare (aTHX_ &ta);
2176 TRANSFER (ta, 0);
2177
2178 checkmark = PL_stack_sp - PL_stack_base;
2179 }
2180 while (slf_frame.check (aTHX_ &slf_frame));
2181
2182 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2183
2184 /* exception handling */
2185 if (expect_false (CORO_THROW))
2186 {
2187 SV *exception = sv_2mortal (CORO_THROW);
2188
2189 CORO_THROW = 0;
2190 sv_setsv (ERRSV, exception);
2191 croak (0);
2192 }
2193
2194 /* return value handling - mostly like entersub */
2195 /* make sure we put something on the stack in scalar context */
2196 if (GIMME_V == G_SCALAR)
2197 {
2198 dSP;
2199 SV **bot = PL_stack_base + checkmark;
2200
2201 if (sp == bot) /* too few, push undef */
2202 bot [1] = &PL_sv_undef;
2203 else if (sp != bot + 1) /* too many, take last one */
2204 bot [1] = *sp;
2205
2206 SP = bot + 1;
2207
2208 PUTBACK;
2209 }
2210
2211 return NORMAL;
2212}
2213
2214static void
2215api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2216{
2217 int i;
2218 SV **arg = PL_stack_base + ax;
2219 int items = PL_stack_sp - arg + 1;
2220
2221 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2222
2223 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2224 && PL_op->op_ppaddr != pp_slf)
2225 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2226
2227 CvFLAGS (cv) |= CVf_SLF;
2228 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2229 slf_cv = cv;
2230
2231 /* we patch the op, and then re-run the whole call */
2232 /* we have to put the same argument on the stack for this to work */
2233 /* and this will be done by pp_restore */
2234 slf_restore.op_next = (OP *)&slf_restore;
2235 slf_restore.op_type = OP_CUSTOM;
2236 slf_restore.op_ppaddr = pp_restore;
2237 slf_restore.op_first = PL_op;
2238
2239 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2240
2241 if (PL_op->op_flags & OPf_STACKED)
2242 {
2243 if (items > slf_arga)
2244 {
2245 slf_arga = items;
2246 free (slf_argv);
2247 slf_argv = malloc (slf_arga * sizeof (SV *));
2248 }
2249
2250 slf_argc = items;
2251
2252 for (i = 0; i < items; ++i)
2253 slf_argv [i] = SvREFCNT_inc (arg [i]);
2254 }
2255 else
2256 slf_argc = 0;
2257
2258 PL_op->op_ppaddr = pp_slf;
2259 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2260
2261 PL_op = (OP *)&slf_restore;
2262}
2263
2264/*****************************************************************************/
2265/* PerlIO::cede */
2266
2267typedef struct
2268{
2269 PerlIOBuf base;
2270 NV next, every;
2271} PerlIOCede;
2272
2273static IV
2274PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2275{
2276 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2277
2278 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2279 self->next = nvtime () + self->every;
2280
2281 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2282}
2283
2284static SV *
2285PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2286{
2287 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2288
2289 return newSVnv (self->every);
2290}
2291
2292static IV
2293PerlIOCede_flush (pTHX_ PerlIO *f)
2294{
2295 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2296 double now = nvtime ();
2297
2298 if (now >= self->next)
2299 {
2300 api_cede (aTHX);
2301 self->next = now + self->every;
2302 }
2303
2304 return PerlIOBuf_flush (aTHX_ f);
2305}
2306
2307static PerlIO_funcs PerlIO_cede =
2308{
2309 sizeof(PerlIO_funcs),
2310 "cede",
2311 sizeof(PerlIOCede),
2312 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2313 PerlIOCede_pushed,
2314 PerlIOBuf_popped,
2315 PerlIOBuf_open,
2316 PerlIOBase_binmode,
2317 PerlIOCede_getarg,
2318 PerlIOBase_fileno,
2319 PerlIOBuf_dup,
2320 PerlIOBuf_read,
2321 PerlIOBuf_unread,
2322 PerlIOBuf_write,
2323 PerlIOBuf_seek,
2324 PerlIOBuf_tell,
2325 PerlIOBuf_close,
2326 PerlIOCede_flush,
2327 PerlIOBuf_fill,
2328 PerlIOBase_eof,
2329 PerlIOBase_error,
2330 PerlIOBase_clearerr,
2331 PerlIOBase_setlinebuf,
2332 PerlIOBuf_get_base,
2333 PerlIOBuf_bufsiz,
2334 PerlIOBuf_get_ptr,
2335 PerlIOBuf_get_cnt,
2336 PerlIOBuf_set_ptrcnt,
2337};
2338
2339/*****************************************************************************/
2340/* Coro::Semaphore & Coro::Signal */
2341
2342static SV *
2343coro_waitarray_new (pTHX_ int count)
2344{
2345 /* a semaphore contains a counter IV in $sem->[0] and any waiters after that */
2346 AV *av = newAV ();
2347 SV **ary;
2348
2349 /* unfortunately, building manually saves memory */
2350 Newx (ary, 2, SV *);
2351 AvALLOC (av) = ary;
2352 /*AvARRAY (av) = ary;*/
2353 SvPVX ((SV *)av) = (char *)ary; /* 5.8.8 needs this syntax instead of AvARRAY = ary */
2354 AvMAX (av) = 1;
2355 AvFILLp (av) = 0;
2356 ary [0] = newSViv (count);
2357
2358 return newRV_noinc ((SV *)av);
2359}
2360
2361/* semaphore */
2362
2363static void
2364coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2365{
2366 SV *count_sv = AvARRAY (av)[0];
2367 IV count = SvIVX (count_sv);
2368
2369 count += adjust;
2370 SvIVX (count_sv) = count;
2371
2372 /* now wake up as many waiters as are expected to lock */
2373 while (count > 0 && AvFILLp (av) > 0)
2374 {
2375 SV *cb;
2376
2377 /* swap first two elements so we can shift a waiter */
2378 AvARRAY (av)[0] = AvARRAY (av)[1];
2379 AvARRAY (av)[1] = count_sv;
2380 cb = av_shift (av);
2381
2382 if (SvOBJECT (cb))
2383 {
2384 api_ready (aTHX_ cb);
2385 --count;
2386 }
2387 else if (SvTYPE (cb) == SVt_PVCV)
2388 {
2389 dSP;
2390 PUSHMARK (SP);
2391 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2392 PUTBACK;
2393 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2394 }
2395
2396 SvREFCNT_dec (cb);
2397 }
2398}
2399
2400static void
2401coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2402{
2403 /* call $sem->adjust (0) to possibly wake up some other waiters */
2404 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2405}
2406
2407static int
2408slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2409{
2410 AV *av = (AV *)frame->data;
2411 SV *count_sv = AvARRAY (av)[0];
2412
2413 /* if we are about to throw, don't actually acquire the lock, just throw */
2414 if (CORO_THROW)
2415 return 0;
2416 else if (SvIVX (count_sv) > 0)
2417 {
2418 SvSTATE_current->on_destroy = 0;
2419
2420 if (acquire)
2421 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2422 else
2423 coro_semaphore_adjust (aTHX_ av, 0);
2424
2425 return 0;
2426 }
2427 else
2428 {
2429 int i;
2430 /* if we were woken up but can't down, we look through the whole */
2431 /* waiters list and only add us if we aren't in there already */
2432 /* this avoids some degenerate memory usage cases */
2433
2434 for (i = 1; i <= AvFILLp (av); ++i)
2435 if (AvARRAY (av)[i] == SvRV (coro_current))
2436 return 1;
2437
2438 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2439 return 1;
2440 }
2441}
2442
2443static int
2444slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2445{
2446 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2447}
2448
2449static int
2450slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2451{
2452 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2453}
2454
2455static void
2456slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2457{
2458 AV *av = (AV *)SvRV (arg [0]);
2459
2460 if (SvIVX (AvARRAY (av)[0]) > 0)
2461 {
2462 frame->data = (void *)av;
2463 frame->prepare = prepare_nop;
2464 }
2465 else
2466 {
2467 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2468
2469 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2470 frame->prepare = prepare_schedule;
2471
2472 /* to avoid race conditions when a woken-up coro gets terminated */
2473 /* we arrange for a temporary on_destroy that calls adjust (0) */
2474 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2475 }
2476}
2477
2478static void
2479slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2480{
2481 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2482 frame->check = slf_check_semaphore_down;
2483}
2484
2485static void
2486slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2487{
2488 if (items >= 2)
2489 {
2490 /* callback form */
2491 AV *av = (AV *)SvRV (arg [0]);
2492 CV *cb_cv = coro_sv_2cv (aTHX_ arg [1]);
2493
2494 av_push (av, (SV *)SvREFCNT_inc_NN (cb_cv));
2495
2496 if (SvIVX (AvARRAY (av)[0]) > 0)
2497 coro_semaphore_adjust (aTHX_ av, 0);
2498
2499 frame->prepare = prepare_nop;
2500 frame->check = slf_check_nop;
2501 }
2502 else
2503 {
2504 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2505 frame->check = slf_check_semaphore_wait;
2506 }
2507}
2508
2509/* signal */
2510
2511static void
2512coro_signal_wake (pTHX_ AV *av, int count)
2513{
2514 SvIVX (AvARRAY (av)[0]) = 0;
2515
2516 /* now signal count waiters */
2517 while (count > 0 && AvFILLp (av) > 0)
2518 {
2519 SV *cb;
2520
2521 /* swap first two elements so we can shift a waiter */
2522 cb = AvARRAY (av)[0];
2523 AvARRAY (av)[0] = AvARRAY (av)[1];
2524 AvARRAY (av)[1] = cb;
2525
2526 cb = av_shift (av);
2527
2528 api_ready (aTHX_ cb);
2529 sv_setiv (cb, 0); /* signal waiter */
2530 SvREFCNT_dec (cb);
2531
2532 --count;
2533 }
2534}
2535
2536static int
2537slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2538{
2539 /* if we are about to throw, also stop waiting */
2540 return SvROK ((SV *)frame->data) && !CORO_THROW;
2541}
2542
2543static void
2544slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2545{
2546 AV *av = (AV *)SvRV (arg [0]);
2547
2548 if (SvIVX (AvARRAY (av)[0]))
2549 {
2550 SvIVX (AvARRAY (av)[0]) = 0;
2551 frame->prepare = prepare_nop;
2552 frame->check = slf_check_nop;
2553 }
2554 else
2555 {
2556 SV *waiter = newRV_inc (SvRV (coro_current)); /* owned by signal av */
2557
2558 av_push (av, waiter);
2559
2560 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2561 frame->prepare = prepare_schedule;
2562 frame->check = slf_check_signal_wait;
2563 }
2564}
2565
2566/*****************************************************************************/
2567/* Coro::AIO */
2568
2569#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2570
2571/* helper storage struct */
2572struct io_state
2573{
2574 int errorno;
2575 I32 laststype; /* U16 in 5.10.0 */
2576 int laststatval;
2577 Stat_t statcache;
2578};
2579
2580static void
2581coro_aio_callback (pTHX_ CV *cv)
2582{
2583 dXSARGS;
2584 AV *state = (AV *)GENSUB_ARG;
2585 SV *coro = av_pop (state);
2586 SV *data_sv = newSV (sizeof (struct io_state));
2587
2588 av_extend (state, items - 1);
2589
2590 sv_upgrade (data_sv, SVt_PV);
2591 SvCUR_set (data_sv, sizeof (struct io_state));
2592 SvPOK_only (data_sv);
2593
2594 {
2595 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2596
2597 data->errorno = errno;
2598 data->laststype = PL_laststype;
2599 data->laststatval = PL_laststatval;
2600 data->statcache = PL_statcache;
2601 }
2602
2603 /* now build the result vector out of all the parameters and the data_sv */
2604 {
2605 int i;
2606
2607 for (i = 0; i < items; ++i)
2608 av_push (state, SvREFCNT_inc_NN (ST (i)));
2609 }
2610
2611 av_push (state, data_sv);
2612
2613 api_ready (aTHX_ coro);
2614 SvREFCNT_dec (coro);
2615 SvREFCNT_dec ((AV *)state);
2616}
2617
2618static int
2619slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2620{
2621 AV *state = (AV *)frame->data;
2622
2623 /* if we are about to throw, return early */
2624 /* this does not cancel the aio request, but at least */
2625 /* it quickly returns */
2626 if (CORO_THROW)
2627 return 0;
2628
2629 /* one element that is an RV? repeat! */
2630 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2631 return 1;
2632
2633 /* restore status */
2634 {
2635 SV *data_sv = av_pop (state);
2636 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2637
2638 errno = data->errorno;
2639 PL_laststype = data->laststype;
2640 PL_laststatval = data->laststatval;
2641 PL_statcache = data->statcache;
2642
2643 SvREFCNT_dec (data_sv);
2644 }
2645
2646 /* push result values */
2647 {
2648 dSP;
2649 int i;
2650
2651 EXTEND (SP, AvFILLp (state) + 1);
2652 for (i = 0; i <= AvFILLp (state); ++i)
2653 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2654
2655 PUTBACK;
2656 }
2657
2658 return 0;
2659}
2660
2661static void
2662slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2663{
2664 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2665 SV *coro_hv = SvRV (coro_current);
2666 struct coro *coro = SvSTATE_hv (coro_hv);
2667
2668 /* put our coroutine id on the state arg */
2669 av_push (state, SvREFCNT_inc_NN (coro_hv));
2670
2671 /* first see whether we have a non-zero priority and set it as AIO prio */
2672 if (coro->prio)
2673 {
2674 dSP;
2675
2676 static SV *prio_cv;
2677 static SV *prio_sv;
2678
2679 if (expect_false (!prio_cv))
2680 {
2681 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2682 prio_sv = newSViv (0);
2683 }
2684
2685 PUSHMARK (SP);
2686 sv_setiv (prio_sv, coro->prio);
2687 XPUSHs (prio_sv);
2688
2689 PUTBACK;
2690 call_sv (prio_cv, G_VOID | G_DISCARD);
2691 }
2692
2693 /* now call the original request */
2694 {
2695 dSP;
2696 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2697 int i;
2698
2699 PUSHMARK (SP);
2700
2701 /* first push all args to the stack */
2702 EXTEND (SP, items + 1);
2703
2704 for (i = 0; i < items; ++i)
2705 PUSHs (arg [i]);
2706
2707 /* now push the callback closure */
2708 PUSHs (sv_2mortal (gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2709
2710 /* now call the AIO function - we assume our request is uncancelable */
2711 PUTBACK;
2712 call_sv ((SV *)req, G_VOID | G_DISCARD);
2713 }
2714
2715 /* now that the requets is going, we loop toll we have a result */
2716 frame->data = (void *)state;
2717 frame->prepare = prepare_schedule;
2718 frame->check = slf_check_aio_req;
2719}
2720
2721static void
2722coro_aio_req_xs (pTHX_ CV *cv)
2723{
2724 dXSARGS;
2725
2726 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2727
2728 XSRETURN_EMPTY;
2729}
2730
2731/*****************************************************************************/
2732
2733#if CORO_CLONE
2734# include "clone.c"
2735#endif
2736
2737MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2738
2739PROTOTYPES: DISABLE
2740
2741BOOT:
2742{
2743#ifdef USE_ITHREADS
2744# if CORO_PTHREAD
2745 coro_thx = PERL_GET_CONTEXT;
2746# endif
2747#endif
2748 BOOT_PAGESIZE;
2749
2750 cctx_current = cctx_new_empty ();
2751
2752 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2753 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2754
2755 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2756 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2757 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
2758
2759 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
2760 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
2761 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
2762
2763 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
2764
2765 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
2766 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
2767 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
2768 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
2769
2770 main_mainstack = PL_mainstack;
2771 main_top_env = PL_top_env;
2772
2773 while (main_top_env->je_prev)
2774 main_top_env = main_top_env->je_prev;
2775
2776 {
2777 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2778
2779 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2780 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
2781
2782 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2783 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
2784 }
2785
2786 coroapi.ver = CORO_API_VERSION;
2787 coroapi.rev = CORO_API_REVISION;
2788
2789 coroapi.transfer = api_transfer;
2790
2791 coroapi.sv_state = SvSTATE_;
2792 coroapi.execute_slf = api_execute_slf;
2793 coroapi.prepare_nop = prepare_nop;
2794 coroapi.prepare_schedule = prepare_schedule;
2795 coroapi.prepare_cede = prepare_cede;
2796 coroapi.prepare_cede_notself = prepare_cede_notself;
2797
2798 {
2799 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
2800
2801 if (!svp) croak ("Time::HiRes is required");
2802 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
2803
2804 nvtime = INT2PTR (double (*)(), SvIV (*svp));
2805 }
2806
2807 assert (("PRIO_NORMAL must be 0", !PRIO_NORMAL));
2808}
2809
2810SV *
2811new (char *klass, ...)
2812 ALIAS:
2813 Coro::new = 1
2814 CODE:
2815{
2816 struct coro *coro;
2817 MAGIC *mg;
2818 HV *hv;
2819 CV *cb;
2820 int i;
2821
2822 if (items > 1)
2823 {
2824 cb = coro_sv_2cv (aTHX_ ST (1));
2825
2826 if (!ix)
235 { 2827 {
236 /* I never used formats, so how should I know how these are implemented? */ 2828 if (CvISXSUB (cb))
237 /* my bold guess is as a simple, plain sub... */ 2829 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
238 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 2830
2831 if (!CvROOT (cb))
2832 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
239 } 2833 }
240 } 2834 }
241 2835
242 if (top_si->si_type == PERLSI_MAIN)
243 break;
244
245 top_si = top_si->si_prev;
246 ccstk = top_si->si_cxstack;
247 cxix = top_si->si_cxix;
248 }
249
250 PUTBACK;
251 }
252
253 c->dowarn = PL_dowarn;
254 c->defav = GvAV (PL_defgv);
255 c->curstackinfo = PL_curstackinfo;
256 c->curstack = PL_curstack;
257 c->mainstack = PL_mainstack;
258 c->stack_sp = PL_stack_sp;
259 c->op = PL_op;
260 c->curpad = PL_curpad;
261 c->stack_base = PL_stack_base;
262 c->stack_max = PL_stack_max;
263 c->tmps_stack = PL_tmps_stack;
264 c->tmps_floor = PL_tmps_floor;
265 c->tmps_ix = PL_tmps_ix;
266 c->tmps_max = PL_tmps_max;
267 c->markstack = PL_markstack;
268 c->markstack_ptr = PL_markstack_ptr;
269 c->markstack_max = PL_markstack_max;
270 c->scopestack = PL_scopestack;
271 c->scopestack_ix = PL_scopestack_ix;
272 c->scopestack_max = PL_scopestack_max;
273 c->savestack = PL_savestack;
274 c->savestack_ix = PL_savestack_ix;
275 c->savestack_max = PL_savestack_max;
276 c->retstack = PL_retstack;
277 c->retstack_ix = PL_retstack_ix;
278 c->retstack_max = PL_retstack_max;
279 c->curcop = PL_curcop;
280}
281
282static void
283LOAD(pTHX_ Coro__State c)
284{
285 PL_dowarn = c->dowarn;
286 GvAV (PL_defgv) = c->defav;
287 PL_curstackinfo = c->curstackinfo;
288 PL_curstack = c->curstack;
289 PL_mainstack = c->mainstack;
290 PL_stack_sp = c->stack_sp;
291 PL_op = c->op;
292 PL_curpad = c->curpad;
293 PL_stack_base = c->stack_base;
294 PL_stack_max = c->stack_max;
295 PL_tmps_stack = c->tmps_stack;
296 PL_tmps_floor = c->tmps_floor;
297 PL_tmps_ix = c->tmps_ix;
298 PL_tmps_max = c->tmps_max;
299 PL_markstack = c->markstack;
300 PL_markstack_ptr = c->markstack_ptr;
301 PL_markstack_max = c->markstack_max;
302 PL_scopestack = c->scopestack;
303 PL_scopestack_ix = c->scopestack_ix;
304 PL_scopestack_max = c->scopestack_max;
305 PL_savestack = c->savestack;
306 PL_savestack_ix = c->savestack_ix;
307 PL_savestack_max = c->savestack_max;
308 PL_retstack = c->retstack;
309 PL_retstack_ix = c->retstack_ix;
310 PL_retstack_max = c->retstack_max;
311 PL_curcop = c->curcop;
312
313 {
314 dSP;
315 CV *cv;
316
317 /* now do the ugly restore mess */
318 while ((cv = (CV *)POPs))
319 {
320 AV *padlist = (AV *)POPs;
321
322 put_padlist (cv);
323 CvPADLIST(cv) = padlist;
324 CvDEPTH(cv) = (I32)POPs;
325
326#ifdef USE_THREADS
327 CvOWNER(cv) = (struct perl_thread *)POPs;
328 error does not work either
329#endif
330 }
331
332 PUTBACK;
333 }
334}
335
336/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
337STATIC void
338destroy_stacks(pTHX)
339{
340 dSP;
341
342 /* die does this while calling POPSTACK, but I just don't see why. */
343 dounwind(-1);
344
345 /* is this ugly, I ask? */
346 while (PL_scopestack_ix)
347 LEAVE;
348
349 while (PL_curstackinfo->si_next)
350 PL_curstackinfo = PL_curstackinfo->si_next;
351
352 while (PL_curstackinfo)
353 {
354 PERL_SI *p = PL_curstackinfo->si_prev;
355
356 SvREFCNT_dec(PL_curstackinfo->si_stack);
357 Safefree(PL_curstackinfo->si_cxstack);
358 Safefree(PL_curstackinfo);
359 PL_curstackinfo = p;
360 }
361
362 if (PL_scopestack_ix != 0)
363 Perl_warner(aTHX_ WARN_INTERNAL,
364 "Unbalanced scopes: %ld more ENTERs than LEAVEs\n",
365 (long)PL_scopestack_ix);
366 if (PL_savestack_ix != 0)
367 Perl_warner(aTHX_ WARN_INTERNAL,
368 "Unbalanced saves: %ld more saves than restores\n",
369 (long)PL_savestack_ix);
370 if (PL_tmps_floor != -1)
371 Perl_warner(aTHX_ WARN_INTERNAL,"Unbalanced tmps: %ld more allocs than frees\n",
372 (long)PL_tmps_floor + 1);
373 /*
374 */
375 Safefree(PL_tmps_stack);
376 Safefree(PL_markstack);
377 Safefree(PL_scopestack);
378 Safefree(PL_savestack);
379 Safefree(PL_retstack);
380}
381
382#define SUB_INIT "Coro::State::_newcoro"
383
384MODULE = Coro::State PACKAGE = Coro::State
385
386PROTOTYPES: ENABLE
387
388BOOT:
389 if (!padlist_cache)
390 padlist_cache = newHV ();
391
392Coro::State
393_newprocess(args)
394 SV * args
395 PROTOTYPE: $
396 CODE:
397 Coro__State coro;
398
399 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV)
400 croak ("Coro::State::newprocess expects an arrayref");
401
402 New (0, coro, 1, struct coro); 2836 Newz (0, coro, 1, struct coro);
2837 coro->args = newAV ();
2838 coro->flags = CF_NEW;
403 2839
404 coro->mainstack = 0; /* actual work is done inside transfer */ 2840 if (coro_first) coro_first->prev = coro;
405 coro->args = (AV *)SvREFCNT_inc (SvRV (args)); 2841 coro->next = coro_first;
2842 coro_first = coro;
406 2843
407 RETVAL = coro; 2844 coro->hv = hv = newHV ();
2845 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
2846 mg->mg_flags |= MGf_DUP;
2847 RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1));
2848
2849 if (items > 1)
2850 {
2851 av_extend (coro->args, items - 1 + ix - 1);
2852
2853 if (ix)
2854 {
2855 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
2856 cb = cv_coro_run;
2857 }
2858
2859 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
2860
2861 for (i = 2; i < items; i++)
2862 av_push (coro->args, newSVsv (ST (i)));
2863 }
2864}
408 OUTPUT: 2865 OUTPUT:
409 RETVAL 2866 RETVAL
410 2867
411void 2868void
412transfer(prev,next) 2869transfer (...)
413 Coro::State_or_hashref prev 2870 PROTOTYPE: $$
414 Coro::State_or_hashref next 2871 CODE:
415 CODE: 2872 CORO_EXECUTE_SLF_XS (slf_init_transfer);
416 2873
417 if (prev != next) 2874bool
2875_destroy (SV *coro_sv)
2876 CODE:
2877 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
2878 OUTPUT:
2879 RETVAL
2880
2881void
2882_exit (int code)
2883 PROTOTYPE: $
2884 CODE:
2885 _exit (code);
2886
2887SV *
2888clone (Coro::State coro)
2889 CODE:
2890{
2891#if CORO_CLONE
2892 struct coro *ncoro = coro_clone (coro);
2893 MAGIC *mg;
2894 /* TODO: too much duplication */
2895 ncoro->hv = newHV ();
2896 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
2897 mg->mg_flags |= MGf_DUP;
2898 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
2899#else
2900 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
2901#endif
2902}
2903 OUTPUT:
2904 RETVAL
2905
2906int
2907cctx_stacksize (int new_stacksize = 0)
2908 PROTOTYPE: ;$
2909 CODE:
2910 RETVAL = cctx_stacksize;
2911 if (new_stacksize)
418 { 2912 {
419 PUTBACK; 2913 cctx_stacksize = new_stacksize;
420 SAVE (aTHX_ prev); 2914 ++cctx_gen;
421
422 /* 2915 }
423 * this could be done in newprocess which would lead to 2916 OUTPUT:
424 * extremely elegant and fast (just PUTBACK/SAVE/LOAD/SPAGAIN) 2917 RETVAL
425 * code here, but lazy allocation of stacks has also 2918
426 * some virtues and the overhead of the if() is nil. 2919int
2920cctx_max_idle (int max_idle = 0)
2921 PROTOTYPE: ;$
2922 CODE:
2923 RETVAL = cctx_max_idle;
2924 if (max_idle > 1)
2925 cctx_max_idle = max_idle;
2926 OUTPUT:
2927 RETVAL
2928
2929int
2930cctx_count ()
2931 PROTOTYPE:
2932 CODE:
2933 RETVAL = cctx_count;
2934 OUTPUT:
2935 RETVAL
2936
2937int
2938cctx_idle ()
2939 PROTOTYPE:
2940 CODE:
2941 RETVAL = cctx_idle;
2942 OUTPUT:
2943 RETVAL
2944
2945void
2946list ()
2947 PROTOTYPE:
2948 PPCODE:
2949{
2950 struct coro *coro;
2951 for (coro = coro_first; coro; coro = coro->next)
2952 if (coro->hv)
2953 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
2954}
2955
2956void
2957call (Coro::State coro, SV *coderef)
2958 ALIAS:
2959 eval = 1
2960 CODE:
2961{
2962 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
427 */ 2963 {
428 if (next->mainstack) 2964 struct coro temp;
2965
2966 if (!(coro->flags & CF_RUNNING))
429 { 2967 {
430 LOAD (aTHX_ next); 2968 PUTBACK;
431 next->mainstack = 0; /* unnecessary but much cleaner */ 2969 save_perl (aTHX_ &temp);
2970 load_perl (aTHX_ coro);
2971 }
2972
2973 {
2974 dSP;
2975 ENTER;
2976 SAVETMPS;
2977 PUTBACK;
2978 PUSHSTACK;
2979 PUSHMARK (SP);
2980
2981 if (ix)
2982 eval_sv (coderef, 0);
2983 else
2984 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
2985
2986 POPSTACK;
2987 SPAGAIN;
2988 FREETMPS;
2989 LEAVE;
2990 PUTBACK;
2991 }
2992
2993 if (!(coro->flags & CF_RUNNING))
2994 {
2995 save_perl (aTHX_ coro);
2996 load_perl (aTHX_ &temp);
432 SPAGAIN; 2997 SPAGAIN;
433 } 2998 }
434 else
435 {
436 /*
437 * emulate part of the perl startup here.
438 */
439 UNOP myop;
440
441 init_stacks (); /* from perl.c */
442 PL_op = (OP *)&myop;
443 /*PL_curcop = 0;*/
444 GvAV (PL_defgv) = (SV *)SvREFCNT_inc (next->args);
445
446 SPAGAIN;
447 Zero(&myop, 1, UNOP);
448 myop.op_next = Nullop;
449 myop.op_flags = OPf_WANT_VOID;
450
451 PUSHMARK(SP);
452 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
453 PUTBACK;
454 /*
455 * the next line is slightly wrong, as PL_op->op_next
456 * is actually being executed so we skip the first op.
457 * that doesn't matter, though, since it is only
458 * pp_nextstate and we never return...
459 */
460 PL_op = Perl_pp_entersub(aTHX);
461 SPAGAIN;
462
463 ENTER;
464 }
465 } 2999 }
3000}
3001
3002SV *
3003is_ready (Coro::State coro)
3004 PROTOTYPE: $
3005 ALIAS:
3006 is_ready = CF_READY
3007 is_running = CF_RUNNING
3008 is_new = CF_NEW
3009 is_destroyed = CF_DESTROYED
3010 CODE:
3011 RETVAL = boolSV (coro->flags & ix);
3012 OUTPUT:
3013 RETVAL
466 3014
467void 3015void
468DESTROY(coro) 3016throw (Coro::State self, SV *throw = &PL_sv_undef)
469 Coro::State coro 3017 PROTOTYPE: $;$
470 CODE: 3018 CODE:
3019{
3020 struct coro *current = SvSTATE_current;
3021 SV **throwp = self == current ? &CORO_THROW : &self->except;
3022 SvREFCNT_dec (*throwp);
3023 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
3024}
471 3025
472 if (coro->mainstack) 3026void
3027api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3028 PROTOTYPE: $;$
3029 C_ARGS: aTHX_ coro, flags
3030
3031SV *
3032has_cctx (Coro::State coro)
3033 PROTOTYPE: $
3034 CODE:
3035 /* maybe manage the running flag differently */
3036 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3037 OUTPUT:
3038 RETVAL
3039
3040int
3041is_traced (Coro::State coro)
3042 PROTOTYPE: $
3043 CODE:
3044 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3045 OUTPUT:
3046 RETVAL
3047
3048UV
3049rss (Coro::State coro)
3050 PROTOTYPE: $
3051 ALIAS:
3052 usecount = 1
3053 CODE:
3054 switch (ix)
3055 {
3056 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3057 case 1: RETVAL = coro->usecount; break;
3058 }
3059 OUTPUT:
3060 RETVAL
3061
3062void
3063force_cctx ()
3064 PROTOTYPE:
3065 CODE:
3066 cctx_current->idle_sp = 0;
3067
3068void
3069swap_defsv (Coro::State self)
3070 PROTOTYPE: $
3071 ALIAS:
3072 swap_defav = 1
3073 CODE:
3074 if (!self->slot)
3075 croak ("cannot swap state with coroutine that has no saved state,");
3076 else
473 { 3077 {
474 struct coro temp; 3078 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3079 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
475 3080
3081 SV *tmp = *src; *src = *dst; *dst = tmp;
3082 }
3083
3084
3085MODULE = Coro::State PACKAGE = Coro
3086
3087BOOT:
3088{
3089 int i;
3090
3091 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3092 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3093 cv_coro_run = get_cv ( "Coro::_terminate", GV_ADD);
3094 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD);
3095 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3096 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3097 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3098 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3099
3100 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3101 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3102 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3103 cv_coro_state_new = get_cv ("Coro::State::new", 0); SvREADONLY_on (cv_coro_state_new);
3104
3105 coro_stash = gv_stashpv ("Coro", TRUE);
3106
3107 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (PRIO_MAX));
3108 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (PRIO_HIGH));
3109 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (PRIO_NORMAL));
3110 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (PRIO_LOW));
3111 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (PRIO_IDLE));
3112 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (PRIO_MIN));
3113
3114 for (i = PRIO_MAX - PRIO_MIN + 1; i--; )
3115 coro_ready[i] = newAV ();
3116
3117 {
3118 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3119
3120 coroapi.schedule = api_schedule;
3121 coroapi.schedule_to = api_schedule_to;
3122 coroapi.cede = api_cede;
3123 coroapi.cede_notself = api_cede_notself;
3124 coroapi.ready = api_ready;
3125 coroapi.is_ready = api_is_ready;
3126 coroapi.nready = coro_nready;
3127 coroapi.current = coro_current;
3128
3129 /*GCoroAPI = &coroapi;*/
3130 sv_setiv (sv, (IV)&coroapi);
3131 SvREADONLY_on (sv);
3132 }
3133}
3134
3135void
3136terminate (...)
3137 CODE:
3138 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3139
3140void
3141schedule (...)
3142 CODE:
3143 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3144
3145void
3146schedule_to (...)
3147 CODE:
3148 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3149
3150void
3151cede_to (...)
3152 CODE:
3153 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3154
3155void
3156cede (...)
3157 CODE:
3158 CORO_EXECUTE_SLF_XS (slf_init_cede);
3159
3160void
3161cede_notself (...)
3162 CODE:
3163 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3164
3165void
3166_cancel (Coro::State self)
3167 CODE:
3168 coro_state_destroy (aTHX_ self);
3169 coro_call_on_destroy (aTHX_ self);
3170
3171void
3172_set_current (SV *current)
3173 PROTOTYPE: $
3174 CODE:
3175 SvREFCNT_dec (SvRV (coro_current));
3176 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3177
3178void
3179_set_readyhook (SV *hook)
3180 PROTOTYPE: $
3181 CODE:
3182 SvREFCNT_dec (coro_readyhook);
3183 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
3184
3185int
3186prio (Coro::State coro, int newprio = 0)
3187 PROTOTYPE: $;$
3188 ALIAS:
3189 nice = 1
3190 CODE:
3191{
3192 RETVAL = coro->prio;
3193
3194 if (items > 1)
3195 {
3196 if (ix)
3197 newprio = coro->prio - newprio;
3198
3199 if (newprio < PRIO_MIN) newprio = PRIO_MIN;
3200 if (newprio > PRIO_MAX) newprio = PRIO_MAX;
3201
3202 coro->prio = newprio;
3203 }
3204}
3205 OUTPUT:
3206 RETVAL
3207
3208SV *
3209ready (SV *self)
3210 PROTOTYPE: $
3211 CODE:
3212 RETVAL = boolSV (api_ready (aTHX_ self));
3213 OUTPUT:
3214 RETVAL
3215
3216int
3217nready (...)
3218 PROTOTYPE:
3219 CODE:
3220 RETVAL = coro_nready;
3221 OUTPUT:
3222 RETVAL
3223
3224void
3225_pool_handler (...)
3226 CODE:
3227 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3228
3229void
3230async_pool (SV *cv, ...)
3231 PROTOTYPE: &@
3232 PPCODE:
3233{
3234 HV *hv = (HV *)av_pop (av_async_pool);
3235 AV *av = newAV ();
3236 SV *cb = ST (0);
3237 int i;
3238
3239 av_extend (av, items - 2);
3240 for (i = 1; i < items; ++i)
3241 av_push (av, SvREFCNT_inc_NN (ST (i)));
3242
3243 if ((SV *)hv == &PL_sv_undef)
3244 {
3245 PUSHMARK (SP);
3246 EXTEND (SP, 2);
3247 PUSHs (sv_Coro);
3248 PUSHs ((SV *)cv_pool_handler);
476 PUTBACK; 3249 PUTBACK;
477 SAVE(aTHX_ (&temp)); 3250 call_sv ((SV *)cv_coro_state_new, G_SCALAR);
478 LOAD(aTHX_ coro);
479
480 destroy_stacks ();
481 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
482
483 LOAD((&temp));
484 SPAGAIN; 3251 SPAGAIN;
3252
3253 hv = (HV *)SvREFCNT_inc_NN (SvRV (POPs));
485 } 3254 }
486 3255
3256 {
3257 struct coro *coro = SvSTATE_hv (hv);
3258
3259 assert (!coro->invoke_cb);
3260 assert (!coro->invoke_av);
3261 coro->invoke_cb = SvREFCNT_inc (cb);
3262 coro->invoke_av = av;
3263 }
3264
3265 api_ready (aTHX_ (SV *)hv);
3266
3267 if (GIMME_V != G_VOID)
3268 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
3269 else
487 SvREFCNT_dec (coro->args); 3270 SvREFCNT_dec (hv);
488 Safefree (coro); 3271}
489 3272
3273SV *
3274rouse_cb ()
3275 PROTOTYPE:
3276 CODE:
3277 RETVAL = coro_new_rouse_cb (aTHX);
3278 OUTPUT:
3279 RETVAL
490 3280
3281void
3282rouse_wait (...)
3283 PROTOTYPE: ;$
3284 PPCODE:
3285 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
3286
3287
3288MODULE = Coro::State PACKAGE = PerlIO::cede
3289
3290BOOT:
3291 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3292
3293
3294MODULE = Coro::State PACKAGE = Coro::Semaphore
3295
3296SV *
3297new (SV *klass, SV *count = 0)
3298 CODE:
3299 RETVAL = sv_bless (
3300 coro_waitarray_new (aTHX_ count && SvOK (count) ? SvIV (count) : 1),
3301 GvSTASH (CvGV (cv))
3302 );
3303 OUTPUT:
3304 RETVAL
3305
3306# helper for Coro::Channel
3307SV *
3308_alloc (int count)
3309 CODE:
3310 RETVAL = coro_waitarray_new (aTHX_ count);
3311 OUTPUT:
3312 RETVAL
3313
3314SV *
3315count (SV *self)
3316 CODE:
3317 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3318 OUTPUT:
3319 RETVAL
3320
3321void
3322up (SV *self, int adjust = 1)
3323 ALIAS:
3324 adjust = 1
3325 CODE:
3326 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3327
3328void
3329down (...)
3330 CODE:
3331 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3332
3333void
3334wait (...)
3335 CODE:
3336 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3337
3338void
3339try (SV *self)
3340 PPCODE:
3341{
3342 AV *av = (AV *)SvRV (self);
3343 SV *count_sv = AvARRAY (av)[0];
3344 IV count = SvIVX (count_sv);
3345
3346 if (count > 0)
3347 {
3348 --count;
3349 SvIVX (count_sv) = count;
3350 XSRETURN_YES;
3351 }
3352 else
3353 XSRETURN_NO;
3354}
3355
3356void
3357waiters (SV *self)
3358 PPCODE:
3359{
3360 AV *av = (AV *)SvRV (self);
3361 int wcount = AvFILLp (av) + 1 - 1;
3362
3363 if (GIMME_V == G_SCALAR)
3364 XPUSHs (sv_2mortal (newSViv (wcount)));
3365 else
3366 {
3367 int i;
3368 EXTEND (SP, wcount);
3369 for (i = 1; i <= wcount; ++i)
3370 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3371 }
3372}
3373
3374MODULE = Coro::State PACKAGE = Coro::Signal
3375
3376SV *
3377new (SV *klass)
3378 CODE:
3379 RETVAL = sv_bless (
3380 coro_waitarray_new (aTHX_ 0),
3381 GvSTASH (CvGV (cv))
3382 );
3383 OUTPUT:
3384 RETVAL
3385
3386void
3387wait (...)
3388 CODE:
3389 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
3390
3391void
3392broadcast (SV *self)
3393 CODE:
3394{
3395 AV *av = (AV *)SvRV (self);
3396 coro_signal_wake (aTHX_ av, AvFILLp (av));
3397}
3398
3399void
3400send (SV *self)
3401 CODE:
3402{
3403 AV *av = (AV *)SvRV (self);
3404
3405 if (AvFILLp (av))
3406 coro_signal_wake (aTHX_ av, 1);
3407 else
3408 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3409}
3410
3411IV
3412awaited (SV *self)
3413 CODE:
3414 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3415 OUTPUT:
3416 RETVAL
3417
3418
3419MODULE = Coro::State PACKAGE = Coro::AnyEvent
3420
3421BOOT:
3422 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3423
3424void
3425_schedule (...)
3426 CODE:
3427{
3428 static int incede;
3429
3430 api_cede_notself (aTHX);
3431
3432 ++incede;
3433 while (coro_nready >= incede && api_cede (aTHX))
3434 ;
3435
3436 sv_setsv (sv_activity, &PL_sv_undef);
3437 if (coro_nready >= incede)
3438 {
3439 PUSHMARK (SP);
3440 PUTBACK;
3441 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3442 }
3443
3444 --incede;
3445}
3446
3447
3448MODULE = Coro::State PACKAGE = Coro::AIO
3449
3450void
3451_register (char *target, char *proto, SV *req)
3452 CODE:
3453{
3454 CV *req_cv = coro_sv_2cv (aTHX_ req);
3455 /* newXSproto doesn't return the CV on 5.8 */
3456 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3457 sv_setpv ((SV *)slf_cv, proto);
3458 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3459}
3460

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines