ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.8
Committed: Wed Jul 1 22:39:07 2015 UTC (8 years, 11 months ago) by root
Branch: MAIN
CVS Tags: rel-0_02
Changes since 1.7: +8 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #define PERL_NO_GET_CONTEXT
2
3 #include "EXTERN.h"
4 #include "perl.h"
5 #include "XSUB.h"
6
/* thread stack size in bytes: 1024 pointer-sized words.  parenthesised so
   the expansion stays a single operand wherever it is used.
   NOTE(review): presumably consumed by xthread.h, which is included further
   down - confirm. */
#define X_STACKSIZE (1024 * sizeof (void *))
8
9 #include "CoroAPI.h"
10 #include "perlmulticore.h"
11 #include "schmorp.h"
12 #include "xthread.h"
13
/* on _WIN32, supply a placeholder sigset_t and make every
   pthread_sigmask (...) call expand to nothing */
#if defined (_WIN32)
typedef char sigset_t;
# define pthread_sigmask(mode,new,old)
#endif

/* fall back to the plain SvREFCNT_dec/SvREFCNT_inc when the _NN
   variants are not already defined */
#if !defined (SvREFCNT_dec_NN)
# define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
#endif

#if !defined (SvREFCNT_inc_NN)
# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
#endif
26
27 static pthread_key_t current_key;
28
29 static s_epipe ep;
30 static void *perl_thx;
31 static sigset_t cursigset, fullsigset;
32
33 static int global_enable = 0;
34 static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
35
36 /* assigned to a thread for each release/acquire */
37 struct tctx
38 {
39 void *coro;
40 int wait_f;
41 xcond_t wait_c;
42 };
43
44 static struct tctx *tctx_free;
45
46 static int idle;
47 static int min_idle = 1;
48
49 static xmutex_t perl_m = X_MUTEX_INIT;
50 static xcond_t perl_c = X_COND_INIT;
51 static struct tctx *perl_f;
52
53 static xmutex_t wait_m = X_MUTEX_INIT;
54
55 static int wakeup_f;
56 static struct tctx **waiters;
57 static int waiters_count, waiters_max;
58
59 static struct tctx *
60 tctx_get (void)
61 {
62 struct tctx *ctx;
63
64 if (!tctx_free)
65 {
66 ctx = malloc (sizeof (*tctx_free));
67 X_COND_CREATE (ctx->wait_c);
68 }
69 else
70 {
71 ctx = tctx_free;
72 tctx_free = tctx_free->coro;
73 }
74
75 return ctx;
76 }
77
78 static void
79 tctx_put (struct tctx *ctx)
80 {
81 ctx->coro = tctx_free;
82 tctx_free = ctx;
83 }
84
85 X_THREAD_PROC(thread_proc)
86 {
87 PERL_SET_CONTEXT (perl_thx);
88
89 {
90 dTHX; /* inefficient, we already have perl_thx, but I see no better way */
91 struct tctx *ctx;
92
93 X_LOCK (perl_m);
94
95 for (;;)
96 {
97 while (!perl_f)
98 if (idle <= min_idle || 1)
99 X_COND_WAIT (perl_c, perl_m);
100 else
101 {
102 struct timespec ts = { time (0) + idle - min_idle, 0 };
103
104 if (X_COND_TIMEDWAIT (perl_c, perl_m, ts) == ETIMEDOUT)
105 if (idle > min_idle && !perl_f)
106 break;
107 }
108
109 ctx = perl_f;
110 perl_f = 0;
111 --idle;
112 X_UNLOCK (perl_m);
113
114 if (!ctx) /* timed out? */
115 break;
116
117 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
118
119 while (ctx->coro)
120 CORO_SCHEDULE;
121
122 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
123
124 X_LOCK (wait_m);
125 ctx->wait_f = 1;
126 X_COND_SIGNAL (ctx->wait_c);
127 X_UNLOCK (wait_m);
128
129 X_LOCK (perl_m);
130 ++idle;
131 }
132 }
133 }
134
135 static void
136 start_thread (void)
137 {
138 xthread_t tid;
139
140 ++idle;
141 xthread_create (&tid, thread_proc, 0);
142 }
143
144 static void
145 pmapi_release (void)
146 {
147 if (!(thread_enable ? thread_enable & 1 : global_enable))
148 {
149 pthread_setspecific (current_key, 0);
150 return;
151 }
152
153 struct tctx *ctx = tctx_get ();
154 ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
155 ctx->wait_f = 0;
156
157 pthread_setspecific (current_key, ctx);
158 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
159
160 X_LOCK (perl_m);
161
162 if (idle <= min_idle)
163 start_thread ();
164
165 perl_f = ctx;
166 X_COND_SIGNAL (perl_c);
167
168 X_UNLOCK (perl_m);
169 }
170
171 static void
172 pmapi_acquire (void)
173 {
174 struct tctx *ctx = pthread_getspecific (current_key);
175
176 if (!ctx)
177 return;
178
179 X_LOCK (wait_m);
180
181 if (waiters_count >= waiters_max)
182 {
183 waiters_max = waiters_max ? waiters_max * 2 : 16;
184 waiters = realloc (waiters, waiters_max * sizeof (*waiters));
185 }
186
187 waiters [waiters_count++] = ctx;
188
189 s_epipe_signal (&ep);
190 while (!ctx->wait_f)
191 X_COND_WAIT (ctx->wait_c, wait_m);
192 X_UNLOCK (wait_m);
193
194 tctx_put (ctx);
195 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
196 }
197
198 static void
199 set_thread_enable (pTHX_ void *arg)
200 {
201 thread_enable = PTR2IV (arg);
202 }
203
204 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
205
206 PROTOTYPES: DISABLE
207
BOOT:
{
  /* module initialisation: sets up the signal mask, the thread-specific
     key and the event pipe, starts the initial worker threads and then
     installs this module's release/acquire functions */
#ifndef _WIN32
  sigfillset (&fullsigset);
#endif

  /* pthread_key_create returns non-zero on failure; without a valid key
     the later get/setspecific calls cannot work */
  if (pthread_key_create (&current_key, 0))
    croak ("Coro::Multicore: unable to create thread-specific key.\n");

  if (s_epipe_new (&ep))
    croak ("Coro::Multicore: unable to initialise event pipe.\n");

  perl_thx = PERL_GET_CONTEXT;

  I_CORO_API ("Coro::Multicore");

  X_LOCK (perl_m);
  while (idle < min_idle)
    start_thread ();
  /* NOTE(review): the next call carried a "//D" marker and starts one
     thread beyond min_idle - looks like a debugging leftover, confirm
     whether it is still wanted */
  start_thread ();
  X_UNLOCK (perl_m);

  /* not perfectly efficient to do it this way, but it's simple */
  perl_multicore_init ();
  perl_multicore_api->pmapi_release = pmapi_release;
  perl_multicore_api->pmapi_acquire = pmapi_acquire;
}
234
bool
enable (bool enable = NO_INIT)
	CODE:
        /* always reports the previous value of global_enable; replaces
           it only when an argument was actually passed */
        RETVAL = global_enable;

        if (items > 0)
          global_enable = enable;
	OUTPUT:
        RETVAL
