ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.29
Committed: Tue Aug 3 14:15:39 2021 UTC (2 years, 8 months ago) by root
Branch: MAIN
CVS Tags: rel-1_07, HEAD
Changes since 1.28: +8 -0 lines
Log Message:
1.07

File Contents

# Content
1 /* most win32 perls are beyond fixing, requiring dTHX */
2 /* even for ISO-C functions such as malloc. avoid! avoid! avoid! */
3 /* and fail to define numerous symbols, but still override them */
4 /* with non-working versions (e.g. setjmp). */
5 #ifdef _WIN32
6 /*# define PERL_CORE 1 fixes some, breaks others */
7 #else
8 # define PERL_NO_GET_CONTEXT
9 #endif
10
11 #include "EXTERN.h"
12 #include "perl.h"
13 #include "XSUB.h"
14
/* stack size setting, defined before xthread.h is included (presumably */
/* consumed there - confirm). parenthesised so that it expands safely */
/* inside larger expressions such as x / X_STACKSIZE. */
#define X_STACKSIZE (1024 * sizeof (void *))
16
17 #include "CoroAPI.h"
18 #include "perlmulticore.h"
19 #include "schmorp.h"
20 #include "xthread.h"
21
/* on win32, provide a stand-in sigset_t unless a macro of that name exists */
#if defined (_WIN32) && !defined (sigset_t)
# define sigset_t int
#endif

/* fallbacks for refcount macros the included headers might not provide: */
/* each maps to its plain (non-_NN) counterpart when not already defined. */
#if !defined (SvREFCNT_dec_NN)
# define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
#endif

#if !defined (SvREFCNT_dec_simple_void_NN)
# define SvREFCNT_dec_simple_void_NN(sv) SvREFCNT_dec (sv)
#endif

#if !defined (SvREFCNT_inc_NN)
# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
#endif
39
/* compile-time switch for the release/acquire pairing checks; */
/* defaults to off unless the build defines it. */
#if !defined (RECURSION_CHECK)
# define RECURSION_CHECK 0
#endif

/* per-thread slot: the tctx stored by pmapi_release for the matching */
/* pmapi_acquire, or 0 when release did nothing. */
static X_TLS_DECLARE(current_key);

#if RECURSION_CHECK
/* per-thread marker: set by pmapi_release, cleared by pmapi_acquire */
static X_TLS_DECLARE(check_key);
#endif
48
/* write msg to file descriptor 2, then abort the process. */
/* the result of write () is deliberately not inspected. */
static void
fatal (const char *msg)
{
  size_t len = strlen (msg);

  write (2, msg, len);
  abort ();
}
55
56 static s_epipe ep;
57 static void *perl_thx;
58 static sigset_t cursigset, fullsigset;
59
60 static int global_enable = 0;
61 static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
62
63 /* assigned to a thread for each release/acquire */
64 struct tctx
65 {
66 void *coro;
67 int wait_f;
68 xcond_t acquire_c;
69 int jeret;
70 };
71
72 static struct tctx *tctx_free;
73
74 static struct tctx *
75 tctx_get (void)
76 {
77 struct tctx *ctx;
78
79 if (!tctx_free)
80 {
81 ctx = malloc (sizeof (*tctx_free));
82 X_COND_CREATE (ctx->acquire_c);
83 }
84 else
85 {
86 ctx = tctx_free;
87 tctx_free = tctx_free->coro;
88 }
89
90 return ctx;
91 }
92
93 static void
94 tctx_put (struct tctx *ctx)
95 {
96 ctx->coro = tctx_free;
97 tctx_free = ctx;
98 }
99
/* a stack of tctxs */
struct tctxs
{
  struct tctx **ctxs; /* backing array, grown on demand by tctxs_put */
  int cur, max;       /* entries in use / allocated capacity */
};

/*
 * pop and return the most recently pushed context.
 *
 * returns 0 and leaves the stack untouched when it is empty, instead of
 * reading ctxs[-1] and driving cur negative. thread_proc already tests
 * the result against 0 after its (timed) wait loop.
 */
