ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.29
Committed: Tue Aug 3 14:15:39 2021 UTC (2 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-1_07, HEAD
Changes since 1.28: +8 -0 lines
Log Message:
1.07

File Contents

# User Rev Content
1 root 1.18 /* most win32 perls are beyond fixing, requiring dTHX */
2     /* even for ISO-C functions such as malloc. avoid! avoid! avoid! */
3 root 1.20 /* and fail to define numerous symbols, but still override them */
4 root 1.19 /* with non-working versions (e.g. setjmp). */
/* PERL_NO_GET_CONTEXT is only defined for non-Windows builds. On _WIN32 */
/* nothing is defined here (the PERL_CORE define below is commented out). */
5     #ifdef _WIN32
6 root 1.20 /*# define PERL_CORE 1 fixes some, breaks others */
7 root 1.19 #else
8 root 1.18 # define PERL_NO_GET_CONTEXT
9     #endif
10 root 1.1
11     #include "EXTERN.h"
12     #include "perl.h"
13     #include "XSUB.h"
14    
/* Stack size setting, defined before xthread.h is included. */
/* The expansion is fully parenthesized so that it stays a single operand */
/* wherever the macro is expanded (e.g. X_STACKSIZE / n, or in ?: tests). */
#define X_STACKSIZE (1024 * sizeof (void *))
16 root 1.1
17     #include "CoroAPI.h"
18     #include "perlmulticore.h"
19     #include "schmorp.h"
20     #include "xthread.h"
21    
/* Fallback definitions, each only supplied when the name is not already */
/* defined as a macro. */
/* _WIN32 only: sigset_t becomes int if no sigset_t macro exists (note that */
/* #ifndef cannot see a typedef of that name). */
22     #ifdef _WIN32
23 root 1.18 #ifndef sigset_t
24     #define sigset_t int
25     #endif
26 root 1.1 #endif
27    
/* the refcount helpers below fall back to the plain SvREFCNT_dec / */
/* SvREFCNT_inc macros when the specialised names are missing */
28 root 1.8 #ifndef SvREFCNT_dec_NN
29     #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
30     #endif
31    
32 root 1.22 #ifndef SvREFCNT_dec_simple_void_NN
33     #define SvREFCNT_dec_simple_void_NN(sv) SvREFCNT_dec (sv)
34     #endif
35    
36 root 1.8 #ifndef SvREFCNT_inc_NN
37     #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
38     #endif
39    
/* RECURSION_CHECK defaults to 0; a nonzero value compiles in the check_key */
/* consistency checks in pmapi_release / pmapi_acquire */
40 root 1.23 #ifndef RECURSION_CHECK
41     #define RECURSION_CHECK 0
42     #endif
43 root 1.16
/* per-thread slot: pmapi_release stores the tctx it handed out (or 0 when */
/* releasing is disabled) and pmapi_acquire reads it back */
44 root 1.12 static X_TLS_DECLARE(current_key);
/* per-thread marker used only by the optional release/acquire pairing check */
45 root 1.16 #if RECURSION_CHECK
46     static X_TLS_DECLARE(check_key);
47     #endif
48    
/* Write MSG to file descriptor 2 with write () and then abort () the */
/* process. Never returns. */
static void
fatal (const char *msg)
{
  size_t len = strlen (msg);

  write (2, msg, len);
  abort ();
}
55 root 1.1
/* event pipe: created in BOOT, signalled by pmapi_acquire, drained by poll */
56     static s_epipe ep;
/* perl context captured in BOOT and installed by every worker thread */
57     static void *perl_thx;
/* fullsigset is filled by sigfillset in BOOT (non-_WIN32 only); cursigset */
/* receives the previous mask whenever fullsigset is installed */
58     static sigset_t cursigset, fullsigset;
59    
/* default setting, read and written by the enable XSUB */
60 root 1.6 static int global_enable = 0;
61 root 1.4 static int thread_enable; /* 0 unset (use global_enable), 1 enabled, 2 disabled - pmapi_release tests bit 0 */
62    
63 root 1.6 /* assigned to a thread for each release/acquire */
64 root 1.1 struct tctx
65     {
/* the releasing coroutine (set in pmapi_release, cleared by poll); also */
/* reused as the "next" link while the tctx sits on the tctx_free list */
66     void *coro;
/* set to 0 by pmapi_release, set to 1 by the worker thread once its */
/* scheduling loop is done; pmapi_acquire waits for it to become nonzero */
67 root 1.6 int wait_f;
/* condition variable signalled together with wait_f, used with acquire_m */
68 root 1.9 xcond_t acquire_c;
/* result of JMPENV_PUSH in the worker thread; when nonzero, pmapi_acquire */
/* passes it to JMPENV_JUMP */
69 root 1.11 int jeret;
70 root 1.1 };
71    
72     static struct tctx *tctx_free;
73    
74     static struct tctx *
75     tctx_get (void)
76     {
77     struct tctx *ctx;
78    
79     if (!tctx_free)
80 root 1.3 {
81     ctx = malloc (sizeof (*tctx_free));
82 root 1.9 X_COND_CREATE (ctx->acquire_c);
83 root 1.3 }
84 root 1.1 else
85     {
86     ctx = tctx_free;
87     tctx_free = tctx_free->coro;
88     }
89    
90     return ctx;
91     }
92    
93     static void
94     tctx_put (struct tctx *ctx)
95     {
96     ctx->coro = tctx_free;
97     tctx_free = ctx;
98     }
99    
/* a stack of tctxs */
struct tctxs
{
  struct tctx **ctxs; /* heap array of context pointers, grown by tctxs_put */
  int cur, max;       /* entries in use / entries allocated */
};

/* Pop and return the most recently pushed context. */
/* Returns 0 when the stack is empty instead of reading before the start */
/* of the array; thread_proc tests the result for null. */
static struct tctx *
tctxs_get (struct tctxs *ctxs)
{
  if (ctxs->cur <= 0)
    return 0;

  return ctxs->ctxs[--ctxs->cur];
}

/* Push CTX, doubling the array (16 entries initially) when it is full. */
/* A failed realloc aborts: the old array is kept intact until the new one */
/* is known to be valid, and there is no way to report failure to callers. */
static void
tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
{
  if (ctxs->cur >= ctxs->max)
    {
      int newmax = ctxs->max ? ctxs->max * 2 : 16;
      struct tctx **grown = realloc (ctxs->ctxs, (size_t)newmax * sizeof (ctxs->ctxs[0]));

      if (!grown)
        abort ();

      ctxs->ctxs = grown;
      ctxs->max = newmax;
    }

  ctxs->ctxs[ctxs->cur++] = ctx;
}
124    
/* release side: contexts pushed by pmapi_release and popped by worker */
/* threads; release_c is signalled after each push */
125     static xmutex_t release_m = X_MUTEX_INIT;
126     static xcond_t release_c = X_COND_INIT;
127     static struct tctxs releasers;
/* incremented in start_thread and when a worker finishes a context, */
/* decremented when a worker picks one up */
128     static int idle;
129     static int min_idle = 1;
130     static int curthreads, max_threads = 1; /* protected by release_m */
131    
/* acquire side: contexts pushed by pmapi_acquire and popped by poll, */
/* both while holding acquire_m */
132     static xmutex_t acquire_m = X_MUTEX_INIT;
133     static struct tctxs acquirers;
134    
/* Worker thread body. In a loop: wait for a context on the releasers */
/* stack, run CORO_SCHEDULE on its behalf until ctx->coro has been cleared */
/* (poll does that), then set ctx->wait_f and signal ctx->acquire_c so the */
/* thread blocked in pmapi_acquire can continue. */
135 root 1.1 X_THREAD_PROC(thread_proc)
136     {
137     PERL_SET_CONTEXT (perl_thx);
138    
139     {
140 root 1.17 dTHXa (perl_thx);
141 root 1.11 dJMPENV;
142 root 1.1 struct tctx *ctx;
/* catchret is not referenced anywhere else in this function */
143 root 1.11 int catchret;
144 root 1.1
145 root 1.9 X_LOCK (release_m);
146 root 1.6
147 root 1.1 for (;;)
148     {
/* release_m is held here; the "|| 1" makes the first branch unconditional, */
/* so the timed wait (idle-thread timeout) in the else branch never runs */
149 root 1.9 while (!releasers.cur)
150 root 1.6 if (idle <= min_idle || 1)
151 root 1.9 X_COND_WAIT (release_c, release_m);
152 root 1.6 else
153     {
154     struct timespec ts = { time (0) + idle - min_idle, 0 };
155    
156 root 1.9 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
157     if (idle > min_idle && !releasers.cur)
158 root 1.6 break;
159     }
160    
161 root 1.9 ctx = tctxs_get (&releasers);
162 root 1.6 --idle;
163 root 1.9 X_UNLOCK (release_m);
164 root 1.3
/* meant for the timeout exit above, which is currently unreachable */
165 root 1.6 if (!ctx) /* timed out? */
166     break;
167    
/* install cursigset while perl code runs; JMPENV_PUSH stores its result in */
/* ctx->jeret, and only a zero result enters the scheduling loop */
168 root 1.1 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
169 root 1.11 JMPENV_PUSH (ctx->jeret);
170 root 1.1
171 root 1.11 if (!ctx->jeret)
172     while (ctx->coro)
173     CORO_SCHEDULE;
174 root 1.1
175 root 1.11 JMPENV_POP;
/* install fullsigset again, saving the mask that was active into cursigset */
176 root 1.3 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
177    
/* wake the thread waiting in pmapi_acquire for this context */
178 root 1.9 X_LOCK (acquire_m);
179 root 1.3 ctx->wait_f = 1;
180 root 1.9 X_COND_SIGNAL (ctx->acquire_c);
181     X_UNLOCK (acquire_m);
182 root 1.1
/* retake release_m for the next iteration and count this thread as idle */
183 root 1.9 X_LOCK (release_m);
184 root 1.6 ++idle;
185 root 1.1 }
186     }
187     }
188    
/* Start one more worker thread running thread_proc. */
/* Called from pmapi_release with release_m held; the mutex is dropped */
/* temporarily while the one-time perl-level init runs. */
189     static void
190     start_thread (void)
191     {
192     xthread_t tid;
193    
/* before the first thread is counted: evaluate "Coro::Multicore::init" via */
/* eval_pv on a fresh PERLSI_REQUIRE stack, with release_m unlocked */
194 root 1.26 if (!curthreads)
195     {
196     X_UNLOCK (release_m);
197 root 1.27 {
198     dTHX;
199 root 1.28 dSP;
200    
201     PUSHSTACKi (PERLSI_REQUIRE);
202    
203 root 1.27 eval_pv ("Coro::Multicore::init", 1);
204 root 1.28
205     POPSTACK;
206 root 1.27 }
207 root 1.26 X_LOCK (release_m);
208     }
209    
/* the "&& 0" disables this max_threads limit: the return is never taken */
210 root 1.9 if (curthreads >= max_threads && 0)
211     return;
212    
/* NOTE(review): the result of xthread_create is not checked, so both */
/* counters are bumped even if no thread was created - confirm intended */
213     ++curthreads;
214 root 1.6 ++idle;
215 root 1.1 xthread_create (&tid, thread_proc, 0);
216     }
217    
/* Installed in BOOT as perl_multicore_api->pmapi_release. */
/* Takes a tctx for the current coroutine and pushes it onto the releasers */
/* stack for a worker thread, starting another worker if too few are idle. */
218     static void
219     pmapi_release (void)
220     {
/* thread_enable overrides global_enable when nonzero; bit 0 set means */
/* "release". Otherwise store 0 so the matching pmapi_acquire is a no-op. */
221 root 1.25 if (! ((thread_enable ? thread_enable : global_enable) & 1))
222     {
223     X_TLS_SET (current_key, 0);
224     return;
225     }
226    
227 root 1.16 #if RECURSION_CHECK
228     if (X_TLS_GET (check_key))
229 root 1.24 fatal ("FATAL: perlinterp_release () called without valid perl context");
230 root 1.16
231     X_TLS_SET (check_key, &check_key);
232     #endif
233    
/* the context holds a reference to the current coroutine (dropped in poll) */
234 root 1.1 struct tctx *ctx = tctx_get ();
235 root 1.21 ctx->coro = SvREFCNT_inc_simple_NN (CORO_CURRENT);
236 root 1.3 ctx->wait_f = 0;
237    
/* remember ctx for pmapi_acquire, then install fullsigset, saving the */
/* previous mask in cursigset */
238 root 1.12 X_TLS_SET (current_key, ctx);
239 root 1.1 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
240    
241 root 1.9 X_LOCK (release_m);
242 root 1.6
243     if (idle <= min_idle)
244 root 1.1 start_thread ();
245    
246 root 1.9 tctxs_put (&releasers, ctx);
247     X_COND_SIGNAL (release_c);
248    
/* spin, repeatedly dropping and retaking release_m, until some thread is */
/* counted idle or the releasers stack has been emptied */
249     while (!idle && releasers.cur)
250     {
251     X_UNLOCK (release_m);
252     X_LOCK (release_m);
253     }
254 root 1.6
255 root 1.9 X_UNLOCK (release_m);
256 root 1.1 }
257    
/* Installed in BOOT as perl_multicore_api->pmapi_acquire. */
/* Queues the context stored by pmapi_release on the acquirers stack, */
/* signals the event pipe and blocks until a worker thread sets wait_f. */
258     static void
259     pmapi_acquire (void)
260     {
261 root 1.11 int jeret;
262 root 1.12 struct tctx *ctx = X_TLS_GET (current_key);
263 root 1.1
/* pmapi_release stored 0: nothing was released, so nothing to acquire */
264 root 1.25 if (!ctx)
265     return;
266    
267 root 1.16 #if RECURSION_CHECK
268     if (X_TLS_GET (check_key) != &check_key)
269 root 1.24 fatal ("FATAL: perlinterp_acquire () called with valid perl context");
270 root 1.16
271     X_TLS_SET (check_key, 0);
272     #endif
273    
274 root 1.9 X_LOCK (acquire_m);
275 root 1.1
276 root 1.9 tctxs_put (&acquirers, ctx);
277 root 1.1
/* poll drains this pipe and readies the queued coroutines */
278     s_epipe_signal (&ep);
279 root 1.3 while (!ctx->wait_f)
280 root 1.9 X_COND_WAIT (ctx->acquire_c, acquire_m);
281     X_UNLOCK (acquire_m);
282 root 1.1
/* copy jeret before the context goes back to the free list, then install */
/* cursigset as the signal mask */
283 root 1.11 jeret = ctx->jeret;
284 root 1.1 tctx_put (ctx);
285     pthread_sigmask (SIG_SETMASK, &cursigset, 0);
286 root 1.11
/* a nonzero JMPENV_PUSH result from the worker thread is passed on here */
287     if (jeret)
288 root 1.17 {
289     dTHX;
290     JMPENV_JUMP (jeret);
291     }
292 root 1.1 }
293    
294 root 1.4 static void
295 root 1.6 set_thread_enable (pTHX_ void *arg)
296 root 1.4 {
297 root 1.6 thread_enable = PTR2IV (arg);
298 root 1.4 }
299    
/* registered in BOOT as the child handler of pthread_atfork: calls */
/* s_epipe_renew on the event pipe in the forked child */
300 root 1.29 static void
301     atfork_child (void)
302     {
303     s_epipe_renew (&ep);
304     }
305    
306 root 1.1 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
307    
308     PROTOTYPES: DISABLE
309    
310     BOOT:
311     {
/* module load: set up the signal set, TLS keys, event pipe and fork */
/* handler, then install the release/acquire implementations */
312 root 1.16 #ifndef _WIN32
313 root 1.1 sigfillset (&fullsigset);
314 root 1.16 #endif
315 root 1.1
316 root 1.12 X_TLS_INIT (current_key);
317 root 1.16 #if RECURSION_CHECK
318     X_TLS_INIT (check_key);
319     #endif
320 root 1.1
321     if (s_epipe_new (&ep))
322     croak ("Coro::Multicore: unable to initialise event pipe.\n");
323    
/* NOTE(review): the return value of pthread_atfork is not checked */
324 root 1.29 pthread_atfork (0, 0, atfork_child);
325    
/* worker threads install this context via PERL_SET_CONTEXT */
326 root 1.1 perl_thx = PERL_GET_CONTEXT;
327    
328     I_CORO_API ("Coro::Multicore");
329    
/* pre-starting idle threads is disabled by the constant-false condition */
330 root 1.24 if (0) { /*D*/
331 root 1.9 X_LOCK (release_m);
332 root 1.6 while (idle < min_idle)
333     start_thread ();
334 root 1.9 X_UNLOCK (release_m);
335 root 1.24 }
336 root 1.6
337 root 1.14 /* not perfectly efficient to do it this way, but it is simple */
338     perl_multicore_init (); /* calls release */
339 root 1.1 perl_multicore_api->pmapi_release = pmapi_release;
340     perl_multicore_api->pmapi_acquire = pmapi_acquire;
341     }
342    
343 root 1.4 bool
344     enable (bool enable = NO_INIT)
345     CODE:
/* returns the previous global_enable; it is only overwritten when an */
/* argument was passed */
346     RETVAL = global_enable;
347     if (items)
348     global_enable = enable;
349     OUTPUT:
350     RETVAL
351    
352     void
353     scoped_enable ()
354     CODE:
/* registers set_thread_enable with value 1 (bit 0 set, so pmapi_release */
/* does release) and with value 0 as the second hook; presumably these are */
/* the enter and leave callbacks of the scope hook - TODO confirm */
355     LEAVE; /* see Guard.xs */
356 root 1.6 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
357 root 1.4 ENTER; /* see Guard.xs */
358    
359     void
360     scoped_disable ()
361     CODE:
/* registers set_thread_enable with value 2 (bit 0 clear, so pmapi_release */
/* returns without releasing) and with value 0 as the second hook; */
/* presumably the enter and leave callbacks - TODO confirm */
362     LEAVE; /* see Guard.xs */
363 root 1.6 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
364 root 1.4 ENTER; /* see Guard.xs */
365    
366 root 1.25 #if 0
367    
368 root 1.1 U32
369 root 1.6 min_idle_threads (U32 min = NO_INIT)
370 root 1.1 CODE:
/* compiled out by the surrounding #if 0 */
/* NOTE(review): this takes acquire_m, while thread_proc and pmapi_release */
/* read min_idle with release_m held - check before re-enabling */
371 root 1.9 X_LOCK (acquire_m);
372 root 1.6 RETVAL = min_idle;
373 root 1.1 if (items)
374 root 1.6 min_idle = min;
375 root 1.9 X_UNLOCK (acquire_m);
376 root 1.1 OUTPUT:
377     RETVAL
378 root 1.25
379     #endif
380 root 1.1
381     int
382     fd ()
383     CODE:
/* returns whatever s_epipe_fd reports for the event pipe that */
/* pmapi_acquire signals */
384     RETVAL = s_epipe_fd (&ep);
385     OUTPUT:
386     RETVAL
387    
388     void
389     poll (...)
390     CODE:
/* drain the event pipe, then, holding acquire_m, pop every context queued */
/* by pmapi_acquire */
391     s_epipe_drain (&ep);
392 root 1.9 X_LOCK (acquire_m);
393     while (acquirers.cur)
394 root 1.1 {
395 root 1.9 struct tctx *ctx = tctxs_get (&acquirers);
/* ready the coroutine, drop the reference taken in pmapi_release and clear */
/* coro, which ends the worker thread's "while (ctx->coro)" loop */
396 root 1.1 CORO_READY ((SV *)ctx->coro);
397 root 1.22 SvREFCNT_dec_simple_void_NN ((SV *)ctx->coro);
398 root 1.1 ctx->coro = 0;
399     }
400 root 1.9 X_UNLOCK (acquire_m);
401 root 1.1
void
sleep (NV seconds)
	CODE:
        /* Sleep for SECONDS (may be fractional) with the perl interpreter */
        /* released, then re-acquire it. */
        perlinterp_release ();
        {
          /* whole seconds go to sleep (), the remainder to usleep () */
          int sec = seconds;
          int usec;

          /* non-positive values are skipped: passed on as-is they would */
          /* wrap to a huge unsigned duration */
          if (sec > 0) sleep (sec);

          /* usleep () takes MICROseconds, so scale the fraction by 1e6 */
          usec = (seconds - sec) * 1e6;
          if (usec > 0) usleep (usec);
        }
        perlinterp_acquire ();
413 root 1.2