ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.14
Committed: Sat Dec 19 23:56:14 2015 UTC (8 years, 5 months ago) by root
Branch: MAIN
Changes since 1.13: +3 -2 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #define PERL_NO_GET_CONTEXT
2
3 #include "EXTERN.h"
4 #include "perl.h"
5 #include "XSUB.h"
6
/* Thread stack size in bytes. It is defined before "xthread.h" is included,
 * presumably so that header picks it up - confirm against xthread.h.
 * Parenthesised so the macro expands as a single operand in any expression. */
#define X_STACKSIZE (1024 * sizeof (void *))
8
9 #include "CoroAPI.h"
10 #include "perlmulticore.h"
11 #include "schmorp.h"
12 #include "xthread.h"
13
/* On _WIN32 supply a dummy sigset_t and turn pthread_sigmask () into a no-op. */
#if defined(_WIN32)
typedef char sigset_t;
# define pthread_sigmask(mode,new,old)
#endif

/* Fall back to the plain refcount macros when the _NN variants are not already defined. */
#if !defined(SvREFCNT_dec_NN)
# define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
#endif

#if !defined(SvREFCNT_inc_NN)
# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
#endif
26
27 static X_TLS_DECLARE(current_key);
28
29 static s_epipe ep;
30 static void *perl_thx;
31 static sigset_t cursigset, fullsigset;
32
33 static int global_enable = 0;
34 static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
35
36 /* assigned to a thread for each release/acquire */
37 struct tctx
38 {
39 void *coro;
40 int wait_f;
41 xcond_t acquire_c;
42 int jeret;
43 };
44
45 static struct tctx *tctx_free;
46
47 static struct tctx *
48 tctx_get (void)
49 {
50 struct tctx *ctx;
51
52 if (!tctx_free)
53 {
54 ctx = malloc (sizeof (*tctx_free));
55 X_COND_CREATE (ctx->acquire_c);
56 }
57 else
58 {
59 ctx = tctx_free;
60 tctx_free = tctx_free->coro;
61 }
62
63 return ctx;
64 }
65
66 static void
67 tctx_put (struct tctx *ctx)
68 {
69 ctx->coro = tctx_free;
70 tctx_free = ctx;
71 }
72
/* a growable LIFO stack of tctx pointers */
struct tctxs
{
  struct tctx **ctxs; /* heap array, (re)allocated by tctxs_put */
  int cur, max;       /* number of used slots / number of allocated slots */
};

/*
 * Pop and return the most recently pushed context.
 * Returns a null pointer when the stack is empty instead of reading the slot
 * before the start of the array.
 */
static struct tctx *
tctxs_get (struct tctxs *ctxs)
{
  if (ctxs->cur <= 0)
    return 0;

  return ctxs->ctxs[--ctxs->cur];
}

/*
 * Push CTX, growing the array when it is full (16 slots initially, then
 * doubling).  The old array stays valid until the new one has been obtained;
 * since callers cannot be told about a failure, running out of memory aborts
 * the process rather than writing through a null pointer.
 */
static void
tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
{
  if (ctxs->cur >= ctxs->max)
    {
      int new_max = ctxs->max ? ctxs->max * 2 : 16;
      struct tctx **grown = realloc (ctxs->ctxs, new_max * sizeof (ctxs->ctxs[0]));

      if (!grown)
        abort ();

      ctxs->ctxs = grown;
      ctxs->max = new_max;
    }

  ctxs->ctxs[ctxs->cur++] = ctx;
}
97
98 static xmutex_t release_m = X_MUTEX_INIT;
99 static xcond_t release_c = X_COND_INIT;
100 static struct tctxs releasers;
101 static int idle;
102 static int min_idle = 1;
103 static int curthreads, max_threads = 1; /* protected by release_m */
104
105 static xmutex_t acquire_m = X_MUTEX_INIT;
106 static struct tctxs acquirers;
107
108 X_THREAD_PROC(thread_proc)
109 {
110 PERL_SET_CONTEXT (perl_thx);
111
112 {
113 dTHX; /* inefficient, we already have perl_thx, but I see no better way */
114 dJMPENV;
115 struct tctx *ctx;
116 int catchret;
117
118 X_LOCK (release_m);
119
120 for (;;)
121 {
122 while (!releasers.cur)
123 if (idle <= min_idle || 1)
124 X_COND_WAIT (release_c, release_m);
125 else
126 {
127 struct timespec ts = { time (0) + idle - min_idle, 0 };
128
129 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
130 if (idle > min_idle && !releasers.cur)
131 break;
132 }
133
134 ctx = tctxs_get (&releasers);
135 --idle;
136 X_UNLOCK (release_m);
137
138 if (!ctx) /* timed out? */
139 break;
140
141 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
142 JMPENV_PUSH (ctx->jeret);
143
144 if (!ctx->jeret)
145 while (ctx->coro)
146 CORO_SCHEDULE;
147
148 JMPENV_POP;
149 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
150
151 X_LOCK (acquire_m);
152 ctx->wait_f = 1;
153 X_COND_SIGNAL (ctx->acquire_c);
154 X_UNLOCK (acquire_m);
155
156 X_LOCK (release_m);
157 ++idle;
158 }
159 }
160 }
161
162 static void
163 start_thread (void)
164 {
165 xthread_t tid;
166
167 if (curthreads >= max_threads && 0)
168 return;
169
170 ++curthreads;
171 ++idle;
172 xthread_create (&tid, thread_proc, 0);
173 }
174
175 static void
176 pmapi_release (void)
177 {
178 if (! ((thread_enable ? thread_enable : global_enable) & 1))
179 {
180 X_TLS_SET (current_key, 0);
181 return;
182 }
183
184 struct tctx *ctx = tctx_get ();
185 ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
186 ctx->wait_f = 0;
187
188 X_TLS_SET (current_key, ctx);
189 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
190
191 X_LOCK (release_m);
192
193 if (idle <= min_idle)
194 start_thread ();
195
196 tctxs_put (&releasers, ctx);
197 X_COND_SIGNAL (release_c);
198
199 while (!idle && releasers.cur)
200 {
201 X_UNLOCK (release_m);
202 X_LOCK (release_m);
203 }
204
205 X_UNLOCK (release_m);
206 }
207
208 static void
209 pmapi_acquire (void)
210 {
211 int jeret;
212 struct tctx *ctx = X_TLS_GET (current_key);
213
214 if (!ctx)
215 return;
216
217 X_LOCK (acquire_m);
218
219 tctxs_put (&acquirers, ctx);
220
221 s_epipe_signal (&ep);
222 while (!ctx->wait_f)
223 X_COND_WAIT (ctx->acquire_c, acquire_m);
224 X_UNLOCK (acquire_m);
225
226 jeret = ctx->jeret;
227 tctx_put (ctx);
228 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
229
230 if (jeret)
231 JMPENV_JUMP (jeret);
232 }
233
234 static void
235 set_thread_enable (pTHX_ void *arg)
236 {
237 thread_enable = PTR2IV (arg);
238 }
239
MODULE = Coro::Multicore  PACKAGE = Coro::Multicore

