ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
(Generate patch)

Comparing Coro-Multicore/Multicore.xs (file contents):
Revision 1.5 by root, Sun Jun 28 18:43:17 2015 UTC vs.
Revision 1.21 by root, Sun Aug 26 15:30:55 2018 UTC

1/* most win32 perls are beyond fixing, requiring dTHX */
2/* even for ISO-C functions such as malloc. avoid! avoid! avoid! */
3/* and fail to define numerous symbols, but still overrwide them */
4/* with non-working versions (e.g. setjmp). */
5#ifdef _WIN32
6/*# define PERL_CORE 1 fixes some, breaks others */
7#else
1#define PERL_NO_GET_CONTEXT 8# define PERL_NO_GET_CONTEXT
9#endif
2 10
3#include "EXTERN.h" 11#include "EXTERN.h"
4#include "perl.h" 12#include "perl.h"
5#include "XSUB.h" 13#include "XSUB.h"
6 14
7#define X_STACKSIZE -1 15#define X_STACKSIZE 1024 * sizeof (void *)
8 16
9#include "CoroAPI.h" 17#include "CoroAPI.h"
10#include "perlmulticore.h" 18#include "perlmulticore.h"
11#include "schmorp.h" 19#include "schmorp.h"
12#include "xthread.h" 20#include "xthread.h"
13 21
14#ifdef _WIN32 22#ifdef _WIN32
15 typedef char sigset_t; 23 #ifndef sigset_t
16 #define pthread_sigmask(mode,new,old) 24 #define sigset_t int
17#endif 25 #endif
26#endif
18 27
19static pthread_key_t current_key; 28#ifndef SvREFCNT_dec_NN
29 #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
30#endif
31
32#ifndef SvREFCNT_inc_NN
33 #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
34#endif
35
36#define RECURSION_CHECK 0
37
38static X_TLS_DECLARE(current_key);
39#if RECURSION_CHECK
40static X_TLS_DECLARE(check_key);
41#endif
42
20 43
21static s_epipe ep; 44static s_epipe ep;
22static void *perl_thx; 45static void *perl_thx;
23static sigset_t cursigset, fullsigset; 46static sigset_t cursigset, fullsigset;
24 47
25static int global_enable = 1; 48static int global_enable = 0;
26static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */ 49static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
27 50
51/* assigned to a thread for each release/acquire */
28struct tctx 52struct tctx
29{ 53{
30 void *coro; 54 void *coro;
31 xcond_t wait_c;
32 int wait_f; 55 int wait_f;
56 xcond_t acquire_c;
57 int jeret;
33}; 58};
34 59
35static struct tctx *tctx_free; 60static struct tctx *tctx_free;
36
37static int idle_timeout;
38static int idle;
39static int max_idle = 8;
40
41static xmutex_t perl_m = X_MUTEX_INIT;
42static xcond_t perl_c = X_COND_INIT;
43static struct tctx *perl_f;
44
45static xmutex_t wait_m = X_MUTEX_INIT;
46
47static int wakeup_f;
48static struct tctx **waiters;
49static int waiters_count, waiters_max;
50 61
51static struct tctx * 62static struct tctx *
52tctx_get (void) 63tctx_get (void)
53{ 64{
54 struct tctx *ctx; 65 struct tctx *ctx;
55 66
56 if (!tctx_free) 67 if (!tctx_free)
57 { 68 {
58 ctx = malloc (sizeof (*tctx_free)); 69 ctx = malloc (sizeof (*tctx_free));
59 X_COND_CREATE (ctx->wait_c); 70 X_COND_CREATE (ctx->acquire_c);
60 } 71 }
61 else 72 else
62 { 73 {
63 ctx = tctx_free; 74 ctx = tctx_free;
64 tctx_free = tctx_free->coro; 75 tctx_free = tctx_free->coro;
72{ 83{
73 ctx->coro = tctx_free; 84 ctx->coro = tctx_free;
74 tctx_free = ctx; 85 tctx_free = ctx;
75} 86}
76 87
88/* a stack of tctxs */
89struct tctxs
90{
91 struct tctx **ctxs;
92 int cur, max;
93};
94
95static struct tctx *
96tctxs_get (struct tctxs *ctxs)
97{
98 return ctxs->ctxs[--ctxs->cur];
99}
100
101static void
102tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
103{
104 if (ctxs->cur >= ctxs->max)
105 {
106 ctxs->max = ctxs->max ? ctxs->max * 2 : 16;
107 ctxs->ctxs = realloc (ctxs->ctxs, ctxs->max * sizeof (ctxs->ctxs[0]));
108 }
109
110 ctxs->ctxs[ctxs->cur++] = ctx;
111}
112
113static xmutex_t release_m = X_MUTEX_INIT;
114static xcond_t release_c = X_COND_INIT;
115static struct tctxs releasers;
116static int idle;
117static int min_idle = 1;
118static int curthreads, max_threads = 1; /* protected by release_m */
119
120static xmutex_t acquire_m = X_MUTEX_INIT;
121static struct tctxs acquirers;
122
77X_THREAD_PROC(thread_proc) 123X_THREAD_PROC(thread_proc)
78{ 124{
79 PERL_SET_CONTEXT (perl_thx); 125 PERL_SET_CONTEXT (perl_thx);
80 126
81 { 127 {
82 dTHX; /* inefficient, we already have perl_thx, but I see no better way */ 128 dTHXa (perl_thx);
129 dJMPENV;
83 struct tctx *ctx; 130 struct tctx *ctx;
131 int catchret;
132
133 X_LOCK (release_m);
84 134
85 for (;;) 135 for (;;)
86 { 136 {
87 /* TODO: should really use some idle time and exit after that */ 137 while (!releasers.cur)
88 X_LOCK (perl_m); 138 if (idle <= min_idle || 1)
89 while (!perl_f)
90 X_COND_WAIT (perl_c, perl_m); 139 X_COND_WAIT (release_c, release_m);
91 ctx = perl_f; 140 else
92 perl_f = 0; 141 {
142 struct timespec ts = { time (0) + idle - min_idle, 0 };
143
144 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
145 if (idle > min_idle && !releasers.cur)
146 break;
147 }
148
149 ctx = tctxs_get (&releasers);
93 --available; 150 --idle;
94 X_UNLOCK (perl_m); 151 X_UNLOCK (release_m);
152
153 if (!ctx) /* timed out? */
154 break;
95 155
96 pthread_sigmask (SIG_SETMASK, &cursigset, 0); 156 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
157 JMPENV_PUSH (ctx->jeret);
97 158
159 if (!ctx->jeret)
98 while (ctx->coro) 160 while (ctx->coro)
99 CORO_SCHEDULE; 161 CORO_SCHEDULE;
100 162
163 JMPENV_POP;
101 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); 164 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
102 165
103 X_LOCK (wait_m); 166 X_LOCK (acquire_m);
104 ctx->wait_f = 1; 167 ctx->wait_f = 1;
105 X_COND_SIGNAL (ctx->wait_c); 168 X_COND_SIGNAL (ctx->acquire_c);
106
107 if (available >= max_idle)
108 {
109 X_UNLOCK (wait_m);
110 break;
111 }
112
113 ++available;
114 X_UNLOCK (wait_m); 169 X_UNLOCK (acquire_m);
170
171 X_LOCK (release_m);
172 ++idle;
115 } 173 }
116 } 174 }
117} 175}
118 176
119static void 177static void
120start_thread (void) 178start_thread (void)
121{ 179{
122 xthread_t tid; 180 xthread_t tid;
123 181
124 ++available; 182 if (curthreads >= max_threads && 0)
183 return;
184
185 ++curthreads;
186 ++idle;
125 xthread_create (&tid, thread_proc, 0); 187 xthread_create (&tid, thread_proc, 0);
126} 188}
127 189
128static void 190static void
129pmapi_release (void) 191pmapi_release (void)
130{ 192{
193 #if RECURSION_CHECK
194 if (X_TLS_GET (check_key))
195 croak ("perlinterp_release () called without valid perl context");
196
197 X_TLS_SET (check_key, &check_key);
198 #endif
199
131 if (!(thread_enable ? thread_enable & 1 : global_enable)) 200 if (! ((thread_enable ? thread_enable : global_enable) & 1))
132 { 201 {
133 pthread_setspecific (current_key, 0); 202 X_TLS_SET (current_key, 0);
134 return; 203 return;
135 } 204 }
136 205
137 struct tctx *ctx = tctx_get (); 206 struct tctx *ctx = tctx_get ();
138 ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT); 207 ctx->coro = SvREFCNT_inc_simple_NN (CORO_CURRENT);
139 ctx->wait_f = 0; 208 ctx->wait_f = 0;
140 209
141 pthread_setspecific (current_key, ctx); 210 X_TLS_SET (current_key, ctx);
142 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); 211 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
143 212
144 if (!available) 213 X_LOCK (release_m);
214
215 if (idle <= min_idle)
145 start_thread (); 216 start_thread ();
146 217
147 X_LOCK (perl_m); 218 tctxs_put (&releasers, ctx);
148 perl_f = ctx;
149 X_COND_SIGNAL (perl_c); 219 X_COND_SIGNAL (release_c);
220
221 while (!idle && releasers.cur)
222 {
223 X_UNLOCK (release_m);
224 X_LOCK (release_m);
225 }
226
150 X_UNLOCK (perl_m); 227 X_UNLOCK (release_m);
151} 228}
152 229
153static void 230static void
154pmapi_acquire (void) 231pmapi_acquire (void)
155{ 232{
156 struct tctx *ctx = pthread_getspecific (current_key); 233 int jeret;
234 struct tctx *ctx = X_TLS_GET (current_key);
235
236 #if RECURSION_CHECK
237 if (X_TLS_GET (check_key) != &check_key)
238 croak ("perlinterp_acquire () called with valid perl context");
239
240 X_TLS_SET (check_key, 0);
241 #endif
157 242
158 if (!ctx) 243 if (!ctx)
159 return; 244 return;
160 245
161 X_LOCK (wait_m); 246 X_LOCK (acquire_m);
162 247
163 if (waiters_count >= waiters_max) 248 tctxs_put (&acquirers, ctx);
164 {
165 waiters_max = waiters_max ? waiters_max * 2 : 16;
166 waiters = realloc (waiters, waiters_max * sizeof (*waiters));
167 }
168
169 waiters [waiters_count++] = ctx;
170 249
171 s_epipe_signal (&ep); 250 s_epipe_signal (&ep);
172 while (!ctx->wait_f) 251 while (!ctx->wait_f)
173 X_COND_WAIT (ctx->wait_c, wait_m); 252 X_COND_WAIT (ctx->acquire_c, acquire_m);
174 X_UNLOCK (wait_m); 253 X_UNLOCK (acquire_m);
175 254
255 jeret = ctx->jeret;
176 tctx_put (ctx); 256 tctx_put (ctx);
177 pthread_sigmask (SIG_SETMASK, &cursigset, 0); 257 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
178}
179 258
180static void 259 if (jeret)
181set_enable_0 (pTHX) 260 {
182{ 261 dTHX;
183 thread_enable = 0; 262 JMPENV_JUMP (jeret);
263 }
184} 264}
185 265
186static void 266static void
187set_enable_1 (pTHX) 267set_thread_enable (pTHX_ void *arg)
188{ 268{
189 thread_enable = 1;
190}
191
192static void
193set_enable_2 (pTHX)
194{
195 thread_enable = 2; 269 thread_enable = PTR2IV (arg);
196} 270}
197 271
198MODULE = Coro::Multicore PACKAGE = Coro::Multicore 272MODULE = Coro::Multicore PACKAGE = Coro::Multicore
199 273
200PROTOTYPES: DISABLE 274PROTOTYPES: DISABLE
201 275
202BOOT: 276BOOT:
203{ 277{
204 #ifndef _WIN32 278#ifndef _WIN32
205 sigfillset (&fullsigset); 279 sigfillset (&fullsigset);
206 #endif 280#endif
207 281
208 pthread_key_create (&current_key, 0); 282 X_TLS_INIT (current_key);
283#if RECURSION_CHECK
284 X_TLS_INIT (check_key);
285#endif
209 286
210 if (s_epipe_new (&ep)) 287 if (s_epipe_new (&ep))
211 croak ("Coro::Multicore: unable to initialise event pipe.\n"); 288 croak ("Coro::Multicore: unable to initialise event pipe.\n");
212 289
213 perl_thx = PERL_GET_CONTEXT; 290 perl_thx = PERL_GET_CONTEXT;
214 291
215 I_CORO_API ("Coro::Multicore"); 292 I_CORO_API ("Coro::Multicore");
216 293
294 X_LOCK (release_m);
295 while (idle < min_idle)
296 start_thread ();
297 X_UNLOCK (release_m);
298
217 /* not perfectly efficient to do it this way, but it's simple */ 299 /* not perfectly efficient to do it this way, but it is simple */
218 perl_multicore_init (); 300 perl_multicore_init (); /* calls release */
219 perl_multicore_api->pmapi_release = pmapi_release; 301 perl_multicore_api->pmapi_release = pmapi_release;
220 perl_multicore_api->pmapi_acquire = pmapi_acquire; 302 perl_multicore_api->pmapi_acquire = pmapi_acquire;
221} 303}
222 304
223bool 305bool
231 313
232void 314void
233scoped_enable () 315scoped_enable ()
234 CODE: 316 CODE:
235 LEAVE; /* see Guard.xs */ 317 LEAVE; /* see Guard.xs */
236 CORO_ENTERLEAVE_SCOPE_HOOK (set_enable_1, set_enable_0); 318 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
237 ENTER; /* see Guard.xs */ 319 ENTER; /* see Guard.xs */
238 320
239void 321void
240scoped_disable () 322scoped_disable ()
241 CODE: 323 CODE:
242 LEAVE; /* see Guard.xs */ 324 LEAVE; /* see Guard.xs */
243 CORO_ENTERLEAVE_SCOPE_HOOK (set_enable_2, set_enable_0); 325 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
244 ENTER; /* see Guard.xs */ 326 ENTER; /* see Guard.xs */
245 327
246U32 328U32
247max_idle_threads (U32 max = NO_INIT) 329min_idle_threads (U32 min = NO_INIT)
248 CODE: 330 CODE:
249 X_LOCK (wait_m); 331 X_LOCK (acquire_m);
250 RETVAL = max_idle; 332 RETVAL = min_idle;
251 if (items) 333 if (items)
252 max_idle = max; 334 min_idle = min;
253 X_UNLOCK (wait_m); 335 X_UNLOCK (acquire_m);
254 OUTPUT: 336 OUTPUT:
255 RETVAL 337 RETVAL
256 338
257 339
258int 340int
264 346
265void 347void
266poll (...) 348poll (...)
267 CODE: 349 CODE:
268 s_epipe_drain (&ep); 350 s_epipe_drain (&ep);
269 X_LOCK (wait_m); 351 X_LOCK (acquire_m);
270 while (waiters_count) 352 while (acquirers.cur)
271 { 353 {
272 struct tctx *ctx = waiters [--waiters_count]; 354 struct tctx *ctx = tctxs_get (&acquirers);
273 CORO_READY ((SV *)ctx->coro); 355 CORO_READY ((SV *)ctx->coro);
274 SvREFCNT_dec_NN ((SV *)ctx->coro); 356 SvREFCNT_dec_NN ((SV *)ctx->coro);
275 ctx->coro = 0; 357 ctx->coro = 0;
276 } 358 }
277 X_UNLOCK (wait_m); 359 X_UNLOCK (acquire_m);
278 360
279void 361void
280sleep (NV seconds) 362sleep (NV seconds)
281 CODE: 363 CODE:
282 perlinterp_release (); 364 perlinterp_release ();
283 usleep (seconds * 1e6); 365 {
366 int nsec = seconds;
367 if (nsec) sleep (nsec);
368 nsec = (seconds - nsec) * 1e9;
369 if (nsec) usleep (nsec);
370 }
284 perlinterp_acquire (); 371 perlinterp_acquire ();
285 372

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines