ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.26
Committed: Tue Dec 3 07:12:28 2019 UTC (4 years, 5 months ago) by root
Branch: MAIN
CVS Tags: rel-1_04
Changes since 1.25: +7 -0 lines
Log Message:
1.04

File Contents

# User Rev Content
1 root 1.18 /* most win32 perls are beyond fixing, requiring dTHX */
2     /* even for ISO-C functions such as malloc. avoid! avoid! avoid! */
3 root 1.20 /* and fail to define numerous symbols, but still override them */
4 root 1.19 /* with non-working versions (e.g. setjmp). */
5     #ifdef _WIN32
6 root 1.20 /*# define PERL_CORE 1 fixes some, breaks others */
7 root 1.19 #else
8 root 1.18 # define PERL_NO_GET_CONTEXT
9     #endif
10 root 1.1
11     #include "EXTERN.h"
12     #include "perl.h"
13     #include "XSUB.h"
14    
/* stack size in bytes; defined before xthread.h is included, which presumably */
/* consumes it for the worker threads (TODO confirm against xthread.h). */
/* parenthesised so the expansion stays one value inside larger expressions. */
#define X_STACKSIZE (1024 * sizeof (void *))
16 root 1.1
17     #include "CoroAPI.h"
18     #include "perlmulticore.h"
19     #include "schmorp.h"
20     #include "xthread.h"
21    
/* win32: provide a stand-in sigset_t so the sigset_t variables below still compile */
22     #ifdef _WIN32
23 root 1.18 #ifndef sigset_t
24     #define sigset_t int
25     #endif
26 root 1.1 #endif
27    
/* fallbacks for perls whose headers lack the newer refcount macros: */
/* each one simply maps to the plain SvREFCNT_dec/SvREFCNT_inc */
28 root 1.8 #ifndef SvREFCNT_dec_NN
29     #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
30     #endif
31    
32 root 1.22 #ifndef SvREFCNT_dec_simple_void_NN
33     #define SvREFCNT_dec_simple_void_NN(sv) SvREFCNT_dec (sv)
34     #endif
35    
36 root 1.8 #ifndef SvREFCNT_inc_NN
37     #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
38     #endif
39    
/* when nonzero, release/acquire track a per-thread flag (check_key) and */
/* call fatal () on unbalanced calls; off unless defined by the build */
40 root 1.23 #ifndef RECURSION_CHECK
41     #define RECURSION_CHECK 0
42     #endif
43 root 1.16
/* thread-local slot holding the struct tctx set by pmapi_release (0 when */
/* release was a no-op); read back by pmapi_acquire on the same thread */
44 root 1.12 static X_TLS_DECLARE(current_key);
45 root 1.16 #if RECURSION_CHECK
/* thread-local marker: set to &check_key between release and acquire, 0 otherwise */
46     static X_TLS_DECLARE(check_key);
47     #endif
48    
/* writes msg to file descriptor 2 with a raw write () and then aborts */
/* the process; does not return */
static void
fatal (const char *msg)
{
  size_t len = strlen (msg);

  write (2, msg, len);
  abort ();
}
55 root 1.1
56     static s_epipe ep; /* signalled by pmapi_acquire, drained by poll (), fd exposed by fd () */
57     static void *perl_thx; /* interpreter context captured in BOOT, installed in each worker thread */
58     static sigset_t cursigset, fullsigset; /* saved signal mask / all signals (sigfillset in BOOT) */
59    
60 root 1.6 static int global_enable = 0; /* set via enable (); used when thread_enable is 0 */
61 root 1.4 static int thread_enable; /* 0 unset (fall back to global_enable), 1 enabled (scoped_enable), 2 disabled (scoped_disable) */
62    
63 root 1.6 /* assigned to a thread for each release/acquire */
64 root 1.1 struct tctx
65     {
66     void *coro; /* the released coro (an SV *), cleared by poll (); doubles as the free-list link */
67 root 1.6 int wait_f; /* 0 after release, set to 1 by the worker thread once acquire may return */
68 root 1.9 xcond_t acquire_c; /* signalled together with wait_f, waited on in pmapi_acquire */
69 root 1.11 int jeret; /* JMPENV_PUSH result from the worker thread, re-raised by pmapi_acquire if nonzero */
70 root 1.1 };
71    
72     static struct tctx *tctx_free;
73    
74     static struct tctx *
75     tctx_get (void)
76     {
77     struct tctx *ctx;
78    
79     if (!tctx_free)
80 root 1.3 {
81     ctx = malloc (sizeof (*tctx_free));
82 root 1.9 X_COND_CREATE (ctx->acquire_c);
83 root 1.3 }
84 root 1.1 else
85     {
86     ctx = tctx_free;
87     tctx_free = tctx_free->coro;
88     }
89    
90     return ctx;
91     }
92    
93     static void
94     tctx_put (struct tctx *ctx)
95     {
96     ctx->coro = tctx_free;
97     tctx_free = ctx;
98     }
99    
/* a stack of tctxs; an all-zero struct is a valid empty stack */
struct tctxs
{
  struct tctx **ctxs; /* heap array with max slots, 0 until the first push */
  int cur, max;       /* used slots, allocated slots */
};