static struct tctx *
tctxs_get (struct tctxs *ctxs)
{
  if (ctxs->cur <= 0)
    return 0;

  return ctxs->ctxs[--ctxs->cur];
}
112
113 static void
114 tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
115 {
116 if (ctxs->cur >= ctxs->max)
117 {
118 ctxs->max = ctxs->max ? ctxs->max * 2 : 16;
119 ctxs->ctxs = realloc (ctxs->ctxs, ctxs->max * sizeof (ctxs->ctxs[0]));
120 }
121
122 ctxs->ctxs[ctxs->cur++] = ctx;
123 }
124
125 static xmutex_t release_m = X_MUTEX_INIT;
126 static xcond_t release_c = X_COND_INIT;
127 static struct tctxs releasers;
128 static int idle;
129 static int min_idle = 1;
130 static int curthreads, max_threads = 1; /* protected by release_m */
131
132 static xmutex_t acquire_m = X_MUTEX_INIT;
133 static struct tctxs acquirers;
134
135 X_THREAD_PROC(thread_proc)
136 {
137 PERL_SET_CONTEXT (perl_thx);
138
139 {
140 dTHXa (perl_thx);
141 dJMPENV;
142 struct tctx *ctx;
143 int catchret;
144
145 X_LOCK (release_m);
146
147 for (;;)
148 {
149 while (!releasers.cur)
150 if (idle <= min_idle || 1)
151 X_COND_WAIT (release_c, release_m);
152 else
153 {
154 struct timespec ts = { time (0) + idle - min_idle, 0 };
155
156 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
157 if (idle > min_idle && !releasers.cur)
158 break;
159 }
160
161 ctx = tctxs_get (&releasers);
162 --idle;
163 X_UNLOCK (release_m);
164
165 if (!ctx) /* timed out? */
166 break;
167
168 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
169 JMPENV_PUSH (ctx->jeret);
170
171 if (!ctx->jeret)
172 while (ctx->coro)
173 CORO_SCHEDULE;
174
175 JMPENV_POP;
176 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
177
178 X_LOCK (acquire_m);
179 ctx->wait_f = 1;
180 X_COND_SIGNAL (ctx->acquire_c);
181 X_UNLOCK (acquire_m);
182
183 X_LOCK (release_m);
184 ++idle;
185 }
186 }
187 }
188
189 static void
190 start_thread (void)
191 {
192 xthread_t tid;
193
194 if (!curthreads)
195 {
196 X_UNLOCK (release_m);
197 {
198 dTHX;
199 dSP;
200
201 PUSHSTACKi (PERLSI_REQUIRE);
202
203 eval_pv ("Coro::Multicore::init", 1);
204
205 POPSTACK;
206 }
207 X_LOCK (release_m);
208 }
209
210 if (curthreads >= max_threads && 0)
211 return;
212
213 ++curthreads;
214 ++idle;
215 xthread_create (&tid, thread_proc, 0);
216 }
217
218 static void
219 pmapi_release (void)
220 {
221 if (! ((thread_enable ? thread_enable : global_enable) & 1))
222 {
223 X_TLS_SET (current_key, 0);
224 return;
225 }
226
227 #if RECURSION_CHECK
228 if (X_TLS_GET (check_key))
229 fatal ("FATAL: perlinterp_release () called without valid perl context");
230
231 X_TLS_SET (check_key, &check_key);
232 #endif
233
234 struct tctx *ctx = tctx_get ();
235 ctx->coro = SvREFCNT_inc_simple_NN (CORO_CURRENT);
236 ctx->wait_f = 0;
237
238 X_TLS_SET (current_key, ctx);
239 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
240
241 X_LOCK (release_m);
242
243 if (idle <= min_idle)
244 start_thread ();
245
246 tctxs_put (&releasers, ctx);
247 X_COND_SIGNAL (release_c);
248
249 while (!idle && releasers.cur)
250 {
251 X_UNLOCK (release_m);
252 X_LOCK (release_m);
253 }
254
255 X_UNLOCK (release_m);
256 }
257
258 static void
259 pmapi_acquire (void)
260 {
261 int jeret;
262 struct tctx *ctx = X_TLS_GET (current_key);
263
264 if (!ctx)
265 return;
266
267 #if RECURSION_CHECK
268 if (X_TLS_GET (check_key) != &check_key)
269 fatal ("FATAL: perlinterp_acquire () called with valid perl context");
270
271 X_TLS_SET (check_key, 0);
272 #endif
273
274 X_LOCK (acquire_m);
275
276 tctxs_put (&acquirers, ctx);
277
278 s_epipe_signal (&ep);
279 while (!ctx->wait_f)
280 X_COND_WAIT (ctx->acquire_c, acquire_m);
281 X_UNLOCK (acquire_m);
282
283 jeret = ctx->jeret;
284 tctx_put (ctx);
285 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
286
287 if (jeret)
288 {
289 dTHX;
290 JMPENV_JUMP (jeret);
291 }
292 }
293
294 static void
295 set_thread_enable (pTHX_ void *arg)
296 {
297 thread_enable = PTR2IV (arg);
298 }
299
300 static void
301 atfork_child (void)
302 {
303 s_epipe_renew (&ep);
304 }
305
306 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
307
308 PROTOTYPES: DISABLE
309
BOOT:
{
#ifndef _WIN32
	sigfillset (&fullsigset);
#endif

	/* per-thread slots used by pmapi_release/pmapi_acquire */
	X_TLS_INIT (current_key);
#if RECURSION_CHECK
	X_TLS_INIT (check_key);
#endif

	if (s_epipe_new (&ep))
	  {
	    croak ("Coro::Multicore: unable to initialise event pipe.\n");
	  }

	/* only a child handler is registered */
	pthread_atfork (0, 0, atfork_child);

	/* remembered so that thread_proc can adopt the same context */
	perl_thx = PERL_GET_CONTEXT;

	I_CORO_API ("Coro::Multicore");

	/* disabled: would pre-start threads until min_idle are idle */
	if (0) { /*D*/
	  X_LOCK (release_m);
	  while (idle < min_idle)
	    {
	      start_thread ();
	    }
	  X_UNLOCK (release_m);
	}

	/* not perfectly efficient to do it this way, but it is simple */
	perl_multicore_init (); /* calls release */
	perl_multicore_api->pmapi_release = pmapi_release;
	perl_multicore_api->pmapi_acquire = pmapi_acquire;
}
342
bool
enable (bool enable = NO_INIT)
	CODE:
	/* always report the previous global setting; replace it only */
	/* when an argument was actually passed */
	RETVAL = global_enable;
	if (items != 0)
	  {
	    global_enable = enable;
	  }
	OUTPUT:
	RETVAL
