ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.10
Committed: Fri Jul 3 02:55:10 2015 UTC (8 years, 11 months ago) by root
Branch: MAIN
Changes since 1.9: +0 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #define PERL_NO_GET_CONTEXT
2
3 #include "EXTERN.h"
4 #include "perl.h"
5 #include "XSUB.h"
6
/* thread stack size; defined before xthread.h is included, which presumably consumes it. */
/* the expansion is parenthesised so it stays one operand inside larger expressions */
#define X_STACKSIZE (1024 * sizeof (void *))
8
9 #include "CoroAPI.h"
10 #include "perlmulticore.h"
11 #include "schmorp.h"
12 #include "xthread.h"
13
/* on Win32, supply a placeholder sigset_t and turn pthread_sigmask into a no-op */
#if defined (_WIN32)
typedef char sigset_t;
#define pthread_sigmask(mode,new,old)
#endif

/* fall back to the plain refcount macros when the _NN variants are not defined */
#if !defined (SvREFCNT_dec_NN)
#define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
#endif

#if !defined (SvREFCNT_inc_NN)
#define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
#endif
26
27 static pthread_key_t current_key;
28
29 static s_epipe ep;
30 static void *perl_thx;
31 static sigset_t cursigset, fullsigset;
32
33 static int global_enable = 0;
34 static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
35
36 /* assigned to a thread for each release/acquire */
37 struct tctx
38 {
39 void *coro;
40 int wait_f;
41 xcond_t acquire_c;
42 };
43
44 static struct tctx *tctx_free;
45
46 static struct tctx *
47 tctx_get (void)
48 {
49 struct tctx *ctx;
50
51 if (!tctx_free)
52 {
53 ctx = malloc (sizeof (*tctx_free));
54 X_COND_CREATE (ctx->acquire_c);
55 }
56 else
57 {
58 ctx = tctx_free;
59 tctx_free = tctx_free->coro;
60 }
61
62 return ctx;
63 }
64
65 static void
66 tctx_put (struct tctx *ctx)
67 {
68 ctx->coro = tctx_free;
69 tctx_free = ctx;
70 }
71
/* a growable LIFO stack of tctx pointers */
struct tctxs
{
  struct tctx **ctxs; /* heap array holding the stacked pointers */
  int cur, max;       /* cur: number of entries in use, max: allocated capacity */
};

/*
 * Pops and returns the most recently pushed context.
 * Returns 0 when the stack is empty instead of reading before the array.
 */
static struct tctx *
tctxs_get (struct tctxs *ctxs)
{
  if (ctxs->cur <= 0)
    return 0;

  return ctxs->ctxs[--ctxs->cur];
}

/*
 * Pushes CTX, doubling the capacity (starting at 16 slots) when full.
 * Aborts the process when the array cannot be grown; the old array pointer
 * is only replaced after realloc succeeded.
 */
