ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.12
Committed: Sun Jul 5 03:50:57 2015 UTC (8 years, 10 months ago) by root
Branch: MAIN
Changes since 1.11: +5 -5 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #define PERL_NO_GET_CONTEXT
2    
3     #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
/* stack size in bytes, defined ahead of the xthread.h include below
   (presumably consumed there - confirm); parenthesised so that the macro
   behaves as a single operand inside larger expressions */
#define X_STACKSIZE (1024 * sizeof (void *))
8 root 1.1
9     #include "CoroAPI.h"
10     #include "perlmulticore.h"
11     #include "schmorp.h"
12     #include "xthread.h"
13    
/* on _WIN32 builds supply a placeholder sigset_t and make every
   pthread_sigmask call expand to nothing */
#if defined (_WIN32)
typedef char sigset_t;
#define pthread_sigmask(how,set,oset)
#endif

/* fall back to the generic refcount macros when the perl headers in use
   do not provide the _NN variants */
#if !defined (SvREFCNT_dec_NN)
#define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
#endif

#if !defined (SvREFCNT_inc_NN)
#define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
#endif
26    
27 root 1.12 static X_TLS_DECLARE(current_key);
28 root 1.1
29     static s_epipe ep;
30     static void *perl_thx;
31     static sigset_t cursigset, fullsigset;
32    
33 root 1.6 static int global_enable = 0;
34 root 1.4 static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
35    
36 root 1.6 /* assigned to a thread for each release/acquire */
37 root 1.1 struct tctx
38     {
39     void *coro;
40 root 1.6 int wait_f;
41 root 1.9 xcond_t acquire_c;
42 root 1.11 int jeret;
43 root 1.1 };
44    
45     static struct tctx *tctx_free;
46    
47     static struct tctx *
48     tctx_get (void)
49     {
50     struct tctx *ctx;
51    
52     if (!tctx_free)
53 root 1.3 {
54     ctx = malloc (sizeof (*tctx_free));
55 root 1.9 X_COND_CREATE (ctx->acquire_c);
56 root 1.3 }
57 root 1.1 else
58     {
59     ctx = tctx_free;
60     tctx_free = tctx_free->coro;
61     }
62    
63     return ctx;
64     }
65    
66     static void
67     tctx_put (struct tctx *ctx)
68     {
69     ctx->coro = tctx_free;
70     tctx_free = ctx;
71     }
72    
/* a stack of tctxs */
struct tctxs
{
  struct tctx **ctxs; /* heap array with room for max entries, 0 before the first push */
  int cur, max;       /* entries in use / entries allocated */
};

/* pop and return the most recently pushed context; no bounds check is made,
   so the caller must have verified that cur is non-zero */
static struct tctx *
tctxs_get (struct tctxs *ctxs)
{
  return ctxs->ctxs[--ctxs->cur];
}

/* push CTX, growing the array (16 entries at first, then doubling) when it
   is full; aborts the process when the array cannot be grown */
static void
tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
{
  if (ctxs->cur >= ctxs->max)
    {
      int grown = ctxs->max ? ctxs->max * 2 : 16;
      /* keep the old array reachable until realloc is known to have succeeded */
      struct tctx **arr = realloc (ctxs->ctxs, grown * sizeof (ctxs->ctxs[0]));

      if (!arr)
        abort ();

      ctxs->ctxs = arr;
      ctxs->max = grown;
    }

  ctxs->ctxs[ctxs->cur++] = ctx;
}
97    
98     static xmutex_t release_m = X_MUTEX_INIT;
99     static xcond_t release_c = X_COND_INIT;
100     static struct tctxs releasers;
101     static int idle;
102     static int min_idle = 1;
103     static int curthreads, max_threads = 1; /* protected by release_m */
104    
105     static xmutex_t acquire_m = X_MUTEX_INIT;
106     static struct tctxs acquirers;
107    
108 root 1.1 X_THREAD_PROC(thread_proc)
109     {
110     PERL_SET_CONTEXT (perl_thx);
111    
112     {
113     dTHX; /* inefficient, we already have perl_thx, but I see no better way */
114 root 1.11 dJMPENV;
115 root 1.1 struct tctx *ctx;
116 root 1.11 int catchret;
117 root 1.1
118 root 1.9 X_LOCK (release_m);
119 root 1.6
120 root 1.1 for (;;)
121     {
122 root 1.9 while (!releasers.cur)
123 root 1.6 if (idle <= min_idle || 1)
124 root 1.9 X_COND_WAIT (release_c, release_m);
125 root 1.6 else
126     {
127     struct timespec ts = { time (0) + idle - min_idle, 0 };
128    
129 root 1.9 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
130     if (idle > min_idle && !releasers.cur)
131 root 1.6 break;
132     }
133    
134 root 1.9 ctx = tctxs_get (&releasers);
135 root 1.6 --idle;
136 root 1.9 X_UNLOCK (release_m);
137 root 1.3
138 root 1.6 if (!ctx) /* timed out? */
139     break;
140    
141 root 1.1 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
142 root 1.11 JMPENV_PUSH (ctx->jeret);
143 root 1.1
144 root 1.11 if (!ctx->jeret)
145     while (ctx->coro)
146     CORO_SCHEDULE;
147 root 1.1
148 root 1.11 JMPENV_POP;
149 root 1.3 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
150    
151 root 1.9 X_LOCK (acquire_m);
152 root 1.3 ctx->wait_f = 1;
153 root 1.9 X_COND_SIGNAL (ctx->acquire_c);
154     X_UNLOCK (acquire_m);
155 root 1.1
156 root 1.9 X_LOCK (release_m);
157 root 1.6 ++idle;
158 root 1.1 }
159     }
160     }
161    
162     static void
163     start_thread (void)
164     {
165     xthread_t tid;
166    
167 root 1.9 if (curthreads >= max_threads && 0)
168     return;
169    
170     ++curthreads;
171 root 1.6 ++idle;
172 root 1.1 xthread_create (&tid, thread_proc, 0);
173     }
174    
175     static void
176     pmapi_release (void)
177     {
178 root 1.4 if (!(thread_enable ? thread_enable & 1 : global_enable))
179     {
180 root 1.12 X_TLS_SET (current_key, 0);
181 root 1.4 return;
182     }
183    
184 root 1.1 struct tctx *ctx = tctx_get ();
185     ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
186 root 1.3 ctx->wait_f = 0;
187    
188 root 1.12 X_TLS_SET (current_key, ctx);
189 root 1.1 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
190    
191 root 1.9 X_LOCK (release_m);
192 root 1.6
193     if (idle <= min_idle)
194 root 1.1 start_thread ();
195    
196 root 1.9 tctxs_put (&releasers, ctx);
197     X_COND_SIGNAL (release_c);
198    
199     while (!idle && releasers.cur)
200     {
201     X_UNLOCK (release_m);
202     X_LOCK (release_m);
203     }
204 root 1.6
205 root 1.9 X_UNLOCK (release_m);
206 root 1.1 }
207    
208     static void
209     pmapi_acquire (void)
210     {
211 root 1.11 int jeret;
212 root 1.12 struct tctx *ctx = X_TLS_GET (current_key);
213 root 1.1
214 root 1.4 if (!ctx)
215     return;
216    
217 root 1.9 X_LOCK (acquire_m);
218 root 1.1
219 root 1.9 tctxs_put (&acquirers, ctx);
220 root 1.1
221     s_epipe_signal (&ep);
222 root 1.3 while (!ctx->wait_f)
223 root 1.9 X_COND_WAIT (ctx->acquire_c, acquire_m);
224     X_UNLOCK (acquire_m);
225 root 1.1
226 root 1.11 jeret = ctx->jeret;
227 root 1.1 tctx_put (ctx);
228     pthread_sigmask (SIG_SETMASK, &cursigset, 0);
229 root 1.11
230     if (jeret)
231     JMPENV_JUMP (jeret);
232 root 1.1 }
233    
234 root 1.4 static void
235 root 1.6 set_thread_enable (pTHX_ void *arg)
236 root 1.4 {
237 root 1.6 thread_enable = PTR2IV (arg);
238 root 1.4 }
239    
240 root 1.1 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
241    
242     PROTOTYPES: DISABLE
243    
BOOT:
{
#if !defined (_WIN32)
	/* mask with every signal set, installed around release/acquire */
	sigfillset (&fullsigset);
#endif

	X_TLS_INIT (current_key);

	if (s_epipe_new (&ep))
	  croak ("Coro::Multicore: unable to initialise event pipe.\n");

	/* remembered so that thread_proc can install it via PERL_SET_CONTEXT */
	perl_thx = PERL_GET_CONTEXT;

	I_CORO_API ("Coro::Multicore");

	/* start workers until min_idle are idle; start_thread raises idle itself */
	X_LOCK (release_m);
	while (idle < min_idle)
	  start_thread ();
	X_UNLOCK (release_m);

	/* not perfectly efficient to do it this way, but it's simple */
	perl_multicore_init ();
	perl_multicore_api->pmapi_release = pmapi_release;
	perl_multicore_api->pmapi_acquire = pmapi_acquire;
}
269    
bool
enable (bool enable = NO_INIT)
	CODE:
	/* return the previous global setting; replace it only when an argument was passed */
	RETVAL = global_enable;
	if (items > 0)
	  global_enable = enable;
	OUTPUT:
	RETVAL
278    
void
scoped_enable ()
	CODE:
	/* registers set_thread_enable with argument 1 (enabled) as the first hook
	   and with argument 0 (fall back to global_enable) as the second */
	LEAVE; /* see Guard.xs */
	CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
	ENTER; /* see Guard.xs */
285    
void
scoped_disable ()
	CODE:
	/* registers set_thread_enable with argument 2 (disabled) as the first hook
	   and with argument 0 (fall back to global_enable) as the second */
	LEAVE; /* see Guard.xs */
	CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
	ENTER; /* see Guard.xs */
292    
U32
min_idle_threads (U32 min = NO_INIT)
	CODE:
	/* returns the previous minimum and, when an argument is given, replaces it;
	   min_idle is read by thread_proc and pmapi_release while they hold
	   release_m, so it has to be updated under that same lock */
	X_LOCK (release_m);
	RETVAL = min_idle;
	if (items)
	  min_idle = min;
	X_UNLOCK (release_m);
	OUTPUT:
	RETVAL
303    
304    
int
fd ()
	CODE:
	/* file descriptor of the event pipe that pmapi_acquire signals */
	RETVAL = s_epipe_fd (&ep);
	OUTPUT:
	RETVAL
311    
void
poll (...)
	CODE:
	/* drain the event pipe, then ready every coro queued by pmapi_acquire;
	   clearing ctx->coro ends the scheduling loop in thread_proc */
	s_epipe_drain (&ep);
	X_LOCK (acquire_m);
	for (; acquirers.cur; )
	  {
	    struct tctx *ctx = tctxs_get (&acquirers);
	    SV *coro_sv = (SV *)ctx->coro;
	    CORO_READY (coro_sv);
	    /* drops the reference taken in pmapi_release */
	    SvREFCNT_dec_NN (coro_sv);
	    ctx->coro = 0;
	  }
	X_UNLOCK (acquire_m);
325 root 1.1
void
sleep (NV seconds)
	CODE:
	perlinterp_release ();
	/* a negative or NaN duration cannot be converted to usleep's unsigned
	   argument (the conversion is undefined), so only sleep for positive
	   values; the release/acquire pair still runs in every case */
	if (seconds > 0.)
	  usleep (seconds * 1e6);
	perlinterp_acquire ();
332 root 1.2