ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.5 by root, Tue Jul 17 02:55:29 2001 UTC vs.
Revision 1.339 by root, Mon Dec 15 00:28:30 2008 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "patchlevel.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
24#endif
25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
9#endif 35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
10 48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#define PERL_VERSION_ATLEAST(a,b,c) \
57 (PERL_REVISION > (a) \
58 || (PERL_REVISION == (a) \
59 && (PERL_VERSION > (b) \
60 || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))
61
62#if !PERL_VERSION_ATLEAST (5,6,0)
63# ifndef PL_ppaddr
64# define PL_ppaddr ppaddr
65# endif
66# ifndef call_sv
67# define call_sv perl_call_sv
68# endif
69# ifndef get_sv
70# define get_sv perl_get_sv
71# endif
72# ifndef get_cv
73# define get_cv perl_get_cv
74# endif
75# ifndef IS_PADGV
76# define IS_PADGV(v) 0
77# endif
78# ifndef IS_PADCONST
79# define IS_PADCONST(v) 0
80# endif
81#endif
82
83/* 5.11 */
84#ifndef CxHASARGS
85# define CxHASARGS(cx) (cx)->blk_sub.hasargs
86#endif
87
88/* 5.10.0 */
89#ifndef SvREFCNT_inc_NN
90# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
91#endif
92
93/* 5.8.8 */
94#ifndef GV_NOTQUAL
95# define GV_NOTQUAL 0
96#endif
97#ifndef newSV
98# define newSV(l) NEWSV(0,l)
99#endif
100#ifndef CvISXSUB_on
101# define CvISXSUB_on(cv) (void)cv
102#endif
103#ifndef CvISXSUB
104# define CvISXSUB(cv) (CvXSUB (cv) ? TRUE : FALSE)
105#endif
106#ifndef Newx
107# define Newx(ptr,nitems,type) New (0,ptr,nitems,type)
108#endif
109
110/* 5.8.7 */
111#ifndef SvRV_set
112# define SvRV_set(s,v) SvRV(s) = (v)
113#endif
114
115#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
116# undef CORO_STACKGUARD
117#endif
118
119#ifndef CORO_STACKGUARD
120# define CORO_STACKGUARD 0
121#endif
122
123/* prefer perl internal functions over our own? */
124#ifndef CORO_PREFER_PERL_FUNCTIONS
125# define CORO_PREFER_PERL_FUNCTIONS 0
126#endif
127
128/* The next macros try to return the current stack pointer, in an as
129 * portable way as possible. */
130#if __GNUC__ >= 4
131# define dSTACKLEVEL int stacklevel_dummy
132# define STACKLEVEL __builtin_frame_address (0)
133#else
134# define dSTACKLEVEL volatile void *stacklevel
135# define STACKLEVEL ((void *)&stacklevel)
136#endif
137
138#define IN_DESTRUCT PL_dirty
139
140#if __GNUC__ >= 3
141# define attribute(x) __attribute__(x)
142# define expect(expr,value) __builtin_expect ((expr),(value))
143# define INLINE static inline
144#else
145# define attribute(x)
146# define expect(expr,value) (expr)
147# define INLINE static
148#endif
149
150#define expect_false(expr) expect ((expr) != 0, 0)
151#define expect_true(expr) expect ((expr) != 0, 1)
152
153#define NOINLINE attribute ((noinline))
154
155#include "CoroAPI.h"
156#define GCoroAPI (&coroapi) /* very sneaky */
157
158#ifdef USE_ITHREADS
159# if CORO_PTHREAD
160static void *coro_thx;
161# endif
162#endif
163
164static double (*nvtime)(); /* so why doesn't it take void? */
165
166/* we hijack an hopefully unused CV flag for our purposes */
167#define CVf_SLF 0x4000
168static OP *pp_slf (pTHX);
169
170static U32 cctx_gen;
171static size_t cctx_stacksize = CORO_STACKSIZE;
172static struct CoroAPI coroapi;
173static AV *main_mainstack; /* used to differentiate between $main and others */
174static JMPENV *main_top_env;
175static HV *coro_state_stash, *coro_stash;
176static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
177
178static AV *av_destroy; /* destruction queue */
179static SV *sv_manager; /* the manager coro */
180static SV *sv_idle; /* $Coro::idle */
181
182static GV *irsgv; /* $/ */
183static GV *stdoutgv; /* *STDOUT */
184static SV *rv_diehook;
185static SV *rv_warnhook;
186static HV *hv_sig; /* %SIG */
187
188/* async_pool helper stuff */
189static SV *sv_pool_rss;
190static SV *sv_pool_size;
191static SV *sv_async_pool_idle; /* description string */
192static AV *av_async_pool; /* idle pool */
193static SV *sv_Coro; /* class string */
194static CV *cv_pool_handler;
195static CV *cv_coro_state_new;
196
197/* Coro::AnyEvent */
198static SV *sv_activity;
199
200static struct coro_cctx *cctx_first;
201static int cctx_count, cctx_idle;
202
203enum {
204 CC_MAPPED = 0x01,
205 CC_NOREUSE = 0x02, /* throw this away after tracing */
206 CC_TRACE = 0x04,
207 CC_TRACE_SUB = 0x08, /* trace sub calls */
208 CC_TRACE_LINE = 0x10, /* trace each statement */
209 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
210};
211
212/* this is a structure representing a c-level coroutine */
213typedef struct coro_cctx
214{
215 struct coro_cctx *next;
216
217 /* the stack */
218 void *sptr;
219 size_t ssize;
220
221 /* cpu state */
222 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
223 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
224 JMPENV *top_env;
225 coro_context cctx;
226
227 U32 gen;
228#if CORO_USE_VALGRIND
229 int valgrind_id;
230#endif
231 unsigned char flags;
232} coro_cctx;
233
234coro_cctx *cctx_current; /* the currently running cctx */
235
236/*****************************************************************************/
237
238enum {
239 CF_RUNNING = 0x0001, /* coroutine is running */
240 CF_READY = 0x0002, /* coroutine is ready */
241 CF_NEW = 0x0004, /* has never been switched to */
242 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
243};
244
245/* the structure where most of the perl state is stored, overlaid on the cxstack */
246typedef struct
247{
248 SV *defsv;
249 AV *defav;
250 SV *errsv;
251 SV *irsgv;
252 HV *hinthv;
253#define VAR(name,type) type name;
254# include "state.h"
255#undef VAR
256} perl_slots;
257
258#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
259
260/* this is a structure representing a perl-level coroutine */
11struct coro { 261struct coro {
12 U8 dowarn; 262 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 263 coro_cctx *cctx;
14 264
15 PERL_SI *curstackinfo; 265 /* state data */
16 AV *curstack; 266 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 267 AV *mainstack;
18 SV **stack_sp; 268 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 269
41 AV *args; 270 CV *startcv; /* the CV to execute */
271 AV *args; /* data associated with this coroutine (initial args) */
272 int refcnt; /* coroutines are refcounted, yes */
273 int flags; /* CF_ flags */
274 HV *hv; /* the perl hash associated with this coro, if any */
275 void (*on_destroy)(pTHX_ struct coro *coro);
276
277 /* statistics */
278 int usecount; /* number of transfers to this coro */
279
280 /* coro process data */
281 int prio;
282 SV *except; /* exception to be thrown */
283 SV *rouse_cb;
284
285 /* async_pool */
286 SV *saved_deffh;
287 SV *invoke_cb;
288 AV *invoke_av;
289
290 /* on_enter/on_leave */
291 AV *on_enter;
292 AV *on_leave;
293
294 /* linked list */
295 struct coro *next, *prev;
42}; 296};
43 297
44typedef struct coro *Coro__State; 298typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 299typedef struct coro *Coro__State_or_hashref;
46 300
47static HV *padlist_cache; 301/* the following variables are effectively part of the perl context */
302/* and get copied between struct coro and these variables */
303/* the mainr easonw e don't support windows process emulation */
304static struct CoroSLF slf_frame; /* the current slf frame */
48 305
49/* mostly copied from op.c:cv_clone2 */ 306/** Coro ********************************************************************/
50STATIC AV * 307
51clone_padlist (AV *protopadlist) 308#define PRIO_MAX 3
309#define PRIO_HIGH 1
310#define PRIO_NORMAL 0
311#define PRIO_LOW -1
312#define PRIO_IDLE -3
313#define PRIO_MIN -4
314
315/* for Coro.pm */
316static SV *coro_current;
317static SV *coro_readyhook;
318static AV *coro_ready [PRIO_MAX - PRIO_MIN + 1];
319static CV *cv_coro_run, *cv_coro_terminate;
320static struct coro *coro_first;
321#define coro_nready coroapi.nready
322
323/** lowlevel stuff **********************************************************/
324
325static SV *
326coro_get_sv (pTHX_ const char *name, int create)
52{ 327{
53 AV *av; 328#if PERL_VERSION_ATLEAST (5,10,0)
54 I32 ix; 329 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 330 get_sv (name, create);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 331#endif
57 SV **pname = AvARRAY (protopad_name); 332 return get_sv (name, create);
58 SV **ppad = AvARRAY (protopad); 333}
59 I32 fname = AvFILLp (protopad_name); 334
60 I32 fpad = AvFILLp (protopad); 335static AV *
336coro_get_av (pTHX_ const char *name, int create)
337{
338#if PERL_VERSION_ATLEAST (5,10,0)
339 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
340 get_av (name, create);
341#endif
342 return get_av (name, create);
343}
344
345static HV *
346coro_get_hv (pTHX_ const char *name, int create)
347{
348#if PERL_VERSION_ATLEAST (5,10,0)
349 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
350 get_hv (name, create);
351#endif
352 return get_hv (name, create);
353}
354
355/* may croak */
356INLINE CV *
357coro_sv_2cv (pTHX_ SV *sv)
358{
359 HV *st;
360 GV *gvp;
361 CV *cv = sv_2cv (sv, &st, &gvp, 0);
362
363 if (!cv)
364 croak ("code reference expected");
365
366 return cv;
367}
368
369/*****************************************************************************/
370/* magic glue */
371
372#define CORO_MAGIC_type_cv 26
373#define CORO_MAGIC_type_state PERL_MAGIC_ext
374
375#define CORO_MAGIC_NN(sv, type) \
376 (expect_true (SvMAGIC (sv)->mg_type == type) \
377 ? SvMAGIC (sv) \
378 : mg_find (sv, type))
379
380#define CORO_MAGIC(sv, type) \
381 (expect_true (SvMAGIC (sv)) \
382 ? CORO_MAGIC_NN (sv, type) \
383 : 0)
384
385#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
386#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
387
388INLINE struct coro *
389SvSTATE_ (pTHX_ SV *coro)
390{
391 HV *stash;
392 MAGIC *mg;
393
394 if (SvROK (coro))
395 coro = SvRV (coro);
396
397 if (expect_false (SvTYPE (coro) != SVt_PVHV))
398 croak ("Coro::State object required");
399
400 stash = SvSTASH (coro);
401 if (expect_false (stash != coro_stash && stash != coro_state_stash))
402 {
403 /* very slow, but rare, check */
404 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
405 croak ("Coro::State object required");
406 }
407
408 mg = CORO_MAGIC_state (coro);
409 return (struct coro *)mg->mg_ptr;
410}
411
412#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
413
414/* faster than SvSTATE, but expects a coroutine hv */
415#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
416#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
417
418/*****************************************************************************/
419/* padlist management and caching */
420
421static AV *
422coro_derive_padlist (pTHX_ CV *cv)
423{
424 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 425 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 426
72 newpadlist = newAV (); 427 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 428 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 429#if PERL_VERSION_ATLEAST (5,10,0)
430 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
431#else
432 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
433#endif
434 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
435 --AvFILLp (padlist);
436
437 av_store (newpadlist, 0, SvREFCNT_inc_NN (AvARRAY (padlist)[0]));
75 av_store (newpadlist, 1, (SV *) newpad); 438 av_store (newpadlist, 1, (SV *)newpad);
76 439
77 av = newAV (); /* will be @_ */ 440 return newpadlist;
78 av_extend (av, 0); 441}
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81 442
82 for (ix = fpad; ix > 0; ix--) 443static void
444free_padlist (pTHX_ AV *padlist)
445{
446 /* may be during global destruction */
447 if (!IN_DESTRUCT)
83 { 448 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv; 449 I32 i = AvFILLp (padlist);
85 if (namesv && namesv != &PL_sv_undef) 450
451 while (i > 0) /* special-case index 0 */
86 { 452 {
87 char *name = SvPVX (namesv); /* XXX */ 453 /* we try to be extra-careful here */
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&') 454 AV *av = (AV *)AvARRAY (padlist)[i--];
89 { /* lexical from outside? */ 455 I32 j = AvFILLp (av);
90 npad[ix] = SvREFCNT_inc (ppad[ix]); 456
91 } 457 while (j >= 0)
92 else 458 SvREFCNT_dec (AvARRAY (av)[j--]);
93 { /* our own lexical */ 459
94 SV *sv; 460 AvFILLp (av) = -1;
95 if (*name == '&') 461 SvREFCNT_dec (av);
96 sv = SvREFCNT_inc (ppad[ix]);
97 else if (*name == '@')
98 sv = (SV *) newAV ();
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 }
107 } 462 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix]))
109 {
110 npad[ix] = SvREFCNT_inc (ppad[ix]);
111 }
112 else
113 {
114 SV *sv = NEWSV (0, 0);
115 SvPADTMP_on (sv);
116 npad[ix] = sv;
117 }
118 }
119 463
120#if 0 /* NONOTUNDERSTOOD */ 464 SvREFCNT_dec (AvARRAY (padlist)[0]);
121 /* Now that vars are all in place, clone nested closures. */
122 465
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif
139
140 return newpadlist;
141}
142
143STATIC AV *
144free_padlist (AV *padlist)
145{
146 /* may be during global destruction */
147 if (SvREFCNT(padlist))
148 {
149 I32 i = AvFILLp(padlist); 466 AvFILLp (padlist) = -1;
150 while (i >= 0)
151 {
152 SV **svp = av_fetch(padlist, i--, FALSE);
153 SV *sv = svp ? *svp : Nullsv;
154 if (sv)
155 SvREFCNT_dec(sv);
156 }
157
158 SvREFCNT_dec((SV*)padlist); 467 SvREFCNT_dec ((SV*)padlist);
468 }
469}
470
471static int
472coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
473{
474 AV *padlist;
475 AV *av = (AV *)mg->mg_obj;
476
477 /* casting is fun. */
478 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
479 free_padlist (aTHX_ padlist);
480
481 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
482
483 return 0;
484}
485
486static MGVTBL coro_cv_vtbl = {
487 0, 0, 0, 0,
488 coro_cv_free
489};
490
491/* the next two functions merely cache the padlists */
492static void
493get_padlist (pTHX_ CV *cv)
494{
495 MAGIC *mg = CORO_MAGIC_cv (cv);
496 AV *av;
497
498 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
499 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
500 else
501 {
502#if CORO_PREFER_PERL_FUNCTIONS
503 /* this is probably cleaner? but also slower! */
504 /* in practise, it seems to be less stable */
505 CV *cp = Perl_cv_clone (aTHX_ cv);
506 CvPADLIST (cv) = CvPADLIST (cp);
507 CvPADLIST (cp) = 0;
508 SvREFCNT_dec (cp);
509#else
510 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
511#endif
512 }
513}
514
515static void
516put_padlist (pTHX_ CV *cv)
517{
518 MAGIC *mg = CORO_MAGIC_cv (cv);
519 AV *av;
520
521 if (expect_false (!mg))
522 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
523
524 av = (AV *)mg->mg_obj;
525
526 if (expect_false (AvFILLp (av) >= AvMAX (av)))
527 av_extend (av, AvFILLp (av) + 1);
528
529 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
530}
531
532/** load & save, init *******************************************************/
533
534static void
535on_enterleave_call (pTHX_ SV *cb);
536
537static void
538load_perl (pTHX_ Coro__State c)
539{
540 perl_slots *slot = c->slot;
541 c->slot = 0;
542
543 PL_mainstack = c->mainstack;
544
545 GvSV (PL_defgv) = slot->defsv;
546 GvAV (PL_defgv) = slot->defav;
547 GvSV (PL_errgv) = slot->errsv;
548 GvSV (irsgv) = slot->irsgv;
549 GvHV (PL_hintgv) = slot->hinthv;
550
551 #define VAR(name,type) PL_ ## name = slot->name;
552 # include "state.h"
553 #undef VAR
554
555 {
556 dSP;
557
558 CV *cv;
559
560 /* now do the ugly restore mess */
561 while (expect_true (cv = (CV *)POPs))
562 {
563 put_padlist (aTHX_ cv); /* mark this padlist as available */
564 CvDEPTH (cv) = PTR2IV (POPs);
565 CvPADLIST (cv) = (AV *)POPs;
566 }
567
568 PUTBACK;
159 } 569 }
160}
161 570
162/* the next tow functions merely cache the padlists */ 571 slf_frame = c->slf_frame;
163STATIC void 572 CORO_THROW = c->except;
164get_padlist (CV *cv)
165{
166 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 0);
167 573
168 if (he && AvFILLp ((AV *)*he) >= 0) 574 if (expect_false (c->on_enter))
169 CvPADLIST (cv) = (AV *)av_pop ((AV *)*he);
170 else
171 CvPADLIST (cv) = clone_padlist (CvPADLIST (cv));
172}
173
174STATIC void
175put_padlist (CV *cv)
176{
177 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 1);
178
179 if (SvTYPE (*he) != SVt_PVAV)
180 {
181 SvREFCNT_dec (*he);
182 *he = (SV *)newAV ();
183 } 575 {
576 int i;
184 577
185 av_push ((AV *)*he, (SV *)CvPADLIST (cv)); 578 for (i = 0; i <= AvFILLp (c->on_enter); ++i)
579 on_enterleave_call (AvARRAY (c->on_enter)[i]);
580 }
186} 581}
187 582
188static void 583static void
189SAVE(pTHX_ Coro__State c) 584save_perl (pTHX_ Coro__State c)
190{ 585{
586 if (expect_false (c->on_leave))
587 {
588 int i;
589
590 for (i = AvFILLp (c->on_leave); i >= 0; --i)
591 on_enterleave_call (AvARRAY (c->on_leave)[i]);
592 }
593
594 c->except = CORO_THROW;
595 c->slf_frame = slf_frame;
596
191 { 597 {
192 dSP; 598 dSP;
193 I32 cxix = cxstack_ix; 599 I32 cxix = cxstack_ix;
600 PERL_CONTEXT *ccstk = cxstack;
194 PERL_SI *top_si = PL_curstackinfo; 601 PERL_SI *top_si = PL_curstackinfo;
195 PERL_CONTEXT *ccstk = cxstack;
196 602
197 /* 603 /*
198 * the worst thing you can imagine happens first - we have to save 604 * the worst thing you can imagine happens first - we have to save
199 * (and reinitialize) all cv's in the whole callchain :( 605 * (and reinitialize) all cv's in the whole callchain :(
200 */ 606 */
201 607
202 PUSHs (Nullsv); 608 XPUSHs (Nullsv);
203 /* this loop was inspired by pp_caller */ 609 /* this loop was inspired by pp_caller */
204 for (;;) 610 for (;;)
205 { 611 {
206 while (cxix >= 0) 612 while (expect_true (cxix >= 0))
207 { 613 {
208 PERL_CONTEXT *cx = &ccstk[cxix--]; 614 PERL_CONTEXT *cx = &ccstk[cxix--];
209 615
210 if (CxTYPE(cx) == CXt_SUB) 616 if (expect_true (CxTYPE (cx) == CXt_SUB || CxTYPE (cx) == CXt_FORMAT))
211 { 617 {
212 CV *cv = cx->blk_sub.cv; 618 CV *cv = cx->blk_sub.cv;
619
213 if (CvDEPTH(cv)) 620 if (expect_true (CvDEPTH (cv)))
214 { 621 {
215#ifdef USE_THREADS
216 XPUSHs ((SV *)CvOWNER(cv));
217#endif
218 EXTEND (SP, 3); 622 EXTEND (SP, 3);
219 PUSHs ((SV *)CvDEPTH(cv));
220 PUSHs ((SV *)CvPADLIST(cv)); 623 PUSHs ((SV *)CvPADLIST (cv));
624 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
221 PUSHs ((SV *)cv); 625 PUSHs ((SV *)cv);
222 626
223 get_padlist (cv);
224
225 CvDEPTH(cv) = 0; 627 CvDEPTH (cv) = 0;
226#ifdef USE_THREADS 628 get_padlist (aTHX_ cv);
227 CvOWNER(cv) = 0;
228 error must unlock this cv etc.. etc...
229 if you are here wondering about this error message then
230 the reason is that it will not work as advertised yet
231#endif
232 } 629 }
233 } 630 }
234 else if (CxTYPE(cx) == CXt_FORMAT) 631 }
632
633 if (expect_true (top_si->si_type == PERLSI_MAIN))
634 break;
635
636 top_si = top_si->si_prev;
637 ccstk = top_si->si_cxstack;
638 cxix = top_si->si_cxix;
639 }
640
641 PUTBACK;
642 }
643
644 /* allocate some space on the context stack for our purposes */
645 /* we manually unroll here, as usually 2 slots is enough */
646 if (SLOT_COUNT >= 1) CXINC;
647 if (SLOT_COUNT >= 2) CXINC;
648 if (SLOT_COUNT >= 3) CXINC;
649 {
650 int i;
651 for (i = 3; i < SLOT_COUNT; ++i)
652 CXINC;
653 }
654 cxstack_ix -= SLOT_COUNT; /* undo allocation */
655
656 c->mainstack = PL_mainstack;
657
658 {
659 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
660
661 slot->defav = GvAV (PL_defgv);
662 slot->defsv = DEFSV;
663 slot->errsv = ERRSV;
664 slot->irsgv = GvSV (irsgv);
665 slot->hinthv = GvHV (PL_hintgv);
666
667 #define VAR(name,type) slot->name = PL_ ## name;
668 # include "state.h"
669 #undef VAR
670 }
671}
672
673/*
674 * allocate various perl stacks. This is almost an exact copy
675 * of perl.c:init_stacks, except that it uses less memory
676 * on the (sometimes correct) assumption that coroutines do
677 * not usually need a lot of stackspace.
678 */
679#if CORO_PREFER_PERL_FUNCTIONS
680# define coro_init_stacks(thx) init_stacks ()
681#else
682static void
683coro_init_stacks (pTHX)
684{
685 PL_curstackinfo = new_stackinfo(32, 8);
686 PL_curstackinfo->si_type = PERLSI_MAIN;
687 PL_curstack = PL_curstackinfo->si_stack;
688 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
689
690 PL_stack_base = AvARRAY(PL_curstack);
691 PL_stack_sp = PL_stack_base;
692 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
693
694 New(50,PL_tmps_stack,32,SV*);
695 PL_tmps_floor = -1;
696 PL_tmps_ix = -1;
697 PL_tmps_max = 32;
698
699 New(54,PL_markstack,16,I32);
700 PL_markstack_ptr = PL_markstack;
701 PL_markstack_max = PL_markstack + 16;
702
703#ifdef SET_MARK_OFFSET
704 SET_MARK_OFFSET;
705#endif
706
707 New(54,PL_scopestack,8,I32);
708 PL_scopestack_ix = 0;
709 PL_scopestack_max = 8;
710
711 New(54,PL_savestack,24,ANY);
712 PL_savestack_ix = 0;
713 PL_savestack_max = 24;
714
715#if !PERL_VERSION_ATLEAST (5,10,0)
716 New(54,PL_retstack,4,OP*);
717 PL_retstack_ix = 0;
718 PL_retstack_max = 4;
719#endif
720}
721#endif
722
723/*
724 * destroy the stacks, the callchain etc...
725 */
726static void
727coro_destruct_stacks (pTHX)
728{
729 while (PL_curstackinfo->si_next)
730 PL_curstackinfo = PL_curstackinfo->si_next;
731
732 while (PL_curstackinfo)
733 {
734 PERL_SI *p = PL_curstackinfo->si_prev;
735
736 if (!IN_DESTRUCT)
737 SvREFCNT_dec (PL_curstackinfo->si_stack);
738
739 Safefree (PL_curstackinfo->si_cxstack);
740 Safefree (PL_curstackinfo);
741 PL_curstackinfo = p;
742 }
743
744 Safefree (PL_tmps_stack);
745 Safefree (PL_markstack);
746 Safefree (PL_scopestack);
747 Safefree (PL_savestack);
748#if !PERL_VERSION_ATLEAST (5,10,0)
749 Safefree (PL_retstack);
750#endif
751}
752
753#define CORO_RSS \
754 rss += sizeof (SYM (curstackinfo)); \
755 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
756 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
757 rss += SYM (tmps_max) * sizeof (SV *); \
758 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
759 rss += SYM (scopestack_max) * sizeof (I32); \
760 rss += SYM (savestack_max) * sizeof (ANY);
761
762static size_t
763coro_rss (pTHX_ struct coro *coro)
764{
765 size_t rss = sizeof (*coro);
766
767 if (coro->mainstack)
768 {
769 if (coro->flags & CF_RUNNING)
770 {
771 #define SYM(sym) PL_ ## sym
772 CORO_RSS;
773 #undef SYM
774 }
775 else
776 {
777 #define SYM(sym) coro->slot->sym
778 CORO_RSS;
779 #undef SYM
780 }
781 }
782
783 return rss;
784}
785
786/** coroutine stack handling ************************************************/
787
788static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
789static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
790static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
791
792/* apparently < 5.8.8 */
793#ifndef MgPV_nolen_const
794#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
795 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
796 (const char*)(mg)->mg_ptr)
797#endif
798
799/*
800 * This overrides the default magic get method of %SIG elements.
801 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
802 * and instead of trying to save and restore the hash elements, we just provide
803 * readback here.
804 */
805static int
806coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
807{
808 const char *s = MgPV_nolen_const (mg);
809
810 if (*s == '_')
811 {
812 SV **svp = 0;
813
814 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
815 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
816
817 if (svp)
818 {
819 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
820 return 0;
821 }
822 }
823
824 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
825}
826
827static int
828coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
829{
830 const char *s = MgPV_nolen_const (mg);
831
832 if (*s == '_')
833 {
834 SV **svp = 0;
835
836 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
837 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
838
839 if (svp)
840 {
841 SV *old = *svp;
842 *svp = 0;
843 SvREFCNT_dec (old);
844 return 0;
845 }
846 }
847
848 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
849}
850
851static int
852coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
853{
854 const char *s = MgPV_nolen_const (mg);
855
856 if (*s == '_')
857 {
858 SV **svp = 0;
859
860 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
861 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
862
863 if (svp)
864 {
865 SV *old = *svp;
866 *svp = SvOK (sv) ? newSVsv (sv) : 0;
867 SvREFCNT_dec (old);
868 return 0;
869 }
870 }
871
872 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
873}
874
875static void
876prepare_nop (pTHX_ struct coro_transfer_args *ta)
877{
878 /* kind of mega-hacky, but works */
879 ta->next = ta->prev = (struct coro *)ta;
880}
881
882static int
883slf_check_nop (pTHX_ struct CoroSLF *frame)
884{
885 return 0;
886}
887
888static int
889slf_check_repeat (pTHX_ struct CoroSLF *frame)
890{
891 return 1;
892}
893
894static UNOP coro_setup_op;
895
896static void NOINLINE /* noinline to keep it out of the transfer fast path */
897coro_setup (pTHX_ struct coro *coro)
898{
899 /*
900 * emulate part of the perl startup here.
901 */
902 coro_init_stacks (aTHX);
903
904 PL_runops = RUNOPS_DEFAULT;
905 PL_curcop = &PL_compiling;
906 PL_in_eval = EVAL_NULL;
907 PL_comppad = 0;
908 PL_comppad_name = 0;
909 PL_comppad_name_fill = 0;
910 PL_comppad_name_floor = 0;
911 PL_curpm = 0;
912 PL_curpad = 0;
913 PL_localizing = 0;
914 PL_dirty = 0;
915 PL_restartop = 0;
916#if PERL_VERSION_ATLEAST (5,10,0)
917 PL_parser = 0;
918#endif
919 PL_hints = 0;
920
921 /* recreate the die/warn hooks */
922 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
923 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
924
925 GvSV (PL_defgv) = newSV (0);
926 GvAV (PL_defgv) = coro->args; coro->args = 0;
927 GvSV (PL_errgv) = newSV (0);
928 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
929 GvHV (PL_hintgv) = 0;
930 PL_rs = newSVsv (GvSV (irsgv));
931 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
932
933 {
934 dSP;
935 UNOP myop;
936
937 Zero (&myop, 1, UNOP);
938 myop.op_next = Nullop;
939 myop.op_type = OP_ENTERSUB;
940 myop.op_flags = OPf_WANT_VOID;
941
942 PUSHMARK (SP);
943 PUSHs ((SV *)coro->startcv);
944 PUTBACK;
945 PL_op = (OP *)&myop;
946 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
947 }
948
949 /* this newly created coroutine might be run on an existing cctx which most
950 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
951 */
952 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
953 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
954
955 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
956 coro_setup_op.op_next = PL_op;
957 coro_setup_op.op_type = OP_ENTERSUB;
958 coro_setup_op.op_ppaddr = pp_slf;
959 /* no flags etc. required, as an init function won't be called */
960
961 PL_op = (OP *)&coro_setup_op;
962
963 /* copy throw, in case it was set before coro_setup */
964 CORO_THROW = coro->except;
965}
966
967static void
968coro_unwind_stacks (pTHX)
969{
970 if (!IN_DESTRUCT)
971 {
972 /* restore all saved variables and stuff */
973 LEAVE_SCOPE (0);
974 assert (PL_tmps_floor == -1);
975
976 /* free all temporaries */
977 FREETMPS;
978 assert (PL_tmps_ix == -1);
979
980 /* unwind all extra stacks */
981 POPSTACK_TO (PL_mainstack);
982
983 /* unwind main stack */
984 dounwind (-1);
985 }
986}
987
988static void
989coro_destruct_perl (pTHX_ struct coro *coro)
990{
991 coro_unwind_stacks (aTHX);
992
993 SvREFCNT_dec (GvSV (PL_defgv));
994 SvREFCNT_dec (GvAV (PL_defgv));
995 SvREFCNT_dec (GvSV (PL_errgv));
996 SvREFCNT_dec (PL_defoutgv);
997 SvREFCNT_dec (PL_rs);
998 SvREFCNT_dec (GvSV (irsgv));
999 SvREFCNT_dec (GvHV (PL_hintgv));
1000
1001 SvREFCNT_dec (PL_diehook);
1002 SvREFCNT_dec (PL_warnhook);
1003
1004 SvREFCNT_dec (coro->saved_deffh);
1005 SvREFCNT_dec (coro->rouse_cb);
1006 SvREFCNT_dec (coro->invoke_cb);
1007 SvREFCNT_dec (coro->invoke_av);
1008
1009 coro_destruct_stacks (aTHX);
1010}
1011
1012INLINE void
1013free_coro_mortal (pTHX)
1014{
1015 if (expect_true (coro_mortal))
1016 {
1017 SvREFCNT_dec (coro_mortal);
1018 coro_mortal = 0;
1019 }
1020}
1021
1022static int
1023runops_trace (pTHX)
1024{
1025 COP *oldcop = 0;
1026 int oldcxix = -2;
1027
1028 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
1029 {
1030 PERL_ASYNC_CHECK ();
1031
1032 if (cctx_current->flags & CC_TRACE_ALL)
1033 {
1034 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
1035 {
1036 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1037 SV **bot, **top;
1038 AV *av = newAV (); /* return values */
1039 SV **cb;
1040 dSP;
1041
1042 GV *gv = CvGV (cx->blk_sub.cv);
1043 SV *fullname = sv_2mortal (newSV (0));
1044 if (isGV (gv))
1045 gv_efullname3 (fullname, gv, 0);
1046
1047 bot = PL_stack_base + cx->blk_oldsp + 1;
1048 top = cx->blk_gimme == G_ARRAY ? SP + 1
1049 : cx->blk_gimme == G_SCALAR ? bot + 1
1050 : bot;
1051
1052 av_extend (av, top - bot);
1053 while (bot < top)
1054 av_push (av, SvREFCNT_inc_NN (*bot++));
1055
1056 PL_runops = RUNOPS_DEFAULT;
1057 ENTER;
1058 SAVETMPS;
1059 EXTEND (SP, 3);
1060 PUSHMARK (SP);
1061 PUSHs (&PL_sv_no);
1062 PUSHs (fullname);
1063 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1064 PUTBACK;
1065 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1066 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1067 SPAGAIN;
1068 FREETMPS;
1069 LEAVE;
1070 PL_runops = runops_trace;
1071 }
1072
1073 if (oldcop != PL_curcop)
1074 {
1075 oldcop = PL_curcop;
1076
1077 if (PL_curcop != &PL_compiling)
1078 {
1079 SV **cb;
1080
1081 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1082 {
1083 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1084
1085 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1086 {
1087 dSP;
1088 GV *gv = CvGV (cx->blk_sub.cv);
1089 SV *fullname = sv_2mortal (newSV (0));
1090
1091 if (isGV (gv))
1092 gv_efullname3 (fullname, gv, 0);
1093
1094 PL_runops = RUNOPS_DEFAULT;
1095 ENTER;
1096 SAVETMPS;
1097 EXTEND (SP, 3);
1098 PUSHMARK (SP);
1099 PUSHs (&PL_sv_yes);
1100 PUSHs (fullname);
1101 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1102 PUTBACK;
1103 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1104 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1105 SPAGAIN;
1106 FREETMPS;
1107 LEAVE;
1108 PL_runops = runops_trace;
1109 }
1110
1111 oldcxix = cxstack_ix;
1112 }
1113
1114 if (cctx_current->flags & CC_TRACE_LINE)
1115 {
1116 dSP;
1117
1118 PL_runops = RUNOPS_DEFAULT;
1119 ENTER;
1120 SAVETMPS;
1121 EXTEND (SP, 3);
1122 PL_runops = RUNOPS_DEFAULT;
1123 PUSHMARK (SP);
1124 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1125 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1126 PUTBACK;
1127 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1128 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1129 SPAGAIN;
1130 FREETMPS;
1131 LEAVE;
1132 PL_runops = runops_trace;
1133 }
1134 }
1135 }
1136 }
1137 }
1138
1139 TAINT_NOT;
1140 return 0;
1141}
1142
1143static struct CoroSLF cctx_ssl_frame;
1144
1145static void
1146slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1147{
1148 ta->prev = 0;
1149}
1150
1151static int
1152slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1153{
1154 *frame = cctx_ssl_frame;
1155
1156 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1157}
1158
1159/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1160static void NOINLINE
1161cctx_prepare (pTHX)
1162{
1163 PL_top_env = &PL_start_env;
1164
1165 if (cctx_current->flags & CC_TRACE)
1166 PL_runops = runops_trace;
1167
1168 /* we already must be executing an SLF op, there is no other valid way
1169 * that can lead to creation of a new cctx */
1170 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1171 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1172
1173 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1174 cctx_ssl_frame = slf_frame;
1175
1176 slf_frame.prepare = slf_prepare_set_stacklevel;
1177 slf_frame.check = slf_check_set_stacklevel;
1178}
1179
1180/* the tail of transfer: execute stuff we can only do after a transfer */
1181INLINE void
1182transfer_tail (pTHX)
1183{
1184 free_coro_mortal (aTHX);
1185}
1186
1187/*
1188 * this is a _very_ stripped down perl interpreter ;)
1189 */
1190static void
1191cctx_run (void *arg)
1192{
1193#ifdef USE_ITHREADS
1194# if CORO_PTHREAD
1195 PERL_SET_CONTEXT (coro_thx);
1196# endif
1197#endif
1198 {
1199 dTHX;
1200
1201 /* normally we would need to skip the entersub here */
1202 /* not doing so will re-execute it, which is exactly what we want */
1203 /* PL_nop = PL_nop->op_next */
1204
1205 /* inject a fake subroutine call to cctx_init */
1206 cctx_prepare (aTHX);
1207
1208 /* cctx_run is the alternative tail of transfer() */
1209 transfer_tail (aTHX);
1210
1211 /* somebody or something will hit me for both perl_run and PL_restartop */
1212 PL_restartop = PL_op;
1213 perl_run (PL_curinterp);
1214 /*
1215 * Unfortunately, there is no way to get at the return values of the
1216 * coro body here, as perl_run destroys these
1217 */
1218
1219 /*
1220 * If perl-run returns we assume exit() was being called or the coro
1221 * fell off the end, which seems to be the only valid (non-bug)
1222 * reason for perl_run to return. We try to exit by jumping to the
1223 * bootstrap-time "top" top_env, as we cannot restore the "main"
1224 * coroutine as Coro has no such concept.
1225 * This actually isn't valid with the pthread backend, but OSes requiring
1226 * that backend are too broken to do it in a standards-compliant way.
1227 */
1228 PL_top_env = main_top_env;
1229 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1230 }
1231}
1232
1233static coro_cctx *
1234cctx_new ()
1235{
1236 coro_cctx *cctx;
1237
1238 ++cctx_count;
1239 New (0, cctx, 1, coro_cctx);
1240
1241 cctx->gen = cctx_gen;
1242 cctx->flags = 0;
1243 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1244
1245 return cctx;
1246}
1247
1248/* create a new cctx only suitable as source */
1249static coro_cctx *
1250cctx_new_empty ()
1251{
1252 coro_cctx *cctx = cctx_new ();
1253
1254 cctx->sptr = 0;
1255 coro_create (&cctx->cctx, 0, 0, 0, 0);
1256
1257 return cctx;
1258}
1259
1260/* create a new cctx suitable as destination/running a perl interpreter */
1261static coro_cctx *
1262cctx_new_run ()
1263{
1264 coro_cctx *cctx = cctx_new ();
1265 void *stack_start;
1266 size_t stack_size;
1267
1268#if HAVE_MMAP
1269 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1270 /* mmap supposedly does allocate-on-write for us */
1271 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1272
1273 if (cctx->sptr != (void *)-1)
1274 {
1275 #if CORO_STACKGUARD
1276 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1277 #endif
1278 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1279 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1280 cctx->flags |= CC_MAPPED;
1281 }
1282 else
1283#endif
1284 {
1285 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1286 New (0, cctx->sptr, cctx_stacksize, long);
1287
1288 if (!cctx->sptr)
1289 {
1290 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1291 _exit (EXIT_FAILURE);
1292 }
1293
1294 stack_start = cctx->sptr;
1295 stack_size = cctx->ssize;
1296 }
1297
1298 #if CORO_USE_VALGRIND
1299 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1300 #endif
1301
1302 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1303
1304 return cctx;
1305}
1306
1307static void
1308cctx_destroy (coro_cctx *cctx)
1309{
1310 if (!cctx)
1311 return;
1312
1313 assert (("FATAL: tried to destroy current cctx", cctx != cctx_current));//D temporary?
1314
1315 --cctx_count;
1316 coro_destroy (&cctx->cctx);
1317
1318 /* coro_transfer creates new, empty cctx's */
1319 if (cctx->sptr)
1320 {
1321 #if CORO_USE_VALGRIND
1322 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1323 #endif
1324
1325#if HAVE_MMAP
1326 if (cctx->flags & CC_MAPPED)
1327 munmap (cctx->sptr, cctx->ssize);
1328 else
1329#endif
1330 Safefree (cctx->sptr);
1331 }
1332
1333 Safefree (cctx);
1334}
1335
1336/* wether this cctx should be destructed */
1337#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1338
1339static coro_cctx *
1340cctx_get (pTHX)
1341{
1342 while (expect_true (cctx_first))
1343 {
1344 coro_cctx *cctx = cctx_first;
1345 cctx_first = cctx->next;
1346 --cctx_idle;
1347
1348 if (expect_true (!CCTX_EXPIRED (cctx)))
1349 return cctx;
1350
1351 cctx_destroy (cctx);
1352 }
1353
1354 return cctx_new_run ();
1355}
1356
1357static void
1358cctx_put (coro_cctx *cctx)
1359{
1360 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1361
1362 /* free another cctx if overlimit */
1363 if (expect_false (cctx_idle >= cctx_max_idle))
1364 {
1365 coro_cctx *first = cctx_first;
1366 cctx_first = first->next;
1367 --cctx_idle;
1368
1369 cctx_destroy (first);
1370 }
1371
1372 ++cctx_idle;
1373 cctx->next = cctx_first;
1374 cctx_first = cctx;
1375}
1376
1377/** coroutine switching *****************************************************/
1378
1379static void
1380transfer_check (pTHX_ struct coro *prev, struct coro *next)
1381{
1382 /* TODO: throwing up here is considered harmful */
1383
1384 if (expect_true (prev != next))
1385 {
1386 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1387 croak ("Coro::State::transfer called with a suspended prev Coro::State, but can only transfer from running or new states,");
1388
1389 if (expect_false (next->flags & CF_RUNNING))
1390 croak ("Coro::State::transfer called with running next Coro::State, but can only transfer to inactive states,");
1391
1392 if (expect_false (next->flags & CF_DESTROYED))
1393 croak ("Coro::State::transfer called with destroyed next Coro::State, but can only transfer to inactive states,");
1394
1395#if !PERL_VERSION_ATLEAST (5,10,0)
1396 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1397 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1398#endif
1399 }
1400}
1401
1402/* always use the TRANSFER macro */
1403static void NOINLINE /* noinline so we have a fixed stackframe */
1404transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1405{
1406 dSTACKLEVEL;
1407
1408 /* sometimes transfer is only called to set idle_sp */
1409 if (expect_false (!prev))
1410 {
1411 cctx_current->idle_sp = STACKLEVEL;
1412 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1413 }
1414 else if (expect_true (prev != next))
1415 {
1416 coro_cctx *cctx_prev;
1417
1418 if (expect_false (prev->flags & CF_NEW))
1419 {
1420 /* create a new empty/source context */
1421 prev->flags &= ~CF_NEW;
1422 prev->flags |= CF_RUNNING;
1423 }
1424
1425 prev->flags &= ~CF_RUNNING;
1426 next->flags |= CF_RUNNING;
1427
1428 /* first get rid of the old state */
1429 save_perl (aTHX_ prev);
1430
1431 if (expect_false (next->flags & CF_NEW))
1432 {
1433 /* need to start coroutine */
1434 next->flags &= ~CF_NEW;
1435 /* setup coroutine call */
1436 coro_setup (aTHX_ next);
1437 }
1438 else
1439 load_perl (aTHX_ next);
1440
1441 /* possibly untie and reuse the cctx */
1442 if (expect_true (
1443 cctx_current->idle_sp == STACKLEVEL
1444 && !(cctx_current->flags & CC_TRACE)
1445 && !force_cctx
1446 ))
1447 {
1448 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1449 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1450
1451 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get. */
1452 /* without this the next cctx_get might destroy the running cctx while still in use */
1453 if (expect_false (CCTX_EXPIRED (cctx_current)))
1454 if (expect_true (!next->cctx))
1455 next->cctx = cctx_get (aTHX);
1456
1457 cctx_put (cctx_current);
1458 }
1459 else
1460 prev->cctx = cctx_current;
1461
1462 ++next->usecount;
1463
1464 cctx_prev = cctx_current;
1465 cctx_current = expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1466
1467 next->cctx = 0;
1468
1469 if (expect_false (cctx_prev != cctx_current))
1470 {
1471 cctx_prev->top_env = PL_top_env;
1472 PL_top_env = cctx_current->top_env;
1473 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1474 }
1475
1476 transfer_tail (aTHX);
1477 }
1478}
1479
1480#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1481#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1482
1483/** high level stuff ********************************************************/
1484
1485static int
1486coro_state_destroy (pTHX_ struct coro *coro)
1487{
1488 if (coro->flags & CF_DESTROYED)
1489 return 0;
1490
1491 if (coro->on_destroy)
1492 coro->on_destroy (aTHX_ coro);
1493
1494 coro->flags |= CF_DESTROYED;
1495
1496 if (coro->flags & CF_READY)
1497 {
1498 /* reduce nready, as destroying a ready coro effectively unreadies it */
1499 /* alternative: look through all ready queues and remove the coro */
1500 --coro_nready;
1501 }
1502 else
1503 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1504
1505 if (coro->mainstack
1506 && coro->mainstack != main_mainstack
1507 && coro->slot
1508 && !PL_dirty)
1509 {
1510 struct coro *current = SvSTATE_current;
1511
1512 assert (("FATAL: tried to destroy currently running coroutine", coro->mainstack != PL_mainstack));
1513
1514 save_perl (aTHX_ current);
1515 load_perl (aTHX_ coro);
1516
1517 coro_destruct_perl (aTHX_ coro);
1518
1519 load_perl (aTHX_ current);
1520
1521 coro->slot = 0;
1522 }
1523
1524 cctx_destroy (coro->cctx);
1525 SvREFCNT_dec (coro->startcv);
1526 SvREFCNT_dec (coro->args);
1527 SvREFCNT_dec (CORO_THROW);
1528
1529 if (coro->next) coro->next->prev = coro->prev;
1530 if (coro->prev) coro->prev->next = coro->next;
1531 if (coro == coro_first) coro_first = coro->next;
1532
1533 return 1;
1534}
1535
1536static int
1537coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1538{
1539 struct coro *coro = (struct coro *)mg->mg_ptr;
1540 mg->mg_ptr = 0;
1541
1542 coro->hv = 0;
1543
1544 if (--coro->refcnt < 0)
1545 {
1546 coro_state_destroy (aTHX_ coro);
1547 Safefree (coro);
1548 }
1549
1550 return 0;
1551}
1552
1553static int
1554coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1555{
1556 struct coro *coro = (struct coro *)mg->mg_ptr;
1557
1558 ++coro->refcnt;
1559
1560 return 0;
1561}
1562
1563static MGVTBL coro_state_vtbl = {
1564 0, 0, 0, 0,
1565 coro_state_free,
1566 0,
1567#ifdef MGf_DUP
1568 coro_state_dup,
1569#else
1570# define MGf_DUP 0
1571#endif
1572};
1573
1574static void
1575prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1576{
1577 ta->prev = SvSTATE (prev_sv);
1578 ta->next = SvSTATE (next_sv);
1579 TRANSFER_CHECK (*ta);
1580}
1581
1582static void
1583api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1584{
1585 struct coro_transfer_args ta;
1586
1587 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1588 TRANSFER (ta, 1);
1589}
1590
1591/*****************************************************************************/
1592/* gensub: simple closure generation utility */
1593
1594#define GENSUB_ARG CvXSUBANY (cv).any_ptr
1595
1596/* create a closure from XS, returns a code reference */
1597/* the arg can be accessed via GENSUB_ARG from the callback */
1598/* the callback must use dXSARGS/XSRETURN */
1599static SV *
1600gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg)
1601{
1602 CV *cv = (CV *)newSV (0);
1603
1604 sv_upgrade ((SV *)cv, SVt_PVCV);
1605
1606 CvANON_on (cv);
1607 CvISXSUB_on (cv);
1608 CvXSUB (cv) = xsub;
1609 GENSUB_ARG = arg;
1610
1611 return newRV_noinc ((SV *)cv);
1612}
1613
1614/** Coro ********************************************************************/
1615
1616INLINE void
1617coro_enq (pTHX_ struct coro *coro)
1618{
1619 av_push (coro_ready [coro->prio - PRIO_MIN], SvREFCNT_inc_NN (coro->hv));
1620}
1621
1622INLINE SV *
1623coro_deq (pTHX)
1624{
1625 int prio;
1626
1627 for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= 0; )
1628 if (AvFILLp (coro_ready [prio]) >= 0)
1629 return av_shift (coro_ready [prio]);
1630
1631 return 0;
1632}
1633
1634static int
1635api_ready (pTHX_ SV *coro_sv)
1636{
1637 struct coro *coro;
1638 SV *sv_hook;
1639 void (*xs_hook)(void);
1640
1641 coro = SvSTATE (coro_sv);
1642
1643 if (coro->flags & CF_READY)
1644 return 0;
1645
1646 coro->flags |= CF_READY;
1647
1648 sv_hook = coro_nready ? 0 : coro_readyhook;
1649 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1650
1651 coro_enq (aTHX_ coro);
1652 ++coro_nready;
1653
1654 if (sv_hook)
1655 {
1656 dSP;
1657
1658 ENTER;
1659 SAVETMPS;
1660
1661 PUSHMARK (SP);
1662 PUTBACK;
1663 call_sv (sv_hook, G_VOID | G_DISCARD);
1664
1665 FREETMPS;
1666 LEAVE;
1667 }
1668
1669 if (xs_hook)
1670 xs_hook ();
1671
1672 return 1;
1673}
1674
1675static int
1676api_is_ready (pTHX_ SV *coro_sv)
1677{
1678 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1679}
1680
1681/* expects to own a reference to next->hv */
1682INLINE void
1683prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1684{
1685 SV *prev_sv = SvRV (coro_current);
1686
1687 ta->prev = SvSTATE_hv (prev_sv);
1688 ta->next = next;
1689
1690 TRANSFER_CHECK (*ta);
1691
1692 SvRV_set (coro_current, (SV *)next->hv);
1693
1694 free_coro_mortal (aTHX);
1695 coro_mortal = prev_sv;
1696}
1697
1698static void
1699prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1700{
1701 for (;;)
1702 {
1703 SV *next_sv = coro_deq (aTHX);
1704
1705 if (expect_true (next_sv))
1706 {
1707 struct coro *next = SvSTATE_hv (next_sv);
1708
1709 /* cannot transfer to destroyed coros, skip and look for next */
1710 if (expect_false (next->flags & CF_DESTROYED))
1711 SvREFCNT_dec (next_sv); /* coro_nready has already been taken care of by destroy */
1712 else
1713 {
1714 next->flags &= ~CF_READY;
1715 --coro_nready;
1716
1717 prepare_schedule_to (aTHX_ ta, next);
1718 break;
1719 }
1720 }
1721 else
1722 {
1723 /* nothing to schedule: call the idle handler */
1724 if (SvROK (sv_idle)
1725 && SvOBJECT (SvRV (sv_idle)))
1726 {
1727 ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */
1728 api_ready (aTHX_ SvRV (sv_idle));
1729 --coro_nready;
1730 }
1731 else
1732 {
1733 dSP;
1734
1735 ENTER;
1736 SAVETMPS;
1737
1738 PUSHMARK (SP);
1739 PUTBACK;
1740 call_sv (sv_idle, G_VOID | G_DISCARD);
1741
1742 FREETMPS;
1743 LEAVE;
1744 }
1745 }
1746 }
1747}
1748
1749INLINE void
1750prepare_cede (pTHX_ struct coro_transfer_args *ta)
1751{
1752 api_ready (aTHX_ coro_current);
1753 prepare_schedule (aTHX_ ta);
1754}
1755
1756INLINE void
1757prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1758{
1759 SV *prev = SvRV (coro_current);
1760
1761 if (coro_nready)
1762 {
1763 prepare_schedule (aTHX_ ta);
1764 api_ready (aTHX_ prev);
1765 }
1766 else
1767 prepare_nop (aTHX_ ta);
1768}
1769
1770static void
1771api_schedule (pTHX)
1772{
1773 struct coro_transfer_args ta;
1774
1775 prepare_schedule (aTHX_ &ta);
1776 TRANSFER (ta, 1);
1777}
1778
1779static void
1780api_schedule_to (pTHX_ SV *coro_sv)
1781{
1782 struct coro_transfer_args ta;
1783 struct coro *next = SvSTATE (coro_sv);
1784
1785 SvREFCNT_inc_NN (coro_sv);
1786 prepare_schedule_to (aTHX_ &ta, next);
1787}
1788
1789static int
1790api_cede (pTHX)
1791{
1792 struct coro_transfer_args ta;
1793
1794 prepare_cede (aTHX_ &ta);
1795
1796 if (expect_true (ta.prev != ta.next))
1797 {
1798 TRANSFER (ta, 1);
1799 return 1;
1800 }
1801 else
1802 return 0;
1803}
1804
1805static int
1806api_cede_notself (pTHX)
1807{
1808 if (coro_nready)
1809 {
1810 struct coro_transfer_args ta;
1811
1812 prepare_cede_notself (aTHX_ &ta);
1813 TRANSFER (ta, 1);
1814 return 1;
1815 }
1816 else
1817 return 0;
1818}
1819
1820static void
1821api_trace (pTHX_ SV *coro_sv, int flags)
1822{
1823 struct coro *coro = SvSTATE (coro_sv);
1824
1825 if (coro->flags & CF_RUNNING)
1826 croak ("cannot enable tracing on a running coroutine, caught");
1827
1828 if (flags & CC_TRACE)
1829 {
1830 if (!coro->cctx)
1831 coro->cctx = cctx_new_run ();
1832 else if (!(coro->cctx->flags & CC_TRACE))
1833 croak ("cannot enable tracing on coroutine with custom stack, caught");
1834
1835 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1836 }
1837 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1838 {
1839 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1840
1841 if (coro->flags & CF_RUNNING)
1842 PL_runops = RUNOPS_DEFAULT;
1843 else
1844 coro->slot->runops = RUNOPS_DEFAULT;
1845 }
1846}
1847
1848static void
1849coro_call_on_destroy (pTHX_ struct coro *coro)
1850{
1851 SV **on_destroyp = hv_fetch (coro->hv, "_on_destroy", sizeof ("_on_destroy") - 1, 0);
1852 SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0);
1853
1854 if (on_destroyp)
1855 {
1856 AV *on_destroy = (AV *)SvRV (*on_destroyp);
1857
1858 while (AvFILLp (on_destroy) >= 0)
1859 {
1860 dSP; /* don't disturb outer sp */
1861 SV *cb = av_pop (on_destroy);
1862
1863 PUSHMARK (SP);
1864
1865 if (statusp)
1866 {
1867 int i;
1868 AV *status = (AV *)SvRV (*statusp);
1869 EXTEND (SP, AvFILLp (status) + 1);
1870
1871 for (i = 0; i <= AvFILLp (status); ++i)
1872 PUSHs (AvARRAY (status)[i]);
1873 }
1874
1875 PUTBACK;
1876 call_sv (sv_2mortal (cb), G_VOID | G_DISCARD);
1877 }
1878 }
1879}
1880
1881static void
1882slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1883{
1884 int i;
1885 HV *hv = (HV *)SvRV (coro_current);
1886 AV *av = newAV ();
1887
1888 av_extend (av, items - 1);
1889 for (i = 0; i < items; ++i)
1890 av_push (av, SvREFCNT_inc_NN (arg [i]));
1891
1892 hv_store (hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0);
1893
1894 av_push (av_destroy, (SV *)newRV_inc ((SV *)hv)); /* RVinc for perl */
1895 api_ready (aTHX_ sv_manager);
1896
1897 frame->prepare = prepare_schedule;
1898 frame->check = slf_check_repeat;
1899
1900 /* as a minor optimisation, we could unwind all stacks here */
1901 /* but that puts extra pressure on pp_slf, and is not worth much */
1902 /*coro_unwind_stacks (aTHX);*/
1903}
1904
1905/*****************************************************************************/
1906/* async pool handler */
1907
1908static int
1909slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
1910{
1911 HV *hv = (HV *)SvRV (coro_current);
1912 struct coro *coro = (struct coro *)frame->data;
1913
1914 if (!coro->invoke_cb)
1915 return 1; /* loop till we have invoke */
1916 else
1917 {
1918 hv_store (hv, "desc", sizeof ("desc") - 1,
1919 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
1920
1921 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
1922
1923 {
1924 dSP;
1925 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
1926 PUTBACK;
1927 }
1928
1929 SvREFCNT_dec (GvAV (PL_defgv));
1930 GvAV (PL_defgv) = coro->invoke_av;
1931 coro->invoke_av = 0;
1932
1933 return 0;
1934 }
1935}
1936
1937static void
1938slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1939{
1940 HV *hv = (HV *)SvRV (coro_current);
1941 struct coro *coro = SvSTATE_hv ((SV *)hv);
1942
1943 if (expect_true (coro->saved_deffh))
1944 {
1945 /* subsequent iteration */
1946 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
1947 coro->saved_deffh = 0;
1948
1949 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
1950 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
1951 {
1952 coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate);
1953 coro->invoke_av = newAV ();
1954
1955 frame->prepare = prepare_nop;
1956 }
1957 else
1958 {
1959 av_clear (GvAV (PL_defgv));
1960 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
1961
1962 coro->prio = 0;
1963
1964 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
1965 api_trace (aTHX_ coro_current, 0);
1966
1967 frame->prepare = prepare_schedule;
1968 av_push (av_async_pool, SvREFCNT_inc (hv));
1969 }
1970 }
1971 else
1972 {
1973 /* first iteration, simply fall through */
1974 frame->prepare = prepare_nop;
1975 }
1976
1977 frame->check = slf_check_pool_handler;
1978 frame->data = (void *)coro;
1979}
1980
1981/*****************************************************************************/
1982/* rouse callback */
1983
1984#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
1985
1986static void
1987coro_rouse_callback (pTHX_ CV *cv)
1988{
1989 dXSARGS;
1990 SV *data = (SV *)GENSUB_ARG;
1991
1992 if (SvTYPE (SvRV (data)) != SVt_PVAV)
1993 {
1994 /* first call, set args */
1995 AV *av = newAV ();
1996 SV *coro = SvRV (data);
1997
1998 SvRV_set (data, (SV *)av);
1999 api_ready (aTHX_ coro);
2000 SvREFCNT_dec (coro);
2001
2002 /* better take a full copy of the arguments */
2003 while (items--)
2004 av_store (av, items, newSVsv (ST (items)));
2005 }
2006
2007 XSRETURN_EMPTY;
2008}
2009
2010static int
2011slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
2012{
2013 SV *data = (SV *)frame->data;
2014
2015 if (CORO_THROW)
2016 return 0;
2017
2018 if (SvTYPE (SvRV (data)) != SVt_PVAV)
2019 return 1;
2020
2021 /* now push all results on the stack */
2022 {
2023 dSP;
2024 AV *av = (AV *)SvRV (data);
2025 int i;
2026
2027 EXTEND (SP, AvFILLp (av) + 1);
2028 for (i = 0; i <= AvFILLp (av); ++i)
2029 PUSHs (sv_2mortal (AvARRAY (av)[i]));
2030
2031 /* we have stolen the elements, so ste length to zero and free */
2032 AvFILLp (av) = -1;
2033 av_undef (av);
2034
2035 PUTBACK;
2036 }
2037
2038 return 0;
2039}
2040
2041static void
2042slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2043{
2044 SV *cb;
2045
2046 if (items)
2047 cb = arg [0];
2048 else
2049 {
2050 struct coro *coro = SvSTATE_current;
2051
2052 if (!coro->rouse_cb)
2053 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
2054
2055 cb = sv_2mortal (coro->rouse_cb);
2056 coro->rouse_cb = 0;
2057 }
2058
2059 if (!SvROK (cb)
2060 || SvTYPE (SvRV (cb)) != SVt_PVCV
2061 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
2062 croak ("Coro::rouse_wait called with illegal callback argument,");
2063
2064 {
2065 CV *cv = (CV *)SvRV (cb); /* for GENSUB_ARG */
2066 SV *data = (SV *)GENSUB_ARG;
2067
2068 frame->data = (void *)data;
2069 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2070 frame->check = slf_check_rouse_wait;
2071 }
2072}
2073
2074static SV *
2075coro_new_rouse_cb (pTHX)
2076{
2077 HV *hv = (HV *)SvRV (coro_current);
2078 struct coro *coro = SvSTATE_hv (hv);
2079 SV *data = newRV_inc ((SV *)hv);
2080 SV *cb = gensub (aTHX_ coro_rouse_callback, (void *)data);
2081
2082 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2083 SvREFCNT_dec (data); /* magicext increases the refcount */
2084
2085 SvREFCNT_dec (coro->rouse_cb);
2086 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2087
2088 return cb;
2089}
2090
2091/*****************************************************************************/
2092/* schedule-like-function opcode (SLF) */
2093
2094static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2095static const CV *slf_cv;
2096static SV **slf_argv;
2097static int slf_argc, slf_arga; /* count, allocated */
2098static I32 slf_ax; /* top of stack, for restore */
2099
2100/* this restores the stack in the case we patched the entersub, to */
2101/* recreate the stack frame as perl will on following calls */
2102/* since entersub cleared the stack */
2103static OP *
2104pp_restore (pTHX)
2105{
2106 int i;
2107 SV **SP = PL_stack_base + slf_ax;
2108
2109 PUSHMARK (SP);
2110
2111 EXTEND (SP, slf_argc + 1);
2112
2113 for (i = 0; i < slf_argc; ++i)
2114 PUSHs (sv_2mortal (slf_argv [i]));
2115
2116 PUSHs ((SV *)CvGV (slf_cv));
2117
2118 RETURNOP (slf_restore.op_first);
2119}
2120
2121static void
2122slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2123{
2124 SV **arg = (SV **)slf_frame.data;
2125
2126 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2127}
2128
2129static void
2130slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2131{
2132 if (items != 2)
2133 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2134
2135 frame->prepare = slf_prepare_transfer;
2136 frame->check = slf_check_nop;
2137 frame->data = (void *)arg; /* let's hope it will stay valid */
2138}
2139
2140static void
2141slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2142{
2143 frame->prepare = prepare_schedule;
2144 frame->check = slf_check_nop;
2145}
2146
2147static void
2148slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2149{
2150 struct coro *next = (struct coro *)slf_frame.data;
2151
2152 SvREFCNT_inc_NN (next->hv);
2153 prepare_schedule_to (aTHX_ ta, next);
2154}
2155
2156static void
2157slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2158{
2159 if (!items)
2160 croak ("Coro::schedule_to expects a coroutine argument, caught");
2161
2162 frame->data = (void *)SvSTATE (arg [0]);
2163 frame->prepare = slf_prepare_schedule_to;
2164 frame->check = slf_check_nop;
2165}
2166
2167static void
2168slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2169{
2170 api_ready (aTHX_ SvRV (coro_current));
2171
2172 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2173}
2174
2175static void
2176slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2177{
2178 frame->prepare = prepare_cede;
2179 frame->check = slf_check_nop;
2180}
2181
2182static void
2183slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2184{
2185 frame->prepare = prepare_cede_notself;
2186 frame->check = slf_check_nop;
2187}
2188
2189/*
2190 * these not obviously related functions are all rolled into one
2191 * function to increase chances that they all will call transfer with the same
2192 * stack offset
2193 * SLF stands for "schedule-like-function".
2194 */
2195static OP *
2196pp_slf (pTHX)
2197{
2198 I32 checkmark; /* mark SP to see how many elements check has pushed */
2199
2200 /* set up the slf frame, unless it has already been set-up */
2201 /* the latter happens when a new coro has been started */
2202 /* or when a new cctx was attached to an existing coroutine */
2203 if (expect_true (!slf_frame.prepare))
2204 {
2205 /* first iteration */
2206 dSP;
2207 SV **arg = PL_stack_base + TOPMARK + 1;
2208 int items = SP - arg; /* args without function object */
2209 SV *gv = *sp;
2210
2211 /* do a quick consistency check on the "function" object, and if it isn't */
2212 /* for us, divert to the real entersub */
2213 if (SvTYPE (gv) != SVt_PVGV
2214 || !GvCV (gv)
2215 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2216 return PL_ppaddr[OP_ENTERSUB](aTHX);
2217
2218 if (!(PL_op->op_flags & OPf_STACKED))
2219 {
2220 /* ampersand-form of call, use @_ instead of stack */
2221 AV *av = GvAV (PL_defgv);
2222 arg = AvARRAY (av);
2223 items = AvFILLp (av) + 1;
2224 }
2225
2226 /* now call the init function, which needs to set up slf_frame */
2227 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2228 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2229
2230 /* pop args */
2231 SP = PL_stack_base + POPMARK;
2232
2233 PUTBACK;
2234 }
2235
2236 /* now that we have a slf_frame, interpret it! */
2237 /* we use a callback system not to make the code needlessly */
2238 /* complicated, but so we can run multiple perl coros from one cctx */
2239
2240 do
2241 {
2242 struct coro_transfer_args ta;
2243
2244 slf_frame.prepare (aTHX_ &ta);
2245 TRANSFER (ta, 0);
2246
2247 checkmark = PL_stack_sp - PL_stack_base;
2248 }
2249 while (slf_frame.check (aTHX_ &slf_frame));
2250
2251 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2252
2253 /* exception handling */
2254 if (expect_false (CORO_THROW))
2255 {
2256 SV *exception = sv_2mortal (CORO_THROW);
2257
2258 CORO_THROW = 0;
2259 sv_setsv (ERRSV, exception);
2260 croak (0);
2261 }
2262
2263 /* return value handling - mostly like entersub */
2264 /* make sure we put something on the stack in scalar context */
2265 if (GIMME_V == G_SCALAR)
2266 {
2267 dSP;
2268 SV **bot = PL_stack_base + checkmark;
2269
2270 if (sp == bot) /* too few, push undef */
2271 bot [1] = &PL_sv_undef;
2272 else if (sp != bot + 1) /* too many, take last one */
2273 bot [1] = *sp;
2274
2275 SP = bot + 1;
2276
2277 PUTBACK;
2278 }
2279
2280 return NORMAL;
2281}
2282
2283static void
2284api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2285{
2286 int i;
2287 SV **arg = PL_stack_base + ax;
2288 int items = PL_stack_sp - arg + 1;
2289
2290 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2291
2292 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2293 && PL_op->op_ppaddr != pp_slf)
2294 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2295
2296 CvFLAGS (cv) |= CVf_SLF;
2297 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2298 slf_cv = cv;
2299
2300 /* we patch the op, and then re-run the whole call */
2301 /* we have to put the same argument on the stack for this to work */
2302 /* and this will be done by pp_restore */
2303 slf_restore.op_next = (OP *)&slf_restore;
2304 slf_restore.op_type = OP_CUSTOM;
2305 slf_restore.op_ppaddr = pp_restore;
2306 slf_restore.op_first = PL_op;
2307
2308 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2309
2310 if (PL_op->op_flags & OPf_STACKED)
2311 {
2312 if (items > slf_arga)
2313 {
2314 slf_arga = items;
2315 free (slf_argv);
2316 slf_argv = malloc (slf_arga * sizeof (SV *));
2317 }
2318
2319 slf_argc = items;
2320
2321 for (i = 0; i < items; ++i)
2322 slf_argv [i] = SvREFCNT_inc (arg [i]);
2323 }
2324 else
2325 slf_argc = 0;
2326
2327 PL_op->op_ppaddr = pp_slf;
2328 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2329
2330 PL_op = (OP *)&slf_restore;
2331}
2332
2333/*****************************************************************************/
2334/* dynamic wind */
2335
2336static void
2337on_enterleave_call (pTHX_ SV *cb)
2338{
2339 dSP;
2340
2341 PUSHSTACK;
2342
2343 PUSHMARK (SP);
2344 PUTBACK;
2345 call_sv (cb, G_VOID | G_DISCARD);
2346 SPAGAIN;
2347
2348 POPSTACK;
2349}
2350
2351static SV *
2352coro_avp_pop_and_free (AV **avp)
2353{
2354 AV *av = *avp;
2355 SV *res = av_pop (av);
2356
2357 if (AvFILLp (av) < 0)
2358 {
2359 *avp = 0;
2360 SvREFCNT_dec (av);
2361 }
2362
2363 return res;
2364}
2365
2366static void
2367coro_pop_on_enter (pTHX_ void *coro)
2368{
2369 SV *cb = coro_avp_pop_and_free (&((struct coro *)coro)->on_enter);
2370 SvREFCNT_dec (cb);
2371}
2372
2373static void
2374coro_pop_on_leave (pTHX_ void *coro)
2375{
2376 SV *cb = coro_avp_pop_and_free (&((struct coro *)coro)->on_leave);
2377 on_enterleave_call (sv_2mortal (cb));
2378}
2379
2380/*****************************************************************************/
2381/* PerlIO::cede */
2382
2383typedef struct
2384{
2385 PerlIOBuf base;
2386 NV next, every;
2387} PerlIOCede;
2388
2389static IV
2390PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2391{
2392 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2393
2394 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2395 self->next = nvtime () + self->every;
2396
2397 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2398}
2399
2400static SV *
2401PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2402{
2403 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2404
2405 return newSVnv (self->every);
2406}
2407
2408static IV
2409PerlIOCede_flush (pTHX_ PerlIO *f)
2410{
2411 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2412 double now = nvtime ();
2413
2414 if (now >= self->next)
2415 {
2416 api_cede (aTHX);
2417 self->next = now + self->every;
2418 }
2419
2420 return PerlIOBuf_flush (aTHX_ f);
2421}
2422
2423static PerlIO_funcs PerlIO_cede =
2424{
2425 sizeof(PerlIO_funcs),
2426 "cede",
2427 sizeof(PerlIOCede),
2428 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2429 PerlIOCede_pushed,
2430 PerlIOBuf_popped,
2431 PerlIOBuf_open,
2432 PerlIOBase_binmode,
2433 PerlIOCede_getarg,
2434 PerlIOBase_fileno,
2435 PerlIOBuf_dup,
2436 PerlIOBuf_read,
2437 PerlIOBuf_unread,
2438 PerlIOBuf_write,
2439 PerlIOBuf_seek,
2440 PerlIOBuf_tell,
2441 PerlIOBuf_close,
2442 PerlIOCede_flush,
2443 PerlIOBuf_fill,
2444 PerlIOBase_eof,
2445 PerlIOBase_error,
2446 PerlIOBase_clearerr,
2447 PerlIOBase_setlinebuf,
2448 PerlIOBuf_get_base,
2449 PerlIOBuf_bufsiz,
2450 PerlIOBuf_get_ptr,
2451 PerlIOBuf_get_cnt,
2452 PerlIOBuf_set_ptrcnt,
2453};
2454
2455/*****************************************************************************/
2456/* Coro::Semaphore & Coro::Signal */
2457
2458static SV *
2459coro_waitarray_new (pTHX_ int count)
2460{
2461 /* a semaphore contains a counter IV in $sem->[0] and any waiters after that */
2462 AV *av = newAV ();
2463 SV **ary;
2464
2465 /* unfortunately, building manually saves memory */
2466 Newx (ary, 2, SV *);
2467 AvALLOC (av) = ary;
2468#if PERL_VERSION_ATLEAST (5,10,0)
2469 AvARRAY (av) = ary;
2470#else
2471 /* 5.8.8 needs this syntax instead of AvARRAY = ary, yet */
2472 /* -DDEBUGGING flags this as a bug, despite it perfectly working */
2473 SvPVX ((SV *)av) = (char *)ary;
2474#endif
2475 AvMAX (av) = 1;
2476 AvFILLp (av) = 0;
2477 ary [0] = newSViv (count);
2478
2479 return newRV_noinc ((SV *)av);
2480}
2481
2482/* semaphore */
2483
2484static void
2485coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2486{
2487 SV *count_sv = AvARRAY (av)[0];
2488 IV count = SvIVX (count_sv);
2489
2490 count += adjust;
2491 SvIVX (count_sv) = count;
2492
2493 /* now wake up as many waiters as are expected to lock */
2494 while (count > 0 && AvFILLp (av) > 0)
2495 {
2496 SV *cb;
2497
2498 /* swap first two elements so we can shift a waiter */
2499 AvARRAY (av)[0] = AvARRAY (av)[1];
2500 AvARRAY (av)[1] = count_sv;
2501 cb = av_shift (av);
2502
2503 if (SvOBJECT (cb))
2504 {
2505 api_ready (aTHX_ cb);
2506 --count;
2507 }
2508 else if (SvTYPE (cb) == SVt_PVCV)
2509 {
2510 dSP;
2511 PUSHMARK (SP);
2512 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2513 PUTBACK;
2514 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2515 }
2516
2517 SvREFCNT_dec (cb);
2518 }
2519}
2520
2521static void
2522coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2523{
2524 /* call $sem->adjust (0) to possibly wake up some other waiters */
2525 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2526}
2527
2528static int
2529slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2530{
2531 AV *av = (AV *)frame->data;
2532 SV *count_sv = AvARRAY (av)[0];
2533
2534 /* if we are about to throw, don't actually acquire the lock, just throw */
2535 if (CORO_THROW)
2536 return 0;
2537 else if (SvIVX (count_sv) > 0)
2538 {
2539 SvSTATE_current->on_destroy = 0;
2540
2541 if (acquire)
2542 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2543 else
2544 coro_semaphore_adjust (aTHX_ av, 0);
2545
2546 return 0;
2547 }
2548 else
2549 {
2550 int i;
2551 /* if we were woken up but can't down, we look through the whole */
2552 /* waiters list and only add us if we aren't in there already */
2553 /* this avoids some degenerate memory usage cases */
2554
2555 for (i = 1; i <= AvFILLp (av); ++i)
2556 if (AvARRAY (av)[i] == SvRV (coro_current))
2557 return 1;
2558
2559 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2560 return 1;
2561 }
2562}
2563
2564static int
2565slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2566{
2567 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2568}
2569
2570static int
2571slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2572{
2573 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2574}
2575
2576static void
2577slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2578{
2579 AV *av = (AV *)SvRV (arg [0]);
2580
2581 if (SvIVX (AvARRAY (av)[0]) > 0)
2582 {
2583 frame->data = (void *)av;
2584 frame->prepare = prepare_nop;
2585 }
2586 else
2587 {
2588 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2589
2590 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2591 frame->prepare = prepare_schedule;
2592
2593 /* to avoid race conditions when a woken-up coro gets terminated */
2594 /* we arrange for a temporary on_destroy that calls adjust (0) */
2595 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2596 }
2597}
2598
2599static void
2600slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2601{
2602 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2603 frame->check = slf_check_semaphore_down;
2604}
2605
2606static void
2607slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2608{
2609 if (items >= 2)
2610 {
2611 /* callback form */
2612 AV *av = (AV *)SvRV (arg [0]);
2613 CV *cb_cv = coro_sv_2cv (aTHX_ arg [1]);
2614
2615 av_push (av, (SV *)SvREFCNT_inc_NN (cb_cv));
2616
2617 if (SvIVX (AvARRAY (av)[0]) > 0)
2618 coro_semaphore_adjust (aTHX_ av, 0);
2619
2620 frame->prepare = prepare_nop;
2621 frame->check = slf_check_nop;
2622 }
2623 else
2624 {
2625 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2626 frame->check = slf_check_semaphore_wait;
2627 }
2628}
2629
2630/* signal */
2631
2632static void
2633coro_signal_wake (pTHX_ AV *av, int count)
2634{
2635 SvIVX (AvARRAY (av)[0]) = 0;
2636
2637 /* now signal count waiters */
2638 while (count > 0 && AvFILLp (av) > 0)
2639 {
2640 SV *cb;
2641
2642 /* swap first two elements so we can shift a waiter */
2643 cb = AvARRAY (av)[0];
2644 AvARRAY (av)[0] = AvARRAY (av)[1];
2645 AvARRAY (av)[1] = cb;
2646
2647 cb = av_shift (av);
2648
2649 api_ready (aTHX_ cb);
2650 sv_setiv (cb, 0); /* signal waiter */
2651 SvREFCNT_dec (cb);
2652
2653 --count;
2654 }
2655}
2656
2657static int
2658slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2659{
2660 /* if we are about to throw, also stop waiting */
2661 return SvROK ((SV *)frame->data) && !CORO_THROW;
2662}
2663
2664static void
2665slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2666{
2667 AV *av = (AV *)SvRV (arg [0]);
2668
2669 if (SvIVX (AvARRAY (av)[0]))
2670 {
2671 SvIVX (AvARRAY (av)[0]) = 0;
2672 frame->prepare = prepare_nop;
2673 frame->check = slf_check_nop;
2674 }
2675 else
2676 {
2677 SV *waiter = newRV_inc (SvRV (coro_current)); /* owned by signal av */
2678
2679 av_push (av, waiter);
2680
2681 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2682 frame->prepare = prepare_schedule;
2683 frame->check = slf_check_signal_wait;
2684 }
2685}
2686
2687/*****************************************************************************/
2688/* Coro::AIO */
2689
2690#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2691
2692/* helper storage struct */
2693struct io_state
2694{
2695 int errorno;
2696 I32 laststype; /* U16 in 5.10.0 */
2697 int laststatval;
2698 Stat_t statcache;
2699};
2700
2701static void
2702coro_aio_callback (pTHX_ CV *cv)
2703{
2704 dXSARGS;
2705 AV *state = (AV *)GENSUB_ARG;
2706 SV *coro = av_pop (state);
2707 SV *data_sv = newSV (sizeof (struct io_state));
2708
2709 av_extend (state, items - 1);
2710
2711 sv_upgrade (data_sv, SVt_PV);
2712 SvCUR_set (data_sv, sizeof (struct io_state));
2713 SvPOK_only (data_sv);
2714
2715 {
2716 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2717
2718 data->errorno = errno;
2719 data->laststype = PL_laststype;
2720 data->laststatval = PL_laststatval;
2721 data->statcache = PL_statcache;
2722 }
2723
2724 /* now build the result vector out of all the parameters and the data_sv */
2725 {
2726 int i;
2727
2728 for (i = 0; i < items; ++i)
2729 av_push (state, SvREFCNT_inc_NN (ST (i)));
2730 }
2731
2732 av_push (state, data_sv);
2733
2734 api_ready (aTHX_ coro);
2735 SvREFCNT_dec (coro);
2736 SvREFCNT_dec ((AV *)state);
2737}
2738
2739static int
2740slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2741{
2742 AV *state = (AV *)frame->data;
2743
2744 /* if we are about to throw, return early */
2745 /* this does not cancel the aio request, but at least */
2746 /* it quickly returns */
2747 if (CORO_THROW)
2748 return 0;
2749
2750 /* one element that is an RV? repeat! */
2751 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2752 return 1;
2753
2754 /* restore status */
2755 {
2756 SV *data_sv = av_pop (state);
2757 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2758
2759 errno = data->errorno;
2760 PL_laststype = data->laststype;
2761 PL_laststatval = data->laststatval;
2762 PL_statcache = data->statcache;
2763
2764 SvREFCNT_dec (data_sv);
2765 }
2766
2767 /* push result values */
2768 {
2769 dSP;
2770 int i;
2771
2772 EXTEND (SP, AvFILLp (state) + 1);
2773 for (i = 0; i <= AvFILLp (state); ++i)
2774 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2775
2776 PUTBACK;
2777 }
2778
2779 return 0;
2780}
2781
2782static void
2783slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2784{
2785 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2786 SV *coro_hv = SvRV (coro_current);
2787 struct coro *coro = SvSTATE_hv (coro_hv);
2788
2789 /* put our coroutine id on the state arg */
2790 av_push (state, SvREFCNT_inc_NN (coro_hv));
2791
2792 /* first see whether we have a non-zero priority and set it as AIO prio */
2793 if (coro->prio)
2794 {
2795 dSP;
2796
2797 static SV *prio_cv;
2798 static SV *prio_sv;
2799
2800 if (expect_false (!prio_cv))
2801 {
2802 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2803 prio_sv = newSViv (0);
2804 }
2805
2806 PUSHMARK (SP);
2807 sv_setiv (prio_sv, coro->prio);
2808 XPUSHs (prio_sv);
2809
2810 PUTBACK;
2811 call_sv (prio_cv, G_VOID | G_DISCARD);
2812 }
2813
2814 /* now call the original request */
2815 {
2816 dSP;
2817 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2818 int i;
2819
2820 PUSHMARK (SP);
2821
2822 /* first push all args to the stack */
2823 EXTEND (SP, items + 1);
2824
2825 for (i = 0; i < items; ++i)
2826 PUSHs (arg [i]);
2827
2828 /* now push the callback closure */
2829 PUSHs (sv_2mortal (gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2830
2831 /* now call the AIO function - we assume our request is uncancelable */
2832 PUTBACK;
2833 call_sv ((SV *)req, G_VOID | G_DISCARD);
2834 }
2835
2836 /* now that the requets is going, we loop toll we have a result */
2837 frame->data = (void *)state;
2838 frame->prepare = prepare_schedule;
2839 frame->check = slf_check_aio_req;
2840}
2841
2842static void
2843coro_aio_req_xs (pTHX_ CV *cv)
2844{
2845 dXSARGS;
2846
2847 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2848
2849 XSRETURN_EMPTY;
2850}
2851
2852/*****************************************************************************/
2853
2854#if CORO_CLONE
2855# include "clone.c"
2856#endif
2857
2858MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2859
2860PROTOTYPES: DISABLE
2861
2862BOOT:
2863{
2864#ifdef USE_ITHREADS
2865# if CORO_PTHREAD
2866 coro_thx = PERL_GET_CONTEXT;
2867# endif
2868#endif
2869 BOOT_PAGESIZE;
2870
2871 cctx_current = cctx_new_empty ();
2872
2873 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2874 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2875
2876 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2877 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2878 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
2879
2880 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
2881 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
2882 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
2883
2884 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
2885
2886 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
2887 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
2888 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
2889 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
2890
2891 main_mainstack = PL_mainstack;
2892 main_top_env = PL_top_env;
2893
2894 while (main_top_env->je_prev)
2895 main_top_env = main_top_env->je_prev;
2896
2897 {
2898 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2899
2900 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2901 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
2902
2903 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2904 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
2905 }
2906
2907 coroapi.ver = CORO_API_VERSION;
2908 coroapi.rev = CORO_API_REVISION;
2909
2910 coroapi.transfer = api_transfer;
2911
2912 coroapi.sv_state = SvSTATE_;
2913 coroapi.execute_slf = api_execute_slf;
2914 coroapi.prepare_nop = prepare_nop;
2915 coroapi.prepare_schedule = prepare_schedule;
2916 coroapi.prepare_cede = prepare_cede;
2917 coroapi.prepare_cede_notself = prepare_cede_notself;
2918
2919 {
2920 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
2921
2922 if (!svp) croak ("Time::HiRes is required");
2923 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
2924
2925 nvtime = INT2PTR (double (*)(), SvIV (*svp));
2926 }
2927
2928 assert (("PRIO_NORMAL must be 0", !PRIO_NORMAL));
2929}
2930
2931SV *
2932new (char *klass, ...)
2933 ALIAS:
2934 Coro::new = 1
2935 CODE:
2936{
2937 struct coro *coro;
2938 MAGIC *mg;
2939 HV *hv;
2940 CV *cb;
2941 int i;
2942
2943 if (items > 1)
2944 {
2945 cb = coro_sv_2cv (aTHX_ ST (1));
2946
2947 if (!ix)
235 { 2948 {
236 /* I never used formats, so how should I know how these are implemented? */ 2949 if (CvISXSUB (cb))
237 /* my bold guess is as a simple, plain sub... */ 2950 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
238 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 2951
2952 if (!CvROOT (cb))
2953 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
239 } 2954 }
240 } 2955 }
241 2956
242 if (top_si->si_type == PERLSI_MAIN)
243 break;
244
245 top_si = top_si->si_prev;
246 ccstk = top_si->si_cxstack;
247 cxix = top_si->si_cxix;
248 }
249
250 PUTBACK;
251 }
252
253 c->dowarn = PL_dowarn;
254 c->defav = GvAV (PL_defgv);
255 c->curstackinfo = PL_curstackinfo;
256 c->curstack = PL_curstack;
257 c->mainstack = PL_mainstack;
258 c->stack_sp = PL_stack_sp;
259 c->op = PL_op;
260 c->curpad = PL_curpad;
261 c->stack_base = PL_stack_base;
262 c->stack_max = PL_stack_max;
263 c->tmps_stack = PL_tmps_stack;
264 c->tmps_floor = PL_tmps_floor;
265 c->tmps_ix = PL_tmps_ix;
266 c->tmps_max = PL_tmps_max;
267 c->markstack = PL_markstack;
268 c->markstack_ptr = PL_markstack_ptr;
269 c->markstack_max = PL_markstack_max;
270 c->scopestack = PL_scopestack;
271 c->scopestack_ix = PL_scopestack_ix;
272 c->scopestack_max = PL_scopestack_max;
273 c->savestack = PL_savestack;
274 c->savestack_ix = PL_savestack_ix;
275 c->savestack_max = PL_savestack_max;
276 c->retstack = PL_retstack;
277 c->retstack_ix = PL_retstack_ix;
278 c->retstack_max = PL_retstack_max;
279 c->curcop = PL_curcop;
280}
281
282static void
283LOAD(pTHX_ Coro__State c)
284{
285 PL_dowarn = c->dowarn;
286 GvAV (PL_defgv) = c->defav;
287 PL_curstackinfo = c->curstackinfo;
288 PL_curstack = c->curstack;
289 PL_mainstack = c->mainstack;
290 PL_stack_sp = c->stack_sp;
291 PL_op = c->op;
292 PL_curpad = c->curpad;
293 PL_stack_base = c->stack_base;
294 PL_stack_max = c->stack_max;
295 PL_tmps_stack = c->tmps_stack;
296 PL_tmps_floor = c->tmps_floor;
297 PL_tmps_ix = c->tmps_ix;
298 PL_tmps_max = c->tmps_max;
299 PL_markstack = c->markstack;
300 PL_markstack_ptr = c->markstack_ptr;
301 PL_markstack_max = c->markstack_max;
302 PL_scopestack = c->scopestack;
303 PL_scopestack_ix = c->scopestack_ix;
304 PL_scopestack_max = c->scopestack_max;
305 PL_savestack = c->savestack;
306 PL_savestack_ix = c->savestack_ix;
307 PL_savestack_max = c->savestack_max;
308 PL_retstack = c->retstack;
309 PL_retstack_ix = c->retstack_ix;
310 PL_retstack_max = c->retstack_max;
311 PL_curcop = c->curcop;
312
313 {
314 dSP;
315 CV *cv;
316
317 /* now do the ugly restore mess */
318 while ((cv = (CV *)POPs))
319 {
320 AV *padlist = (AV *)POPs;
321
322 put_padlist (cv);
323 CvPADLIST(cv) = padlist;
324 CvDEPTH(cv) = (I32)POPs;
325
326#ifdef USE_THREADS
327 CvOWNER(cv) = (struct perl_thread *)POPs;
328 error does not work either
329#endif
330 }
331
332 PUTBACK;
333 }
334}
335
336/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
337STATIC void
338destroy_stacks(pTHX)
339{
340 /* die does this while calling POPSTACK, but I just don't see why. */
341 dounwind(-1);
342
343 /* is this ugly, I ask? */
344 while (PL_scopestack_ix)
345 LEAVE;
346
347 while (PL_curstackinfo->si_next)
348 PL_curstackinfo = PL_curstackinfo->si_next;
349
350 while (PL_curstackinfo)
351 {
352 PERL_SI *p = PL_curstackinfo->si_prev;
353
354 SvREFCNT_dec(PL_curstackinfo->si_stack);
355 Safefree(PL_curstackinfo->si_cxstack);
356 Safefree(PL_curstackinfo);
357 PL_curstackinfo = p;
358 }
359
360 if (PL_scopestack_ix != 0)
361 Perl_warner(aTHX_ WARN_INTERNAL,
362 "Unbalanced scopes: %ld more ENTERs than LEAVEs\n",
363 (long)PL_scopestack_ix);
364 if (PL_savestack_ix != 0)
365 Perl_warner(aTHX_ WARN_INTERNAL,
366 "Unbalanced saves: %ld more saves than restores\n",
367 (long)PL_savestack_ix);
368 if (PL_tmps_floor != -1)
369 Perl_warner(aTHX_ WARN_INTERNAL,"Unbalanced tmps: %ld more allocs than frees\n",
370 (long)PL_tmps_floor + 1);
371 /*
372 */
373 Safefree(PL_tmps_stack);
374 Safefree(PL_markstack);
375 Safefree(PL_scopestack);
376 Safefree(PL_savestack);
377 Safefree(PL_retstack);
378}
379
380#define SUB_INIT "Coro::State::_newcoro"
381
382MODULE = Coro::State PACKAGE = Coro::State
383
384PROTOTYPES: ENABLE
385
386BOOT:
387 if (!padlist_cache)
388 padlist_cache = newHV ();
389
390Coro::State
391_newprocess(args)
392 SV * args
393 PROTOTYPE: $
394 CODE:
395 Coro__State coro;
396
397 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV)
398 croak ("Coro::State::newprocess expects an arrayref");
399
400 New (0, coro, 1, struct coro); 2957 Newz (0, coro, 1, struct coro);
2958 coro->args = newAV ();
2959 coro->flags = CF_NEW;
401 2960
402 coro->mainstack = 0; /* actual work is done inside transfer */ 2961 if (coro_first) coro_first->prev = coro;
403 coro->args = (AV *)SvREFCNT_inc (SvRV (args)); 2962 coro->next = coro_first;
2963 coro_first = coro;
404 2964
405 RETVAL = coro; 2965 coro->hv = hv = newHV ();
2966 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
2967 mg->mg_flags |= MGf_DUP;
2968 RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1));
2969
2970 if (items > 1)
2971 {
2972 av_extend (coro->args, items - 1 + ix - 1);
2973
2974 if (ix)
2975 {
2976 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
2977 cb = cv_coro_run;
2978 }
2979
2980 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
2981
2982 for (i = 2; i < items; i++)
2983 av_push (coro->args, newSVsv (ST (i)));
2984 }
2985}
406 OUTPUT: 2986 OUTPUT:
407 RETVAL 2987 RETVAL
408 2988
409void 2989void
410transfer(prev,next) 2990transfer (...)
411 Coro::State_or_hashref prev 2991 PROTOTYPE: $$
412 Coro::State_or_hashref next 2992 CODE:
413 CODE: 2993 CORO_EXECUTE_SLF_XS (slf_init_transfer);
414 2994
415 if (prev != next) 2995bool
2996_destroy (SV *coro_sv)
2997 CODE:
2998 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
2999 OUTPUT:
3000 RETVAL
3001
3002void
3003_exit (int code)
3004 PROTOTYPE: $
3005 CODE:
3006 _exit (code);
3007
3008SV *
3009clone (Coro::State coro)
3010 CODE:
3011{
3012#if CORO_CLONE
3013 struct coro *ncoro = coro_clone (aTHX_ coro);
3014 MAGIC *mg;
3015 /* TODO: too much duplication */
3016 ncoro->hv = newHV ();
3017 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
3018 mg->mg_flags |= MGf_DUP;
3019 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
3020#else
3021 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
3022#endif
3023}
3024 OUTPUT:
3025 RETVAL
3026
3027int
3028cctx_stacksize (int new_stacksize = 0)
3029 PROTOTYPE: ;$
3030 CODE:
3031 RETVAL = cctx_stacksize;
3032 if (new_stacksize)
416 { 3033 {
417 PUTBACK; 3034 cctx_stacksize = new_stacksize;
418 SAVE (aTHX_ prev); 3035 ++cctx_gen;
419
420 /* 3036 }
421 * this could be done in newprocess which would lead to 3037 OUTPUT:
422 * extremely elegant and fast (just PUTBACK/SAVE/LOAD/SPAGAIN) 3038 RETVAL
423 * code here, but lazy allocation of stacks has also 3039
424 * some virtues and the overhead of the if() is nil. 3040int
3041cctx_max_idle (int max_idle = 0)
3042 PROTOTYPE: ;$
3043 CODE:
3044 RETVAL = cctx_max_idle;
3045 if (max_idle > 1)
3046 cctx_max_idle = max_idle;
3047 OUTPUT:
3048 RETVAL
3049
3050int
3051cctx_count ()
3052 PROTOTYPE:
3053 CODE:
3054 RETVAL = cctx_count;
3055 OUTPUT:
3056 RETVAL
3057
3058int
3059cctx_idle ()
3060 PROTOTYPE:
3061 CODE:
3062 RETVAL = cctx_idle;
3063 OUTPUT:
3064 RETVAL
3065
3066void
3067list ()
3068 PROTOTYPE:
3069 PPCODE:
3070{
3071 struct coro *coro;
3072 for (coro = coro_first; coro; coro = coro->next)
3073 if (coro->hv)
3074 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
3075}
3076
3077void
3078call (Coro::State coro, SV *coderef)
3079 ALIAS:
3080 eval = 1
3081 CODE:
3082{
3083 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
425 */ 3084 {
426 if (next->mainstack) 3085 struct coro *current = SvSTATE_current;
3086
3087 if (current != coro)
427 { 3088 {
428 LOAD (aTHX_ next); 3089 PUTBACK;
429 next->mainstack = 0; /* unnecessary but much cleaner */ 3090 save_perl (aTHX_ current);
3091 load_perl (aTHX_ coro);
430 SPAGAIN; 3092 SPAGAIN;
431 } 3093 }
3094
3095 PUSHSTACK;
3096
3097 PUSHMARK (SP);
3098 PUTBACK;
3099
3100 if (ix)
3101 eval_sv (coderef, 0);
432 else 3102 else
3103 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3104
3105 SPAGAIN;
3106 POPSTACK;
3107
3108 if (current != coro)
433 { 3109 {
434 /* 3110 PUTBACK;
435 * emulate part of the perl startup here. 3111 save_perl (aTHX_ coro);
436 */ 3112 load_perl (aTHX_ current);
437 UNOP myop;
438
439 init_stacks (); /* from perl.c */
440 PL_op = (OP *)&myop;
441 /*PL_curcop = 0;*/
442 GvAV (PL_defgv) = (AV *)SvREFCNT_inc ((SV *)next->args);
443
444 SPAGAIN; 3113 SPAGAIN;
445 Zero(&myop, 1, UNOP);
446 myop.op_next = Nullop;
447 myop.op_flags = OPf_WANT_VOID;
448
449 PUSHMARK(SP);
450 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
451 PUTBACK;
452 /*
453 * the next line is slightly wrong, as PL_op->op_next
454 * is actually being executed so we skip the first op.
455 * that doesn't matter, though, since it is only
456 * pp_nextstate and we never return...
457 */
458 PL_op = Perl_pp_entersub(aTHX);
459 SPAGAIN;
460
461 ENTER;
462 } 3114 }
463 } 3115 }
3116}
3117
3118SV *
3119is_ready (Coro::State coro)
3120 PROTOTYPE: $
3121 ALIAS:
3122 is_ready = CF_READY
3123 is_running = CF_RUNNING
3124 is_new = CF_NEW
3125 is_destroyed = CF_DESTROYED
3126 CODE:
3127 RETVAL = boolSV (coro->flags & ix);
3128 OUTPUT:
3129 RETVAL
464 3130
465void 3131void
466DESTROY(coro) 3132throw (Coro::State self, SV *throw = &PL_sv_undef)
467 Coro::State coro 3133 PROTOTYPE: $;$
468 CODE: 3134 CODE:
3135{
3136 struct coro *current = SvSTATE_current;
3137 SV **throwp = self == current ? &CORO_THROW : &self->except;
3138 SvREFCNT_dec (*throwp);
3139 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
3140}
469 3141
470 if (coro->mainstack) 3142void
3143api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3144 PROTOTYPE: $;$
3145 C_ARGS: aTHX_ coro, flags
3146
3147SV *
3148has_cctx (Coro::State coro)
3149 PROTOTYPE: $
3150 CODE:
3151 /* maybe manage the running flag differently */
3152 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3153 OUTPUT:
3154 RETVAL
3155
3156int
3157is_traced (Coro::State coro)
3158 PROTOTYPE: $
3159 CODE:
3160 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3161 OUTPUT:
3162 RETVAL
3163
3164UV
3165rss (Coro::State coro)
3166 PROTOTYPE: $
3167 ALIAS:
3168 usecount = 1
3169 CODE:
3170 switch (ix)
3171 {
3172 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3173 case 1: RETVAL = coro->usecount; break;
3174 }
3175 OUTPUT:
3176 RETVAL
3177
3178void
3179force_cctx ()
3180 PROTOTYPE:
3181 CODE:
3182 cctx_current->idle_sp = 0;
3183
3184void
3185swap_defsv (Coro::State self)
3186 PROTOTYPE: $
3187 ALIAS:
3188 swap_defav = 1
3189 CODE:
3190 if (!self->slot)
3191 croak ("cannot swap state with coroutine that has no saved state,");
3192 else
471 { 3193 {
472 struct coro temp; 3194 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3195 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
473 3196
3197 SV *tmp = *src; *src = *dst; *dst = tmp;
3198 }
3199
3200
3201MODULE = Coro::State PACKAGE = Coro
3202
3203BOOT:
3204{
3205 int i;
3206
3207 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3208 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3209 cv_coro_run = get_cv ( "Coro::_coro_run" , GV_ADD);
3210 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD);
3211 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3212 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3213 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3214 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3215 sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE);
3216
3217 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3218 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3219 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3220 cv_coro_state_new = get_cv ("Coro::State::new", 0); SvREADONLY_on (cv_coro_state_new);
3221
3222 coro_stash = gv_stashpv ("Coro", TRUE);
3223
3224 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (PRIO_MAX));
3225 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (PRIO_HIGH));
3226 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (PRIO_NORMAL));
3227 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (PRIO_LOW));
3228 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (PRIO_IDLE));
3229 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (PRIO_MIN));
3230
3231 for (i = PRIO_MAX - PRIO_MIN + 1; i--; )
3232 coro_ready[i] = newAV ();
3233
3234 {
3235 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3236
3237 coroapi.schedule = api_schedule;
3238 coroapi.schedule_to = api_schedule_to;
3239 coroapi.cede = api_cede;
3240 coroapi.cede_notself = api_cede_notself;
3241 coroapi.ready = api_ready;
3242 coroapi.is_ready = api_is_ready;
3243 coroapi.nready = coro_nready;
3244 coroapi.current = coro_current;
3245
3246 /*GCoroAPI = &coroapi;*/
3247 sv_setiv (sv, (IV)&coroapi);
3248 SvREADONLY_on (sv);
3249 }
3250}
3251
3252void
3253terminate (...)
3254 CODE:
3255 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3256
3257void
3258schedule (...)
3259 CODE:
3260 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3261
3262void
3263schedule_to (...)
3264 CODE:
3265 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3266
3267void
3268cede_to (...)
3269 CODE:
3270 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3271
3272void
3273cede (...)
3274 CODE:
3275 CORO_EXECUTE_SLF_XS (slf_init_cede);
3276
3277void
3278cede_notself (...)
3279 CODE:
3280 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3281
3282void
3283_cancel (Coro::State self)
3284 CODE:
3285 coro_state_destroy (aTHX_ self);
3286 coro_call_on_destroy (aTHX_ self);
3287
3288void
3289_set_current (SV *current)
3290 PROTOTYPE: $
3291 CODE:
3292 SvREFCNT_dec (SvRV (coro_current));
3293 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3294
3295void
3296_set_readyhook (SV *hook)
3297 PROTOTYPE: $
3298 CODE:
3299 SvREFCNT_dec (coro_readyhook);
3300 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
3301
3302int
3303prio (Coro::State coro, int newprio = 0)
3304 PROTOTYPE: $;$
3305 ALIAS:
3306 nice = 1
3307 CODE:
3308{
3309 RETVAL = coro->prio;
3310
3311 if (items > 1)
3312 {
3313 if (ix)
3314 newprio = coro->prio - newprio;
3315
3316 if (newprio < PRIO_MIN) newprio = PRIO_MIN;
3317 if (newprio > PRIO_MAX) newprio = PRIO_MAX;
3318
3319 coro->prio = newprio;
3320 }
3321}
3322 OUTPUT:
3323 RETVAL
3324
3325SV *
3326ready (SV *self)
3327 PROTOTYPE: $
3328 CODE:
3329 RETVAL = boolSV (api_ready (aTHX_ self));
3330 OUTPUT:
3331 RETVAL
3332
3333int
3334nready (...)
3335 PROTOTYPE:
3336 CODE:
3337 RETVAL = coro_nready;
3338 OUTPUT:
3339 RETVAL
3340
3341void
3342_pool_handler (...)
3343 CODE:
3344 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3345
3346void
3347async_pool (SV *cv, ...)
3348 PROTOTYPE: &@
3349 PPCODE:
3350{
3351 HV *hv = (HV *)av_pop (av_async_pool);
3352 AV *av = newAV ();
3353 SV *cb = ST (0);
3354 int i;
3355
3356 av_extend (av, items - 2);
3357 for (i = 1; i < items; ++i)
3358 av_push (av, SvREFCNT_inc_NN (ST (i)));
3359
3360 if ((SV *)hv == &PL_sv_undef)
3361 {
3362 PUSHMARK (SP);
3363 EXTEND (SP, 2);
3364 PUSHs (sv_Coro);
3365 PUSHs ((SV *)cv_pool_handler);
474 PUTBACK; 3366 PUTBACK;
475 SAVE(aTHX_ (&temp)); 3367 call_sv ((SV *)cv_coro_state_new, G_SCALAR);
476 LOAD(aTHX_ coro);
477
478 destroy_stacks ();
479 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
480
481 LOAD((&temp));
482 SPAGAIN; 3368 SPAGAIN;
3369
3370 hv = (HV *)SvREFCNT_inc_NN (SvRV (POPs));
483 } 3371 }
484 3372
3373 {
3374 struct coro *coro = SvSTATE_hv (hv);
3375
3376 assert (!coro->invoke_cb);
3377 assert (!coro->invoke_av);
3378 coro->invoke_cb = SvREFCNT_inc (cb);
3379 coro->invoke_av = av;
3380 }
3381
3382 api_ready (aTHX_ (SV *)hv);
3383
3384 if (GIMME_V != G_VOID)
3385 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
3386 else
485 SvREFCNT_dec (coro->args); 3387 SvREFCNT_dec (hv);
486 Safefree (coro); 3388}
487 3389
3390SV *
3391rouse_cb ()
3392 PROTOTYPE:
3393 CODE:
3394 RETVAL = coro_new_rouse_cb (aTHX);
3395 OUTPUT:
3396 RETVAL
488 3397
3398void
3399rouse_wait (...)
3400 PROTOTYPE: ;$
3401 PPCODE:
3402 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
3403
3404void
3405on_enter (SV *block)
3406 ALIAS:
3407 on_leave = 1
3408 PROTOTYPE: &
3409 CODE:
3410{
3411 struct coro *coro = SvSTATE_current;
3412 AV **avp = ix ? &coro->on_leave : &coro->on_enter;
3413
3414 block = (SV *)coro_sv_2cv (block);
3415
3416 if (!*avp)
3417 *avp = newAV ();
3418
3419 av_push (*avp, SvREFCNT_inc (block));
3420
3421 if (!ix)
3422 on_enterleave_call (aTHX_ block);
3423
3424 LEAVE; /* pp_entersub unfortunately forces an ENTER/LEAVE around xs calls */
3425 SAVEDESTRUCTOR_X (ix ? coro_pop_on_leave : coro_pop_on_enter, (void *)coro);
3426 ENTER; /* pp_entersub unfortunately forces an ENTER/LEAVE around xs calls */
3427}
3428
3429
3430MODULE = Coro::State PACKAGE = PerlIO::cede
3431
3432BOOT:
3433 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3434
3435
3436MODULE = Coro::State PACKAGE = Coro::Semaphore
3437
3438SV *
3439new (SV *klass, SV *count = 0)
3440 CODE:
3441 RETVAL = sv_bless (
3442 coro_waitarray_new (aTHX_ count && SvOK (count) ? SvIV (count) : 1),
3443 GvSTASH (CvGV (cv))
3444 );
3445 OUTPUT:
3446 RETVAL
3447
3448# helper for Coro::Channel
3449SV *
3450_alloc (int count)
3451 CODE:
3452 RETVAL = coro_waitarray_new (aTHX_ count);
3453 OUTPUT:
3454 RETVAL
3455
3456SV *
3457count (SV *self)
3458 CODE:
3459 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3460 OUTPUT:
3461 RETVAL
3462
3463void
3464up (SV *self, int adjust = 1)
3465 ALIAS:
3466 adjust = 1
3467 CODE:
3468 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3469
3470void
3471down (...)
3472 CODE:
3473 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3474
3475void
3476wait (...)
3477 CODE:
3478 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3479
3480void
3481try (SV *self)
3482 PPCODE:
3483{
3484 AV *av = (AV *)SvRV (self);
3485 SV *count_sv = AvARRAY (av)[0];
3486 IV count = SvIVX (count_sv);
3487
3488 if (count > 0)
3489 {
3490 --count;
3491 SvIVX (count_sv) = count;
3492 XSRETURN_YES;
3493 }
3494 else
3495 XSRETURN_NO;
3496}
3497
3498void
3499waiters (SV *self)
3500 PPCODE:
3501{
3502 AV *av = (AV *)SvRV (self);
3503 int wcount = AvFILLp (av) + 1 - 1;
3504
3505 if (GIMME_V == G_SCALAR)
3506 XPUSHs (sv_2mortal (newSViv (wcount)));
3507 else
3508 {
3509 int i;
3510 EXTEND (SP, wcount);
3511 for (i = 1; i <= wcount; ++i)
3512 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3513 }
3514}
3515
3516MODULE = Coro::State PACKAGE = Coro::Signal
3517
3518SV *
3519new (SV *klass)
3520 CODE:
3521 RETVAL = sv_bless (
3522 coro_waitarray_new (aTHX_ 0),
3523 GvSTASH (CvGV (cv))
3524 );
3525 OUTPUT:
3526 RETVAL
3527
3528void
3529wait (...)
3530 CODE:
3531 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
3532
3533void
3534broadcast (SV *self)
3535 CODE:
3536{
3537 AV *av = (AV *)SvRV (self);
3538 coro_signal_wake (aTHX_ av, AvFILLp (av));
3539}
3540
3541void
3542send (SV *self)
3543 CODE:
3544{
3545 AV *av = (AV *)SvRV (self);
3546
3547 if (AvFILLp (av))
3548 coro_signal_wake (aTHX_ av, 1);
3549 else
3550 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3551}
3552
3553IV
3554awaited (SV *self)
3555 CODE:
3556 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3557 OUTPUT:
3558 RETVAL
3559
3560
3561MODULE = Coro::State PACKAGE = Coro::AnyEvent
3562
3563BOOT:
3564 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3565
3566void
3567_schedule (...)
3568 CODE:
3569{
3570 static int incede;
3571
3572 api_cede_notself (aTHX);
3573
3574 ++incede;
3575 while (coro_nready >= incede && api_cede (aTHX))
3576 ;
3577
3578 sv_setsv (sv_activity, &PL_sv_undef);
3579 if (coro_nready >= incede)
3580 {
3581 PUSHMARK (SP);
3582 PUTBACK;
3583 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3584 }
3585
3586 --incede;
3587}
3588
3589
3590MODULE = Coro::State PACKAGE = Coro::AIO
3591
3592void
3593_register (char *target, char *proto, SV *req)
3594 CODE:
3595{
3596 CV *req_cv = coro_sv_2cv (aTHX_ req);
3597 /* newXSproto doesn't return the CV on 5.8 */
3598 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3599 sv_setpv ((SV *)slf_cv, proto);
3600 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3601}
3602

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines