ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.9
Committed: Fri Jul 3 02:35:48 2015 UTC (8 years, 11 months ago) by root
Branch: MAIN
Changes since 1.8: +75 -50 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #define PERL_NO_GET_CONTEXT
2
3 #include "EXTERN.h"
4 #include "perl.h"
5 #include "XSUB.h"
6
7 #define X_STACKSIZE 1024 * sizeof (void *)
8
9 #include "CoroAPI.h"
10 #include "perlmulticore.h"
11 #include "schmorp.h"
12 #include "xthread.h"
13
14 #ifdef _WIN32
15 typedef char sigset_t;
16 #define pthread_sigmask(mode,new,old)
17 #endif
18
19 #ifndef SvREFCNT_dec_NN
20 #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
21 #endif
22
23 #ifndef SvREFCNT_inc_NN
24 #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
25 #endif
26
27 static pthread_key_t current_key;
28
29 static s_epipe ep;
30 static void *perl_thx;
31 static sigset_t cursigset, fullsigset;
32
33 static int global_enable = 0;
34 static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
35
36 /* assigned to a thread for each release/acquire */
37 struct tctx
38 {
39 void *coro;
40 int wait_f;
41 xcond_t acquire_c;
42 };
43
44 static struct tctx *tctx_free;
45
46 static struct tctx *
47 tctx_get (void)
48 {
49 struct tctx *ctx;
50
51 if (!tctx_free)
52 {
53 ctx = malloc (sizeof (*tctx_free));
54 X_COND_CREATE (ctx->acquire_c);
55 }
56 else
57 {
58 ctx = tctx_free;
59 tctx_free = tctx_free->coro;
60 }
61
62 return ctx;
63 }
64
65 static void
66 tctx_put (struct tctx *ctx)
67 {
68 ctx->coro = tctx_free;
69 tctx_free = ctx;
70 }
71
/* a growable LIFO stack of tctx pointers; an all-zero struct is a valid empty stack */
struct tctxs
{
  struct tctx **ctxs; /* heap array with room for max pointers, 0 before the first push */
  int cur, max;       /* cur: number of stored pointers, max: allocated slots */
};

/*
 * Pops and returns the most recently pushed context.
 * The stack must not be empty (ctxs->cur > 0); this is not checked here.
 */
static struct tctx *
tctxs_get (struct tctxs *ctxs)
{
  return ctxs->ctxs[--ctxs->cur];
}

/*
 * Pushes ctx onto the stack, growing the array when it is full
 * (16 slots initially, doubling afterwards).
 *
 * The stack is only modified once the larger array has actually been
 * obtained; when the allocation fails the process is aborted rather than
 * losing the old array and storing through a null pointer.
 */
static void
tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
{
  if (ctxs->cur >= ctxs->max)
    {
      int newmax = ctxs->max ? ctxs->max * 2 : 16;
      struct tctx **grown = realloc (ctxs->ctxs, (size_t)newmax * sizeof (ctxs->ctxs[0]));

      if (!grown)
        abort (); /* out of memory */

      ctxs->ctxs = grown;
      ctxs->max = newmax;
    }

  ctxs->ctxs[ctxs->cur++] = ctx;
}
96
97 static xmutex_t release_m = X_MUTEX_INIT;
98 static xcond_t release_c = X_COND_INIT;
99 static struct tctxs releasers;
100 static int idle;
101 static int min_idle = 1;
102 static int curthreads, max_threads = 1; /* protected by release_m */
103
104 static xmutex_t acquire_m = X_MUTEX_INIT;
105 static struct tctxs acquirers;
106
107 X_THREAD_PROC(thread_proc)
108 {
109 PERL_SET_CONTEXT (perl_thx);
110
111 {
112 dTHX; /* inefficient, we already have perl_thx, but I see no better way */
113 struct tctx *ctx;
114
115 X_LOCK (release_m);
116
117 for (;;)
118 {
119 while (!releasers.cur)
120 if (idle <= min_idle || 1)
121 X_COND_WAIT (release_c, release_m);
122 else
123 {
124 struct timespec ts = { time (0) + idle - min_idle, 0 };
125
126 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
127 if (idle > min_idle && !releasers.cur)
128 break;
129 }
130
131 ctx = tctxs_get (&releasers);
132 --idle;
133 X_UNLOCK (release_m);
134
135 if (!ctx) /* timed out? */
136 break;
137
138 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
139
140 while (ctx->coro)
141 CORO_SCHEDULE;
142
143 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
144
145 X_LOCK (acquire_m);
146 ctx->wait_f = 1;
147 X_COND_SIGNAL (ctx->acquire_c);
148 X_UNLOCK (acquire_m);
149
150 X_LOCK (release_m);
151 ++idle;
152 }
153 }
154 }
155
156 static void
157 start_thread (void)
158 {
159 xthread_t tid;
160
161 if (curthreads >= max_threads && 0)
162 return;
163
164 ++curthreads;
165 ++idle;
166 xthread_create (&tid, thread_proc, 0);
167 }
168
169 static void
170 pmapi_release (void)
171 {
172 if (!(thread_enable ? thread_enable & 1 : global_enable))
173 {
174 pthread_setspecific (current_key, 0);
175 return;
176 }
177
178 struct tctx *ctx = tctx_get ();
179 ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
180 ctx->wait_f = 0;
181
182 pthread_setspecific (current_key, ctx);
183 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
184
185 X_LOCK (release_m);
186
187 if (idle <= min_idle)
188 start_thread ();
189
190 tctxs_put (&releasers, ctx);
191 X_COND_SIGNAL (release_c);
192
193 while (!idle && releasers.cur)
194 {
195 X_UNLOCK (release_m);
196 X_LOCK (release_m);
197 }
198
199 X_UNLOCK (release_m);
200 }
201
202 static void
203 pmapi_acquire (void)
204 {
205 struct tctx *ctx = pthread_getspecific (current_key);
206
207 if (!ctx)
208 return;
209
210 X_LOCK (acquire_m);
211
212 tctxs_put (&acquirers, ctx);
213
214 s_epipe_signal (&ep);
215 while (!ctx->wait_f)
216 X_COND_WAIT (ctx->acquire_c, acquire_m);
217 X_UNLOCK (acquire_m);
218
219 tctx_put (ctx);
220 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
221 }
222
223 static void
224 set_thread_enable (pTHX_ void *arg)
225 {
226 thread_enable = PTR2IV (arg);
227 }
228
229 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
230
231 PROTOTYPES: DISABLE
232
BOOT:
{
#ifndef _WIN32
  /* fullsigset is the mask installed by pmapi_release and thread_proc to
     block signals, so it has to contain every signal and must not be
     emptied again afterwards */
  sigfillset (&fullsigset);
#endif

  pthread_key_create (&current_key, 0);

  if (s_epipe_new (&ep))
    croak ("Coro::Multicore: unable to initialise event pipe.\n");

  /* remembered so that worker threads can install the same context */
  perl_thx = PERL_GET_CONTEXT;

  I_CORO_API ("Coro::Multicore");

  /* pre-start worker threads until min_idle of them are idle */
  X_LOCK (release_m);
  while (idle < min_idle)
    start_thread ();
  X_UNLOCK (release_m);

  /* not perfectly efficient to do it this way, but it's simple */
  perl_multicore_init ();
  perl_multicore_api->pmapi_release = pmapi_release;
  perl_multicore_api->pmapi_acquire = pmapi_acquire;
}
259
bool
enable (bool enable = NO_INIT)
        CODE:
        /* always report the setting that was in effect before this call */
        RETVAL = global_enable;
        /* the argument is optional: only change the setting when it was passed */
        if (items > 0)
          global_enable = enable;
        OUTPUT:
        RETVAL
