ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.18
Committed: Mon Aug 13 10:22:49 2018 UTC (5 years, 9 months ago) by root
Branch: MAIN
Changes since 1.17: +8 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.18 #ifndef _WIN32
2     /* most win32 perls are beyond fixing, requiring dTHX */
3     /* even for ISO-C functions such as malloc. avoid! avoid! avoid! */
4     # define PERL_NO_GET_CONTEXT
5     #endif
6 root 1.1
7     #include "EXTERN.h"
8     #include "perl.h"
9     #include "XSUB.h"
10    
/* Stack size requested for threads: room for 1024 pointers.
   Parenthesised so the value stays intact when the macro is expanded inside a
   larger expression (e.g. as the right operand of / or %).
   NOTE(review): presumably consumed by xthread.h, which is included after this
   definition - confirm. */
#define X_STACKSIZE (1024 * sizeof (void *))
12 root 1.1
13     #include "CoroAPI.h"
14     #include "perlmulticore.h"
15     #include "schmorp.h"
16     #include "xthread.h"
17    
/* win32: supply a stand-in sigset_t unless a macro of that name already exists */
#if defined (_WIN32) && !defined (sigset_t)
# define sigset_t int
#endif

/* fall back to the plain refcount macros when the _NN variants are not defined */
#if !defined (SvREFCNT_dec_NN)
# define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
#endif

#if !defined (SvREFCNT_inc_NN)
# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
#endif

/* non-zero compiles in the check_key tests in pmapi_release/pmapi_acquire */
#define RECURSION_CHECK 0
33    
/* per-thread slot: the struct tctx handed out by pmapi_release (0 when the
   release was a no-op), read back by pmapi_acquire */
34 root 1.12 static X_TLS_DECLARE(current_key);
35 root 1.16 #if RECURSION_CHECK
/* per-thread marker, set to &check_key by pmapi_release and reset to 0 by pmapi_acquire */
36     static X_TLS_DECLARE(check_key);
37     #endif
38    
39 root 1.1
/* event pipe: signalled by pmapi_acquire, drained by poll, queried by fd */
40     static s_epipe ep;
/* interpreter context captured in BOOT and installed by thread_proc */
41     static void *perl_thx;
/* fullsigset is filled in BOOT (not on win32); cursigset receives the previous
   mask whenever fullsigset is installed with pthread_sigmask */
42     static sigset_t cursigset, fullsigset;
43    
/* process-wide setting, read and changed through enable () */
44 root 1.6 static int global_enable = 0;
/* set through set_thread_enable by the scoped_enable/scoped_disable hooks;
   pmapi_release uses global_enable instead while this is 0, and only releases
   when bit 0 of the effective value is set */
45 root 1.4 static int thread_enable; /* 0 undefined, 1 enabled, 2 disabled */
46    
47 root 1.6 /* assigned to a thread for each release/acquire */
48 root 1.1 struct tctx
49     {
50     void *coro;
51 root 1.6 int wait_f;
52 root 1.9 xcond_t acquire_c;
53 root 1.11 int jeret;
54 root 1.1 };
55    
56     static struct tctx *tctx_free;
57    
58     static struct tctx *
59     tctx_get (void)
60     {
61     struct tctx *ctx;
62    
63     if (!tctx_free)
64 root 1.3 {
65     ctx = malloc (sizeof (*tctx_free));
66 root 1.9 X_COND_CREATE (ctx->acquire_c);
67 root 1.3 }
68 root 1.1 else
69     {
70     ctx = tctx_free;
71     tctx_free = tctx_free->coro;
72     }
73    
74     return ctx;
75     }
76    
77     static void
78     tctx_put (struct tctx *ctx)
79     {
80     ctx->coro = tctx_free;
81     tctx_free = ctx;
82     }
83    
/* a stack of tctxs */
struct tctxs
{
  struct tctx **ctxs; /* heap array of max slots, grown by tctxs_put */
  int cur, max;       /* slots in use, slots allocated */
};

/*
 * Pop the most recently pushed context.
 *
 * Returns 0 when the stack is empty instead of indexing below the array;
 * thread_proc tests the result against 0 after it leaves its wait loop.
 */
static struct tctx *
tctxs_get (struct tctxs *ctxs)
{
  if (ctxs->cur <= 0)
    return 0;

  return ctxs->ctxs[--ctxs->cur];
}

/*
 * Push ctx, doubling the array (16 slots initially) when it is full.
 *
 * The function cannot report failure and is called with a mutex held (and,
 * from pmapi_acquire, by a thread that has released the interpreter), so an
 * allocation failure aborts the process rather than storing through a null
 * pointer.
 */
static void
tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
{
  if (ctxs->cur >= ctxs->max)
    {
      int newmax = ctxs->max ? ctxs->max * 2 : 16;
      /* keep the old array reachable until realloc is known to have worked */
      struct tctx **grown = realloc (ctxs->ctxs, newmax * sizeof (ctxs->ctxs[0]));

      if (!grown)
        abort ();

      ctxs->ctxs = grown;
      ctxs->max  = newmax;
    }

  ctxs->ctxs[ctxs->cur++] = ctx;
}
108    
109     static xmutex_t release_m = X_MUTEX_INIT;
110     static xcond_t release_c = X_COND_INIT;
111     static struct tctxs releasers;
112     static int idle;
113     static int min_idle = 1;
114     static int curthreads, max_threads = 1; /* protected by release_m */
115    
116     static xmutex_t acquire_m = X_MUTEX_INIT;
117     static struct tctxs acquirers;
118    
/*
 * Worker thread body.  Installs the interpreter context saved in perl_thx and
 * then loops: take a context off `releasers`, run CORO_SCHEDULE on its behalf
 * until ctx->coro has been cleared (poll does that), then set ctx->wait_f and
 * signal ctx->acquire_c so that pmapi_acquire can return.  release_m is held
 * while the thread waits for work and released while it runs perl code.
 */
119 root 1.1 X_THREAD_PROC(thread_proc)
120     {
121     PERL_SET_CONTEXT (perl_thx);
122    
123     {
124 root 1.17 dTHXa (perl_thx);
125 root 1.11 dJMPENV;
126 root 1.1 struct tctx *ctx;
/* catchret is not referenced anywhere else in this function */
127 root 1.11 int catchret;
128 root 1.1
129 root 1.9 X_LOCK (release_m);
130 root 1.6
131 root 1.1 for (;;)
132     {
133 root 1.9 while (!releasers.cur)
/* the "|| 1" makes this condition always true, so the timed wait in the
   else branch is currently never reached */
134 root 1.6 if (idle <= min_idle || 1)
135 root 1.9 X_COND_WAIT (release_c, release_m);
136 root 1.6 else
137     {
138     struct timespec ts = { time (0) + idle - min_idle, 0 };
139    
140 root 1.9 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
141     if (idle > min_idle && !releasers.cur)
142 root 1.6 break;
143     }
144    
/* this thread is no longer idle once it owns a context */
145 root 1.9 ctx = tctxs_get (&releasers);
146 root 1.6 --idle;
147 root 1.9 X_UNLOCK (release_m);
148 root 1.3
149 root 1.6 if (!ctx) /* timed out? */
150     break;
151    
/* switch to the signal mask stored in cursigset while perl code runs */
152 root 1.1 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
/* a non-zero ctx->jeret skips the scheduling loop; pmapi_acquire later
   re-raises it with JMPENV_JUMP */
153 root 1.11 JMPENV_PUSH (ctx->jeret);
154 root 1.1
155 root 1.11 if (!ctx->jeret)
156     while (ctx->coro)
157     CORO_SCHEDULE;
158 root 1.1
159 root 1.11 JMPENV_POP;
/* install fullsigset again, storing the mask that was active into cursigset */
160 root 1.3 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
161    
/* let the thread blocked in pmapi_acquire on this context continue */
162 root 1.9 X_LOCK (acquire_m);
163 root 1.3 ctx->wait_f = 1;
164 root 1.9 X_COND_SIGNAL (ctx->acquire_c);
165     X_UNLOCK (acquire_m);
166 root 1.1
/* back to waiting for work, with release_m held again */
167 root 1.9 X_LOCK (release_m);
168 root 1.6 ++idle;
169 root 1.1 }
170     }
171     }
172    
173     static void
174     start_thread (void)
175     {
176     xthread_t tid;
177    
178 root 1.9 if (curthreads >= max_threads && 0)
179     return;
180    
181     ++curthreads;
182 root 1.6 ++idle;
183 root 1.1 xthread_create (&tid, thread_proc, 0);
184     }
185    
/*
 * Release hook that BOOT stores in perl_multicore_api->pmapi_release.
 *
 * Unless releasing is switched off, takes a reference to the current coro,
 * stores it in a struct tctx that is remembered in current_key, and queues
 * that context on `releasers` for a worker thread.
 */