243
void
scoped_enable ()
	CODE:
        /* registers set_thread_enable twice: with 1 (enabled) and with 0
           (not set).  presumably the first pair runs on entering and the
           second on leaving the scope - confirm against CoroAPI.h */
        LEAVE; /* see Guard.xs */
        CORO_ENTERLEAVE_SCOPE_HOOK (
          set_thread_enable, INT2PTR (void *, 1),
          set_thread_enable, INT2PTR (void *, 0)
        );
        ENTER; /* see Guard.xs */
250
void
scoped_disable ()
	CODE:
        /* registers set_thread_enable twice: with 2 (disabled) and with 0
           (not set).  presumably the first pair runs on entering and the
           second on leaving the scope - confirm against CoroAPI.h */
        LEAVE; /* see Guard.xs */
        CORO_ENTERLEAVE_SCOPE_HOOK (
          set_thread_enable, INT2PTR (void *, 2),
          set_thread_enable, INT2PTR (void *, 0)
        );
        ENTER; /* see Guard.xs */
257
U32
min_idle_threads (U32 min = NO_INIT)
	CODE:
        /* returns the previous min_idle and, when an argument is given,
           replaces it.  min_idle is read by the worker threads and by
           pmapi_release while they hold perl_m, so that is the mutex
           which has to protect the update as well */
        X_LOCK (perl_m);
        RETVAL = min_idle;
        if (items)
          min_idle = min;
        X_UNLOCK (perl_m);
	OUTPUT:
        RETVAL
268
269
int
fd ()
	CODE:
        /* hands out the descriptor s_epipe_fd reports for the event
           pipe ep, the one pmapi_acquire signals */
        {
          int pipe_fd = s_epipe_fd (&ep);

          RETVAL = pipe_fd;
        }
	OUTPUT:
        RETVAL
276
void
poll (...)
	CODE:
        /* drains the event pipe, then empties the waiters array from the
           back: each queued coro is readied, the reference taken by
           pmapi_release is dropped and ctx->coro is reset to 0, which is
           what the worker thread's schedule loop checks for */
        s_epipe_drain (&ep);
        X_LOCK (wait_m);

        while (waiters_count > 0)
          {
            struct tctx *woken = waiters [waiters_count - 1];
            SV *coro_sv = (SV *)woken->coro;

            --waiters_count;
            CORO_READY (coro_sv);
            SvREFCNT_dec_NN (coro_sv);
            woken->coro = 0;
          }

        X_UNLOCK (wait_m);
290
void
sleep (NV seconds)
	CODE:
        /* sleeps for the given (possibly fractional) number of seconds
           between perlinterp_release and perlinterp_acquire.
           usleep is only specified for arguments below one million
           microseconds, so nanosleep does the work where available;
           zero, negative and NaN durations do not sleep at all.
           NOTE(review): the _WIN32 branch keeps the original call, as
           nanosleep availability there is unverified */
        perlinterp_release ();
#ifdef _WIN32
        usleep (seconds * 1e6);
#else
        if (seconds > 0.)
          {
            struct timespec ts;

            ts.tv_sec  = (time_t)seconds;
            ts.tv_nsec = (long)((seconds - (NV)ts.tv_sec) * 1e9);
            nanosleep (&ts, 0);
          }
#endif
        perlinterp_acquire ();
297