ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.22
Committed: Sun Aug 26 15:42:57 2018 UTC (5 years, 9 months ago) by root
Branch: MAIN
Changes since 1.21: +5 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.18 /* most win32 perls are beyond fixing, requiring dTHX */
2     /* even for ISO-C functions such as malloc. avoid! avoid! avoid! */
3 root 1.20 /* and fail to define numerous symbols, but still overrwide them */
4 root 1.19 /* with non-working versions (e.g. setjmp). */
5     #ifdef _WIN32
6 root 1.20 /*# define PERL_CORE 1 fixes some, breaks others */
7 root 1.19 #else
8 root 1.18 # define PERL_NO_GET_CONTEXT
9     #endif
10 root 1.1
11     #include "EXTERN.h"
12     #include "perl.h"
13     #include "XSUB.h"
14    
15 root 1.7 #define X_STACKSIZE 1024 * sizeof (void *)
16 root 1.1
17     #include "CoroAPI.h"
18     #include "perlmulticore.h"
19     #include "schmorp.h"
20     #include "xthread.h"
21    
22     #ifdef _WIN32
23 root 1.18 #ifndef sigset_t
24     #define sigset_t int
25     #endif
26 root 1.1 #endif
27    
28 root 1.8 #ifndef SvREFCNT_dec_NN
29     #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
30     #endif
31    
32 root 1.22 #ifndef SvREFCNT_dec_simple_void_NN
33     #define SvREFCNT_dec_simple_void_NN(sv) SvREFCNT_dec (sv)
34     #endif
35    
36 root 1.8 #ifndef SvREFCNT_inc_NN
37     #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
38     #endif
39    
40 root 1.16 #define RECURSION_CHECK 0
41    
42 root 1.12 static X_TLS_DECLARE(current_key);
43 root 1.16 #if RECURSION_CHECK
44     static X_TLS_DECLARE(check_key);
45     #endif
46    
47 root 1.1
48     static s_epipe ep;
49     static void *perl_thx;
50     static sigset_t cursigset, fullsigset;
51    
52 root 1.6 static int global_enable = 0;
53 root 1.4 static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
54    
55 root 1.6 /* assigned to a thread for each release/acquire */
56 root 1.1 struct tctx
57     {
58     void *coro;
59 root 1.6 int wait_f;
60 root 1.9 xcond_t acquire_c;
61 root 1.11 int jeret;
62 root 1.1 };
63    
64     static struct tctx *tctx_free;
65    
66     static struct tctx *
67     tctx_get (void)
68     {
69     struct tctx *ctx;
70    
71     if (!tctx_free)
72 root 1.3 {
73     ctx = malloc (sizeof (*tctx_free));
74 root 1.9 X_COND_CREATE (ctx->acquire_c);
75 root 1.3 }
76 root 1.1 else
77     {
78     ctx = tctx_free;
79     tctx_free = tctx_free->coro;
80     }
81    
82     return ctx;
83     }
84    
85     static void
86     tctx_put (struct tctx *ctx)
87     {
88     ctx->coro = tctx_free;
89     tctx_free = ctx;
90     }
91    
92 root 1.9 /* a stack of tctxs */
93     struct tctxs
94     {
95     struct tctx **ctxs;
96     int cur, max;
97     };
98    
99     static struct tctx *
100     tctxs_get (struct tctxs *ctxs)
101     {
102     return ctxs->ctxs[--ctxs->cur];
103     }
104    
105     static void
106     tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
107     {
108     if (ctxs->cur >= ctxs->max)
109     {
110     ctxs->max = ctxs->max ? ctxs->max * 2 : 16;
111     ctxs->ctxs = realloc (ctxs->ctxs, ctxs->max * sizeof (ctxs->ctxs[0]));
112     }
113    
114     ctxs->ctxs[ctxs->cur++] = ctx;
115     }
116    
117     static xmutex_t release_m = X_MUTEX_INIT;
118     static xcond_t release_c = X_COND_INIT;
119     static struct tctxs releasers;
120     static int idle;
121     static int min_idle = 1;
122     static int curthreads, max_threads = 1; /* protected by release_m */
123    
124     static xmutex_t acquire_m = X_MUTEX_INIT;
125     static struct tctxs acquirers;
126    
127 root 1.1 X_THREAD_PROC(thread_proc)
128     {
129     PERL_SET_CONTEXT (perl_thx);
130    
131     {
132 root 1.17 dTHXa (perl_thx);
133 root 1.11 dJMPENV;
134 root 1.1 struct tctx *ctx;
135 root 1.11 int catchret;
136 root 1.1
137 root 1.9 X_LOCK (release_m);
138 root 1.6
139 root 1.1 for (;;)
140     {
141 root 1.9 while (!releasers.cur)
142 root 1.6 if (idle <= min_idle || 1)
143 root 1.9 X_COND_WAIT (release_c, release_m);
144 root 1.6 else
145     {
146     struct timespec ts = { time (0) + idle - min_idle, 0 };
147    
148 root 1.9 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
149     if (idle > min_idle && !releasers.cur)
150 root 1.6 break;
151     }
152    
153 root 1.9 ctx = tctxs_get (&releasers);
154 root 1.6 --idle;
155 root 1.9 X_UNLOCK (release_m);
156 root 1.3
157 root 1.6 if (!ctx) /* timed out? */
158     break;
159    
160 root 1.1 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
161 root 1.11 JMPENV_PUSH (ctx->jeret);
162 root 1.1
163 root 1.11 if (!ctx->jeret)
164     while (ctx->coro)
165     CORO_SCHEDULE;
166 root 1.1
167 root 1.11 JMPENV_POP;
168 root 1.3 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
169    
170 root 1.9 X_LOCK (acquire_m);
171 root 1.3 ctx->wait_f = 1;
172 root 1.9 X_COND_SIGNAL (ctx->acquire_c);
173     X_UNLOCK (acquire_m);
174 root 1.1
175 root 1.9 X_LOCK (release_m);
176 root 1.6 ++idle;
177 root 1.1 }
178     }
179     }
180    
181     static void
182     start_thread (void)
183     {
184     xthread_t tid;
185    
186 root 1.9 if (curthreads >= max_threads && 0)
187     return;
188    
189     ++curthreads;
190 root 1.6 ++idle;
191 root 1.1 xthread_create (&tid, thread_proc, 0);
192     }
193    
194     static void
195     pmapi_release (void)
196     {
197 root 1.16 #if RECURSION_CHECK
198     if (X_TLS_GET (check_key))
199     croak ("perlinterp_release () called without valid perl context");
200    
201     X_TLS_SET (check_key, &check_key);
202     #endif
203    
204 root 1.13 if (! ((thread_enable ? thread_enable : global_enable) & 1))
205 root 1.4 {
206 root 1.12 X_TLS_SET (current_key, 0);
207 root 1.4 return;
208     }
209    
210 root 1.1 struct tctx *ctx = tctx_get ();
211 root 1.21 ctx->coro = SvREFCNT_inc_simple_NN (CORO_CURRENT);
212 root 1.3 ctx->wait_f = 0;
213    
214 root 1.12 X_TLS_SET (current_key, ctx);
215 root 1.1 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
216    
217 root 1.9 X_LOCK (release_m);
218 root 1.6
219     if (idle <= min_idle)
220 root 1.1 start_thread ();
221    
222 root 1.9 tctxs_put (&releasers, ctx);
223     X_COND_SIGNAL (release_c);
224    
225     while (!idle && releasers.cur)
226     {
227     X_UNLOCK (release_m);
228     X_LOCK (release_m);
229     }
230 root 1.6
231 root 1.9 X_UNLOCK (release_m);
232 root 1.1 }
233    
234     static void
235     pmapi_acquire (void)
236     {
237 root 1.11 int jeret;
238 root 1.12 struct tctx *ctx = X_TLS_GET (current_key);
239 root 1.1
240 root 1.16 #if RECURSION_CHECK
241     if (X_TLS_GET (check_key) != &check_key)
242     croak ("perlinterp_acquire () called with valid perl context");
243    
244     X_TLS_SET (check_key, 0);
245     #endif
246    
247 root 1.4 if (!ctx)
248     return;
249    
250 root 1.9 X_LOCK (acquire_m);
251 root 1.1
252 root 1.9 tctxs_put (&acquirers, ctx);
253 root 1.1
254     s_epipe_signal (&ep);
255 root 1.3 while (!ctx->wait_f)
256 root 1.9 X_COND_WAIT (ctx->acquire_c, acquire_m);
257     X_UNLOCK (acquire_m);
258 root 1.1
259 root 1.11 jeret = ctx->jeret;
260 root 1.1 tctx_put (ctx);
261     pthread_sigmask (SIG_SETMASK, &cursigset, 0);
262 root 1.11
263     if (jeret)
264 root 1.17 {
265     dTHX;
266     JMPENV_JUMP (jeret);
267     }
268 root 1.1 }
269    
270 root 1.4 static void
271 root 1.6 set_thread_enable (pTHX_ void *arg)
272 root 1.4 {
273 root 1.6 thread_enable = PTR2IV (arg);
274 root 1.4 }
275    
276 root 1.1 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
277    
278     PROTOTYPES: DISABLE
279    
280     BOOT:
281     {
282 root 1.16 #ifndef _WIN32
283 root 1.1 sigfillset (&fullsigset);
284 root 1.16 #endif
285 root 1.1
286 root 1.12 X_TLS_INIT (current_key);
287 root 1.16 #if RECURSION_CHECK
288     X_TLS_INIT (check_key);
289     #endif
290 root 1.1
291     if (s_epipe_new (&ep))
292     croak ("Coro::Multicore: unable to initialise event pipe.\n");
293    
294     perl_thx = PERL_GET_CONTEXT;
295    
296     I_CORO_API ("Coro::Multicore");
297    
298 root 1.9 X_LOCK (release_m);
299 root 1.6 while (idle < min_idle)
300     start_thread ();
301 root 1.9 X_UNLOCK (release_m);
302 root 1.6
303 root 1.14 /* not perfectly efficient to do it this way, but it is simple */
304     perl_multicore_init (); /* calls release */
305 root 1.1 perl_multicore_api->pmapi_release = pmapi_release;
306     perl_multicore_api->pmapi_acquire = pmapi_acquire;
307     }
308    
309 root 1.4 bool
310     enable (bool enable = NO_INIT)
311     CODE:
312     RETVAL = global_enable;
313     if (items)
314     global_enable = enable;
315     OUTPUT:
316     RETVAL
317    
318     void
319     scoped_enable ()
320     CODE:
321     LEAVE; /* see Guard.xs */
322 root 1.6 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
323 root 1.4 ENTER; /* see Guard.xs */
324    
325     void
326     scoped_disable ()
327     CODE:
328     LEAVE; /* see Guard.xs */
329 root 1.6 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
330 root 1.4 ENTER; /* see Guard.xs */
331    
332 root 1.1 U32
333 root 1.6 min_idle_threads (U32 min = NO_INIT)
334 root 1.1 CODE:
335 root 1.9 X_LOCK (acquire_m);
336 root 1.6 RETVAL = min_idle;
337 root 1.1 if (items)
338 root 1.6 min_idle = min;
339 root 1.9 X_UNLOCK (acquire_m);
340 root 1.1 OUTPUT:
341     RETVAL
342    
343    
344     int
345     fd ()
346     CODE:
347     RETVAL = s_epipe_fd (&ep);
348     OUTPUT:
349     RETVAL
350    
351     void
352     poll (...)
353     CODE:
354     s_epipe_drain (&ep);
355 root 1.9 X_LOCK (acquire_m);
356     while (acquirers.cur)
357 root 1.1 {
358 root 1.9 struct tctx *ctx = tctxs_get (&acquirers);
359 root 1.1 CORO_READY ((SV *)ctx->coro);
360 root 1.22 SvREFCNT_dec_simple_void_NN ((SV *)ctx->coro);
361 root 1.1 ctx->coro = 0;
362     }
363 root 1.9 X_UNLOCK (acquire_m);
364 root 1.1
365     void
366 root 1.2 sleep (NV seconds)
367 root 1.1 CODE:
368     perlinterp_release ();
369 root 1.20 {
370     int nsec = seconds;
371     if (nsec) sleep (nsec);
372     nsec = (seconds - nsec) * 1e9;
373     if (nsec) usleep (nsec);
374     }
375 root 1.1 perlinterp_acquire ();
376 root 1.2