PROTOTYPES: DISABLE

BOOT:
{
#ifndef _WIN32
        /* fullsigset is the mask installed while signals are blocked */
        sigfillset (&fullsigset);
#endif

        X_TLS_INIT (current_key);

        if (s_epipe_new (&ep))
          croak ("Coro::Multicore: unable to initialise event pipe.\n");

        /* remembered so thread_proc can PERL_SET_CONTEXT to it */
        perl_thx = PERL_GET_CONTEXT;

        I_CORO_API ("Coro::Multicore");

        /* start workers until idle reaches min_idle */
        X_LOCK (release_m);
        while (idle < min_idle)
          start_thread ();
        X_UNLOCK (release_m);

        /* not perfectly efficient to do it this way, but it is simple */
        perl_multicore_init (); /* calls release */
        perl_multicore_acquire ();
        /* from here on release/acquire are routed to this module */
        perl_multicore_api->pmapi_release = pmapi_release;
        perl_multicore_api->pmapi_acquire = pmapi_acquire;
}
270
bool
enable (bool enable = NO_INIT)
        CODE:
        /* always report the previous setting; only overwrite it when an
           argument was actually passed */
        RETVAL = global_enable;
        if (items > 0)
          global_enable = enable;
        OUTPUT:
        RETVAL
279
void
scoped_enable ()
        CODE:
        /* the hook sets thread_enable to 1 (bit 0 set, so pmapi_release does
           release) on the first callback and back to 0 on the second */
        LEAVE; /* see Guard.xs */
        CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
        ENTER; /* see Guard.xs */
286
void
scoped_disable ()
        CODE:
        /* the hook sets thread_enable to 2 (bit 0 clear, so pmapi_release
           takes its disabled path) on the first callback and back to 0 on
           the second */
        LEAVE; /* see Guard.xs */
        CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
        ENTER; /* see Guard.xs */
293
U32
min_idle_threads (U32 min = NO_INIT)
        CODE:
        /* min_idle is read by thread_proc, pmapi_release and BOOT while
           holding release_m, so that is the mutex to take here as well */
        X_LOCK (release_m);
        RETVAL = min_idle;
        if (items)
          min_idle = min;
        X_UNLOCK (release_m);
        OUTPUT:
        RETVAL
304
305
int
fd ()
        CODE:
        /* file descriptor of the event pipe signalled by pmapi_acquire */
        RETVAL = s_epipe_fd (&ep);
        OUTPUT:
        RETVAL
312
void
poll (...)
        CODE:
        s_epipe_drain (&ep);
        X_LOCK (acquire_m);
        /* ready every coroutine queued by pmapi_acquire; clearing ->coro is
           what ends the CORO_SCHEDULE loop in thread_proc */
        while (acquirers.cur)
          {
            struct tctx *ctx = tctxs_get (&acquirers);
            SV *coro = (SV *)ctx->coro;

            CORO_READY (coro);
            SvREFCNT_dec_NN (coro);
            ctx->coro = 0;
          }
        X_UNLOCK (acquire_m);
326
void
sleep (NV seconds)
        CODE:
        /* NOTE(review): negative or very large values are passed to usleep
           unchecked - confirm the supported range */
        perlinterp_release ();
        {
          NV microseconds = seconds * 1e6;

          usleep (microseconds);
        }
        perlinterp_acquire ();
333