|
|
1 | /* most win32 perls are beyond fixing, requiring dTHX */ |
|
|
2 | /* even for ISO-C functions such as malloc. avoid! avoid! avoid! */ |
|
|
3 | /* and fail to define numerous symbols, but still overrwide them */ |
|
|
4 | /* with non-working versions (e.g. setjmp). */ |
|
|
5 | #ifdef _WIN32 |
|
|
6 | /*# define PERL_CORE 1 fixes some, breaks others */ |
|
|
7 | #else |
1 | #define PERL_NO_GET_CONTEXT |
8 | # define PERL_NO_GET_CONTEXT |
|
|
9 | #endif |
2 | |
10 | |
3 | #include "EXTERN.h" |
11 | #include "EXTERN.h" |
4 | #include "perl.h" |
12 | #include "perl.h" |
5 | #include "XSUB.h" |
13 | #include "XSUB.h" |
6 | |
14 | |
7 | #define X_STACKSIZE -1 |
15 | #define X_STACKSIZE 1024 * sizeof (void *) |
8 | |
16 | |
9 | #include "CoroAPI.h" |
17 | #include "CoroAPI.h" |
10 | #include "perlmulticore.h" |
18 | #include "perlmulticore.h" |
11 | #include "schmorp.h" |
19 | #include "schmorp.h" |
12 | #include "xthread.h" |
20 | #include "xthread.h" |
13 | |
21 | |
14 | #ifdef _WIN32 |
22 | #ifdef _WIN32 |
15 | typedef char sigset_t; |
23 | #ifndef sigset_t |
16 | #define pthread_sigmask(mode,new,old) |
24 | #define sigset_t int |
17 | #endif |
25 | #endif |
|
|
26 | #endif |
18 | |
27 | |
19 | static pthread_key_t current_key; |
28 | #ifndef SvREFCNT_dec_NN |
|
|
29 | #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv) |
|
|
30 | #endif |
|
|
31 | |
|
|
32 | #ifndef SvREFCNT_inc_NN |
|
|
33 | #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv) |
|
|
34 | #endif |
|
|
35 | |
|
|
36 | #define RECURSION_CHECK 0 |
|
|
37 | |
|
|
38 | static X_TLS_DECLARE(current_key); |
|
|
39 | #if RECURSION_CHECK |
|
|
40 | static X_TLS_DECLARE(check_key); |
|
|
41 | #endif |
|
|
42 | |
20 | |
43 | |
21 | static s_epipe ep; |
44 | static s_epipe ep; |
22 | static void *perl_thx; |
45 | static void *perl_thx; |
23 | static sigset_t cursigset, fullsigset; |
46 | static sigset_t cursigset, fullsigset; |
24 | |
47 | |
25 | static int global_enable = 1; |
48 | static int global_enable = 0; |
26 | static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */ |
49 | static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */ |
27 | |
50 | |
|
|
51 | /* assigned to a thread for each release/acquire */ |
28 | struct tctx |
52 | struct tctx |
29 | { |
53 | { |
30 | void *coro; |
54 | void *coro; |
31 | xcond_t wait_c; |
|
|
32 | int wait_f; |
55 | int wait_f; |
|
|
56 | xcond_t acquire_c; |
|
|
57 | int jeret; |
33 | }; |
58 | }; |
34 | |
59 | |
35 | static struct tctx *tctx_free; |
60 | static struct tctx *tctx_free; |
36 | |
|
|
37 | static int idle_timeout; |
|
|
38 | static int idle; |
|
|
39 | static int max_idle = 8; |
|
|
40 | |
|
|
41 | static xmutex_t perl_m = X_MUTEX_INIT; |
|
|
42 | static xcond_t perl_c = X_COND_INIT; |
|
|
43 | static struct tctx *perl_f; |
|
|
44 | |
|
|
45 | static xmutex_t wait_m = X_MUTEX_INIT; |
|
|
46 | |
|
|
47 | static int wakeup_f; |
|
|
48 | static struct tctx **waiters; |
|
|
49 | static int waiters_count, waiters_max; |
|
|
50 | |
61 | |
51 | static struct tctx * |
62 | static struct tctx * |
52 | tctx_get (void) |
63 | tctx_get (void) |
53 | { |
64 | { |
54 | struct tctx *ctx; |
65 | struct tctx *ctx; |
55 | |
66 | |
56 | if (!tctx_free) |
67 | if (!tctx_free) |
57 | { |
68 | { |
58 | ctx = malloc (sizeof (*tctx_free)); |
69 | ctx = malloc (sizeof (*tctx_free)); |
59 | X_COND_CREATE (ctx->wait_c); |
70 | X_COND_CREATE (ctx->acquire_c); |
60 | } |
71 | } |
61 | else |
72 | else |
62 | { |
73 | { |
63 | ctx = tctx_free; |
74 | ctx = tctx_free; |
64 | tctx_free = tctx_free->coro; |
75 | tctx_free = tctx_free->coro; |
… | |
… | |
72 | { |
83 | { |
73 | ctx->coro = tctx_free; |
84 | ctx->coro = tctx_free; |
74 | tctx_free = ctx; |
85 | tctx_free = ctx; |
75 | } |
86 | } |
76 | |
87 | |
|
|
88 | /* a stack of tctxs */ |
|
|
89 | struct tctxs |
|
|
90 | { |
|
|
91 | struct tctx **ctxs; |
|
|
92 | int cur, max; |
|
|
93 | }; |
|
|
94 | |
|
|
95 | static struct tctx * |
|
|
96 | tctxs_get (struct tctxs *ctxs) |
|
|
97 | { |
|
|
98 | return ctxs->ctxs[--ctxs->cur]; |
|
|
99 | } |
|
|
100 | |
|
|
101 | static void |
|
|
102 | tctxs_put (struct tctxs *ctxs, struct tctx *ctx) |
|
|
103 | { |
|
|
104 | if (ctxs->cur >= ctxs->max) |
|
|
105 | { |
|
|
106 | ctxs->max = ctxs->max ? ctxs->max * 2 : 16; |
|
|
107 | ctxs->ctxs = realloc (ctxs->ctxs, ctxs->max * sizeof (ctxs->ctxs[0])); |
|
|
108 | } |
|
|
109 | |
|
|
110 | ctxs->ctxs[ctxs->cur++] = ctx; |
|
|
111 | } |
|
|
112 | |
|
|
113 | static xmutex_t release_m = X_MUTEX_INIT; |
|
|
114 | static xcond_t release_c = X_COND_INIT; |
|
|
115 | static struct tctxs releasers; |
|
|
116 | static int idle; |
|
|
117 | static int min_idle = 1; |
|
|
118 | static int curthreads, max_threads = 1; /* protected by release_m */ |
|
|
119 | |
|
|
120 | static xmutex_t acquire_m = X_MUTEX_INIT; |
|
|
121 | static struct tctxs acquirers; |
|
|
122 | |
77 | X_THREAD_PROC(thread_proc) |
123 | X_THREAD_PROC(thread_proc) |
78 | { |
124 | { |
79 | PERL_SET_CONTEXT (perl_thx); |
125 | PERL_SET_CONTEXT (perl_thx); |
80 | |
126 | |
81 | { |
127 | { |
82 | dTHX; /* inefficient, we already have perl_thx, but I see no better way */ |
128 | dTHXa (perl_thx); |
|
|
129 | dJMPENV; |
83 | struct tctx *ctx; |
130 | struct tctx *ctx; |
|
|
131 | int catchret; |
|
|
132 | |
|
|
133 | X_LOCK (release_m); |
84 | |
134 | |
85 | for (;;) |
135 | for (;;) |
86 | { |
136 | { |
87 | /* TODO: should really use some idle time and exit after that */ |
137 | while (!releasers.cur) |
88 | X_LOCK (perl_m); |
138 | if (idle <= min_idle || 1) |
89 | while (!perl_f) |
|
|
90 | X_COND_WAIT (perl_c, perl_m); |
139 | X_COND_WAIT (release_c, release_m); |
91 | ctx = perl_f; |
140 | else |
92 | perl_f = 0; |
141 | { |
|
|
142 | struct timespec ts = { time (0) + idle - min_idle, 0 }; |
|
|
143 | |
|
|
144 | if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT) |
|
|
145 | if (idle > min_idle && !releasers.cur) |
|
|
146 | break; |
|
|
147 | } |
|
|
148 | |
|
|
149 | ctx = tctxs_get (&releasers); |
93 | --available; |
150 | --idle; |
94 | X_UNLOCK (perl_m); |
151 | X_UNLOCK (release_m); |
|
|
152 | |
|
|
153 | if (!ctx) /* timed out? */ |
|
|
154 | break; |
95 | |
155 | |
96 | pthread_sigmask (SIG_SETMASK, &cursigset, 0); |
156 | pthread_sigmask (SIG_SETMASK, &cursigset, 0); |
|
|
157 | JMPENV_PUSH (ctx->jeret); |
97 | |
158 | |
|
|
159 | if (!ctx->jeret) |
98 | while (ctx->coro) |
160 | while (ctx->coro) |
99 | CORO_SCHEDULE; |
161 | CORO_SCHEDULE; |
100 | |
162 | |
|
|
163 | JMPENV_POP; |
101 | pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); |
164 | pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); |
102 | |
165 | |
103 | X_LOCK (wait_m); |
166 | X_LOCK (acquire_m); |
104 | ctx->wait_f = 1; |
167 | ctx->wait_f = 1; |
105 | X_COND_SIGNAL (ctx->wait_c); |
168 | X_COND_SIGNAL (ctx->acquire_c); |
106 | |
|
|
107 | if (available >= max_idle) |
|
|
108 | { |
|
|
109 | X_UNLOCK (wait_m); |
|
|
110 | break; |
|
|
111 | } |
|
|
112 | |
|
|
113 | ++available; |
|
|
114 | X_UNLOCK (wait_m); |
169 | X_UNLOCK (acquire_m); |
|
|
170 | |
|
|
171 | X_LOCK (release_m); |
|
|
172 | ++idle; |
115 | } |
173 | } |
116 | } |
174 | } |
117 | } |
175 | } |
118 | |
176 | |
119 | static void |
177 | static void |
120 | start_thread (void) |
178 | start_thread (void) |
121 | { |
179 | { |
122 | xthread_t tid; |
180 | xthread_t tid; |
123 | |
181 | |
124 | ++available; |
182 | if (curthreads >= max_threads && 0) |
|
|
183 | return; |
|
|
184 | |
|
|
185 | ++curthreads; |
|
|
186 | ++idle; |
125 | xthread_create (&tid, thread_proc, 0); |
187 | xthread_create (&tid, thread_proc, 0); |
126 | } |
188 | } |
127 | |
189 | |
128 | static void |
190 | static void |
129 | pmapi_release (void) |
191 | pmapi_release (void) |
130 | { |
192 | { |
|
|
193 | #if RECURSION_CHECK |
|
|
194 | if (X_TLS_GET (check_key)) |
|
|
195 | croak ("perlinterp_release () called without valid perl context"); |
|
|
196 | |
|
|
197 | X_TLS_SET (check_key, &check_key); |
|
|
198 | #endif |
|
|
199 | |
131 | if (!(thread_enable ? thread_enable & 1 : global_enable)) |
200 | if (! ((thread_enable ? thread_enable : global_enable) & 1)) |
132 | { |
201 | { |
133 | pthread_setspecific (current_key, 0); |
202 | X_TLS_SET (current_key, 0); |
134 | return; |
203 | return; |
135 | } |
204 | } |
136 | |
205 | |
137 | struct tctx *ctx = tctx_get (); |
206 | struct tctx *ctx = tctx_get (); |
138 | ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT); |
207 | ctx->coro = SvREFCNT_inc_simple_NN (CORO_CURRENT); |
139 | ctx->wait_f = 0; |
208 | ctx->wait_f = 0; |
140 | |
209 | |
141 | pthread_setspecific (current_key, ctx); |
210 | X_TLS_SET (current_key, ctx); |
142 | pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); |
211 | pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); |
143 | |
212 | |
144 | if (!available) |
213 | X_LOCK (release_m); |
|
|
214 | |
|
|
215 | if (idle <= min_idle) |
145 | start_thread (); |
216 | start_thread (); |
146 | |
217 | |
147 | X_LOCK (perl_m); |
218 | tctxs_put (&releasers, ctx); |
148 | perl_f = ctx; |
|
|
149 | X_COND_SIGNAL (perl_c); |
219 | X_COND_SIGNAL (release_c); |
|
|
220 | |
|
|
221 | while (!idle && releasers.cur) |
|
|
222 | { |
|
|
223 | X_UNLOCK (release_m); |
|
|
224 | X_LOCK (release_m); |
|
|
225 | } |
|
|
226 | |
150 | X_UNLOCK (perl_m); |
227 | X_UNLOCK (release_m); |
151 | } |
228 | } |
152 | |
229 | |
153 | static void |
230 | static void |
154 | pmapi_acquire (void) |
231 | pmapi_acquire (void) |
155 | { |
232 | { |
156 | struct tctx *ctx = pthread_getspecific (current_key); |
233 | int jeret; |
|
|
234 | struct tctx *ctx = X_TLS_GET (current_key); |
|
|
235 | |
|
|
236 | #if RECURSION_CHECK |
|
|
237 | if (X_TLS_GET (check_key) != &check_key) |
|
|
238 | croak ("perlinterp_acquire () called with valid perl context"); |
|
|
239 | |
|
|
240 | X_TLS_SET (check_key, 0); |
|
|
241 | #endif |
157 | |
242 | |
158 | if (!ctx) |
243 | if (!ctx) |
159 | return; |
244 | return; |
160 | |
245 | |
161 | X_LOCK (wait_m); |
246 | X_LOCK (acquire_m); |
162 | |
247 | |
163 | if (waiters_count >= waiters_max) |
248 | tctxs_put (&acquirers, ctx); |
164 | { |
|
|
165 | waiters_max = waiters_max ? waiters_max * 2 : 16; |
|
|
166 | waiters = realloc (waiters, waiters_max * sizeof (*waiters)); |
|
|
167 | } |
|
|
168 | |
|
|
169 | waiters [waiters_count++] = ctx; |
|
|
170 | |
249 | |
171 | s_epipe_signal (&ep); |
250 | s_epipe_signal (&ep); |
172 | while (!ctx->wait_f) |
251 | while (!ctx->wait_f) |
173 | X_COND_WAIT (ctx->wait_c, wait_m); |
252 | X_COND_WAIT (ctx->acquire_c, acquire_m); |
174 | X_UNLOCK (wait_m); |
253 | X_UNLOCK (acquire_m); |
175 | |
254 | |
|
|
255 | jeret = ctx->jeret; |
176 | tctx_put (ctx); |
256 | tctx_put (ctx); |
177 | pthread_sigmask (SIG_SETMASK, &cursigset, 0); |
257 | pthread_sigmask (SIG_SETMASK, &cursigset, 0); |
178 | } |
|
|
179 | |
258 | |
180 | static void |
259 | if (jeret) |
181 | set_enable_0 (pTHX) |
260 | { |
182 | { |
261 | dTHX; |
183 | thread_enable = 0; |
262 | JMPENV_JUMP (jeret); |
|
|
263 | } |
184 | } |
264 | } |
185 | |
265 | |
186 | static void |
266 | static void |
187 | set_enable_1 (pTHX) |
267 | set_thread_enable (pTHX_ void *arg) |
188 | { |
268 | { |
189 | thread_enable = 1; |
|
|
190 | } |
|
|
191 | |
|
|
192 | static void |
|
|
193 | set_enable_2 (pTHX) |
|
|
194 | { |
|
|
195 | thread_enable = 2; |
269 | thread_enable = PTR2IV (arg); |
196 | } |
270 | } |
197 | |
271 | |
198 | MODULE = Coro::Multicore PACKAGE = Coro::Multicore |
272 | MODULE = Coro::Multicore PACKAGE = Coro::Multicore |
199 | |
273 | |
200 | PROTOTYPES: DISABLE |
274 | PROTOTYPES: DISABLE |
201 | |
275 | |
202 | BOOT: |
276 | BOOT: |
203 | { |
277 | { |
204 | #ifndef _WIN32 |
278 | #ifndef _WIN32 |
205 | sigfillset (&fullsigset); |
279 | sigfillset (&fullsigset); |
206 | #endif |
280 | #endif |
207 | |
281 | |
208 | pthread_key_create (¤t_key, 0); |
282 | X_TLS_INIT (current_key); |
|
|
283 | #if RECURSION_CHECK |
|
|
284 | X_TLS_INIT (check_key); |
|
|
285 | #endif |
209 | |
286 | |
210 | if (s_epipe_new (&ep)) |
287 | if (s_epipe_new (&ep)) |
211 | croak ("Coro::Multicore: unable to initialise event pipe.\n"); |
288 | croak ("Coro::Multicore: unable to initialise event pipe.\n"); |
212 | |
289 | |
213 | perl_thx = PERL_GET_CONTEXT; |
290 | perl_thx = PERL_GET_CONTEXT; |
214 | |
291 | |
215 | I_CORO_API ("Coro::Multicore"); |
292 | I_CORO_API ("Coro::Multicore"); |
216 | |
293 | |
|
|
294 | X_LOCK (release_m); |
|
|
295 | while (idle < min_idle) |
|
|
296 | start_thread (); |
|
|
297 | X_UNLOCK (release_m); |
|
|
298 | |
217 | /* not perfectly efficient to do it this way, but it's simple */ |
299 | /* not perfectly efficient to do it this way, but it is simple */ |
218 | perl_multicore_init (); |
300 | perl_multicore_init (); /* calls release */ |
219 | perl_multicore_api->pmapi_release = pmapi_release; |
301 | perl_multicore_api->pmapi_release = pmapi_release; |
220 | perl_multicore_api->pmapi_acquire = pmapi_acquire; |
302 | perl_multicore_api->pmapi_acquire = pmapi_acquire; |
221 | } |
303 | } |
222 | |
304 | |
223 | bool |
305 | bool |
… | |
… | |
231 | |
313 | |
232 | void |
314 | void |
233 | scoped_enable () |
315 | scoped_enable () |
234 | CODE: |
316 | CODE: |
235 | LEAVE; /* see Guard.xs */ |
317 | LEAVE; /* see Guard.xs */ |
236 | CORO_ENTERLEAVE_SCOPE_HOOK (set_enable_1, set_enable_0); |
318 | CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0); |
237 | ENTER; /* see Guard.xs */ |
319 | ENTER; /* see Guard.xs */ |
238 | |
320 | |
239 | void |
321 | void |
240 | scoped_disable () |
322 | scoped_disable () |
241 | CODE: |
323 | CODE: |
242 | LEAVE; /* see Guard.xs */ |
324 | LEAVE; /* see Guard.xs */ |
243 | CORO_ENTERLEAVE_SCOPE_HOOK (set_enable_2, set_enable_0); |
325 | CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0); |
244 | ENTER; /* see Guard.xs */ |
326 | ENTER; /* see Guard.xs */ |
245 | |
327 | |
246 | U32 |
328 | U32 |
247 | max_idle_threads (U32 max = NO_INIT) |
329 | min_idle_threads (U32 min = NO_INIT) |
248 | CODE: |
330 | CODE: |
249 | X_LOCK (wait_m); |
331 | X_LOCK (acquire_m); |
250 | RETVAL = max_idle; |
332 | RETVAL = min_idle; |
251 | if (items) |
333 | if (items) |
252 | max_idle = max; |
334 | min_idle = min; |
253 | X_UNLOCK (wait_m); |
335 | X_UNLOCK (acquire_m); |
254 | OUTPUT: |
336 | OUTPUT: |
255 | RETVAL |
337 | RETVAL |
256 | |
338 | |
257 | |
339 | |
258 | int |
340 | int |
… | |
… | |
264 | |
346 | |
265 | void |
347 | void |
266 | poll (...) |
348 | poll (...) |
267 | CODE: |
349 | CODE: |
268 | s_epipe_drain (&ep); |
350 | s_epipe_drain (&ep); |
269 | X_LOCK (wait_m); |
351 | X_LOCK (acquire_m); |
270 | while (waiters_count) |
352 | while (acquirers.cur) |
271 | { |
353 | { |
272 | struct tctx *ctx = waiters [--waiters_count]; |
354 | struct tctx *ctx = tctxs_get (&acquirers); |
273 | CORO_READY ((SV *)ctx->coro); |
355 | CORO_READY ((SV *)ctx->coro); |
274 | SvREFCNT_dec_NN ((SV *)ctx->coro); |
356 | SvREFCNT_dec_NN ((SV *)ctx->coro); |
275 | ctx->coro = 0; |
357 | ctx->coro = 0; |
276 | } |
358 | } |
277 | X_UNLOCK (wait_m); |
359 | X_UNLOCK (acquire_m); |
278 | |
360 | |
279 | void |
361 | void |
280 | sleep (NV seconds) |
362 | sleep (NV seconds) |
281 | CODE: |
363 | CODE: |
282 | perlinterp_release (); |
364 | perlinterp_release (); |
283 | usleep (seconds * 1e6); |
365 | { |
|
|
366 | int nsec = seconds; |
|
|
367 | if (nsec) sleep (nsec); |
|
|
368 | nsec = (seconds - nsec) * 1e9; |
|
|
369 | if (nsec) usleep (nsec); |
|
|
370 | } |
284 | perlinterp_acquire (); |
371 | perlinterp_acquire (); |
285 | |
372 | |