ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
(Generate patch)

Comparing Coro-Multicore/Multicore.xs (file contents):
Revision 1.3 by root, Sun Jun 28 08:04:02 2015 UTC vs.
Revision 1.15 by root, Sat Dec 19 23:56:35 2015 UTC

2 2
3#include "EXTERN.h" 3#include "EXTERN.h"
4#include "perl.h" 4#include "perl.h"
5#include "XSUB.h" 5#include "XSUB.h"
6 6
7#define X_STACKSIZE -1 7#define X_STACKSIZE 1024 * sizeof (void *)
8 8
9#include "CoroAPI.h" 9#include "CoroAPI.h"
10#include "perlmulticore.h" 10#include "perlmulticore.h"
11#include "schmorp.h" 11#include "schmorp.h"
12#include "xthread.h" 12#include "xthread.h"
14#ifdef _WIN32 14#ifdef _WIN32
15 typedef char sigset_t; 15 typedef char sigset_t;
16 #define pthread_sigmask(mode,new,old) 16 #define pthread_sigmask(mode,new,old)
17#endif 17#endif
18 18
19static pthread_key_t current_key; 19#ifndef SvREFCNT_dec_NN
20 #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
21#endif
22
23#ifndef SvREFCNT_inc_NN
24 #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
25#endif
26
27static X_TLS_DECLARE(current_key);
20 28
21static s_epipe ep; 29static s_epipe ep;
22static void *perl_thx; 30static void *perl_thx;
23static sigset_t cursigset, fullsigset; 31static sigset_t cursigset, fullsigset;
24 32
33static int global_enable = 0;
34static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
35
36/* assigned to a thread for each release/acquire */
25struct tctx 37struct tctx
26{ 38{
27 void *coro; 39 void *coro;
28 xcond_t wait_c;
29 int wait_f; 40 int wait_f;
41 xcond_t acquire_c;
42 int jeret;
30}; 43};
31 44
32static struct tctx *tctx_free; 45static struct tctx *tctx_free;
33
34static int available;
35static int max_idle = 8;
36
37static xmutex_t perl_m = X_MUTEX_INIT;
38static xcond_t perl_c = X_COND_INIT;
39static struct tctx *perl_f;
40
41static xmutex_t wait_m = X_MUTEX_INIT;
42
43static int wakeup_f;
44static struct tctx **waiters;
45static int waiters_count, waiters_max;
46 46
47static struct tctx * 47static struct tctx *
48tctx_get (void) 48tctx_get (void)
49{ 49{
50 struct tctx *ctx; 50 struct tctx *ctx;
51 51
52 if (!tctx_free) 52 if (!tctx_free)
53 { 53 {
54 ctx = malloc (sizeof (*tctx_free)); 54 ctx = malloc (sizeof (*tctx_free));
55 X_COND_CREATE (ctx->wait_c); 55 X_COND_CREATE (ctx->acquire_c);
56 } 56 }
57 else 57 else
58 { 58 {
59 ctx = tctx_free; 59 ctx = tctx_free;
60 tctx_free = tctx_free->coro; 60 tctx_free = tctx_free->coro;
68{ 68{
69 ctx->coro = tctx_free; 69 ctx->coro = tctx_free;
70 tctx_free = ctx; 70 tctx_free = ctx;
71} 71}
72 72
73/* a stack of tctxs */
74struct tctxs
75{
76 struct tctx **ctxs;
77 int cur, max;
78};
79
80static struct tctx *
81tctxs_get (struct tctxs *ctxs)
82{
83 return ctxs->ctxs[--ctxs->cur];
84}
85
86static void
87tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
88{
89 if (ctxs->cur >= ctxs->max)
90 {
91 ctxs->max = ctxs->max ? ctxs->max * 2 : 16;
92 ctxs->ctxs = realloc (ctxs->ctxs, ctxs->max * sizeof (ctxs->ctxs[0]));
93 }
94
95 ctxs->ctxs[ctxs->cur++] = ctx;
96}
97
98static xmutex_t release_m = X_MUTEX_INIT;
99static xcond_t release_c = X_COND_INIT;
100static struct tctxs releasers;
101static int idle;
102static int min_idle = 1;
103static int curthreads, max_threads = 1; /* protected by release_m */
104
105static xmutex_t acquire_m = X_MUTEX_INIT;
106static struct tctxs acquirers;
107
73X_THREAD_PROC(thread_proc) 108X_THREAD_PROC(thread_proc)
74{ 109{
75 PERL_SET_CONTEXT (perl_thx); 110 PERL_SET_CONTEXT (perl_thx);
76 111
77 { 112 {
78 dTHX; /* inefficient, we already have perl_thx, but I see no better way */ 113 dTHX; /* inefficient, we already have perl_thx, but I see no better way */
114 dJMPENV;
79 struct tctx *ctx; 115 struct tctx *ctx;
116 int catchret;
117
118 X_LOCK (release_m);
80 119
81 for (;;) 120 for (;;)
82 { 121 {
83 /* TODO: should really use some idle time and exit after that */ 122 while (!releasers.cur)
84 X_LOCK (perl_m); 123 if (idle <= min_idle || 1)
85 while (!perl_f)
86 X_COND_WAIT (perl_c, perl_m); 124 X_COND_WAIT (release_c, release_m);
87 ctx = perl_f; 125 else
88 perl_f = 0; 126 {
127 struct timespec ts = { time (0) + idle - min_idle, 0 };
128
129 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
130 if (idle > min_idle && !releasers.cur)
131 break;
132 }
133
134 ctx = tctxs_get (&releasers);
89 --available; 135 --idle;
90 X_UNLOCK (perl_m); 136 X_UNLOCK (release_m);
137
138 if (!ctx) /* timed out? */
139 break;
91 140
92 pthread_sigmask (SIG_SETMASK, &cursigset, 0); 141 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
142 JMPENV_PUSH (ctx->jeret);
93 143
144 if (!ctx->jeret)
94 while (ctx->coro) 145 while (ctx->coro)
95 CORO_SCHEDULE; 146 CORO_SCHEDULE;
96 147
148 JMPENV_POP;
97 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); 149 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
98 150
99 X_LOCK (wait_m); 151 X_LOCK (acquire_m);
100 ctx->wait_f = 1; 152 ctx->wait_f = 1;
101 X_COND_SIGNAL (ctx->wait_c); 153 X_COND_SIGNAL (ctx->acquire_c);
102
103 if (available >= max_idle)
104 {
105 X_UNLOCK (wait_m);
106 break;
107 }
108
109 ++available;
110 X_UNLOCK (wait_m); 154 X_UNLOCK (acquire_m);
155
156 X_LOCK (release_m);
157 ++idle;
111 } 158 }
112 } 159 }
113} 160}
114 161
115static void 162static void
116start_thread (void) 163start_thread (void)
117{ 164{
118 xthread_t tid; 165 xthread_t tid;
119 166
120 ++available; 167 if (curthreads >= max_threads && 0)
168 return;
169
170 ++curthreads;
171 ++idle;
121 xthread_create (&tid, thread_proc, 0); 172 xthread_create (&tid, thread_proc, 0);
122} 173}
123 174
124static void 175static void
125pmapi_release (void) 176pmapi_release (void)
126{ 177{
178 if (! ((thread_enable ? thread_enable : global_enable) & 1))
179 {
180 X_TLS_SET (current_key, 0);
181 return;
182 }
183
127 struct tctx *ctx = tctx_get (); 184 struct tctx *ctx = tctx_get ();
128 ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT); 185 ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
129 ctx->wait_f = 0; 186 ctx->wait_f = 0;
130 187
131 pthread_setspecific (current_key, ctx); 188 X_TLS_SET (current_key, ctx);
132 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); 189 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
133 190
134 if (!available) 191 X_LOCK (release_m);
192
193 if (idle <= min_idle)
135 start_thread (); 194 start_thread ();
136 195
137 X_LOCK (perl_m); 196 tctxs_put (&releasers, ctx);
138 perl_f = ctx;
139 X_COND_SIGNAL (perl_c); 197 X_COND_SIGNAL (release_c);
198
199 while (!idle && releasers.cur)
200 {
201 X_UNLOCK (release_m);
202 X_LOCK (release_m);
203 }
204
140 X_UNLOCK (perl_m); 205 X_UNLOCK (release_m);
141} 206}
142 207
143static void 208static void
144pmapi_acquire (void) 209pmapi_acquire (void)
145{ 210{
146 struct tctx *ctx = pthread_getspecific (current_key); 211 int jeret;
212 struct tctx *ctx = X_TLS_GET (current_key);
147 213
214 if (!ctx)
215 return;
216
148 X_LOCK (wait_m); 217 X_LOCK (acquire_m);
149 218
150 if (waiters_count >= waiters_max) 219 tctxs_put (&acquirers, ctx);
151 {
152 waiters_max = waiters_max ? waiters_max * 2 : 16;
153 waiters = realloc (waiters, waiters_max * sizeof (*waiters));
154 }
155
156 waiters [waiters_count++] = ctx;
157 220
158 s_epipe_signal (&ep); 221 s_epipe_signal (&ep);
159 while (!ctx->wait_f) 222 while (!ctx->wait_f)
160 X_COND_WAIT (ctx->wait_c, wait_m); 223 X_COND_WAIT (ctx->acquire_c, acquire_m);
161 X_UNLOCK (wait_m); 224 X_UNLOCK (acquire_m);
162 225
226 jeret = ctx->jeret;
163 tctx_put (ctx); 227 tctx_put (ctx);
164 pthread_sigmask (SIG_SETMASK, &cursigset, 0); 228 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
229
230 if (jeret)
231 JMPENV_JUMP (jeret);
232}
233
234static void
235set_thread_enable (pTHX_ void *arg)
236{
237 thread_enable = PTR2IV (arg);
165} 238}
166 239
167MODULE = Coro::Multicore PACKAGE = Coro::Multicore 240MODULE = Coro::Multicore PACKAGE = Coro::Multicore
168 241
169PROTOTYPES: DISABLE 242PROTOTYPES: DISABLE
172{ 245{
173 #ifndef _WIN32 246 #ifndef _WIN32
174 sigfillset (&fullsigset); 247 sigfillset (&fullsigset);
175 #endif 248 #endif
176 249
177 pthread_key_create (&current_key, 0); 250 X_TLS_INIT (current_key);
178 251
179 if (s_epipe_new (&ep)) 252 if (s_epipe_new (&ep))
180 croak ("Coro::Multicore: unable to initialise event pipe.\n"); 253 croak ("Coro::Multicore: unable to initialise event pipe.\n");
181 254
182 perl_thx = PERL_GET_CONTEXT; 255 perl_thx = PERL_GET_CONTEXT;
183 256
184 I_CORO_API ("Coro::Multicore"); 257 I_CORO_API ("Coro::Multicore");
185 258
259 X_LOCK (release_m);
260 while (idle < min_idle)
261 start_thread ();
262 X_UNLOCK (release_m);
263
186 /* not perfectly efficient to do it this way, but it's simple */ 264 /* not perfectly efficient to do it this way, but it is simple */
187 perl_multicore_init (); 265 perl_multicore_init (); /* calls release */
188 perl_multicore_api->pmapi_release = pmapi_release; 266 perl_multicore_api->pmapi_release = pmapi_release;
189 perl_multicore_api->pmapi_acquire = pmapi_acquire; 267 perl_multicore_api->pmapi_acquire = pmapi_acquire;
190} 268}
191 269
270bool
271enable (bool enable = NO_INIT)
272 CODE:
273 RETVAL = global_enable;
274 if (items)
275 global_enable = enable;
276 OUTPUT:
277 RETVAL
278
279void
280scoped_enable ()
281 CODE:
282 LEAVE; /* see Guard.xs */
283 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
284 ENTER; /* see Guard.xs */
285
286void
287scoped_disable ()
288 CODE:
289 LEAVE; /* see Guard.xs */
290 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
291 ENTER; /* see Guard.xs */
292
192U32 293U32
193max_idle_threads (U32 max = NO_INIT) 294min_idle_threads (U32 min = NO_INIT)
194 CODE: 295 CODE:
195 X_LOCK (wait_m); 296 X_LOCK (acquire_m);
196 RETVAL = max_idle; 297 RETVAL = min_idle;
197 if (items) 298 if (items)
198 max_idle = max; 299 min_idle = min;
199 X_UNLOCK (wait_m); 300 X_UNLOCK (acquire_m);
200 OUTPUT: 301 OUTPUT:
201 RETVAL 302 RETVAL
202 303
203 304
204int 305int
210 311
211void 312void
212poll (...) 313poll (...)
213 CODE: 314 CODE:
214 s_epipe_drain (&ep); 315 s_epipe_drain (&ep);
215 X_LOCK (wait_m); 316 X_LOCK (acquire_m);
216 while (waiters_count) 317 while (acquirers.cur)
217 { 318 {
218 struct tctx *ctx = waiters [--waiters_count]; 319 struct tctx *ctx = tctxs_get (&acquirers);
219 CORO_READY ((SV *)ctx->coro); 320 CORO_READY ((SV *)ctx->coro);
220 SvREFCNT_dec_NN ((SV *)ctx->coro); 321 SvREFCNT_dec_NN ((SV *)ctx->coro);
221 ctx->coro = 0; 322 ctx->coro = 0;
222 } 323 }
223 X_UNLOCK (wait_m); 324 X_UNLOCK (acquire_m);
224 325
225void 326void
226sleep (NV seconds) 327sleep (NV seconds)
227 CODE: 328 CODE:
228 perlinterp_release (); 329 perlinterp_release ();

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines