ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.21
Committed: Sun Aug 26 15:30:55 2018 UTC (5 years, 9 months ago) by root
Branch: MAIN
Changes since 1.20: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 /* most win32 perls are beyond fixing, requiring dTHX */
2 /* even for ISO-C functions such as malloc. avoid! avoid! avoid! */
3 /* and fail to define numerous symbols, but still overrwide them */
4 /* with non-working versions (e.g. setjmp). */
5 #ifdef _WIN32
6 /*# define PERL_CORE 1 fixes some, breaks others */
7 #else
8 # define PERL_NO_GET_CONTEXT
9 #endif
10
11 #include "EXTERN.h"
12 #include "perl.h"
13 #include "XSUB.h"
14
15 #define X_STACKSIZE 1024 * sizeof (void *)
16
17 #include "CoroAPI.h"
18 #include "perlmulticore.h"
19 #include "schmorp.h"
20 #include "xthread.h"
21
22 #ifdef _WIN32
23 #ifndef sigset_t
24 #define sigset_t int
25 #endif
26 #endif
27
28 #ifndef SvREFCNT_dec_NN
29 #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
30 #endif
31
32 #ifndef SvREFCNT_inc_NN
33 #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
34 #endif
35
36 #define RECURSION_CHECK 0
37
38 static X_TLS_DECLARE(current_key);
39 #if RECURSION_CHECK
40 static X_TLS_DECLARE(check_key);
41 #endif
42
43
44 static s_epipe ep;
45 static void *perl_thx;
46 static sigset_t cursigset, fullsigset;
47
48 static int global_enable = 0;
49 static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
50
51 /* assigned to a thread for each release/acquire */
52 struct tctx
53 {
54 void *coro;
55 int wait_f;
56 xcond_t acquire_c;
57 int jeret;
58 };
59
60 static struct tctx *tctx_free;
61
62 static struct tctx *
63 tctx_get (void)
64 {
65 struct tctx *ctx;
66
67 if (!tctx_free)
68 {
69 ctx = malloc (sizeof (*tctx_free));
70 X_COND_CREATE (ctx->acquire_c);
71 }
72 else
73 {
74 ctx = tctx_free;
75 tctx_free = tctx_free->coro;
76 }
77
78 return ctx;
79 }
80
81 static void
82 tctx_put (struct tctx *ctx)
83 {
84 ctx->coro = tctx_free;
85 tctx_free = ctx;
86 }
87
88 /* a stack of tctxs */
89 struct tctxs
90 {
91 struct tctx **ctxs;
92 int cur, max;
93 };
94
95 static struct tctx *
96 tctxs_get (struct tctxs *ctxs)
97 {
98 return ctxs->ctxs[--ctxs->cur];
99 }
100
101 static void
102 tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
103 {
104 if (ctxs->cur >= ctxs->max)
105 {
106 ctxs->max = ctxs->max ? ctxs->max * 2 : 16;
107 ctxs->ctxs = realloc (ctxs->ctxs, ctxs->max * sizeof (ctxs->ctxs[0]));
108 }
109
110 ctxs->ctxs[ctxs->cur++] = ctx;
111 }
112
113 static xmutex_t release_m = X_MUTEX_INIT;
114 static xcond_t release_c = X_COND_INIT;
115 static struct tctxs releasers;
116 static int idle;
117 static int min_idle = 1;
118 static int curthreads, max_threads = 1; /* protected by release_m */
119
120 static xmutex_t acquire_m = X_MUTEX_INIT;
121 static struct tctxs acquirers;
122
123 X_THREAD_PROC(thread_proc)
124 {
125 PERL_SET_CONTEXT (perl_thx);
126
127 {
128 dTHXa (perl_thx);
129 dJMPENV;
130 struct tctx *ctx;
131 int catchret;
132
133 X_LOCK (release_m);
134
135 for (;;)
136 {
137 while (!releasers.cur)
138 if (idle <= min_idle || 1)
139 X_COND_WAIT (release_c, release_m);
140 else
141 {
142 struct timespec ts = { time (0) + idle - min_idle, 0 };
143
144 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
145 if (idle > min_idle && !releasers.cur)
146 break;
147 }
148
149 ctx = tctxs_get (&releasers);
150 --idle;
151 X_UNLOCK (release_m);
152
153 if (!ctx) /* timed out? */
154 break;
155
156 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
157 JMPENV_PUSH (ctx->jeret);
158
159 if (!ctx->jeret)
160 while (ctx->coro)
161 CORO_SCHEDULE;
162
163 JMPENV_POP;
164 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
165
166 X_LOCK (acquire_m);
167 ctx->wait_f = 1;
168 X_COND_SIGNAL (ctx->acquire_c);
169 X_UNLOCK (acquire_m);
170
171 X_LOCK (release_m);
172 ++idle;
173 }
174 }
175 }
176
177 static void
178 start_thread (void)
179 {
180 xthread_t tid;
181
182 if (curthreads >= max_threads && 0)
183 return;
184
185 ++curthreads;
186 ++idle;
187 xthread_create (&tid, thread_proc, 0);
188 }
189
190 static void
191 pmapi_release (void)
192 {
193 #if RECURSION_CHECK
194 if (X_TLS_GET (check_key))
195 croak ("perlinterp_release () called without valid perl context");
196
197 X_TLS_SET (check_key, &check_key);
198 #endif
199
200 if (! ((thread_enable ? thread_enable : global_enable) & 1))
201 {
202 X_TLS_SET (current_key, 0);
203 return;
204 }
205
206 struct tctx *ctx = tctx_get ();
207 ctx->coro = SvREFCNT_inc_simple_NN (CORO_CURRENT);
208 ctx->wait_f = 0;
209
210 X_TLS_SET (current_key, ctx);
211 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
212
213 X_LOCK (release_m);
214
215 if (idle <= min_idle)
216 start_thread ();
217
218 tctxs_put (&releasers, ctx);
219 X_COND_SIGNAL (release_c);
220
221 while (!idle && releasers.cur)
222 {
223 X_UNLOCK (release_m);
224 X_LOCK (release_m);
225 }
226
227 X_UNLOCK (release_m);
228 }
229
230 static void
231 pmapi_acquire (void)
232 {
233 int jeret;
234 struct tctx *ctx = X_TLS_GET (current_key);
235
236 #if RECURSION_CHECK
237 if (X_TLS_GET (check_key) != &check_key)
238 croak ("perlinterp_acquire () called with valid perl context");
239
240 X_TLS_SET (check_key, 0);
241 #endif
242
243 if (!ctx)
244 return;
245
246 X_LOCK (acquire_m);
247
248 tctxs_put (&acquirers, ctx);
249
250 s_epipe_signal (&ep);
251 while (!ctx->wait_f)
252 X_COND_WAIT (ctx->acquire_c, acquire_m);
253 X_UNLOCK (acquire_m);
254
255 jeret = ctx->jeret;
256 tctx_put (ctx);
257 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
258
259 if (jeret)
260 {
261 dTHX;
262 JMPENV_JUMP (jeret);
263 }
264 }
265
266 static void
267 set_thread_enable (pTHX_ void *arg)
268 {
269 thread_enable = PTR2IV (arg);
270 }
271
272 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
273
274 PROTOTYPES: DISABLE
275
276 BOOT:
277 {
278 #ifndef _WIN32
279 sigfillset (&fullsigset);
280 #endif
281
282 X_TLS_INIT (current_key);
283 #if RECURSION_CHECK
284 X_TLS_INIT (check_key);
285 #endif
286
287 if (s_epipe_new (&ep))
288 croak ("Coro::Multicore: unable to initialise event pipe.\n");
289
290 perl_thx = PERL_GET_CONTEXT;
291
292 I_CORO_API ("Coro::Multicore");
293
294 X_LOCK (release_m);
295 while (idle < min_idle)
296 start_thread ();
297 X_UNLOCK (release_m);
298
299 /* not perfectly efficient to do it this way, but it is simple */
300 perl_multicore_init (); /* calls release */
301 perl_multicore_api->pmapi_release = pmapi_release;
302 perl_multicore_api->pmapi_acquire = pmapi_acquire;
303 }
304
305 bool
306 enable (bool enable = NO_INIT)
307 CODE:
308 RETVAL = global_enable;
309 if (items)
310 global_enable = enable;
311 OUTPUT:
312 RETVAL
313
314 void
315 scoped_enable ()
316 CODE:
317 LEAVE; /* see Guard.xs */
318 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
319 ENTER; /* see Guard.xs */
320
321 void
322 scoped_disable ()
323 CODE:
324 LEAVE; /* see Guard.xs */
325 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
326 ENTER; /* see Guard.xs */
327
328 U32
329 min_idle_threads (U32 min = NO_INIT)
330 CODE:
331 X_LOCK (acquire_m);
332 RETVAL = min_idle;
333 if (items)
334 min_idle = min;
335 X_UNLOCK (acquire_m);
336 OUTPUT:
337 RETVAL
338
339
340 int
341 fd ()
342 CODE:
343 RETVAL = s_epipe_fd (&ep);
344 OUTPUT:
345 RETVAL
346
347 void
348 poll (...)
349 CODE:
350 s_epipe_drain (&ep);
351 X_LOCK (acquire_m);
352 while (acquirers.cur)
353 {
354 struct tctx *ctx = tctxs_get (&acquirers);
355 CORO_READY ((SV *)ctx->coro);
356 SvREFCNT_dec_NN ((SV *)ctx->coro);
357 ctx->coro = 0;
358 }
359 X_UNLOCK (acquire_m);
360
361 void
362 sleep (NV seconds)
363 CODE:
364 perlinterp_release ();
365 {
366 int nsec = seconds;
367 if (nsec) sleep (nsec);
368 nsec = (seconds - nsec) * 1e9;
369 if (nsec) usleep (nsec);
370 }
371 perlinterp_acquire ();
372