351
void
scoped_enable ()
	CODE:
	/* registers set_thread_enable with 1 (which pmapi_release treats as */
	/* enabled) and with 0; presumably the first pair runs on scope entry */
	/* and the second on scope exit - confirm against CoroAPI.h */
	LEAVE; /* see Guard.xs */
	CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
	ENTER; /* see Guard.xs */
358
void
scoped_disable ()
	CODE:
	/* registers set_thread_enable with 2 (which pmapi_release treats as */
	/* disabled) and with 0; presumably the first pair runs on scope entry */
	/* and the second on scope exit - confirm against CoroAPI.h */
	LEAVE; /* see Guard.xs */
	CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
	ENTER; /* see Guard.xs */
365
#if 0

U32
min_idle_threads (U32 min = NO_INIT)
	CODE:
	/* compiled out. returns the previous min_idle and replaces it when */
	/* an argument was passed. */
	/* NOTE(review): this locks acquire_m, while pmapi_release and */
	/* thread_proc read min_idle under release_m - confirm before enabling */
	X_LOCK (acquire_m);
	RETVAL = min_idle;
	if (items != 0)
	  {
	    min_idle = min;
	  }
	X_UNLOCK (acquire_m);
	OUTPUT:
	RETVAL

#endif
380
int
fd ()
	CODE:
	/* the value s_epipe_fd reports for the event pipe that */
	/* pmapi_acquire signals */
	RETVAL = s_epipe_fd (&ep);
	OUTPUT:
	RETVAL
387
void
poll (...)
	CODE:
	s_epipe_drain (&ep);
	X_LOCK (acquire_m);
	/* ready every coro queued by pmapi_acquire; zeroing ctx->coro ends */
	/* the scheduling loop in thread_proc for that context */
	while (acquirers.cur)
	  {
	    struct tctx *ctx = tctxs_get (&acquirers);
	    SV *coro_sv = (SV *)ctx->coro;

	    CORO_READY (coro_sv);
	    SvREFCNT_dec_simple_void_NN (coro_sv);
	    ctx->coro = 0;
	  }
	X_UNLOCK (acquire_m);
401
void
sleep (NV seconds)
	CODE:
	/* sleeps with the interpreter released: whole seconds through */
	/* sleep (), the fractional rest through usleep () */
	perlinterp_release ();
	{
	  int sec = seconds;
	  int usec;

	  if (sec) sleep (sec);
	  /* usleep () takes microseconds, so scale by 1e6 (not 1e9); */
	  /* the result is always below one million */
	  usec = (seconds - sec) * 1e6;
	  if (usec) usleep (usec);
	}
	perlinterp_acquire ();
413