268
void
scoped_enable ()
        CODE:
        /* registers set_thread_enable with 1 (enabled) and 0 (unset) as the
           two hook arguments; presumably the first applies on scope entry
           and the second on scope exit - confirm against CoroAPI.h */
        LEAVE; /* see Guard.xs */
        CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
        ENTER; /* see Guard.xs */
275
void
scoped_disable ()
        CODE:
        /* registers set_thread_enable with 2 (disabled) and 0 (unset) as the
           two hook arguments; presumably the first applies on scope entry
           and the second on scope exit - confirm against CoroAPI.h */
        LEAVE; /* see Guard.xs */
        CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
        ENTER; /* see Guard.xs */
282
U32
min_idle_threads (U32 min = NO_INIT)
        CODE:
        /* min_idle is read by thread_proc, pmapi_release and BOOT while
           holding release_m, so that is the mutex to take here as well */
        X_LOCK (release_m);
        /* return the previous value; only update when an argument was passed */
        RETVAL = min_idle;
        if (items)
          min_idle = min;
        X_UNLOCK (release_m);
        OUTPUT:
        RETVAL
293
294
int
fd ()
        CODE:
        /* file descriptor of the event pipe that pmapi_acquire signals */
        RETVAL = s_epipe_fd (&ep);
        OUTPUT:
        RETVAL
301
void
poll (...)
        CODE:
        /* drain the event pipe, then ready the coro of every queued acquirer */
        s_epipe_drain (&ep);
        X_LOCK (acquire_m);
        for (; acquirers.cur; )
          {
            struct tctx *waiter = tctxs_get (&acquirers);
            SV *coro_sv = (SV *)waiter->coro;
            CORO_READY (coro_sv);
            /* drop the reference taken in pmapi_release */
            SvREFCNT_dec_NN (coro_sv);
            /* thread_proc stops calling CORO_SCHEDULE once coro is 0 */
            waiter->coro = 0;
          }
        X_UNLOCK (acquire_m);
315
void
sleep (NV seconds)
        CODE:
        /* sleeps for (possibly fractional) seconds with the interpreter released;
           negative and NaN values do not sleep at all */
        perlinterp_release ();
        {
          /* usleep is only specified for arguments below one million, and
             converting a negative or huge NV to an unsigned type is undefined,
             so clamp first and split into whole seconds plus microseconds */
          NV remaining = seconds > 0. ? seconds : 0.;
          unsigned int whole;
          unsigned int frac_us;
          if (remaining > 2147483647.)
            remaining = 2147483647.;
          whole = (unsigned int)remaining;
          frac_us = (unsigned int)((remaining - whole) * 1e6);
          if (whole)
            sleep (whole);
          if (frac_us)
            usleep (frac_us);
        }
        perlinterp_acquire ();
322