ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.16
Committed: Sun Jan 24 04:20:49 2016 UTC (8 years, 4 months ago) by root
Branch: MAIN
CVS Tags: rel-0_03
Changes since 1.15: +25 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #define PERL_NO_GET_CONTEXT
2    
3     #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7 root 1.7 #define X_STACKSIZE 1024 * sizeof (void *)
8 root 1.1
9     #include "CoroAPI.h"
10     #include "perlmulticore.h"
11     #include "schmorp.h"
12     #include "xthread.h"
13    
14     #ifdef _WIN32
15     typedef char sigset_t;
16     #define pthread_sigmask(mode,new,old)
17     #endif
18    
19 root 1.8 #ifndef SvREFCNT_dec_NN
20     #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
21     #endif
22    
23     #ifndef SvREFCNT_inc_NN
24     #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
25     #endif
26    
27 root 1.16 #define RECURSION_CHECK 0
28    
29 root 1.12 static X_TLS_DECLARE(current_key);
30 root 1.16 #if RECURSION_CHECK
31     static X_TLS_DECLARE(check_key);
32     #endif
33    
34 root 1.1
35     static s_epipe ep;
36     static void *perl_thx;
37     static sigset_t cursigset, fullsigset;
38    
39 root 1.6 static int global_enable = 0;
40 root 1.4 static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
41    
42 root 1.6 /* assigned to a thread for each release/acquire */
43 root 1.1 struct tctx
44     {
45     void *coro;
46 root 1.6 int wait_f;
47 root 1.9 xcond_t acquire_c;
48 root 1.11 int jeret;
49 root 1.1 };
50    
51     static struct tctx *tctx_free;
52    
53     static struct tctx *
54     tctx_get (void)
55     {
56     struct tctx *ctx;
57    
58     if (!tctx_free)
59 root 1.3 {
60     ctx = malloc (sizeof (*tctx_free));
61 root 1.9 X_COND_CREATE (ctx->acquire_c);
62 root 1.3 }
63 root 1.1 else
64     {
65     ctx = tctx_free;
66     tctx_free = tctx_free->coro;
67     }
68    
69     return ctx;
70     }
71    
72     static void
73     tctx_put (struct tctx *ctx)
74     {
75     ctx->coro = tctx_free;
76     tctx_free = ctx;
77     }
78    
79 root 1.9 /* a stack of tctxs */
80     struct tctxs
81     {
82     struct tctx **ctxs;
83     int cur, max;
84     };
85    
86     static struct tctx *
87     tctxs_get (struct tctxs *ctxs)
88     {
89     return ctxs->ctxs[--ctxs->cur];
90     }
91    
92     static void
93     tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
94     {
95     if (ctxs->cur >= ctxs->max)
96     {
97     ctxs->max = ctxs->max ? ctxs->max * 2 : 16;
98     ctxs->ctxs = realloc (ctxs->ctxs, ctxs->max * sizeof (ctxs->ctxs[0]));
99     }
100    
101     ctxs->ctxs[ctxs->cur++] = ctx;
102     }
103    
104     static xmutex_t release_m = X_MUTEX_INIT;
105     static xcond_t release_c = X_COND_INIT;
106     static struct tctxs releasers;
107     static int idle;
108     static int min_idle = 1;
109     static int curthreads, max_threads = 1; /* protected by release_m */
110    
111     static xmutex_t acquire_m = X_MUTEX_INIT;
112     static struct tctxs acquirers;
113    
114 root 1.1 X_THREAD_PROC(thread_proc)
115     {
116     PERL_SET_CONTEXT (perl_thx);
117    
118     {
119     dTHX; /* inefficient, we already have perl_thx, but I see no better way */
120 root 1.11 dJMPENV;
121 root 1.1 struct tctx *ctx;
122 root 1.11 int catchret;
123 root 1.1
124 root 1.9 X_LOCK (release_m);
125 root 1.6
126 root 1.1 for (;;)
127     {
128 root 1.9 while (!releasers.cur)
129 root 1.6 if (idle <= min_idle || 1)
130 root 1.9 X_COND_WAIT (release_c, release_m);
131 root 1.6 else
132     {
133     struct timespec ts = { time (0) + idle - min_idle, 0 };
134    
135 root 1.9 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
136     if (idle > min_idle && !releasers.cur)
137 root 1.6 break;
138     }
139    
140 root 1.9 ctx = tctxs_get (&releasers);
141 root 1.6 --idle;
142 root 1.9 X_UNLOCK (release_m);
143 root 1.3
144 root 1.6 if (!ctx) /* timed out? */
145     break;
146    
147 root 1.1 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
148 root 1.11 JMPENV_PUSH (ctx->jeret);
149 root 1.1
150 root 1.11 if (!ctx->jeret)
151     while (ctx->coro)
152     CORO_SCHEDULE;
153 root 1.1
154 root 1.11 JMPENV_POP;
155 root 1.3 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
156    
157 root 1.9 X_LOCK (acquire_m);
158 root 1.3 ctx->wait_f = 1;
159 root 1.9 X_COND_SIGNAL (ctx->acquire_c);
160     X_UNLOCK (acquire_m);
161 root 1.1
162 root 1.9 X_LOCK (release_m);
163 root 1.6 ++idle;
164 root 1.1 }
165     }
166     }
167    
168     static void
169     start_thread (void)
170     {
171     xthread_t tid;
172    
173 root 1.9 if (curthreads >= max_threads && 0)
174     return;
175    
176     ++curthreads;
177 root 1.6 ++idle;
178 root 1.1 xthread_create (&tid, thread_proc, 0);
179     }
180    
181     static void
182     pmapi_release (void)
183     {
184 root 1.16 #if RECURSION_CHECK
185     if (X_TLS_GET (check_key))
186     croak ("perlinterp_release () called without valid perl context");
187    
188     X_TLS_SET (check_key, &check_key);
189     #endif
190    
191 root 1.13 if (! ((thread_enable ? thread_enable : global_enable) & 1))
192 root 1.4 {
193 root 1.12 X_TLS_SET (current_key, 0);
194 root 1.4 return;
195     }
196    
197 root 1.1 struct tctx *ctx = tctx_get ();
198     ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
199 root 1.3 ctx->wait_f = 0;
200    
201 root 1.12 X_TLS_SET (current_key, ctx);
202 root 1.1 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
203    
204 root 1.9 X_LOCK (release_m);
205 root 1.6
206     if (idle <= min_idle)
207 root 1.1 start_thread ();
208    
209 root 1.9 tctxs_put (&releasers, ctx);
210     X_COND_SIGNAL (release_c);
211    
212     while (!idle && releasers.cur)
213     {
214     X_UNLOCK (release_m);
215     X_LOCK (release_m);
216     }
217 root 1.6
218 root 1.9 X_UNLOCK (release_m);
219 root 1.1 }
220    
221     static void
222     pmapi_acquire (void)
223     {
224 root 1.11 int jeret;
225 root 1.12 struct tctx *ctx = X_TLS_GET (current_key);
226 root 1.1
227 root 1.16 #if RECURSION_CHECK
228     if (X_TLS_GET (check_key) != &check_key)
229     croak ("perlinterp_acquire () called with valid perl context");
230    
231     X_TLS_SET (check_key, 0);
232     #endif
233    
234 root 1.4 if (!ctx)
235     return;
236    
237 root 1.9 X_LOCK (acquire_m);
238 root 1.1
239 root 1.9 tctxs_put (&acquirers, ctx);
240 root 1.1
241     s_epipe_signal (&ep);
242 root 1.3 while (!ctx->wait_f)
243 root 1.9 X_COND_WAIT (ctx->acquire_c, acquire_m);
244     X_UNLOCK (acquire_m);
245 root 1.1
246 root 1.11 jeret = ctx->jeret;
247 root 1.1 tctx_put (ctx);
248     pthread_sigmask (SIG_SETMASK, &cursigset, 0);
249 root 1.11
250     if (jeret)
251     JMPENV_JUMP (jeret);
252 root 1.1 }
253    
254 root 1.4 static void
255 root 1.6 set_thread_enable (pTHX_ void *arg)
256 root 1.4 {
257 root 1.6 thread_enable = PTR2IV (arg);
258 root 1.4 }
259    
260 root 1.1 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
261    
262     PROTOTYPES: DISABLE
263    
264     BOOT:
265     {
266 root 1.16 #ifndef _WIN32
267 root 1.1 sigfillset (&fullsigset);
268 root 1.16 #endif
269 root 1.1
270 root 1.12 X_TLS_INIT (current_key);
271 root 1.16 #if RECURSION_CHECK
272     X_TLS_INIT (check_key);
273     #endif
274 root 1.1
275     if (s_epipe_new (&ep))
276     croak ("Coro::Multicore: unable to initialise event pipe.\n");
277    
278     perl_thx = PERL_GET_CONTEXT;
279    
280     I_CORO_API ("Coro::Multicore");
281    
282 root 1.9 X_LOCK (release_m);
283 root 1.6 while (idle < min_idle)
284     start_thread ();
285 root 1.9 X_UNLOCK (release_m);
286 root 1.6
287 root 1.14 /* not perfectly efficient to do it this way, but it is simple */
288     perl_multicore_init (); /* calls release */
289 root 1.1 perl_multicore_api->pmapi_release = pmapi_release;
290     perl_multicore_api->pmapi_acquire = pmapi_acquire;
291     }
292    
293 root 1.4 bool
294     enable (bool enable = NO_INIT)
295     CODE:
296     RETVAL = global_enable;
297     if (items)
298     global_enable = enable;
299     OUTPUT:
300     RETVAL
301    
302     void
303     scoped_enable ()
304     CODE:
305     LEAVE; /* see Guard.xs */
306 root 1.6 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
307 root 1.4 ENTER; /* see Guard.xs */
308    
309     void
310     scoped_disable ()
311     CODE:
312     LEAVE; /* see Guard.xs */
313 root 1.6 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
314 root 1.4 ENTER; /* see Guard.xs */
315    
316 root 1.1 U32
317 root 1.6 min_idle_threads (U32 min = NO_INIT)
318 root 1.1 CODE:
319 root 1.9 X_LOCK (acquire_m);
320 root 1.6 RETVAL = min_idle;
321 root 1.1 if (items)
322 root 1.6 min_idle = min;
323 root 1.9 X_UNLOCK (acquire_m);
324 root 1.1 OUTPUT:
325     RETVAL
326    
327    
328     int
329     fd ()
330     CODE:
331     RETVAL = s_epipe_fd (&ep);
332     OUTPUT:
333     RETVAL
334    
335     void
336     poll (...)
337     CODE:
338     s_epipe_drain (&ep);
339 root 1.9 X_LOCK (acquire_m);
340     while (acquirers.cur)
341 root 1.1 {
342 root 1.9 struct tctx *ctx = tctxs_get (&acquirers);
343 root 1.1 CORO_READY ((SV *)ctx->coro);
344     SvREFCNT_dec_NN ((SV *)ctx->coro);
345     ctx->coro = 0;
346     }
347 root 1.9 X_UNLOCK (acquire_m);
348 root 1.1
349     void
350 root 1.2 sleep (NV seconds)
351 root 1.1 CODE:
352     perlinterp_release ();
353 root 1.2 usleep (seconds * 1e6);
354 root 1.1 perlinterp_acquire ();
355 root 1.2