ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.17
Committed: Sun Aug 12 21:56:21 2018 UTC (5 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-1_0
Changes since 1.16: +5 -2 lines
Log Message:
1.0

File Contents

# Content
1 #define PERL_NO_GET_CONTEXT
2
3 #include "EXTERN.h"
4 #include "perl.h"
5 #include "XSUB.h"
6
7 #define X_STACKSIZE 1024 * sizeof (void *)
8
9 #include "CoroAPI.h"
10 #include "perlmulticore.h"
11 #include "schmorp.h"
12 #include "xthread.h"
13
14 #ifdef _WIN32
15 typedef char sigset_t;
16 #define pthread_sigmask(mode,new,old)
17 #endif
18
19 #ifndef SvREFCNT_dec_NN
20 #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
21 #endif
22
23 #ifndef SvREFCNT_inc_NN
24 #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
25 #endif
26
27 #define RECURSION_CHECK 0
28
29 static X_TLS_DECLARE(current_key);
30 #if RECURSION_CHECK
31 static X_TLS_DECLARE(check_key);
32 #endif
33
34
35 static s_epipe ep;
36 static void *perl_thx;
37 static sigset_t cursigset, fullsigset;
38
39 static int global_enable = 0;
40 static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
41
42 /* assigned to a thread for each release/acquire */
43 struct tctx
44 {
45 void *coro;
46 int wait_f;
47 xcond_t acquire_c;
48 int jeret;
49 };
50
51 static struct tctx *tctx_free;
52
53 static struct tctx *
54 tctx_get (void)
55 {
56 struct tctx *ctx;
57
58 if (!tctx_free)
59 {
60 ctx = malloc (sizeof (*tctx_free));
61 X_COND_CREATE (ctx->acquire_c);
62 }
63 else
64 {
65 ctx = tctx_free;
66 tctx_free = tctx_free->coro;
67 }
68
69 return ctx;
70 }
71
72 static void
73 tctx_put (struct tctx *ctx)
74 {
75 ctx->coro = tctx_free;
76 tctx_free = ctx;
77 }
78
79 /* a stack of tctxs */
80 struct tctxs
81 {
82 struct tctx **ctxs;
83 int cur, max;
84 };
85
86 static struct tctx *
87 tctxs_get (struct tctxs *ctxs)
88 {
89 return ctxs->ctxs[--ctxs->cur];
90 }
91
92 static void
93 tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
94 {
95 if (ctxs->cur >= ctxs->max)
96 {
97 ctxs->max = ctxs->max ? ctxs->max * 2 : 16;
98 ctxs->ctxs = realloc (ctxs->ctxs, ctxs->max * sizeof (ctxs->ctxs[0]));
99 }
100
101 ctxs->ctxs[ctxs->cur++] = ctx;
102 }
103
104 static xmutex_t release_m = X_MUTEX_INIT;
105 static xcond_t release_c = X_COND_INIT;
106 static struct tctxs releasers;
107 static int idle;
108 static int min_idle = 1;
109 static int curthreads, max_threads = 1; /* protected by release_m */
110
111 static xmutex_t acquire_m = X_MUTEX_INIT;
112 static struct tctxs acquirers;
113
114 X_THREAD_PROC(thread_proc)
115 {
116 PERL_SET_CONTEXT (perl_thx);
117
118 {
119 dTHXa (perl_thx);
120 dJMPENV;
121 struct tctx *ctx;
122 int catchret;
123
124 X_LOCK (release_m);
125
126 for (;;)
127 {
128 while (!releasers.cur)
129 if (idle <= min_idle || 1)
130 X_COND_WAIT (release_c, release_m);
131 else
132 {
133 struct timespec ts = { time (0) + idle - min_idle, 0 };
134
135 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
136 if (idle > min_idle && !releasers.cur)
137 break;
138 }
139
140 ctx = tctxs_get (&releasers);
141 --idle;
142 X_UNLOCK (release_m);
143
144 if (!ctx) /* timed out? */
145 break;
146
147 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
148 JMPENV_PUSH (ctx->jeret);
149
150 if (!ctx->jeret)
151 while (ctx->coro)
152 CORO_SCHEDULE;
153
154 JMPENV_POP;
155 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
156
157 X_LOCK (acquire_m);
158 ctx->wait_f = 1;
159 X_COND_SIGNAL (ctx->acquire_c);
160 X_UNLOCK (acquire_m);
161
162 X_LOCK (release_m);
163 ++idle;
164 }
165 }
166 }
167
168 static void
169 start_thread (void)
170 {
171 xthread_t tid;
172
173 if (curthreads >= max_threads && 0)
174 return;
175
176 ++curthreads;
177 ++idle;
178 xthread_create (&tid, thread_proc, 0);
179 }
180
181 static void
182 pmapi_release (void)
183 {
184 #if RECURSION_CHECK
185 if (X_TLS_GET (check_key))
186 croak ("perlinterp_release () called without valid perl context");
187
188 X_TLS_SET (check_key, &check_key);
189 #endif
190
191 if (! ((thread_enable ? thread_enable : global_enable) & 1))
192 {
193 X_TLS_SET (current_key, 0);
194 return;
195 }
196
197 struct tctx *ctx = tctx_get ();
198 ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
199 ctx->wait_f = 0;
200
201 X_TLS_SET (current_key, ctx);
202 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
203
204 X_LOCK (release_m);
205
206 if (idle <= min_idle)
207 start_thread ();
208
209 tctxs_put (&releasers, ctx);
210 X_COND_SIGNAL (release_c);
211
212 while (!idle && releasers.cur)
213 {
214 X_UNLOCK (release_m);
215 X_LOCK (release_m);
216 }
217
218 X_UNLOCK (release_m);
219 }
220
221 static void
222 pmapi_acquire (void)
223 {
224 int jeret;
225 struct tctx *ctx = X_TLS_GET (current_key);
226
227 #if RECURSION_CHECK
228 if (X_TLS_GET (check_key) != &check_key)
229 croak ("perlinterp_acquire () called with valid perl context");
230
231 X_TLS_SET (check_key, 0);
232 #endif
233
234 if (!ctx)
235 return;
236
237 X_LOCK (acquire_m);
238
239 tctxs_put (&acquirers, ctx);
240
241 s_epipe_signal (&ep);
242 while (!ctx->wait_f)
243 X_COND_WAIT (ctx->acquire_c, acquire_m);
244 X_UNLOCK (acquire_m);
245
246 jeret = ctx->jeret;
247 tctx_put (ctx);
248 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
249
250 if (jeret)
251 {
252 dTHX;
253 JMPENV_JUMP (jeret);
254 }
255 }
256
257 static void
258 set_thread_enable (pTHX_ void *arg)
259 {
260 thread_enable = PTR2IV (arg);
261 }
262
263 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
264
265 PROTOTYPES: DISABLE
266
267 BOOT:
268 {
269 #ifndef _WIN32
270 sigfillset (&fullsigset);
271 #endif
272
273 X_TLS_INIT (current_key);
274 #if RECURSION_CHECK
275 X_TLS_INIT (check_key);
276 #endif
277
278 if (s_epipe_new (&ep))
279 croak ("Coro::Multicore: unable to initialise event pipe.\n");
280
281 perl_thx = PERL_GET_CONTEXT;
282
283 I_CORO_API ("Coro::Multicore");
284
285 X_LOCK (release_m);
286 while (idle < min_idle)
287 start_thread ();
288 X_UNLOCK (release_m);
289
290 /* not perfectly efficient to do it this way, but it is simple */
291 perl_multicore_init (); /* calls release */
292 perl_multicore_api->pmapi_release = pmapi_release;
293 perl_multicore_api->pmapi_acquire = pmapi_acquire;
294 }
295
296 bool
297 enable (bool enable = NO_INIT)
298 CODE:
299 RETVAL = global_enable;
300 if (items)
301 global_enable = enable;
302 OUTPUT:
303 RETVAL
304
305 void
306 scoped_enable ()
307 CODE:
308 LEAVE; /* see Guard.xs */
309 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
310 ENTER; /* see Guard.xs */
311
312 void
313 scoped_disable ()
314 CODE:
315 LEAVE; /* see Guard.xs */
316 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
317 ENTER; /* see Guard.xs */
318
319 U32
320 min_idle_threads (U32 min = NO_INIT)
321 CODE:
322 X_LOCK (acquire_m);
323 RETVAL = min_idle;
324 if (items)
325 min_idle = min;
326 X_UNLOCK (acquire_m);
327 OUTPUT:
328 RETVAL
329
330
331 int
332 fd ()
333 CODE:
334 RETVAL = s_epipe_fd (&ep);
335 OUTPUT:
336 RETVAL
337
338 void
339 poll (...)
340 CODE:
341 s_epipe_drain (&ep);
342 X_LOCK (acquire_m);
343 while (acquirers.cur)
344 {
345 struct tctx *ctx = tctxs_get (&acquirers);
346 CORO_READY ((SV *)ctx->coro);
347 SvREFCNT_dec_NN ((SV *)ctx->coro);
348 ctx->coro = 0;
349 }
350 X_UNLOCK (acquire_m);
351
352 void
353 sleep (NV seconds)
354 CODE:
355 perlinterp_release ();
356 usleep (seconds * 1e6);
357 perlinterp_acquire ();
358