ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.25
Committed: Wed Mar 6 16:39:45 2019 UTC (5 years, 3 months ago) by root
Branch: MAIN
CVS Tags: rel-1_03
Changes since 1.24: +13 -10 lines
Log Message:
1.03

File Contents

# User Rev Content
1 root 1.18 /* most win32 perls are beyond fixing, requiring dTHX */
2     /* even for ISO-C functions such as malloc. avoid! avoid! avoid! */
3 root 1.20 /* and fail to define numerous symbols, but still overrwide them */
4 root 1.19 /* with non-working versions (e.g. setjmp). */
5     #ifdef _WIN32
6 root 1.20 /*# define PERL_CORE 1 fixes some, breaks others */
7 root 1.19 #else
8 root 1.18 # define PERL_NO_GET_CONTEXT
9     #endif
10 root 1.1
11     #include "EXTERN.h"
12     #include "perl.h"
13     #include "XSUB.h"
14    
15 root 1.7 #define X_STACKSIZE 1024 * sizeof (void *)
16 root 1.1
17     #include "CoroAPI.h"
18     #include "perlmulticore.h"
19     #include "schmorp.h"
20     #include "xthread.h"
21    
22     #ifdef _WIN32
23 root 1.18 #ifndef sigset_t
24     #define sigset_t int
25     #endif
26 root 1.1 #endif
27    
28 root 1.8 #ifndef SvREFCNT_dec_NN
29     #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
30     #endif
31    
32 root 1.22 #ifndef SvREFCNT_dec_simple_void_NN
33     #define SvREFCNT_dec_simple_void_NN(sv) SvREFCNT_dec (sv)
34     #endif
35    
36 root 1.8 #ifndef SvREFCNT_inc_NN
37     #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
38     #endif
39    
40 root 1.23 #ifndef RECURSION_CHECK
41     #define RECURSION_CHECK 0
42     #endif
43 root 1.16
44 root 1.12 static X_TLS_DECLARE(current_key);
45 root 1.16 #if RECURSION_CHECK
46     static X_TLS_DECLARE(check_key);
47     #endif
48    
49 root 1.23 static void
50     fatal (const char *msg)
51     {
52     write (2, msg, strlen (msg));
53     abort ();
54     }
55 root 1.1
56     static s_epipe ep;
57     static void *perl_thx;
58     static sigset_t cursigset, fullsigset;
59    
60 root 1.6 static int global_enable = 0;
61 root 1.4 static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
62    
63 root 1.6 /* assigned to a thread for each release/acquire */
64 root 1.1 struct tctx
65     {
66     void *coro;
67 root 1.6 int wait_f;
68 root 1.9 xcond_t acquire_c;
69 root 1.11 int jeret;
70 root 1.1 };
71    
72     static struct tctx *tctx_free;
73    
74     static struct tctx *
75     tctx_get (void)
76     {
77     struct tctx *ctx;
78    
79     if (!tctx_free)
80 root 1.3 {
81     ctx = malloc (sizeof (*tctx_free));
82 root 1.9 X_COND_CREATE (ctx->acquire_c);
83 root 1.3 }
84 root 1.1 else
85     {
86     ctx = tctx_free;
87     tctx_free = tctx_free->coro;
88     }
89    
90     return ctx;
91     }
92    
93     static void
94     tctx_put (struct tctx *ctx)
95     {
96     ctx->coro = tctx_free;
97     tctx_free = ctx;
98     }
99    
100 root 1.9 /* a stack of tctxs */
101     struct tctxs
102     {
103     struct tctx **ctxs;
104     int cur, max;
105     };
106    
107     static struct tctx *
108     tctxs_get (struct tctxs *ctxs)
109     {
110     return ctxs->ctxs[--ctxs->cur];
111     }
112    
113     static void
114     tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
115     {
116     if (ctxs->cur >= ctxs->max)
117     {
118     ctxs->max = ctxs->max ? ctxs->max * 2 : 16;
119     ctxs->ctxs = realloc (ctxs->ctxs, ctxs->max * sizeof (ctxs->ctxs[0]));
120     }
121    
122     ctxs->ctxs[ctxs->cur++] = ctx;
123     }
124    
125     static xmutex_t release_m = X_MUTEX_INIT;
126     static xcond_t release_c = X_COND_INIT;
127     static struct tctxs releasers;
128     static int idle;
129     static int min_idle = 1;
130     static int curthreads, max_threads = 1; /* protected by release_m */
131    
132     static xmutex_t acquire_m = X_MUTEX_INIT;
133     static struct tctxs acquirers;
134    
135 root 1.1 X_THREAD_PROC(thread_proc)
136     {
137     PERL_SET_CONTEXT (perl_thx);
138    
139     {
140 root 1.17 dTHXa (perl_thx);
141 root 1.11 dJMPENV;
142 root 1.1 struct tctx *ctx;
143 root 1.11 int catchret;
144 root 1.1
145 root 1.9 X_LOCK (release_m);
146 root 1.6
147 root 1.1 for (;;)
148     {
149 root 1.9 while (!releasers.cur)
150 root 1.6 if (idle <= min_idle || 1)
151 root 1.9 X_COND_WAIT (release_c, release_m);
152 root 1.6 else
153     {
154     struct timespec ts = { time (0) + idle - min_idle, 0 };
155    
156 root 1.9 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
157     if (idle > min_idle && !releasers.cur)
158 root 1.6 break;
159     }
160    
161 root 1.9 ctx = tctxs_get (&releasers);
162 root 1.6 --idle;
163 root 1.9 X_UNLOCK (release_m);
164 root 1.3
165 root 1.6 if (!ctx) /* timed out? */
166     break;
167    
168 root 1.1 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
169 root 1.11 JMPENV_PUSH (ctx->jeret);
170 root 1.1
171 root 1.11 if (!ctx->jeret)
172     while (ctx->coro)
173     CORO_SCHEDULE;
174 root 1.1
175 root 1.11 JMPENV_POP;
176 root 1.3 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
177    
178 root 1.9 X_LOCK (acquire_m);
179 root 1.3 ctx->wait_f = 1;
180 root 1.9 X_COND_SIGNAL (ctx->acquire_c);
181     X_UNLOCK (acquire_m);
182 root 1.1
183 root 1.9 X_LOCK (release_m);
184 root 1.6 ++idle;
185 root 1.1 }
186     }
187     }
188    
189     static void
190     start_thread (void)
191     {
192     xthread_t tid;
193    
194 root 1.9 if (curthreads >= max_threads && 0)
195     return;
196    
197     ++curthreads;
198 root 1.6 ++idle;
199 root 1.1 xthread_create (&tid, thread_proc, 0);
200     }
201    
202     static void
203     pmapi_release (void)
204     {
205 root 1.25 if (! ((thread_enable ? thread_enable : global_enable) & 1))
206     {
207     X_TLS_SET (current_key, 0);
208     return;
209     }
210    
211 root 1.16 #if RECURSION_CHECK
212     if (X_TLS_GET (check_key))
213 root 1.24 fatal ("FATAL: perlinterp_release () called without valid perl context");
214 root 1.16
215     X_TLS_SET (check_key, &check_key);
216     #endif
217    
218 root 1.1 struct tctx *ctx = tctx_get ();
219 root 1.21 ctx->coro = SvREFCNT_inc_simple_NN (CORO_CURRENT);
220 root 1.3 ctx->wait_f = 0;
221    
222 root 1.12 X_TLS_SET (current_key, ctx);
223 root 1.1 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
224    
225 root 1.9 X_LOCK (release_m);
226 root 1.6
227     if (idle <= min_idle)
228 root 1.1 start_thread ();
229    
230 root 1.9 tctxs_put (&releasers, ctx);
231     X_COND_SIGNAL (release_c);
232    
233     while (!idle && releasers.cur)
234     {
235     X_UNLOCK (release_m);
236     X_LOCK (release_m);
237     }
238 root 1.6
239 root 1.9 X_UNLOCK (release_m);
240 root 1.1 }
241    
242     static void
243     pmapi_acquire (void)
244     {
245 root 1.11 int jeret;
246 root 1.12 struct tctx *ctx = X_TLS_GET (current_key);
247 root 1.1
248 root 1.25 if (!ctx)
249     return;
250    
251 root 1.16 #if RECURSION_CHECK
252     if (X_TLS_GET (check_key) != &check_key)
253 root 1.24 fatal ("FATAL: perlinterp_acquire () called with valid perl context");
254 root 1.16
255     X_TLS_SET (check_key, 0);
256     #endif
257    
258 root 1.9 X_LOCK (acquire_m);
259 root 1.1
260 root 1.9 tctxs_put (&acquirers, ctx);
261 root 1.1
262     s_epipe_signal (&ep);
263 root 1.3 while (!ctx->wait_f)
264 root 1.9 X_COND_WAIT (ctx->acquire_c, acquire_m);
265     X_UNLOCK (acquire_m);
266 root 1.1
267 root 1.11 jeret = ctx->jeret;
268 root 1.1 tctx_put (ctx);
269     pthread_sigmask (SIG_SETMASK, &cursigset, 0);
270 root 1.11
271     if (jeret)
272 root 1.17 {
273     dTHX;
274     JMPENV_JUMP (jeret);
275     }
276 root 1.1 }
277    
278 root 1.4 static void
279 root 1.6 set_thread_enable (pTHX_ void *arg)
280 root 1.4 {
281 root 1.6 thread_enable = PTR2IV (arg);
282 root 1.4 }
283    
284 root 1.1 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
285    
286     PROTOTYPES: DISABLE
287    
288     BOOT:
289     {
290 root 1.16 #ifndef _WIN32
291 root 1.1 sigfillset (&fullsigset);
292 root 1.16 #endif
293 root 1.1
294 root 1.12 X_TLS_INIT (current_key);
295 root 1.16 #if RECURSION_CHECK
296     X_TLS_INIT (check_key);
297     #endif
298 root 1.1
299     if (s_epipe_new (&ep))
300     croak ("Coro::Multicore: unable to initialise event pipe.\n");
301    
302     perl_thx = PERL_GET_CONTEXT;
303    
304     I_CORO_API ("Coro::Multicore");
305    
306 root 1.24 if (0) { /*D*/
307 root 1.9 X_LOCK (release_m);
308 root 1.6 while (idle < min_idle)
309     start_thread ();
310 root 1.9 X_UNLOCK (release_m);
311 root 1.24 }
312 root 1.6
313 root 1.14 /* not perfectly efficient to do it this way, but it is simple */
314     perl_multicore_init (); /* calls release */
315 root 1.1 perl_multicore_api->pmapi_release = pmapi_release;
316     perl_multicore_api->pmapi_acquire = pmapi_acquire;
317     }
318    
319 root 1.4 bool
320     enable (bool enable = NO_INIT)
321     CODE:
322     RETVAL = global_enable;
323     if (items)
324     global_enable = enable;
325     OUTPUT:
326     RETVAL
327    
328     void
329     scoped_enable ()
330     CODE:
331     LEAVE; /* see Guard.xs */
332 root 1.6 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
333 root 1.4 ENTER; /* see Guard.xs */
334    
335     void
336     scoped_disable ()
337     CODE:
338     LEAVE; /* see Guard.xs */
339 root 1.6 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
340 root 1.4 ENTER; /* see Guard.xs */
341    
342 root 1.25 #if 0
343    
344 root 1.1 U32
345 root 1.6 min_idle_threads (U32 min = NO_INIT)
346 root 1.1 CODE:
347 root 1.9 X_LOCK (acquire_m);
348 root 1.6 RETVAL = min_idle;
349 root 1.1 if (items)
350 root 1.6 min_idle = min;
351 root 1.9 X_UNLOCK (acquire_m);
352 root 1.1 OUTPUT:
353     RETVAL
354 root 1.25
355     #endif
356 root 1.1
357     int
358     fd ()
359     CODE:
360     RETVAL = s_epipe_fd (&ep);
361     OUTPUT:
362     RETVAL
363    
364     void
365     poll (...)
366     CODE:
367     s_epipe_drain (&ep);
368 root 1.9 X_LOCK (acquire_m);
369     while (acquirers.cur)
370 root 1.1 {
371 root 1.9 struct tctx *ctx = tctxs_get (&acquirers);
372 root 1.1 CORO_READY ((SV *)ctx->coro);
373 root 1.22 SvREFCNT_dec_simple_void_NN ((SV *)ctx->coro);
374 root 1.1 ctx->coro = 0;
375     }
376 root 1.9 X_UNLOCK (acquire_m);
377 root 1.1
378     void
379 root 1.2 sleep (NV seconds)
380 root 1.1 CODE:
381     perlinterp_release ();
382 root 1.20 {
383     int nsec = seconds;
384     if (nsec) sleep (nsec);
385     nsec = (seconds - nsec) * 1e9;
386     if (nsec) usleep (nsec);
387     }
388 root 1.1 perlinterp_acquire ();
389 root 1.2