ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/State.xs
(Generate patch)

Comparing Coro/Coro/State.xs (file contents):
Revision 1.4 by root, Tue Jul 17 02:21:56 2001 UTC vs.
Revision 1.331 by root, Thu Nov 27 12:00:59 2008 UTC

1#include "libcoro/coro.c"
2
3#define PERL_NO_GET_CONTEXT
4#define PERL_EXT
5
1#include "EXTERN.h" 6#include "EXTERN.h"
2#include "perl.h" 7#include "perl.h"
3#include "XSUB.h" 8#include "XSUB.h"
9#include "perliol.h"
4 10
5#if 0 11#include "patchlevel.h"
6# define CHK(x) (void *)0 12
13#include <stdio.h>
14#include <errno.h>
15#include <assert.h>
16
17#ifdef WIN32
18# undef setjmp
19# undef longjmp
20# undef _exit
21# define setjmp _setjmp /* deep magic */
7#else 22#else
8# define CHK(x) if (!(x)) croak("FATAL, CHK: " #x) 23# include <inttypes.h> /* most portable stdint.h */
24#endif
25
26#ifdef HAVE_MMAP
27# include <unistd.h>
28# include <sys/mman.h>
29# ifndef MAP_ANONYMOUS
30# ifdef MAP_ANON
31# define MAP_ANONYMOUS MAP_ANON
32# else
33# undef HAVE_MMAP
34# endif
9#endif 35# endif
36# include <limits.h>
37# ifndef PAGESIZE
38# define PAGESIZE pagesize
39# define BOOT_PAGESIZE pagesize = sysconf (_SC_PAGESIZE)
40static long pagesize;
41# else
42# define BOOT_PAGESIZE (void)0
43# endif
44#else
45# define PAGESIZE 0
46# define BOOT_PAGESIZE (void)0
47#endif
10 48
49#if CORO_USE_VALGRIND
50# include <valgrind/valgrind.h>
51#endif
52
53/* the maximum number of idle cctx that will be pooled */
54static int cctx_max_idle = 4;
55
56#define PERL_VERSION_ATLEAST(a,b,c) \
57 (PERL_REVISION > (a) \
58 || (PERL_REVISION == (a) \
59 && (PERL_VERSION > (b) \
60 || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))
61
62#if !PERL_VERSION_ATLEAST (5,6,0)
63# ifndef PL_ppaddr
64# define PL_ppaddr ppaddr
65# endif
66# ifndef call_sv
67# define call_sv perl_call_sv
68# endif
69# ifndef get_sv
70# define get_sv perl_get_sv
71# endif
72# ifndef get_cv
73# define get_cv perl_get_cv
74# endif
75# ifndef IS_PADGV
76# define IS_PADGV(v) 0
77# endif
78# ifndef IS_PADCONST
79# define IS_PADCONST(v) 0
80# endif
81#endif
82
83/* 5.11 */
84#ifndef CxHASARGS
85# define CxHASARGS(cx) (cx)->blk_sub.hasargs
86#endif
87
88/* 5.10.0 */
89#ifndef SvREFCNT_inc_NN
90# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
91#endif
92
93/* 5.8.8 */
94#ifndef GV_NOTQUAL
95# define GV_NOTQUAL 0
96#endif
97#ifndef newSV
98# define newSV(l) NEWSV(0,l)
99#endif
100#ifndef CvISXSUB_on
101# define CvISXSUB_on(cv) (void)cv
102#endif
103#ifndef CvISXSUB
104# define CvISXSUB(cv) (CvXSUB (cv) ? TRUE : FALSE)
105#endif
106#ifndef Newx
107# define Newx(ptr,nitems,type) New (0,ptr,nitems,type)
108#endif
109
110/* 5.8.7 */
111#ifndef SvRV_set
112# define SvRV_set(s,v) SvRV(s) = (v)
113#endif
114
115#if !__i386 && !__x86_64 && !__powerpc && !__m68k && !__alpha && !__mips && !__sparc64
116# undef CORO_STACKGUARD
117#endif
118
119#ifndef CORO_STACKGUARD
120# define CORO_STACKGUARD 0
121#endif
122
123/* prefer perl internal functions over our own? */
124#ifndef CORO_PREFER_PERL_FUNCTIONS
125# define CORO_PREFER_PERL_FUNCTIONS 0
126#endif
127
128/* The next macros try to return the current stack pointer, in an as
129 * portable way as possible. */
130#if __GNUC__ >= 4
131# define dSTACKLEVEL int stacklevel_dummy
132# define STACKLEVEL __builtin_frame_address (0)
133#else
134# define dSTACKLEVEL volatile void *stacklevel
135# define STACKLEVEL ((void *)&stacklevel)
136#endif
137
138#define IN_DESTRUCT PL_dirty
139
140#if __GNUC__ >= 3
141# define attribute(x) __attribute__(x)
142# define expect(expr,value) __builtin_expect ((expr),(value))
143# define INLINE static inline
144#else
145# define attribute(x)
146# define expect(expr,value) (expr)
147# define INLINE static
148#endif
149
150#define expect_false(expr) expect ((expr) != 0, 0)
151#define expect_true(expr) expect ((expr) != 0, 1)
152
153#define NOINLINE attribute ((noinline))
154
155#include "CoroAPI.h"
156#define GCoroAPI (&coroapi) /* very sneaky */
157
158#ifdef USE_ITHREADS
159# if CORO_PTHREAD
160static void *coro_thx;
161# endif
162#endif
163
164static double (*nvtime)(); /* so why doesn't it take void? */
165
166/* we hijack an hopefully unused CV flag for our purposes */
167#define CVf_SLF 0x4000
168static OP *pp_slf (pTHX);
169
170static U32 cctx_gen;
171static size_t cctx_stacksize = CORO_STACKSIZE;
172static struct CoroAPI coroapi;
173static AV *main_mainstack; /* used to differentiate between $main and others */
174static JMPENV *main_top_env;
175static HV *coro_state_stash, *coro_stash;
176static volatile SV *coro_mortal; /* will be freed/thrown after next transfer */
177
178static AV *av_destroy; /* destruction queue */
179static SV *sv_manager; /* the manager coro */
180static SV *sv_idle; /* $Coro::idle */
181
182static GV *irsgv; /* $/ */
183static GV *stdoutgv; /* *STDOUT */
184static SV *rv_diehook;
185static SV *rv_warnhook;
186static HV *hv_sig; /* %SIG */
187
188/* async_pool helper stuff */
189static SV *sv_pool_rss;
190static SV *sv_pool_size;
191static SV *sv_async_pool_idle; /* description string */
192static AV *av_async_pool; /* idle pool */
193static SV *sv_Coro; /* class string */
194static CV *cv_pool_handler;
195static CV *cv_coro_state_new;
196
197/* Coro::AnyEvent */
198static SV *sv_activity;
199
200static struct coro_cctx *cctx_first;
201static int cctx_count, cctx_idle;
202
203enum {
204 CC_MAPPED = 0x01,
205 CC_NOREUSE = 0x02, /* throw this away after tracing */
206 CC_TRACE = 0x04,
207 CC_TRACE_SUB = 0x08, /* trace sub calls */
208 CC_TRACE_LINE = 0x10, /* trace each statement */
209 CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
210};
211
212/* this is a structure representing a c-level coroutine */
213typedef struct coro_cctx
214{
215 struct coro_cctx *next;
216
217 /* the stack */
218 void *sptr;
219 size_t ssize;
220
221 /* cpu state */
222 void *idle_sp; /* sp of top-level transfer/schedule/cede call */
223 JMPENV *idle_te; /* same as idle_sp, but for top_env, TODO: remove once stable */
224 JMPENV *top_env;
225 coro_context cctx;
226
227 U32 gen;
228#if CORO_USE_VALGRIND
229 int valgrind_id;
230#endif
231 unsigned char flags;
232} coro_cctx;
233
234coro_cctx *cctx_current; /* the currently running cctx */
235
236/*****************************************************************************/
237
238enum {
239 CF_RUNNING = 0x0001, /* coroutine is running */
240 CF_READY = 0x0002, /* coroutine is ready */
241 CF_NEW = 0x0004, /* has never been switched to */
242 CF_DESTROYED = 0x0008, /* coroutine data has been freed */
243};
244
245/* the structure where most of the perl state is stored, overlaid on the cxstack */
246typedef struct
247{
248 SV *defsv;
249 AV *defav;
250 SV *errsv;
251 SV *irsgv;
252 HV *hinthv;
253#define VAR(name,type) type name;
254# include "state.h"
255#undef VAR
256} perl_slots;
257
258#define SLOT_COUNT ((sizeof (perl_slots) + sizeof (PERL_CONTEXT) - 1) / sizeof (PERL_CONTEXT))
259
260/* this is a structure representing a perl-level coroutine */
11struct coro { 261struct coro {
12 U8 dowarn; 262 /* the C coroutine allocated to this perl coroutine, if any */
13 AV *defav; 263 coro_cctx *cctx;
14 264
15 PERL_SI *curstackinfo; 265 /* state data */
16 AV *curstack; 266 struct CoroSLF slf_frame; /* saved slf frame */
17 AV *mainstack; 267 AV *mainstack;
18 SV **stack_sp; 268 perl_slots *slot; /* basically the saved sp */
19 OP *op;
20 SV **curpad;
21 SV **stack_base;
22 SV **stack_max;
23 SV **tmps_stack;
24 I32 tmps_floor;
25 I32 tmps_ix;
26 I32 tmps_max;
27 I32 *markstack;
28 I32 *markstack_ptr;
29 I32 *markstack_max;
30 I32 *scopestack;
31 I32 scopestack_ix;
32 I32 scopestack_max;
33 ANY *savestack;
34 I32 savestack_ix;
35 I32 savestack_max;
36 OP **retstack;
37 I32 retstack_ix;
38 I32 retstack_max;
39 COP *curcop;
40 269
41 AV *args; 270 CV *startcv; /* the CV to execute */
271 AV *args; /* data associated with this coroutine (initial args) */
272 int refcnt; /* coroutines are refcounted, yes */
273 int flags; /* CF_ flags */
274 HV *hv; /* the perl hash associated with this coro, if any */
275 void (*on_destroy)(pTHX_ struct coro *coro);
276
277 /* statistics */
278 int usecount; /* number of transfers to this coro */
279
280 /* coro process data */
281 int prio;
282 SV *except; /* exception to be thrown */
283 SV *rouse_cb;
284
285 /* async_pool */
286 SV *saved_deffh;
287 SV *invoke_cb;
288 AV *invoke_av;
289
290 /* linked list */
291 struct coro *next, *prev;
42}; 292};
43 293
44typedef struct coro *Coro__State; 294typedef struct coro *Coro__State;
45typedef struct coro *Coro__State_or_hashref; 295typedef struct coro *Coro__State_or_hashref;
46 296
47static HV *padlist_cache; 297/* the following variables are effectively part of the perl context */
298/* and get copied between struct coro and these variables */
299/* the mainr easonw e don't support windows process emulation */
300static struct CoroSLF slf_frame; /* the current slf frame */
48 301
49/* mostly copied from op.c:cv_clone2 */ 302/** Coro ********************************************************************/
50STATIC AV * 303
51clone_padlist (AV *protopadlist) 304#define PRIO_MAX 3
305#define PRIO_HIGH 1
306#define PRIO_NORMAL 0
307#define PRIO_LOW -1
308#define PRIO_IDLE -3
309#define PRIO_MIN -4
310
311/* for Coro.pm */
312static SV *coro_current;
313static SV *coro_readyhook;
314static AV *coro_ready [PRIO_MAX - PRIO_MIN + 1];
315static CV *cv_coro_run, *cv_coro_terminate;
316static struct coro *coro_first;
317#define coro_nready coroapi.nready
318
319/** lowlevel stuff **********************************************************/
320
321static SV *
322coro_get_sv (pTHX_ const char *name, int create)
52{ 323{
53 AV *av; 324#if PERL_VERSION_ATLEAST (5,10,0)
54 I32 ix; 325 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
55 AV *protopad_name = (AV *) * av_fetch (protopadlist, 0, FALSE); 326 get_sv (name, create);
56 AV *protopad = (AV *) * av_fetch (protopadlist, 1, FALSE); 327#endif
57 SV **pname = AvARRAY (protopad_name); 328 return get_sv (name, create);
58 SV **ppad = AvARRAY (protopad); 329}
59 I32 fname = AvFILLp (protopad_name); 330
60 I32 fpad = AvFILLp (protopad); 331static AV *
332coro_get_av (pTHX_ const char *name, int create)
333{
334#if PERL_VERSION_ATLEAST (5,10,0)
335 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
336 get_av (name, create);
337#endif
338 return get_av (name, create);
339}
340
341static HV *
342coro_get_hv (pTHX_ const char *name, int create)
343{
344#if PERL_VERSION_ATLEAST (5,10,0)
345 /* silence stupid and wrong 5.10 warning that I am unable to switch off */
346 get_hv (name, create);
347#endif
348 return get_hv (name, create);
349}
350
351/* may croak */
352INLINE CV *
353coro_sv_2cv (pTHX_ SV *sv)
354{
355 HV *st;
356 GV *gvp;
357 return sv_2cv (sv, &st, &gvp, 0);
358}
359
360static AV *
361coro_derive_padlist (pTHX_ CV *cv)
362{
363 AV *padlist = CvPADLIST (cv);
61 AV *newpadlist, *newpad_name, *newpad; 364 AV *newpadlist, *newpad;
62 SV **npad;
63
64 newpad_name = newAV ();
65 for (ix = fname; ix >= 0; ix--)
66 av_store (newpad_name, ix, SvREFCNT_inc (pname[ix]));
67
68 newpad = newAV ();
69 av_fill (newpad, AvFILLp (protopad));
70 npad = AvARRAY (newpad);
71 365
72 newpadlist = newAV (); 366 newpadlist = newAV ();
73 AvREAL_off (newpadlist); 367 AvREAL_off (newpadlist);
74 av_store (newpadlist, 0, (SV *) newpad_name); 368#if PERL_VERSION_ATLEAST (5,10,0)
369 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1);
370#else
371 Perl_pad_push (aTHX_ padlist, AvFILLp (padlist) + 1, 1);
372#endif
373 newpad = (AV *)AvARRAY (padlist)[AvFILLp (padlist)];
374 --AvFILLp (padlist);
375
376 av_store (newpadlist, 0, SvREFCNT_inc_NN (*av_fetch (padlist, 0, FALSE)));
75 av_store (newpadlist, 1, (SV *) newpad); 377 av_store (newpadlist, 1, (SV *)newpad);
76
77 av = newAV (); /* will be @_ */
78 av_extend (av, 0);
79 av_store (newpad, 0, (SV *) av);
80 AvFLAGS (av) = AVf_REIFY;
81
82 for (ix = fpad; ix > 0; ix--)
83 {
84 SV *namesv = (ix <= fname) ? pname[ix] : Nullsv;
85 if (namesv && namesv != &PL_sv_undef)
86 {
87 char *name = SvPVX (namesv); /* XXX */
88 if (SvFLAGS (namesv) & SVf_FAKE || *name == '&')
89 { /* lexical from outside? */
90 npad[ix] = SvREFCNT_inc (ppad[ix]);
91 }
92 else
93 { /* our own lexical */
94 SV *sv;
95 if (*name == '&')
96 sv = SvREFCNT_inc (ppad[ix]);
97 else if (*name == '@')
98 sv = (SV *) newAV ();
99 else if (*name == '%')
100 sv = (SV *) newHV ();
101 else
102 sv = NEWSV (0, 0);
103 if (!SvPADBUSY (sv))
104 SvPADMY_on (sv);
105 npad[ix] = sv;
106 }
107 }
108 else if (IS_PADGV (ppad[ix]) || IS_PADCONST (ppad[ix]))
109 {
110 npad[ix] = SvREFCNT_inc (ppad[ix]);
111 }
112 else
113 {
114 SV *sv = NEWSV (0, 0);
115 SvPADTMP_on (sv);
116 npad[ix] = sv;
117 }
118 }
119
120#if 0 /* NONOTUNDERSTOOD */
121 /* Now that vars are all in place, clone nested closures. */
122
123 for (ix = fpad; ix > 0; ix--) {
124 SV* namesv = (ix <= fname) ? pname[ix] : Nullsv;
125 if (namesv
126 && namesv != &PL_sv_undef
127 && !(SvFLAGS(namesv) & SVf_FAKE)
128 && *SvPVX(namesv) == '&'
129 && CvCLONE(ppad[ix]))
130 {
131 CV *kid = cv_clone((CV*)ppad[ix]);
132 SvREFCNT_dec(ppad[ix]);
133 CvCLONE_on(kid);
134 SvPADMY_on(kid);
135 npad[ix] = (SV*)kid;
136 }
137 }
138#endif
139 378
140 return newpadlist; 379 return newpadlist;
141} 380}
142 381
143STATIC AV * 382static void
144free_padlist (AV *padlist) 383free_padlist (pTHX_ AV *padlist)
145{ 384{
146 /* may be during global destruction */ 385 /* may be during global destruction */
147 if (SvREFCNT(padlist)) 386 if (!IN_DESTRUCT)
148 { 387 {
149 I32 i = AvFILLp(padlist); 388 I32 i = AvFILLp (padlist);
389
150 while (i >= 0) 390 while (i >= 0)
151 { 391 {
152 SV **svp = av_fetch(padlist, i--, FALSE); 392 /* we try to be extra-careful here */
153 SV *sv = svp ? *svp : Nullsv; 393 AV *av = (AV *)AvARRAY (padlist)[i--];
154 if (sv) 394
395 I32 i = AvFILLp (av);
396
397 while (i >= 0)
398 SvREFCNT_dec (AvARRAY (av)[i--]);
399
400 AvFILLp (av) = -1;
155 SvREFCNT_dec(sv); 401 SvREFCNT_dec (av);
156 } 402 }
157 403
404 AvFILLp (padlist) = -1;
158 SvREFCNT_dec((SV*)padlist); 405 SvREFCNT_dec ((SV*)padlist);
406 }
407}
408
409static int
410coro_cv_free (pTHX_ SV *sv, MAGIC *mg)
411{
412 AV *padlist;
413 AV *av = (AV *)mg->mg_obj;
414
415 /* casting is fun. */
416 while (&PL_sv_undef != (SV *)(padlist = (AV *)av_pop (av)))
417 free_padlist (aTHX_ padlist);
418
419 SvREFCNT_dec (av); /* sv_magicext increased the refcount */
420
421 return 0;
422}
423
424#define CORO_MAGIC_type_cv 26
425#define CORO_MAGIC_type_state PERL_MAGIC_ext
426
427static MGVTBL coro_cv_vtbl = {
428 0, 0, 0, 0,
429 coro_cv_free
430};
431
432#define CORO_MAGIC_NN(sv, type) \
433 (expect_true (SvMAGIC (sv)->mg_type == type) \
434 ? SvMAGIC (sv) \
435 : mg_find (sv, type))
436
437#define CORO_MAGIC(sv, type) \
438 (expect_true (SvMAGIC (sv)) \
439 ? CORO_MAGIC_NN (sv, type) \
440 : 0)
441
442#define CORO_MAGIC_cv(cv) CORO_MAGIC (((SV *)(cv)), CORO_MAGIC_type_cv)
443#define CORO_MAGIC_state(sv) CORO_MAGIC_NN (((SV *)(sv)), CORO_MAGIC_type_state)
444
445INLINE struct coro *
446SvSTATE_ (pTHX_ SV *coro)
447{
448 HV *stash;
449 MAGIC *mg;
450
451 if (SvROK (coro))
452 coro = SvRV (coro);
453
454 if (expect_false (SvTYPE (coro) != SVt_PVHV))
455 croak ("Coro::State object required");
456
457 stash = SvSTASH (coro);
458 if (expect_false (stash != coro_stash && stash != coro_state_stash))
459 {
460 /* very slow, but rare, check */
461 if (!sv_derived_from (sv_2mortal (newRV_inc (coro)), "Coro::State"))
462 croak ("Coro::State object required");
463 }
464
465 mg = CORO_MAGIC_state (coro);
466 return (struct coro *)mg->mg_ptr;
467}
468
469#define SvSTATE(sv) SvSTATE_ (aTHX_ (sv))
470
471/* faster than SvSTATE, but expects a coroutine hv */
472#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
473#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
474
475/* the next two functions merely cache the padlists */
476static void
477get_padlist (pTHX_ CV *cv)
478{
479 MAGIC *mg = CORO_MAGIC_cv (cv);
480 AV *av;
481
482 if (expect_true (mg && AvFILLp ((av = (AV *)mg->mg_obj)) >= 0))
483 CvPADLIST (cv) = (AV *)AvARRAY (av)[AvFILLp (av)--];
484 else
485 {
486#if CORO_PREFER_PERL_FUNCTIONS
487 /* this is probably cleaner? but also slower! */
488 /* in practise, it seems to be less stable */
489 CV *cp = Perl_cv_clone (aTHX_ cv);
490 CvPADLIST (cv) = CvPADLIST (cp);
491 CvPADLIST (cp) = 0;
492 SvREFCNT_dec (cp);
493#else
494 CvPADLIST (cv) = coro_derive_padlist (aTHX_ cv);
495#endif
496 }
497}
498
499static void
500put_padlist (pTHX_ CV *cv)
501{
502 MAGIC *mg = CORO_MAGIC_cv (cv);
503 AV *av;
504
505 if (expect_false (!mg))
506 mg = sv_magicext ((SV *)cv, (SV *)newAV (), CORO_MAGIC_type_cv, &coro_cv_vtbl, 0, 0);
507
508 av = (AV *)mg->mg_obj;
509
510 if (expect_false (AvFILLp (av) >= AvMAX (av)))
511 av_extend (av, AvFILLp (av) + 1);
512
513 AvARRAY (av)[++AvFILLp (av)] = (SV *)CvPADLIST (cv);
514}
515
516/** load & save, init *******************************************************/
517
518static void
519load_perl (pTHX_ Coro__State c)
520{
521 perl_slots *slot = c->slot;
522 c->slot = 0;
523
524 PL_mainstack = c->mainstack;
525
526 GvSV (PL_defgv) = slot->defsv;
527 GvAV (PL_defgv) = slot->defav;
528 GvSV (PL_errgv) = slot->errsv;
529 GvSV (irsgv) = slot->irsgv;
530 GvHV (PL_hintgv) = slot->hinthv;
531
532 #define VAR(name,type) PL_ ## name = slot->name;
533 # include "state.h"
534 #undef VAR
535
536 {
537 dSP;
538
539 CV *cv;
540
541 /* now do the ugly restore mess */
542 while (expect_true (cv = (CV *)POPs))
543 {
544 put_padlist (aTHX_ cv); /* mark this padlist as available */
545 CvDEPTH (cv) = PTR2IV (POPs);
546 CvPADLIST (cv) = (AV *)POPs;
547 }
548
549 PUTBACK;
159 } 550 }
160}
161 551
162/* the next tow functions merely cache the padlists */ 552 slf_frame = c->slf_frame;
163STATIC void 553 CORO_THROW = c->except;
164get_padlist (CV *cv)
165{
166 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 0);
167
168 if (he && AvFILLp ((AV *)*he) >= 0)
169 CvPADLIST (cv) = (AV *)av_pop ((AV *)*he);
170 else
171 CvPADLIST (cv) = clone_padlist (CvPADLIST (cv));
172} 554}
173 555
174STATIC void
175put_padlist (CV *cv)
176{
177 SV **he = hv_fetch (padlist_cache, (void *)&cv, sizeof (CV *), 1);
178
179 if (SvTYPE (*he) != SVt_PVAV)
180 {
181 SvREFCNT_dec (*he);
182 *he = (SV *)newAV ();
183 }
184
185 av_push ((AV *)*he, (SV *)CvPADLIST (cv));
186}
187
188static void 556static void
189SAVE(pTHX_ Coro__State c) 557save_perl (pTHX_ Coro__State c)
190{ 558{
559 c->except = CORO_THROW;
560 c->slf_frame = slf_frame;
561
191 { 562 {
192 dSP; 563 dSP;
193 I32 cxix = cxstack_ix; 564 I32 cxix = cxstack_ix;
565 PERL_CONTEXT *ccstk = cxstack;
194 PERL_SI *top_si = PL_curstackinfo; 566 PERL_SI *top_si = PL_curstackinfo;
195 PERL_CONTEXT *ccstk = cxstack;
196 567
197 /* 568 /*
198 * the worst thing you can imagine happens first - we have to save 569 * the worst thing you can imagine happens first - we have to save
199 * (and reinitialize) all cv's in the whole callchain :( 570 * (and reinitialize) all cv's in the whole callchain :(
200 */ 571 */
201 572
202 PUSHs (Nullsv); 573 XPUSHs (Nullsv);
203 /* this loop was inspired by pp_caller */ 574 /* this loop was inspired by pp_caller */
204 for (;;) 575 for (;;)
205 { 576 {
206 while (cxix >= 0) 577 while (expect_true (cxix >= 0))
207 { 578 {
208 PERL_CONTEXT *cx = &ccstk[cxix--]; 579 PERL_CONTEXT *cx = &ccstk[cxix--];
209 580
210 if (CxTYPE(cx) == CXt_SUB) 581 if (expect_true (CxTYPE (cx) == CXt_SUB || CxTYPE (cx) == CXt_FORMAT))
211 { 582 {
212 CV *cv = cx->blk_sub.cv; 583 CV *cv = cx->blk_sub.cv;
584
213 if (CvDEPTH(cv)) 585 if (expect_true (CvDEPTH (cv)))
214 { 586 {
215#ifdef USE_THREADS
216 XPUSHs ((SV *)CvOWNER(cv));
217#endif
218 EXTEND (SP, 3); 587 EXTEND (SP, 3);
219 PUSHs ((SV *)CvDEPTH(cv));
220 PUSHs ((SV *)CvPADLIST(cv)); 588 PUSHs ((SV *)CvPADLIST (cv));
589 PUSHs (INT2PTR (SV *, (IV)CvDEPTH (cv)));
221 PUSHs ((SV *)cv); 590 PUSHs ((SV *)cv);
222 591
223 get_padlist (cv);
224
225 CvDEPTH(cv) = 0; 592 CvDEPTH (cv) = 0;
226#ifdef USE_THREADS 593 get_padlist (aTHX_ cv);
227 CvOWNER(cv) = 0;
228 error must unlock this cv etc.. etc...
229 if you are here wondering about this error message then
230 the reason is that it will not work as advertised yet
231#endif
232 } 594 }
233 } 595 }
234 else if (CxTYPE(cx) == CXt_FORMAT) 596 }
597
598 if (expect_true (top_si->si_type == PERLSI_MAIN))
599 break;
600
601 top_si = top_si->si_prev;
602 ccstk = top_si->si_cxstack;
603 cxix = top_si->si_cxix;
604 }
605
606 PUTBACK;
607 }
608
609 /* allocate some space on the context stack for our purposes */
610 /* we manually unroll here, as usually 2 slots is enough */
611 if (SLOT_COUNT >= 1) CXINC;
612 if (SLOT_COUNT >= 2) CXINC;
613 if (SLOT_COUNT >= 3) CXINC;
614 {
615 int i;
616 for (i = 3; i < SLOT_COUNT; ++i)
617 CXINC;
618 }
619 cxstack_ix -= SLOT_COUNT; /* undo allocation */
620
621 c->mainstack = PL_mainstack;
622
623 {
624 perl_slots *slot = c->slot = (perl_slots *)(cxstack + cxstack_ix + 1);
625
626 slot->defav = GvAV (PL_defgv);
627 slot->defsv = DEFSV;
628 slot->errsv = ERRSV;
629 slot->irsgv = GvSV (irsgv);
630 slot->hinthv = GvHV (PL_hintgv);
631
632 #define VAR(name,type) slot->name = PL_ ## name;
633 # include "state.h"
634 #undef VAR
635 }
636}
637
638/*
639 * allocate various perl stacks. This is almost an exact copy
640 * of perl.c:init_stacks, except that it uses less memory
641 * on the (sometimes correct) assumption that coroutines do
642 * not usually need a lot of stackspace.
643 */
644#if CORO_PREFER_PERL_FUNCTIONS
645# define coro_init_stacks(thx) init_stacks ()
646#else
647static void
648coro_init_stacks (pTHX)
649{
650 PL_curstackinfo = new_stackinfo(32, 8);
651 PL_curstackinfo->si_type = PERLSI_MAIN;
652 PL_curstack = PL_curstackinfo->si_stack;
653 PL_mainstack = PL_curstack; /* remember in case we switch stacks */
654
655 PL_stack_base = AvARRAY(PL_curstack);
656 PL_stack_sp = PL_stack_base;
657 PL_stack_max = PL_stack_base + AvMAX(PL_curstack);
658
659 New(50,PL_tmps_stack,32,SV*);
660 PL_tmps_floor = -1;
661 PL_tmps_ix = -1;
662 PL_tmps_max = 32;
663
664 New(54,PL_markstack,16,I32);
665 PL_markstack_ptr = PL_markstack;
666 PL_markstack_max = PL_markstack + 16;
667
668#ifdef SET_MARK_OFFSET
669 SET_MARK_OFFSET;
670#endif
671
672 New(54,PL_scopestack,8,I32);
673 PL_scopestack_ix = 0;
674 PL_scopestack_max = 8;
675
676 New(54,PL_savestack,24,ANY);
677 PL_savestack_ix = 0;
678 PL_savestack_max = 24;
679
680#if !PERL_VERSION_ATLEAST (5,10,0)
681 New(54,PL_retstack,4,OP*);
682 PL_retstack_ix = 0;
683 PL_retstack_max = 4;
684#endif
685}
686#endif
687
688/*
689 * destroy the stacks, the callchain etc...
690 */
691static void
692coro_destruct_stacks (pTHX)
693{
694 while (PL_curstackinfo->si_next)
695 PL_curstackinfo = PL_curstackinfo->si_next;
696
697 while (PL_curstackinfo)
698 {
699 PERL_SI *p = PL_curstackinfo->si_prev;
700
701 if (!IN_DESTRUCT)
702 SvREFCNT_dec (PL_curstackinfo->si_stack);
703
704 Safefree (PL_curstackinfo->si_cxstack);
705 Safefree (PL_curstackinfo);
706 PL_curstackinfo = p;
707 }
708
709 Safefree (PL_tmps_stack);
710 Safefree (PL_markstack);
711 Safefree (PL_scopestack);
712 Safefree (PL_savestack);
713#if !PERL_VERSION_ATLEAST (5,10,0)
714 Safefree (PL_retstack);
715#endif
716}
717
718#define CORO_RSS \
719 rss += sizeof (SYM (curstackinfo)); \
720 rss += (SYM (curstackinfo->si_cxmax) + 1) * sizeof (PERL_CONTEXT); \
721 rss += sizeof (SV) + sizeof (struct xpvav) + (1 + AvMAX (SYM (curstack))) * sizeof (SV *); \
722 rss += SYM (tmps_max) * sizeof (SV *); \
723 rss += (SYM (markstack_max) - SYM (markstack_ptr)) * sizeof (I32); \
724 rss += SYM (scopestack_max) * sizeof (I32); \
725 rss += SYM (savestack_max) * sizeof (ANY);
726
727static size_t
728coro_rss (pTHX_ struct coro *coro)
729{
730 size_t rss = sizeof (*coro);
731
732 if (coro->mainstack)
733 {
734 if (coro->flags & CF_RUNNING)
735 {
736 #define SYM(sym) PL_ ## sym
737 CORO_RSS;
738 #undef SYM
739 }
740 else
741 {
742 #define SYM(sym) coro->slot->sym
743 CORO_RSS;
744 #undef SYM
745 }
746 }
747
748 return rss;
749}
750
751/** coroutine stack handling ************************************************/
752
753static int (*orig_sigelem_get) (pTHX_ SV *sv, MAGIC *mg);
754static int (*orig_sigelem_set) (pTHX_ SV *sv, MAGIC *mg);
755static int (*orig_sigelem_clr) (pTHX_ SV *sv, MAGIC *mg);
756
757/* apparently < 5.8.8 */
758#ifndef MgPV_nolen_const
759#define MgPV_nolen_const(mg) (((((int)(mg)->mg_len)) == HEf_SVKEY) ? \
760 SvPV_nolen((SV*)((mg)->mg_ptr)) : \
761 (const char*)(mg)->mg_ptr)
762#endif
763
764/*
765 * This overrides the default magic get method of %SIG elements.
766 * The original one doesn't provide for reading back of PL_diehook/PL_warnhook
767 * and instead of tryign to save and restore the hash elements, we just provide
768 * readback here.
769 * We only do this when the hook is != 0, as they are often set to 0 temporarily,
770 * not expecting this to actually change the hook. This is a potential problem
771 * when a schedule happens then, but we ignore this.
772 */
773static int
774coro_sigelem_get (pTHX_ SV *sv, MAGIC *mg)
775{
776 const char *s = MgPV_nolen_const (mg);
777
778 if (*s == '_')
779 {
780 SV **svp = 0;
781
782 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
783 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
784
785 if (svp)
786 {
787 sv_setsv (sv, *svp ? *svp : &PL_sv_undef);
788 return 0;
789 }
790 }
791
792 return orig_sigelem_get ? orig_sigelem_get (aTHX_ sv, mg) : 0;
793}
794
795static int
796coro_sigelem_clr (pTHX_ SV *sv, MAGIC *mg)
797{
798 const char *s = MgPV_nolen_const (mg);
799
800 if (*s == '_')
801 {
802 SV **svp = 0;
803
804 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
805 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
806
807 if (svp)
808 {
809 SV *old = *svp;
810 *svp = 0;
811 SvREFCNT_dec (old);
812 return 0;
813 }
814 }
815
816 return orig_sigelem_clr ? orig_sigelem_clr (aTHX_ sv, mg) : 0;
817}
818
819static int
820coro_sigelem_set (pTHX_ SV *sv, MAGIC *mg)
821{
822 const char *s = MgPV_nolen_const (mg);
823
824 if (*s == '_')
825 {
826 SV **svp = 0;
827
828 if (strEQ (s, "__DIE__" )) svp = &PL_diehook;
829 if (strEQ (s, "__WARN__")) svp = &PL_warnhook;
830
831 if (svp)
832 {
833 SV *old = *svp;
834 *svp = newSVsv (sv);
835 SvREFCNT_dec (old);
836 return 0;
837 }
838 }
839
840 return orig_sigelem_set ? orig_sigelem_set (aTHX_ sv, mg) : 0;
841}
842
843static void
844prepare_nop (pTHX_ struct coro_transfer_args *ta)
845{
846 /* kind of mega-hacky, but works */
847 ta->next = ta->prev = (struct coro *)ta;
848}
849
850static int
851slf_check_nop (pTHX_ struct CoroSLF *frame)
852{
853 return 0;
854}
855
856static int
857slf_check_repeat (pTHX_ struct CoroSLF *frame)
858{
859 return 1;
860}
861
862static UNOP coro_setup_op;
863
864static void NOINLINE /* noinline to keep it out of the transfer fast path */
865coro_setup (pTHX_ struct coro *coro)
866{
867 /*
868 * emulate part of the perl startup here.
869 */
870 coro_init_stacks (aTHX);
871
872 PL_runops = RUNOPS_DEFAULT;
873 PL_curcop = &PL_compiling;
874 PL_in_eval = EVAL_NULL;
875 PL_comppad = 0;
876 PL_comppad_name = 0;
877 PL_comppad_name_fill = 0;
878 PL_comppad_name_floor = 0;
879 PL_curpm = 0;
880 PL_curpad = 0;
881 PL_localizing = 0;
882 PL_dirty = 0;
883 PL_restartop = 0;
884#if PERL_VERSION_ATLEAST (5,10,0)
885 PL_parser = 0;
886#endif
887 PL_hints = 0;
888
889 /* recreate the die/warn hooks */
890 PL_diehook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__DIE__" , sizeof ("__DIE__" ) - 1, 1), rv_diehook );
891 PL_warnhook = 0; SvSetMagicSV (*hv_fetch (hv_sig, "__WARN__", sizeof ("__WARN__") - 1, 1), rv_warnhook);
892
893 GvSV (PL_defgv) = newSV (0);
894 GvAV (PL_defgv) = coro->args; coro->args = 0;
895 GvSV (PL_errgv) = newSV (0);
896 GvSV (irsgv) = newSVpvn ("\n", 1); sv_magic (GvSV (irsgv), (SV *)irsgv, PERL_MAGIC_sv, "/", 0);
897 GvHV (PL_hintgv) = 0;
898 PL_rs = newSVsv (GvSV (irsgv));
899 PL_defoutgv = (GV *)SvREFCNT_inc_NN (stdoutgv);
900
901 {
902 dSP;
903 UNOP myop;
904
905 Zero (&myop, 1, UNOP);
906 myop.op_next = Nullop;
907 myop.op_type = OP_ENTERSUB;
908 myop.op_flags = OPf_WANT_VOID;
909
910 PUSHMARK (SP);
911 PUSHs ((SV *)coro->startcv);
912 PUTBACK;
913 PL_op = (OP *)&myop;
914 PL_op = PL_ppaddr[OP_ENTERSUB](aTHX);
915 }
916
917 /* this newly created coroutine might be run on an existing cctx which most
918 * likely was suspended in pp_slf, so we have to emulate entering pp_slf here.
919 */
920 slf_frame.prepare = prepare_nop; /* provide a nop function for an eventual pp_slf */
921 slf_frame.check = slf_check_nop; /* signal pp_slf to not repeat */
922
923 /* and we have to provide the pp_slf op in any case, so pp_slf can skip it */
924 coro_setup_op.op_next = PL_op;
925 coro_setup_op.op_type = OP_ENTERSUB;
926 coro_setup_op.op_ppaddr = pp_slf;
927 /* no flags etc. required, as an init function won't be called */
928
929 PL_op = (OP *)&coro_setup_op;
930
931 /* copy throw, in case it was set before coro_setup */
932 CORO_THROW = coro->except;
933}
934
935static void
936coro_destruct (pTHX_ struct coro *coro)
937{
938 if (!IN_DESTRUCT)
939 {
940 /* restore all saved variables and stuff */
941 LEAVE_SCOPE (0);
942 assert (PL_tmps_floor == -1);
943
944 /* free all temporaries */
945 FREETMPS;
946 assert (PL_tmps_ix == -1);
947
948 /* unwind all extra stacks */
949 POPSTACK_TO (PL_mainstack);
950
951 /* unwind main stack */
952 dounwind (-1);
953 }
954
955 SvREFCNT_dec (GvSV (PL_defgv));
956 SvREFCNT_dec (GvAV (PL_defgv));
957 SvREFCNT_dec (GvSV (PL_errgv));
958 SvREFCNT_dec (PL_defoutgv);
959 SvREFCNT_dec (PL_rs);
960 SvREFCNT_dec (GvSV (irsgv));
961 SvREFCNT_dec (GvHV (PL_hintgv));
962
963 SvREFCNT_dec (PL_diehook);
964 SvREFCNT_dec (PL_warnhook);
965
966 SvREFCNT_dec (coro->saved_deffh);
967 SvREFCNT_dec (coro->rouse_cb);
968 SvREFCNT_dec (coro->invoke_cb);
969 SvREFCNT_dec (coro->invoke_av);
970
971 coro_destruct_stacks (aTHX);
972}
973
974INLINE void
975free_coro_mortal (pTHX)
976{
977 if (expect_true (coro_mortal))
978 {
979 SvREFCNT_dec (coro_mortal);
980 coro_mortal = 0;
981 }
982}
983
984static int
985runops_trace (pTHX)
986{
987 COP *oldcop = 0;
988 int oldcxix = -2;
989
990 while ((PL_op = CALL_FPTR (PL_op->op_ppaddr) (aTHX)))
991 {
992 PERL_ASYNC_CHECK ();
993
994 if (cctx_current->flags & CC_TRACE_ALL)
995 {
996 if (PL_op->op_type == OP_LEAVESUB && cctx_current->flags & CC_TRACE_SUB)
997 {
998 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
999 SV **bot, **top;
1000 AV *av = newAV (); /* return values */
1001 SV **cb;
1002 dSP;
1003
1004 GV *gv = CvGV (cx->blk_sub.cv);
1005 SV *fullname = sv_2mortal (newSV (0));
1006 if (isGV (gv))
1007 gv_efullname3 (fullname, gv, 0);
1008
1009 bot = PL_stack_base + cx->blk_oldsp + 1;
1010 top = cx->blk_gimme == G_ARRAY ? SP + 1
1011 : cx->blk_gimme == G_SCALAR ? bot + 1
1012 : bot;
1013
1014 av_extend (av, top - bot);
1015 while (bot < top)
1016 av_push (av, SvREFCNT_inc_NN (*bot++));
1017
1018 PL_runops = RUNOPS_DEFAULT;
1019 ENTER;
1020 SAVETMPS;
1021 EXTEND (SP, 3);
1022 PUSHMARK (SP);
1023 PUSHs (&PL_sv_no);
1024 PUSHs (fullname);
1025 PUSHs (sv_2mortal (newRV_noinc ((SV *)av)));
1026 PUTBACK;
1027 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1028 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1029 SPAGAIN;
1030 FREETMPS;
1031 LEAVE;
1032 PL_runops = runops_trace;
1033 }
1034
1035 if (oldcop != PL_curcop)
1036 {
1037 oldcop = PL_curcop;
1038
1039 if (PL_curcop != &PL_compiling)
1040 {
1041 SV **cb;
1042
1043 if (oldcxix != cxstack_ix && cctx_current->flags & CC_TRACE_SUB)
1044 {
1045 PERL_CONTEXT *cx = &cxstack[cxstack_ix];
1046
1047 if (CxTYPE (cx) == CXt_SUB && oldcxix < cxstack_ix)
1048 {
1049 dSP;
1050 GV *gv = CvGV (cx->blk_sub.cv);
1051 SV *fullname = sv_2mortal (newSV (0));
1052
1053 if (isGV (gv))
1054 gv_efullname3 (fullname, gv, 0);
1055
1056 PL_runops = RUNOPS_DEFAULT;
1057 ENTER;
1058 SAVETMPS;
1059 EXTEND (SP, 3);
1060 PUSHMARK (SP);
1061 PUSHs (&PL_sv_yes);
1062 PUSHs (fullname);
1063 PUSHs (CxHASARGS (cx) ? sv_2mortal (newRV_inc ((SV *)cx->blk_sub.argarray)) : &PL_sv_undef);
1064 PUTBACK;
1065 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_sub_cb", sizeof ("_trace_sub_cb") - 1, 0);
1066 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1067 SPAGAIN;
1068 FREETMPS;
1069 LEAVE;
1070 PL_runops = runops_trace;
1071 }
1072
1073 oldcxix = cxstack_ix;
1074 }
1075
1076 if (cctx_current->flags & CC_TRACE_LINE)
1077 {
1078 dSP;
1079
1080 PL_runops = RUNOPS_DEFAULT;
1081 ENTER;
1082 SAVETMPS;
1083 EXTEND (SP, 3);
1084 PL_runops = RUNOPS_DEFAULT;
1085 PUSHMARK (SP);
1086 PUSHs (sv_2mortal (newSVpv (OutCopFILE (oldcop), 0)));
1087 PUSHs (sv_2mortal (newSViv (CopLINE (oldcop))));
1088 PUTBACK;
1089 cb = hv_fetch ((HV *)SvRV (coro_current), "_trace_line_cb", sizeof ("_trace_line_cb") - 1, 0);
1090 if (cb) call_sv (*cb, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
1091 SPAGAIN;
1092 FREETMPS;
1093 LEAVE;
1094 PL_runops = runops_trace;
1095 }
1096 }
1097 }
1098 }
1099 }
1100
1101 TAINT_NOT;
1102 return 0;
1103}
1104
1105static struct CoroSLF cctx_ssl_frame;
1106
1107static void
1108slf_prepare_set_stacklevel (pTHX_ struct coro_transfer_args *ta)
1109{
1110 ta->prev = 0;
1111}
1112
1113static int
1114slf_check_set_stacklevel (pTHX_ struct CoroSLF *frame)
1115{
1116 *frame = cctx_ssl_frame;
1117
1118 return frame->check (aTHX_ frame); /* execute the restored frame - there must be one */
1119}
1120
1121/* initialises PL_top_env and injects a pseudo-slf-call to set the stacklevel */
1122static void NOINLINE
1123cctx_prepare (pTHX)
1124{
1125 PL_top_env = &PL_start_env;
1126
1127 if (cctx_current->flags & CC_TRACE)
1128 PL_runops = runops_trace;
1129
1130 /* we already must be executing an SLF op, there is no other valid way
1131 * that can lead to creation of a new cctx */
1132 assert (("FATAL: can't prepare slf-less cctx in Coro module (please report)",
1133 slf_frame.prepare && PL_op->op_ppaddr == pp_slf));
1134
1135 /* we must emulate leaving pp_slf, which is done inside slf_check_set_stacklevel */
1136 cctx_ssl_frame = slf_frame;
1137
1138 slf_frame.prepare = slf_prepare_set_stacklevel;
1139 slf_frame.check = slf_check_set_stacklevel;
1140}
1141
1142/* the tail of transfer: execute stuff we can only do after a transfer */
1143INLINE void
1144transfer_tail (pTHX)
1145{
1146 free_coro_mortal (aTHX);
1147}
1148
1149/*
1150 * this is a _very_ stripped down perl interpreter ;)
1151 */
1152static void
1153cctx_run (void *arg)
1154{
1155#ifdef USE_ITHREADS
1156# if CORO_PTHREAD
1157 PERL_SET_CONTEXT (coro_thx);
1158# endif
1159#endif
1160 {
1161 dTHX;
1162
1163 /* normally we would need to skip the entersub here */
1164 /* not doing so will re-execute it, which is exactly what we want */
1165 /* PL_nop = PL_nop->op_next */
1166
1167 /* inject a fake subroutine call to cctx_init */
1168 cctx_prepare (aTHX);
1169
1170 /* cctx_run is the alternative tail of transfer() */
1171 transfer_tail (aTHX);
1172
1173 /* somebody or something will hit me for both perl_run and PL_restartop */
1174 PL_restartop = PL_op;
1175 perl_run (PL_curinterp);
1176 /*
1177 * Unfortunately, there is no way to get at the return values of the
1178 * coro body here, as perl_run destroys these
1179 */
1180
1181 /*
1182 * If perl-run returns we assume exit() was being called or the coro
1183 * fell off the end, which seems to be the only valid (non-bug)
1184 * reason for perl_run to return. We try to exit by jumping to the
1185 * bootstrap-time "top" top_env, as we cannot restore the "main"
1186 * coroutine as Coro has no such concept.
1187 * This actually isn't valid with the pthread backend, but OSes requiring
1188 * that backend are too broken to do it in a standards-compliant way.
1189 */
1190 PL_top_env = main_top_env;
1191 JMPENV_JUMP (2); /* I do not feel well about the hardcoded 2 at all */
1192 }
1193}
1194
1195static coro_cctx *
1196cctx_new ()
1197{
1198 coro_cctx *cctx;
1199
1200 ++cctx_count;
1201 New (0, cctx, 1, coro_cctx);
1202
1203 cctx->gen = cctx_gen;
1204 cctx->flags = 0;
1205 cctx->idle_sp = 0; /* can be accessed by transfer between cctx_run and set_stacklevel, on throw */
1206
1207 return cctx;
1208}
1209
1210/* create a new cctx only suitable as source */
1211static coro_cctx *
1212cctx_new_empty ()
1213{
1214 coro_cctx *cctx = cctx_new ();
1215
1216 cctx->sptr = 0;
1217 coro_create (&cctx->cctx, 0, 0, 0, 0);
1218
1219 return cctx;
1220}
1221
1222/* create a new cctx suitable as destination/running a perl interpreter */
1223static coro_cctx *
1224cctx_new_run ()
1225{
1226 coro_cctx *cctx = cctx_new ();
1227 void *stack_start;
1228 size_t stack_size;
1229
1230#if HAVE_MMAP
1231 cctx->ssize = ((cctx_stacksize * sizeof (long) + PAGESIZE - 1) / PAGESIZE + CORO_STACKGUARD) * PAGESIZE;
1232 /* mmap supposedly does allocate-on-write for us */
1233 cctx->sptr = mmap (0, cctx->ssize, PROT_EXEC|PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, 0, 0);
1234
1235 if (cctx->sptr != (void *)-1)
1236 {
1237 #if CORO_STACKGUARD
1238 mprotect (cctx->sptr, CORO_STACKGUARD * PAGESIZE, PROT_NONE);
1239 #endif
1240 stack_start = (char *)cctx->sptr + CORO_STACKGUARD * PAGESIZE;
1241 stack_size = cctx->ssize - CORO_STACKGUARD * PAGESIZE;
1242 cctx->flags |= CC_MAPPED;
1243 }
1244 else
1245#endif
1246 {
1247 cctx->ssize = cctx_stacksize * (long)sizeof (long);
1248 New (0, cctx->sptr, cctx_stacksize, long);
1249
1250 if (!cctx->sptr)
1251 {
1252 perror ("FATAL: unable to allocate stack for coroutine, exiting.");
1253 _exit (EXIT_FAILURE);
1254 }
1255
1256 stack_start = cctx->sptr;
1257 stack_size = cctx->ssize;
1258 }
1259
1260 #if CORO_USE_VALGRIND
1261 cctx->valgrind_id = VALGRIND_STACK_REGISTER ((char *)stack_start, (char *)stack_start + stack_size);
1262 #endif
1263
1264 coro_create (&cctx->cctx, cctx_run, (void *)cctx, stack_start, stack_size);
1265
1266 return cctx;
1267}
1268
1269static void
1270cctx_destroy (coro_cctx *cctx)
1271{
1272 if (!cctx)
1273 return;
1274
1275 assert (cctx != cctx_current);//D temporary
1276
1277 --cctx_count;
1278 coro_destroy (&cctx->cctx);
1279
1280 /* coro_transfer creates new, empty cctx's */
1281 if (cctx->sptr)
1282 {
1283 #if CORO_USE_VALGRIND
1284 VALGRIND_STACK_DEREGISTER (cctx->valgrind_id);
1285 #endif
1286
1287#if HAVE_MMAP
1288 if (cctx->flags & CC_MAPPED)
1289 munmap (cctx->sptr, cctx->ssize);
1290 else
1291#endif
1292 Safefree (cctx->sptr);
1293 }
1294
1295 Safefree (cctx);
1296}
1297
1298/* wether this cctx should be destructed */
1299#define CCTX_EXPIRED(cctx) ((cctx)->gen != cctx_gen || ((cctx)->flags & CC_NOREUSE))
1300
1301static coro_cctx *
1302cctx_get (pTHX)
1303{
1304 while (expect_true (cctx_first))
1305 {
1306 coro_cctx *cctx = cctx_first;
1307 cctx_first = cctx->next;
1308 --cctx_idle;
1309
1310 if (expect_true (!CCTX_EXPIRED (cctx)))
1311 return cctx;
1312
1313 cctx_destroy (cctx);
1314 }
1315
1316 return cctx_new_run ();
1317}
1318
1319static void
1320cctx_put (coro_cctx *cctx)
1321{
1322 assert (("FATAL: cctx_put called on non-initialised cctx in Coro (please report)", cctx->sptr));
1323
1324 /* free another cctx if overlimit */
1325 if (expect_false (cctx_idle >= cctx_max_idle))
1326 {
1327 coro_cctx *first = cctx_first;
1328 cctx_first = first->next;
1329 --cctx_idle;
1330
1331 cctx_destroy (first);
1332 }
1333
1334 ++cctx_idle;
1335 cctx->next = cctx_first;
1336 cctx_first = cctx;
1337}
1338
1339/** coroutine switching *****************************************************/
1340
1341static void
1342transfer_check (pTHX_ struct coro *prev, struct coro *next)
1343{
1344 /* TODO: throwing up here is considered harmful */
1345
1346 if (expect_true (prev != next))
1347 {
1348 if (expect_false (!(prev->flags & (CF_RUNNING | CF_NEW))))
1349 croak ("Coro::State::transfer called with a suspended prev Coro::State, but can only transfer from running or new states,");
1350
1351 if (expect_false (next->flags & CF_RUNNING))
1352 croak ("Coro::State::transfer called with running next Coro::State, but can only transfer to inactive states,");
1353
1354 if (expect_false (next->flags & CF_DESTROYED))
1355 croak ("Coro::State::transfer called with destroyed next Coro::State, but can only transfer to inactive states,");
1356
1357#if !PERL_VERSION_ATLEAST (5,10,0)
1358 if (expect_false (PL_lex_state != LEX_NOTPARSING))
1359 croak ("Coro::State::transfer called while parsing, but this is not supported in your perl version,");
1360#endif
1361 }
1362}
1363
1364/* always use the TRANSFER macro */
1365static void NOINLINE /* noinline so we have a fixed stackframe */
1366transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
1367{
1368 dSTACKLEVEL;
1369
1370 /* sometimes transfer is only called to set idle_sp */
1371 if (expect_false (!prev))
1372 {
1373 cctx_current->idle_sp = STACKLEVEL;
1374 assert (cctx_current->idle_te = PL_top_env); /* just for the side-effect when asserts are enabled */
1375 }
1376 else if (expect_true (prev != next))
1377 {
1378 coro_cctx *cctx_prev;
1379
1380 if (expect_false (prev->flags & CF_NEW))
1381 {
1382 /* create a new empty/source context */
1383 prev->flags &= ~CF_NEW;
1384 prev->flags |= CF_RUNNING;
1385 }
1386
1387 prev->flags &= ~CF_RUNNING;
1388 next->flags |= CF_RUNNING;
1389
1390 /* first get rid of the old state */
1391 save_perl (aTHX_ prev);
1392
1393 if (expect_false (next->flags & CF_NEW))
1394 {
1395 /* need to start coroutine */
1396 next->flags &= ~CF_NEW;
1397 /* setup coroutine call */
1398 coro_setup (aTHX_ next);
1399 }
1400 else
1401 load_perl (aTHX_ next);
1402
1403 assert (!prev->cctx);//D temporary
1404
1405 /* possibly untie and reuse the cctx */
1406 if (expect_true (
1407 cctx_current->idle_sp == STACKLEVEL
1408 && !(cctx_current->flags & CC_TRACE)
1409 && !force_cctx
1410 ))
1411 {
1412 /* I assume that stacklevel is a stronger indicator than PL_top_env changes */
1413 assert (("FATAL: current top_env must equal previous top_env in Coro (please report)", PL_top_env == cctx_current->idle_te));
1414
1415 /* if the cctx is about to be destroyed we need to make sure we won't see it in cctx_get. */
1416 /* without this the next cctx_get might destroy the running cctx while still in use */
1417 if (expect_false (CCTX_EXPIRED (cctx_current)))
1418 if (expect_true (!next->cctx))
1419 next->cctx = cctx_get (aTHX);
1420
1421 cctx_put (cctx_current);
1422 }
1423 else
1424 prev->cctx = cctx_current;
1425
1426 ++next->usecount;
1427
1428 cctx_prev = cctx_current;
1429 cctx_current = expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
1430
1431 next->cctx = 0;
1432
1433 if (expect_false (cctx_prev != cctx_current))
1434 {
1435 cctx_prev->top_env = PL_top_env;
1436 PL_top_env = cctx_current->top_env;
1437 coro_transfer (&cctx_prev->cctx, &cctx_current->cctx);
1438 }
1439
1440 transfer_tail (aTHX);
1441 }
1442}
1443
1444#define TRANSFER(ta, force_cctx) transfer (aTHX_ (ta).prev, (ta).next, (force_cctx))
1445#define TRANSFER_CHECK(ta) transfer_check (aTHX_ (ta).prev, (ta).next)
1446
1447/** high level stuff ********************************************************/
1448
1449static int
1450coro_state_destroy (pTHX_ struct coro *coro)
1451{
1452 if (coro->flags & CF_DESTROYED)
1453 return 0;
1454
1455 if (coro->on_destroy)
1456 coro->on_destroy (aTHX_ coro);
1457
1458 coro->flags |= CF_DESTROYED;
1459
1460 if (coro->flags & CF_READY)
1461 {
1462 /* reduce nready, as destroying a ready coro effectively unreadies it */
1463 /* alternative: look through all ready queues and remove the coro */
1464 --coro_nready;
1465 }
1466 else
1467 coro->flags |= CF_READY; /* make sure it is NOT put into the readyqueue */
1468
1469 if (coro->mainstack && coro->mainstack != main_mainstack)
1470 {
1471 struct coro temp;
1472
1473 assert (("FATAL: tried to destroy currently running coroutine (please report)", !(coro->flags & CF_RUNNING)));
1474
1475 save_perl (aTHX_ &temp);
1476 load_perl (aTHX_ coro);
1477
1478 coro_destruct (aTHX_ coro);
1479
1480 load_perl (aTHX_ &temp);
1481
1482 coro->slot = 0;
1483 }
1484
1485 cctx_destroy (coro->cctx);
1486 SvREFCNT_dec (coro->startcv);
1487 SvREFCNT_dec (coro->args);
1488 SvREFCNT_dec (CORO_THROW);
1489
1490 if (coro->next) coro->next->prev = coro->prev;
1491 if (coro->prev) coro->prev->next = coro->next;
1492 if (coro == coro_first) coro_first = coro->next;
1493
1494 return 1;
1495}
1496
1497static int
1498coro_state_free (pTHX_ SV *sv, MAGIC *mg)
1499{
1500 struct coro *coro = (struct coro *)mg->mg_ptr;
1501 mg->mg_ptr = 0;
1502
1503 coro->hv = 0;
1504
1505 if (--coro->refcnt < 0)
1506 {
1507 coro_state_destroy (aTHX_ coro);
1508 Safefree (coro);
1509 }
1510
1511 return 0;
1512}
1513
1514static int
1515coro_state_dup (pTHX_ MAGIC *mg, CLONE_PARAMS *params)
1516{
1517 struct coro *coro = (struct coro *)mg->mg_ptr;
1518
1519 ++coro->refcnt;
1520
1521 return 0;
1522}
1523
1524static MGVTBL coro_state_vtbl = {
1525 0, 0, 0, 0,
1526 coro_state_free,
1527 0,
1528#ifdef MGf_DUP
1529 coro_state_dup,
1530#else
1531# define MGf_DUP 0
1532#endif
1533};
1534
1535static void
1536prepare_transfer (pTHX_ struct coro_transfer_args *ta, SV *prev_sv, SV *next_sv)
1537{
1538 ta->prev = SvSTATE (prev_sv);
1539 ta->next = SvSTATE (next_sv);
1540 TRANSFER_CHECK (*ta);
1541}
1542
1543static void
1544api_transfer (pTHX_ SV *prev_sv, SV *next_sv)
1545{
1546 struct coro_transfer_args ta;
1547
1548 prepare_transfer (aTHX_ &ta, prev_sv, next_sv);
1549 TRANSFER (ta, 1);
1550}
1551
1552/*****************************************************************************/
1553/* gensub: simple closure generation utility */
1554
1555#define GENSUB_ARG CvXSUBANY (cv).any_ptr
1556
1557/* create a closure from XS, returns a code reference */
1558/* the arg can be accessed via GENSUB_ARG from the callback */
1559/* the callback must use dXSARGS/XSRETURN */
1560static SV *
1561gensub (pTHX_ void (*xsub)(pTHX_ CV *), void *arg)
1562{
1563 CV *cv = (CV *)newSV (0);
1564
1565 sv_upgrade ((SV *)cv, SVt_PVCV);
1566
1567 CvANON_on (cv);
1568 CvISXSUB_on (cv);
1569 CvXSUB (cv) = xsub;
1570 GENSUB_ARG = arg;
1571
1572 return newRV_noinc ((SV *)cv);
1573}
1574
1575/** Coro ********************************************************************/
1576
1577INLINE void
1578coro_enq (pTHX_ struct coro *coro)
1579{
1580 av_push (coro_ready [coro->prio - PRIO_MIN], SvREFCNT_inc_NN (coro->hv));
1581}
1582
1583INLINE SV *
1584coro_deq (pTHX)
1585{
1586 int prio;
1587
1588 for (prio = PRIO_MAX - PRIO_MIN + 1; --prio >= 0; )
1589 if (AvFILLp (coro_ready [prio]) >= 0)
1590 return av_shift (coro_ready [prio]);
1591
1592 return 0;
1593}
1594
1595static int
1596api_ready (pTHX_ SV *coro_sv)
1597{
1598 struct coro *coro;
1599 SV *sv_hook;
1600 void (*xs_hook)(void);
1601
1602 coro = SvSTATE (coro_sv);
1603
1604 if (coro->flags & CF_READY)
1605 return 0;
1606
1607 coro->flags |= CF_READY;
1608
1609 sv_hook = coro_nready ? 0 : coro_readyhook;
1610 xs_hook = coro_nready ? 0 : coroapi.readyhook;
1611
1612 coro_enq (aTHX_ coro);
1613 ++coro_nready;
1614
1615 if (sv_hook)
1616 {
1617 dSP;
1618
1619 ENTER;
1620 SAVETMPS;
1621
1622 PUSHMARK (SP);
1623 PUTBACK;
1624 call_sv (sv_hook, G_VOID | G_DISCARD);
1625
1626 FREETMPS;
1627 LEAVE;
1628 }
1629
1630 if (xs_hook)
1631 xs_hook ();
1632
1633 return 1;
1634}
1635
1636static int
1637api_is_ready (pTHX_ SV *coro_sv)
1638{
1639 return !!(SvSTATE (coro_sv)->flags & CF_READY);
1640}
1641
1642/* expects to own a reference to next->hv */
1643INLINE void
1644prepare_schedule_to (pTHX_ struct coro_transfer_args *ta, struct coro *next)
1645{
1646 SV *prev_sv = SvRV (coro_current);
1647
1648 ta->prev = SvSTATE_hv (prev_sv);
1649 ta->next = next;
1650
1651 TRANSFER_CHECK (*ta);
1652
1653 SvRV_set (coro_current, (SV *)next->hv);
1654
1655 free_coro_mortal (aTHX);
1656 coro_mortal = prev_sv;
1657}
1658
1659static void
1660prepare_schedule (pTHX_ struct coro_transfer_args *ta)
1661{
1662 for (;;)
1663 {
1664 SV *next_sv = coro_deq (aTHX);
1665
1666 if (expect_true (next_sv))
1667 {
1668 struct coro *next = SvSTATE_hv (next_sv);
1669
1670 /* cannot transfer to destroyed coros, skip and look for next */
1671 if (expect_false (next->flags & CF_DESTROYED))
1672 SvREFCNT_dec (next_sv); /* coro_nready has already been taken care of by destroy */
1673 else
1674 {
1675 next->flags &= ~CF_READY;
1676 --coro_nready;
1677
1678 prepare_schedule_to (aTHX_ ta, next);
1679 break;
1680 }
1681 }
1682 else
1683 {
1684 /* nothing to schedule: call the idle handler */
1685 if (SvROK (sv_idle)
1686 && SvOBJECT (SvRV (sv_idle)))
1687 {
1688 ++coro_nready; /* hack so that api_ready doesn't invoke ready hook */
1689 api_ready (aTHX_ SvRV (sv_idle));
1690 --coro_nready;
1691 }
1692 else
1693 {
1694 dSP;
1695
1696 ENTER;
1697 SAVETMPS;
1698
1699 PUSHMARK (SP);
1700 PUTBACK;
1701 call_sv (sv_idle, G_VOID | G_DISCARD);
1702
1703 FREETMPS;
1704 LEAVE;
1705 }
1706 }
1707 }
1708}
1709
1710INLINE void
1711prepare_cede (pTHX_ struct coro_transfer_args *ta)
1712{
1713 api_ready (aTHX_ coro_current);
1714 prepare_schedule (aTHX_ ta);
1715}
1716
1717INLINE void
1718prepare_cede_notself (pTHX_ struct coro_transfer_args *ta)
1719{
1720 SV *prev = SvRV (coro_current);
1721
1722 if (coro_nready)
1723 {
1724 prepare_schedule (aTHX_ ta);
1725 api_ready (aTHX_ prev);
1726 }
1727 else
1728 prepare_nop (aTHX_ ta);
1729}
1730
1731static void
1732api_schedule (pTHX)
1733{
1734 struct coro_transfer_args ta;
1735
1736 prepare_schedule (aTHX_ &ta);
1737 TRANSFER (ta, 1);
1738}
1739
1740static void
1741api_schedule_to (pTHX_ SV *coro_sv)
1742{
1743 struct coro_transfer_args ta;
1744 struct coro *next = SvSTATE (coro_sv);
1745
1746 SvREFCNT_inc_NN (coro_sv);
1747 prepare_schedule_to (aTHX_ &ta, next);
1748}
1749
1750static int
1751api_cede (pTHX)
1752{
1753 struct coro_transfer_args ta;
1754
1755 prepare_cede (aTHX_ &ta);
1756
1757 if (expect_true (ta.prev != ta.next))
1758 {
1759 TRANSFER (ta, 1);
1760 return 1;
1761 }
1762 else
1763 return 0;
1764}
1765
1766static int
1767api_cede_notself (pTHX)
1768{
1769 if (coro_nready)
1770 {
1771 struct coro_transfer_args ta;
1772
1773 prepare_cede_notself (aTHX_ &ta);
1774 TRANSFER (ta, 1);
1775 return 1;
1776 }
1777 else
1778 return 0;
1779}
1780
1781static void
1782api_trace (pTHX_ SV *coro_sv, int flags)
1783{
1784 struct coro *coro = SvSTATE (coro_sv);
1785
1786 if (coro->flags & CF_RUNNING)
1787 croak ("cannot enable tracing on a running coroutine, caught");
1788
1789 if (flags & CC_TRACE)
1790 {
1791 if (!coro->cctx)
1792 coro->cctx = cctx_new_run ();
1793 else if (!(coro->cctx->flags & CC_TRACE))
1794 croak ("cannot enable tracing on coroutine with custom stack, caught");
1795
1796 coro->cctx->flags |= CC_NOREUSE | (flags & (CC_TRACE | CC_TRACE_ALL));
1797 }
1798 else if (coro->cctx && coro->cctx->flags & CC_TRACE)
1799 {
1800 coro->cctx->flags &= ~(CC_TRACE | CC_TRACE_ALL);
1801
1802 if (coro->flags & CF_RUNNING)
1803 PL_runops = RUNOPS_DEFAULT;
1804 else
1805 coro->slot->runops = RUNOPS_DEFAULT;
1806 }
1807}
1808
1809static void
1810coro_call_on_destroy (pTHX_ struct coro *coro)
1811{
1812 SV **on_destroyp = hv_fetch (coro->hv, "_on_destroy", sizeof ("_on_destroy") - 1, 0);
1813 SV **statusp = hv_fetch (coro->hv, "_status", sizeof ("_status") - 1, 0);
1814
1815 if (on_destroyp)
1816 {
1817 AV *on_destroy = (AV *)SvRV (*on_destroyp);
1818
1819 while (AvFILLp (on_destroy) >= 0)
1820 {
1821 dSP; /* don't disturb outer sp */
1822 SV *cb = av_pop (on_destroy);
1823
1824 PUSHMARK (SP);
1825
1826 if (statusp)
1827 {
1828 int i;
1829 AV *status = (AV *)SvRV (*statusp);
1830 EXTEND (SP, AvFILLp (status) + 1);
1831
1832 for (i = 0; i <= AvFILLp (status); ++i)
1833 PUSHs (AvARRAY (status)[i]);
1834 }
1835
1836 PUTBACK;
1837 call_sv (sv_2mortal (cb), G_VOID | G_DISCARD);
1838 }
1839 }
1840}
1841
1842static void
1843slf_init_terminate (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1844{
1845 int i;
1846 HV *hv = (HV *)SvRV (coro_current);
1847 AV *av = newAV ();
1848
1849 av_extend (av, items - 1);
1850 for (i = 0; i < items; ++i)
1851 av_push (av, SvREFCNT_inc_NN (arg [i]));
1852
1853 hv_store (hv, "_status", sizeof ("_status") - 1, newRV_noinc ((SV *)av), 0);
1854
1855 av_push (av_destroy, (SV *)newRV_inc ((SV *)hv)); /* RVinc for perl */
1856 api_ready (aTHX_ sv_manager);
1857
1858 frame->prepare = prepare_schedule;
1859 frame->check = slf_check_repeat;
1860}
1861
1862/*****************************************************************************/
1863/* async pool handler */
1864
1865static int
1866slf_check_pool_handler (pTHX_ struct CoroSLF *frame)
1867{
1868 HV *hv = (HV *)SvRV (coro_current);
1869 struct coro *coro = (struct coro *)frame->data;
1870
1871 if (!coro->invoke_cb)
1872 return 1; /* loop till we have invoke */
1873 else
1874 {
1875 hv_store (hv, "desc", sizeof ("desc") - 1,
1876 newSVpvn ("[async_pool]", sizeof ("[async_pool]") - 1), 0);
1877
1878 coro->saved_deffh = SvREFCNT_inc_NN ((SV *)PL_defoutgv);
1879
1880 {
1881 dSP;
1882 XPUSHs (sv_2mortal (coro->invoke_cb)); coro->invoke_cb = 0;
1883 PUTBACK;
1884 }
1885
1886 SvREFCNT_dec (GvAV (PL_defgv));
1887 GvAV (PL_defgv) = coro->invoke_av;
1888 coro->invoke_av = 0;
1889
1890 return 0;
1891 }
1892}
1893
1894static void
1895slf_init_pool_handler (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
1896{
1897 HV *hv = (HV *)SvRV (coro_current);
1898 struct coro *coro = SvSTATE_hv ((SV *)hv);
1899
1900 if (expect_true (coro->saved_deffh))
1901 {
1902 /* subsequent iteration */
1903 SvREFCNT_dec ((SV *)PL_defoutgv); PL_defoutgv = (GV *)coro->saved_deffh;
1904 coro->saved_deffh = 0;
1905
1906 if (coro_rss (aTHX_ coro) > SvUV (sv_pool_rss)
1907 || av_len (av_async_pool) + 1 >= SvIV (sv_pool_size))
1908 {
1909 coro->invoke_cb = SvREFCNT_inc_NN ((SV *)cv_coro_terminate);
1910 coro->invoke_av = newAV ();
1911
1912 frame->prepare = prepare_nop;
1913 }
1914 else
1915 {
1916 av_clear (GvAV (PL_defgv));
1917 hv_store (hv, "desc", sizeof ("desc") - 1, SvREFCNT_inc_NN (sv_async_pool_idle), 0);
1918
1919 coro->prio = 0;
1920
1921 if (coro->cctx && (coro->cctx->flags & CC_TRACE))
1922 api_trace (aTHX_ coro_current, 0);
1923
1924 frame->prepare = prepare_schedule;
1925 av_push (av_async_pool, SvREFCNT_inc (hv));
1926 }
1927 }
1928 else
1929 {
1930 /* first iteration, simply fall through */
1931 frame->prepare = prepare_nop;
1932 }
1933
1934 frame->check = slf_check_pool_handler;
1935 frame->data = (void *)coro;
1936}
1937
1938/*****************************************************************************/
1939/* rouse callback */
1940
1941#define CORO_MAGIC_type_rouse PERL_MAGIC_ext
1942
1943static void
1944coro_rouse_callback (pTHX_ CV *cv)
1945{
1946 dXSARGS;
1947 SV *data = (SV *)GENSUB_ARG;
1948
1949 if (SvTYPE (SvRV (data)) != SVt_PVAV)
1950 {
1951 /* first call, set args */
1952 AV *av = newAV ();
1953 SV *coro = SvRV (data);
1954
1955 SvRV_set (data, (SV *)av);
1956 api_ready (aTHX_ coro);
1957 SvREFCNT_dec (coro);
1958
1959 /* better take a full copy of the arguments */
1960 while (items--)
1961 av_store (av, items, newSVsv (ST (items)));
1962 }
1963
1964 XSRETURN_EMPTY;
1965}
1966
1967static int
1968slf_check_rouse_wait (pTHX_ struct CoroSLF *frame)
1969{
1970 SV *data = (SV *)frame->data;
1971
1972 if (CORO_THROW)
1973 return 0;
1974
1975 if (SvTYPE (SvRV (data)) != SVt_PVAV)
1976 return 1;
1977
1978 /* now push all results on the stack */
1979 {
1980 dSP;
1981 AV *av = (AV *)SvRV (data);
1982 int i;
1983
1984 EXTEND (SP, AvFILLp (av) + 1);
1985 for (i = 0; i <= AvFILLp (av); ++i)
1986 PUSHs (sv_2mortal (AvARRAY (av)[i]));
1987
1988 /* we have stolen the elements, so ste length to zero and free */
1989 AvFILLp (av) = -1;
1990 av_undef (av);
1991
1992 PUTBACK;
1993 }
1994
1995 return 0;
1996}
1997
1998static void
1999slf_init_rouse_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2000{
2001 SV *cb;
2002
2003 if (items)
2004 cb = arg [0];
2005 else
2006 {
2007 struct coro *coro = SvSTATE_current;
2008
2009 if (!coro->rouse_cb)
2010 croak ("Coro::rouse_wait called without rouse callback, and no default rouse callback found either,");
2011
2012 cb = sv_2mortal (coro->rouse_cb);
2013 coro->rouse_cb = 0;
2014 }
2015
2016 if (!SvROK (cb)
2017 || SvTYPE (SvRV (cb)) != SVt_PVCV
2018 || CvXSUB ((CV *)SvRV (cb)) != coro_rouse_callback)
2019 croak ("Coro::rouse_wait called with illegal callback argument,");
2020
2021 {
2022 CV *cv = (CV *)SvRV (cb); /* for GENSUB_ARG */
2023 SV *data = (SV *)GENSUB_ARG;
2024
2025 frame->data = (void *)data;
2026 frame->prepare = SvTYPE (SvRV (data)) == SVt_PVAV ? prepare_nop : prepare_schedule;
2027 frame->check = slf_check_rouse_wait;
2028 }
2029}
2030
2031static SV *
2032coro_new_rouse_cb (pTHX)
2033{
2034 HV *hv = (HV *)SvRV (coro_current);
2035 struct coro *coro = SvSTATE_hv (hv);
2036 SV *data = newRV_inc ((SV *)hv);
2037 SV *cb = gensub (aTHX_ coro_rouse_callback, (void *)data);
2038
2039 sv_magicext (SvRV (cb), data, CORO_MAGIC_type_rouse, 0, 0, 0);
2040 SvREFCNT_dec (data); /* magicext increases the refcount */
2041
2042 SvREFCNT_dec (coro->rouse_cb);
2043 coro->rouse_cb = SvREFCNT_inc_NN (cb);
2044
2045 return cb;
2046}
2047
2048/*****************************************************************************/
2049/* schedule-like-function opcode (SLF) */
2050
2051static UNOP slf_restore; /* restore stack as entersub did, for first-re-run */
2052static const CV *slf_cv;
2053static SV **slf_argv;
2054static int slf_argc, slf_arga; /* count, allocated */
2055static I32 slf_ax; /* top of stack, for restore */
2056
2057/* this restores the stack in the case we patched the entersub, to */
2058/* recreate the stack frame as perl will on following calls */
2059/* since entersub cleared the stack */
2060static OP *
2061pp_restore (pTHX)
2062{
2063 int i;
2064 SV **SP = PL_stack_base + slf_ax;
2065
2066 PUSHMARK (SP);
2067
2068 EXTEND (SP, slf_argc + 1);
2069
2070 for (i = 0; i < slf_argc; ++i)
2071 PUSHs (sv_2mortal (slf_argv [i]));
2072
2073 PUSHs ((SV *)CvGV (slf_cv));
2074
2075 RETURNOP (slf_restore.op_first);
2076}
2077
2078static void
2079slf_prepare_transfer (pTHX_ struct coro_transfer_args *ta)
2080{
2081 SV **arg = (SV **)slf_frame.data;
2082
2083 prepare_transfer (aTHX_ ta, arg [0], arg [1]);
2084}
2085
2086static void
2087slf_init_transfer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2088{
2089 if (items != 2)
2090 croak ("Coro::State::transfer (prev, next) expects two arguments, not %d,", items);
2091
2092 frame->prepare = slf_prepare_transfer;
2093 frame->check = slf_check_nop;
2094 frame->data = (void *)arg; /* let's hope it will stay valid */
2095}
2096
2097static void
2098slf_init_schedule (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2099{
2100 frame->prepare = prepare_schedule;
2101 frame->check = slf_check_nop;
2102}
2103
2104static void
2105slf_prepare_schedule_to (pTHX_ struct coro_transfer_args *ta)
2106{
2107 struct coro *next = (struct coro *)slf_frame.data;
2108
2109 SvREFCNT_inc_NN (next->hv);
2110 prepare_schedule_to (aTHX_ ta, next);
2111}
2112
2113static void
2114slf_init_schedule_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2115{
2116 if (!items)
2117 croak ("Coro::schedule_to expects a coroutine argument, caught");
2118
2119 frame->data = (void *)SvSTATE (arg [0]);
2120 frame->prepare = slf_prepare_schedule_to;
2121 frame->check = slf_check_nop;
2122}
2123
2124static void
2125slf_init_cede_to (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2126{
2127 api_ready (aTHX_ SvRV (coro_current));
2128
2129 slf_init_schedule_to (aTHX_ frame, cv, arg, items);
2130}
2131
2132static void
2133slf_init_cede (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2134{
2135 frame->prepare = prepare_cede;
2136 frame->check = slf_check_nop;
2137}
2138
2139static void
2140slf_init_cede_notself (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2141{
2142 frame->prepare = prepare_cede_notself;
2143 frame->check = slf_check_nop;
2144}
2145
2146/*
2147 * these not obviously related functions are all rolled into one
2148 * function to increase chances that they all will call transfer with the same
2149 * stack offset
2150 * SLF stands for "schedule-like-function".
2151 */
2152static OP *
2153pp_slf (pTHX)
2154{
2155 I32 checkmark; /* mark SP to see how many elements check has pushed */
2156
2157 /* set up the slf frame, unless it has already been set-up */
2158 /* the latter happens when a new coro has been started */
2159 /* or when a new cctx was attached to an existing coroutine */
2160 if (expect_true (!slf_frame.prepare))
2161 {
2162 /* first iteration */
2163 dSP;
2164 SV **arg = PL_stack_base + TOPMARK + 1;
2165 int items = SP - arg; /* args without function object */
2166 SV *gv = *sp;
2167
2168 /* do a quick consistency check on the "function" object, and if it isn't */
2169 /* for us, divert to the real entersub */
2170 if (SvTYPE (gv) != SVt_PVGV
2171 || !GvCV (gv)
2172 || !(CvFLAGS (GvCV (gv)) & CVf_SLF))
2173 return PL_ppaddr[OP_ENTERSUB](aTHX);
2174
2175 if (!(PL_op->op_flags & OPf_STACKED))
2176 {
2177 /* ampersand-form of call, use @_ instead of stack */
2178 AV *av = GvAV (PL_defgv);
2179 arg = AvARRAY (av);
2180 items = AvFILLp (av) + 1;
2181 }
2182
2183 /* now call the init function, which needs to set up slf_frame */
2184 ((coro_slf_cb)CvXSUBANY (GvCV (gv)).any_ptr)
2185 (aTHX_ &slf_frame, GvCV (gv), arg, items);
2186
2187 /* pop args */
2188 SP = PL_stack_base + POPMARK;
2189
2190 PUTBACK;
2191 }
2192
2193 /* now that we have a slf_frame, interpret it! */
2194 /* we use a callback system not to make the code needlessly */
2195 /* complicated, but so we can run multiple perl coros from one cctx */
2196
2197 do
2198 {
2199 struct coro_transfer_args ta;
2200
2201 slf_frame.prepare (aTHX_ &ta);
2202 TRANSFER (ta, 0);
2203
2204 checkmark = PL_stack_sp - PL_stack_base;
2205 }
2206 while (slf_frame.check (aTHX_ &slf_frame));
2207
2208 slf_frame.prepare = 0; /* invalidate the frame, we are done processing it */
2209
2210 /* exception handling */
2211 if (expect_false (CORO_THROW))
2212 {
2213 SV *exception = sv_2mortal (CORO_THROW);
2214
2215 CORO_THROW = 0;
2216 sv_setsv (ERRSV, exception);
2217 croak (0);
2218 }
2219
2220 /* return value handling - mostly like entersub */
2221 /* make sure we put something on the stack in scalar context */
2222 if (GIMME_V == G_SCALAR)
2223 {
2224 dSP;
2225 SV **bot = PL_stack_base + checkmark;
2226
2227 if (sp == bot) /* too few, push undef */
2228 bot [1] = &PL_sv_undef;
2229 else if (sp != bot + 1) /* too many, take last one */
2230 bot [1] = *sp;
2231
2232 SP = bot + 1;
2233
2234 PUTBACK;
2235 }
2236
2237 return NORMAL;
2238}
2239
2240static void
2241api_execute_slf (pTHX_ CV *cv, coro_slf_cb init_cb, I32 ax)
2242{
2243 int i;
2244 SV **arg = PL_stack_base + ax;
2245 int items = PL_stack_sp - arg + 1;
2246
2247 assert (("FATAL: SLF call with illegal CV value", !CvANON (cv)));
2248
2249 if (PL_op->op_ppaddr != PL_ppaddr [OP_ENTERSUB]
2250 && PL_op->op_ppaddr != pp_slf)
2251 croak ("FATAL: Coro SLF calls can only be made normally, not via goto or any other means, caught");
2252
2253 CvFLAGS (cv) |= CVf_SLF;
2254 CvXSUBANY (cv).any_ptr = (void *)init_cb;
2255 slf_cv = cv;
2256
2257 /* we patch the op, and then re-run the whole call */
2258 /* we have to put the same argument on the stack for this to work */
2259 /* and this will be done by pp_restore */
2260 slf_restore.op_next = (OP *)&slf_restore;
2261 slf_restore.op_type = OP_CUSTOM;
2262 slf_restore.op_ppaddr = pp_restore;
2263 slf_restore.op_first = PL_op;
2264
2265 slf_ax = ax - 1; /* undo the ax++ inside dAXMARK */
2266
2267 if (PL_op->op_flags & OPf_STACKED)
2268 {
2269 if (items > slf_arga)
2270 {
2271 slf_arga = items;
2272 free (slf_argv);
2273 slf_argv = malloc (slf_arga * sizeof (SV *));
2274 }
2275
2276 slf_argc = items;
2277
2278 for (i = 0; i < items; ++i)
2279 slf_argv [i] = SvREFCNT_inc (arg [i]);
2280 }
2281 else
2282 slf_argc = 0;
2283
2284 PL_op->op_ppaddr = pp_slf;
2285 /*PL_op->op_type = OP_CUSTOM; /* we do behave like entersub still */
2286
2287 PL_op = (OP *)&slf_restore;
2288}
2289
2290/*****************************************************************************/
2291/* PerlIO::cede */
2292
2293typedef struct
2294{
2295 PerlIOBuf base;
2296 NV next, every;
2297} PerlIOCede;
2298
2299static IV
2300PerlIOCede_pushed (pTHX_ PerlIO *f, const char *mode, SV *arg, PerlIO_funcs *tab)
2301{
2302 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2303
2304 self->every = SvCUR (arg) ? SvNV (arg) : 0.01;
2305 self->next = nvtime () + self->every;
2306
2307 return PerlIOBuf_pushed (aTHX_ f, mode, Nullsv, tab);
2308}
2309
2310static SV *
2311PerlIOCede_getarg (pTHX_ PerlIO *f, CLONE_PARAMS *param, int flags)
2312{
2313 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2314
2315 return newSVnv (self->every);
2316}
2317
2318static IV
2319PerlIOCede_flush (pTHX_ PerlIO *f)
2320{
2321 PerlIOCede *self = PerlIOSelf (f, PerlIOCede);
2322 double now = nvtime ();
2323
2324 if (now >= self->next)
2325 {
2326 api_cede (aTHX);
2327 self->next = now + self->every;
2328 }
2329
2330 return PerlIOBuf_flush (aTHX_ f);
2331}
2332
2333static PerlIO_funcs PerlIO_cede =
2334{
2335 sizeof(PerlIO_funcs),
2336 "cede",
2337 sizeof(PerlIOCede),
2338 PERLIO_K_DESTRUCT | PERLIO_K_RAW,
2339 PerlIOCede_pushed,
2340 PerlIOBuf_popped,
2341 PerlIOBuf_open,
2342 PerlIOBase_binmode,
2343 PerlIOCede_getarg,
2344 PerlIOBase_fileno,
2345 PerlIOBuf_dup,
2346 PerlIOBuf_read,
2347 PerlIOBuf_unread,
2348 PerlIOBuf_write,
2349 PerlIOBuf_seek,
2350 PerlIOBuf_tell,
2351 PerlIOBuf_close,
2352 PerlIOCede_flush,
2353 PerlIOBuf_fill,
2354 PerlIOBase_eof,
2355 PerlIOBase_error,
2356 PerlIOBase_clearerr,
2357 PerlIOBase_setlinebuf,
2358 PerlIOBuf_get_base,
2359 PerlIOBuf_bufsiz,
2360 PerlIOBuf_get_ptr,
2361 PerlIOBuf_get_cnt,
2362 PerlIOBuf_set_ptrcnt,
2363};
2364
2365/*****************************************************************************/
2366/* Coro::Semaphore & Coro::Signal */
2367
2368static SV *
2369coro_waitarray_new (pTHX_ int count)
2370{
2371 /* a semaphore contains a counter IV in $sem->[0] and any waiters after that */
2372 AV *av = newAV ();
2373 SV **ary;
2374
2375 /* unfortunately, building manually saves memory */
2376 Newx (ary, 2, SV *);
2377 AvALLOC (av) = ary;
2378#if PERL_VERSION_ATLEAST (5,10,0)
2379 AvARRAY (av) = ary;
2380#else
2381 /* 5.8.8 needs this syntax instead of AvARRAY = ary, yet */
2382 /* -DDEBUGGING flags this as a bug, despite it perfectly working */
2383 SvPVX ((SV *)av) = (char *)ary;
2384#endif
2385 AvMAX (av) = 1;
2386 AvFILLp (av) = 0;
2387 ary [0] = newSViv (count);
2388
2389 return newRV_noinc ((SV *)av);
2390}
2391
2392/* semaphore */
2393
2394static void
2395coro_semaphore_adjust (pTHX_ AV *av, IV adjust)
2396{
2397 SV *count_sv = AvARRAY (av)[0];
2398 IV count = SvIVX (count_sv);
2399
2400 count += adjust;
2401 SvIVX (count_sv) = count;
2402
2403 /* now wake up as many waiters as are expected to lock */
2404 while (count > 0 && AvFILLp (av) > 0)
2405 {
2406 SV *cb;
2407
2408 /* swap first two elements so we can shift a waiter */
2409 AvARRAY (av)[0] = AvARRAY (av)[1];
2410 AvARRAY (av)[1] = count_sv;
2411 cb = av_shift (av);
2412
2413 if (SvOBJECT (cb))
2414 {
2415 api_ready (aTHX_ cb);
2416 --count;
2417 }
2418 else if (SvTYPE (cb) == SVt_PVCV)
2419 {
2420 dSP;
2421 PUSHMARK (SP);
2422 XPUSHs (sv_2mortal (newRV_inc ((SV *)av)));
2423 PUTBACK;
2424 call_sv (cb, G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
2425 }
2426
2427 SvREFCNT_dec (cb);
2428 }
2429}
2430
2431static void
2432coro_semaphore_on_destroy (pTHX_ struct coro *coro)
2433{
2434 /* call $sem->adjust (0) to possibly wake up some other waiters */
2435 coro_semaphore_adjust (aTHX_ (AV *)coro->slf_frame.data, 0);
2436}
2437
2438static int
2439slf_check_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, int acquire)
2440{
2441 AV *av = (AV *)frame->data;
2442 SV *count_sv = AvARRAY (av)[0];
2443
2444 /* if we are about to throw, don't actually acquire the lock, just throw */
2445 if (CORO_THROW)
2446 return 0;
2447 else if (SvIVX (count_sv) > 0)
2448 {
2449 SvSTATE_current->on_destroy = 0;
2450
2451 if (acquire)
2452 SvIVX (count_sv) = SvIVX (count_sv) - 1;
2453 else
2454 coro_semaphore_adjust (aTHX_ av, 0);
2455
2456 return 0;
2457 }
2458 else
2459 {
2460 int i;
2461 /* if we were woken up but can't down, we look through the whole */
2462 /* waiters list and only add us if we aren't in there already */
2463 /* this avoids some degenerate memory usage cases */
2464
2465 for (i = 1; i <= AvFILLp (av); ++i)
2466 if (AvARRAY (av)[i] == SvRV (coro_current))
2467 return 1;
2468
2469 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2470 return 1;
2471 }
2472}
2473
2474static int
2475slf_check_semaphore_down (pTHX_ struct CoroSLF *frame)
2476{
2477 return slf_check_semaphore_down_or_wait (aTHX_ frame, 1);
2478}
2479
2480static int
2481slf_check_semaphore_wait (pTHX_ struct CoroSLF *frame)
2482{
2483 return slf_check_semaphore_down_or_wait (aTHX_ frame, 0);
2484}
2485
2486static void
2487slf_init_semaphore_down_or_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2488{
2489 AV *av = (AV *)SvRV (arg [0]);
2490
2491 if (SvIVX (AvARRAY (av)[0]) > 0)
2492 {
2493 frame->data = (void *)av;
2494 frame->prepare = prepare_nop;
2495 }
2496 else
2497 {
2498 av_push (av, SvREFCNT_inc (SvRV (coro_current)));
2499
2500 frame->data = (void *)sv_2mortal (SvREFCNT_inc ((SV *)av));
2501 frame->prepare = prepare_schedule;
2502
2503 /* to avoid race conditions when a woken-up coro gets terminated */
2504 /* we arrange for a temporary on_destroy that calls adjust (0) */
2505 SvSTATE_current->on_destroy = coro_semaphore_on_destroy;
2506 }
2507}
2508
2509static void
2510slf_init_semaphore_down (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2511{
2512 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2513 frame->check = slf_check_semaphore_down;
2514}
2515
2516static void
2517slf_init_semaphore_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2518{
2519 if (items >= 2)
2520 {
2521 /* callback form */
2522 AV *av = (AV *)SvRV (arg [0]);
2523 CV *cb_cv = coro_sv_2cv (aTHX_ arg [1]);
2524
2525 av_push (av, (SV *)SvREFCNT_inc_NN (cb_cv));
2526
2527 if (SvIVX (AvARRAY (av)[0]) > 0)
2528 coro_semaphore_adjust (aTHX_ av, 0);
2529
2530 frame->prepare = prepare_nop;
2531 frame->check = slf_check_nop;
2532 }
2533 else
2534 {
2535 slf_init_semaphore_down_or_wait (aTHX_ frame, cv, arg, items);
2536 frame->check = slf_check_semaphore_wait;
2537 }
2538}
2539
2540/* signal */
2541
2542static void
2543coro_signal_wake (pTHX_ AV *av, int count)
2544{
2545 SvIVX (AvARRAY (av)[0]) = 0;
2546
2547 /* now signal count waiters */
2548 while (count > 0 && AvFILLp (av) > 0)
2549 {
2550 SV *cb;
2551
2552 /* swap first two elements so we can shift a waiter */
2553 cb = AvARRAY (av)[0];
2554 AvARRAY (av)[0] = AvARRAY (av)[1];
2555 AvARRAY (av)[1] = cb;
2556
2557 cb = av_shift (av);
2558
2559 api_ready (aTHX_ cb);
2560 sv_setiv (cb, 0); /* signal waiter */
2561 SvREFCNT_dec (cb);
2562
2563 --count;
2564 }
2565}
2566
2567static int
2568slf_check_signal_wait (pTHX_ struct CoroSLF *frame)
2569{
2570 /* if we are about to throw, also stop waiting */
2571 return SvROK ((SV *)frame->data) && !CORO_THROW;
2572}
2573
2574static void
2575slf_init_signal_wait (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2576{
2577 AV *av = (AV *)SvRV (arg [0]);
2578
2579 if (SvIVX (AvARRAY (av)[0]))
2580 {
2581 SvIVX (AvARRAY (av)[0]) = 0;
2582 frame->prepare = prepare_nop;
2583 frame->check = slf_check_nop;
2584 }
2585 else
2586 {
2587 SV *waiter = newRV_inc (SvRV (coro_current)); /* owned by signal av */
2588
2589 av_push (av, waiter);
2590
2591 frame->data = (void *)sv_2mortal (SvREFCNT_inc_NN (waiter)); /* owned by process */
2592 frame->prepare = prepare_schedule;
2593 frame->check = slf_check_signal_wait;
2594 }
2595}
2596
2597/*****************************************************************************/
2598/* Coro::AIO */
2599
2600#define CORO_MAGIC_type_aio PERL_MAGIC_ext
2601
2602/* helper storage struct */
2603struct io_state
2604{
2605 int errorno;
2606 I32 laststype; /* U16 in 5.10.0 */
2607 int laststatval;
2608 Stat_t statcache;
2609};
2610
2611static void
2612coro_aio_callback (pTHX_ CV *cv)
2613{
2614 dXSARGS;
2615 AV *state = (AV *)GENSUB_ARG;
2616 SV *coro = av_pop (state);
2617 SV *data_sv = newSV (sizeof (struct io_state));
2618
2619 av_extend (state, items - 1);
2620
2621 sv_upgrade (data_sv, SVt_PV);
2622 SvCUR_set (data_sv, sizeof (struct io_state));
2623 SvPOK_only (data_sv);
2624
2625 {
2626 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2627
2628 data->errorno = errno;
2629 data->laststype = PL_laststype;
2630 data->laststatval = PL_laststatval;
2631 data->statcache = PL_statcache;
2632 }
2633
2634 /* now build the result vector out of all the parameters and the data_sv */
2635 {
2636 int i;
2637
2638 for (i = 0; i < items; ++i)
2639 av_push (state, SvREFCNT_inc_NN (ST (i)));
2640 }
2641
2642 av_push (state, data_sv);
2643
2644 api_ready (aTHX_ coro);
2645 SvREFCNT_dec (coro);
2646 SvREFCNT_dec ((AV *)state);
2647}
2648
2649static int
2650slf_check_aio_req (pTHX_ struct CoroSLF *frame)
2651{
2652 AV *state = (AV *)frame->data;
2653
2654 /* if we are about to throw, return early */
2655 /* this does not cancel the aio request, but at least */
2656 /* it quickly returns */
2657 if (CORO_THROW)
2658 return 0;
2659
2660 /* one element that is an RV? repeat! */
2661 if (AvFILLp (state) == 0 && SvROK (AvARRAY (state)[0]))
2662 return 1;
2663
2664 /* restore status */
2665 {
2666 SV *data_sv = av_pop (state);
2667 struct io_state *data = (struct io_state *)SvPVX (data_sv);
2668
2669 errno = data->errorno;
2670 PL_laststype = data->laststype;
2671 PL_laststatval = data->laststatval;
2672 PL_statcache = data->statcache;
2673
2674 SvREFCNT_dec (data_sv);
2675 }
2676
2677 /* push result values */
2678 {
2679 dSP;
2680 int i;
2681
2682 EXTEND (SP, AvFILLp (state) + 1);
2683 for (i = 0; i <= AvFILLp (state); ++i)
2684 PUSHs (sv_2mortal (SvREFCNT_inc_NN (AvARRAY (state)[i])));
2685
2686 PUTBACK;
2687 }
2688
2689 return 0;
2690}
2691
2692static void
2693slf_init_aio_req (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
2694{
2695 AV *state = (AV *)sv_2mortal ((SV *)newAV ());
2696 SV *coro_hv = SvRV (coro_current);
2697 struct coro *coro = SvSTATE_hv (coro_hv);
2698
2699 /* put our coroutine id on the state arg */
2700 av_push (state, SvREFCNT_inc_NN (coro_hv));
2701
2702 /* first see whether we have a non-zero priority and set it as AIO prio */
2703 if (coro->prio)
2704 {
2705 dSP;
2706
2707 static SV *prio_cv;
2708 static SV *prio_sv;
2709
2710 if (expect_false (!prio_cv))
2711 {
2712 prio_cv = (SV *)get_cv ("IO::AIO::aioreq_pri", 0);
2713 prio_sv = newSViv (0);
2714 }
2715
2716 PUSHMARK (SP);
2717 sv_setiv (prio_sv, coro->prio);
2718 XPUSHs (prio_sv);
2719
2720 PUTBACK;
2721 call_sv (prio_cv, G_VOID | G_DISCARD);
2722 }
2723
2724 /* now call the original request */
2725 {
2726 dSP;
2727 CV *req = (CV *)CORO_MAGIC_NN ((SV *)cv, CORO_MAGIC_type_aio)->mg_obj;
2728 int i;
2729
2730 PUSHMARK (SP);
2731
2732 /* first push all args to the stack */
2733 EXTEND (SP, items + 1);
2734
2735 for (i = 0; i < items; ++i)
2736 PUSHs (arg [i]);
2737
2738 /* now push the callback closure */
2739 PUSHs (sv_2mortal (gensub (aTHX_ coro_aio_callback, (void *)SvREFCNT_inc_NN ((SV *)state))));
2740
2741 /* now call the AIO function - we assume our request is uncancelable */
2742 PUTBACK;
2743 call_sv ((SV *)req, G_VOID | G_DISCARD);
2744 }
2745
2746 /* now that the requets is going, we loop toll we have a result */
2747 frame->data = (void *)state;
2748 frame->prepare = prepare_schedule;
2749 frame->check = slf_check_aio_req;
2750}
2751
2752static void
2753coro_aio_req_xs (pTHX_ CV *cv)
2754{
2755 dXSARGS;
2756
2757 CORO_EXECUTE_SLF_XS (slf_init_aio_req);
2758
2759 XSRETURN_EMPTY;
2760}
2761
2762/*****************************************************************************/
2763
2764#if CORO_CLONE
2765# include "clone.c"
2766#endif
2767
2768MODULE = Coro::State PACKAGE = Coro::State PREFIX = api_
2769
2770PROTOTYPES: DISABLE
2771
2772BOOT:
2773{
2774#ifdef USE_ITHREADS
2775# if CORO_PTHREAD
2776 coro_thx = PERL_GET_CONTEXT;
2777# endif
2778#endif
2779 BOOT_PAGESIZE;
2780
2781 cctx_current = cctx_new_empty ();
2782
2783 irsgv = gv_fetchpv ("/" , GV_ADD|GV_NOTQUAL, SVt_PV);
2784 stdoutgv = gv_fetchpv ("STDOUT", GV_ADD|GV_NOTQUAL, SVt_PVIO);
2785
2786 orig_sigelem_get = PL_vtbl_sigelem.svt_get; PL_vtbl_sigelem.svt_get = coro_sigelem_get;
2787 orig_sigelem_set = PL_vtbl_sigelem.svt_set; PL_vtbl_sigelem.svt_set = coro_sigelem_set;
2788 orig_sigelem_clr = PL_vtbl_sigelem.svt_clear; PL_vtbl_sigelem.svt_clear = coro_sigelem_clr;
2789
2790 hv_sig = coro_get_hv (aTHX_ "SIG", TRUE);
2791 rv_diehook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::diehook" , 0, SVt_PVCV));
2792 rv_warnhook = newRV_inc ((SV *)gv_fetchpv ("Coro::State::warnhook", 0, SVt_PVCV));
2793
2794 coro_state_stash = gv_stashpv ("Coro::State", TRUE);
2795
2796 newCONSTSUB (coro_state_stash, "CC_TRACE" , newSViv (CC_TRACE));
2797 newCONSTSUB (coro_state_stash, "CC_TRACE_SUB" , newSViv (CC_TRACE_SUB));
2798 newCONSTSUB (coro_state_stash, "CC_TRACE_LINE", newSViv (CC_TRACE_LINE));
2799 newCONSTSUB (coro_state_stash, "CC_TRACE_ALL" , newSViv (CC_TRACE_ALL));
2800
2801 main_mainstack = PL_mainstack;
2802 main_top_env = PL_top_env;
2803
2804 while (main_top_env->je_prev)
2805 main_top_env = main_top_env->je_prev;
2806
2807 {
2808 SV *slf = sv_2mortal (newSViv (PTR2IV (pp_slf)));
2809
2810 if (!PL_custom_op_names) PL_custom_op_names = newHV ();
2811 hv_store_ent (PL_custom_op_names, slf, newSVpv ("coro_slf", 0), 0);
2812
2813 if (!PL_custom_op_descs) PL_custom_op_descs = newHV ();
2814 hv_store_ent (PL_custom_op_descs, slf, newSVpv ("coro schedule like function", 0), 0);
2815 }
2816
2817 coroapi.ver = CORO_API_VERSION;
2818 coroapi.rev = CORO_API_REVISION;
2819
2820 coroapi.transfer = api_transfer;
2821
2822 coroapi.sv_state = SvSTATE_;
2823 coroapi.execute_slf = api_execute_slf;
2824 coroapi.prepare_nop = prepare_nop;
2825 coroapi.prepare_schedule = prepare_schedule;
2826 coroapi.prepare_cede = prepare_cede;
2827 coroapi.prepare_cede_notself = prepare_cede_notself;
2828
2829 {
2830 SV **svp = hv_fetch (PL_modglobal, "Time::NVtime", 12, 0);
2831
2832 if (!svp) croak ("Time::HiRes is required");
2833 if (!SvIOK (*svp)) croak ("Time::NVtime isn't a function pointer");
2834
2835 nvtime = INT2PTR (double (*)(), SvIV (*svp));
2836 }
2837
2838 assert (("PRIO_NORMAL must be 0", !PRIO_NORMAL));
2839}
2840
2841SV *
2842new (char *klass, ...)
2843 ALIAS:
2844 Coro::new = 1
2845 CODE:
2846{
2847 struct coro *coro;
2848 MAGIC *mg;
2849 HV *hv;
2850 CV *cb;
2851 int i;
2852
2853 if (items > 1)
2854 {
2855 cb = coro_sv_2cv (aTHX_ ST (1));
2856
2857 if (!ix)
235 { 2858 {
236 /* I never used formats, so how should I know how these are implemented? */ 2859 if (CvISXSUB (cb))
237 /* my bold guess is as a simple, plain sub... */ 2860 croak ("Coro::State doesn't support XS functions as coroutine start, caught");
238 croak ("CXt_FORMAT not yet handled. Don't switch coroutines from within formats"); 2861
2862 if (!CvROOT (cb))
2863 croak ("Coro::State doesn't support autoloaded or undefined functions as coroutine start, caught");
239 } 2864 }
240 } 2865 }
241 2866
242 if (top_si->si_type == PERLSI_MAIN)
243 break;
244
245 top_si = top_si->si_prev;
246 ccstk = top_si->si_cxstack;
247 cxix = top_si->si_cxix;
248 }
249
250 PUTBACK;
251 }
252
253 c->dowarn = PL_dowarn;
254 c->defav = GvAV (PL_defgv);
255 c->curstackinfo = PL_curstackinfo;
256 c->curstack = PL_curstack;
257 c->mainstack = PL_mainstack;
258 c->stack_sp = PL_stack_sp;
259 c->op = PL_op;
260 c->curpad = PL_curpad;
261 c->stack_base = PL_stack_base;
262 c->stack_max = PL_stack_max;
263 c->tmps_stack = PL_tmps_stack;
264 c->tmps_floor = PL_tmps_floor;
265 c->tmps_ix = PL_tmps_ix;
266 c->tmps_max = PL_tmps_max;
267 c->markstack = PL_markstack;
268 c->markstack_ptr = PL_markstack_ptr;
269 c->markstack_max = PL_markstack_max;
270 c->scopestack = PL_scopestack;
271 c->scopestack_ix = PL_scopestack_ix;
272 c->scopestack_max = PL_scopestack_max;
273 c->savestack = PL_savestack;
274 c->savestack_ix = PL_savestack_ix;
275 c->savestack_max = PL_savestack_max;
276 c->retstack = PL_retstack;
277 c->retstack_ix = PL_retstack_ix;
278 c->retstack_max = PL_retstack_max;
279 c->curcop = PL_curcop;
280}
281
282static void
283LOAD(pTHX_ Coro__State c)
284{
285 PL_dowarn = c->dowarn;
286 GvAV (PL_defgv) = c->defav;
287 PL_curstackinfo = c->curstackinfo;
288 PL_curstack = c->curstack;
289 PL_mainstack = c->mainstack;
290 PL_stack_sp = c->stack_sp;
291 PL_op = c->op;
292 PL_curpad = c->curpad;
293 PL_stack_base = c->stack_base;
294 PL_stack_max = c->stack_max;
295 PL_tmps_stack = c->tmps_stack;
296 PL_tmps_floor = c->tmps_floor;
297 PL_tmps_ix = c->tmps_ix;
298 PL_tmps_max = c->tmps_max;
299 PL_markstack = c->markstack;
300 PL_markstack_ptr = c->markstack_ptr;
301 PL_markstack_max = c->markstack_max;
302 PL_scopestack = c->scopestack;
303 PL_scopestack_ix = c->scopestack_ix;
304 PL_scopestack_max = c->scopestack_max;
305 PL_savestack = c->savestack;
306 PL_savestack_ix = c->savestack_ix;
307 PL_savestack_max = c->savestack_max;
308 PL_retstack = c->retstack;
309 PL_retstack_ix = c->retstack_ix;
310 PL_retstack_max = c->retstack_max;
311 PL_curcop = c->curcop;
312
313 {
314 dSP;
315 CV *cv;
316
317 /* now do the ugly restore mess */
318 while ((cv = (CV *)POPs))
319 {
320 AV *padlist = (AV *)POPs;
321
322 put_padlist (cv);
323 CvPADLIST(cv) = padlist;
324 CvDEPTH(cv) = (I32)POPs;
325
326#ifdef USE_THREADS
327 CvOWNER(cv) = (struct perl_thread *)POPs;
328 error does not work either
329#endif
330 }
331
332 PUTBACK;
333 }
334}
335
336/* this is an EXACT copy of S_nuke_stacks in perl.c, which is unfortunately static */
337STATIC void
338destroy_stacks(pTHX)
339{
340 dSP;
341
342 /* die does this while calling POPSTACK, but I just don't see why. */
343 dounwind(-1);
344
345 /* is this ugly, I ask? */
346 while (PL_scopestack_ix)
347 LEAVE;
348
349 while (PL_curstackinfo->si_next)
350 PL_curstackinfo = PL_curstackinfo->si_next;
351
352 while (PL_curstackinfo)
353 {
354 PERL_SI *p = PL_curstackinfo->si_prev;
355
356 SvREFCNT_dec(PL_curstackinfo->si_stack);
357 Safefree(PL_curstackinfo->si_cxstack);
358 Safefree(PL_curstackinfo);
359 PL_curstackinfo = p;
360 }
361
362 if (PL_scopestack_ix != 0)
363 Perl_warner(aTHX_ WARN_INTERNAL,
364 "Unbalanced scopes: %ld more ENTERs than LEAVEs\n",
365 (long)PL_scopestack_ix);
366 if (PL_savestack_ix != 0)
367 Perl_warner(aTHX_ WARN_INTERNAL,
368 "Unbalanced saves: %ld more saves than restores\n",
369 (long)PL_savestack_ix);
370 if (PL_tmps_floor != -1)
371 Perl_warner(aTHX_ WARN_INTERNAL,"Unbalanced tmps: %ld more allocs than frees\n",
372 (long)PL_tmps_floor + 1);
373 /*
374 */
375 Safefree(PL_tmps_stack);
376 Safefree(PL_markstack);
377 Safefree(PL_scopestack);
378 Safefree(PL_savestack);
379 Safefree(PL_retstack);
380}
381
382#define SUB_INIT "Coro::State::_newcoro"
383
384MODULE = Coro::State PACKAGE = Coro::State
385
386PROTOTYPES: ENABLE
387
388BOOT:
389 if (!padlist_cache)
390 padlist_cache = newHV ();
391
392Coro::State
393_newprocess(args)
394 SV * args
395 PROTOTYPE: $
396 CODE:
397 Coro__State coro;
398
399 if (!SvROK (args) || SvTYPE (SvRV (args)) != SVt_PVAV)
400 croak ("Coro::State::newprocess expects an arrayref");
401
402 New (0, coro, 1, struct coro); 2867 Newz (0, coro, 1, struct coro);
2868 coro->args = newAV ();
2869 coro->flags = CF_NEW;
403 2870
404 coro->mainstack = 0; /* actual work is done inside transfer */ 2871 if (coro_first) coro_first->prev = coro;
405 coro->args = (AV *)SvREFCNT_inc (SvRV (args)); 2872 coro->next = coro_first;
2873 coro_first = coro;
406 2874
407 RETVAL = coro; 2875 coro->hv = hv = newHV ();
2876 mg = sv_magicext ((SV *)hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)coro, 0);
2877 mg->mg_flags |= MGf_DUP;
2878 RETVAL = sv_bless (newRV_noinc ((SV *)hv), gv_stashpv (klass, 1));
2879
2880 if (items > 1)
2881 {
2882 av_extend (coro->args, items - 1 + ix - 1);
2883
2884 if (ix)
2885 {
2886 av_push (coro->args, SvREFCNT_inc_NN ((SV *)cb));
2887 cb = cv_coro_run;
2888 }
2889
2890 coro->startcv = (CV *)SvREFCNT_inc_NN ((SV *)cb);
2891
2892 for (i = 2; i < items; i++)
2893 av_push (coro->args, newSVsv (ST (i)));
2894 }
2895}
408 OUTPUT: 2896 OUTPUT:
409 RETVAL 2897 RETVAL
410 2898
411void 2899void
412transfer(prev,next) 2900transfer (...)
413 Coro::State_or_hashref prev 2901 PROTOTYPE: $$
414 Coro::State_or_hashref next 2902 CODE:
415 CODE: 2903 CORO_EXECUTE_SLF_XS (slf_init_transfer);
416 2904
417 if (prev != next) 2905bool
2906_destroy (SV *coro_sv)
2907 CODE:
2908 RETVAL = coro_state_destroy (aTHX_ SvSTATE (coro_sv));
2909 OUTPUT:
2910 RETVAL
2911
2912void
2913_exit (int code)
2914 PROTOTYPE: $
2915 CODE:
2916 _exit (code);
2917
2918SV *
2919clone (Coro::State coro)
2920 CODE:
2921{
2922#if CORO_CLONE
2923 struct coro *ncoro = coro_clone (coro);
2924 MAGIC *mg;
2925 /* TODO: too much duplication */
2926 ncoro->hv = newHV ();
2927 mg = sv_magicext ((SV *)ncoro->hv, 0, CORO_MAGIC_type_state, &coro_state_vtbl, (char *)ncoro, 0);
2928 mg->mg_flags |= MGf_DUP;
2929 RETVAL = sv_bless (newRV_noinc ((SV *)ncoro->hv), SvSTASH (coro->hv));
2930#else
2931 croak ("Coro::State->clone has not been configured into this installation of Coro, realised");
2932#endif
2933}
2934 OUTPUT:
2935 RETVAL
2936
2937int
2938cctx_stacksize (int new_stacksize = 0)
2939 PROTOTYPE: ;$
2940 CODE:
2941 RETVAL = cctx_stacksize;
2942 if (new_stacksize)
418 { 2943 {
419 PUTBACK; 2944 cctx_stacksize = new_stacksize;
420 SAVE (aTHX_ prev); 2945 ++cctx_gen;
421
422 /* 2946 }
423 * this could be done in newprocess which would lead to 2947 OUTPUT:
424 * extremely elegant and fast (just PUTBACK/SAVE/LOAD/SPAGAIN) 2948 RETVAL
425 * code here, but lazy allocation of stacks has also 2949
426 * some virtues and the overhead of the if() is nil. 2950int
2951cctx_max_idle (int max_idle = 0)
2952 PROTOTYPE: ;$
2953 CODE:
2954 RETVAL = cctx_max_idle;
2955 if (max_idle > 1)
2956 cctx_max_idle = max_idle;
2957 OUTPUT:
2958 RETVAL
2959
2960int
2961cctx_count ()
2962 PROTOTYPE:
2963 CODE:
2964 RETVAL = cctx_count;
2965 OUTPUT:
2966 RETVAL
2967
2968int
2969cctx_idle ()
2970 PROTOTYPE:
2971 CODE:
2972 RETVAL = cctx_idle;
2973 OUTPUT:
2974 RETVAL
2975
2976void
2977list ()
2978 PROTOTYPE:
2979 PPCODE:
2980{
2981 struct coro *coro;
2982 for (coro = coro_first; coro; coro = coro->next)
2983 if (coro->hv)
2984 XPUSHs (sv_2mortal (newRV_inc ((SV *)coro->hv)));
2985}
2986
2987void
2988call (Coro::State coro, SV *coderef)
2989 ALIAS:
2990 eval = 1
2991 CODE:
2992{
2993 if (coro->mainstack && ((coro->flags & CF_RUNNING) || coro->slot))
427 */ 2994 {
428 if (next->mainstack) 2995 struct coro temp;
2996
2997 if (!(coro->flags & CF_RUNNING))
429 { 2998 {
430 LOAD (aTHX_ next); 2999 PUTBACK;
431 next->mainstack = 0; /* unnecessary but much cleaner */ 3000 save_perl (aTHX_ &temp);
3001 load_perl (aTHX_ coro);
3002 }
3003
3004 {
3005 dSP;
3006 ENTER;
3007 SAVETMPS;
3008 PUTBACK;
3009 PUSHSTACK;
3010 PUSHMARK (SP);
3011
3012 if (ix)
3013 eval_sv (coderef, 0);
3014 else
3015 call_sv (coderef, G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3016
3017 POPSTACK;
3018 SPAGAIN;
3019 FREETMPS;
3020 LEAVE;
3021 PUTBACK;
3022 }
3023
3024 if (!(coro->flags & CF_RUNNING))
3025 {
3026 save_perl (aTHX_ coro);
3027 load_perl (aTHX_ &temp);
432 SPAGAIN; 3028 SPAGAIN;
433 } 3029 }
434 else
435 {
436 /*
437 * emulate part of the perl startup here.
438 */
439 UNOP myop;
440
441 init_stacks (); /* from perl.c */
442 PL_op = (OP *)&myop;
443 /*PL_curcop = 0;*/
444 GvAV (PL_defgv) = (SV *)SvREFCNT_inc (next->args);
445
446 SPAGAIN;
447 Zero(&myop, 1, UNOP);
448 myop.op_next = Nullop;
449 myop.op_flags = OPf_WANT_VOID;
450
451 PUSHMARK(SP);
452 XPUSHs ((SV*)get_cv(SUB_INIT, TRUE));
453 PUTBACK;
454 /*
455 * the next line is slightly wrong, as PL_op->op_next
456 * is actually being executed so we skip the first op.
457 * that doesn't matter, though, since it is only
458 * pp_nextstate and we never return...
459 */
460 PL_op = Perl_pp_entersub(aTHX);
461 SPAGAIN;
462
463 ENTER;
464 }
465 } 3030 }
3031}
3032
3033SV *
3034is_ready (Coro::State coro)
3035 PROTOTYPE: $
3036 ALIAS:
3037 is_ready = CF_READY
3038 is_running = CF_RUNNING
3039 is_new = CF_NEW
3040 is_destroyed = CF_DESTROYED
3041 CODE:
3042 RETVAL = boolSV (coro->flags & ix);
3043 OUTPUT:
3044 RETVAL
466 3045
467void 3046void
468DESTROY(coro) 3047throw (Coro::State self, SV *throw = &PL_sv_undef)
469 Coro::State coro 3048 PROTOTYPE: $;$
470 CODE: 3049 CODE:
3050{
3051 struct coro *current = SvSTATE_current;
3052 SV **throwp = self == current ? &CORO_THROW : &self->except;
3053 SvREFCNT_dec (*throwp);
3054 *throwp = SvOK (throw) ? newSVsv (throw) : 0;
3055}
471 3056
472 if (coro->mainstack) 3057void
3058api_trace (SV *coro, int flags = CC_TRACE | CC_TRACE_SUB)
3059 PROTOTYPE: $;$
3060 C_ARGS: aTHX_ coro, flags
3061
3062SV *
3063has_cctx (Coro::State coro)
3064 PROTOTYPE: $
3065 CODE:
3066 /* maybe manage the running flag differently */
3067 RETVAL = boolSV (!!coro->cctx || (coro->flags & CF_RUNNING));
3068 OUTPUT:
3069 RETVAL
3070
3071int
3072is_traced (Coro::State coro)
3073 PROTOTYPE: $
3074 CODE:
3075 RETVAL = (coro->cctx ? coro->cctx->flags : 0) & CC_TRACE_ALL;
3076 OUTPUT:
3077 RETVAL
3078
3079UV
3080rss (Coro::State coro)
3081 PROTOTYPE: $
3082 ALIAS:
3083 usecount = 1
3084 CODE:
3085 switch (ix)
3086 {
3087 case 0: RETVAL = coro_rss (aTHX_ coro); break;
3088 case 1: RETVAL = coro->usecount; break;
3089 }
3090 OUTPUT:
3091 RETVAL
3092
3093void
3094force_cctx ()
3095 PROTOTYPE:
3096 CODE:
3097 cctx_current->idle_sp = 0;
3098
3099void
3100swap_defsv (Coro::State self)
3101 PROTOTYPE: $
3102 ALIAS:
3103 swap_defav = 1
3104 CODE:
3105 if (!self->slot)
3106 croak ("cannot swap state with coroutine that has no saved state,");
3107 else
473 { 3108 {
474 struct coro temp; 3109 SV **src = ix ? (SV **)&GvAV (PL_defgv) : &GvSV (PL_defgv);
3110 SV **dst = ix ? (SV **)&self->slot->defav : (SV **)&self->slot->defsv;
475 3111
3112 SV *tmp = *src; *src = *dst; *dst = tmp;
3113 }
3114
3115
3116MODULE = Coro::State PACKAGE = Coro
3117
3118BOOT:
3119{
3120 int i;
3121
3122 sv_pool_rss = coro_get_sv (aTHX_ "Coro::POOL_RSS" , TRUE);
3123 sv_pool_size = coro_get_sv (aTHX_ "Coro::POOL_SIZE" , TRUE);
3124 cv_coro_run = get_cv ( "Coro::_terminate", GV_ADD);
3125 cv_coro_terminate = get_cv ( "Coro::terminate" , GV_ADD);
3126 coro_current = coro_get_sv (aTHX_ "Coro::current" , FALSE); SvREADONLY_on (coro_current);
3127 av_async_pool = coro_get_av (aTHX_ "Coro::async_pool", TRUE);
3128 av_destroy = coro_get_av (aTHX_ "Coro::destroy" , TRUE);
3129 sv_manager = coro_get_sv (aTHX_ "Coro::manager" , TRUE);
3130 sv_idle = coro_get_sv (aTHX_ "Coro::idle" , TRUE);
3131
3132 sv_async_pool_idle = newSVpv ("[async pool idle]", 0); SvREADONLY_on (sv_async_pool_idle);
3133 sv_Coro = newSVpv ("Coro", 0); SvREADONLY_on (sv_Coro);
3134 cv_pool_handler = get_cv ("Coro::pool_handler", GV_ADD); SvREADONLY_on (cv_pool_handler);
3135 cv_coro_state_new = get_cv ("Coro::State::new", 0); SvREADONLY_on (cv_coro_state_new);
3136
3137 coro_stash = gv_stashpv ("Coro", TRUE);
3138
3139 newCONSTSUB (coro_stash, "PRIO_MAX", newSViv (PRIO_MAX));
3140 newCONSTSUB (coro_stash, "PRIO_HIGH", newSViv (PRIO_HIGH));
3141 newCONSTSUB (coro_stash, "PRIO_NORMAL", newSViv (PRIO_NORMAL));
3142 newCONSTSUB (coro_stash, "PRIO_LOW", newSViv (PRIO_LOW));
3143 newCONSTSUB (coro_stash, "PRIO_IDLE", newSViv (PRIO_IDLE));
3144 newCONSTSUB (coro_stash, "PRIO_MIN", newSViv (PRIO_MIN));
3145
3146 for (i = PRIO_MAX - PRIO_MIN + 1; i--; )
3147 coro_ready[i] = newAV ();
3148
3149 {
3150 SV *sv = coro_get_sv (aTHX_ "Coro::API", TRUE);
3151
3152 coroapi.schedule = api_schedule;
3153 coroapi.schedule_to = api_schedule_to;
3154 coroapi.cede = api_cede;
3155 coroapi.cede_notself = api_cede_notself;
3156 coroapi.ready = api_ready;
3157 coroapi.is_ready = api_is_ready;
3158 coroapi.nready = coro_nready;
3159 coroapi.current = coro_current;
3160
3161 /*GCoroAPI = &coroapi;*/
3162 sv_setiv (sv, (IV)&coroapi);
3163 SvREADONLY_on (sv);
3164 }
3165}
3166
3167void
3168terminate (...)
3169 CODE:
3170 CORO_EXECUTE_SLF_XS (slf_init_terminate);
3171
3172void
3173schedule (...)
3174 CODE:
3175 CORO_EXECUTE_SLF_XS (slf_init_schedule);
3176
3177void
3178schedule_to (...)
3179 CODE:
3180 CORO_EXECUTE_SLF_XS (slf_init_schedule_to);
3181
3182void
3183cede_to (...)
3184 CODE:
3185 CORO_EXECUTE_SLF_XS (slf_init_cede_to);
3186
3187void
3188cede (...)
3189 CODE:
3190 CORO_EXECUTE_SLF_XS (slf_init_cede);
3191
3192void
3193cede_notself (...)
3194 CODE:
3195 CORO_EXECUTE_SLF_XS (slf_init_cede_notself);
3196
3197void
3198_cancel (Coro::State self)
3199 CODE:
3200 coro_state_destroy (aTHX_ self);
3201 coro_call_on_destroy (aTHX_ self);
3202
3203void
3204_set_current (SV *current)
3205 PROTOTYPE: $
3206 CODE:
3207 SvREFCNT_dec (SvRV (coro_current));
3208 SvRV_set (coro_current, SvREFCNT_inc_NN (SvRV (current)));
3209
3210void
3211_set_readyhook (SV *hook)
3212 PROTOTYPE: $
3213 CODE:
3214 SvREFCNT_dec (coro_readyhook);
3215 coro_readyhook = SvOK (hook) ? newSVsv (hook) : 0;
3216
3217int
3218prio (Coro::State coro, int newprio = 0)
3219 PROTOTYPE: $;$
3220 ALIAS:
3221 nice = 1
3222 CODE:
3223{
3224 RETVAL = coro->prio;
3225
3226 if (items > 1)
3227 {
3228 if (ix)
3229 newprio = coro->prio - newprio;
3230
3231 if (newprio < PRIO_MIN) newprio = PRIO_MIN;
3232 if (newprio > PRIO_MAX) newprio = PRIO_MAX;
3233
3234 coro->prio = newprio;
3235 }
3236}
3237 OUTPUT:
3238 RETVAL
3239
3240SV *
3241ready (SV *self)
3242 PROTOTYPE: $
3243 CODE:
3244 RETVAL = boolSV (api_ready (aTHX_ self));
3245 OUTPUT:
3246 RETVAL
3247
3248int
3249nready (...)
3250 PROTOTYPE:
3251 CODE:
3252 RETVAL = coro_nready;
3253 OUTPUT:
3254 RETVAL
3255
3256void
3257_pool_handler (...)
3258 CODE:
3259 CORO_EXECUTE_SLF_XS (slf_init_pool_handler);
3260
3261void
3262async_pool (SV *cv, ...)
3263 PROTOTYPE: &@
3264 PPCODE:
3265{
3266 HV *hv = (HV *)av_pop (av_async_pool);
3267 AV *av = newAV ();
3268 SV *cb = ST (0);
3269 int i;
3270
3271 av_extend (av, items - 2);
3272 for (i = 1; i < items; ++i)
3273 av_push (av, SvREFCNT_inc_NN (ST (i)));
3274
3275 if ((SV *)hv == &PL_sv_undef)
3276 {
3277 PUSHMARK (SP);
3278 EXTEND (SP, 2);
3279 PUSHs (sv_Coro);
3280 PUSHs ((SV *)cv_pool_handler);
476 PUTBACK; 3281 PUTBACK;
477 SAVE(aTHX_ (&temp)); 3282 call_sv ((SV *)cv_coro_state_new, G_SCALAR);
478 LOAD(aTHX_ coro);
479
480 destroy_stacks ();
481 SvREFCNT_dec ((SV *)GvAV (PL_defgv));
482
483 LOAD((&temp));
484 SPAGAIN; 3283 SPAGAIN;
3284
3285 hv = (HV *)SvREFCNT_inc_NN (SvRV (POPs));
485 } 3286 }
486 3287
3288 {
3289 struct coro *coro = SvSTATE_hv (hv);
3290
3291 assert (!coro->invoke_cb);
3292 assert (!coro->invoke_av);
3293 coro->invoke_cb = SvREFCNT_inc (cb);
3294 coro->invoke_av = av;
3295 }
3296
3297 api_ready (aTHX_ (SV *)hv);
3298
3299 if (GIMME_V != G_VOID)
3300 XPUSHs (sv_2mortal (newRV_noinc ((SV *)hv)));
3301 else
487 SvREFCNT_dec (coro->args); 3302 SvREFCNT_dec (hv);
488 Safefree (coro); 3303}
489 3304
3305SV *
3306rouse_cb ()
3307 PROTOTYPE:
3308 CODE:
3309 RETVAL = coro_new_rouse_cb (aTHX);
3310 OUTPUT:
3311 RETVAL
490 3312
3313void
3314rouse_wait (...)
3315 PROTOTYPE: ;$
3316 PPCODE:
3317 CORO_EXECUTE_SLF_XS (slf_init_rouse_wait);
3318
3319
3320MODULE = Coro::State PACKAGE = PerlIO::cede
3321
3322BOOT:
3323 PerlIO_define_layer (aTHX_ &PerlIO_cede);
3324
3325
3326MODULE = Coro::State PACKAGE = Coro::Semaphore
3327
3328SV *
3329new (SV *klass, SV *count = 0)
3330 CODE:
3331 RETVAL = sv_bless (
3332 coro_waitarray_new (aTHX_ count && SvOK (count) ? SvIV (count) : 1),
3333 GvSTASH (CvGV (cv))
3334 );
3335 OUTPUT:
3336 RETVAL
3337
3338# helper for Coro::Channel
3339SV *
3340_alloc (int count)
3341 CODE:
3342 RETVAL = coro_waitarray_new (aTHX_ count);
3343 OUTPUT:
3344 RETVAL
3345
3346SV *
3347count (SV *self)
3348 CODE:
3349 RETVAL = newSVsv (AvARRAY ((AV *)SvRV (self))[0]);
3350 OUTPUT:
3351 RETVAL
3352
3353void
3354up (SV *self, int adjust = 1)
3355 ALIAS:
3356 adjust = 1
3357 CODE:
3358 coro_semaphore_adjust (aTHX_ (AV *)SvRV (self), ix ? adjust : 1);
3359
3360void
3361down (...)
3362 CODE:
3363 CORO_EXECUTE_SLF_XS (slf_init_semaphore_down);
3364
3365void
3366wait (...)
3367 CODE:
3368 CORO_EXECUTE_SLF_XS (slf_init_semaphore_wait);
3369
3370void
3371try (SV *self)
3372 PPCODE:
3373{
3374 AV *av = (AV *)SvRV (self);
3375 SV *count_sv = AvARRAY (av)[0];
3376 IV count = SvIVX (count_sv);
3377
3378 if (count > 0)
3379 {
3380 --count;
3381 SvIVX (count_sv) = count;
3382 XSRETURN_YES;
3383 }
3384 else
3385 XSRETURN_NO;
3386}
3387
3388void
3389waiters (SV *self)
3390 PPCODE:
3391{
3392 AV *av = (AV *)SvRV (self);
3393 int wcount = AvFILLp (av) + 1 - 1;
3394
3395 if (GIMME_V == G_SCALAR)
3396 XPUSHs (sv_2mortal (newSViv (wcount)));
3397 else
3398 {
3399 int i;
3400 EXTEND (SP, wcount);
3401 for (i = 1; i <= wcount; ++i)
3402 PUSHs (sv_2mortal (newRV_inc (AvARRAY (av)[i])));
3403 }
3404}
3405
3406MODULE = Coro::State PACKAGE = Coro::Signal
3407
3408SV *
3409new (SV *klass)
3410 CODE:
3411 RETVAL = sv_bless (
3412 coro_waitarray_new (aTHX_ 0),
3413 GvSTASH (CvGV (cv))
3414 );
3415 OUTPUT:
3416 RETVAL
3417
3418void
3419wait (...)
3420 CODE:
3421 CORO_EXECUTE_SLF_XS (slf_init_signal_wait);
3422
3423void
3424broadcast (SV *self)
3425 CODE:
3426{
3427 AV *av = (AV *)SvRV (self);
3428 coro_signal_wake (aTHX_ av, AvFILLp (av));
3429}
3430
3431void
3432send (SV *self)
3433 CODE:
3434{
3435 AV *av = (AV *)SvRV (self);
3436
3437 if (AvFILLp (av))
3438 coro_signal_wake (aTHX_ av, 1);
3439 else
3440 SvIVX (AvARRAY (av)[0]) = 1; /* remember the signal */
3441}
3442
3443IV
3444awaited (SV *self)
3445 CODE:
3446 RETVAL = AvFILLp ((AV *)SvRV (self)) + 1 - 1;
3447 OUTPUT:
3448 RETVAL
3449
3450
3451MODULE = Coro::State PACKAGE = Coro::AnyEvent
3452
3453BOOT:
3454 sv_activity = coro_get_sv (aTHX_ "Coro::AnyEvent::ACTIVITY", TRUE);
3455
3456void
3457_schedule (...)
3458 CODE:
3459{
3460 static int incede;
3461
3462 api_cede_notself (aTHX);
3463
3464 ++incede;
3465 while (coro_nready >= incede && api_cede (aTHX))
3466 ;
3467
3468 sv_setsv (sv_activity, &PL_sv_undef);
3469 if (coro_nready >= incede)
3470 {
3471 PUSHMARK (SP);
3472 PUTBACK;
3473 call_pv ("Coro::AnyEvent::_activity", G_KEEPERR | G_EVAL | G_VOID | G_DISCARD);
3474 }
3475
3476 --incede;
3477}
3478
3479
3480MODULE = Coro::State PACKAGE = Coro::AIO
3481
3482void
3483_register (char *target, char *proto, SV *req)
3484 CODE:
3485{
3486 CV *req_cv = coro_sv_2cv (aTHX_ req);
3487 /* newXSproto doesn't return the CV on 5.8 */
3488 CV *slf_cv = newXS (target, coro_aio_req_xs, __FILE__);
3489 sv_setpv ((SV *)slf_cv, proto);
3490 sv_magicext ((SV *)slf_cv, (SV *)req_cv, CORO_MAGIC_type_aio, 0, 0, 0);
3491}
3492

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines