ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.22
Committed: Sun Aug 26 15:42:57 2018 UTC (5 years, 9 months ago) by root
Branch: MAIN
Changes since 1.21: +5 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 /* most win32 perls are beyond fixing, requiring dTHX */
2 /* even for ISO-C functions such as malloc. avoid! avoid! avoid! */
3 /* and fail to define numerous symbols, but still overrwide them */
4 /* with non-working versions (e.g. setjmp). */
5 #ifdef _WIN32
6 /*# define PERL_CORE 1 fixes some, breaks others */
7 #else
8 # define PERL_NO_GET_CONTEXT
9 #endif
10
11 #include "EXTERN.h"
12 #include "perl.h"
13 #include "XSUB.h"
14
15 #define X_STACKSIZE 1024 * sizeof (void *)
16
17 #include "CoroAPI.h"
18 #include "perlmulticore.h"
19 #include "schmorp.h"
20 #include "xthread.h"
21
22 #ifdef _WIN32
23 #ifndef sigset_t
24 #define sigset_t int
25 #endif
26 #endif
27
28 #ifndef SvREFCNT_dec_NN
29 #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
30 #endif
31
32 #ifndef SvREFCNT_dec_simple_void_NN
33 #define SvREFCNT_dec_simple_void_NN(sv) SvREFCNT_dec (sv)
34 #endif
35
36 #ifndef SvREFCNT_inc_NN
37 #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
38 #endif
39
40 #define RECURSION_CHECK 0
41
42 static X_TLS_DECLARE(current_key);
43 #if RECURSION_CHECK
44 static X_TLS_DECLARE(check_key);
45 #endif
46
47
48 static s_epipe ep;
49 static void *perl_thx;
50 static sigset_t cursigset, fullsigset;
51
52 static int global_enable = 0;
53 static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
54
55 /* assigned to a thread for each release/acquire */
56 struct tctx
57 {
58 void *coro;
59 int wait_f;
60 xcond_t acquire_c;
61 int jeret;
62 };
63
64 static struct tctx *tctx_free;
65
66 static struct tctx *
67 tctx_get (void)
68 {
69 struct tctx *ctx;
70
71 if (!tctx_free)
72 {
73 ctx = malloc (sizeof (*tctx_free));
74 X_COND_CREATE (ctx->acquire_c);
75 }
76 else
77 {
78 ctx = tctx_free;
79 tctx_free = tctx_free->coro;
80 }
81
82 return ctx;
83 }
84
85 static void
86 tctx_put (struct tctx *ctx)
87 {
88 ctx->coro = tctx_free;
89 tctx_free = ctx;
90 }
91
92 /* a stack of tctxs */
93 struct tctxs
94 {
95 struct tctx **ctxs;
96 int cur, max;
97 };
98
99 static struct tctx *
100 tctxs_get (struct tctxs *ctxs)
101 {
102 return ctxs->ctxs[--ctxs->cur];
103 }
104
105 static void
106 tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
107 {
108 if (ctxs->cur >= ctxs->max)
109 {
110 ctxs->max = ctxs->max ? ctxs->max * 2 : 16;
111 ctxs->ctxs = realloc (ctxs->ctxs, ctxs->max * sizeof (ctxs->ctxs[0]));
112 }
113
114 ctxs->ctxs[ctxs->cur++] = ctx;
115 }
116
117 static xmutex_t release_m = X_MUTEX_INIT;
118 static xcond_t release_c = X_COND_INIT;
119 static struct tctxs releasers;
120 static int idle;
121 static int min_idle = 1;
122 static int curthreads, max_threads = 1; /* protected by release_m */
123
124 static xmutex_t acquire_m = X_MUTEX_INIT;
125 static struct tctxs acquirers;
126
127 X_THREAD_PROC(thread_proc)
128 {
129 PERL_SET_CONTEXT (perl_thx);
130
131 {
132 dTHXa (perl_thx);
133 dJMPENV;
134 struct tctx *ctx;
135 int catchret;
136
137 X_LOCK (release_m);
138
139 for (;;)
140 {
141 while (!releasers.cur)
142 if (idle <= min_idle || 1)
143 X_COND_WAIT (release_c, release_m);
144 else
145 {
146 struct timespec ts = { time (0) + idle - min_idle, 0 };
147
148 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
149 if (idle > min_idle && !releasers.cur)
150 break;
151 }
152
153 ctx = tctxs_get (&releasers);
154 --idle;
155 X_UNLOCK (release_m);
156
157 if (!ctx) /* timed out? */
158 break;
159
160 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
161 JMPENV_PUSH (ctx->jeret);
162
163 if (!ctx->jeret)
164 while (ctx->coro)
165 CORO_SCHEDULE;
166
167 JMPENV_POP;
168 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
169
170 X_LOCK (acquire_m);
171 ctx->wait_f = 1;
172 X_COND_SIGNAL (ctx->acquire_c);
173 X_UNLOCK (acquire_m);
174
175 X_LOCK (release_m);
176 ++idle;
177 }
178 }
179 }
180
181 static void
182 start_thread (void)
183 {
184 xthread_t tid;
185
186 if (curthreads >= max_threads && 0)
187 return;
188
189 ++curthreads;
190 ++idle;
191 xthread_create (&tid, thread_proc, 0);
192 }
193
194 static void
195 pmapi_release (void)
196 {
197 #if RECURSION_CHECK
198 if (X_TLS_GET (check_key))
199 croak ("perlinterp_release () called without valid perl context");
200
201 X_TLS_SET (check_key, &check_key);
202 #endif
203
204 if (! ((thread_enable ? thread_enable : global_enable) & 1))
205 {
206 X_TLS_SET (current_key, 0);
207 return;
208 }
209
210 struct tctx *ctx = tctx_get ();
211 ctx->coro = SvREFCNT_inc_simple_NN (CORO_CURRENT);
212 ctx->wait_f = 0;
213
214 X_TLS_SET (current_key, ctx);
215 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
216
217 X_LOCK (release_m);
218
219 if (idle <= min_idle)
220 start_thread ();
221
222 tctxs_put (&releasers, ctx);
223 X_COND_SIGNAL (release_c);
224
225 while (!idle && releasers.cur)
226 {
227 X_UNLOCK (release_m);
228 X_LOCK (release_m);
229 }
230
231 X_UNLOCK (release_m);
232 }
233
234 static void
235 pmapi_acquire (void)
236 {
237 int jeret;
238 struct tctx *ctx = X_TLS_GET (current_key);
239
240 #if RECURSION_CHECK
241 if (X_TLS_GET (check_key) != &check_key)
242 croak ("perlinterp_acquire () called with valid perl context");
243
244 X_TLS_SET (check_key, 0);
245 #endif
246
247 if (!ctx)
248 return;
249
250 X_LOCK (acquire_m);
251
252 tctxs_put (&acquirers, ctx);
253
254 s_epipe_signal (&ep);
255 while (!ctx->wait_f)
256 X_COND_WAIT (ctx->acquire_c, acquire_m);
257 X_UNLOCK (acquire_m);
258
259 jeret = ctx->jeret;
260 tctx_put (ctx);
261 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
262
263 if (jeret)
264 {
265 dTHX;
266 JMPENV_JUMP (jeret);
267 }
268 }
269
270 static void
271 set_thread_enable (pTHX_ void *arg)
272 {
273 thread_enable = PTR2IV (arg);
274 }
275
276 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
277
278 PROTOTYPES: DISABLE
279
280 BOOT:
281 {
282 #ifndef _WIN32
283 sigfillset (&fullsigset);
284 #endif
285
286 X_TLS_INIT (current_key);
287 #if RECURSION_CHECK
288 X_TLS_INIT (check_key);
289 #endif
290
291 if (s_epipe_new (&ep))
292 croak ("Coro::Multicore: unable to initialise event pipe.\n");
293
294 perl_thx = PERL_GET_CONTEXT;
295
296 I_CORO_API ("Coro::Multicore");
297
298 X_LOCK (release_m);
299 while (idle < min_idle)
300 start_thread ();
301 X_UNLOCK (release_m);
302
303 /* not perfectly efficient to do it this way, but it is simple */
304 perl_multicore_init (); /* calls release */
305 perl_multicore_api->pmapi_release = pmapi_release;
306 perl_multicore_api->pmapi_acquire = pmapi_acquire;
307 }
308
309 bool
310 enable (bool enable = NO_INIT)
311 CODE:
312 RETVAL = global_enable;
313 if (items)
314 global_enable = enable;
315 OUTPUT:
316 RETVAL
317
318 void
319 scoped_enable ()
320 CODE:
321 LEAVE; /* see Guard.xs */
322 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
323 ENTER; /* see Guard.xs */
324
325 void
326 scoped_disable ()
327 CODE:
328 LEAVE; /* see Guard.xs */
329 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
330 ENTER; /* see Guard.xs */
331
332 U32
333 min_idle_threads (U32 min = NO_INIT)
334 CODE:
335 X_LOCK (acquire_m);
336 RETVAL = min_idle;
337 if (items)
338 min_idle = min;
339 X_UNLOCK (acquire_m);
340 OUTPUT:
341 RETVAL
342
343
344 int
345 fd ()
346 CODE:
347 RETVAL = s_epipe_fd (&ep);
348 OUTPUT:
349 RETVAL
350
351 void
352 poll (...)
353 CODE:
354 s_epipe_drain (&ep);
355 X_LOCK (acquire_m);
356 while (acquirers.cur)
357 {
358 struct tctx *ctx = tctxs_get (&acquirers);
359 CORO_READY ((SV *)ctx->coro);
360 SvREFCNT_dec_simple_void_NN ((SV *)ctx->coro);
361 ctx->coro = 0;
362 }
363 X_UNLOCK (acquire_m);
364
365 void
366 sleep (NV seconds)
367 CODE:
368 perlinterp_release ();
369 {
370 int nsec = seconds;
371 if (nsec) sleep (nsec);
372 nsec = (seconds - nsec) * 1e9;
373 if (nsec) usleep (nsec);
374 }
375 perlinterp_acquire ();
376