186     static void
187     pmapi_release (void)
188     {
189 root 1.16 #if RECURSION_CHECK
190     if (X_TLS_GET (check_key))
191     croak ("perlinterp_release () called without valid perl context");
192    
193     X_TLS_SET (check_key, &check_key);
194     #endif
195    
/* a non-zero thread_enable overrides global_enable; when bit 0 of the
   effective value is clear nothing is released, and current_key = 0 tells
   pmapi_acquire that there is nothing to wait for */
196 root 1.13 if (! ((thread_enable ? thread_enable : global_enable) & 1))
197 root 1.4 {
198 root 1.12 X_TLS_SET (current_key, 0);
199 root 1.4 return;
200     }
201    
202 root 1.1 struct tctx *ctx = tctx_get ();
203     ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
204 root 1.3 ctx->wait_f = 0;
205    
206 root 1.12 X_TLS_SET (current_key, ctx);
/* install fullsigset, remembering the previous mask in cursigset */
207 root 1.1 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
208    
209 root 1.9 X_LOCK (release_m);
210 root 1.6
/* add a worker whenever the idle count is not above min_idle */
211     if (idle <= min_idle)
212 root 1.1 start_thread ();
213    
214 root 1.9 tctxs_put (&releasers, ctx);
215     X_COND_SIGNAL (release_c);
216    
/* busy-wait, dropping and retaking release_m, for as long as no worker is
   idle and a context is still queued */
217     while (!idle && releasers.cur)
218     {
219     X_UNLOCK (release_m);
220     X_LOCK (release_m);
221     }
222 root 1.6
223 root 1.9 X_UNLOCK (release_m);
224 root 1.1 }
225    
/*
 * Acquire hook that BOOT stores in perl_multicore_api->pmapi_acquire.
 *
 * Does nothing when the matching release stored 0 in current_key.  Otherwise
 * queues the context on `acquirers`, signals the event pipe, and blocks until
 * the worker thread has set ctx->wait_f.  A non-zero ctx->jeret recorded by
 * the worker is re-raised here with JMPENV_JUMP.
 */
226     static void
227     pmapi_acquire (void)
228     {
229 root 1.11 int jeret;
230 root 1.12 struct tctx *ctx = X_TLS_GET (current_key);
231 root 1.1
232 root 1.16 #if RECURSION_CHECK
233     if (X_TLS_GET (check_key) != &check_key)
234     croak ("perlinterp_acquire () called with valid perl context");
235    
236     X_TLS_SET (check_key, 0);
237     #endif
238    
239 root 1.4 if (!ctx)
240     return;
241    
242 root 1.9 X_LOCK (acquire_m);
243 root 1.1
/* poll () pops this entry, readies the coro and clears ctx->coro */
244 root 1.9 tctxs_put (&acquirers, ctx);
245 root 1.1
246     s_epipe_signal (&ep);
/* wait_f is set by thread_proc while it holds acquire_m */
247 root 1.3 while (!ctx->wait_f)
248 root 1.9 X_COND_WAIT (ctx->acquire_c, acquire_m);
249     X_UNLOCK (acquire_m);
250 root 1.1
/* copy jeret out first: tctx_put makes the context available for reuse */
251 root 1.11 jeret = ctx->jeret;
252 root 1.1 tctx_put (ctx);
/* switch back to the signal mask stored in cursigset */
253     pthread_sigmask (SIG_SETMASK, &cursigset, 0);
254 root 1.11
255     if (jeret)
256 root 1.17 {
257     dTHX;
258     JMPENV_JUMP (jeret);
259     }
260 root 1.1 }
261    
262 root 1.4 static void
263 root 1.6 set_thread_enable (pTHX_ void *arg)
264 root 1.4 {
265 root 1.6 thread_enable = PTR2IV (arg);
266 root 1.4 }
267    
268 root 1.1 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
269    
270     PROTOTYPES: DISABLE
271    
272     BOOT:
273     {
/* module initialisation: signal set, thread-local keys, event pipe, saved
   interpreter context, Coro API, initial worker threads, and finally the
   release/acquire hooks */
274 root 1.16 #ifndef _WIN32
275 root 1.1 sigfillset (&fullsigset);
276 root 1.16 #endif
277 root 1.1
278 root 1.12 X_TLS_INIT (current_key);
279 root 1.16 #if RECURSION_CHECK
280     X_TLS_INIT (check_key);
281     #endif
282 root 1.1
283     if (s_epipe_new (&ep))
284     croak ("Coro::Multicore: unable to initialise event pipe.\n");
285    
/* thread_proc installs this context in every worker thread */
286     perl_thx = PERL_GET_CONTEXT;
287    
288     I_CORO_API ("Coro::Multicore");
289    
/* start workers until idle reaches min_idle; start_thread expects release_m to be held */
290 root 1.9 X_LOCK (release_m);
291 root 1.6 while (idle < min_idle)
292     start_thread ();
293 root 1.9 X_UNLOCK (release_m);
294 root 1.6
295 root 1.14 /* not perfectly efficient to do it this way, but it is simple */
296     perl_multicore_init (); /* calls release */
/* the hooks are only replaced after perl_multicore_init has returned */
297 root 1.1 perl_multicore_api->pmapi_release = pmapi_release;
298     perl_multicore_api->pmapi_acquire = pmapi_acquire;
299     }
300    
301 root 1.4 bool
302     enable (bool enable = NO_INIT)
303     CODE:
304     RETVAL = global_enable;
305     if (items)
306     global_enable = enable;
307     OUTPUT:
308     RETVAL
309    
310     void
311     scoped_enable ()
312     CODE:
/* registers set_thread_enable twice: once with 1 (bit 0 set, which
   pmapi_release treats as "release") and once with 0 (fall back to
   global_enable).
   NOTE(review): presumably the first pair runs when the scope is entered and
   the second when it is left - confirm against CoroAPI.h */
313     LEAVE; /* see Guard.xs */
314 root 1.6 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
315 root 1.4 ENTER; /* see Guard.xs */
316    
317     void
318     scoped_disable ()
319     CODE:
/* registers set_thread_enable twice: once with 2 (non-zero with bit 0 clear,
   which makes pmapi_release return without releasing) and once with 0 (fall
   back to global_enable).
   NOTE(review): presumably the first pair runs when the scope is entered and
   the second when it is left - confirm against CoroAPI.h */
320     LEAVE; /* see Guard.xs */
321 root 1.6 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
322 root 1.4 ENTER; /* see Guard.xs */
323    
324 root 1.1 U32
325 root 1.6 min_idle_threads (U32 min = NO_INIT)
326 root 1.1 CODE:
327 root 1.9 X_LOCK (acquire_m);
328 root 1.6 RETVAL = min_idle;
329 root 1.1 if (items)
330 root 1.6 min_idle = min;
331 root 1.9 X_UNLOCK (acquire_m);
332 root 1.1 OUTPUT:
333     RETVAL
334    
335    
336     int
337     fd ()
338     CODE:
339     RETVAL = s_epipe_fd (&ep);
340     OUTPUT:
341     RETVAL
342    
343     void
344     poll (...)
345     CODE:
346     s_epipe_drain (&ep);
347 root 1.9 X_LOCK (acquire_m);
348     while (acquirers.cur)
349 root 1.1 {
350 root 1.9 struct tctx *ctx = tctxs_get (&acquirers);
351 root 1.1 CORO_READY ((SV *)ctx->coro);
352     SvREFCNT_dec_NN ((SV *)ctx->coro);
353     ctx->coro = 0;
354     }
355 root 1.9 X_UNLOCK (acquire_m);
356 root 1.1
357     void
358 root 1.2 sleep (NV seconds)
359 root 1.1 CODE:
360     perlinterp_release ();
361 root 1.2 usleep (seconds * 1e6);
362 root 1.1 perlinterp_acquire ();
363 root 1.2