/* pops and returns the most recently pushed context, */
/* or 0 when the stack is empty (instead of reading ctxs[-1]) */
static struct tctx *
tctxs_get (struct tctxs *ctxs)
{
  if (ctxs->cur <= 0)
    return 0;

  return ctxs->ctxs[--ctxs->cur];
}

/* pushes ctx, doubling the array (starting at 16 slots) when it is full. */
/* aborts the process when the array cannot be grown. */
static void
tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
{
  if (ctxs->cur >= ctxs->max)
    {
      int newmax = ctxs->max ? ctxs->max * 2 : 16;
      /* keep the old pointer until realloc succeeded, so a failure */
      /* neither leaks the array nor stores through a null pointer */
      struct tctx **grown = realloc (ctxs->ctxs, newmax * sizeof (ctxs->ctxs[0]));

      if (!grown)
        abort ();

      ctxs->ctxs = grown;
      ctxs->max  = newmax;
    }

  ctxs->ctxs[ctxs->cur++] = ctx;
}
124    
/* release side: contexts handed from pmapi_release to the worker threads */
125     static xmutex_t release_m = X_MUTEX_INIT;
126     static xcond_t release_c = X_COND_INIT; /* signalled by pmapi_release after queueing a context */
127     static struct tctxs releasers; /* contexts waiting for a worker thread */
128     static int idle; /* incremented when a worker is started or finishes a context, decremented when it takes one */
129     static int min_idle = 1; /* pmapi_release starts a thread when idle <= min_idle */
130     static int curthreads, max_threads = 1; /* protected by release_m */
131    
/* acquire side: contexts whose owner wants the interpreter back */
132     static xmutex_t acquire_m = X_MUTEX_INIT;
133     static struct tctxs acquirers; /* filled by pmapi_acquire, emptied by poll () */
134    
/* worker thread body: repeatedly takes a released context from releasers, */
/* runs the coro scheduler on behalf of the releasing thread until poll () */
/* clears ctx->coro, then wakes the thread blocked in pmapi_acquire. */
135 root 1.1 X_THREAD_PROC(thread_proc)
136     {
/* install the interpreter context saved in BOOT for this thread */
137     PERL_SET_CONTEXT (perl_thx);
138    
139     {
140 root 1.17 dTHXa (perl_thx);
141 root 1.11 dJMPENV;
142 root 1.1 struct tctx *ctx;
/* NOTE(review): catchret is not used anywhere in this function */
143 root 1.11 int catchret;
144 root 1.1
145 root 1.9 X_LOCK (release_m);
146 root 1.6
147 root 1.1 for (;;)
148     {
/* wait (release_m held) until a context has been queued */
149 root 1.9 while (!releasers.cur)
/* the "|| 1" makes this always true, so the timed wait below is never reached */
150 root 1.6 if (idle <= min_idle || 1)
151 root 1.9 X_COND_WAIT (release_c, release_m);
152 root 1.6 else
153     {
154     struct timespec ts = { time (0) + idle - min_idle, 0 };
155    
156 root 1.9 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
157     if (idle > min_idle && !releasers.cur)
/* NOTE(review): this leaves the while loop with releasers.cur == 0, */
/* so the tctxs_get call below would run on an empty stack - verify */
/* before re-enabling this branch */
158 root 1.6 break;
159     }
160    
161 root 1.9 ctx = tctxs_get (&releasers);
162 root 1.6 --idle;
163 root 1.9 X_UNLOCK (release_m);
164 root 1.3
165 root 1.6 if (!ctx) /* timed out? */
166     break;
167    
/* restore the signal mask that pmapi_release saved in cursigset */
168 root 1.1 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
/* the result lands in ctx->jeret; presumably nonzero means perl unwound */
/* through this frame - pmapi_acquire re-raises it with JMPENV_JUMP */
169 root 1.11 JMPENV_PUSH (ctx->jeret);
170 root 1.1
/* keep scheduling other coros until poll () sets ctx->coro to 0 */
171 root 1.11 if (!ctx->jeret)
172     while (ctx->coro)
173     CORO_SCHEDULE;
174 root 1.1
175 root 1.11 JMPENV_POP;
/* block all signals again, remembering the current mask in cursigset */
176 root 1.3 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
177    
/* let the thread waiting in pmapi_acquire continue */
178 root 1.9 X_LOCK (acquire_m);
179 root 1.3 ctx->wait_f = 1;
180 root 1.9 X_COND_SIGNAL (ctx->acquire_c);
181     X_UNLOCK (acquire_m);
182 root 1.1
/* back to idle; release_m stays locked for the next iteration */
183 root 1.9 X_LOCK (release_m);
184 root 1.6 ++idle;
185 root 1.1 }
186     }
187     }
188    
189     static void
190     start_thread (void)
191     {
192     xthread_t tid;
193    
194 root 1.26 if (!curthreads)
195     {
196     X_UNLOCK (release_m);
197     eval_pv ("Coro::Multicore::init", 1);
198     X_LOCK (release_m);
199     }
200    
201 root 1.9 if (curthreads >= max_threads && 0)
202     return;
203    
204     ++curthreads;
205 root 1.6 ++idle;
206 root 1.1 xthread_create (&tid, thread_proc, 0);
207     }
208    
/* perlmulticore release hook: hands the current coro to a worker thread */
/* so the interpreter can keep running other coros while the calling */
/* thread works without it. does nothing but clear current_key when */
/* multicore support is disabled. */
209     static void
210     pmapi_release (void)
211     {
/* thread_enable, when set, overrides global_enable; bit 0 means enabled */
212 root 1.25 if (! ((thread_enable ? thread_enable : global_enable) & 1))
213     {
/* a 0 context makes the matching pmapi_acquire return immediately */
214     X_TLS_SET (current_key, 0);
215     return;
216     }
217    
218 root 1.16 #if RECURSION_CHECK
/* check_key already set: release was called twice without an acquire */
219     if (X_TLS_GET (check_key))
220 root 1.24 fatal ("FATAL: perlinterp_release () called without valid perl context");
221 root 1.16
222     X_TLS_SET (check_key, &check_key);
223     #endif
224    
225 root 1.1 struct tctx *ctx = tctx_get ();
/* hold a reference on the current coro; poll () drops it again */
226 root 1.21 ctx->coro = SvREFCNT_inc_simple_NN (CORO_CURRENT);
227 root 1.3 ctx->wait_f = 0;
228    
/* remember the context for pmapi_acquire on this thread */
229 root 1.12 X_TLS_SET (current_key, ctx);
/* block all signals in this thread, saving the previous mask in cursigset */
230 root 1.1 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
231    
232 root 1.9 X_LOCK (release_m);
233 root 1.6
234     if (idle <= min_idle)
235 root 1.1 start_thread ();
236    
/* queue the context and wake a worker thread */
237 root 1.9 tctxs_put (&releasers, ctx);
238     X_COND_SIGNAL (release_c);
239    
/* spin, briefly dropping the lock, while no worker is idle and */
/* queued contexts remain */
240     while (!idle && releasers.cur)
241     {
242     X_UNLOCK (release_m);
243     X_LOCK (release_m);
244     }
245 root 1.6
246 root 1.9 X_UNLOCK (release_m);
247 root 1.1 }
248    
/* perlmulticore acquire hook: queues this thread's context for poll (), */
/* signals the event pipe and blocks until the worker thread that ran the */
/* scheduler for it sets wait_f. returns at once when the matching release */
/* was a no-op (no context stored in current_key). */
249     static void
250     pmapi_acquire (void)
251     {
252 root 1.11 int jeret;
253 root 1.12 struct tctx *ctx = X_TLS_GET (current_key);
254 root 1.1
255 root 1.25 if (!ctx)
256     return;
257    
258 root 1.16 #if RECURSION_CHECK
/* check_key not set: acquire was called without a preceding release */
259     if (X_TLS_GET (check_key) != &check_key)
260 root 1.24 fatal ("FATAL: perlinterp_acquire () called with valid perl context");
261 root 1.16
262     X_TLS_SET (check_key, 0);
263     #endif
264    
265 root 1.9 X_LOCK (acquire_m);
266 root 1.1
267 root 1.9 tctxs_put (&acquirers, ctx);
268 root 1.1
/* signal the event pipe; poll () drains it and processes acquirers */
269     s_epipe_signal (&ep);
/* wait_f is set under acquire_m by the worker thread */
270 root 1.3 while (!ctx->wait_f)
271 root 1.9 X_COND_WAIT (ctx->acquire_c, acquire_m);
272     X_UNLOCK (acquire_m);
273 root 1.1
/* copy jeret before the context goes back to the free list */
274 root 1.11 jeret = ctx->jeret;
275 root 1.1 tctx_put (ctx);
/* restore the signal mask saved in cursigset */
276     pthread_sigmask (SIG_SETMASK, &cursigset, 0);
277 root 1.11
/* re-raise a nonzero JMPENV_PUSH result recorded by the worker thread */
278     if (jeret)
279 root 1.17 {
280     dTHX;
281     JMPENV_JUMP (jeret);
282     }
283 root 1.1 }
284    
285 root 1.4 static void
286 root 1.6 set_thread_enable (pTHX_ void *arg)
287 root 1.4 {
288 root 1.6 thread_enable = PTR2IV (arg);
289 root 1.4 }
290    
291 root 1.1 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
292    
293     PROTOTYPES: DISABLE
294    
295     BOOT:
296     {
        /* module initialisation: signal set, TLS keys, event pipe, saved */
        /* interpreter context, Coro API, then the perlmulticore hooks */
297 root 1.16 #ifndef _WIN32
298 root 1.1 sigfillset (&fullsigset);
299 root 1.16 #endif
300 root 1.1
301 root 1.12 X_TLS_INIT (current_key);
302 root 1.16 #if RECURSION_CHECK
303     X_TLS_INIT (check_key);
304     #endif
305 root 1.1
306     if (s_epipe_new (&ep))
307     croak ("Coro::Multicore: unable to initialise event pipe.\n");
308    
        /* remembered so worker threads can install it with PERL_SET_CONTEXT */
309     perl_thx = PERL_GET_CONTEXT;
310    
311     I_CORO_API ("Coro::Multicore");
312    
        /* disabled: pre-starting idle threads at load time */
313 root 1.24 if (0) { /*D*/
314 root 1.9 X_LOCK (release_m);
315 root 1.6 while (idle < min_idle)
316     start_thread ();
317 root 1.9 X_UNLOCK (release_m);
318 root 1.24 }
319 root 1.6
320 root 1.14 /* not perfectly efficient to do it this way, but it is simple */
321     perl_multicore_init (); /* calls release */
        /* only now install this module's hooks in the shared api struct */
322 root 1.1 perl_multicore_api->pmapi_release = pmapi_release;
323     perl_multicore_api->pmapi_acquire = pmapi_acquire;
324     }
325    
326 root 1.4 bool
327     enable (bool enable = NO_INIT)
328     CODE:
        /* returns the global enable flag as it was before the call; when an */
        /* argument is passed (items nonzero) it becomes the new flag value */
329     RETVAL = global_enable;
330     if (items)
331     global_enable = enable;
332     OUTPUT:
333     RETVAL
334    
335     void
336     scoped_enable ()
337     CODE:
        /* registers set_thread_enable with the values 1 and 0; pmapi_release */
        /* treats thread_enable == 1 as enabled and 0 as "use global_enable". */
        /* presumably the first pair runs on scope entry and the second on */
        /* scope exit - confirm against CoroAPI.h */
338     LEAVE; /* see Guard.xs */
339 root 1.6 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
340 root 1.4 ENTER; /* see Guard.xs */
341    
342     void
343     scoped_disable ()
344     CODE:
        /* registers set_thread_enable with the values 2 and 0; pmapi_release */
        /* treats thread_enable == 2 as disabled (bit 0 clear) and 0 as "use */
        /* global_enable". presumably the first pair runs on scope entry and */
        /* the second on scope exit - confirm against CoroAPI.h */
345     LEAVE; /* see Guard.xs */
346 root 1.6 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
347 root 1.4 ENTER; /* see Guard.xs */
348    
349 root 1.25 #if 0
350    
351 root 1.1 U32
352 root 1.6 min_idle_threads (U32 min = NO_INIT)
353 root 1.1 CODE:
        /* compiled out by the surrounding #if 0. would return the previous */
        /* min_idle and, when an argument is passed, replace it. */
        /* NOTE(review): this takes acquire_m, while pmapi_release reads */
        /* min_idle under release_m - check before re-enabling */
