… | |
… | |
36 | /* assigned to a thread for each release/acquire */ |
36 | /* assigned to a thread for each release/acquire */ |
37 | struct tctx |
37 | struct tctx |
38 | { |
38 | { |
39 | void *coro; |
39 | void *coro; |
40 | int wait_f; |
40 | int wait_f; |
41 | xcond_t wait_c; |
41 | xcond_t acquire_c; |
42 | }; |
42 | }; |
43 | |
43 | |
44 | static struct tctx *tctx_free; |
44 | static struct tctx *tctx_free; |
45 | |
45 | |
|
|
46 | static struct tctx * |
|
|
47 | tctx_get (void) |
|
|
48 | { |
|
|
49 | struct tctx *ctx; |
|
|
50 | |
|
|
51 | if (!tctx_free) |
|
|
52 | { |
|
|
53 | ctx = malloc (sizeof (*tctx_free)); |
|
|
54 | X_COND_CREATE (ctx->acquire_c); |
|
|
55 | } |
|
|
56 | else |
|
|
57 | { |
|
|
58 | ctx = tctx_free; |
|
|
59 | tctx_free = tctx_free->coro; |
|
|
60 | } |
|
|
61 | |
|
|
62 | return ctx; |
|
|
63 | } |
|
|
64 | |
|
|
65 | static void |
|
|
66 | tctx_put (struct tctx *ctx) |
|
|
67 | { |
|
|
68 | ctx->coro = tctx_free; |
|
|
69 | tctx_free = ctx; |
|
|
70 | } |
|
|
71 | |
|
|
72 | /* a stack of tctxs */ |
|
|
73 | struct tctxs |
|
|
74 | { |
|
|
75 | struct tctx **ctxs; |
|
|
76 | int cur, max; |
|
|
77 | }; |
|
|
78 | |
|
|
79 | static struct tctx * |
|
|
80 | tctxs_get (struct tctxs *ctxs) |
|
|
81 | { |
|
|
82 | return ctxs->ctxs[--ctxs->cur]; |
|
|
83 | } |
|
|
84 | |
|
|
85 | static void |
|
|
86 | tctxs_put (struct tctxs *ctxs, struct tctx *ctx) |
|
|
87 | { |
|
|
88 | if (ctxs->cur >= ctxs->max) |
|
|
89 | { |
|
|
90 | ctxs->max = ctxs->max ? ctxs->max * 2 : 16; |
|
|
91 | ctxs->ctxs = realloc (ctxs->ctxs, ctxs->max * sizeof (ctxs->ctxs[0])); |
|
|
92 | } |
|
|
93 | |
|
|
94 | ctxs->ctxs[ctxs->cur++] = ctx; |
|
|
95 | } |
|
|
96 | |
|
|
97 | static xmutex_t release_m = X_MUTEX_INIT; |
|
|
98 | static xcond_t release_c = X_COND_INIT; |
|
|
99 | static struct tctxs releasers; |
46 | static int idle; |
100 | static int idle; |
47 | static int min_idle = 1; |
101 | static int min_idle = 1; |
|
|
102 | static int curthreads, max_threads = 1; /* protected by release_m */ |
48 | |
103 | |
49 | static xmutex_t perl_m = X_MUTEX_INIT; |
|
|
50 | static xcond_t perl_c = X_COND_INIT; |
|
|
51 | static struct tctx *perl_f; |
|
|
52 | |
|
|
53 | static xmutex_t wait_m = X_MUTEX_INIT; |
104 | static xmutex_t acquire_m = X_MUTEX_INIT; |
54 | |
|
|
55 | static int wakeup_f; |
|
|
56 | static struct tctx **waiters; |
105 | static struct tctxs acquirers; |
57 | static int waiters_count, waiters_max; |
|
|
58 | |
|
|
59 | static struct tctx * |
|
|
60 | tctx_get (void) |
|
|
61 | { |
|
|
62 | struct tctx *ctx; |
|
|
63 | |
|
|
64 | if (!tctx_free) |
|
|
65 | { |
|
|
66 | ctx = malloc (sizeof (*tctx_free)); |
|
|
67 | X_COND_CREATE (ctx->wait_c); |
|
|
68 | } |
|
|
69 | else |
|
|
70 | { |
|
|
71 | ctx = tctx_free; |
|
|
72 | tctx_free = tctx_free->coro; |
|
|
73 | } |
|
|
74 | |
|
|
75 | return ctx; |
|
|
76 | } |
|
|
77 | |
|
|
78 | static void |
|
|
79 | tctx_put (struct tctx *ctx) |
|
|
80 | { |
|
|
81 | ctx->coro = tctx_free; |
|
|
82 | tctx_free = ctx; |
|
|
83 | } |
|
|
84 | |
106 | |
85 | X_THREAD_PROC(thread_proc) |
107 | X_THREAD_PROC(thread_proc) |
86 | { |
108 | { |
87 | PERL_SET_CONTEXT (perl_thx); |
109 | PERL_SET_CONTEXT (perl_thx); |
88 | |
110 | |
89 | { |
111 | { |
90 | dTHX; /* inefficient, we already have perl_thx, but I see no better way */ |
112 | dTHX; /* inefficient, we already have perl_thx, but I see no better way */ |
91 | struct tctx *ctx; |
113 | struct tctx *ctx; |
92 | |
114 | |
93 | X_LOCK (perl_m); |
115 | X_LOCK (release_m); |
94 | |
116 | |
95 | for (;;) |
117 | for (;;) |
96 | { |
118 | { |
97 | while (!perl_f) |
119 | while (!releasers.cur) |
98 | if (idle <= min_idle || 1) |
120 | if (idle <= min_idle || 1) |
99 | X_COND_WAIT (perl_c, perl_m); |
121 | X_COND_WAIT (release_c, release_m); |
100 | else |
122 | else |
101 | { |
123 | { |
102 | struct timespec ts = { time (0) + idle - min_idle, 0 }; |
124 | struct timespec ts = { time (0) + idle - min_idle, 0 }; |
103 | |
125 | |
104 | if (X_COND_TIMEDWAIT (perl_c, perl_m, ts) == ETIMEDOUT) |
126 | if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT) |
105 | if (idle > min_idle && !perl_f) |
127 | if (idle > min_idle && !releasers.cur) |
106 | break; |
128 | break; |
107 | } |
129 | } |
108 | |
130 | |
109 | ctx = perl_f; |
131 | ctx = tctxs_get (&releasers); |
110 | perl_f = 0; |
|
|
111 | --idle; |
132 | --idle; |
112 | X_UNLOCK (perl_m); |
133 | X_UNLOCK (release_m); |
113 | |
134 | |
114 | if (!ctx) /* timed out? */ |
135 | if (!ctx) /* timed out? */ |
115 | break; |
136 | break; |
116 | |
137 | |
117 | pthread_sigmask (SIG_SETMASK, &cursigset, 0); |
138 | pthread_sigmask (SIG_SETMASK, &cursigset, 0); |
… | |
… | |
119 | while (ctx->coro) |
140 | while (ctx->coro) |
120 | CORO_SCHEDULE; |
141 | CORO_SCHEDULE; |
121 | |
142 | |
122 | pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); |
143 | pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); |
123 | |
144 | |
124 | X_LOCK (wait_m); |
145 | X_LOCK (acquire_m); |
125 | ctx->wait_f = 1; |
146 | ctx->wait_f = 1; |
126 | X_COND_SIGNAL (ctx->wait_c); |
147 | X_COND_SIGNAL (ctx->acquire_c); |
127 | X_UNLOCK (wait_m); |
148 | X_UNLOCK (acquire_m); |
128 | |
149 | |
129 | X_LOCK (perl_m); |
150 | X_LOCK (release_m); |
130 | ++idle; |
151 | ++idle; |
131 | } |
152 | } |
132 | } |
153 | } |
133 | } |
154 | } |
134 | |
155 | |
135 | static void |
156 | static void |
136 | start_thread (void) |
157 | start_thread (void) |
137 | { |
158 | { |
138 | xthread_t tid; |
159 | xthread_t tid; |
139 | |
160 | |
|
|
161 | if (curthreads >= max_threads && 0) |
|
|
162 | return; |
|
|
163 | |
|
|
164 | ++curthreads; |
140 | ++idle; |
165 | ++idle; |
141 | xthread_create (&tid, thread_proc, 0); |
166 | xthread_create (&tid, thread_proc, 0); |
142 | } |
167 | } |
143 | |
168 | |
144 | static void |
169 | static void |
… | |
… | |
155 | ctx->wait_f = 0; |
180 | ctx->wait_f = 0; |
156 | |
181 | |
157 | pthread_setspecific (current_key, ctx); |
182 | pthread_setspecific (current_key, ctx); |
158 | pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); |
183 | pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); |
159 | |
184 | |
160 | X_LOCK (perl_m); |
185 | X_LOCK (release_m); |
161 | |
186 | |
162 | if (idle <= min_idle) |
187 | if (idle <= min_idle) |
163 | start_thread (); |
188 | start_thread (); |
164 | |
189 | |
165 | perl_f = ctx; |
190 | tctxs_put (&releasers, ctx); |
166 | X_COND_SIGNAL (perl_c); |
191 | X_COND_SIGNAL (release_c); |
167 | |
192 | |
|
|
193 | while (!idle && releasers.cur) |
|
|
194 | { |
|
|
195 | X_UNLOCK (release_m); |
|
|
196 | X_LOCK (release_m); |
|
|
197 | } |
|
|
198 | |
168 | X_UNLOCK (perl_m); |
199 | X_UNLOCK (release_m); |
169 | } |
200 | } |
170 | |
201 | |
171 | static void |
202 | static void |
172 | pmapi_acquire (void) |
203 | pmapi_acquire (void) |
173 | { |
204 | { |
174 | struct tctx *ctx = pthread_getspecific (current_key); |
205 | struct tctx *ctx = pthread_getspecific (current_key); |
175 | |
206 | |
176 | if (!ctx) |
207 | if (!ctx) |
177 | return; |
208 | return; |
178 | |
209 | |
179 | X_LOCK (wait_m); |
210 | X_LOCK (acquire_m); |
180 | |
211 | |
181 | if (waiters_count >= waiters_max) |
212 | tctxs_put (&acquirers, ctx); |
182 | { |
|
|
183 | waiters_max = waiters_max ? waiters_max * 2 : 16; |
|
|
184 | waiters = realloc (waiters, waiters_max * sizeof (*waiters)); |
|
|
185 | } |
|
|
186 | |
|
|
187 | waiters [waiters_count++] = ctx; |
|
|
188 | |
213 | |
189 | s_epipe_signal (&ep); |
214 | s_epipe_signal (&ep); |
190 | while (!ctx->wait_f) |
215 | while (!ctx->wait_f) |
191 | X_COND_WAIT (ctx->wait_c, wait_m); |
216 | X_COND_WAIT (ctx->acquire_c, acquire_m); |
192 | X_UNLOCK (wait_m); |
217 | X_UNLOCK (acquire_m); |
193 | |
218 | |
194 | tctx_put (ctx); |
219 | tctx_put (ctx); |
195 | pthread_sigmask (SIG_SETMASK, &cursigset, 0); |
220 | pthread_sigmask (SIG_SETMASK, &cursigset, 0); |
196 | } |
221 | } |
197 | |
222 | |
… | |
… | |
207 | |
232 | |
208 | BOOT: |
233 | BOOT: |
209 | { |
234 | { |
210 | #ifndef _WIN32 |
235 | #ifndef _WIN32 |
211 | sigfillset (&fullsigset); |
236 | sigfillset (&fullsigset); |
|
|
237 | sigemptyset (&fullsigset); |
212 | #endif |
238 | #endif |
213 | |
239 | |
214 | pthread_key_create (¤t_key, 0); |
240 | pthread_key_create (¤t_key, 0); |
215 | |
241 | |
216 | if (s_epipe_new (&ep)) |
242 | if (s_epipe_new (&ep)) |
… | |
… | |
218 | |
244 | |
219 | perl_thx = PERL_GET_CONTEXT; |
245 | perl_thx = PERL_GET_CONTEXT; |
220 | |
246 | |
221 | I_CORO_API ("Coro::Multicore"); |
247 | I_CORO_API ("Coro::Multicore"); |
222 | |
248 | |
223 | X_LOCK (perl_m); |
249 | X_LOCK (release_m); |
224 | while (idle < min_idle) |
250 | while (idle < min_idle) |
225 | start_thread (); |
251 | start_thread (); |
226 | start_thread ();//D |
|
|
227 | X_UNLOCK (perl_m); |
252 | X_UNLOCK (release_m); |
228 | |
253 | |
229 | /* not perfectly efficient to do it this way, but it's simple */ |
254 | /* not perfectly efficient to do it this way, but it's simple */ |
230 | perl_multicore_init (); |
255 | perl_multicore_init (); |
231 | perl_multicore_api->pmapi_release = pmapi_release; |
256 | perl_multicore_api->pmapi_release = pmapi_release; |
232 | perl_multicore_api->pmapi_acquire = pmapi_acquire; |
257 | perl_multicore_api->pmapi_acquire = pmapi_acquire; |
… | |
… | |
256 | ENTER; /* see Guard.xs */ |
281 | ENTER; /* see Guard.xs */ |
257 | |
282 | |
258 | U32 |
283 | U32 |
259 | min_idle_threads (U32 min = NO_INIT) |
284 | min_idle_threads (U32 min = NO_INIT) |
260 | CODE: |
285 | CODE: |
261 | X_LOCK (wait_m); |
286 | X_LOCK (acquire_m); |
262 | RETVAL = min_idle; |
287 | RETVAL = min_idle; |
263 | if (items) |
288 | if (items) |
264 | min_idle = min; |
289 | min_idle = min; |
265 | X_UNLOCK (wait_m); |
290 | X_UNLOCK (acquire_m); |
266 | OUTPUT: |
291 | OUTPUT: |
267 | RETVAL |
292 | RETVAL |
268 | |
293 | |
269 | |
294 | |
270 | int |
295 | int |
… | |
… | |
276 | |
301 | |
277 | void |
302 | void |
278 | poll (...) |
303 | poll (...) |
279 | CODE: |
304 | CODE: |
280 | s_epipe_drain (&ep); |
305 | s_epipe_drain (&ep); |
281 | X_LOCK (wait_m); |
306 | X_LOCK (acquire_m); |
282 | while (waiters_count) |
307 | while (acquirers.cur) |
283 | { |
308 | { |
284 | struct tctx *ctx = waiters [--waiters_count]; |
309 | struct tctx *ctx = tctxs_get (&acquirers); |
285 | CORO_READY ((SV *)ctx->coro); |
310 | CORO_READY ((SV *)ctx->coro); |
286 | SvREFCNT_dec_NN ((SV *)ctx->coro); |
311 | SvREFCNT_dec_NN ((SV *)ctx->coro); |
287 | ctx->coro = 0; |
312 | ctx->coro = 0; |
288 | } |
313 | } |
289 | X_UNLOCK (wait_m); |
314 | X_UNLOCK (acquire_m); |
290 | |
315 | |
291 | void |
316 | void |
292 | sleep (NV seconds) |
317 | sleep (NV seconds) |
293 | CODE: |
318 | CODE: |
294 | perlinterp_release (); |
319 | perlinterp_release (); |