ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.20
Committed: Tue Aug 14 16:53:05 2018 UTC (5 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-1_01
Changes since 1.19: +8 -3 lines
Log Message:
1.01

File Contents

# User Rev Content
1 root 1.18 /* most win32 perls are beyond fixing, requiring dTHX */
2     /* even for ISO-C functions such as malloc. avoid! avoid! avoid! */
3 root 1.20 /* and fail to define numerous symbols, but still overrwide them */
4 root 1.19 /* with non-working versions (e.g. setjmp). */
5     #ifdef _WIN32
6 root 1.20 /*# define PERL_CORE 1 fixes some, breaks others */
7 root 1.19 #else
8 root 1.18 # define PERL_NO_GET_CONTEXT
9     #endif
10 root 1.1
11     #include "EXTERN.h"
12     #include "perl.h"
13     #include "XSUB.h"
14    
15 root 1.7 #define X_STACKSIZE 1024 * sizeof (void *)
16 root 1.1
17     #include "CoroAPI.h"
18     #include "perlmulticore.h"
19     #include "schmorp.h"
20     #include "xthread.h"
21    
22     #ifdef _WIN32
23 root 1.18 #ifndef sigset_t
24     #define sigset_t int
25     #endif
26 root 1.1 #endif
27    
28 root 1.8 #ifndef SvREFCNT_dec_NN
29     #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
30     #endif
31    
32     #ifndef SvREFCNT_inc_NN
33     #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
34     #endif
35    
36 root 1.16 #define RECURSION_CHECK 0
37    
38 root 1.12 static X_TLS_DECLARE(current_key);
39 root 1.16 #if RECURSION_CHECK
40     static X_TLS_DECLARE(check_key);
41     #endif
42    
43 root 1.1
44     static s_epipe ep;
45     static void *perl_thx;
46     static sigset_t cursigset, fullsigset;
47    
48 root 1.6 static int global_enable = 0;
49 root 1.4 static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
50    
51 root 1.6 /* assigned to a thread for each release/acquire */
52 root 1.1 struct tctx
53     {
54     void *coro;
55 root 1.6 int wait_f;
56 root 1.9 xcond_t acquire_c;
57 root 1.11 int jeret;
58 root 1.1 };
59    
60     static struct tctx *tctx_free;
61    
62     static struct tctx *
63     tctx_get (void)
64     {
65     struct tctx *ctx;
66    
67     if (!tctx_free)
68 root 1.3 {
69     ctx = malloc (sizeof (*tctx_free));
70 root 1.9 X_COND_CREATE (ctx->acquire_c);
71 root 1.3 }
72 root 1.1 else
73     {
74     ctx = tctx_free;
75     tctx_free = tctx_free->coro;
76     }
77    
78     return ctx;
79     }
80    
81     static void
82     tctx_put (struct tctx *ctx)
83     {
84     ctx->coro = tctx_free;
85     tctx_free = ctx;
86     }
87    
88 root 1.9 /* a stack of tctxs */
89     struct tctxs
90     {
91     struct tctx **ctxs;
92     int cur, max;
93     };
94    
95     static struct tctx *
96     tctxs_get (struct tctxs *ctxs)
97     {
98     return ctxs->ctxs[--ctxs->cur];
99     }
100    
101     static void
102     tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
103     {
104     if (ctxs->cur >= ctxs->max)
105     {
106     ctxs->max = ctxs->max ? ctxs->max * 2 : 16;
107     ctxs->ctxs = realloc (ctxs->ctxs, ctxs->max * sizeof (ctxs->ctxs[0]));
108     }
109    
110     ctxs->ctxs[ctxs->cur++] = ctx;
111     }
112    
113     static xmutex_t release_m = X_MUTEX_INIT;
114     static xcond_t release_c = X_COND_INIT;
115     static struct tctxs releasers;
116     static int idle;
117     static int min_idle = 1;
118     static int curthreads, max_threads = 1; /* protected by release_m */
119    
120     static xmutex_t acquire_m = X_MUTEX_INIT;
121     static struct tctxs acquirers;
122    
123 root 1.1 X_THREAD_PROC(thread_proc)
124     {
125     PERL_SET_CONTEXT (perl_thx);
126    
127     {
128 root 1.17 dTHXa (perl_thx);
129 root 1.11 dJMPENV;
130 root 1.1 struct tctx *ctx;
131 root 1.11 int catchret;
132 root 1.1
133 root 1.9 X_LOCK (release_m);
134 root 1.6
135 root 1.1 for (;;)
136     {
137 root 1.9 while (!releasers.cur)
138 root 1.6 if (idle <= min_idle || 1)
139 root 1.9 X_COND_WAIT (release_c, release_m);
140 root 1.6 else
141     {
142     struct timespec ts = { time (0) + idle - min_idle, 0 };
143    
144 root 1.9 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
145     if (idle > min_idle && !releasers.cur)
146 root 1.6 break;
147     }
148    
149 root 1.9 ctx = tctxs_get (&releasers);
150 root 1.6 --idle;
151 root 1.9 X_UNLOCK (release_m);
152 root 1.3
153 root 1.6 if (!ctx) /* timed out? */
154     break;
155    
156 root 1.1 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
157 root 1.11 JMPENV_PUSH (ctx->jeret);
158 root 1.1
159 root 1.11 if (!ctx->jeret)
160     while (ctx->coro)
161     CORO_SCHEDULE;
162 root 1.1
163 root 1.11 JMPENV_POP;
164 root 1.3 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
165    
166 root 1.9 X_LOCK (acquire_m);
167 root 1.3 ctx->wait_f = 1;
168 root 1.9 X_COND_SIGNAL (ctx->acquire_c);
169     X_UNLOCK (acquire_m);
170 root 1.1
171 root 1.9 X_LOCK (release_m);
172 root 1.6 ++idle;
173 root 1.1 }
174     }
175     }
176    
177     static void
178     start_thread (void)
179     {
180     xthread_t tid;
181    
182 root 1.9 if (curthreads >= max_threads && 0)
183     return;
184    
185     ++curthreads;
186 root 1.6 ++idle;
187 root 1.1 xthread_create (&tid, thread_proc, 0);
188     }
189    
190     static void
191     pmapi_release (void)
192     {
193 root 1.16 #if RECURSION_CHECK
194     if (X_TLS_GET (check_key))
195     croak ("perlinterp_release () called without valid perl context");
196    
197     X_TLS_SET (check_key, &check_key);
198     #endif
199    
200 root 1.13 if (! ((thread_enable ? thread_enable : global_enable) & 1))
201 root 1.4 {
202 root 1.12 X_TLS_SET (current_key, 0);
203 root 1.4 return;
204     }
205    
206 root 1.1 struct tctx *ctx = tctx_get ();
207     ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
208 root 1.3 ctx->wait_f = 0;
209    
210 root 1.12 X_TLS_SET (current_key, ctx);
211 root 1.1 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
212    
213 root 1.9 X_LOCK (release_m);
214 root 1.6
215     if (idle <= min_idle)
216 root 1.1 start_thread ();
217    
218 root 1.9 tctxs_put (&releasers, ctx);
219     X_COND_SIGNAL (release_c);
220    
221     while (!idle && releasers.cur)
222     {
223     X_UNLOCK (release_m);
224     X_LOCK (release_m);
225     }
226 root 1.6
227 root 1.9 X_UNLOCK (release_m);
228 root 1.1 }
229    
230     static void
231     pmapi_acquire (void)
232     {
233 root 1.11 int jeret;
234 root 1.12 struct tctx *ctx = X_TLS_GET (current_key);
235 root 1.1
236 root 1.16 #if RECURSION_CHECK
237     if (X_TLS_GET (check_key) != &check_key)
238     croak ("perlinterp_acquire () called with valid perl context");
239    
240     X_TLS_SET (check_key, 0);
241     #endif
242    
243 root 1.4 if (!ctx)
244     return;
245    
246 root 1.9 X_LOCK (acquire_m);
247 root 1.1
248 root 1.9 tctxs_put (&acquirers, ctx);
249 root 1.1
250     s_epipe_signal (&ep);
251 root 1.3 while (!ctx->wait_f)
252 root 1.9 X_COND_WAIT (ctx->acquire_c, acquire_m);
253     X_UNLOCK (acquire_m);
254 root 1.1
255 root 1.11 jeret = ctx->jeret;
256 root 1.1 tctx_put (ctx);
257     pthread_sigmask (SIG_SETMASK, &cursigset, 0);
258 root 1.11
259     if (jeret)
260 root 1.17 {
261     dTHX;
262     JMPENV_JUMP (jeret);
263     }
264 root 1.1 }
265    
266 root 1.4 static void
267 root 1.6 set_thread_enable (pTHX_ void *arg)
268 root 1.4 {
269 root 1.6 thread_enable = PTR2IV (arg);
270 root 1.4 }
271    
272 root 1.1 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
273    
274     PROTOTYPES: DISABLE
275    
276     BOOT:
277     {
278 root 1.16 #ifndef _WIN32
279 root 1.1 sigfillset (&fullsigset);
280 root 1.16 #endif
281 root 1.1
282 root 1.12 X_TLS_INIT (current_key);
283 root 1.16 #if RECURSION_CHECK
284     X_TLS_INIT (check_key);
285     #endif
286 root 1.1
287     if (s_epipe_new (&ep))
288     croak ("Coro::Multicore: unable to initialise event pipe.\n");
289    
290     perl_thx = PERL_GET_CONTEXT;
291    
292     I_CORO_API ("Coro::Multicore");
293    
294 root 1.9 X_LOCK (release_m);
295 root 1.6 while (idle < min_idle)
296     start_thread ();
297 root 1.9 X_UNLOCK (release_m);
298 root 1.6
299 root 1.14 /* not perfectly efficient to do it this way, but it is simple */
300     perl_multicore_init (); /* calls release */
301 root 1.1 perl_multicore_api->pmapi_release = pmapi_release;
302     perl_multicore_api->pmapi_acquire = pmapi_acquire;
303     }
304    
305 root 1.4 bool
306     enable (bool enable = NO_INIT)
307     CODE:
308     RETVAL = global_enable;
309     if (items)
310     global_enable = enable;
311     OUTPUT:
312     RETVAL
313    
314     void
315     scoped_enable ()
316     CODE:
317     LEAVE; /* see Guard.xs */
318 root 1.6 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
319 root 1.4 ENTER; /* see Guard.xs */
320    
321     void
322     scoped_disable ()
323     CODE:
324     LEAVE; /* see Guard.xs */
325 root 1.6 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
326 root 1.4 ENTER; /* see Guard.xs */
327    
328 root 1.1 U32
329 root 1.6 min_idle_threads (U32 min = NO_INIT)
330 root 1.1 CODE:
331 root 1.9 X_LOCK (acquire_m);
332 root 1.6 RETVAL = min_idle;
333 root 1.1 if (items)
334 root 1.6 min_idle = min;
335 root 1.9 X_UNLOCK (acquire_m);
336 root 1.1 OUTPUT:
337     RETVAL
338    
339    
340     int
341     fd ()
342     CODE:
343     RETVAL = s_epipe_fd (&ep);
344     OUTPUT:
345     RETVAL
346    
347     void
348     poll (...)
349     CODE:
350     s_epipe_drain (&ep);
351 root 1.9 X_LOCK (acquire_m);
352     while (acquirers.cur)
353 root 1.1 {
354 root 1.9 struct tctx *ctx = tctxs_get (&acquirers);
355 root 1.1 CORO_READY ((SV *)ctx->coro);
356     SvREFCNT_dec_NN ((SV *)ctx->coro);
357     ctx->coro = 0;
358     }
359 root 1.9 X_UNLOCK (acquire_m);
360 root 1.1
361     void
362 root 1.2 sleep (NV seconds)
363 root 1.1 CODE:
364     perlinterp_release ();
365 root 1.20 {
366     int nsec = seconds;
367     if (nsec) sleep (nsec);
368     nsec = (seconds - nsec) * 1e9;
369     if (nsec) usleep (nsec);
370     }
371 root 1.1 perlinterp_acquire ();
372 root 1.2