354 root 1.9 X_LOCK (acquire_m);
355 root 1.6 RETVAL = min_idle;
356 root 1.1 if (items)
357 root 1.6 min_idle = min;
358 root 1.9 X_UNLOCK (acquire_m);
359 root 1.1 OUTPUT:
360     RETVAL
361 root 1.25
362     #endif
363 root 1.1
364     int
365     fd ()
366     CODE:
        /* returns the file descriptor of the event pipe that pmapi_acquire */
        /* signals; presumably it is meant to be watched for readability */
        /* and followed by a call to poll () - confirm against the perl side */
367     RETVAL = s_epipe_fd (&ep);
368     OUTPUT:
369     RETVAL
370    
371     void
372     poll (...)
373     CODE:
        /* drains the event pipe, then for every context queued by */
        /* pmapi_acquire: readies its coro, drops the reference taken in */
        /* pmapi_release and clears ctx->coro, which ends the CORO_SCHEDULE */
        /* loop of the worker thread serving that context */
374     s_epipe_drain (&ep);
375 root 1.9 X_LOCK (acquire_m);
376     while (acquirers.cur)
377 root 1.1 {
378 root 1.9 struct tctx *ctx = tctxs_get (&acquirers);
379 root 1.1 CORO_READY ((SV *)ctx->coro);
380 root 1.22 SvREFCNT_dec_simple_void_NN ((SV *)ctx->coro);
381 root 1.1 ctx->coro = 0;
382     }
383 root 1.9 X_UNLOCK (acquire_m);
384 root 1.1
385     void
386 root 1.2 sleep (NV seconds)
387 root 1.1 CODE:
388     perlinterp_release ();
389 root 1.20 {
390     int nsec = seconds;
391     if (nsec) sleep (nsec);
392     nsec = (seconds - nsec) * 1e9;
393     if (nsec) usleep (nsec);
394     }
395 root 1.1 perlinterp_acquire ();
396 root 1.2