ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.18
Committed: Mon Aug 13 10:22:49 2018 UTC (5 years, 9 months ago) by root
Branch: MAIN
Changes since 1.17: +8 -3 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #ifndef _WIN32
2 /* most win32 perls are beyond fixing, requiring dTHX */
3 /* even for ISO-C functions such as malloc. avoid! avoid! avoid! */
4 # define PERL_NO_GET_CONTEXT
5 #endif
6
7 #include "EXTERN.h"
8 #include "perl.h"
9 #include "XSUB.h"
10
/*
 * Stack size value in bytes: 1024 pointer-sized words. It is defined ahead
 * of the xthread.h include - presumably xthread.h uses it when creating
 * threads (confirm there). The expansion is parenthesised so that it stays a
 * single operand when used inside a larger expression.
 */
#define X_STACKSIZE (1024 * sizeof (void *))
12
13 #include "CoroAPI.h"
14 #include "perlmulticore.h"
15 #include "schmorp.h"
16 #include "xthread.h"
17
18 #ifdef _WIN32
19 #ifndef sigset_t
20 #define sigset_t int
21 #endif
22 #endif
23
24 #ifndef SvREFCNT_dec_NN
25 #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
26 #endif
27
28 #ifndef SvREFCNT_inc_NN
29 #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
30 #endif
31
32 #define RECURSION_CHECK 0
33
34 static X_TLS_DECLARE(current_key);
35 #if RECURSION_CHECK
36 static X_TLS_DECLARE(check_key);
37 #endif
38
39
40 static s_epipe ep;
41 static void *perl_thx;
42 static sigset_t cursigset, fullsigset;
43
/* default switch, read and written by enable (); pmapi_release falls back to it while thread_enable is 0 */
44 static int global_enable = 0;
/* override stored by set_thread_enable: scoped_enable passes 1, scoped_disable passes 2; pmapi_release tests bit 0 */
45 static int thread_enable; /* 0 undefined, 1 enabled, 2 disabled */
46
47 /* assigned to a thread for each release/acquire */
48 struct tctx
49 {
/* set from CORO_CURRENT in pmapi_release and reset to 0 by poll (); doubles as the link pointer while the struct is on the tctx_free list */
50 void *coro;
/* cleared in pmapi_release, set to 1 by thread_proc under acquire_m; pmapi_acquire waits for it */
51 int wait_f;
/* condition pmapi_acquire blocks on until wait_f is set */
52 xcond_t acquire_c;
/* receives the JMPENV_PUSH result in thread_proc; a non-zero value is handed to JMPENV_JUMP by pmapi_acquire */
53 int jeret;
54 };
55
56 static struct tctx *tctx_free;
57
58 static struct tctx *
59 tctx_get (void)
60 {
61 struct tctx *ctx;
62
63 if (!tctx_free)
64 {
65 ctx = malloc (sizeof (*tctx_free));
66 X_COND_CREATE (ctx->acquire_c);
67 }
68 else
69 {
70 ctx = tctx_free;
71 tctx_free = tctx_free->coro;
72 }
73
74 return ctx;
75 }
76
77 static void
78 tctx_put (struct tctx *ctx)
79 {
80 ctx->coro = tctx_free;
81 tctx_free = ctx;
82 }
83
/* a stack of tctxs */
struct tctxs
{
  struct tctx **ctxs; /* heap array with room for max entries */
  int cur, max;       /* cur: number of entries in use, max: allocated slots */
};

/*
 * Pops and returns the most recently pushed tctx.
 * Returns 0 when the stack is empty, leaving cur untouched, instead of
 * indexing in front of the array.
 */
static struct tctx *
tctxs_get (struct tctxs *ctxs)
{
  if (ctxs->cur <= 0)
    return 0;

  return ctxs->ctxs[--ctxs->cur];
}

/*
 * Pushes ctx on top of the stack. The array starts with 16 slots and doubles
 * whenever it is full. If the array cannot be grown the process is aborted;
 * the old array pointer is only replaced after realloc has succeeded.
 */
static void
tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
{
  if (ctxs->cur >= ctxs->max)
    {
      int grown = ctxs->max ? ctxs->max * 2 : 16;
      struct tctx **slots = realloc (ctxs->ctxs, grown * sizeof (ctxs->ctxs[0]));

      if (!slots)
        abort ();

      ctxs->ctxs = slots;
      ctxs->max  = grown;
    }

  ctxs->ctxs[ctxs->cur++] = ctx;
}
108
/* held around every access to releasers and idle in thread_proc and pmapi_release */
109 static xmutex_t release_m = X_MUTEX_INIT;
/* signalled by pmapi_release after it has queued a tctx; thread_proc waits on it */
110 static xcond_t release_c = X_COND_INIT;
/* tctxs queued by pmapi_release and taken by thread_proc */
111 static struct tctxs releasers;
/* incremented by start_thread and when a worker has finished a tctx, decremented when a worker takes one */
112 static int idle;
/* compared against idle to decide whether start_thread is called (pmapi_release, BOOT) */
113 static int min_idle = 1;
114 static int curthreads, max_threads = 1; /* protected by release_m */
115
/* held around every access to acquirers and around setting/waiting for a tctx's wait_f */
116 static xmutex_t acquire_m = X_MUTEX_INIT;
/* tctxs queued by pmapi_acquire and drained by poll () */
117 static struct tctxs acquirers;
118
/*
 * Body of the worker threads created by start_thread. In a loop it takes a
 * tctx off the releasers stack, calls CORO_SCHEDULE for as long as that
 * tctx's coro member is non-zero, and then sets wait_f and signals acquire_c
 * so that the thread blocked in pmapi_acquire can continue.
 */
119 X_THREAD_PROC(thread_proc)
120 {
/* use the interpreter context that BOOT stored in perl_thx */
121 PERL_SET_CONTEXT (perl_thx);
122
123 {
124 dTHXa (perl_thx);
125 dJMPENV;
126 struct tctx *ctx;
127 int catchret; /* not referenced anywhere in this function */
128
/* release_m is held whenever releasers and idle are touched below */
129 X_LOCK (release_m);
130
131 for (;;)
132 {
/* wait until pmapi_release has queued a tctx */
133 while (!releasers.cur)
/* the "|| 1" makes this condition always true, so the timed wait in the else branch is currently never taken */
134 if (idle <= min_idle || 1)
135 X_COND_WAIT (release_c, release_m);
136 else
137 {
138 struct timespec ts = { time (0) + idle - min_idle, 0 };
139
140 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
141 if (idle > min_idle && !releasers.cur)
142 break;
143 }
144
145 ctx = tctxs_get (&releasers);
146 --idle;
147 X_UNLOCK (release_m);
148
/* NOTE(review): with the timed wait disabled, the loop above only ends once releasers.cur is non-zero; this test looks like it belongs to the timeout path - confirm */
149 if (!ctx) /* timed out? */
150 break;
151
/* install the signal mask kept in cursigset while perl code runs */
152 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
/* the result is kept in ctx->jeret; pmapi_acquire hands a non-zero value on to JMPENV_JUMP */
153 JMPENV_PUSH (ctx->jeret);
154
/* ctx->coro is reset to 0 by poll () after pmapi_acquire has queued this tctx on acquirers */
155 if (!ctx->jeret)
156 while (ctx->coro)
157 CORO_SCHEDULE;
158
159 JMPENV_POP;
/* switch to fullsigset and save the mask that was in effect into cursigset */
160 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
161
/* wake the owner of ctx: pmapi_acquire waits on acquire_c until wait_f is set */
162 X_LOCK (acquire_m);
163 ctx->wait_f = 1;
164 X_COND_SIGNAL (ctx->acquire_c);
165 X_UNLOCK (acquire_m);
166
/* count this thread as idle again; release_m stays held for the next iteration */
167 X_LOCK (release_m);
168 ++idle;
169 }
170 }
171 }
172
173 static void
174 start_thread (void)
175 {
176 xthread_t tid;
177
178 if (curthreads >= max_threads && 0)
179 return;
180
181 ++curthreads;
182 ++idle;
183 xthread_create (&tid, thread_proc, 0);
184 }
185
186 static void
187 pmapi_release (void)
188 {
189 #if RECURSION_CHECK
190 if (X_TLS_GET (check_key))
191 croak ("perlinterp_release () called without valid perl context");
192
193 X_TLS_SET (check_key, &check_key);
194 #endif
195
196 if (! ((thread_enable ? thread_enable : global_enable) & 1))
197 {
198 X_TLS_SET (current_key, 0);
199 return;
200 }
201
202 struct tctx *ctx = tctx_get ();
203 ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
204 ctx->wait_f = 0;
205
206 X_TLS_SET (current_key, ctx);
207 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
208
209 X_LOCK (release_m);
210
211 if (idle <= min_idle)
212 start_thread ();
213
214 tctxs_put (&releasers, ctx);
215 X_COND_SIGNAL (release_c);
216
217 while (!idle && releasers.cur)
218 {
219 X_UNLOCK (release_m);
220 X_LOCK (release_m);
221 }
222
223 X_UNLOCK (release_m);
224 }
225
226 static void
227 pmapi_acquire (void)
228 {
229 int jeret;
230 struct tctx *ctx = X_TLS_GET (current_key);
231
232 #if RECURSION_CHECK
233 if (X_TLS_GET (check_key) != &check_key)
234 croak ("perlinterp_acquire () called with valid perl context");
235
236 X_TLS_SET (check_key, 0);
237 #endif
238
239 if (!ctx)
240 return;
241
242 X_LOCK (acquire_m);
243
244 tctxs_put (&acquirers, ctx);
245
246 s_epipe_signal (&ep);
247 while (!ctx->wait_f)
248 X_COND_WAIT (ctx->acquire_c, acquire_m);
249 X_UNLOCK (acquire_m);
250
251 jeret = ctx->jeret;
252 tctx_put (ctx);
253 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
254
255 if (jeret)
256 {
257 dTHX;
258 JMPENV_JUMP (jeret);
259 }
260 }
261
262 static void
263 set_thread_enable (pTHX_ void *arg)
264 {
265 thread_enable = PTR2IV (arg);
266 }
267
268 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
269
270 PROTOTYPES: DISABLE
271
BOOT:
{
#ifndef _WIN32
        /* fullsigset is the mask pmapi_release and thread_proc switch to */
        sigfillset (&fullsigset);
#endif
        X_TLS_INIT (current_key);
#if RECURSION_CHECK
        X_TLS_INIT (check_key);
#endif
        if (s_epipe_new (&ep))
          croak ("Coro::Multicore: unable to initialise event pipe.\n");
        /* worker threads adopt this context through PERL_SET_CONTEXT */
        perl_thx = PERL_GET_CONTEXT;
        I_CORO_API ("Coro::Multicore");
        /* start workers until idle has reached min_idle */
        X_LOCK (release_m);
        for (; idle < min_idle; )
          start_thread ();
        X_UNLOCK (release_m);
        /* not perfectly efficient to do it this way, but it is simple */
        perl_multicore_init (); /* calls release */
        perl_multicore_api->pmapi_release = pmapi_release;
        perl_multicore_api->pmapi_acquire = pmapi_acquire;
}
300
bool
enable (bool enable = NO_INIT)
	CODE:
        /* returns the previous global_enable value; stores the argument only when one was passed */
        RETVAL = global_enable;
        if (items > 0)
          global_enable = enable;
	OUTPUT:
        RETVAL
309
void
scoped_enable ()
	CODE:
        /*
         * Registers set_thread_enable with argument 1 and, as the second
         * callback, with argument 0 - presumably run on scope entry and on
         * scope exit respectively (confirm against CoroAPI.h).
         * The LEAVE/ENTER pair around the registration follows Guard.xs.
         */
        LEAVE; /* see Guard.xs */
        CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
        ENTER; /* see Guard.xs */
316
void
scoped_disable ()
	CODE:
        /*
         * Registers set_thread_enable with argument 2 and, as the second
         * callback, with argument 0 - presumably run on scope entry and on
         * scope exit respectively (confirm against CoroAPI.h).
         * The LEAVE/ENTER pair around the registration follows Guard.xs.
         */
        LEAVE; /* see Guard.xs */
        CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
        ENTER; /* see Guard.xs */
323
U32
min_idle_threads (U32 min = NO_INIT)
	CODE:
        /*
         * Returns the previous min_idle value and stores the argument when one
         * was passed. min_idle is compared with idle while release_m is held
         * (thread_proc, pmapi_release, BOOT), so that is the mutex taken here.
         */
        X_LOCK (release_m);
        RETVAL = min_idle;
        if (items)
          min_idle = min;
        X_UNLOCK (release_m);
	OUTPUT:
        RETVAL
334
335
int
fd ()
	CODE:
        /* whatever s_epipe_fd reports for the event pipe that pmapi_acquire signals */
        RETVAL = s_epipe_fd (&ep);
	OUTPUT:
        RETVAL
342
void
poll (...)
	CODE:
        /*
         * Drains the event pipe, then for every tctx queued by pmapi_acquire:
         * readies its coro, drops the reference taken in pmapi_release and
         * zeroes the coro member, which ends the scheduling loop in thread_proc.
         */
        s_epipe_drain (&ep);
        X_LOCK (acquire_m);
        while (acquirers.cur != 0)
          {
            struct tctx *ctx = tctxs_get (&acquirers);
            SV *coro_sv = (SV *)ctx->coro;
            CORO_READY (coro_sv);
            SvREFCNT_dec_NN (coro_sv);
            ctx->coro = 0;
          }
        X_UNLOCK (acquire_m);
356
void
sleep (NV seconds)
	CODE:
        /*
         * Releases the interpreter, sleeps, and takes the interpreter back.
         * NOTE(review): seconds * 1e6 is handed to usleep unchecked; values of
         * one second or more, or negative values, may not be accepted by every
         * usleep implementation - confirm for the targeted platforms.
         */
        perlinterp_release ();
        usleep (seconds * 1e6);
        perlinterp_acquire ();
363