ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.6
Committed: Mon Jun 29 13:15:53 2015 UTC (8 years, 11 months ago) by root
Branch: MAIN
Changes since 1.5: +42 -38 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #define PERL_NO_GET_CONTEXT
2
3 #include "EXTERN.h"
4 #include "perl.h"
5 #include "XSUB.h"
6
7 #define X_STACKSIZE -1
8
9 #include "CoroAPI.h"
10 #include "perlmulticore.h"
11 #include "schmorp.h"
12 #include "xthread.h"
13
14 #ifdef _WIN32
15 typedef char sigset_t;
16 #define pthread_sigmask(mode,new,old)
17 #endif
18
19 static pthread_key_t current_key;
20
21 static s_epipe ep;
22 static void *perl_thx;
23 static sigset_t cursigset, fullsigset;
24
25 static int global_enable = 0;
26 static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
27
28 /* assigned to a thread for each release/acquire */
29 struct tctx
30 {
31 void *coro;
32 int wait_f;
33 xcond_t wait_c;
34 };
35
36 static struct tctx *tctx_free;
37
38 static int idle;
39 static int min_idle = 1;
40
41 static xmutex_t perl_m = X_MUTEX_INIT;
42 static xcond_t perl_c = X_COND_INIT;
43 static struct tctx *perl_f;
44
45 static xmutex_t wait_m = X_MUTEX_INIT;
46
47 static int wakeup_f;
48 static struct tctx **waiters;
49 static int waiters_count, waiters_max;
50
51 static struct tctx *
52 tctx_get (void)
53 {
54 struct tctx *ctx;
55
56 if (!tctx_free)
57 {
58 ctx = malloc (sizeof (*tctx_free));
59 X_COND_CREATE (ctx->wait_c);
60 }
61 else
62 {
63 ctx = tctx_free;
64 tctx_free = tctx_free->coro;
65 }
66
67 return ctx;
68 }
69
70 static void
71 tctx_put (struct tctx *ctx)
72 {
73 ctx->coro = tctx_free;
74 tctx_free = ctx;
75 }
76
77 X_THREAD_PROC(thread_proc)
78 {
79 PERL_SET_CONTEXT (perl_thx);
80
81 {
82 dTHX; /* inefficient, we already have perl_thx, but I see no better way */
83 struct tctx *ctx;
84
85 X_LOCK (perl_m);
86
87 for (;;)
88 {
89 while (!perl_f)
90 if (idle <= min_idle || 1)
91 X_COND_WAIT (perl_c, perl_m);
92 else
93 {
94 struct timespec ts = { time (0) + idle - min_idle, 0 };
95
96 if (X_COND_TIMEDWAIT (perl_c, perl_m, ts) == ETIMEDOUT)
97 if (idle > min_idle && !perl_f)
98 break;
99 }
100
101 ctx = perl_f;
102 perl_f = 0;
103 --idle;
104 X_UNLOCK (perl_m);
105
106 if (!ctx) /* timed out? */
107 break;
108
109 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
110
111 while (ctx->coro)
112 CORO_SCHEDULE;
113
114 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
115
116 X_LOCK (wait_m);
117 ctx->wait_f = 1;
118 X_COND_SIGNAL (ctx->wait_c);
119 X_UNLOCK (wait_m);
120
121 X_LOCK (perl_m);
122 ++idle;
123 }
124 }
125 }
126
127 static void
128 start_thread (void)
129 {
130 xthread_t tid;
131
132 ++idle;
133 xthread_create (&tid, thread_proc, 0);
134 }
135
136 static void
137 pmapi_release (void)
138 {
139 if (!(thread_enable ? thread_enable & 1 : global_enable))
140 {
141 pthread_setspecific (current_key, 0);
142 return;
143 }
144
145 struct tctx *ctx = tctx_get ();
146 ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
147 ctx->wait_f = 0;
148
149 pthread_setspecific (current_key, ctx);
150 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
151
152 X_LOCK (perl_m);
153
154 if (idle <= min_idle)
155 start_thread ();
156
157 perl_f = ctx;
158 X_COND_SIGNAL (perl_c);
159
160 X_UNLOCK (perl_m);
161 }
162
163 static void
164 pmapi_acquire (void)
165 {
166 struct tctx *ctx = pthread_getspecific (current_key);
167
168 if (!ctx)
169 return;
170
171 X_LOCK (wait_m);
172
173 if (waiters_count >= waiters_max)
174 {
175 waiters_max = waiters_max ? waiters_max * 2 : 16;
176 waiters = realloc (waiters, waiters_max * sizeof (*waiters));
177 }
178
179 waiters [waiters_count++] = ctx;
180
181 s_epipe_signal (&ep);
182 while (!ctx->wait_f)
183 X_COND_WAIT (ctx->wait_c, wait_m);
184 X_UNLOCK (wait_m);
185
186 tctx_put (ctx);
187 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
188 }
189
190 static void
191 set_thread_enable (pTHX_ void *arg)
192 {
193 thread_enable = PTR2IV (arg);
194 }
195
196 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
197
198 PROTOTYPES: DISABLE
199
200 BOOT:
201 {
202 #ifndef _WIN32
203 sigfillset (&fullsigset);
204 #endif
205
206 pthread_key_create (&current_key, 0);
207
208 if (s_epipe_new (&ep))
209 croak ("Coro::Multicore: unable to initialise event pipe.\n");
210
211 perl_thx = PERL_GET_CONTEXT;
212
213 I_CORO_API ("Coro::Multicore");
214
215 X_LOCK (perl_m);
216 while (idle < min_idle)
217 start_thread ();
218 start_thread ();//D
219 X_UNLOCK (perl_m);
220
221 /* not perfectly efficient to do it this way, but it's simple */
222 perl_multicore_init ();
223 perl_multicore_api->pmapi_release = pmapi_release;
224 perl_multicore_api->pmapi_acquire = pmapi_acquire;
225 }
226
227 bool
228 enable (bool enable = NO_INIT)
229 CODE:
230 RETVAL = global_enable;
231 if (items)
232 global_enable = enable;
233 OUTPUT:
234 RETVAL
235
236 void
237 scoped_enable ()
238 CODE:
239 LEAVE; /* see Guard.xs */
240 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
241 ENTER; /* see Guard.xs */
242
243 void
244 scoped_disable ()
245 CODE:
246 LEAVE; /* see Guard.xs */
247 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
248 ENTER; /* see Guard.xs */
249
250 U32
251 min_idle_threads (U32 min = NO_INIT)
252 CODE:
253 X_LOCK (wait_m);
254 RETVAL = min_idle;
255 if (items)
256 min_idle = min;
257 X_UNLOCK (wait_m);
258 OUTPUT:
259 RETVAL
260
261
262 int
263 fd ()
264 CODE:
265 RETVAL = s_epipe_fd (&ep);
266 OUTPUT:
267 RETVAL
268
269 void
270 poll (...)
271 CODE:
272 s_epipe_drain (&ep);
273 X_LOCK (wait_m);
274 while (waiters_count)
275 {
276 struct tctx *ctx = waiters [--waiters_count];
277 CORO_READY ((SV *)ctx->coro);
278 SvREFCNT_dec_NN ((SV *)ctx->coro);
279 ctx->coro = 0;
280 }
281 X_UNLOCK (wait_m);
282
283 void
284 sleep (NV seconds)
285 CODE:
286 perlinterp_release ();
287 usleep (seconds * 1e6);
288 perlinterp_acquire ();
289