… | |
… | |
2 | |
2 | |
3 | #include "EXTERN.h" |
3 | #include "EXTERN.h" |
4 | #include "perl.h" |
4 | #include "perl.h" |
5 | #include "XSUB.h" |
5 | #include "XSUB.h" |
6 | |
6 | |
7 | #define X_STACKSIZE -1 |
7 | #define X_STACKSIZE 1024 * sizeof (void *) |
8 | |
8 | |
9 | #include "CoroAPI.h" |
9 | #include "CoroAPI.h" |
10 | #include "perlmulticore.h" |
10 | #include "perlmulticore.h" |
11 | #include "schmorp.h" |
11 | #include "schmorp.h" |
12 | #include "xthread.h" |
12 | #include "xthread.h" |
… | |
… | |
14 | #ifdef _WIN32 |
14 | #ifdef _WIN32 |
15 | typedef char sigset_t; |
15 | typedef char sigset_t; |
16 | #define pthread_sigmask(mode,new,old) |
16 | #define pthread_sigmask(mode,new,old) |
17 | #endif |
17 | #endif |
18 | |
18 | |
19 | static pthread_key_t current_key; |
19 | #ifndef SvREFCNT_dec_NN |
|
|
20 | #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv) |
|
|
21 | #endif |
|
|
22 | |
|
|
23 | #ifndef SvREFCNT_inc_NN |
|
|
24 | #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv) |
|
|
25 | #endif |
|
|
26 | |
|
|
27 | static X_TLS_DECLARE(current_key); |
20 | |
28 | |
21 | static s_epipe ep; |
29 | static s_epipe ep; |
22 | static void *perl_thx; |
30 | static void *perl_thx; |
23 | static sigset_t cursigset, fullsigset; |
31 | static sigset_t cursigset, fullsigset; |
24 | |
32 | |
|
|
33 | static int global_enable = 0; |
|
|
34 | static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */ |
|
|
35 | |
|
|
36 | /* assigned to a thread for each release/acquire */ |
25 | struct tctx |
37 | struct tctx |
26 | { |
38 | { |
27 | void *coro; |
39 | void *coro; |
28 | xcond_t wait_c; |
|
|
29 | int wait_f; |
40 | int wait_f; |
|
|
41 | xcond_t acquire_c; |
|
|
42 | int jeret; |
30 | }; |
43 | }; |
31 | |
44 | |
32 | static struct tctx *tctx_free; |
45 | static struct tctx *tctx_free; |
33 | |
|
|
34 | static int available; |
|
|
35 | static int max_idle = 8; |
|
|
36 | |
|
|
37 | static xmutex_t perl_m = X_MUTEX_INIT; |
|
|
38 | static xcond_t perl_c = X_COND_INIT; |
|
|
39 | static struct tctx *perl_f; |
|
|
40 | |
|
|
41 | static xmutex_t wait_m = X_MUTEX_INIT; |
|
|
42 | |
|
|
43 | static int wakeup_f; |
|
|
44 | static struct tctx **waiters; |
|
|
45 | static int waiters_count, waiters_max; |
|
|
46 | |
46 | |
47 | static struct tctx * |
47 | static struct tctx * |
48 | tctx_get (void) |
48 | tctx_get (void) |
49 | { |
49 | { |
50 | struct tctx *ctx; |
50 | struct tctx *ctx; |
51 | |
51 | |
52 | if (!tctx_free) |
52 | if (!tctx_free) |
53 | { |
53 | { |
54 | ctx = malloc (sizeof (*tctx_free)); |
54 | ctx = malloc (sizeof (*tctx_free)); |
55 | X_COND_CREATE (ctx->wait_c); |
55 | X_COND_CREATE (ctx->acquire_c); |
56 | } |
56 | } |
57 | else |
57 | else |
58 | { |
58 | { |
59 | ctx = tctx_free; |
59 | ctx = tctx_free; |
60 | tctx_free = tctx_free->coro; |
60 | tctx_free = tctx_free->coro; |
… | |
… | |
68 | { |
68 | { |
69 | ctx->coro = tctx_free; |
69 | ctx->coro = tctx_free; |
70 | tctx_free = ctx; |
70 | tctx_free = ctx; |
71 | } |
71 | } |
72 | |
72 | |
|
|
73 | /* a stack of tctxs */ |
|
|
74 | struct tctxs |
|
|
75 | { |
|
|
76 | struct tctx **ctxs; |
|
|
77 | int cur, max; |
|
|
78 | }; |
|
|
79 | |
|
|
80 | static struct tctx * |
|
|
81 | tctxs_get (struct tctxs *ctxs) |
|
|
82 | { |
|
|
83 | return ctxs->ctxs[--ctxs->cur]; |
|
|
84 | } |
|
|
85 | |
|
|
86 | static void |
|
|
87 | tctxs_put (struct tctxs *ctxs, struct tctx *ctx) |
|
|
88 | { |
|
|
89 | if (ctxs->cur >= ctxs->max) |
|
|
90 | { |
|
|
91 | ctxs->max = ctxs->max ? ctxs->max * 2 : 16; |
|
|
92 | ctxs->ctxs = realloc (ctxs->ctxs, ctxs->max * sizeof (ctxs->ctxs[0])); |
|
|
93 | } |
|
|
94 | |
|
|
95 | ctxs->ctxs[ctxs->cur++] = ctx; |
|
|
96 | } |
|
|
97 | |
|
|
98 | static xmutex_t release_m = X_MUTEX_INIT; |
|
|
99 | static xcond_t release_c = X_COND_INIT; |
|
|
100 | static struct tctxs releasers; |
|
|
101 | static int idle; |
|
|
102 | static int min_idle = 1; |
|
|
103 | static int curthreads, max_threads = 1; /* protected by release_m */ |
|
|
104 | |
|
|
105 | static xmutex_t acquire_m = X_MUTEX_INIT; |
|
|
106 | static struct tctxs acquirers; |
|
|
107 | |
73 | X_THREAD_PROC(thread_proc) |
108 | X_THREAD_PROC(thread_proc) |
74 | { |
109 | { |
75 | PERL_SET_CONTEXT (perl_thx); |
110 | PERL_SET_CONTEXT (perl_thx); |
76 | |
111 | |
77 | { |
112 | { |
78 | dTHX; /* inefficient, we already have perl_thx, but I see no better way */ |
113 | dTHX; /* inefficient, we already have perl_thx, but I see no better way */ |
|
|
114 | dJMPENV; |
79 | struct tctx *ctx; |
115 | struct tctx *ctx; |
|
|
116 | int catchret; |
|
|
117 | |
|
|
118 | X_LOCK (release_m); |
80 | |
119 | |
81 | for (;;) |
120 | for (;;) |
82 | { |
121 | { |
83 | /* TODO: should really use some idle time and exit after that */ |
122 | while (!releasers.cur) |
84 | X_LOCK (perl_m); |
123 | if (idle <= min_idle || 1) |
85 | while (!perl_f) |
|
|
86 | X_COND_WAIT (perl_c, perl_m); |
124 | X_COND_WAIT (release_c, release_m); |
87 | ctx = perl_f; |
125 | else |
88 | perl_f = 0; |
126 | { |
|
|
127 | struct timespec ts = { time (0) + idle - min_idle, 0 }; |
|
|
128 | |
|
|
129 | if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT) |
|
|
130 | if (idle > min_idle && !releasers.cur) |
|
|
131 | break; |
|
|
132 | } |
|
|
133 | |
|
|
134 | ctx = tctxs_get (&releasers); |
89 | --available; |
135 | --idle; |
90 | X_UNLOCK (perl_m); |
136 | X_UNLOCK (release_m); |
|
|
137 | |
|
|
138 | if (!ctx) /* timed out? */ |
|
|
139 | break; |
91 | |
140 | |
92 | pthread_sigmask (SIG_SETMASK, &cursigset, 0); |
141 | pthread_sigmask (SIG_SETMASK, &cursigset, 0); |
|
|
142 | JMPENV_PUSH (ctx->jeret); |
93 | |
143 | |
|
|
144 | if (!ctx->jeret) |
94 | while (ctx->coro) |
145 | while (ctx->coro) |
95 | CORO_SCHEDULE; |
146 | CORO_SCHEDULE; |
96 | |
147 | |
|
|
148 | JMPENV_POP; |
97 | pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); |
149 | pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); |
98 | |
150 | |
99 | X_LOCK (wait_m); |
151 | X_LOCK (acquire_m); |
100 | ctx->wait_f = 1; |
152 | ctx->wait_f = 1; |
101 | X_COND_SIGNAL (ctx->wait_c); |
153 | X_COND_SIGNAL (ctx->acquire_c); |
102 | |
|
|
103 | if (available >= max_idle) |
|
|
104 | { |
|
|
105 | X_UNLOCK (wait_m); |
|
|
106 | break; |
|
|
107 | } |
|
|
108 | |
|
|
109 | ++available; |
|
|
110 | X_UNLOCK (wait_m); |
154 | X_UNLOCK (acquire_m); |
|
|
155 | |
|
|
156 | X_LOCK (release_m); |
|
|
157 | ++idle; |
111 | } |
158 | } |
112 | } |
159 | } |
113 | } |
160 | } |
114 | |
161 | |
115 | static void |
162 | static void |
116 | start_thread (void) |
163 | start_thread (void) |
117 | { |
164 | { |
118 | xthread_t tid; |
165 | xthread_t tid; |
119 | |
166 | |
120 | ++available; |
167 | if (curthreads >= max_threads && 0) |
|
|
168 | return; |
|
|
169 | |
|
|
170 | ++curthreads; |
|
|
171 | ++idle; |
121 | xthread_create (&tid, thread_proc, 0); |
172 | xthread_create (&tid, thread_proc, 0); |
122 | } |
173 | } |
123 | |
174 | |
124 | static void |
175 | static void |
125 | pmapi_release (void) |
176 | pmapi_release (void) |
126 | { |
177 | { |
|
|
178 | if (! ((thread_enable ? thread_enable : global_enable) & 1)) |
|
|
179 | { |
|
|
180 | X_TLS_SET (current_key, 0); |
|
|
181 | return; |
|
|
182 | } |
|
|
183 | |
127 | struct tctx *ctx = tctx_get (); |
184 | struct tctx *ctx = tctx_get (); |
128 | ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT); |
185 | ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT); |
129 | ctx->wait_f = 0; |
186 | ctx->wait_f = 0; |
130 | |
187 | |
131 | pthread_setspecific (current_key, ctx); |
188 | X_TLS_SET (current_key, ctx); |
132 | pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); |
189 | pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); |
133 | |
190 | |
134 | if (!available) |
191 | X_LOCK (release_m); |
|
|
192 | |
|
|
193 | if (idle <= min_idle) |
135 | start_thread (); |
194 | start_thread (); |
136 | |
195 | |
137 | X_LOCK (perl_m); |
196 | tctxs_put (&releasers, ctx); |
138 | perl_f = ctx; |
|
|
139 | X_COND_SIGNAL (perl_c); |
197 | X_COND_SIGNAL (release_c); |
|
|
198 | |
|
|
199 | while (!idle && releasers.cur) |
|
|
200 | { |
|
|
201 | X_UNLOCK (release_m); |
|
|
202 | X_LOCK (release_m); |
|
|
203 | } |
|
|
204 | |
140 | X_UNLOCK (perl_m); |
205 | X_UNLOCK (release_m); |
141 | } |
206 | } |
142 | |
207 | |
143 | static void |
208 | static void |
144 | pmapi_acquire (void) |
209 | pmapi_acquire (void) |
145 | { |
210 | { |
146 | struct tctx *ctx = pthread_getspecific (current_key); |
211 | int jeret; |
|
|
212 | struct tctx *ctx = X_TLS_GET (current_key); |
147 | |
213 | |
|
|
214 | if (!ctx) |
|
|
215 | return; |
|
|
216 | |
148 | X_LOCK (wait_m); |
217 | X_LOCK (acquire_m); |
149 | |
218 | |
150 | if (waiters_count >= waiters_max) |
219 | tctxs_put (&acquirers, ctx); |
151 | { |
|
|
152 | waiters_max = waiters_max ? waiters_max * 2 : 16; |
|
|
153 | waiters = realloc (waiters, waiters_max * sizeof (*waiters)); |
|
|
154 | } |
|
|
155 | |
|
|
156 | waiters [waiters_count++] = ctx; |
|
|
157 | |
220 | |
158 | s_epipe_signal (&ep); |
221 | s_epipe_signal (&ep); |
159 | while (!ctx->wait_f) |
222 | while (!ctx->wait_f) |
160 | X_COND_WAIT (ctx->wait_c, wait_m); |
223 | X_COND_WAIT (ctx->acquire_c, acquire_m); |
161 | X_UNLOCK (wait_m); |
224 | X_UNLOCK (acquire_m); |
162 | |
225 | |
|
|
226 | jeret = ctx->jeret; |
163 | tctx_put (ctx); |
227 | tctx_put (ctx); |
164 | pthread_sigmask (SIG_SETMASK, &cursigset, 0); |
228 | pthread_sigmask (SIG_SETMASK, &cursigset, 0); |
|
|
229 | |
|
|
230 | if (jeret) |
|
|
231 | JMPENV_JUMP (jeret); |
|
|
232 | } |
|
|
233 | |
|
|
234 | static void |
|
|
235 | set_thread_enable (pTHX_ void *arg) |
|
|
236 | { |
|
|
237 | thread_enable = PTR2IV (arg); |
165 | } |
238 | } |
166 | |
239 | |
167 | MODULE = Coro::Multicore PACKAGE = Coro::Multicore |
240 | MODULE = Coro::Multicore PACKAGE = Coro::Multicore |
168 | |
241 | |
169 | PROTOTYPES: DISABLE |
242 | PROTOTYPES: DISABLE |
… | |
… | |
172 | { |
245 | { |
173 | #ifndef _WIN32 |
246 | #ifndef _WIN32 |
174 | sigfillset (&fullsigset); |
247 | sigfillset (&fullsigset); |
175 | #endif |
248 | #endif |
176 | |
249 | |
177 | pthread_key_create (¤t_key, 0); |
250 | X_TLS_INIT (current_key); |
178 | |
251 | |
179 | if (s_epipe_new (&ep)) |
252 | if (s_epipe_new (&ep)) |
180 | croak ("Coro::Multicore: unable to initialise event pipe.\n"); |
253 | croak ("Coro::Multicore: unable to initialise event pipe.\n"); |
181 | |
254 | |
182 | perl_thx = PERL_GET_CONTEXT; |
255 | perl_thx = PERL_GET_CONTEXT; |
183 | |
256 | |
184 | I_CORO_API ("Coro::Multicore"); |
257 | I_CORO_API ("Coro::Multicore"); |
185 | |
258 | |
|
|
259 | X_LOCK (release_m); |
|
|
260 | while (idle < min_idle) |
|
|
261 | start_thread (); |
|
|
262 | X_UNLOCK (release_m); |
|
|
263 | |
186 | /* not perfectly efficient to do it this way, but it's simple */ |
264 | /* not perfectly efficient to do it this way, but it is simple */ |
187 | perl_multicore_init (); |
265 | perl_multicore_init (); /* calls release */ |
188 | perl_multicore_api->pmapi_release = pmapi_release; |
266 | perl_multicore_api->pmapi_release = pmapi_release; |
189 | perl_multicore_api->pmapi_acquire = pmapi_acquire; |
267 | perl_multicore_api->pmapi_acquire = pmapi_acquire; |
190 | } |
268 | } |
191 | |
269 | |
|
|
270 | bool |
|
|
271 | enable (bool enable = NO_INIT) |
|
|
272 | CODE: |
|
|
273 | RETVAL = global_enable; |
|
|
274 | if (items) |
|
|
275 | global_enable = enable; |
|
|
276 | OUTPUT: |
|
|
277 | RETVAL |
|
|
278 | |
|
|
279 | void |
|
|
280 | scoped_enable () |
|
|
281 | CODE: |
|
|
282 | LEAVE; /* see Guard.xs */ |
|
|
283 | CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0); |
|
|
284 | ENTER; /* see Guard.xs */ |
|
|
285 | |
|
|
286 | void |
|
|
287 | scoped_disable () |
|
|
288 | CODE: |
|
|
289 | LEAVE; /* see Guard.xs */ |
|
|
290 | CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0); |
|
|
291 | ENTER; /* see Guard.xs */ |
|
|
292 | |
192 | U32 |
293 | U32 |
193 | max_idle_threads (U32 max = NO_INIT) |
294 | min_idle_threads (U32 min = NO_INIT) |
194 | CODE: |
295 | CODE: |
195 | X_LOCK (wait_m); |
296 | X_LOCK (acquire_m); |
196 | RETVAL = max_idle; |
297 | RETVAL = min_idle; |
197 | if (items) |
298 | if (items) |
198 | max_idle = max; |
299 | min_idle = min; |
199 | X_UNLOCK (wait_m); |
300 | X_UNLOCK (acquire_m); |
200 | OUTPUT: |
301 | OUTPUT: |
201 | RETVAL |
302 | RETVAL |
202 | |
303 | |
203 | |
304 | |
204 | int |
305 | int |
… | |
… | |
210 | |
311 | |
211 | void |
312 | void |
212 | poll (...) |
313 | poll (...) |
213 | CODE: |
314 | CODE: |
214 | s_epipe_drain (&ep); |
315 | s_epipe_drain (&ep); |
215 | X_LOCK (wait_m); |
316 | X_LOCK (acquire_m); |
216 | while (waiters_count) |
317 | while (acquirers.cur) |
217 | { |
318 | { |
218 | struct tctx *ctx = waiters [--waiters_count]; |
319 | struct tctx *ctx = tctxs_get (&acquirers); |
219 | CORO_READY ((SV *)ctx->coro); |
320 | CORO_READY ((SV *)ctx->coro); |
220 | SvREFCNT_dec_NN ((SV *)ctx->coro); |
321 | SvREFCNT_dec_NN ((SV *)ctx->coro); |
221 | ctx->coro = 0; |
322 | ctx->coro = 0; |
222 | } |
323 | } |
223 | X_UNLOCK (wait_m); |
324 | X_UNLOCK (acquire_m); |
224 | |
325 | |
225 | void |
326 | void |
226 | sleep (NV seconds) |
327 | sleep (NV seconds) |
227 | CODE: |
328 | CODE: |
228 | perlinterp_release (); |
329 | perlinterp_release (); |