static void
tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
{
  if (ctxs->cur >= ctxs->max)
    {
      int new_max = ctxs->max ? ctxs->max * 2 : 16;
      struct tctx **grown = realloc (ctxs->ctxs, new_max * sizeof (ctxs->ctxs[0]));

      if (!grown)
        abort ();

      ctxs->ctxs = grown;
      ctxs->max = new_max;
    }

  ctxs->ctxs[ctxs->cur++] = ctx;
}
96
97 static xmutex_t release_m = X_MUTEX_INIT;
98 static xcond_t release_c = X_COND_INIT;
99 static struct tctxs releasers;
100 static int idle;
101 static int min_idle = 1;
102 static int curthreads, max_threads = 1; /* protected by release_m */
103
104 static xmutex_t acquire_m = X_MUTEX_INIT;
105 static struct tctxs acquirers;
106
107 X_THREAD_PROC(thread_proc)
108 {
109 PERL_SET_CONTEXT (perl_thx);
110
111 {
112 dTHX; /* inefficient, we already have perl_thx, but I see no better way */
113 struct tctx *ctx;
114
115 X_LOCK (release_m);
116
117 for (;;)
118 {
119 while (!releasers.cur)
120 if (idle <= min_idle || 1)
121 X_COND_WAIT (release_c, release_m);
122 else
123 {
124 struct timespec ts = { time (0) + idle - min_idle, 0 };
125
126 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
127 if (idle > min_idle && !releasers.cur)
128 break;
129 }
130
131 ctx = tctxs_get (&releasers);
132 --idle;
133 X_UNLOCK (release_m);
134
135 if (!ctx) /* timed out? */
136 break;
137
138 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
139
140 while (ctx->coro)
141 CORO_SCHEDULE;
142
143 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
144
145 X_LOCK (acquire_m);
146 ctx->wait_f = 1;
147 X_COND_SIGNAL (ctx->acquire_c);
148 X_UNLOCK (acquire_m);
149
150 X_LOCK (release_m);
151 ++idle;
152 }
153 }
154 }
155
156 static void
157 start_thread (void)
158 {
159 xthread_t tid;
160
161 if (curthreads >= max_threads && 0)
162 return;
163
164 ++curthreads;
165 ++idle;
166 xthread_create (&tid, thread_proc, 0);
167 }
168
169 static void
170 pmapi_release (void)
171 {
172 if (!(thread_enable ? thread_enable & 1 : global_enable))
173 {
174 pthread_setspecific (current_key, 0);
175 return;
176 }
177
178 struct tctx *ctx = tctx_get ();
179 ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
180 ctx->wait_f = 0;
181
182 pthread_setspecific (current_key, ctx);
183 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
184
185 X_LOCK (release_m);
186
187 if (idle <= min_idle)
188 start_thread ();
189
190 tctxs_put (&releasers, ctx);
191 X_COND_SIGNAL (release_c);
192
193 while (!idle && releasers.cur)
194 {
195 X_UNLOCK (release_m);
196 X_LOCK (release_m);
197 }
198
199 X_UNLOCK (release_m);
200 }
201
202 static void
203 pmapi_acquire (void)
204 {
205 struct tctx *ctx = pthread_getspecific (current_key);
206
207 if (!ctx)
208 return;
209
210 X_LOCK (acquire_m);
211
212 tctxs_put (&acquirers, ctx);
213
214 s_epipe_signal (&ep);
215 while (!ctx->wait_f)
216 X_COND_WAIT (ctx->acquire_c, acquire_m);
217 X_UNLOCK (acquire_m);
218
219 tctx_put (ctx);
220 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
221 }
222
223 static void
224 set_thread_enable (pTHX_ void *arg)
225 {
226 thread_enable = PTR2IV (arg);
227 }
228
229 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
230
231 PROTOTYPES: DISABLE
232
BOOT:
{
#if !defined (_WIN32)
        /* mask with every signal set, installed while the interpreter is released */
        sigfillset (&fullsigset);
#endif

        pthread_key_create (&current_key, 0);

        if (s_epipe_new (&ep))
          croak ("Coro::Multicore: unable to initialise event pipe.\n");

        /* remembered so that worker threads can install the same context */
        perl_thx = PERL_GET_CONTEXT;

        I_CORO_API ("Coro::Multicore");

        /* start workers until min_idle of them are counted as idle */
        X_LOCK (release_m);
        for (; idle < min_idle; )
          start_thread ();
        X_UNLOCK (release_m);

        /* not perfectly efficient to do it this way, but it's simple */
        perl_multicore_init ();
        perl_multicore_api->pmapi_release = pmapi_release;
        perl_multicore_api->pmapi_acquire = pmapi_acquire;
}
258
bool
enable (bool enable = NO_INIT)
        CODE:
        /* always report the previous setting, replace it only when an argument was given */
        RETVAL = global_enable;
        if (items > 0)
          global_enable = enable;
        OUTPUT:
        RETVAL
267
void
scoped_enable ()
        CODE:
        /* step out of the XSUB's own scope so the hook lands in the caller's scope */
        LEAVE; /* see Guard.xs */
        /* set_thread_enable is passed the value 1 and then 0; presumably on scope entry and leave - confirm in CoroAPI.h */
        CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, INT2PTR (void *, 1), set_thread_enable, INT2PTR (void *, 0));
        ENTER; /* see Guard.xs */
274
void
scoped_disable ()
        CODE:
        /* step out of the XSUB's own scope so the hook lands in the caller's scope */
        LEAVE; /* see Guard.xs */
        /* set_thread_enable is passed the value 2 and then 0; presumably on scope entry and leave - confirm in CoroAPI.h */
        CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, INT2PTR (void *, 2), set_thread_enable, INT2PTR (void *, 0));
        ENTER; /* see Guard.xs */
281
U32
min_idle_threads (U32 min = NO_INIT)
        CODE:
        /* min_idle is read under release_m by thread_proc, pmapi_release and */
        /* BOOT, so it has to be updated under that same mutex */
        X_LOCK (release_m);
        /* always return the previous value, replace it only when an argument was given */
        RETVAL = min_idle;
        if (items)
          min_idle = min;
        X_UNLOCK (release_m);
        OUTPUT:
        RETVAL
292
293
int
fd ()
        CODE:
{
        /* value of s_epipe_fd for the module's event pipe */
        int pipe_fd = s_epipe_fd (&ep);

        RETVAL = pipe_fd;
}
        OUTPUT:
        RETVAL
300
void
poll (...)
        CODE:
        /* drain the event pipe first, then ready every coro queued by pmapi_acquire */
        s_epipe_drain (&ep);
        X_LOCK (acquire_m);
        for (; acquirers.cur; )
          {
            struct tctx *ctx = tctxs_get (&acquirers);
            SV *coro = (SV *)ctx->coro;

            CORO_READY (coro);
            SvREFCNT_dec_NN (coro);
            /* a cleared coro ends the worker's CORO_SCHEDULE loop for this context */
            ctx->coro = 0;
          }
        X_UNLOCK (acquire_m);
314
void
sleep (NV seconds)
        CODE:
{
        /* negative and NaN durations sleep for zero time instead of being */
        /* converted to an out-of-range integer */
        NV left = seconds > 0. ? seconds : 0.;

        perlinterp_release ();

        /* POSIX only guarantees usleep for arguments below one million, */
        /* so longer sleeps are split into sub-second chunks */
        while (left >= 1.)
          {
            usleep (999999);
            left -= .999999;
          }

        usleep ((unsigned int)(left * 1e6));

        perlinterp_acquire ();
}
321